diff --git a/core/async_bake_generator.py b/core/async_bake_generator.py index 7888aa2..d8facea 100644 --- a/core/async_bake_generator.py +++ b/core/async_bake_generator.py @@ -29,7 +29,7 @@ class AsyncBakeGenerator: def start( self, video_path: str, - mask_path: str, + detections_path: str, output_path: str, blur_size: int, fmt: str, @@ -53,7 +53,7 @@ class AsyncBakeGenerator: self.worker_thread = threading.Thread( target=self._worker, - args=(video_path, mask_path, output_path, blur_size, fmt), + args=(video_path, detections_path, output_path, blur_size, fmt), daemon=True, ) self.worker_thread.start() @@ -72,7 +72,7 @@ class AsyncBakeGenerator: def _worker( self, video_path: str, - mask_path: str, + detections_path: str, output_path: str, blur_size: int, fmt: str, @@ -85,7 +85,7 @@ class AsyncBakeGenerator: client = get_client() task_id = client.bake_blur( video_path=video_path, - mask_path=mask_path, + detections_path=detections_path, output_path=output_path, blur_size=blur_size, fmt=fmt, diff --git a/core/async_generator.py b/core/async_generator.py index 171c425..2743502 100644 --- a/core/async_generator.py +++ b/core/async_generator.py @@ -9,8 +9,7 @@ Blender's UI remains responsive via bpy.app.timers. import os import threading import queue -from functools import partial -from typing import Optional, Callable, Tuple +from typing import Optional, Callable from pathlib import Path # Will be imported when running inside Blender @@ -150,9 +149,20 @@ class AsyncMaskGenerator: while self.is_running: status = client.get_task_status(task_id) state = status.get("status") + + total = status.get("total", 0) + if total > 0: + self.total_frames = total if state == "completed": - self.result_queue.put(("done", output_dir)) + final_progress = status.get("progress", self.total_frames) + if final_progress >= 0: + self.progress_queue.put(("progress", final_progress)) + result_path = status.get( + "result_path", + os.path.join(output_dir, "detections.msgpack"), + ) + self.result_queue.put(("done", result_path)) return elif state == "failed": @@ -167,7 +177,7 @@ class AsyncMaskGenerator: # Report progress progress = status.get("progress", 0) - if progress > 0: + if progress >= 0: self.progress_queue.put(("progress", progress)) time.sleep(0.5) @@ -206,6 +216,16 @@ class AsyncMaskGenerator: try: msg_type, data = self.result_queue.get_nowait() self.is_running = False + + # Ensure UI receives a final progress update before completion. + if ( + msg_type == "done" + and self.total_frames > 0 + and self.current_frame < self.total_frames + and self._on_progress + ): + self.current_frame = self.total_frames + self._on_progress(self.current_frame, self.total_frames) if self._on_complete: self._on_complete(msg_type, data) diff --git a/core/inference_client.py b/core/inference_client.py index 3bfae95..92de61b 100644 --- a/core/inference_client.py +++ b/core/inference_client.py @@ -252,7 +252,7 @@ class InferenceClient: def bake_blur( self, video_path: str, - mask_path: str, + detections_path: str, output_path: str, blur_size: int, fmt: str, @@ -268,7 +268,7 @@ class InferenceClient: data = { "video_path": video_path, - "mask_path": mask_path, + "detections_path": detections_path, "output_path": output_path, "blur_size": blur_size, "format": fmt, diff --git a/core/utils.py b/core/utils.py index 486567b..c6115b1 100644 --- a/core/utils.py +++ b/core/utils.py @@ -78,6 +78,11 @@ def get_cache_dir_for_strip(strip_name: str) -> str: return os.path.join(get_cache_root(), strip_name) +def get_detections_path_for_strip(strip_name: str) -> str: + """Get msgpack detection cache path for a specific strip.""" + return os.path.join(get_cache_dir_for_strip(strip_name), "detections.msgpack") + + def get_cache_info(strip_name: Optional[str] = None) -> Tuple[str, int, int]: """ Get cache directory information. diff --git a/flake.nix b/flake.nix index 7b81f1b..ec46343 100644 --- a/flake.nix +++ b/flake.nix @@ -61,26 +61,22 @@ # venvをアクティベート source "$VENV_DIR/bin/activate" - # 必要なパッケージのインストール確認とインストール + # PyTorch ROCm版の導入(GPU未認識時のみ) if ! python -c "import torch; print(torch.cuda.is_available())" 2>/dev/null | grep -q "True"; then - echo "[Setup] Installing Python dependencies..." - # まずPyTorch ROCm版をインストール(ROCm 7.0 nightly - ROCm 7.1.1環境で動作確認済み) + echo "[Setup] Installing PyTorch ROCm dependencies..." pip install --quiet --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/rocm7.0 - # 次に通常のPyPIから他のパッケージをインストール - pip install --quiet \ - ultralytics \ - opencv-python-headless \ - numpy \ - fastapi \ - uvicorn \ - pydantic - # opencv-pythonがインストールされていたら削除(headless版のみ使用) - pip uninstall -y opencv-python opencv 2>/dev/null || true - # opencv-python-headlessを再インストールして確実にする - pip install --quiet --force-reinstall opencv-python-headless - echo "[Setup] Dependencies installed successfully" fi + # プロジェクト依存(requirements.txt)を同期 + if [ -f "$PWD/requirements.txt" ]; then + echo "[Setup] Syncing Python dependencies from requirements.txt..." + pip install --quiet -r "$PWD/requirements.txt" + fi + + # opencv-pythonが入っていた場合はheadlessに統一 + pip uninstall -y opencv-python opencv 2>/dev/null || true + pip install --quiet --upgrade opencv-python-headless + # Pythonパスにカレントディレクトリを追加 export PYTHONPATH="$PWD:$PYTHONPATH" diff --git a/operators/apply_blur.py b/operators/apply_blur.py index abde68e..e7ec761 100644 --- a/operators/apply_blur.py +++ b/operators/apply_blur.py @@ -12,6 +12,7 @@ from bpy.types import Operator from ..core.async_bake_generator import get_bake_generator from ..core.async_generator import get_generator as get_mask_generator +from ..core.utils import get_detections_path_for_strip KEY_ORIGINAL = "facemask_original_filepath" @@ -28,19 +29,9 @@ FORMAT_EXT = { } -def _find_mask_strip(seq_editor, strip_name: str): - return seq_editor.strips.get(f"{strip_name}_mask") - - -def _resolve_mask_path(mask_strip) -> str: - if mask_strip.type == "MOVIE": - return bpy.path.abspath(mask_strip.filepath) - return "" - - -def _output_path(video_strip, mask_path: str, fmt: str) -> str: +def _output_path(video_strip, detections_path: str, fmt: str) -> str: ext = FORMAT_EXT.get(fmt, "mp4") - out_dir = os.path.dirname(mask_path) + out_dir = os.path.dirname(detections_path) safe_name = video_strip.name.replace("/", "_").replace("\\", "_") return os.path.join(out_dir, f"{safe_name}_blurred.{ext}") @@ -83,22 +74,17 @@ class SEQUENCER_OT_bake_and_swap_blur_source(Operator): scene = context.scene video_strip = seq_editor.active_strip - mask_strip = _find_mask_strip(seq_editor, video_strip.name) - if not mask_strip: - self.report({"ERROR"}, f"Mask strip not found: {video_strip.name}_mask") - return {"CANCELLED"} - video_path = bpy.path.abspath(video_strip.filepath) - mask_path = _resolve_mask_path(mask_strip) + detections_path = get_detections_path_for_strip(video_strip.name) if not os.path.exists(video_path): self.report({"ERROR"}, f"Source video not found: {video_path}") return {"CANCELLED"} - if not mask_path or not os.path.exists(mask_path): - self.report({"ERROR"}, f"Mask video not found: {mask_path}") + if not os.path.exists(detections_path): + self.report({"ERROR"}, f"Detection cache not found: {detections_path}") return {"CANCELLED"} bake_format = scene.facemask_bake_format - output_path = _output_path(video_strip, mask_path, bake_format) + output_path = _output_path(video_strip, detections_path, bake_format) blur_size = int(scene.facemask_bake_blur_size) # Reuse baked cache when parameters match and file still exists. @@ -164,7 +150,7 @@ class SEQUENCER_OT_bake_and_swap_blur_source(Operator): try: bake_generator.start( video_path=video_path, - mask_path=mask_path, + detections_path=detections_path, output_path=output_path, blur_size=blur_size, fmt=bake_format.lower(), diff --git a/operators/generate_mask.py b/operators/generate_mask.py index 64bc65d..3d2226c 100644 --- a/operators/generate_mask.py +++ b/operators/generate_mask.py @@ -55,8 +55,7 @@ class SEQUENCER_OT_generate_face_mask(Operator): # Check cache - if masks already exist, use them expected_frame_count = strip.frame_final_end - strip.frame_final_start + 1 if self._check_cache(output_dir, expected_frame_count): - self.report({'INFO'}, f"Using cached masks from {output_dir}") - self._add_mask_strip(context, strip.name, output_dir) + self.report({'INFO'}, f"Using cached detections from {output_dir}") return {'FINISHED'} # Get frame range @@ -71,19 +70,25 @@ class SEQUENCER_OT_generate_face_mask(Operator): self.report({'WARNING'}, "Mask generation already in progress") return {'CANCELLED'} - # Store strip name for callback - strip_name = strip.name - def on_complete(status, data): """Called when mask generation completes.""" + wm = context.window_manager + wm.mask_total = max(wm.mask_total, generator.total_frames) + if status == "done": + wm.mask_progress = wm.mask_total + elif status in {"error", "cancelled"}: + wm.mask_progress = min(wm.mask_progress, wm.mask_total) + if status == "done": - # Add mask strip to sequence editor - self._add_mask_strip(context, strip_name, data) print(f"[FaceMask] Mask generation completed: {data}") elif status == "error": print(f"[FaceMask] Error: {data}") elif status == "cancelled": print("[FaceMask] Generation cancelled") + + for area in context.screen.areas: + if area.type == 'SEQUENCE_EDITOR': + area.tag_redraw() def on_progress(current, total): """Called on progress updates.""" @@ -143,119 +148,27 @@ class SEQUENCER_OT_generate_face_mask(Operator): if not os.path.exists(cache_dir): return False - # Check for MP4 video (new format) - mask_video = os.path.join(cache_dir, "mask.mp4") - if os.path.exists(mask_video): - # Prefer frame-count verification when cv2 is available, but do not - # hard-fail on Blender Python environments without cv2. - try: - import cv2 + detections_path = os.path.join(cache_dir, "detections.msgpack") + if not os.path.exists(detections_path): + return False - cap = cv2.VideoCapture(mask_video) - if cap.isOpened(): - frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - cap.release() - # Accept cache if at least 90% of frames exist - return frame_count >= expected_frames * 0.9 - cap.release() + # Quick sanity check: non-empty file + try: + if os.path.getsize(detections_path) <= 0: return False - except Exception: - # Fallback: treat existing MP4 cache as valid when cv2 is unavailable. - return True + except OSError: + return False - # Fallback: check for PNG sequence (backward compatibility) - mask_files = [f for f in os.listdir(cache_dir) - if f.startswith("mask_") and f.endswith(".png")] + # Optional frame count verification if msgpack is available + try: + import msgpack - # Accept cache if at least 90% of frames exist - return len(mask_files) >= expected_frames * 0.9 - - def _add_mask_strip(self, context, source_strip_name: str, mask_path: str): - """Add mask video as a new strip. - - Args: - context: Blender context - source_strip_name: Name of the source video strip - mask_path: Path to mask video file or directory (for backward compatibility) - """ - scene = context.scene - seq_editor = scene.sequence_editor - - if not seq_editor: - return - - # Find source strip (Blender 5.0 uses 'strips' instead of 'sequences') - source_strip = seq_editor.strips.get(source_strip_name) - if not source_strip: - return - - # Check if mask_path is a video file or directory (backward compatibility) - if os.path.isfile(mask_path): - # New format: single MP4 file - mask_video = mask_path - else: - # Old format: directory with PNG sequence (backward compatibility) - mask_video = os.path.join(mask_path, "mask.mp4") - if not os.path.exists(mask_video): - # Fallback to PNG sequence - mask_files = sorted([ - f for f in os.listdir(mask_path) - if f.startswith("mask_") and f.endswith(".png") - ]) - if not mask_files: - return - first_mask = os.path.join(mask_path, mask_files[0]) - self._add_mask_strip_png_sequence(context, source_strip_name, mask_path, mask_files, first_mask) - return - - # Find an empty channel - used_channels = {s.channel for s in seq_editor.strips} - new_channel = source_strip.channel + 1 - while new_channel in used_channels: - new_channel += 1 - - # Add movie strip (Blender 5.0 API) - mask_strip = seq_editor.strips.new_movie( - name=f"{source_strip_name}_mask", - filepath=mask_video, - channel=new_channel, - frame_start=source_strip.frame_final_start, - ) - - # Set blend mode for mask - mask_strip.blend_type = 'ALPHA_OVER' - mask_strip.blend_alpha = 0.5 - - def _add_mask_strip_png_sequence(self, context, source_strip_name, mask_dir, mask_files, first_mask): - """Backward compatibility: Add PNG sequence as mask strip.""" - scene = context.scene - seq_editor = scene.sequence_editor - source_strip = seq_editor.strips.get(source_strip_name) - - if not source_strip: - return - - # Find an empty channel - used_channels = {s.channel for s in seq_editor.strips} - new_channel = source_strip.channel + 1 - while new_channel in used_channels: - new_channel += 1 - - # Add image sequence (Blender 5.0 API) - mask_strip = seq_editor.strips.new_image( - name=f"{source_strip_name}_mask", - filepath=first_mask, - channel=new_channel, - frame_start=source_strip.frame_final_start, - ) - - # Add remaining frames - for mask_file in mask_files[1:]: - mask_strip.elements.append(mask_file) - - # Set blend mode for mask - mask_strip.blend_type = 'ALPHA_OVER' - mask_strip.blend_alpha = 0.5 + with open(detections_path, "rb") as f: + payload = msgpack.unpackb(f.read(), raw=False) + frames = payload.get("frames", []) + return len(frames) >= expected_frames * 0.9 + except Exception: + return True class SEQUENCER_OT_cancel_mask_generation(Operator): diff --git a/panels/vse_panel.py b/panels/vse_panel.py index 7644602..4f4e619 100644 --- a/panels/vse_panel.py +++ b/panels/vse_panel.py @@ -5,12 +5,18 @@ Provides a sidebar panel in the Video Sequence Editor for controlling mask generation and blur application. """ +import os import bpy from bpy.types import Panel from ..core.async_bake_generator import get_bake_generator from ..core.async_generator import get_generator -from ..core.utils import get_server_status, get_cache_info, format_size +from ..core.utils import ( + get_server_status, + get_cache_info, + format_size, + get_detections_path_for_strip, +) class SEQUENCER_PT_face_mask(Panel): @@ -186,19 +192,19 @@ class SEQUENCER_PT_face_mask(Panel): row = box.row() row.label(text=f"Strip: {strip.name}") - # Check for existing mask - seq_editor = context.scene.sequence_editor - mask_name = f"{strip.name}_mask" - has_mask = mask_name in seq_editor.strips + detections_path = get_detections_path_for_strip(strip.name) + has_mask = bpy.path.abspath(detections_path) and os.path.exists( + bpy.path.abspath(detections_path) + ) if has_mask: row = box.row() - row.label(text="✓ Mask exists", icon='CHECKMARK') + row.label(text="✓ Detection cache exists", icon='CHECKMARK') # Generate button op = box.operator( "sequencer.generate_face_mask", - text="Generate Face Mask" if not has_mask else "Regenerate Mask", + text="Generate Detection Cache" if not has_mask else "Regenerate Cache", icon='FACE_MAPS', ) @@ -206,14 +212,14 @@ class SEQUENCER_PT_face_mask(Panel): """Draw blur application controls.""" box = layout.box() box.label(text="Blur Bake", icon='MATFLUID') - - # Check for mask strip - seq_editor = context.scene.sequence_editor - mask_name = f"{strip.name}_mask" - has_mask = mask_name in seq_editor.strips + + detections_path = get_detections_path_for_strip(strip.name) + has_mask = bpy.path.abspath(detections_path) and os.path.exists( + bpy.path.abspath(detections_path) + ) if not has_mask: - box.label(text="Generate a mask first", icon='INFO') + box.label(text="Generate detection cache first", icon='INFO') return # Bake parameters diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..77a8608 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +ultralytics +opencv-python-headless +msgpack +numpy +fastapi +uvicorn +pydantic diff --git a/server/main.py b/server/main.py index 5da0b65..2872869 100644 --- a/server/main.py +++ b/server/main.py @@ -33,7 +33,6 @@ fix_library_path() import threading import uuid -import queue import traceback from typing import Dict, Optional, List from pathlib import Path @@ -43,6 +42,7 @@ from pydantic import BaseModel import uvicorn import cv2 import numpy as np +import msgpack # Add project root to path for imports if needed sys.path.append(str(Path(__file__).parent.parent)) @@ -86,7 +86,7 @@ class GenerateRequest(BaseModel): class BakeRequest(BaseModel): video_path: str - mask_path: str + detections_path: str output_path: str blur_size: int = 50 format: str = "mp4" @@ -122,185 +122,79 @@ def _build_video_writer( raise RuntimeError(f"Failed to create video writer for format='{fmt}'") +def _scale_bbox( + x: int, + y: int, + w: int, + h: int, + scale: float, + frame_width: int, + frame_height: int, +) -> Optional[List[int]]: + """Scale bbox around center and clamp to frame boundaries.""" + if w <= 0 or h <= 0: + return None + + center_x = x + (w * 0.5) + center_y = y + (h * 0.5) + scaled_w = max(1, int(w * scale)) + scaled_h = max(1, int(h * scale)) + + x1 = max(0, int(center_x - scaled_w * 0.5)) + y1 = max(0, int(center_y - scaled_h * 0.5)) + x2 = min(frame_width, x1 + scaled_w) + y2 = min(frame_height, y1 + scaled_h) + out_w = x2 - x1 + out_h = y2 - y1 + if out_w <= 0 or out_h <= 0: + return None + return [x1, y1, out_w, out_h] + + def process_video_task(task_id: str, req: GenerateRequest): - """Background task to process video with async MP4 output.""" - writer = None - write_queue = None - writer_thread = None + """Background task to detect faces and save bbox cache as msgpack.""" + cap = None try: tasks[task_id].status = TaskStatus.PROCESSING cancel_event = cancel_events.get(task_id) - # Verify video exists if not os.path.exists(req.video_path): tasks[task_id].status = TaskStatus.FAILED tasks[task_id].message = f"Video not found: {req.video_path}" return - # Initialize detector (will load model on first run) print(f"Loading detector for task {task_id}...") detector = get_detector( conf_threshold=req.conf_threshold, - iou_threshold=req.iou_threshold + iou_threshold=req.iou_threshold, ) _ = detector.model - # Open video cap = cv2.VideoCapture(req.video_path) if not cap.isOpened(): tasks[task_id].status = TaskStatus.FAILED tasks[task_id].message = "Failed to open video" return - # Get video properties - fps = cap.get(cv2.CAP_PROP_FPS) + fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) end_frame = min(req.end_frame, total_video_frames - 1) frames_to_process = end_frame - req.start_frame + 1 - - tasks[task_id].total = frames_to_process - - # Ensure output directory exists - os.makedirs(req.output_dir, exist_ok=True) - - # Setup MP4 writer (grayscale) - output_video_path = os.path.join(req.output_dir, "mask.mp4") - fourcc = cv2.VideoWriter_fourcc(*'mp4v') - writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height), isColor=False) - - if not writer.isOpened(): + if frames_to_process <= 0: tasks[task_id].status = TaskStatus.FAILED - tasks[task_id].message = "Failed to create video writer" - cap.release() + tasks[task_id].message = "Invalid frame range" return - # Async writer setup - write_queue = queue.Queue(maxsize=30) # Buffer up to 30 frames - writer_running = threading.Event() - writer_running.set() + tasks[task_id].total = frames_to_process + os.makedirs(req.output_dir, exist_ok=True) + output_msgpack_path = os.path.join(req.output_dir, "detections.msgpack") - def async_writer(): - """Background thread for writing frames to video.""" - while writer_running.is_set() or not write_queue.empty(): - try: - mask = write_queue.get(timeout=0.1) - if mask is not None: - writer.write(mask) - write_queue.task_done() - except queue.Empty: - continue - - writer_thread = threading.Thread(target=async_writer, daemon=True) - writer_thread.start() - - print(f"Starting processing: {req.video_path} ({frames_to_process} frames) -> {output_video_path}") - - # Batch processing configuration - BATCH_SIZE = 5 # Optimal batch size for 4K video (72.9% improvement) - frame_buffer = [] - TEMPORAL_SIDE_WEIGHT = 0.7 - TEMPORAL_CENTER_WEIGHT = 1.0 - - # Temporal blending state (streaming, low-memory) - prev_mask = None - curr_mask = None - wrote_first_frame = False - - def _scale_mask(mask: np.ndarray, weight: float) -> np.ndarray: - """Scale mask intensity for temporal blending.""" - if weight == 1.0: - return mask - return cv2.convertScaleAbs(mask, alpha=weight, beta=0) - - def _blend_edge(base: np.ndarray, neighbor: np.ndarray) -> np.ndarray: - """Blend for first/last frame (one-sided temporal context).""" - base_w = _scale_mask(base, TEMPORAL_CENTER_WEIGHT) - neighbor_w = _scale_mask(neighbor, TEMPORAL_SIDE_WEIGHT) - return cv2.max(base_w, neighbor_w) - - def _blend_middle(prev: np.ndarray, cur: np.ndarray, nxt: np.ndarray) -> np.ndarray: - """Blend for middle frames (previous/current/next temporal context).""" - prev_w = _scale_mask(prev, TEMPORAL_SIDE_WEIGHT) - cur_w = _scale_mask(cur, TEMPORAL_CENTER_WEIGHT) - nxt_w = _scale_mask(nxt, TEMPORAL_SIDE_WEIGHT) - return cv2.max(cur_w, cv2.max(prev_w, nxt_w)) - - def push_mask_temporal(raw_mask: np.ndarray): - """Push mask and emit blended output in frame order.""" - nonlocal prev_mask, curr_mask, wrote_first_frame - - if prev_mask is None: - prev_mask = raw_mask - return - - if curr_mask is None: - curr_mask = raw_mask - return - - if not wrote_first_frame: - write_queue.put(_blend_edge(prev_mask, curr_mask)) - wrote_first_frame = True - - # Emit blended current frame using prev/current/next - write_queue.put(_blend_middle(prev_mask, curr_mask, raw_mask)) - - # Slide temporal window - prev_mask = curr_mask - curr_mask = raw_mask - - def flush_temporal_tail(): - """Flush remaining masks after all frames are processed.""" - if prev_mask is None: - return - - # Single-frame case - if curr_mask is None: - write_queue.put(_scale_mask(prev_mask, TEMPORAL_CENTER_WEIGHT)) - return - - # Two-frame case - if not wrote_first_frame: - write_queue.put(_blend_edge(prev_mask, curr_mask)) - - # Always emit last frame with one-sided blend - write_queue.put(_blend_edge(curr_mask, prev_mask)) - - def process_batch(): - """Process accumulated batch of frames.""" - if not frame_buffer: - return - - # Batch inference at full resolution - batch_detections = detector.detect_batch(frame_buffer) - - # Generate masks for each frame - for i, detections in enumerate(batch_detections): - frame = frame_buffer[i] - - # Generate mask at original resolution - mask = detector.generate_mask( - frame.shape, - detections, - mask_scale=req.mask_scale - ) - - # Temporal blend before async write - push_mask_temporal(mask) - - # Clear buffer - frame_buffer.clear() - - # Seek once to the starting frame. Avoid random-access seek on every frame. if req.start_frame > 0: seek_ok = cap.set(cv2.CAP_PROP_POS_FRAMES, req.start_frame) if not seek_ok: - print( - f"[FaceMask] Warning: CAP_PROP_POS_FRAMES seek failed, " - f"fallback to sequential skip ({req.start_frame} frames)" - ) for _ in range(req.start_frame): ret, _ = cap.read() if not ret: @@ -310,65 +204,97 @@ def process_video_task(task_id: str, req: GenerateRequest): ) return - # Process loop with batching + frame_buffer: List[np.ndarray] = [] + frame_detections: List[List[List[float]]] = [] + batch_size = 5 current_count = 0 - for frame_idx in range(req.start_frame, end_frame + 1): + + def process_batch(): + nonlocal current_count + if not frame_buffer: + return + + batch_detections = detector.detect_batch(frame_buffer) + for detections in batch_detections: + packed_detections: List[List[float]] = [] + for x, y, w, h, conf in detections: + scaled = _scale_bbox( + int(x), + int(y), + int(w), + int(h), + float(req.mask_scale), + width, + height, + ) + if scaled is None: + continue + packed_detections.append( + [scaled[0], scaled[1], scaled[2], scaled[3], float(conf)] + ) + frame_detections.append(packed_detections) + current_count += 1 + tasks[task_id].progress = current_count + + frame_buffer.clear() + + print( + f"Starting detection cache generation: {req.video_path} " + f"({frames_to_process} frames) -> {output_msgpack_path}" + ) + + for _ in range(req.start_frame, end_frame + 1): if cancel_event and cancel_event.is_set(): tasks[task_id].status = TaskStatus.CANCELLED tasks[task_id].message = "Cancelled by user" break - # Read next frame sequentially (after one-time initial seek) ret, frame = cap.read() + if not ret: + break - if ret: - # Store frame for batch processing - frame_buffer.append(frame) + frame_buffer.append(frame) + if len(frame_buffer) >= batch_size: + process_batch() - # Process batch when full - if len(frame_buffer) >= BATCH_SIZE: - process_batch() - - # Update progress - current_count += 1 - tasks[task_id].progress = current_count - - # Process remaining frames in buffer if frame_buffer: process_batch() - flush_temporal_tail() - - # Cleanup - writer_running.clear() - write_queue.join() # Wait for all frames to be written - if writer_thread: - writer_thread.join(timeout=5) - - cap.release() - if writer: - writer.release() if tasks[task_id].status == TaskStatus.PROCESSING: + payload = { + "version": 1, + "video_path": req.video_path, + "start_frame": req.start_frame, + "end_frame": req.start_frame + len(frame_detections) - 1, + "width": width, + "height": height, + "fps": fps, + "mask_scale": float(req.mask_scale), + "frames": frame_detections, + } + with open(output_msgpack_path, "wb") as f: + f.write(msgpack.packb(payload, use_bin_type=True)) + tasks[task_id].status = TaskStatus.COMPLETED - tasks[task_id].result_path = output_video_path # Return video path - tasks[task_id].message = "Processing completed successfully" - print(f"Task {task_id} completed: {output_video_path}") - + tasks[task_id].result_path = output_msgpack_path + tasks[task_id].message = "Detection cache completed" + print(f"Task {task_id} completed: {output_msgpack_path}") + except Exception as e: tasks[task_id].status = TaskStatus.FAILED tasks[task_id].message = str(e) print(f"Error in task {task_id}: {e}") traceback.print_exc() finally: - # Cleanup + if cap: + cap.release() if task_id in cancel_events: del cancel_events[task_id] def process_bake_task(task_id: str, req: BakeRequest): - """Background task to bake blur into a regular video file.""" + """Background task to bake blur using bbox detections in msgpack.""" src_cap = None - mask_cap = None writer = None try: @@ -379,59 +305,56 @@ def process_bake_task(task_id: str, req: BakeRequest): tasks[task_id].status = TaskStatus.FAILED tasks[task_id].message = f"Video not found: {req.video_path}" return - - if not os.path.exists(req.mask_path): + if not os.path.exists(req.detections_path): tasks[task_id].status = TaskStatus.FAILED - tasks[task_id].message = f"Mask video not found: {req.mask_path}" + tasks[task_id].message = f"Detections file not found: {req.detections_path}" return src_cap = cv2.VideoCapture(req.video_path) - mask_cap = cv2.VideoCapture(req.mask_path) - if not src_cap.isOpened(): tasks[task_id].status = TaskStatus.FAILED tasks[task_id].message = "Failed to open source video" return - if not mask_cap.isOpened(): + + with open(req.detections_path, "rb") as f: + payload = msgpack.unpackb(f.read(), raw=False) + frames = payload.get("frames") + if not isinstance(frames, list): tasks[task_id].status = TaskStatus.FAILED - tasks[task_id].message = "Failed to open mask video" + tasks[task_id].message = "Invalid detections format: 'frames' is missing" return src_fps = src_cap.get(cv2.CAP_PROP_FPS) or 30.0 src_width = int(src_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) src_height = int(src_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) src_frames = int(src_cap.get(cv2.CAP_PROP_FRAME_COUNT)) - mask_frames = int(mask_cap.get(cv2.CAP_PROP_FRAME_COUNT)) - if src_width <= 0 or src_height <= 0: tasks[task_id].status = TaskStatus.FAILED tasks[task_id].message = "Invalid source video dimensions" return - total = min(src_frames, mask_frames) if src_frames > 0 and mask_frames > 0 else 0 + total = min(src_frames, len(frames)) if src_frames > 0 else len(frames) if total <= 0: tasks[task_id].status = TaskStatus.FAILED - tasks[task_id].message = "Source/mask frame count is zero" + tasks[task_id].message = "Source/detections frame count is zero" return tasks[task_id].total = total output_dir = os.path.dirname(req.output_path) if output_dir: os.makedirs(output_dir, exist_ok=True) - writer = _build_video_writer(req.output_path, req.format, src_fps, src_width, src_height) - # Kernel size must be odd and >= 1 blur_size = max(1, int(req.blur_size)) if blur_size % 2 == 0: blur_size += 1 + feather_radius = max(3, min(25, blur_size // 3)) + feather_kernel = feather_radius * 2 + 1 - print(f"[FaceMask] Starting blur bake: {req.video_path} + {req.mask_path} -> {req.output_path}") - if src_frames != mask_frames: - print( - f"[FaceMask] Warning: frame count mismatch " - f"(src={src_frames}, mask={mask_frames}), processing {total} frames" - ) + print( + f"[FaceMask] Starting blur bake (bbox-msgpack): {req.video_path} + " + f"{req.detections_path} -> {req.output_path}" + ) for idx in range(total): if cancel_event and cancel_event.is_set(): @@ -440,29 +363,61 @@ def process_bake_task(task_id: str, req: BakeRequest): break src_ok, src_frame = src_cap.read() - mask_ok, mask_frame = mask_cap.read() - if not src_ok or not mask_ok: + if not src_ok: break - if mask_frame.ndim == 3: - mask_gray = cv2.cvtColor(mask_frame, cv2.COLOR_BGR2GRAY) - else: - mask_gray = mask_frame + frame_boxes = frames[idx] if idx < len(frames) else [] + if not frame_boxes: + writer.write(src_frame) + tasks[task_id].progress = idx + 1 + continue - if mask_gray.shape[0] != src_height or mask_gray.shape[1] != src_width: - mask_gray = cv2.resize( - mask_gray, - (src_width, src_height), - interpolation=cv2.INTER_LINEAR, - ) + mask_gray = np.zeros((src_height, src_width), dtype=np.uint8) + for box in frame_boxes: + if not isinstance(box, list) or len(box) < 4: + continue + x, y, w, h = int(box[0]), int(box[1]), int(box[2]), int(box[3]) + if w <= 0 or h <= 0: + continue + center = (x + w // 2, y + h // 2) + axes = (max(1, w // 2), max(1, h // 2)) + cv2.ellipse(mask_gray, center, axes, 0, 0, 360, 255, -1) - blurred = cv2.GaussianBlur(src_frame, (blur_size, blur_size), 0) - alpha = (mask_gray.astype(np.float32) / 255.0)[..., np.newaxis] - composed = (src_frame.astype(np.float32) * (1.0 - alpha)) + ( - blurred.astype(np.float32) * alpha + if cv2.countNonZero(mask_gray) == 0: + writer.write(src_frame) + tasks[task_id].progress = idx + 1 + continue + + mask_gray = cv2.GaussianBlur(mask_gray, (feather_kernel, feather_kernel), 0) + _, mask_binary = cv2.threshold(mask_gray, 2, 255, cv2.THRESH_BINARY) + non_zero_coords = cv2.findNonZero(mask_binary) + if non_zero_coords is None: + writer.write(src_frame) + tasks[task_id].progress = idx + 1 + continue + + x, y, w, h = cv2.boundingRect(non_zero_coords) + blur_margin = max(1, (blur_size // 2) + feather_radius) + x1 = max(0, x - blur_margin) + y1 = max(0, y - blur_margin) + x2 = min(src_width, x + w + blur_margin) + y2 = min(src_height, y + h + blur_margin) + + roi_src = src_frame[y1:y2, x1:x2] + roi_mask = mask_gray[y1:y2, x1:x2] + if roi_src.size == 0: + writer.write(src_frame) + tasks[task_id].progress = idx + 1 + continue + + roi_blurred = cv2.GaussianBlur(roi_src, (blur_size, blur_size), 0) + roi_alpha = (roi_mask.astype(np.float32) / 255.0)[..., np.newaxis] + roi_composed = (roi_src.astype(np.float32) * (1.0 - roi_alpha)) + ( + roi_blurred.astype(np.float32) * roi_alpha ) - writer.write(np.clip(composed, 0, 255).astype(np.uint8)) - + output_frame = src_frame.copy() + output_frame[y1:y2, x1:x2] = np.clip(roi_composed, 0, 255).astype(np.uint8) + writer.write(output_frame) tasks[task_id].progress = idx + 1 if tasks[task_id].status == TaskStatus.PROCESSING: @@ -479,8 +434,6 @@ def process_bake_task(task_id: str, req: BakeRequest): finally: if src_cap: src_cap.release() - if mask_cap: - mask_cap.release() if writer: writer.release() if task_id in cancel_events: