From ed962a5f89a9d48a1bf488c6baae1e0f73e45498 Mon Sep 17 00:00:00 2001 From: zaqxs123456 <18200842@life.hkbu.edu.hk> Date: Thu, 3 Oct 2024 17:37:37 +0800 Subject: [PATCH] update --- .gitignore | 3 +- openpose_gen.py | 170 ++++++++++++------------------------------- process_json_file.py | 89 ++++++++++++++++++++++ skeleton_lib.py | 110 ++++++++++++++++++++++++++++ 4 files changed, 247 insertions(+), 125 deletions(-) create mode 100644 process_json_file.py create mode 100644 skeleton_lib.py diff --git a/.gitignore b/.gitignore index 9ac9695..fe8c47e 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,5 @@ cython_debug/ FencersKeyPoints/* # output folder -output/* \ No newline at end of file +output/* +fixed/* \ No newline at end of file diff --git a/openpose_gen.py b/openpose_gen.py index 8329585..913402f 100644 --- a/openpose_gen.py +++ b/openpose_gen.py @@ -1,100 +1,22 @@ import json +import os +import random import numpy as np from typing import List import math import cv2 +import skeleton_lib as skel +import process_json_file as pjf +import sys +sys.path.append('./') -coco_limbSeq = [ - [2, 3], [2, 6], [3, 4], [4, 5], - [6, 7], [7, 8], [2, 9], [9, 10], - [10, 11], [2, 12], [12, 13], [13, 14], - [2, 1], [1, 15], [15, 17], [1, 16], - [16, 18], -] -coco_colors = [ - [255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], [0, 255, 0], - [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255], - [170, 0, 255], [255, 0, 255], [255, 0, 170], [255, 0, 85] -] - -body_25_limbSeq = [ - [1, 8], [1, 2], [1, 5], [2, 3], - [3, 4], [5, 6], [6, 7], [8, 9], - [9, 10], [10, 11], [8, 12], [12, 13], - [13, 14], [1, 0], [0, 15], [15, 17], - [0, 16], [16, 18], [14, 19], [19, 20], - [14, 21], [11, 22], [22, 23], [11, 24] -] -body_25_colors = [ - [255, 0, 85], [255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], - [0, 255, 0], [255, 0, 0], [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], - [0, 0, 255], [255, 0, 170], [170, 0, 255], [255, 0, 255], [85, 0, 255], [0, 0, 255], [0, 0, 255], - [0, 0, 255], [0, 0, 255], [0, 255, 255], [0, 255, 255], [0, 255, 255] -] - -body_25B_limbSeq = [ - [0, 1], [0, 2], [1, 3], [2, 4], [5, 7], [6, 8], [7, 9], [8, 10], - [5, 11], [6, 12], [11, 13], [12, 14], [13, 15], [14, 16], [15, 19], - [19, 20], [15, 21], [16, 22], [22, 23], [16, 24], [5, 17], [6, 17], - [17, 18], [11, 12] -] - -body_25B_colors = [ - [255, 0, 85], [170, 0, 255], [255, 0, 170], [85, 0, 255], [255, 0, 255], - [170, 255, 0], [255, 85, 0], [85, 255, 0], [255, 170, 0], [0, 255, 0], - [255, 255, 0], [0, 170, 255], [0, 255, 85], [0, 85, 255], [0, 255, 170], - [0, 0, 255], [0, 255, 255], [255, 0, 0], [255, 0, 0], [0, 0, 255], - [0, 0, 255], [0, 0, 255], [0, 255, 255], [0, 255, 255], [0, 255, 255] -] - -class Keypoint: - def __init__(self, x: float, y: float, confidence: float = 1.0): - """ - Initialize a Keypoint object. - - Args: - x (float): The x-coordinate of the keypoint. - y (float): The y-coordinate of the keypoint. - confidence (float): The confidence score of the keypoint. Default is 1.0. - """ - self.x = x - self.y = y - self.confidence = confidence - - def __repr__(self): - return f"Keypoint(x={self.x}, y={self.y}, confidence={self.confidence})" - -class Skeleton: - def __init__(self, keypoints: List[Keypoint]): - self.keypoints = keypoints - - def __repr__(self): - return f"Skeleton(keypoints={self.keypoints})" - -class Skeleton_Seqence: - def __init__(self, skeletons: List[Skeleton]): - self.skeletons = skeletons - - def __repr__(self): - return f"Skeleton_Seqence(Skeleton_frames={self.skeletons})" - - def get_frame(self, frame_index: int) -> Skeleton: - return self.skeletons[frame_index] - - def add_frame(self, skeleton: Skeleton): - self.skeletons.append(skeleton) - -def get_time_slice_for_Skeleton_Seqences(skeleton_seqences: List[Skeleton_Seqence], frame_index: int) -> List[Skeleton]: - return [skeleton_seq.get_frame(frame_index) for skeleton_seq in skeleton_seqences] - - -def is_normalized(keypoints: List[Keypoint]) -> bool: +def is_normalized(keypoints: List[skel.Keypoint]) -> bool: for keypoint in keypoints: if not (0 <= keypoint.x <= 1 and 0 <= keypoint.y <= 1): return False return True -def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors, xinsr_stick_scaling: bool = False) -> np.ndarray: +def draw_bodypose(canvas: np.ndarray, keypoints: List[skel.Keypoint], limbSeq, colors, xinsr_stick_scaling: bool = False) -> np.ndarray: """ Draw keypoints and limbs representing body pose on a given canvas. @@ -115,20 +37,27 @@ def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors H, W, _ = canvas.shape CH, CW, _ = canvas.shape - stickwidth = 4 + stickwidth = 2 # Ref: https://huggingface.co/xinsir/controlnet-openpose-sdxl-1.0 max_side = max(CW, CH) if xinsr_stick_scaling: stick_scale = 1 if max_side < 500 else min(2 + (max_side // 1000), 7) - else: + else : stick_scale = 1 + + if keypoints is None or len(keypoints) == 0: + return canvas for (k1_index, k2_index), color in zip(limbSeq, colors): - keypoint1 = keypoints[k1_index - 1] - keypoint2 = keypoints[k2_index - 1] + keypoint1 = keypoints[k1_index] + keypoint2 = keypoints[k2_index] - if keypoint1 is None or keypoint2 is None: + if keypoint1 is None or keypoint2 is None or keypoint1.confidence == 0 or keypoint2.confidence == 0: + # if keypoint1 is None or keypoint1.confidence == 0: + # print(f"keypoint failed: {k1_index}") + # if keypoint2 is None or keypoint2.confidence == 0: + # print(f"keypoint failed: {k2_index}") continue Y = np.array([keypoint1.x, keypoint2.x]) * float(W) @@ -141,7 +70,7 @@ def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors cv2.fillConvexPoly(canvas, polygon, [int(float(c) * 0.6) for c in color]) for keypoint, color in zip(keypoints, colors): - if keypoint is None: + if keypoint is None or keypoint.confidence == 0: continue x, y = keypoint.x, keypoint.y @@ -151,15 +80,8 @@ def draw_bodypose(canvas: np.ndarray, keypoints: List[Keypoint], limbSeq, colors return canvas -def json_to_keypoints_openpose(json_file: str) -> List[Keypoint]: - with open(json_file, 'r') as file: - data = json.load(file) - keypoints = data[0]['people'][0]['pose_keypoints_2d'] - keypoints = [Keypoint(keypoints[i], keypoints[i + 1]) for i in range(0, len(keypoints), 3)] - return keypoints - -def coordinates_to_keypoints(coordinates: list) -> List[Keypoint]: - keypoints = [Keypoint(coordinates[i], coordinates[i + 1]) for i in range(0, len(coordinates), 3)] +def coordinates_to_keypoints(coordinates: list) -> List[skel.Keypoint]: + keypoints = [skel.Keypoint(coordinates[i], coordinates[i + 1]) for i in range(0, len(coordinates), 3)] return keypoints def save_bodypose(width: int, height: int, coordinates: list): @@ -168,36 +90,36 @@ def save_bodypose(width: int, height: int, coordinates: list): canvas = np.zeros((height, width, 3), dtype=np.uint8) keypoints = coordinates_to_keypoints(coordinates) - canvas = draw_bodypose(canvas, keypoints, coco_limbSeq, coco_colors) + canvas = draw_bodypose(canvas, keypoints, skel.coco_limbSeq, skel.coco_colors) # Save as body_pose_output0000.png, body_pose_output0001.png, ... cv2.imwrite('body_pose_output%04d.png' % save_bodypose.counter, canvas) save_bodypose.counter += 1 # Increment the counter -def array_json_to_Skeleton_Seqences(json_file: str) -> List[Skeleton_Seqence]: - with open(json_file, 'r') as file: - data = json.load(file) - - skeleton_sequences = [] - for frame in data: - for i in range(len(frame)): - while len(skeleton_sequences) <= i: - skeleton_sequences.append(None) - skeleton_sequences[i] = Skeleton_Seqence([]) - skeleton = Skeleton([Keypoint(keypoint[0], keypoint[1], keypoint[2]) for keypoint in frame[i]]) - skeleton_sequences[i].add_frame(skeleton) - return skeleton_sequences - def main(): - json_file = '0001_002_00_01_1.json' - image_path = 'test' - skeleton_sequences = array_json_to_Skeleton_Seqences(json_file) - for i in range(20): - sliced = get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) - canvas = np.zeros((480, 640, 3), dtype=np.uint8) - for skeleton in sliced: + directory = './fixed' + json_files = [f for f in os.listdir(directory) if f.endswith('.json')] + if not json_files: + print("No JSON files found in the directory.") + return + + json_file = os.path.join(directory, random.choice(json_files)) + # json_file = './test_output.json' + image_path = './output/test' + print(json_file) + + skeleton_sequences = pjf.array_json_to_Skeleton_Seqences(json_file) + frame_count = max(len(skeleton_sequences[i].skeletons_frame) for i in range(len(skeleton_sequences)) if skeleton_sequences[i] is not None) + sliced_list = [skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) for i in range(frame_count)] + + for i in range(frame_count): + sliced = sliced_list[i] + canvas = np.zeros((360, 640, 3), dtype=np.uint8) + + for j, skeleton in enumerate(sliced): keypoints = skeleton.keypoints - canvas = draw_bodypose(canvas, keypoints, body_25B_limbSeq, body_25B_colors) + skeleton_sequences[j].get_frame(i).keypoints = keypoints + canvas = draw_bodypose(canvas, keypoints, skel.body_25_limbSeq, skel.body_25_colors) cv2.imwrite(image_path + '_' + str(i) + '.png', canvas) diff --git a/process_json_file.py b/process_json_file.py new file mode 100644 index 0000000..7313d78 --- /dev/null +++ b/process_json_file.py @@ -0,0 +1,89 @@ +import os +import json +import sys +import numpy as np +from typing import List +import skeleton_lib as skel +import concurrent.futures +sys.path.append('./') + +def json_to_keypoints_openpose(json_file: str) -> List[skel.Keypoint]: + with open(json_file, 'r') as file: + data = json.load(file) + keypoints = data[0]['people'][0]['pose_keypoints_2d'] + keypoints = [skel.Keypoint(keypoints[i], keypoints[i + 1]) for i in range(0, len(keypoints), 3)] + return keypoints + +def array_json_to_Skeleton_Seqences(json_file: str) -> List[skel.Skeleton_Seqence]: + with open(json_file, 'r') as file: + data = json.load(file) + + skeleton_sequences = [] + for frame in data: + for i in range(len(frame)): + while len(skeleton_sequences) <= i: + skeleton_sequences.append(None) + skeleton_sequences[i] = skel.Skeleton_Seqence([]) + skeleton = skel.Skeleton([skel.Keypoint(keypoint[0], keypoint[1], keypoint[2]) for keypoint in frame[i]]) + skeleton_sequences[i].add_frame(skeleton) + return skeleton_sequences + +def Skeleton_Seqences_save_to_array_json(skeleton_sequences: List[skel.Skeleton_Seqence], json_file: str): + # Ensure the directory exists + os.makedirs(os.path.dirname(json_file), exist_ok=True) + + data = [] + + for i in range(len(skeleton_sequences[0].skeletons_frame)): + sliced = skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) + sequence_data = [] + for skeleton in sliced: + keypoints_data = [[kp.x, kp.y, kp.confidence] for kp in skeleton.keypoints] + sequence_data.append(keypoints_data) + data.append(sequence_data) + + with open(json_file, 'w') as file: + json.dump(data, file, indent=4) + +def process_json_file(json_file, directory): + json_file = os.path.join(directory, json_file) + # print(json_file) + + skeleton_sequences = array_json_to_Skeleton_Seqences(json_file) + frame_count = max(len(skeleton_sequences[i].skeletons_frame) for i in range(len(skeleton_sequences)) if skeleton_sequences[i] is not None) + sliced_list = [skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) for i in range(frame_count)] + + for i in range(frame_count): + last_sliced = sliced_list[i - 1] if i > 0 else None + next_sliced = sliced_list[i + 1] if i < frame_count - 1 else None + sliced = sliced_list[i] + + for j, skeleton in enumerate(sliced): + last_keypoints = last_sliced[j].keypoints if last_sliced else None + next_keypoints = next_sliced[j].keypoints if next_sliced else None + keypoints = skeleton.keypoints + keypoints = skel.fix_keypoints(keypoints, last_keypoints, next_keypoints) + skeleton_sequences[j].get_frame(i).keypoints = keypoints + + Skeleton_Seqences_save_to_array_json(skeleton_sequences, './fixed/' + os.path.basename(json_file)) + +def process_json_files_chunk(json_files_chunk, directory): + for json_file in json_files_chunk: + process_json_file(json_file, directory) + +def process_json_files_multi_threaded(json_files, directory): + directory = './FencersKeyPoints' + json_files = [f for f in os.listdir(directory) if f.endswith('.json')] + if not json_files: + print("No JSON files found in the directory.") + return + + json_files_chunks = np.array_split(json_files, 12) + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(process_json_files_chunk, chunk, directory) for chunk in json_files_chunks] + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as e: + print(f"Error processing file chunk: {e}") \ No newline at end of file diff --git a/skeleton_lib.py b/skeleton_lib.py new file mode 100644 index 0000000..6fb462c --- /dev/null +++ b/skeleton_lib.py @@ -0,0 +1,110 @@ +from typing import List + +coco_limbSeq = [ + [1, 2], [1, 5], [2, 3], [3, 4], + [5, 6], [6, 7], [1, 8], [8, 9], + [9, 10], [1, 11], [11, 12], [12, 13], + [1, 0], [0, 14], [14, 16], [0, 15], + [15, 17], +] + +coco_colors = [ + [255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], [0, 255, 0], + [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255], + [170, 0, 255], [255, 0, 255], [255, 0, 170], [255, 0, 85] +] + +body_25_limbSeq = [ + [1, 8], [1, 2], [1, 5], [2, 3], + [3, 4], [5, 6], [6, 7], [8, 9], + [9, 10], [10, 11], [8, 12], [12, 13], + [13, 14], [1, 0], [0, 15], [15, 17], + [0, 16], [16, 18], [14, 19], [19, 20], + [14, 21], [11, 22], [22, 23], [11, 24] +] +body_25_colors = [ + [255, 0, 85], [255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], + [0, 255, 0], [255, 0, 0], [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], + [0, 0, 255], [255, 0, 170], [170, 0, 255], [255, 0, 255], [85, 0, 255], [0, 0, 255], [0, 0, 255], + [0, 0, 255], [0, 0, 255], [0, 255, 255], [0, 255, 255], [0, 255, 255] +] + +body_25B_limbSeq = [ + [0, 1], [0, 2], [1, 3], [2, 4], [5, 7], [6, 8], [7, 9], [8, 10], + [5, 11], [6, 12], [11, 13], [12, 14], [13, 15], [14, 16], [15, 19], + [19, 20], [15, 21], [16, 22], [22, 23], [16, 24], [5, 17], [6, 17], + [17, 18], [11, 12] +] + +body_25B_colors = [ + [255, 0, 85], [170, 0, 255], [255, 0, 170], [85, 0, 255], [255, 0, 255], + [170, 255, 0], [255, 85, 0], [85, 255, 0], [255, 170, 0], [0, 255, 0], + [255, 255, 0], [0, 170, 255], [0, 255, 85], [0, 85, 255], [0, 255, 170], + [0, 0, 255], [0, 255, 255], [255, 0, 0], [255, 0, 0], [0, 0, 255], + [0, 0, 255], [0, 0, 255], [0, 255, 255], [0, 255, 255], [0, 255, 255] +] + +class Keypoint: + def __init__(self, x: float, y: float, confidence: float = 1.0): + """ + Initialize a Keypoint object. + + Args: + x (float): The x-coordinate of the keypoint. + y (float): The y-coordinate of the keypoint. + confidence (float): The confidence score of the keypoint. Default is 1.0. + """ + self.x = x + self.y = y + self.confidence = confidence + + def __repr__(self): + return f"Keypoint(x={self.x}, y={self.y}, confidence={self.confidence})" + +class Skeleton: + def __init__(self, keypoints: List[Keypoint]): + self.keypoints = keypoints + + def __repr__(self): + return f"Skeleton(keypoints={self.keypoints})" + + def is_healthy_skeleton(self): + for keypoint in self.keypoints: + if keypoint.confidence == 0.0: + return False + return True + +class Skeleton_Seqence: + def __init__(self, skeletons: List[Skeleton]): + self.skeletons_frame = skeletons + + def __repr__(self): + return f"Skeleton_Seqence(Skeleton_frames={self.skeletons_frame})" + + def get_frame(self, frame_index: int) -> Skeleton: + return self.skeletons_frame[frame_index] + + def add_frame(self, skeleton: Skeleton): + self.skeletons_frame.append(skeleton) + + def is_healthy_seqence(self): + for skeleton in self.skeletons_frame: + if not skeleton.is_healthy_skeleton(): + return False + return True + +def fix_keypoints(keypoints, last_keypoints, next_keypoints): + if not keypoints or not last_keypoints or not next_keypoints: + return keypoints + for i, keypoint in enumerate(keypoints): + if keypoint.confidence == 0.0 and last_keypoints and next_keypoints: + last_keypoint = last_keypoints[i] + next_keypoint = next_keypoints[i] + if last_keypoint.confidence > 0 and next_keypoint.confidence > 0: + keypoint.x = (last_keypoint.x + next_keypoint.x) / 2 + keypoint.y = (last_keypoint.y + next_keypoint.y) / 2 + keypoint.confidence = (last_keypoint.confidence + next_keypoint.confidence) / 2 + return keypoints + +def get_time_slice_for_Skeleton_Seqences(skeleton_seqences: List[Skeleton_Seqence], frame_index: int) -> List[Skeleton]: + return [skeleton_seq.get_frame(frame_index) for skeleton_seq in skeleton_seqences] \ No newline at end of file