pod: stop intake after ready idle
This commit is contained in:
parent
64318e8045
commit
61c323189c
|
|
@ -32,6 +32,9 @@ pub struct SpawnConfig {
|
|||
/// Optional reusable Profile selector. Pod identity is always supplied
|
||||
/// separately with `--pod`; profile selection must not imply a name.
|
||||
pub profile: Option<String>,
|
||||
/// Process-local Ticket role marker supplied only by Ticket role launches.
|
||||
/// This does not alter prompts, manifests, or Ticket claim records.
|
||||
pub ticket_role: Option<String>,
|
||||
/// Explicit runtime workspace root. The child uses it as process cwd and
|
||||
/// receives it via `--workspace` so startup does not infer workspace
|
||||
/// identity from the parent process cwd.
|
||||
|
|
@ -123,6 +126,9 @@ fn runtime_args(config: &SpawnConfig) -> Vec<String> {
|
|||
args.extend(["--profile".to_string(), profile.clone()]);
|
||||
}
|
||||
}
|
||||
if let Some(ticket_role) = &config.ticket_role {
|
||||
args.extend(["--ticket-role".to_string(), ticket_role.clone()]);
|
||||
}
|
||||
args
|
||||
}
|
||||
|
||||
|
|
@ -327,6 +333,7 @@ mod tests {
|
|||
runtime_command: PodRuntimeCommand::new("/bin/yoi", vec![OsString::from("pod")]),
|
||||
pod_name: "explicit-pod".to_string(),
|
||||
profile: Some("project:companion".to_string()),
|
||||
ticket_role: None,
|
||||
workspace_root: PathBuf::from("/work/other-project"),
|
||||
resume_from: None,
|
||||
}
|
||||
|
|
@ -363,4 +370,24 @@ mod tests {
|
|||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runtime_args_pass_ticket_role_marker_when_present() {
|
||||
let mut config = base_config();
|
||||
config.ticket_role = Some("intake".to_string());
|
||||
|
||||
assert_eq!(
|
||||
runtime_args(&config),
|
||||
vec![
|
||||
"--workspace",
|
||||
"/work/other-project",
|
||||
"--pod",
|
||||
"explicit-pod",
|
||||
"--profile",
|
||||
"project:companion",
|
||||
"--ticket-role",
|
||||
"intake",
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -162,6 +162,7 @@ impl TicketRoleLaunchPlan {
|
|||
runtime_command,
|
||||
pod_name: self.pod_name.clone(),
|
||||
profile: Some(self.profile.clone()),
|
||||
ticket_role: Some(self.role.as_str().to_string()),
|
||||
workspace_root: self.workspace_root.clone(),
|
||||
resume_from: None,
|
||||
})
|
||||
|
|
@ -1024,6 +1025,7 @@ workflow = "ticket-review-workflow"
|
|||
.unwrap();
|
||||
assert_eq!(spawn.pod_name, "reviewer-fixed");
|
||||
assert_eq!(spawn.profile.as_deref(), Some("builtin:default"));
|
||||
assert_eq!(spawn.ticket_role.as_deref(), Some("reviewer"));
|
||||
assert_eq!(spawn.workspace_root, temp.path());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,10 @@ use crate::pod::{Pod, PodError, PodRunResult, SystemItemCommitter};
|
|||
use crate::runtime::dir::RuntimeDir;
|
||||
use crate::segment_log_sink::SegmentLogSink;
|
||||
use crate::shared_state::PodSharedState;
|
||||
use crate::shutdown_after_idle::{
|
||||
ShutdownAfterIdleRequest, TicketIntakeReadyShutdownHook, is_ticket_intake_role,
|
||||
take_shutdown_request_after_status,
|
||||
};
|
||||
use crate::spawn::comm_tools::{read_pod_output_tool, send_to_pod_tool, stop_pod_tool};
|
||||
use crate::spawn::registry::SpawnedPodRegistry;
|
||||
use crate::spawn::tool::spawn_pod_tool;
|
||||
|
|
@ -221,6 +225,16 @@ impl PodController {
|
|||
spawned_registry.clone(),
|
||||
);
|
||||
|
||||
// Intake role Pods self-terminate only after a successful
|
||||
// TicketIntakeReady turn has fully settled back to Idle. The request
|
||||
// is transient controller state, not model-visible context or ticket
|
||||
// claim metadata.
|
||||
let shutdown_after_idle = ShutdownAfterIdleRequest::default();
|
||||
pod.add_post_tool_call_hook(TicketIntakeReadyShutdownHook::new(
|
||||
shutdown_after_idle.clone(),
|
||||
is_ticket_intake_role(pod.runtime_ticket_role()),
|
||||
));
|
||||
|
||||
// Materialise pending tool factories so the greeting reflects
|
||||
// the actual registered set instead of a hand-maintained mirror.
|
||||
pod.worker().tool_server_handle().flush_pending();
|
||||
|
|
@ -282,6 +296,7 @@ impl PodController {
|
|||
spawned_registry,
|
||||
shutdown_tx,
|
||||
socket_server,
|
||||
shutdown_after_idle,
|
||||
));
|
||||
|
||||
Ok((handle, shutdown_rx))
|
||||
|
|
@ -592,6 +607,7 @@ async fn controller_loop<C, St>(
|
|||
spawned_registry: Arc<SpawnedPodRegistry>,
|
||||
shutdown_tx: oneshot::Sender<()>,
|
||||
socket_server: SocketServer,
|
||||
shutdown_after_idle: ShutdownAfterIdleRequest,
|
||||
) where
|
||||
C: LlmClient + Clone + 'static,
|
||||
St: Store + PodMetadataStore + Clone + 'static,
|
||||
|
|
@ -678,6 +694,10 @@ async fn controller_loop<C, St>(
|
|||
let _ = event_tx.send(Event::Shutdown);
|
||||
break;
|
||||
}
|
||||
if take_shutdown_request_after_status(&shutdown_after_idle, new_status) {
|
||||
let _ = event_tx.send(Event::Shutdown);
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ use manifest::{
|
|||
};
|
||||
use pod_store::{CombinedStore, FsPodStore, PodMetadataStore};
|
||||
use session_store::{FsStore, SegmentId, Store};
|
||||
use ticket::config::TicketRole;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
#[command(about = "Spawn a Pod process from a profile or a single manifest file")]
|
||||
|
|
@ -65,6 +66,10 @@ struct Cli {
|
|||
#[arg(long, value_name = "PATH", requires = "adopt")]
|
||||
callback: Option<PathBuf>,
|
||||
|
||||
/// Process-local Ticket role marker supplied by the Ticket role launcher.
|
||||
#[arg(long, hide = true)]
|
||||
ticket_role: Option<String>,
|
||||
|
||||
/// Resume or create a Pod by name. If name-keyed Pod state exists,
|
||||
/// the active session/segment recorded there is restored; otherwise a
|
||||
/// fresh top-level Pod is created with this name.
|
||||
|
|
@ -342,7 +347,7 @@ async fn run_cli_inner(cli: Cli) -> ExitCode {
|
|||
};
|
||||
let store = CombinedStore::new(session_store, pod_store);
|
||||
|
||||
let pod = if cli.adopt {
|
||||
let mut pod = if cli.adopt {
|
||||
let callback = match cli.callback.clone() {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
|
|
@ -423,6 +428,13 @@ async fn run_cli_inner(cli: Cli) -> ExitCode {
|
|||
}
|
||||
}
|
||||
};
|
||||
if let Some(role) = cli.ticket_role.clone() {
|
||||
if TicketRole::parse(&role).is_none() {
|
||||
eprintln!("error: invalid --ticket-role {role:?}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
pod.set_runtime_ticket_role(Some(role));
|
||||
}
|
||||
let pod_name = pod.manifest().pod.name.clone();
|
||||
|
||||
// Spawn the controller (starts socket server)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ pub mod prompt;
|
|||
pub mod runtime;
|
||||
pub mod segment_log_sink;
|
||||
pub mod shared_state;
|
||||
mod shutdown_after_idle;
|
||||
pub mod spawn;
|
||||
pub mod workflow;
|
||||
|
||||
|
|
|
|||
|
|
@ -312,6 +312,10 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// `Method::PodEvent` reports upward (turn end, error, shutdown,
|
||||
/// scope sub-delegation).
|
||||
callback_socket: Option<PathBuf>,
|
||||
/// Transient launch role for Ticket role sessions. This is process-local
|
||||
/// runtime identity used by controller policy; it is not model-visible and
|
||||
/// is not persisted into Ticket claim/session records.
|
||||
runtime_ticket_role: Option<String>,
|
||||
/// Central catalog of Pod-level prompt strings (compaction system
|
||||
/// prompt, notification wrapper, interrupt notes, trailing system
|
||||
/// sections, ...). Built from the 4-layer overlay in
|
||||
|
|
@ -435,6 +439,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
scope_allocation: None,
|
||||
callback_socket: None,
|
||||
runtime_ticket_role: None,
|
||||
prompts: self.prompts.clone(),
|
||||
workflow_registry: self.workflow_registry.clone(),
|
||||
memory_layout: self.memory_layout.clone(),
|
||||
|
|
@ -616,6 +621,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
scope_allocation: None,
|
||||
callback_socket: None,
|
||||
runtime_ticket_role: None,
|
||||
prompts,
|
||||
workflow_registry: workflow_crate::WorkflowRegistry::empty(),
|
||||
memory_layout: None,
|
||||
|
|
@ -695,6 +701,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
&self.manifest
|
||||
}
|
||||
|
||||
/// Process-local Ticket role marker supplied by the role launcher.
|
||||
pub fn runtime_ticket_role(&self) -> Option<&str> {
|
||||
self.runtime_ticket_role.as_deref()
|
||||
}
|
||||
|
||||
/// Set the process-local Ticket role marker. Intended for entrypoint
|
||||
/// launch metadata, not for model-visible prompts or durable claims.
|
||||
pub fn set_runtime_ticket_role(&mut self, role: Option<String>) {
|
||||
self.runtime_ticket_role = role;
|
||||
}
|
||||
|
||||
/// The Pod's working directory.
|
||||
pub fn pwd(&self) -> &Path {
|
||||
&self.pwd
|
||||
|
|
@ -3748,6 +3765,7 @@ where
|
|||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
scope_allocation: Some(scope_allocation),
|
||||
callback_socket: None,
|
||||
runtime_ticket_role: None,
|
||||
prompts: common.prompts,
|
||||
workflow_registry: common.workflow_registry,
|
||||
memory_layout: common.memory_layout,
|
||||
|
|
@ -3827,6 +3845,7 @@ where
|
|||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
scope_allocation: Some(scope_allocation),
|
||||
callback_socket: Some(callback_socket),
|
||||
runtime_ticket_role: None,
|
||||
prompts: common.prompts,
|
||||
workflow_registry: common.workflow_registry,
|
||||
memory_layout: common.memory_layout,
|
||||
|
|
@ -4007,6 +4026,7 @@ where
|
|||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
scope_allocation: Some(scope_allocation),
|
||||
callback_socket: None,
|
||||
runtime_ticket_role: None,
|
||||
prompts: common.prompts,
|
||||
workflow_registry: common.workflow_registry,
|
||||
memory_layout: common.memory_layout,
|
||||
|
|
|
|||
165
crates/pod/src/shutdown_after_idle.rs
Normal file
165
crates/pod/src/shutdown_after_idle.rs
Normal file
|
|
@ -0,0 +1,165 @@
|
|||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use protocol::PodStatus;
|
||||
use ticket::config::TicketRole;
|
||||
|
||||
use crate::hook::{Hook, HookPostToolAction, PostToolCall, ToolResultSummary};
|
||||
|
||||
const TICKET_INTAKE_READY_TOOL_NAME: &str = "TicketIntakeReady";
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct ShutdownAfterIdleRequest {
|
||||
requested: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl ShutdownAfterIdleRequest {
|
||||
pub(crate) fn request(&self) {
|
||||
self.requested.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
pub(crate) fn take(&self) -> bool {
|
||||
self.requested.swap(false, Ordering::AcqRel)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn is_requested(&self) -> bool {
|
||||
self.requested.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_ticket_intake_role(role: Option<&str>) -> bool {
|
||||
matches!(role.and_then(TicketRole::parse), Some(TicketRole::Intake))
|
||||
}
|
||||
|
||||
pub(crate) fn take_shutdown_request_after_status(
|
||||
shutdown_after_idle: &ShutdownAfterIdleRequest,
|
||||
status: PodStatus,
|
||||
) -> bool {
|
||||
status == PodStatus::Idle && shutdown_after_idle.take()
|
||||
}
|
||||
|
||||
pub(crate) struct TicketIntakeReadyShutdownHook {
|
||||
shutdown_after_idle: ShutdownAfterIdleRequest,
|
||||
eligible_ticket_intake_role: bool,
|
||||
}
|
||||
|
||||
impl TicketIntakeReadyShutdownHook {
|
||||
pub(crate) fn new(
|
||||
shutdown_after_idle: ShutdownAfterIdleRequest,
|
||||
eligible_ticket_intake_role: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
shutdown_after_idle,
|
||||
eligible_ticket_intake_role,
|
||||
}
|
||||
}
|
||||
|
||||
fn observe_tool_result(&self, info: &ToolResultSummary) {
|
||||
if self.eligible_ticket_intake_role
|
||||
&& info.tool_name == TICKET_INTAKE_READY_TOOL_NAME
|
||||
&& !info.is_error
|
||||
{
|
||||
self.shutdown_after_idle.request();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Hook<PostToolCall> for TicketIntakeReadyShutdownHook {
|
||||
async fn call(&self, info: &ToolResultSummary) -> HookPostToolAction {
|
||||
self.observe_tool_result(info);
|
||||
HookPostToolAction::Continue
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use llm_worker::tool::ToolOutput;
|
||||
|
||||
fn tool_result(name: &str, is_error: bool) -> ToolResultSummary {
|
||||
ToolResultSummary {
|
||||
call_id: "tool-1".to_string(),
|
||||
tool_name: name.to_string(),
|
||||
is_error,
|
||||
output: ToolOutput {
|
||||
summary: "result".to_string(),
|
||||
content: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn successful_ticket_intake_ready_schedules_shutdown_after_idle_for_intake_role() {
|
||||
let request = ShutdownAfterIdleRequest::default();
|
||||
let hook = TicketIntakeReadyShutdownHook::new(request.clone(), true);
|
||||
|
||||
hook.observe_tool_result(&tool_result(TICKET_INTAKE_READY_TOOL_NAME, false));
|
||||
|
||||
assert!(request.is_requested());
|
||||
assert!(request.take());
|
||||
assert!(!request.is_requested());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn failed_ticket_intake_ready_does_not_schedule_shutdown_after_idle() {
|
||||
let request = ShutdownAfterIdleRequest::default();
|
||||
let hook = TicketIntakeReadyShutdownHook::new(request.clone(), true);
|
||||
|
||||
hook.observe_tool_result(&tool_result(TICKET_INTAKE_READY_TOOL_NAME, true));
|
||||
|
||||
assert!(!request.is_requested());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_intake_role_does_not_schedule_shutdown_after_idle() {
|
||||
let request = ShutdownAfterIdleRequest::default();
|
||||
let hook = TicketIntakeReadyShutdownHook::new(request.clone(), false);
|
||||
|
||||
hook.observe_tool_result(&tool_result(TICKET_INTAKE_READY_TOOL_NAME, false));
|
||||
|
||||
assert!(!request.is_requested());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn other_successful_tools_do_not_schedule_shutdown_after_idle() {
|
||||
let request = ShutdownAfterIdleRequest::default();
|
||||
let hook = TicketIntakeReadyShutdownHook::new(request.clone(), true);
|
||||
|
||||
hook.observe_tool_result(&tool_result("TicketShow", false));
|
||||
|
||||
assert!(!request.is_requested());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn only_ticket_intake_runtime_role_is_eligible() {
|
||||
assert!(is_ticket_intake_role(Some("intake")));
|
||||
assert!(!is_ticket_intake_role(Some("orchestrator")));
|
||||
assert!(!is_ticket_intake_role(Some("coder")));
|
||||
assert!(!is_ticket_intake_role(Some("reviewer")));
|
||||
assert!(!is_ticket_intake_role(Some("unknown")));
|
||||
assert!(!is_ticket_intake_role(None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shutdown_after_idle_is_taken_only_after_idle_status() {
|
||||
let request = ShutdownAfterIdleRequest::default();
|
||||
request.request();
|
||||
|
||||
assert!(!take_shutdown_request_after_status(
|
||||
&request,
|
||||
PodStatus::Running
|
||||
));
|
||||
assert!(request.is_requested());
|
||||
|
||||
assert!(take_shutdown_request_after_status(
|
||||
&request,
|
||||
PodStatus::Idle
|
||||
));
|
||||
assert!(!request.is_requested());
|
||||
}
|
||||
}
|
||||
|
|
@ -1671,6 +1671,7 @@ async fn restore_workspace_companion_pod(
|
|||
runtime_command,
|
||||
pod_name: pod_name.to_string(),
|
||||
profile: None,
|
||||
ticket_role: None,
|
||||
workspace_root: workspace_root.to_path_buf(),
|
||||
resume_from: None,
|
||||
};
|
||||
|
|
@ -1686,6 +1687,7 @@ async fn spawn_workspace_companion_pod(
|
|||
runtime_command,
|
||||
pod_name: pod_name.to_string(),
|
||||
profile: None,
|
||||
ticket_role: None,
|
||||
workspace_root: workspace_root.to_path_buf(),
|
||||
resume_from: None,
|
||||
};
|
||||
|
|
@ -1701,6 +1703,7 @@ async fn restore_orchestrator_pod(
|
|||
runtime_command,
|
||||
pod_name: pod_name.to_string(),
|
||||
profile: None,
|
||||
ticket_role: None,
|
||||
workspace_root: workspace_root.to_path_buf(),
|
||||
resume_from: None,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -378,6 +378,7 @@ async fn wait_for_ready(
|
|||
runtime_command: runtime_command.clone(),
|
||||
pod_name: form.name.clone(),
|
||||
profile: form.selected_profile_selector(),
|
||||
ticket_role: None,
|
||||
workspace_root: form.cwd.clone(),
|
||||
resume_from: form.resume_from,
|
||||
};
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user