fixup! tui: add panel role session registry
This commit is contained in:
parent
489059019d
commit
2f3f54b01a
|
|
@ -11,7 +11,7 @@ use client::{PodRuntimeCommand, SpawnConfig, spawn_pod};
|
|||
use crossterm::event::{Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, poll, read};
|
||||
use pod_store::FsPodStore;
|
||||
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
||||
use protocol::{ErrorCode, Event, InvokeKind, Method, PodStatus, Segment};
|
||||
use protocol::{ErrorCode, Event, Method, PodStatus, Segment};
|
||||
use ratatui::Frame;
|
||||
use ratatui::Terminal;
|
||||
use ratatui::backend::CrosstermBackend;
|
||||
|
|
@ -33,7 +33,9 @@ use crate::pod_list::{
|
|||
PodList, PodListEntry, PodVisibilitySource, StoredMetadataState, read_reachable_live_pod_infos,
|
||||
read_stored_pod_infos,
|
||||
};
|
||||
use crate::role_session_registry::{PanelRegistryStore, TicketClaimResult};
|
||||
use crate::role_session_registry::{
|
||||
PanelRegistryStore, RelatedTicketRef, RoleSessionOrigin, TicketClaimResult,
|
||||
};
|
||||
use crate::workspace_panel::{
|
||||
ActionPriority, ComposerTarget, NextUserAction, OrchestratorLifecyclePlan,
|
||||
OrchestratorPanelState, OrchestratorPanelStatus, OrchestratorPodPresence, PanelRow,
|
||||
|
|
@ -144,14 +146,6 @@ pub(crate) async fn run(
|
|||
app.notice = Some("Refresh already in progress.".to_string());
|
||||
}
|
||||
}
|
||||
MultiPodAction::Send(request) => {
|
||||
pending_reload.abort();
|
||||
terminal.draw(|f| draw(f, app))?;
|
||||
let result = send_run_and_confirm(&request.socket_path, request.segments).await;
|
||||
app.finish_send(result);
|
||||
app.reload_or_notice().await;
|
||||
next_poll = Instant::now() + MULTI_POD_POLL_INTERVAL;
|
||||
}
|
||||
MultiPodAction::DispatchTicketAction(request) => {
|
||||
pending_reload.abort();
|
||||
terminal.draw(|f| draw(f, app))?;
|
||||
|
|
@ -258,17 +252,11 @@ fn default_pod_store_dir() -> Result<PathBuf, MultiPodError> {
|
|||
|
||||
#[cfg(test)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(crate) enum SendEligibility {
|
||||
SendNow,
|
||||
pub(crate) enum OpenEligibility {
|
||||
OpenNow,
|
||||
Disabled,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DirectSendRequest {
|
||||
socket_path: PathBuf,
|
||||
segments: Vec<Segment>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct IntakeLaunchRequest {
|
||||
context: TicketRoleLaunchContext,
|
||||
|
|
@ -282,13 +270,10 @@ pub(crate) enum IntakeRegistryUpdate {
|
|||
RecordSession {
|
||||
registry_root: PathBuf,
|
||||
pod_name: String,
|
||||
ticket_ids: Vec<String>,
|
||||
},
|
||||
ClaimedTicket {
|
||||
registry_root: PathBuf,
|
||||
ticket_id: String,
|
||||
pod_name: String,
|
||||
origin: RoleSessionOrigin,
|
||||
related_tickets: Vec<RelatedTicketRef>,
|
||||
},
|
||||
ClaimedTicket,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
|
|
@ -301,6 +286,7 @@ pub(crate) enum IntakePeerRegistrationRequest {
|
|||
pub(crate) struct IntakeLaunchOutcome {
|
||||
launch: TicketRoleLaunchResult,
|
||||
peer_registration: IntakePeerRegistrationStatus,
|
||||
registry_warning: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
|
|
@ -335,42 +321,34 @@ pub(crate) async fn launch_intake_with_handoff(request: IntakeLaunchRequest) ->
|
|||
))),
|
||||
),
|
||||
};
|
||||
let launch = match launch_ticket_role_pod_with_options(
|
||||
let launch = launch_ticket_role_pod_with_options(
|
||||
request.context,
|
||||
request.runtime_command,
|
||||
|_| {},
|
||||
options,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(launch) => launch,
|
||||
Err(error) => {
|
||||
if let IntakeRegistryUpdate::ClaimedTicket {
|
||||
registry_root,
|
||||
ticket_id,
|
||||
pod_name,
|
||||
} = &request.registry_update
|
||||
{
|
||||
let _ = PanelRegistryStore::from_root(registry_root.clone())
|
||||
.release_ticket_claim(ticket_id, pod_name);
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
};
|
||||
match &request.registry_update {
|
||||
.await?;
|
||||
let registry_warning = match request.registry_update {
|
||||
IntakeRegistryUpdate::RecordSession {
|
||||
registry_root,
|
||||
pod_name,
|
||||
ticket_ids,
|
||||
} => {
|
||||
let _ = PanelRegistryStore::from_root(registry_root.clone()).record_session(
|
||||
pod_name.clone(),
|
||||
origin,
|
||||
related_tickets,
|
||||
} => PanelRegistryStore::from_root(registry_root)
|
||||
.record_session(
|
||||
pod_name,
|
||||
TicketRole::Intake.as_str().to_string(),
|
||||
origin,
|
||||
None,
|
||||
ticket_ids.clone(),
|
||||
);
|
||||
}
|
||||
IntakeRegistryUpdate::ClaimedTicket { .. } => {}
|
||||
related_tickets,
|
||||
)
|
||||
.err()
|
||||
.map(|error| {
|
||||
bounded_panel_diagnostic(format!(
|
||||
"local role session registry could not be updated after Intake launch: {error}"
|
||||
))
|
||||
}),
|
||||
IntakeRegistryUpdate::ClaimedTicket => None,
|
||||
};
|
||||
let peer_registration = match (orchestrator_pod, skip_warning) {
|
||||
(_, Some(warning)) => warning,
|
||||
|
|
@ -392,6 +370,7 @@ pub(crate) async fn launch_intake_with_handoff(request: IntakeLaunchRequest) ->
|
|||
Ok(IntakeLaunchOutcome {
|
||||
launch,
|
||||
peer_registration,
|
||||
registry_warning,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -506,14 +485,14 @@ impl MultiPodApp {
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn selected_send_eligibility(&self) -> SendEligibility {
|
||||
pub(crate) fn selected_open_eligibility(&self) -> OpenEligibility {
|
||||
match self.selected_pod_entry() {
|
||||
Some(entry) if entry.actions.can_send_now => SendEligibility::SendNow,
|
||||
_ => SendEligibility::Disabled,
|
||||
Some(entry) if entry.actions.can_open => OpenEligibility::OpenNow,
|
||||
_ => OpenEligibility::Disabled,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn selected_send_disabled_reason(&self) -> Option<String> {
|
||||
pub(crate) fn selected_open_disabled_reason(&self) -> Option<String> {
|
||||
if let Some(row) = self
|
||||
.selected_panel_row()
|
||||
.filter(|row| row.is_ticket_action())
|
||||
|
|
@ -523,16 +502,16 @@ impl MultiPodApp {
|
|||
.clone()
|
||||
.or_else(|| row.key_hint.clone())
|
||||
.unwrap_or_else(|| {
|
||||
"Press Enter to dispatch this Ticket action; stale Tickets are re-checked before any mutation."
|
||||
"Empty Enter dispatches this Ticket action; stale Tickets are re-checked before any mutation."
|
||||
.to_string()
|
||||
}),
|
||||
);
|
||||
}
|
||||
let entry = self.selected_pod_entry()?;
|
||||
if entry.actions.can_send_now {
|
||||
if entry.actions.can_open {
|
||||
return None;
|
||||
}
|
||||
Some(send_disabled_reason(entry))
|
||||
Some(open_disabled_reason(entry))
|
||||
}
|
||||
|
||||
pub(crate) fn select_next(&mut self) {
|
||||
|
|
@ -691,53 +670,16 @@ impl MultiPodApp {
|
|||
segments_are_blank(&self.input.submit_segments())
|
||||
}
|
||||
|
||||
pub(crate) fn prepare_send(&mut self) -> Option<DirectSendRequest> {
|
||||
let (target_name, socket_path) = {
|
||||
let entry = match self.selected_pod_entry() {
|
||||
Some(entry) => entry,
|
||||
None => {
|
||||
self.notice = Some(selected_ticket_notice(self.selected_panel_row()));
|
||||
return None;
|
||||
}
|
||||
};
|
||||
if !entry.actions.can_send_now {
|
||||
self.notice = Some(send_disabled_reason(entry));
|
||||
return None;
|
||||
}
|
||||
let Some(socket_path) = entry.attach_socket_path().map(PathBuf::from) else {
|
||||
self.notice = Some("Selected Pod has no reachable socket.".to_string());
|
||||
return None;
|
||||
};
|
||||
(entry.name.clone(), socket_path)
|
||||
};
|
||||
pub(crate) fn reject_companion_submit(&mut self) {
|
||||
let segments = self.input.submit_segments();
|
||||
if segments_are_blank(&segments) {
|
||||
self.notice = Some("Composer is empty.".to_string());
|
||||
return None;
|
||||
return;
|
||||
}
|
||||
self.sending = true;
|
||||
self.notice = Some(format!("Sending to {target_name}…"));
|
||||
Some(DirectSendRequest {
|
||||
socket_path,
|
||||
segments,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn finish_send(&mut self, result: Result<(), DirectSendError>) {
|
||||
self.sending = false;
|
||||
match result {
|
||||
Ok(()) => {
|
||||
let target = self
|
||||
.selected_pod_entry()
|
||||
.map(|entry| entry.name.clone())
|
||||
.unwrap_or_else(|| "selected Pod".to_string());
|
||||
self.input.clear();
|
||||
self.notice = Some(format!("Delivered to {target}."));
|
||||
}
|
||||
Err(e) => {
|
||||
self.notice = Some(format!("Delivery failed; composer kept: {e}"));
|
||||
}
|
||||
}
|
||||
self.notice = Some(bounded_panel_diagnostic(
|
||||
"Companion composer is not wired to a Companion Pod yet; draft kept. Press o or empty Enter to open/attach the selected Pod.",
|
||||
));
|
||||
}
|
||||
|
||||
pub(crate) fn prepare_ticket_action_dispatch(&mut self) -> Option<TicketActionRequest> {
|
||||
|
|
@ -789,36 +731,6 @@ impl MultiPodApp {
|
|||
});
|
||||
}
|
||||
|
||||
fn prepare_intake_peer_registration(
|
||||
&self,
|
||||
context: &mut TicketRoleLaunchContext,
|
||||
) -> IntakePeerRegistrationRequest {
|
||||
match self.panel.header.orchestrator.as_ref() {
|
||||
Some(orchestrator) => {
|
||||
context.intake_handoff = Some(TicketIntakeHandoff::new(
|
||||
orchestrator.pod_name.clone(),
|
||||
self.panel.header.workspace_label.clone(),
|
||||
));
|
||||
if orchestrator_status_is_peer_reachable(orchestrator.status) {
|
||||
IntakePeerRegistrationRequest::Register {
|
||||
orchestrator_pod: orchestrator.pod_name.clone(),
|
||||
}
|
||||
} else {
|
||||
IntakePeerRegistrationRequest::Skip {
|
||||
reason: format!(
|
||||
"workspace Orchestrator {} is {}; launch input still carries the auditable handoff target",
|
||||
orchestrator.pod_name,
|
||||
orchestrator.status.label()
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
None => IntakePeerRegistrationRequest::Skip {
|
||||
reason: "workspace Orchestrator is not configured for this panel".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn prepare_intake_launch(&mut self) -> Option<IntakeLaunchRequest> {
|
||||
if !self
|
||||
.panel
|
||||
|
|
@ -848,11 +760,6 @@ impl MultiPodApp {
|
|||
return None;
|
||||
}
|
||||
};
|
||||
let registry_update = IntakeRegistryUpdate::RecordSession {
|
||||
registry_root: store.root().to_path_buf(),
|
||||
pod_name,
|
||||
ticket_ids: Vec::new(),
|
||||
};
|
||||
let peer_registration = self.prepare_intake_peer_registration(&mut context);
|
||||
self.sending = true;
|
||||
self.notice = Some("Launching Ticket Intake…".to_string());
|
||||
|
|
@ -860,10 +767,45 @@ impl MultiPodApp {
|
|||
context,
|
||||
runtime_command: self.runtime_command.clone(),
|
||||
peer_registration,
|
||||
registry_update,
|
||||
registry_update: IntakeRegistryUpdate::RecordSession {
|
||||
registry_root: store.root().to_path_buf(),
|
||||
pod_name,
|
||||
origin: RoleSessionOrigin::PreTicketIntake,
|
||||
related_tickets: Vec::new(),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn prepare_intake_peer_registration(
|
||||
&self,
|
||||
context: &mut TicketRoleLaunchContext,
|
||||
) -> IntakePeerRegistrationRequest {
|
||||
match self.panel.header.orchestrator.as_ref() {
|
||||
Some(orchestrator) => {
|
||||
context.intake_handoff = Some(TicketIntakeHandoff::new(
|
||||
orchestrator.pod_name.clone(),
|
||||
self.panel.header.workspace_label.clone(),
|
||||
));
|
||||
if orchestrator_status_is_peer_reachable(orchestrator.status) {
|
||||
IntakePeerRegistrationRequest::Register {
|
||||
orchestrator_pod: orchestrator.pod_name.clone(),
|
||||
}
|
||||
} else {
|
||||
IntakePeerRegistrationRequest::Skip {
|
||||
reason: format!(
|
||||
"workspace Orchestrator {} is {}; launch input still carries the auditable handoff target",
|
||||
orchestrator.pod_name,
|
||||
orchestrator.status.label()
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
None => IntakePeerRegistrationRequest::Skip {
|
||||
reason: "workspace Orchestrator is not configured for this panel".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn prepare_existing_ticket_intake_launch(&mut self) -> Option<IntakeLaunchRequest> {
|
||||
let row = match self.selected_panel_row() {
|
||||
Some(row) if row.is_ticket_action() => row,
|
||||
|
|
@ -936,7 +878,12 @@ impl MultiPodApp {
|
|||
}
|
||||
};
|
||||
context.pod_name = Some(planned.pod_name.clone());
|
||||
match store.claim_ticket(&ticket_id, &planned.pod_name, TicketRole::Intake.as_str()) {
|
||||
match store.claim_ticket(
|
||||
&ticket_id,
|
||||
Some(&ticket_slug),
|
||||
&planned.pod_name,
|
||||
TicketRole::Intake.as_str(),
|
||||
) {
|
||||
Ok(TicketClaimResult::Claimed) | Ok(TicketClaimResult::AlreadyOwned(_)) => {}
|
||||
Err(error) => {
|
||||
self.notice = Some(format!("Ticket claim diagnostic required: {error}"));
|
||||
|
|
@ -953,11 +900,7 @@ impl MultiPodApp {
|
|||
context,
|
||||
runtime_command: self.runtime_command.clone(),
|
||||
peer_registration,
|
||||
registry_update: IntakeRegistryUpdate::ClaimedTicket {
|
||||
registry_root: store.root().to_path_buf(),
|
||||
ticket_id,
|
||||
pod_name: planned.pod_name,
|
||||
},
|
||||
registry_update: IntakeRegistryUpdate::ClaimedTicket,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -975,8 +918,12 @@ impl MultiPodApp {
|
|||
format!(" Handoff warning: {message}")
|
||||
}
|
||||
};
|
||||
let registry_notice = result
|
||||
.registry_warning
|
||||
.map(|warning| format!(" Registry warning: {warning}"))
|
||||
.unwrap_or_default();
|
||||
self.notice = Some(bounded_panel_diagnostic(format!(
|
||||
"Launched Ticket Intake Pod {pod_name}.{peer_notice}"
|
||||
"Launched Ticket Intake Pod {pod_name}.{peer_notice}{registry_notice}"
|
||||
)));
|
||||
}
|
||||
Err(error) => {
|
||||
|
|
@ -1041,10 +988,10 @@ impl MultiPodApp {
|
|||
.map(MultiPodAction::LaunchIntake)
|
||||
.unwrap_or(MultiPodAction::None),
|
||||
KeyCode::Enter if self.composer_is_blank() => MultiPodAction::Open,
|
||||
KeyCode::Enter => self
|
||||
.prepare_send()
|
||||
.map(MultiPodAction::Send)
|
||||
.unwrap_or(MultiPodAction::None),
|
||||
KeyCode::Enter => {
|
||||
self.reject_companion_submit();
|
||||
MultiPodAction::None
|
||||
}
|
||||
KeyCode::Backspace => {
|
||||
self.input.delete_before();
|
||||
MultiPodAction::None
|
||||
|
|
@ -1083,7 +1030,6 @@ enum MultiPodAction {
|
|||
Quit,
|
||||
Open,
|
||||
Refresh,
|
||||
Send(DirectSendRequest),
|
||||
DispatchTicketAction(TicketActionRequest),
|
||||
LaunchIntake(IntakeLaunchRequest),
|
||||
}
|
||||
|
|
@ -1334,7 +1280,7 @@ fn existing_ticket_claim_notice(
|
|||
) -> String {
|
||||
match status {
|
||||
TicketLocalClaimStatus::Live | TicketLocalClaimStatus::Restorable => format!(
|
||||
"Ticket {ticket_id} is already claimed by local Intake Pod {pod_name} ({}); select/open that Pod instead of starting a second Intake.",
|
||||
"Ticket {ticket_id} is already claimed by local Intake Pod {pod_name} ({}); open that Pod instead of starting a second Intake.",
|
||||
status.label()
|
||||
),
|
||||
TicketLocalClaimStatus::Stale => format!(
|
||||
|
|
@ -1559,7 +1505,6 @@ async fn dispatch_ticket_action(
|
|||
NextUserAction::Clarify
|
||||
| NextUserAction::Edit
|
||||
| NextUserAction::OpenPod
|
||||
| NextUserAction::SendToPod
|
||||
| NextUserAction::Wait => Ok(TicketActionOutcome {
|
||||
notice: format!(
|
||||
"{} for Ticket {} has no safe inline workspace-panel dispatch; use the Ticket workflow.",
|
||||
|
|
@ -1615,11 +1560,11 @@ async fn notify_workspace_orchestrator(
|
|||
}
|
||||
}
|
||||
|
||||
async fn send_notify_only(socket: &Path, message: String) -> Result<(), DirectSendError> {
|
||||
async fn send_notify_only(socket: &Path, message: String) -> Result<(), NotifySendError> {
|
||||
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
||||
.await
|
||||
.map_err(|_| DirectSendError::Io("connect timed out".into()))?
|
||||
.map_err(|e| DirectSendError::Io(format!("connect: {e}")))?;
|
||||
.map_err(|_| NotifySendError::Io("connect timed out".into()))?
|
||||
.map_err(|e| NotifySendError::Io(format!("connect: {e}")))?;
|
||||
let (reader, writer) = stream.into_split();
|
||||
let mut reader = JsonLineReader::new(reader);
|
||||
let mut writer = JsonLineWriter::new(writer);
|
||||
|
|
@ -1627,17 +1572,17 @@ async fn send_notify_only(socket: &Path, message: String) -> Result<(), DirectSe
|
|||
loop {
|
||||
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
|
||||
.await
|
||||
.map_err(|_| DirectSendError::Io("read initial Snapshot timed out".into()))?
|
||||
.map_err(|e| DirectSendError::Io(format!("read initial Snapshot: {e}")))?;
|
||||
.map_err(|_| NotifySendError::Io("read initial Snapshot timed out".into()))?
|
||||
.map_err(|e| NotifySendError::Io(format!("read initial Snapshot: {e}")))?;
|
||||
match event {
|
||||
Some(Event::Snapshot { .. }) => break,
|
||||
Some(Event::Alert(_)) => continue,
|
||||
Some(Event::Error { code, message }) => {
|
||||
return Err(DirectSendError::Rejected { code, message });
|
||||
return Err(NotifySendError::Rejected { code, message });
|
||||
}
|
||||
Some(_) => continue,
|
||||
None => {
|
||||
return Err(DirectSendError::Io(
|
||||
return Err(NotifySendError::Io(
|
||||
"connection closed before initial Snapshot".into(),
|
||||
));
|
||||
}
|
||||
|
|
@ -1646,96 +1591,28 @@ async fn send_notify_only(socket: &Path, message: String) -> Result<(), DirectSe
|
|||
|
||||
tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::Notify { message }))
|
||||
.await
|
||||
.map_err(|_| DirectSendError::Io("write timed out".into()))?
|
||||
.map_err(|e| DirectSendError::Io(format!("write: {e}")))
|
||||
.map_err(|_| NotifySendError::Io("write timed out".into()))?
|
||||
.map_err(|e| NotifySendError::Io(format!("write: {e}")))
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum DirectSendError {
|
||||
AlreadyRunning,
|
||||
pub(crate) enum NotifySendError {
|
||||
Rejected { code: ErrorCode, message: String },
|
||||
Io(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for DirectSendError {
|
||||
impl std::fmt::Display for NotifySendError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::AlreadyRunning => write!(f, "target Pod is already running"),
|
||||
Self::Rejected { code, message } => {
|
||||
write!(f, "target rejected run ({code:?}): {message}")
|
||||
write!(f, "target rejected method ({code:?}): {message}")
|
||||
}
|
||||
Self::Io(message) => write!(f, "{message}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DirectSendError {}
|
||||
|
||||
async fn send_run_and_confirm(socket: &Path, input: Vec<Segment>) -> Result<(), DirectSendError> {
|
||||
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
||||
.await
|
||||
.map_err(|_| DirectSendError::Io("connect timed out".into()))?
|
||||
.map_err(|e| DirectSendError::Io(format!("connect: {e}")))?;
|
||||
let (reader, writer) = stream.into_split();
|
||||
let mut reader = JsonLineReader::new(reader);
|
||||
let mut writer = JsonLineWriter::new(writer);
|
||||
|
||||
loop {
|
||||
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
|
||||
.await
|
||||
.map_err(|_| DirectSendError::Io("read initial Snapshot timed out".into()))?
|
||||
.map_err(|e| DirectSendError::Io(format!("read initial Snapshot: {e}")))?;
|
||||
match event {
|
||||
Some(Event::Snapshot { .. }) => break,
|
||||
Some(Event::Alert(_)) => continue,
|
||||
Some(Event::Error {
|
||||
code: ErrorCode::AlreadyRunning,
|
||||
..
|
||||
}) => return Err(DirectSendError::AlreadyRunning),
|
||||
Some(Event::Error { code, message }) => {
|
||||
return Err(DirectSendError::Rejected { code, message });
|
||||
}
|
||||
Some(_) => continue,
|
||||
None => {
|
||||
return Err(DirectSendError::Io(
|
||||
"connection closed before initial Snapshot".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::Run { input }))
|
||||
.await
|
||||
.map_err(|_| DirectSendError::Io("write timed out".into()))?
|
||||
.map_err(|e| DirectSendError::Io(format!("write: {e}")))?;
|
||||
|
||||
loop {
|
||||
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
|
||||
.await
|
||||
.map_err(|_| DirectSendError::Io("read response timed out".into()))?
|
||||
.map_err(|e| DirectSendError::Io(format!("read response: {e}")))?;
|
||||
match event {
|
||||
Some(Event::Error {
|
||||
code: ErrorCode::AlreadyRunning,
|
||||
..
|
||||
}) => return Err(DirectSendError::AlreadyRunning),
|
||||
Some(Event::Error { code, message }) => {
|
||||
return Err(DirectSendError::Rejected { code, message });
|
||||
}
|
||||
Some(Event::InvokeStart {
|
||||
kind: InvokeKind::UserSend,
|
||||
})
|
||||
| Some(Event::UserMessage { .. })
|
||||
| Some(Event::TurnStart { .. }) => return Ok(()),
|
||||
Some(_) => continue,
|
||||
None => {
|
||||
return Err(DirectSendError::Io(
|
||||
"connection closed before response".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
impl std::error::Error for NotifySendError {}
|
||||
|
||||
fn segments_are_blank(segments: &[Segment]) -> bool {
|
||||
segments.iter().all(|segment| match segment {
|
||||
|
|
@ -1744,31 +1621,31 @@ fn segments_are_blank(segments: &[Segment]) -> bool {
|
|||
})
|
||||
}
|
||||
|
||||
fn send_disabled_reason(entry: &PodListEntry) -> String {
|
||||
fn open_disabled_reason(entry: &PodListEntry) -> String {
|
||||
if let Some(live) = entry.live.as_ref() {
|
||||
if !live.reachable {
|
||||
return "Selected live Pod is unreachable.".to_string();
|
||||
}
|
||||
return match live.status {
|
||||
Some(PodStatus::Running) => {
|
||||
"Selected Pod is running; direct send is disabled in multi-Pod view.".to_string()
|
||||
"Selected Pod is running; press o or empty Enter to open/attach.".to_string()
|
||||
}
|
||||
Some(PodStatus::Paused) => {
|
||||
"Selected Pod is paused; open it explicitly to resume or start a new turn."
|
||||
.to_string()
|
||||
}
|
||||
Some(PodStatus::Idle) => "Selected Pod is not send-eligible.".to_string(),
|
||||
Some(PodStatus::Idle) => "Selected Pod can be opened/attached.".to_string(),
|
||||
None => "Selected Pod did not report a live status.".to_string(),
|
||||
};
|
||||
}
|
||||
if entry.stored.is_some() {
|
||||
return "Selected Pod is stopped; open/restore it before sending.".to_string();
|
||||
return "Selected Pod is stopped; press o or empty Enter to restore/open.".to_string();
|
||||
}
|
||||
entry
|
||||
.actions
|
||||
.disabled_reason
|
||||
.clone()
|
||||
.unwrap_or_else(|| "Selected Pod is not send-eligible.".to_string())
|
||||
.unwrap_or_else(|| "Selected Pod cannot be opened from this row.".to_string())
|
||||
}
|
||||
|
||||
fn selected_ticket_notice(row: Option<&PanelRow>) -> String {
|
||||
|
|
@ -2011,11 +1888,11 @@ fn draw_title(frame: &mut Frame<'_>, app: &MultiPodApp, area: Rect) {
|
|||
.composer
|
||||
.is_available(ComposerTarget::TicketIntake)
|
||||
{
|
||||
" Enter dispatches selected Ticket action · Ctrl+T target · o open/attach · r refresh"
|
||||
" Empty Enter dispatches selected Ticket action · Ctrl+T target · o/empty Enter open/attach · r refresh"
|
||||
} else if app.panel.header.ticket_configured {
|
||||
" Enter dispatches selected Ticket action · Enter sends to selected idle Pod when no Ticket action is selected · o open/attach · r refresh"
|
||||
" Empty Enter dispatches selected Ticket action · o/empty Enter open/attach Pod · r refresh"
|
||||
} else {
|
||||
" Pod-centric view · Enter sends to selected idle Pod · o open/attach · r refresh"
|
||||
" Pod-centric view · o/empty Enter open/attach selected Pod · r refresh"
|
||||
};
|
||||
let mut spans = vec![
|
||||
Span::styled(
|
||||
|
|
@ -2397,15 +2274,16 @@ fn draw_actionbar(frame: &mut Frame<'_>, app: &MultiPodApp, area: Rect) {
|
|||
let left = if app.sending && app.composer_target() == ComposerTarget::TicketIntake {
|
||||
"launching Ticket Intake…".to_string()
|
||||
} else if app.sending {
|
||||
"sending…".to_string()
|
||||
"working…".to_string()
|
||||
} else if let Some(notice) = app.notice.as_deref() {
|
||||
notice.to_string()
|
||||
} else if let Some(reason) = app.selected_send_disabled_reason() {
|
||||
} else if let Some(reason) = app.selected_open_disabled_reason() {
|
||||
reason
|
||||
} else {
|
||||
match app.composer_target() {
|
||||
ComposerTarget::Companion => {
|
||||
"idle live target: Enter sends directly without opening conversation".to_string()
|
||||
"Companion target pending; non-empty Enter keeps draft and reports a diagnostic"
|
||||
.to_string()
|
||||
}
|
||||
ComposerTarget::TicketIntake => {
|
||||
"Ticket Intake target: Enter launches Intake with composer text".to_string()
|
||||
|
|
@ -2417,9 +2295,9 @@ fn draw_actionbar(frame: &mut Frame<'_>, app: &MultiPodApp, area: Rect) {
|
|||
.composer
|
||||
.is_available(ComposerTarget::TicketIntake)
|
||||
{
|
||||
"↑/↓ select Enter use target Ctrl+T target o open r refresh Esc quit"
|
||||
"↑/↓ select Empty Enter target/open Ctrl+T target o open r refresh Esc quit"
|
||||
} else {
|
||||
"↑/↓ select Enter send o open r refresh Esc quit"
|
||||
"↑/↓ select Empty Enter open non-empty Enter diagnose o open r refresh Esc quit"
|
||||
};
|
||||
let left_width = area
|
||||
.width
|
||||
|
|
@ -2759,7 +2637,7 @@ mod tests {
|
|||
let mut app = app_with_panel(list, panel);
|
||||
|
||||
assert_eq!(app.selected_panel_row().unwrap().title, "Needs Human Reply");
|
||||
assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled);
|
||||
assert_eq!(app.selected_open_eligibility(), OpenEligibility::Disabled);
|
||||
let lines = list_lines(&app, 100, 6)
|
||||
.into_iter()
|
||||
.map(|line| plain_line(&line))
|
||||
|
|
@ -2773,20 +2651,23 @@ mod tests {
|
|||
|
||||
app.select_next();
|
||||
assert_eq!(app.list.selected_entry().unwrap().name, "idle");
|
||||
assert_eq!(app.selected_send_eligibility(), SendEligibility::SendNow);
|
||||
assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow);
|
||||
let open = app.prepare_open().unwrap();
|
||||
assert_eq!(open.pod_name, "idle");
|
||||
assert_eq!(open.socket_override, Some(PathBuf::from("/tmp/idle.sock")));
|
||||
|
||||
app.input.insert_str("send after ticket row");
|
||||
let request = match app.handle_key(key(KeyCode::Enter)) {
|
||||
MultiPodAction::Send(request) => request,
|
||||
_ => panic!("Pod row should preserve direct send behavior"),
|
||||
};
|
||||
assert_eq!(request.socket_path, PathBuf::from("/tmp/idle.sock"));
|
||||
assert_eq!(
|
||||
Segment::flatten_to_text(&request.segments),
|
||||
"send after ticket row"
|
||||
app.input.insert_str("draft after ticket row");
|
||||
assert!(matches!(
|
||||
app.handle_key(key(KeyCode::Enter)),
|
||||
MultiPodAction::None
|
||||
));
|
||||
assert!(!app.sending);
|
||||
assert_eq!(input_text(&app), "draft after ticket row");
|
||||
assert!(
|
||||
app.notice
|
||||
.as_deref()
|
||||
.unwrap()
|
||||
.contains("Companion composer is not wired")
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -2912,11 +2793,11 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn multi_idle_live_selected_target_is_send_eligible() {
|
||||
fn multi_idle_live_selected_target_is_open_eligible() {
|
||||
let app = test_app(vec![live_info("idle", PodStatus::Idle)]);
|
||||
|
||||
assert_eq!(app.selected_send_eligibility(), SendEligibility::SendNow);
|
||||
assert!(app.selected_send_disabled_reason().is_none());
|
||||
assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow);
|
||||
assert!(app.selected_open_disabled_reason().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -3058,7 +2939,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn multi_running_paused_and_stopped_targets_are_direct_send_disabled() {
|
||||
fn multi_running_paused_and_stopped_targets_are_open_eligible() {
|
||||
let mut app = test_app(vec![
|
||||
live_info("running", PodStatus::Running),
|
||||
live_info("paused", PodStatus::Paused),
|
||||
|
|
@ -3077,28 +2958,16 @@ mod tests {
|
|||
app.selected_row = None;
|
||||
app.ensure_selection_visible();
|
||||
|
||||
assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled);
|
||||
assert!(
|
||||
app.selected_send_disabled_reason()
|
||||
.unwrap()
|
||||
.contains("running")
|
||||
);
|
||||
assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow);
|
||||
assert!(app.selected_open_disabled_reason().is_none());
|
||||
app.select_next();
|
||||
assert_eq!(app.list.selected_entry().unwrap().name, "paused");
|
||||
assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled);
|
||||
assert!(
|
||||
app.selected_send_disabled_reason()
|
||||
.unwrap()
|
||||
.contains("paused")
|
||||
);
|
||||
assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow);
|
||||
assert!(app.selected_open_disabled_reason().is_none());
|
||||
app.select_next();
|
||||
assert_eq!(app.list.selected_entry().unwrap().name, "stopped");
|
||||
assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled);
|
||||
assert!(
|
||||
app.selected_send_disabled_reason()
|
||||
.unwrap()
|
||||
.contains("stopped")
|
||||
);
|
||||
assert_eq!(app.selected_open_eligibility(), OpenEligibility::OpenNow);
|
||||
assert!(app.selected_open_disabled_reason().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -3202,7 +3071,7 @@ mod tests {
|
|||
|
||||
assert!(app.selected_row.is_none());
|
||||
assert!(app.list.selected_name.is_none());
|
||||
assert_eq!(app.selected_send_eligibility(), SendEligibility::Disabled);
|
||||
assert_eq!(app.selected_open_eligibility(), OpenEligibility::Disabled);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -3295,26 +3164,32 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn multi_delivery_failure_keeps_composer_contents() {
|
||||
fn multi_companion_submit_keeps_composer_contents() {
|
||||
let mut app = test_app(vec![live_info("idle", PodStatus::Idle)]);
|
||||
app.input.insert_str("keep me");
|
||||
let before = input_text(&app);
|
||||
|
||||
app.finish_send(Err(DirectSendError::Io("boom".to_string())));
|
||||
app.reject_companion_submit();
|
||||
|
||||
assert_eq!(input_text(&app), before);
|
||||
assert!(app.notice.as_deref().unwrap().contains("composer kept"));
|
||||
assert!(!app.sending);
|
||||
assert!(
|
||||
app.notice
|
||||
.as_deref()
|
||||
.unwrap()
|
||||
.contains("Companion composer is not wired")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multi_delivery_success_clears_composer_contents() {
|
||||
fn multi_companion_submit_empty_reports_empty_composer() {
|
||||
let mut app = test_app(vec![live_info("idle", PodStatus::Idle)]);
|
||||
app.input.insert_str("send me");
|
||||
|
||||
app.finish_send(Ok(()));
|
||||
app.reject_companion_submit();
|
||||
|
||||
assert_eq!(input_text(&app), "");
|
||||
assert!(app.notice.as_deref().unwrap().contains("Delivered"));
|
||||
assert!(!app.sending);
|
||||
assert_eq!(app.notice.as_deref(), Some("Composer is empty."));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -3399,19 +3274,23 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn multi_non_empty_enter_uses_direct_send_action() {
|
||||
fn multi_non_empty_enter_reports_companion_unavailable() {
|
||||
let mut app = test_app(vec![live_info("idle", PodStatus::Idle)]);
|
||||
app.input.insert_str("send me");
|
||||
app.input.insert_str("keep this draft");
|
||||
|
||||
let request = match app.handle_key(key(KeyCode::Enter)) {
|
||||
MultiPodAction::Send(request) => request,
|
||||
_ => panic!("non-empty Enter should direct-send"),
|
||||
};
|
||||
assert!(matches!(
|
||||
app.handle_key(key(KeyCode::Enter)),
|
||||
MultiPodAction::None
|
||||
));
|
||||
|
||||
assert_eq!(request.socket_path, PathBuf::from("/tmp/idle.sock"));
|
||||
assert_eq!(Segment::flatten_to_text(&request.segments), "send me");
|
||||
assert!(app.sending);
|
||||
assert!(app.notice.as_deref().unwrap().contains("Sending to idle"));
|
||||
assert_eq!(input_text(&app), "keep this draft");
|
||||
assert!(!app.sending);
|
||||
assert!(
|
||||
app.notice
|
||||
.as_deref()
|
||||
.unwrap()
|
||||
.contains("Companion composer is not wired")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -3477,7 +3356,6 @@ mod tests {
|
|||
|
||||
let request = match app.handle_key(key(KeyCode::Enter)) {
|
||||
MultiPodAction::LaunchIntake(request) => request,
|
||||
MultiPodAction::Send(_) => panic!("Ticket Intake target must not direct-send"),
|
||||
_ => panic!("Ticket Intake target should launch Intake"),
|
||||
};
|
||||
|
||||
|
|
@ -3556,6 +3434,7 @@ mod tests {
|
|||
peer_registration: IntakePeerRegistrationStatus::Registered {
|
||||
orchestrator_pod: "test-orchestrator".to_string(),
|
||||
},
|
||||
registry_warning: None,
|
||||
}));
|
||||
|
||||
assert!(!app.sending);
|
||||
|
|
|
|||
|
|
@ -1,12 +1,15 @@
|
|||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fs::{self, OpenOptions};
|
||||
use std::io::{self, Write};
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::thread;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
const REGISTRY_VERSION: u32 = 1;
|
||||
const REGISTRY_FILE: &str = "role-sessions.json";
|
||||
const REGISTRY_LOCK_FILE: &str = "role-sessions.lock";
|
||||
const CLAIMS_DIR: &str = "ticket-claims";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -24,17 +27,37 @@ pub(crate) struct RoleSessionRegistry {
|
|||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(crate) struct RoleSessionRecord {
|
||||
pub pod_name: String,
|
||||
pub role: String,
|
||||
pub pod_name: String,
|
||||
pub origin: RoleSessionOrigin,
|
||||
pub created_at: String,
|
||||
pub updated_at: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub session_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub ticket_ids: Vec<String>,
|
||||
pub related_tickets: Vec<RelatedTicketRef>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum RoleSessionOrigin {
|
||||
PreTicketIntake,
|
||||
TicketClaim,
|
||||
RoleLaunch,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub(crate) struct RelatedTicketRef {
|
||||
pub id: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub slug: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(crate) struct TicketClaim {
|
||||
pub ticket_id: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub ticket_slug: Option<String>,
|
||||
pub pod_name: String,
|
||||
pub role: String,
|
||||
}
|
||||
|
|
@ -144,37 +167,46 @@ impl PanelRegistryStore {
|
|||
&self,
|
||||
pod_name: impl Into<String>,
|
||||
role: impl Into<String>,
|
||||
origin: RoleSessionOrigin,
|
||||
session_id: Option<String>,
|
||||
ticket_ids: impl IntoIterator<Item = String>,
|
||||
related_tickets: impl IntoIterator<Item = RelatedTicketRef>,
|
||||
) -> Result<(), PanelRegistryError> {
|
||||
let pod_name = pod_name.into();
|
||||
let role = role.into();
|
||||
let mut registry = self.load_registry()?;
|
||||
registry.version = REGISTRY_VERSION;
|
||||
if let Some(workspace_root) = self.workspace_root.as_ref() {
|
||||
registry.workspace_root = workspace_root.clone();
|
||||
}
|
||||
let mut tickets: BTreeSet<String> = registry
|
||||
.sessions
|
||||
.get(&pod_name)
|
||||
.map(|record| record.ticket_ids.iter().cloned().collect())
|
||||
.unwrap_or_default();
|
||||
tickets.extend(ticket_ids);
|
||||
registry.sessions.insert(
|
||||
pod_name.clone(),
|
||||
RoleSessionRecord {
|
||||
pod_name,
|
||||
role,
|
||||
session_id,
|
||||
ticket_ids: tickets.into_iter().collect(),
|
||||
},
|
||||
);
|
||||
self.save_registry(®istry)
|
||||
let related_tickets: Vec<RelatedTicketRef> = related_tickets.into_iter().collect();
|
||||
self.update_registry(|registry| {
|
||||
let now = now_timestamp_string();
|
||||
let mut tickets: BTreeSet<RelatedTicketRef> = registry
|
||||
.sessions
|
||||
.get(&pod_name)
|
||||
.map(|record| record.related_tickets.iter().cloned().collect())
|
||||
.unwrap_or_default();
|
||||
tickets.extend(related_tickets);
|
||||
let created_at = registry
|
||||
.sessions
|
||||
.get(&pod_name)
|
||||
.map(|record| record.created_at.clone())
|
||||
.unwrap_or_else(|| now.clone());
|
||||
registry.sessions.insert(
|
||||
pod_name.clone(),
|
||||
RoleSessionRecord {
|
||||
role,
|
||||
pod_name,
|
||||
origin,
|
||||
created_at,
|
||||
updated_at: now,
|
||||
session_id,
|
||||
related_tickets: tickets.into_iter().collect(),
|
||||
},
|
||||
);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn claim_ticket(
|
||||
&self,
|
||||
ticket_id: &str,
|
||||
ticket_slug: Option<&str>,
|
||||
pod_name: &str,
|
||||
role: &str,
|
||||
) -> Result<TicketClaimResult, PanelRegistryError> {
|
||||
|
|
@ -182,25 +214,21 @@ impl PanelRegistryStore {
|
|||
let claim_path = self.claim_path(ticket_id);
|
||||
let claim = TicketClaim {
|
||||
ticket_id: ticket_id.to_string(),
|
||||
ticket_slug: ticket_slug.map(ToOwned::to_owned),
|
||||
pod_name: pod_name.to_string(),
|
||||
role: role.to_string(),
|
||||
};
|
||||
let bytes = serde_json::to_vec_pretty(&claim)?;
|
||||
match OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(&claim_path)
|
||||
{
|
||||
Ok(mut file) => {
|
||||
if let Err(error) = file.write_all(&bytes).and_then(|()| file.write_all(b"\n")) {
|
||||
let _ = fs::remove_file(&claim_path);
|
||||
return Err(error.into());
|
||||
}
|
||||
match self.create_claim_file(&claim_path, &claim) {
|
||||
Ok(()) => {
|
||||
if let Err(error) = self.record_session(
|
||||
pod_name.to_string(),
|
||||
role.to_string(),
|
||||
RoleSessionOrigin::TicketClaim,
|
||||
None,
|
||||
[ticket_id.to_string()],
|
||||
[RelatedTicketRef {
|
||||
id: ticket_id.to_string(),
|
||||
slug: ticket_slug.map(ToOwned::to_owned),
|
||||
}],
|
||||
) {
|
||||
let _ = fs::remove_file(&claim_path);
|
||||
return Err(error);
|
||||
|
|
@ -237,39 +265,77 @@ impl PanelRegistryStore {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn release_ticket_claim(
|
||||
fn update_registry(
|
||||
&self,
|
||||
ticket_id: &str,
|
||||
pod_name: &str,
|
||||
update: impl FnOnce(&mut RoleSessionRegistry) -> Result<(), PanelRegistryError>,
|
||||
) -> Result<(), PanelRegistryError> {
|
||||
match self.load_claim(ticket_id) {
|
||||
Ok(claim) if claim.pod_name == pod_name => {
|
||||
fs::remove_file(self.claim_path(ticket_id))?;
|
||||
Ok(())
|
||||
}
|
||||
Ok(_) => Ok(()),
|
||||
Err(PanelRegistryError::Io(error)) if error.kind() == io::ErrorKind::NotFound => Ok(()),
|
||||
Err(error) => Err(error),
|
||||
fs::create_dir_all(&self.root)?;
|
||||
let _lock = self.acquire_registry_lock()?;
|
||||
let mut registry = self.load_registry()?;
|
||||
registry.version = REGISTRY_VERSION;
|
||||
if let Some(workspace_root) = self.workspace_root.as_ref() {
|
||||
registry.workspace_root = workspace_root.clone();
|
||||
}
|
||||
update(&mut registry)?;
|
||||
self.save_registry(®istry)
|
||||
}
|
||||
|
||||
fn acquire_registry_lock(&self) -> Result<RegistryLockGuard, PanelRegistryError> {
|
||||
let lock_path = self.root.join(REGISTRY_LOCK_FILE);
|
||||
for _ in 0..50 {
|
||||
match OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(&lock_path)
|
||||
{
|
||||
Ok(_) => return Ok(RegistryLockGuard { path: lock_path }),
|
||||
Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
Err(error) => return Err(error.into()),
|
||||
}
|
||||
}
|
||||
Err(PanelRegistryError::Io(io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"timed out acquiring panel role session registry lock",
|
||||
)))
|
||||
}
|
||||
|
||||
fn save_registry(&self, registry: &RoleSessionRegistry) -> Result<(), PanelRegistryError> {
|
||||
fs::create_dir_all(&self.root)?;
|
||||
let path = self.registry_path();
|
||||
let temp_path = path.with_extension("json.tmp");
|
||||
let temp_path = path.with_extension(format!("json.{}.tmp", now_timestamp_string()));
|
||||
let bytes = serde_json::to_vec_pretty(registry)?;
|
||||
fs::write(&temp_path, [&bytes[..], b"\n"].concat())?;
|
||||
fs::rename(temp_path, path)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_claim_file(&self, claim_path: &Path, claim: &TicketClaim) -> io::Result<()> {
|
||||
let temp_path = self
|
||||
.claims_dir()
|
||||
.join(format!(".{}.tmp", now_timestamp_string()));
|
||||
let bytes = serde_json::to_vec_pretty(claim).map_err(io::Error::other)?;
|
||||
fs::write(&temp_path, [&bytes[..], b"\n"].concat())?;
|
||||
let link_result = fs::hard_link(&temp_path, claim_path);
|
||||
let remove_result = fs::remove_file(&temp_path);
|
||||
match (link_result, remove_result) {
|
||||
(Ok(()), Ok(())) | (Ok(()), Err(_)) => Ok(()),
|
||||
(Err(error), _) => Err(error),
|
||||
}
|
||||
}
|
||||
|
||||
fn load_claims(&self) -> Result<Vec<TicketClaim>, PanelRegistryError> {
|
||||
let mut claims: Vec<TicketClaim> = Vec::new();
|
||||
match fs::read_dir(self.claims_dir()) {
|
||||
Ok(entries) => {
|
||||
for entry in entries {
|
||||
let entry = entry?;
|
||||
if entry.file_type()?.is_file() {
|
||||
if entry.file_type()?.is_file()
|
||||
&& entry
|
||||
.path()
|
||||
.extension()
|
||||
.is_some_and(|extension| extension == "json")
|
||||
{
|
||||
let bytes = fs::read(entry.path())?;
|
||||
claims.push(serde_json::from_slice(&bytes)?);
|
||||
}
|
||||
|
|
@ -296,6 +362,16 @@ impl PanelRegistryStore {
|
|||
}
|
||||
}
|
||||
|
||||
struct RegistryLockGuard {
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl Drop for RegistryLockGuard {
|
||||
fn drop(&mut self) {
|
||||
let _ = fs::remove_file(&self.path);
|
||||
}
|
||||
}
|
||||
|
||||
impl PanelRegistrySnapshot {
|
||||
pub(crate) fn empty() -> Self {
|
||||
Self {
|
||||
|
|
@ -359,6 +435,13 @@ fn encode_path_component(value: &str) -> String {
|
|||
encoded
|
||||
}
|
||||
|
||||
fn now_timestamp_string() -> String {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|duration| duration.as_nanos().to_string())
|
||||
.unwrap_or_else(|_| "0".to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
@ -376,7 +459,13 @@ mod tests {
|
|||
assert_ne!(store.root(), other.root());
|
||||
|
||||
store
|
||||
.record_session("ticket-intake-preticket", "intake", None, [])
|
||||
.record_session(
|
||||
"ticket-intake-preticket",
|
||||
"intake",
|
||||
RoleSessionOrigin::PreTicketIntake,
|
||||
None,
|
||||
[],
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(store.load_registry().unwrap().workspace_root, "/repo/yoi");
|
||||
}
|
||||
|
|
@ -387,18 +476,17 @@ mod tests {
|
|||
let store = PanelRegistryStore::from_root(temp.path().join("registry"));
|
||||
|
||||
assert!(matches!(
|
||||
store.claim_ticket("T-1", "ticket-one-intake", "intake"),
|
||||
store.claim_ticket("T-1", Some("ticket-one"), "ticket-one-intake", "intake"),
|
||||
Ok(TicketClaimResult::Claimed)
|
||||
));
|
||||
|
||||
let error = store
|
||||
.claim_ticket("T-1", "ticket-two-intake", "intake")
|
||||
.claim_ticket("T-1", Some("ticket-one"), "ticket-two-intake", "intake")
|
||||
.unwrap_err();
|
||||
assert!(matches!(error, PanelRegistryError::TicketAlreadyClaimed(_)));
|
||||
assert_eq!(
|
||||
store.claim_for_ticket("T-1").unwrap().unwrap().pod_name,
|
||||
"ticket-one-intake"
|
||||
);
|
||||
let claim = store.claim_for_ticket("T-1").unwrap().unwrap();
|
||||
assert_eq!(claim.pod_name, "ticket-one-intake");
|
||||
assert_eq!(claim.ticket_slug.as_deref(), Some("ticket-one"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -407,14 +495,30 @@ mod tests {
|
|||
let store = PanelRegistryStore::from_root(temp.path().join("registry"));
|
||||
|
||||
store
|
||||
.record_session("ticket-intake-preticket", "intake", None, [])
|
||||
.record_session(
|
||||
"ticket-intake-preticket",
|
||||
"intake",
|
||||
RoleSessionOrigin::PreTicketIntake,
|
||||
None,
|
||||
[],
|
||||
)
|
||||
.unwrap();
|
||||
store
|
||||
.record_session(
|
||||
"ticket-intake-shared",
|
||||
"intake",
|
||||
RoleSessionOrigin::RoleLaunch,
|
||||
None,
|
||||
["T-1".to_string(), "T-2".to_string()],
|
||||
[
|
||||
RelatedTicketRef {
|
||||
id: "T-1".to_string(),
|
||||
slug: Some("one".to_string()),
|
||||
},
|
||||
RelatedTicketRef {
|
||||
id: "T-2".to_string(),
|
||||
slug: Some("two".to_string()),
|
||||
},
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -430,7 +534,23 @@ mod tests {
|
|||
.find(|session| session.pod_name == "ticket-intake-shared")
|
||||
.unwrap();
|
||||
|
||||
assert!(preticket.ticket_ids.is_empty());
|
||||
assert_eq!(shared.ticket_ids, vec!["T-1", "T-2"]);
|
||||
assert!(preticket.related_tickets.is_empty());
|
||||
assert_eq!(shared.role, "intake");
|
||||
assert_eq!(shared.origin, RoleSessionOrigin::RoleLaunch);
|
||||
assert!(!shared.created_at.is_empty());
|
||||
assert!(!shared.updated_at.is_empty());
|
||||
assert_eq!(
|
||||
shared.related_tickets,
|
||||
vec![
|
||||
RelatedTicketRef {
|
||||
id: "T-1".to_string(),
|
||||
slug: Some("one".to_string()),
|
||||
},
|
||||
RelatedTicketRef {
|
||||
id: "T-2".to_string(),
|
||||
slug: Some("two".to_string()),
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -167,7 +167,6 @@ pub(crate) enum NextUserAction {
|
|||
Edit,
|
||||
Wait,
|
||||
OpenPod,
|
||||
SendToPod,
|
||||
}
|
||||
|
||||
impl NextUserAction {
|
||||
|
|
@ -180,7 +179,6 @@ impl NextUserAction {
|
|||
Self::Edit => "Edit",
|
||||
Self::Wait => "Wait",
|
||||
Self::OpenPod => "Open",
|
||||
Self::SendToPod => "Send",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -404,11 +402,6 @@ pub(crate) fn build_workspace_panel(
|
|||
{
|
||||
Ok(snapshot) => snapshot,
|
||||
Err(error) => {
|
||||
let mut snapshot = PanelRegistrySnapshot::empty();
|
||||
// Keep panel rendering available even when the local registry is corrupt or
|
||||
// unavailable; the diagnostic is attached below after the header is initialized.
|
||||
snapshot.claims.clear();
|
||||
snapshot.sessions.clear();
|
||||
let mut model = WorkspacePanelViewModel::empty(workspace_root);
|
||||
model
|
||||
.header
|
||||
|
|
@ -420,7 +413,7 @@ pub(crate) fn build_workspace_panel(
|
|||
model,
|
||||
workspace_root,
|
||||
pods,
|
||||
&snapshot,
|
||||
&PanelRegistrySnapshot::empty(),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
|
@ -774,9 +767,7 @@ fn pod_rows(pods: &PodList) -> Vec<PanelRow> {
|
|||
|
||||
fn pod_row(entry: &PodListEntry) -> PanelRow {
|
||||
let status = pod_status_label(entry).to_string();
|
||||
let next_action = if entry.actions.can_send_now {
|
||||
Some(NextUserAction::SendToPod)
|
||||
} else if entry.actions.can_open {
|
||||
let next_action = if entry.actions.can_open {
|
||||
Some(NextUserAction::OpenPod)
|
||||
} else {
|
||||
None
|
||||
|
|
@ -802,7 +793,7 @@ fn pod_row(entry: &PodListEntry) -> PanelRow {
|
|||
ticket: None,
|
||||
related_pods: Vec::new(),
|
||||
disabled_reason: entry.actions.disabled_reason.clone(),
|
||||
key_hint: Some("Pod rows preserve existing open/direct-send behavior".to_string()),
|
||||
key_hint: Some("Press o or empty Enter to open/attach this Pod".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1077,7 +1068,7 @@ mod tests {
|
|||
let summary = backend.list(TicketFilter::all()).unwrap().remove(0);
|
||||
let store = PanelRegistryStore::from_root(temp.path().join("local-registry"));
|
||||
store
|
||||
.claim_ticket(&summary.id, "ticket-claimed-intake", "intake")
|
||||
.claim_ticket(&summary.id, None, "ticket-claimed-intake", "intake")
|
||||
.unwrap();
|
||||
let registry = store.snapshot().unwrap();
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user