"""Load, repair and save per-person skeleton keypoint sequences stored as JSON."""
import concurrent.futures
import json
import os
import sys
from typing import List, Optional

import cv2
import numpy as np
import pandas as pd

import skeleton_lib as skel

# NOTE(review): this runs after ``import skeleton_lib`` above, so it cannot
# influence that import; kept in place to preserve module-level behaviour.
sys.path.append('./')


def json_to_keypoints_openpose(json_file: str) -> List[skel.Keypoint]:
    """Read the first person's pose keypoints from a JSON file.

    The file must contain a list whose first element has a ``people`` list;
    the first person's flat ``pose_keypoints_2d`` array is walked in strides
    of three and the first two values of each triple are passed to
    ``skel.Keypoint``.

    Args:
        json_file: Path of the JSON file to read.

    Returns:
        One ``skel.Keypoint`` per triple in ``pose_keypoints_2d``.
    """
    with open(json_file, 'r') as file:
        data = json.load(file)

    flat = data[0]['people'][0]['pose_keypoints_2d']
    # Only the first two values of every triple are forwarded.
    return [skel.Keypoint(flat[i], flat[i + 1]) for i in range(0, len(flat), 3)]


def array_json_to_Skeleton_Seqences(json_file: str) -> List[skel.Skeleton_Seqence]:
    """Build one ``Skeleton_Seqence`` per person index from an array JSON file.

    The file holds a list of frames; each frame is a list of persons; each
    person is a list of ``[x, y, confidence]`` entries.

    Args:
        json_file: Path of the JSON file to read.

    Returns:
        A list in which element ``i`` collects, in file order, every frame's
        skeleton found at person index ``i``.
    """
    with open(json_file, 'r') as file:
        data = json.load(file)

    skeleton_sequences: List[skel.Skeleton_Seqence] = []
    for frame in data:
        for i, person in enumerate(frame):
            # Create the sequence only the first time this person index is
            # seen, so frames added earlier are never discarded.
            while len(skeleton_sequences) <= i:
                skeleton_sequences.append(skel.Skeleton_Seqence([]))
            skeleton = skel.Skeleton(
                [skel.Keypoint(point[0], point[1], point[2]) for point in person]
            )
            skeleton_sequences[i].add_frame(skeleton)
    return skeleton_sequences


def Skeleton_Seqences_save_to_array_json(skeleton_sequences: List[skel.Skeleton_Seqence],
                                         json_file: str):
    """Write skeleton sequences to ``json_file`` as nested lists.

    The output is a list of frames; each frame is a list of skeletons; each
    skeleton is a list of ``[x, y, confidence]`` entries. The number of
    frames written is taken from the first sequence.

    Args:
        skeleton_sequences: Sequences to serialise. An empty list writes ``[]``.
        json_file: Destination path. Its parent directory is created if it
            has one and it does not exist yet.
    """
    # os.makedirs('') raises, so only create a directory when the path has one.
    parent = os.path.dirname(json_file)
    if parent:
        os.makedirs(parent, exist_ok=True)

    frame_total = len(skeleton_sequences[0].skeletons_frame) if skeleton_sequences else 0

    data = []
    for i in range(frame_total):
        sliced = skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i)
        sequence_data = []
        for skeleton in sliced:
            sequence_data.append([[kp.x, kp.y, kp.confidence] for kp in skeleton.keypoints])
        data.append(sequence_data)

    with open(json_file, 'w') as file:
        json.dump(data, file, indent=4)


def process_json_file(json_file, directory, output_directory):
    """Repair the keypoints of one JSON file and save the result.

    Each skeleton's keypoints are passed to ``skel.fix_keypoints`` together
    with the keypoints at the same index in the previous and next frame
    (``None`` at either end), and the returned keypoints are stored back on
    the frame. The result is written to ``output_directory`` under the same
    file name.

    Args:
        json_file: File name relative to ``directory``.
        directory: Directory containing the input file.
        output_directory: Directory that receives the repaired file.
    """
    json_file = os.path.join(directory, json_file)
    skeleton_sequences = array_json_to_Skeleton_Seqences(json_file)

    # default=0 keeps a file without any frames from raising in max().
    frame_count = max(
        (len(seq.skeletons_frame) for seq in skeleton_sequences if seq is not None),
        default=0,
    )
    sliced_list = [
        skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i)
        for i in range(frame_count)
    ]

    for i, sliced in enumerate(sliced_list):
        last_sliced = sliced_list[i - 1] if i > 0 else None
        next_sliced = sliced_list[i + 1] if i < frame_count - 1 else None
        for j, skeleton in enumerate(sliced):
            last_keypoints = last_sliced[j].keypoints if last_sliced else None
            next_keypoints = next_sliced[j].keypoints if next_sliced else None
            fixed = skel.fix_keypoints(skeleton.keypoints, last_keypoints, next_keypoints)
            skeleton_sequences[j].get_frame(i).keypoints = fixed

    # Join instead of concatenating so a directory without a trailing
    # separator still yields a path inside that directory.
    output_path = os.path.join(output_directory, os.path.basename(json_file))
    Skeleton_Seqences_save_to_array_json(skeleton_sequences, output_path)


def process_json_files_chunk(json_files_chunk, directory, output_directory):
    """Run ``process_json_file`` on every file name in ``json_files_chunk``."""
    for json_file in json_files_chunk:
        process_json_file(json_file, directory, output_directory)


def process_json_files_multi_threaded(json_files, directory, output_directory):
    """Process JSON files from ``directory`` on a thread pool.

    Args:
        json_files: File names (relative to ``directory``) to process, or
            ``None`` to process every entry of ``directory``. Names that do
            not end in ``.json`` are skipped in both cases.
        directory: Directory containing the input files.
        output_directory: Directory that receives the processed files.

    Errors raised while processing a chunk are printed and do not stop the
    remaining chunks.
    """
    candidates = os.listdir(directory) if json_files is None else json_files
    json_files = [str(name) for name in candidates if str(name).endswith('.json')]
    if not json_files:
        print("No JSON files found in the directory.")
        return

    # At most 64 chunks, and never more chunks than files, so that no empty
    # chunk is submitted.
    chunk_count = min(64, len(json_files))
    json_files_chunks = [chunk.tolist() for chunk in np.array_split(json_files, chunk_count)]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(process_json_files_chunk, chunk, directory, output_directory)
            for chunk in json_files_chunks
        ]
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                # Best effort: report the failed chunk and keep going.
                print(f"Error processing file chunk: {e}")


def process_clip_descriptor(input_file_path, output_file_path):
    """Add a ``video_id`` column to a clip descriptor CSV and save a copy.

    The id is the text between the first ``=`` of the ``URL`` column and the
    following ``&`` (or the end of the string).

    Args:
        input_file_path: CSV file with a ``URL`` column.
        output_file_path: Destination CSV path, written without the index.
    """
    descriptor = pd.read_csv(input_file_path)
    descriptor['video_id'] = descriptor['URL'].apply(
        lambda url: url.split('=')[1].split('&')[0]
    )
    # Hand pandas the path so it opens the file itself with the newline
    # handling its CSV writer expects.
    descriptor.to_csv(output_file_path, index=False)


def get_frames_from_fixed_json(json_file, num_skeletons: int = 2):
    """Load frames of skeletons from an array JSON file.

    Args:
        json_file: Path of a JSON file holding a list of frames, each a list
            of persons, each a list of ``[x, y, confidence]`` entries.
        num_skeletons: How many leading persons to read from every frame.
            Every frame must contain at least this many.

    Returns:
        A list with one entry per frame; each entry is a list of
        ``num_skeletons`` ``skel.Skeleton`` objects.
    """
    with open(json_file, 'r') as file:
        data = json.load(file)

    frames = []
    for frame in data:
        skeletons = []
        for i in range(num_skeletons):
            keypoints = [skel.Keypoint(point[0], point[1], point[2]) for point in frame[i]]
            skeletons.append(skel.Skeleton(keypoints))
        frames.append(skeletons)
    return frames


def main():
    """Draw the first skeleton of the first frame of a sample file to ``test.png``."""
    frames = get_frames_from_fixed_json('./fixed/0050_001_08_08_1.json')

    # Black 640x360 canvas, 3 channels.
    canvas = np.zeros((360, 640, 3), dtype=np.uint8)
    canvas = skel.draw_bodypose(
        canvas, frames[0][0].keypoints, skel.body_25_limbSeq, skel.body_25_colors
    )
    cv2.imwrite('test.png', canvas)


if __name__ == '__main__':
    main()