""" Batch Bake operator: sequentially Generate Detection Cache → Bake for all selected MOVIE strips in the VSE. """ import os import bpy from bpy.props import IntProperty, StringProperty from bpy.types import Operator from ..core.batch_processor import get_batch_processor from ..core.async_generator import get_generator as get_mask_generator from ..core.async_bake_generator import get_bake_generator from .apply_blur import KEY_ORIGINAL, KEY_MODE, _set_strip_source class SEQUENCER_OT_batch_bake_selected(Operator): """Generate detection cache and bake blur for all selected MOVIE strips.""" bl_idname = "sequencer.batch_bake_selected" bl_label = "Batch Bake Selected" bl_description = "Generate detection cache and bake blur for all selected MOVIE strips" bl_options = {"REGISTER"} @classmethod def poll(cls, context): if not context.scene.sequence_editor: return False if get_batch_processor().is_running: return False if get_mask_generator().is_running: return False if get_bake_generator().is_running: return False seq_editor = context.scene.sequence_editor return any(s.select and s.type == "MOVIE" for s in seq_editor.strips) def execute(self, context): seq_editor = context.scene.sequence_editor strips = [s for s in seq_editor.strips if s.select and s.type == "MOVIE"] if not strips: self.report({"WARNING"}, "No MOVIE strips selected") return {"CANCELLED"} batch = get_batch_processor() def on_item_complete(idx, total, strip_name, status): pass # wm properties already updated by BatchProcessor def on_all_complete(results): done = sum(1 for r in results if r["status"] == "done") total = len(results) print(f"[FaceMask] Batch finished: {done}/{total} strips completed") wm = context.window_manager wm.batch_current = 0 wm.batch_total = len(strips) wm.batch_current_name = "" batch.start(context, strips, on_item_complete=on_item_complete, on_all_complete=on_all_complete) self.report({"INFO"}, f"Batch bake started for {len(strips)} strips") return {"FINISHED"} class SEQUENCER_OT_batch_regenerate_cache(Operator): """Regenerate detection cache for all selected MOVIE strips (ignore existing cache).""" bl_idname = "sequencer.batch_regenerate_cache" bl_label = "Batch Regenerate Cache" bl_description = "Regenerate detection cache for all selected MOVIE strips" bl_options = {"REGISTER"} @classmethod def poll(cls, context): if not context.scene.sequence_editor: return False if get_batch_processor().is_running: return False if get_mask_generator().is_running: return False if get_bake_generator().is_running: return False seq_editor = context.scene.sequence_editor return any(s.select and s.type == "MOVIE" for s in seq_editor.strips) def execute(self, context): seq_editor = context.scene.sequence_editor strips = [s for s in seq_editor.strips if s.select and s.type == "MOVIE"] if not strips: self.report({"WARNING"}, "No MOVIE strips selected") return {"CANCELLED"} batch = get_batch_processor() def on_all_complete(results): done = sum(1 for r in results if r["status"] == "done") print(f"[FaceMask] Batch regenerate finished: {done}/{len(results)} strips") batch.start( context, strips, on_all_complete=on_all_complete, mode="mask_only", ) self.report({"INFO"}, f"Batch regenerate cache started for {len(strips)} strips") return {"FINISHED"} class SEQUENCER_OT_batch_restore_original(Operator): """Restore original source for all selected MOVIE strips.""" bl_idname = "sequencer.batch_restore_original" bl_label = "Batch Restore Original" bl_description = "Restore original source filepath for all selected MOVIE strips" bl_options = {"REGISTER", "UNDO"} @classmethod def poll(cls, context): if not context.scene.sequence_editor: return False if get_batch_processor().is_running: return False seq_editor = context.scene.sequence_editor return any(s.select and s.type == "MOVIE" for s in seq_editor.strips) def execute(self, context): seq_editor = context.scene.sequence_editor strips = [s for s in seq_editor.strips if s.select and s.type == "MOVIE"] restored = 0 skipped = 0 for strip in strips: original_path = strip.get(KEY_ORIGINAL) if not original_path or not os.path.exists(original_path): skipped += 1 continue if strip.get(KEY_MODE, "original") != "original": _set_strip_source(strip, original_path) strip[KEY_MODE] = "original" restored += 1 self.report( {"INFO"}, f"Restored {restored} strip(s)" + (f", skipped {skipped} (no original stored)" if skipped else ""), ) return {"FINISHED"} class SEQUENCER_OT_cancel_batch_bake(Operator): """Cancel ongoing batch bake.""" bl_idname = "sequencer.cancel_batch_bake" bl_label = "Cancel Batch Bake" bl_description = "Cancel the current batch bake process" bl_options = {"REGISTER"} def execute(self, context): batch = get_batch_processor() if batch.is_running: batch.cancel() self.report({"INFO"}, "Batch bake cancelled") else: self.report({"WARNING"}, "No batch bake in progress") return {"FINISHED"} classes = [ SEQUENCER_OT_batch_bake_selected, SEQUENCER_OT_batch_regenerate_cache, SEQUENCER_OT_batch_restore_original, SEQUENCER_OT_cancel_batch_bake, ] def register(): for cls in classes: bpy.utils.register_class(cls) bpy.types.WindowManager.batch_current = IntProperty(default=0) bpy.types.WindowManager.batch_total = IntProperty(default=0) bpy.types.WindowManager.batch_current_name = StringProperty(default="") def unregister(): del bpy.types.WindowManager.batch_current_name del bpy.types.WindowManager.batch_total del bpy.types.WindowManager.batch_current for cls in reversed(classes): bpy.utils.unregister_class(cls)