workspace: respect remote execution projection
This commit is contained in:
parent
c29d10b67b
commit
ba7f9d2ee8
|
|
@ -16,7 +16,7 @@ use worker_runtime::config_bundle::{
|
|||
ConfigBundleSummary, ConfigProfileDescriptor,
|
||||
};
|
||||
use worker_runtime::error::RuntimeError as EmbeddedRuntimeError;
|
||||
use worker_runtime::execution::WorkerExecutionRunState;
|
||||
use worker_runtime::execution::{WorkerExecutionRunState, WorkerExecutionStatus};
|
||||
use worker_runtime::http_server::{
|
||||
RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest,
|
||||
RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse,
|
||||
|
|
@ -966,11 +966,7 @@ impl EmbeddedWorkerRuntime {
|
|||
status: EmbeddedWorkerStatus,
|
||||
execution: &worker_runtime::execution::WorkerExecutionStatus,
|
||||
) -> bool {
|
||||
self.execution_enabled
|
||||
&& status == EmbeddedWorkerStatus::Running
|
||||
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
|
||||
&& execution.run_state == WorkerExecutionRunState::Idle
|
||||
&& !execution_last_result_blocks_control(execution)
|
||||
runtime_worker_can_accept_input(self.execution_enabled, status, execution)
|
||||
}
|
||||
|
||||
fn can_stop_embedded_worker(
|
||||
|
|
@ -978,16 +974,7 @@ impl EmbeddedWorkerRuntime {
|
|||
status: EmbeddedWorkerStatus,
|
||||
execution: &worker_runtime::execution::WorkerExecutionStatus,
|
||||
) -> bool {
|
||||
self.execution_enabled
|
||||
&& status == EmbeddedWorkerStatus::Running
|
||||
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
|
||||
&& !matches!(
|
||||
execution.run_state,
|
||||
WorkerExecutionRunState::Rejected
|
||||
| WorkerExecutionRunState::Errored
|
||||
| WorkerExecutionRunState::Unconnected
|
||||
)
|
||||
&& !execution_last_result_blocks_control(execution)
|
||||
runtime_worker_can_stop(self.execution_enabled, status, execution)
|
||||
}
|
||||
|
||||
fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary {
|
||||
|
|
@ -1003,7 +990,7 @@ impl EmbeddedWorkerRuntime {
|
|||
identity: "runtime_registry_worker".to_string(),
|
||||
},
|
||||
state: embedded_worker_status_label(summary.status).to_string(),
|
||||
status: embedded_worker_execution_status_label(summary.status, summary.execution.run_state)
|
||||
status: embedded_worker_execution_status_label(summary.status, &summary.execution)
|
||||
.to_string(),
|
||||
last_seen_at: None,
|
||||
implementation: WorkerImplementationSummary {
|
||||
|
|
@ -1036,7 +1023,7 @@ impl EmbeddedWorkerRuntime {
|
|||
identity: "runtime_registry_worker".to_string(),
|
||||
},
|
||||
state: embedded_worker_status_label(detail.status).to_string(),
|
||||
status: embedded_worker_execution_status_label(detail.status, detail.execution.run_state)
|
||||
status: embedded_worker_execution_status_label(detail.status, &detail.execution)
|
||||
.to_string(),
|
||||
last_seen_at: None,
|
||||
implementation: WorkerImplementationSummary {
|
||||
|
|
@ -1693,15 +1680,16 @@ impl RemoteWorkerRuntime {
|
|||
identity: "runtime_registry_worker".to_string(),
|
||||
},
|
||||
state: embedded_worker_status_label(summary.status).to_string(),
|
||||
status: embedded_worker_status_label(summary.status).to_string(),
|
||||
status: embedded_worker_execution_status_label(summary.status, &summary.execution)
|
||||
.to_string(),
|
||||
last_seen_at: None,
|
||||
implementation: WorkerImplementationSummary {
|
||||
kind: "remote_worker_runtime".to_string(),
|
||||
display_hint: "Backend-proxied remote worker-runtime Worker".to_string(),
|
||||
},
|
||||
capabilities: WorkerCapabilitySummary {
|
||||
can_accept_input: true,
|
||||
can_stop: true,
|
||||
can_accept_input: runtime_worker_can_accept_input(true, summary.status, &summary.execution),
|
||||
can_stop: runtime_worker_can_stop(true, summary.status, &summary.execution),
|
||||
can_spawn_followup: false,
|
||||
},
|
||||
diagnostics: vec![diagnostic(
|
||||
|
|
@ -1725,15 +1713,16 @@ impl RemoteWorkerRuntime {
|
|||
identity: "runtime_registry_worker".to_string(),
|
||||
},
|
||||
state: embedded_worker_status_label(detail.status).to_string(),
|
||||
status: embedded_worker_status_label(detail.status).to_string(),
|
||||
status: embedded_worker_execution_status_label(detail.status, &detail.execution)
|
||||
.to_string(),
|
||||
last_seen_at: None,
|
||||
implementation: WorkerImplementationSummary {
|
||||
kind: "remote_worker_runtime".to_string(),
|
||||
display_hint: "Backend-proxied remote worker-runtime Worker".to_string(),
|
||||
},
|
||||
capabilities: WorkerCapabilitySummary {
|
||||
can_accept_input: true,
|
||||
can_stop: true,
|
||||
can_accept_input: runtime_worker_can_accept_input(true, detail.status, &detail.execution),
|
||||
can_stop: runtime_worker_can_stop(true, detail.status, &detail.execution),
|
||||
can_spawn_followup: false,
|
||||
},
|
||||
diagnostics: vec![diagnostic(
|
||||
|
|
@ -2114,9 +2103,37 @@ fn embedded_spawn_execution_failure_diagnostic(
|
|||
))
|
||||
}
|
||||
|
||||
fn execution_last_result_blocks_control(
|
||||
execution: &worker_runtime::execution::WorkerExecutionStatus,
|
||||
fn runtime_worker_can_accept_input(
|
||||
execution_enabled: bool,
|
||||
status: EmbeddedWorkerStatus,
|
||||
execution: &WorkerExecutionStatus,
|
||||
) -> bool {
|
||||
execution_enabled
|
||||
&& status == EmbeddedWorkerStatus::Running
|
||||
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
|
||||
&& execution.run_state == WorkerExecutionRunState::Idle
|
||||
&& !execution_last_result_blocks_control(execution)
|
||||
}
|
||||
|
||||
fn runtime_worker_can_stop(
|
||||
execution_enabled: bool,
|
||||
status: EmbeddedWorkerStatus,
|
||||
execution: &WorkerExecutionStatus,
|
||||
) -> bool {
|
||||
execution_enabled
|
||||
&& status == EmbeddedWorkerStatus::Running
|
||||
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
|
||||
&& !matches!(
|
||||
execution.run_state,
|
||||
WorkerExecutionRunState::Stopped
|
||||
| WorkerExecutionRunState::Rejected
|
||||
| WorkerExecutionRunState::Errored
|
||||
| WorkerExecutionRunState::Unconnected
|
||||
)
|
||||
&& !execution_last_result_blocks_control(execution)
|
||||
}
|
||||
|
||||
fn execution_last_result_blocks_control(execution: &WorkerExecutionStatus) -> bool {
|
||||
execution.last_result.as_ref().is_some_and(|result| {
|
||||
matches!(
|
||||
result.outcome,
|
||||
|
|
@ -2137,19 +2154,24 @@ fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str {
|
|||
|
||||
fn embedded_worker_execution_status_label(
|
||||
status: EmbeddedWorkerStatus,
|
||||
run_state: WorkerExecutionRunState,
|
||||
execution: &WorkerExecutionStatus,
|
||||
) -> &'static str {
|
||||
match status {
|
||||
EmbeddedWorkerStatus::Stopped => "stopped",
|
||||
EmbeddedWorkerStatus::Cancelled => "cancelled",
|
||||
EmbeddedWorkerStatus::Running => match run_state {
|
||||
EmbeddedWorkerStatus::Running => {
|
||||
if execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Stale {
|
||||
return "stale";
|
||||
}
|
||||
match execution.run_state {
|
||||
WorkerExecutionRunState::Idle => "idle",
|
||||
WorkerExecutionRunState::Busy => "running",
|
||||
WorkerExecutionRunState::Stopped => "stopped",
|
||||
WorkerExecutionRunState::Rejected => "rejected",
|
||||
WorkerExecutionRunState::Errored => "errored",
|
||||
WorkerExecutionRunState::Unconnected => "unconnected",
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -3327,6 +3349,8 @@ mod tests {
|
|||
workers.items[0].workspace.identity,
|
||||
"runtime_registry_worker"
|
||||
);
|
||||
assert!(workers.items[0].capabilities.can_accept_input);
|
||||
assert!(workers.items[0].capabilities.can_stop);
|
||||
|
||||
let input = registry
|
||||
.send_input(
|
||||
|
|
@ -3356,6 +3380,101 @@ mod tests {
|
|||
assert!(browser_payload.contains("worker_id"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_runtime_projection_blocks_stale_and_unconnected_execution_input() {
|
||||
let (base_url, server) = serve_mock_http(vec![
|
||||
mock_response(
|
||||
"GET",
|
||||
"/v1/workers",
|
||||
true,
|
||||
200,
|
||||
json!({
|
||||
"workers": [
|
||||
worker_json_with_execution(
|
||||
"embedded-worker-runtime",
|
||||
"worker-stale",
|
||||
"stale",
|
||||
"unconnected",
|
||||
None,
|
||||
),
|
||||
worker_json_with_execution(
|
||||
"embedded-worker-runtime",
|
||||
"worker-unconnected",
|
||||
"unconnected",
|
||||
"unconnected",
|
||||
None,
|
||||
),
|
||||
worker_json_with_execution(
|
||||
"embedded-worker-runtime",
|
||||
"worker-rejected",
|
||||
"connected",
|
||||
"rejected",
|
||||
Some("rejected"),
|
||||
),
|
||||
worker_json_with_execution(
|
||||
"embedded-worker-runtime",
|
||||
"worker-errored",
|
||||
"connected",
|
||||
"errored",
|
||||
Some("errored"),
|
||||
)
|
||||
]
|
||||
})
|
||||
.to_string(),
|
||||
),
|
||||
mock_response(
|
||||
"GET",
|
||||
"/v1/workers/worker-stale",
|
||||
true,
|
||||
200,
|
||||
json!({
|
||||
"worker": worker_json_with_execution(
|
||||
"embedded-worker-runtime",
|
||||
"worker-stale",
|
||||
"stale",
|
||||
"unconnected",
|
||||
None,
|
||||
)})
|
||||
.to_string(),
|
||||
),
|
||||
]);
|
||||
let registry = RuntimeRegistry::new(vec![Arc::new(
|
||||
RemoteWorkerRuntime::new(RemoteRuntimeConfig::new(
|
||||
"remote:primary",
|
||||
"Remote Primary",
|
||||
base_url,
|
||||
Some("secret-token-do-not-leak".to_string()),
|
||||
))
|
||||
.unwrap(),
|
||||
)]);
|
||||
|
||||
let workers = registry.list_workers(10);
|
||||
assert_eq!(workers.items.len(), 4);
|
||||
for worker in &workers.items {
|
||||
assert!(
|
||||
!worker.capabilities.can_accept_input,
|
||||
"{} should not be input-capable",
|
||||
worker.worker_id
|
||||
);
|
||||
assert!(
|
||||
!worker.capabilities.can_stop,
|
||||
"{} should not be stoppable",
|
||||
worker.worker_id
|
||||
);
|
||||
}
|
||||
assert_eq!(workers.items[0].status, "stale");
|
||||
assert_eq!(workers.items[1].status, "unconnected");
|
||||
assert_eq!(workers.items[2].status, "rejected");
|
||||
assert_eq!(workers.items[3].status, "errored");
|
||||
|
||||
let stale_detail = registry.worker("remote:primary", "worker-stale").unwrap();
|
||||
assert!(!stale_detail.capabilities.can_accept_input);
|
||||
assert!(!stale_detail.capabilities.can_stop);
|
||||
assert_eq!(stale_detail.status, "stale");
|
||||
|
||||
server.join().expect("mock remote server finished");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_config_bundle_sync_and_check_diagnostics_are_sanitized_and_path_safe() {
|
||||
let leaked_store_path = "/var/lib/yoi/runtime/bundles/bundle-1.json";
|
||||
|
|
@ -3524,12 +3643,30 @@ mod tests {
|
|||
}
|
||||
|
||||
fn worker_json(runtime_id: &str, worker_id: &str) -> serde_json::Value {
|
||||
worker_json_with_execution(runtime_id, worker_id, "connected", "idle", None)
|
||||
}
|
||||
|
||||
fn worker_json_with_execution(
|
||||
runtime_id: &str,
|
||||
worker_id: &str,
|
||||
backend: &str,
|
||||
run_state: &str,
|
||||
last_outcome: Option<&str>,
|
||||
) -> serde_json::Value {
|
||||
let last_result = last_outcome.map(|outcome| {
|
||||
json!({
|
||||
"operation": "input",
|
||||
"outcome": outcome,
|
||||
"run_state": run_state,
|
||||
"message": format!("{outcome} result")
|
||||
})
|
||||
});
|
||||
json!({
|
||||
"worker_ref": { "runtime_id": runtime_id, "worker_id": worker_id },
|
||||
"runtime_id": runtime_id,
|
||||
"worker_id": worker_id,
|
||||
"status": "running",
|
||||
"execution": { "backend": "connected", "run_state": "idle" },
|
||||
"execution": { "backend": backend, "run_state": run_state, "last_result": last_result },
|
||||
"intent": { "kind": "role", "role": "coder", "purpose": "remote test" },
|
||||
"profile": { "kind": "builtin", "value": "coder" },
|
||||
"config_bundle": { "id": "remote-bundle", "digest": "remote-digest" },
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user