""" Generate Mask Operator for Face Detection in VSE. Provides operators to generate face mask image sequences from video strips in the Video Sequence Editor. """ import os import bpy from bpy.props import IntProperty from bpy.types import Operator from ..core.async_generator import get_generator from ..core.utils import get_cache_dir_for_strip class SEQUENCER_OT_generate_face_mask(Operator): """Generate face mask image sequence from video strip.""" bl_idname = "sequencer.generate_face_mask" bl_label = "Generate Face Mask" bl_description = "Detect faces and generate mask image sequence" bl_options = {'REGISTER', 'UNDO'} @classmethod def poll(cls, context): """Check if operator can run.""" if not context.scene.sequence_editor: return False strip = context.scene.sequence_editor.active_strip if not strip: return False return strip.type in {'MOVIE', 'IMAGE'} def execute(self, context): strip = context.scene.sequence_editor.active_strip scene = context.scene # Get video path if strip.type == 'MOVIE': video_path = bpy.path.abspath(strip.filepath) else: # Image sequence - get directory video_path = bpy.path.abspath(strip.directory) if not os.path.exists(video_path): self.report({'ERROR'}, f"Video file not found: {video_path}") return {'CANCELLED'} # Determine output directory output_dir = self._get_cache_dir(context, strip) # Check cache - if masks already exist, use them expected_frame_count = strip.frame_final_end - strip.frame_final_start + 1 if self._check_cache(output_dir, expected_frame_count): self.report({'INFO'}, f"Using cached detections from {output_dir}") return {'FINISHED'} # Get frame range start_frame = strip.frame_final_start end_frame = strip.frame_final_end fps = scene.render.fps / scene.render.fps_base # Start async generation generator = get_generator() if generator.is_running: self.report({'WARNING'}, "Mask generation already in progress") return {'CANCELLED'} def on_complete(status, data): """Called when mask generation completes.""" wm = context.window_manager wm.mask_total = max(wm.mask_total, generator.total_frames) if status == "done": wm.mask_progress = wm.mask_total elif status in {"error", "cancelled"}: wm.mask_progress = min(wm.mask_progress, wm.mask_total) if status == "done": print(f"[FaceMask] Mask generation completed: {data}") elif status == "error": print(f"[FaceMask] Error: {data}") elif status == "cancelled": print("[FaceMask] Generation cancelled") for area in context.screen.areas: if area.type == 'SEQUENCE_EDITOR': area.tag_redraw() def on_progress(current, total): """Called on progress updates.""" # Update window manager properties for UI wm = context.window_manager wm.mask_progress = current wm.mask_total = total # Force UI redraw for area in context.screen.areas: if area.type == 'SEQUENCE_EDITOR': area.tag_redraw() # Initialize progress wm = context.window_manager wm.mask_progress = 0 wm.mask_total = end_frame - start_frame + 1 # Get parameters from scene properties conf_threshold = scene.facemask_conf_threshold iou_threshold = scene.facemask_iou_threshold mask_scale = scene.facemask_mask_scale # Start generation generator.start( video_path=video_path, output_dir=output_dir, start_frame=0, # Frame indices in video end_frame=end_frame - start_frame, fps=fps, conf_threshold=conf_threshold, iou_threshold=iou_threshold, mask_scale=mask_scale, on_complete=on_complete, on_progress=on_progress, ) self.report({'INFO'}, f"Started mask generation for {strip.name}") return {'FINISHED'} def _get_cache_dir(self, context, strip) -> str: """Get or create cache directory for mask images.""" cache_dir = get_cache_dir_for_strip(strip.name) os.makedirs(cache_dir, exist_ok=True) return cache_dir def _check_cache(self, cache_dir: str, expected_frames: int) -> bool: """Check if cached masks exist and are complete. Args: cache_dir: Path to cache directory expected_frames: Number of frames expected Returns: True if cache exists and is valid """ if not os.path.exists(cache_dir): return False detections_path = os.path.join(cache_dir, "detections.msgpack") if not os.path.exists(detections_path): return False # Quick sanity check: non-empty file try: if os.path.getsize(detections_path) <= 0: return False except OSError: return False # Optional frame count verification if msgpack is available try: import msgpack with open(detections_path, "rb") as f: payload = msgpack.unpackb(f.read(), raw=False) frames = payload.get("frames", []) return len(frames) >= expected_frames * 0.9 except Exception: return True class SEQUENCER_OT_cancel_mask_generation(Operator): """Cancel ongoing mask generation.""" bl_idname = "sequencer.cancel_mask_generation" bl_label = "Cancel Mask Generation" bl_description = "Cancel the current mask generation process" bl_options = {'REGISTER'} def execute(self, context): generator = get_generator() if generator.is_running: generator.cancel() self.report({'INFO'}, "Mask generation cancelled") else: self.report({'WARNING'}, "No mask generation in progress") return {'FINISHED'} # Registration classes = [ SEQUENCER_OT_generate_face_mask, SEQUENCER_OT_cancel_mask_generation, ] def register(): for cls in classes: bpy.utils.register_class(cls) # Add progress properties to window manager bpy.types.WindowManager.mask_progress = IntProperty(default=0) bpy.types.WindowManager.mask_total = IntProperty(default=0) def unregister(): # Remove properties del bpy.types.WindowManager.mask_progress del bpy.types.WindowManager.mask_total for cls in reversed(classes): bpy.utils.unregister_class(cls)