feat: add runtime config bundle sync
This commit is contained in:
parent
9a09ebd9ac
commit
abab1af2f0
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -5941,6 +5941,7 @@ dependencies = [
|
|||
"protocol",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.11.0",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tokio-tungstenite 0.29.0",
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ axum = { workspace = true, optional = true }
|
|||
futures = { workspace = true, optional = true }
|
||||
protocol = { workspace = true, optional = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
sha2.workspace = true
|
||||
serde_json = { workspace = true, optional = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["net", "rt"], optional = true }
|
||||
|
|
|
|||
|
|
@ -40,10 +40,11 @@ impl Default for ProfileSelector {
|
|||
}
|
||||
}
|
||||
|
||||
/// Placeholder for future config-bundle synchronization.
|
||||
/// Backend-synced config bundle reference used during Worker creation.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConfigBundleRef {
|
||||
pub id: String,
|
||||
pub digest: String,
|
||||
}
|
||||
|
||||
/// Requested capability name plus optional human-readable reason.
|
||||
|
|
|
|||
319
crates/worker-runtime/src/config_bundle.rs
Normal file
319
crates/worker-runtime/src/config_bundle.rs
Normal file
|
|
@ -0,0 +1,319 @@
|
|||
use crate::catalog::ProfileSelector;
|
||||
use crate::error::RuntimeError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::path::Path;
|
||||
|
||||
pub const CONFIG_BUNDLE_DIGEST_ALGORITHM: &str = "sha256";
|
||||
|
||||
/// Backend-synced Profile/config bundle stored by a Runtime.
|
||||
///
|
||||
/// The bundle is intentionally an intent/declaration boundary: it contains
|
||||
/// profile selectors plus refs/grants/policies, never secret values, direct
|
||||
/// Runtime endpoints, raw socket/session paths, runtime-local mount actual
|
||||
/// paths, host-local cache paths, or fully resolved WorkerSpec content.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConfigBundle {
|
||||
pub metadata: ConfigBundleMetadata,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub profiles: Vec<ConfigProfileDescriptor>,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub declarations: Vec<ConfigDeclaration>,
|
||||
}
|
||||
|
||||
impl ConfigBundle {
|
||||
pub fn computed_digest(&self) -> String {
|
||||
let mut lines = Vec::new();
|
||||
lines.push(format!("id\0{}", self.metadata.id));
|
||||
lines.push(format!("revision\0{}", self.metadata.revision));
|
||||
lines.push(format!("workspace_id\0{}", self.metadata.workspace_id));
|
||||
lines.push(format!("created_at\0{}", self.metadata.created_at));
|
||||
lines.push(format!(
|
||||
"provenance.source\0{}",
|
||||
self.metadata.provenance.source
|
||||
));
|
||||
lines.push(format!(
|
||||
"provenance.detail\0{}",
|
||||
self.metadata.provenance.detail.as_deref().unwrap_or("")
|
||||
));
|
||||
|
||||
let mut profiles = self.profiles.clone();
|
||||
profiles.sort_by(|left, right| {
|
||||
profile_sort_key(&left.selector).cmp(&profile_sort_key(&right.selector))
|
||||
});
|
||||
for profile in profiles {
|
||||
lines.push(format!(
|
||||
"profile\0{}\0{}",
|
||||
profile_sort_key(&profile.selector),
|
||||
profile.label.unwrap_or_default()
|
||||
));
|
||||
}
|
||||
|
||||
let mut declarations = self.declarations.clone();
|
||||
declarations
|
||||
.sort_by(|left, right| declaration_sort_key(left).cmp(&declaration_sort_key(right)));
|
||||
for declaration in declarations {
|
||||
lines.push(format!(
|
||||
"declaration\0{}\0{}\0{}",
|
||||
declaration.kind.canonical_name(),
|
||||
declaration.name,
|
||||
declaration.reference
|
||||
));
|
||||
}
|
||||
|
||||
lines.sort();
|
||||
let mut hasher = Sha256::new();
|
||||
for line in lines {
|
||||
hasher.update(line.as_bytes());
|
||||
hasher.update(b"\n");
|
||||
}
|
||||
let digest = hasher.finalize();
|
||||
hex_digest(&digest)
|
||||
}
|
||||
|
||||
pub fn with_computed_digest(mut self) -> Self {
|
||||
self.metadata.digest = self.computed_digest();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn summary(&self) -> ConfigBundleSummary {
|
||||
ConfigBundleSummary {
|
||||
id: self.metadata.id.clone(),
|
||||
digest: self.metadata.digest.clone(),
|
||||
digest_algorithm: CONFIG_BUNDLE_DIGEST_ALGORITHM.to_string(),
|
||||
revision: self.metadata.revision.clone(),
|
||||
workspace_id: self.metadata.workspace_id.clone(),
|
||||
created_at: self.metadata.created_at.clone(),
|
||||
provenance: self.metadata.provenance.clone(),
|
||||
profile_count: self.profiles.len(),
|
||||
declaration_count: self.declarations.len(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains_profile(&self, selector: &ProfileSelector) -> bool {
|
||||
self.profiles
|
||||
.iter()
|
||||
.any(|profile| profile.selector == *selector)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConfigBundleMetadata {
|
||||
pub id: String,
|
||||
pub digest: String,
|
||||
pub revision: String,
|
||||
pub workspace_id: String,
|
||||
pub created_at: String,
|
||||
pub provenance: ConfigBundleProvenance,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConfigBundleProvenance {
|
||||
pub source: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub detail: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConfigProfileDescriptor {
|
||||
pub selector: ProfileSelector,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub label: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConfigDeclaration {
|
||||
pub kind: ConfigDeclarationKind,
|
||||
pub name: String,
|
||||
pub reference: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ConfigDeclarationKind {
|
||||
SecretRef,
|
||||
MountGrant,
|
||||
NetworkPolicy,
|
||||
ShellPolicy,
|
||||
GitPolicy,
|
||||
CapabilityGrant,
|
||||
Unsupported,
|
||||
}
|
||||
|
||||
impl ConfigDeclarationKind {
|
||||
pub fn canonical_name(&self) -> &'static str {
|
||||
match self {
|
||||
Self::SecretRef => "secret_ref",
|
||||
Self::MountGrant => "mount_grant",
|
||||
Self::NetworkPolicy => "network_policy",
|
||||
Self::ShellPolicy => "shell_policy",
|
||||
Self::GitPolicy => "git_policy",
|
||||
Self::CapabilityGrant => "capability_grant",
|
||||
Self::Unsupported => "unsupported",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConfigBundleSummary {
|
||||
pub id: String,
|
||||
pub digest: String,
|
||||
pub digest_algorithm: String,
|
||||
pub revision: String,
|
||||
pub workspace_id: String,
|
||||
pub created_at: String,
|
||||
pub provenance: ConfigBundleProvenance,
|
||||
pub profile_count: usize,
|
||||
pub declaration_count: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ConfigBundleAvailability {
|
||||
pub reference: crate::catalog::ConfigBundleRef,
|
||||
pub summary: ConfigBundleSummary,
|
||||
}
|
||||
|
||||
pub(crate) fn validate_config_bundle(bundle: &ConfigBundle) -> Result<(), RuntimeError> {
|
||||
validate_non_empty("config bundle id", &bundle.metadata.id)?;
|
||||
validate_non_empty("config bundle digest", &bundle.metadata.digest)?;
|
||||
validate_non_empty("config bundle revision", &bundle.metadata.revision)?;
|
||||
validate_non_empty("config bundle workspace id", &bundle.metadata.workspace_id)?;
|
||||
validate_non_empty("config bundle created_at", &bundle.metadata.created_at)?;
|
||||
validate_non_empty(
|
||||
"config bundle provenance source",
|
||||
&bundle.metadata.provenance.source,
|
||||
)?;
|
||||
validate_boundary_text("config bundle id", &bundle.metadata.id)?;
|
||||
validate_boundary_text("config bundle revision", &bundle.metadata.revision)?;
|
||||
validate_boundary_text("config bundle workspace id", &bundle.metadata.workspace_id)?;
|
||||
validate_boundary_text(
|
||||
"config bundle provenance source",
|
||||
&bundle.metadata.provenance.source,
|
||||
)?;
|
||||
if let Some(detail) = &bundle.metadata.provenance.detail {
|
||||
validate_boundary_text("config bundle provenance detail", detail)?;
|
||||
}
|
||||
|
||||
let computed = bundle.computed_digest();
|
||||
if computed != bundle.metadata.digest {
|
||||
return Err(RuntimeError::ConfigBundleDigestMismatch {
|
||||
bundle_id: bundle.metadata.id.clone(),
|
||||
expected_digest: bundle.metadata.digest.clone(),
|
||||
actual_digest: computed,
|
||||
});
|
||||
}
|
||||
|
||||
for profile in &bundle.profiles {
|
||||
validate_profile_selector(profile.selector.clone(), Some(&bundle.metadata.id))?;
|
||||
if let Some(label) = &profile.label {
|
||||
validate_boundary_text("profile label", label)?;
|
||||
}
|
||||
}
|
||||
|
||||
for declaration in &bundle.declarations {
|
||||
validate_non_empty("config declaration name", &declaration.name)?;
|
||||
validate_non_empty("config declaration reference", &declaration.reference)?;
|
||||
validate_boundary_text("config declaration name", &declaration.name)?;
|
||||
validate_boundary_text("config declaration reference", &declaration.reference)?;
|
||||
if declaration.kind == ConfigDeclarationKind::Unsupported {
|
||||
return Err(RuntimeError::UnsupportedConfigDeclaration {
|
||||
bundle_id: bundle.metadata.id.clone(),
|
||||
declaration_kind: declaration.kind.canonical_name().to_string(),
|
||||
name: declaration.name.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn validate_profile_selector(
|
||||
selector: ProfileSelector,
|
||||
bundle_id: Option<&str>,
|
||||
) -> Result<(), RuntimeError> {
|
||||
match selector {
|
||||
ProfileSelector::RuntimeDefault => Ok(()),
|
||||
ProfileSelector::Builtin(value) | ProfileSelector::Named(value) => {
|
||||
if value.trim().is_empty() {
|
||||
Err(RuntimeError::InvalidProfileSelector {
|
||||
profile: value,
|
||||
bundle_id: bundle_id.map(ToOwned::to_owned),
|
||||
message: "profile selector must not be empty".to_string(),
|
||||
})
|
||||
} else {
|
||||
validate_boundary_text("profile selector", &value).map_err(|err| match err {
|
||||
RuntimeError::InvalidRequest(message) => RuntimeError::InvalidProfileSelector {
|
||||
profile: value,
|
||||
bundle_id: bundle_id.map(ToOwned::to_owned),
|
||||
message,
|
||||
},
|
||||
other => other,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_non_empty(label: &'static str, value: &str) -> Result<(), RuntimeError> {
|
||||
if value.trim().is_empty() {
|
||||
Err(RuntimeError::InvalidRequest(format!(
|
||||
"{label} must not be empty"
|
||||
)))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_boundary_text(label: &'static str, value: &str) -> Result<(), RuntimeError> {
|
||||
let trimmed = value.trim();
|
||||
if trimmed.len() > 2048 {
|
||||
return Err(RuntimeError::InvalidRequest(format!(
|
||||
"{label} is too large"
|
||||
)));
|
||||
}
|
||||
if trimmed.chars().any(char::is_control) {
|
||||
return Err(RuntimeError::InvalidRequest(format!(
|
||||
"{label} must not contain control characters"
|
||||
)));
|
||||
}
|
||||
if Path::new(trimmed).is_absolute()
|
||||
|| trimmed.starts_with('~')
|
||||
|| trimmed.contains("/.cache")
|
||||
|| trimmed.contains("\\.cache")
|
||||
|| trimmed.contains("/run/")
|
||||
|| trimmed.contains("\\run\\")
|
||||
|| trimmed.contains(".sock")
|
||||
|| trimmed.contains("socket=")
|
||||
|| trimmed.contains("session_path")
|
||||
|| trimmed.contains("cache_path")
|
||||
{
|
||||
return Err(RuntimeError::InvalidRequest(format!(
|
||||
"{label} must be a stable ref/grant/policy declaration, not a host-local path"
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn declaration_sort_key(declaration: &ConfigDeclaration) -> String {
|
||||
format!(
|
||||
"{}\0{}\0{}",
|
||||
declaration.kind.canonical_name(),
|
||||
declaration.name,
|
||||
declaration.reference
|
||||
)
|
||||
}
|
||||
|
||||
fn profile_sort_key(selector: &ProfileSelector) -> String {
|
||||
match selector {
|
||||
ProfileSelector::RuntimeDefault => "runtime_default".to_string(),
|
||||
ProfileSelector::Builtin(value) => format!("builtin\0{value}"),
|
||||
ProfileSelector::Named(value) => format!("named\0{value}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn hex_digest(bytes: &[u8]) -> String {
|
||||
let mut out = String::with_capacity(bytes.len() * 2);
|
||||
for byte in bytes {
|
||||
out.push_str(&format!("{byte:02x}"));
|
||||
}
|
||||
out
|
||||
}
|
||||
|
|
@ -34,6 +34,34 @@ pub enum RuntimeError {
|
|||
#[error("invalid request: {0}")]
|
||||
InvalidRequest(String),
|
||||
|
||||
#[error("config bundle `{bundle_id}` was not found")]
|
||||
ConfigBundleMissing { bundle_id: String },
|
||||
|
||||
#[error(
|
||||
"config bundle `{bundle_id}` digest mismatch: expected {expected_digest}, got {actual_digest}"
|
||||
)]
|
||||
ConfigBundleDigestMismatch {
|
||||
bundle_id: String,
|
||||
expected_digest: String,
|
||||
actual_digest: String,
|
||||
},
|
||||
|
||||
#[error("invalid profile selector `{profile}` for config bundle {bundle_id:?}: {message}")]
|
||||
InvalidProfileSelector {
|
||||
profile: String,
|
||||
bundle_id: Option<String>,
|
||||
message: String,
|
||||
},
|
||||
|
||||
#[error(
|
||||
"config bundle `{bundle_id}` contains unsupported declaration `{declaration_kind}` named `{name}`"
|
||||
)]
|
||||
UnsupportedConfigDeclaration {
|
||||
bundle_id: String,
|
||||
declaration_kind: String,
|
||||
name: String,
|
||||
},
|
||||
|
||||
#[error("runtime store {operation} failed at {}: {source}", path.display())]
|
||||
StoreIo {
|
||||
operation: &'static str,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use crate::catalog::{CreateWorkerRequest, WorkerStatus};
|
||||
use crate::config_bundle::ConfigBundle;
|
||||
use crate::diagnostics::RuntimeDiagnostic;
|
||||
use crate::error::RuntimeError;
|
||||
use crate::identity::{RuntimeId, WorkerId, WorkerRef};
|
||||
|
|
@ -364,6 +365,7 @@ pub(crate) struct PersistedRuntimeState {
|
|||
pub(crate) next_event_id: u64,
|
||||
pub(crate) next_diagnostic_id: u64,
|
||||
pub(crate) workers: BTreeMap<WorkerId, PersistedWorkerRecord>,
|
||||
pub(crate) config_bundles: BTreeMap<String, ConfigBundle>,
|
||||
pub(crate) events: Vec<RuntimeEvent>,
|
||||
pub(crate) diagnostics: Vec<RuntimeDiagnostic>,
|
||||
}
|
||||
|
|
@ -390,6 +392,8 @@ struct RuntimeSnapshot {
|
|||
next_worker_sequence: u64,
|
||||
next_event_id: u64,
|
||||
next_diagnostic_id: u64,
|
||||
#[serde(default)]
|
||||
config_bundles: BTreeMap<String, ConfigBundle>,
|
||||
diagnostics: Vec<RuntimeDiagnostic>,
|
||||
}
|
||||
|
||||
|
|
@ -405,6 +409,7 @@ impl RuntimeSnapshot {
|
|||
next_worker_sequence: state.next_worker_sequence,
|
||||
next_event_id: state.next_event_id,
|
||||
next_diagnostic_id: state.next_diagnostic_id,
|
||||
config_bundles: state.config_bundles.clone(),
|
||||
diagnostics: state.diagnostics.clone(),
|
||||
}
|
||||
}
|
||||
|
|
@ -454,6 +459,7 @@ impl RuntimeSnapshot {
|
|||
next_event_id: self.next_event_id,
|
||||
next_diagnostic_id: self.next_diagnostic_id,
|
||||
workers,
|
||||
config_bundles: self.config_bundles,
|
||||
events,
|
||||
diagnostics: self.diagnostics,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,10 @@
|
|||
//! credentials, registration, and policy.
|
||||
|
||||
use crate::Runtime;
|
||||
use crate::catalog::{CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerSummary};
|
||||
use crate::catalog::{
|
||||
ConfigBundleRef, CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerSummary,
|
||||
};
|
||||
use crate::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary};
|
||||
use crate::error::RuntimeError;
|
||||
#[cfg(feature = "fs-store")]
|
||||
use crate::fs_store::FsRuntimeStoreOptions;
|
||||
|
|
@ -165,6 +168,14 @@ pub fn runtime_http_router(runtime: Runtime, local_token: Option<String>) -> Rou
|
|||
|
||||
let router = Router::new()
|
||||
.route("/v1/runtime", get(get_runtime))
|
||||
.route(
|
||||
"/v1/config-bundles",
|
||||
get(list_config_bundles).post(store_config_bundle),
|
||||
)
|
||||
.route(
|
||||
"/v1/config-bundles/{bundle_id}/availability",
|
||||
get(check_config_bundle),
|
||||
)
|
||||
.route("/v1/workers", get(list_workers).post(create_worker))
|
||||
.route("/v1/workers/{worker_id}", get(get_worker))
|
||||
.route("/v1/workers/{worker_id}/input", post(send_worker_input))
|
||||
|
|
@ -216,6 +227,29 @@ pub struct RuntimeHttpSummaryResponse {
|
|||
pub runtime: RuntimeSummary,
|
||||
}
|
||||
|
||||
/// `GET /v1/config-bundles` response.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RuntimeHttpConfigBundlesResponse {
|
||||
pub bundles: Vec<ConfigBundleSummary>,
|
||||
}
|
||||
|
||||
/// `POST /v1/config-bundles` request.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RuntimeHttpConfigBundleSyncRequest {
|
||||
pub bundle: ConfigBundle,
|
||||
}
|
||||
|
||||
/// Config bundle availability response used by sync/check endpoints.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RuntimeHttpConfigBundleAvailabilityResponse {
|
||||
pub availability: ConfigBundleAvailability,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
struct RuntimeHttpConfigBundleAvailabilityQuery {
|
||||
digest: String,
|
||||
}
|
||||
|
||||
/// `GET /v1/workers` response.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RuntimeHttpWorkersResponse {
|
||||
|
|
@ -372,6 +406,48 @@ async fn get_runtime(
|
|||
Ok(Json(RuntimeHttpSummaryResponse { runtime }))
|
||||
}
|
||||
|
||||
async fn list_config_bundles(
|
||||
State(state): State<RuntimeHttpState>,
|
||||
) -> RestResult<RuntimeHttpConfigBundlesResponse> {
|
||||
let bundles = state
|
||||
.runtime
|
||||
.list_config_bundles()
|
||||
.map_err(RuntimeHttpRestError::runtime)?;
|
||||
Ok(Json(RuntimeHttpConfigBundlesResponse { bundles }))
|
||||
}
|
||||
|
||||
async fn store_config_bundle(
|
||||
State(state): State<RuntimeHttpState>,
|
||||
body: Result<Json<RuntimeHttpConfigBundleSyncRequest>, JsonRejection>,
|
||||
) -> RestResult<RuntimeHttpConfigBundleAvailabilityResponse> {
|
||||
let Json(request) = body.map_err(RuntimeHttpRestError::json_rejection)?;
|
||||
let availability = state
|
||||
.runtime
|
||||
.store_config_bundle(request.bundle)
|
||||
.map_err(RuntimeHttpRestError::runtime)?;
|
||||
Ok(Json(RuntimeHttpConfigBundleAvailabilityResponse {
|
||||
availability,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn check_config_bundle(
|
||||
State(state): State<RuntimeHttpState>,
|
||||
Path(bundle_id): Path<String>,
|
||||
query: Result<Query<RuntimeHttpConfigBundleAvailabilityQuery>, QueryRejection>,
|
||||
) -> RestResult<RuntimeHttpConfigBundleAvailabilityResponse> {
|
||||
let Query(query) = query.map_err(RuntimeHttpRestError::query_rejection)?;
|
||||
let availability = state
|
||||
.runtime
|
||||
.check_config_bundle(&ConfigBundleRef {
|
||||
id: bundle_id,
|
||||
digest: query.digest,
|
||||
})
|
||||
.map_err(RuntimeHttpRestError::runtime)?;
|
||||
Ok(Json(RuntimeHttpConfigBundleAvailabilityResponse {
|
||||
availability,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn list_workers(
|
||||
State(state): State<RuntimeHttpState>,
|
||||
) -> RestResult<RuntimeHttpWorkersResponse> {
|
||||
|
|
@ -743,10 +819,15 @@ impl IntoResponse for RuntimeHttpRestError {
|
|||
|
||||
fn status_for_runtime_error(error: &RuntimeError) -> StatusCode {
|
||||
match error {
|
||||
RuntimeError::WorkerNotFound { .. } => StatusCode::NOT_FOUND,
|
||||
RuntimeError::WorkerNotFound { .. } | RuntimeError::ConfigBundleMissing { .. } => {
|
||||
StatusCode::NOT_FOUND
|
||||
}
|
||||
RuntimeError::RuntimeStopped { .. } => StatusCode::CONFLICT,
|
||||
RuntimeError::LimitTooLarge { .. }
|
||||
| RuntimeError::InvalidRequest(_)
|
||||
| RuntimeError::ConfigBundleDigestMismatch { .. }
|
||||
| RuntimeError::InvalidProfileSelector { .. }
|
||||
| RuntimeError::UnsupportedConfigDeclaration { .. }
|
||||
| RuntimeError::WrongRuntime { .. }
|
||||
| RuntimeError::WrongRuntimeCursor { .. } => StatusCode::BAD_REQUEST,
|
||||
RuntimeError::StoreIo { .. }
|
||||
|
|
@ -764,6 +845,10 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str {
|
|||
RuntimeError::WorkerNotFound { .. } => "worker_not_found",
|
||||
RuntimeError::LimitTooLarge { .. } => "limit_too_large",
|
||||
RuntimeError::InvalidRequest(_) => "invalid_request",
|
||||
RuntimeError::ConfigBundleMissing { .. } => "config_bundle_missing",
|
||||
RuntimeError::ConfigBundleDigestMismatch { .. } => "config_bundle_digest_mismatch",
|
||||
RuntimeError::InvalidProfileSelector { .. } => "invalid_profile_selector",
|
||||
RuntimeError::UnsupportedConfigDeclaration { .. } => "unsupported_config_declaration",
|
||||
RuntimeError::StoreIo { .. } => "store_io",
|
||||
RuntimeError::StoreMissing { .. } => "store_missing",
|
||||
RuntimeError::StoreCorrupt { .. } => "store_corrupt",
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
//! can later adapt into registries or backend APIs.
|
||||
|
||||
pub mod catalog;
|
||||
pub mod config_bundle;
|
||||
pub mod diagnostics;
|
||||
pub mod error;
|
||||
#[cfg(feature = "fs-store")]
|
||||
|
|
|
|||
|
|
@ -1,5 +1,10 @@
|
|||
use crate::catalog::{
|
||||
CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerStatus, WorkerSummary,
|
||||
ConfigBundleRef, CreateWorkerRequest, ProfileSelector, WorkerDetail, WorkerLifecycleAck,
|
||||
WorkerStatus, WorkerSummary,
|
||||
};
|
||||
use crate::config_bundle::{
|
||||
ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle,
|
||||
validate_profile_selector,
|
||||
};
|
||||
use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic};
|
||||
use crate::error::RuntimeError;
|
||||
|
|
@ -128,6 +133,45 @@ impl Runtime {
|
|||
Ok(self.lock()?.status)
|
||||
}
|
||||
|
||||
/// Store a backend-synced Profile/config bundle for later Worker creation.
|
||||
pub fn store_config_bundle(
|
||||
&self,
|
||||
bundle: ConfigBundle,
|
||||
) -> Result<ConfigBundleAvailability, RuntimeError> {
|
||||
validate_config_bundle(&bundle)?;
|
||||
let mut state = self.lock()?;
|
||||
state.ensure_running()?;
|
||||
let reference = ConfigBundleRef {
|
||||
id: bundle.metadata.id.clone(),
|
||||
digest: bundle.metadata.digest.clone(),
|
||||
};
|
||||
let summary = bundle.summary();
|
||||
state
|
||||
.config_bundles
|
||||
.insert(bundle.metadata.id.clone(), bundle);
|
||||
state.persist_runtime_snapshot()?;
|
||||
Ok(ConfigBundleAvailability { reference, summary })
|
||||
}
|
||||
|
||||
/// List synced config bundles known to this Runtime.
|
||||
pub fn list_config_bundles(&self) -> Result<Vec<ConfigBundleSummary>, RuntimeError> {
|
||||
Ok(self
|
||||
.lock()?
|
||||
.config_bundles
|
||||
.values()
|
||||
.map(ConfigBundle::summary)
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Validate that a config bundle reference is present and digest-matched.
|
||||
pub fn check_config_bundle(
|
||||
&self,
|
||||
reference: &ConfigBundleRef,
|
||||
) -> Result<ConfigBundleAvailability, RuntimeError> {
|
||||
let state = self.lock()?;
|
||||
state.check_config_bundle_ref(reference)
|
||||
}
|
||||
|
||||
/// Stop the Runtime. v0 keeps data readable after stop, but rejects new
|
||||
/// create/send/worker lifecycle mutations.
|
||||
pub fn stop_runtime(&self) -> Result<u64, RuntimeError> {
|
||||
|
|
@ -161,6 +205,7 @@ impl Runtime {
|
|||
let mut state = self.lock()?;
|
||||
state.ensure_running()?;
|
||||
validate_create_worker_request(&request)?;
|
||||
state.validate_worker_config_boundary(&request)?;
|
||||
|
||||
let worker_id = WorkerId::generated(state.next_worker_sequence);
|
||||
state.next_worker_sequence += 1;
|
||||
|
|
@ -551,6 +596,7 @@ struct RuntimeState {
|
|||
next_event_id: u64,
|
||||
next_diagnostic_id: u64,
|
||||
workers: BTreeMap<WorkerId, WorkerRecord>,
|
||||
config_bundles: BTreeMap<String, ConfigBundle>,
|
||||
events: Vec<RuntimeEvent>,
|
||||
diagnostics: Vec<RuntimeDiagnostic>,
|
||||
#[cfg(feature = "ws-server")]
|
||||
|
|
@ -574,6 +620,7 @@ impl RuntimeState {
|
|||
next_event_id: 1,
|
||||
next_diagnostic_id: 1,
|
||||
workers: BTreeMap::new(),
|
||||
config_bundles: BTreeMap::new(),
|
||||
events: Vec::new(),
|
||||
diagnostics: Vec::new(),
|
||||
#[cfg(feature = "ws-server")]
|
||||
|
|
@ -603,6 +650,7 @@ impl RuntimeState {
|
|||
next_event_id: 1,
|
||||
next_diagnostic_id: 1,
|
||||
workers: BTreeMap::new(),
|
||||
config_bundles: BTreeMap::new(),
|
||||
events: Vec::new(),
|
||||
diagnostics: Vec::new(),
|
||||
#[cfg(feature = "ws-server")]
|
||||
|
|
@ -658,6 +706,7 @@ impl RuntimeState {
|
|||
next_event_id: persisted.next_event_id,
|
||||
next_diagnostic_id: persisted.next_diagnostic_id,
|
||||
workers,
|
||||
config_bundles: persisted.config_bundles,
|
||||
events: persisted.events,
|
||||
diagnostics: persisted.diagnostics,
|
||||
})
|
||||
|
|
@ -678,6 +727,7 @@ impl RuntimeState {
|
|||
.iter()
|
||||
.map(|(worker_id, worker)| (worker_id.clone(), worker.persisted_record()))
|
||||
.collect(),
|
||||
config_bundles: self.config_bundles.clone(),
|
||||
events: self.events.clone(),
|
||||
diagnostics: self.diagnostics.clone(),
|
||||
}
|
||||
|
|
@ -810,6 +860,69 @@ impl RuntimeState {
|
|||
}
|
||||
}
|
||||
|
||||
fn check_config_bundle_ref(
|
||||
&self,
|
||||
reference: &ConfigBundleRef,
|
||||
) -> Result<ConfigBundleAvailability, RuntimeError> {
|
||||
if reference.id.trim().is_empty() || reference.digest.trim().is_empty() {
|
||||
return Err(RuntimeError::InvalidRequest(
|
||||
"config bundle reference id and digest must not be empty".to_string(),
|
||||
));
|
||||
}
|
||||
let bundle = self.config_bundles.get(&reference.id).ok_or_else(|| {
|
||||
RuntimeError::ConfigBundleMissing {
|
||||
bundle_id: reference.id.clone(),
|
||||
}
|
||||
})?;
|
||||
if bundle.metadata.digest != reference.digest {
|
||||
return Err(RuntimeError::ConfigBundleDigestMismatch {
|
||||
bundle_id: reference.id.clone(),
|
||||
expected_digest: reference.digest.clone(),
|
||||
actual_digest: bundle.metadata.digest.clone(),
|
||||
});
|
||||
}
|
||||
Ok(ConfigBundleAvailability {
|
||||
reference: reference.clone(),
|
||||
summary: bundle.summary(),
|
||||
})
|
||||
}
|
||||
|
||||
fn validate_worker_config_boundary(
|
||||
&self,
|
||||
request: &CreateWorkerRequest,
|
||||
) -> Result<(), RuntimeError> {
|
||||
match &request.config_bundle {
|
||||
Some(reference) => {
|
||||
let availability = self.check_config_bundle_ref(reference)?;
|
||||
let bundle = self
|
||||
.config_bundles
|
||||
.get(&availability.reference.id)
|
||||
.ok_or_else(|| RuntimeError::ConfigBundleMissing {
|
||||
bundle_id: availability.reference.id.clone(),
|
||||
})?;
|
||||
if !bundle.contains_profile(&request.profile) {
|
||||
return Err(RuntimeError::InvalidProfileSelector {
|
||||
profile: profile_label(&request.profile),
|
||||
bundle_id: Some(reference.id.clone()),
|
||||
message: "profile selector is not declared by synced config bundle"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
None => match &request.profile {
|
||||
ProfileSelector::RuntimeDefault | ProfileSelector::Builtin(_) => {
|
||||
validate_profile_selector(request.profile.clone(), None)
|
||||
}
|
||||
ProfileSelector::Named(_) => Err(RuntimeError::InvalidProfileSelector {
|
||||
profile: profile_label(&request.profile),
|
||||
bundle_id: None,
|
||||
message: "named profiles require a synced config bundle reference".to_string(),
|
||||
}),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> {
|
||||
if worker_ref.runtime_id != self.runtime_id {
|
||||
return Err(RuntimeError::WrongRuntime {
|
||||
|
|
@ -1012,6 +1125,14 @@ impl WorkerRecord {
|
|||
}
|
||||
}
|
||||
|
||||
fn profile_label(selector: &ProfileSelector) -> String {
|
||||
match selector {
|
||||
ProfileSelector::RuntimeDefault => "runtime_default".to_string(),
|
||||
ProfileSelector::Builtin(value) => value.clone(),
|
||||
ProfileSelector::Named(value) => value.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> {
|
||||
if let crate::catalog::WorkerIntent::Task { objective } = &request.intent {
|
||||
if objective.trim().is_empty() {
|
||||
|
|
@ -1043,6 +1164,10 @@ fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent};
|
||||
use crate::config_bundle::{
|
||||
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration,
|
||||
ConfigDeclarationKind, ConfigProfileDescriptor,
|
||||
};
|
||||
use crate::management::RuntimeLimits;
|
||||
|
||||
fn task_request(objective: &str) -> CreateWorkerRequest {
|
||||
|
|
@ -1051,15 +1176,48 @@ mod tests {
|
|||
objective: objective.to_string(),
|
||||
},
|
||||
profile: ProfileSelector::Builtin("builtin:coder".to_string()),
|
||||
config_bundle: Some(ConfigBundleRef {
|
||||
id: "bundle-1".to_string(),
|
||||
}),
|
||||
config_bundle: None,
|
||||
requested_capabilities: vec![CapabilityRequest::named("read")],
|
||||
workspace_refs: Vec::new(),
|
||||
mount_refs: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn test_bundle() -> ConfigBundle {
|
||||
ConfigBundle {
|
||||
metadata: ConfigBundleMetadata {
|
||||
id: "bundle-1".to_string(),
|
||||
digest: String::new(),
|
||||
revision: "rev-1".to_string(),
|
||||
workspace_id: "workspace-1".to_string(),
|
||||
created_at: "2026-06-26T00:00:00Z".to_string(),
|
||||
provenance: ConfigBundleProvenance {
|
||||
source: "workspace-backend".to_string(),
|
||||
detail: Some("profile-sync".to_string()),
|
||||
},
|
||||
},
|
||||
profiles: vec![ConfigProfileDescriptor {
|
||||
selector: ProfileSelector::Builtin("builtin:coder".to_string()),
|
||||
label: Some("Coder".to_string()),
|
||||
}],
|
||||
declarations: vec![ConfigDeclaration {
|
||||
kind: ConfigDeclarationKind::CapabilityGrant,
|
||||
name: "read".to_string(),
|
||||
reference: "capability:read".to_string(),
|
||||
}],
|
||||
}
|
||||
.with_computed_digest()
|
||||
}
|
||||
|
||||
fn bundled_task_request(objective: &str, bundle: &ConfigBundle) -> CreateWorkerRequest {
|
||||
let mut request = task_request(objective);
|
||||
request.config_bundle = Some(ConfigBundleRef {
|
||||
id: bundle.metadata.id.clone(),
|
||||
digest: bundle.metadata.digest.clone(),
|
||||
});
|
||||
request
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_list_and_detail_preserve_runtime_worker_authority() {
|
||||
let runtime = Runtime::new_memory();
|
||||
|
|
@ -1067,7 +1225,7 @@ mod tests {
|
|||
|
||||
assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap());
|
||||
assert_eq!(detail.status, WorkerStatus::Running);
|
||||
assert!(detail.config_bundle.is_some());
|
||||
assert!(detail.config_bundle.is_none());
|
||||
|
||||
let list = runtime.list_workers().unwrap();
|
||||
assert_eq!(list.len(), 1);
|
||||
|
|
@ -1079,6 +1237,73 @@ mod tests {
|
|||
assert_eq!(fetched.intent, detail.intent);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn synced_config_bundle_is_stored_checked_and_used_for_worker_creation() {
|
||||
let runtime = Runtime::new_memory();
|
||||
let bundle = test_bundle();
|
||||
let availability = runtime.store_config_bundle(bundle.clone()).unwrap();
|
||||
assert_eq!(availability.reference.id, "bundle-1");
|
||||
assert_eq!(availability.reference.digest, bundle.metadata.digest);
|
||||
|
||||
let listed = runtime.list_config_bundles().unwrap();
|
||||
assert_eq!(listed.len(), 1);
|
||||
assert_eq!(listed[0].id, "bundle-1");
|
||||
|
||||
let checked = runtime
|
||||
.check_config_bundle(&availability.reference)
|
||||
.unwrap();
|
||||
assert_eq!(checked.summary.digest, availability.summary.digest);
|
||||
|
||||
let detail = runtime
|
||||
.create_worker(bundled_task_request("synced", &bundle))
|
||||
.unwrap();
|
||||
assert_eq!(detail.config_bundle, Some(availability.reference));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_bundle_errors_are_typed() {
|
||||
let runtime = Runtime::new_memory();
|
||||
let bundle = test_bundle();
|
||||
|
||||
let missing = runtime
|
||||
.create_worker(bundled_task_request("missing", &bundle))
|
||||
.unwrap_err();
|
||||
assert!(matches!(missing, RuntimeError::ConfigBundleMissing { .. }));
|
||||
|
||||
runtime.store_config_bundle(bundle.clone()).unwrap();
|
||||
let mismatch = runtime
|
||||
.check_config_bundle(&ConfigBundleRef {
|
||||
id: bundle.metadata.id.clone(),
|
||||
digest: "bad-digest".to_string(),
|
||||
})
|
||||
.unwrap_err();
|
||||
assert!(matches!(
|
||||
mismatch,
|
||||
RuntimeError::ConfigBundleDigestMismatch { .. }
|
||||
));
|
||||
|
||||
let mut bad_profile = bundled_task_request("bad profile", &bundle);
|
||||
bad_profile.profile = ProfileSelector::Builtin("builtin:reviewer".to_string());
|
||||
let invalid_profile = runtime.create_worker(bad_profile).unwrap_err();
|
||||
assert!(matches!(
|
||||
invalid_profile,
|
||||
RuntimeError::InvalidProfileSelector { .. }
|
||||
));
|
||||
|
||||
let mut unsupported = test_bundle();
|
||||
unsupported.declarations.push(ConfigDeclaration {
|
||||
kind: ConfigDeclarationKind::Unsupported,
|
||||
name: "plugin-registry".to_string(),
|
||||
reference: "plugin-registry:v0".to_string(),
|
||||
});
|
||||
unsupported = unsupported.with_computed_digest();
|
||||
let unsupported_err = runtime.store_config_bundle(unsupported).unwrap_err();
|
||||
assert!(matches!(
|
||||
unsupported_err,
|
||||
RuntimeError::UnsupportedConfigDeclaration { .. }
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_worker_refs_from_another_runtime() {
|
||||
let runtime_a = Runtime::new_memory();
|
||||
|
|
|
|||
|
|
@ -16,11 +16,13 @@ use std::{
|
|||
time::Duration,
|
||||
};
|
||||
use worker_runtime::catalog::{
|
||||
CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail, WorkerIntent,
|
||||
WorkerStatus as EmbeddedWorkerStatus,
|
||||
CapabilityRequest, ConfigBundleRef, CreateWorkerRequest, ProfileSelector,
|
||||
WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, WorkerStatus as EmbeddedWorkerStatus,
|
||||
};
|
||||
use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary};
|
||||
use worker_runtime::error::RuntimeError as EmbeddedRuntimeError;
|
||||
use worker_runtime::http_server::{
|
||||
RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest,
|
||||
RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse,
|
||||
RuntimeHttpWorkerInputResponse, RuntimeHttpWorkerLifecycleRequest,
|
||||
RuntimeHttpWorkerLifecycleResponse, RuntimeHttpWorkerResponse, RuntimeHttpWorkersResponse,
|
||||
|
|
@ -261,16 +263,24 @@ pub struct WorkerLookupResult {
|
|||
|
||||
/// Browser-safe worker spawn request shape.
|
||||
///
|
||||
/// The request intentionally carries only workspace policy intents and stable
|
||||
/// worker identifiers. Raw workspace roots, child cwd, executable path, and raw
|
||||
/// profile selectors are resolved by the runtime service and never accepted from
|
||||
/// Workspace API callers.
|
||||
/// The request intentionally carries only workspace policy intents, stable
|
||||
/// worker identifiers, optional profile selectors, config bundle refs, and
|
||||
/// requested capability names. Raw workspace roots, child cwd, executable path,
|
||||
/// Runtime endpoints/credentials, raw bundle storage paths, and host-local
|
||||
/// resolved WorkerSpec content are resolved by the runtime service and never
|
||||
/// accepted from Workspace API callers.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct WorkerSpawnRequest {
|
||||
pub intent: WorkerSpawnIntent,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub requested_worker_name: Option<String>,
|
||||
pub acceptance: WorkerSpawnAcceptanceRequirement,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub profile: Option<ProfileSelector>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub config_bundle: Option<ConfigBundleRef>,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub requested_capabilities: Vec<CapabilityRequest>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
|
|
@ -309,6 +319,28 @@ pub struct WorkerSpawnResult {
|
|||
pub diagnostics: Vec<RuntimeDiagnostic>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct ConfigBundleSyncResult {
|
||||
pub state: WorkerOperationState,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub availability: Option<ConfigBundleAvailability>,
|
||||
pub diagnostics: Vec<RuntimeDiagnostic>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct ConfigBundleCheckResult {
|
||||
pub state: WorkerOperationState,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub availability: Option<ConfigBundleAvailability>,
|
||||
pub diagnostics: Vec<RuntimeDiagnostic>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct ConfigBundleListResult {
|
||||
pub bundles: Vec<ConfigBundleSummary>,
|
||||
pub diagnostics: Vec<RuntimeDiagnostic>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum WorkerOperationState {
|
||||
|
|
@ -492,6 +524,41 @@ pub trait WorkspaceWorkerRuntime: Send + Sync {
|
|||
}
|
||||
}
|
||||
|
||||
fn sync_config_bundle(&self, _bundle: ConfigBundle) -> ConfigBundleSyncResult {
|
||||
ConfigBundleSyncResult {
|
||||
state: WorkerOperationState::Unsupported,
|
||||
availability: None,
|
||||
diagnostics: vec![diagnostic(
|
||||
"config_bundle_sync_unsupported",
|
||||
DiagnosticSeverity::Info,
|
||||
"runtime does not implement config bundle sync".to_string(),
|
||||
)],
|
||||
}
|
||||
}
|
||||
|
||||
fn check_config_bundle(&self, _reference: ConfigBundleRef) -> ConfigBundleCheckResult {
|
||||
ConfigBundleCheckResult {
|
||||
state: WorkerOperationState::Unsupported,
|
||||
availability: None,
|
||||
diagnostics: vec![diagnostic(
|
||||
"config_bundle_check_unsupported",
|
||||
DiagnosticSeverity::Info,
|
||||
"runtime does not implement config bundle availability checks".to_string(),
|
||||
)],
|
||||
}
|
||||
}
|
||||
|
||||
fn list_config_bundles(&self) -> ConfigBundleListResult {
|
||||
ConfigBundleListResult {
|
||||
bundles: Vec::new(),
|
||||
diagnostics: vec![diagnostic(
|
||||
"config_bundle_list_unsupported",
|
||||
DiagnosticSeverity::Info,
|
||||
"runtime does not implement config bundle listing".to_string(),
|
||||
)],
|
||||
}
|
||||
}
|
||||
|
||||
fn stop_worker(
|
||||
&self,
|
||||
worker_id: &str,
|
||||
|
|
@ -729,6 +796,35 @@ impl RuntimeRegistry {
|
|||
Ok(runtime.spawn_worker(request))
|
||||
}
|
||||
|
||||
pub fn sync_config_bundle(
|
||||
&self,
|
||||
runtime_id: &str,
|
||||
bundle: ConfigBundle,
|
||||
) -> Result<ConfigBundleSyncResult, RuntimeRegistryError> {
|
||||
validate_backend_identifier("runtime_id", runtime_id)?;
|
||||
let runtime = self.runtime(runtime_id)?;
|
||||
Ok(runtime.sync_config_bundle(bundle))
|
||||
}
|
||||
|
||||
pub fn check_config_bundle(
|
||||
&self,
|
||||
runtime_id: &str,
|
||||
reference: ConfigBundleRef,
|
||||
) -> Result<ConfigBundleCheckResult, RuntimeRegistryError> {
|
||||
validate_backend_identifier("runtime_id", runtime_id)?;
|
||||
let runtime = self.runtime(runtime_id)?;
|
||||
Ok(runtime.check_config_bundle(reference))
|
||||
}
|
||||
|
||||
pub fn list_config_bundles(
|
||||
&self,
|
||||
runtime_id: &str,
|
||||
) -> Result<ConfigBundleListResult, RuntimeRegistryError> {
|
||||
validate_backend_identifier("runtime_id", runtime_id)?;
|
||||
let runtime = self.runtime(runtime_id)?;
|
||||
Ok(runtime.list_config_bundles())
|
||||
}
|
||||
|
||||
pub fn send_input(
|
||||
&self,
|
||||
runtime_id: &str,
|
||||
|
|
@ -1091,10 +1187,21 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
|
|||
));
|
||||
}
|
||||
|
||||
let create_request = CreateWorkerRequest::tools_less(
|
||||
embedded_create_intent(&request.intent),
|
||||
embedded_profile_selector(&request.intent),
|
||||
);
|
||||
let create_request = CreateWorkerRequest {
|
||||
intent: embedded_create_intent(&request.intent),
|
||||
profile: request
|
||||
.profile
|
||||
.clone()
|
||||
.unwrap_or_else(|| embedded_profile_selector(&request.intent)),
|
||||
config_bundle: request.config_bundle.clone(),
|
||||
requested_capabilities: if request.requested_capabilities.is_empty() {
|
||||
vec![CapabilityRequest::named("read")]
|
||||
} else {
|
||||
request.requested_capabilities.clone()
|
||||
},
|
||||
workspace_refs: Vec::new(),
|
||||
mount_refs: Vec::new(),
|
||||
};
|
||||
match self.runtime.create_worker(create_request) {
|
||||
Ok(detail) => WorkerSpawnResult {
|
||||
state: WorkerOperationState::Accepted,
|
||||
|
|
@ -1126,6 +1233,49 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
|
|||
}
|
||||
}
|
||||
|
||||
fn sync_config_bundle(&self, bundle: ConfigBundle) -> ConfigBundleSyncResult {
|
||||
match self.runtime.store_config_bundle(bundle) {
|
||||
Ok(availability) => ConfigBundleSyncResult {
|
||||
state: WorkerOperationState::Accepted,
|
||||
availability: Some(availability),
|
||||
diagnostics: Vec::new(),
|
||||
},
|
||||
Err(error) => ConfigBundleSyncResult {
|
||||
state: WorkerOperationState::Rejected,
|
||||
availability: None,
|
||||
diagnostics: vec![embedded_runtime_diagnostic(&error)],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn check_config_bundle(&self, reference: ConfigBundleRef) -> ConfigBundleCheckResult {
|
||||
match self.runtime.check_config_bundle(&reference) {
|
||||
Ok(availability) => ConfigBundleCheckResult {
|
||||
state: WorkerOperationState::Accepted,
|
||||
availability: Some(availability),
|
||||
diagnostics: Vec::new(),
|
||||
},
|
||||
Err(error) => ConfigBundleCheckResult {
|
||||
state: WorkerOperationState::Rejected,
|
||||
availability: None,
|
||||
diagnostics: vec![embedded_runtime_diagnostic(&error)],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn list_config_bundles(&self) -> ConfigBundleListResult {
|
||||
match self.runtime.list_config_bundles() {
|
||||
Ok(bundles) => ConfigBundleListResult {
|
||||
bundles,
|
||||
diagnostics: Vec::new(),
|
||||
},
|
||||
Err(error) => ConfigBundleListResult {
|
||||
bundles: Vec::new(),
|
||||
diagnostics: vec![embedded_runtime_diagnostic(&error)],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult {
|
||||
let Some(worker_ref) = self.worker_ref(worker_id) else {
|
||||
return embedded_input_rejected(
|
||||
|
|
@ -1574,10 +1724,21 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime {
|
|||
)],
|
||||
};
|
||||
}
|
||||
let create = CreateWorkerRequest::tools_less(
|
||||
embedded_create_intent(&request.intent),
|
||||
embedded_profile_selector(&request.intent),
|
||||
);
|
||||
let create = CreateWorkerRequest {
|
||||
intent: embedded_create_intent(&request.intent),
|
||||
profile: request
|
||||
.profile
|
||||
.clone()
|
||||
.unwrap_or_else(|| embedded_profile_selector(&request.intent)),
|
||||
config_bundle: request.config_bundle.clone(),
|
||||
requested_capabilities: if request.requested_capabilities.is_empty() {
|
||||
vec![CapabilityRequest::named("read")]
|
||||
} else {
|
||||
request.requested_capabilities.clone()
|
||||
},
|
||||
workspace_refs: Vec::new(),
|
||||
mount_refs: Vec::new(),
|
||||
};
|
||||
match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) {
|
||||
Ok(response) => WorkerSpawnResult {
|
||||
state: WorkerOperationState::Accepted,
|
||||
|
|
@ -1601,6 +1762,44 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime {
|
|||
}
|
||||
}
|
||||
|
||||
fn sync_config_bundle(&self, bundle: ConfigBundle) -> ConfigBundleSyncResult {
|
||||
let request = RuntimeHttpConfigBundleSyncRequest { bundle };
|
||||
match self.post_json::<_, RuntimeHttpConfigBundleAvailabilityResponse>(
|
||||
"/v1/config-bundles",
|
||||
&request,
|
||||
) {
|
||||
Ok(response) => ConfigBundleSyncResult {
|
||||
state: WorkerOperationState::Accepted,
|
||||
availability: Some(response.availability),
|
||||
diagnostics: Vec::new(),
|
||||
},
|
||||
Err(diagnostic) => ConfigBundleSyncResult {
|
||||
state: WorkerOperationState::Rejected,
|
||||
availability: None,
|
||||
diagnostics: vec![diagnostic],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn check_config_bundle(&self, reference: ConfigBundleRef) -> ConfigBundleCheckResult {
|
||||
let path = format!(
|
||||
"/v1/config-bundles/{}/availability?digest={}",
|
||||
reference.id, reference.digest
|
||||
);
|
||||
match self.get_json::<RuntimeHttpConfigBundleAvailabilityResponse>(&path) {
|
||||
Ok(response) => ConfigBundleCheckResult {
|
||||
state: WorkerOperationState::Accepted,
|
||||
availability: Some(response.availability),
|
||||
diagnostics: Vec::new(),
|
||||
},
|
||||
Err(diagnostic) => ConfigBundleCheckResult {
|
||||
state: WorkerOperationState::Rejected,
|
||||
availability: None,
|
||||
diagnostics: vec![diagnostic],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn stop_worker(
|
||||
&self,
|
||||
worker_id: &str,
|
||||
|
|
@ -2197,7 +2396,11 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti
|
|||
DiagnosticSeverity::Warning,
|
||||
format!("Requested limit {requested} exceeds embedded Runtime maximum {max}"),
|
||||
),
|
||||
EmbeddedRuntimeError::InvalidRequest(_) => diagnostic(
|
||||
EmbeddedRuntimeError::InvalidRequest(_)
|
||||
| EmbeddedRuntimeError::ConfigBundleMissing { .. }
|
||||
| EmbeddedRuntimeError::ConfigBundleDigestMismatch { .. }
|
||||
| EmbeddedRuntimeError::InvalidProfileSelector { .. }
|
||||
| EmbeddedRuntimeError::UnsupportedConfigDeclaration { .. } => diagnostic(
|
||||
"embedded_runtime_invalid_request",
|
||||
DiagnosticSeverity::Warning,
|
||||
"Embedded Runtime rejected the request".to_string(),
|
||||
|
|
@ -2592,6 +2795,32 @@ mod tests {
|
|||
metadata
|
||||
}
|
||||
|
||||
fn test_config_bundle() -> ConfigBundle {
|
||||
ConfigBundle {
|
||||
metadata: worker_runtime::config_bundle::ConfigBundleMetadata {
|
||||
id: "bundle-1".to_string(),
|
||||
digest: String::new(),
|
||||
revision: "rev-1".to_string(),
|
||||
workspace_id: "local:test".to_string(),
|
||||
created_at: "2026-06-26T00:00:00Z".to_string(),
|
||||
provenance: worker_runtime::config_bundle::ConfigBundleProvenance {
|
||||
source: "workspace-server-test".to_string(),
|
||||
detail: None,
|
||||
},
|
||||
},
|
||||
profiles: vec![worker_runtime::config_bundle::ConfigProfileDescriptor {
|
||||
selector: ProfileSelector::Builtin("builtin:coder".to_string()),
|
||||
label: Some("Coder".to_string()),
|
||||
}],
|
||||
declarations: vec![worker_runtime::config_bundle::ConfigDeclaration {
|
||||
kind: worker_runtime::config_bundle::ConfigDeclarationKind::CapabilityGrant,
|
||||
name: "read".to_string(),
|
||||
reference: "capability:read".to_string(),
|
||||
}],
|
||||
}
|
||||
.with_computed_digest()
|
||||
}
|
||||
|
||||
fn assert_valid_generated_id(id: &str) {
|
||||
assert!(id.len() <= MAX_IDENTIFIER_LEN, "id too long: {id}");
|
||||
validate_backend_identifier("test_id", id).unwrap();
|
||||
|
|
@ -2941,6 +3170,9 @@ mod tests {
|
|||
acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted {
|
||||
expected_segments: 0,
|
||||
},
|
||||
profile: None,
|
||||
config_bundle: None,
|
||||
requested_capabilities: Vec::new(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
|
@ -3013,6 +3245,50 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn embedded_backend_syncs_config_bundle_and_spawns_with_bundle_ref() {
|
||||
let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory(
|
||||
"local:test",
|
||||
))]);
|
||||
let bundle = test_config_bundle();
|
||||
let sync = registry
|
||||
.sync_config_bundle(EMBEDDED_RUNTIME_ID, bundle.clone())
|
||||
.unwrap();
|
||||
assert_eq!(sync.state, WorkerOperationState::Accepted);
|
||||
let reference = sync.availability.expect("bundle availability").reference;
|
||||
assert_eq!(reference.id, bundle.metadata.id);
|
||||
assert_eq!(reference.digest, bundle.metadata.digest);
|
||||
|
||||
let check = registry
|
||||
.check_config_bundle(EMBEDDED_RUNTIME_ID, reference.clone())
|
||||
.unwrap();
|
||||
assert_eq!(check.state, WorkerOperationState::Accepted);
|
||||
|
||||
let spawned = registry
|
||||
.spawn_worker(
|
||||
EMBEDDED_RUNTIME_ID,
|
||||
WorkerSpawnRequest {
|
||||
intent: WorkerSpawnIntent::TicketRole {
|
||||
ticket_id: "00001KVZSGT0Q".to_string(),
|
||||
role: TicketWorkerRole::Coder,
|
||||
},
|
||||
requested_worker_name: None,
|
||||
acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted {
|
||||
expected_segments: 0,
|
||||
},
|
||||
profile: Some(ProfileSelector::Builtin("builtin:coder".to_string())),
|
||||
config_bundle: Some(reference),
|
||||
requested_capabilities: vec![CapabilityRequest::named("read")],
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(spawned.state, WorkerOperationState::Accepted);
|
||||
assert_eq!(
|
||||
spawned.worker.unwrap().profile.as_deref(),
|
||||
Some("builtin:coder")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() {
|
||||
let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory(
|
||||
|
|
@ -3025,6 +3301,9 @@ mod tests {
|
|||
intent: WorkerSpawnIntent::WorkspaceCompanion,
|
||||
requested_worker_name: None,
|
||||
acceptance: WorkerSpawnAcceptanceRequirement::SocketReady,
|
||||
profile: None,
|
||||
config_bundle: None,
|
||||
requested_capabilities: Vec::new(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
|
|
|||
|
|
@ -13,10 +13,11 @@ use serde::{Deserialize, Serialize};
|
|||
use tokio::net::TcpListener;
|
||||
|
||||
use crate::hosts::{
|
||||
DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime,
|
||||
RemoteRuntimeConfig, RemoteWorkerRuntime, RuntimeDiagnostic, RuntimeRegistry, RuntimeSummary,
|
||||
WorkerInputRequest, WorkerInputResult, WorkerLifecycleRequest, WorkerLifecycleResult,
|
||||
WorkerSpawnRequest, WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection,
|
||||
ConfigBundleCheckResult, ConfigBundleSyncResult, DiagnosticSeverity, EmbeddedWorkerRuntime,
|
||||
HostSummary, LocalWorkerRuntime, RemoteRuntimeConfig, RemoteWorkerRuntime, RuntimeDiagnostic,
|
||||
RuntimeRegistry, RuntimeSummary, WorkerInputRequest, WorkerInputResult, WorkerLifecycleRequest,
|
||||
WorkerLifecycleResult, WorkerSpawnRequest, WorkerSpawnResult, WorkerSummary,
|
||||
WorkerTranscriptProjection,
|
||||
};
|
||||
use crate::identity::WorkspaceIdentity;
|
||||
use crate::observation::{
|
||||
|
|
@ -29,6 +30,8 @@ use crate::records::{
|
|||
use crate::repositories::{LocalRepositoryReader, RepositoryLogRead, RepositorySummary};
|
||||
use crate::store::{ControlPlaneStore, WorkspaceRecord};
|
||||
use crate::{Error, Result};
|
||||
use worker_runtime::catalog::ConfigBundleRef;
|
||||
use worker_runtime::config_bundle::ConfigBundle;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum AuthConfig {
|
||||
|
|
@ -155,6 +158,14 @@ pub fn build_router(api: WorkspaceApi) -> Router {
|
|||
"/api/runtimes/{runtime_id}/workers",
|
||||
post(create_runtime_worker),
|
||||
)
|
||||
.route(
|
||||
"/api/runtimes/{runtime_id}/config-bundles",
|
||||
post(sync_runtime_config_bundle),
|
||||
)
|
||||
.route(
|
||||
"/api/runtimes/{runtime_id}/config-bundles/{bundle_id}/availability",
|
||||
get(check_runtime_config_bundle),
|
||||
)
|
||||
.route(
|
||||
"/api/runtimes/{runtime_id}/workers/{worker_id}",
|
||||
get(get_runtime_worker),
|
||||
|
|
@ -491,6 +502,16 @@ async fn get_runtime_worker(
|
|||
Ok(Json(worker))
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct RuntimeConfigBundleSyncRequest {
|
||||
pub bundle: ConfigBundle,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RuntimeConfigBundleAvailabilityQuery {
|
||||
digest: String,
|
||||
}
|
||||
|
||||
async fn create_runtime_worker(
|
||||
State(api): State<WorkspaceApi>,
|
||||
AxumPath(runtime_id): AxumPath<String>,
|
||||
|
|
@ -503,6 +524,36 @@ async fn create_runtime_worker(
|
|||
Ok(Json(result))
|
||||
}
|
||||
|
||||
async fn sync_runtime_config_bundle(
|
||||
State(api): State<WorkspaceApi>,
|
||||
AxumPath(runtime_id): AxumPath<String>,
|
||||
Json(request): Json<RuntimeConfigBundleSyncRequest>,
|
||||
) -> ApiResult<Json<ConfigBundleSyncResult>> {
|
||||
let result = api
|
||||
.runtime
|
||||
.sync_config_bundle(&runtime_id, request.bundle)
|
||||
.map_err(|err| err.into_error())?;
|
||||
Ok(Json(result))
|
||||
}
|
||||
|
||||
async fn check_runtime_config_bundle(
|
||||
State(api): State<WorkspaceApi>,
|
||||
AxumPath((runtime_id, bundle_id)): AxumPath<(String, String)>,
|
||||
Query(query): Query<RuntimeConfigBundleAvailabilityQuery>,
|
||||
) -> ApiResult<Json<ConfigBundleCheckResult>> {
|
||||
let result = api
|
||||
.runtime
|
||||
.check_config_bundle(
|
||||
&runtime_id,
|
||||
ConfigBundleRef {
|
||||
id: bundle_id,
|
||||
digest: query.digest,
|
||||
},
|
||||
)
|
||||
.map_err(|err| err.into_error())?;
|
||||
Ok(Json(result))
|
||||
}
|
||||
|
||||
async fn send_runtime_worker_input(
|
||||
State(api): State<WorkspaceApi>,
|
||||
AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>,
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec {
|
|||
filter = sourceFilter;
|
||||
};
|
||||
|
||||
cargoHash = "sha256-kZ9TAb1lNpslAhzcyC2RyIZg5Yh5hrAGCTZIhhYl/e4=";
|
||||
cargoHash = "sha256-/7qrJH25rQSV2tKMOVUSu6ISUuEi+4WdwuX0E94LZYg=";
|
||||
|
||||
depsExtraArgs = {
|
||||
# Older fetchCargoVendor utilities used crates.io's API download endpoint,
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user