merge: 00001KVZSGT14 remote runtime registry

This commit is contained in:
Keisuke Hirata 2026-06-26 15:29:18 +09:00
commit bbb5d68c4c
No known key found for this signature in database
7 changed files with 1148 additions and 32 deletions

1
Cargo.lock generated
View File

@ -6036,6 +6036,7 @@ dependencies = [
"pod-store", "pod-store",
"project-record", "project-record",
"protocol", "protocol",
"reqwest",
"rusqlite", "rusqlite",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -14,6 +14,7 @@ futures.workspace = true
pod-store = { workspace = true } pod-store = { workspace = true }
protocol = { workspace = true } protocol = { workspace = true }
project-record.workspace = true project-record.workspace = true
reqwest = { version = "0.13", default-features = false, features = ["blocking", "json", "native-tls"] }
rusqlite.workspace = true rusqlite.workspace = true
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true serde_json.workspace = true

File diff suppressed because it is too large Load Diff

View File

@ -50,6 +50,12 @@ pub enum Error {
}, },
#[error("invalid runtime {kind} `{value}`")] #[error("invalid runtime {kind} `{value}`")]
InvalidRuntimeIdentifier { kind: String, value: String }, InvalidRuntimeIdentifier { kind: String, value: String },
#[error("runtime `{runtime_id}` operation failed ({code}): {message}")]
RuntimeOperationFailed {
runtime_id: String,
code: String,
message: String,
},
#[error("runtime `{runtime_id}` does not support `{capability}`")] #[error("runtime `{runtime_id}` does not support `{capability}`")]
RuntimeCapabilityUnsupported { RuntimeCapabilityUnsupported {
runtime_id: String, runtime_id: String,

View File

@ -10,7 +10,7 @@ use tokio_tungstenite::tungstenite::{Error as TungsteniteError, Message as Tungs
use worker_runtime::http_server::{RuntimeWorkerEventWsEnvelope, RuntimeWorkerEventWsFrame}; use worker_runtime::http_server::{RuntimeWorkerEventWsEnvelope, RuntimeWorkerEventWsFrame};
/// Backend-private source for a runtime worker observation stream. /// Backend-private source for a runtime worker observation stream.
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, PartialEq, Eq)]
pub struct RuntimeObservationSourceConfig { pub struct RuntimeObservationSourceConfig {
pub runtime_id: String, pub runtime_id: String,
pub worker_id: String, pub worker_id: String,
@ -18,6 +18,20 @@ pub struct RuntimeObservationSourceConfig {
pub bearer_token: Option<String>, pub bearer_token: Option<String>,
} }
impl std::fmt::Debug for RuntimeObservationSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RuntimeObservationSourceConfig")
.field("runtime_id", &self.runtime_id)
.field("worker_id", &self.worker_id)
.field("endpoint", &"<backend-private>")
.field(
"bearer_token",
&self.bearer_token.as_ref().map(|_| "<redacted>"),
)
.finish()
}
}
/// Event consumed from a Runtime-owned worker observation WebSocket. /// Event consumed from a Runtime-owned worker observation WebSocket.
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RuntimeObservationUpstreamEvent { pub struct RuntimeObservationUpstreamEvent {
@ -181,12 +195,21 @@ pub struct BackendObservationOpen {
} }
/// Backend-owned in-memory v0 observation proxy state. /// Backend-owned in-memory v0 observation proxy state.
#[derive(Clone, Debug)] #[derive(Clone)]
pub struct BackendObservationProxy { pub struct BackendObservationProxy {
sources: Arc<BTreeMap<ObservationKey, RuntimeObservationSourceConfig>>, sources: Arc<BTreeMap<ObservationKey, RuntimeObservationSourceConfig>>,
state: Arc<Mutex<BackendObservationState>>, state: Arc<Mutex<BackendObservationState>>,
} }
impl std::fmt::Debug for BackendObservationProxy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BackendObservationProxy")
.field("source_count", &self.sources.len())
.field("state", &"<omitted>")
.finish()
}
}
impl BackendObservationProxy { impl BackendObservationProxy {
pub fn new(sources: Vec<RuntimeObservationSourceConfig>) -> Self { pub fn new(sources: Vec<RuntimeObservationSourceConfig>) -> Self {
let sources = sources let sources = sources
@ -475,3 +498,56 @@ impl RuntimeWsObservationClient {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
fn sensitive_source() -> RuntimeObservationSourceConfig {
RuntimeObservationSourceConfig {
runtime_id: "remote-runtime".to_string(),
worker_id: "worker-1".to_string(),
endpoint: "wss://remote.example.invalid/private/workers/worker-1/events/ws".to_string(),
bearer_token: Some("top-secret-bearer-token".to_string()),
}
}
#[test]
fn runtime_observation_source_debug_redacts_endpoint_and_token() {
let debug = format!("{:?}", sensitive_source());
assert!(debug.contains("remote-runtime"));
assert!(debug.contains("worker-1"));
assert!(debug.contains("<backend-private>"));
assert!(debug.contains("<redacted>"));
for forbidden in [
"remote.example.invalid",
"/private/workers/worker-1/events/ws",
"top-secret-bearer-token",
] {
assert!(
!debug.contains(forbidden),
"debug leaked {forbidden}: {debug}"
);
}
}
#[test]
fn backend_observation_proxy_debug_redacts_contained_sources() {
let proxy = BackendObservationProxy::new(vec![sensitive_source()]);
let debug = format!("{proxy:?}");
assert!(debug.contains("BackendObservationProxy"));
assert!(debug.contains("source_count"));
for forbidden in [
"remote.example.invalid",
"/private/workers/worker-1/events/ws",
"top-secret-bearer-token",
] {
assert!(
!debug.contains(forbidden),
"debug leaked {forbidden}: {debug}"
);
}
}
}

View File

@ -13,9 +13,10 @@ use serde::{Deserialize, Serialize};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use crate::hosts::{ use crate::hosts::{
DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime, RuntimeDiagnostic, DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime,
RuntimeRegistry, RuntimeSummary, WorkerInputRequest, WorkerInputResult, WorkerSpawnRequest, RemoteRuntimeConfig, RemoteWorkerRuntime, RuntimeDiagnostic, RuntimeRegistry, RuntimeSummary,
WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection, WorkerInputRequest, WorkerInputResult, WorkerLifecycleRequest, WorkerLifecycleResult,
WorkerSpawnRequest, WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection,
}; };
use crate::identity::WorkspaceIdentity; use crate::identity::WorkspaceIdentity;
use crate::observation::{ use crate::observation::{
@ -47,6 +48,7 @@ pub struct ServerConfig {
pub max_records: usize, pub max_records: usize,
pub local_runtime_data_dir: Option<PathBuf>, pub local_runtime_data_dir: Option<PathBuf>,
pub runtime_event_sources: Vec<RuntimeObservationSourceConfig>, pub runtime_event_sources: Vec<RuntimeObservationSourceConfig>,
pub remote_runtime_sources: Vec<RemoteRuntimeConfig>,
} }
impl ServerConfig { impl ServerConfig {
@ -64,6 +66,7 @@ impl ServerConfig {
max_records: 200, max_records: 200,
local_runtime_data_dir: manifest::paths::data_dir(), local_runtime_data_dir: manifest::paths::data_dir(),
runtime_event_sources: Vec::new(), runtime_event_sources: Vec::new(),
remote_runtime_sources: Vec::new(),
} }
} }
} }
@ -88,14 +91,19 @@ impl WorkspaceApi {
updated_at: config.workspace_created_at.clone(), updated_at: config.workspace_created_at.clone(),
}) })
.await?; .await?;
let runtime = Arc::new(RuntimeRegistry::for_workspace( let mut runtime = RuntimeRegistry::for_workspace(
LocalWorkerRuntime::new( LocalWorkerRuntime::new(
config.workspace_id.clone(), config.workspace_id.clone(),
config.workspace_root.clone(), config.workspace_root.clone(),
config.local_runtime_data_dir.clone(), config.local_runtime_data_dir.clone(),
), ),
EmbeddedWorkerRuntime::new_memory(config.workspace_id.clone()), EmbeddedWorkerRuntime::new_memory(config.workspace_id.clone()),
)); );
for remote_config in config.remote_runtime_sources.iter().cloned() {
runtime
.register(RemoteWorkerRuntime::new(remote_config).map_err(|err| err.into_error())?);
}
let runtime = Arc::new(runtime);
let observation_proxy = BackendObservationProxy::new(config.runtime_event_sources.clone()); let observation_proxy = BackendObservationProxy::new(config.runtime_event_sources.clone());
Ok(Self { Ok(Self {
records: LocalProjectRecordReader::new(config.workspace_root.clone()), records: LocalProjectRecordReader::new(config.workspace_root.clone()),
@ -155,6 +163,14 @@ pub fn build_router(api: WorkspaceApi) -> Router {
"/api/runtimes/{runtime_id}/workers/{worker_id}/input", "/api/runtimes/{runtime_id}/workers/{worker_id}/input",
post(send_runtime_worker_input), post(send_runtime_worker_input),
) )
.route(
"/api/runtimes/{runtime_id}/workers/{worker_id}/stop",
post(stop_runtime_worker),
)
.route(
"/api/runtimes/{runtime_id}/workers/{worker_id}/cancel",
post(cancel_runtime_worker),
)
.route( .route(
"/api/runtimes/{runtime_id}/workers/{worker_id}/transcript", "/api/runtimes/{runtime_id}/workers/{worker_id}/transcript",
get(get_runtime_worker_transcript), get(get_runtime_worker_transcript),
@ -499,6 +515,30 @@ async fn send_runtime_worker_input(
Ok(Json(result)) Ok(Json(result))
} }
async fn stop_runtime_worker(
State(api): State<WorkspaceApi>,
AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>,
Json(request): Json<WorkerLifecycleRequest>,
) -> ApiResult<Json<WorkerLifecycleResult>> {
let result = api
.runtime
.stop_worker(&runtime_id, &worker_id, request)
.map_err(|err| err.into_error())?;
Ok(Json(result))
}
async fn cancel_runtime_worker(
State(api): State<WorkspaceApi>,
AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>,
Json(request): Json<WorkerLifecycleRequest>,
) -> ApiResult<Json<WorkerLifecycleResult>> {
let result = api
.runtime
.cancel_worker(&runtime_id, &worker_id, request)
.map_err(|err| err.into_error())?;
Ok(Json(result))
}
async fn get_runtime_worker_transcript( async fn get_runtime_worker_transcript(
State(api): State<WorkspaceApi>, State(api): State<WorkspaceApi>,
AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>,
@ -523,11 +563,16 @@ async fn worker_observation_ws(
Ok(source) => ws.on_upgrade(move |socket| { Ok(source) => ws.on_upgrade(move |socket| {
worker_observation_ws_session(api.observation_proxy, source, query, socket) worker_observation_ws_session(api.observation_proxy, source, query, socket)
}), }),
Err(ObservationProxyError::WorkerNotFound(_)) => {
match api.runtime.observation_source(&runtime_id, &worker_id) {
Ok(source) => ws.on_upgrade(move |socket| {
worker_observation_ws_session(api.observation_proxy, source, query, socket)
}),
Err(error) => ApiError(error.into_error()).into_response(),
}
}
Err(error) => { Err(error) => {
let status = match error { let status = StatusCode::BAD_REQUEST;
ObservationProxyError::WorkerNotFound(_) => StatusCode::NOT_FOUND,
_ => StatusCode::BAD_REQUEST,
};
( (
status, status,
Json(serde_json::json!({ Json(serde_json::json!({
@ -844,6 +889,16 @@ impl IntoResponse for ApiError {
| Error::UnknownRepository(_) => StatusCode::NOT_FOUND, | Error::UnknownRepository(_) => StatusCode::NOT_FOUND,
Error::Ticket(_) => StatusCode::NOT_FOUND, Error::Ticket(_) => StatusCode::NOT_FOUND,
Error::RuntimeCapabilityUnsupported { .. } => StatusCode::NOT_IMPLEMENTED, Error::RuntimeCapabilityUnsupported { .. } => StatusCode::NOT_IMPLEMENTED,
Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_auth_failed" => {
StatusCode::UNAUTHORIZED
}
Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_timeout" => {
StatusCode::GATEWAY_TIMEOUT
}
Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_unsupported" => {
StatusCode::NOT_IMPLEMENTED
}
Error::RuntimeOperationFailed { .. } => StatusCode::BAD_GATEWAY,
_ => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR,
}; };
( (

View File

@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec {
filter = sourceFilter; filter = sourceFilter;
}; };
cargoHash = "sha256-5vmZTzO5PSRPHvQfiK0rNiBkHNyc0y3BCeDJNFJaAqA="; cargoHash = "sha256-kZ9TAb1lNpslAhzcyC2RyIZg5Yh5hrAGCTZIhhYl/e4=";
depsExtraArgs = { depsExtraArgs = {
# Older fetchCargoVendor utilities used crates.io's API download endpoint, # Older fetchCargoVendor utilities used crates.io's API download endpoint,