This commit is contained in:
Keisuke Hirata 2026-02-06 10:13:26 +09:00
parent 3c28cb0c94
commit c0ad2a551d
13 changed files with 893 additions and 250 deletions

1
.envrc Normal file
View File

@ -0,0 +1 @@
use flake

2
.gitignore vendored
View File

@ -1,7 +1,7 @@
.mask_cache/
*.mp4
test.blend
wheels/
models/
# Python
__pycache__/

5
README.md Normal file
View File

@ -0,0 +1,5 @@
# Blender Plugin: Mask Peoples
街歩き映像に対して自動モザイクを掛けるために開発しました。
使用モデル: https://github.com/akanametov/yolo-face （YOLO 顔検出モデル）

View File

@ -1,9 +1,9 @@
schema_version = "1.0.0"
id = "mask_peoples"
version = "0.2.0"
version = "0.3.0"
name = "Face Mask Blur"
tagline = "Detect faces and apply blur in VSE for privacy protection"
tagline = "GPU-accelerated face detection and blur in VSE using YOLOv11"
maintainer = "Hare"
type = "add-on"
license = ["SPDX:GPL-3.0-or-later"]
@ -15,11 +15,5 @@ copyright = ["2026 Hare"]
# Valid tags from Blender extension platform
tags = ["Sequencer"]
# Bundled Python wheels - Blender will install these automatically
wheels = [
"./wheels/numpy-2.2.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
"./wheels/opencv_python_headless-4.13.0.92-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl",
]
[permissions]
files = "Read video frames and write mask image cache"

View File

@ -1,5 +1,4 @@
"""Core module exports."""
from .face_detector import FaceDetector
from .async_generator import AsyncMaskGenerator, get_generator
from .compositor_setup import create_mask_blur_node_tree, get_or_create_blur_node_tree

View File

@ -43,14 +43,14 @@ class AsyncMaskGenerator:
start_frame: int,
end_frame: int,
fps: float,
scale_factor: float = 1.1,
min_neighbors: int = 5,
conf_threshold: float = 0.5,
iou_threshold: float = 0.45,
mask_scale: float = 1.5,
on_complete: Optional[Callable] = None,
on_progress: Optional[Callable] = None,
):
"""
Start asynchronous mask generation.
Start asynchronous mask generation with YOLO GPU acceleration.
Args:
video_path: Path to source video file
@ -58,8 +58,8 @@ class AsyncMaskGenerator:
start_frame: First frame to process
end_frame: Last frame to process
fps: Video frame rate (for seeking)
scale_factor: Face detection scale factor
min_neighbors: Face detection min neighbors
conf_threshold: YOLO confidence threshold
iou_threshold: YOLO NMS IoU threshold
mask_scale: Mask region scale factor
on_complete: Callback when processing completes (called from main thread)
on_progress: Callback for progress updates (called from main thread)
@ -93,8 +93,8 @@ class AsyncMaskGenerator:
start_frame,
end_frame,
fps,
scale_factor,
min_neighbors,
conf_threshold,
iou_threshold,
mask_scale,
),
daemon=True,
@ -120,77 +120,62 @@ class AsyncMaskGenerator:
start_frame: int,
end_frame: int,
fps: float,
scale_factor: float,
min_neighbors: int,
conf_threshold: float,
iou_threshold: float,
mask_scale: float,
):
"""
Worker thread function. Runs face detection and saves masks.
IMPORTANT: Do NOT use bpy in this function!
Worker thread function. Delegates to inference server and polls status.
"""
try:
import cv2
print(f"[FaceMask] OpenCV loaded: {cv2.__version__}")
from .face_detector import FaceDetector
except ImportError as e:
print(f"[FaceMask] Import error: {e}")
self.result_queue.put(("error", str(e)))
return
import time
from .inference_client import get_client
try:
# Initialize detector
detector = FaceDetector(
scale_factor=scale_factor,
min_neighbors=min_neighbors,
client = get_client()
# Start task on server
print(f"[FaceMask] Requesting generation on server...")
task_id = client.generate_mask(
video_path=video_path,
output_dir=output_dir,
start_frame=start_frame,
end_frame=end_frame,
conf_threshold=conf_threshold,
iou_threshold=iou_threshold,
mask_scale=mask_scale,
)
print(f"[FaceMask] Task started: {task_id}")
# Open video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"[FaceMask] Failed to open video: {video_path}")
self.result_queue.put(("error", f"Failed to open video: {video_path}"))
return
total_video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"[FaceMask] Video opened, total frames: {total_video_frames}")
# Process frames
for frame_idx in range(start_frame, end_frame + 1):
if not self.is_running:
# Poll loop
while self.is_running:
status = client.get_task_status(task_id)
state = status.get("status")
if state == "completed":
self.result_queue.put(("done", output_dir))
return
elif state == "failed":
error_msg = status.get("message", "Unknown server error")
print(f"[FaceMask] Server task failed: {error_msg}")
self.result_queue.put(("error", error_msg))
return
elif state == "cancelled":
self.result_queue.put(("cancelled", None))
return
# Seek to frame
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
# Skip unreadable frames
continue
# Detect faces
detections = detector.detect(frame)
# Generate mask
mask = detector.generate_mask(
frame.shape,
detections,
mask_scale=mask_scale,
)
# Save mask
mask_filename = f"mask_{frame_idx:06d}.png"
mask_path = os.path.join(output_dir, mask_filename)
cv2.imwrite(mask_path, mask)
# Report progress
self.progress_queue.put(("progress", frame_idx - start_frame + 1))
progress = status.get("progress", 0)
if progress > 0:
self.progress_queue.put(("progress", progress))
time.sleep(0.5)
cap.release()
# Report completion
self.result_queue.put(("done", output_dir))
# If loop exited but task not done, cancel server task
print("[FaceMask] Cancelling server task...")
client.cancel_task(task_id)
self.result_queue.put(("cancelled", None))
except Exception as e:
import traceback

View File

@ -1,160 +0,0 @@
"""
Face detector using OpenCV Haar Cascades.
This module provides face detection functionality optimized for
privacy blur in video editing workflows.
"""
import os
from typing import List, Tuple, Optional
import numpy as np
class FaceDetector:
"""
Face detector using OpenCV Haar Cascades.
Optimized for privacy blur use case:
- Detects frontal faces
- Configurable detection sensitivity
- Generates feathered masks for smooth blur edges
"""
def __init__(
self,
scale_factor: float = 1.1,
min_neighbors: int = 5,
min_size: Tuple[int, int] = (30, 30),
):
"""
Initialize the face detector.
Args:
scale_factor: Image pyramid scale factor
min_neighbors: Minimum neighbors for detection
min_size: Minimum face size in pixels
"""
self.scale_factor = scale_factor
self.min_neighbors = min_neighbors
self.min_size = min_size
self._classifier = None
@property
def classifier(self):
"""Lazy-load the Haar cascade classifier."""
if self._classifier is None:
import cv2
# Use haarcascade for frontal face detection
cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
if not os.path.exists(cascade_path):
raise RuntimeError(f"Haar cascade not found: {cascade_path}")
self._classifier = cv2.CascadeClassifier(cascade_path)
return self._classifier
def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
"""
Detect faces in a frame.
Args:
frame: BGR image as numpy array
Returns:
List of face bounding boxes as (x, y, width, height)
"""
import cv2
# Convert to grayscale for detection
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Detect faces
faces = self.classifier.detectMultiScale(
gray,
scaleFactor=self.scale_factor,
minNeighbors=self.min_neighbors,
minSize=self.min_size,
flags=cv2.CASCADE_SCALE_IMAGE,
)
# Convert to list of tuples
return [tuple(face) for face in faces]
def generate_mask(
self,
frame_shape: Tuple[int, int, int],
detections: List[Tuple[int, int, int, int]],
mask_scale: float = 1.5,
feather_radius: int = 20,
) -> np.ndarray:
"""
Generate a mask image from face detections.
Args:
frame_shape: Shape of the original frame (height, width, channels)
detections: List of face bounding boxes
mask_scale: Scale factor for mask region (1.0 = exact bounding box)
feather_radius: Radius for edge feathering
Returns:
Grayscale mask image (white = blur, black = keep)
"""
import cv2
height, width = frame_shape[:2]
mask = np.zeros((height, width), dtype=np.uint8)
for (x, y, w, h) in detections:
# Scale the bounding box
center_x = x + w // 2
center_y = y + h // 2
scaled_w = int(w * mask_scale)
scaled_h = int(h * mask_scale)
# Calculate scaled bounding box
x1 = max(0, center_x - scaled_w // 2)
y1 = max(0, center_y - scaled_h // 2)
x2 = min(width, center_x + scaled_w // 2)
y2 = min(height, center_y + scaled_h // 2)
# Draw ellipse for more natural face shape
cv2.ellipse(
mask,
(center_x, center_y),
(scaled_w // 2, scaled_h // 2),
0, # angle
0, 360, # arc
255, # color (white)
-1, # filled
)
# Apply Gaussian blur for feathering
if feather_radius > 0 and len(detections) > 0:
# Ensure kernel size is odd
kernel_size = feather_radius * 2 + 1
mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0)
return mask
def detect_faces_batch(
frames: List[np.ndarray],
detector: Optional[FaceDetector] = None,
) -> List[List[Tuple[int, int, int, int]]]:
"""
Detect faces in multiple frames.
Args:
frames: List of BGR images
detector: Optional detector instance (creates one if not provided)
Returns:
List of detection lists, one per frame
"""
if detector is None:
detector = FaceDetector()
return [detector.detect(frame) for frame in frames]

159
core/inference_client.py Normal file
View File

@ -0,0 +1,159 @@
"""
Client for interacting with the external inference server.
Manages the server process and handles HTTP communication
using standard library (avoiding requests dependency).
"""
import subprocess
import time
import json
import urllib.request
import urllib.error
import threading
import os
import signal
from typing import Optional, Dict, Any, Tuple
class InferenceClient:
    """Client for the YOLO inference server.

    Owns the server subprocess lifecycle and talks to it over HTTP using
    only the standard library (avoids a `requests` dependency, which
    Blender's bundled Python may not ship).
    """

    # Local-only endpoint; port must match server/main.py's uvicorn.run().
    SERVER_URL = "http://127.0.0.1:8181"

    def __init__(self):
        # Handle to the server process we spawned (None if not started by us).
        self.server_process: Optional[subprocess.Popen] = None
        # Serializes start/stop so concurrent callers cannot spawn twice.
        self._server_lock = threading.Lock()

    def start_server(self):
        """Start the inference server process if it is not already running.

        Raises:
            RuntimeError: if the process exits during startup or does not
                answer the health check within ~10 seconds.
        """
        with self._server_lock:
            if self.is_server_running():
                return
            print("[FaceMask] Starting inference server...")
            # This file lives in core/, so the project root is one level up.
            root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            server_script = os.path.join(root_dir, "server", "main.py")
            # Use the environment's python (assumed to provide the server's
            # dependencies via Nix/venv), not Blender's embedded interpreter.
            python_cmd = "python"
            self.server_process = subprocess.Popen(
                [python_cmd, server_script],
                cwd=root_dir,
                text=True,
                preexec_fn=os.setsid,  # New process group so we can kill the whole tree
            )
            # Poll the health endpoint (20 * 0.5s = 10s budget).
            for _ in range(20):
                if self.is_server_running():
                    print("[FaceMask] Server started successfully")
                    return
                # Check if process died
                if self.server_process.poll() is not None:
                    raise RuntimeError(
                        f"Server failed to start (rc={self.server_process.returncode})"
                    )
                time.sleep(0.5)
            raise RuntimeError("Server startup timed out")

    def stop_server(self):
        """Stop the inference server process group (best effort, never raises)."""
        with self._server_lock:
            if self.server_process:
                print("[FaceMask] Stopping server...")
                try:
                    # Kill the whole process group created via os.setsid.
                    os.killpg(os.getpgid(self.server_process.pid), signal.SIGTERM)
                    self.server_process.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    # SIGTERM was ignored; escalate so the server is not leaked.
                    try:
                        os.killpg(os.getpgid(self.server_process.pid), signal.SIGKILL)
                    except ProcessLookupError:
                        pass
                except ProcessLookupError:
                    # Process already exited.
                    pass
                finally:
                    self.server_process = None

    def is_server_running(self) -> bool:
        """Check if the server is responding on its health endpoint."""
        try:
            with urllib.request.urlopen(f"{self.SERVER_URL}/status", timeout=1) as response:
                return response.status == 200
        except (urllib.error.URLError, ConnectionRefusedError, TimeoutError):
            return False

    def generate_mask(
        self,
        video_path: str,
        output_dir: str,
        start_frame: int,
        end_frame: int,
        conf_threshold: float,
        iou_threshold: float,
        mask_scale: float,
    ) -> str:
        """
        Request mask generation, starting the server first if needed.

        Returns:
            task_id (str) of the newly created server-side task.

        Raises:
            RuntimeError: if the server rejects the request or is unreachable.
        """
        if not self.is_server_running():
            self.start_server()
        data = {
            "video_path": video_path,
            "output_dir": output_dir,
            "start_frame": start_frame,
            "end_frame": end_frame,
            "conf_threshold": conf_threshold,
            "iou_threshold": iou_threshold,
            "mask_scale": mask_scale,
        }
        req = urllib.request.Request(
            f"{self.SERVER_URL}/generate",
            data=json.dumps(data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            method='POST'
        )
        try:
            with urllib.request.urlopen(req) as response:
                result = json.loads(response.read().decode('utf-8'))
                return result['id']
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"Server error: {e.read().decode('utf-8')}")
        except urllib.error.URLError as e:
            # Connection-level failure (e.g. server crashed between the
            # health check and this call) — surface a readable error.
            raise RuntimeError(f"Could not reach inference server: {e.reason}")

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """Get status of a task.

        Returns {"status": "unknown"} on an HTTP error (e.g. 404 for an
        unknown task id). A connection-level URLError is deliberately
        allowed to propagate so polling callers notice a dead server
        instead of spinning forever.
        """
        try:
            with urllib.request.urlopen(f"{self.SERVER_URL}/tasks/{task_id}") as response:
                return json.loads(response.read().decode('utf-8'))
        except urllib.error.HTTPError:
            return {"status": "unknown"}

    def cancel_task(self, task_id: str):
        """Cancel a task (best effort; ignores HTTP and connection errors)."""
        try:
            req = urllib.request.Request(
                f"{self.SERVER_URL}/tasks/{task_id}/cancel",
                method='POST'
            )
            with urllib.request.urlopen(req):
                pass
        except urllib.error.URLError:
            # Covers HTTPError and connection failures alike; cancellation
            # of a dead server's task is a no-op by definition.
            pass
# Process-wide singleton so every caller shares one server handle.
_client: Optional[InferenceClient] = None


def get_client() -> InferenceClient:
    """Return the shared :class:`InferenceClient`, constructing it lazily."""
    global _client
    _client = _client or InferenceClient()
    return _client

61
flake.lock Normal file
View File

@ -0,0 +1,61 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1770115704,
"narHash": "sha256-KHFT9UWOF2yRPlAnSXQJh6uVcgNcWlFqqiAZ7OVlHNc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e6eae2ee2110f3d31110d5c222cd395303343b08",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

48
flake.nix Normal file
View File

@ -0,0 +1,48 @@
{
description = "Blender Face Mask Plugin Development Environment";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
};
outputs =
{
self,
nixpkgs,
flake-utils,
}:
flake-utils.lib.eachDefaultSystem (
system:
let
pkgs = nixpkgs.legacyPackages.${system};
in
{
devShells.default = pkgs.mkShell {
buildInputs = with pkgs; [
python311
python311Packages.pip
python311Packages.requests
python311Packages.fastapi
python311Packages.uvicorn
python311Packages.numpy
python311Packages.opencv4
python311Packages.onnxruntime
git
];
shellHook = ''
python --version
blender --version | head -n 1
# Pythonパスにカレントディレクトリを追加
export PYTHONPATH="$PWD:$PYTHONPATH"
# アドオンのインストールパスを環境変数として設定
export BLENDER_USER_SCRIPTS="$HOME/.config/blender/5.0/scripts"
export BLENDER_USER_ADDONS="$BLENDER_USER_SCRIPTS/addons"
'';
};
}
);
}

View File

@ -21,21 +21,21 @@ class SEQUENCER_OT_generate_face_mask(Operator):
bl_description = "Detect faces and generate mask image sequence"
bl_options = {'REGISTER', 'UNDO'}
# Detection parameters
scale_factor: FloatProperty(
name="Scale Factor",
description="Detection scale factor (larger = faster but less accurate)",
default=1.1,
min=1.01,
max=2.0,
# YOLO Detection parameters
conf_threshold: FloatProperty(
name="Confidence",
description="YOLO confidence threshold (higher = fewer false positives)",
default=0.25,
min=0.1,
max=1.0,
)
min_neighbors: IntProperty(
name="Min Neighbors",
description="Minimum neighbors for detection (higher = fewer false positives)",
default=5,
min=1,
max=20,
iou_threshold: FloatProperty(
name="IOU Threshold",
description="Non-maximum suppression IOU threshold",
default=0.45,
min=0.1,
max=1.0,
)
mask_scale: FloatProperty(
@ -133,8 +133,8 @@ class SEQUENCER_OT_generate_face_mask(Operator):
start_frame=0, # Frame indices in video
end_frame=end_frame - start_frame,
fps=fps,
scale_factor=self.scale_factor,
min_neighbors=self.min_neighbors,
conf_threshold=self.conf_threshold,
iou_threshold=self.iou_threshold,
mask_scale=self.mask_scale,
on_complete=on_complete,
on_progress=on_progress,

371
server/detector.py Normal file
View File

@ -0,0 +1,371 @@
"""
YOLOv11 Face Detector using ONNX Runtime with GPU support.
This module provides high-performance face detection using
YOLOv11-face model with CUDA acceleration.
"""
import os
from typing import List, Tuple, Optional
from pathlib import Path
import numpy as np
class YOLOFaceDetector:
    """
    YOLOv11 face detector with ONNX Runtime GPU support.

    Features:
    - CUDA GPU acceleration (with automatic CPU fallback)
    - High accuracy face detection
    - NMS for overlapping detections

    NOTE(review): `detect()` stores letterbox metadata on `self` between
    `_preprocess` and `_postprocess`, so one instance is not safe for
    concurrent calls — confirm the server only runs one task at a time.
    """

    # Default model path relative to this file
    DEFAULT_MODEL = "yolov11n-face.onnx"

    def __init__(
        self,
        model_path: Optional[str] = None,
        conf_threshold: float = 0.25,
        iou_threshold: float = 0.45,
        input_size: Tuple[int, int] = (640, 640),
    ):
        """
        Initialize the YOLO face detector.

        Args:
            model_path: Path to ONNX model file. If None, uses default model.
            conf_threshold: Confidence threshold for detections
            iou_threshold: IoU threshold for NMS
            input_size: Model input size (width, height)
        """
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.input_size = input_size
        # The ONNX session is created lazily on first access of `session`,
        # so constructing a detector is cheap and never touches the disk.
        self._session = None
        self._model_path = model_path

    @property
    def session(self):
        """Lazy-load ONNX Runtime session.

        Raises:
            FileNotFoundError: if the ONNX model file does not exist.
        """
        if self._session is None:
            import onnxruntime as ort
            # Determine model path
            if self._model_path is None:
                # Models are expected in ../models relative to server/detector.py
                models_dir = Path(__file__).parent.parent / "models"
                model_path = str(models_dir / self.DEFAULT_MODEL)
            else:
                model_path = self._model_path
            if not os.path.exists(model_path):
                raise FileNotFoundError(f"Model not found: {model_path}")
            # Configure providers (prefer CUDA; CPU always kept as fallback)
            providers = []
            if 'CUDAExecutionProvider' in ort.get_available_providers():
                providers.append('CUDAExecutionProvider')
                print("[FaceMask] Using CUDA GPU for inference")
            providers.append('CPUExecutionProvider')
            # Create session with full graph optimization
            sess_options = ort.SessionOptions()
            sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
            self._session = ort.InferenceSession(
                model_path,
                sess_options=sess_options,
                providers=providers,
            )
            print(f"[FaceMask] YOLO model loaded: {model_path}")
            print(f"[FaceMask] Providers: {self._session.get_providers()}")
        return self._session

    def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int, float]]:
        """
        Detect faces in a frame.

        Args:
            frame: BGR image as numpy array (H, W, C)

        Returns:
            List of detections as (x, y, width, height, confidence) in
            original-image pixel coordinates.
        """
        import cv2
        original_height, original_width = frame.shape[:2]
        # Preprocess stores letterbox scale/padding on self for _postprocess.
        input_tensor = self._preprocess(frame)
        # Run inference
        input_name = self.session.get_inputs()[0].name
        outputs = self.session.run(None, {input_name: input_tensor})
        raw_output = outputs[0]
        # Postprocess back to original-image coordinates
        detections = self._postprocess(
            raw_output,
            original_width,
            original_height,
        )
        return detections

    def _preprocess(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess frame for YOLO input with letterbox resizing.

        Resizes preserving aspect ratio, pads with gray (114) to the model
        input size, then returns a normalized float32 NCHW tensor.
        Side effect: stores scale/padding in `self._last_letterbox_meta`
        for `_postprocess` to undo the transform.
        """
        import cv2
        # Letterbox resize
        shape = frame.shape[:2]  # current shape [height, width]
        new_shape = self.input_size
        # Scale ratio (new / old)
        r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
        # Compute padding
        ratio = r, r  # width, height ratios
        new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
        dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
        dw /= 2  # divide padding into 2 sides
        dh /= 2
        if shape[::-1] != new_unpad:  # resize
            frame = cv2.resize(frame, new_unpad, interpolation=cv2.INTER_LINEAR)
        # The +/-0.1 rounding trick splits an odd padding pixel deterministically.
        top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
        # Add border (YOLO convention: 114-gray padding)
        frame = cv2.copyMakeBorder(frame, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114))
        # Store metadata for postprocessing
        self._last_letterbox_meta = {'ratio': ratio, 'dwdh': (dw, dh)}
        # Convert BGR to RGB
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Normalize to [0, 1]
        normalized = rgb.astype(np.float32) / 255.0
        # Transpose to CHW format
        transposed = np.transpose(normalized, (2, 0, 1))
        # Add batch dimension
        batched = np.expand_dims(transposed, axis=0)
        return batched

    def _postprocess(
        self,
        output: np.ndarray,
        original_width: int,
        original_height: int,
    ) -> List[Tuple[int, int, int, int, float]]:
        """
        Postprocess YOLO output to get detections.

        Filters by confidence, maps boxes from letterboxed model space back
        to original-image pixels, and applies NMS.
        """
        # Output shape: [1, num_detections, 5+] where 5 = x_center, y_center, w, h, conf
        # Handle different output formats
        if output.shape[1] < output.shape[2]:
            # Format: [1, 5+, num_detections] - transpose
            output = np.transpose(output[0], (1, 0))
        else:
            output = output[0]
        # Filter by confidence
        confidences = output[:, 4]
        mask = confidences > self.conf_threshold
        filtered = output[mask]
        if len(filtered) == 0:
            return []
        # Get letterbox metadata (set by _preprocess during the same detect() call)
        if hasattr(self, '_last_letterbox_meta') and self._last_letterbox_meta:
            ratio = self._last_letterbox_meta['ratio']
            dw, dh = self._last_letterbox_meta['dwdh']
            # Extract coordinates
            x_center = filtered[:, 0]
            y_center = filtered[:, 1]
            width = filtered[:, 2]
            height = filtered[:, 3]
            confidences = filtered[:, 4]
            # Convert center to corner
            x1 = x_center - width / 2
            y1 = y_center - height / 2
            x2 = x_center + width / 2
            y2 = y_center + height / 2
            # Adjust for letterbox padding
            x1 -= dw
            y1 -= dh
            x2 -= dw
            y2 -= dh
            # Adjust for resizing
            x1 /= ratio[0]
            y1 /= ratio[1]
            x2 /= ratio[0]
            y2 /= ratio[1]
            # Clip to image bounds
            x1 = np.clip(x1, 0, original_width)
            y1 = np.clip(y1, 0, original_height)
            x2 = np.clip(x2, 0, original_width)
            y2 = np.clip(y2, 0, original_height)
            # Convert back to x, y, w, h
            final_x = x1
            final_y = y1
            final_w = x2 - x1
            final_h = y2 - y1
        else:
            # Fallback for non-letterbox (legacy): plain per-axis rescale
            scale_x = original_width / self.input_size[0]
            scale_y = original_height / self.input_size[1]
            x_center = filtered[:, 0] * scale_x
            y_center = filtered[:, 1] * scale_y
            width = filtered[:, 2] * scale_x
            height = filtered[:, 3] * scale_y
            confidences = filtered[:, 4]
            final_x = x_center - width / 2
            final_y = y_center - height / 2
            final_w = width
            final_h = height
        # Apply NMS
        boxes = np.stack([final_x, final_y, final_w, final_h], axis=1)
        indices = self._nms(boxes, confidences, self.iou_threshold)
        # Format output as plain Python ints/floats
        detections = []
        for i in indices:
            x = int(final_x[i])
            y = int(final_y[i])
            w = int(final_w[i])
            h = int(final_h[i])
            conf = float(confidences[i])
            detections.append((x, y, w, h, conf))
        return detections

    def _nms(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        iou_threshold: float,
    ) -> List[int]:
        """Non-Maximum Suppression.

        Args:
            boxes: (N, 4) array of (x, y, w, h) boxes
            scores: (N,) confidence scores
            iou_threshold: boxes overlapping a kept box above this IoU are dropped

        Returns:
            Indices of kept boxes, in descending score order.
        """
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = x1 + boxes[:, 2]
        y2 = y1 + boxes[:, 3]
        areas = boxes[:, 2] * boxes[:, 3]
        # Process candidates from highest to lowest score.
        order = scores.argsort()[::-1]
        keep = []
        while len(order) > 0:
            i = order[0]
            keep.append(i)
            if len(order) == 1:
                break
            # Intersection of the kept box with all remaining candidates.
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])
            w = np.maximum(0, xx2 - xx1)
            h = np.maximum(0, yy2 - yy1)
            inter = w * h
            iou = inter / (areas[i] + areas[order[1:]] - inter)
            # Keep only candidates with low enough overlap.
            inds = np.where(iou <= iou_threshold)[0]
            order = order[inds + 1]
        return keep

    def generate_mask(
        self,
        frame_shape: Tuple[int, int, int],
        detections: List[Tuple[int, int, int, int, float]],
        mask_scale: float = 1.5,
        feather_radius: int = 20,
    ) -> np.ndarray:
        """
        Generate a mask image from face detections.

        Args:
            frame_shape: Shape of the original frame (height, width, channels)
            detections: List of face detections (x, y, w, h, conf)
            mask_scale: Scale factor for mask region
            feather_radius: Radius for edge feathering

        Returns:
            Grayscale mask image (white = blur, black = keep)
        """
        import cv2
        height, width = frame_shape[:2]
        mask = np.zeros((height, width), dtype=np.uint8)
        for (x, y, w, h, conf) in detections:
            # Scale the bounding box around its center
            center_x = x + w // 2
            center_y = y + h // 2
            scaled_w = int(w * mask_scale)
            scaled_h = int(h * mask_scale)
            # Draw ellipse for natural face shape (cv2 clips to the image)
            cv2.ellipse(
                mask,
                (center_x, center_y),
                (scaled_w // 2, scaled_h // 2),
                0,  # angle
                0, 360,  # arc
                255,  # color (white)
                -1,  # filled
            )
        # Apply Gaussian blur for feathering (kernel size must be odd)
        if feather_radius > 0 and len(detections) > 0:
            kernel_size = feather_radius * 2 + 1
            mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0)
        return mask
# Singleton instance (the ONNX session is expensive to build, so it is reused)
_detector: Optional[YOLOFaceDetector] = None


def get_detector(**kwargs) -> YOLOFaceDetector:
    """Get or create the global YOLO detector instance.

    Fix: previously, kwargs passed after the first call were silently
    ignored, so per-task threshold changes from the UI had no effect once
    a detector existed. Threshold kwargs are now applied to the cached
    instance (the loaded ONNX session is kept — only thresholds change).
    """
    global _detector
    if _detector is None:
        _detector = YOLOFaceDetector(**kwargs)
    else:
        # Update detection parameters on the existing instance.
        for key in ("conf_threshold", "iou_threshold"):
            if key in kwargs:
                setattr(_detector, key, kwargs[key])
    return _detector

180
server/main.py Normal file
View File

@ -0,0 +1,180 @@
"""
Face Detection Inference Server.
This FastAPI application runs in a separate process to handle
GPU-accelerated face detection using ONNX Runtime.
"""
import os
import sys
import threading
import uuid
import queue
import traceback
from typing import Dict, Optional, List
from pathlib import Path
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import cv2
import numpy as np
# Add project root to path for imports if needed
sys.path.append(str(Path(__file__).parent.parent))
from server.detector import YOLOFaceDetector, get_detector
app = FastAPI(title="Face Mask Inference Server")
# Task storage
class TaskStatus:
    """Lifecycle states for a task.

    Kept as plain string constants (not an Enum) so they serialize
    directly into the Task model and JSON responses.
    """
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class Task(BaseModel):
    """Serializable status record for one mask-generation job."""
    id: str
    status: str  # One of the TaskStatus constants
    progress: int = 0  # Frames processed so far
    total: int = 0  # Total frames to process
    message: Optional[str] = None  # Error or completion detail
    result_path: Optional[str] = None  # Output directory on success


# In-memory storage (single-process server; state is lost on restart)
tasks: Dict[str, Task] = {}
cancel_events: Dict[str, threading.Event] = {}
class GenerateRequest(BaseModel):
    """Request body for POST /generate."""
    video_path: str  # Server-local path to the source video
    output_dir: str  # Directory to write mask_XXXXXX.png files into
    start_frame: int  # First frame index (inclusive)
    end_frame: int  # Last frame index (inclusive; clamped to video length)
    # NOTE(review): default 0.5 differs from the 0.25 default used by the
    # operator UI and YOLOFaceDetector — confirm which is intended.
    conf_threshold: float = 0.5
    iou_threshold: float = 0.45
    mask_scale: float = 1.5  # Bounding-box enlargement factor for the mask
def process_video_task(task_id: str, req: GenerateRequest):
    """Background task to process video.

    Runs face detection frame-by-frame and writes one mask PNG per frame
    into req.output_dir. State and progress are published through the
    module-level `tasks` dict; cancellation is signalled cooperatively via
    `cancel_events[task_id]`.
    """
    try:
        tasks[task_id].status = TaskStatus.PROCESSING
        cancel_event = cancel_events.get(task_id)
        # Verify video exists
        if not os.path.exists(req.video_path):
            tasks[task_id].status = TaskStatus.FAILED
            tasks[task_id].message = f"Video not found: {req.video_path}"
            return
        # Initialize detector (will load model on first run)
        print(f"Loading detector for task {task_id}...")
        detector = get_detector(
            conf_threshold=req.conf_threshold,
            iou_threshold=req.iou_threshold
        )
        # Force the lazy ONNX session to load now so model errors fail fast
        _ = detector.session
        # Open video
        cap = cv2.VideoCapture(req.video_path)
        if not cap.isOpened():
            tasks[task_id].status = TaskStatus.FAILED
            tasks[task_id].message = "Failed to open video"
            return
        # Determine frame range (clamp to the actual frame count)
        total_video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        end_frame = min(req.end_frame, total_video_frames - 1)
        frames_to_process = end_frame - req.start_frame + 1
        tasks[task_id].total = frames_to_process
        # Ensure output directory exists
        os.makedirs(req.output_dir, exist_ok=True)
        print(f"Starting processing: {req.video_path} ({frames_to_process} frames)")
        # Process loop
        current_count = 0
        for frame_idx in range(req.start_frame, end_frame + 1):
            if cancel_event and cancel_event.is_set():
                tasks[task_id].status = TaskStatus.CANCELLED
                tasks[task_id].message = "Cancelled by user"
                break
            # Read frame. NOTE(review): an explicit seek per frame is slow
            # for sequential ranges — plain cap.read() may be faster; verify.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if ret:
                # Detect
                detections = detector.detect(frame)
                # Generate mask
                mask = detector.generate_mask(
                    frame.shape,
                    detections,
                    mask_scale=req.mask_scale
                )
                # Save
                mask_filename = f"mask_{current_count:06d}.png"  # Note: using relative index for filename
                mask_path = os.path.join(req.output_dir, mask_filename)
                cv2.imwrite(mask_path, mask)
                # Update progress. Unreadable frames are skipped and do not
                # advance the counter, so progress may end below total.
                current_count += 1
                tasks[task_id].progress = current_count
        cap.release()
        # Only mark completed if the loop was not broken by cancellation.
        if tasks[task_id].status == TaskStatus.PROCESSING:
            tasks[task_id].status = TaskStatus.COMPLETED
            tasks[task_id].result_path = req.output_dir
            tasks[task_id].message = "Processing completed successfully"
            print(f"Task {task_id} completed.")
    except Exception as e:
        tasks[task_id].status = TaskStatus.FAILED
        tasks[task_id].message = str(e)
        print(f"Error in task {task_id}: {e}")
        traceback.print_exc()
    finally:
        # Cleanup: the event is only needed while the task is alive.
        if task_id in cancel_events:
            del cancel_events[task_id]
@app.get("/status")
def get_status():
    """Health-check endpoint; also reports whether CUDA inference is available.

    Resolves the previous TODO: gpu_available was hard-coded to True; it is
    now derived from the onnxruntime providers actually present.
    """
    try:
        import onnxruntime as ort
        gpu_available = "CUDAExecutionProvider" in ort.get_available_providers()
    except ImportError:
        gpu_available = False
    return {"status": "running", "gpu_available": gpu_available}
@app.post("/generate", response_model=Task)
def generate_mask_endpoint(req: GenerateRequest, background_tasks: BackgroundTasks):
    """Create a mask-generation task and run it in the background."""
    task_id = str(uuid.uuid4())
    task = Task(id=task_id, status=TaskStatus.PENDING)
    tasks[task_id] = task
    cancel_events[task_id] = threading.Event()
    background_tasks.add_task(process_video_task, task_id, req)
    return task


@app.get("/tasks/{task_id}", response_model=Task)
def get_task(task_id: str):
    """Return the current status of a task (404 if unknown)."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    return tasks[task_id]


@app.post("/tasks/{task_id}/cancel")
def cancel_task(task_id: str):
    """Request cooperative cancellation of a running task."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    if task_id in cancel_events:
        cancel_events[task_id].set()
    return {"message": "Cancellation requested"}


if __name__ == "__main__":
    # Local-only server; the port must match InferenceClient.SERVER_URL.
    uvicorn.run(app, host="127.0.0.1", port=8181)