From abab1af2f04f45a8a27e055367b1da08bd941646 Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 26 Jun 2026 16:06:58 +0900 Subject: [PATCH] feat: add runtime config bundle sync --- Cargo.lock | 1 + crates/worker-runtime/Cargo.toml | 1 + crates/worker-runtime/src/catalog.rs | 3 +- crates/worker-runtime/src/config_bundle.rs | 319 +++++++++++++++++++++ crates/worker-runtime/src/error.rs | 28 ++ crates/worker-runtime/src/fs_store.rs | 6 + crates/worker-runtime/src/http_server.rs | 89 +++++- crates/worker-runtime/src/lib.rs | 1 + crates/worker-runtime/src/runtime.rs | 235 ++++++++++++++- crates/workspace-server/src/hosts.rs | 309 +++++++++++++++++++- crates/workspace-server/src/server.rs | 59 +++- package.nix | 2 +- 12 files changed, 1025 insertions(+), 28 deletions(-) create mode 100644 crates/worker-runtime/src/config_bundle.rs diff --git a/Cargo.lock b/Cargo.lock index 983d5cb9..f81268a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5941,6 +5941,7 @@ dependencies = [ "protocol", "serde", "serde_json", + "sha2 0.11.0", "thiserror 2.0.18", "tokio", "tokio-tungstenite 0.29.0", diff --git a/crates/worker-runtime/Cargo.toml b/crates/worker-runtime/Cargo.toml index cac97606..3553f0bb 100644 --- a/crates/worker-runtime/Cargo.toml +++ b/crates/worker-runtime/Cargo.toml @@ -21,6 +21,7 @@ axum = { workspace = true, optional = true } futures = { workspace = true, optional = true } protocol = { workspace = true, optional = true } serde = { workspace = true, features = ["derive"] } +sha2.workspace = true serde_json = { workspace = true, optional = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["net", "rt"], optional = true } diff --git a/crates/worker-runtime/src/catalog.rs b/crates/worker-runtime/src/catalog.rs index cf9cd735..85c45d37 100644 --- a/crates/worker-runtime/src/catalog.rs +++ b/crates/worker-runtime/src/catalog.rs @@ -40,10 +40,11 @@ impl Default for ProfileSelector { } } -/// Placeholder for future config-bundle synchronization. +/// Backend-synced config bundle reference used during Worker creation. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct ConfigBundleRef { pub id: String, + pub digest: String, } /// Requested capability name plus optional human-readable reason. diff --git a/crates/worker-runtime/src/config_bundle.rs b/crates/worker-runtime/src/config_bundle.rs new file mode 100644 index 00000000..91837878 --- /dev/null +++ b/crates/worker-runtime/src/config_bundle.rs @@ -0,0 +1,319 @@ +use crate::catalog::ProfileSelector; +use crate::error::RuntimeError; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::path::Path; + +pub const CONFIG_BUNDLE_DIGEST_ALGORITHM: &str = "sha256"; + +/// Backend-synced Profile/config bundle stored by a Runtime. +/// +/// The bundle is intentionally an intent/declaration boundary: it contains +/// profile selectors plus refs/grants/policies, never secret values, direct +/// Runtime endpoints, raw socket/session paths, runtime-local mount actual +/// paths, host-local cache paths, or fully resolved WorkerSpec content. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConfigBundle { + pub metadata: ConfigBundleMetadata, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub profiles: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub declarations: Vec, +} + +impl ConfigBundle { + pub fn computed_digest(&self) -> String { + let mut lines = Vec::new(); + lines.push(format!("id\0{}", self.metadata.id)); + lines.push(format!("revision\0{}", self.metadata.revision)); + lines.push(format!("workspace_id\0{}", self.metadata.workspace_id)); + lines.push(format!("created_at\0{}", self.metadata.created_at)); + lines.push(format!( + "provenance.source\0{}", + self.metadata.provenance.source + )); + lines.push(format!( + "provenance.detail\0{}", + self.metadata.provenance.detail.as_deref().unwrap_or("") + )); + + let mut profiles = self.profiles.clone(); + profiles.sort_by(|left, right| { + profile_sort_key(&left.selector).cmp(&profile_sort_key(&right.selector)) + }); + for profile in profiles { + lines.push(format!( + "profile\0{}\0{}", + profile_sort_key(&profile.selector), + profile.label.unwrap_or_default() + )); + } + + let mut declarations = self.declarations.clone(); + declarations + .sort_by(|left, right| declaration_sort_key(left).cmp(&declaration_sort_key(right))); + for declaration in declarations { + lines.push(format!( + "declaration\0{}\0{}\0{}", + declaration.kind.canonical_name(), + declaration.name, + declaration.reference + )); + } + + lines.sort(); + let mut hasher = Sha256::new(); + for line in lines { + hasher.update(line.as_bytes()); + hasher.update(b"\n"); + } + let digest = hasher.finalize(); + hex_digest(&digest) + } + + pub fn with_computed_digest(mut self) -> Self { + self.metadata.digest = self.computed_digest(); + self + } + + pub fn summary(&self) -> ConfigBundleSummary { + ConfigBundleSummary { + id: self.metadata.id.clone(), + digest: self.metadata.digest.clone(), + digest_algorithm: CONFIG_BUNDLE_DIGEST_ALGORITHM.to_string(), + revision: self.metadata.revision.clone(), + workspace_id: self.metadata.workspace_id.clone(), + created_at: self.metadata.created_at.clone(), + provenance: self.metadata.provenance.clone(), + profile_count: self.profiles.len(), + declaration_count: self.declarations.len(), + } + } + + pub fn contains_profile(&self, selector: &ProfileSelector) -> bool { + self.profiles + .iter() + .any(|profile| profile.selector == *selector) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConfigBundleMetadata { + pub id: String, + pub digest: String, + pub revision: String, + pub workspace_id: String, + pub created_at: String, + pub provenance: ConfigBundleProvenance, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConfigBundleProvenance { + pub source: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub detail: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConfigProfileDescriptor { + pub selector: ProfileSelector, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub label: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConfigDeclaration { + pub kind: ConfigDeclarationKind, + pub name: String, + pub reference: String, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ConfigDeclarationKind { + SecretRef, + MountGrant, + NetworkPolicy, + ShellPolicy, + GitPolicy, + CapabilityGrant, + Unsupported, +} + +impl ConfigDeclarationKind { + pub fn canonical_name(&self) -> &'static str { + match self { + Self::SecretRef => "secret_ref", + Self::MountGrant => "mount_grant", + Self::NetworkPolicy => "network_policy", + Self::ShellPolicy => "shell_policy", + Self::GitPolicy => "git_policy", + Self::CapabilityGrant => "capability_grant", + Self::Unsupported => "unsupported", + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConfigBundleSummary { + pub id: String, + pub digest: String, + pub digest_algorithm: String, + pub revision: String, + pub workspace_id: String, + pub created_at: String, + pub provenance: ConfigBundleProvenance, + pub profile_count: usize, + pub declaration_count: usize, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConfigBundleAvailability { + pub reference: crate::catalog::ConfigBundleRef, + pub summary: ConfigBundleSummary, +} + +pub(crate) fn validate_config_bundle(bundle: &ConfigBundle) -> Result<(), RuntimeError> { + validate_non_empty("config bundle id", &bundle.metadata.id)?; + validate_non_empty("config bundle digest", &bundle.metadata.digest)?; + validate_non_empty("config bundle revision", &bundle.metadata.revision)?; + validate_non_empty("config bundle workspace id", &bundle.metadata.workspace_id)?; + validate_non_empty("config bundle created_at", &bundle.metadata.created_at)?; + validate_non_empty( + "config bundle provenance source", + &bundle.metadata.provenance.source, + )?; + validate_boundary_text("config bundle id", &bundle.metadata.id)?; + validate_boundary_text("config bundle revision", &bundle.metadata.revision)?; + validate_boundary_text("config bundle workspace id", &bundle.metadata.workspace_id)?; + validate_boundary_text( + "config bundle provenance source", + &bundle.metadata.provenance.source, + )?; + if let Some(detail) = &bundle.metadata.provenance.detail { + validate_boundary_text("config bundle provenance detail", detail)?; + } + + let computed = bundle.computed_digest(); + if computed != bundle.metadata.digest { + return Err(RuntimeError::ConfigBundleDigestMismatch { + bundle_id: bundle.metadata.id.clone(), + expected_digest: bundle.metadata.digest.clone(), + actual_digest: computed, + }); + } + + for profile in &bundle.profiles { + validate_profile_selector(profile.selector.clone(), Some(&bundle.metadata.id))?; + if let Some(label) = &profile.label { + validate_boundary_text("profile label", label)?; + } + } + + for declaration in &bundle.declarations { + validate_non_empty("config declaration name", &declaration.name)?; + validate_non_empty("config declaration reference", &declaration.reference)?; + validate_boundary_text("config declaration name", &declaration.name)?; + validate_boundary_text("config declaration reference", &declaration.reference)?; + if declaration.kind == ConfigDeclarationKind::Unsupported { + return Err(RuntimeError::UnsupportedConfigDeclaration { + bundle_id: bundle.metadata.id.clone(), + declaration_kind: declaration.kind.canonical_name().to_string(), + name: declaration.name.clone(), + }); + } + } + Ok(()) +} + +pub(crate) fn validate_profile_selector( + selector: ProfileSelector, + bundle_id: Option<&str>, +) -> Result<(), RuntimeError> { + match selector { + ProfileSelector::RuntimeDefault => Ok(()), + ProfileSelector::Builtin(value) | ProfileSelector::Named(value) => { + if value.trim().is_empty() { + Err(RuntimeError::InvalidProfileSelector { + profile: value, + bundle_id: bundle_id.map(ToOwned::to_owned), + message: "profile selector must not be empty".to_string(), + }) + } else { + validate_boundary_text("profile selector", &value).map_err(|err| match err { + RuntimeError::InvalidRequest(message) => RuntimeError::InvalidProfileSelector { + profile: value, + bundle_id: bundle_id.map(ToOwned::to_owned), + message, + }, + other => other, + }) + } + } + } +} + +fn validate_non_empty(label: &'static str, value: &str) -> Result<(), RuntimeError> { + if value.trim().is_empty() { + Err(RuntimeError::InvalidRequest(format!( + "{label} must not be empty" + ))) + } else { + Ok(()) + } +} + +fn validate_boundary_text(label: &'static str, value: &str) -> Result<(), RuntimeError> { + let trimmed = value.trim(); + if trimmed.len() > 2048 { + return Err(RuntimeError::InvalidRequest(format!( + "{label} is too large" + ))); + } + if trimmed.chars().any(char::is_control) { + return Err(RuntimeError::InvalidRequest(format!( + "{label} must not contain control characters" + ))); + } + if Path::new(trimmed).is_absolute() + || trimmed.starts_with('~') + || trimmed.contains("/.cache") + || trimmed.contains("\\.cache") + || trimmed.contains("/run/") + || trimmed.contains("\\run\\") + || trimmed.contains(".sock") + || trimmed.contains("socket=") + || trimmed.contains("session_path") + || trimmed.contains("cache_path") + { + return Err(RuntimeError::InvalidRequest(format!( + "{label} must be a stable ref/grant/policy declaration, not a host-local path" + ))); + } + Ok(()) +} + +fn declaration_sort_key(declaration: &ConfigDeclaration) -> String { + format!( + "{}\0{}\0{}", + declaration.kind.canonical_name(), + declaration.name, + declaration.reference + ) +} + +fn profile_sort_key(selector: &ProfileSelector) -> String { + match selector { + ProfileSelector::RuntimeDefault => "runtime_default".to_string(), + ProfileSelector::Builtin(value) => format!("builtin\0{value}"), + ProfileSelector::Named(value) => format!("named\0{value}"), + } +} + +fn hex_digest(bytes: &[u8]) -> String { + let mut out = String::with_capacity(bytes.len() * 2); + for byte in bytes { + out.push_str(&format!("{byte:02x}")); + } + out +} diff --git a/crates/worker-runtime/src/error.rs b/crates/worker-runtime/src/error.rs index 76b1c6b4..54484d64 100644 --- a/crates/worker-runtime/src/error.rs +++ b/crates/worker-runtime/src/error.rs @@ -34,6 +34,34 @@ pub enum RuntimeError { #[error("invalid request: {0}")] InvalidRequest(String), + #[error("config bundle `{bundle_id}` was not found")] + ConfigBundleMissing { bundle_id: String }, + + #[error( + "config bundle `{bundle_id}` digest mismatch: expected {expected_digest}, got {actual_digest}" + )] + ConfigBundleDigestMismatch { + bundle_id: String, + expected_digest: String, + actual_digest: String, + }, + + #[error("invalid profile selector `{profile}` for config bundle {bundle_id:?}: {message}")] + InvalidProfileSelector { + profile: String, + bundle_id: Option, + message: String, + }, + + #[error( + "config bundle `{bundle_id}` contains unsupported declaration `{declaration_kind}` named `{name}`" + )] + UnsupportedConfigDeclaration { + bundle_id: String, + declaration_kind: String, + name: String, + }, + #[error("runtime store {operation} failed at {}: {source}", path.display())] StoreIo { operation: &'static str, diff --git a/crates/worker-runtime/src/fs_store.rs b/crates/worker-runtime/src/fs_store.rs index 54f1c41c..bbc7d84f 100644 --- a/crates/worker-runtime/src/fs_store.rs +++ b/crates/worker-runtime/src/fs_store.rs @@ -1,4 +1,5 @@ use crate::catalog::{CreateWorkerRequest, WorkerStatus}; +use crate::config_bundle::ConfigBundle; use crate::diagnostics::RuntimeDiagnostic; use crate::error::RuntimeError; use crate::identity::{RuntimeId, WorkerId, WorkerRef}; @@ -364,6 +365,7 @@ pub(crate) struct PersistedRuntimeState { pub(crate) next_event_id: u64, pub(crate) next_diagnostic_id: u64, pub(crate) workers: BTreeMap, + pub(crate) config_bundles: BTreeMap, pub(crate) events: Vec, pub(crate) diagnostics: Vec, } @@ -390,6 +392,8 @@ struct RuntimeSnapshot { next_worker_sequence: u64, next_event_id: u64, next_diagnostic_id: u64, + #[serde(default)] + config_bundles: BTreeMap, diagnostics: Vec, } @@ -405,6 +409,7 @@ impl RuntimeSnapshot { next_worker_sequence: state.next_worker_sequence, next_event_id: state.next_event_id, next_diagnostic_id: state.next_diagnostic_id, + config_bundles: state.config_bundles.clone(), diagnostics: state.diagnostics.clone(), } } @@ -454,6 +459,7 @@ impl RuntimeSnapshot { next_event_id: self.next_event_id, next_diagnostic_id: self.next_diagnostic_id, workers, + config_bundles: self.config_bundles, events, diagnostics: self.diagnostics, } diff --git a/crates/worker-runtime/src/http_server.rs b/crates/worker-runtime/src/http_server.rs index dd9a3357..7bd2bdfb 100644 --- a/crates/worker-runtime/src/http_server.rs +++ b/crates/worker-runtime/src/http_server.rs @@ -7,7 +7,10 @@ //! credentials, registration, and policy. use crate::Runtime; -use crate::catalog::{CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerSummary}; +use crate::catalog::{ + ConfigBundleRef, CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerSummary, +}; +use crate::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary}; use crate::error::RuntimeError; #[cfg(feature = "fs-store")] use crate::fs_store::FsRuntimeStoreOptions; @@ -165,6 +168,14 @@ pub fn runtime_http_router(runtime: Runtime, local_token: Option) -> Rou let router = Router::new() .route("/v1/runtime", get(get_runtime)) + .route( + "/v1/config-bundles", + get(list_config_bundles).post(store_config_bundle), + ) + .route( + "/v1/config-bundles/{bundle_id}/availability", + get(check_config_bundle), + ) .route("/v1/workers", get(list_workers).post(create_worker)) .route("/v1/workers/{worker_id}", get(get_worker)) .route("/v1/workers/{worker_id}/input", post(send_worker_input)) @@ -216,6 +227,29 @@ pub struct RuntimeHttpSummaryResponse { pub runtime: RuntimeSummary, } +/// `GET /v1/config-bundles` response. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpConfigBundlesResponse { + pub bundles: Vec, +} + +/// `POST /v1/config-bundles` request. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpConfigBundleSyncRequest { + pub bundle: ConfigBundle, +} + +/// Config bundle availability response used by sync/check endpoints. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpConfigBundleAvailabilityResponse { + pub availability: ConfigBundleAvailability, +} + +#[derive(Clone, Debug, Deserialize)] +struct RuntimeHttpConfigBundleAvailabilityQuery { + digest: String, +} + /// `GET /v1/workers` response. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct RuntimeHttpWorkersResponse { @@ -372,6 +406,48 @@ async fn get_runtime( Ok(Json(RuntimeHttpSummaryResponse { runtime })) } +async fn list_config_bundles( + State(state): State, +) -> RestResult { + let bundles = state + .runtime + .list_config_bundles() + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpConfigBundlesResponse { bundles })) +} + +async fn store_config_bundle( + State(state): State, + body: Result, JsonRejection>, +) -> RestResult { + let Json(request) = body.map_err(RuntimeHttpRestError::json_rejection)?; + let availability = state + .runtime + .store_config_bundle(request.bundle) + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpConfigBundleAvailabilityResponse { + availability, + })) +} + +async fn check_config_bundle( + State(state): State, + Path(bundle_id): Path, + query: Result, QueryRejection>, +) -> RestResult { + let Query(query) = query.map_err(RuntimeHttpRestError::query_rejection)?; + let availability = state + .runtime + .check_config_bundle(&ConfigBundleRef { + id: bundle_id, + digest: query.digest, + }) + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpConfigBundleAvailabilityResponse { + availability, + })) +} + async fn list_workers( State(state): State, ) -> RestResult { @@ -743,10 +819,15 @@ impl IntoResponse for RuntimeHttpRestError { fn status_for_runtime_error(error: &RuntimeError) -> StatusCode { match error { - RuntimeError::WorkerNotFound { .. } => StatusCode::NOT_FOUND, + RuntimeError::WorkerNotFound { .. } | RuntimeError::ConfigBundleMissing { .. } => { + StatusCode::NOT_FOUND + } RuntimeError::RuntimeStopped { .. } => StatusCode::CONFLICT, RuntimeError::LimitTooLarge { .. } | RuntimeError::InvalidRequest(_) + | RuntimeError::ConfigBundleDigestMismatch { .. } + | RuntimeError::InvalidProfileSelector { .. } + | RuntimeError::UnsupportedConfigDeclaration { .. } | RuntimeError::WrongRuntime { .. } | RuntimeError::WrongRuntimeCursor { .. } => StatusCode::BAD_REQUEST, RuntimeError::StoreIo { .. } @@ -764,6 +845,10 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str { RuntimeError::WorkerNotFound { .. } => "worker_not_found", RuntimeError::LimitTooLarge { .. } => "limit_too_large", RuntimeError::InvalidRequest(_) => "invalid_request", + RuntimeError::ConfigBundleMissing { .. } => "config_bundle_missing", + RuntimeError::ConfigBundleDigestMismatch { .. } => "config_bundle_digest_mismatch", + RuntimeError::InvalidProfileSelector { .. } => "invalid_profile_selector", + RuntimeError::UnsupportedConfigDeclaration { .. } => "unsupported_config_declaration", RuntimeError::StoreIo { .. } => "store_io", RuntimeError::StoreMissing { .. } => "store_missing", RuntimeError::StoreCorrupt { .. } => "store_corrupt", diff --git a/crates/worker-runtime/src/lib.rs b/crates/worker-runtime/src/lib.rs index 7864d534..e6fa362d 100644 --- a/crates/worker-runtime/src/lib.rs +++ b/crates/worker-runtime/src/lib.rs @@ -8,6 +8,7 @@ //! can later adapt into registries or backend APIs. pub mod catalog; +pub mod config_bundle; pub mod diagnostics; pub mod error; #[cfg(feature = "fs-store")] diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index 172b4f65..7fa559ea 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -1,5 +1,10 @@ use crate::catalog::{ - CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerStatus, WorkerSummary, + ConfigBundleRef, CreateWorkerRequest, ProfileSelector, WorkerDetail, WorkerLifecycleAck, + WorkerStatus, WorkerSummary, +}; +use crate::config_bundle::{ + ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle, + validate_profile_selector, }; use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic}; use crate::error::RuntimeError; @@ -128,6 +133,45 @@ impl Runtime { Ok(self.lock()?.status) } + /// Store a backend-synced Profile/config bundle for later Worker creation. + pub fn store_config_bundle( + &self, + bundle: ConfigBundle, + ) -> Result { + validate_config_bundle(&bundle)?; + let mut state = self.lock()?; + state.ensure_running()?; + let reference = ConfigBundleRef { + id: bundle.metadata.id.clone(), + digest: bundle.metadata.digest.clone(), + }; + let summary = bundle.summary(); + state + .config_bundles + .insert(bundle.metadata.id.clone(), bundle); + state.persist_runtime_snapshot()?; + Ok(ConfigBundleAvailability { reference, summary }) + } + + /// List synced config bundles known to this Runtime. + pub fn list_config_bundles(&self) -> Result, RuntimeError> { + Ok(self + .lock()? + .config_bundles + .values() + .map(ConfigBundle::summary) + .collect()) + } + + /// Validate that a config bundle reference is present and digest-matched. + pub fn check_config_bundle( + &self, + reference: &ConfigBundleRef, + ) -> Result { + let state = self.lock()?; + state.check_config_bundle_ref(reference) + } + /// Stop the Runtime. v0 keeps data readable after stop, but rejects new /// create/send/worker lifecycle mutations. pub fn stop_runtime(&self) -> Result { @@ -161,6 +205,7 @@ impl Runtime { let mut state = self.lock()?; state.ensure_running()?; validate_create_worker_request(&request)?; + state.validate_worker_config_boundary(&request)?; let worker_id = WorkerId::generated(state.next_worker_sequence); state.next_worker_sequence += 1; @@ -551,6 +596,7 @@ struct RuntimeState { next_event_id: u64, next_diagnostic_id: u64, workers: BTreeMap, + config_bundles: BTreeMap, events: Vec, diagnostics: Vec, #[cfg(feature = "ws-server")] @@ -574,6 +620,7 @@ impl RuntimeState { next_event_id: 1, next_diagnostic_id: 1, workers: BTreeMap::new(), + config_bundles: BTreeMap::new(), events: Vec::new(), diagnostics: Vec::new(), #[cfg(feature = "ws-server")] @@ -603,6 +650,7 @@ impl RuntimeState { next_event_id: 1, next_diagnostic_id: 1, workers: BTreeMap::new(), + config_bundles: BTreeMap::new(), events: Vec::new(), diagnostics: Vec::new(), #[cfg(feature = "ws-server")] @@ -658,6 +706,7 @@ impl RuntimeState { next_event_id: persisted.next_event_id, next_diagnostic_id: persisted.next_diagnostic_id, workers, + config_bundles: persisted.config_bundles, events: persisted.events, diagnostics: persisted.diagnostics, }) @@ -678,6 +727,7 @@ impl RuntimeState { .iter() .map(|(worker_id, worker)| (worker_id.clone(), worker.persisted_record())) .collect(), + config_bundles: self.config_bundles.clone(), events: self.events.clone(), diagnostics: self.diagnostics.clone(), } @@ -810,6 +860,69 @@ impl RuntimeState { } } + fn check_config_bundle_ref( + &self, + reference: &ConfigBundleRef, + ) -> Result { + if reference.id.trim().is_empty() || reference.digest.trim().is_empty() { + return Err(RuntimeError::InvalidRequest( + "config bundle reference id and digest must not be empty".to_string(), + )); + } + let bundle = self.config_bundles.get(&reference.id).ok_or_else(|| { + RuntimeError::ConfigBundleMissing { + bundle_id: reference.id.clone(), + } + })?; + if bundle.metadata.digest != reference.digest { + return Err(RuntimeError::ConfigBundleDigestMismatch { + bundle_id: reference.id.clone(), + expected_digest: reference.digest.clone(), + actual_digest: bundle.metadata.digest.clone(), + }); + } + Ok(ConfigBundleAvailability { + reference: reference.clone(), + summary: bundle.summary(), + }) + } + + fn validate_worker_config_boundary( + &self, + request: &CreateWorkerRequest, + ) -> Result<(), RuntimeError> { + match &request.config_bundle { + Some(reference) => { + let availability = self.check_config_bundle_ref(reference)?; + let bundle = self + .config_bundles + .get(&availability.reference.id) + .ok_or_else(|| RuntimeError::ConfigBundleMissing { + bundle_id: availability.reference.id.clone(), + })?; + if !bundle.contains_profile(&request.profile) { + return Err(RuntimeError::InvalidProfileSelector { + profile: profile_label(&request.profile), + bundle_id: Some(reference.id.clone()), + message: "profile selector is not declared by synced config bundle" + .to_string(), + }); + } + Ok(()) + } + None => match &request.profile { + ProfileSelector::RuntimeDefault | ProfileSelector::Builtin(_) => { + validate_profile_selector(request.profile.clone(), None) + } + ProfileSelector::Named(_) => Err(RuntimeError::InvalidProfileSelector { + profile: profile_label(&request.profile), + bundle_id: None, + message: "named profiles require a synced config bundle reference".to_string(), + }), + }, + } + } + fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> { if worker_ref.runtime_id != self.runtime_id { return Err(RuntimeError::WrongRuntime { @@ -1012,6 +1125,14 @@ impl WorkerRecord { } } +fn profile_label(selector: &ProfileSelector) -> String { + match selector { + ProfileSelector::RuntimeDefault => "runtime_default".to_string(), + ProfileSelector::Builtin(value) => value.clone(), + ProfileSelector::Named(value) => value.clone(), + } +} + fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> { if let crate::catalog::WorkerIntent::Task { objective } = &request.intent { if objective.trim().is_empty() { @@ -1043,6 +1164,10 @@ fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> { mod tests { use super::*; use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent}; + use crate::config_bundle::{ + ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration, + ConfigDeclarationKind, ConfigProfileDescriptor, + }; use crate::management::RuntimeLimits; fn task_request(objective: &str) -> CreateWorkerRequest { @@ -1051,15 +1176,48 @@ mod tests { objective: objective.to_string(), }, profile: ProfileSelector::Builtin("builtin:coder".to_string()), - config_bundle: Some(ConfigBundleRef { - id: "bundle-1".to_string(), - }), + config_bundle: None, requested_capabilities: vec![CapabilityRequest::named("read")], workspace_refs: Vec::new(), mount_refs: Vec::new(), } } + fn test_bundle() -> ConfigBundle { + ConfigBundle { + metadata: ConfigBundleMetadata { + id: "bundle-1".to_string(), + digest: String::new(), + revision: "rev-1".to_string(), + workspace_id: "workspace-1".to_string(), + created_at: "2026-06-26T00:00:00Z".to_string(), + provenance: ConfigBundleProvenance { + source: "workspace-backend".to_string(), + detail: Some("profile-sync".to_string()), + }, + }, + profiles: vec![ConfigProfileDescriptor { + selector: ProfileSelector::Builtin("builtin:coder".to_string()), + label: Some("Coder".to_string()), + }], + declarations: vec![ConfigDeclaration { + kind: ConfigDeclarationKind::CapabilityGrant, + name: "read".to_string(), + reference: "capability:read".to_string(), + }], + } + .with_computed_digest() + } + + fn bundled_task_request(objective: &str, bundle: &ConfigBundle) -> CreateWorkerRequest { + let mut request = task_request(objective); + request.config_bundle = Some(ConfigBundleRef { + id: bundle.metadata.id.clone(), + digest: bundle.metadata.digest.clone(), + }); + request + } + #[test] fn create_list_and_detail_preserve_runtime_worker_authority() { let runtime = Runtime::new_memory(); @@ -1067,7 +1225,7 @@ mod tests { assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap()); assert_eq!(detail.status, WorkerStatus::Running); - assert!(detail.config_bundle.is_some()); + assert!(detail.config_bundle.is_none()); let list = runtime.list_workers().unwrap(); assert_eq!(list.len(), 1); @@ -1079,6 +1237,73 @@ mod tests { assert_eq!(fetched.intent, detail.intent); } + #[test] + fn synced_config_bundle_is_stored_checked_and_used_for_worker_creation() { + let runtime = Runtime::new_memory(); + let bundle = test_bundle(); + let availability = runtime.store_config_bundle(bundle.clone()).unwrap(); + assert_eq!(availability.reference.id, "bundle-1"); + assert_eq!(availability.reference.digest, bundle.metadata.digest); + + let listed = runtime.list_config_bundles().unwrap(); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].id, "bundle-1"); + + let checked = runtime + .check_config_bundle(&availability.reference) + .unwrap(); + assert_eq!(checked.summary.digest, availability.summary.digest); + + let detail = runtime + .create_worker(bundled_task_request("synced", &bundle)) + .unwrap(); + assert_eq!(detail.config_bundle, Some(availability.reference)); + } + + #[test] + fn config_bundle_errors_are_typed() { + let runtime = Runtime::new_memory(); + let bundle = test_bundle(); + + let missing = runtime + .create_worker(bundled_task_request("missing", &bundle)) + .unwrap_err(); + assert!(matches!(missing, RuntimeError::ConfigBundleMissing { .. })); + + runtime.store_config_bundle(bundle.clone()).unwrap(); + let mismatch = runtime + .check_config_bundle(&ConfigBundleRef { + id: bundle.metadata.id.clone(), + digest: "bad-digest".to_string(), + }) + .unwrap_err(); + assert!(matches!( + mismatch, + RuntimeError::ConfigBundleDigestMismatch { .. } + )); + + let mut bad_profile = bundled_task_request("bad profile", &bundle); + bad_profile.profile = ProfileSelector::Builtin("builtin:reviewer".to_string()); + let invalid_profile = runtime.create_worker(bad_profile).unwrap_err(); + assert!(matches!( + invalid_profile, + RuntimeError::InvalidProfileSelector { .. } + )); + + let mut unsupported = test_bundle(); + unsupported.declarations.push(ConfigDeclaration { + kind: ConfigDeclarationKind::Unsupported, + name: "plugin-registry".to_string(), + reference: "plugin-registry:v0".to_string(), + }); + unsupported = unsupported.with_computed_digest(); + let unsupported_err = runtime.store_config_bundle(unsupported).unwrap_err(); + assert!(matches!( + unsupported_err, + RuntimeError::UnsupportedConfigDeclaration { .. } + )); + } + #[test] fn rejects_worker_refs_from_another_runtime() { let runtime_a = Runtime::new_memory(); diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index f5e6615f..4094be3a 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -16,11 +16,13 @@ use std::{ time::Duration, }; use worker_runtime::catalog::{ - CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, - WorkerStatus as EmbeddedWorkerStatus, + CapabilityRequest, ConfigBundleRef, CreateWorkerRequest, ProfileSelector, + WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, WorkerStatus as EmbeddedWorkerStatus, }; +use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary}; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; use worker_runtime::http_server::{ + RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest, RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse, RuntimeHttpWorkerInputResponse, RuntimeHttpWorkerLifecycleRequest, RuntimeHttpWorkerLifecycleResponse, RuntimeHttpWorkerResponse, RuntimeHttpWorkersResponse, @@ -261,16 +263,24 @@ pub struct WorkerLookupResult { /// Browser-safe worker spawn request shape. /// -/// The request intentionally carries only workspace policy intents and stable -/// worker identifiers. Raw workspace roots, child cwd, executable path, and raw -/// profile selectors are resolved by the runtime service and never accepted from -/// Workspace API callers. +/// The request intentionally carries only workspace policy intents, stable +/// worker identifiers, optional profile selectors, config bundle refs, and +/// requested capability names. Raw workspace roots, child cwd, executable path, +/// Runtime endpoints/credentials, raw bundle storage paths, and host-local +/// resolved WorkerSpec content are resolved by the runtime service and never +/// accepted from Workspace API callers. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSpawnRequest { pub intent: WorkerSpawnIntent, #[serde(skip_serializing_if = "Option::is_none")] pub requested_worker_name: Option, pub acceptance: WorkerSpawnAcceptanceRequirement, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub profile: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub config_bundle: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub requested_capabilities: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -309,6 +319,28 @@ pub struct WorkerSpawnResult { pub diagnostics: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ConfigBundleSyncResult { + pub state: WorkerOperationState, + #[serde(skip_serializing_if = "Option::is_none")] + pub availability: Option, + pub diagnostics: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ConfigBundleCheckResult { + pub state: WorkerOperationState, + #[serde(skip_serializing_if = "Option::is_none")] + pub availability: Option, + pub diagnostics: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ConfigBundleListResult { + pub bundles: Vec, + pub diagnostics: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum WorkerOperationState { @@ -492,6 +524,41 @@ pub trait WorkspaceWorkerRuntime: Send + Sync { } } + fn sync_config_bundle(&self, _bundle: ConfigBundle) -> ConfigBundleSyncResult { + ConfigBundleSyncResult { + state: WorkerOperationState::Unsupported, + availability: None, + diagnostics: vec![diagnostic( + "config_bundle_sync_unsupported", + DiagnosticSeverity::Info, + "runtime does not implement config bundle sync".to_string(), + )], + } + } + + fn check_config_bundle(&self, _reference: ConfigBundleRef) -> ConfigBundleCheckResult { + ConfigBundleCheckResult { + state: WorkerOperationState::Unsupported, + availability: None, + diagnostics: vec![diagnostic( + "config_bundle_check_unsupported", + DiagnosticSeverity::Info, + "runtime does not implement config bundle availability checks".to_string(), + )], + } + } + + fn list_config_bundles(&self) -> ConfigBundleListResult { + ConfigBundleListResult { + bundles: Vec::new(), + diagnostics: vec![diagnostic( + "config_bundle_list_unsupported", + DiagnosticSeverity::Info, + "runtime does not implement config bundle listing".to_string(), + )], + } + } + fn stop_worker( &self, worker_id: &str, @@ -729,6 +796,35 @@ impl RuntimeRegistry { Ok(runtime.spawn_worker(request)) } + pub fn sync_config_bundle( + &self, + runtime_id: &str, + bundle: ConfigBundle, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + let runtime = self.runtime(runtime_id)?; + Ok(runtime.sync_config_bundle(bundle)) + } + + pub fn check_config_bundle( + &self, + runtime_id: &str, + reference: ConfigBundleRef, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + let runtime = self.runtime(runtime_id)?; + Ok(runtime.check_config_bundle(reference)) + } + + pub fn list_config_bundles( + &self, + runtime_id: &str, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + let runtime = self.runtime(runtime_id)?; + Ok(runtime.list_config_bundles()) + } + pub fn send_input( &self, runtime_id: &str, @@ -1091,10 +1187,21 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { )); } - let create_request = CreateWorkerRequest::tools_less( - embedded_create_intent(&request.intent), - embedded_profile_selector(&request.intent), - ); + let create_request = CreateWorkerRequest { + intent: embedded_create_intent(&request.intent), + profile: request + .profile + .clone() + .unwrap_or_else(|| embedded_profile_selector(&request.intent)), + config_bundle: request.config_bundle.clone(), + requested_capabilities: if request.requested_capabilities.is_empty() { + vec![CapabilityRequest::named("read")] + } else { + request.requested_capabilities.clone() + }, + workspace_refs: Vec::new(), + mount_refs: Vec::new(), + }; match self.runtime.create_worker(create_request) { Ok(detail) => WorkerSpawnResult { state: WorkerOperationState::Accepted, @@ -1126,6 +1233,49 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { } } + fn sync_config_bundle(&self, bundle: ConfigBundle) -> ConfigBundleSyncResult { + match self.runtime.store_config_bundle(bundle) { + Ok(availability) => ConfigBundleSyncResult { + state: WorkerOperationState::Accepted, + availability: Some(availability), + diagnostics: Vec::new(), + }, + Err(error) => ConfigBundleSyncResult { + state: WorkerOperationState::Rejected, + availability: None, + diagnostics: vec![embedded_runtime_diagnostic(&error)], + }, + } + } + + fn check_config_bundle(&self, reference: ConfigBundleRef) -> ConfigBundleCheckResult { + match self.runtime.check_config_bundle(&reference) { + Ok(availability) => ConfigBundleCheckResult { + state: WorkerOperationState::Accepted, + availability: Some(availability), + diagnostics: Vec::new(), + }, + Err(error) => ConfigBundleCheckResult { + state: WorkerOperationState::Rejected, + availability: None, + diagnostics: vec![embedded_runtime_diagnostic(&error)], + }, + } + } + + fn list_config_bundles(&self) -> ConfigBundleListResult { + match self.runtime.list_config_bundles() { + Ok(bundles) => ConfigBundleListResult { + bundles, + diagnostics: Vec::new(), + }, + Err(error) => ConfigBundleListResult { + bundles: Vec::new(), + diagnostics: vec![embedded_runtime_diagnostic(&error)], + }, + } + } + fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { let Some(worker_ref) = self.worker_ref(worker_id) else { return embedded_input_rejected( @@ -1574,10 +1724,21 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime { )], }; } - let create = CreateWorkerRequest::tools_less( - embedded_create_intent(&request.intent), - embedded_profile_selector(&request.intent), - ); + let create = CreateWorkerRequest { + intent: embedded_create_intent(&request.intent), + profile: request + .profile + .clone() + .unwrap_or_else(|| embedded_profile_selector(&request.intent)), + config_bundle: request.config_bundle.clone(), + requested_capabilities: if request.requested_capabilities.is_empty() { + vec![CapabilityRequest::named("read")] + } else { + request.requested_capabilities.clone() + }, + workspace_refs: Vec::new(), + mount_refs: Vec::new(), + }; match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) { Ok(response) => WorkerSpawnResult { state: WorkerOperationState::Accepted, @@ -1601,6 +1762,44 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime { } } + fn sync_config_bundle(&self, bundle: ConfigBundle) -> ConfigBundleSyncResult { + let request = RuntimeHttpConfigBundleSyncRequest { bundle }; + match self.post_json::<_, RuntimeHttpConfigBundleAvailabilityResponse>( + "/v1/config-bundles", + &request, + ) { + Ok(response) => ConfigBundleSyncResult { + state: WorkerOperationState::Accepted, + availability: Some(response.availability), + diagnostics: Vec::new(), + }, + Err(diagnostic) => ConfigBundleSyncResult { + state: WorkerOperationState::Rejected, + availability: None, + diagnostics: vec![diagnostic], + }, + } + } + + fn check_config_bundle(&self, reference: ConfigBundleRef) -> ConfigBundleCheckResult { + let path = format!( + "/v1/config-bundles/{}/availability?digest={}", + reference.id, reference.digest + ); + match self.get_json::(&path) { + Ok(response) => ConfigBundleCheckResult { + state: WorkerOperationState::Accepted, + availability: Some(response.availability), + diagnostics: Vec::new(), + }, + Err(diagnostic) => ConfigBundleCheckResult { + state: WorkerOperationState::Rejected, + availability: None, + diagnostics: vec![diagnostic], + }, + } + } + fn stop_worker( &self, worker_id: &str, @@ -2197,7 +2396,11 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti DiagnosticSeverity::Warning, format!("Requested limit {requested} exceeds embedded Runtime maximum {max}"), ), - EmbeddedRuntimeError::InvalidRequest(_) => diagnostic( + EmbeddedRuntimeError::InvalidRequest(_) + | EmbeddedRuntimeError::ConfigBundleMissing { .. } + | EmbeddedRuntimeError::ConfigBundleDigestMismatch { .. } + | EmbeddedRuntimeError::InvalidProfileSelector { .. } + | EmbeddedRuntimeError::UnsupportedConfigDeclaration { .. } => diagnostic( "embedded_runtime_invalid_request", DiagnosticSeverity::Warning, "Embedded Runtime rejected the request".to_string(), @@ -2592,6 +2795,32 @@ mod tests { metadata } + fn test_config_bundle() -> ConfigBundle { + ConfigBundle { + metadata: worker_runtime::config_bundle::ConfigBundleMetadata { + id: "bundle-1".to_string(), + digest: String::new(), + revision: "rev-1".to_string(), + workspace_id: "local:test".to_string(), + created_at: "2026-06-26T00:00:00Z".to_string(), + provenance: worker_runtime::config_bundle::ConfigBundleProvenance { + source: "workspace-server-test".to_string(), + detail: None, + }, + }, + profiles: vec![worker_runtime::config_bundle::ConfigProfileDescriptor { + selector: ProfileSelector::Builtin("builtin:coder".to_string()), + label: Some("Coder".to_string()), + }], + declarations: vec![worker_runtime::config_bundle::ConfigDeclaration { + kind: worker_runtime::config_bundle::ConfigDeclarationKind::CapabilityGrant, + name: "read".to_string(), + reference: "capability:read".to_string(), + }], + } + .with_computed_digest() + } + fn assert_valid_generated_id(id: &str) { assert!(id.len() <= MAX_IDENTIFIER_LEN, "id too long: {id}"); validate_backend_identifier("test_id", id).unwrap(); @@ -2941,6 +3170,9 @@ mod tests { acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments: 0, }, + profile: None, + config_bundle: None, + requested_capabilities: Vec::new(), }, ) .unwrap(); @@ -3013,6 +3245,50 @@ mod tests { } } + #[test] + fn embedded_backend_syncs_config_bundle_and_spawns_with_bundle_ref() { + let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( + "local:test", + ))]); + let bundle = test_config_bundle(); + let sync = registry + .sync_config_bundle(EMBEDDED_RUNTIME_ID, bundle.clone()) + .unwrap(); + assert_eq!(sync.state, WorkerOperationState::Accepted); + let reference = sync.availability.expect("bundle availability").reference; + assert_eq!(reference.id, bundle.metadata.id); + assert_eq!(reference.digest, bundle.metadata.digest); + + let check = registry + .check_config_bundle(EMBEDDED_RUNTIME_ID, reference.clone()) + .unwrap(); + assert_eq!(check.state, WorkerOperationState::Accepted); + + let spawned = registry + .spawn_worker( + EMBEDDED_RUNTIME_ID, + WorkerSpawnRequest { + intent: WorkerSpawnIntent::TicketRole { + ticket_id: "00001KVZSGT0Q".to_string(), + role: TicketWorkerRole::Coder, + }, + requested_worker_name: None, + acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { + expected_segments: 0, + }, + profile: Some(ProfileSelector::Builtin("builtin:coder".to_string())), + config_bundle: Some(reference), + requested_capabilities: vec![CapabilityRequest::named("read")], + }, + ) + .unwrap(); + assert_eq!(spawned.state, WorkerOperationState::Accepted); + assert_eq!( + spawned.worker.unwrap().profile.as_deref(), + Some("builtin:coder") + ); + } + #[test] fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() { let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( @@ -3025,6 +3301,9 @@ mod tests { intent: WorkerSpawnIntent::WorkspaceCompanion, requested_worker_name: None, acceptance: WorkerSpawnAcceptanceRequirement::SocketReady, + profile: None, + config_bundle: None, + requested_capabilities: Vec::new(), }, ) .unwrap(); diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index e3a98dc0..421c1e91 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -13,10 +13,11 @@ use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; use crate::hosts::{ - DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime, - RemoteRuntimeConfig, RemoteWorkerRuntime, RuntimeDiagnostic, RuntimeRegistry, RuntimeSummary, - WorkerInputRequest, WorkerInputResult, WorkerLifecycleRequest, WorkerLifecycleResult, - WorkerSpawnRequest, WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection, + ConfigBundleCheckResult, ConfigBundleSyncResult, DiagnosticSeverity, EmbeddedWorkerRuntime, + HostSummary, LocalWorkerRuntime, RemoteRuntimeConfig, RemoteWorkerRuntime, RuntimeDiagnostic, + RuntimeRegistry, RuntimeSummary, WorkerInputRequest, WorkerInputResult, WorkerLifecycleRequest, + WorkerLifecycleResult, WorkerSpawnRequest, WorkerSpawnResult, WorkerSummary, + WorkerTranscriptProjection, }; use crate::identity::WorkspaceIdentity; use crate::observation::{ @@ -29,6 +30,8 @@ use crate::records::{ use crate::repositories::{LocalRepositoryReader, RepositoryLogRead, RepositorySummary}; use crate::store::{ControlPlaneStore, WorkspaceRecord}; use crate::{Error, Result}; +use worker_runtime::catalog::ConfigBundleRef; +use worker_runtime::config_bundle::ConfigBundle; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum AuthConfig { @@ -155,6 +158,14 @@ pub fn build_router(api: WorkspaceApi) -> Router { "/api/runtimes/{runtime_id}/workers", post(create_runtime_worker), ) + .route( + "/api/runtimes/{runtime_id}/config-bundles", + post(sync_runtime_config_bundle), + ) + .route( + "/api/runtimes/{runtime_id}/config-bundles/{bundle_id}/availability", + get(check_runtime_config_bundle), + ) .route( "/api/runtimes/{runtime_id}/workers/{worker_id}", get(get_runtime_worker), @@ -491,6 +502,16 @@ async fn get_runtime_worker( Ok(Json(worker)) } +#[derive(Debug, Serialize, Deserialize)] +pub struct RuntimeConfigBundleSyncRequest { + pub bundle: ConfigBundle, +} + +#[derive(Debug, Deserialize)] +struct RuntimeConfigBundleAvailabilityQuery { + digest: String, +} + async fn create_runtime_worker( State(api): State, AxumPath(runtime_id): AxumPath, @@ -503,6 +524,36 @@ async fn create_runtime_worker( Ok(Json(result)) } +async fn sync_runtime_config_bundle( + State(api): State, + AxumPath(runtime_id): AxumPath, + Json(request): Json, +) -> ApiResult> { + let result = api + .runtime + .sync_config_bundle(&runtime_id, request.bundle) + .map_err(|err| err.into_error())?; + Ok(Json(result)) +} + +async fn check_runtime_config_bundle( + State(api): State, + AxumPath((runtime_id, bundle_id)): AxumPath<(String, String)>, + Query(query): Query, +) -> ApiResult> { + let result = api + .runtime + .check_config_bundle( + &runtime_id, + ConfigBundleRef { + id: bundle_id, + digest: query.digest, + }, + ) + .map_err(|err| err.into_error())?; + Ok(Json(result)) +} + async fn send_runtime_worker_input( State(api): State, AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, diff --git a/package.nix b/package.nix index 5d2cb9f1..ddfeb04e 100644 --- a/package.nix +++ b/package.nix @@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec { filter = sourceFilter; }; - cargoHash = "sha256-kZ9TAb1lNpslAhzcyC2RyIZg5Yh5hrAGCTZIhhYl/e4="; + cargoHash = "sha256-/7qrJH25rQSV2tKMOVUSu6ISUuEi+4WdwuX0E94LZYg="; depsExtraArgs = { # Older fetchCargoVendor utilities used crates.io's API download endpoint,