import concurrent.futures
import json
import os
import sys
from typing import List

import cv2
import numpy as np
import pandas as pd

# Make the local skeleton_lib module importable before importing it.
sys.path.append('./')
import skeleton_lib as skel


def json_to_keypoints_openpose(json_file: str) -> List[skel.Keypoint]:
    """Load an OpenPose export and return the first detected person's 2D keypoints."""
    with open(json_file, 'r') as file:
        data = json.load(file)
    # pose_keypoints_2d is a flat list [x0, y0, c0, x1, y1, c1, ...];
    # keep only x and y and drop the confidence values.
    keypoints = data[0]['people'][0]['pose_keypoints_2d']
    keypoints = [skel.Keypoint(keypoints[i], keypoints[i + 1]) for i in range(0, len(keypoints), 3)]
    return keypoints


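# Minimal usage sketch for the loader above. The path is a placeholder, and the
# top-level list wrapping of the OpenPose output (data[0]) is assumed from the
# parsing code, not from the OpenPose documentation.
def _example_load_openpose_keypoints(json_file: str = './openpose_json/frame_000_keypoints.json') -> None:
    keypoints = json_to_keypoints_openpose(json_file)
    print(f'Loaded {len(keypoints)} keypoints; first at ({keypoints[0].x}, {keypoints[0].y})')

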
def array_json_to_Skeleton_Seqences(json_file: str) -> List[skel.Skeleton_Seqence]:
    """Load per-frame skeleton data and regroup it into one sequence per skeleton index."""
    with open(json_file, 'r') as file:
        data = json.load(file)

    skeleton_sequences = []
    for frame in data:
        for i in range(len(frame)):
            # Grow the list lazily so it can hold as many sequences as the
            # widest frame requires, creating each sequence only once.
            while len(skeleton_sequences) <= i:
                skeleton_sequences.append(None)
            if skeleton_sequences[i] is None:
                skeleton_sequences[i] = skel.Skeleton_Seqence([])
            skeleton = skel.Skeleton([skel.Keypoint(keypoint[0], keypoint[1], keypoint[2]) for keypoint in frame[i]])
            skeleton_sequences[i].add_frame(skeleton)
    return skeleton_sequences


def Skeleton_Seqences_save_to_array_json(skeleton_sequences: List[skel.Skeleton_Seqence], json_file: str):
    """Write sequences back to disk as [frame][skeleton][keypoint] = [x, y, confidence]."""
    # Ensure the output directory exists
    os.makedirs(os.path.dirname(json_file), exist_ok=True)

    data = []
    for i in range(len(skeleton_sequences[0].skeletons_frame)):
        sliced = skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i)
        sequence_data = []
        for skeleton in sliced:
            keypoints_data = [[kp.x, kp.y, kp.confidence] for kp in skeleton.keypoints]
            sequence_data.append(keypoints_data)
        data.append(sequence_data)

    with open(json_file, 'w') as file:
        json.dump(data, file, indent=4)


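# Round-trip sketch for the two converters above: read a fixed-format JSON file
# into Skeleton_Seqence objects and write it straight back out. Both paths are
# placeholders, not part of the original pipeline.
def _example_roundtrip(in_json: str = './fixed/example.json',
                       out_json: str = './roundtrip/example.json') -> None:
    sequences = array_json_to_Skeleton_Seqences(in_json)
    Skeleton_Seqences_save_to_array_json(sequences, out_json)

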
def process_json_file(json_file, directory, output_directory):
    """Fix the keypoints of every skeleton in one JSON file and save the result."""
    json_file = os.path.join(directory, json_file)

    skeleton_sequences = array_json_to_Skeleton_Seqences(json_file)
    frame_count = max(len(skeleton_sequences[i].skeletons_frame)
                      for i in range(len(skeleton_sequences))
                      if skeleton_sequences[i] is not None)
    sliced_list = [skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) for i in range(frame_count)]

    for i in range(frame_count):
        # Neighbouring frames give skel.fix_keypoints temporal context.
        last_sliced = sliced_list[i - 1] if i > 0 else None
        next_sliced = sliced_list[i + 1] if i < frame_count - 1 else None
        sliced = sliced_list[i]

        for j, skeleton in enumerate(sliced):
            last_keypoints = last_sliced[j].keypoints if last_sliced else None
            next_keypoints = next_sliced[j].keypoints if next_sliced else None
            keypoints = skel.fix_keypoints(skeleton.keypoints, last_keypoints, next_keypoints)
            skeleton_sequences[j].get_frame(i).keypoints = keypoints

    Skeleton_Seqences_save_to_array_json(skeleton_sequences,
                                         os.path.join(output_directory, os.path.basename(json_file)))


def process_json_files_chunk(json_files_chunk, directory, output_directory):
    for json_file in json_files_chunk:
        process_json_file(json_file, directory, output_directory)


def process_json_files_multi_threaded(json_files, directory, output_directory):
    # The json_files argument is ignored; the directory is re-listed here.
    json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
    if not json_files:
        print("No JSON files found in the directory.")
        return

    # Split the work into chunks and process them in a thread pool.
    json_files_chunks = np.array_split(json_files, 64)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_json_files_chunk, chunk, directory, output_directory)
                   for chunk in json_files_chunks]
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error processing file chunk: {e}")


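# Example invocation of the batch processor above. The directory names are
# placeholders; the output directory is created by the save step if needed.
def _example_process_directory() -> None:
    process_json_files_multi_threaded(
        json_files=None,                 # ignored: the directory is re-listed internally
        directory='./openpose_json/',
        output_directory='./fixed/',
    )

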
def process_clip_descriptor(input_file_path, output_file_path):
    ClipDescriptorKaggle = pd.read_csv(input_file_path)

    # Add a new column with the YouTube video ID extracted from the URL
    # (the value between "v=" and the next "&").
    ClipDescriptorKaggle['video_id'] = ClipDescriptorKaggle['URL'].apply(lambda x: x.split('=')[1].split('&')[0])

    # Save the processed DataFrame to a new CSV file
    ClipDescriptorKaggle.to_csv(output_file_path, index=False)


def get_frames_from_fixed_json(json_file):
    """Load a fixed JSON file as a list of frames, each a list of skel.Skeleton objects."""
    frames = []
    with open(json_file, 'r') as file:
        data = json.load(file)
        for frame in data:
            skeletons = []
            for i in range(2):  # Assuming there are always 2 skeletons per frame
                keypoints = [skel.Keypoint(point[0], point[1], point[2]) for point in frame[i]]
                skeletons.append(skel.Skeleton(keypoints))
            frames.append(skeletons)

    return frames


def main():
    # Processed clip descriptor; currently loaded but not used below.
    descriptor = pd.read_csv('./ClipDescriptorKaggle_processed.csv')

    frames = get_frames_from_fixed_json('./fixed/0050_001_08_08_1.json')

    # Draw the first skeleton of the first frame on a blank canvas.
    canvas = np.zeros((360, 640, 3), dtype=np.uint8)
    canvas = skel.draw_bodypose(canvas, frames[0][0].keypoints, skel.body_25_limbSeq, skel.body_25_colors)

    # Save the image.
    cv2.imwrite('test.png', canvas)


if __name__ == '__main__':
    main()