This commit is contained in:
zaqxs123456 2024-10-03 17:37:37 +08:00
parent e7b688d71d
commit ed962a5f89
4 changed files with 247 additions and 125 deletions

1
.gitignore vendored
View File

@ -165,3 +165,4 @@ FencersKeyPoints/*
# output folder # output folder
output/* output/*
fixed/*

View File

@ -1,100 +1,22 @@
import json import json
import os
import random
import numpy as np import numpy as np
from typing import List from typing import List
import math import math
import cv2 import cv2
import skeleton_lib as skel
import process_json_file as pjf
import sys
sys.path.append('./')
coco_limbSeq = [ def is_normalized(keypoints: List[skel.Keypoint]) -> bool:
[2, 3], [2, 6], [3, 4], [4, 5],
[6, 7], [7, 8], [2, 9], [9, 10],
[10, 11], [2, 12], [12, 13], [13, 14],
[2, 1], [1, 15], [15, 17], [1, 16],
[16, 18],
]
coco_colors = [
[255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], [0, 255, 0],
[0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255],
[170, 0, 255], [255, 0, 255], [255, 0, 170], [255, 0, 85]
]
body_25_limbSeq = [
[1, 8], [1, 2], [1, 5], [2, 3],
[3, 4], [5, 6], [6, 7], [8, 9],
[9, 10], [10, 11], [8, 12], [12, 13],
[13, 14], [1, 0], [0, 15], [15, 17],
[0, 16], [16, 18], [14, 19], [19, 20],
[14, 21], [11, 22], [22, 23], [11, 24]
]
body_25_colors = [
[255, 0, 85], [255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0],
[0, 255, 0], [255, 0, 0], [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255],
[0, 0, 255], [255, 0, 170], [170, 0, 255], [255, 0, 255], [85, 0, 255], [0, 0, 255], [0, 0, 255],
[0, 0, 255], [0, 0, 255], [0, 255, 255], [0, 255, 255], [0, 255, 255]
]
body_25B_limbSeq = [
[0, 1], [0, 2], [1, 3], [2, 4], [5, 7], [6, 8], [7, 9], [8, 10],
[5, 11], [6, 12], [11, 13], [12, 14], [13, 15], [14, 16], [15, 19],
[19, 20], [15, 21], [16, 22], [22, 23], [16, 24], [5, 17], [6, 17],
[17, 18], [11, 12]
]
body_25B_colors = [
[255, 0, 85], [170, 0, 255], [255, 0, 170], [85, 0, 255], [255, 0, 255],
[170, 255, 0], [255, 85, 0], [85, 255, 0], [255, 170, 0], [0, 255, 0],
[255, 255, 0], [0, 170, 255], [0, 255, 85], [0, 85, 255], [0, 255, 170],
[0, 0, 255], [0, 255, 255], [255, 0, 0], [255, 0, 0], [0, 0, 255],
[0, 0, 255], [0, 0, 255], [0, 255, 255], [0, 255, 255], [0, 255, 255]
]
class Keypoint:
def __init__(self, x: float, y: float, confidence: float = 1.0):
"""
Initialize a Keypoint object.
Args:
x (float): The x-coordinate of the keypoint.
y (float): The y-coordinate of the keypoint.
confidence (float): The confidence score of the keypoint. Default is 1.0.
"""
self.x = x
self.y = y
self.confidence = confidence
def __repr__(self):
return f"Keypoint(x={self.x}, y={self.y}, confidence={self.confidence})"
class Skeleton:
def __init__(self, keypoints: List[Keypoint]):
self.keypoints = keypoints
def __repr__(self):
return f"Skeleton(keypoints={self.keypoints})"
class Skeleton_Seqence:
def __init__(self, skeletons: List[Skeleton]):
self.skeletons = skeletons
def __repr__(self):
return f"Skeleton_Seqence(Skeleton_frames={self.skeletons})"
def get_frame(self, frame_index: int) -> Skeleton:
return self.skeletons[frame_index]
def add_frame(self, skeleton: Skeleton):
self.skeletons.append(skeleton)
def get_time_slice_for_Skeleton_Seqences(skeleton_seqences: List[Skeleton_Seqence], frame_index: int) -> List[Skeleton]:
return [skeleton_seq.get_frame(frame_index) for skeleton_seq in skeleton_seqences]
def is_normalized(keypoints: List[Keypoint]) -> bool:
for keypoint in keypoints: for keypoint in keypoints:
if not (0 <= keypoint.x <= 1 and 0 <= keypoint.y <= 1): if not (0 <= keypoint.x <= 1 and 0 <= keypoint.y <= 1):
return False return False
return True return True
def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors, xinsr_stick_scaling: bool = False) -> np.ndarray: def draw_bodypose(canvas: np.ndarray, keypoints: List[skel.Keypoint], limbSeq, colors, xinsr_stick_scaling: bool = False) -> np.ndarray:
""" """
Draw keypoints and limbs representing body pose on a given canvas. Draw keypoints and limbs representing body pose on a given canvas.
@ -115,7 +37,7 @@ def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors
H, W, _ = canvas.shape H, W, _ = canvas.shape
CH, CW, _ = canvas.shape CH, CW, _ = canvas.shape
stickwidth = 4 stickwidth = 2
# Ref: https://huggingface.co/xinsir/controlnet-openpose-sdxl-1.0 # Ref: https://huggingface.co/xinsir/controlnet-openpose-sdxl-1.0
max_side = max(CW, CH) max_side = max(CW, CH)
@ -124,11 +46,18 @@ def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors
else : else :
stick_scale = 1 stick_scale = 1
for (k1_index, k2_index), color in zip(limbSeq, colors): if keypoints is None or len(keypoints) == 0:
keypoint1 = keypoints[k1_index - 1] return canvas
keypoint2 = keypoints[k2_index - 1]
if keypoint1 is None or keypoint2 is None: for (k1_index, k2_index), color in zip(limbSeq, colors):
keypoint1 = keypoints[k1_index]
keypoint2 = keypoints[k2_index]
if keypoint1 is None or keypoint2 is None or keypoint1.confidence == 0 or keypoint2.confidence == 0:
# if keypoint1 is None or keypoint1.confidence == 0:
# print(f"keypoint failed: {k1_index}")
# if keypoint2 is None or keypoint2.confidence == 0:
# print(f"keypoint failed: {k2_index}")
continue continue
Y = np.array([keypoint1.x, keypoint2.x]) * float(W) Y = np.array([keypoint1.x, keypoint2.x]) * float(W)
@ -141,7 +70,7 @@ def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors
cv2.fillConvexPoly(canvas, polygon, [int(float(c) * 0.6) for c in color]) cv2.fillConvexPoly(canvas, polygon, [int(float(c) * 0.6) for c in color])
for keypoint, color in zip(keypoints, colors): for keypoint, color in zip(keypoints, colors):
if keypoint is None: if keypoint is None or keypoint.confidence == 0:
continue continue
x, y = keypoint.x, keypoint.y x, y = keypoint.x, keypoint.y
@ -151,15 +80,8 @@ def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors
return canvas return canvas
def json_to_keypoints_openpose(json_file: str) -> List[Keypoint]: def coordinates_to_keypoints(coordinates: list) -> List[skel.Keypoint]:
with open(json_file, 'r') as file: keypoints = [skel.Keypoint(coordinates[i], coordinates[i + 1]) for i in range(0, len(coordinates), 3)]
data = json.load(file)
keypoints = data[0]['people'][0]['pose_keypoints_2d']
keypoints = [Keypoint(keypoints[i], keypoints[i + 1]) for i in range(0, len(keypoints), 3)]
return keypoints
def coordinates_to_keypoints(coordinates: list) -> List[Keypoint]:
keypoints = [Keypoint(coordinates[i], coordinates[i + 1]) for i in range(0, len(coordinates), 3)]
return keypoints return keypoints
def save_bodypose(width: int, height: int, coordinates: list): def save_bodypose(width: int, height: int, coordinates: list):
@ -168,36 +90,36 @@ def save_bodypose(width: int, height: int, coordinates: list):
canvas = np.zeros((height, width, 3), dtype=np.uint8) canvas = np.zeros((height, width, 3), dtype=np.uint8)
keypoints = coordinates_to_keypoints(coordinates) keypoints = coordinates_to_keypoints(coordinates)
canvas = draw_bodypose(canvas, keypoints, coco_limbSeq, coco_colors) canvas = draw_bodypose(canvas, keypoints, skel.coco_limbSeq, skel.coco_colors)
# Save as body_pose_output0000.png, body_pose_output0001.png, ... # Save as body_pose_output0000.png, body_pose_output0001.png, ...
cv2.imwrite('body_pose_output%04d.png' % save_bodypose.counter, canvas) cv2.imwrite('body_pose_output%04d.png' % save_bodypose.counter, canvas)
save_bodypose.counter += 1 # Increment the counter save_bodypose.counter += 1 # Increment the counter
def array_json_to_Skeleton_Seqences(json_file: str) -> List[Skeleton_Seqence]:
with open(json_file, 'r') as file:
data = json.load(file)
skeleton_sequences = []
for frame in data:
for i in range(len(frame)):
while len(skeleton_sequences) <= i:
skeleton_sequences.append(None)
skeleton_sequences[i] = Skeleton_Seqence([])
skeleton = Skeleton([Keypoint(keypoint[0], keypoint[1], keypoint[2]) for keypoint in frame[i]])
skeleton_sequences[i].add_frame(skeleton)
return skeleton_sequences
def main(): def main():
json_file = '0001_002_00_01_1.json' directory = './fixed'
image_path = 'test' json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
skeleton_sequences = array_json_to_Skeleton_Seqences(json_file) if not json_files:
for i in range(20): print("No JSON files found in the directory.")
sliced = get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) return
canvas = np.zeros((480, 640, 3), dtype=np.uint8)
for skeleton in sliced: json_file = os.path.join(directory, random.choice(json_files))
# json_file = './test_output.json'
image_path = './output/test'
print(json_file)
skeleton_sequences = pjf.array_json_to_Skeleton_Seqences(json_file)
frame_count = max(len(skeleton_sequences[i].skeletons_frame) for i in range(len(skeleton_sequences)) if skeleton_sequences[i] is not None)
sliced_list = [skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) for i in range(frame_count)]
for i in range(frame_count):
sliced = sliced_list[i]
canvas = np.zeros((360, 640, 3), dtype=np.uint8)
for j, skeleton in enumerate(sliced):
keypoints = skeleton.keypoints keypoints = skeleton.keypoints
canvas = draw_bodypose(canvas, keypoints, body_25B_limbSeq, body_25B_colors) skeleton_sequences[j].get_frame(i).keypoints = keypoints
canvas = draw_bodypose(canvas, keypoints, skel.body_25_limbSeq, skel.body_25_colors)
cv2.imwrite(image_path + '_' + str(i) + '.png', canvas) cv2.imwrite(image_path + '_' + str(i) + '.png', canvas)

89
process_json_file.py Normal file
View File

@ -0,0 +1,89 @@
import os
import json
import sys
import numpy as np
from typing import List
import skeleton_lib as skel
import concurrent.futures
sys.path.append('./')
def json_to_keypoints_openpose(json_file: str) -> List[skel.Keypoint]:
with open(json_file, 'r') as file:
data = json.load(file)
keypoints = data[0]['people'][0]['pose_keypoints_2d']
keypoints = [skel.Keypoint(keypoints[i], keypoints[i + 1]) for i in range(0, len(keypoints), 3)]
return keypoints
def array_json_to_Skeleton_Seqences(json_file: str) -> List[skel.Skeleton_Seqence]:
with open(json_file, 'r') as file:
data = json.load(file)
skeleton_sequences = []
for frame in data:
for i in range(len(frame)):
while len(skeleton_sequences) <= i:
skeleton_sequences.append(None)
skeleton_sequences[i] = skel.Skeleton_Seqence([])
skeleton = skel.Skeleton([skel.Keypoint(keypoint[0], keypoint[1], keypoint[2]) for keypoint in frame[i]])
skeleton_sequences[i].add_frame(skeleton)
return skeleton_sequences
def Skeleton_Seqences_save_to_array_json(skeleton_sequences: List[skel.Skeleton_Seqence], json_file: str):
# Ensure the directory exists
os.makedirs(os.path.dirname(json_file), exist_ok=True)
data = []
for i in range(len(skeleton_sequences[0].skeletons_frame)):
sliced = skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i)
sequence_data = []
for skeleton in sliced:
keypoints_data = [[kp.x, kp.y, kp.confidence] for kp in skeleton.keypoints]
sequence_data.append(keypoints_data)
data.append(sequence_data)
with open(json_file, 'w') as file:
json.dump(data, file, indent=4)
def process_json_file(json_file, directory):
json_file = os.path.join(directory, json_file)
# print(json_file)
skeleton_sequences = array_json_to_Skeleton_Seqences(json_file)
frame_count = max(len(skeleton_sequences[i].skeletons_frame) for i in range(len(skeleton_sequences)) if skeleton_sequences[i] is not None)
sliced_list = [skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) for i in range(frame_count)]
for i in range(frame_count):
last_sliced = sliced_list[i - 1] if i > 0 else None
next_sliced = sliced_list[i + 1] if i < frame_count - 1 else None
sliced = sliced_list[i]
for j, skeleton in enumerate(sliced):
last_keypoints = last_sliced[j].keypoints if last_sliced else None
next_keypoints = next_sliced[j].keypoints if next_sliced else None
keypoints = skeleton.keypoints
keypoints = skel.fix_keypoints(keypoints, last_keypoints, next_keypoints)
skeleton_sequences[j].get_frame(i).keypoints = keypoints
Skeleton_Seqences_save_to_array_json(skeleton_sequences, './fixed/' + os.path.basename(json_file))
def process_json_files_chunk(json_files_chunk, directory):
for json_file in json_files_chunk:
process_json_file(json_file, directory)
def process_json_files_multi_threaded(json_files, directory):
directory = './FencersKeyPoints'
json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
if not json_files:
print("No JSON files found in the directory.")
return
json_files_chunks = np.array_split(json_files, 12)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(process_json_files_chunk, chunk, directory) for chunk in json_files_chunks]
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
print(f"Error processing file chunk: {e}")

110
skeleton_lib.py Normal file
View File

@ -0,0 +1,110 @@
from typing import List
coco_limbSeq = [
[1, 2], [1, 5], [2, 3], [3, 4],
[5, 6], [6, 7], [1, 8], [8, 9],
[9, 10], [1, 11], [11, 12], [12, 13],
[1, 0], [0, 14], [14, 16], [0, 15],
[15, 17],
]
coco_colors = [
[255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], [0, 255, 0],
[0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255],
[170, 0, 255], [255, 0, 255], [255, 0, 170], [255, 0, 85]
]
body_25_limbSeq = [
[1, 8], [1, 2], [1, 5], [2, 3],
[3, 4], [5, 6], [6, 7], [8, 9],
[9, 10], [10, 11], [8, 12], [12, 13],
[13, 14], [1, 0], [0, 15], [15, 17],
[0, 16], [16, 18], [14, 19], [19, 20],
[14, 21], [11, 22], [22, 23], [11, 24]
]
body_25_colors = [
[255, 0, 85], [255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0],
[0, 255, 0], [255, 0, 0], [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255],
[0, 0, 255], [255, 0, 170], [170, 0, 255], [255, 0, 255], [85, 0, 255], [0, 0, 255], [0, 0, 255],
[0, 0, 255], [0, 0, 255], [0, 255, 255], [0, 255, 255], [0, 255, 255]
]
body_25B_limbSeq = [
[0, 1], [0, 2], [1, 3], [2, 4], [5, 7], [6, 8], [7, 9], [8, 10],
[5, 11], [6, 12], [11, 13], [12, 14], [13, 15], [14, 16], [15, 19],
[19, 20], [15, 21], [16, 22], [22, 23], [16, 24], [5, 17], [6, 17],
[17, 18], [11, 12]
]
body_25B_colors = [
[255, 0, 85], [170, 0, 255], [255, 0, 170], [85, 0, 255], [255, 0, 255],
[170, 255, 0], [255, 85, 0], [85, 255, 0], [255, 170, 0], [0, 255, 0],
[255, 255, 0], [0, 170, 255], [0, 255, 85], [0, 85, 255], [0, 255, 170],
[0, 0, 255], [0, 255, 255], [255, 0, 0], [255, 0, 0], [0, 0, 255],
[0, 0, 255], [0, 0, 255], [0, 255, 255], [0, 255, 255], [0, 255, 255]
]
class Keypoint:
def __init__(self, x: float, y: float, confidence: float = 1.0):
"""
Initialize a Keypoint object.
Args:
x (float): The x-coordinate of the keypoint.
y (float): The y-coordinate of the keypoint.
confidence (float): The confidence score of the keypoint. Default is 1.0.
"""
self.x = x
self.y = y
self.confidence = confidence
def __repr__(self):
return f"Keypoint(x={self.x}, y={self.y}, confidence={self.confidence})"
class Skeleton:
def __init__(self, keypoints: List[Keypoint]):
self.keypoints = keypoints
def __repr__(self):
return f"Skeleton(keypoints={self.keypoints})"
def is_healthy_skeleton(self):
for keypoint in self.keypoints:
if keypoint.confidence == 0.0:
return False
return True
class Skeleton_Seqence:
def __init__(self, skeletons: List[Skeleton]):
self.skeletons_frame = skeletons
def __repr__(self):
return f"Skeleton_Seqence(Skeleton_frames={self.skeletons_frame})"
def get_frame(self, frame_index: int) -> Skeleton:
return self.skeletons_frame[frame_index]
def add_frame(self, skeleton: Skeleton):
self.skeletons_frame.append(skeleton)
def is_healthy_seqence(self):
for skeleton in self.skeletons_frame:
if not skeleton.is_healthy_skeleton():
return False
return True
def fix_keypoints(keypoints, last_keypoints, next_keypoints):
if not keypoints or not last_keypoints or not next_keypoints:
return keypoints
for i, keypoint in enumerate(keypoints):
if keypoint.confidence == 0.0 and last_keypoints and next_keypoints:
last_keypoint = last_keypoints[i]
next_keypoint = next_keypoints[i]
if last_keypoint.confidence > 0 and next_keypoint.confidence > 0:
keypoint.x = (last_keypoint.x + next_keypoint.x) / 2
keypoint.y = (last_keypoint.y + next_keypoint.y) / 2
keypoint.confidence = (last_keypoint.confidence + next_keypoint.confidence) / 2
return keypoints
def get_time_slice_for_Skeleton_Seqences(skeleton_seqences: List[Skeleton_Seqence], frame_index: int) -> List[Skeleton]:
return [skeleton_seq.get_frame(frame_index) for skeleton_seq in skeleton_seqences]