2024-09-23 10:03:14 +00:00
|
|
|
import json
|
2024-10-03 09:37:37 +00:00
|
|
|
import os
|
|
|
|
import random
|
2024-09-23 10:03:14 +00:00
|
|
|
from typing import List
|
|
|
|
import cv2
|
2024-10-16 03:01:19 +00:00
|
|
|
import numpy as np
|
|
|
|
import urllib
|
2024-10-03 09:37:37 +00:00
|
|
|
import skeleton_lib as skel
|
|
|
|
import process_json_file as pjf
|
2024-10-16 03:01:19 +00:00
|
|
|
from urllib import request
|
2024-10-07 10:45:43 +00:00
|
|
|
from requests_toolbelt.multipart.encoder import MultipartEncoder
|
2024-10-03 09:37:37 +00:00
|
|
|
import sys
|
2024-10-14 06:50:56 +00:00
|
|
|
import hashlib
|
2024-10-03 09:37:37 +00:00
|
|
|
sys.path.append('./')
|
2024-09-23 10:03:14 +00:00
|
|
|
|
2024-10-24 15:40:15 +00:00
|
|
|
# Load runtime settings from info.json (path is relative to the current
# working directory). The context manager closes the file handle
# deterministically instead of leaving it to the garbage collector.
with open('info.json', encoding='utf-8') as info_file:
    info = json.load(info_file)

# Host (and port, if any) substituted into the "http://{}/..." URLs below.
server_address = info['comfyui_address']
# Value of the 'expo_openpose_dir' setting; not referenced in the visible code.
expo_openpose_dir = info['expo_openpose_dir']
|
2024-10-14 07:55:21 +00:00
|
|
|
|
2024-10-03 09:37:37 +00:00
|
|
|
def coordinates_to_keypoints(coordinates: list) -> List[skel.Keypoint]:
    """Convert a flat coordinate list into ``skel.Keypoint`` objects.

    The list is walked in strides of three. The first two values of each
    stride are handed to ``skel.Keypoint``; the third value is ignored
    (presumably a confidence score -- confirm against the data producer).

    Args:
        coordinates: Flat list of numbers, three entries per keypoint.

    Returns:
        One ``skel.Keypoint`` per stride, in input order.

    Raises:
        IndexError: If the last stride has only one value left.
    """
    result = []
    for start in range(0, len(coordinates), 3):
        first = coordinates[start]
        second = coordinates[start + 1]
        result.append(skel.Keypoint(first, second))
    return result
|
|
|
|
|
2024-10-24 17:56:14 +00:00
|
|
|
def expo_save_bodypose(width: int, height: int, coordinates: list, batch: int, step: int, save_dir: str, limbSeq: list[int], colors: list[int]) -> str:
    """Draw one body pose on a black canvas and write it to ``save_dir``.

    Args:
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        coordinates: Flat keypoint list accepted by
            ``coordinates_to_keypoints``.
        batch: Batch number used in the file name.
        step: Step number used in the file name.
        save_dir: Directory that receives the image; created if missing.
        limbSeq: Limb sequence forwarded to ``skel.draw_bodypose``.
        colors: Colors forwarded to ``skel.draw_bodypose``.

    Returns:
        Path of the written image, with backslashes replaced by forward
        slashes.
    """
    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    keypoints = coordinates_to_keypoints(coordinates)
    canvas = skel.draw_bodypose(canvas, keypoints, limbSeq, colors)

    # exist_ok=True replaces the former "exists() then makedirs()" pair, which
    # could raise FileExistsError when another process created the directory
    # between the check and the call.
    os.makedirs(save_dir, exist_ok=True)

    # Save as {batch}_{step}.png
    image_path = os.path.join(save_dir, '%d_%d.png' % (batch, step))
    # Normalize Windows separators so the returned path always uses '/'.
    image_path = image_path.replace('\\', '/')
    cv2.imwrite(image_path, canvas)

    return image_path
|
|
|
|
|
2024-10-14 06:50:56 +00:00
|
|
|
def save_bodypose(width: int, height: int, coordinates: list, pid: str) -> str:
    """Draw a single body pose and save it as a numbered PNG under ``output``.

    A per-function counter (stored as the ``counter`` attribute on this
    function) numbers the files: body_pose_output0000.png,
    body_pose_output0001.png, ...

    Args:
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        coordinates: Flat keypoint list accepted by
            ``coordinates_to_keypoints``.
        pid: Accepted for interface compatibility; not used by this function.

    Returns:
        Relative path of the image that was written.
    """
    if not hasattr(save_bodypose, 'counter'):
        save_bodypose.counter = 0  # Initialize the counter attribute

    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    keypoints = coordinates_to_keypoints(coordinates)
    canvas = skel.draw_bodypose(canvas, keypoints, skel.coco_limbSeq, skel.coco_colors)

    # exist_ok=True avoids the race between checking for the directory and
    # creating it.
    output_dir = 'output'
    os.makedirs(output_dir, exist_ok=True)

    # Save as body_pose_output0000.png, body_pose_output0001.png, ...
    image_path = '%s/body_pose_output%04d.png' % (output_dir, save_bodypose.counter)
    cv2.imwrite(image_path, canvas)
    save_bodypose.counter += 1  # Increment the counter

    return image_path
|
2024-10-07 10:45:43 +00:00
|
|
|
|
2024-10-14 06:50:56 +00:00
|
|
|
def save_bodypose_mulit(width: int, height: int, coordinates_list: list, pid: str) -> str:
    """Draw several body poses on one canvas and save it as a numbered PNG.

    A per-function counter (stored as the ``counter`` attribute on this
    function) numbers the files: body_pose_output_multi0000.png,
    body_pose_output_multi0001.png, ...

    Args:
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        coordinates_list: Iterable of flat keypoint lists, one per pose, each
            accepted by ``coordinates_to_keypoints``.
        pid: Accepted for interface compatibility; not used by this function.

    Returns:
        Relative path of the image that was written.
    """
    if not hasattr(save_bodypose_mulit, 'counter'):
        save_bodypose_mulit.counter = 0  # Initialize the counter attribute

    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    # Every pose is drawn onto the same canvas.
    for coordinates in coordinates_list:
        keypoints = coordinates_to_keypoints(coordinates)
        canvas = skel.draw_bodypose(canvas, keypoints, skel.coco_limbSeq, skel.coco_colors)

    # exist_ok=True avoids the race between checking for the directory and
    # creating it.
    output_dir = 'output'
    os.makedirs(output_dir, exist_ok=True)

    # Save as body_pose_output_multi0000.png, body_pose_output_multi0001.png, ...
    image_path = '%s/body_pose_output_multi%04d.png' % (output_dir, save_bodypose_mulit.counter)
    cv2.imwrite(image_path, canvas)
    save_bodypose_mulit.counter += 1  # Increment the counter

    return image_path
|
|
|
|
|
2024-10-24 15:40:15 +00:00
|
|
|
def queue_prompt(prompt):
    """POST ``prompt`` as JSON to the ``/prompt`` endpoint of ``server_address``.

    Args:
        prompt: JSON-serializable object sent as the value of the
            ``"prompt"`` key.

    Raises:
        urllib.error.URLError: If the request fails (propagated from
            ``urlopen``).
    """
    payload = json.dumps({"prompt": prompt}).encode('utf-8')
    req = request.Request("http://{}/prompt".format(server_address), data=payload)
    # The response body is not needed, but the response must still be closed
    # so the underlying connection is released.
    with request.urlopen(req):
        pass
|
|
|
|
|
2024-10-24 15:40:15 +00:00
|
|
|
def upload_image(input_image, image_name="", image_type="input", overwrite=True) -> str:
    """Upload an image to the ``/upload/image`` endpoint of ``server_address``.

    Args:
        input_image: Path of an existing file (opened here in binary mode and
            closed afterwards), or any other object, which is handed to
            ``MultipartEncoder`` unchanged and is not closed by this function.
            NOTE(review): a string that is not an existing file path is sent
            as-is as the field content -- confirm whether that is intended.
        image_name: File name reported to the server. When empty or ``None``,
            a random SHA-256 based ``.png`` name is generated.
        image_type: Value of the ``type`` form field.
        overwrite: Sent as the lower-cased string ``"true"`` / ``"false"``.

    Returns:
        The ``"name"`` entry of the server's JSON response.
    """
    if not image_name:
        # generate a random name here
        random_token = str(random.getrandbits(256)).encode('utf-8')
        image_name = hashlib.sha256(random_token).hexdigest() + ".png"

    # Only files opened here are closed here; caller-supplied objects stay open.
    if isinstance(input_image, str) and os.path.isfile(input_image):
        file = open(input_image, 'rb')
        close_file = True
    else:
        file = input_image
        close_file = False

    try:
        multipart_data = MultipartEncoder(
            fields={
                'image': (image_name, file, 'image/png'),
                'type': image_type,
                'overwrite': str(overwrite).lower()
            }
        )
        headers = {'Content-Type': multipart_data.content_type}
        # Named upload_request so it does not shadow the module-level
        # `request` import from urllib.
        upload_request = urllib.request.Request(
            "http://{}/upload/image".format(server_address),
            data=multipart_data,
            headers=headers,
        )
        with urllib.request.urlopen(upload_request) as response:
            return json.loads(response.read().decode('utf-8'))["name"]
    finally:
        if close_file:
            file.close()
|
|
|
|
|
2024-10-24 15:40:15 +00:00
|
|
|
def upload_image_circular_queue(image_path, size, unqiue_id):
    """Upload an image under a name taken from a per-id circular set of names.

    For each ``unqiue_id`` a slot counter is kept in the ``id_counter_dict``
    attribute of this function. The image name is the SHA-256 hex digest of
    the id followed by the current slot number, plus ``.png``. The counter
    then advances modulo ``size``, so at most ``size`` distinct names are
    produced per id and older uploads are overwritten in turn.

    Args:
        image_path: Forwarded to ``upload_image`` as the image to upload.
        size: Number of slots in the circular queue.
        unqiue_id: Identifier of the queue; converted to text when building
            the name.

    Returns:
        The image name that was used for the upload.

    Raises:
        ZeroDivisionError: If ``size`` is 0.
    """
    # create a dict in this function to store the counter for each unique_id,
    # key is the unique_id, value is the counter
    if not hasattr(upload_image_circular_queue, 'id_counter_dict'):
        upload_image_circular_queue.id_counter_dict = {}
    counters = upload_image_circular_queue.id_counter_dict

    slot = counters.get(unqiue_id, 0)
    image_name = hashlib.sha256(f"{unqiue_id}{slot}".encode('utf-8')).hexdigest() + ".png"

    # The previous "+= 1 % size" parsed as "+= (1 % size)", so the counter
    # never wrapped. Wrap explicitly to make the queue circular.
    counters[unqiue_id] = (slot + 1) % size

    upload_image(image_path, image_name)

    return image_name
|
2024-10-07 10:45:43 +00:00
|
|
|
|
2024-10-21 04:38:29 +00:00
|
|
|
def visualize_for_fixed_dataset(json_file: str):
    """Render every frame of a fixed-dataset JSON file to ``./output``.

    One image per frame is written as ``./output/test_<frame>.png`` on a
    640x360 black canvas, using the BODY_25 limb sequence and colors from
    ``skeleton_lib``.

    Args:
        json_file: Path of the JSON file to visualize. When empty or ``None``,
            the sample ``./fixed/0001_002_00_01_1.json`` is used, provided
            ``./fixed`` contains at least one JSON file.
    """
    if not json_file:
        # The argument used to be overwritten unconditionally; the pinned
        # sample is now only the fallback when no file is given.
        directory = './fixed'
        json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
        if not json_files:
            print("No JSON files found in the directory.")
            return
        json_file = './fixed/0001_002_00_01_1.json'

    # create ./output directory if it does not exist
    os.makedirs('output', exist_ok=True)
    image_path = './output/test'
    print(json_file)

    skeleton_sequences = pjf.array_json_to_Skeleton_Seqences(json_file)
    # default=0 keeps an input with no usable sequences from raising
    # ValueError; nothing is rendered in that case.
    frame_count = max(
        (
            len(skeleton_sequences[i].skeletons_frame)
            for i in range(len(skeleton_sequences))
            if skeleton_sequences[i] is not None
        ),
        default=0,
    )
    # All time slices are computed before any sequence is modified below.
    sliced_list = [
        skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i)
        for i in range(frame_count)
    ]

    for i, sliced in enumerate(sliced_list):
        canvas = np.zeros((360, 640, 3), dtype=np.uint8)

        for j, skeleton in enumerate(sliced):
            keypoints = skeleton.keypoints
            # Writes the slice's keypoints back into sequence j at frame i.
            # NOTE(review): assumes slice index j matches sequence index j --
            # confirm against get_time_slice_for_Skeleton_Seqences.
            skeleton_sequences[j].get_frame(i).keypoints = keypoints
            canvas = skel.draw_bodypose(canvas, keypoints, skel.body_25_limbSeq, skel.body_25_colors)

        cv2.imwrite(image_path + '_' + str(i) + '.png', canvas)
|
|
|
|
|
2024-10-21 04:38:29 +00:00
|
|
|
def visualize_for_new_yolo_dataset():
    """Render a randomly chosen file from ``./new_yolo_keypoints`` to ``./output``.

    One image per frame is written as ``./output/a_test_<frame>.png`` on a
    640x360 black canvas, using the YOLO COCO limb sequence and colors from
    ``skeleton_lib``. Prints a message and returns early when the directory
    contains no JSON file.
    """
    directory = './new_yolo_keypoints'
    json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
    if not json_files:
        print("No JSON files found in the directory.")
        return

    json_file = os.path.join(directory, random.choice(json_files))
    # Report which file was picked, as visualize_for_fixed_dataset does.
    print(json_file)

    # create ./output directory if it does not exist
    os.makedirs('output', exist_ok=True)

    frames = pjf.read_new_yolo_keypoints(json_file)

    for i, skeletons in enumerate(frames):
        canvas = np.zeros((360, 640, 3), dtype=np.uint8)
        for skeleton in skeletons:
            canvas = skel.draw_bodypose(canvas, skeleton.keypoints, skel.yolo_coco_limbSeq, skel.yolo_coco_colors)

        # Build the path once so the logged path is the file actually written
        # (the log used to show a name derived from the JSON file instead).
        output_path = os.path.join('./output', 'a_test' + '_' + str(i) + '.png')
        print(output_path)
        cv2.imwrite(output_path, canvas)
|
|
|
|
|
|
|
|
def main():
    """Script entry point: run the new-YOLO-dataset visualization."""
    visualize_for_new_yolo_dataset()


# Only run the visualization when executed as a script, not on import.
if __name__ == '__main__':
    main()
|