blender-mask-peoples/server/detector.py
2026-02-18 20:21:53 +09:00

314 lines
10 KiB
Python

"""
YOLOv8 Pose Head Detector using PyTorch with ROCm support.
Detects human heads from all angles (frontal, profile, rear) by using
YOLOv8 pose estimation and extracting head bounding boxes from keypoints.
"""
import os
from typing import List, Tuple, Optional
import numpy as np
# COCO pose keypoint indices used by the head estimator
_HEAD_KP = [0, 1, 2, 3, 4]  # nose, left_eye, right_eye, left_ear, right_ear
_SHOULDER_KP = [5, 6]  # left_shoulder, right_shoulder
# A keypoint is treated as visible only when its confidence exceeds this value
_KP_CONF_THRESH = 0.3


def _head_bbox_from_pose(
    kp_xy: np.ndarray,
    kp_conf: np.ndarray,
    person_x1: float,
    person_y1: float,
    person_x2: float,
    person_y2: float,
) -> Tuple[int, int, int, int]:
    """
    Derive a head box (x, y, w, h) for one person from COCO pose keypoints.

    The first applicable source wins:
        1. Confident head keypoints (nose, eyes, ears).
        2. Confident shoulder keypoints, with the head placed above them.
        3. The top of the person bounding box.

    Args:
        kp_xy: Per-keypoint (x, y) coordinates, indexed by COCO keypoint id.
        kp_conf: Per-keypoint confidence, indexed by COCO keypoint id.
        person_x1: Left edge of the person box.
        person_y1: Top edge of the person box.
        person_x2: Right edge of the person box.
        person_y2: Bottom edge of the person box (not used by the estimate).

    Returns:
        (x, y, width, height) as ints. The box is not clamped to any image
        bounds, so x / y may be negative.
    """

    def confident(indices):
        # (x, y) of every listed keypoint whose confidence clears the threshold
        return [
            (float(kp_xy[k][0]), float(kp_xy[k][1]))
            for k in indices
            if float(kp_conf[k]) > _KP_CONF_THRESH
        ]

    def as_xywh(left, top, right, bottom):
        # Truncate the corners first, then derive the size from the int corners
        ix, iy = int(left), int(top)
        return ix, iy, int(right) - ix, int(bottom) - iy

    body_w = max(person_x2 - person_x1, 1.0)
    # Lower bound on the radius so tightly clustered keypoints cannot
    # collapse the box
    min_radius = body_w * 0.20

    # --- Source 1: head keypoints ---
    head_pts = confident(_HEAD_KP)
    if head_pts:
        xs, ys = zip(*head_pts)
        left, right = min(xs), max(xs)
        top, bottom = min(ys), max(ys)
        extent = max(right - left, bottom - top, 1.0)
        mid_x = (left + right) / 2.0
        mid_y = (top + bottom) / 2.0
        # Landmark extent is padded by 1.25 and used as the box radius
        radius = max(extent * 1.25, min_radius)
        # Asymmetric vertical margins: more above (scalp), less below (chin)
        return as_xywh(
            mid_x - radius,
            mid_y - radius * 1.15,
            mid_x + radius,
            mid_y + radius * 0.85,
        )

    # --- Source 2: shoulder keypoints ---
    shoulder_pts = confident(_SHOULDER_KP)
    if shoulder_pts:
        count = len(shoulder_pts)
        mid_x = sum(px for px, _ in shoulder_pts) / count
        shoulder_y = sum(py for _, py in shoulder_pts) / count
        if count == 2:
            shoulder_w = abs(shoulder_pts[1][0] - shoulder_pts[0][0])
        else:
            # Only one shoulder visible: assume half the person width
            shoulder_w = body_w * 0.5
        radius = max(shoulder_w * 0.5, min_radius)
        # Head centre sits above the shoulder line
        mid_y = shoulder_y - radius * 1.3
        return as_xywh(
            mid_x - radius,
            mid_y - radius,
            mid_x + radius,
            mid_y + radius,
        )

    # --- Source 3: top of the person box ---
    radius = max(body_w * 0.35, 20.0)
    mid_x = (person_x1 + person_x2) / 2.0
    return as_xywh(
        mid_x - radius,
        person_y1,
        mid_x + radius,
        person_y1 + radius * 2.0,
    )
class YOLOPoseHeadDetector:
    """
    Head detector using YOLOv8 pose estimation with PyTorch ROCm support.

    Extracts head bounding boxes from COCO pose keypoints (nose, eyes, ears)
    so that detection works regardless of the person's facing direction.
    The underlying model is loaded lazily on first access to ``model``.
    """

    # Standard Ultralytics model — auto-downloaded on first use
    DEFAULT_MODEL = os.path.join("models", "yolov8n-pose.pt")

    def __init__(
        self,
        model_path: Optional[str] = None,
        conf_threshold: float = 0.25,
        iou_threshold: float = 0.45,
        input_size: Tuple[int, int] = (640, 640),
    ):
        """
        Store the configuration; no model is loaded here.

        Args:
            model_path: Path to a pose model file, or None to use DEFAULT_MODEL.
            conf_threshold: Confidence threshold passed to ``predict``.
            iou_threshold: IoU threshold passed to ``predict``.
            input_size: Inference size; only the first element is passed to
                ``predict`` as ``imgsz``.
        """
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.input_size = input_size
        self._model = None
        self._model_path = model_path
        # Set to 'cuda' or 'cpu' when the model is first loaded
        self._device = None

    @property
    def model(self):
        """
        Lazy-load the YOLO pose model and choose the inference device.

        Raises:
            FileNotFoundError: If an explicit ``model_path`` was given but
                does not exist.
            Exception: Any error raised while constructing the model is
                logged and re-raised.
        """
        if self._model is None:
            from ultralytics import YOLO
            import torch

            # Use provided path or let Ultralytics auto-download the default
            if self._model_path is not None:
                if not os.path.exists(self._model_path):
                    raise FileNotFoundError(f"Model not found: {self._model_path}")
                model_path = self._model_path
            else:
                model_path = self.DEFAULT_MODEL
                os.makedirs(os.path.dirname(model_path), exist_ok=True)

            if torch.cuda.is_available():
                self._device = 'cuda'
                device_name = torch.cuda.get_device_name(0)
                print(f"[FaceMask] Using ROCm GPU for inference: {device_name}")
            else:
                self._device = 'cpu'
                print("[FaceMask] Using CPU for inference (ROCm GPU not available)")

            try:
                self._model = YOLO(model_path)
                print(f"[FaceMask] Pose model loaded: {model_path}")
                print(f"[FaceMask] Device: {self._device}")
            except Exception as e:
                print(f"[FaceMask] Error loading model: {e}")
                import traceback
                traceback.print_exc()
                raise
        return self._model

    def _predict(self, source, label: str = "inference"):
        """
        Run ``model.predict`` on *source*, retrying once on CPU if a
        non-CPU device fails.

        Args:
            source: A single frame or a list of frames.
            label: Wording used in the error message ("inference" or
                "batch inference").

        Returns:
            Whatever ``model.predict`` returns (a list of results).

        Raises:
            Exception: Model-loading errors propagate unchanged. Inference
                errors propagate when already running on CPU, or when the
                CPU retry fails as well.
        """
        # Resolve the model before the try block: this sets self._device and
        # keeps load failures from being mistaken for inference failures.
        model = self.model
        options = dict(
            conf=self.conf_threshold,
            iou=self.iou_threshold,
            imgsz=self.input_size[0],
            verbose=False,
        )
        try:
            return model.predict(source, device=self._device, **options)
        except Exception as e:
            if self._device == 'cpu':
                # Nothing to fall back to; repeating the same call is pointless
                raise
            print(f"[FaceMask] ERROR during {label}: {e}")
            import traceback
            traceback.print_exc()
            print("[FaceMask] Falling back to CPU inference...")
            # Sticky: subsequent calls go straight to the CPU
            self._device = 'cpu'
            return model.predict(source, device='cpu', **options)

    def _results_to_detections(self, result) -> List[Tuple[int, int, int, int, float]]:
        """
        Convert a single YOLO pose result to (x, y, w, h, conf) tuples.

        Returns an empty list when the result carries no boxes or no
        keypoints. ``conf`` is the person-box confidence.
        """
        boxes = result.boxes
        keypoints = result.keypoints
        if boxes is None or keypoints is None:
            return []

        # One host transfer per tensor instead of three per detected person
        xyxy_all = boxes.xyxy.cpu().numpy()
        conf_all = boxes.conf.cpu().numpy()
        kp_all = keypoints.data.cpu().numpy()  # per person: (17, 3) = x, y, conf

        detections = []
        for (x1, y1, x2, y2), conf, kp_data in zip(xyxy_all, conf_all, kp_all):
            hx, hy, hw, hh = _head_bbox_from_pose(
                kp_data[:, :2], kp_data[:, 2],
                float(x1), float(y1), float(x2), float(y2),
            )
            detections.append((hx, hy, hw, hh, float(conf)))
        return detections

    def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int, float]]:
        """
        Detect heads in a frame.

        Args:
            frame: BGR image as numpy array (H, W, C)

        Returns:
            List of detections as (x, y, width, height, confidence)
        """
        import torch

        # Touch the lazy model first so the log line reports the real device
        # rather than None on the first call.
        self.model
        print(f"[FaceMask] Inference device: {self._device}, CUDA available: {torch.cuda.is_available()}")

        results = self._predict(frame, "inference")
        if results:
            return self._results_to_detections(results[0])
        return []

    def detect_batch(self, frames: List[np.ndarray]) -> List[List[Tuple[int, int, int, int, float]]]:
        """
        Detect heads in multiple frames at once (batch processing).

        Args:
            frames: List of BGR images as numpy arrays (H, W, C)

        Returns:
            List of detection lists, one per frame.
            Each detection: (x, y, width, height, confidence)
        """
        if not frames:
            # Nothing to do; also avoids loading the model needlessly
            return []
        results = self._predict(frames, "batch inference")
        return [self._results_to_detections(r) for r in results]

    def generate_mask(
        self,
        frame_shape: Tuple[int, int, int],
        detections: List[Tuple[int, int, int, int, float]],
        mask_scale: float = 1.5,
        feather_radius: int = 20,
    ) -> np.ndarray:
        """
        Generate a mask image from head detections.

        Args:
            frame_shape: Shape of the original frame (height, width, channels)
            detections: List of head detections (x, y, w, h, conf)
            mask_scale: Scale factor for mask region
            feather_radius: Radius for edge feathering

        Returns:
            Grayscale mask image (white = blur, black = keep)
        """
        import cv2

        height, width = frame_shape[:2]
        mask = np.zeros((height, width), dtype=np.uint8)

        for (x, y, w, h, conf) in detections:
            center_x = x + w // 2
            center_y = y + h // 2
            scaled_w = int(w * mask_scale)
            scaled_h = int(h * mask_scale)
            # Filled ellipse centred on the detection, scaled by mask_scale
            cv2.ellipse(
                mask,
                (center_x, center_y),
                (scaled_w // 2, scaled_h // 2),
                0, 0, 360,
                255, -1,
            )

        if feather_radius > 0 and len(detections) > 0:
            # 2 * r + 1 keeps the Gaussian kernel size odd
            kernel_size = feather_radius * 2 + 1
            mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0)
        return mask
# Module-wide detector instance, created on the first get_detector() call
_detector: Optional[YOLOPoseHeadDetector] = None


def get_detector(**kwargs) -> YOLOPoseHeadDetector:
    """
    Return the shared YOLO pose head detector, building it on first use.

    Keyword arguments are forwarded to ``YOLOPoseHeadDetector`` only when the
    instance is created; once it exists, later calls return it unchanged and
    their keyword arguments are ignored.
    """
    global _detector
    if _detector is not None:
        return _detector
    _detector = YOLOPoseHeadDetector(**kwargs)
    return _detector