"""Post-process raw fencer images into composited display images.

Images are grouped by step, resized, motion-blurred and faded with
ImageMagick (the ``magick`` executable must be on PATH), composited onto a
black background, and finally copied into the display folder.
"""
import json
import math
import os
import random
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed

import cv2
import numpy as np

# Directory configuration is read from info.json in the current working
# directory when this module is imported.
with open('info.json') as info_file:
    info = json.load(info_file)

expo_raw_sd_dir = info['expo_raw_sd_dir']
expo_postprocessed_dir = info['expo_postprocessed_dir']
expo_postprocess_temp_dir = info['expo_postprocess_temp_dir']


def expo_get_step_by_name(image_name: str) -> int:
    """Return the step parsed from a ``{batch}_{step}.{ext}`` file name."""
    return int(image_name.split('_')[1].split('.')[0])


def expo_get_batch_by_name(image_name: str) -> int:
    """Return the batch parsed from a ``{batch}_{step}.{ext}`` file name."""
    return int(image_name.split('_')[0])


def expo_shuffle_image_steps(image_dir) -> list[list[str]]:
    """Build random candidates containing one image path per step.

    Every file in *image_dir* is grouped by its step number, each group is
    shuffled, and the groups are transposed so that candidate ``k`` holds the
    ``k``-th shuffled image of every step.  The number of candidates equals
    the size of the smallest step group.

    Returns:
        A list of candidates; each candidate is a list of image paths ordered
        by ascending step.  Empty if *image_dir* contains no files.

    Raises:
        IndexError, ValueError: if a file name does not follow the
            ``{batch}_{step}.{ext}`` pattern.
    """
    images: dict[int, list[str]] = {}

    # Read and categorize image paths by step
    for image_name in os.listdir(image_dir):
        step = expo_get_step_by_name(image_name)
        images.setdefault(step, []).append(os.path.join(image_dir, image_name))

    # Shuffle the image paths for each step
    for step_images in images.values():
        random.shuffle(step_images)

    # os.listdir() returns entries in arbitrary order, so order the groups by
    # step number explicitly; callers derive blur/opacity/offset from the
    # position of an image inside a candidate.
    ordered_steps = [images[step] for step in sorted(images)]

    # Transpose. zip() stops at the shortest group (cropping every group to
    # the minimum length) and yields nothing when there are no groups.
    return [list(candidate) for candidate in zip(*ordered_steps)]


def _expo_composite(background_path: str, image_path: str, output_path: str,
                    x: float, y: float, compose: str) -> str:
    """Composite *image_path* over *background_path* at offset (x, y)."""
    command = [
        "magick", background_path, image_path,
        "-geometry", f"+{x}+{y}",
        "-compose", compose,
        "-composite", output_path
    ]
    subprocess.run(command, check=True)
    return output_path


def expo_add_to_background_image(background_path: str, image_path: str, output_path: str,
                                 x: float, y: float) -> str:
    """Blend an image onto the background using the LinearLight compose mode.

    Returns:
        *output_path*.

    Raises:
        subprocess.CalledProcessError: if ImageMagick exits with an error.
    """
    return _expo_composite(background_path, image_path, output_path, x, y, "LinearLight")


def expo_add_logo(background_path: str, image_path: str, output_path: str,
                  x: float, y: float) -> str:
    """Place an image onto the background using the normal (Over) compose mode.

    Returns:
        *output_path*.

    Raises:
        subprocess.CalledProcessError: if ImageMagick exits with an error.
    """
    return _expo_composite(background_path, image_path, output_path, x, y, "Over")


def expo_resize_fencer(image_path: str, output_path: str, width: int, height: int) -> str:
    """Resize one image with ImageMagick and return *output_path*."""
    command = [
        "magick", image_path,
        "-resize", f"{width}x{height}",
        output_path
    ]
    subprocess.run(command, check=True)
    return output_path


def expo_resize_fencers(path_list: list[str], is_left: bool, width: int, height: int) -> list[str]:
    """Resize all images in parallel into the left/right temp folder.

    Outputs are written as ``{index}.png`` inside
    ``{expo_postprocess_temp_dir}/left_fencers`` or ``.../right_fencers``.

    Returns:
        The resized image paths, in the same order as *path_list*.
    """
    output_dir = os.path.join(expo_postprocess_temp_dir, f"{'left' if is_left else 'right'}_fencers")
    os.makedirs(output_dir, exist_ok=True)

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(expo_resize_fencer, image_path,
                            os.path.join(output_dir, f"{i}.png"), width, height)
            for i, image_path in enumerate(path_list)
        ]
        # result() re-raises any worker failure; submission order is kept.
        return [future.result() for future in futures]


def expo_motion_blur_fencer(image_path: str, output_path: str, sigma: float, direction: float) -> str:
    """Apply ImageMagick motion blur (``0x{sigma}+{direction}``) to one image."""
    command = [
        "magick", image_path,
        "-motion-blur", f"0x{sigma}+{direction}",
        output_path
    ]
    subprocess.run(command, check=True)
    return output_path


def expo_motion_blur_fencers(path_list: list[str]) -> list[str]:
    """Blur the images in place, from sigma 15 (first) down to 0 (last).

    A single-image list is treated as the last frame and gets sigma 0.

    Returns:
        The (unchanged) image paths, since every image is overwritten in place.
    """
    last_index = len(path_list) - 1
    futures = []
    with ThreadPoolExecutor() as executor:
        for i, image_path in enumerate(path_list):
            # Guard the ramp: with one image the divisor would be zero.
            sigma = (15 - 15 * i / last_index) if last_index > 0 else 0.0
            direction = 0
            futures.append(
                executor.submit(expo_motion_blur_fencer, image_path, image_path, sigma, direction)
            )
        for future in as_completed(futures):
            future.result()
    return list(path_list)


def expo_overlay_bg_gradient(image_path: str, output_path: str, bg_gradient_path: str) -> str:
    """Composite a gradient over the image using the Overlay compose mode."""
    command = [
        "magick", image_path, bg_gradient_path,
        "-compose", "Overlay",
        "-composite", output_path
    ]
    subprocess.run(command, check=True)
    return output_path


def expo_decrese_opacity(image_path: str, output_path: str, opacity: float) -> str:
    """Multiply the alpha channel of an image by ``opacity / 100``."""
    command = [
        "magick", image_path,
        "-channel", "A",
        "-evaluate", "multiply", f"{opacity/100}",
        output_path
    ]
    subprocess.run(command, check=True)
    return output_path


def expo_decrese_opacities(path_list: list[str]) -> list[str]:
    """Fade the images in place, from 30% (first) up to 100% (last) opacity.

    A single-image list is treated as the last frame and keeps 100% opacity.

    Returns:
        The (unchanged) image paths, since every image is overwritten in place.
    """
    last_index = len(path_list) - 1
    futures = []
    with ThreadPoolExecutor() as executor:
        for i, image_path in enumerate(path_list):
            # Guard the ramp: with one image the divisor would be zero.
            opacity = (30 + 70 * i / last_index) if last_index > 0 else 100.0
            futures.append(
                executor.submit(expo_decrese_opacity, image_path, image_path, opacity)
            )
        for future in as_completed(futures):
            future.result()
    return list(path_list)


def output_to_display_folder(output_image_paths):
    """Copy the images into ``expo_postprocessed_dir`` as ``{session}_{candidate}.png``.

    The session is the value of ``current_session()`` and the candidate is the
    index within *output_image_paths*; both are zero-padded to five digits.
    """
    session = str(current_session()).zfill(5)
    for i, image_path in enumerate(output_image_paths):
        candidate = str(i).zfill(5)
        output_image_path = os.path.join(expo_postprocessed_dir, f"{session}_{candidate}.png")
        shutil.copy(image_path, output_image_path)


def current_session():
    """Return one more than the highest session number in the display folder.

    Only ``.png`` files are considered; the session is the integer before the
    first underscore of the file name.  Returns 1 when there are none.
    """
    sessions = (
        int(file.split("_")[0])
        for file in os.listdir(expo_postprocessed_dir)
        if file.endswith(".png")
    )
    # default=0 mirrors the original starting value for an empty folder.
    return max(max(sessions, default=0), 0) + 1


def expo_postprocess_main():
    """Run the full pipeline: shuffle, stylise, composite and publish.

    Left fencers are composited onto a black 1080x720 background first; right
    fencers, a background gradient and the logo are then added on top of the
    same temp image.  All temp images are finally copied to the display folder.
    """
    print("Postprocessing")
    os.makedirs(expo_postprocessed_dir, exist_ok=True)
    os.makedirs(expo_postprocess_temp_dir, exist_ok=True)

    left_fencer_raw_image_dir = os.path.join(expo_raw_sd_dir, 'left_fencer')
    right_fencer_raw_image_dir = os.path.join(expo_raw_sd_dir, 'right_fencer')
    if not os.path.exists(left_fencer_raw_image_dir) or not os.path.exists(right_fencer_raw_image_dir):
        print("Raw images not found")
        return

    left_shuffled_images_paths = expo_shuffle_image_steps(left_fencer_raw_image_dir)
    right_shuffled_images_paths = expo_shuffle_image_steps(right_fencer_raw_image_dir)

    background_path = os.path.join(expo_postprocess_temp_dir, 'background.png')
    logo_path = os.path.join(expo_postprocess_temp_dir, 'logo.png')
    if not os.path.exists(background_path):
        # Create the black base canvas once; later runs reuse the file.
        background = np.zeros((720, 1080, 3), dtype=np.uint8)
        cv2.imwrite(background_path, background)

    bg_gradient_folder = os.path.join(expo_postprocess_temp_dir, 'bg_gradient')
    bg_gradients = [os.path.join(bg_gradient_folder, f"{i:02d}.png") for i in range(4)]

    output_files = []
    for i, candidate_list in enumerate(left_shuffled_images_paths):
        left_fencer_paths = expo_resize_fencers(candidate_list, True, 450, 450)
        expo_motion_blur_fencers(left_fencer_paths)
        expo_decrese_opacities(left_fencer_paths)

        temp_output_path = os.path.join(expo_postprocess_temp_dir, f"temp_{i}.png")
        output_files.append(temp_output_path)

        # The first fencer is drawn on the clean background; every following
        # one is drawn on top of the image produced so far.
        temp_background_path = background_path
        for j, left_fencer_path in enumerate(left_fencer_paths):
            x_position = 34 * math.pow(j, 1.3) - 100
            y_position = 170
            expo_add_to_background_image(temp_background_path, left_fencer_path,
                                         temp_output_path, x_position, y_position)
            temp_background_path = temp_output_path

    # NOTE(review): left candidates without a matching right candidate are
    # still published, without right fencers, gradient or logo - confirm
    # whether that is intended.
    for i, candidate_list in enumerate(right_shuffled_images_paths):
        if i > len(left_shuffled_images_paths) - 1:
            break
        right_fencer_paths = expo_resize_fencers(candidate_list, False, 450, 450)
        expo_motion_blur_fencers(right_fencer_paths)
        expo_decrese_opacities(right_fencer_paths)

        temp_output_path = os.path.join(expo_postprocess_temp_dir, f"temp_{i}.png")
        if not os.path.exists(temp_output_path):
            break

        # Right fencers mirror the left offsets, moving leftwards from x=710.
        for j, right_fencer_path in enumerate(right_fencer_paths):
            x_position = 540 - (34 * math.pow(j, 1.3) - 170)
            y_position = 170
            expo_add_to_background_image(temp_output_path, right_fencer_path,
                                         temp_output_path, x_position, y_position)

        expo_overlay_bg_gradient(temp_output_path, temp_output_path,
                                 bg_gradients[i % len(bg_gradients)])
        expo_add_logo(temp_output_path, logo_path, temp_output_path, 650, 630)

    output_to_display_folder(output_files)


if __name__ == '__main__':
    expo_postprocess_main()