feat: weak companion progress notify
This commit is contained in:
parent
05fe1f6fb3
commit
a87d315471
|
|
@ -125,6 +125,10 @@ impl PendingRun {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn should_auto_run_notification(status: PodStatus, auto_run: bool) -> bool {
|
||||||
|
auto_run && status == PodStatus::Idle
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// PodController — actor that owns a Pod
|
// PodController — actor that owns a Pod
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
@ -774,7 +778,7 @@ async fn controller_loop<C, St>(
|
||||||
pending = Some(PendingRun::Run(input));
|
pending = Some(PendingRun::Run(input));
|
||||||
}
|
}
|
||||||
|
|
||||||
Method::Notify { message } => {
|
Method::Notify { message, auto_run } => {
|
||||||
// Client-side live echo is delivered as `Event::SystemItem`
|
// Client-side live echo is delivered as `Event::SystemItem`
|
||||||
// once the interceptor commits the corresponding
|
// once the interceptor commits the corresponding
|
||||||
// `LogEntry::SystemItem` entry — drained out of the
|
// `LogEntry::SystemItem` entry — drained out of the
|
||||||
|
|
@ -784,10 +788,10 @@ async fn controller_loop<C, St>(
|
||||||
// RUNNING / Paused: the buffer push is the entire
|
// RUNNING / Paused: the buffer push is the entire
|
||||||
// operation; an in-flight turn (or the next
|
// operation; an in-flight turn (or the next
|
||||||
// Resume/Run) will drain it at its next
|
// Resume/Run) will drain it at its next
|
||||||
// pending_history_appends. IDLE: auto-start a turn so the LLM
|
// pending_history_appends. IDLE: only `auto_run`
|
||||||
// sees the buffered notification(s) without a human
|
// notifications stage RunForNotification; weak progress
|
||||||
// Run.
|
// notices stay queued until an explicit run/resume.
|
||||||
if shared_state.get_status() == PodStatus::Idle {
|
if should_auto_run_notification(shared_state.get_status(), auto_run) {
|
||||||
pending = Some(PendingRun::RunForNotification(protocol::InvokeKind::Notify));
|
pending = Some(PendingRun::RunForNotification(protocol::InvokeKind::Notify));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1145,7 +1149,7 @@ where
|
||||||
.into(),
|
.into(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Some(Method::Notify { message }) => {
|
Some(Method::Notify { message, .. }) => {
|
||||||
// Live echo arrives via `Event::SystemItem` once
|
// Live echo arrives via `Event::SystemItem` once
|
||||||
// the in-flight turn's next `pending_history_appends`
|
// the in-flight turn's next `pending_history_appends`
|
||||||
// drains this entry through the interceptor.
|
// drains this entry through the interceptor.
|
||||||
|
|
@ -1337,6 +1341,14 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn notification_auto_run_gate_only_allows_idle_auto_run() {
|
||||||
|
assert!(should_auto_run_notification(PodStatus::Idle, true));
|
||||||
|
assert!(!should_auto_run_notification(PodStatus::Idle, false));
|
||||||
|
assert!(!should_auto_run_notification(PodStatus::Running, true));
|
||||||
|
assert!(!should_auto_run_notification(PodStatus::Paused, true));
|
||||||
|
}
|
||||||
|
|
||||||
struct DriveTurnEnv {
|
struct DriveTurnEnv {
|
||||||
// Held to keep the channel alive; without this `method_rx.recv()`
|
// Held to keep the channel alive; without this `method_rx.recv()`
|
||||||
// would observe channel-closed and confuse the select! arm.
|
// would observe channel-closed and confuse the select! arm.
|
||||||
|
|
|
||||||
|
|
@ -913,7 +913,14 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> {
|
async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> {
|
||||||
connect_and_send(socket_path, &Method::Notify { message }).await
|
connect_and_send(
|
||||||
|
socket_path,
|
||||||
|
&Method::Notify {
|
||||||
|
message,
|
||||||
|
auto_run: true,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> {
|
fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> {
|
||||||
|
|
@ -1395,7 +1402,8 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let method = reader.next::<Method>().await.unwrap().unwrap();
|
let method = reader.next::<Method>().await.unwrap().unwrap();
|
||||||
if let Method::Notify { message } = method {
|
if let Method::Notify { message, auto_run } = method {
|
||||||
|
assert!(auto_run);
|
||||||
tx.send(message).await.unwrap();
|
tx.send(message).await.unwrap();
|
||||||
} else {
|
} else {
|
||||||
panic!("expected Notify, got {method:?}");
|
panic!("expected Notify, got {method:?}");
|
||||||
|
|
|
||||||
|
|
@ -1025,6 +1025,7 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
|
||||||
handle
|
handle
|
||||||
.send(Method::Notify {
|
.send(Method::Notify {
|
||||||
message: "turn finished".into(),
|
message: "turn finished".into(),
|
||||||
|
auto_run: true,
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
@ -1105,6 +1106,62 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn notify_while_idle_with_auto_run_false_waits_for_explicit_run() {
|
||||||
|
let client = MockClient::new(simple_text_events());
|
||||||
|
let client_for_assert = client.clone();
|
||||||
|
let pod = make_pod(client).await;
|
||||||
|
let handle = spawn_controller(pod).await;
|
||||||
|
|
||||||
|
handle
|
||||||
|
.send(Method::Notify {
|
||||||
|
message: "progress snapshot".into(),
|
||||||
|
auto_run: false,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
|
||||||
|
assert!(
|
||||||
|
client_for_assert.captured_requests().is_empty(),
|
||||||
|
"weak Notify must not stage RunForNotification while idle"
|
||||||
|
);
|
||||||
|
|
||||||
|
handle.send(Method::run_text("continue")).await.unwrap();
|
||||||
|
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
|
||||||
|
loop {
|
||||||
|
if !client_for_assert.captured_requests().is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
assert!(
|
||||||
|
tokio::time::Instant::now() < deadline,
|
||||||
|
"explicit run did not reach the mock LLM"
|
||||||
|
);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
|
wait_for_status(&handle, PodStatus::Idle).await;
|
||||||
|
let requests = client_for_assert.captured_requests();
|
||||||
|
assert_eq!(
|
||||||
|
requests.len(),
|
||||||
|
1,
|
||||||
|
"explicit run should drain the queued notification"
|
||||||
|
);
|
||||||
|
let notify_in_request = requests[0].items.iter().any(|i| {
|
||||||
|
i.as_text()
|
||||||
|
.is_some_and(|t| t.contains("[Notification]") && t.contains("progress snapshot"))
|
||||||
|
});
|
||||||
|
assert!(
|
||||||
|
notify_in_request,
|
||||||
|
"queued weak notification must be history-backed on the next explicit run; got items: {:?}",
|
||||||
|
requests[0]
|
||||||
|
.items
|
||||||
|
.iter()
|
||||||
|
.filter_map(|i| i.as_text())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_message() {
|
async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_message() {
|
||||||
let client = MockClient::new(simple_text_events());
|
let client = MockClient::new(simple_text_events());
|
||||||
|
|
@ -1259,6 +1316,7 @@ async fn notify_while_running_does_not_emit_already_running_error() {
|
||||||
handle
|
handle
|
||||||
.send(Method::Notify {
|
.send(Method::Notify {
|
||||||
message: "ping".into(),
|
message: "ping".into(),
|
||||||
|
auto_run: true,
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,14 @@ use std::path::PathBuf;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
fn default_true() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_true(value: &bool) -> bool {
|
||||||
|
*value
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Method (Client → Pod via Unix Socket)
|
// Method (Client → Pod via Unix Socket)
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
@ -15,10 +23,15 @@ pub enum Method {
|
||||||
input: Vec<Segment>,
|
input: Vec<Segment>,
|
||||||
},
|
},
|
||||||
/// Human-readable text injected into the target Pod's LLM context
|
/// Human-readable text injected into the target Pod's LLM context
|
||||||
/// as a non-blocking system message. No side effects beyond LLM
|
/// as a non-blocking system message. `auto_run` controls whether an
|
||||||
/// context; use `PodEvent` for typed lifecycle reports.
|
/// idle target is kicked into `RunForNotification`; weak notifications
|
||||||
|
/// (`auto_run: false`) are only queued for the next turn/resume/run.
|
||||||
|
/// No side effects beyond LLM context; use `PodEvent` for typed
|
||||||
|
/// lifecycle reports.
|
||||||
Notify {
|
Notify {
|
||||||
message: String,
|
message: String,
|
||||||
|
#[serde(default = "default_true", skip_serializing_if = "is_true")]
|
||||||
|
auto_run: bool,
|
||||||
},
|
},
|
||||||
/// Typed lifecycle report from a child Pod to its direct parent.
|
/// Typed lifecycle report from a child Pod to its direct parent.
|
||||||
PodEvent(PodEvent),
|
PodEvent(PodEvent),
|
||||||
|
|
@ -1027,17 +1040,28 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn method_notify_json_roundtrip() {
|
fn method_notify_json_roundtrip_defaults_to_auto_run() {
|
||||||
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;
|
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;
|
||||||
let method: Method = serde_json::from_str(json).unwrap();
|
let method: Method = serde_json::from_str(json).unwrap();
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
method,
|
method,
|
||||||
Method::Notify { ref message } if message == "turn done"
|
Method::Notify { ref message, auto_run: true } if message == "turn done"
|
||||||
));
|
));
|
||||||
let serialized = serde_json::to_string(&method).unwrap();
|
let serialized = serde_json::to_string(&method).unwrap();
|
||||||
assert_eq!(serialized, json);
|
assert_eq!(serialized, json);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn method_notify_weak_json_roundtrip_serializes_auto_run_false() {
|
||||||
|
let json = r#"{"method":"notify","params":{"message":"progress","auto_run":false}}"#;
|
||||||
|
let method: Method = serde_json::from_str(json).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
method,
|
||||||
|
Method::Notify { ref message, auto_run: false } if message == "progress"
|
||||||
|
));
|
||||||
|
assert_eq!(serde_json::to_string(&method).unwrap(), json);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn method_list_completions_roundtrip() {
|
fn method_list_completions_roundtrip() {
|
||||||
let method = Method::ListCompletions {
|
let method = Method::ListCompletions {
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,9 @@ use crate::workspace_panel::{
|
||||||
|
|
||||||
const MAX_ENTRIES: usize = 50;
|
const MAX_ENTRIES: usize = 50;
|
||||||
const CLOSED_VISIBLE_ROWS: usize = 3;
|
const CLOSED_VISIBLE_ROWS: usize = 3;
|
||||||
|
const COMPANION_PROGRESS_MAX_TICKETS: usize = 5;
|
||||||
|
const COMPANION_PROGRESS_MAX_TITLE_CHARS: usize = 80;
|
||||||
|
const COMPANION_PROGRESS_MAX_MESSAGE_CHARS: usize = 1_800;
|
||||||
const SOCKET_OP_TIMEOUT: Duration = Duration::from_secs(3);
|
const SOCKET_OP_TIMEOUT: Duration = Duration::from_secs(3);
|
||||||
const MULTI_POD_POLL_INTERVAL: Duration = Duration::from_millis(1_500);
|
const MULTI_POD_POLL_INTERVAL: Duration = Duration::from_millis(1_500);
|
||||||
const TERMINAL_EVENT_POLL_INTERVAL: Duration = Duration::from_millis(100);
|
const TERMINAL_EVENT_POLL_INTERVAL: Duration = Duration::from_millis(100);
|
||||||
|
|
@ -126,6 +129,10 @@ pub(crate) async fn run(
|
||||||
loop {
|
loop {
|
||||||
if let Some(result) = pending_reload.finish_if_ready().await {
|
if let Some(result) = pending_reload.finish_if_ready().await {
|
||||||
app.apply_reload_result(result);
|
app.apply_reload_result(result);
|
||||||
|
if let Some(request) = app.prepare_companion_progress_notice() {
|
||||||
|
let result = dispatch_companion_progress_notice(request).await;
|
||||||
|
app.finish_companion_progress_notice(result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
terminal.draw(|f| draw(f, app))?;
|
terminal.draw(|f| draw(f, app))?;
|
||||||
|
|
@ -529,6 +536,50 @@ struct PanelDiagnostic {
|
||||||
details: String,
|
details: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct CompanionProgressFreshness {
|
||||||
|
fingerprint: String,
|
||||||
|
updated_at: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct CompanionProgressNotice {
|
||||||
|
message: String,
|
||||||
|
fingerprint: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct CompanionProgressNoticeRequest {
|
||||||
|
pod_name: String,
|
||||||
|
socket_path: PathBuf,
|
||||||
|
notice: CompanionProgressNotice,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct CompanionProgressNoticeResult {
|
||||||
|
fingerprint: String,
|
||||||
|
updated_at: String,
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CompanionProgressNoticeResult {
|
||||||
|
fn sent(fingerprint: String, updated_at: String) -> Self {
|
||||||
|
Self {
|
||||||
|
fingerprint,
|
||||||
|
updated_at,
|
||||||
|
error: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn failed(fingerprint: String, error: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
fingerprint,
|
||||||
|
updated_at: String::new(),
|
||||||
|
error: Some(error.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) struct MultiPodApp {
|
pub(crate) struct MultiPodApp {
|
||||||
pub(crate) list: PodList,
|
pub(crate) list: PodList,
|
||||||
pub(crate) panel: WorkspacePanelViewModel,
|
pub(crate) panel: WorkspacePanelViewModel,
|
||||||
|
|
@ -545,6 +596,7 @@ pub(crate) struct MultiPodApp {
|
||||||
runtime_command: PodRuntimeCommand,
|
runtime_command: PodRuntimeCommand,
|
||||||
last_companion_lifecycle_failure: Option<CompanionPanelState>,
|
last_companion_lifecycle_failure: Option<CompanionPanelState>,
|
||||||
last_orchestrator_lifecycle_failure: Option<OrchestratorPanelState>,
|
last_orchestrator_lifecycle_failure: Option<OrchestratorPanelState>,
|
||||||
|
companion_progress: Option<CompanionProgressFreshness>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MultiPodApp {
|
impl MultiPodApp {
|
||||||
|
|
@ -579,6 +631,7 @@ impl MultiPodApp {
|
||||||
runtime_command,
|
runtime_command,
|
||||||
last_companion_lifecycle_failure: None,
|
last_companion_lifecycle_failure: None,
|
||||||
last_orchestrator_lifecycle_failure: None,
|
last_orchestrator_lifecycle_failure: None,
|
||||||
|
companion_progress: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -629,6 +682,50 @@ impl MultiPodApp {
|
||||||
self.selected_row = previous_row.filter(|key| self.panel.row(key).is_some());
|
self.selected_row = previous_row.filter(|key| self.panel.row(key).is_some());
|
||||||
self.ensure_selection_visible();
|
self.ensure_selection_visible();
|
||||||
self.ensure_composer_target_available();
|
self.ensure_composer_target_available();
|
||||||
|
self.apply_companion_progress_freshness();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn prepare_companion_progress_notice(&mut self) -> Option<CompanionProgressNoticeRequest> {
|
||||||
|
let target = companion_progress_notice_target(&self.panel, &self.list)?;
|
||||||
|
let notice = companion_progress_notice(&self.panel, &self.list)?;
|
||||||
|
if self
|
||||||
|
.companion_progress
|
||||||
|
.as_ref()
|
||||||
|
.is_some_and(|freshness| freshness.fingerprint == notice.fingerprint)
|
||||||
|
{
|
||||||
|
self.apply_companion_progress_freshness();
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(CompanionProgressNoticeRequest {
|
||||||
|
pod_name: target.pod_name,
|
||||||
|
socket_path: target.socket_path,
|
||||||
|
notice,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish_companion_progress_notice(&mut self, result: CompanionProgressNoticeResult) {
|
||||||
|
if let Some(error) = result.error {
|
||||||
|
self.notice = Some(format!("Companion progress notice not delivered: {error}"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.companion_progress = Some(CompanionProgressFreshness {
|
||||||
|
fingerprint: result.fingerprint,
|
||||||
|
updated_at: result.updated_at,
|
||||||
|
});
|
||||||
|
self.apply_companion_progress_freshness();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn apply_companion_progress_freshness(&mut self) {
|
||||||
|
let Some(freshness) = self.companion_progress.as_ref() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Some(companion) = self.panel.header.companion.as_mut() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
companion.detail = Some(format!(
|
||||||
|
"progress context updated at {} (weak notify)",
|
||||||
|
freshness.updated_at
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn apply_companion_lifecycle_memory(&mut self, panel: &mut WorkspacePanelViewModel) {
|
fn apply_companion_lifecycle_memory(&mut self, panel: &mut WorkspacePanelViewModel) {
|
||||||
|
|
@ -2338,6 +2435,157 @@ struct OrchestratorNotifyTarget {
|
||||||
socket_path: PathBuf,
|
socket_path: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct CompanionProgressNoticeTarget {
|
||||||
|
pod_name: String,
|
||||||
|
socket_path: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn companion_progress_notice_target(
|
||||||
|
panel: &WorkspacePanelViewModel,
|
||||||
|
list: &PodList,
|
||||||
|
) -> Option<CompanionProgressNoticeTarget> {
|
||||||
|
let companion = panel.header.companion.as_ref()?;
|
||||||
|
if !companion_status_is_peer_reachable(companion.status) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let entry = list
|
||||||
|
.entries
|
||||||
|
.iter()
|
||||||
|
.find(|entry| entry.name == companion.pod_name)?;
|
||||||
|
let live = entry.live.as_ref()?;
|
||||||
|
if !live.reachable {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(CompanionProgressNoticeTarget {
|
||||||
|
pod_name: companion.pod_name.clone(),
|
||||||
|
socket_path: live.socket_path.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn companion_status_is_peer_reachable(status: CompanionPanelStatus) -> bool {
|
||||||
|
matches!(
|
||||||
|
status,
|
||||||
|
CompanionPanelStatus::Live | CompanionPanelStatus::Restored | CompanionPanelStatus::Spawned
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn companion_progress_notice(
|
||||||
|
panel: &WorkspacePanelViewModel,
|
||||||
|
list: &PodList,
|
||||||
|
) -> Option<CompanionProgressNotice> {
|
||||||
|
let companion = panel.header.companion.as_ref()?;
|
||||||
|
let orchestrator = panel.header.orchestrator.as_ref()?;
|
||||||
|
let mut lines = vec![
|
||||||
|
"Orchestrator progress context (read-only weak notification; no auto-run).".to_string(),
|
||||||
|
"Reason: workspace Panel refreshed bounded orchestration progress for Companion explanation."
|
||||||
|
.to_string(),
|
||||||
|
format!(
|
||||||
|
"Roles: Companion {} is {}; Orchestrator {} is {}.",
|
||||||
|
companion.pod_name,
|
||||||
|
companion.status.label(),
|
||||||
|
orchestrator.pod_name,
|
||||||
|
orchestrator.status.label()
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut ticket_lines = Vec::new();
|
||||||
|
for row in panel.rows.iter().take(COMPANION_PROGRESS_MAX_TICKETS) {
|
||||||
|
let ticket_id = row
|
||||||
|
.ticket
|
||||||
|
.as_ref()
|
||||||
|
.map(|ticket| ticket.id.as_str())
|
||||||
|
.unwrap_or("unknown-ticket");
|
||||||
|
ticket_lines.push(format!(
|
||||||
|
"- {} [{}] {} (ref: .yoi/tickets/{})",
|
||||||
|
ticket_id,
|
||||||
|
row.status,
|
||||||
|
bounded_progress_text(&row.title, COMPANION_PROGRESS_MAX_TITLE_CHARS),
|
||||||
|
ticket_id
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if ticket_lines.is_empty() {
|
||||||
|
lines.push("Tickets: none visible in the current Panel snapshot.".to_string());
|
||||||
|
} else {
|
||||||
|
lines.push(format!(
|
||||||
|
"Tickets (first {} visible, bounded):",
|
||||||
|
ticket_lines.len()
|
||||||
|
));
|
||||||
|
lines.extend(ticket_lines);
|
||||||
|
if panel.rows.len() > COMPANION_PROGRESS_MAX_TICKETS {
|
||||||
|
lines.push(format!(
|
||||||
|
"- … {} more ticket(s) omitted from this bounded notice.",
|
||||||
|
panel.rows.len() - COMPANION_PROGRESS_MAX_TICKETS
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let role_pod_lines = bounded_role_pod_lines(list, companion, orchestrator);
|
||||||
|
if !role_pod_lines.is_empty() {
|
||||||
|
lines.push("Role pod status snapshot:".to_string());
|
||||||
|
lines.extend(role_pod_lines);
|
||||||
|
}
|
||||||
|
|
||||||
|
let message = bounded_progress_text(&lines.join("\n"), COMPANION_PROGRESS_MAX_MESSAGE_CHARS);
|
||||||
|
let fingerprint = message.clone();
|
||||||
|
Some(CompanionProgressNotice {
|
||||||
|
message,
|
||||||
|
fingerprint,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn bounded_role_pod_lines(
|
||||||
|
list: &PodList,
|
||||||
|
companion: &CompanionPanelState,
|
||||||
|
orchestrator: &OrchestratorPanelState,
|
||||||
|
) -> Vec<String> {
|
||||||
|
let mut lines = Vec::new();
|
||||||
|
for name in [&companion.pod_name, &orchestrator.pod_name] {
|
||||||
|
let Some(entry) = list.entries.iter().find(|entry| entry.name == *name) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
lines.push(format!("- {}: {}", entry.name, row_status_label(entry).0));
|
||||||
|
}
|
||||||
|
lines
|
||||||
|
}
|
||||||
|
|
||||||
|
fn bounded_progress_text(input: &str, max_chars: usize) -> String {
|
||||||
|
let mut output = String::new();
|
||||||
|
for (idx, ch) in input.chars().enumerate() {
|
||||||
|
if idx >= max_chars {
|
||||||
|
output.push('…');
|
||||||
|
return output;
|
||||||
|
}
|
||||||
|
let sanitized = if ch.is_control() && ch != '\n' && ch != '\t' {
|
||||||
|
' '
|
||||||
|
} else {
|
||||||
|
ch
|
||||||
|
};
|
||||||
|
output.push(sanitized);
|
||||||
|
}
|
||||||
|
output
|
||||||
|
}
|
||||||
|
|
||||||
|
fn progress_notice_timestamp() -> String {
|
||||||
|
match SystemTime::now().duration_since(UNIX_EPOCH) {
|
||||||
|
Ok(duration) => format!("unix:{}", duration.as_secs()),
|
||||||
|
Err(_) => "unix:0".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn dispatch_companion_progress_notice(
|
||||||
|
request: CompanionProgressNoticeRequest,
|
||||||
|
) -> CompanionProgressNoticeResult {
|
||||||
|
let fingerprint = request.notice.fingerprint.clone();
|
||||||
|
match send_notify_only(&request.socket_path, request.notice.message, false).await {
|
||||||
|
Ok(()) => CompanionProgressNoticeResult::sent(fingerprint, progress_notice_timestamp()),
|
||||||
|
Err(err) => CompanionProgressNoticeResult::failed(
|
||||||
|
fingerprint,
|
||||||
|
format!("{}: {}", request.pod_name, err),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub(crate) struct TicketActionOutcome {
|
pub(crate) struct TicketActionOutcome {
|
||||||
notice: String,
|
notice: String,
|
||||||
|
|
@ -3133,7 +3381,7 @@ async fn notify_workspace_orchestrator(
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
let message = orchestrator_queue_notification_message(ticket);
|
let message = orchestrator_queue_notification_message(ticket);
|
||||||
match send_notify_only(&target.socket_path, message).await {
|
match send_notify_only(&target.socket_path, message, true).await {
|
||||||
Ok(()) => OrchestratorNotificationOutcome::Sent {
|
Ok(()) => OrchestratorNotificationOutcome::Sent {
|
||||||
pod_name: target.pod_name,
|
pod_name: target.pod_name,
|
||||||
},
|
},
|
||||||
|
|
@ -3146,7 +3394,11 @@ async fn notify_workspace_orchestrator(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_notify_only(socket: &Path, message: String) -> Result<(), NotifySendError> {
|
async fn send_notify_only(
|
||||||
|
socket: &Path,
|
||||||
|
message: String,
|
||||||
|
auto_run: bool,
|
||||||
|
) -> Result<(), NotifySendError> {
|
||||||
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
||||||
.await
|
.await
|
||||||
.map_err(|_| NotifySendError::Io("connect timed out".into()))?
|
.map_err(|_| NotifySendError::Io("connect timed out".into()))?
|
||||||
|
|
@ -3175,10 +3427,13 @@ async fn send_notify_only(socket: &Path, message: String) -> Result<(), NotifySe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::Notify { message }))
|
tokio::time::timeout(
|
||||||
.await
|
SOCKET_OP_TIMEOUT,
|
||||||
.map_err(|_| NotifySendError::Io("write timed out".into()))?
|
writer.write(&Method::Notify { message, auto_run }),
|
||||||
.map_err(|e| NotifySendError::Io(format!("write: {e}")))
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| NotifySendError::Io("write timed out".into()))?
|
||||||
|
.map_err(|e| NotifySendError::Io(format!("write: {e}")))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
|
@ -3531,6 +3786,12 @@ fn draw_title(frame: &mut Frame<'_>, app: &MultiPodApp, area: Rect) {
|
||||||
companion.status.label(),
|
companion.status.label(),
|
||||||
companion_status_style(companion.status),
|
companion_status_style(companion.status),
|
||||||
));
|
));
|
||||||
|
if let Some(detail) = companion.detail.as_deref() {
|
||||||
|
spans.push(Span::styled(
|
||||||
|
format!(" ({detail})"),
|
||||||
|
Style::default().fg(Color::DarkGray),
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Some(orchestrator) = &app.panel.header.orchestrator {
|
if let Some(orchestrator) = &app.panel.header.orchestrator {
|
||||||
spans.push(Span::styled(
|
spans.push(Span::styled(
|
||||||
|
|
@ -4692,16 +4953,194 @@ mod tests {
|
||||||
reader.next::<Method>().await.unwrap().unwrap()
|
reader.next::<Method>().await.unwrap().unwrap()
|
||||||
});
|
});
|
||||||
|
|
||||||
send_notify_only(&socket_path, "panel Queue".to_string())
|
send_notify_only(&socket_path, "panel Queue".to_string(), true)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let method = server.await.unwrap();
|
let method = server.await.unwrap();
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
method,
|
method,
|
||||||
Method::Notify { message } if message == "panel Queue"
|
Method::Notify { message, auto_run: true } if message == "panel Queue"
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn send_notify_only_can_deliver_weak_notification_without_auto_run() {
|
||||||
|
let temp = TempDir::new().unwrap();
|
||||||
|
let socket_path = temp.path().join("companion.sock");
|
||||||
|
let listener = tokio::net::UnixListener::bind(&socket_path).unwrap();
|
||||||
|
let server = tokio::spawn(async move {
|
||||||
|
let (stream, _) = listener.accept().await.unwrap();
|
||||||
|
let (reader, writer) = stream.into_split();
|
||||||
|
let mut reader = JsonLineReader::new(reader);
|
||||||
|
let mut writer = JsonLineWriter::new(writer);
|
||||||
|
writer
|
||||||
|
.write(&Event::Snapshot {
|
||||||
|
entries: Vec::new(),
|
||||||
|
greeting: protocol::Greeting {
|
||||||
|
pod_name: "yoi".to_string(),
|
||||||
|
cwd: temp.path().display().to_string(),
|
||||||
|
provider: "test".to_string(),
|
||||||
|
model: "test".to_string(),
|
||||||
|
scope_summary: "test".to_string(),
|
||||||
|
tools: Vec::new(),
|
||||||
|
context_window: 0,
|
||||||
|
context_tokens: 0,
|
||||||
|
},
|
||||||
|
status: PodStatus::Idle,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
reader.next::<Method>().await.unwrap().unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
send_notify_only(&socket_path, "panel progress".to_string(), false)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let method = server.await.unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
method,
|
||||||
|
Method::Notify { message, auto_run: false } if message == "panel progress"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn companion_progress_notice_target_skips_missing_stopped_and_unreachable_without_spawn_restore()
|
||||||
|
{
|
||||||
|
let missing_app = ticket_enabled_app(vec![live_info("test-orchestrator", PodStatus::Idle)]);
|
||||||
|
assert!(companion_progress_notice_target(&missing_app.panel, &missing_app.list).is_none());
|
||||||
|
|
||||||
|
let mut stopped_panel = WorkspacePanelViewModel::empty(Path::new("test"));
|
||||||
|
stopped_panel.header.companion = Some(CompanionPanelState::new(
|
||||||
|
"yoi",
|
||||||
|
CompanionPanelStatus::Stopped,
|
||||||
|
None,
|
||||||
|
));
|
||||||
|
stopped_panel.header.orchestrator = Some(OrchestratorPanelState::new(
|
||||||
|
"test-orchestrator",
|
||||||
|
OrchestratorPanelStatus::Live,
|
||||||
|
None,
|
||||||
|
));
|
||||||
|
let stopped_list = PodList::from_sources(
|
||||||
|
PodVisibilitySource::ResumePicker,
|
||||||
|
vec![stopped_info("yoi")],
|
||||||
|
vec![live_info("test-orchestrator", PodStatus::Idle)],
|
||||||
|
None,
|
||||||
|
10,
|
||||||
|
);
|
||||||
|
assert!(companion_progress_notice_target(&stopped_panel, &stopped_list).is_none());
|
||||||
|
|
||||||
|
let mut unreachable = live_info("yoi", PodStatus::Idle);
|
||||||
|
unreachable.reachable = false;
|
||||||
|
let unreachable_app = ticket_enabled_app(vec![
|
||||||
|
unreachable,
|
||||||
|
live_info("test-orchestrator", PodStatus::Idle),
|
||||||
|
]);
|
||||||
|
assert!(
|
||||||
|
companion_progress_notice_target(&unreachable_app.panel, &unreachable_app.list)
|
||||||
|
.is_none()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn companion_progress_notice_is_bounded_and_excludes_sensitive_unbounded_fields() {
|
||||||
|
let mut app = ticket_enabled_app(vec![
|
||||||
|
live_info("yoi", PodStatus::Idle),
|
||||||
|
live_info("test-orchestrator", PodStatus::Running),
|
||||||
|
]);
|
||||||
|
app.panel.rows = (0..12)
|
||||||
|
.map(|index| {
|
||||||
|
let mut row = panel_test_ticket_row(
|
||||||
|
&format!("TICKET-{index}"),
|
||||||
|
&format!("Visible title {index} {}", "x".repeat(140)),
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"inprogress",
|
||||||
|
);
|
||||||
|
if let Some(ticket) = row.ticket.as_mut() {
|
||||||
|
ticket.latest_event_excerpt = Some(
|
||||||
|
"SECRET_PROVIDER_ERROR_TOKEN should never be copied into progress notices"
|
||||||
|
.to_string(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
row.subtitle = Some("private thread excerpt should stay out".to_string());
|
||||||
|
row
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
app.panel
|
||||||
|
.header
|
||||||
|
.diagnostics
|
||||||
|
.push("diagnostic with SECRET_PROVIDER_ERROR_TOKEN should stay out".to_string());
|
||||||
|
|
||||||
|
let notice = companion_progress_notice(&app.panel, &app.list).unwrap();
|
||||||
|
assert!(notice.message.contains("TICKET-0"));
|
||||||
|
assert!(notice.message.contains("ref: .yoi/tickets/TICKET-0"));
|
||||||
|
assert!(notice.message.contains("more ticket(s) omitted"));
|
||||||
|
assert!(notice.message.chars().count() <= COMPANION_PROGRESS_MAX_MESSAGE_CHARS + 1);
|
||||||
|
assert!(!notice.message.contains("SECRET_PROVIDER_ERROR_TOKEN"));
|
||||||
|
assert!(!notice.message.contains("private thread excerpt"));
|
||||||
|
assert_eq!(notice.fingerprint, notice.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn companion_progress_notice_success_sets_panel_freshness_without_persisting_snapshot() {
|
||||||
|
let mut app = ticket_enabled_app(vec![
|
||||||
|
live_info("yoi", PodStatus::Idle),
|
||||||
|
live_info("test-orchestrator", PodStatus::Idle),
|
||||||
|
]);
|
||||||
|
app.panel.rows.push(panel_test_ticket_row(
|
||||||
|
"TICKET-1",
|
||||||
|
"Implement progress notices",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"inprogress",
|
||||||
|
));
|
||||||
|
|
||||||
|
let request = app.prepare_companion_progress_notice().unwrap();
|
||||||
|
assert_eq!(request.pod_name, "yoi");
|
||||||
|
app.finish_companion_progress_notice(CompanionProgressNoticeResult::sent(
|
||||||
|
request.notice.fingerprint,
|
||||||
|
"unix:42".to_string(),
|
||||||
|
));
|
||||||
|
|
||||||
|
let detail = app
|
||||||
|
.panel
|
||||||
|
.header
|
||||||
|
.companion
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|companion| companion.detail.as_deref())
|
||||||
|
.unwrap();
|
||||||
|
assert!(detail.contains("unix:42"));
|
||||||
|
assert!(detail.contains("weak notify"));
|
||||||
|
assert!(app.prepare_companion_progress_notice().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn companion_progress_notice_target_accepts_live_running_companion() {
|
||||||
|
let app = ticket_enabled_app(vec![
|
||||||
|
live_info("yoi", PodStatus::Running),
|
||||||
|
live_info("test-orchestrator", PodStatus::Running),
|
||||||
|
]);
|
||||||
|
let target = companion_progress_notice_target(&app.panel, &app.list).unwrap();
|
||||||
|
assert_eq!(target.pod_name, "yoi");
|
||||||
|
assert_eq!(target.socket_path, PathBuf::from("/tmp/yoi.sock"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn companion_progress_failure_is_best_effort_and_does_not_mark_freshness() {
|
||||||
|
let mut app = ticket_enabled_app(vec![
|
||||||
|
live_info("yoi", PodStatus::Idle),
|
||||||
|
live_info("test-orchestrator", PodStatus::Idle),
|
||||||
|
]);
|
||||||
|
let request = app.prepare_companion_progress_notice().unwrap();
|
||||||
|
app.finish_companion_progress_notice(CompanionProgressNoticeResult::failed(
|
||||||
|
request.notice.fingerprint,
|
||||||
|
"socket closed",
|
||||||
|
));
|
||||||
|
|
||||||
|
assert!(app.companion_progress.is_none());
|
||||||
|
assert!(app.notice.as_deref().unwrap().contains("not delivered"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn no_ticket_selection_keeps_enter_pod_centric() {
|
fn no_ticket_selection_keeps_enter_pod_centric() {
|
||||||
let mut app = test_app(vec![live_info("alpha", PodStatus::Idle)]);
|
let mut app = test_app(vec![live_info("alpha", PodStatus::Idle)]);
|
||||||
|
|
@ -6103,6 +6542,7 @@ mod tests {
|
||||||
runtime_command: PodRuntimeCommand::for_executable("/tmp/yoi"),
|
runtime_command: PodRuntimeCommand::for_executable("/tmp/yoi"),
|
||||||
last_companion_lifecycle_failure,
|
last_companion_lifecycle_failure,
|
||||||
last_orchestrator_lifecycle_failure,
|
last_orchestrator_lifecycle_failure,
|
||||||
|
companion_progress: None,
|
||||||
};
|
};
|
||||||
app.ensure_selection_visible();
|
app.ensure_selection_visible();
|
||||||
app.ensure_composer_target_available();
|
app.ensure_composer_target_available();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user