2024-10-24 15:40:15 +00:00
import json
import math
import os
import random
import shutil
import subprocess
from concurrent . futures import ThreadPoolExecutor , as_completed
import cv2
import numpy as np
# Load the pipeline's directory configuration from info.json, resolved
# against the current working directory. The file is opened in a `with`
# block so the handle is closed as soon as parsing finishes.
with open('info.json') as info_file:
    info = json.load(info_file)

# Directory settings read from the configuration; used by the functions below.
expo_raw_sd_dir = info['expo_raw_sd_dir']
expo_postprocessed_dir = info['expo_postprocessed_dir']
expo_postprocess_temp_dir = info['expo_postprocess_temp_dir']
def expo_get_step_by_name(image_name: str) -> int:
    """Return the step number encoded in an image file name.

    The step is the second underscore-separated field of the name, with
    everything from the first dot of that field onward discarded.

    Args:
        image_name: File name such as ``"3_12.png"``.

    Returns:
        The step as an integer (``12`` for the example above).

    Raises:
        IndexError: If the name contains no underscore.
        ValueError: If the step field is not an integer.
    """
    step_field = image_name.split('_')[1]
    step_text, _, _ = step_field.partition('.')
    return int(step_text)
def expo_get_batch_by_name(image_name: str) -> int:
    """Return the batch number encoded in an image file name.

    The batch is everything before the first underscore of the name.

    Args:
        image_name: File name such as ``"3_12.png"``.

    Returns:
        The batch as an integer (``3`` for the example above).

    Raises:
        ValueError: If the leading field is not an integer.
    """
    batch_text, _, _ = image_name.partition('_')
    return int(batch_text)
2024-10-24 17:56:14 +00:00
def expo_shuffle_image_steps(image_dir) -> list[list[str]]:
    """Group a directory's images by step and build shuffled candidates.

    Every file in ``image_dir`` is assigned to a step via
    ``expo_get_step_by_name``. The images of each step are shuffled
    independently, then the per-step lists are transposed so that each
    returned candidate holds exactly one image per step.

    Args:
        image_dir: Directory containing the raw step images.

    Returns:
        A list of candidates. Each candidate is a list of image paths
        ordered by ascending step number. The number of candidates equals
        the size of the smallest step; surplus images in larger steps are
        dropped. An empty directory yields an empty list.
    """
    images_by_step: dict[int, list[str]] = {}

    # Read and categorize image paths by step.
    for image_name in os.listdir(image_dir):
        step = expo_get_step_by_name(image_name)
        images_by_step.setdefault(step, []).append(os.path.join(image_dir, image_name))

    # Nothing to transpose; without this guard an empty directory would
    # surface as an error further down instead of "no candidates".
    if not images_by_step:
        return []

    # Shuffle the image paths for each step.
    for step_images in images_by_step.values():
        random.shuffle(step_images)

    # os.listdir returns entries in arbitrary order, so order the steps
    # numerically: callers treat the position inside a candidate as the
    # step progression.
    ordered_steps = [images_by_step[step] for step in sorted(images_by_step)]

    # Transpose: first image of every step, then second image of every step,
    # and so on. zip() stops at the shortest step, which crops every step to
    # the minimum length.
    return [list(candidate) for candidate in zip(*ordered_steps)]
def expo_add_to_background_image(background_path: str, image_path: str, output_path: str, x: int, y: int) -> str:
    """Composite an image onto a background using the LinearLight blend mode.

    Runs the ImageMagick ``magick`` executable as a subprocess.

    Args:
        background_path: Image used as the base layer.
        image_path: Image blended on top of the base layer.
        output_path: Where the composited result is written.
        x: Horizontal offset passed in the ``-geometry`` argument.
        y: Vertical offset passed in the ``-geometry`` argument.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: If ``magick`` exits with a non-zero status.
    """
    offset = f"+{x}+{y}"
    subprocess.run(
        [
            "magick",
            background_path,
            image_path,
            "-geometry", offset,
            "-compose", "LinearLight",
            "-composite",
            output_path,
        ],
        check=True,
    )
    return output_path
def expo_add_logo(background_path: str, image_path: str, output_path: str, x: int, y: int) -> str:
    """Composite an image onto a background using the normal (Over) blend mode.

    Runs the ImageMagick ``magick`` executable as a subprocess.

    Args:
        background_path: Image used as the base layer.
        image_path: Image (the logo) placed on top of the base layer.
        output_path: Where the composited result is written.
        x: Horizontal offset passed in the ``-geometry`` argument.
        y: Vertical offset passed in the ``-geometry`` argument.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: If ``magick`` exits with a non-zero status.
    """
    offset = f"+{x}+{y}"
    subprocess.run(
        [
            "magick",
            background_path,
            image_path,
            "-geometry", offset,
            "-compose", "Over",
            "-composite",
            output_path,
        ],
        check=True,
    )
    return output_path
def expo_resize_fencer(image_path: str, output_path: str, width: int, height: int) -> str:
    """Resize one image with ImageMagick.

    Args:
        image_path: Image to resize.
        output_path: Where the resized image is written.
        width: Width component of the ``-resize`` argument.
        height: Height component of the ``-resize`` argument.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: If ``magick`` exits with a non-zero status.
    """
    target_size = f"{width}x{height}"
    subprocess.run(
        ["magick", image_path, "-resize", target_size, output_path],
        check=True,
    )
    return output_path
def expo_resize_fencers(path_list: list[str], is_left: bool, width: int, height: int) -> list[str]:
    """Resize a list of fencer images concurrently.

    Outputs are written to ``left_fencers`` or ``right_fencers`` inside the
    post-processing temp directory, named ``0.png``, ``1.png``, ... after
    the position of the source image in ``path_list``.

    Args:
        path_list: Source image paths.
        is_left: Selects the ``left_fencers`` (True) or ``right_fencers``
            (False) output directory.
        width: Width component passed to each resize.
        height: Height component passed to each resize.

    Returns:
        The resized image paths, in the same order as ``path_list``.
    """
    side = 'left' if is_left else 'right'
    output_dir = os.path.join(expo_postprocess_temp_dir, f"{side}_fencers")
    os.makedirs(output_dir, exist_ok=True)

    resized_paths = [
        os.path.join(output_dir, f"{index}.png")
        for index, _ in enumerate(path_list)
    ]

    with ThreadPoolExecutor() as executor:
        # Map each submitted job back to its slot so results can be stored
        # in input order even though jobs finish in any order.
        pending = {
            executor.submit(expo_resize_fencer, source, target, width, height): index
            for index, (source, target) in enumerate(zip(path_list, resized_paths))
        }
        for finished in as_completed(pending):
            resized_paths[pending[finished]] = finished.result()

    return resized_paths
def expo_motion_blur_fencer(image_path: str, output_path: str, sigma: float, direction: float) -> str:
    """Apply ImageMagick's ``-motion-blur`` to one image.

    Args:
        image_path: Image to blur.
        output_path: Where the blurred image is written.
        sigma: Sigma component of the ``-motion-blur`` argument.
        direction: Direction component of the ``-motion-blur`` argument.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: If ``magick`` exits with a non-zero status.
    """
    blur_spec = f"0x{sigma}+{direction}"
    subprocess.run(
        ["magick", image_path, "-motion-blur", blur_spec, output_path],
        check=True,
    )
    return output_path
2024-10-25 07:53:59 +00:00
def expo_motion_blur_fencers(path_list: list[str]) -> list[str]:
    """Motion-blur a sequence of images in place, fading the blur out.

    The blur sigma falls linearly from 15 for the first image to 0 for the
    last one. Each image is overwritten with its blurred version; the jobs
    run concurrently in a thread pool.

    Args:
        path_list: Image paths in sequence order.

    Returns:
        ``path_list`` (the files are modified in place, so the paths are
        unchanged).
    """
    last_index = len(path_list) - 1
    direction = 0

    with ThreadPoolExecutor() as executor:
        futures = []
        for i, image_path in enumerate(path_list):
            # A single image is both first and last; treat it as the last
            # (unblurred) frame rather than dividing by zero.
            sigma = 15 - 15 * i / last_index if last_index > 0 else 0.0
            futures.append(
                executor.submit(expo_motion_blur_fencer, image_path, image_path, sigma, direction)
            )
        # Calling result() re-raises any failure from the worker threads.
        for future in as_completed(futures):
            future.result()

    return path_list
def expo_overlay_bg_gradient(image_path: str, output_path: str, bg_gradient_path: str) -> str:
    """Composite a gradient image over an image using the Overlay blend mode.

    Args:
        image_path: Image used as the base layer.
        output_path: Where the composited result is written.
        bg_gradient_path: Gradient image blended on top of the base layer.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: If ``magick`` exits with a non-zero status.
    """
    subprocess.run(
        [
            "magick",
            image_path,
            bg_gradient_path,
            "-compose", "Overlay",
            "-composite",
            output_path,
        ],
        check=True,
    )
    return output_path
def expo_decrese_opacity(image_path: str, output_path: str, opacity: int) -> str:
    """Scale an image's alpha channel with ImageMagick.

    Args:
        image_path: Image to process.
        output_path: Where the result is written.
        opacity: Percentage; the alpha channel is multiplied by
            ``opacity / 100``.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: If ``magick`` exits with a non-zero status.
    """
    alpha_factor = f"{opacity / 100}"
    subprocess.run(
        [
            "magick",
            image_path,
            "-channel", "A",
            "-evaluate", "multiply", alpha_factor,
            output_path,
        ],
        check=True,
    )
    return output_path
def expo_decrese_opacities(path_list: list[str]) -> list[str]:
    """Fade a sequence of images in place, from faint to fully opaque.

    The opacity percentage rises linearly from 30 for the first image to
    100 for the last one. Each image is overwritten with its faded version;
    the jobs run concurrently in a thread pool.

    Args:
        path_list: Image paths in sequence order.

    Returns:
        ``path_list`` (the files are modified in place, so the paths are
        unchanged).
    """
    last_index = len(path_list) - 1

    with ThreadPoolExecutor() as executor:
        futures = []
        for i, image_path in enumerate(path_list):
            # A single image is both first and last; treat it as the last
            # (fully opaque) frame rather than dividing by zero.
            opacity = 30 + 70 * i / last_index if last_index > 0 else 100.0
            futures.append(
                executor.submit(expo_decrese_opacity, image_path, image_path, opacity)
            )
        # Calling result() re-raises any failure from the worker threads.
        for future in as_completed(futures):
            future.result()

    return path_list
def output_to_display_folder(output_image_paths):
    """Copy finished images into the display folder (expo_postprocessed_dir).

    Files are named ``{session}_{candidate}.png``, both parts zero-padded to
    five digits. The session is the value returned by ``current_session()``
    and the candidate is the image's index in ``output_image_paths``.

    Args:
        output_image_paths: Paths of the images to publish.
    """
    # Resolve the session once so every image of this run shares it.
    session = format(current_session(), "05d")
    for index, source_path in enumerate(output_image_paths):
        candidate = format(index, "05d")
        destination = os.path.join(expo_postprocessed_dir, f"{session}_{candidate}.png")
        shutil.copy(source_path, destination)
def current_session():
    """Return the next unused session number in the display folder.

    Scans expo_postprocessed_dir for ``.png`` files, reads the integer
    before the first underscore of each name, and returns the largest such
    value plus one. Returns 1 when no ``.png`` file is present.

    Raises:
        ValueError: If a ``.png`` file name does not start with an integer field.
    """
    sessions = [
        int(file_name.split("_")[0])
        for file_name in os.listdir(expo_postprocessed_dir)
        if file_name.endswith(".png")
    ]
    # Seeding with 0 keeps the result at least 1, also for an empty folder.
    return max([0, *sessions]) + 1
2024-10-24 17:56:14 +00:00
def expo_postprocess_main():
    """Build the composite fencer images and publish them to the display folder.

    For every candidate pair (one left sequence and one right sequence), the
    images of each side are resized, motion-blurred and faded, then
    composited step by step onto the background. A gradient overlay and the
    logo are added last. Only composites that received both sides, the
    gradient and the logo are published. If one side has more candidates
    than the other, the unpaired candidates are skipped instead of being
    published half-finished.

    Prints a message and returns without doing anything else when either
    raw image directory is missing.
    """
    print("Postprocessing")
    os.makedirs(expo_postprocessed_dir, exist_ok=True)
    os.makedirs(expo_postprocess_temp_dir, exist_ok=True)

    left_fencer_raw_image_dir = os.path.join(expo_raw_sd_dir, 'left_fencer')
    right_fencer_raw_image_dir = os.path.join(expo_raw_sd_dir, 'right_fencer')
    if not os.path.exists(left_fencer_raw_image_dir) or not os.path.exists(right_fencer_raw_image_dir):
        print("Raw images not found")
        return

    left_shuffled_images_paths = expo_shuffle_image_steps(left_fencer_raw_image_dir)
    right_shuffled_images_paths = expo_shuffle_image_steps(right_fencer_raw_image_dir)

    background_path = os.path.join(expo_postprocess_temp_dir, 'background.png')
    logo_path = os.path.join(expo_postprocess_temp_dir, 'logo.png')

    # Create an all-zero background the first time the pipeline runs.
    if not os.path.exists(background_path):
        background = np.zeros((720, 1080, 3), dtype=np.uint8)
        cv2.imwrite(background_path, background)

    bg_gradient_folder = os.path.join(expo_postprocess_temp_dir, 'bg_gradient')
    bg_gradients = [os.path.join(bg_gradient_folder, f"{i:02d}.png") for i in range(4)]

    output_files = []
    # zip() pairs the i-th left candidate with the i-th right candidate and
    # stops at the shorter side, so every published image is complete.
    candidate_pairs = zip(left_shuffled_images_paths, right_shuffled_images_paths)
    for i, (left_candidate_list, right_candidate_list) in enumerate(candidate_pairs):
        temp_output_path = os.path.join(expo_postprocess_temp_dir, f"temp_{i}.png")
        y_position = 170

        # Left side: the first layer goes onto the clean background, every
        # later layer onto the composite built so far.
        left_fencer_paths = expo_resize_fencers(left_candidate_list, True, 450, 450)
        expo_motion_blur_fencers(left_fencer_paths)
        expo_decrese_opacities(left_fencer_paths)
        temp_background_path = background_path
        for j, left_fencer_path in enumerate(left_fencer_paths):
            x_position = 34 * math.pow(j, 1.3) - 100
            expo_add_to_background_image(temp_background_path, left_fencer_path, temp_output_path, x_position, y_position)
            temp_background_path = temp_output_path

        # Right side: mirrored x progression, layered onto the same composite.
        right_fencer_paths = expo_resize_fencers(right_candidate_list, False, 450, 450)
        expo_motion_blur_fencers(right_fencer_paths)
        expo_decrese_opacities(right_fencer_paths)
        for j, right_fencer_path in enumerate(right_fencer_paths):
            x_position = 540 - (34 * math.pow(j, 1.3) - 170)
            expo_add_to_background_image(temp_output_path, right_fencer_path, temp_output_path, x_position, y_position)

        # Finishing touches: cycle through the available gradients, then stamp the logo.
        expo_overlay_bg_gradient(temp_output_path, temp_output_path, bg_gradients[i % len(bg_gradients)])
        expo_add_logo(temp_output_path, logo_path, temp_output_path, 650, 630)

        output_files.append(temp_output_path)

    output_to_display_folder(output_files)
# Run the post-processing pipeline only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    expo_postprocess_main()