# blender-mask-peoples/operators/generate_mask.py
# 2026-02-12 22:52:00 +09:00
#
# 309 lines
# 11 KiB
# Python

"""
Generate Mask Operator for Face Detection in VSE.
Provides operators to generate face mask image sequences
from video strips in the Video Sequence Editor.
"""
import os
import bpy
from bpy.props import IntProperty
from bpy.types import Operator
from ..core.async_generator import get_generator
class SEQUENCER_OT_generate_face_mask(Operator):
    """Generate face mask image sequence from video strip.

    The operator resolves the media path of the active strip, reuses a cached
    mask when one is present and sufficiently complete, and otherwise hands
    the work to the shared async generator. When generation finishes, the
    resulting mask is added as a new strip above the source strip.
    """

    bl_idname = "sequencer.generate_face_mask"
    bl_label = "Generate Face Mask"
    bl_description = "Detect faces and generate mask image sequence"
    bl_options = {'REGISTER', 'UNDO'}

    @classmethod
    def poll(cls, context):
        """Check if operator can run.

        Returns:
            True only when the scene has a sequence editor whose active strip
            is of type 'MOVIE' or 'IMAGE'.
        """
        seq_editor = context.scene.sequence_editor
        if not seq_editor:
            return False
        strip = seq_editor.active_strip
        if not strip:
            return False
        return strip.type in {'MOVIE', 'IMAGE'}

    def execute(self, context):
        """Reuse cached masks or start asynchronous mask generation.

        Returns:
            {'FINISHED'} when cached masks were reused or generation was
            started; {'CANCELLED'} when the source path does not exist or a
            generation is already in progress.
        """
        strip = context.scene.sequence_editor.active_strip
        scene = context.scene

        # Movie strips reference a file; image strips reference a directory.
        if strip.type == 'MOVIE':
            video_path = bpy.path.abspath(strip.filepath)
        else:
            video_path = bpy.path.abspath(strip.directory)

        if not os.path.exists(video_path):
            self.report({'ERROR'}, f"Video file not found: {video_path}")
            return {'CANCELLED'}

        # Determine output directory
        output_dir = self._get_cache_dir(context, strip)

        # Frame range of the strip on the timeline.
        start_frame = strip.frame_final_start
        end_frame = strip.frame_final_end
        # NOTE(review): if frame_final_end is exclusive, the "+ 1" here (and
        # the end_frame passed to the generator below) counts one frame too
        # many - confirm against the generator's end_frame semantics.
        frame_count = end_frame - start_frame + 1

        # Check cache - if masks already exist, use them
        if self._check_cache(output_dir, frame_count):
            self.report({'INFO'}, f"Using cached masks from {output_dir}")
            self._add_mask_strip(context, strip.name, output_dir)
            return {'FINISHED'}

        fps = scene.render.fps / scene.render.fps_base

        # Only one generation may run at a time.
        generator = get_generator()
        if generator.is_running:
            self.report({'WARNING'}, "Mask generation already in progress")
            return {'CANCELLED'}

        # The callbacks look the strip up again by name once generation ends.
        strip_name = strip.name

        # NOTE(review): both callbacks keep `self` and `context` alive past
        # execute(); confirm the operator instance is still usable when the
        # generator calls back.
        def on_complete(status, data):
            """Called when mask generation completes."""
            if status == "done":
                # Add mask strip to sequence editor
                self._add_mask_strip(context, strip_name, data)
                print(f"[FaceMask] Mask generation completed: {data}")
            elif status == "error":
                print(f"[FaceMask] Error: {data}")
            elif status == "cancelled":
                print("[FaceMask] Generation cancelled")

        def on_progress(current, total):
            """Called on progress updates."""
            # Update window manager properties for UI
            wm = context.window_manager
            wm.mask_progress = current
            wm.mask_total = total
            # Redraw the sequence editors of every open window. Going through
            # wm.windows avoids dereferencing context.screen, which would
            # raise AttributeError if no screen is active when this runs.
            for window in wm.windows:
                for area in window.screen.areas:
                    if area.type == 'SEQUENCE_EDITOR':
                        area.tag_redraw()

        # Initialize progress
        wm = context.window_manager
        wm.mask_progress = 0
        wm.mask_total = frame_count

        # Start generation with the detection parameters stored on the scene.
        generator.start(
            video_path=video_path,
            output_dir=output_dir,
            start_frame=0,  # Frame indices in video
            end_frame=end_frame - start_frame,
            fps=fps,
            conf_threshold=scene.facemask_conf_threshold,
            iou_threshold=scene.facemask_iou_threshold,
            mask_scale=scene.facemask_mask_scale,
            on_complete=on_complete,
            on_progress=on_progress,
        )
        self.report({'INFO'}, f"Started mask generation for {strip_name}")
        return {'FINISHED'}

    def _get_cache_dir(self, context, strip) -> str:
        """Get or create cache directory for mask images.

        Saved projects cache next to the .blend file under
        ``.mask_cache/<strip name>``; unsaved projects cache under the system
        temp directory in ``blender_mask_cache/<strip name>``.

        Returns:
            Path of the (now existing) cache directory.
        """
        import tempfile

        # Use temp directory with project-specific subdirectory
        # This avoids issues with extension_path_user package name resolution
        blend_file = bpy.data.filepath
        if blend_file:
            # Use blend file directory if saved
            project_dir = os.path.dirname(blend_file)
            cache_dir = os.path.join(project_dir, ".mask_cache", strip.name)
        else:
            # Use temp directory for unsaved projects
            cache_dir = os.path.join(tempfile.gettempdir(), "blender_mask_cache", strip.name)
        os.makedirs(cache_dir, exist_ok=True)
        return cache_dir

    def _check_cache(self, cache_dir: str, expected_frames: int) -> bool:
        """Check if cached masks exist and are complete.

        Args:
            cache_dir: Path to cache directory
            expected_frames: Number of frames expected

        Returns:
            True if cache exists and is valid, i.e. holds at least 90% of
            the expected frames.
        """
        # isdir (not exists) so a stray file at this path cannot reach listdir.
        if not os.path.isdir(cache_dir):
            return False

        # Check for MP4 video (new format)
        mask_video = os.path.join(cache_dir, "mask.mp4")
        if os.path.exists(mask_video):
            # Verify video has expected number of frames
            import cv2

            cap = cv2.VideoCapture(mask_video)
            try:
                if not cap.isOpened():
                    return False
                frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            finally:
                # Release the capture on every path, including errors.
                cap.release()
            # Accept cache if at least 90% of frames exist
            return frame_count >= expected_frames * 0.9

        # Fallback: check for PNG sequence (backward compatibility)
        # Accept cache if at least 90% of frames exist
        return len(self._list_mask_pngs(cache_dir)) >= expected_frames * 0.9

    def _add_mask_strip(self, context, source_strip_name: str, mask_path: str):
        """Add mask video as a new strip.

        Does nothing when the scene has no sequence editor, the source strip
        cannot be found, or no mask media exists at ``mask_path``.

        Args:
            context: Blender context
            source_strip_name: Name of the source video strip
            mask_path: Path to mask video file or directory (for backward compatibility)
        """
        scene = context.scene
        seq_editor = scene.sequence_editor
        if not seq_editor:
            return

        # Find source strip (Blender 5.0 uses 'strips' instead of 'sequences')
        source_strip = seq_editor.strips.get(source_strip_name)
        if not source_strip:
            return

        # Check if mask_path is a video file or directory (backward compatibility)
        if os.path.isfile(mask_path):
            # New format: single MP4 file
            mask_video = mask_path
        else:
            # Old format: directory with PNG sequence (backward compatibility)
            mask_video = os.path.join(mask_path, "mask.mp4")
            if not os.path.exists(mask_video):
                if not os.path.isdir(mask_path):
                    # Neither a file nor a directory: nothing to add, and
                    # listing it would raise.
                    print(f"[FaceMask] Mask path not found: {mask_path}")
                    return
                # Fallback to PNG sequence
                mask_files = self._list_mask_pngs(mask_path)
                if not mask_files:
                    return
                first_mask = os.path.join(mask_path, mask_files[0])
                self._add_mask_strip_png_sequence(
                    context, source_strip_name, mask_path, mask_files, first_mask
                )
                return

        # Add movie strip (Blender 5.0 API)
        mask_strip = seq_editor.strips.new_movie(
            name=f"{source_strip_name}_mask",
            filepath=mask_video,
            channel=self._find_free_channel(seq_editor, source_strip.channel + 1),
            frame_start=source_strip.frame_final_start,
        )
        self._apply_mask_blend(mask_strip)

    def _add_mask_strip_png_sequence(self, context, source_strip_name, mask_dir, mask_files, first_mask):
        """Backward compatibility: Add PNG sequence as mask strip.

        Args:
            context: Blender context
            source_strip_name: Name of the source video strip
            mask_dir: Directory holding the PNG files (currently unused here)
            mask_files: Sorted PNG file names; the first one seeds the strip
            first_mask: Full path of the first PNG file
        """
        scene = context.scene
        seq_editor = scene.sequence_editor
        source_strip = seq_editor.strips.get(source_strip_name)
        if not source_strip:
            return

        # Add image sequence (Blender 5.0 API)
        mask_strip = seq_editor.strips.new_image(
            name=f"{source_strip_name}_mask",
            filepath=first_mask,
            channel=self._find_free_channel(seq_editor, source_strip.channel + 1),
            frame_start=source_strip.frame_final_start,
        )
        # Add remaining frames
        for mask_file in mask_files[1:]:
            mask_strip.elements.append(mask_file)
        self._apply_mask_blend(mask_strip)

    @staticmethod
    def _list_mask_pngs(directory: str) -> list:
        """Return the sorted ``mask_*.png`` file names found in ``directory``."""
        return sorted(
            f for f in os.listdir(directory)
            if f.startswith("mask_") and f.endswith(".png")
        )

    @staticmethod
    def _find_free_channel(seq_editor, first_candidate: int) -> int:
        """Return the lowest channel >= ``first_candidate`` not used by any strip."""
        used_channels = {s.channel for s in seq_editor.strips}
        channel = first_candidate
        while channel in used_channels:
            channel += 1
        return channel

    @staticmethod
    def _apply_mask_blend(mask_strip):
        """Set blend mode for mask: alpha-over at 50% opacity."""
        mask_strip.blend_type = 'ALPHA_OVER'
        mask_strip.blend_alpha = 0.5
class SEQUENCER_OT_cancel_mask_generation(Operator):
    """Cancel ongoing mask generation."""

    bl_idname = "sequencer.cancel_mask_generation"
    bl_label = "Cancel Mask Generation"
    bl_description = "Cancel the current mask generation process"
    bl_options = {'REGISTER'}

    def execute(self, context):
        """Cancel the shared generator if it is running.

        Reports a warning instead when nothing is in progress. Returns
        {'FINISHED'} in both cases.
        """
        active_generator = get_generator()

        # Nothing to cancel: warn the user and leave.
        if not active_generator.is_running:
            self.report({'WARNING'}, "No mask generation in progress")
            return {'FINISHED'}

        active_generator.cancel()
        self.report({'INFO'}, "Mask generation cancelled")
        return {'FINISHED'}
# Registration
# Operator classes of this module: register() registers them in list order,
# unregister() unregisters them in reverse order.
classes = [
SEQUENCER_OT_generate_face_mask,
SEQUENCER_OT_cancel_mask_generation,
]
def register():
    """Register the operator classes and the progress properties."""
    for operator_cls in classes:
        bpy.utils.register_class(operator_cls)

    # Progress counters live on WindowManager; both start at 0.
    for prop_name in ("mask_progress", "mask_total"):
        setattr(bpy.types.WindowManager, prop_name, IntProperty(default=0))
def unregister():
    """Remove the progress properties, then unregister the operator classes."""
    # Drop the WindowManager properties first, in the order they were added.
    for prop_name in ("mask_progress", "mask_total"):
        delattr(bpy.types.WindowManager, prop_name)

    # Unregister classes in the reverse of their registration order.
    for operator_cls in classes[::-1]:
        bpy.utils.unregister_class(operator_cls)