""" Bake-and-swap blur operators for VSE. This module bakes masked blur into a regular video file using the inference server, then swaps the active strip's source filepath to the baked result. """ import os import bpy from bpy.props import IntProperty from bpy.types import Operator from ..core.async_bake_generator import get_bake_generator from ..core.async_generator import get_generator as get_mask_generator from ..core.utils import get_detections_path_for_strip KEY_ORIGINAL = "facemask_original_filepath" KEY_BAKED = "facemask_baked_filepath" KEY_MODE = "facemask_source_mode" KEY_FORMAT = "facemask_bake_format" KEY_BLUR_SIZE = "facemask_bake_blur_size" KEY_DISPLAY_SCALE = "facemask_bake_display_scale" FORMAT_EXT = { "MP4": "mp4", "AVI": "avi", "MOV": "mov", } def _output_path(video_strip, detections_path: str, fmt: str) -> str: ext = FORMAT_EXT.get(fmt, "mp4") out_dir = os.path.dirname(detections_path) safe_name = video_strip.name.replace("/", "_").replace("\\", "_") return os.path.join(out_dir, f"{safe_name}_blurred.{ext}") def _reload_movie_strip(strip): if hasattr(strip, "reload"): try: strip.reload() except Exception: pass def _set_strip_source(strip, filepath: str): strip.filepath = filepath _reload_movie_strip(strip) class SEQUENCER_OT_bake_and_swap_blur_source(Operator): """Bake masked blur and replace active strip source with baked video.""" bl_idname = "sequencer.bake_and_swap_blur_source" bl_label = "Bake & Swap Source" bl_description = "Bake masked blur to video and swap active strip source" bl_options = {"REGISTER", "UNDO"} @classmethod def poll(cls, context): if not context.scene.sequence_editor: return False # Prevent overlapping heavy tasks if get_mask_generator().is_running: return False if get_bake_generator().is_running: return False strip = context.scene.sequence_editor.active_strip return bool(strip and strip.type == "MOVIE") def execute(self, context): seq_editor = context.scene.sequence_editor scene = context.scene video_strip = seq_editor.active_strip video_path = bpy.path.abspath(video_strip.filepath) detections_path = get_detections_path_for_strip(video_strip.name) if not os.path.exists(video_path): self.report({"ERROR"}, f"Source video not found: {video_path}") return {"CANCELLED"} if not os.path.exists(detections_path): self.report({"ERROR"}, f"Detection cache not found: {detections_path}") return {"CANCELLED"} bake_format = scene.facemask_bake_format output_path = _output_path(video_strip, detections_path, bake_format) blur_size = int(scene.facemask_bake_blur_size) display_scale = float(scene.facemask_bake_display_scale) # Reuse baked cache when parameters match and file still exists. cached_baked_path = video_strip.get(KEY_BAKED) cached_format = video_strip.get(KEY_FORMAT) cached_blur_size = video_strip.get(KEY_BLUR_SIZE) cached_display_scale = video_strip.get(KEY_DISPLAY_SCALE) try: cached_blur_size_int = int(cached_blur_size) except (TypeError, ValueError): cached_blur_size_int = None try: cached_display_scale_f = float(cached_display_scale) except (TypeError, ValueError): cached_display_scale_f = None if ( cached_baked_path and os.path.exists(cached_baked_path) and cached_format == bake_format and cached_blur_size_int == blur_size and cached_display_scale_f == display_scale ): if video_strip.get(KEY_MODE) != "baked": video_strip[KEY_MODE] = "baked" _set_strip_source(video_strip, cached_baked_path) self.report({"INFO"}, "Using cached baked blur") return {"FINISHED"} bake_generator = get_bake_generator() wm = context.window_manager def on_complete(status, data): strip = context.scene.sequence_editor.strips.get(video_strip.name) if not strip: print(f"[FaceMask] Bake complete but strip no longer exists: {video_strip.name}") return if status == "done": result_path = data or output_path original_path = strip.get(KEY_ORIGINAL) current_mode = strip.get(KEY_MODE, "original") if not original_path or current_mode != "baked": strip[KEY_ORIGINAL] = video_path strip[KEY_BAKED] = result_path strip[KEY_MODE] = "baked" strip[KEY_FORMAT] = bake_format strip[KEY_BLUR_SIZE] = blur_size strip[KEY_DISPLAY_SCALE] = display_scale _set_strip_source(strip, result_path) print(f"[FaceMask] Bake completed and source swapped: {result_path}") elif status == "error": print(f"[FaceMask] Bake failed: {data}") elif status == "cancelled": print("[FaceMask] Bake cancelled") for area in context.screen.areas: if area.type == "SEQUENCE_EDITOR": area.tag_redraw() def on_progress(current, total): wm.bake_progress = current wm.bake_total = max(total, 1) for area in context.screen.areas: if area.type == "SEQUENCE_EDITOR": area.tag_redraw() wm.bake_progress = 0 wm.bake_total = 1 try: bake_generator.start( video_path=video_path, detections_path=detections_path, output_path=output_path, blur_size=blur_size, display_scale=display_scale, fmt=bake_format.lower(), on_complete=on_complete, on_progress=on_progress, ) except Exception as e: self.report({"ERROR"}, f"Failed to start bake: {e}") return {"CANCELLED"} self.report({"INFO"}, "Started blur bake in background") return {"FINISHED"} class SEQUENCER_OT_restore_original_source(Operator): """Restore active strip source filepath to original video.""" bl_idname = "sequencer.restore_original_source" bl_label = "Restore Original Source" bl_description = "Restore active strip to original source filepath" bl_options = {"REGISTER", "UNDO"} @classmethod def poll(cls, context): if not context.scene.sequence_editor: return False if get_bake_generator().is_running: return False strip = context.scene.sequence_editor.active_strip if not strip or strip.type != "MOVIE": return False return bool(strip.get(KEY_ORIGINAL)) def execute(self, context): strip = context.scene.sequence_editor.active_strip original_path = strip.get(KEY_ORIGINAL) if not original_path: self.report({"ERROR"}, "Original source path is not stored") return {"CANCELLED"} if not os.path.exists(original_path): self.report({"ERROR"}, f"Original source not found: {original_path}") return {"CANCELLED"} _set_strip_source(strip, original_path) strip[KEY_MODE] = "original" self.report({"INFO"}, "Restored original source") return {"FINISHED"} class SEQUENCER_OT_apply_mask_blur(Operator): """Compatibility alias: run bake-and-swap blur workflow.""" bl_idname = "sequencer.apply_mask_blur" bl_label = "Apply Mask Blur" bl_description = "Compatibility alias for Bake & Swap Source" bl_options = {"REGISTER", "UNDO"} @classmethod def poll(cls, context): return SEQUENCER_OT_bake_and_swap_blur_source.poll(context) def execute(self, context): return bpy.ops.sequencer.bake_and_swap_blur_source("EXEC_DEFAULT") class SEQUENCER_OT_cancel_bake_blur(Operator): """Cancel ongoing blur bake.""" bl_idname = "sequencer.cancel_bake_blur" bl_label = "Cancel Blur Bake" bl_description = "Cancel current blur bake process" bl_options = {"REGISTER"} def execute(self, context): bake_generator = get_bake_generator() if bake_generator.is_running: bake_generator.cancel() self.report({"INFO"}, "Blur bake cancelled") else: self.report({"WARNING"}, "No blur bake in progress") return {"FINISHED"} classes = [ SEQUENCER_OT_bake_and_swap_blur_source, SEQUENCER_OT_restore_original_source, SEQUENCER_OT_cancel_bake_blur, SEQUENCER_OT_apply_mask_blur, ] def register(): for cls in classes: bpy.utils.register_class(cls) bpy.types.WindowManager.bake_progress = IntProperty(default=0) bpy.types.WindowManager.bake_total = IntProperty(default=0) def unregister(): del bpy.types.WindowManager.bake_progress del bpy.types.WindowManager.bake_total for cls in reversed(classes): bpy.utils.unregister_class(cls)