""" Generate Mask Operator for Face Detection in VSE. Provides operators to generate face mask image sequences from video strips in the Video Sequence Editor. """ import os import bpy from bpy.props import FloatProperty, IntProperty from bpy.types import Operator from ..core.async_generator import get_generator class SEQUENCER_OT_generate_face_mask(Operator): """Generate face mask image sequence from video strip.""" bl_idname = "sequencer.generate_face_mask" bl_label = "Generate Face Mask" bl_description = "Detect faces and generate mask image sequence" bl_options = {'REGISTER', 'UNDO'} # Detection parameters scale_factor: FloatProperty( name="Scale Factor", description="Detection scale factor (larger = faster but less accurate)", default=1.1, min=1.01, max=2.0, ) min_neighbors: IntProperty( name="Min Neighbors", description="Minimum neighbors for detection (higher = fewer false positives)", default=5, min=1, max=20, ) mask_scale: FloatProperty( name="Mask Scale", description="Scale factor for mask region (1.0 = exact face size)", default=1.5, min=1.0, max=3.0, ) @classmethod def poll(cls, context): """Check if operator can run.""" if not context.scene.sequence_editor: return False strip = context.scene.sequence_editor.active_strip if not strip: return False return strip.type in {'MOVIE', 'IMAGE'} def execute(self, context): strip = context.scene.sequence_editor.active_strip scene = context.scene # Get video path if strip.type == 'MOVIE': video_path = bpy.path.abspath(strip.filepath) else: # Image sequence - get directory video_path = bpy.path.abspath(strip.directory) if not os.path.exists(video_path): self.report({'ERROR'}, f"Video file not found: {video_path}") return {'CANCELLED'} # Determine output directory output_dir = self._get_cache_dir(context, strip) # Check cache - if masks already exist, use them expected_frame_count = strip.frame_final_end - strip.frame_final_start + 1 if self._check_cache(output_dir, expected_frame_count): self.report({'INFO'}, f"Using cached masks from {output_dir}") self._add_mask_strip(context, strip.name, output_dir) return {'FINISHED'} # Get frame range start_frame = strip.frame_final_start end_frame = strip.frame_final_end fps = scene.render.fps / scene.render.fps_base # Start async generation generator = get_generator() if generator.is_running: self.report({'WARNING'}, "Mask generation already in progress") return {'CANCELLED'} # Store strip name for callback strip_name = strip.name def on_complete(status, data): """Called when mask generation completes.""" if status == "done": # Add mask strip to sequence editor self._add_mask_strip(context, strip_name, data) print(f"[FaceMask] Mask generation completed: {data}") elif status == "error": print(f"[FaceMask] Error: {data}") elif status == "cancelled": print("[FaceMask] Generation cancelled") def on_progress(current, total): """Called on progress updates.""" # Update window manager properties for UI wm = context.window_manager wm.mask_progress = current wm.mask_total = total # Force UI redraw for area in context.screen.areas: if area.type == 'SEQUENCE_EDITOR': area.tag_redraw() # Initialize progress wm = context.window_manager wm.mask_progress = 0 wm.mask_total = end_frame - start_frame + 1 # Start generation generator.start( video_path=video_path, output_dir=output_dir, start_frame=0, # Frame indices in video end_frame=end_frame - start_frame, fps=fps, scale_factor=self.scale_factor, min_neighbors=self.min_neighbors, mask_scale=self.mask_scale, on_complete=on_complete, on_progress=on_progress, ) self.report({'INFO'}, f"Started mask generation for {strip.name}") return {'FINISHED'} def _get_cache_dir(self, context, strip) -> str: """Get or create cache directory for mask images.""" import tempfile # Use temp directory with project-specific subdirectory # This avoids issues with extension_path_user package name resolution blend_file = bpy.data.filepath if blend_file: # Use blend file directory if saved project_dir = os.path.dirname(blend_file) cache_dir = os.path.join(project_dir, ".mask_cache", strip.name) else: # Use temp directory for unsaved projects cache_dir = os.path.join(tempfile.gettempdir(), "blender_mask_cache", strip.name) os.makedirs(cache_dir, exist_ok=True) return cache_dir def _check_cache(self, cache_dir: str, expected_frames: int) -> bool: """Check if cached masks exist and are complete. Args: cache_dir: Path to cache directory expected_frames: Number of frames expected Returns: True if cache exists and has at least 90% of expected frames """ if not os.path.exists(cache_dir): return False mask_files = [f for f in os.listdir(cache_dir) if f.startswith("mask_") and f.endswith(".png")] # Accept cache if at least 90% of frames exist # (some frames may have been skipped due to read errors) return len(mask_files) >= expected_frames * 0.9 def _add_mask_strip(self, context, source_strip_name: str, mask_dir: str): """Add mask image sequence as a new strip.""" scene = context.scene seq_editor = scene.sequence_editor if not seq_editor: return # Find source strip (Blender 5.0 uses 'strips' instead of 'sequences') source_strip = seq_editor.strips.get(source_strip_name) if not source_strip: return # Get first mask image mask_files = sorted([ f for f in os.listdir(mask_dir) if f.startswith("mask_") and f.endswith(".png") ]) if not mask_files: return first_mask = os.path.join(mask_dir, mask_files[0]) # Find an empty channel used_channels = {s.channel for s in seq_editor.strips} new_channel = source_strip.channel + 1 while new_channel in used_channels: new_channel += 1 # Add image sequence (Blender 5.0 API) mask_strip = seq_editor.strips.new_image( name=f"{source_strip_name}_mask", filepath=first_mask, channel=new_channel, frame_start=source_strip.frame_final_start, ) # Add remaining frames for mask_file in mask_files[1:]: mask_strip.elements.append(mask_file) # Set blend mode for mask mask_strip.blend_type = 'ALPHA_OVER' mask_strip.blend_alpha = 0.5 class SEQUENCER_OT_cancel_mask_generation(Operator): """Cancel ongoing mask generation.""" bl_idname = "sequencer.cancel_mask_generation" bl_label = "Cancel Mask Generation" bl_description = "Cancel the current mask generation process" bl_options = {'REGISTER'} def execute(self, context): generator = get_generator() if generator.is_running: generator.cancel() self.report({'INFO'}, "Mask generation cancelled") else: self.report({'WARNING'}, "No mask generation in progress") return {'FINISHED'} # Registration classes = [ SEQUENCER_OT_generate_face_mask, SEQUENCER_OT_cancel_mask_generation, ] def register(): for cls in classes: bpy.utils.register_class(cls) # Add progress properties to window manager bpy.types.WindowManager.mask_progress = IntProperty(default=0) bpy.types.WindowManager.mask_total = IntProperty(default=0) def unregister(): # Remove properties del bpy.types.WindowManager.mask_progress del bpy.types.WindowManager.mask_total for cls in reversed(classes): bpy.utils.unregister_class(cls)