diff --git a/Cargo.lock b/Cargo.lock index 1f046047..8f9f2ccc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6038,6 +6038,7 @@ dependencies = [ "axum", "chrono", "futures", + "manifest", "project-record", "protocol", "reqwest", diff --git a/crates/workspace-server/Cargo.toml b/crates/workspace-server/Cargo.toml index 842975df..11182650 100644 --- a/crates/workspace-server/Cargo.toml +++ b/crates/workspace-server/Cargo.toml @@ -10,6 +10,7 @@ async-trait.workspace = true axum = { workspace = true, features = ["ws"] } chrono = { version = "0.4", default-features = false, features = ["clock"] } futures.workspace = true +manifest.workspace = true protocol = { workspace = true } project-record.workspace = true reqwest = { version = "0.13", default-features = false, features = ["blocking", "json", "native-tls"] } @@ -23,7 +24,7 @@ ticket.workspace = true tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] } tokio-tungstenite.workspace = true worker = { workspace = true, features = ["runtime-adapter"] } -worker-runtime = { workspace = true, features = ["ws-server"] } +worker-runtime = { workspace = true, features = ["ws-server", "fs-store"] } toml.workspace = true tracing.workspace = true uuid = { workspace = true, features = ["v7"] } diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index dfd4a391..c18515fd 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -6,7 +6,7 @@ use reqwest::header::{AUTHORIZATION, CONTENT_TYPE}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -use std::{sync::Arc, time::Duration}; +use std::{path::PathBuf, sync::Arc, time::Duration}; use worker_runtime::catalog::{ ConfigBundleRef, CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail, WorkerStatus as EmbeddedWorkerStatus, @@ -16,7 +16,10 @@ use worker_runtime::config_bundle::{ ConfigBundleSummary, ConfigProfileDescriptor, }; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; -use worker_runtime::execution::{WorkerExecutionRunState, WorkerExecutionStatus}; +use worker_runtime::execution::{ + WorkerExecutionBackendKind, WorkerExecutionRunState, WorkerExecutionStatus, +}; +use worker_runtime::fs_store::FsRuntimeStoreOptions; use worker_runtime::http_server::{ RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest, RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse, @@ -909,29 +912,46 @@ pub struct EmbeddedWorkerRuntime { execution_enabled: bool, } +fn embedded_runtime_options() -> EmbeddedRuntimeOptions { + let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID) + .expect("embedded runtime id is a non-empty literal"); + EmbeddedRuntimeOptions { + runtime_id: Some(runtime_id), + display_name: Some("Workspace backend embedded Runtime".to_string()), + ..EmbeddedRuntimeOptions::default() + } +} + impl EmbeddedWorkerRuntime { pub fn new_memory(workspace_id: impl AsRef) -> Self { - let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID) - .expect("embedded runtime id is a non-empty literal"); - let runtime = worker_runtime::Runtime::with_options(EmbeddedRuntimeOptions { - runtime_id: Some(runtime_id), - display_name: Some("Workspace backend embedded Runtime".to_string()), - ..EmbeddedRuntimeOptions::default() - }); + let runtime = worker_runtime::Runtime::with_options(embedded_runtime_options()); Self::from_runtime(workspace_id, runtime) } pub fn new_memory_with_execution_backend( workspace_id: impl AsRef, backend: std::sync::Arc, + ) -> Result { + let runtime = + worker_runtime::Runtime::with_execution_backend(embedded_runtime_options(), backend)?; + let mut embedded = Self::from_runtime(workspace_id, runtime); + embedded.execution_enabled = true; + Ok(embedded) + } + + pub fn new_fs_store_with_execution_backend( + workspace_id: impl AsRef, + store_root: impl Into, + backend: std::sync::Arc, ) -> Result { let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID) .expect("embedded runtime id is a non-empty literal"); - let runtime = worker_runtime::Runtime::with_execution_backend( - EmbeddedRuntimeOptions { + let runtime = worker_runtime::Runtime::with_fs_store_and_execution_backend( + FsRuntimeStoreOptions { + root: store_root.into(), runtime_id: Some(runtime_id), display_name: Some("Workspace backend embedded Runtime".to_string()), - ..EmbeddedRuntimeOptions::default() + limits: EmbeddedRuntimeOptions::default().limits, }, backend, )?; @@ -998,15 +1018,12 @@ impl EmbeddedWorkerRuntime { display_hint: "backend-internal worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { - can_accept_input: self.can_accept_embedded_input(summary.status, &summary.execution), + can_accept_input: self + .can_accept_embedded_input(summary.status, &summary.execution), can_stop: self.can_stop_embedded_worker(summary.status, &summary.execution), can_spawn_followup: false, }, - diagnostics: vec![diagnostic( - "embedded_runtime_projection", - DiagnosticSeverity::Info, - "Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(), - )], + diagnostics: embedded_worker_projection_diagnostics(&summary.execution), } } @@ -1035,11 +1052,7 @@ impl EmbeddedWorkerRuntime { can_stop: self.can_stop_embedded_worker(detail.status, &detail.execution), can_spawn_followup: false, }, - diagnostics: vec![diagnostic( - "embedded_runtime_projection", - DiagnosticSeverity::Info, - "Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(), - )], + diagnostics: embedded_worker_projection_diagnostics(&detail.execution), } } } @@ -2152,6 +2165,41 @@ fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str { } } +fn embedded_worker_projection_diagnostics( + execution: &WorkerExecutionStatus, +) -> Vec { + let mut diagnostics = vec![diagnostic( + "embedded_runtime_projection", + DiagnosticSeverity::Info, + "Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(), + )]; + + if execution.backend == WorkerExecutionBackendKind::Stale { + diagnostics.push(diagnostic( + "embedded_worker_execution_stale", + DiagnosticSeverity::Warning, + "Worker execution handle is not connected in this server process; persisted execution binding was marked stale".to_string(), + )); + } else if execution.backend == WorkerExecutionBackendKind::Unconnected + || execution.run_state == WorkerExecutionRunState::Unconnected + { + diagnostics.push(diagnostic( + "embedded_worker_execution_unconnected", + DiagnosticSeverity::Warning, + "Worker execution handle is not connected in this server process".to_string(), + )); + } else if execution.run_state == WorkerExecutionRunState::Rejected { + diagnostics.push(diagnostic( + "embedded_worker_execution_spawn_rejected", + DiagnosticSeverity::Error, + "Worker execution spawn was rejected; backend-private details are not exposed" + .to_string(), + )); + } + + diagnostics +} + fn embedded_worker_execution_status_label( status: EmbeddedWorkerStatus, execution: &WorkerExecutionStatus, diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index 79fcf849..2e2a4773 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -51,6 +51,7 @@ pub struct ServerConfig { pub workspace_display_name: String, pub workspace_created_at: String, pub workspace_root: PathBuf, + pub embedded_runtime_store_root: PathBuf, pub static_assets_dir: Option, pub auth: AuthConfig, pub max_records: usize, @@ -61,11 +62,14 @@ pub struct ServerConfig { impl ServerConfig { pub fn local_dev(workspace_root: impl Into, identity: WorkspaceIdentity) -> Self { let workspace_root = workspace_root.into(); + let workspace_id = identity.workspace_id; + let embedded_runtime_store_root = Self::default_embedded_runtime_store_root(&workspace_id); Self { - workspace_id: identity.workspace_id, + workspace_id, workspace_display_name: identity.display_name, workspace_created_at: identity.created_at, workspace_root, + embedded_runtime_store_root, static_assets_dir: None, auth: AuthConfig::LocalDevToken { token_configured: false, @@ -75,6 +79,35 @@ impl ServerConfig { remote_runtime_sources: Vec::new(), } } + + pub fn embedded_runtime_store_root_for_data_dir( + data_dir: impl Into, + workspace_id: impl AsRef, + ) -> PathBuf { + data_dir + .into() + .join("workspace-server") + .join(workspace_id.as_ref()) + .join("embedded-runtime") + } + + pub fn default_embedded_runtime_store_root(workspace_id: impl AsRef) -> PathBuf { + match manifest::paths::data_dir() { + Some(data_dir) => { + Self::embedded_runtime_store_root_for_data_dir(data_dir, workspace_id.as_ref()) + } + None => std::env::temp_dir() + .join("yoi") + .join("workspace-server") + .join(workspace_id.as_ref()) + .join("embedded-runtime"), + } + } + + pub fn with_embedded_runtime_store_root(mut self, root: impl Into) -> Self { + self.embedded_runtime_store_root = root.into(); + self + } } #[derive(Clone)] @@ -115,8 +148,9 @@ impl WorkspaceApi { }) .await?; let mut runtime = RuntimeRegistry::for_workspace( - EmbeddedWorkerRuntime::new_memory_with_execution_backend( + EmbeddedWorkerRuntime::new_fs_store_with_execution_backend( config.workspace_id.clone(), + config.embedded_runtime_store_root.clone(), execution_backend, ) .map_err(|err| { @@ -1068,6 +1102,10 @@ mod tests { use tokio_tungstenite::tungstenite::Message; use tower::ServiceExt; + use crate::hosts::{ + TicketWorkerRole, WorkerInputKind, WorkerOperationState, WorkerSpawnAcceptanceRequirement, + WorkerSpawnIntent, + }; use crate::observation::ClientWorkerEventWsDiagnostic; use crate::store::SqliteWorkspaceStore; @@ -1141,6 +1179,13 @@ mod tests { } } + fn test_server_config(workspace_root: impl Into) -> ServerConfig { + let workspace_root = workspace_root.into(); + let store_root = workspace_root.join(".test-embedded-runtime-store"); + ServerConfig::local_dev(workspace_root, test_identity()) + .with_embedded_runtime_store_root(store_root) + } + fn runtime_test_bundle() -> worker_runtime::config_bundle::ConfigBundle { worker_runtime::config_bundle::ConfigBundle { metadata: worker_runtime::config_bundle::ConfigBundleMetadata { @@ -1197,7 +1242,7 @@ mod tests { std::fs::write(static_dir.join("assets/app.js"), "console.log('yoi');").unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap(); - let mut config = ServerConfig::local_dev(dir.path(), test_identity()); + let mut config = test_server_config(dir.path()); config.static_assets_dir = Some(static_dir); let api = WorkspaceApi::new_with_execution_backend( config, @@ -1441,7 +1486,7 @@ mod tests { #[tokio::test] async fn legacy_companion_messages_route_dispatches_through_worker_runtime() { let temp = tempfile::tempdir().unwrap(); - let config = ServerConfig::local_dev(temp.path().join("workspace"), test_identity()); + let config = test_server_config(temp.path().join("workspace")); let api = WorkspaceApi::new_with_execution_backend( config, Arc::new(SqliteWorkspaceStore::in_memory().unwrap()), @@ -1524,11 +1569,195 @@ mod tests { ); } + #[tokio::test] + async fn embedded_runtime_fs_store_restores_catalog_config_bundle_transcript_and_stale_execution() + { + let dir = tempfile::tempdir().unwrap(); + let config = test_server_config(dir.path().join("workspace")); + let store_root = config.embedded_runtime_store_root.clone(); + let bundle = runtime_test_bundle(); + let bundle_id = bundle.metadata.id.clone(); + + let api = WorkspaceApi::new_with_execution_backend( + config.clone(), + Arc::new(SqliteWorkspaceStore::in_memory().unwrap()), + Arc::new(DeterministicExecutionBackend::default()), + ) + .await + .expect("fs-backed api starts"); + let synced = api + .runtime + .sync_config_bundle("embedded-worker-runtime", bundle) + .expect("sync config bundle"); + assert_eq!(synced.state, WorkerOperationState::Accepted); + assert!(store_root.exists(), "fs-store root should be created"); + + let spawned = api + .runtime + .spawn_worker( + "embedded-worker-runtime", + WorkerSpawnRequest { + intent: WorkerSpawnIntent::TicketRole { + ticket_id: "00001KVZSGT0Q".to_string(), + role: TicketWorkerRole::Coder, + }, + requested_worker_name: None, + acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { + expected_segments: 0, + }, + profile: None, + initial_input: None, + }, + ) + .expect("spawn worker"); + assert_eq!(spawned.state, WorkerOperationState::Accepted); + let worker_id = spawned.worker.expect("created worker").worker_id; + let sent = api + .runtime + .send_input( + "embedded-worker-runtime", + &worker_id, + WorkerInputRequest { + kind: WorkerInputKind::User, + content: "persist me".to_string(), + }, + ) + .expect("send input"); + assert_eq!(sent.state, WorkerOperationState::Accepted); + + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + let transcript = api + .runtime + .transcript("embedded-worker-runtime", &worker_id, 0, 10) + .expect("transcript"); + if transcript.items.iter().any(|item| { + item.role == "assistant" && item.content == "server companion echoed: persist me" + }) { + assert!( + transcript + .items + .iter() + .any(|item| item.role == "user" && item.content == "persist me") + ); + break; + } + assert!( + std::time::Instant::now() < deadline, + "timed out waiting for deterministic transcript" + ); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + drop(api); + + let restored = WorkspaceApi::new_with_execution_backend( + config, + Arc::new(SqliteWorkspaceStore::in_memory().unwrap()), + Arc::new(DeterministicExecutionBackend::default()), + ) + .await + .expect("restored fs-backed api starts"); + let restored_worker = restored + .runtime + .worker("embedded-worker-runtime", &worker_id) + .expect("restored worker"); + assert_eq!(restored_worker.status, "stale"); + assert!(!restored_worker.capabilities.can_accept_input); + assert!(!restored_worker.capabilities.can_stop); + assert!( + restored_worker + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "embedded_worker_execution_stale") + ); + + let bundles = restored + .runtime + .list_config_bundles("embedded-worker-runtime") + .expect("config bundle list"); + assert!( + bundles + .bundles + .iter() + .any(|summary| summary.id == bundle_id) + ); + + let restored_transcript = restored + .runtime + .transcript("embedded-worker-runtime", &worker_id, 0, 10) + .expect("restored transcript"); + assert!( + restored_transcript + .items + .iter() + .any(|item| item.role == "user" && item.content == "persist me") + ); + assert!(restored_transcript.items.iter().any(|item| { + item.role == "assistant" && item.content == "server companion echoed: persist me" + })); + + let rejected_input = restored + .runtime + .send_input( + "embedded-worker-runtime", + &worker_id, + WorkerInputRequest { + kind: WorkerInputKind::User, + content: "should not be routed to stale handle".to_string(), + }, + ) + .expect("stale worker input is projected as an operation result"); + assert_eq!(rejected_input.state, WorkerOperationState::Rejected); + } + + #[tokio::test] + async fn embedded_runtime_store_root_is_isolated_and_not_exposed_by_browser_api() { + let dir = tempfile::tempdir().unwrap(); + let data_dir = dir.path().join("user-data"); + let workspace_root = dir.path().join("workspace"); + let default_root = + ServerConfig::embedded_runtime_store_root_for_data_dir(&data_dir, TEST_WORKSPACE_ID); + assert_eq!( + default_root, + data_dir + .join("workspace-server") + .join(TEST_WORKSPACE_ID) + .join("embedded-runtime") + ); + assert!(!default_root.starts_with(workspace_root.join(".yoi"))); + + let config = ServerConfig::local_dev(workspace_root, test_identity()) + .with_embedded_runtime_store_root(default_root.clone()); + let app = build_router( + WorkspaceApi::new_with_execution_backend( + config, + Arc::new(SqliteWorkspaceStore::in_memory().unwrap()), + Arc::new(DeterministicExecutionBackend::default()), + ) + .await + .unwrap(), + ); + let raw_store_root = default_root.to_string_lossy().to_string(); + for uri in [ + "/api/workspace", + "/api/hosts", + "/api/runtimes", + "/api/workers", + ] { + let body = get_json(app.clone(), uri).await; + let serialized = serde_json::to_string(&body).unwrap(); + assert!( + !serialized.contains(&raw_store_root), + "{uri} leaked embedded runtime store root: {serialized}" + ); + } + } + #[tokio::test] async fn embedded_runtime_api_routes_by_runtime_and_worker_ids_without_leaking_internals() { let dir = tempfile::tempdir().unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap(); - let config = ServerConfig::local_dev(dir.path(), test_identity()); + let config = test_server_config(dir.path()); let api = WorkspaceApi::new_with_execution_backend( config, Arc::new(store), @@ -1682,7 +1911,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap(); - let mut config = ServerConfig::local_dev(dir.path(), test_identity()); + let mut config = test_server_config(dir.path()); config .runtime_event_sources .push(RuntimeObservationSourceConfig { @@ -1896,7 +2125,7 @@ mod tests { ) -> (String, tempfile::TempDir) { let dir = tempfile::tempdir().unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap(); - let mut config = ServerConfig::local_dev(dir.path(), test_identity()); + let mut config = test_server_config(dir.path()); let runtime_id = source.runtime_id.clone(); let worker_id = source.worker_id.clone(); config.runtime_event_sources.push(source); diff --git a/package.nix b/package.nix index 4ca9659d..8e206b0c 100644 --- a/package.nix +++ b/package.nix @@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec { filter = sourceFilter; }; - cargoHash = "sha256-1jSDcivotZ0/v5AURQaetn9xjH5JyQNDeNlJ4AcwEUc="; + cargoHash = "sha256-9F60cIVhRTct8sK11xoqOVA4rLd5Ba76Vi7+Y2NFrRo="; depsExtraArgs = { # Older fetchCargoVendor utilities used crates.io's API download endpoint,