From f43a6b84011024b68c03fd5b4211ab427614683b Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 26 Jun 2026 05:48:23 +0900 Subject: [PATCH 1/2] feat: add worker runtime REST server --- Cargo.lock | 3 + crates/worker-runtime/Cargo.toml | 4 + crates/worker-runtime/src/http_server.rs | 690 +++++++++++++++++++++++ crates/worker-runtime/src/lib.rs | 11 +- package.nix | 2 +- 5 files changed, 705 insertions(+), 5 deletions(-) create mode 100644 crates/worker-runtime/src/http_server.rs diff --git a/Cargo.lock b/Cargo.lock index f2f468eb..71f9c698 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5905,9 +5905,12 @@ dependencies = [ name = "worker-runtime" version = "0.1.0" dependencies = [ + "axum", "serde", "serde_json", "thiserror 2.0.18", + "tokio", + "tower", ] [[package]] diff --git a/crates/worker-runtime/Cargo.toml b/crates/worker-runtime/Cargo.toml index aca3ecb1..2cbeefd7 100644 --- a/crates/worker-runtime/Cargo.toml +++ b/crates/worker-runtime/Cargo.toml @@ -8,8 +8,12 @@ license.workspace = true [features] default = [] fs-store = ["dep:serde_json"] +http-server = ["dep:axum", "dep:serde_json", "dep:tokio", "dep:tower"] [dependencies] +axum = { workspace = true, optional = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, optional = true } thiserror = { workspace = true } +tokio = { workspace = true, features = ["net", "rt"], optional = true } +tower = { workspace = true, features = ["util"], optional = true } diff --git a/crates/worker-runtime/src/http_server.rs b/crates/worker-runtime/src/http_server.rs new file mode 100644 index 00000000..11624fd8 --- /dev/null +++ b/crates/worker-runtime/src/http_server.rs @@ -0,0 +1,690 @@ +//! Optional REST process adapter for the Runtime command API. +//! +//! This module is intentionally gated by the `http-server` feature so embedded +//! Runtime users do not pull HTTP dependencies. The server is a process-local +//! command surface for a trusted backend/proxy. Browsers must not connect to the +//! Runtime process directly; a backend is expected to own any browser-facing +//! credentials, registration, and policy. + +use crate::Runtime; +use crate::catalog::{CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerSummary}; +use crate::error::RuntimeError; +#[cfg(feature = "fs-store")] +use crate::fs_store::FsRuntimeStoreOptions; +use crate::identity::{RuntimeId, WorkerId, WorkerRef}; +use crate::interaction::{WorkerInput, WorkerInteractionAck}; +use crate::management::{RuntimeLimits, RuntimeOptions, RuntimeSummary}; +use crate::observation::{TranscriptProjection, TranscriptQuery}; +use axum::body::{Body, Bytes}; +use axum::extract::rejection::{JsonRejection, QueryRejection}; +use axum::extract::{Path, Query, State}; +use axum::http::{Request, StatusCode, header}; +use axum::middleware::{self, Next}; +use axum::response::{IntoResponse, Response}; +use axum::routing::{get, post}; +use axum::{Json, Router}; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::net::SocketAddr; +#[cfg(feature = "fs-store")] +use std::path::PathBuf; +use std::sync::Arc; +use tokio::net::TcpListener; + +/// v0 Runtime REST server configuration. +#[derive(Clone, PartialEq, Eq)] +pub struct RuntimeHttpServerConfig { + /// Address for the Runtime process to bind. Use a loopback address unless a + /// trusted backend proxy explicitly owns network exposure. + pub bind_addr: SocketAddr, + /// Optional explicit Runtime authority id. If omitted, the Runtime library + /// generates one. + pub runtime_id: Option, + /// Optional display label surfaced by `GET /v1/runtime`. + pub display_name: Option, + /// Bounded Runtime API limits. + pub limits: RuntimeLimits, + /// v0 store selection for the Runtime process. + pub store: RuntimeHttpStoreSelection, + /// Minimal local bearer token placeholder for backend-to-Runtime calls. + /// This is not a browser-facing credential model. + pub local_token: Option, +} + +impl Default for RuntimeHttpServerConfig { + fn default() -> Self { + Self { + bind_addr: SocketAddr::from(([127, 0, 0, 1], 0)), + runtime_id: None, + display_name: None, + limits: RuntimeLimits::default(), + store: RuntimeHttpStoreSelection::Memory, + local_token: None, + } + } +} + +impl fmt::Debug for RuntimeHttpServerConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RuntimeHttpServerConfig") + .field("bind_addr", &self.bind_addr) + .field("runtime_id", &self.runtime_id) + .field("display_name", &self.display_name) + .field("limits", &self.limits) + .field("store", &self.store) + .field( + "local_token", + &self.local_token.as_ref().map(|_| ""), + ) + .finish() + } +} + +/// v0 Runtime store selection for the REST process adapter. +#[derive(Clone, Debug, PartialEq, Eq)] +#[non_exhaustive] +pub enum RuntimeHttpStoreSelection { + Memory, + /// Filesystem-backed Runtime store. Available only when `fs-store` is also + /// enabled; no new persistence model is introduced by the REST adapter. + #[cfg(feature = "fs-store")] + Fs { + root: PathBuf, + }, +} + +/// Bound REST server instance. +pub struct RuntimeHttpServer { + runtime: Runtime, + local_token: Option, + listener: TcpListener, +} + +impl RuntimeHttpServer { + /// Build a Runtime from config and bind the configured address. + pub async fn bind(config: RuntimeHttpServerConfig) -> Result { + let runtime = runtime_from_config(&config)?; + let listener = TcpListener::bind(config.bind_addr).await?; + Ok(Self { + runtime, + local_token: config.local_token, + listener, + }) + } + + /// Address actually bound by the server. + pub fn local_addr(&self) -> Result { + Ok(self.listener.local_addr()?) + } + + /// Runtime owned by this server. + pub fn runtime(&self) -> Runtime { + self.runtime.clone() + } + + /// Serve requests until the axum server is stopped or returns an error. + pub async fn serve(self) -> Result<(), RuntimeHttpServerError> { + serve_runtime_http(self.runtime, self.listener, self.local_token).await + } +} + +/// Convenience entry point: bind and serve a configured Runtime REST process API. +pub async fn serve_configured_runtime_http( + config: RuntimeHttpServerConfig, +) -> Result<(), RuntimeHttpServerError> { + RuntimeHttpServer::bind(config).await?.serve().await +} + +/// Serve an existing Runtime on a pre-bound listener. +pub async fn serve_runtime_http( + runtime: Runtime, + listener: TcpListener, + local_token: Option, +) -> Result<(), RuntimeHttpServerError> { + axum::serve(listener, runtime_http_router(runtime, local_token)).await?; + Ok(()) +} + +/// Build the REST router for an existing Runtime. +/// +/// Handlers delegate to [`Runtime`] methods and keep Worker authority as +/// `(runtime_id, worker_id)`. The path contains only a Runtime-local +/// `worker_id`; the server supplies its own Runtime id instead of accepting a +/// legacy pod/socket/session path as authority. +pub fn runtime_http_router(runtime: Runtime, local_token: Option) -> Router { + let state = RuntimeHttpState { + runtime, + local_token: local_token.map(Arc::::from), + }; + + Router::new() + .route("/v1/runtime", get(get_runtime)) + .route("/v1/workers", get(list_workers).post(create_worker)) + .route("/v1/workers/{worker_id}", get(get_worker)) + .route("/v1/workers/{worker_id}/input", post(send_worker_input)) + .route("/v1/workers/{worker_id}/stop", post(stop_worker)) + .route("/v1/workers/{worker_id}/cancel", post(cancel_worker)) + .route( + "/v1/workers/{worker_id}/transcript", + get(get_worker_transcript), + ) + .with_state(state.clone()) + .layer(middleware::from_fn_with_state(state, require_local_token)) +} + +fn runtime_from_config( + config: &RuntimeHttpServerConfig, +) -> Result { + match &config.store { + RuntimeHttpStoreSelection::Memory => Ok(Runtime::with_options(RuntimeOptions { + runtime_id: config.runtime_id.clone(), + display_name: config.display_name.clone(), + limits: config.limits.clone(), + })), + #[cfg(feature = "fs-store")] + RuntimeHttpStoreSelection::Fs { root } => { + Ok(Runtime::with_fs_store(FsRuntimeStoreOptions { + root: root.clone(), + runtime_id: config.runtime_id.clone(), + display_name: config.display_name.clone(), + limits: config.limits.clone(), + })?) + } + } +} + +#[derive(Clone)] +struct RuntimeHttpState { + runtime: Runtime, + local_token: Option>, +} + +/// `GET /v1/runtime` response. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpSummaryResponse { + pub runtime: RuntimeSummary, +} + +/// `GET /v1/workers` response. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpWorkersResponse { + pub workers: Vec, +} + +/// Worker detail response used by create/detail endpoints. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpWorkerResponse { + pub worker: WorkerDetail, +} + +/// Worker input acknowledgement response. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpWorkerInputResponse { + pub ack: WorkerInteractionAck, +} + +/// Worker lifecycle request body used by stop/cancel endpoints. +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpWorkerLifecycleRequest { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +/// Worker lifecycle acknowledgement response. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpWorkerLifecycleResponse { + pub ack: WorkerLifecycleAck, +} + +/// `GET /v1/workers/{worker_id}/transcript` response. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpTranscriptResponse { + pub transcript: TranscriptProjection, +} + +/// Typed REST error response. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpErrorResponse { + pub error: RuntimeHttpErrorDetail, +} + +/// Typed REST error payload. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeHttpErrorDetail { + pub code: String, + pub message: String, +} + +#[derive(Clone, Debug, Deserialize)] +struct RuntimeHttpTranscriptQuery { + #[serde(default)] + start: usize, + #[serde(default = "default_transcript_limit")] + limit: usize, +} + +fn default_transcript_limit() -> usize { + 256 +} + +type RestResult = Result, RuntimeHttpRestError>; + +async fn get_runtime( + State(state): State, +) -> RestResult { + let runtime = state + .runtime + .summary() + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpSummaryResponse { runtime })) +} + +async fn list_workers( + State(state): State, +) -> RestResult { + let workers = state + .runtime + .list_workers() + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpWorkersResponse { workers })) +} + +async fn get_worker( + State(state): State, + Path(worker_id): Path, +) -> RestResult { + let worker_ref = worker_ref_for(&state.runtime, worker_id)?; + let worker = state + .runtime + .worker_detail(&worker_ref) + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpWorkerResponse { worker })) +} + +async fn create_worker( + State(state): State, + body: Result, JsonRejection>, +) -> RestResult { + let Json(request) = body.map_err(RuntimeHttpRestError::json_rejection)?; + let worker = state + .runtime + .create_worker(request) + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpWorkerResponse { worker })) +} + +async fn send_worker_input( + State(state): State, + Path(worker_id): Path, + body: Result, JsonRejection>, +) -> RestResult { + let worker_ref = worker_ref_for(&state.runtime, worker_id)?; + let Json(input) = body.map_err(RuntimeHttpRestError::json_rejection)?; + let ack = state + .runtime + .send_input(&worker_ref, input) + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpWorkerInputResponse { ack })) +} + +async fn stop_worker( + State(state): State, + Path(worker_id): Path, + body: Bytes, +) -> RestResult { + let worker_ref = worker_ref_for(&state.runtime, worker_id)?; + let request = parse_optional_lifecycle_request(body)?; + let ack = state + .runtime + .stop_worker(&worker_ref, request.reason) + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpWorkerLifecycleResponse { ack })) +} + +async fn cancel_worker( + State(state): State, + Path(worker_id): Path, + body: Bytes, +) -> RestResult { + let worker_ref = worker_ref_for(&state.runtime, worker_id)?; + let request = parse_optional_lifecycle_request(body)?; + let ack = state + .runtime + .cancel_worker(&worker_ref, request.reason) + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpWorkerLifecycleResponse { ack })) +} + +async fn get_worker_transcript( + State(state): State, + Path(worker_id): Path, + query: Result, QueryRejection>, +) -> RestResult { + let worker_ref = worker_ref_for(&state.runtime, worker_id)?; + let Query(query) = query.map_err(RuntimeHttpRestError::query_rejection)?; + let transcript = state + .runtime + .transcript_projection(&worker_ref, TranscriptQuery::new(query.start, query.limit)) + .map_err(RuntimeHttpRestError::runtime)?; + Ok(Json(RuntimeHttpTranscriptResponse { transcript })) +} + +fn worker_ref_for(runtime: &Runtime, worker_id: String) -> Result { + let worker_id = WorkerId::new(worker_id).ok_or_else(|| { + RuntimeHttpRestError::new( + StatusCode::BAD_REQUEST, + "invalid_worker_id", + "worker_id must not be empty", + ) + })?; + let runtime_id = runtime + .runtime_id() + .map_err(RuntimeHttpRestError::runtime)?; + Ok(WorkerRef::new(runtime_id, worker_id)) +} + +fn parse_optional_lifecycle_request( + body: Bytes, +) -> Result { + if body.is_empty() { + return Ok(RuntimeHttpWorkerLifecycleRequest::default()); + } + serde_json::from_slice(&body).map_err(|error| { + RuntimeHttpRestError::new( + StatusCode::BAD_REQUEST, + "invalid_json", + format!("invalid lifecycle request JSON: {error}"), + ) + }) +} + +async fn require_local_token( + State(state): State, + request: Request, + next: Next, +) -> Response { + if let Some(expected) = state.local_token.as_deref() { + let supplied = request + .headers() + .get(header::AUTHORIZATION) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.strip_prefix("Bearer ")); + if supplied != Some(expected) { + return RuntimeHttpRestError::new( + StatusCode::UNAUTHORIZED, + "unauthorized", + "missing or invalid local Runtime bearer token", + ) + .into_response(); + } + } + next.run(request).await +} + +#[derive(Debug)] +struct RuntimeHttpRestError { + status: StatusCode, + code: &'static str, + message: String, +} + +impl RuntimeHttpRestError { + fn new(status: StatusCode, code: &'static str, message: impl Into) -> Self { + Self { + status, + code, + message: message.into(), + } + } + + fn runtime(error: RuntimeError) -> Self { + let status = status_for_runtime_error(&error); + let code = code_for_runtime_error(&error); + Self::new(status, code, error.to_string()) + } + + fn json_rejection(error: JsonRejection) -> Self { + Self::new( + StatusCode::BAD_REQUEST, + "invalid_json", + format!("invalid JSON request body: {error}"), + ) + } + + fn query_rejection(error: QueryRejection) -> Self { + Self::new( + StatusCode::BAD_REQUEST, + "invalid_query", + format!("invalid query parameters: {error}"), + ) + } +} + +impl IntoResponse for RuntimeHttpRestError { + fn into_response(self) -> Response { + let body = RuntimeHttpErrorResponse { + error: RuntimeHttpErrorDetail { + code: self.code.to_string(), + message: self.message, + }, + }; + (self.status, Json(body)).into_response() + } +} + +fn status_for_runtime_error(error: &RuntimeError) -> StatusCode { + match error { + RuntimeError::WorkerNotFound { .. } => StatusCode::NOT_FOUND, + RuntimeError::RuntimeStopped { .. } => StatusCode::CONFLICT, + RuntimeError::LimitTooLarge { .. } + | RuntimeError::InvalidRequest(_) + | RuntimeError::WrongRuntime { .. } + | RuntimeError::WrongRuntimeCursor { .. } => StatusCode::BAD_REQUEST, + RuntimeError::StoreIo { .. } + | RuntimeError::StoreMissing { .. } + | RuntimeError::StoreCorrupt { .. } + | RuntimeError::StatePoisoned => StatusCode::INTERNAL_SERVER_ERROR, + } +} + +fn code_for_runtime_error(error: &RuntimeError) -> &'static str { + match error { + RuntimeError::RuntimeStopped { .. } => "runtime_stopped", + RuntimeError::WrongRuntime { .. } => "wrong_runtime", + RuntimeError::WrongRuntimeCursor { .. } => "wrong_runtime_cursor", + RuntimeError::WorkerNotFound { .. } => "worker_not_found", + RuntimeError::LimitTooLarge { .. } => "limit_too_large", + RuntimeError::InvalidRequest(_) => "invalid_request", + RuntimeError::StoreIo { .. } => "store_io", + RuntimeError::StoreMissing { .. } => "store_missing", + RuntimeError::StoreCorrupt { .. } => "store_corrupt", + RuntimeError::StatePoisoned => "state_poisoned", + } +} + +/// Errors raised while building or serving the Runtime REST process API. +#[derive(Debug, thiserror::Error)] +pub enum RuntimeHttpServerError { + #[error(transparent)] + Runtime(#[from] RuntimeError), + #[error("Runtime HTTP server I/O failed: {0}")] + Io(#[from] std::io::Error), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::catalog::{CapabilityRequest, ProfileSelector, WorkerIntent}; + use axum::body::to_bytes; + use axum::http::Method; + use tower::ServiceExt; + + fn task_request(objective: &str) -> CreateWorkerRequest { + CreateWorkerRequest { + intent: WorkerIntent::Task { + objective: objective.to_string(), + }, + profile: ProfileSelector::Builtin("builtin:coder".to_string()), + config_bundle: None, + requested_capabilities: vec![CapabilityRequest::named("read")], + workspace_refs: Vec::new(), + mount_refs: Vec::new(), + } + } + + async fn json_request( + app: Router, + method: Method, + uri: &str, + body: &T, + ) -> axum::response::Response { + app.oneshot( + Request::builder() + .method(method) + .uri(uri) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_vec(body).unwrap())) + .unwrap(), + ) + .await + .unwrap() + } + + async fn empty_request(app: Router, method: Method, uri: &str) -> axum::response::Response { + app.oneshot( + Request::builder() + .method(method) + .uri(uri) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap() + } + + async fn read_json Deserialize<'de>>(response: Response) -> T { + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + serde_json::from_slice(&body).unwrap() + } + + #[tokio::test] + async fn rest_command_api_delegates_to_runtime() { + let runtime = Runtime::new_memory(); + let app = runtime_http_router(runtime.clone(), None); + + let response = json_request( + app.clone(), + Method::POST, + "/v1/workers", + &task_request("rest"), + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + let created: RuntimeHttpWorkerResponse = read_json(response).await; + assert_eq!( + created.worker.worker_ref.runtime_id, + runtime.runtime_id().unwrap() + ); + + let input = WorkerInput::user("hello from backend"); + let response = json_request( + app.clone(), + Method::POST, + &format!("/v1/workers/{}/input", created.worker.worker_id), + &input, + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + let input_ack: RuntimeHttpWorkerInputResponse = read_json(response).await; + assert_eq!(input_ack.ack.transcript_sequence, 1); + + let response = empty_request( + app.clone(), + Method::GET, + &format!("/v1/workers/{}", created.worker.worker_id), + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + let detail: RuntimeHttpWorkerResponse = read_json(response).await; + assert_eq!(detail.worker.transcript_len, 1); + + let response = empty_request( + app.clone(), + Method::GET, + &format!( + "/v1/workers/{}/transcript?start=0&limit=1", + created.worker.worker_id + ), + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + let transcript: RuntimeHttpTranscriptResponse = read_json(response).await; + assert_eq!(transcript.transcript.items[0].content, "hello from backend"); + + let response = empty_request( + app.clone(), + Method::POST, + &format!("/v1/workers/{}/stop", created.worker.worker_id), + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + let stop: RuntimeHttpWorkerLifecycleResponse = read_json(response).await; + assert_eq!(stop.ack.worker_ref, created.worker.worker_ref); + + let response = empty_request( + app.clone(), + Method::POST, + &format!("/v1/workers/{}/cancel", created.worker.worker_id), + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + let cancel: RuntimeHttpWorkerLifecycleResponse = read_json(response).await; + assert_eq!(cancel.ack.worker_ref, created.worker.worker_ref); + + let response = empty_request(app.clone(), Method::GET, "/v1/workers").await; + assert_eq!(response.status(), StatusCode::OK); + let workers: RuntimeHttpWorkersResponse = read_json(response).await; + assert_eq!(workers.workers.len(), 1); + assert_eq!(workers.workers[0].transcript_len, 1); + + let response = empty_request(app, Method::GET, "/v1/runtime").await; + assert_eq!(response.status(), StatusCode::OK); + let summary: RuntimeHttpSummaryResponse = read_json(response).await; + assert_eq!(summary.runtime.worker_count, 1); + assert_eq!(summary.runtime.stopped_worker_count, 1); + } + + #[tokio::test] + async fn local_token_placeholder_rejects_missing_bearer_token() { + let app = runtime_http_router(Runtime::new_memory(), Some("local-token".to_string())); + + let response = empty_request(app.clone(), Method::GET, "/v1/runtime").await; + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + let error: RuntimeHttpErrorResponse = read_json(response).await; + assert_eq!(error.error.code, "unauthorized"); + + let response = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/v1/runtime") + .header(header::AUTHORIZATION, "Bearer local-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn runtime_errors_use_typed_rest_error_shape() { + let app = runtime_http_router(Runtime::new_memory(), None); + let response = empty_request(app, Method::GET, "/v1/workers/worker-missing").await; + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + let error: RuntimeHttpErrorResponse = read_json(response).await; + assert_eq!(error.error.code, "worker_not_found"); + assert!(error.error.message.contains("worker-missing")); + } +} diff --git a/crates/worker-runtime/src/lib.rs b/crates/worker-runtime/src/lib.rs index 5b3b844f..7864d534 100644 --- a/crates/worker-runtime/src/lib.rs +++ b/crates/worker-runtime/src/lib.rs @@ -1,16 +1,19 @@ //! Embedded Runtime domain API for Worker management. //! -//! `worker-runtime` intentionally stays independent from HTTP/WebSocket servers, +//! `worker-runtime` keeps its core independent from HTTP/WebSocket servers, //! provider execution, and the existing Worker host. Filesystem persistence is -//! available only through the optional `fs-store` feature. The crate defines the -//! in-process Runtime authority surface that higher layers can later adapt into -//! registries or web APIs. +//! available only through the optional `fs-store` feature, and the minimal REST +//! process adapter is available only through the optional `http-server` feature. +//! The crate defines the in-process Runtime authority surface that higher layers +//! can later adapt into registries or backend APIs. pub mod catalog; pub mod diagnostics; pub mod error; #[cfg(feature = "fs-store")] pub mod fs_store; +#[cfg(feature = "http-server")] +pub mod http_server; pub mod identity; pub mod interaction; pub mod management; diff --git a/package.nix b/package.nix index 61ccfcfc..c7d0e37d 100644 --- a/package.nix +++ b/package.nix @@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec { filter = sourceFilter; }; - cargoHash = "sha256-PFh+ZgmktkpeLRnIDLsxdT2QcA/j5rcJzkq7A9B6E44="; + cargoHash = "sha256-dv2MrgL0IB+ZisZQ9QnA0kdvKJtzEm0pKUpvofgqSB8="; depsExtraArgs = { # Older fetchCargoVendor utilities used crates.io's API download endpoint, From d0db32fa6acb6efb5269de9793a63de92ef9eb4a Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 26 Jun 2026 12:20:37 +0900 Subject: [PATCH 2/2] fix: add worker runtime REST process binary --- crates/worker-runtime/Cargo.toml | 5 + crates/worker-runtime/src/main.rs | 346 ++++++++++++++++++++++++++++++ 2 files changed, 351 insertions(+) create mode 100644 crates/worker-runtime/src/main.rs diff --git a/crates/worker-runtime/Cargo.toml b/crates/worker-runtime/Cargo.toml index 2cbeefd7..d8588ab2 100644 --- a/crates/worker-runtime/Cargo.toml +++ b/crates/worker-runtime/Cargo.toml @@ -5,6 +5,11 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[[bin]] +name = "worker-runtime-rest-server" +path = "src/main.rs" +required-features = ["http-server"] + [features] default = [] fs-store = ["dep:serde_json"] diff --git a/crates/worker-runtime/src/main.rs b/crates/worker-runtime/src/main.rs new file mode 100644 index 00000000..8b511d58 --- /dev/null +++ b/crates/worker-runtime/src/main.rs @@ -0,0 +1,346 @@ +//! Minimal Runtime REST process wrapper. +//! +//! This binary is available only when the `http-server` feature is enabled. It +//! starts a Runtime-local command API intended for a trusted backend/proxy; +//! browsers must not connect to this Runtime process directly. + +use std::collections::VecDeque; +use std::env; +use std::error::Error; +use std::fmt; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::process::ExitCode; + +use worker_runtime::http_server::{ + RuntimeHttpServer, RuntimeHttpServerConfig, RuntimeHttpServerError, RuntimeHttpStoreSelection, +}; +use worker_runtime::identity::RuntimeId; + +fn main() -> ExitCode { + match run() { + Ok(()) => ExitCode::SUCCESS, + Err(error) => { + eprintln!("worker-runtime-rest-server: {error}"); + if let ProcessError::Usage(_) = error { + eprintln!(); + eprintln!("{}", usage()); + ExitCode::from(2) + } else { + ExitCode::FAILURE + } + } + } +} + +fn run() -> Result<(), ProcessError> { + let Some(config) = parse_args(env::args().skip(1))? else { + println!("{}", usage()); + return Ok(()); + }; + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_io() + .build()?; + runtime.block_on(async move { + let server = RuntimeHttpServer::bind(config).await?; + let local_addr = server.local_addr()?; + eprintln!( + "worker-runtime REST server listening on {local_addr}; intended client is a trusted backend/proxy, not a browser" + ); + server.serve().await + })?; + Ok(()) +} + +fn parse_args(args: I) -> Result, ProcessError> +where + I: IntoIterator, + S: Into, +{ + let mut config = RuntimeHttpServerConfig::default(); + let mut store = StoreArg::Memory; + let mut args = args.into_iter().map(Into::into).collect::>(); + + while let Some(arg) = args.pop_front() { + let (flag, inline_value) = split_flag_value(arg)?; + match flag.as_str() { + "--help" | "-h" => return Ok(None), + "--bind" => { + let value = take_value(&flag, inline_value, &mut args)?; + config.bind_addr = value.parse::().map_err(|error| { + ProcessError::usage(format!("invalid --bind socket address `{value}`: {error}")) + })?; + } + "--runtime-id" => { + let value = take_value(&flag, inline_value, &mut args)?; + config.runtime_id = Some(RuntimeId::new(value).ok_or_else(|| { + ProcessError::usage("--runtime-id must not be empty".to_string()) + })?); + } + "--display-name" => { + config.display_name = Some(take_value(&flag, inline_value, &mut args)?); + } + "--store" => { + let value = take_value(&flag, inline_value, &mut args)?; + store = match value.as_str() { + "memory" => StoreArg::Memory, + "fs" | "fs-store" => StoreArg::Fs { root: None }, + _ => { + return Err(ProcessError::usage(format!( + "unsupported --store `{value}`; expected `memory` or `fs`" + ))); + } + }; + } + "--fs-root" => { + let value = take_value(&flag, inline_value, &mut args)?; + store = StoreArg::Fs { + root: Some(PathBuf::from(value)), + }; + } + "--local-token" => { + let value = take_value(&flag, inline_value, &mut args)?; + if value.is_empty() { + return Err(ProcessError::usage( + "--local-token must not be empty when provided".to_string(), + )); + } + config.local_token = Some(value); + } + "--local-token-env" => { + let name = take_value(&flag, inline_value, &mut args)?; + let value = env::var(&name).map_err(|error| { + ProcessError::usage(format!( + "failed to read --local-token-env `{name}`: {error}" + )) + })?; + if value.is_empty() { + return Err(ProcessError::usage(format!( + "--local-token-env `{name}` resolved to an empty value" + ))); + } + config.local_token = Some(value); + } + "--max-transcript-projection-items" => { + config.limits.max_transcript_projection_items = + parse_usize_flag(&flag, take_value(&flag, inline_value, &mut args)?)?; + } + "--max-event-batch-items" => { + config.limits.max_event_batch_items = + parse_usize_flag(&flag, take_value(&flag, inline_value, &mut args)?)?; + } + _ => { + return Err(ProcessError::usage(format!("unknown argument `{flag}`"))); + } + } + } + + apply_store_selection(&mut config, store)?; + Ok(Some(config)) +} + +fn split_flag_value(arg: String) -> Result<(String, Option), ProcessError> { + if !arg.starts_with('-') { + return Err(ProcessError::usage(format!( + "unexpected positional argument `{arg}`" + ))); + } + if let Some((flag, value)) = arg.split_once('=') { + Ok((flag.to_string(), Some(value.to_string()))) + } else { + Ok((arg, None)) + } +} + +fn take_value( + flag: &str, + inline_value: Option, + args: &mut VecDeque, +) -> Result { + if let Some(value) = inline_value { + return Ok(value); + } + args.pop_front() + .ok_or_else(|| ProcessError::usage(format!("{flag} requires a value"))) +} + +fn parse_usize_flag(flag: &str, value: String) -> Result { + value + .parse::() + .map_err(|error| ProcessError::usage(format!("invalid {flag} value `{value}`: {error}"))) +} + +fn apply_store_selection( + config: &mut RuntimeHttpServerConfig, + store: StoreArg, +) -> Result<(), ProcessError> { + match store { + StoreArg::Memory => { + config.store = RuntimeHttpStoreSelection::Memory; + Ok(()) + } + StoreArg::Fs { root } => apply_fs_store_selection(config, root), + } +} + +#[cfg(feature = "fs-store")] +fn apply_fs_store_selection( + config: &mut RuntimeHttpServerConfig, + root: Option, +) -> Result<(), ProcessError> { + let root = root + .ok_or_else(|| ProcessError::usage("--store fs requires --fs-root ".to_string()))?; + config.store = RuntimeHttpStoreSelection::Fs { root }; + Ok(()) +} + +#[cfg(not(feature = "fs-store"))] +fn apply_fs_store_selection( + _config: &mut RuntimeHttpServerConfig, + root: Option, +) -> Result<(), ProcessError> { + let _ = root; + Err(ProcessError::usage( + "fs store selection requires building worker-runtime with features `http-server,fs-store`" + .to_string(), + )) +} + +#[derive(Clone, Debug, PartialEq, Eq)] +enum StoreArg { + Memory, + Fs { root: Option }, +} + +#[derive(Debug)] +enum ProcessError { + Usage(String), + Server(RuntimeHttpServerError), + Io(std::io::Error), +} + +impl ProcessError { + fn usage(message: String) -> Self { + Self::Usage(message) + } +} + +impl fmt::Display for ProcessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Usage(message) => message.fmt(f), + Self::Server(error) => error.fmt(f), + Self::Io(error) => error.fmt(f), + } + } +} + +impl Error for ProcessError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Usage(_) => None, + Self::Server(error) => Some(error), + Self::Io(error) => Some(error), + } + } +} + +impl From for ProcessError { + fn from(error: RuntimeHttpServerError) -> Self { + Self::Server(error) + } +} + +impl From for ProcessError { + fn from(error: std::io::Error) -> Self { + Self::Io(error) + } +} + +fn usage() -> &'static str { + "Usage: worker-runtime-rest-server [OPTIONS]\n\n\ +Starts the worker-runtime REST command API for a trusted backend/proxy.\n\ +Browsers must not connect to this Runtime process directly.\n\n\ +Options:\n\ + --bind Bind socket address (default: 127.0.0.1:0)\n\ + --runtime-id Runtime authority id (default: generated)\n\ + --display-name Runtime display name\n\ + --store Store selection (default: memory)\n\ + --fs-root Filesystem store root; requires fs-store feature\n\ + --local-token Minimal local bearer token placeholder\n\ + --local-token-env Read local bearer token placeholder from env\n\ + --max-transcript-projection-items Override transcript projection limit\n\ + --max-event-batch-items Override event batch limit\n\ + -h, --help Show this help" +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_memory_runtime_process_config() { + let config = parse_args([ + "--bind", + "127.0.0.1:48181", + "--runtime-id=runtime-review", + "--display-name", + "review runtime", + "--store", + "memory", + "--local-token", + "local-placeholder", + "--max-transcript-projection-items", + "32", + "--max-event-batch-items=16", + ]) + .unwrap() + .unwrap(); + + assert_eq!( + config.bind_addr, + "127.0.0.1:48181".parse::().unwrap() + ); + assert_eq!( + config.runtime_id.as_ref().map(RuntimeId::as_str), + Some("runtime-review") + ); + assert_eq!(config.display_name.as_deref(), Some("review runtime")); + assert!(matches!(config.store, RuntimeHttpStoreSelection::Memory)); + assert_eq!(config.local_token.as_deref(), Some("local-placeholder")); + assert_eq!(config.limits.max_transcript_projection_items, 32); + assert_eq!(config.limits.max_event_batch_items, 16); + } + + #[cfg(feature = "fs-store")] + #[test] + fn parses_fs_store_runtime_process_config_when_feature_enabled() { + let config = parse_args(["--fs-root", "/tmp/yoi-worker-runtime-store"]) + .unwrap() + .unwrap(); + + assert!(matches!( + config.store, + RuntimeHttpStoreSelection::Fs { ref root } + if root == &PathBuf::from("/tmp/yoi-worker-runtime-store") + )); + } + + #[cfg(not(feature = "fs-store"))] + #[test] + fn rejects_fs_store_runtime_process_config_without_feature() { + let error = parse_args(["--store", "fs", "--fs-root", "/tmp/store"]).unwrap_err(); + + assert!( + error + .to_string() + .contains("requires building worker-runtime with features") + ); + } + + #[test] + fn help_does_not_start_server() { + assert!(parse_args(["--help"]).unwrap().is_none()); + } +}