merge: embedded runtime fs-store

This commit is contained in:
Keisuke Hirata 2026-06-29 04:15:25 +09:00
commit 888e7b68e5
No known key found for this signature in database
5 changed files with 311 additions and 32 deletions

1
Cargo.lock generated
View File

@ -6038,6 +6038,7 @@ dependencies = [
"axum",
"chrono",
"futures",
"manifest",
"project-record",
"protocol",
"reqwest",

View File

@ -10,6 +10,7 @@ async-trait.workspace = true
axum = { workspace = true, features = ["ws"] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
futures.workspace = true
manifest.workspace = true
protocol = { workspace = true }
project-record.workspace = true
reqwest = { version = "0.13", default-features = false, features = ["blocking", "json", "native-tls"] }
@ -23,7 +24,7 @@ ticket.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] }
tokio-tungstenite.workspace = true
worker = { workspace = true, features = ["runtime-adapter"] }
worker-runtime = { workspace = true, features = ["ws-server"] }
worker-runtime = { workspace = true, features = ["ws-server", "fs-store"] }
toml.workspace = true
tracing.workspace = true
uuid = { workspace = true, features = ["v7"] }

View File

@ -6,7 +6,7 @@ use reqwest::header::{AUTHORIZATION, CONTENT_TYPE};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::{sync::Arc, time::Duration};
use std::{path::PathBuf, sync::Arc, time::Duration};
use worker_runtime::catalog::{
ConfigBundleRef, CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail,
WorkerStatus as EmbeddedWorkerStatus,
@ -16,7 +16,10 @@ use worker_runtime::config_bundle::{
ConfigBundleSummary, ConfigProfileDescriptor,
};
use worker_runtime::error::RuntimeError as EmbeddedRuntimeError;
use worker_runtime::execution::{WorkerExecutionRunState, WorkerExecutionStatus};
use worker_runtime::execution::{
WorkerExecutionBackendKind, WorkerExecutionRunState, WorkerExecutionStatus,
};
use worker_runtime::fs_store::FsRuntimeStoreOptions;
use worker_runtime::http_server::{
RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest,
RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse,
@ -909,29 +912,46 @@ pub struct EmbeddedWorkerRuntime {
execution_enabled: bool,
}
fn embedded_runtime_options() -> EmbeddedRuntimeOptions {
let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID)
.expect("embedded runtime id is a non-empty literal");
EmbeddedRuntimeOptions {
runtime_id: Some(runtime_id),
display_name: Some("Workspace backend embedded Runtime".to_string()),
..EmbeddedRuntimeOptions::default()
}
}
impl EmbeddedWorkerRuntime {
pub fn new_memory(workspace_id: impl AsRef<str>) -> Self {
let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID)
.expect("embedded runtime id is a non-empty literal");
let runtime = worker_runtime::Runtime::with_options(EmbeddedRuntimeOptions {
runtime_id: Some(runtime_id),
display_name: Some("Workspace backend embedded Runtime".to_string()),
..EmbeddedRuntimeOptions::default()
});
let runtime = worker_runtime::Runtime::with_options(embedded_runtime_options());
Self::from_runtime(workspace_id, runtime)
}
pub fn new_memory_with_execution_backend(
workspace_id: impl AsRef<str>,
backend: std::sync::Arc<dyn worker_runtime::execution::WorkerExecutionBackend>,
) -> Result<Self, worker_runtime::error::RuntimeError> {
let runtime =
worker_runtime::Runtime::with_execution_backend(embedded_runtime_options(), backend)?;
let mut embedded = Self::from_runtime(workspace_id, runtime);
embedded.execution_enabled = true;
Ok(embedded)
}
pub fn new_fs_store_with_execution_backend(
workspace_id: impl AsRef<str>,
store_root: impl Into<PathBuf>,
backend: std::sync::Arc<dyn worker_runtime::execution::WorkerExecutionBackend>,
) -> Result<Self, worker_runtime::error::RuntimeError> {
let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID)
.expect("embedded runtime id is a non-empty literal");
let runtime = worker_runtime::Runtime::with_execution_backend(
EmbeddedRuntimeOptions {
let runtime = worker_runtime::Runtime::with_fs_store_and_execution_backend(
FsRuntimeStoreOptions {
root: store_root.into(),
runtime_id: Some(runtime_id),
display_name: Some("Workspace backend embedded Runtime".to_string()),
..EmbeddedRuntimeOptions::default()
limits: EmbeddedRuntimeOptions::default().limits,
},
backend,
)?;
@ -998,15 +1018,12 @@ impl EmbeddedWorkerRuntime {
display_hint: "backend-internal worker-runtime Worker".to_string(),
},
capabilities: WorkerCapabilitySummary {
can_accept_input: self.can_accept_embedded_input(summary.status, &summary.execution),
can_accept_input: self
.can_accept_embedded_input(summary.status, &summary.execution),
can_stop: self.can_stop_embedded_worker(summary.status, &summary.execution),
can_spawn_followup: false,
},
diagnostics: vec![diagnostic(
"embedded_runtime_projection",
DiagnosticSeverity::Info,
"Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(),
)],
diagnostics: embedded_worker_projection_diagnostics(&summary.execution),
}
}
@ -1035,11 +1052,7 @@ impl EmbeddedWorkerRuntime {
can_stop: self.can_stop_embedded_worker(detail.status, &detail.execution),
can_spawn_followup: false,
},
diagnostics: vec![diagnostic(
"embedded_runtime_projection",
DiagnosticSeverity::Info,
"Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(),
)],
diagnostics: embedded_worker_projection_diagnostics(&detail.execution),
}
}
}
@ -2152,6 +2165,41 @@ fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str {
}
}
fn embedded_worker_projection_diagnostics(
execution: &WorkerExecutionStatus,
) -> Vec<RuntimeDiagnostic> {
let mut diagnostics = vec![diagnostic(
"embedded_runtime_projection",
DiagnosticSeverity::Info,
"Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(),
)];
if execution.backend == WorkerExecutionBackendKind::Stale {
diagnostics.push(diagnostic(
"embedded_worker_execution_stale",
DiagnosticSeverity::Warning,
"Worker execution handle is not connected in this server process; persisted execution binding was marked stale".to_string(),
));
} else if execution.backend == WorkerExecutionBackendKind::Unconnected
|| execution.run_state == WorkerExecutionRunState::Unconnected
{
diagnostics.push(diagnostic(
"embedded_worker_execution_unconnected",
DiagnosticSeverity::Warning,
"Worker execution handle is not connected in this server process".to_string(),
));
} else if execution.run_state == WorkerExecutionRunState::Rejected {
diagnostics.push(diagnostic(
"embedded_worker_execution_spawn_rejected",
DiagnosticSeverity::Error,
"Worker execution spawn was rejected; backend-private details are not exposed"
.to_string(),
));
}
diagnostics
}
fn embedded_worker_execution_status_label(
status: EmbeddedWorkerStatus,
execution: &WorkerExecutionStatus,

View File

@ -51,6 +51,7 @@ pub struct ServerConfig {
pub workspace_display_name: String,
pub workspace_created_at: String,
pub workspace_root: PathBuf,
pub embedded_runtime_store_root: PathBuf,
pub static_assets_dir: Option<PathBuf>,
pub auth: AuthConfig,
pub max_records: usize,
@ -61,11 +62,14 @@ pub struct ServerConfig {
impl ServerConfig {
pub fn local_dev(workspace_root: impl Into<PathBuf>, identity: WorkspaceIdentity) -> Self {
let workspace_root = workspace_root.into();
let workspace_id = identity.workspace_id;
let embedded_runtime_store_root = Self::default_embedded_runtime_store_root(&workspace_id);
Self {
workspace_id: identity.workspace_id,
workspace_id,
workspace_display_name: identity.display_name,
workspace_created_at: identity.created_at,
workspace_root,
embedded_runtime_store_root,
static_assets_dir: None,
auth: AuthConfig::LocalDevToken {
token_configured: false,
@ -75,6 +79,35 @@ impl ServerConfig {
remote_runtime_sources: Vec::new(),
}
}
pub fn embedded_runtime_store_root_for_data_dir(
data_dir: impl Into<PathBuf>,
workspace_id: impl AsRef<str>,
) -> PathBuf {
data_dir
.into()
.join("workspace-server")
.join(workspace_id.as_ref())
.join("embedded-runtime")
}
pub fn default_embedded_runtime_store_root(workspace_id: impl AsRef<str>) -> PathBuf {
match manifest::paths::data_dir() {
Some(data_dir) => {
Self::embedded_runtime_store_root_for_data_dir(data_dir, workspace_id.as_ref())
}
None => std::env::temp_dir()
.join("yoi")
.join("workspace-server")
.join(workspace_id.as_ref())
.join("embedded-runtime"),
}
}
pub fn with_embedded_runtime_store_root(mut self, root: impl Into<PathBuf>) -> Self {
self.embedded_runtime_store_root = root.into();
self
}
}
#[derive(Clone)]
@ -115,8 +148,9 @@ impl WorkspaceApi {
})
.await?;
let mut runtime = RuntimeRegistry::for_workspace(
EmbeddedWorkerRuntime::new_memory_with_execution_backend(
EmbeddedWorkerRuntime::new_fs_store_with_execution_backend(
config.workspace_id.clone(),
config.embedded_runtime_store_root.clone(),
execution_backend,
)
.map_err(|err| {
@ -1068,6 +1102,10 @@ mod tests {
use tokio_tungstenite::tungstenite::Message;
use tower::ServiceExt;
use crate::hosts::{
TicketWorkerRole, WorkerInputKind, WorkerOperationState, WorkerSpawnAcceptanceRequirement,
WorkerSpawnIntent,
};
use crate::observation::ClientWorkerEventWsDiagnostic;
use crate::store::SqliteWorkspaceStore;
@ -1141,6 +1179,13 @@ mod tests {
}
}
fn test_server_config(workspace_root: impl Into<PathBuf>) -> ServerConfig {
let workspace_root = workspace_root.into();
let store_root = workspace_root.join(".test-embedded-runtime-store");
ServerConfig::local_dev(workspace_root, test_identity())
.with_embedded_runtime_store_root(store_root)
}
fn runtime_test_bundle() -> worker_runtime::config_bundle::ConfigBundle {
worker_runtime::config_bundle::ConfigBundle {
metadata: worker_runtime::config_bundle::ConfigBundleMetadata {
@ -1197,7 +1242,7 @@ mod tests {
std::fs::write(static_dir.join("assets/app.js"), "console.log('yoi');").unwrap();
let store = SqliteWorkspaceStore::in_memory().unwrap();
let mut config = ServerConfig::local_dev(dir.path(), test_identity());
let mut config = test_server_config(dir.path());
config.static_assets_dir = Some(static_dir);
let api = WorkspaceApi::new_with_execution_backend(
config,
@ -1441,7 +1486,7 @@ mod tests {
#[tokio::test]
async fn legacy_companion_messages_route_dispatches_through_worker_runtime() {
let temp = tempfile::tempdir().unwrap();
let config = ServerConfig::local_dev(temp.path().join("workspace"), test_identity());
let config = test_server_config(temp.path().join("workspace"));
let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(SqliteWorkspaceStore::in_memory().unwrap()),
@ -1524,11 +1569,195 @@ mod tests {
);
}
#[tokio::test]
async fn embedded_runtime_fs_store_restores_catalog_config_bundle_transcript_and_stale_execution()
{
let dir = tempfile::tempdir().unwrap();
let config = test_server_config(dir.path().join("workspace"));
let store_root = config.embedded_runtime_store_root.clone();
let bundle = runtime_test_bundle();
let bundle_id = bundle.metadata.id.clone();
let api = WorkspaceApi::new_with_execution_backend(
config.clone(),
Arc::new(SqliteWorkspaceStore::in_memory().unwrap()),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.expect("fs-backed api starts");
let synced = api
.runtime
.sync_config_bundle("embedded-worker-runtime", bundle)
.expect("sync config bundle");
assert_eq!(synced.state, WorkerOperationState::Accepted);
assert!(store_root.exists(), "fs-store root should be created");
let spawned = api
.runtime
.spawn_worker(
"embedded-worker-runtime",
WorkerSpawnRequest {
intent: WorkerSpawnIntent::TicketRole {
ticket_id: "00001KVZSGT0Q".to_string(),
role: TicketWorkerRole::Coder,
},
requested_worker_name: None,
acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted {
expected_segments: 0,
},
profile: None,
initial_input: None,
},
)
.expect("spawn worker");
assert_eq!(spawned.state, WorkerOperationState::Accepted);
let worker_id = spawned.worker.expect("created worker").worker_id;
let sent = api
.runtime
.send_input(
"embedded-worker-runtime",
&worker_id,
WorkerInputRequest {
kind: WorkerInputKind::User,
content: "persist me".to_string(),
},
)
.expect("send input");
assert_eq!(sent.state, WorkerOperationState::Accepted);
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
let transcript = api
.runtime
.transcript("embedded-worker-runtime", &worker_id, 0, 10)
.expect("transcript");
if transcript.items.iter().any(|item| {
item.role == "assistant" && item.content == "server companion echoed: persist me"
}) {
assert!(
transcript
.items
.iter()
.any(|item| item.role == "user" && item.content == "persist me")
);
break;
}
assert!(
std::time::Instant::now() < deadline,
"timed out waiting for deterministic transcript"
);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
drop(api);
let restored = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(SqliteWorkspaceStore::in_memory().unwrap()),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.expect("restored fs-backed api starts");
let restored_worker = restored
.runtime
.worker("embedded-worker-runtime", &worker_id)
.expect("restored worker");
assert_eq!(restored_worker.status, "stale");
assert!(!restored_worker.capabilities.can_accept_input);
assert!(!restored_worker.capabilities.can_stop);
assert!(
restored_worker
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "embedded_worker_execution_stale")
);
let bundles = restored
.runtime
.list_config_bundles("embedded-worker-runtime")
.expect("config bundle list");
assert!(
bundles
.bundles
.iter()
.any(|summary| summary.id == bundle_id)
);
let restored_transcript = restored
.runtime
.transcript("embedded-worker-runtime", &worker_id, 0, 10)
.expect("restored transcript");
assert!(
restored_transcript
.items
.iter()
.any(|item| item.role == "user" && item.content == "persist me")
);
assert!(restored_transcript.items.iter().any(|item| {
item.role == "assistant" && item.content == "server companion echoed: persist me"
}));
let rejected_input = restored
.runtime
.send_input(
"embedded-worker-runtime",
&worker_id,
WorkerInputRequest {
kind: WorkerInputKind::User,
content: "should not be routed to stale handle".to_string(),
},
)
.expect("stale worker input is projected as an operation result");
assert_eq!(rejected_input.state, WorkerOperationState::Rejected);
}
#[tokio::test]
async fn embedded_runtime_store_root_is_isolated_and_not_exposed_by_browser_api() {
let dir = tempfile::tempdir().unwrap();
let data_dir = dir.path().join("user-data");
let workspace_root = dir.path().join("workspace");
let default_root =
ServerConfig::embedded_runtime_store_root_for_data_dir(&data_dir, TEST_WORKSPACE_ID);
assert_eq!(
default_root,
data_dir
.join("workspace-server")
.join(TEST_WORKSPACE_ID)
.join("embedded-runtime")
);
assert!(!default_root.starts_with(workspace_root.join(".yoi")));
let config = ServerConfig::local_dev(workspace_root, test_identity())
.with_embedded_runtime_store_root(default_root.clone());
let app = build_router(
WorkspaceApi::new_with_execution_backend(
config,
Arc::new(SqliteWorkspaceStore::in_memory().unwrap()),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap(),
);
let raw_store_root = default_root.to_string_lossy().to_string();
for uri in [
"/api/workspace",
"/api/hosts",
"/api/runtimes",
"/api/workers",
] {
let body = get_json(app.clone(), uri).await;
let serialized = serde_json::to_string(&body).unwrap();
assert!(
!serialized.contains(&raw_store_root),
"{uri} leaked embedded runtime store root: {serialized}"
);
}
}
#[tokio::test]
async fn embedded_runtime_api_routes_by_runtime_and_worker_ids_without_leaking_internals() {
let dir = tempfile::tempdir().unwrap();
let store = SqliteWorkspaceStore::in_memory().unwrap();
let config = ServerConfig::local_dev(dir.path(), test_identity());
let config = test_server_config(dir.path());
let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
@ -1682,7 +1911,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let store = SqliteWorkspaceStore::in_memory().unwrap();
let mut config = ServerConfig::local_dev(dir.path(), test_identity());
let mut config = test_server_config(dir.path());
config
.runtime_event_sources
.push(RuntimeObservationSourceConfig {
@ -1896,7 +2125,7 @@ mod tests {
) -> (String, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let store = SqliteWorkspaceStore::in_memory().unwrap();
let mut config = ServerConfig::local_dev(dir.path(), test_identity());
let mut config = test_server_config(dir.path());
let runtime_id = source.runtime_id.clone();
let worker_id = source.worker_id.clone();
config.runtime_event_sources.push(source);

View File

@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec {
filter = sourceFilter;
};
cargoHash = "sha256-1jSDcivotZ0/v5AURQaetn9xjH5JyQNDeNlJ4AcwEUc=";
cargoHash = "sha256-9F60cIVhRTct8sK11xoqOVA4rLd5Ba76Vi7+Y2NFrRo=";
depsExtraArgs = {
# Older fetchCargoVendor utilities used crates.io's API download endpoint,