# Source listing metadata: 167 lines, 6.3 KiB, Python
import base64
|
|
import hashlib
|
|
import json
|
|
import random
|
|
import uuid
|
|
import cv2
|
|
from flask import Flask, request, jsonify
|
|
import sys
|
|
import os
|
|
from PIL import Image
|
|
import io
|
|
|
|
import numpy as np
|
|
import websocket
|
|
import openpose_gen as opg
|
|
from comfy_socket import get_images
|
|
from postprocessing import expo_shuffle_image_steps, expo_add_to_background_image
|
|
# Add the current working directory to the module search path.
# NOTE(review): this runs after the project-local imports above, so it cannot
# affect how those were resolved -- confirm whether it is still required.
sys.path.append('./')

app = Flask(__name__)

# Read the deployment settings once at import time. The context manager closes
# the file handle deterministically instead of leaving it to the garbage
# collector.
with open('info.json') as _info_file:
    info = json.load(_info_file)

# Interpolated into the "ws://<address>/ws" URL and handed to the opg helpers.
comfyui_address = info['comfyui_address']
# Directory where images returned by ComfyUI are written as "<batch>_<step>.png".
expo_raw_sd_dir = info['expo_raw_sd_dir']
# Directory emptied, together with expo_raw_sd_dir, by expo_clear_images().
expo_openpose_dir = info['expo_openpose_dir']
# Directory that receives "postprocessed.png".
expo_postprocessed_dir = info['expo_postprocessed_dir']
# Scratch directory that holds "background.png" during post-processing.
expo_postprocess_temp_dir = info['expo_postprocess_temp_dir']
|
|
|
|
@app.route('/expo_fencing_pose', methods=['POST'])
def expo_fencing_pose():
    """Accept one pose for the expo flow and run image generation for it.

    The JSON body must carry ``coordinates``, ``canvas_size``, ``batch`` and
    ``step``. The pose is passed to ``opg.expo_save_bodypose`` and the path it
    returns is handed to ``expo_fencer_prompt``; the request blocks until
    that call returns.

    Returns:
        A ``(response, status)`` tuple: 201 on success, 415 when the request
        is not JSON, 422 when the body is not an object or a required field
        is missing or null.
    """
    if not request.is_json:
        return jsonify({"status": "error", "message": "Request must be JSON"}), 415

    data = request.get_json()
    # A JSON array or scalar has no named fields; report it as missing data
    # rather than failing on attribute access below.
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Missing data"}), 422

    # .get() lets an absent key reach the 422 below; indexing directly raised
    # KeyError before the validation could ever run.
    coordinates = data.get('coordinates')
    canvas_size = data.get('canvas_size')

    if coordinates is None or canvas_size is None or 'batch' not in data or 'step' not in data:
        return jsonify({"status": "error", "message": "Missing data"}), 422

    batch = data['batch']
    step = data['step']

    openpose_image_path = opg.expo_save_bodypose(canvas_size[0], canvas_size[1], coordinates, batch, step)
    print(openpose_image_path)
    expo_fencer_prompt(openpose_image_path, batch, step)

    return jsonify({"status": "success", "message": "Data received"}), 201
|
|
|
|
def expo_fencer_prompt(openpose_image_path, batch, step):
    """Run the ``fencer_03`` prompt for one pose and save the returned images.

    Uploads the pose image and the black reference image through ``opg``,
    patches the prompt (seed, reference image, pose image), submits it over a
    websocket via ``get_images`` and writes the returned image bytes to
    ``<expo_raw_sd_dir>/<batch>_<step>.png``.

    Args:
        openpose_image_path: Path of the pose image to upload.
        batch: Batch identifier used in the output file name.
        step: Step identifier used in the output file name.
    """
    # Context manager so the prompt file handle is closed right after parsing.
    with open("./prompts/fencer_03.json", "r", encoding="utf-8") as prompt_file:
        prompt = json.load(prompt_file)

    openpose_image_name = opg.upload_image(openpose_image_path)
    opg.upload_image("./images/ref_black.png", "ref_black.png")

    print(openpose_image_name)

    prompt["3"]["inputs"]["seed"] = random.randint(0, 10000000000)
    prompt["29"]["inputs"]['image'] = "ref_black.png"
    prompt["17"]["inputs"]['image'] = openpose_image_name

    client_id = hashlib.sha256(str(random.getrandbits(256)).encode('utf-8')).hexdigest()
    ws = websocket.WebSocket()
    try:
        ws.connect("ws://{}/ws?clientId={}".format(comfyui_address, client_id))
        images = get_images(ws, prompt, client_id)
    finally:
        # Release the socket even when connecting or generation fails.
        ws.close()

    # NOTE(review): every returned image is written to this same path, so when
    # several images come back only the last one survives -- confirm that the
    # workflow is expected to yield exactly one image.
    image_path = os.path.join(expo_raw_sd_dir, f"{batch}_{step}.png")
    for node_id in images:
        for image_data in images[node_id]:
            image = Image.open(io.BytesIO(image_data))
            image.save(image_path)
|
|
|
|
def expo_clear_images():
    """Delete every entry in the openpose directory, then in the raw SD directory."""
    # Same order as before: openpose images first, raw generated images second.
    for directory in (expo_openpose_dir, expo_raw_sd_dir):
        for entry_name in os.listdir(directory):
            os.remove(os.path.join(directory, entry_name))
|
|
|
|
@app.route('/expo_postprocess', methods=['POST'])
def expo_postprocess():
    """Post-process the generated images and write ``postprocessed.png``.

    Ensures the temp and output directories exist, obtains the shuffled image
    paths from ``expo_shuffle_image_steps``, makes sure a background image is
    available at ``<temp dir>/background.png`` (a black 1000x1500 canvas is
    created when none can be loaded), calls ``expo_add_to_background_image``
    for the first shuffled image at offset (0, 0) and writes the background
    array to ``<postprocessed dir>/postprocessed.png``.

    Returns:
        A ``(dict, status)`` tuple. A plain dict is returned rather than
        ``jsonify(...)`` so the function also works when called directly
        from the ``__main__`` guard, where no application context is active.
    """
    print("Postprocessing")
    os.makedirs(expo_postprocess_temp_dir, exist_ok=True)
    # cv2.imwrite does not create missing directories, so make sure the
    # destination exists before writing the result.
    os.makedirs(expo_postprocessed_dir, exist_ok=True)

    shuffled_images_paths = expo_shuffle_image_steps()
    background_path = os.path.join(expo_postprocess_temp_dir, 'background.png')

    # Bind `background` on every path. Previously it was only assigned when
    # the file was missing, so a second run failed with NameError.
    background = cv2.imread(background_path) if os.path.exists(background_path) else None
    if background is None:
        # No file yet, or it could not be decoded: start from a black canvas.
        background = np.zeros((1000, 1500, 3), dtype=np.uint8)
        cv2.imwrite(background_path, background)

    expo_add_to_background_image(background_path, shuffled_images_paths[0][0], 0, 0)
    # NOTE(review): this writes the in-memory background, not a value returned
    # by expo_add_to_background_image -- confirm whether the composited result
    # should be re-read from background_path before saving.
    cv2.imwrite(os.path.join(expo_postprocessed_dir, 'postprocessed.png'), background)

    # Cleanup of the intermediate images is currently disabled.
    # expo_clear_images()

    return {"status": "success", "message": "Postprocessing complete"}, 200
|
|
|
|
@app.route('/gen_image', methods=['POST'])
def gen_image():
    """Accept one pose and pass it to ``opg.save_bodypose``.

    The JSON body must carry ``coordinates``, ``canvas_size`` and ``pid``.
    Submitting the prompt to ComfyUI is currently disabled (see the
    commented-out call below).

    Returns:
        A ``(response, status)`` tuple: 201 on success, 415 when the request
        is not JSON, 422 when the body is not an object or a required field
        is missing or empty.
    """
    if not request.is_json:
        return jsonify({"status": "error", "message": "Request must be JSON"}), 415

    data = request.get_json()
    # A JSON array or scalar has no named fields; report it as missing data.
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Missing data"}), 422

    # .get() so absent keys reach the 422 below instead of raising KeyError.
    coordinates = data.get('coordinates')
    canvas_size = data.get('canvas_size')

    # `pid` was read unconditionally before, so it is required; only its
    # presence is checked, any supplied value is passed through unchanged.
    if not coordinates or not canvas_size or 'pid' not in data:
        return jsonify({"status": "error", "message": "Missing data"}), 422

    pid = data['pid']

    openpose_image_path = opg.save_bodypose(canvas_size[0], canvas_size[1], coordinates, pid)
    # gen_fencer_prompt(openpose_image_path, pid, comfyui_address)

    return jsonify({"status": "success", "message": "Data received"}), 201
|
|
|
|
|
|
@app.route('/gen_group_pic', methods=['POST'])
def gen_group_pic():
    """Accept several poses plus a base image and queue the group-picture prompt.

    The JSON body must carry ``coordinates_list`` (a list of objects that each
    have a ``coordinates`` entry), ``canvas_size``, ``pid`` and ``base_image``
    (base64 text). The poses are passed to ``opg.save_bodypose_mulit`` and the
    result is handed to ``gen_group_pic_prompt`` together with the decoded
    base image.

    Returns:
        A ``(response, status)`` tuple: 201 on success, 415 when the request
        is not JSON, 422 when a field is missing, empty or malformed.
    """
    if not request.is_json:
        return jsonify({"status": "error", "message": "Request must be JSON"}), 415

    data = request.get_json()
    # A JSON array or scalar has no named fields; report it as missing data.
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Missing data"}), 422

    # .get() so absent keys reach the 422 below instead of raising KeyError.
    coordinates_list = data.get('coordinates_list')
    canvas_size = data.get('canvas_size')
    pid = data.get('pid')
    encoded_base_image = data.get('base_image')

    if not coordinates_list or not canvas_size or not encoded_base_image or not pid:
        return jsonify({"status": "error", "message": "Missing data"}), 422

    try:
        base_image = base64.b64decode(encoded_base_image)
    except (ValueError, TypeError):
        # binascii.Error is a ValueError subclass; a non-string payload
        # raises TypeError.
        return jsonify({"status": "error", "message": "Invalid base_image"}), 422

    # Text that decodes to zero bytes is still "missing", as before.
    if not base_image:
        return jsonify({"status": "error", "message": "Missing data"}), 422

    try:
        # Build a new list instead of overwriting the request payload in place.
        pose_coordinates = [entry['coordinates'] for entry in coordinates_list]
    except (KeyError, TypeError):
        return jsonify({"status": "error", "message": "Missing data"}), 422

    openpose_image_path = opg.save_bodypose_mulit(canvas_size[0], canvas_size[1], pose_coordinates, pid)
    gen_group_pic_prompt(openpose_image_path, base_image, pid, comfyui_address)

    return jsonify({"status": "success", "message": "Data received"}), 201
|
|
|
|
def gen_fencer_prompt(openpose_image_path, pid, comfyUI_address):
    """Queue the ``fencerAPI`` prompt for one pose image.

    Uploads the pose image through ``opg.upload_image_circular_queue`` (queue
    size 20) and the black reference image through ``opg.upload_image``,
    patches the prompt (seed, reference image, pose image) and submits it
    with ``opg.queue_prompt``.

    Args:
        openpose_image_path: Path of the pose image to upload.
        pid: Identifier forwarded to the circular-queue upload.
        comfyUI_address: ComfyUI server address forwarded to the opg helpers.
    """
    # Explicit UTF-8, matching how expo_fencer_prompt reads its prompt file.
    with open("./prompts/fencerAPI.json", "r", encoding="utf-8") as prompt_file:
        prompt = json.load(prompt_file)

    openpose_image_name = opg.upload_image_circular_queue(openpose_image_path, 20, pid, comfyUI_address)
    opg.upload_image("./images/ref_black.png", "ref_black.png")

    prompt["3"]["inputs"]["seed"] = random.randint(0, 10000000000)
    # Use the name the reference image was uploaded under (as
    # expo_fencer_prompt does), not the local source path.
    prompt["29"]["inputs"]['image'] = "ref_black.png"
    prompt["17"]["inputs"]['image'] = openpose_image_name

    opg.queue_prompt(prompt, comfyUI_address)
|
|
|
|
def gen_group_pic_prompt(openpose_image_path, base_image, pid, comfyUI_address):
    """Queue the ``group_pic`` prompt for a multi-pose image and a base image.

    Both images go through ``opg.upload_image_circular_queue`` with a queue
    size of 30 and the same ``pid``; the names it returns are written into
    prompt nodes "10" (pose) and "14" (base image), node "3" gets a fresh
    random seed, and the prompt is submitted with ``opg.queue_prompt``.

    Args:
        openpose_image_path: Pose image handed to the upload helper.
        base_image: Base image handed to the upload helper.
            NOTE(review): the caller in this file passes decoded bytes here,
            not a path -- confirm the helper accepts that.
        pid: Identifier forwarded to both uploads.
        comfyUI_address: ComfyUI server address forwarded to the opg helpers.
    """
    with open("./prompts/group_pic.json", "r") as prompt_file:
        workflow = json.load(prompt_file)

    pose_name = opg.upload_image_circular_queue(openpose_image_path, 30, pid, comfyUI_address)
    base_name = opg.upload_image_circular_queue(base_image, 30, pid, comfyUI_address)

    seed = random.randint(0, 10000000000)
    workflow["3"]["inputs"]["seed"] = seed
    workflow["10"]["inputs"]['image'] = pose_name
    workflow["14"]["inputs"]['image'] = base_name

    opg.queue_prompt(workflow, comfyUI_address)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Running this file directly performs one post-processing pass; starting
    # the Flask development server is left disabled below.
    expo_postprocess()
    # app.run(debug=True)