blender-mask-peoples/server/detector.py

433 lines
14 KiB
Python

"""
YOLOv8 Head Detector using CrowdHuman-trained model with PyTorch ROCm support.
Directly detects human heads (frontal, profile, rear) using the Owen718
CrowdHuman YOLOv8 model, which was trained on dense crowd scenes.
"""
import os
from typing import List, Tuple, Optional
import numpy as np
def _download_model(dest_path: str):
"""モデルが存在しない場合に手動ダウンロード手順を表示して例外を送出する。"""
gdrive_id = "1qlBmiEU4GBV13fxPhLZqjhjBbREvs8-m"
raise RuntimeError(
f"モデルファイルが見つかりません: {dest_path}\n"
"以下の手順でダウンロードしてください:\n"
f" 1. https://drive.google.com/file/d/{gdrive_id} を開く\n"
f" 2. ダウンロードしたファイルを {dest_path} に配置する"
)
class YOLOHeadDetector:
"""
Head detector using CrowdHuman-trained YOLOv8 model with PyTorch ROCm support.
Directly detects heads (class 0: head) without pose estimation,
enabling robust detection of rear-facing, side-facing, and partially
visible people in dense crowd scenes.
"""
DEFAULT_MODEL = os.path.join("models", "crowdhuman_yolov8_head.pt")
def __init__(
self,
model_path: Optional[str] = None,
conf_threshold: float = 0.25,
iou_threshold: float = 0.45,
input_size: Tuple[int, int] = (640, 640),
):
self.conf_threshold = conf_threshold
self.iou_threshold = iou_threshold
self.input_size = input_size
self._model = None
self._model_path = model_path
self._device = None
@property
def model(self):
"""Lazy-load YOLO head detection model."""
if self._model is None:
from ultralytics import YOLO
import torch
if self._model_path is not None:
if not os.path.exists(self._model_path):
raise FileNotFoundError(f"Model not found: {self._model_path}")
model_path = self._model_path
else:
model_path = self.DEFAULT_MODEL
if not os.path.exists(model_path):
_download_model(model_path)
if torch.cuda.is_available():
self._device = 'cuda'
device_name = torch.cuda.get_device_name(0)
print(f"[FaceMask] Using ROCm GPU for inference: {device_name}")
else:
self._device = 'cpu'
print("[FaceMask] Using CPU for inference (ROCm GPU not available)")
try:
self._model = YOLO(model_path)
print(f"[FaceMask] Head detection model loaded: {model_path}")
print(f"[FaceMask] Device: {self._device}")
except Exception as e:
print(f"[FaceMask] Error loading model: {e}")
import traceback
traceback.print_exc()
raise
return self._model
def _results_to_detections(self, result) -> List[Tuple[int, int, int, int, float]]:
"""Convert a single YOLO result to (x, y, w, h, conf) tuples."""
if result.boxes is None:
return []
detections = []
for box in result.boxes:
conf = float(box.conf[0].cpu().numpy())
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
detections.append((int(x1), int(y1), int(x2 - x1), int(y2 - y1), conf))
return detections
def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int, float]]:
"""
Detect heads in a frame.
Args:
frame: BGR image as numpy array (H, W, C)
Returns:
List of detections as (x, y, width, height, confidence)
"""
import torch
print(f"[FaceMask] Inference device: {self._device}, CUDA available: {torch.cuda.is_available()}")
try:
results = self.model.predict(
frame,
conf=self.conf_threshold,
iou=self.iou_threshold,
imgsz=self.input_size[0],
verbose=False,
device=self._device,
)
except Exception as e:
print(f"[FaceMask] ERROR during inference: {e}")
import traceback
traceback.print_exc()
print("[FaceMask] Falling back to CPU inference...")
self._device = 'cpu'
results = self.model.predict(
frame,
conf=self.conf_threshold,
iou=self.iou_threshold,
imgsz=self.input_size[0],
verbose=False,
device='cpu',
)
if results:
return self._results_to_detections(results[0])
return []
def detect_batch(self, frames: List[np.ndarray]) -> List[List[Tuple[int, int, int, int, float]]]:
"""
Detect heads in multiple frames at once (batch processing).
Args:
frames: List of BGR images as numpy arrays (H, W, C)
Returns:
List of detection lists, one per frame.
Each detection: (x, y, width, height, confidence)
"""
if not frames:
return []
try:
results = self.model.predict(
frames,
conf=self.conf_threshold,
iou=self.iou_threshold,
imgsz=self.input_size[0],
verbose=False,
device=self._device,
)
except Exception as e:
print(f"[FaceMask] ERROR during batch inference: {e}")
import traceback
traceback.print_exc()
print("[FaceMask] Falling back to CPU inference...")
self._device = 'cpu'
results = self.model.predict(
frames,
conf=self.conf_threshold,
iou=self.iou_threshold,
imgsz=self.input_size[0],
verbose=False,
device='cpu',
)
return [self._results_to_detections(r) for r in results]
def generate_mask(
self,
frame_shape: Tuple[int, int, int],
detections: List[Tuple[int, int, int, int, float]],
mask_scale: float = 1.5,
feather_radius: int = 20,
) -> np.ndarray:
"""
Generate a mask image from head detections.
Args:
frame_shape: Shape of the original frame (height, width, channels)
detections: List of head detections (x, y, w, h, conf)
mask_scale: Scale factor for mask region
feather_radius: Radius for edge feathering
Returns:
Grayscale mask image (white = blur, black = keep)
"""
import cv2
height, width = frame_shape[:2]
mask = np.zeros((height, width), dtype=np.uint8)
for (x, y, w, h, conf) in detections:
center_x = x + w // 2
center_y = y + h // 2
scaled_w = int(w * mask_scale)
scaled_h = int(h * mask_scale)
cv2.ellipse(
mask,
(center_x, center_y),
(scaled_w // 2, scaled_h // 2),
0, 0, 360,
255, -1,
)
if feather_radius > 0 and len(detections) > 0:
kernel_size = feather_radius * 2 + 1
mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0)
return mask
# Singleton instance
_detector: Optional[YOLOHeadDetector] = None
def get_detector(**kwargs) -> YOLOHeadDetector:
"""Get or create the global YOLO head detector instance."""
global _detector
if _detector is None:
_detector = YOLOHeadDetector(**kwargs)
return _detector
# ---------------------------------------------------------------------------
# Pose-based head detector (YOLOv8 pose estimation)
# ---------------------------------------------------------------------------
# COCO pose keypoint indices
_HEAD_KP = [0, 1, 2, 3, 4] # nose, left_eye, right_eye, left_ear, right_ear
_SHOULDER_KP = [5, 6] # left_shoulder, right_shoulder
_KP_CONF_THRESH = 0.3
def _head_bbox_from_pose(
kp_xy: np.ndarray,
kp_conf: np.ndarray,
person_x1: float,
person_y1: float,
person_x2: float,
person_y2: float,
) -> Tuple[int, int, int, int]:
"""
Estimate head bounding box (x, y, w, h) from COCO pose keypoints.
Strategy:
1. Use head keypoints (0-4: nose, eyes, ears) if visible.
2. Fall back to shoulder keypoints (5-6) to infer head position.
3. Last resort: use top of the person bounding box.
"""
person_w = max(person_x2 - person_x1, 1.0)
# --- Step 1: head keypoints ---
visible_head = [
(float(kp_xy[i][0]), float(kp_xy[i][1]))
for i in _HEAD_KP
if float(kp_conf[i]) > _KP_CONF_THRESH
]
if visible_head:
xs = [p[0] for p in visible_head]
ys = [p[1] for p in visible_head]
kp_x1, kp_y1 = min(xs), min(ys)
kp_x2, kp_y2 = max(xs), max(ys)
span = max(kp_x2 - kp_x1, kp_y2 - kp_y1, 1.0)
cx = (kp_x1 + kp_x2) / 2.0
cy = (kp_y1 + kp_y2) / 2.0
r = max(span * 0.5, person_w * 0.10)
x1 = int(cx - r)
y1 = int(cy - r)
x2 = int(cx + r)
y2 = int(cy + r)
return x1, y1, x2 - x1, y2 - y1
# --- Step 2: shoulder keypoints ---
visible_shoulder = [
(float(kp_xy[i][0]), float(kp_xy[i][1]))
for i in _SHOULDER_KP
if float(kp_conf[i]) > _KP_CONF_THRESH
]
if visible_shoulder:
cx = sum(p[0] for p in visible_shoulder) / len(visible_shoulder)
cy_sh = sum(p[1] for p in visible_shoulder) / len(visible_shoulder)
if len(visible_shoulder) == 2:
sh_width = abs(visible_shoulder[1][0] - visible_shoulder[0][0])
else:
sh_width = person_w * 0.5
r = max(sh_width * 0.3, person_w * 0.12)
cy = cy_sh - r * 1.3
x1 = int(cx - r)
y1 = int(cy - r)
x2 = int(cx + r)
y2 = int(cy + r)
return x1, y1, x2 - x1, y2 - y1
# --- Step 3: person bbox top ---
r = max(person_w * 0.15, 20.0)
cx = (person_x1 + person_x2) / 2.0
x1 = int(cx - r)
y1 = int(person_y1)
x2 = int(cx + r)
y2 = int(person_y1 + r * 2.0)
return x1, y1, x2 - x1, y2 - y1
class YOLOPoseHeadDetector:
"""
Head detector using YOLOv8 pose estimation with PyTorch ROCm support.
Extracts head bounding boxes from COCO pose keypoints (nose, eyes, ears).
yolov8l-pose.pt is auto-downloaded by Ultralytics on first use.
"""
DEFAULT_MODEL = os.path.join("models", "yolov8l-pose.pt")
def __init__(
self,
model_path: Optional[str] = None,
conf_threshold: float = 0.25,
iou_threshold: float = 0.45,
input_size: Tuple[int, int] = (640, 640),
):
self.conf_threshold = conf_threshold
self.iou_threshold = iou_threshold
self.input_size = input_size
self._model = None
self._model_path = model_path
self._device = None
@property
def model(self):
"""Lazy-load YOLO pose model."""
if self._model is None:
from ultralytics import YOLO
import torch
model_path = self._model_path if self._model_path is not None else self.DEFAULT_MODEL
if torch.cuda.is_available():
self._device = 'cuda'
device_name = torch.cuda.get_device_name(0)
print(f"[FaceMask] Using ROCm GPU for pose inference: {device_name}")
else:
self._device = 'cpu'
print("[FaceMask] Using CPU for pose inference (ROCm GPU not available)")
try:
self._model = YOLO(model_path)
print(f"[FaceMask] Pose model loaded: {model_path}")
print(f"[FaceMask] Device: {self._device}")
except Exception as e:
print(f"[FaceMask] Error loading pose model: {e}")
import traceback
traceback.print_exc()
raise
return self._model
def _results_to_detections(self, result) -> List[Tuple[int, int, int, int, float]]:
"""Convert a single YOLO pose result to (x, y, w, h, conf) tuples."""
detections = []
if result.boxes is None or result.keypoints is None:
return detections
boxes = result.boxes
keypoints = result.keypoints
for i, box in enumerate(boxes):
conf = float(box.conf[0].cpu().numpy())
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
kp_data = keypoints.data[i].cpu().numpy() # shape (17, 3): x, y, conf
kp_xy = kp_data[:, :2]
kp_conf = kp_data[:, 2]
hx, hy, hw, hh = _head_bbox_from_pose(
kp_xy, kp_conf,
float(x1), float(y1), float(x2), float(y2),
)
detections.append((hx, hy, hw, hh, conf))
return detections
def detect_batch(self, frames: List[np.ndarray]) -> List[List[Tuple[int, int, int, int, float]]]:
"""Detect heads in multiple frames at once (batch processing)."""
if not frames:
return []
try:
results = self.model.predict(
frames,
conf=self.conf_threshold,
iou=self.iou_threshold,
imgsz=self.input_size[0],
verbose=False,
device=self._device,
)
except Exception as e:
print(f"[FaceMask] ERROR during pose batch inference: {e}")
import traceback
traceback.print_exc()
print("[FaceMask] Falling back to CPU inference...")
self._device = 'cpu'
results = self.model.predict(
frames,
conf=self.conf_threshold,
iou=self.iou_threshold,
imgsz=self.input_size[0],
verbose=False,
device='cpu',
)
return [self._results_to_detections(r) for r in results]
# Pose detector singleton
_pose_detector: Optional[YOLOPoseHeadDetector] = None
def get_pose_detector(**kwargs) -> YOLOPoseHeadDetector:
"""Get or create the global YOLO pose head detector instance."""
global _pose_detector
if _pose_detector is None:
_pose_detector = YOLOPoseHeadDetector(**kwargs)
return _pose_detector