From 2f3f54b01a0c54dee6072857b1a408070e6abc1f Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 7 Jun 2026 11:25:03 +0900 Subject: [PATCH] fixup! tui: add panel role session registry --- crates/tui/src/multi_pod.rs | 469 +++++++++--------------- crates/tui/src/role_session_registry.rs | 244 ++++++++---- crates/tui/src/workspace_panel.rs | 17 +- 3 files changed, 360 insertions(+), 370 deletions(-) diff --git a/crates/tui/src/multi_pod.rs b/crates/tui/src/multi_pod.rs index d117462b..b3058125 100644 --- a/crates/tui/src/multi_pod.rs +++ b/crates/tui/src/multi_pod.rs @@ -11,7 +11,7 @@ use client::{PodRuntimeCommand, SpawnConfig, spawn_pod}; use crossterm::event::{Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, poll, read}; use pod_store::FsPodStore; use protocol::stream::{JsonLineReader, JsonLineWriter}; -use protocol::{ErrorCode, Event, InvokeKind, Method, PodStatus, Segment}; +use protocol::{ErrorCode, Event, Method, PodStatus, Segment}; use ratatui::Frame; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; @@ -33,7 +33,9 @@ use crate::pod_list::{ PodList, PodListEntry, PodVisibilitySource, StoredMetadataState, read_reachable_live_pod_infos, read_stored_pod_infos, }; -use crate::role_session_registry::{PanelRegistryStore, TicketClaimResult}; +use crate::role_session_registry::{ + PanelRegistryStore, RelatedTicketRef, RoleSessionOrigin, TicketClaimResult, +}; use crate::workspace_panel::{ ActionPriority, ComposerTarget, NextUserAction, OrchestratorLifecyclePlan, OrchestratorPanelState, OrchestratorPanelStatus, OrchestratorPodPresence, PanelRow, @@ -144,14 +146,6 @@ pub(crate) async fn run( app.notice = Some("Refresh already in progress.".to_string()); } } - MultiPodAction::Send(request) => { - pending_reload.abort(); - terminal.draw(|f| draw(f, app))?; - let result = send_run_and_confirm(&request.socket_path, request.segments).await; - app.finish_send(result); - app.reload_or_notice().await; - next_poll = Instant::now() + MULTI_POD_POLL_INTERVAL; - } MultiPodAction::DispatchTicketAction(request) => { pending_reload.abort(); terminal.draw(|f| draw(f, app))?; @@ -258,17 +252,11 @@ fn default_pod_store_dir() -> Result { #[cfg(test)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) enum SendEligibility { - SendNow, +pub(crate) enum OpenEligibility { + OpenNow, Disabled, } -#[derive(Debug)] -pub(crate) struct DirectSendRequest { - socket_path: PathBuf, - segments: Vec, -} - #[derive(Debug)] pub(crate) struct IntakeLaunchRequest { context: TicketRoleLaunchContext, @@ -282,13 +270,10 @@ pub(crate) enum IntakeRegistryUpdate { RecordSession { registry_root: PathBuf, pod_name: String, - ticket_ids: Vec, - }, - ClaimedTicket { - registry_root: PathBuf, - ticket_id: String, - pod_name: String, + origin: RoleSessionOrigin, + related_tickets: Vec, }, + ClaimedTicket, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -301,6 +286,7 @@ pub(crate) enum IntakePeerRegistrationRequest { pub(crate) struct IntakeLaunchOutcome { launch: TicketRoleLaunchResult, peer_registration: IntakePeerRegistrationStatus, + registry_warning: Option, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -335,42 +321,34 @@ pub(crate) async fn launch_intake_with_handoff(request: IntakeLaunchRequest) -> ))), ), }; - let launch = match launch_ticket_role_pod_with_options( + let launch = launch_ticket_role_pod_with_options( request.context, request.runtime_command, |_| {}, options, ) - .await - { - Ok(launch) => launch, - Err(error) => { - if let IntakeRegistryUpdate::ClaimedTicket { - registry_root, - ticket_id, - pod_name, - } = &request.registry_update - { - let _ = PanelRegistryStore::from_root(registry_root.clone()) - .release_ticket_claim(ticket_id, pod_name); - } - return Err(error); - } - }; - match &request.registry_update { + .await?; + let registry_warning = match request.registry_update { IntakeRegistryUpdate::RecordSession { registry_root, pod_name, - ticket_ids, - } => { - let _ = PanelRegistryStore::from_root(registry_root.clone()).record_session( - pod_name.clone(), + origin, + related_tickets, + } => PanelRegistryStore::from_root(registry_root) + .record_session( + pod_name, TicketRole::Intake.as_str().to_string(), + origin, None, - ticket_ids.clone(), - ); - } - IntakeRegistryUpdate::ClaimedTicket { .. } => {} + related_tickets, + ) + .err() + .map(|error| { + bounded_panel_diagnostic(format!( + "local role session registry could not be updated after Intake launch: {error}" + )) + }), + IntakeRegistryUpdate::ClaimedTicket => None, }; let peer_registration = match (orchestrator_pod, skip_warning) { (_, Some(warning)) => warning, @@ -392,6 +370,7 @@ pub(crate) async fn launch_intake_with_handoff(request: IntakeLaunchRequest) -> Ok(IntakeLaunchOutcome { launch, peer_registration, + registry_warning, }) } @@ -506,14 +485,14 @@ impl MultiPodApp { } #[cfg(test)] - pub(crate) fn selected_send_eligibility(&self) -> SendEligibility { + pub(crate) fn selected_open_eligibility(&self) -> OpenEligibility { match self.selected_pod_entry() { - Some(entry) if entry.actions.can_send_now => SendEligibility::SendNow, - _ => SendEligibility::Disabled, + Some(entry) if entry.actions.can_open => OpenEligibility::OpenNow, + _ => OpenEligibility::Disabled, } } - pub(crate) fn selected_send_disabled_reason(&self) -> Option { + pub(crate) fn selected_open_disabled_reason(&self) -> Option { if let Some(row) = self .selected_panel_row() .filter(|row| row.is_ticket_action()) @@ -523,16 +502,16 @@ impl MultiPodApp { .clone() .or_else(|| row.key_hint.clone()) .unwrap_or_else(|| { - "Press Enter to dispatch this Ticket action; stale Tickets are re-checked before any mutation." + "Empty Enter dispatches this Ticket action; stale Tickets are re-checked before any mutation." .to_string() }), ); } let entry = self.selected_pod_entry()?; - if entry.actions.can_send_now { + if entry.actions.can_open { return None; } - Some(send_disabled_reason(entry)) + Some(open_disabled_reason(entry)) } pub(crate) fn select_next(&mut self) { @@ -691,53 +670,16 @@ impl MultiPodApp { segments_are_blank(&self.input.submit_segments()) } - pub(crate) fn prepare_send(&mut self) -> Option { - let (target_name, socket_path) = { - let entry = match self.selected_pod_entry() { - Some(entry) => entry, - None => { - self.notice = Some(selected_ticket_notice(self.selected_panel_row())); - return None; - } - }; - if !entry.actions.can_send_now { - self.notice = Some(send_disabled_reason(entry)); - return None; - } - let Some(socket_path) = entry.attach_socket_path().map(PathBuf::from) else { - self.notice = Some("Selected Pod has no reachable socket.".to_string()); - return None; - }; - (entry.name.clone(), socket_path) - }; + pub(crate) fn reject_companion_submit(&mut self) { let segments = self.input.submit_segments(); if segments_are_blank(&segments) { self.notice = Some("Composer is empty.".to_string()); - return None; + return; } - self.sending = true; - self.notice = Some(format!("Sending to {target_name}…")); - Some(DirectSendRequest { - socket_path, - segments, - }) - } - - pub(crate) fn finish_send(&mut self, result: Result<(), DirectSendError>) { self.sending = false; - match result { - Ok(()) => { - let target = self - .selected_pod_entry() - .map(|entry| entry.name.clone()) - .unwrap_or_else(|| "selected Pod".to_string()); - self.input.clear(); - self.notice = Some(format!("Delivered to {target}.")); - } - Err(e) => { - self.notice = Some(format!("Delivery failed; composer kept: {e}")); - } - } + self.notice = Some(bounded_panel_diagnostic( + "Companion composer is not wired to a Companion Pod yet; draft kept. Press o or empty Enter to open/attach the selected Pod.", + )); } pub(crate) fn prepare_ticket_action_dispatch(&mut self) -> Option { @@ -789,36 +731,6 @@ impl MultiPodApp { }); } - fn prepare_intake_peer_registration( - &self, - context: &mut TicketRoleLaunchContext, - ) -> IntakePeerRegistrationRequest { - match self.panel.header.orchestrator.as_ref() { - Some(orchestrator) => { - context.intake_handoff = Some(TicketIntakeHandoff::new( - orchestrator.pod_name.clone(), - self.panel.header.workspace_label.clone(), - )); - if orchestrator_status_is_peer_reachable(orchestrator.status) { - IntakePeerRegistrationRequest::Register { - orchestrator_pod: orchestrator.pod_name.clone(), - } - } else { - IntakePeerRegistrationRequest::Skip { - reason: format!( - "workspace Orchestrator {} is {}; launch input still carries the auditable handoff target", - orchestrator.pod_name, - orchestrator.status.label() - ), - } - } - } - None => IntakePeerRegistrationRequest::Skip { - reason: "workspace Orchestrator is not configured for this panel".to_string(), - }, - } - } - pub(crate) fn prepare_intake_launch(&mut self) -> Option { if !self .panel @@ -848,11 +760,6 @@ impl MultiPodApp { return None; } }; - let registry_update = IntakeRegistryUpdate::RecordSession { - registry_root: store.root().to_path_buf(), - pod_name, - ticket_ids: Vec::new(), - }; let peer_registration = self.prepare_intake_peer_registration(&mut context); self.sending = true; self.notice = Some("Launching Ticket Intake…".to_string()); @@ -860,10 +767,45 @@ impl MultiPodApp { context, runtime_command: self.runtime_command.clone(), peer_registration, - registry_update, + registry_update: IntakeRegistryUpdate::RecordSession { + registry_root: store.root().to_path_buf(), + pod_name, + origin: RoleSessionOrigin::PreTicketIntake, + related_tickets: Vec::new(), + }, }) } + fn prepare_intake_peer_registration( + &self, + context: &mut TicketRoleLaunchContext, + ) -> IntakePeerRegistrationRequest { + match self.panel.header.orchestrator.as_ref() { + Some(orchestrator) => { + context.intake_handoff = Some(TicketIntakeHandoff::new( + orchestrator.pod_name.clone(), + self.panel.header.workspace_label.clone(), + )); + if orchestrator_status_is_peer_reachable(orchestrator.status) { + IntakePeerRegistrationRequest::Register { + orchestrator_pod: orchestrator.pod_name.clone(), + } + } else { + IntakePeerRegistrationRequest::Skip { + reason: format!( + "workspace Orchestrator {} is {}; launch input still carries the auditable handoff target", + orchestrator.pod_name, + orchestrator.status.label() + ), + } + } + } + None => IntakePeerRegistrationRequest::Skip { + reason: "workspace Orchestrator is not configured for this panel".to_string(), + }, + } + } + pub(crate) fn prepare_existing_ticket_intake_launch(&mut self) -> Option { let row = match self.selected_panel_row() { Some(row) if row.is_ticket_action() => row, @@ -936,7 +878,12 @@ impl MultiPodApp { } }; context.pod_name = Some(planned.pod_name.clone()); - match store.claim_ticket(&ticket_id, &planned.pod_name, TicketRole::Intake.as_str()) { + match store.claim_ticket( + &ticket_id, + Some(&ticket_slug), + &planned.pod_name, + TicketRole::Intake.as_str(), + ) { Ok(TicketClaimResult::Claimed) | Ok(TicketClaimResult::AlreadyOwned(_)) => {} Err(error) => { self.notice = Some(format!("Ticket claim diagnostic required: {error}")); @@ -953,11 +900,7 @@ impl MultiPodApp { context, runtime_command: self.runtime_command.clone(), peer_registration, - registry_update: IntakeRegistryUpdate::ClaimedTicket { - registry_root: store.root().to_path_buf(), - ticket_id, - pod_name: planned.pod_name, - }, + registry_update: IntakeRegistryUpdate::ClaimedTicket, }) } @@ -975,8 +918,12 @@ impl MultiPodApp { format!(" Handoff warning: {message}") } }; + let registry_notice = result + .registry_warning + .map(|warning| format!(" Registry warning: {warning}")) + .unwrap_or_default(); self.notice = Some(bounded_panel_diagnostic(format!( - "Launched Ticket Intake Pod {pod_name}.{peer_notice}" + "Launched Ticket Intake Pod {pod_name}.{peer_notice}{registry_notice}" ))); } Err(error) => { @@ -1041,10 +988,10 @@ impl MultiPodApp { .map(MultiPodAction::LaunchIntake) .unwrap_or(MultiPodAction::None), KeyCode::Enter if self.composer_is_blank() => MultiPodAction::Open, - KeyCode::Enter => self - .prepare_send() - .map(MultiPodAction::Send) - .unwrap_or(MultiPodAction::None), + KeyCode::Enter => { + self.reject_companion_submit(); + MultiPodAction::None + } KeyCode::Backspace => { self.input.delete_before(); MultiPodAction::None @@ -1083,7 +1030,6 @@ enum MultiPodAction { Quit, Open, Refresh, - Send(DirectSendRequest), DispatchTicketAction(TicketActionRequest), LaunchIntake(IntakeLaunchRequest), } @@ -1334,7 +1280,7 @@ fn existing_ticket_claim_notice( ) -> String { match status { TicketLocalClaimStatus::Live | TicketLocalClaimStatus::Restorable => format!( - "Ticket {ticket_id} is already claimed by local Intake Pod {pod_name} ({}); select/open that Pod instead of starting a second Intake.", + "Ticket {ticket_id} is already claimed by local Intake Pod {pod_name} ({}); open that Pod instead of starting a second Intake.", status.label() ), TicketLocalClaimStatus::Stale => format!( @@ -1559,7 +1505,6 @@ async fn dispatch_ticket_action( NextUserAction::Clarify | NextUserAction::Edit | NextUserAction::OpenPod - | NextUserAction::SendToPod | NextUserAction::Wait => Ok(TicketActionOutcome { notice: format!( "{} for Ticket {} has no safe inline workspace-panel dispatch; use the Ticket workflow.", @@ -1615,11 +1560,11 @@ async fn notify_workspace_orchestrator( } } -async fn send_notify_only(socket: &Path, message: String) -> Result<(), DirectSendError> { +async fn send_notify_only(socket: &Path, message: String) -> Result<(), NotifySendError> { let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) .await - .map_err(|_| DirectSendError::Io("connect timed out".into()))? - .map_err(|e| DirectSendError::Io(format!("connect: {e}")))?; + .map_err(|_| NotifySendError::Io("connect timed out".into()))? + .map_err(|e| NotifySendError::Io(format!("connect: {e}")))?; let (reader, writer) = stream.into_split(); let mut reader = JsonLineReader::new(reader); let mut writer = JsonLineWriter::new(writer); @@ -1627,17 +1572,17 @@ async fn send_notify_only(socket: &Path, message: String) -> Result<(), DirectSe loop { let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) .await - .map_err(|_| DirectSendError::Io("read initial Snapshot timed out".into()))? - .map_err(|e| DirectSendError::Io(format!("read initial Snapshot: {e}")))?; + .map_err(|_| NotifySendError::Io("read initial Snapshot timed out".into()))? + .map_err(|e| NotifySendError::Io(format!("read initial Snapshot: {e}")))?; match event { Some(Event::Snapshot { .. }) => break, Some(Event::Alert(_)) => continue, Some(Event::Error { code, message }) => { - return Err(DirectSendError::Rejected { code, message }); + return Err(NotifySendError::Rejected { code, message }); } Some(_) => continue, None => { - return Err(DirectSendError::Io( + return Err(NotifySendError::Io( "connection closed before initial Snapshot".into(), )); } @@ -1646,96 +1591,28 @@ async fn send_notify_only(socket: &Path, message: String) -> Result<(), DirectSe tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::Notify { message })) .await - .map_err(|_| DirectSendError::Io("write timed out".into()))? - .map_err(|e| DirectSendError::Io(format!("write: {e}"))) + .map_err(|_| NotifySendError::Io("write timed out".into()))? + .map_err(|e| NotifySendError::Io(format!("write: {e}"))) } #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum DirectSendError { - AlreadyRunning, +pub(crate) enum NotifySendError { Rejected { code: ErrorCode, message: String }, Io(String), } -impl std::fmt::Display for DirectSendError { +impl std::fmt::Display for NotifySendError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::AlreadyRunning => write!(f, "target Pod is already running"), Self::Rejected { code, message } => { - write!(f, "target rejected run ({code:?}): {message}") + write!(f, "target rejected method ({code:?}): {message}") } Self::Io(message) => write!(f, "{message}"), } } } -impl std::error::Error for DirectSendError {} - -async fn send_run_and_confirm(socket: &Path, input: Vec) -> Result<(), DirectSendError> { - let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) - .await - .map_err(|_| DirectSendError::Io("connect timed out".into()))? - .map_err(|e| DirectSendError::Io(format!("connect: {e}")))?; - let (reader, writer) = stream.into_split(); - let mut reader = JsonLineReader::new(reader); - let mut writer = JsonLineWriter::new(writer); - - loop { - let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) - .await - .map_err(|_| DirectSendError::Io("read initial Snapshot timed out".into()))? - .map_err(|e| DirectSendError::Io(format!("read initial Snapshot: {e}")))?; - match event { - Some(Event::Snapshot { .. }) => break, - Some(Event::Alert(_)) => continue, - Some(Event::Error { - code: ErrorCode::AlreadyRunning, - .. - }) => return Err(DirectSendError::AlreadyRunning), - Some(Event::Error { code, message }) => { - return Err(DirectSendError::Rejected { code, message }); - } - Some(_) => continue, - None => { - return Err(DirectSendError::Io( - "connection closed before initial Snapshot".into(), - )); - } - } - } - - tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::Run { input })) - .await - .map_err(|_| DirectSendError::Io("write timed out".into()))? - .map_err(|e| DirectSendError::Io(format!("write: {e}")))?; - - loop { - let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) - .await - .map_err(|_| DirectSendError::Io("read response timed out".into()))? - .map_err(|e| DirectSendError::Io(format!("read response: {e}")))?; - match event { - Some(Event::Error { - code: ErrorCode::AlreadyRunning, - .. - }) => return Err(DirectSendError::AlreadyRunning), - Some(Event::Error { code, message }) => { - return Err(DirectSendError::Rejected { code, message }); - } - Some(Event::InvokeStart { - kind: InvokeKind::UserSend, - }) - | Some(Event::UserMessage { .. }) - | Some(Event::TurnStart { .. }) => return Ok(()), - Some(_) => continue, - None => { - return Err(DirectSendError::Io( - "connection closed before response".into(), - )); - } - } - } -} +impl std::error::Error for NotifySendError {} fn segments_are_blank(segments: &[Segment]) -> bool { segments.iter().all(|segment| match segment { @@ -1744,31 +1621,31 @@ fn segments_are_blank(segments: &[Segment]) -> bool { }) } -fn send_disabled_reason(entry: &PodListEntry) -> String { +fn open_disabled_reason(entry: &PodListEntry) -> String { if let Some(live) = entry.live.as_ref() { if !live.reachable { return "Selected live Pod is unreachable.".to_string(); } return match live.status { Some(PodStatus::Running) => { - "Selected Pod is running; direct send is disabled in multi-Pod view.".to_string() + "Selected Pod is running; press o or empty Enter to open/attach.".to_string() } Some(PodStatus::Paused) => { "Selected Pod is paused; open it explicitly to resume or start a new turn." .to_string() } - Some(PodStatus::Idle) => "Selected Pod is not send-eligible.".to_string(), + Some(PodStatus::Idle) => "Selected Pod can be opened/attached.".to_string(), None => "Selected Pod did not report a live status.".to_string(), }; } if entry.stored.is_some() { - return "Selected Pod is stopped; open/restore it before sending.".to_string(); + return "Selected Pod is stopped; press o or empty Enter to restore/open.".to_string(); } entry .actions .disabled_reason .clone() - .unwrap_or_else(|| "Selected Pod is not send-eligible.".to_string()) + .unwrap_or_else(|| "Selected Pod cannot be opened from this row.".to_string()) } fn selected_ticket_notice(row: Option<&PanelRow>) -> String { @@ -2011,11 +1888,11 @@ fn draw_title(frame: &mut Frame<'_>, app: &MultiPodApp, area: Rect) { .composer .is_available(ComposerTarget::TicketIntake) { - " Enter dispatches selected Ticket action · Ctrl+T target · o open/attach · r refresh" + " Empty Enter dispatches selected Ticket action · Ctrl+T target · o/empty Enter open/attach · r refresh" } else if app.panel.header.ticket_configured { - " Enter dispatches selected Ticket action · Enter sends to selected idle Pod when no Ticket action is selected · o open/attach · r refresh" + " Empty Enter dispatches selected Ticket action · o/empty Enter open/attach Pod · r refresh" } else { - " Pod-centric view · Enter sends to selected idle Pod · o open/attach · r refresh" + " Pod-centric view · o/empty Enter open/attach selected Pod · r refresh" }; let mut spans = vec![ Span::styled( @@ -2397,15 +2274,16 @@ fn draw_actionbar(frame: &mut Frame<'_>, app: &MultiPodApp, area: Rect) { let left = if app.sending && app.composer_target() == ComposerTarget::TicketIntake { "launching Ticket Intake…".to_string() } else if app.sending { - "sending…".to_string() + "working…".to_string() } else if let Some(notice) = app.notice.as_deref() { notice.to_string() - } else if let Some(reason) = app.selected_send_disabled_reason() { + } else if let Some(reason) = app.selected_open_disabled_reason() { reason } else { match app.composer_target() { ComposerTarget::Companion => { - "idle live target: Enter sends directly without opening conversation".to_string() + "Companion target pending; non-empty Enter keeps draft and reports a diagnostic" + .to_string() } ComposerTarget::TicketIntake => { "Ticket Intake target: Enter launches Intake with composer text".to_string() @@ -2417,9 +2295,9 @@ fn draw_actionbar(frame: &mut Frame<'_>, app: &MultiPodApp, area: Rect) { .composer .is_available(ComposerTarget::TicketIntake) { - "↑/↓ select Enter use target Ctrl+T target o open r refresh Esc quit" + "↑/↓ select Empty Enter target/open Ctrl+T target o open r refresh Esc quit" } else { - "↑/↓ select Enter send o open r refresh Esc quit" + "↑/↓ select Empty Enter open non-empty Enter diagnose o open r refresh Esc quit" }; let left_width = area .width @@ -2759,7 +2637,7 @@ mod tests { let mut app = app_with_panel(list, panel); assert_eq!(app.selected_panel_row().unwrap().title, "Needs Human Reply"); - assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled); + assert_eq!(app.selected_open_eligibility(), OpenEligibility::Disabled); let lines = list_lines(&app, 100, 6) .into_iter() .map(|line| plain_line(&line)) @@ -2773,20 +2651,23 @@ mod tests { app.select_next(); assert_eq!(app.list.selected_entry().unwrap().name, "idle"); - assert_eq!(app.selected_send_eligibility(), SendEligibility::SendNow); + assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow); let open = app.prepare_open().unwrap(); assert_eq!(open.pod_name, "idle"); assert_eq!(open.socket_override, Some(PathBuf::from("/tmp/idle.sock"))); - app.input.insert_str("send after ticket row"); - let request = match app.handle_key(key(KeyCode::Enter)) { - MultiPodAction::Send(request) => request, - _ => panic!("Pod row should preserve direct send behavior"), - }; - assert_eq!(request.socket_path, PathBuf::from("/tmp/idle.sock")); - assert_eq!( - Segment::flatten_to_text(&request.segments), - "send after ticket row" + app.input.insert_str("draft after ticket row"); + assert!(matches!( + app.handle_key(key(KeyCode::Enter)), + MultiPodAction::None + )); + assert!(!app.sending); + assert_eq!(input_text(&app), "draft after ticket row"); + assert!( + app.notice + .as_deref() + .unwrap() + .contains("Companion composer is not wired") ); } @@ -2912,11 +2793,11 @@ mod tests { } #[test] - fn multi_idle_live_selected_target_is_send_eligible() { + fn multi_idle_live_selected_target_is_open_eligible() { let app = test_app(vec![live_info("idle", PodStatus::Idle)]); - assert_eq!(app.selected_send_eligibility(), SendEligibility::SendNow); - assert!(app.selected_send_disabled_reason().is_none()); + assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow); + assert!(app.selected_open_disabled_reason().is_none()); } #[test] @@ -3058,7 +2939,7 @@ mod tests { } #[test] - fn multi_running_paused_and_stopped_targets_are_direct_send_disabled() { + fn multi_running_paused_and_stopped_targets_are_open_eligible() { let mut app = test_app(vec![ live_info("running", PodStatus::Running), live_info("paused", PodStatus::Paused), @@ -3077,28 +2958,16 @@ mod tests { app.selected_row = None; app.ensure_selection_visible(); - assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled); - assert!( - app.selected_send_disabled_reason() - .unwrap() - .contains("running") - ); + assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow); + assert!(app.selected_open_disabled_reason().is_none()); app.select_next(); assert_eq!(app.list.selected_entry().unwrap().name, "paused"); - assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled); - assert!( - app.selected_send_disabled_reason() - .unwrap() - .contains("paused") - ); + assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow); + assert!(app.selected_open_disabled_reason().is_none()); app.select_next(); assert_eq!(app.list.selected_entry().unwrap().name, "stopped"); - assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled); - assert!( - app.selected_send_disabled_reason() - .unwrap() - .contains("stopped") - ); + assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow); + assert!(app.selected_open_disabled_reason().is_none()); } #[test] @@ -3202,7 +3071,7 @@ mod tests { assert!(app.selected_row.is_none()); assert!(app.list.selected_name.is_none()); - assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled); + assert_eq!(app.selected_open_eligibility(), OpenEligibility::Disabled); } #[test] @@ -3295,26 +3164,32 @@ mod tests { } #[test] - fn multi_delivery_failure_keeps_composer_contents() { + fn multi_companion_submit_keeps_composer_contents() { let mut app = test_app(vec![live_info("idle", PodStatus::Idle)]); app.input.insert_str("keep me"); let before = input_text(&app); - app.finish_send(Err(DirectSendError::Io("boom".to_string()))); + app.reject_companion_submit(); assert_eq!(input_text(&app), before); - assert!(app.notice.as_deref().unwrap().contains("composer kept")); + assert!(!app.sending); + assert!( + app.notice + .as_deref() + .unwrap() + .contains("Companion composer is not wired") + ); } #[test] - fn multi_delivery_success_clears_composer_contents() { + fn multi_companion_submit_empty_reports_empty_composer() { let mut app = test_app(vec![live_info("idle", PodStatus::Idle)]); - app.input.insert_str("send me"); - app.finish_send(Ok(())); + app.reject_companion_submit(); assert_eq!(input_text(&app), ""); - assert!(app.notice.as_deref().unwrap().contains("Delivered")); + assert!(!app.sending); + assert_eq!(app.notice.as_deref(), Some("Composer is empty.")); } #[test] @@ -3399,19 +3274,23 @@ mod tests { } #[test] - fn multi_non_empty_enter_uses_direct_send_action() { + fn multi_non_empty_enter_reports_companion_unavailable() { let mut app = test_app(vec![live_info("idle", PodStatus::Idle)]); - app.input.insert_str("send me"); + app.input.insert_str("keep this draft"); - let request = match app.handle_key(key(KeyCode::Enter)) { - MultiPodAction::Send(request) => request, - _ => panic!("non-empty Enter should direct-send"), - }; + assert!(matches!( + app.handle_key(key(KeyCode::Enter)), + MultiPodAction::None + )); - assert_eq!(request.socket_path, PathBuf::from("/tmp/idle.sock")); - assert_eq!(Segment::flatten_to_text(&request.segments), "send me"); - assert!(app.sending); - assert!(app.notice.as_deref().unwrap().contains("Sending to idle")); + assert_eq!(input_text(&app), "keep this draft"); + assert!(!app.sending); + assert!( + app.notice + .as_deref() + .unwrap() + .contains("Companion composer is not wired") + ); } #[test] @@ -3477,7 +3356,6 @@ mod tests { let request = match app.handle_key(key(KeyCode::Enter)) { MultiPodAction::LaunchIntake(request) => request, - MultiPodAction::Send(_) => panic!("Ticket Intake target must not direct-send"), _ => panic!("Ticket Intake target should launch Intake"), }; @@ -3556,6 +3434,7 @@ mod tests { peer_registration: IntakePeerRegistrationStatus::Registered { orchestrator_pod: "test-orchestrator".to_string(), }, + registry_warning: None, })); assert!(!app.sending); diff --git a/crates/tui/src/role_session_registry.rs b/crates/tui/src/role_session_registry.rs index e97ec844..f4dcb41c 100644 --- a/crates/tui/src/role_session_registry.rs +++ b/crates/tui/src/role_session_registry.rs @@ -1,12 +1,15 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fs::{self, OpenOptions}; -use std::io::{self, Write}; +use std::io; use std::path::{Path, PathBuf}; +use std::thread; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; const REGISTRY_VERSION: u32 = 1; const REGISTRY_FILE: &str = "role-sessions.json"; +const REGISTRY_LOCK_FILE: &str = "role-sessions.lock"; const CLAIMS_DIR: &str = "ticket-claims"; #[derive(Debug, Clone)] @@ -24,17 +27,37 @@ pub(crate) struct RoleSessionRegistry { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub(crate) struct RoleSessionRecord { - pub pod_name: String, pub role: String, + pub pod_name: String, + pub origin: RoleSessionOrigin, + pub created_at: String, + pub updated_at: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub session_id: Option, #[serde(default)] - pub ticket_ids: Vec, + pub related_tickets: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub(crate) enum RoleSessionOrigin { + PreTicketIntake, + TicketClaim, + RoleLaunch, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct RelatedTicketRef { + pub id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub slug: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub(crate) struct TicketClaim { pub ticket_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub ticket_slug: Option, pub pod_name: String, pub role: String, } @@ -144,37 +167,46 @@ impl PanelRegistryStore { &self, pod_name: impl Into, role: impl Into, + origin: RoleSessionOrigin, session_id: Option, - ticket_ids: impl IntoIterator, + related_tickets: impl IntoIterator, ) -> Result<(), PanelRegistryError> { let pod_name = pod_name.into(); let role = role.into(); - let mut registry = self.load_registry()?; - registry.version = REGISTRY_VERSION; - if let Some(workspace_root) = self.workspace_root.as_ref() { - registry.workspace_root = workspace_root.clone(); - } - let mut tickets: BTreeSet = registry - .sessions - .get(&pod_name) - .map(|record| record.ticket_ids.iter().cloned().collect()) - .unwrap_or_default(); - tickets.extend(ticket_ids); - registry.sessions.insert( - pod_name.clone(), - RoleSessionRecord { - pod_name, - role, - session_id, - ticket_ids: tickets.into_iter().collect(), - }, - ); - self.save_registry(®istry) + let related_tickets: Vec = related_tickets.into_iter().collect(); + self.update_registry(|registry| { + let now = now_timestamp_string(); + let mut tickets: BTreeSet = registry + .sessions + .get(&pod_name) + .map(|record| record.related_tickets.iter().cloned().collect()) + .unwrap_or_default(); + tickets.extend(related_tickets); + let created_at = registry + .sessions + .get(&pod_name) + .map(|record| record.created_at.clone()) + .unwrap_or_else(|| now.clone()); + registry.sessions.insert( + pod_name.clone(), + RoleSessionRecord { + role, + pod_name, + origin, + created_at, + updated_at: now, + session_id, + related_tickets: tickets.into_iter().collect(), + }, + ); + Ok(()) + }) } pub(crate) fn claim_ticket( &self, ticket_id: &str, + ticket_slug: Option<&str>, pod_name: &str, role: &str, ) -> Result { @@ -182,25 +214,21 @@ impl PanelRegistryStore { let claim_path = self.claim_path(ticket_id); let claim = TicketClaim { ticket_id: ticket_id.to_string(), + ticket_slug: ticket_slug.map(ToOwned::to_owned), pod_name: pod_name.to_string(), role: role.to_string(), }; - let bytes = serde_json::to_vec_pretty(&claim)?; - match OpenOptions::new() - .write(true) - .create_new(true) - .open(&claim_path) - { - Ok(mut file) => { - if let Err(error) = file.write_all(&bytes).and_then(|()| file.write_all(b"\n")) { - let _ = fs::remove_file(&claim_path); - return Err(error.into()); - } + match self.create_claim_file(&claim_path, &claim) { + Ok(()) => { if let Err(error) = self.record_session( pod_name.to_string(), role.to_string(), + RoleSessionOrigin::TicketClaim, None, - [ticket_id.to_string()], + [RelatedTicketRef { + id: ticket_id.to_string(), + slug: ticket_slug.map(ToOwned::to_owned), + }], ) { let _ = fs::remove_file(&claim_path); return Err(error); @@ -237,39 +265,77 @@ impl PanelRegistryStore { } } - pub(crate) fn release_ticket_claim( + fn update_registry( &self, - ticket_id: &str, - pod_name: &str, + update: impl FnOnce(&mut RoleSessionRegistry) -> Result<(), PanelRegistryError>, ) -> Result<(), PanelRegistryError> { - match self.load_claim(ticket_id) { - Ok(claim) if claim.pod_name == pod_name => { - fs::remove_file(self.claim_path(ticket_id))?; - Ok(()) - } - Ok(_) => Ok(()), - Err(PanelRegistryError::Io(error)) if error.kind() == io::ErrorKind::NotFound => Ok(()), - Err(error) => Err(error), + fs::create_dir_all(&self.root)?; + let _lock = self.acquire_registry_lock()?; + let mut registry = self.load_registry()?; + registry.version = REGISTRY_VERSION; + if let Some(workspace_root) = self.workspace_root.as_ref() { + registry.workspace_root = workspace_root.clone(); } + update(&mut registry)?; + self.save_registry(®istry) + } + + fn acquire_registry_lock(&self) -> Result { + let lock_path = self.root.join(REGISTRY_LOCK_FILE); + for _ in 0..50 { + match OpenOptions::new() + .write(true) + .create_new(true) + .open(&lock_path) + { + Ok(_) => return Ok(RegistryLockGuard { path: lock_path }), + Err(error) if error.kind() == io::ErrorKind::AlreadyExists => { + thread::sleep(Duration::from_millis(10)); + } + Err(error) => return Err(error.into()), + } + } + Err(PanelRegistryError::Io(io::Error::new( + io::ErrorKind::WouldBlock, + "timed out acquiring panel role session registry lock", + ))) } fn save_registry(&self, registry: &RoleSessionRegistry) -> Result<(), PanelRegistryError> { - fs::create_dir_all(&self.root)?; let path = self.registry_path(); - let temp_path = path.with_extension("json.tmp"); + let temp_path = path.with_extension(format!("json.{}.tmp", now_timestamp_string())); let bytes = serde_json::to_vec_pretty(registry)?; fs::write(&temp_path, [&bytes[..], b"\n"].concat())?; fs::rename(temp_path, path)?; Ok(()) } + fn create_claim_file(&self, claim_path: &Path, claim: &TicketClaim) -> io::Result<()> { + let temp_path = self + .claims_dir() + .join(format!(".{}.tmp", now_timestamp_string())); + let bytes = serde_json::to_vec_pretty(claim).map_err(io::Error::other)?; + fs::write(&temp_path, [&bytes[..], b"\n"].concat())?; + let link_result = fs::hard_link(&temp_path, claim_path); + let remove_result = fs::remove_file(&temp_path); + match (link_result, remove_result) { + (Ok(()), Ok(())) | (Ok(()), Err(_)) => Ok(()), + (Err(error), _) => Err(error), + } + } + fn load_claims(&self) -> Result, PanelRegistryError> { let mut claims: Vec = Vec::new(); match fs::read_dir(self.claims_dir()) { Ok(entries) => { for entry in entries { let entry = entry?; - if entry.file_type()?.is_file() { + if entry.file_type()?.is_file() + && entry + .path() + .extension() + .is_some_and(|extension| extension == "json") + { let bytes = fs::read(entry.path())?; claims.push(serde_json::from_slice(&bytes)?); } @@ -296,6 +362,16 @@ impl PanelRegistryStore { } } +struct RegistryLockGuard { + path: PathBuf, +} + +impl Drop for RegistryLockGuard { + fn drop(&mut self) { + let _ = fs::remove_file(&self.path); + } +} + impl PanelRegistrySnapshot { pub(crate) fn empty() -> Self { Self { @@ -359,6 +435,13 @@ fn encode_path_component(value: &str) -> String { encoded } +fn now_timestamp_string() -> String { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos().to_string()) + .unwrap_or_else(|_| "0".to_string()) +} + #[cfg(test)] mod tests { use super::*; @@ -376,7 +459,13 @@ mod tests { assert_ne!(store.root(), other.root()); store - .record_session("ticket-intake-preticket", "intake", None, []) + .record_session( + "ticket-intake-preticket", + "intake", + RoleSessionOrigin::PreTicketIntake, + None, + [], + ) .unwrap(); assert_eq!(store.load_registry().unwrap().workspace_root, "/repo/yoi"); } @@ -387,18 +476,17 @@ mod tests { let store = PanelRegistryStore::from_root(temp.path().join("registry")); assert!(matches!( - store.claim_ticket("T-1", "ticket-one-intake", "intake"), + store.claim_ticket("T-1", Some("ticket-one"), "ticket-one-intake", "intake"), Ok(TicketClaimResult::Claimed) )); let error = store - .claim_ticket("T-1", "ticket-two-intake", "intake") + .claim_ticket("T-1", Some("ticket-one"), "ticket-two-intake", "intake") .unwrap_err(); assert!(matches!(error, PanelRegistryError::TicketAlreadyClaimed(_))); - assert_eq!( - store.claim_for_ticket("T-1").unwrap().unwrap().pod_name, - "ticket-one-intake" - ); + let claim = store.claim_for_ticket("T-1").unwrap().unwrap(); + assert_eq!(claim.pod_name, "ticket-one-intake"); + assert_eq!(claim.ticket_slug.as_deref(), Some("ticket-one")); } #[test] @@ -407,14 +495,30 @@ mod tests { let store = PanelRegistryStore::from_root(temp.path().join("registry")); store - .record_session("ticket-intake-preticket", "intake", None, []) + .record_session( + "ticket-intake-preticket", + "intake", + RoleSessionOrigin::PreTicketIntake, + None, + [], + ) .unwrap(); store .record_session( "ticket-intake-shared", "intake", + RoleSessionOrigin::RoleLaunch, None, - ["T-1".to_string(), "T-2".to_string()], + [ + RelatedTicketRef { + id: "T-1".to_string(), + slug: Some("one".to_string()), + }, + RelatedTicketRef { + id: "T-2".to_string(), + slug: Some("two".to_string()), + }, + ], ) .unwrap(); @@ -430,7 +534,23 @@ mod tests { .find(|session| session.pod_name == "ticket-intake-shared") .unwrap(); - assert!(preticket.ticket_ids.is_empty()); - assert_eq!(shared.ticket_ids, vec!["T-1", "T-2"]); + assert!(preticket.related_tickets.is_empty()); + assert_eq!(shared.role, "intake"); + assert_eq!(shared.origin, RoleSessionOrigin::RoleLaunch); + assert!(!shared.created_at.is_empty()); + assert!(!shared.updated_at.is_empty()); + assert_eq!( + shared.related_tickets, + vec![ + RelatedTicketRef { + id: "T-1".to_string(), + slug: Some("one".to_string()), + }, + RelatedTicketRef { + id: "T-2".to_string(), + slug: Some("two".to_string()), + }, + ] + ); } } diff --git a/crates/tui/src/workspace_panel.rs b/crates/tui/src/workspace_panel.rs index c52ef9d8..df416508 100644 --- a/crates/tui/src/workspace_panel.rs +++ b/crates/tui/src/workspace_panel.rs @@ -167,7 +167,6 @@ pub(crate) enum NextUserAction { Edit, Wait, OpenPod, - SendToPod, } impl NextUserAction { @@ -180,7 +179,6 @@ impl NextUserAction { Self::Edit => "Edit", Self::Wait => "Wait", Self::OpenPod => "Open", - Self::SendToPod => "Send", } } } @@ -404,11 +402,6 @@ pub(crate) fn build_workspace_panel( { Ok(snapshot) => snapshot, Err(error) => { - let mut snapshot = PanelRegistrySnapshot::empty(); - // Keep panel rendering available even when the local registry is corrupt or - // unavailable; the diagnostic is attached below after the header is initialized. - snapshot.claims.clear(); - snapshot.sessions.clear(); let mut model = WorkspacePanelViewModel::empty(workspace_root); model .header @@ -420,7 +413,7 @@ pub(crate) fn build_workspace_panel( model, workspace_root, pods, - &snapshot, + &PanelRegistrySnapshot::empty(), ); } }; @@ -774,9 +767,7 @@ fn pod_rows(pods: &PodList) -> Vec { fn pod_row(entry: &PodListEntry) -> PanelRow { let status = pod_status_label(entry).to_string(); - let next_action = if entry.actions.can_send_now { - Some(NextUserAction::SendToPod) - } else if entry.actions.can_open { + let next_action = if entry.actions.can_open { Some(NextUserAction::OpenPod) } else { None @@ -802,7 +793,7 @@ fn pod_row(entry: &PodListEntry) -> PanelRow { ticket: None, related_pods: Vec::new(), disabled_reason: entry.actions.disabled_reason.clone(), - key_hint: Some("Pod rows preserve existing open/direct-send behavior".to_string()), + key_hint: Some("Press o or empty Enter to open/attach this Pod".to_string()), } } @@ -1077,7 +1068,7 @@ mod tests { let summary = backend.list(TicketFilter::all()).unwrap().remove(0); let store = PanelRegistryStore::from_root(temp.path().join("local-registry")); store - .claim_ticket(&summary.id, "ticket-claimed-intake", "intake") + .claim_ticket(&summary.id, None, "ticket-claimed-intake", "intake") .unwrap(); let registry = store.snapshot().unwrap();