""" Generate Mask Operator for Face Detection in VSE. Provides operators to generate face mask image sequences from video strips in the Video Sequence Editor. """ import os import bpy from bpy.props import IntProperty from bpy.types import Operator from ..core.async_generator import get_generator from ..core.utils import get_cache_dir_for_strip class SEQUENCER_OT_generate_face_mask(Operator): """Generate face mask image sequence from video strip.""" bl_idname = "sequencer.generate_face_mask" bl_label = "Generate Face Mask" bl_description = "Detect faces and generate mask image sequence" bl_options = {'REGISTER', 'UNDO'} @classmethod def poll(cls, context): """Check if operator can run.""" if not context.scene.sequence_editor: return False strip = context.scene.sequence_editor.active_strip if not strip: return False return strip.type in {'MOVIE', 'IMAGE'} def execute(self, context): strip = context.scene.sequence_editor.active_strip scene = context.scene # Get video path if strip.type == 'MOVIE': video_path = bpy.path.abspath(strip.filepath) else: # Image sequence - get directory video_path = bpy.path.abspath(strip.directory) if not os.path.exists(video_path): self.report({'ERROR'}, f"Video file not found: {video_path}") return {'CANCELLED'} # Determine output directory output_dir = self._get_cache_dir(context, strip) # Check cache - if masks already exist, use them expected_frame_count = strip.frame_final_end - strip.frame_final_start + 1 if self._check_cache(output_dir, expected_frame_count): self.report({'INFO'}, f"Using cached masks from {output_dir}") self._add_mask_strip(context, strip.name, output_dir) return {'FINISHED'} # Get frame range start_frame = strip.frame_final_start end_frame = strip.frame_final_end fps = scene.render.fps / scene.render.fps_base # Start async generation generator = get_generator() if generator.is_running: self.report({'WARNING'}, "Mask generation already in progress") return {'CANCELLED'} # Store strip name for callback strip_name = strip.name def on_complete(status, data): """Called when mask generation completes.""" if status == "done": # Add mask strip to sequence editor self._add_mask_strip(context, strip_name, data) print(f"[FaceMask] Mask generation completed: {data}") elif status == "error": print(f"[FaceMask] Error: {data}") elif status == "cancelled": print("[FaceMask] Generation cancelled") def on_progress(current, total): """Called on progress updates.""" # Update window manager properties for UI wm = context.window_manager wm.mask_progress = current wm.mask_total = total # Force UI redraw for area in context.screen.areas: if area.type == 'SEQUENCE_EDITOR': area.tag_redraw() # Initialize progress wm = context.window_manager wm.mask_progress = 0 wm.mask_total = end_frame - start_frame + 1 # Get parameters from scene properties conf_threshold = scene.facemask_conf_threshold iou_threshold = scene.facemask_iou_threshold mask_scale = scene.facemask_mask_scale # Start generation generator.start( video_path=video_path, output_dir=output_dir, start_frame=0, # Frame indices in video end_frame=end_frame - start_frame, fps=fps, conf_threshold=conf_threshold, iou_threshold=iou_threshold, mask_scale=mask_scale, on_complete=on_complete, on_progress=on_progress, ) self.report({'INFO'}, f"Started mask generation for {strip.name}") return {'FINISHED'} def _get_cache_dir(self, context, strip) -> str: """Get or create cache directory for mask images.""" cache_dir = get_cache_dir_for_strip(strip.name) os.makedirs(cache_dir, exist_ok=True) return cache_dir def _check_cache(self, cache_dir: str, expected_frames: int) -> bool: """Check if cached masks exist and are complete. Args: cache_dir: Path to cache directory expected_frames: Number of frames expected Returns: True if cache exists and is valid """ if not os.path.exists(cache_dir): return False # Check for MP4 video (new format) mask_video = os.path.join(cache_dir, "mask.mp4") if os.path.exists(mask_video): # Prefer frame-count verification when cv2 is available, but do not # hard-fail on Blender Python environments without cv2. try: import cv2 cap = cv2.VideoCapture(mask_video) if cap.isOpened(): frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) cap.release() # Accept cache if at least 90% of frames exist return frame_count >= expected_frames * 0.9 cap.release() return False except Exception: # Fallback: treat existing MP4 cache as valid when cv2 is unavailable. return True # Fallback: check for PNG sequence (backward compatibility) mask_files = [f for f in os.listdir(cache_dir) if f.startswith("mask_") and f.endswith(".png")] # Accept cache if at least 90% of frames exist return len(mask_files) >= expected_frames * 0.9 def _add_mask_strip(self, context, source_strip_name: str, mask_path: str): """Add mask video as a new strip. Args: context: Blender context source_strip_name: Name of the source video strip mask_path: Path to mask video file or directory (for backward compatibility) """ scene = context.scene seq_editor = scene.sequence_editor if not seq_editor: return # Find source strip (Blender 5.0 uses 'strips' instead of 'sequences') source_strip = seq_editor.strips.get(source_strip_name) if not source_strip: return # Check if mask_path is a video file or directory (backward compatibility) if os.path.isfile(mask_path): # New format: single MP4 file mask_video = mask_path else: # Old format: directory with PNG sequence (backward compatibility) mask_video = os.path.join(mask_path, "mask.mp4") if not os.path.exists(mask_video): # Fallback to PNG sequence mask_files = sorted([ f for f in os.listdir(mask_path) if f.startswith("mask_") and f.endswith(".png") ]) if not mask_files: return first_mask = os.path.join(mask_path, mask_files[0]) self._add_mask_strip_png_sequence(context, source_strip_name, mask_path, mask_files, first_mask) return # Find an empty channel used_channels = {s.channel for s in seq_editor.strips} new_channel = source_strip.channel + 1 while new_channel in used_channels: new_channel += 1 # Add movie strip (Blender 5.0 API) mask_strip = seq_editor.strips.new_movie( name=f"{source_strip_name}_mask", filepath=mask_video, channel=new_channel, frame_start=source_strip.frame_final_start, ) # Set blend mode for mask mask_strip.blend_type = 'ALPHA_OVER' mask_strip.blend_alpha = 0.5 def _add_mask_strip_png_sequence(self, context, source_strip_name, mask_dir, mask_files, first_mask): """Backward compatibility: Add PNG sequence as mask strip.""" scene = context.scene seq_editor = scene.sequence_editor source_strip = seq_editor.strips.get(source_strip_name) if not source_strip: return # Find an empty channel used_channels = {s.channel for s in seq_editor.strips} new_channel = source_strip.channel + 1 while new_channel in used_channels: new_channel += 1 # Add image sequence (Blender 5.0 API) mask_strip = seq_editor.strips.new_image( name=f"{source_strip_name}_mask", filepath=first_mask, channel=new_channel, frame_start=source_strip.frame_final_start, ) # Add remaining frames for mask_file in mask_files[1:]: mask_strip.elements.append(mask_file) # Set blend mode for mask mask_strip.blend_type = 'ALPHA_OVER' mask_strip.blend_alpha = 0.5 class SEQUENCER_OT_cancel_mask_generation(Operator): """Cancel ongoing mask generation.""" bl_idname = "sequencer.cancel_mask_generation" bl_label = "Cancel Mask Generation" bl_description = "Cancel the current mask generation process" bl_options = {'REGISTER'} def execute(self, context): generator = get_generator() if generator.is_running: generator.cancel() self.report({'INFO'}, "Mask generation cancelled") else: self.report({'WARNING'}, "No mask generation in progress") return {'FINISHED'} # Registration classes = [ SEQUENCER_OT_generate_face_mask, SEQUENCER_OT_cancel_mask_generation, ] def register(): for cls in classes: bpy.utils.register_class(cls) # Add progress properties to window manager bpy.types.WindowManager.mask_progress = IntProperty(default=0) bpy.types.WindowManager.mask_total = IntProperty(default=0) def unregister(): # Remove properties del bpy.types.WindowManager.mask_progress del bpy.types.WindowManager.mask_total for cls in reversed(classes): bpy.utils.unregister_class(cls)