import hashlib
import json
import os
import random
import sys
import urllib
from typing import List
from urllib import request

import cv2
import numpy as np
from requests_toolbelt.multipart.encoder import MultipartEncoder

import skeleton_lib as skel
import process_json_file as pjf

sys.path.append('./')

# Read the server address and export directory from info.json.
with open('info.json') as info_file:
    info = json.load(info_file)
server_address = info['comfyui_address']
expo_openpose_dir = info['expo_openpose_dir']


def coordinates_to_keypoints(coordinates: list) -> List[skel.Keypoint]:
    """Convert a flat coordinate list into a list of keypoints.

    The list is read in strides of three; the first two values of each
    triple are passed to ``skel.Keypoint`` and the third is ignored
    (presumably a confidence score -- confirm against the producer).
    """
    return [
        skel.Keypoint(coordinates[i], coordinates[i + 1])
        for i in range(0, len(coordinates), 3)
    ]


def expo_save_bodypose(width: int, height: int, coordinates: list, batch: int, step: int,
                       save_dir: str, limbSeq: list[int], colors: list[int]) -> str:
    """Draw one pose on a black canvas and save it as ``{batch}_{step}.png``.

    Args:
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        coordinates: Flat coordinate list, see ``coordinates_to_keypoints``.
        batch: First number in the output file name.
        step: Second number in the output file name.
        save_dir: Directory to write into; created if missing.
        limbSeq: Limb definition forwarded to ``skel.draw_bodypose``.
        colors: Color definition forwarded to ``skel.draw_bodypose``.

    Returns:
        The written image path, with backslashes replaced by forward slashes.
    """
    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    keypoints = coordinates_to_keypoints(coordinates)
    canvas = skel.draw_bodypose(canvas, keypoints, limbSeq, colors)

    # Save as {batch}_{step}.png
    os.makedirs(save_dir, exist_ok=True)
    image_path = os.path.join(save_dir, '%d_%d.png' % (batch, step))
    image_path = image_path.replace('\\', '/')
    cv2.imwrite(image_path, canvas)
    return image_path


def save_bodypose(width: int, height: int, coordinates: list, pid: str) -> str:
    """Draw one COCO pose and save it under ``output/`` with a running index.

    The index is kept as an attribute on the function itself, so it starts
    at 0 per process and increases by one on every call.

    Args:
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        coordinates: Flat coordinate list, see ``coordinates_to_keypoints``.
        pid: Unused by this function; kept for caller compatibility.

    Returns:
        The path of the written image.
    """
    if not hasattr(save_bodypose, 'counter'):
        save_bodypose.counter = 0  # Initialize the counter attribute

    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    keypoints = coordinates_to_keypoints(coordinates)
    canvas = skel.draw_bodypose(canvas, keypoints, skel.coco_limbSeq, skel.coco_colors)

    # Save as body_pose_output0000.png, body_pose_output0001.png, ...
    os.makedirs('output', exist_ok=True)
    image_path = 'output/body_pose_output%04d.png' % save_bodypose.counter
    cv2.imwrite(image_path, canvas)
    save_bodypose.counter += 1  # Increment the counter
    return image_path


def save_bodypose_mulit(width: int, height: int, coordinates_list: list, pid: str) -> str:
    """Draw several COCO poses on one canvas and save it under ``output/``.

    Uses its own running index, stored as an attribute on the function.

    Args:
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        coordinates_list: One flat coordinate list per pose.
        pid: Unused by this function; kept for caller compatibility.

    Returns:
        The path of the written image.
    """
    if not hasattr(save_bodypose_mulit, 'counter'):
        save_bodypose_mulit.counter = 0  # Initialize the counter attribute

    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    for coordinates in coordinates_list:
        keypoints = coordinates_to_keypoints(coordinates)
        canvas = skel.draw_bodypose(canvas, keypoints, skel.coco_limbSeq, skel.coco_colors)

    # Save as body_pose_output_multi0000.png, body_pose_output_multi0001.png, ...
    os.makedirs('output', exist_ok=True)
    image_path = 'output/body_pose_output_multi%04d.png' % save_bodypose_mulit.counter
    cv2.imwrite(image_path, canvas)
    save_bodypose_mulit.counter += 1  # Increment the counter
    return image_path


def queue_prompt(prompt):
    """POST ``{"prompt": prompt}`` as JSON to the server's ``/prompt`` endpoint.

    The response body is not read; the connection is closed right away.
    """
    payload = json.dumps({"prompt": prompt}).encode('utf-8')
    req = request.Request("http://{}/prompt".format(server_address), data=payload)
    # Use the response as a context manager so the connection is released.
    with request.urlopen(req):
        pass


def upload_image(input_image, image_name="", image_type="input", overwrite=True) -> str:
    """Upload an image to the server's ``/upload/image`` endpoint.

    Args:
        input_image: Either a path to an existing file (opened and closed
            here) or an object handed to the multipart encoder unchanged.
        image_name: Name to upload under; a random SHA-256 based ``.png``
            name is generated when empty.
        image_type: Value of the ``type`` form field.
        overwrite: Sent as the lowercase string ``"true"``/``"false"``.

    Returns:
        The ``name`` field of the server's JSON response.
    """
    if image_name == "":
        # Generate a random name.
        image_name = hashlib.sha256(str(random.getrandbits(256)).encode('utf-8')).hexdigest() + ".png"

    # Only close the file when it was opened here.
    if isinstance(input_image, str) and os.path.isfile(input_image):
        file = open(input_image, 'rb')
        close_file = True
    else:
        file = input_image
        close_file = False

    try:
        multipart_data = MultipartEncoder(
            fields={
                'image': (image_name, file, 'image/png'),
                'type': image_type,
                'overwrite': str(overwrite).lower()
            }
        )
        headers = {'Content-Type': multipart_data.content_type}
        # Named upload_req so the imported `request` module is not shadowed.
        upload_req = urllib.request.Request(
            "http://{}/upload/image".format(server_address),
            data=multipart_data,
            headers=headers,
        )
        with urllib.request.urlopen(upload_req) as response:
            return json.loads(response.read().decode('utf-8'))["name"]
    finally:
        if close_file:
            file.close()


def upload_image_circular_queue(image_path, size, unqiue_id):
    """Upload an image under a name that cycles through ``size`` slots per id.

    A per-id slot counter is kept in a dict stored on the function. The
    upload name is the SHA-256 of ``unqiue_id`` followed by the current
    slot, so after ``size`` uploads for one id the names repeat.

    Args:
        image_path: Forwarded to ``upload_image``.
        size: Number of slots per id.
        unqiue_id: String key identifying the queue.

    Returns:
        The image name used for the upload.
    """
    if not hasattr(upload_image_circular_queue, 'id_counter_dict'):
        upload_image_circular_queue.id_counter_dict = {}
    counters = upload_image_circular_queue.id_counter_dict

    slot = counters.get(unqiue_id, 0)
    image_name = hashlib.sha256((unqiue_id + str(slot)).encode('utf-8')).hexdigest() + ".png"
    # Wrap around; the previous `+= 1 % size` parsed as `+= (1 % size)` and never wrapped.
    counters[unqiue_id] = (slot + 1) % size

    upload_image(image_path, image_name)
    return image_name


def visualize_for_fixed_dataset(json_file: str):
    """Render every frame of a fixed-dataset sequence to ``./output/test_<i>.png``.

    NOTE(review): the ``json_file`` argument is overwritten below with a
    hard-coded path, so the caller's value is currently ignored -- confirm
    whether that is intended.
    """
    directory = './fixed'
    json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
    if not json_files:
        print("No JSON files found in the directory.")
        return

    json_file = './fixed/0001_002_00_01_1.json'

    # Create ./output directory if it does not exist.
    os.makedirs('output', exist_ok=True)
    image_path = './output/test'
    print(json_file)

    skeleton_sequences = pjf.array_json_to_Skeleton_Seqences(json_file)
    frame_count = max(
        len(sequence.skeletons_frame)
        for sequence in skeleton_sequences
        if sequence is not None
    )
    # Slice all frames before the loop, which writes keypoints back into the sequences.
    sliced_list = [
        skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i)
        for i in range(frame_count)
    ]

    for i, sliced in enumerate(sliced_list):
        canvas = np.zeros((360, 640, 3), dtype=np.uint8)
        for j, skeleton in enumerate(sliced):
            keypoints = skeleton.keypoints
            skeleton_sequences[j].get_frame(i).keypoints = keypoints
            canvas = skel.draw_bodypose(canvas, keypoints, skel.body_25_limbSeq, skel.body_25_colors)
        cv2.imwrite(image_path + '_' + str(i) + '.png', canvas)
def visualize_for_new_yolo_dataset():
    """Render a randomly chosen YOLO keypoint file to ``./output/a_test_<i>.png``.

    Picks one ``.json`` file from ``./new_yolo_keypoints``, reads its frames
    with ``pjf.read_new_yolo_keypoints`` and draws all skeletons of each
    frame on a 360x640 black canvas. Prints a message and returns when the
    directory is missing or holds no JSON files.
    """
    directory = './new_yolo_keypoints'
    try:
        entries = os.listdir(directory)
    except FileNotFoundError:
        # A missing directory is handled like an empty one.
        entries = []

    json_files = [name for name in entries if name.endswith('.json')]
    if not json_files:
        print("No JSON files found in the directory.")
        return

    json_file = os.path.join(directory, random.choice(json_files))
    print(json_file)

    # Create ./output directory if it does not exist.
    os.makedirs('output', exist_ok=True)

    frames = pjf.read_new_yolo_keypoints(json_file)
    for i, skeletons in enumerate(frames):
        canvas = np.zeros((360, 640, 3), dtype=np.uint8)
        for skeleton in skeletons:
            canvas = skel.draw_bodypose(canvas, skeleton.keypoints, skel.yolo_coco_limbSeq, skel.yolo_coco_colors)
        # Build the path once so the printed path is the one actually written.
        out_path = os.path.join('./output', 'a_test' + '_' + str(i) + '.png')
        print(out_path)
        cv2.imwrite(out_path, canvas)


def main():
    """Run the YOLO keypoint visualisation."""
    visualize_for_new_yolo_dataset()


if __name__ == '__main__':
    main()