blender-mask-peoples/core/utils.py
2026-02-22 04:36:28 +09:00

142 lines
4.0 KiB
Python

"""
Utility functions for Face Mask extension.
Provides helper functions for server status, cache info, etc.
"""
import os
import urllib.request
import urllib.error
import json
import tempfile
from typing import Dict, Tuple, Optional
def get_server_status() -> Dict:
"""
Get server status and GPU information.
Returns:
dict: {
'running': bool,
'gpu_available': bool,
'gpu_device': str or None,
'gpu_count': int,
'rocm_version': str or None,
}
"""
result = {
'running': False,
'gpu_available': False,
'gpu_device': None,
'gpu_count': 0,
'rocm_version': None,
}
try:
with urllib.request.urlopen("http://127.0.0.1:8181/status", timeout=1) as response:
data = json.loads(response.read().decode('utf-8'))
result['running'] = data.get('status') == 'running'
result['gpu_available'] = data.get('gpu_available', False)
result['gpu_device'] = data.get('gpu_device')
result['gpu_count'] = data.get('gpu_count', 0)
result['rocm_version'] = data.get('rocm_version')
except (urllib.error.URLError, ConnectionRefusedError, TimeoutError):
result['running'] = False
return result
def get_cache_root() -> str:
"""
Resolve cache root directory from scene setting or defaults.
Priority:
1) Scene setting: facemask_cache_dir (if non-empty)
2) Saved blend file directory + .mask_cache
3) Temp directory + blender_mask_cache
"""
import bpy
scene = getattr(bpy.context, "scene", None)
cache_setting = ""
if scene is not None:
cache_setting = (getattr(scene, "facemask_cache_dir", "") or "").strip()
if cache_setting:
return bpy.path.abspath(cache_setting)
blend_file = bpy.data.filepath
if blend_file:
project_dir = os.path.dirname(blend_file)
return os.path.join(project_dir, ".mask_cache")
return os.path.join(tempfile.gettempdir(), "blender_mask_cache")
def get_cache_dir_for_strip(strip_name: str) -> str:
"""Get cache directory path for a specific strip."""
return os.path.join(get_cache_root(), strip_name)
def get_detections_path_for_strip(strip_name: str) -> str:
"""Get msgpack detection cache path for a specific strip."""
return os.path.join(get_cache_dir_for_strip(strip_name), "detections.msgpack")
def check_detection_cache(strip_name: str) -> bool:
"""Detection cache ファイルが存在し有効かどうか確認する。"""
path = get_detections_path_for_strip(strip_name)
try:
return os.path.exists(path) and os.path.getsize(path) > 0
except OSError:
return False
def get_cache_info(strip_name: Optional[str] = None) -> Tuple[str, int, int]:
"""
Get cache directory information.
Args:
strip_name: If provided, get info for specific strip. Otherwise, get info for all cache.
Returns:
Tuple of (cache_path, total_size_bytes, file_count)
"""
if strip_name:
cache_path = get_cache_dir_for_strip(strip_name)
else:
cache_path = get_cache_root()
# Calculate size and count
total_size = 0
file_count = 0
if os.path.exists(cache_path):
for root, dirs, files in os.walk(cache_path):
for file in files:
file_path = os.path.join(root, file)
try:
total_size += os.path.getsize(file_path)
file_count += 1
except OSError:
pass
return cache_path, total_size, file_count
def format_size(size_bytes: int) -> str:
"""
Format bytes to human-readable size.
Args:
size_bytes: Size in bytes
Returns:
Formatted string (e.g., "1.5 MB")
"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"