256 lines
9.4 KiB
Python
256 lines
9.4 KiB
Python
import json
|
|
import math
|
|
import os
|
|
import random
|
|
import shutil
|
|
import subprocess
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
info = json.load(open('info.json'))
|
|
expo_raw_sd_dir = info['expo_raw_sd_dir']
|
|
expo_postprocessed_dir = info['expo_postprocessed_dir']
|
|
expo_postprocess_temp_dir = info['expo_postprocess_temp_dir']
|
|
|
|
def expo_get_step_by_name(image_name: str) -> int:
|
|
return int(image_name.split('_')[1].split('.')[0])
|
|
|
|
def expo_get_batch_by_name(image_name: str) -> int:
|
|
return int(image_name.split('_')[0])
|
|
|
|
def expo_shuffle_image_steps(image_dir) -> list[list[str]]:
|
|
images = {}
|
|
|
|
# Read and categorize image paths by step
|
|
for image_name in os.listdir(image_dir):
|
|
step = expo_get_step_by_name(image_name)
|
|
image_path = os.path.join(image_dir, image_name)
|
|
if step in images:
|
|
images[step].append(image_path)
|
|
else:
|
|
images[step] = [image_path]
|
|
|
|
# Shuffle the image paths for each step
|
|
for step_images in images.values():
|
|
random.shuffle(step_images)
|
|
|
|
# Convert the dictionary to a 2D list and find the minimum length
|
|
shuffled_images = list(images.values())
|
|
min_length = min(len(step_images) for step_images in shuffled_images)
|
|
|
|
# Crop each list to the minimum length
|
|
shuffled_images = [step_images[:min_length] for step_images in shuffled_images]
|
|
|
|
# finally, get the first image of each step, put them in a list, then the second image of each step, basically transpose the list
|
|
shuffled_images = list(map(list, zip(*shuffled_images)))
|
|
|
|
return shuffled_images
|
|
|
|
def expo_add_to_background_image(background_path: str, image_path: str, output_path: str, x: int, y: int) -> str:
|
|
# Use ImageMagick to blend the image with the background using Linear Light blend mode
|
|
command = [
|
|
"magick",
|
|
background_path,
|
|
image_path,
|
|
"-geometry", f"+{x}+{y}",
|
|
"-compose", "LinearLight",
|
|
"-composite",
|
|
output_path
|
|
]
|
|
subprocess.run(command, check=True)
|
|
|
|
return output_path
|
|
|
|
def expo_add_logo(background_path: str, image_path: str, output_path: str, x: int, y: int) -> str:
|
|
# Use ImageMagick to blend the image with the background using normal blend mode
|
|
command = [
|
|
"magick",
|
|
background_path,
|
|
image_path,
|
|
"-geometry", f"+{x}+{y}",
|
|
"-compose", "Over",
|
|
"-composite",
|
|
output_path
|
|
]
|
|
subprocess.run(command, check=True)
|
|
|
|
return output_path
|
|
|
|
def expo_resize_fencer(image_path: str, output_path: str, width: int, height: int) -> str:
|
|
# Use ImageMagick to resize the image
|
|
command = [
|
|
"magick",
|
|
image_path,
|
|
"-resize", f"{width}x{height}",
|
|
output_path
|
|
]
|
|
subprocess.run(command, check=True)
|
|
return output_path
|
|
|
|
def expo_resize_fencers(path_list: list[str], is_left: bool, width: int, height: int) -> list[str]:
|
|
output_dir = os.path.join(expo_postprocess_temp_dir, f"{'left' if is_left else 'right'}_fencers")
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
resized_paths = [os.path.join(output_dir, f"{i}.png") for i in range(len(path_list))]
|
|
futures_to_index = {}
|
|
|
|
with ThreadPoolExecutor() as executor:
|
|
for i, image_path in enumerate(path_list):
|
|
output_path = resized_paths[i]
|
|
future = executor.submit(expo_resize_fencer, image_path, output_path, width, height)
|
|
futures_to_index[future] = i
|
|
|
|
for future in as_completed(futures_to_index):
|
|
index = futures_to_index[future]
|
|
resized_paths[index] = future.result()
|
|
|
|
return resized_paths
|
|
|
|
def expo_motion_blur_fencer(image_path: str, output_path: str, sigma: float, direction: float) -> str:
|
|
# Use ImageMagick to apply motion blur to the image with the specified direction
|
|
command = [
|
|
"magick",
|
|
image_path,
|
|
"-motion-blur", f"0x{sigma}+{direction}",
|
|
output_path
|
|
]
|
|
subprocess.run(command, check=True)
|
|
return output_path
|
|
|
|
def expo_motion_blur_fencers(path_list: list[str]) -> list[str]:
|
|
futures = []
|
|
|
|
with ThreadPoolExecutor() as executor:
|
|
for i, image_path in enumerate(path_list):
|
|
sigma = 15 - 15 * i / (len(path_list) - 1)
|
|
direction = 0
|
|
future = executor.submit(expo_motion_blur_fencer, image_path, image_path, sigma, direction)
|
|
futures.append(future)
|
|
|
|
for future in as_completed(futures):
|
|
future.result()
|
|
|
|
def expo_overlay_bg_gradient(image_path: str, output_path: str, bg_gradient_path: str) -> str:
|
|
# Use ImageMagick to overlay the image with a background gradient
|
|
command = [
|
|
"magick",
|
|
image_path,
|
|
bg_gradient_path,
|
|
"-compose", "Overlay",
|
|
"-composite",
|
|
output_path
|
|
]
|
|
subprocess.run(command, check=True)
|
|
return output_path
|
|
|
|
def expo_decrese_opacity(image_path: str, output_path: str, opacity: int) -> str:
|
|
# Use ImageMagick to decrease the opacity of the image
|
|
command = [
|
|
"magick",
|
|
image_path,
|
|
"-channel", "A",
|
|
"-evaluate", "multiply", f"{opacity/100}",
|
|
output_path
|
|
]
|
|
subprocess.run(command, check=True)
|
|
return output_path
|
|
|
|
def expo_decrese_opacities(path_list: list[str]) -> list[str]:
|
|
futures = []
|
|
with ThreadPoolExecutor() as executor:
|
|
for i, image_path in enumerate(path_list):
|
|
opacity = 30 + 70 * i / (len(path_list) - 1)
|
|
future = executor.submit(expo_decrese_opacity, image_path, image_path, opacity)
|
|
futures.append(future)
|
|
|
|
for future in as_completed(futures):
|
|
future.result()
|
|
|
|
def output_to_display_folder(output_image_paths):
|
|
# copy the output images to the display folder (expo_postprocessed_dir)
|
|
# the format is {session}_{candidate}.png, this session should be the max session from expo_postprocess_dir, the candidate should be the index of the output_image_paths
|
|
session = str(current_session()).zfill(5)
|
|
for i, image_path in enumerate(output_image_paths):
|
|
candidate = str(i).zfill(5)
|
|
output_image_path = os.path.join(expo_postprocessed_dir, f"{session}_{candidate}.png")
|
|
# copy the image
|
|
shutil.copy(image_path, output_image_path)
|
|
|
|
def current_session():
|
|
max_session = 0
|
|
|
|
for file in os.listdir(expo_postprocessed_dir):
|
|
if file.endswith(".png"):
|
|
session = int(file.split("_")[0])
|
|
if session > max_session:
|
|
max_session = session
|
|
|
|
return max_session + 1
|
|
|
|
def expo_postprocess_main():
|
|
print("Postprocessing")
|
|
os.makedirs(expo_postprocessed_dir, exist_ok=True)
|
|
os.makedirs(expo_postprocess_temp_dir, exist_ok=True)
|
|
|
|
left_fencer_raw_image_dir = os.path.join(expo_raw_sd_dir, 'left_fencer')
|
|
right_fencer_raw_image_dir = os.path.join(expo_raw_sd_dir, 'right_fencer')
|
|
|
|
if not os.path.exists(left_fencer_raw_image_dir) or not os.path.exists(right_fencer_raw_image_dir):
|
|
print("Raw images not found")
|
|
return
|
|
|
|
left_shuffled_images_paths = expo_shuffle_image_steps(left_fencer_raw_image_dir)
|
|
right_shuffled_images_paths = expo_shuffle_image_steps(right_fencer_raw_image_dir)
|
|
|
|
background_path = os.path.join(expo_postprocess_temp_dir, 'background.png')
|
|
logo_path = os.path.join(expo_postprocess_temp_dir, 'logo.png')
|
|
|
|
if not os.path.exists(background_path):
|
|
background = np.zeros((720, 1080, 3), dtype=np.uint8)
|
|
cv2.imwrite(background_path, background)
|
|
|
|
bg_gradient_folder = os.path.join(expo_postprocess_temp_dir, 'bg_gradient')
|
|
bg_gradients = [os.path.join(bg_gradient_folder, f"{i:02d}.png") for i in range(4)]
|
|
|
|
output_files = []
|
|
|
|
for i, candidate_list in enumerate(left_shuffled_images_paths):
|
|
left_fencer_paths = expo_resize_fencers(candidate_list, True, 450, 450)
|
|
expo_motion_blur_fencers(left_fencer_paths)
|
|
expo_decrese_opacities(left_fencer_paths)
|
|
|
|
temp_output_path = os.path.join(expo_postprocess_temp_dir, f"temp_{i}.png")
|
|
output_files.append(temp_output_path)
|
|
temp_background_path = background_path
|
|
for j, left_fencer_path in enumerate(left_fencer_paths):
|
|
x_position = 34 * math.pow(j, 1.3) - 100
|
|
y_position = 170
|
|
expo_add_to_background_image(temp_background_path, left_fencer_path, temp_output_path, x_position, y_position)
|
|
temp_background_path = temp_output_path
|
|
|
|
for i, candidate_list in enumerate(right_shuffled_images_paths):
|
|
if i > len(left_shuffled_images_paths) - 1:
|
|
break
|
|
right_fencer_paths = expo_resize_fencers(candidate_list, False, 450, 450)
|
|
expo_motion_blur_fencers(right_fencer_paths)
|
|
expo_decrese_opacities(right_fencer_paths)
|
|
|
|
temp_output_path = os.path.join(expo_postprocess_temp_dir, f"temp_{i}.png")
|
|
if not os.path.exists(temp_output_path):
|
|
break
|
|
|
|
for j, right_fencer_path in enumerate(right_fencer_paths):
|
|
x_position = 540 - (34 * math.pow(j, 1.3) - 170)
|
|
y_position = 170
|
|
expo_add_to_background_image(temp_output_path, right_fencer_path, temp_output_path, x_position, y_position)
|
|
temp_background_path = temp_output_path
|
|
|
|
expo_overlay_bg_gradient(temp_output_path, temp_output_path, bg_gradients[i % len(bg_gradients)])
|
|
expo_add_logo(temp_output_path, logo_path, temp_output_path, 650, 630)
|
|
|
|
output_to_display_folder(output_files)
|
|
|
|
if __name__ == '__main__':
|
|
expo_postprocess_main() |