""" YOLOv8 Pose Head Detector using PyTorch with ROCm support. Detects human heads from all angles (frontal, profile, rear) by using YOLOv8 pose estimation and extracting head bounding boxes from keypoints. """ import os from typing import List, Tuple, Optional import numpy as np # COCO pose keypoint indices _HEAD_KP = [0, 1, 2, 3, 4] # nose, left_eye, right_eye, left_ear, right_ear _SHOULDER_KP = [5, 6] # left_shoulder, right_shoulder _KP_CONF_THRESH = 0.3 def _head_bbox_from_pose( kp_xy: np.ndarray, kp_conf: np.ndarray, person_x1: float, person_y1: float, person_x2: float, person_y2: float, ) -> Tuple[int, int, int, int]: """ Estimate head bounding box (x, y, w, h) from COCO pose keypoints. Strategy: 1. Use head keypoints (0-4: nose, eyes, ears) if visible. 2. Fall back to shoulder keypoints (5-6) to infer head position. 3. Last resort: use top of the person bounding box. """ person_w = max(person_x2 - person_x1, 1.0) # --- Step 1: head keypoints --- visible_head = [ (float(kp_xy[i][0]), float(kp_xy[i][1])) for i in _HEAD_KP if float(kp_conf[i]) > _KP_CONF_THRESH ] if visible_head: xs = [p[0] for p in visible_head] ys = [p[1] for p in visible_head] kp_x1, kp_y1 = min(xs), min(ys) kp_x2, kp_y2 = max(xs), max(ys) span = max(kp_x2 - kp_x1, kp_y2 - kp_y1, 1.0) cx = (kp_x1 + kp_x2) / 2.0 cy = (kp_y1 + kp_y2) / 2.0 # Head radius: inter-landmark span ≈ 80% of head width, so expand by ~1.25 # Shift center upward slightly to include scalp r = max(span * 1.25, person_w * 0.20) x1 = int(cx - r) y1 = int(cy - r * 1.15) # extra margin above (scalp) x2 = int(cx + r) y2 = int(cy + r * 0.85) # less margin below (chin) return x1, y1, x2 - x1, y2 - y1 # --- Step 2: shoulder keypoints --- visible_shoulder = [ (float(kp_xy[i][0]), float(kp_xy[i][1])) for i in _SHOULDER_KP if float(kp_conf[i]) > _KP_CONF_THRESH ] if visible_shoulder: cx = sum(p[0] for p in visible_shoulder) / len(visible_shoulder) cy_sh = sum(p[1] for p in visible_shoulder) / len(visible_shoulder) if len(visible_shoulder) == 2: sh_width = abs(visible_shoulder[1][0] - visible_shoulder[0][0]) else: sh_width = person_w * 0.5 r = max(sh_width * 0.5, person_w * 0.20) cy = cy_sh - r * 1.3 # head center is above shoulders x1 = int(cx - r) y1 = int(cy - r) x2 = int(cx + r) y2 = int(cy + r) return x1, y1, x2 - x1, y2 - y1 # --- Step 3: person bbox top --- r = max(person_w * 0.35, 20.0) cx = (person_x1 + person_x2) / 2.0 x1 = int(cx - r) y1 = int(person_y1) x2 = int(cx + r) y2 = int(person_y1 + r * 2.0) return x1, y1, x2 - x1, y2 - y1 class YOLOPoseHeadDetector: """ Head detector using YOLOv8 pose estimation with PyTorch ROCm support. Extracts head bounding boxes from COCO pose keypoints (nose, eyes, ears) so that detection works regardless of the person's facing direction. """ # Standard Ultralytics model — auto-downloaded on first use DEFAULT_MODEL = "yolov8n-pose.pt" def __init__( self, model_path: Optional[str] = None, conf_threshold: float = 0.25, iou_threshold: float = 0.45, input_size: Tuple[int, int] = (640, 640), ): self.conf_threshold = conf_threshold self.iou_threshold = iou_threshold self.input_size = input_size self._model = None self._model_path = model_path self._device = None @property def model(self): """Lazy-load YOLO pose model.""" if self._model is None: from ultralytics import YOLO import torch # Use provided path or let Ultralytics auto-download the default if self._model_path is not None: if not os.path.exists(self._model_path): raise FileNotFoundError(f"Model not found: {self._model_path}") model_path = self._model_path else: model_path = self.DEFAULT_MODEL if torch.cuda.is_available(): self._device = 'cuda' device_name = torch.cuda.get_device_name(0) print(f"[FaceMask] Using ROCm GPU for inference: {device_name}") else: self._device = 'cpu' print("[FaceMask] Using CPU for inference (ROCm GPU not available)") try: self._model = YOLO(model_path) print(f"[FaceMask] Pose model loaded: {model_path}") print(f"[FaceMask] Device: {self._device}") except Exception as e: print(f"[FaceMask] Error loading model: {e}") import traceback traceback.print_exc() raise return self._model def _results_to_detections(self, result) -> List[Tuple[int, int, int, int, float]]: """Convert a single YOLO pose result to (x, y, w, h, conf) tuples.""" detections = [] if result.boxes is None or result.keypoints is None: return detections boxes = result.boxes keypoints = result.keypoints for i, box in enumerate(boxes): conf = float(box.conf[0].cpu().numpy()) x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() # Extract keypoints for this person kp_data = keypoints.data[i].cpu().numpy() # shape (17, 3): x, y, conf kp_xy = kp_data[:, :2] kp_conf = kp_data[:, 2] hx, hy, hw, hh = _head_bbox_from_pose( kp_xy, kp_conf, float(x1), float(y1), float(x2), float(y2), ) detections.append((hx, hy, hw, hh, conf)) return detections def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int, float]]: """ Detect heads in a frame. Args: frame: BGR image as numpy array (H, W, C) Returns: List of detections as (x, y, width, height, confidence) """ import torch print(f"[FaceMask] Inference device: {self._device}, CUDA available: {torch.cuda.is_available()}") try: results = self.model.predict( frame, conf=self.conf_threshold, iou=self.iou_threshold, imgsz=self.input_size[0], verbose=False, device=self._device, ) except Exception as e: print(f"[FaceMask] ERROR during inference: {e}") import traceback traceback.print_exc() print("[FaceMask] Falling back to CPU inference...") self._device = 'cpu' results = self.model.predict( frame, conf=self.conf_threshold, iou=self.iou_threshold, imgsz=self.input_size[0], verbose=False, device='cpu', ) if results: return self._results_to_detections(results[0]) return [] def detect_batch(self, frames: List[np.ndarray]) -> List[List[Tuple[int, int, int, int, float]]]: """ Detect heads in multiple frames at once (batch processing). Args: frames: List of BGR images as numpy arrays (H, W, C) Returns: List of detection lists, one per frame. Each detection: (x, y, width, height, confidence) """ if not frames: return [] try: results = self.model.predict( frames, conf=self.conf_threshold, iou=self.iou_threshold, imgsz=self.input_size[0], verbose=False, device=self._device, ) except Exception as e: print(f"[FaceMask] ERROR during batch inference: {e}") import traceback traceback.print_exc() print("[FaceMask] Falling back to CPU inference...") self._device = 'cpu' results = self.model.predict( frames, conf=self.conf_threshold, iou=self.iou_threshold, imgsz=self.input_size[0], verbose=False, device='cpu', ) return [self._results_to_detections(r) for r in results] def generate_mask( self, frame_shape: Tuple[int, int, int], detections: List[Tuple[int, int, int, int, float]], mask_scale: float = 1.5, feather_radius: int = 20, ) -> np.ndarray: """ Generate a mask image from head detections. Args: frame_shape: Shape of the original frame (height, width, channels) detections: List of head detections (x, y, w, h, conf) mask_scale: Scale factor for mask region feather_radius: Radius for edge feathering Returns: Grayscale mask image (white = blur, black = keep) """ import cv2 height, width = frame_shape[:2] mask = np.zeros((height, width), dtype=np.uint8) for (x, y, w, h, conf) in detections: center_x = x + w // 2 center_y = y + h // 2 scaled_w = int(w * mask_scale) scaled_h = int(h * mask_scale) cv2.ellipse( mask, (center_x, center_y), (scaled_w // 2, scaled_h // 2), 0, 0, 360, 255, -1, ) if feather_radius > 0 and len(detections) > 0: kernel_size = feather_radius * 2 + 1 mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0) return mask # Singleton instance _detector: Optional[YOLOPoseHeadDetector] = None def get_detector(**kwargs) -> YOLOPoseHeadDetector: """Get or create the global YOLO pose head detector instance.""" global _detector if _detector is None: _detector = YOLOPoseHeadDetector(**kwargs) return _detector