""" Generate Mask Operator for Face Detection in VSE. Provides operators to generate face mask image sequences from video strips in the Video Sequence Editor. """ import os import bpy from bpy.props import IntProperty from bpy.types import Operator from ..core.async_generator import get_generator from ..core.utils import get_cache_dir_for_strip class SEQUENCER_OT_generate_face_mask(Operator): """Generate face mask image sequence from video strip.""" bl_idname = "sequencer.generate_face_mask" bl_label = "Generate Face Mask" bl_description = "Detect faces and generate mask image sequence" bl_options = {'REGISTER', 'UNDO'} @classmethod def poll(cls, context): """Check if operator can run.""" if not context.scene.sequence_editor: return False strip = context.scene.sequence_editor.active_strip if not strip: return False return strip.type in {'MOVIE', 'IMAGE'} def execute(self, context): strip = context.scene.sequence_editor.active_strip scene = context.scene # Get video path if strip.type == 'MOVIE': video_path = bpy.path.abspath(strip.filepath) else: # Image sequence - get directory video_path = bpy.path.abspath(strip.directory) if not os.path.exists(video_path): self.report({'ERROR'}, f"Video file not found: {video_path}") return {'CANCELLED'} # Determine output directory output_dir = self._get_cache_dir(context, strip) # Check cache - if masks already exist, use them expected_frame_count = strip.frame_final_end - strip.frame_final_start + 1 if self._check_cache(output_dir, expected_frame_count): self.report({'INFO'}, f"Using cached detections from {output_dir}") return {'FINISHED'} # 動画の実際のフレーム数を取得(Blenderプロジェクトのfpsと動画のfpsが # 異なる場合にタイムライン上のフレーム数では不足するため) import cv2 as _cv2 _cap = _cv2.VideoCapture(video_path) total_video_frames = int(_cap.get(_cv2.CAP_PROP_FRAME_COUNT)) fps = _cap.get(_cv2.CAP_PROP_FPS) or (scene.render.fps / scene.render.fps_base) _cap.release() if total_video_frames <= 0: self.report({'ERROR'}, f"Could not read frame count from video: {video_path}") return {'CANCELLED'} # Start async generation generator = get_generator() if generator.is_running: self.report({'WARNING'}, "Mask generation already in progress") return {'CANCELLED'} def on_complete(status, data): """Called when mask generation completes.""" wm = context.window_manager wm.mask_total = max(wm.mask_total, generator.total_frames) if status == "done": wm.mask_progress = wm.mask_total elif status in {"error", "cancelled"}: wm.mask_progress = min(wm.mask_progress, wm.mask_total) if status == "done": print(f"[FaceMask] Mask generation completed: {data}") elif status == "error": print(f"[FaceMask] Error: {data}") elif status == "cancelled": print("[FaceMask] Generation cancelled") for area in context.screen.areas: if area.type == 'SEQUENCE_EDITOR': area.tag_redraw() def on_progress(current, total): """Called on progress updates.""" # Update window manager properties for UI wm = context.window_manager wm.mask_progress = current wm.mask_total = total # Force UI redraw for area in context.screen.areas: if area.type == 'SEQUENCE_EDITOR': area.tag_redraw() # Initialize progress wm = context.window_manager wm.mask_progress = 0 wm.mask_total = total_video_frames # Get parameters from scene properties conf_threshold = scene.facemask_conf_threshold iou_threshold = scene.facemask_iou_threshold # Start generation generator.start( video_path=video_path, output_dir=output_dir, start_frame=0, end_frame=total_video_frames - 1, fps=fps, conf_threshold=conf_threshold, iou_threshold=iou_threshold, on_complete=on_complete, on_progress=on_progress, ) self.report({'INFO'}, f"Started mask generation for {strip.name}") return {'FINISHED'} def _get_cache_dir(self, context, strip) -> str: """Get or create cache directory for mask images.""" cache_dir = get_cache_dir_for_strip(strip.name) os.makedirs(cache_dir, exist_ok=True) return cache_dir def _check_cache(self, cache_dir: str, expected_frames: int) -> bool: """Check if cached masks exist and are complete. Args: cache_dir: Path to cache directory expected_frames: Number of frames expected Returns: True if cache exists and is valid """ if not os.path.exists(cache_dir): return False detections_path = os.path.join(cache_dir, "detections.msgpack") if not os.path.exists(detections_path): return False # Quick sanity check: non-empty file try: if os.path.getsize(detections_path) <= 0: return False except OSError: return False # Optional frame count verification if msgpack is available try: import msgpack with open(detections_path, "rb") as f: payload = msgpack.unpackb(f.read(), raw=False) frames = payload.get("frames", []) return len(frames) >= expected_frames * 0.9 except Exception: return True class SEQUENCER_OT_cancel_mask_generation(Operator): """Cancel ongoing mask generation.""" bl_idname = "sequencer.cancel_mask_generation" bl_label = "Cancel Mask Generation" bl_description = "Cancel the current mask generation process" bl_options = {'REGISTER'} def execute(self, context): generator = get_generator() if generator.is_running: generator.cancel() self.report({'INFO'}, "Mask generation cancelled") else: self.report({'WARNING'}, "No mask generation in progress") return {'FINISHED'} # Registration classes = [ SEQUENCER_OT_generate_face_mask, SEQUENCER_OT_cancel_mask_generation, ] def register(): for cls in classes: bpy.utils.register_class(cls) # Add progress properties to window manager bpy.types.WindowManager.mask_progress = IntProperty(default=0) bpy.types.WindowManager.mask_total = IntProperty(default=0) def unregister(): # Remove properties del bpy.types.WindowManager.mask_progress del bpy.types.WindowManager.mask_total for cls in reversed(classes): bpy.utils.unregister_class(cls)