Compare commits

...

2 Commits

Author SHA1 Message Date
0d63b2ef6d 旧debug資産の整理 2026-02-16 16:08:22 +09:00
e693f5b694 feat: 中間データのmsgpack移行 2026-02-16 16:07:38 +09:00
15 changed files with 282 additions and 1128 deletions

View File

@ -19,7 +19,7 @@
python server/main.py
# サーバーのGPU状態を確認
python test_server_api.py --status
curl -s http://127.0.0.1:8181/status | jq
```
出力例:
@ -40,20 +40,3 @@ python test_server_api.py --status
ROCm Version (HIP): 7.0.51831
======================================================================
```
### 処理プロセスの単体デバッグ
顔検出処理をBlenderから独立してテストできます。
```bash
# 画像ファイルでテスト
python debug_detector.py --image test.jpg
# 動画ファイルでテスト
python debug_detector.py --video test.mp4 --frame 0
# クイックテスト(簡易版)
./test_quick.sh test.jpg
```
詳細は [docs/debugging.md](docs/debugging.md) を参照してください。

View File

@ -29,7 +29,7 @@ class AsyncBakeGenerator:
def start(
self,
video_path: str,
mask_path: str,
detections_path: str,
output_path: str,
blur_size: int,
fmt: str,
@ -53,7 +53,7 @@ class AsyncBakeGenerator:
self.worker_thread = threading.Thread(
target=self._worker,
args=(video_path, mask_path, output_path, blur_size, fmt),
args=(video_path, detections_path, output_path, blur_size, fmt),
daemon=True,
)
self.worker_thread.start()
@ -72,7 +72,7 @@ class AsyncBakeGenerator:
def _worker(
self,
video_path: str,
mask_path: str,
detections_path: str,
output_path: str,
blur_size: int,
fmt: str,
@ -85,7 +85,7 @@ class AsyncBakeGenerator:
client = get_client()
task_id = client.bake_blur(
video_path=video_path,
mask_path=mask_path,
detections_path=detections_path,
output_path=output_path,
blur_size=blur_size,
fmt=fmt,

View File

@ -9,8 +9,7 @@ Blender's UI remains responsive via bpy.app.timers.
import os
import threading
import queue
from functools import partial
from typing import Optional, Callable, Tuple
from typing import Optional, Callable
from pathlib import Path
# Will be imported when running inside Blender
@ -150,9 +149,20 @@ class AsyncMaskGenerator:
while self.is_running:
status = client.get_task_status(task_id)
state = status.get("status")
total = status.get("total", 0)
if total > 0:
self.total_frames = total
if state == "completed":
self.result_queue.put(("done", output_dir))
final_progress = status.get("progress", self.total_frames)
if final_progress >= 0:
self.progress_queue.put(("progress", final_progress))
result_path = status.get(
"result_path",
os.path.join(output_dir, "detections.msgpack"),
)
self.result_queue.put(("done", result_path))
return
elif state == "failed":
@ -167,7 +177,7 @@ class AsyncMaskGenerator:
# Report progress
progress = status.get("progress", 0)
if progress > 0:
if progress >= 0:
self.progress_queue.put(("progress", progress))
time.sleep(0.5)
@ -206,6 +216,16 @@ class AsyncMaskGenerator:
try:
msg_type, data = self.result_queue.get_nowait()
self.is_running = False
# Ensure UI receives a final progress update before completion.
if (
msg_type == "done"
and self.total_frames > 0
and self.current_frame < self.total_frames
and self._on_progress
):
self.current_frame = self.total_frames
self._on_progress(self.current_frame, self.total_frames)
if self._on_complete:
self._on_complete(msg_type, data)

View File

@ -252,7 +252,7 @@ class InferenceClient:
def bake_blur(
self,
video_path: str,
mask_path: str,
detections_path: str,
output_path: str,
blur_size: int,
fmt: str,
@ -268,7 +268,7 @@ class InferenceClient:
data = {
"video_path": video_path,
"mask_path": mask_path,
"detections_path": detections_path,
"output_path": output_path,
"blur_size": blur_size,
"format": fmt,

View File

@ -78,6 +78,11 @@ def get_cache_dir_for_strip(strip_name: str) -> str:
return os.path.join(get_cache_root(), strip_name)
def get_detections_path_for_strip(strip_name: str) -> str:
"""Get msgpack detection cache path for a specific strip."""
return os.path.join(get_cache_dir_for_strip(strip_name), "detections.msgpack")
def get_cache_info(strip_name: Optional[str] = None) -> Tuple[str, int, int]:
"""
Get cache directory information.

View File

@ -1,267 +0,0 @@
#!/usr/bin/env python3
"""
顔検出処理の単体デバッグスクリプト
Usage:
# 画像ファイルで検出をテスト
python debug_detector.py --image path/to/image.jpg
# 動画ファイルで検出をテスト(指定フレームのみ)
python debug_detector.py --video path/to/video.mp4 --frame 100
# 動画ファイルで複数フレームをテスト
python debug_detector.py --video path/to/video.mp4 --start 0 --end 10
# 結果を保存
python debug_detector.py --image test.jpg --output result.jpg
"""
import argparse
import sys
from pathlib import Path
import cv2
import numpy as np
# プロジェクトルートをパスに追加
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from server.detector import YOLOFaceDetector
def draw_detections(image: np.ndarray, detections, mask=None):
"""
検出結果を画像に描画
Args:
image: 元画像BGR
detections: 検出結果のリスト [(x, y, w, h, conf), ...]
mask: マスク画像オプション
Returns:
描画済み画像
"""
output = image.copy()
# マスクをオーバーレイ
if mask is not None:
# マスクを3チャンネルに変換
mask_colored = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
# 赤色でオーバーレイ(半透明)
mask_overlay = np.zeros_like(output)
mask_overlay[:, :, 2] = mask # 赤チャンネル
output = cv2.addWeighted(output, 1.0, mask_overlay, 0.3, 0)
# バウンディングボックスを描画
for (x, y, w, h, conf) in detections:
# ボックス
cv2.rectangle(output, (x, y), (x + w, y + h), (0, 255, 0), 2)
# 信頼度テキスト
label = f"{conf:.2f}"
label_size, baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
y_label = max(y, label_size[1])
cv2.rectangle(
output,
(x, y_label - label_size[1]),
(x + label_size[0], y_label + baseline),
(0, 255, 0),
-1
)
cv2.putText(
output,
label,
(x, y_label),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 0),
1
)
return output
def debug_image(args, detector):
"""画像ファイルで検出をデバッグ"""
print(f"画像を読み込み中: {args.image}")
image = cv2.imread(args.image)
if image is None:
print(f"エラー: 画像を読み込めません: {args.image}")
return
print(f"画像サイズ: {image.shape[1]}x{image.shape[0]}")
# 検出実行
print("顔検出を実行中...")
detections = detector.detect(image)
print(f"\n検出結果: {len(detections)}個の顔を検出")
for i, (x, y, w, h, conf) in enumerate(detections):
print(f" [{i+1}] x={x}, y={y}, w={w}, h={h}, conf={conf:.3f}")
# マスク生成
if len(detections) > 0:
mask = detector.generate_mask(
image.shape,
detections,
mask_scale=args.mask_scale,
feather_radius=args.feather_radius
)
else:
mask = None
# 結果を描画
result = draw_detections(image, detections, mask)
# 表示または保存
if args.output:
cv2.imwrite(args.output, result)
print(f"\n結果を保存しました: {args.output}")
if mask is not None and args.save_mask:
mask_path = args.output.replace('.', '_mask.')
cv2.imwrite(mask_path, mask)
print(f"マスクを保存しました: {mask_path}")
else:
cv2.imshow("Detection Result", result)
if mask is not None:
cv2.imshow("Mask", mask)
print("\nキーを押して終了してください...")
cv2.waitKey(0)
cv2.destroyAllWindows()
def debug_video(args, detector):
"""動画ファイルで検出をデバッグ"""
print(f"動画を読み込み中: {args.video}")
cap = cv2.VideoCapture(args.video)
if not cap.isOpened():
print(f"エラー: 動画を開けません: {args.video}")
return
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"動画情報: {width}x{height}, {fps:.2f}fps, {total_frames}フレーム")
# フレーム範囲の決定
start_frame = args.start if args.start is not None else args.frame
end_frame = args.end if args.end is not None else args.frame
start_frame = max(0, min(start_frame, total_frames - 1))
end_frame = max(0, min(end_frame, total_frames - 1))
print(f"処理範囲: フレーム {start_frame} - {end_frame}")
# 出力動画の準備
out_writer = None
if args.output:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_writer = cv2.VideoWriter(args.output, fourcc, fps, (width, height))
# フレーム処理
for frame_idx in range(start_frame, end_frame + 1):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
print(f"警告: フレーム {frame_idx} を読み込めませんでした")
continue
# 検出実行
detections = detector.detect(frame)
# マスク生成
if len(detections) > 0:
mask = detector.generate_mask(
frame.shape,
detections,
mask_scale=args.mask_scale,
feather_radius=args.feather_radius
)
else:
mask = None
# 結果を描画
result = draw_detections(frame, detections, mask)
print(f"フレーム {frame_idx}: {len(detections)}個の顔を検出")
# 保存または表示
if out_writer:
out_writer.write(result)
else:
cv2.imshow(f"Frame {frame_idx}", result)
if mask is not None:
cv2.imshow("Mask", mask)
key = cv2.waitKey(0 if end_frame == start_frame else 30)
if key == ord('q'):
break
cap.release()
if out_writer:
out_writer.release()
print(f"\n結果を保存しました: {args.output}")
else:
cv2.destroyAllWindows()
def main():
parser = argparse.ArgumentParser(
description="顔検出処理の単体デバッグスクリプト",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__
)
# 入力ソース
input_group = parser.add_mutually_exclusive_group(required=True)
input_group.add_argument("--image", type=str, help="テスト用画像ファイル")
input_group.add_argument("--video", type=str, help="テスト用動画ファイル")
# 動画用オプション
parser.add_argument("--frame", type=int, default=0, help="処理する動画フレーム番号(デフォルト: 0")
parser.add_argument("--start", type=int, help="処理開始フレーム(動画のみ)")
parser.add_argument("--end", type=int, help="処理終了フレーム(動画のみ)")
# 検出パラメータ
parser.add_argument("--conf", type=float, default=0.5, help="信頼度閾値(デフォルト: 0.5")
parser.add_argument("--iou", type=float, default=0.45, help="NMS IoU閾値デフォルト: 0.45")
parser.add_argument("--mask-scale", type=float, default=1.5, help="マスクスケール(デフォルト: 1.5")
parser.add_argument("--feather-radius", type=int, default=20, help="マスクぼかし半径(デフォルト: 20")
# 出力オプション
parser.add_argument("--output", "-o", type=str, help="結果画像/動画の保存先")
parser.add_argument("--save-mask", action="store_true", help="マスク画像も保存する(画像のみ)")
# モデル
parser.add_argument("--model", type=str, help="カスタムモデルパス")
args = parser.parse_args()
# 検出器を初期化
print("YOLOFaceDetectorを初期化中...")
detector = YOLOFaceDetector(
model_path=args.model,
conf_threshold=args.conf,
iou_threshold=args.iou
)
# モデルを事前ロード
print("モデルをロード中...")
_ = detector.model
print("準備完了\n")
# デバッグ実行
if args.image:
debug_image(args, detector)
else:
debug_video(args, detector)
if __name__ == "__main__":
main()

View File

@ -1,151 +0,0 @@
# デバッグガイド
## 処理プロセスの単体デバッグ
顔検出処理をBlenderアドオンから独立してテストできます。
### セットアップ
```bash
# 仮想環境をアクティベート
source .venv/bin/activate
# 必要なパッケージがインストールされていることを確認
pip install ultralytics opencv-python torch
```
### 基本的な使い方
#### 画像ファイルで検出をテスト
```bash
# 検出結果を画面に表示
python debug_detector.py --image path/to/image.jpg
# 検出結果を保存
python debug_detector.py --image path/to/image.jpg --output result.jpg
# マスク画像も保存
python debug_detector.py --image path/to/image.jpg --output result.jpg --save-mask
```
#### 動画ファイルで検出をテスト
```bash
# 特定のフレームをテスト
python debug_detector.py --video path/to/video.mp4 --frame 100
# フレーム範囲をテスト(画面表示)
python debug_detector.py --video path/to/video.mp4 --start 0 --end 10
# フレーム範囲を処理して動画保存
python debug_detector.py --video path/to/video.mp4 --start 0 --end 100 --output result.mp4
```
### パラメータ調整
```bash
# 信頼度閾値を調整(デフォルト: 0.5
python debug_detector.py --image test.jpg --conf 0.3
# NMS IoU閾値を調整デフォルト: 0.45
python debug_detector.py --image test.jpg --iou 0.5
# マスクサイズを調整(デフォルト: 1.5
python debug_detector.py --image test.jpg --mask-scale 2.0
# マスクのぼかし半径を調整(デフォルト: 20
python debug_detector.py --image test.jpg --feather-radius 30
```
### カスタムモデルの使用
```bash
python debug_detector.py --image test.jpg --model path/to/custom_model.pt
```
## 推論サーバーの単体起動
推論サーバーを単独で起動してテストすることもできます。
### サーバー起動
```bash
# 仮想環境をアクティベート
source .venv/bin/activate
# サーバーを起動ポート8181
python server/main.py
```
### APIテスト
別のターミナルで:
```bash
# サーバー状態を確認
curl http://127.0.0.1:8181/status
# マスク生成をリクエスト
curl -X POST http://127.0.0.1:8181/generate \
-H "Content-Type: application/json" \
-d '{
"video_path": "/path/to/video.mp4",
"output_dir": "/tmp/masks",
"start_frame": 0,
"end_frame": 10,
"conf_threshold": 0.5,
"iou_threshold": 0.45,
"mask_scale": 1.5
}'
# タスクの状態を確認task_idは上記レスポンスから取得
curl http://127.0.0.1:8181/tasks/{task_id}
# タスクをキャンセル
curl -X POST http://127.0.0.1:8181/tasks/{task_id}/cancel
```
## トラブルシューティング
### GPUROCmが認識されない
```bash
# PyTorchがROCmを認識しているか確認
python -c "import torch; print(f'CUDA available: {torch.cuda.is_available()}')"
# ROCm環境変数を確認
echo $ROCM_PATH
echo $HSA_OVERRIDE_GFX_VERSION
```
環境変数が設定されていない場合:
```bash
source .envrc
# または
eval "$(direnv export bash)"
```
### モデルが見つからない
デフォルトモデルは `models/yolov8n-face-lindevs.pt` に配置する必要があります。
```bash
ls -l models/yolov8n-face-lindevs.pt
```
### メモリ不足エラー
大きな動画を処理する場合、メモリ不足になる可能性があります:
- フレーム範囲を小さく分割して処理
- `--conf` 閾値を上げて検出数を減らす
- より小さいモデルを使用
## デバッグのベストプラクティス
1. **まず画像でテスト**: 動画よりも画像の方が早く結果を確認できます
2. **パラメータの影響を理解**: `--conf`、`--mask-scale` などを変えて結果を比較
3. **小さいフレーム範囲から始める**: 動画テストは最初は5-10フレーム程度で
4. **結果を保存して比較**: `--output` オプションで結果を保存し、パラメータごとに比較

View File

@ -61,26 +61,22 @@
# venvをアクティベート
source "$VENV_DIR/bin/activate"
# 必要なパッケージのインストール確認とインストール
# PyTorch ROCm版の導入GPU未認識時のみ
if ! python -c "import torch; print(torch.cuda.is_available())" 2>/dev/null | grep -q "True"; then
echo "[Setup] Installing Python dependencies..."
# まずPyTorch ROCm版をインストールROCm 7.0 nightly - ROCm 7.1.1環境で動作確認済み)
echo "[Setup] Installing PyTorch ROCm dependencies..."
pip install --quiet --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/rocm7.0
# 次に通常のPyPIから他のパッケージをインストール
pip install --quiet \
ultralytics \
opencv-python-headless \
numpy \
fastapi \
uvicorn \
pydantic
# opencv-pythonがインストールされていたら削除headless版のみ使用
pip uninstall -y opencv-python opencv 2>/dev/null || true
# opencv-python-headlessを再インストールして確実にする
pip install --quiet --force-reinstall opencv-python-headless
echo "[Setup] Dependencies installed successfully"
fi
# プロジェクト依存requirements.txtを同期
if [ -f "$PWD/requirements.txt" ]; then
echo "[Setup] Syncing Python dependencies from requirements.txt..."
pip install --quiet -r "$PWD/requirements.txt"
fi
# opencv-pythonが入っていた場合はheadlessに統一
pip uninstall -y opencv-python opencv 2>/dev/null || true
pip install --quiet --upgrade opencv-python-headless
# Pythonパスにカレントディレクトリを追加
export PYTHONPATH="$PWD:$PYTHONPATH"

View File

@ -12,6 +12,7 @@ from bpy.types import Operator
from ..core.async_bake_generator import get_bake_generator
from ..core.async_generator import get_generator as get_mask_generator
from ..core.utils import get_detections_path_for_strip
KEY_ORIGINAL = "facemask_original_filepath"
@ -28,19 +29,9 @@ FORMAT_EXT = {
}
def _find_mask_strip(seq_editor, strip_name: str):
return seq_editor.strips.get(f"{strip_name}_mask")
def _resolve_mask_path(mask_strip) -> str:
if mask_strip.type == "MOVIE":
return bpy.path.abspath(mask_strip.filepath)
return ""
def _output_path(video_strip, mask_path: str, fmt: str) -> str:
def _output_path(video_strip, detections_path: str, fmt: str) -> str:
ext = FORMAT_EXT.get(fmt, "mp4")
out_dir = os.path.dirname(mask_path)
out_dir = os.path.dirname(detections_path)
safe_name = video_strip.name.replace("/", "_").replace("\\", "_")
return os.path.join(out_dir, f"{safe_name}_blurred.{ext}")
@ -83,22 +74,17 @@ class SEQUENCER_OT_bake_and_swap_blur_source(Operator):
scene = context.scene
video_strip = seq_editor.active_strip
mask_strip = _find_mask_strip(seq_editor, video_strip.name)
if not mask_strip:
self.report({"ERROR"}, f"Mask strip not found: {video_strip.name}_mask")
return {"CANCELLED"}
video_path = bpy.path.abspath(video_strip.filepath)
mask_path = _resolve_mask_path(mask_strip)
detections_path = get_detections_path_for_strip(video_strip.name)
if not os.path.exists(video_path):
self.report({"ERROR"}, f"Source video not found: {video_path}")
return {"CANCELLED"}
if not mask_path or not os.path.exists(mask_path):
self.report({"ERROR"}, f"Mask video not found: {mask_path}")
if not os.path.exists(detections_path):
self.report({"ERROR"}, f"Detection cache not found: {detections_path}")
return {"CANCELLED"}
bake_format = scene.facemask_bake_format
output_path = _output_path(video_strip, mask_path, bake_format)
output_path = _output_path(video_strip, detections_path, bake_format)
blur_size = int(scene.facemask_bake_blur_size)
# Reuse baked cache when parameters match and file still exists.
@ -164,7 +150,7 @@ class SEQUENCER_OT_bake_and_swap_blur_source(Operator):
try:
bake_generator.start(
video_path=video_path,
mask_path=mask_path,
detections_path=detections_path,
output_path=output_path,
blur_size=blur_size,
fmt=bake_format.lower(),

View File

@ -55,8 +55,7 @@ class SEQUENCER_OT_generate_face_mask(Operator):
# Check cache - if masks already exist, use them
expected_frame_count = strip.frame_final_end - strip.frame_final_start + 1
if self._check_cache(output_dir, expected_frame_count):
self.report({'INFO'}, f"Using cached masks from {output_dir}")
self._add_mask_strip(context, strip.name, output_dir)
self.report({'INFO'}, f"Using cached detections from {output_dir}")
return {'FINISHED'}
# Get frame range
@ -71,19 +70,25 @@ class SEQUENCER_OT_generate_face_mask(Operator):
self.report({'WARNING'}, "Mask generation already in progress")
return {'CANCELLED'}
# Store strip name for callback
strip_name = strip.name
def on_complete(status, data):
"""Called when mask generation completes."""
wm = context.window_manager
wm.mask_total = max(wm.mask_total, generator.total_frames)
if status == "done":
wm.mask_progress = wm.mask_total
elif status in {"error", "cancelled"}:
wm.mask_progress = min(wm.mask_progress, wm.mask_total)
if status == "done":
# Add mask strip to sequence editor
self._add_mask_strip(context, strip_name, data)
print(f"[FaceMask] Mask generation completed: {data}")
elif status == "error":
print(f"[FaceMask] Error: {data}")
elif status == "cancelled":
print("[FaceMask] Generation cancelled")
for area in context.screen.areas:
if area.type == 'SEQUENCE_EDITOR':
area.tag_redraw()
def on_progress(current, total):
"""Called on progress updates."""
@ -143,119 +148,27 @@ class SEQUENCER_OT_generate_face_mask(Operator):
if not os.path.exists(cache_dir):
return False
# Check for MP4 video (new format)
mask_video = os.path.join(cache_dir, "mask.mp4")
if os.path.exists(mask_video):
# Prefer frame-count verification when cv2 is available, but do not
# hard-fail on Blender Python environments without cv2.
try:
import cv2
detections_path = os.path.join(cache_dir, "detections.msgpack")
if not os.path.exists(detections_path):
return False
cap = cv2.VideoCapture(mask_video)
if cap.isOpened():
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
# Accept cache if at least 90% of frames exist
return frame_count >= expected_frames * 0.9
cap.release()
# Quick sanity check: non-empty file
try:
if os.path.getsize(detections_path) <= 0:
return False
except Exception:
# Fallback: treat existing MP4 cache as valid when cv2 is unavailable.
return True
except OSError:
return False
# Fallback: check for PNG sequence (backward compatibility)
mask_files = [f for f in os.listdir(cache_dir)
if f.startswith("mask_") and f.endswith(".png")]
# Optional frame count verification if msgpack is available
try:
import msgpack
# Accept cache if at least 90% of frames exist
return len(mask_files) >= expected_frames * 0.9
def _add_mask_strip(self, context, source_strip_name: str, mask_path: str):
"""Add mask video as a new strip.
Args:
context: Blender context
source_strip_name: Name of the source video strip
mask_path: Path to mask video file or directory (for backward compatibility)
"""
scene = context.scene
seq_editor = scene.sequence_editor
if not seq_editor:
return
# Find source strip (Blender 5.0 uses 'strips' instead of 'sequences')
source_strip = seq_editor.strips.get(source_strip_name)
if not source_strip:
return
# Check if mask_path is a video file or directory (backward compatibility)
if os.path.isfile(mask_path):
# New format: single MP4 file
mask_video = mask_path
else:
# Old format: directory with PNG sequence (backward compatibility)
mask_video = os.path.join(mask_path, "mask.mp4")
if not os.path.exists(mask_video):
# Fallback to PNG sequence
mask_files = sorted([
f for f in os.listdir(mask_path)
if f.startswith("mask_") and f.endswith(".png")
])
if not mask_files:
return
first_mask = os.path.join(mask_path, mask_files[0])
self._add_mask_strip_png_sequence(context, source_strip_name, mask_path, mask_files, first_mask)
return
# Find an empty channel
used_channels = {s.channel for s in seq_editor.strips}
new_channel = source_strip.channel + 1
while new_channel in used_channels:
new_channel += 1
# Add movie strip (Blender 5.0 API)
mask_strip = seq_editor.strips.new_movie(
name=f"{source_strip_name}_mask",
filepath=mask_video,
channel=new_channel,
frame_start=source_strip.frame_final_start,
)
# Set blend mode for mask
mask_strip.blend_type = 'ALPHA_OVER'
mask_strip.blend_alpha = 0.5
def _add_mask_strip_png_sequence(self, context, source_strip_name, mask_dir, mask_files, first_mask):
"""Backward compatibility: Add PNG sequence as mask strip."""
scene = context.scene
seq_editor = scene.sequence_editor
source_strip = seq_editor.strips.get(source_strip_name)
if not source_strip:
return
# Find an empty channel
used_channels = {s.channel for s in seq_editor.strips}
new_channel = source_strip.channel + 1
while new_channel in used_channels:
new_channel += 1
# Add image sequence (Blender 5.0 API)
mask_strip = seq_editor.strips.new_image(
name=f"{source_strip_name}_mask",
filepath=first_mask,
channel=new_channel,
frame_start=source_strip.frame_final_start,
)
# Add remaining frames
for mask_file in mask_files[1:]:
mask_strip.elements.append(mask_file)
# Set blend mode for mask
mask_strip.blend_type = 'ALPHA_OVER'
mask_strip.blend_alpha = 0.5
with open(detections_path, "rb") as f:
payload = msgpack.unpackb(f.read(), raw=False)
frames = payload.get("frames", [])
return len(frames) >= expected_frames * 0.9
except Exception:
return True
class SEQUENCER_OT_cancel_mask_generation(Operator):

View File

@ -5,12 +5,18 @@ Provides a sidebar panel in the Video Sequence Editor
for controlling mask generation and blur application.
"""
import os
import bpy
from bpy.types import Panel
from ..core.async_bake_generator import get_bake_generator
from ..core.async_generator import get_generator
from ..core.utils import get_server_status, get_cache_info, format_size
from ..core.utils import (
get_server_status,
get_cache_info,
format_size,
get_detections_path_for_strip,
)
class SEQUENCER_PT_face_mask(Panel):
@ -186,19 +192,19 @@ class SEQUENCER_PT_face_mask(Panel):
row = box.row()
row.label(text=f"Strip: {strip.name}")
# Check for existing mask
seq_editor = context.scene.sequence_editor
mask_name = f"{strip.name}_mask"
has_mask = mask_name in seq_editor.strips
detections_path = get_detections_path_for_strip(strip.name)
has_mask = bpy.path.abspath(detections_path) and os.path.exists(
bpy.path.abspath(detections_path)
)
if has_mask:
row = box.row()
row.label(text="Mask exists", icon='CHECKMARK')
row.label(text="Detection cache exists", icon='CHECKMARK')
# Generate button
op = box.operator(
"sequencer.generate_face_mask",
text="Generate Face Mask" if not has_mask else "Regenerate Mask",
text="Generate Detection Cache" if not has_mask else "Regenerate Cache",
icon='FACE_MAPS',
)
@ -206,14 +212,14 @@ class SEQUENCER_PT_face_mask(Panel):
"""Draw blur application controls."""
box = layout.box()
box.label(text="Blur Bake", icon='MATFLUID')
# Check for mask strip
seq_editor = context.scene.sequence_editor
mask_name = f"{strip.name}_mask"
has_mask = mask_name in seq_editor.strips
detections_path = get_detections_path_for_strip(strip.name)
has_mask = bpy.path.abspath(detections_path) and os.path.exists(
bpy.path.abspath(detections_path)
)
if not has_mask:
box.label(text="Generate a mask first", icon='INFO')
box.label(text="Generate detection cache first", icon='INFO')
return
# Bake parameters

7
requirements.txt Normal file
View File

@ -0,0 +1,7 @@
ultralytics
opencv-python-headless
msgpack
numpy
fastapi
uvicorn
pydantic

View File

@ -33,7 +33,6 @@ fix_library_path()
import threading
import uuid
import queue
import traceback
from typing import Dict, Optional, List
from pathlib import Path
@ -43,6 +42,7 @@ from pydantic import BaseModel
import uvicorn
import cv2
import numpy as np
import msgpack
# Add project root to path for imports if needed
sys.path.append(str(Path(__file__).parent.parent))
@ -86,7 +86,7 @@ class GenerateRequest(BaseModel):
class BakeRequest(BaseModel):
video_path: str
mask_path: str
detections_path: str
output_path: str
blur_size: int = 50
format: str = "mp4"
@ -122,185 +122,79 @@ def _build_video_writer(
raise RuntimeError(f"Failed to create video writer for format='{fmt}'")
def _scale_bbox(
x: int,
y: int,
w: int,
h: int,
scale: float,
frame_width: int,
frame_height: int,
) -> Optional[List[int]]:
"""Scale bbox around center and clamp to frame boundaries."""
if w <= 0 or h <= 0:
return None
center_x = x + (w * 0.5)
center_y = y + (h * 0.5)
scaled_w = max(1, int(w * scale))
scaled_h = max(1, int(h * scale))
x1 = max(0, int(center_x - scaled_w * 0.5))
y1 = max(0, int(center_y - scaled_h * 0.5))
x2 = min(frame_width, x1 + scaled_w)
y2 = min(frame_height, y1 + scaled_h)
out_w = x2 - x1
out_h = y2 - y1
if out_w <= 0 or out_h <= 0:
return None
return [x1, y1, out_w, out_h]
def process_video_task(task_id: str, req: GenerateRequest):
"""Background task to process video with async MP4 output."""
writer = None
write_queue = None
writer_thread = None
"""Background task to detect faces and save bbox cache as msgpack."""
cap = None
try:
tasks[task_id].status = TaskStatus.PROCESSING
cancel_event = cancel_events.get(task_id)
# Verify video exists
if not os.path.exists(req.video_path):
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = f"Video not found: {req.video_path}"
return
# Initialize detector (will load model on first run)
print(f"Loading detector for task {task_id}...")
detector = get_detector(
conf_threshold=req.conf_threshold,
iou_threshold=req.iou_threshold
iou_threshold=req.iou_threshold,
)
_ = detector.model
# Open video
cap = cv2.VideoCapture(req.video_path)
if not cap.isOpened():
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = "Failed to open video"
return
# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
end_frame = min(req.end_frame, total_video_frames - 1)
frames_to_process = end_frame - req.start_frame + 1
tasks[task_id].total = frames_to_process
# Ensure output directory exists
os.makedirs(req.output_dir, exist_ok=True)
# Setup MP4 writer (grayscale)
output_video_path = os.path.join(req.output_dir, "mask.mp4")
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height), isColor=False)
if not writer.isOpened():
if frames_to_process <= 0:
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = "Failed to create video writer"
cap.release()
tasks[task_id].message = "Invalid frame range"
return
# Async writer setup
write_queue = queue.Queue(maxsize=30) # Buffer up to 30 frames
writer_running = threading.Event()
writer_running.set()
tasks[task_id].total = frames_to_process
os.makedirs(req.output_dir, exist_ok=True)
output_msgpack_path = os.path.join(req.output_dir, "detections.msgpack")
def async_writer():
"""Background thread for writing frames to video."""
while writer_running.is_set() or not write_queue.empty():
try:
mask = write_queue.get(timeout=0.1)
if mask is not None:
writer.write(mask)
write_queue.task_done()
except queue.Empty:
continue
writer_thread = threading.Thread(target=async_writer, daemon=True)
writer_thread.start()
print(f"Starting processing: {req.video_path} ({frames_to_process} frames) -> {output_video_path}")
# Batch processing configuration
BATCH_SIZE = 5 # Optimal batch size for 4K video (72.9% improvement)
frame_buffer = []
TEMPORAL_SIDE_WEIGHT = 0.7
TEMPORAL_CENTER_WEIGHT = 1.0
# Temporal blending state (streaming, low-memory)
prev_mask = None
curr_mask = None
wrote_first_frame = False
def _scale_mask(mask: np.ndarray, weight: float) -> np.ndarray:
"""Scale mask intensity for temporal blending."""
if weight == 1.0:
return mask
return cv2.convertScaleAbs(mask, alpha=weight, beta=0)
def _blend_edge(base: np.ndarray, neighbor: np.ndarray) -> np.ndarray:
"""Blend for first/last frame (one-sided temporal context)."""
base_w = _scale_mask(base, TEMPORAL_CENTER_WEIGHT)
neighbor_w = _scale_mask(neighbor, TEMPORAL_SIDE_WEIGHT)
return cv2.max(base_w, neighbor_w)
def _blend_middle(prev: np.ndarray, cur: np.ndarray, nxt: np.ndarray) -> np.ndarray:
"""Blend for middle frames (previous/current/next temporal context)."""
prev_w = _scale_mask(prev, TEMPORAL_SIDE_WEIGHT)
cur_w = _scale_mask(cur, TEMPORAL_CENTER_WEIGHT)
nxt_w = _scale_mask(nxt, TEMPORAL_SIDE_WEIGHT)
return cv2.max(cur_w, cv2.max(prev_w, nxt_w))
def push_mask_temporal(raw_mask: np.ndarray):
"""Push mask and emit blended output in frame order."""
nonlocal prev_mask, curr_mask, wrote_first_frame
if prev_mask is None:
prev_mask = raw_mask
return
if curr_mask is None:
curr_mask = raw_mask
return
if not wrote_first_frame:
write_queue.put(_blend_edge(prev_mask, curr_mask))
wrote_first_frame = True
# Emit blended current frame using prev/current/next
write_queue.put(_blend_middle(prev_mask, curr_mask, raw_mask))
# Slide temporal window
prev_mask = curr_mask
curr_mask = raw_mask
def flush_temporal_tail():
"""Flush remaining masks after all frames are processed."""
if prev_mask is None:
return
# Single-frame case
if curr_mask is None:
write_queue.put(_scale_mask(prev_mask, TEMPORAL_CENTER_WEIGHT))
return
# Two-frame case
if not wrote_first_frame:
write_queue.put(_blend_edge(prev_mask, curr_mask))
# Always emit last frame with one-sided blend
write_queue.put(_blend_edge(curr_mask, prev_mask))
def process_batch():
"""Process accumulated batch of frames."""
if not frame_buffer:
return
# Batch inference at full resolution
batch_detections = detector.detect_batch(frame_buffer)
# Generate masks for each frame
for i, detections in enumerate(batch_detections):
frame = frame_buffer[i]
# Generate mask at original resolution
mask = detector.generate_mask(
frame.shape,
detections,
mask_scale=req.mask_scale
)
# Temporal blend before async write
push_mask_temporal(mask)
# Clear buffer
frame_buffer.clear()
# Seek once to the starting frame. Avoid random-access seek on every frame.
if req.start_frame > 0:
seek_ok = cap.set(cv2.CAP_PROP_POS_FRAMES, req.start_frame)
if not seek_ok:
print(
f"[FaceMask] Warning: CAP_PROP_POS_FRAMES seek failed, "
f"fallback to sequential skip ({req.start_frame} frames)"
)
for _ in range(req.start_frame):
ret, _ = cap.read()
if not ret:
@ -310,65 +204,97 @@ def process_video_task(task_id: str, req: GenerateRequest):
)
return
# Process loop with batching
frame_buffer: List[np.ndarray] = []
frame_detections: List[List[List[float]]] = []
batch_size = 5
current_count = 0
for frame_idx in range(req.start_frame, end_frame + 1):
def process_batch():
nonlocal current_count
if not frame_buffer:
return
batch_detections = detector.detect_batch(frame_buffer)
for detections in batch_detections:
packed_detections: List[List[float]] = []
for x, y, w, h, conf in detections:
scaled = _scale_bbox(
int(x),
int(y),
int(w),
int(h),
float(req.mask_scale),
width,
height,
)
if scaled is None:
continue
packed_detections.append(
[scaled[0], scaled[1], scaled[2], scaled[3], float(conf)]
)
frame_detections.append(packed_detections)
current_count += 1
tasks[task_id].progress = current_count
frame_buffer.clear()
print(
f"Starting detection cache generation: {req.video_path} "
f"({frames_to_process} frames) -> {output_msgpack_path}"
)
for _ in range(req.start_frame, end_frame + 1):
if cancel_event and cancel_event.is_set():
tasks[task_id].status = TaskStatus.CANCELLED
tasks[task_id].message = "Cancelled by user"
break
# Read next frame sequentially (after one-time initial seek)
ret, frame = cap.read()
if not ret:
break
if ret:
# Store frame for batch processing
frame_buffer.append(frame)
frame_buffer.append(frame)
if len(frame_buffer) >= batch_size:
process_batch()
# Process batch when full
if len(frame_buffer) >= BATCH_SIZE:
process_batch()
# Update progress
current_count += 1
tasks[task_id].progress = current_count
# Process remaining frames in buffer
if frame_buffer:
process_batch()
flush_temporal_tail()
# Cleanup
writer_running.clear()
write_queue.join() # Wait for all frames to be written
if writer_thread:
writer_thread.join(timeout=5)
cap.release()
if writer:
writer.release()
if tasks[task_id].status == TaskStatus.PROCESSING:
payload = {
"version": 1,
"video_path": req.video_path,
"start_frame": req.start_frame,
"end_frame": req.start_frame + len(frame_detections) - 1,
"width": width,
"height": height,
"fps": fps,
"mask_scale": float(req.mask_scale),
"frames": frame_detections,
}
with open(output_msgpack_path, "wb") as f:
f.write(msgpack.packb(payload, use_bin_type=True))
tasks[task_id].status = TaskStatus.COMPLETED
tasks[task_id].result_path = output_video_path # Return video path
tasks[task_id].message = "Processing completed successfully"
print(f"Task {task_id} completed: {output_video_path}")
tasks[task_id].result_path = output_msgpack_path
tasks[task_id].message = "Detection cache completed"
print(f"Task {task_id} completed: {output_msgpack_path}")
except Exception as e:
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = str(e)
print(f"Error in task {task_id}: {e}")
traceback.print_exc()
finally:
# Cleanup
if cap:
cap.release()
if task_id in cancel_events:
del cancel_events[task_id]
def process_bake_task(task_id: str, req: BakeRequest):
"""Background task to bake blur into a regular video file."""
"""Background task to bake blur using bbox detections in msgpack."""
src_cap = None
mask_cap = None
writer = None
try:
@ -379,59 +305,56 @@ def process_bake_task(task_id: str, req: BakeRequest):
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = f"Video not found: {req.video_path}"
return
if not os.path.exists(req.mask_path):
if not os.path.exists(req.detections_path):
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = f"Mask video not found: {req.mask_path}"
tasks[task_id].message = f"Detections file not found: {req.detections_path}"
return
src_cap = cv2.VideoCapture(req.video_path)
mask_cap = cv2.VideoCapture(req.mask_path)
if not src_cap.isOpened():
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = "Failed to open source video"
return
if not mask_cap.isOpened():
with open(req.detections_path, "rb") as f:
payload = msgpack.unpackb(f.read(), raw=False)
frames = payload.get("frames")
if not isinstance(frames, list):
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = "Failed to open mask video"
tasks[task_id].message = "Invalid detections format: 'frames' is missing"
return
src_fps = src_cap.get(cv2.CAP_PROP_FPS) or 30.0
src_width = int(src_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
src_height = int(src_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
src_frames = int(src_cap.get(cv2.CAP_PROP_FRAME_COUNT))
mask_frames = int(mask_cap.get(cv2.CAP_PROP_FRAME_COUNT))
if src_width <= 0 or src_height <= 0:
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = "Invalid source video dimensions"
return
total = min(src_frames, mask_frames) if src_frames > 0 and mask_frames > 0 else 0
total = min(src_frames, len(frames)) if src_frames > 0 else len(frames)
if total <= 0:
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].message = "Source/mask frame count is zero"
tasks[task_id].message = "Source/detections frame count is zero"
return
tasks[task_id].total = total
output_dir = os.path.dirname(req.output_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
writer = _build_video_writer(req.output_path, req.format, src_fps, src_width, src_height)
# Kernel size must be odd and >= 1
blur_size = max(1, int(req.blur_size))
if blur_size % 2 == 0:
blur_size += 1
feather_radius = max(3, min(25, blur_size // 3))
feather_kernel = feather_radius * 2 + 1
print(f"[FaceMask] Starting blur bake: {req.video_path} + {req.mask_path} -> {req.output_path}")
if src_frames != mask_frames:
print(
f"[FaceMask] Warning: frame count mismatch "
f"(src={src_frames}, mask={mask_frames}), processing {total} frames"
)
print(
f"[FaceMask] Starting blur bake (bbox-msgpack): {req.video_path} + "
f"{req.detections_path} -> {req.output_path}"
)
for idx in range(total):
if cancel_event and cancel_event.is_set():
@ -440,29 +363,61 @@ def process_bake_task(task_id: str, req: BakeRequest):
break
src_ok, src_frame = src_cap.read()
mask_ok, mask_frame = mask_cap.read()
if not src_ok or not mask_ok:
if not src_ok:
break
if mask_frame.ndim == 3:
mask_gray = cv2.cvtColor(mask_frame, cv2.COLOR_BGR2GRAY)
else:
mask_gray = mask_frame
frame_boxes = frames[idx] if idx < len(frames) else []
if not frame_boxes:
writer.write(src_frame)
tasks[task_id].progress = idx + 1
continue
if mask_gray.shape[0] != src_height or mask_gray.shape[1] != src_width:
mask_gray = cv2.resize(
mask_gray,
(src_width, src_height),
interpolation=cv2.INTER_LINEAR,
)
mask_gray = np.zeros((src_height, src_width), dtype=np.uint8)
for box in frame_boxes:
if not isinstance(box, list) or len(box) < 4:
continue
x, y, w, h = int(box[0]), int(box[1]), int(box[2]), int(box[3])
if w <= 0 or h <= 0:
continue
center = (x + w // 2, y + h // 2)
axes = (max(1, w // 2), max(1, h // 2))
cv2.ellipse(mask_gray, center, axes, 0, 0, 360, 255, -1)
blurred = cv2.GaussianBlur(src_frame, (blur_size, blur_size), 0)
alpha = (mask_gray.astype(np.float32) / 255.0)[..., np.newaxis]
composed = (src_frame.astype(np.float32) * (1.0 - alpha)) + (
blurred.astype(np.float32) * alpha
if cv2.countNonZero(mask_gray) == 0:
writer.write(src_frame)
tasks[task_id].progress = idx + 1
continue
mask_gray = cv2.GaussianBlur(mask_gray, (feather_kernel, feather_kernel), 0)
_, mask_binary = cv2.threshold(mask_gray, 2, 255, cv2.THRESH_BINARY)
non_zero_coords = cv2.findNonZero(mask_binary)
if non_zero_coords is None:
writer.write(src_frame)
tasks[task_id].progress = idx + 1
continue
x, y, w, h = cv2.boundingRect(non_zero_coords)
blur_margin = max(1, (blur_size // 2) + feather_radius)
x1 = max(0, x - blur_margin)
y1 = max(0, y - blur_margin)
x2 = min(src_width, x + w + blur_margin)
y2 = min(src_height, y + h + blur_margin)
roi_src = src_frame[y1:y2, x1:x2]
roi_mask = mask_gray[y1:y2, x1:x2]
if roi_src.size == 0:
writer.write(src_frame)
tasks[task_id].progress = idx + 1
continue
roi_blurred = cv2.GaussianBlur(roi_src, (blur_size, blur_size), 0)
roi_alpha = (roi_mask.astype(np.float32) / 255.0)[..., np.newaxis]
roi_composed = (roi_src.astype(np.float32) * (1.0 - roi_alpha)) + (
roi_blurred.astype(np.float32) * roi_alpha
)
writer.write(np.clip(composed, 0, 255).astype(np.uint8))
output_frame = src_frame.copy()
output_frame[y1:y2, x1:x2] = np.clip(roi_composed, 0, 255).astype(np.uint8)
writer.write(output_frame)
tasks[task_id].progress = idx + 1
if tasks[task_id].status == TaskStatus.PROCESSING:
@ -479,8 +434,6 @@ def process_bake_task(task_id: str, req: BakeRequest):
finally:
if src_cap:
src_cap.release()
if mask_cap:
mask_cap.release()
if writer:
writer.release()
if task_id in cancel_events:

View File

@ -1,73 +0,0 @@
#!/bin/bash
# クイックテストスクリプト
# 処理プロセスが正常に動作するか確認
set -e
echo "=== 顔検出処理のクイックテスト ==="
echo ""
# 仮想環境の確認
if [ ! -d ".venv" ]; then
echo "エラー: .venv が見つかりません"
echo "仮想環境を作成してください: python -m venv .venv"
exit 1
fi
# 環境変数の読み込み
if [ -f ".env" ]; then
echo "環境変数を読み込み中..."
export $(cat .env | grep -v '^#' | xargs)
fi
# 仮想環境をアクティベート
source .venv/bin/activate
# モデルの確認
MODEL_PATH="models/yolov8n-face-lindevs.pt"
if [ ! -f "$MODEL_PATH" ]; then
echo "警告: モデルファイルが見つかりません: $MODEL_PATH"
echo "デフォルトモデルをダウンロードしてください"
fi
# テスト画像の確認
if [ $# -eq 0 ]; then
echo "使い方: $0 <画像ファイルまたは動画ファイル>"
echo ""
echo "例:"
echo " $0 test.jpg"
echo " $0 test.mp4"
exit 1
fi
INPUT_FILE="$1"
if [ ! -f "$INPUT_FILE" ]; then
echo "エラー: ファイルが見つかりません: $INPUT_FILE"
exit 1
fi
# ファイルタイプの判定
EXT="${INPUT_FILE##*.}"
EXT_LOWER=$(echo "$EXT" | tr '[:upper:]' '[:lower:]')
echo "入力ファイル: $INPUT_FILE"
echo ""
# GPU情報の表示
echo "=== GPU情報 ==="
python -c "import torch; print(f'PyTorch CUDA available: {torch.cuda.is_available()}'); print(f'Device: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else \"CPU\"}') if torch.cuda.is_available() else None" 2>/dev/null || echo "PyTorchが見つかりません"
echo ""
# テスト実行
echo "=== 検出テストを開始 ==="
if [[ "$EXT_LOWER" == "mp4" || "$EXT_LOWER" == "avi" || "$EXT_LOWER" == "mov" ]]; then
# 動画の場合は最初の1フレームのみテスト
python debug_detector.py --video "$INPUT_FILE" --frame 0
else
# 画像の場合
python debug_detector.py --image "$INPUT_FILE"
fi
echo ""
echo "=== テスト完了 ==="

View File

@ -1,224 +0,0 @@
#!/usr/bin/env python3
"""
推論サーバーAPIのテストスクリプト
Usage:
# サーバーの状態確認
python test_server_api.py --status
# マスク生成のテスト
python test_server_api.py --video test.mp4 --output /tmp/masks --start 0 --end 10
"""
import argparse
import json
import time
import urllib.request
import urllib.error
from pathlib import Path
import sys
SERVER_URL = "http://127.0.0.1:8181"
def check_status():
"""サーバーの状態を確認"""
try:
with urllib.request.urlopen(f"{SERVER_URL}/status", timeout=2) as response:
data = json.loads(response.read().decode('utf-8'))
print("✓ サーバーは稼働中です")
print(f" Status: {data.get('status')}")
print(f" GPU Available: {data.get('gpu_available')}")
if data.get('gpu_device'):
print(f" GPU Device: {data.get('gpu_device')}")
if data.get('gpu_count'):
print(f" GPU Count: {data.get('gpu_count')}")
if data.get('rocm_version'):
print(f" ROCm Version: {data.get('rocm_version')}")
return True
except (urllib.error.URLError, ConnectionRefusedError, TimeoutError) as e:
print("✗ サーバーに接続できません")
print(f" エラー: {e}")
print("\nサーバーを起動してください:")
print(" ./run_server.sh")
return False
def submit_task(video_path, output_dir, start_frame, end_frame, conf, iou, mask_scale):
"""マスク生成タスクを送信"""
data = {
"video_path": video_path,
"output_dir": output_dir,
"start_frame": start_frame,
"end_frame": end_frame,
"conf_threshold": conf,
"iou_threshold": iou,
"mask_scale": mask_scale,
}
req = urllib.request.Request(
f"{SERVER_URL}/generate",
data=json.dumps(data).encode('utf-8'),
headers={'Content-Type': 'application/json'},
method='POST'
)
try:
with urllib.request.urlopen(req) as response:
result = json.loads(response.read().decode('utf-8'))
return result
except urllib.error.HTTPError as e:
error_msg = e.read().decode('utf-8')
raise RuntimeError(f"サーバーエラー: {error_msg}")
def get_task_status(task_id):
"""タスクの状態を取得"""
try:
with urllib.request.urlopen(f"{SERVER_URL}/tasks/{task_id}") as response:
return json.loads(response.read().decode('utf-8'))
except urllib.error.HTTPError:
return {"status": "unknown"}
def cancel_task(task_id):
"""タスクをキャンセル"""
try:
req = urllib.request.Request(
f"{SERVER_URL}/tasks/{task_id}/cancel",
method='POST'
)
with urllib.request.urlopen(req):
pass
return True
except urllib.error.HTTPError:
return False
def monitor_task(task_id, poll_interval=0.5):
"""タスクの進行状況を監視"""
print(f"\nタスクID: {task_id}")
print("進行状況を監視中...\n")
last_progress = -1
while True:
status = get_task_status(task_id)
state = status.get('status')
progress = status.get('progress', 0)
total = status.get('total', 0)
# 進行状況の表示
if progress != last_progress and total > 0:
percentage = (progress / total) * 100
bar_length = 40
filled = int(bar_length * progress / total)
bar = '=' * filled + '-' * (bar_length - filled)
print(f"\r[{bar}] {progress}/{total} ({percentage:.1f}%)", end='', flush=True)
last_progress = progress
# 終了状態のチェック
if state == "completed":
print("\n\n✓ 処理が完了しました")
print(f" 出力先: {status.get('result_path')}")
print(f" メッセージ: {status.get('message')}")
return True
elif state == "failed":
print("\n\n✗ 処理が失敗しました")
print(f" エラー: {status.get('message')}")
return False
elif state == "cancelled":
print("\n\n- 処理がキャンセルされました")
return False
time.sleep(poll_interval)
def main():
parser = argparse.ArgumentParser(
description="推論サーバーAPIのテストスクリプト"
)
# 操作モード
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument("--status", action="store_true", help="サーバーの状態を確認")
mode_group.add_argument("--video", type=str, help="処理する動画ファイル")
# タスクパラメータ
parser.add_argument("--output", type=str, default="/tmp/masks", help="マスク出力先ディレクトリ")
parser.add_argument("--start", type=int, default=0, help="開始フレーム")
parser.add_argument("--end", type=int, default=10, help="終了フレーム")
parser.add_argument("--conf", type=float, default=0.5, help="信頼度閾値")
parser.add_argument("--iou", type=float, default=0.45, help="NMS IoU閾値")
parser.add_argument("--mask-scale", type=float, default=1.5, help="マスクスケール")
# その他のオプション
parser.add_argument("--no-wait", action="store_true", help="タスク送信後、完了を待たない")
args = parser.parse_args()
# 状態確認モード
if args.status:
check_status()
return
# マスク生成モード
print("=== 推論サーバーAPIテスト ===\n")
# サーバーの確認
if not check_status():
sys.exit(1)
# 動画ファイルの確認
if not Path(args.video).exists():
print(f"\n✗ 動画ファイルが見つかりません: {args.video}")
sys.exit(1)
video_path = str(Path(args.video).absolute())
output_dir = str(Path(args.output).absolute())
print(f"\n動画: {video_path}")
print(f"出力先: {output_dir}")
print(f"フレーム範囲: {args.start} - {args.end}")
print(f"パラメータ: conf={args.conf}, iou={args.iou}, mask_scale={args.mask_scale}")
# タスクを送信
print("\nタスクを送信中...")
try:
result = submit_task(
video_path,
output_dir,
args.start,
args.end,
args.conf,
args.iou,
args.mask_scale
)
except Exception as e:
print(f"\n✗ タスク送信失敗: {e}")
sys.exit(1)
task_id = result['id']
print(f"✓ タスクが送信されました (ID: {task_id})")
# 完了待機
if not args.no_wait:
try:
success = monitor_task(task_id)
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\n中断されました")
print("タスクをキャンセル中...")
if cancel_task(task_id):
print("✓ タスクをキャンセルしました")
sys.exit(130)
else:
print("\nタスクの状態を確認するには:")
print(f" python test_server_api.py --task-status {task_id}")
if __name__ == "__main__":
main()