fix: make worker runtime ids resolvable

This commit is contained in:
Keisuke Hirata 2026-06-24 20:08:23 +09:00
parent 9bd1550715
commit 38d25582b2
No known key found for this signature in database
4 changed files with 199 additions and 67 deletions

1
Cargo.lock generated
View File

@ -6058,6 +6058,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serde_yaml", "serde_yaml",
"sha2 0.11.0",
"tempfile", "tempfile",
"thiserror 2.0.18", "thiserror 2.0.18",
"ticket", "ticket",

View File

@ -16,6 +16,7 @@ rusqlite.workspace = true
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true serde_json.workspace = true
serde_yaml.workspace = true serde_yaml.workspace = true
sha2.workspace = true
thiserror.workspace = true thiserror.workspace = true
ticket.workspace = true ticket.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] } tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] }

View File

@ -3,6 +3,7 @@ use chrono::Utc;
use pod_store::PodMetadata; use pod_store::PodMetadata;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use sha2::{Digest, Sha256};
use std::{ use std::{
collections::BTreeSet, collections::BTreeSet,
fs, fs,
@ -15,6 +16,7 @@ const LOCAL_HOST_KIND: &str = "local-pod-host";
const MAX_DIAGNOSTICS: usize = 16; const MAX_DIAGNOSTICS: usize = 16;
const MAX_HOST_SCAN: usize = 256; const MAX_HOST_SCAN: usize = 256;
const MAX_IDENTIFIER_LEN: usize = 120; const MAX_IDENTIFIER_LEN: usize = 120;
const ID_DIGEST_HEX_LEN: usize = 16;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeDiagnostic { pub struct RuntimeDiagnostic {
@ -448,6 +450,31 @@ impl LocalPodRuntime {
self.data_dir.as_ref().map(|dir| dir.join("pods")) self.data_dir.as_ref().map(|dir| dir.join("pods"))
} }
fn pod_names(&self, pod_root: &Path) -> Result<BTreeSet<String>, RuntimeDiagnostic> {
let entries = fs::read_dir(pod_root).map_err(|err| {
diagnostic(
"local_pod_registry_unreadable",
DiagnosticSeverity::Warning,
format!("local Pod registry could not be read: {err}"),
)
})?;
let mut pod_names = BTreeSet::new();
for entry in entries.flatten() {
let Ok(file_type) = entry.file_type() else {
continue;
};
if !file_type.is_dir() {
continue;
}
let Some(name) = entry.file_name().to_str().map(|name| name.to_string()) else {
continue;
};
pod_names.insert(name);
}
Ok(pod_names)
}
fn read_worker(&self, pod_root: &Path, pod_name: &str) -> WorkerReadOutcome { fn read_worker(&self, pod_root: &Path, pod_name: &str) -> WorkerReadOutcome {
let metadata_path = pod_root.join(pod_name).join("metadata.json"); let metadata_path = pod_root.join(pod_name).join("metadata.json");
let data = match fs::read_to_string(metadata_path) { let data = match fs::read_to_string(metadata_path) {
@ -618,34 +645,11 @@ impl WorkspaceWorkerRuntime for LocalPodRuntime {
let mut workers = Vec::new(); let mut workers = Vec::new();
let mut diagnostics = Vec::new(); let mut diagnostics = Vec::new();
let entries = match fs::read_dir(&pod_root) { let pod_names = match self.pod_names(&pod_root) {
Ok(entries) => entries, Ok(pod_names) => pod_names,
Err(err) => { Err(diag) => return RuntimeList::new(Vec::new(), vec![diag]),
return RuntimeList::new(
Vec::new(),
vec![diagnostic(
"local_pod_registry_unreadable",
DiagnosticSeverity::Warning,
format!("local Pod registry could not be read: {err}"),
)],
);
}
}; };
let mut pod_names = BTreeSet::new();
for entry in entries.flatten() {
let Ok(file_type) = entry.file_type() else {
continue;
};
if !file_type.is_dir() {
continue;
}
let Some(name) = entry.file_name().to_str().map(|name| name.to_string()) else {
continue;
};
pod_names.insert(name);
}
for pod_name in pod_names { for pod_name in pod_names {
if workers.len() >= limit { if workers.len() >= limit {
break; break;
@ -663,22 +667,43 @@ impl WorkspaceWorkerRuntime for LocalPodRuntime {
} }
fn worker(&self, worker_id: &str) -> WorkerLookupResult { fn worker(&self, worker_id: &str) -> WorkerLookupResult {
let worker = worker_id let Some(pod_root) = self.pod_root() else {
.strip_prefix("local-pod-") return WorkerLookupResult {
.and_then(|pod_name| { worker: None,
self.pod_root() diagnostics: vec![diagnostic(
.map(|pod_root| (pod_root, pod_name.to_string())) "local_pod_registry_unavailable",
}) DiagnosticSeverity::Warning,
.and_then( "local Pod data directory is not configured; worker discovery is unavailable"
|(pod_root, pod_name)| match self.read_worker(&pod_root, &pod_name) { .to_string(),
WorkerReadOutcome::Worker(worker) if worker.worker_id == worker_id => { )],
Some(worker) };
} };
_ => None, let pod_names = match self.pod_names(&pod_root) {
Ok(pod_names) => pod_names,
Err(diag) => {
return WorkerLookupResult {
worker: None,
diagnostics: vec![diag],
};
}
};
for pod_name in pod_names {
if worker_id_for_pod(&pod_name) != worker_id {
continue;
}
return match self.read_worker(&pod_root, &pod_name) {
WorkerReadOutcome::Worker(worker) => WorkerLookupResult {
worker: Some(worker),
diagnostics: Vec::new(),
}, },
); WorkerReadOutcome::Diagnostic(diag) => WorkerLookupResult {
worker: None,
diagnostics: vec![diag],
},
};
}
WorkerLookupResult { WorkerLookupResult {
worker, worker: None,
diagnostics: Vec::new(), diagnostics: Vec::new(),
} }
} }
@ -733,14 +758,47 @@ fn diagnostic(
} }
fn host_id_for_workspace(workspace_id: &str) -> String { fn host_id_for_workspace(workspace_id: &str) -> String {
format!("local-{}", sanitize_identifier(workspace_id)) bounded_backend_identifier("local-", workspace_id)
} }
fn worker_id_for_pod(pod_name: &str) -> String { fn worker_id_for_pod(pod_name: &str) -> String {
format!("local-pod-{}", sanitize_identifier(pod_name)) bounded_backend_identifier("local-pod-", pod_name)
} }
fn sanitize_identifier(value: &str) -> String { fn bounded_backend_identifier(prefix: &str, value: &str) -> String {
let digest = digest_hex(value.as_bytes(), ID_DIGEST_HEX_LEN);
let mut body = sanitize_identifier_body(value);
if body.is_empty() {
body = "id".to_string();
}
let suffix_len = 1 + ID_DIGEST_HEX_LEN;
let body_budget = MAX_IDENTIFIER_LEN
.saturating_sub(prefix.len())
.saturating_sub(suffix_len)
.max(1);
if body.len() > body_budget {
body.truncate(body_budget);
body = body.trim_matches('-').to_string();
if body.is_empty() {
body = "id".to_string();
}
}
let mut id = format!("{prefix}{body}-{digest}");
if id.len() > MAX_IDENTIFIER_LEN {
let digest_suffix = format!("-{digest}");
let prefix_budget = MAX_IDENTIFIER_LEN.saturating_sub(digest_suffix.len());
id = format!(
"{}{}",
prefix.chars().take(prefix_budget).collect::<String>(),
digest_suffix
);
}
id
}
fn sanitize_identifier_body(value: &str) -> String {
let mut out = String::with_capacity(value.len()); let mut out = String::with_capacity(value.len());
for ch in value.chars() { for ch in value.chars() {
if ch.is_ascii_alphanumeric() { if ch.is_ascii_alphanumeric() {
@ -751,14 +809,20 @@ fn sanitize_identifier(value: &str) -> String {
out.push('-'); out.push('-');
} }
} }
let out = out.trim_matches('-').to_string(); out.trim_matches('-').to_string()
if out.is_empty() { }
"workspace".to_string()
} else if out.len() > MAX_IDENTIFIER_LEN { fn digest_hex(bytes: &[u8], hex_len: usize) -> String {
out.chars().take(MAX_IDENTIFIER_LEN).collect() let digest = Sha256::digest(bytes);
} else { let mut out = String::with_capacity(hex_len);
out for byte in digest {
if out.len() >= hex_len {
break;
}
out.push_str(&format!("{byte:02x}"));
} }
out.truncate(hex_len);
out
} }
fn validate_backend_identifier( fn validate_backend_identifier(
@ -917,6 +981,15 @@ mod tests {
metadata metadata
} }
fn assert_valid_generated_id(id: &str) {
assert!(id.len() <= MAX_IDENTIFIER_LEN, "id too long: {id}");
validate_backend_identifier("test_id", id).unwrap();
}
fn host_id() -> String {
host_id_for_workspace("local:test")
}
#[test] #[test]
fn local_runtime_reports_host_without_private_paths() { fn local_runtime_reports_host_without_private_paths() {
let bridge = LocalPodRuntime::new("local:test", "/workspace/project", None); let bridge = LocalPodRuntime::new("local:test", "/workspace/project", None);
@ -924,7 +997,8 @@ mod tests {
assert_eq!(hosts.items.len(), 1); assert_eq!(hosts.items.len(), 1);
let host = &hosts.items[0]; let host = &hosts.items[0];
assert_eq!(host.runtime_id, LOCAL_RUNTIME_ID); assert_eq!(host.runtime_id, LOCAL_RUNTIME_ID);
assert_eq!(host.host_id, "local-local-test"); assert_eq!(host.host_id, host_id());
assert_valid_generated_id(&host.host_id);
assert_eq!(host.capabilities.local_pod_inspection, "unavailable"); assert_eq!(host.capabilities.local_pod_inspection, "unavailable");
assert_eq!(host.capabilities.workspace_scope, "current_workspace"); assert_eq!(host.capabilities.workspace_scope, "current_workspace");
let json = serde_json::to_string(host).unwrap(); let json = serde_json::to_string(host).unwrap();
@ -944,17 +1018,19 @@ mod tests {
let runtimes = registry.list_runtimes(10); let runtimes = registry.list_runtimes(10);
assert_eq!(runtimes.items[0].runtime_id, LOCAL_RUNTIME_ID); assert_eq!(runtimes.items[0].runtime_id, LOCAL_RUNTIME_ID);
assert_eq!(runtimes.items[0].host_ids, vec!["local-local-test"]); assert_eq!(runtimes.items[0].host_ids, vec![host_id()]);
let hosts = registry.list_hosts(10); let hosts = registry.list_hosts(10);
assert_eq!(hosts.items[0].host_id, "local-local-test"); assert_eq!(hosts.items[0].host_id, host_id());
assert_valid_generated_id(&hosts.items[0].host_id);
let workers = registry.list_workers(10); let workers = registry.list_workers(10);
assert_eq!(workers.items.len(), 1); assert_eq!(workers.items.len(), 1);
let worker = &workers.items[0]; let worker = &workers.items[0];
assert_eq!(worker.runtime_id, LOCAL_RUNTIME_ID); assert_eq!(worker.runtime_id, LOCAL_RUNTIME_ID);
assert_eq!(worker.worker_id, "local-pod-coder"); assert_eq!(worker.worker_id, worker_id_for_pod("coder"));
assert_eq!(worker.host_id, "local-local-test"); assert_valid_generated_id(&worker.worker_id);
assert_eq!(worker.host_id, host_id());
assert_eq!(worker.workspace.visibility, "current_workspace"); assert_eq!(worker.workspace.visibility, "current_workspace");
assert_eq!(worker.implementation.display_hint, "coder"); assert_eq!(worker.implementation.display_hint, "coder");
let json = serde_json::to_string(worker).unwrap(); let json = serde_json::to_string(worker).unwrap();
@ -972,9 +1048,7 @@ mod tests {
Some(temp.path().to_path_buf()), Some(temp.path().to_path_buf()),
)); ));
let workers = registry let workers = registry.list_workers_for_host(&host_id(), 10).unwrap();
.list_workers_for_host("local-local-test", 10)
.unwrap();
assert_eq!(workers.items.len(), 1); assert_eq!(workers.items.len(), 1);
assert!(matches!( assert!(matches!(
registry.list_workers_for_host("../secret", 10), registry.list_workers_for_host("../secret", 10),
@ -1021,9 +1095,66 @@ mod tests {
"/workspace/project", "/workspace/project",
Some(temp.path().to_path_buf()), Some(temp.path().to_path_buf()),
); );
let worker = bridge.worker("local-pod-coder").worker.unwrap(); let worker_id = worker_id_for_pod("coder");
let worker = bridge.worker(&worker_id).worker.unwrap();
assert_eq!(worker.label, "coder"); assert_eq!(worker.label, "coder");
assert_eq!(worker.workspace.identity, "runtime_workspace"); assert_eq!(worker.workspace.identity, "runtime_workspace");
assert!(bridge.worker("local-pod-missing").worker.is_none()); assert!(
bridge
.worker(&worker_id_for_pod("missing"))
.worker
.is_none()
);
}
#[test]
fn generated_worker_ids_are_opaque_bounded_unique_and_resolvable() {
let temp = TempDir::new().unwrap();
let long_a = format!("{}-A", "TicketWorkerWithVeryLongName".repeat(8));
let long_b = format!("{}-B", "TicketWorkerWithVeryLongName".repeat(8));
let pod_names = vec![
"00001KVWECEQG-Coder.Pod".to_string(),
"foo.bar".to_string(),
"foo-bar".to_string(),
"Ticket#Worker@Reviewer".to_string(),
long_a,
long_b,
];
for pod_name in &pod_names {
write_metadata(temp.path(), pod_name, &metadata(Some("/workspace/project")));
}
let bridge = LocalPodRuntime::new(
"local:test",
"/workspace/project",
Some(temp.path().to_path_buf()),
);
let registry = WorkerRuntimeRegistry::for_local_pods(bridge.clone());
let listed = registry.list_workers(100);
assert_eq!(listed.items.len(), pod_names.len());
let mut ids = BTreeSet::new();
for worker in listed.items {
assert_valid_generated_id(&worker.worker_id);
assert!(
ids.insert(worker.worker_id.clone()),
"duplicate id: {}",
worker.worker_id
);
assert!(!worker.worker_id.contains('.'));
assert!(!worker.worker_id.contains('@'));
assert!(!worker.worker_id.contains('#'));
let from_registry = registry.worker(&worker.worker_id).unwrap();
assert_eq!(from_registry.worker_id, worker.worker_id);
let from_runtime = bridge.worker(&worker.worker_id).worker.unwrap();
assert_eq!(from_runtime.worker_id, worker.worker_id);
}
let dotted = worker_id_for_pod("foo.bar");
let dashed = worker_id_for_pod("foo-bar");
assert_ne!(dotted, dashed);
assert!(ids.contains(&dotted));
assert!(ids.contains(&dashed));
assert!(ids.iter().any(|id| id.len() == MAX_IDENTIFIER_LEN));
} }
} }

View File

@ -726,7 +726,10 @@ mod tests {
let hosts = get_json(app.clone(), "/api/hosts").await; let hosts = get_json(app.clone(), "/api/hosts").await;
assert_eq!(hosts["source"], "worker_runtime_registry"); assert_eq!(hosts["source"], "worker_runtime_registry");
assert_eq!(hosts["items"][0]["runtime_id"], "local-pod-runtime"); assert_eq!(hosts["items"][0]["runtime_id"], "local-pod-runtime");
assert_eq!(hosts["items"][0]["host_id"], TEST_REPOSITORY_ID); let host_id = hosts["items"][0]["host_id"].as_str().unwrap().to_string();
assert!(host_id.starts_with("local-"));
assert!(host_id.len() <= 120);
assert_ne!(host_id, TEST_REPOSITORY_ID);
assert_eq!(hosts["items"][0]["kind"], "local-pod-host"); assert_eq!(hosts["items"][0]["kind"], "local-pod-host");
assert_eq!( assert_eq!(
hosts["items"][0]["capabilities"]["local_pod_inspection"], hosts["items"][0]["capabilities"]["local_pod_inspection"],
@ -741,7 +744,7 @@ mod tests {
let runtimes = get_json(app.clone(), "/api/runtimes").await; let runtimes = get_json(app.clone(), "/api/runtimes").await;
assert_eq!(runtimes["source"], "worker_runtime_registry"); assert_eq!(runtimes["source"], "worker_runtime_registry");
assert_eq!(runtimes["items"][0]["runtime_id"], "local-pod-runtime"); assert_eq!(runtimes["items"][0]["runtime_id"], "local-pod-runtime");
assert_eq!(runtimes["items"][0]["host_ids"][0], TEST_REPOSITORY_ID); assert_eq!(runtimes["items"][0]["host_ids"][0], host_id);
let workers = get_json(app.clone(), "/api/workers").await; let workers = get_json(app.clone(), "/api/workers").await;
assert!(workers["items"].as_array().unwrap().is_empty()); assert!(workers["items"].as_array().unwrap().is_empty());
@ -750,11 +753,7 @@ mod tests {
"local_pod_registry_unreadable" "local_pod_registry_unreadable"
); );
let host_workers = get_json( let host_workers = get_json(app.clone(), &format!("/api/hosts/{host_id}/workers")).await;
app.clone(),
&format!("/api/hosts/{TEST_REPOSITORY_ID}/workers"),
)
.await;
assert!(host_workers["items"].as_array().unwrap().is_empty()); assert!(host_workers["items"].as_array().unwrap().is_empty());
let runs_response = app let runs_response = app