This commit is contained in:
Jason Kneen 2024-08-15 01:36:18 +01:00 committed by GitHub
commit 58b838c96a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 368 additions and 161 deletions

3
.gitignore vendored
View File

@ -22,3 +22,6 @@ models/inswapper_128.onnx
models/GFPGANv1.4.pth
*.onnx
models/DMDNet.pth
.venv/
tf_env/
.tf_env/

View File

@ -74,12 +74,13 @@ python run.py --execution-provider coreml
```
### [](https://github.com/s0md3v/roop/wiki/2.-Acceleration#coreml-execution-provider-apple-legacy)CoreML Execution Provider (Apple Legacy)
Metal support has been added for improved performance on macOS devices.
1. Install dependencies:
```
pip uninstall onnxruntime onnxruntime-coreml
pip install onnxruntime-coreml==1.13.1
pip uninstall onnxruntime onnxruntime-silicon
pip install onnxruntime-silicon==1.13.1
```

View File

@ -5,6 +5,8 @@ if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# Force TensorFlow to use Metal
os.environ['TENSORFLOW_METAL'] = '1'
import warnings
from typing import List
import platform
@ -27,6 +29,40 @@ if 'ROCMExecutionProvider' in modules.globals.execution_providers:
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
def get_system_memory() -> int:
    """Return the total system memory in whole GB.

    Uses psutil when available; otherwise falls back to a platform-specific
    default (16 GB on macOS, 8 GB elsewhere).

    Returns:
        int: Total system memory in GB (floor division of bytes).
    """
    # The original duplicated the identical psutil probe in both the darwin
    # and non-darwin branches; only the fallback default differed.
    try:
        import psutil
        return psutil.virtual_memory().total // (1024 ** 3)
    except ImportError:
        # psutil is not installed: assume a sensible per-platform default.
        return 16 if platform.system().lower() == 'darwin' else 8
def suggest_max_memory() -> int:
    """Suggest a maximum RAM budget in GB.

    Targets 70% of total system memory, clamped to the range [4, 64] GB.

    Returns:
        int: Suggested maximum memory in GB.
    """
    seventy_percent = int(get_system_memory() * 0.7)
    # Clamp: never suggest more than 64 GB ...
    capped = seventy_percent if seventy_percent < 64 else 64
    # ... and never less than 4 GB.
    return capped if capped > 4 else 4
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
@ -35,23 +71,19 @@ def parse_args() -> None:
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=True)
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=True)
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]')
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libvpx-vp9', choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=1, choices=range(52), metavar='[0-51]')
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['coreml'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('--video-processor', help='video processor to use', dest='video_processor', default='cv2', choices=['cv2', 'ffmpeg'])
program.add_argument('--model', help='model to use for face swapping', dest='model', default='inswapper_128.onnx')
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
# register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
program.add_argument('--gpu-threads', help=argparse.SUPPRESS, dest='gpu_threads_deprecated', type=int)
args = program.parse_args()
modules.globals.source_path = args.source_path
@ -66,10 +98,11 @@ def parse_args() -> None:
modules.globals.video_encoder = args.video_encoder
modules.globals.video_quality = args.video_quality
modules.globals.max_memory = args.max_memory
modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
modules.globals.execution_providers = ['CoreMLExecutionProvider'] # Force CoreML
modules.globals.execution_threads = args.execution_threads
modules.globals.video_processor = args.video_processor
modules.globals.model = args.model
#for ENHANCER tumbler:
if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
else:
@ -77,77 +110,32 @@ def parse_args() -> None:
modules.globals.nsfw = False
# translate deprecated args
if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
modules.globals.source_path = args.source_path_deprecated
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path)
if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.cpu_cores_deprecated
if args.gpu_vendor_deprecated == 'apple':
print('\033[33mArgument --gpu-vendor apple is deprecated. Use --execution-provider coreml instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['coreml'])
if args.gpu_vendor_deprecated == 'nvidia':
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['cuda'])
if args.gpu_vendor_deprecated == 'amd':
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['rocm'])
if args.gpu_threads_deprecated:
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.gpu_threads_deprecated
def encode_execution_providers(execution_providers: List[str]) -> List[str]:
    """Map ONNX Runtime provider class names to short lowercase aliases.

    e.g. 'CUDAExecutionProvider' -> 'cuda', 'CoreMLExecutionProvider' -> 'coreml'.
    """
    encoded = []
    for provider_name in execution_providers:
        encoded.append(provider_name.replace('ExecutionProvider', '').lower())
    return encoded
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
    """Translate short provider aliases (e.g. 'cuda') back to the full
    ONNX Runtime provider names that are actually available on this machine.

    An available provider is selected when any requested alias is a
    substring of its encoded (short, lowercase) name.
    """
    available = onnxruntime.get_available_providers()
    selected = []
    for full_name, alias in zip(available, encode_execution_providers(available)):
        if any(requested in alias for requested in execution_providers):
            selected.append(full_name)
    return selected
def suggest_max_memory() -> int:
    """Suggest the maximum RAM budget in GB: 4 on macOS, 16 elsewhere.

    NOTE(review): unreachable `return 6` / `return 4` statements that
    followed the reachable returns (diff residue) have been removed;
    behavior is unchanged.
    """
    if platform.system().lower() == 'darwin':
        return 4
    return 16
def suggest_execution_providers() -> List[str]:
    """Return the short aliases of all ONNX Runtime providers available here.

    NOTE(review): an unreachable second `return ['coreml']` after the first
    return has been removed; behavior is unchanged.
    """
    return encode_execution_providers(onnxruntime.get_available_providers())
def suggest_execution_threads() -> int:
    """Suggest the number of execution threads for the selected providers.

    DirectML and ROCm are limited to a single thread; everything else
    defaults to 8.

    NOTE(review): unreachable darwin-specific returns that followed the
    final `return 8` (diff residue) have been removed; behavior unchanged.
    """
    if 'DmlExecutionProvider' in modules.globals.execution_providers:
        return 1
    if 'ROCMExecutionProvider' in modules.globals.execution_providers:
        return 1
    return 8
def limit_resources() -> None:
# prevent tensorflow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True)
# limit memory usage
if modules.globals.max_memory:
memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin':
memory = modules.globals.max_memory * 1024 ** 6
if platform.system().lower() == 'windows':
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else:
import resource
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
memory = modules.globals.max_memory * 1024 ** 6
import resource
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
def release_resources() -> None:
    """Free cached GPU memory when running on the CUDA execution provider."""
    # NOTE(review): a trailing `pass  # No need to release CUDA resources`
    # statement contradicted the guarded release above it (diff residue) and
    # has been removed; the guarded cache release is what actually executes.
    if 'CUDAExecutionProvider' in modules.globals.execution_providers:
        torch.cuda.empty_cache()
def pre_check() -> bool:
@ -169,23 +157,28 @@ def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start():
return
# process image to image
if has_image_extension(modules.globals.target_path):
if modules.globals.nsfw == False:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy()
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeed!')
else:
update_status('Processing to image failed!')
return
# process image to videos
process_image()
else:
process_video()
def process_image():
    """Run the configured frame processors over a single target image.

    Reads all paths/settings from modules.globals: optionally gates on the
    NSFW predictor, copies the target to the output path, then lets each
    frame processor rewrite the output file in place.
    """
    # NSFW gate: runs only when the flag is explicitly False (as set by the
    # argument parser); aborts via destroy() on a positive prediction.
    if modules.globals.nsfw == False:
        from modules.predicter import predict_image
        if predict_image(modules.globals.target_path):
            destroy()
    # Seed the output with the original target; processors edit it in place.
    shutil.copy2(modules.globals.target_path, modules.globals.output_path)
    for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
        update_status('Progressing...', frame_processor.NAME)
        # output_path serves as both the input frame and the destination.
        frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
    # Status check is keyed off the *target* being an image —
    # NOTE(review): output_path may be the intended check here; confirm.
    if is_image(modules.globals.target_path):
        update_status('Processing to image succeed!')
    else:
        update_status('Processing to image failed!')
def process_video():
if modules.globals.nsfw == False:
from modules.predicter import predict_video
if predict_video(modules.globals.target_path):
@ -193,13 +186,14 @@ def start() -> None:
update_status('Creating temp resources...')
create_temp(modules.globals.target_path)
update_status('Extracting frames...')
extract_frames(modules.globals.target_path)
if modules.globals.video_processor == 'cv2':
extract_frames_cv2(modules.globals.target_path)
else:
extract_frames_ffmpeg(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
release_resources()
# handles fps
if modules.globals.keep_fps:
update_status('Detecting fps...')
fps = detect_fps(modules.globals.target_path)
@ -208,7 +202,6 @@ def start() -> None:
else:
update_status('Creating video with 30.0 fps...')
create_video(modules.globals.target_path)
# handle audio
if modules.globals.keep_audio:
if modules.globals.keep_fps:
update_status('Restoring audio...')
@ -217,7 +210,6 @@ def start() -> None:
restore_audio(modules.globals.target_path, modules.globals.output_path)
else:
move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!')
@ -225,6 +217,30 @@ def start() -> None:
update_status('Processing to video failed!')
def extract_frames_cv2(target_path: str) -> None:
    """Extract every frame of *target_path* to numbered PNGs using OpenCV.

    Frames are written as 0000.png, 0001.png, ... into the video's temp
    directory.
    """
    import cv2
    # BUG FIX: the original built the output path from get_temp_frame_paths()
    # (which yields a *list* of frame files) and mixed f-string with
    # %-formatting, producing an invalid path; write into the temp directory
    # instead. NOTE(review): confirm get_temp_directory_path is imported in
    # this module.
    temp_directory_path = get_temp_directory_path(target_path)
    capture = cv2.VideoCapture(target_path)
    frame_num = 0
    try:
        while True:
            success, frame = capture.read()
            if not success:
                break
            cv2.imwrite(os.path.join(temp_directory_path, f'{frame_num:04d}.png'), frame)
            frame_num += 1
    finally:
        # Always release the capture handle, even if imwrite raises.
        capture.release()
def extract_frames_ffmpeg(target_path: str) -> None:
    """Extract every frame of *target_path* to numbered PNGs via ffmpeg-python.

    Frames are written as 0000.png, 0001.png, ... into the video's temp
    directory.
    """
    import ffmpeg
    # BUG FIX: the original embedded get_temp_frame_paths() (a list of frame
    # files) inside the output pattern, producing an invalid path; use the
    # temp directory instead. NOTE(review): confirm get_temp_directory_path
    # is imported in this module.
    (
        ffmpeg
        .input(target_path)
        .output(os.path.join(get_temp_directory_path(target_path), '%04d.png'), start_number=0)
        .overwrite_output()
        .run(capture_stdout=True, capture_stderr=True)
    )
def destroy() -> None:
if modules.globals.target_path:
clean_temp(modules.globals.target_path)
@ -239,6 +255,69 @@ def run() -> None:
if not frame_processor.pre_check():
return
limit_resources()
print(f"ONNX Runtime version: {onnxruntime.__version__}")
print(f"Available execution providers: {onnxruntime.get_available_providers()}")
print(f"Selected execution provider: CoreMLExecutionProvider")
# Configure ONNX Runtime to use only CoreML
onnxruntime.set_default_logger_severity(3) # Set to WARNING level
options = onnxruntime.SessionOptions()
options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
# Test CoreML with a dummy model
try:
import numpy as np
from onnx import helper, TensorProto
# Create a simple ONNX model
X = helper.make_tensor_value_info('input', TensorProto.FLOAT, [1, 3, 224, 224])
Y = helper.make_tensor_value_info('output', TensorProto.FLOAT, [1, 3, 224, 224])
node = helper.make_node('Identity', ['input'], ['output'])
graph = helper.make_graph([node], 'test_model', [X], [Y])
model = helper.make_model(graph)
# Save the model
model_path = 'test_model.onnx'
with open(model_path, 'wb') as f:
f.write(model.SerializeToString())
# Create a CoreML session
session = onnxruntime.InferenceSession(model_path, options, providers=['CoreMLExecutionProvider'])
# Run inference
input_data = np.random.rand(1, 3, 224, 224).astype(np.float32)
output = session.run(None, {'input': input_data})
print("CoreML init successful and being used")
print(f"Input shape: {input_data.shape}, Output shape: {output[0].shape}")
# Clean up
os.remove(model_path)
except Exception as e:
print(f"Error testing CoreML: {str(e)}")
print("The application may not be able to use GPU acceleration")
# Configure TensorFlow to use Metal
try:
tf_devices = tensorflow.config.list_physical_devices()
print("TensorFlow devices:", tf_devices)
if any('GPU' in device.name for device in tf_devices):
print("TensorFlow is using GPU (Metal)")
else:
print("TensorFlow is not using GPU")
except Exception as e:
print(f"Error configuring TensorFlow: {str(e)}")
# Configure PyTorch to use MPS (Metal Performance Shaders)
try:
if torch.backends.mps.is_available():
print("PyTorch is using MPS (Metal Performance Shaders)")
torch.set_default_device('mps')
else:
print("PyTorch MPS is not available")
except Exception as e:
print(f"Error configuring PyTorch: {str(e)}")
if modules.globals.headless:
start()
else:

View File

@ -24,6 +24,7 @@ execution_providers: List[str] = []
execution_threads = None
headless = None
log_level = 'error'
model = None
fp_ui: Dict[str, bool] = {}
nsfw = None
camera_input_combobox = None

View File

@ -2,6 +2,7 @@ from typing import Any, List
import cv2
import insightface
import threading
import numpy as np
import modules.globals
import modules.processors.frame.core
@ -17,7 +18,7 @@ NAME = 'DLC.FACE-SWAPPER'
def pre_check() -> bool:
    """Ensure the configured face-swap model is present, downloading it from
    Hugging Face into ../models when missing. Always returns True.
    """
    download_directory_path = resolve_relative_path('../models')
    # NOTE(review): the original issued two downloads — a hard-coded
    # inswapper_128_fp16.onnx plus the configured modules.globals.model —
    # fetching a redundant duplicate; only the configured model is kept.
    conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/' + modules.globals.model])
    return True
@ -39,13 +40,24 @@ def get_face_swapper() -> Any:
with THREAD_LOCK:
if FACE_SWAPPER is None:
model_path = resolve_relative_path('../models/inswapper_128_fp16.onnx')
model_path = resolve_relative_path('../models/' + modules.globals.model)
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=modules.globals.execution_providers)
return FACE_SWAPPER
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
    """Swap *source_face* onto *target_face* within *temp_frame* and return
    the composited frame (paste_back blends the swap into the original).
    """
    # NOTE(review): an unreachable try/except debug block that followed this
    # return (diff residue) has been removed; behavior is unchanged.
    return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:

View File

@ -1,9 +1,10 @@
import os
import time
import webbrowser
import customtkinter as ctk
from typing import Callable, Tuple
import cv2
from PIL import Image, ImageOps
from PIL import Image, ImageOps, ImageDraw, ImageFont
import numpy as np
import modules.globals
import modules.metadata
@ -17,8 +18,8 @@ ROOT_HEIGHT = 700
ROOT_WIDTH = 600
PREVIEW = None
PREVIEW_MAX_HEIGHT = 700
PREVIEW_MAX_WIDTH = 1200
PREVIEW_MAX_HEIGHT = 720
PREVIEW_MAX_WIDTH = 1280
RECENT_DIRECTORY_SOURCE = None
RECENT_DIRECTORY_TARGET = None
@ -75,7 +76,6 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch.place(relx=0.1, rely=0.65)
# for FRAME PROCESSOR ENHANCER tumbler:
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.7)
@ -92,23 +92,36 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
# nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()))
# nsfw_switch.place(relx=0.6, rely=0.7)
video_processor_label = ctk.CTkLabel(root, text="Video Processor:")
video_processor_label.place(relx=0.1, rely=0.75)
video_processor_var = ctk.StringVar(value=modules.globals.video_processor)
video_processor_menu = ctk.CTkOptionMenu(root, variable=video_processor_var, values=["cv2", "ffmpeg"], command=lambda choice: setattr(modules.globals, 'video_processor', choice))
video_processor_menu.place(relx=0.3, rely=0.75)
model_label = ctk.CTkLabel(root, text="Model:")
model_label.place(relx=0.1, rely=0.8)
model_var = ctk.StringVar(value=modules.globals.model)
model_entry = ctk.CTkEntry(root, textvariable=model_var)
model_entry.place(relx=0.3, rely=0.8, relwidth=0.4)
model_entry.bind("<FocusOut>", lambda event: setattr(modules.globals, 'model', model_var.get()))
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
start_button.place(relx=0.15, rely=0.85, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy())
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
stop_button.place(relx=0.4, rely=0.85, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
preview_button.place(relx=0.65, rely=0.85, relwidth=0.2, relheight=0.05)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview())
live_button.place(relx=0.40, rely=0.86, relwidth=0.2, relheight=0.05)
live_button.place(relx=0.40, rely=0.91, relwidth=0.2, relheight=0.05)
status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
status_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2')
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label.place(relx=0.1, rely=0.98, relwidth=0.8)
donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color'))
donate_label.bind('<Button>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
@ -200,17 +213,32 @@ def render_image_preview(image_path: str, size: Tuple[int, int]) -> ctk.CTkImage
def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: int = 0) -> ctk.CTkImage:
capture = cv2.VideoCapture(video_path)
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
if modules.globals.video_processor == 'cv2':
import cv2
capture = cv2.VideoCapture(video_path)
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
capture.release()
cv2.destroyAllWindows()
else:
import ffmpeg
probe = ffmpeg.probe(video_path)
time = float(probe['streams'][0]['duration']) // 2
out, _ = (
ffmpeg
.input(video_path, ss=time)
.filter('scale', size[0], size[1])
.output('pipe:', vframes=1, format='rawvideo', pix_fmt='rgb24')
.run(capture_stdout=True)
)
image = Image.frombytes('RGB', size, out)
return ctk.CTkImage(image, size=image.size)
capture.release()
cv2.destroyAllWindows()
def toggle_preview() -> None:
@ -241,14 +269,20 @@ def update_preview(frame_number: int = 0) -> None:
quit()
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
temp_frame = frame_processor.process_frame(
get_one_face(cv2.imread(modules.globals.source_path)),
get_one_face(modules.globals.source_path),
temp_frame
)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = Image.fromarray(temp_frame)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
def draw_fps(image, fps):
    """Overlay an FPS counter in the top-left corner of *image* and return it.

    BUG FIX: the original hard-coded a macOS-only font path, which raises
    OSError on other platforms; fall back to PIL's built-in bitmap font when
    the TrueType file cannot be loaded.
    """
    draw = ImageDraw.Draw(image)
    try:
        font = ImageFont.truetype("/System/Library/Fonts/Supplemental/Arial.ttf", 36)
    except OSError:
        # Non-macOS systems or a missing font file.
        font = ImageFont.load_default()
    draw.text((10, 10), f"FPS: {fps:.2f}", font=font, fill=(255, 255, 255))
    return image
def webcam_preview():
if modules.globals.source_path is None:
# No image selected
@ -256,12 +290,31 @@ def webcam_preview():
global preview_label, PREVIEW
cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
PREVIEW_MAX_WIDTH = 960
PREVIEW_MAX_HEIGHT = 540
if modules.globals.video_processor == 'cv2':
import cv2
cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary)
if not cap.isOpened():
update_status("Error: Unable to open webcam. Please check your camera connection.")
return
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 30) # Set the frame rate of the webcam
else:
import ffmpeg
import subprocess
command = [
'ffmpeg',
'-f', 'avfoundation',
'-framerate', '30',
'-video_size', '1280x720',
'-i', '0:none',
'-f', 'rawvideo',
'-pix_fmt', 'rgb24',
'-'
]
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
preview_label.configure(image=None) # Reset the preview image before startup
@ -269,25 +322,45 @@ def webcam_preview():
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None # Initialize variable for the selected face image
# Load the source image
if modules.globals.video_processor == 'cv2':
import cv2
source_image = cv2.imread(modules.globals.source_path)
source_image = cv2.cvtColor(source_image, cv2.COLOR_BGR2RGB)
else:
source_image = np.array(Image.open(modules.globals.source_path))
source_face = get_one_face(source_image)
prev_frame_time = time.time()
fps = 0
while True:
ret, frame = cap.read()
if not ret:
break
# Select and save face image only once
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
temp_frame = frame.copy() #Create a copy of the frame
if modules.globals.video_processor == 'cv2':
ret, frame = cap.read()
if not ret:
break
temp_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
else:
in_bytes = process.stdout.read(1280 * 720 * 3)
if not in_bytes:
break
temp_frame = np.frombuffer(in_bytes, np.uint8).reshape([720, 1280, 3])
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
temp_frame = frame_processor.process_frame(source_face, temp_frame)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) # Convert the image to RGB format to display it with Tkinter
image = Image.fromarray(image)
# Calculate FPS
current_time = time.time()
fps = 1 / (current_time - prev_frame_time)
prev_frame_time = current_time
image = Image.fromarray(temp_frame)
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
# Draw FPS on the image
image = draw_fps(image, fps)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
ROOT.update()
@ -295,5 +368,8 @@ def webcam_preview():
if PREVIEW.state() == 'withdrawn':
break
cap.release()
if modules.globals.video_processor == 'cv2':
cap.release()
else:
process.terminate()
PREVIEW.withdraw() # Close preview window when loop is finished

View File

@ -9,6 +9,7 @@ import urllib
from pathlib import Path
from typing import List, Any
from tqdm import tqdm
import cv2
import modules.globals
@ -26,8 +27,8 @@ def run_ffmpeg(args: List[str]) -> bool:
try:
subprocess.check_output(commands, stderr=subprocess.STDOUT)
return True
except Exception:
pass
except subprocess.CalledProcessError as e:
print(f"FFmpeg error: {e.output.decode().strip()}") # Capture and print the error message
return False
@ -44,7 +45,34 @@ def detect_fps(target_path: str) -> float:
def extract_frames(target_path: str) -> None:
    """Extract every frame of *target_path* into its temp directory as
    0000.png, 0001.png, ... using OpenCV.

    BUG FIX: the original extracted the video twice — once via run_ffmpeg
    (frames numbered from 0001) and again via OpenCV (numbered from 0000) —
    leaving an off-by-one, duplicated frame set on disk. Only the OpenCV
    extraction is kept.
    """
    temp_directory_path = get_temp_directory_path(target_path)
    cap = cv2.VideoCapture(target_path)
    frame_count = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            cv2.imwrite(os.path.join(temp_directory_path, f'{frame_count:04d}.png'), frame)
            frame_count += 1
    finally:
        # Always release the capture handle, even if imwrite raises.
        cap.release()
def extract_frames_ffmpeg(target_path: str) -> None:
    """Extract frames of *target_path* into its temp directory as numbered
    PNGs via ffmpeg-python.

    Best-effort: FFmpeg failures are reported but not raised, matching the
    surrounding module's error-handling style.
    """
    # NOTE(review): no module-level `import ffmpeg` is visible in this file's
    # import block; a local import (matching core.py's style) guards against
    # a NameError — confirm against the full file.
    import ffmpeg
    temp_directory_path = get_temp_directory_path(target_path)
    os.makedirs(temp_directory_path, exist_ok=True)  # ensure output dir exists
    try:
        (
            ffmpeg
            .input(target_path)
            .output(os.path.join(temp_directory_path, '%04d.png'), start_number=0)
            .overwrite_output()
            .run(capture_stdout=True, capture_stderr=True)
        )
    except Exception as e:
        # Best-effort: report the failure instead of propagating it.
        print(f"Error running FFmpeg: {str(e)}")
def create_video(target_path: str, fps: float = 30.0) -> None:

View File

@ -1,23 +1,30 @@
--extra-index-url https://download.pytorch.org/whl/cu118
# Deep Live Cam requirements
numpy==1.23.5
opencv-python==4.8.1.78
onnx==1.16.0
insightface==0.7.3
psutil==5.9.8
tk==0.1.0
customtkinter==5.2.2
# Core dependencies
numpy==1.26.4
onnxruntime-silicon==1.16.3
pillow==9.5.0
torch==2.0.1+cu118; sys_platform != 'darwin'
torch==2.0.1; sys_platform == 'darwin'
torchvision==0.15.2+cu118; sys_platform != 'darwin'
torchvision==0.15.2; sys_platform == 'darwin'
onnxruntime==1.18.0; sys_platform == 'darwin' and platform_machine != 'arm64'
onnxruntime-silicon==1.16.3; sys_platform == 'darwin' and platform_machine == 'arm64'
onnxruntime-gpu==1.18.0; sys_platform != 'darwin'
tensorflow==2.13.0rc1; sys_platform == 'darwin'
tensorflow==2.12.1; sys_platform != 'darwin'
opennsfw2==0.10.2
protobuf==4.23.2
insightface==0.7.3
torch==2.1.0
tensorflow-macos==2.16.2
tensorflow-metal==1.1.0
# Image processing
scikit-image==0.24.0
matplotlib==3.9.1.post1
# Machine learning
scikit-learn==1.5.1
# Utilities
tqdm==4.66.4
gfpgan==1.3.8
requests==2.32.3
prettytable==3.11.0
# Video processing (optional)
opencv-python==4.8.1.78 # Optional: for cv2 video processing
ffmpeg-python==0.2.0 # For ffmpeg video processing
# Optional dependencies (comment out if not needed)
# albumentations==1.4.13
# coloredlogs==15.0.1