142 lines
4.0 KiB
Python
142 lines
4.0 KiB
Python
"""
|
|
Utility functions for Face Mask extension.
|
|
|
|
Provides helper functions for server status, cache info, etc.
|
|
"""
|
|
|
|
import os
|
|
import urllib.request
|
|
import urllib.error
|
|
import json
|
|
import tempfile
|
|
from typing import Dict, Tuple, Optional
|
|
|
|
|
|
def get_server_status(
    url: str = "http://127.0.0.1:8181/status",
    timeout: float = 1.0,
) -> Dict:
    """
    Get server status and GPU information.

    Queries the status endpoint and copies the fields of its JSON reply onto
    a fixed set of keys. This is a best-effort probe: if the server cannot be
    reached, or its reply is not a JSON object, the default values below are
    returned instead of raising.

    Args:
        url: Status endpoint to query.
        timeout: Timeout in seconds passed to the HTTP request.

    Returns:
        dict: {
            'running': bool,
            'gpu_available': bool,
            'gpu_device': str or None,
            'gpu_count': int,
            'rocm_version': str or None,
        }
    """
    # Imported locally so this function does not rely on a file-level import.
    import http.client

    result = {
        'running': False,
        'gpu_available': False,
        'gpu_device': None,
        'gpu_count': 0,
        'rocm_version': None,
    }

    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            payload = response.read()
    except (OSError, http.client.HTTPException):
        # OSError covers URLError, refused/reset connections and timeouts;
        # HTTPException covers malformed or truncated HTTP replies.
        return result

    try:
        data = json.loads(payload.decode('utf-8'))
    except ValueError:
        # Both JSONDecodeError and UnicodeDecodeError derive from ValueError.
        return result

    # A valid JSON document that is not an object has no fields to read.
    if not isinstance(data, dict):
        return result

    result['running'] = data.get('status') == 'running'
    result['gpu_available'] = data.get('gpu_available', False)
    result['gpu_device'] = data.get('gpu_device')
    result['gpu_count'] = data.get('gpu_count', 0)
    result['rocm_version'] = data.get('rocm_version')
    return result
|
|
|
|
|
|
def get_cache_root() -> str:
    """
    Resolve the cache root directory.

    The first applicable rule wins:
        1) The scene's ``facemask_cache_dir`` setting, when it is non-empty
           after stripping whitespace (resolved with ``bpy.path.abspath``).
        2) ``.mask_cache`` next to the saved blend file.
        3) ``blender_mask_cache`` inside the system temp directory.

    Returns:
        The cache root path as a string.
    """
    import bpy

    active_scene = getattr(bpy.context, "scene", None)
    if active_scene is None:
        configured_dir = ""
    else:
        # The attribute may be missing or None; both are treated as "unset".
        raw_setting = getattr(active_scene, "facemask_cache_dir", "")
        configured_dir = (raw_setting or "").strip()

    if configured_dir:
        return bpy.path.abspath(configured_dir)

    blend_path = bpy.data.filepath
    if not blend_path:
        # No saved blend file path is available: fall back to the temp dir.
        return os.path.join(tempfile.gettempdir(), "blender_mask_cache")

    return os.path.join(os.path.dirname(blend_path), ".mask_cache")
|
|
|
|
|
|
def get_cache_dir_for_strip(strip_name: str) -> str:
    """
    Return the cache directory path for a specific strip.

    Args:
        strip_name: Name of the strip; used as the directory name under the
            cache root.

    Returns:
        The cache root joined with ``strip_name``.
    """
    cache_root = get_cache_root()
    return os.path.join(cache_root, strip_name)
|
|
|
|
|
|
def get_detections_path_for_strip(strip_name: str) -> str:
    """
    Return the msgpack detection cache path for a specific strip.

    Args:
        strip_name: Name of the strip whose cache directory is used.

    Returns:
        Path of ``detections.msgpack`` inside the strip's cache directory.
    """
    strip_cache_dir = get_cache_dir_for_strip(strip_name)
    return os.path.join(strip_cache_dir, "detections.msgpack")
|
|
|
|
|
|
def check_detection_cache(strip_name: str) -> bool:
    """
    Check whether the detection cache file exists and is usable.

    Args:
        strip_name: Name of the strip whose detection cache is checked.

    Returns:
        True when the detection cache path exists and has a size greater
        than zero; False otherwise, including when the size cannot be read.
    """
    detections_path = get_detections_path_for_strip(strip_name)
    try:
        if not os.path.exists(detections_path):
            return False
        return os.path.getsize(detections_path) > 0
    except OSError:
        return False
|
|
|
|
|
|
def get_cache_info(strip_name: Optional[str] = None) -> Tuple[str, int, int]:
    """
    Get cache directory information.

    Args:
        strip_name: If provided (and non-empty), report on that strip's cache
            directory. Otherwise report on the whole cache root.

    Returns:
        Tuple of (cache_path, total_size_bytes, file_count). Files whose size
        cannot be read are left out of both the size and the count.
    """
    target_dir = get_cache_dir_for_strip(strip_name) if strip_name else get_cache_root()

    # One entry per file whose size could be read.
    sizes = []
    if os.path.exists(target_dir):
        for folder, _subdirs, filenames in os.walk(target_dir):
            for filename in filenames:
                try:
                    sizes.append(os.path.getsize(os.path.join(folder, filename)))
                except OSError:
                    # Best effort: skip files whose size cannot be read.
                    continue

    return target_dir, sum(sizes), len(sizes)
|
|
|
|
|
|
def format_size(size_bytes: int) -> str:
    """
    Format bytes to human-readable size.

    The value is divided by 1024 until it fits the current unit, stopping at
    TB. The unit is chosen from the value as it will be displayed (rounded to
    one decimal place), so a value just below a unit boundary is shown in the
    next unit rather than as "1024.0" of the smaller one. Negative values are
    scaled by magnitude and keep their sign.

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted string (e.g., "1.5 MB")
    """
    # Work on a local float so the argument itself is never rebound.
    value = float(size_bytes)
    for unit in ('B', 'KB', 'MB', 'GB'):
        # Compare the rounded magnitude: 1023.999 would print as "1024.0".
        if abs(round(value, 1)) < 1024.0:
            return f"{value:.1f} {unit}"
        value /= 1024.0
    return f"{value:.1f} TB"
|