"""
|
|
YOLOv8 Head Detector using CrowdHuman-trained model with PyTorch ROCm support.
|
|
|
|
Directly detects human heads (frontal, profile, rear) using the Owen718
|
|
CrowdHuman YOLOv8 model, which was trained on dense crowd scenes.
|
|
"""
|
|
|
|
import os
|
|
from typing import List, Tuple, Optional
|
|
import numpy as np
|
|
|
|
|
|
def _download_model(dest_path: str):
|
|
"""モデルが存在しない場合に手動ダウンロード手順を表示して例外を送出する。"""
|
|
gdrive_id = "1qlBmiEU4GBV13fxPhLZqjhjBbREvs8-m"
|
|
raise RuntimeError(
|
|
f"モデルファイルが見つかりません: {dest_path}\n"
|
|
"以下の手順でダウンロードしてください:\n"
|
|
f" 1. https://drive.google.com/file/d/{gdrive_id} を開く\n"
|
|
f" 2. ダウンロードしたファイルを {dest_path} に配置する"
|
|
)
|
|
|
|
|
|
class YOLOHeadDetector:
    """
    Head detector using CrowdHuman-trained YOLOv8 model with PyTorch ROCm support.

    Directly detects heads (class 0: head) without pose estimation,
    enabling robust detection of rear-facing, side-facing, and partially
    visible people in dense crowd scenes.
    """

    # Default on-disk location of the CrowdHuman head-detection weights.
    DEFAULT_MODEL = os.path.join("models", "crowdhuman_yolov8_head.pt")

    def __init__(
        self,
        model_path: Optional[str] = None,
        conf_threshold: float = 0.25,
        iou_threshold: float = 0.45,
        input_size: Tuple[int, int] = (640, 640),
    ):
        """
        Args:
            model_path: Path to the .pt weights; defaults to DEFAULT_MODEL.
            conf_threshold: Minimum detection confidence.
            iou_threshold: IoU threshold for NMS.
            input_size: Inference resolution; only input_size[0] is passed
                to Ultralytics as ``imgsz``.
        """
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.input_size = input_size
        self._model = None          # lazily created ultralytics.YOLO instance
        self._model_path = model_path
        self._device = None         # 'cuda' or 'cpu', chosen at first model access

    @property
    def model(self):
        """Lazy-load the YOLO head detection model and pick the device."""
        if self._model is None:
            from ultralytics import YOLO
            import torch

            if self._model_path is not None:
                # An explicitly supplied path must exist; never fall back silently.
                if not os.path.exists(self._model_path):
                    raise FileNotFoundError(f"Model not found: {self._model_path}")
                model_path = self._model_path
            else:
                model_path = self.DEFAULT_MODEL
                if not os.path.exists(model_path):
                    # Raises RuntimeError with manual download instructions.
                    _download_model(model_path)

            # ROCm builds of PyTorch expose the GPU through the CUDA API,
            # so torch.cuda.is_available() is the correct probe here.
            if torch.cuda.is_available():
                self._device = 'cuda'
                device_name = torch.cuda.get_device_name(0)
                print(f"[FaceMask] Using ROCm GPU for inference: {device_name}")
            else:
                self._device = 'cpu'
                print("[FaceMask] Using CPU for inference (ROCm GPU not available)")

            try:
                self._model = YOLO(model_path)
                print(f"[FaceMask] Head detection model loaded: {model_path}")
                print(f"[FaceMask] Device: {self._device}")
            except Exception as e:
                print(f"[FaceMask] Error loading model: {e}")
                import traceback
                traceback.print_exc()
                raise

        return self._model

    def _predict(self, source, error_label: str):
        """Run prediction on the current device, falling back to CPU on error.

        Shared by detect() and detect_batch() — previously this fallback
        logic was duplicated in both methods.

        Args:
            source: A single frame or a list of frames.
            error_label: Text inserted into the error log message
                ("inference" or "batch inference").

        Returns:
            The Ultralytics results list.
        """
        try:
            return self.model.predict(
                source,
                conf=self.conf_threshold,
                iou=self.iou_threshold,
                imgsz=self.input_size[0],
                verbose=False,
                device=self._device,
            )
        except Exception as e:
            print(f"[FaceMask] ERROR during {error_label}: {e}")
            import traceback
            traceback.print_exc()
            print("[FaceMask] Falling back to CPU inference...")
            # Remember the fallback so subsequent calls go straight to CPU.
            self._device = 'cpu'
            return self.model.predict(
                source,
                conf=self.conf_threshold,
                iou=self.iou_threshold,
                imgsz=self.input_size[0],
                verbose=False,
                device='cpu',
            )

    def _results_to_detections(self, result) -> List[Tuple[int, int, int, int, float]]:
        """Convert a single YOLO result to (x, y, w, h, conf) tuples."""
        if result.boxes is None:
            return []
        detections = []
        for box in result.boxes:
            conf = float(box.conf[0].cpu().numpy())
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            detections.append((int(x1), int(y1), int(x2 - x1), int(y2 - y1), conf))
        return detections

    def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int, float]]:
        """
        Detect heads in a frame.

        Args:
            frame: BGR image as numpy array (H, W, C)

        Returns:
            List of detections as (x, y, width, height, confidence)
        """
        # NOTE: a per-call debug print (device + CUDA availability) was
        # removed here — it imported torch and logged on every single frame.
        results = self._predict(frame, "inference")
        if results:
            return self._results_to_detections(results[0])
        return []

    def detect_batch(self, frames: List[np.ndarray]) -> List[List[Tuple[int, int, int, int, float]]]:
        """
        Detect heads in multiple frames at once (batch processing).

        Args:
            frames: List of BGR images as numpy arrays (H, W, C)

        Returns:
            List of detection lists, one per frame.
            Each detection: (x, y, width, height, confidence)
        """
        if not frames:
            return []
        results = self._predict(frames, "batch inference")
        return [self._results_to_detections(r) for r in results]

    def generate_mask(
        self,
        frame_shape: Tuple[int, int, int],
        detections: List[Tuple[int, int, int, int, float]],
        mask_scale: float = 1.5,
        feather_radius: int = 20,
    ) -> np.ndarray:
        """
        Generate a mask image from head detections.

        Args:
            frame_shape: Shape of the original frame (height, width, channels)
            detections: List of head detections (x, y, w, h, conf)
            mask_scale: Scale factor for mask region
            feather_radius: Radius for edge feathering

        Returns:
            Grayscale mask image (white = blur, black = keep)
        """
        import cv2

        height, width = frame_shape[:2]
        mask = np.zeros((height, width), dtype=np.uint8)

        # One filled ellipse per detection, enlarged by mask_scale around
        # the box center.
        for (x, y, w, h, conf) in detections:
            center_x = x + w // 2
            center_y = y + h // 2
            scaled_w = int(w * mask_scale)
            scaled_h = int(h * mask_scale)

            cv2.ellipse(
                mask,
                (center_x, center_y),
                (scaled_w // 2, scaled_h // 2),
                0, 0, 360,
                255, -1,
            )

        # Soften the mask edges; kernel size must be odd for GaussianBlur.
        if feather_radius > 0 and len(detections) > 0:
            kernel_size = feather_radius * 2 + 1
            mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0)

        return mask
|
|
|
|
|
|
# Singleton instance
_detector = None  # type: Optional[YOLOHeadDetector]


def get_detector(**kwargs) -> YOLOHeadDetector:
    """Return the process-wide YOLOHeadDetector, creating it on first use.

    kwargs are forwarded to YOLOHeadDetector() only on the first call;
    later calls ignore them and return the cached instance.
    """
    global _detector
    detector = _detector
    if detector is None:
        detector = YOLOHeadDetector(**kwargs)
        _detector = detector
    return detector
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pose-based head detector (YOLOv8 pose estimation)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# COCO pose keypoint indices
|
|
_HEAD_KP = [0, 1, 2, 3, 4] # nose, left_eye, right_eye, left_ear, right_ear
|
|
_SHOULDER_KP = [5, 6] # left_shoulder, right_shoulder
|
|
_KP_CONF_THRESH = 0.3
|
|
|
|
|
|
def _head_bbox_from_pose(
|
|
kp_xy: np.ndarray,
|
|
kp_conf: np.ndarray,
|
|
person_x1: float,
|
|
person_y1: float,
|
|
person_x2: float,
|
|
person_y2: float,
|
|
) -> Tuple[int, int, int, int]:
|
|
"""
|
|
Estimate head bounding box (x, y, w, h) from COCO pose keypoints.
|
|
|
|
Strategy:
|
|
1. Use head keypoints (0-4: nose, eyes, ears) if visible.
|
|
2. Fall back to shoulder keypoints (5-6) to infer head position.
|
|
3. Last resort: use top of the person bounding box.
|
|
"""
|
|
person_w = max(person_x2 - person_x1, 1.0)
|
|
|
|
# --- Step 1: head keypoints ---
|
|
visible_head = [
|
|
(float(kp_xy[i][0]), float(kp_xy[i][1]))
|
|
for i in _HEAD_KP
|
|
if float(kp_conf[i]) > _KP_CONF_THRESH
|
|
]
|
|
if visible_head:
|
|
xs = [p[0] for p in visible_head]
|
|
ys = [p[1] for p in visible_head]
|
|
kp_x1, kp_y1 = min(xs), min(ys)
|
|
kp_x2, kp_y2 = max(xs), max(ys)
|
|
span = max(kp_x2 - kp_x1, kp_y2 - kp_y1, 1.0)
|
|
cx = (kp_x1 + kp_x2) / 2.0
|
|
cy = (kp_y1 + kp_y2) / 2.0
|
|
r = max(span * 0.5, person_w * 0.10)
|
|
x1 = int(cx - r)
|
|
y1 = int(cy - r)
|
|
x2 = int(cx + r)
|
|
y2 = int(cy + r)
|
|
return x1, y1, x2 - x1, y2 - y1
|
|
|
|
# --- Step 2: shoulder keypoints ---
|
|
visible_shoulder = [
|
|
(float(kp_xy[i][0]), float(kp_xy[i][1]))
|
|
for i in _SHOULDER_KP
|
|
if float(kp_conf[i]) > _KP_CONF_THRESH
|
|
]
|
|
if visible_shoulder:
|
|
cx = sum(p[0] for p in visible_shoulder) / len(visible_shoulder)
|
|
cy_sh = sum(p[1] for p in visible_shoulder) / len(visible_shoulder)
|
|
if len(visible_shoulder) == 2:
|
|
sh_width = abs(visible_shoulder[1][0] - visible_shoulder[0][0])
|
|
else:
|
|
sh_width = person_w * 0.5
|
|
r = max(sh_width * 0.3, person_w * 0.12)
|
|
cy = cy_sh - r * 1.3
|
|
x1 = int(cx - r)
|
|
y1 = int(cy - r)
|
|
x2 = int(cx + r)
|
|
y2 = int(cy + r)
|
|
return x1, y1, x2 - x1, y2 - y1
|
|
|
|
# --- Step 3: person bbox top ---
|
|
r = max(person_w * 0.15, 20.0)
|
|
cx = (person_x1 + person_x2) / 2.0
|
|
x1 = int(cx - r)
|
|
y1 = int(person_y1)
|
|
x2 = int(cx + r)
|
|
y2 = int(person_y1 + r * 2.0)
|
|
return x1, y1, x2 - x1, y2 - y1
|
|
|
|
|
|
class YOLOPoseHeadDetector:
    """
    Head detector using YOLOv8 pose estimation with PyTorch ROCm support.

    Extracts head bounding boxes from COCO pose keypoints (nose, eyes, ears).
    yolov8l-pose.pt is auto-downloaded by Ultralytics on first use.
    """

    # Default on-disk location; Ultralytics downloads it if absent.
    DEFAULT_MODEL = os.path.join("models", "yolov8l-pose.pt")

    def __init__(
        self,
        model_path: Optional[str] = None,
        conf_threshold: float = 0.25,
        iou_threshold: float = 0.45,
        input_size: Tuple[int, int] = (640, 640),
    ):
        """
        Args:
            model_path: Path to pose weights; defaults to DEFAULT_MODEL.
            conf_threshold: Minimum person-detection confidence.
            iou_threshold: IoU threshold for NMS.
            input_size: Inference resolution; only input_size[0] is passed
                to Ultralytics as ``imgsz``.
        """
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.input_size = input_size
        self._model = None          # lazily created ultralytics.YOLO instance
        self._model_path = model_path
        self._device = None         # 'cuda' or 'cpu', chosen at first model access

    @property
    def model(self):
        """Lazy-load the YOLO pose model and pick the device."""
        if self._model is None:
            from ultralytics import YOLO
            import torch

            # No existence check here: Ultralytics auto-downloads pose weights.
            model_path = self._model_path if self._model_path is not None else self.DEFAULT_MODEL

            # ROCm builds of PyTorch expose the GPU through the CUDA API.
            if torch.cuda.is_available():
                self._device = 'cuda'
                device_name = torch.cuda.get_device_name(0)
                print(f"[FaceMask] Using ROCm GPU for pose inference: {device_name}")
            else:
                self._device = 'cpu'
                print("[FaceMask] Using CPU for pose inference (ROCm GPU not available)")

            try:
                self._model = YOLO(model_path)
                print(f"[FaceMask] Pose model loaded: {model_path}")
                print(f"[FaceMask] Device: {self._device}")
            except Exception as e:
                print(f"[FaceMask] Error loading pose model: {e}")
                import traceback
                traceback.print_exc()
                raise

        return self._model

    def _predict(self, source, error_label: str):
        """Run prediction on the current device, falling back to CPU on error.

        Args:
            source: A single frame or a list of frames.
            error_label: Text inserted into the error log message.

        Returns:
            The Ultralytics results list.
        """
        try:
            return self.model.predict(
                source,
                conf=self.conf_threshold,
                iou=self.iou_threshold,
                imgsz=self.input_size[0],
                verbose=False,
                device=self._device,
            )
        except Exception as e:
            print(f"[FaceMask] ERROR during {error_label}: {e}")
            import traceback
            traceback.print_exc()
            print("[FaceMask] Falling back to CPU inference...")
            # Remember the fallback so subsequent calls go straight to CPU.
            self._device = 'cpu'
            return self.model.predict(
                source,
                conf=self.conf_threshold,
                iou=self.iou_threshold,
                imgsz=self.input_size[0],
                verbose=False,
                device='cpu',
            )

    def _results_to_detections(self, result) -> List[Tuple[int, int, int, int, float]]:
        """Convert a single YOLO pose result to (x, y, w, h, conf) tuples."""
        detections = []
        if result.boxes is None or result.keypoints is None:
            return detections

        boxes = result.boxes
        keypoints = result.keypoints

        for i, box in enumerate(boxes):
            conf = float(box.conf[0].cpu().numpy())
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()

            kp_data = keypoints.data[i].cpu().numpy()  # shape (17, 3): x, y, conf
            kp_xy = kp_data[:, :2]
            kp_conf = kp_data[:, 2]

            # Derive the head box from keypoints; person bbox is the fallback.
            hx, hy, hw, hh = _head_bbox_from_pose(
                kp_xy, kp_conf,
                float(x1), float(y1), float(x2), float(y2),
            )
            detections.append((hx, hy, hw, hh, conf))

        return detections

    def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int, float]]:
        """
        Detect heads in a single frame.

        Added for API parity with YOLOHeadDetector.detect().

        Args:
            frame: BGR image as numpy array (H, W, C)

        Returns:
            List of detections as (x, y, width, height, confidence)
        """
        results = self._predict(frame, "pose inference")
        if results:
            return self._results_to_detections(results[0])
        return []

    def detect_batch(self, frames: List[np.ndarray]) -> List[List[Tuple[int, int, int, int, float]]]:
        """
        Detect heads in multiple frames at once (batch processing).

        Args:
            frames: List of BGR images as numpy arrays (H, W, C)

        Returns:
            List of detection lists, one per frame.
            Each detection: (x, y, width, height, confidence)
        """
        if not frames:
            return []
        results = self._predict(frames, "pose batch inference")
        return [self._results_to_detections(r) for r in results]
|
|
|
|
|
|
# Pose detector singleton
_pose_detector = None  # type: Optional[YOLOPoseHeadDetector]


def get_pose_detector(**kwargs) -> YOLOPoseHeadDetector:
    """Return the process-wide YOLOPoseHeadDetector, creating it on first use.

    kwargs are forwarded to YOLOPoseHeadDetector() only on the first call;
    later calls ignore them and return the cached instance.
    """
    global _pose_detector
    detector = _pose_detector
    if detector is None:
        detector = YOLOPoseHeadDetector(**kwargs)
        _pose_detector = detector
    return detector
|