229 lines
8.0 KiB
Python
229 lines
8.0 KiB
Python
"""
Generate Mask Operator for Face Detection in VSE.

Provides operators to generate face mask image sequences
from movie or image strips in the Video Sequence Editor.
"""
|
||
|
||
import os
|
||
import bpy
|
||
from bpy.props import IntProperty, BoolProperty
|
||
from bpy.types import Operator
|
||
|
||
from ..core.async_generator import get_generator
|
||
from ..core.inference_client import get_client
|
||
from ..core.utils import get_cache_dir_for_strip, check_detection_cache
|
||
|
||
|
||
def compute_strip_frame_range(strip, scene, client) -> tuple:
    """Return ``(start_frame, end_frame, source_fps)`` for a movie strip.

    The strip's offset and duration (expressed in project frames) are scaled
    by the ratio of the source video frame rate to the project frame rate and
    then clamped to the frames the video actually contains.

    Args:
        strip: Strip providing ``filepath``, ``frame_offset_start`` and
            ``frame_final_duration``.
        scene: Scene whose ``render.fps`` / ``render.fps_base`` define the
            project frame rate.
        client: Object exposing ``get_video_info(path)``; the returned mapping
            is read for the ``"frame_count"`` and ``"fps"`` keys.

    Raises:
        ValueError: If the reported frame count is not positive.
    """
    video_path = bpy.path.abspath(strip.filepath)
    info = client.get_video_info(video_path)
    frame_count = int(info.get("frame_count", 0))
    source_fps = float(info.get("fps", 0.0))

    if frame_count <= 0:
        raise ValueError(f"Could not read frame count from video: {video_path}")

    project_fps = scene.render.fps / scene.render.fps_base
    if source_fps <= 0:
        # No usable frame rate reported for the video: use the project's.
        source_fps = project_fps

    ratio = source_fps / project_fps
    last_valid = frame_count - 1

    # The end frame is derived from the unclamped start so the requested
    # length is preserved before both ends are clamped into the video.
    first = int(round(strip.frame_offset_start * ratio))
    last = first + int(round(strip.frame_final_duration * ratio)) - 1

    first = max(0, min(first, last_valid))
    last = max(first, min(last, last_valid))
    return first, last, source_fps
|
||
|
||
|
||
def get_image_strip_files(strip) -> tuple:
    """Return ``(abs_image_dir, filenames_list)`` for an IMAGE strip.

    The directory is ``strip.directory`` passed through ``bpy.path.abspath``;
    the filenames are collected from ``strip.elements`` in element order.
    """
    filenames = []
    image_dir = bpy.path.abspath(strip.directory)
    for element in strip.elements:
        filenames.append(element.filename)
    return image_dir, filenames
|
||
|
||
|
||
def compute_image_strip_range(strip) -> tuple:
    """Return the active ``(start_index, end_index)`` range of an IMAGE strip.

    Both indices are inclusive positions into ``strip.elements``. The start is
    taken from ``strip.frame_offset_start`` (negative values are treated as 0)
    and the end from ``strip.frame_final_duration``; both are clamped to the
    last element, and the end is never smaller than the start.

    Raises:
        ValueError: If the strip has no elements. Without this check the
            clamping below would produce the negative range ``(-1, -1)``.
    """
    total_elements = len(strip.elements)
    if total_elements == 0:
        raise ValueError("Image strip has no elements")

    last_idx = total_elements - 1
    start_idx = max(0, int(strip.frame_offset_start))
    # Computed from the start before it is clamped to the last element.
    end_idx = start_idx + int(strip.frame_final_duration) - 1

    start_idx = min(start_idx, last_idx)
    end_idx = max(start_idx, min(end_idx, last_idx))
    return start_idx, end_idx
|
||
|
||
|
||
def start_mask_gen_for_strip(context, strip, on_complete, on_progress):
    """Start mask generation for a strip (shared by MOVIE and IMAGE strips).

    ``wm.mask_progress`` is reset and ``wm.mask_total`` is set to the number
    of frames/images in the active range before ``generator.start*()`` is
    called. Strips whose type is ``"IMAGE"`` go through
    ``generator.start_images``; every other strip goes through
    ``generator.start``.

    Raises:
        RuntimeError: If ``generator.is_running`` is already true.
        ValueError: If an IMAGE strip has no elements.

    Exceptions raised by the range helpers or the generator are not caught
    here and propagate to the caller.
    """
    scene = context.scene
    wm = context.window_manager
    generator = get_generator()

    if generator.is_running:
        raise RuntimeError("Mask generation already in progress")

    output_dir = get_cache_dir_for_strip(strip.name)
    os.makedirs(output_dir, exist_ok=True)
    wm.mask_progress = 0

    if strip.type != "IMAGE":
        # Video-backed strip: the frame range comes from the video metadata.
        client = get_client()
        first_frame, last_frame, source_fps = compute_strip_frame_range(
            strip, scene, client
        )
        wm.mask_total = last_frame - first_frame + 1
        generator.start(
            video_path=bpy.path.abspath(strip.filepath),
            output_dir=output_dir,
            start_frame=first_frame,
            end_frame=last_frame,
            fps=source_fps,
            conf_threshold=scene.facemask_conf_threshold,
            iou_threshold=scene.facemask_iou_threshold,
            on_complete=on_complete,
            on_progress=on_progress,
        )
        return

    # Image sequence strip: the range indexes into the strip's element list.
    image_dir, filenames = get_image_strip_files(strip)
    if not filenames:
        raise ValueError("Image strip has no elements")

    first_idx, last_idx = compute_image_strip_range(strip)
    wm.mask_total = last_idx - first_idx + 1
    generator.start_images(
        image_dir=image_dir,
        filenames=filenames,
        output_dir=output_dir,
        start_index=first_idx,
        end_index=last_idx,
        conf_threshold=scene.facemask_conf_threshold,
        iou_threshold=scene.facemask_iou_threshold,
        on_complete=on_complete,
        on_progress=on_progress,
    )
|
||
|
||
|
||
class SEQUENCER_OT_generate_face_mask(Operator):
    """Generate face mask image sequence from video strip."""

    # The operator accepts the active strip when it is a MOVIE or IMAGE strip
    # (see poll). It only starts generation; progress is reported through the
    # WindowManager ``mask_progress`` / ``mask_total`` properties.

    bl_idname = "sequencer.generate_face_mask"
    bl_label = "Generate Face Mask"
    bl_description = "Detect faces and generate mask image sequence"
    bl_options = {'REGISTER', 'UNDO'}

    # When true, an existing detection cache is ignored and regenerated.
    force: BoolProperty(
        name="Force Regenerate",
        description="既存のキャッシュを無視して再生成する",
        default=False,
    )

    @classmethod
    def poll(cls, context):
        """Allow the operator only for an active MOVIE or IMAGE strip."""
        sequence_editor = context.scene.sequence_editor
        if not sequence_editor:
            return False
        strip = sequence_editor.active_strip
        if not strip:
            return False
        return strip.type in {'MOVIE', 'IMAGE'}

    def execute(self, context):
        """Start mask generation for the active strip.

        Returns ``{'CANCELLED'}`` when the source path is missing or the
        generation could not be started, and ``{'FINISHED'}`` when generation
        was started or a detection cache already exists (unless ``force``).
        """
        strip = context.scene.sequence_editor.active_strip

        # Check that the source exists on disk. IMAGE strips are validated by
        # their directory, so the message names what is actually missing.
        if strip.type == 'MOVIE':
            source_path = bpy.path.abspath(strip.filepath)
            missing_message = f"Video file not found: {source_path}"
        else:
            source_path = bpy.path.abspath(strip.directory)
            missing_message = f"Image directory not found: {source_path}"

        if not os.path.exists(source_path):
            self.report({'ERROR'}, missing_message)
            return {'CANCELLED'}

        # Check the cache (skipped when force=True).
        if not self.force and check_detection_cache(strip.name):
            self.report({'INFO'}, f"Using cached detections for {strip.name}")
            return {'FINISHED'}

        generator = get_generator()

        def redraw_sequence_editors():
            # The callbacks may fire after execute() has returned, where
            # context.screen is not guaranteed to be set (NOTE(review): this
            # depends on how the generator dispatches callbacks). Walking the
            # window manager's windows avoids dereferencing a None screen.
            for window in context.window_manager.windows:
                screen = window.screen
                if screen is None:
                    continue
                for area in screen.areas:
                    if area.type == 'SEQUENCE_EDITOR':
                        area.tag_redraw()

        def on_complete(status, data):
            wm = context.window_manager
            wm.mask_total = max(wm.mask_total, generator.total_frames)

            if status == "done":
                wm.mask_progress = wm.mask_total
                print(f"[FaceMask] Mask generation completed: {data}")
            elif status == "error":
                # Keep the progress value inside the [0, total] range.
                wm.mask_progress = min(wm.mask_progress, wm.mask_total)
                print(f"[FaceMask] Error: {data}")
            elif status == "cancelled":
                wm.mask_progress = min(wm.mask_progress, wm.mask_total)
                print("[FaceMask] Generation cancelled")

            redraw_sequence_editors()

        def on_progress(current, total):
            wm = context.window_manager
            wm.mask_progress = current
            wm.mask_total = total
            redraw_sequence_editors()

        try:
            start_mask_gen_for_strip(context, strip, on_complete, on_progress)
        except RuntimeError as e:
            # Raised when a generation is already running: warn, don't error.
            self.report({'WARNING'}, str(e))
            return {'CANCELLED'}
        except Exception as e:
            self.report({'ERROR'}, f"Failed to start mask generation: {e}")
            return {'CANCELLED'}

        self.report({'INFO'}, f"Started mask generation for {strip.name}")
        return {'FINISHED'}
|
||
|
||
|
||
class SEQUENCER_OT_cancel_mask_generation(Operator):
    """Cancel ongoing mask generation."""

    bl_idname = "sequencer.cancel_mask_generation"
    bl_label = "Cancel Mask Generation"
    bl_description = "Cancel the current mask generation process"
    bl_options = {'REGISTER'}

    def execute(self, context):
        """Cancel the running generator, or warn when nothing is running.

        Always returns ``{'FINISHED'}``.
        """
        generator = get_generator()

        if not generator.is_running:
            self.report({'WARNING'}, "No mask generation in progress")
            return {'FINISHED'}

        generator.cancel()
        self.report({'INFO'}, "Mask generation cancelled")
        return {'FINISHED'}
|
||
|
||
|
||
# Registration: operator classes in the order register() registers them;
# unregister() walks this list in reverse.
classes = [SEQUENCER_OT_generate_face_mask, SEQUENCER_OT_cancel_mask_generation]
|
||
|
||
|
||
def register():
    """Register the operator classes and the WindowManager progress properties."""
    for operator_cls in classes:
        bpy.utils.register_class(operator_cls)

    # Progress counters written by the mask generation callbacks.
    wm_type = bpy.types.WindowManager
    wm_type.mask_progress = IntProperty(default=0)
    wm_type.mask_total = IntProperty(default=0)
|
||
|
||
|
||
def unregister():
    """Remove the WindowManager progress properties, then unregister the classes."""
    wm_type = bpy.types.WindowManager
    del wm_type.mask_progress
    del wm_type.mask_total

    # Unregister in the opposite order of registration.
    for operator_cls in reversed(classes):
        bpy.utils.unregister_class(operator_cls)
|