# Image_Gen_Server/process_json_file.py
# (source snapshot dated 2024-10-03 09:37:37 +00:00)
import os
import json
import sys
import numpy as np
from typing import List
import skeleton_lib as skel
import concurrent.futures
sys.path.append('./')
def json_to_keypoints_openpose(json_file: str) -> List[skel.Keypoint]:
    """Load the first person's 2D pose keypoints from an OpenPose JSON file.

    The file stores ``pose_keypoints_2d`` as a flat
    ``[x0, y0, c0, x1, y1, c1, ...]`` list; a Keypoint is built from each
    (x, y) pair and the confidence values are discarded.
    """
    with open(json_file, 'r') as fh:
        flat = json.load(fh)[0]['people'][0]['pose_keypoints_2d']
    # Stride 3: every triple is (x, y, confidence); only x and y are kept.
    return [skel.Keypoint(flat[i], flat[i + 1]) for i in range(0, len(flat), 3)]
def array_json_to_Skeleton_Seqences(json_file: str) -> List[skel.Skeleton_Seqence]:
    """Load per-person skeleton sequences from an array-layout JSON file.

    The file holds ``data[frame][person] -> [[x, y, confidence], ...]``.
    Returns one Skeleton_Seqence per person index, each containing that
    person's skeleton for every frame; slots for person indices never seen
    remain ``None``.
    """
    with open(json_file, 'r') as file:
        data = json.load(file)

    skeleton_sequences: List[skel.Skeleton_Seqence] = []
    for frame in data:
        for i in range(len(frame)):
            # Grow the list lazily so person index i always has a slot.
            while len(skeleton_sequences) <= i:
                skeleton_sequences.append(None)
            # Bug fix: create each sequence exactly once. Re-creating it on
            # every frame (as the original did) threw away all frames
            # accumulated so far, leaving only the last frame per person.
            if skeleton_sequences[i] is None:
                skeleton_sequences[i] = skel.Skeleton_Seqence([])
            skeleton = skel.Skeleton(
                [skel.Keypoint(kp[0], kp[1], kp[2]) for kp in frame[i]]
            )
            skeleton_sequences[i].add_frame(skeleton)
    return skeleton_sequences
def Skeleton_Seqences_save_to_array_json(skeleton_sequences: List[skel.Skeleton_Seqence], json_file: str):
    """Write skeleton sequences to *json_file* in the array layout.

    Output shape is ``data[frame][person] -> [[x, y, confidence], ...]``,
    the inverse of array_json_to_Skeleton_Seqences.
    """
    # Bug fix: os.makedirs('') raises FileNotFoundError, so only create the
    # directory when the target path actually has a directory component.
    directory = os.path.dirname(json_file)
    if directory:
        os.makedirs(directory, exist_ok=True)

    data = []
    # Frame count is taken from the first sequence; assumes all sequences
    # are equally long — TODO confirm against callers.
    for frame_index in range(len(skeleton_sequences[0].skeletons_frame)):
        sliced = skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, frame_index)
        sequence_data = [
            [[kp.x, kp.y, kp.confidence] for kp in skeleton.keypoints]
            for skeleton in sliced
        ]
        data.append(sequence_data)

    with open(json_file, 'w') as file:
        json.dump(data, file, indent=4)
def process_json_file(json_file, directory):
    """Smooth every skeleton in one JSON file and save it under ./fixed/.

    Each frame's keypoints are repaired with skel.fix_keypoints using the
    neighbouring frames (previous/next time slices) as context.
    """
    path = os.path.join(directory, json_file)
    skeleton_sequences = array_json_to_Skeleton_Seqences(path)

    # Longest sequence length among the non-empty person slots.
    frame_count = max(
        len(seq.skeletons_frame) for seq in skeleton_sequences if seq is not None
    )
    # Pre-compute one time slice per frame so neighbours are cheap to fetch.
    sliced_list = [
        skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, f)
        for f in range(frame_count)
    ]

    for f in range(frame_count):
        prev_slice = sliced_list[f - 1] if f > 0 else None
        next_slice = sliced_list[f + 1] if f < frame_count - 1 else None
        for person, skeleton in enumerate(sliced_list[f]):
            prev_kps = prev_slice[person].keypoints if prev_slice else None
            next_kps = next_slice[person].keypoints if next_slice else None
            repaired = skel.fix_keypoints(skeleton.keypoints, prev_kps, next_kps)
            skeleton_sequences[person].get_frame(f).keypoints = repaired

    Skeleton_Seqences_save_to_array_json(skeleton_sequences, './fixed/' + os.path.basename(path))
def process_json_files_chunk(json_files_chunk, directory):
    """Sequentially process every JSON file name in the given chunk."""
    for file_name in json_files_chunk:
        process_json_file(file_name, directory)
def process_json_files_multi_threaded(json_files=None, directory='./FencersKeyPoints', num_chunks=12):
    """Repair all JSON keypoint files in *directory* using a thread pool.

    Args:
        json_files: Optional list of JSON file names inside *directory*.
            When None, every '*.json' file in *directory* is processed.
        directory: Folder containing the input JSON files. Defaults to the
            value the original hard-coded, so no-arg calls behave the same.
        num_chunks: Number of chunks the file list is split into; each
            chunk is submitted as one worker task.

    Errors raised by a chunk are caught and printed; other chunks proceed.
    """
    # Bug fix: the original shadowed both parameters with hard-coded values,
    # silently ignoring whatever callers passed in. The arguments are now
    # honoured, with defaults reproducing the old behaviour.
    if json_files is None:
        json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
    if not json_files:
        print("No JSON files found in the directory.")
        return

    json_files_chunks = np.array_split(json_files, num_chunks)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(process_json_files_chunk, chunk, directory)
            for chunk in json_files_chunks
        ]
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error processing file chunk: {e}")