feat: add remote runtime registry source

This commit is contained in:
Keisuke Hirata 2026-06-26 15:15:29 +09:00
parent ec4ada9464
commit aeb12b3b8e
No known key found for this signature in database
6 changed files with 1070 additions and 30 deletions

1
Cargo.lock generated
View File

@ -6036,6 +6036,7 @@ dependencies = [
"pod-store",
"project-record",
"protocol",
"reqwest",
"rusqlite",
"serde",
"serde_json",

View File

@ -14,6 +14,7 @@ futures.workspace = true
pod-store = { workspace = true }
protocol = { workspace = true }
project-record.workspace = true
reqwest = { version = "0.13", default-features = false, features = ["blocking", "json", "native-tls"] }
rusqlite.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true

File diff suppressed because it is too large Load Diff

View File

@ -50,6 +50,12 @@ pub enum Error {
},
#[error("invalid runtime {kind} `{value}`")]
InvalidRuntimeIdentifier { kind: String, value: String },
#[error("runtime `{runtime_id}` operation failed ({code}): {message}")]
RuntimeOperationFailed {
runtime_id: String,
code: String,
message: String,
},
#[error("runtime `{runtime_id}` does not support `{capability}`")]
RuntimeCapabilityUnsupported {
runtime_id: String,

View File

@ -13,9 +13,10 @@ use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use crate::hosts::{
DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime, RuntimeDiagnostic,
RuntimeRegistry, RuntimeSummary, WorkerInputRequest, WorkerInputResult, WorkerSpawnRequest,
WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection,
DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime,
RemoteRuntimeConfig, RemoteWorkerRuntime, RuntimeDiagnostic, RuntimeRegistry, RuntimeSummary,
WorkerInputRequest, WorkerInputResult, WorkerLifecycleRequest, WorkerLifecycleResult,
WorkerSpawnRequest, WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection,
};
use crate::identity::WorkspaceIdentity;
use crate::observation::{
@ -47,6 +48,7 @@ pub struct ServerConfig {
pub max_records: usize,
pub local_runtime_data_dir: Option<PathBuf>,
pub runtime_event_sources: Vec<RuntimeObservationSourceConfig>,
pub remote_runtime_sources: Vec<RemoteRuntimeConfig>,
}
impl ServerConfig {
@ -64,6 +66,7 @@ impl ServerConfig {
max_records: 200,
local_runtime_data_dir: manifest::paths::data_dir(),
runtime_event_sources: Vec::new(),
remote_runtime_sources: Vec::new(),
}
}
}
@ -88,14 +91,19 @@ impl WorkspaceApi {
updated_at: config.workspace_created_at.clone(),
})
.await?;
let runtime = Arc::new(RuntimeRegistry::for_workspace(
let mut runtime = RuntimeRegistry::for_workspace(
LocalWorkerRuntime::new(
config.workspace_id.clone(),
config.workspace_root.clone(),
config.local_runtime_data_dir.clone(),
),
EmbeddedWorkerRuntime::new_memory(config.workspace_id.clone()),
));
);
for remote_config in config.remote_runtime_sources.iter().cloned() {
runtime
.register(RemoteWorkerRuntime::new(remote_config).map_err(|err| err.into_error())?);
}
let runtime = Arc::new(runtime);
let observation_proxy = BackendObservationProxy::new(config.runtime_event_sources.clone());
Ok(Self {
records: LocalProjectRecordReader::new(config.workspace_root.clone()),
@ -155,6 +163,14 @@ pub fn build_router(api: WorkspaceApi) -> Router {
"/api/runtimes/{runtime_id}/workers/{worker_id}/input",
post(send_runtime_worker_input),
)
.route(
"/api/runtimes/{runtime_id}/workers/{worker_id}/stop",
post(stop_runtime_worker),
)
.route(
"/api/runtimes/{runtime_id}/workers/{worker_id}/cancel",
post(cancel_runtime_worker),
)
.route(
"/api/runtimes/{runtime_id}/workers/{worker_id}/transcript",
get(get_runtime_worker_transcript),
@ -499,6 +515,30 @@ async fn send_runtime_worker_input(
Ok(Json(result))
}
async fn stop_runtime_worker(
State(api): State<WorkspaceApi>,
AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>,
Json(request): Json<WorkerLifecycleRequest>,
) -> ApiResult<Json<WorkerLifecycleResult>> {
let result = api
.runtime
.stop_worker(&runtime_id, &worker_id, request)
.map_err(|err| err.into_error())?;
Ok(Json(result))
}
async fn cancel_runtime_worker(
State(api): State<WorkspaceApi>,
AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>,
Json(request): Json<WorkerLifecycleRequest>,
) -> ApiResult<Json<WorkerLifecycleResult>> {
let result = api
.runtime
.cancel_worker(&runtime_id, &worker_id, request)
.map_err(|err| err.into_error())?;
Ok(Json(result))
}
async fn get_runtime_worker_transcript(
State(api): State<WorkspaceApi>,
AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>,
@ -523,11 +563,16 @@ async fn worker_observation_ws(
Ok(source) => ws.on_upgrade(move |socket| {
worker_observation_ws_session(api.observation_proxy, source, query, socket)
}),
Err(ObservationProxyError::WorkerNotFound(_)) => {
match api.runtime.observation_source(&runtime_id, &worker_id) {
Ok(source) => ws.on_upgrade(move |socket| {
worker_observation_ws_session(api.observation_proxy, source, query, socket)
}),
Err(error) => ApiError(error.into_error()).into_response(),
}
}
Err(error) => {
let status = match error {
ObservationProxyError::WorkerNotFound(_) => StatusCode::NOT_FOUND,
_ => StatusCode::BAD_REQUEST,
};
let status = StatusCode::BAD_REQUEST;
(
status,
Json(serde_json::json!({
@ -844,6 +889,16 @@ impl IntoResponse for ApiError {
| Error::UnknownRepository(_) => StatusCode::NOT_FOUND,
Error::Ticket(_) => StatusCode::NOT_FOUND,
Error::RuntimeCapabilityUnsupported { .. } => StatusCode::NOT_IMPLEMENTED,
Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_auth_failed" => {
StatusCode::UNAUTHORIZED
}
Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_timeout" => {
StatusCode::GATEWAY_TIMEOUT
}
Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_unsupported" => {
StatusCode::NOT_IMPLEMENTED
}
Error::RuntimeOperationFailed { .. } => StatusCode::BAD_GATEWAY,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
(

View File

@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec {
filter = sourceFilter;
};
cargoHash = "sha256-5vmZTzO5PSRPHvQfiK0rNiBkHNyc0y3BCeDJNFJaAqA=";
cargoHash = "sha256-kZ9TAb1lNpslAhzcyC2RyIZg5Yh5hrAGCTZIhhYl/e4=";
depsExtraArgs = {
# Older fetchCargoVendor utilities used crates.io's API download endpoint,