blender-mask-peoples/operators/generate_mask.py

217 lines
7.0 KiB
Python

"""
Generate Mask Operator for Face Detection in VSE.
Provides operators to generate face mask image sequences
from video strips in the Video Sequence Editor.
"""
import os
import bpy
from bpy.props import IntProperty
from bpy.types import Operator
from ..core.async_generator import get_generator
from ..core.utils import get_cache_dir_for_strip
class SEQUENCER_OT_generate_face_mask(Operator):
"""Generate face mask image sequence from video strip."""
bl_idname = "sequencer.generate_face_mask"
bl_label = "Generate Face Mask"
bl_description = "Detect faces and generate mask image sequence"
bl_options = {'REGISTER', 'UNDO'}
@classmethod
def poll(cls, context):
"""Check if operator can run."""
if not context.scene.sequence_editor:
return False
strip = context.scene.sequence_editor.active_strip
if not strip:
return False
return strip.type in {'MOVIE', 'IMAGE'}
def execute(self, context):
strip = context.scene.sequence_editor.active_strip
scene = context.scene
# Get video path
if strip.type == 'MOVIE':
video_path = bpy.path.abspath(strip.filepath)
else:
# Image sequence - get directory
video_path = bpy.path.abspath(strip.directory)
if not os.path.exists(video_path):
self.report({'ERROR'}, f"Video file not found: {video_path}")
return {'CANCELLED'}
# Determine output directory
output_dir = self._get_cache_dir(context, strip)
# Check cache - if masks already exist, use them
expected_frame_count = strip.frame_final_end - strip.frame_final_start + 1
if self._check_cache(output_dir, expected_frame_count):
self.report({'INFO'}, f"Using cached detections from {output_dir}")
return {'FINISHED'}
# Get frame range
start_frame = strip.frame_final_start
end_frame = strip.frame_final_end
fps = scene.render.fps / scene.render.fps_base
# Start async generation
generator = get_generator()
if generator.is_running:
self.report({'WARNING'}, "Mask generation already in progress")
return {'CANCELLED'}
def on_complete(status, data):
"""Called when mask generation completes."""
wm = context.window_manager
wm.mask_total = max(wm.mask_total, generator.total_frames)
if status == "done":
wm.mask_progress = wm.mask_total
elif status in {"error", "cancelled"}:
wm.mask_progress = min(wm.mask_progress, wm.mask_total)
if status == "done":
print(f"[FaceMask] Mask generation completed: {data}")
elif status == "error":
print(f"[FaceMask] Error: {data}")
elif status == "cancelled":
print("[FaceMask] Generation cancelled")
for area in context.screen.areas:
if area.type == 'SEQUENCE_EDITOR':
area.tag_redraw()
def on_progress(current, total):
"""Called on progress updates."""
# Update window manager properties for UI
wm = context.window_manager
wm.mask_progress = current
wm.mask_total = total
# Force UI redraw
for area in context.screen.areas:
if area.type == 'SEQUENCE_EDITOR':
area.tag_redraw()
# Initialize progress
wm = context.window_manager
wm.mask_progress = 0
wm.mask_total = end_frame - start_frame + 1
# Get parameters from scene properties
conf_threshold = scene.facemask_conf_threshold
iou_threshold = scene.facemask_iou_threshold
mask_scale = scene.facemask_mask_scale
# Start generation
generator.start(
video_path=video_path,
output_dir=output_dir,
start_frame=0, # Frame indices in video
end_frame=end_frame - start_frame,
fps=fps,
conf_threshold=conf_threshold,
iou_threshold=iou_threshold,
mask_scale=mask_scale,
on_complete=on_complete,
on_progress=on_progress,
)
self.report({'INFO'}, f"Started mask generation for {strip.name}")
return {'FINISHED'}
def _get_cache_dir(self, context, strip) -> str:
"""Get or create cache directory for mask images."""
cache_dir = get_cache_dir_for_strip(strip.name)
os.makedirs(cache_dir, exist_ok=True)
return cache_dir
def _check_cache(self, cache_dir: str, expected_frames: int) -> bool:
"""Check if cached masks exist and are complete.
Args:
cache_dir: Path to cache directory
expected_frames: Number of frames expected
Returns:
True if cache exists and is valid
"""
if not os.path.exists(cache_dir):
return False
detections_path = os.path.join(cache_dir, "detections.msgpack")
if not os.path.exists(detections_path):
return False
# Quick sanity check: non-empty file
try:
if os.path.getsize(detections_path) <= 0:
return False
except OSError:
return False
# Optional frame count verification if msgpack is available
try:
import msgpack
with open(detections_path, "rb") as f:
payload = msgpack.unpackb(f.read(), raw=False)
frames = payload.get("frames", [])
return len(frames) >= expected_frames * 0.9
except Exception:
return True
class SEQUENCER_OT_cancel_mask_generation(Operator):
"""Cancel ongoing mask generation."""
bl_idname = "sequencer.cancel_mask_generation"
bl_label = "Cancel Mask Generation"
bl_description = "Cancel the current mask generation process"
bl_options = {'REGISTER'}
def execute(self, context):
generator = get_generator()
if generator.is_running:
generator.cancel()
self.report({'INFO'}, "Mask generation cancelled")
else:
self.report({'WARNING'}, "No mask generation in progress")
return {'FINISHED'}
# Registration
classes = [
SEQUENCER_OT_generate_face_mask,
SEQUENCER_OT_cancel_mask_generation,
]
def register():
for cls in classes:
bpy.utils.register_class(cls)
# Add progress properties to window manager
bpy.types.WindowManager.mask_progress = IntProperty(default=0)
bpy.types.WindowManager.mask_total = IntProperty(default=0)
def unregister():
# Remove properties
del bpy.types.WindowManager.mask_progress
del bpy.types.WindowManager.mask_total
for cls in reversed(classes):
bpy.utils.unregister_class(cls)