274 lines
8.9 KiB
Python
274 lines
8.9 KiB
Python
"""
|
|
Generate Mask Operator for Face Detection in VSE.
|
|
|
|
Provides operators to generate face mask image sequences
|
|
from video strips in the Video Sequence Editor.
|
|
"""
|
|
|
|
import os
|
|
import bpy
|
|
from bpy.props import FloatProperty, IntProperty
|
|
from bpy.types import Operator
|
|
|
|
from ..core.async_generator import get_generator
|
|
|
|
|
|
class SEQUENCER_OT_generate_face_mask(Operator):
    """Generate face mask image sequence from video strip."""

    # Flow of execute(): resolve the active strip's media path, pick a cache
    # directory, reuse cached "mask_*.png" files when enough of them exist,
    # otherwise hand the job to the shared generator from get_generator() and
    # add the resulting image strip from the completion callback.

    bl_idname = "sequencer.generate_face_mask"
    bl_label = "Generate Face Mask"
    bl_description = "Detect faces and generate mask image sequence"
    bl_options = {'REGISTER', 'UNDO'}

    # YOLO Detection parameters
    conf_threshold: FloatProperty(
        name="Confidence",
        description="YOLO confidence threshold (higher = fewer false positives)",
        default=0.25,
        min=0.1,
        max=1.0,
    )

    iou_threshold: FloatProperty(
        name="IOU Threshold",
        description="Non-maximum suppression IOU threshold",
        default=0.45,
        min=0.1,
        max=1.0,
    )

    mask_scale: FloatProperty(
        name="Mask Scale",
        description="Scale factor for mask region (1.0 = exact face size)",
        default=1.5,
        min=1.0,
        max=3.0,
    )

    @classmethod
    def poll(cls, context):
        """Check if operator can run.

        Returns:
            True only when the scene has a sequence editor whose active strip
            is of type 'MOVIE' or 'IMAGE'.
        """
        if not context.scene.sequence_editor:
            return False

        strip = context.scene.sequence_editor.active_strip
        if not strip:
            return False

        return strip.type in {'MOVIE', 'IMAGE'}

    def execute(self, context):
        """Reuse cached masks for the active strip or start async generation.

        Returns:
            {'CANCELLED'} when the source path does not exist or a generation
            is already running; {'FINISHED'} when cached masks were used or a
            new generation was started.
        """
        strip = context.scene.sequence_editor.active_strip
        scene = context.scene

        # Get video path
        if strip.type == 'MOVIE':
            video_path = bpy.path.abspath(strip.filepath)
        else:
            # Image sequence - get directory
            video_path = bpy.path.abspath(strip.directory)

        if not os.path.exists(video_path):
            self.report({'ERROR'}, f"Video file not found: {video_path}")
            return {'CANCELLED'}

        # Determine output directory
        output_dir = self._get_cache_dir(context, strip)

        # Check cache - if masks already exist, use them
        # NOTE(review): frame_final_end appears to be exclusive in Blender, in
        # which case "+ 1" over-counts by one frame (frame_final_duration would
        # be the exact count) - left unchanged, confirm before adjusting.
        expected_frame_count = strip.frame_final_end - strip.frame_final_start + 1
        if self._check_cache(output_dir, expected_frame_count):
            self.report({'INFO'}, f"Using cached masks from {output_dir}")
            self._add_mask_strip(context, strip.name, output_dir)
            return {'FINISHED'}

        # Get frame range
        start_frame = strip.frame_final_start
        end_frame = strip.frame_final_end
        fps = scene.render.fps / scene.render.fps_base

        # Start async generation
        generator = get_generator()

        if generator.is_running:
            self.report({'WARNING'}, "Mask generation already in progress")
            return {'CANCELLED'}

        # The callbacks below run after execute() has returned. They must not
        # touch `self` or the `context` argument: a finished operator instance
        # may no longer be valid and `context.screen` can be None outside of
        # an interactive UI call. Capture plain names and re-resolve
        # everything through bpy.context / bpy.data when the callback fires.
        strip_name = strip.name
        scene_name = scene.name

        def on_complete(status, data):
            """Called when mask generation completes."""
            if status == "done":
                # Add mask strip to sequence editor. `data` is passed on as the
                # mask directory (presumably the generator's output directory).
                target_scene = bpy.data.scenes.get(scene_name) or bpy.context.scene
                if target_scene is not None:
                    SEQUENCER_OT_generate_face_mask._add_mask_strip_to_scene(
                        target_scene, strip_name, data
                    )
                print(f"[FaceMask] Mask generation completed: {data}")
            elif status == "error":
                print(f"[FaceMask] Error: {data}")
            elif status == "cancelled":
                print("[FaceMask] Generation cancelled")

        def on_progress(current, total):
            """Called on progress updates."""
            # Update window manager properties for UI
            window_manager = bpy.context.window_manager
            window_manager.mask_progress = current
            window_manager.mask_total = total

            # Force UI redraw of every sequence editor in every open window.
            for window in window_manager.windows:
                for area in window.screen.areas:
                    if area.type == 'SEQUENCE_EDITOR':
                        area.tag_redraw()

        # Initialize progress
        wm = context.window_manager
        wm.mask_progress = 0
        wm.mask_total = end_frame - start_frame + 1

        # Start generation
        # NOTE(review): start_frame=0 ignores any trimmed start of the strip
        # and end_frame is the timeline span; whether the generator treats
        # end_frame as inclusive is not visible here - confirm against
        # core.async_generator.
        generator.start(
            video_path=video_path,
            output_dir=output_dir,
            start_frame=0,  # Frame indices in video
            end_frame=end_frame - start_frame,
            fps=fps,
            conf_threshold=self.conf_threshold,
            iou_threshold=self.iou_threshold,
            mask_scale=self.mask_scale,
            on_complete=on_complete,
            on_progress=on_progress,
        )

        self.report({'INFO'}, f"Started mask generation for {strip_name}")
        return {'FINISHED'}

    def _get_cache_dir(self, context, strip) -> str:
        """Get or create cache directory for mask images.

        Saved projects use "<blend dir>/.mask_cache/<strip name>"; unsaved
        projects use "<system temp>/blender_mask_cache/<strip name>".

        Returns:
            Path of the cache directory, created if it did not exist.
        """
        import tempfile

        # Use temp directory with project-specific subdirectory
        # This avoids issues with extension_path_user package name resolution
        blend_file = bpy.data.filepath
        if blend_file:
            # Use blend file directory if saved
            project_dir = os.path.dirname(blend_file)
            cache_dir = os.path.join(project_dir, ".mask_cache", strip.name)
        else:
            # Use temp directory for unsaved projects
            cache_dir = os.path.join(tempfile.gettempdir(), "blender_mask_cache", strip.name)

        os.makedirs(cache_dir, exist_ok=True)
        return cache_dir

    def _check_cache(self, cache_dir: str, expected_frames: int) -> bool:
        """Check if cached masks exist and are complete.

        Args:
            cache_dir: Path to cache directory
            expected_frames: Number of frames expected

        Returns:
            True if the cache holds at least one mask and at least 90% of the
            expected frames; False if the directory is missing or empty.
        """
        if not os.path.exists(cache_dir):
            return False

        mask_files = [f for f in os.listdir(cache_dir)
                      if f.startswith("mask_") and f.endswith(".png")]

        # An empty cache is never usable, even when expected_frames is zero or
        # negative (which would otherwise satisfy the threshold below).
        if not mask_files:
            return False

        # Accept cache if at least 90% of frames exist
        # (some frames may have been skipped due to read errors)
        return len(mask_files) >= expected_frames * 0.9

    def _add_mask_strip(self, context, source_strip_name: str, mask_dir: str):
        """Add mask image sequence as a new strip.

        Thin wrapper that resolves the scene from ``context``; the work is
        done by ``_add_mask_strip_to_scene`` so deferred callbacks can reuse
        it without an operator instance.
        """
        self._add_mask_strip_to_scene(context.scene, source_strip_name, mask_dir)

    @staticmethod
    def _add_mask_strip_to_scene(scene, source_strip_name: str, mask_dir: str) -> None:
        """Create an image strip from the "mask_*.png" files in ``mask_dir``.

        Does nothing when the scene has no sequence editor, the source strip
        cannot be found by name, the directory is missing, or it contains no
        mask files.

        Args:
            scene: Scene whose sequence editor receives the new strip.
            source_strip_name: Name of the strip the masks were generated for.
            mask_dir: Directory containing the mask images.
        """
        seq_editor = scene.sequence_editor
        if not seq_editor:
            return

        # Find source strip (Blender 5.0 uses 'strips' instead of 'sequences')
        source_strip = seq_editor.strips.get(source_strip_name)
        if not source_strip:
            return

        # The directory may be gone by the time a deferred callback runs.
        if not os.path.isdir(mask_dir):
            return

        # Sorted so the strip elements follow file-name order.
        mask_files = sorted(
            f for f in os.listdir(mask_dir)
            if f.startswith("mask_") and f.endswith(".png")
        )
        if not mask_files:
            return

        first_mask = os.path.join(mask_dir, mask_files[0])

        # Find an empty channel, starting just above the source strip.
        used_channels = {s.channel for s in seq_editor.strips}
        new_channel = source_strip.channel + 1
        while new_channel in used_channels:
            new_channel += 1

        # Add image sequence (Blender 5.0 API)
        mask_strip = seq_editor.strips.new_image(
            name=f"{source_strip_name}_mask",
            filepath=first_mask,
            channel=new_channel,
            frame_start=source_strip.frame_final_start,
        )

        # Add remaining frames (file names only, not full paths).
        for mask_file in mask_files[1:]:
            mask_strip.elements.append(mask_file)

        # Set blend mode for mask
        mask_strip.blend_type = 'ALPHA_OVER'
        mask_strip.blend_alpha = 0.5
|
|
|
|
|
|
class SEQUENCER_OT_cancel_mask_generation(Operator):
    """Cancel ongoing mask generation."""

    bl_idname = "sequencer.cancel_mask_generation"
    bl_label = "Cancel Mask Generation"
    bl_description = "Cancel the current mask generation process"
    bl_options = {'REGISTER'}

    def execute(self, context):
        """Cancel the shared generator's job if one is running.

        Reports a warning when nothing is running. Returns {'FINISHED'} in
        both cases.
        """
        job = get_generator()

        # Guard clause: nothing to cancel.
        if not job.is_running:
            self.report({'WARNING'}, "No mask generation in progress")
            return {'FINISHED'}

        job.cancel()
        self.report({'INFO'}, "Mask generation cancelled")
        return {'FINISHED'}
|
|
|
|
|
|
# Registration
# Operator classes defined in this module. register() registers them in list
# order and unregister() walks the same list in reverse.
classes = [
    SEQUENCER_OT_generate_face_mask,
    SEQUENCER_OT_cancel_mask_generation,
]
|
|
|
|
|
|
def register():
    """Register the operator classes and attach progress properties.

    After the classes in ``classes`` are registered, two integer properties
    (``mask_progress`` and ``mask_total``, both defaulting to 0) are added to
    ``bpy.types.WindowManager``.
    """
    for operator_cls in classes:
        bpy.utils.register_class(operator_cls)

    # Add progress properties to window manager
    window_manager_type = bpy.types.WindowManager
    window_manager_type.mask_progress = IntProperty(default=0)
    window_manager_type.mask_total = IntProperty(default=0)
|
|
|
|
|
|
def unregister():
    """Undo ``register()``: drop the progress properties, then the classes.

    The WindowManager properties are removed first; the operator classes are
    then unregistered in the reverse of their registration order.
    """
    # Remove properties
    window_manager_type = bpy.types.WindowManager
    del window_manager_type.mask_progress
    del window_manager_type.mask_total

    for operator_cls in reversed(classes):
        bpy.utils.unregister_class(operator_cls)
|