Image_Gen_Server/openpose_gen.py

199 lines
7.6 KiB
Python
Raw Normal View History

2024-09-23 10:03:14 +00:00
import json
2024-10-03 09:37:37 +00:00
import os
import random
2024-09-23 10:03:14 +00:00
import numpy as np
from typing import List
import math
import cv2
2024-10-03 09:37:37 +00:00
import skeleton_lib as skel
import process_json_file as pjf
2024-10-07 10:45:43 +00:00
import urllib
from urllib import request, parse
from requests_toolbelt.multipart.encoder import MultipartEncoder
2024-10-03 09:37:37 +00:00
import sys
2024-10-14 06:50:56 +00:00
import hashlib
2024-10-03 09:37:37 +00:00
sys.path.append('./')
2024-09-23 10:03:14 +00:00
2024-10-14 07:55:21 +00:00
# Default "host:port" of the HTTP server targeted by the request helpers.
# NOTE(review): the functions visible in this file take ``server_address`` as a
# parameter and never read this global; presumably it is imported by callers
# elsewhere — confirm before changing or removing it.
server_address = "localhost:8188"
2024-10-03 09:37:37 +00:00
def is_normalized(keypoints: List["skel.Keypoint"]) -> bool:
    """Report whether every present keypoint lies inside the unit square.

    Args:
        keypoints: Objects exposing ``x`` and ``y`` attributes. ``None``
            entries are skipped, and a ``None`` or empty list is treated as
            vacuously normalized, so callers that tolerate missing keypoints
            can call this without pre-filtering.

    Returns:
        True when all present keypoints satisfy ``0 <= x <= 1`` and
        ``0 <= y <= 1``; False as soon as one falls outside that range.
    """
    if keypoints is None:
        return True
    return all(
        0 <= keypoint.x <= 1 and 0 <= keypoint.y <= 1
        for keypoint in keypoints
        if keypoint is not None
    )
2024-10-03 09:37:37 +00:00
def draw_bodypose(canvas: np.ndarray, keypoints: List["skel.Keypoint"], limbSeq, colors, xinsr_stick_scaling: bool = False) -> np.ndarray:
    """
    Draw keypoints and limbs representing body pose on a given canvas.

    Args:
        canvas (np.ndarray): A 3D numpy array representing the canvas (image) on which to draw the body pose.
        keypoints (List[Keypoint]): Body keypoints to draw. Entries that are None or whose
            ``confidence`` is 0 are skipped. None or an empty list leaves the canvas untouched.
        limbSeq: Iterable of ``(index_a, index_b)`` pairs indexing into ``keypoints``; one limb
            is drawn per pair.
        colors: Iterable of colours, zipped with ``limbSeq`` for the limbs and with
            ``keypoints`` for the joint dots. Limbs use each channel scaled by 0.6.
        xinsr_stick_scaling (bool): Whether or not scaling stick width for xinsr ControlNet

    Returns:
        np.ndarray: The same canvas object that was passed in, after drawing.

    Note:
        If every present keypoint is normalized (x and y between 0 and 1) the coordinates are
        scaled by the canvas width/height; otherwise they are used as-is (scale factor 1.0).
    """
    # Guard first: the normalization check below iterates the keypoints, so it
    # must never see None.
    if keypoints is None or len(keypoints) == 0:
        return canvas

    CH, CW, _ = canvas.shape

    # Missing joints are stored as None; exclude them so the normalization
    # check cannot fail on an attribute lookup.
    present = [keypoint for keypoint in keypoints if keypoint is not None]
    if is_normalized(present):
        H, W = CH, CW
    else:
        H, W = 1.0, 1.0

    stickwidth = 2

    # Ref: https://huggingface.co/xinsir/controlnet-openpose-sdxl-1.0
    max_side = max(CW, CH)
    if xinsr_stick_scaling:
        stick_scale = 1 if max_side < 500 else min(2 + (max_side // 1000), 7)
    else:
        stick_scale = 1

    # Limbs: one filled ellipse per keypoint pair, centred on the midpoint and
    # rotated to the segment's angle.
    for (k1_index, k2_index), color in zip(limbSeq, colors):
        keypoint1 = keypoints[k1_index]
        keypoint2 = keypoints[k2_index]
        if keypoint1 is None or keypoint2 is None or keypoint1.confidence == 0 or keypoint2.confidence == 0:
            continue

        xs = np.array([keypoint1.x, keypoint2.x]) * float(W)
        ys = np.array([keypoint1.y, keypoint2.y]) * float(H)
        center = (int(np.mean(xs)), int(np.mean(ys)))
        length = ((ys[0] - ys[1]) ** 2 + (xs[0] - xs[1]) ** 2) ** 0.5
        angle = math.degrees(math.atan2(ys[0] - ys[1], xs[0] - xs[1]))
        polygon = cv2.ellipse2Poly(center, (int(length / 2), stickwidth * stick_scale), int(angle), 0, 360, 1)
        cv2.fillConvexPoly(canvas, polygon, [int(float(c) * 0.6) for c in color])

    # Joints: a filled radius-4 dot per keypoint.
    for keypoint, color in zip(keypoints, colors):
        if keypoint is None or keypoint.confidence == 0:
            continue
        x = int(keypoint.x * W)
        y = int(keypoint.y * H)
        cv2.circle(canvas, (x, y), 4, color, thickness=-1)

    return canvas
2024-10-03 09:37:37 +00:00
def coordinates_to_keypoints(coordinates: list) -> List[skel.Keypoint]:
    """Convert a flat coordinate list into keypoints, reading it in strides of three.

    For every offset ``0, 3, 6, ...`` below ``len(coordinates)`` the values at
    ``offset`` and ``offset + 1`` become the two positional arguments of
    ``skel.Keypoint``. An IndexError propagates if the final stride has no
    second value.

    NOTE(review): the third value of each stride is never forwarded —
    presumably it is a confidence score; confirm whether ``skel.Keypoint``
    should receive it.
    """
    keypoints = []
    for offset in range(0, len(coordinates), 3):
        first_value = coordinates[offset]
        second_value = coordinates[offset + 1]
        keypoints.append(skel.Keypoint(first_value, second_value))
    return keypoints
2024-10-14 06:50:56 +00:00
def save_bodypose(width: int, height: int, coordinates: list, pid: str) -> str:
    """Render one pose on a black canvas and write it to a numbered PNG.

    Args:
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        coordinates: Flat coordinate list handed to ``coordinates_to_keypoints``.
        pid: Accepted for interface compatibility; not used by this function.

    Returns:
        The relative path of the image, ``output/body_pose_outputNNNN.png``,
        where NNNN is a per-process counter stored on the function object.
    """
    if not hasattr(save_bodypose, 'counter'):
        save_bodypose.counter = 0  # Initialize the counter attribute

    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    keypoints = coordinates_to_keypoints(coordinates)
    canvas = draw_bodypose(canvas, keypoints, skel.coco_limbSeq, skel.coco_colors)

    # cv2.imwrite does not create missing directories, so make sure the target
    # folder exists before handing back a path callers will try to open.
    os.makedirs('output', exist_ok=True)

    # Save as body_pose_output0000.png, body_pose_output0001.png, ...
    image_path = 'output/body_pose_output%04d.png' % save_bodypose.counter
    cv2.imwrite(image_path, canvas)
    save_bodypose.counter += 1  # Increment the counter

    return image_path
2024-10-07 10:45:43 +00:00
2024-10-14 06:50:56 +00:00
def save_bodypose_mulit(width: int, height: int, coordinates_list: list, pid: str) -> str:
    """Render several poses onto one black canvas and write it to a numbered PNG.

    Args:
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        coordinates_list: One flat coordinate list per pose; each is converted
            with ``coordinates_to_keypoints`` and drawn onto the same canvas.
        pid: Accepted for interface compatibility; not used by this function.

    Returns:
        The relative path of the image, ``output/body_pose_output_multiNNNN.png``,
        where NNNN is a per-process counter stored on the function object.
    """
    if not hasattr(save_bodypose_mulit, 'counter'):
        save_bodypose_mulit.counter = 0  # Initialize the counter attribute

    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    for coordinates in coordinates_list:
        keypoints = coordinates_to_keypoints(coordinates)
        canvas = draw_bodypose(canvas, keypoints, skel.coco_limbSeq, skel.coco_colors)

    # cv2.imwrite does not create missing directories, so make sure the target
    # folder exists before handing back a path callers will try to open.
    os.makedirs('output', exist_ok=True)

    # Save as body_pose_output_multi0000.png, body_pose_output_multi0001.png, ...
    image_path = 'output/body_pose_output_multi%04d.png' % save_bodypose_mulit.counter
    cv2.imwrite(image_path, canvas)
    save_bodypose_mulit.counter += 1  # Increment the counter

    return image_path
def queue_prompt(prompt, server_address):
    """POST ``prompt`` as JSON to ``http://<server_address>/prompt``.

    Args:
        prompt: JSON-serialisable object sent as ``{"prompt": prompt}``.
        server_address: ``host:port`` of the target server.

    Returns:
        None. The response body is not inspected.

    Raises:
        TypeError: If ``prompt`` cannot be serialised by ``json.dumps``.
        urllib.error.URLError: If the request fails.
    """
    payload = json.dumps({"prompt": prompt}).encode('utf-8')
    req = request.Request("http://{}/prompt".format(server_address), data=payload)
    # Use the response as a context manager so the connection is closed
    # deterministically instead of waiting for garbage collection.
    with request.urlopen(req):
        pass
2024-10-14 06:50:56 +00:00
def upload_image(input_image, name, server_address, image_type="input", overwrite=False):
    """Upload an image to ``http://<server_address>/upload/image`` as multipart form data.

    Args:
        input_image: A filesystem path (``str`` or ``os.PathLike``) to open in
            binary mode, or any other object, which is handed to
            ``MultipartEncoder`` as the file content unchanged.
        name: File name sent in the ``image`` form field.
        server_address: ``host:port`` of the target server.
        image_type: Value of the ``type`` form field.
        overwrite: Sent as the lowercase string ``"true"``/``"false"`` in the
            ``overwrite`` form field.

    Returns:
        The raw response body (bytes).

    Raises:
        FileNotFoundError: If ``input_image`` is a path that is not an existing
            file. Without this check the path text itself would be uploaded
            as the image content.
        urllib.error.URLError: If the request fails.
    """
    if isinstance(input_image, (str, os.PathLike)):
        if not os.path.isfile(input_image):
            raise FileNotFoundError("Image file not found: {}".format(input_image))
        file = open(input_image, 'rb')
        close_file = True
    else:
        # Caller-owned object: use it as-is and leave closing to the caller.
        file = input_image
        close_file = False

    try:
        multipart_data = MultipartEncoder(
            fields={
                'image': (name, file, 'image/png'),
                'type': image_type,
                'overwrite': str(overwrite).lower()
            }
        )
        headers = {'Content-Type': multipart_data.content_type}
        # Named ``req`` so it does not shadow the ``request`` module imported
        # at file level.
        req = urllib.request.Request("http://{}/upload/image".format(server_address), data=multipart_data, headers=headers)
        with urllib.request.urlopen(req) as response:
            return response.read()
    finally:
        # Only close handles this function opened itself.
        if close_file:
            file.close()
def upload_image_circular_queue(image_path, size, unqiue_id, server_address):
    """Upload an image under a name that cycles through ``size`` slots per id.

    A per-id slot counter is kept in a dict stored on the function object.
    The upload name is the SHA-256 hex digest of ``unqiue_id + str(slot)``
    plus ``".png"``, so each id reuses at most ``size`` distinct names.

    Args:
        image_path: Image to upload; forwarded to ``upload_image``.
        size: Number of slots in the ring; must be at least 1.
        unqiue_id: String identifying the ring (concatenated into the hash input).
        server_address: ``host:port`` of the target server.

    Returns:
        The name the image was uploaded under.

    Raises:
        ValueError: If ``size`` is smaller than 1.
    """
    if size < 1:
        raise ValueError("size must be a positive integer, got {!r}".format(size))

    if not hasattr(upload_image_circular_queue, 'id_counter_dict'):
        upload_image_circular_queue.id_counter_dict = {}
    counters = upload_image_circular_queue.id_counter_dict

    # ``% size`` here keeps the slot in range even if ``size`` shrank since
    # the previous call for this id.
    slot = counters.get(unqiue_id, 0) % size
    image_name = hashlib.sha256((unqiue_id + str(slot)).encode('utf-8')).hexdigest() + ".png"

    # Advance and wrap. The previous ``+= 1 % size`` parsed as
    # ``+= (1 % size)``, so the counter never wrapped.
    counters[unqiue_id] = (slot + 1) % size

    # Names repeat once the ring wraps, so the upload must replace the earlier
    # image for the returned name to refer to the new one.
    # NOTE(review): assumes the server honours the ``overwrite`` form field — confirm.
    upload_image(image_path, image_name, server_address, overwrite=True)
    return image_name
2024-10-07 10:45:43 +00:00
2024-10-03 09:37:37 +00:00
def main():
    """Render every frame of one randomly chosen JSON file from ``./fixed``.

    Picks a random ``.json`` file, loads it with
    ``pjf.array_json_to_Skeleton_Seqences``, and for each frame index draws
    all skeletons of that time slice onto a 640x360 black canvas using the
    BODY_25 limb/colour tables, writing ``./output/test_<frame>.png``.
    Prints a message and returns if the directory holds no JSON files.
    """
    directory = './fixed'
    json_files = [f for f in os.listdir(directory) if f.endswith('.json')]
    if not json_files:
        print("No JSON files found in the directory.")
        return

    json_file = os.path.join(directory, random.choice(json_files))
    image_path = './output/test'
    print(json_file)

    skeleton_sequences = pjf.array_json_to_Skeleton_Seqences(json_file)
    # ``default=0`` keeps an empty or all-None result from raising ValueError;
    # the loops below then simply do nothing.
    frame_count = max(
        (len(sequence.skeletons_frame) for sequence in skeleton_sequences if sequence is not None),
        default=0,
    )
    # Slices are all computed up front, before any frame is written back below.
    sliced_list = [skel.get_time_slice_for_Skeleton_Seqences(skeleton_sequences, i) for i in range(frame_count)]

    # cv2.imwrite does not create missing directories.
    os.makedirs(os.path.dirname(image_path), exist_ok=True)

    for i, sliced in enumerate(sliced_list):
        canvas = np.zeros((360, 640, 3), dtype=np.uint8)
        for j, skeleton in enumerate(sliced):
            keypoints = skeleton.keypoints
            skeleton_sequences[j].get_frame(i).keypoints = keypoints
            canvas = draw_bodypose(canvas, keypoints, skel.body_25_limbSeq, skel.body_25_colors)
        cv2.imwrite(image_path + '_' + str(i) + '.png', canvas)


if __name__ == '__main__':
    main()