merge: 00001KVWECEQG worker runtime registry

This commit is contained in:
Keisuke Hirata 2026-06-24 20:11:00 +09:00
commit 1251c0ca70
No known key found for this signature in database
8 changed files with 1059 additions and 560 deletions

1
Cargo.lock generated
View File

@ -6058,6 +6058,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serde_yaml", "serde_yaml",
"sha2 0.11.0",
"tempfile", "tempfile",
"thiserror 2.0.18", "thiserror 2.0.18",
"ticket", "ticket",

View File

@ -16,6 +16,7 @@ rusqlite.workspace = true
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true serde_json.workspace = true
serde_yaml.workspace = true serde_yaml.workspace = true
sha2.workspace = true
thiserror.workspace = true thiserror.workspace = true
ticket.workspace = true ticket.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] } tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] }

File diff suppressed because it is too large Load Diff

View File

@ -40,6 +40,15 @@ pub enum Error {
MissingFrontmatter(String), MissingFrontmatter(String),
#[error("unknown local host `{0}`")] #[error("unknown local host `{0}`")]
UnknownHost(String), UnknownHost(String),
#[error("unknown local worker `{0}`")]
UnknownWorker(String),
#[error("invalid runtime {kind} `{value}`")]
InvalidRuntimeIdentifier { kind: String, value: String },
#[error("runtime `{runtime_id}` does not support `{capability}`")]
RuntimeCapabilityUnsupported {
runtime_id: String,
capability: String,
},
#[error("unknown local repository `{0}`")] #[error("unknown local repository `{0}`")]
UnknownRepository(String), UnknownRepository(String),
#[error("workspace identity error: {0}")] #[error("workspace identity error: {0}")]

View File

@ -3,7 +3,7 @@ use std::process::{Command, Output};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::hosts::RuntimeDiagnostic; use crate::hosts::{DiagnosticSeverity, RuntimeDiagnostic};
const LEGACY_LOCAL_REPOSITORY_ID: &str = "local"; const LEGACY_LOCAL_REPOSITORY_ID: &str = "local";
const LOCAL_REPOSITORY_PREFIX: &str = "local-"; const LOCAL_REPOSITORY_PREFIX: &str = "local-";
@ -340,7 +340,11 @@ fn truncate_field(value: &str, limit: usize) -> String {
fn diagnostic(code: &str, severity: &str, message: String) -> RuntimeDiagnostic { fn diagnostic(code: &str, severity: &str, message: String) -> RuntimeDiagnostic {
RuntimeDiagnostic { RuntimeDiagnostic {
code: code.to_string(), code: code.to_string(),
severity: severity.to_string(), severity: match severity {
"error" => DiagnosticSeverity::Error,
"warning" => DiagnosticSeverity::Warning,
_ => DiagnosticSeverity::Info,
},
message, message,
} }
} }

View File

@ -11,7 +11,8 @@ use serde::{Deserialize, Serialize};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use crate::hosts::{ use crate::hosts::{
HostSummary, LocalRuntimeBridge, RuntimeDiagnostic, WorkerSummary, WorkspaceWorkerRuntime, DiagnosticSeverity, HostSummary, LocalPodRuntime, RuntimeDiagnostic, RuntimeSummary,
WorkerRuntimeRegistry, WorkerSummary,
}; };
use crate::identity::WorkspaceIdentity; use crate::identity::WorkspaceIdentity;
use crate::records::{ use crate::records::{
@ -63,7 +64,7 @@ pub struct WorkspaceApi {
config: ServerConfig, config: ServerConfig,
store: Arc<dyn ControlPlaneStore>, store: Arc<dyn ControlPlaneStore>,
records: LocalProjectRecordReader, records: LocalProjectRecordReader,
runtime: Arc<dyn WorkspaceWorkerRuntime>, runtime: Arc<WorkerRuntimeRegistry>,
} }
impl WorkspaceApi { impl WorkspaceApi {
@ -77,11 +78,11 @@ impl WorkspaceApi {
updated_at: config.workspace_created_at.clone(), updated_at: config.workspace_created_at.clone(),
}) })
.await?; .await?;
let runtime = Arc::new(LocalRuntimeBridge::new( let runtime = Arc::new(WorkerRuntimeRegistry::for_local_pods(LocalPodRuntime::new(
config.workspace_id.clone(), config.workspace_id.clone(),
config.workspace_root.clone(), config.workspace_root.clone(),
config.local_runtime_data_dir.clone(), config.local_runtime_data_dir.clone(),
)); )));
Ok(Self { Ok(Self {
records: LocalProjectRecordReader::new(config.workspace_root.clone()), records: LocalProjectRecordReader::new(config.workspace_root.clone()),
config, config,
@ -125,6 +126,7 @@ pub fn build_router(api: WorkspaceApi) -> Router {
get(repository_tickets), get(repository_tickets),
) )
.route("/api/hosts", get(list_hosts)) .route("/api/hosts", get(list_hosts))
.route("/api/runtimes", get(list_runtimes))
.route("/api/workers", get(list_workers)) .route("/api/workers", get(list_workers))
.route("/api/hosts/{host_id}/workers", get(list_host_workers)) .route("/api/hosts/{host_id}/workers", get(list_host_workers))
.fallback(get(static_or_spa_fallback)) .fallback(get(static_or_spa_fallback))
@ -381,7 +383,7 @@ async fn repository_tickets(
source: "workspace_local_ticket_fallback".to_string(), source: "workspace_local_ticket_fallback".to_string(),
diagnostics: vec![RuntimeDiagnostic { diagnostics: vec![RuntimeDiagnostic {
code: "repository_ticket_target_metadata_absent".to_string(), code: "repository_ticket_target_metadata_absent".to_string(),
severity: "info".to_string(), severity: DiagnosticSeverity::Info,
message: "Ticket target Repository metadata is not available yet; Kanban groups all workspace-local Tickets by state as a read-only fallback.".to_string(), message: "Ticket target Repository metadata is not available yet; Kanban groups all workspace-local Tickets by state as a read-only fallback.".to_string(),
}], }],
})) }))
@ -396,11 +398,25 @@ async fn list_hosts(
workspace_id: api.config.workspace_id, workspace_id: api.config.workspace_id,
limit, limit,
items: runtime_hosts.items, items: runtime_hosts.items,
source: "worker_runtime".to_string(), source: "worker_runtime_registry".to_string(),
diagnostics: runtime_hosts.diagnostics, diagnostics: runtime_hosts.diagnostics,
})) }))
} }
async fn list_runtimes(
State(api): State<WorkspaceApi>,
) -> ApiResult<Json<RuntimeListResponse<RuntimeSummary>>> {
let limit = api.config.max_records.min(200);
let runtimes = api.runtime.list_runtimes(limit);
Ok(Json(RuntimeListResponse {
workspace_id: api.config.workspace_id,
limit,
items: runtimes.items,
source: "worker_runtime_registry".to_string(),
diagnostics: runtimes.diagnostics,
}))
}
async fn list_workers( async fn list_workers(
State(api): State<WorkspaceApi>, State(api): State<WorkspaceApi>,
) -> ApiResult<Json<RuntimeListResponse<WorkerSummary>>> { ) -> ApiResult<Json<RuntimeListResponse<WorkerSummary>>> {
@ -411,16 +427,18 @@ async fn list_host_workers(
State(api): State<WorkspaceApi>, State(api): State<WorkspaceApi>,
AxumPath(host_id): AxumPath<String>, AxumPath(host_id): AxumPath<String>,
) -> ApiResult<Json<RuntimeListResponse<WorkerSummary>>> { ) -> ApiResult<Json<RuntimeListResponse<WorkerSummary>>> {
let runtime_hosts = api.runtime.list_hosts(1); let limit = api.config.max_records.min(200);
let expected_host_id = runtime_hosts let runtime_workers = api
.items .runtime
.first() .list_workers_for_host(&host_id, limit)
.map(|host| host.host_id.as_str()) .map_err(|err| err.into_error())?;
.ok_or_else(|| Error::UnknownHost(host_id.clone()))?; Ok(Json(RuntimeListResponse {
if host_id != expected_host_id { workspace_id: api.config.workspace_id,
return Err(Error::UnknownHost(host_id).into()); limit,
} items: runtime_workers.items,
workers_response(api).map(Json) source: "worker_runtime_registry".to_string(),
diagnostics: runtime_workers.diagnostics,
}))
} }
fn workers_response(api: WorkspaceApi) -> ApiResult<RuntimeListResponse<WorkerSummary>> { fn workers_response(api: WorkspaceApi) -> ApiResult<RuntimeListResponse<WorkerSummary>> {
@ -430,7 +448,7 @@ fn workers_response(api: WorkspaceApi) -> ApiResult<RuntimeListResponse<WorkerSu
workspace_id: api.config.workspace_id, workspace_id: api.config.workspace_id,
limit, limit,
items: runtime_workers.items, items: runtime_workers.items,
source: "worker_runtime".to_string(), source: "worker_runtime_registry".to_string(),
diagnostics: runtime_workers.diagnostics, diagnostics: runtime_workers.diagnostics,
}) })
} }
@ -589,11 +607,14 @@ impl From<Error> for ApiError {
impl IntoResponse for ApiError { impl IntoResponse for ApiError {
fn into_response(self) -> Response { fn into_response(self) -> Response {
let status = match &self.0 { let status = match &self.0 {
Error::InvalidRuntimeIdentifier { .. } => StatusCode::BAD_REQUEST,
Error::InvalidRecordId(_) Error::InvalidRecordId(_)
| Error::MissingFrontmatter(_) | Error::MissingFrontmatter(_)
| Error::UnknownHost(_) | Error::UnknownHost(_)
| Error::UnknownWorker(_)
| Error::UnknownRepository(_) => StatusCode::NOT_FOUND, | Error::UnknownRepository(_) => StatusCode::NOT_FOUND,
Error::Ticket(_) => StatusCode::NOT_FOUND, Error::Ticket(_) => StatusCode::NOT_FOUND,
Error::RuntimeCapabilityUnsupported { .. } => StatusCode::NOT_IMPLEMENTED,
_ => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR,
}; };
( (
@ -703,25 +724,36 @@ mod tests {
assert_eq!(unknown_repository_response.status(), StatusCode::NOT_FOUND); assert_eq!(unknown_repository_response.status(), StatusCode::NOT_FOUND);
let hosts = get_json(app.clone(), "/api/hosts").await; let hosts = get_json(app.clone(), "/api/hosts").await;
assert_eq!(hosts["items"][0]["host_id"], TEST_REPOSITORY_ID); assert_eq!(hosts["source"], "worker_runtime_registry");
assert_eq!(hosts["items"][0]["kind"], "local_host"); assert_eq!(hosts["items"][0]["runtime_id"], "local-pod-runtime");
let host_id = hosts["items"][0]["host_id"].as_str().unwrap().to_string();
assert!(host_id.starts_with("local-"));
assert!(host_id.len() <= 120);
assert_ne!(host_id, TEST_REPOSITORY_ID);
assert_eq!(hosts["items"][0]["kind"], "local-pod-host");
assert_eq!( assert_eq!(
hosts["items"][0]["capabilities"]["local_pod_inspection"], hosts["items"][0]["capabilities"]["local_pod_inspection"],
"unavailable" "available"
); );
assert_eq!(
hosts["items"][0]["capabilities"]["workspace_scope"],
"current_workspace"
);
assert!(!hosts.to_string().contains("metadata.json"));
let runtimes = get_json(app.clone(), "/api/runtimes").await;
assert_eq!(runtimes["source"], "worker_runtime_registry");
assert_eq!(runtimes["items"][0]["runtime_id"], "local-pod-runtime");
assert_eq!(runtimes["items"][0]["host_ids"][0], host_id);
let workers = get_json(app.clone(), "/api/workers").await; let workers = get_json(app.clone(), "/api/workers").await;
assert!(workers["items"].as_array().unwrap().is_empty()); assert!(workers["items"].as_array().unwrap().is_empty());
assert_eq!( assert_eq!(
workers["diagnostics"][0]["code"], workers["diagnostics"][0]["code"],
"local_pod_metadata_root_missing" "local_pod_registry_unreadable"
); );
let host_workers = get_json( let host_workers = get_json(app.clone(), &format!("/api/hosts/{host_id}/workers")).await;
app.clone(),
&format!("/api/hosts/{TEST_REPOSITORY_ID}/workers"),
)
.await;
assert!(host_workers["items"].as_array().unwrap().is_empty()); assert!(host_workers["items"].as_array().unwrap().is_empty());
let runs_response = app let runs_response = app

View File

@ -546,6 +546,14 @@
<dt>Local inspection</dt> <dt>Local inspection</dt>
<dd>{host.capabilities.local_pod_inspection}</dd> <dd>{host.capabilities.local_pod_inspection}</dd>
</div> </div>
<div>
<dt>Runtime</dt>
<dd><code>{host.runtime_id}</code></dd>
</div>
<div>
<dt>Scope</dt>
<dd>{host.capabilities.workspace_scope}</dd>
</div>
<div> <div>
<dt>Platform</dt> <dt>Platform</dt>
<dd>{host.capabilities.os} / {host.capabilities.arch}</dd> <dd>{host.capabilities.os} / {host.capabilities.arch}</dd>
@ -590,7 +598,7 @@
</td> </td>
<td><code>{worker.host_id}</code></td> <td><code>{worker.host_id}</code></td>
<td>{worker.state} · {worker.status}</td> <td>{worker.state} · {worker.status}</td>
<td>{worker.workspace_root ?? 'unknown'}</td> <td>{worker.workspace.visibility} · {worker.workspace.identity}</td>
<td>{worker.implementation.kind}</td> <td>{worker.implementation.kind}</td>
</tr> </tr>
{/each} {/each}

View File

@ -25,35 +25,70 @@ export type Diagnostic = {
message: string; message: string;
}; };
export type RuntimeCapabilities = {
can_list_hosts: boolean;
can_list_workers: boolean;
can_get_worker: boolean;
can_spawn_worker: boolean;
can_stop_worker: boolean;
can_accept_input: boolean;
can_stream_events: boolean;
can_read_bounded_transcript: boolean;
has_workspace_fs: boolean;
has_shell: boolean;
has_git: boolean;
supports_worktrees: boolean;
supports_backend_internal_tools: boolean;
local_pod_inspection: string;
workspace_scope: string;
os: string;
arch: string;
max_workers: number;
};
export type Runtime = {
runtime_id: string;
label: string;
kind: string;
status: string;
host_ids: string[];
capabilities: RuntimeCapabilities;
diagnostics: Diagnostic[];
};
export type Host = { export type Host = {
runtime_id: string;
host_id: string; host_id: string;
label: string; label: string;
kind: string; kind: string;
status: string; status: string;
observed_at: string; observed_at: string;
last_seen_at: string; last_seen_at: string | null;
capabilities: { capabilities: RuntimeCapabilities;
local_pod_inspection: string;
workspace_root: string;
os: string;
arch: string;
max_workers: number;
};
diagnostics: Diagnostic[]; diagnostics: Diagnostic[];
}; };
export type WorkerCapabilities = {
can_accept_input: boolean;
can_stream_events: boolean;
can_stop: boolean;
can_spawn_followup: boolean;
can_read_bounded_transcript: boolean;
};
export type Worker = { export type Worker = {
runtime_id: string;
worker_id: string; worker_id: string;
host_id: string; host_id: string;
label: string; label: string;
pod_name: string; role?: string | null;
role?: string; profile?: string | null;
profile?: string; workspace: { visibility: string; identity: string };
workspace_root?: string;
state: string; state: string;
status: string; status: string;
last_seen_at?: string; last_seen_at?: string | null;
implementation: { kind: string; pod_name: string }; implementation: { kind: string; display_hint: string };
capabilities: WorkerCapabilities;
diagnostics: Diagnostic[]; diagnostics: Diagnostic[];
}; };