merge: idle queued orchestrator attention
This commit is contained in:
commit
9538feb1ce
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::collections::BTreeSet;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
|
@ -55,6 +56,11 @@ const COMPANION_PROGRESS_MAX_TITLE_CHARS: usize = 80;
|
||||||
const COMPANION_PROGRESS_MAX_MESSAGE_CHARS: usize = 1_800;
|
const COMPANION_PROGRESS_MAX_MESSAGE_CHARS: usize = 1_800;
|
||||||
const COMPANION_PROGRESS_NOTICE_TEMPLATE: &str =
|
const COMPANION_PROGRESS_NOTICE_TEMPLATE: &str =
|
||||||
include_str!("../../../resources/prompts/panel/companion_progress_notice.md");
|
include_str!("../../../resources/prompts/panel/companion_progress_notice.md");
|
||||||
|
const ORCHESTRATOR_IDLE_QUEUE_NOTICE_TEMPLATE: &str =
|
||||||
|
include_str!("../../../resources/prompts/panel/orchestrator_idle_queue_notice.md");
|
||||||
|
const ORCHESTRATOR_QUEUE_ATTENTION_MAX_TICKETS: usize = 6;
|
||||||
|
const ORCHESTRATOR_QUEUE_ATTENTION_MAX_TEXT_CHARS: usize = 120;
|
||||||
|
const ORCHESTRATOR_QUEUE_ATTENTION_MAX_MESSAGE_CHARS: usize = 2_400;
|
||||||
const SOCKET_OP_TIMEOUT: Duration = Duration::from_secs(3);
|
const SOCKET_OP_TIMEOUT: Duration = Duration::from_secs(3);
|
||||||
const MULTI_POD_POLL_INTERVAL: Duration = Duration::from_millis(1_500);
|
const MULTI_POD_POLL_INTERVAL: Duration = Duration::from_millis(1_500);
|
||||||
const TERMINAL_EVENT_POLL_INTERVAL: Duration = Duration::from_millis(100);
|
const TERMINAL_EVENT_POLL_INTERVAL: Duration = Duration::from_millis(100);
|
||||||
|
|
@ -132,6 +138,10 @@ pub(crate) async fn run(
|
||||||
loop {
|
loop {
|
||||||
if let Some(result) = pending_reload.finish_if_ready().await {
|
if let Some(result) = pending_reload.finish_if_ready().await {
|
||||||
app.apply_reload_result(result);
|
app.apply_reload_result(result);
|
||||||
|
if let Some(request) = app.prepare_orchestrator_queue_attention_notice() {
|
||||||
|
let result = dispatch_orchestrator_queue_attention_notice(request).await;
|
||||||
|
app.finish_orchestrator_queue_attention_notice(result);
|
||||||
|
}
|
||||||
if let Some(request) = app.prepare_companion_progress_notice() {
|
if let Some(request) = app.prepare_companion_progress_notice() {
|
||||||
let result = dispatch_companion_progress_notice(request).await;
|
let result = dispatch_companion_progress_notice(request).await;
|
||||||
app.finish_companion_progress_notice(result);
|
app.finish_companion_progress_notice(result);
|
||||||
|
|
@ -605,6 +615,124 @@ struct CompanionProgressTemplateRolePod {
|
||||||
status: String,
|
status: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
||||||
|
struct OrchestratorWorkSet {
|
||||||
|
active_inprogress: Vec<OrchestratorActiveWorkItem>,
|
||||||
|
queued: Vec<OrchestratorQueuedWorkItem>,
|
||||||
|
fingerprint: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OrchestratorWorkSet {
|
||||||
|
fn is_empty(&self) -> bool {
|
||||||
|
self.active_inprogress.is_empty() && self.queued.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_active_inprogress(&self) -> bool {
|
||||||
|
!self.active_inprogress.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn planned_queued_ids(&self) -> BTreeSet<String> {
|
||||||
|
self.queued.iter().map(|item| item.id.clone()).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn actionable_queued(&self) -> Vec<&OrchestratorQueuedWorkItem> {
|
||||||
|
self.queued
|
||||||
|
.iter()
|
||||||
|
.filter(|item| item.waiting_reason.is_none())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct OrchestratorActiveWorkItem {
|
||||||
|
id: String,
|
||||||
|
title: String,
|
||||||
|
status: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct OrchestratorQueuedWorkItem {
|
||||||
|
id: String,
|
||||||
|
title: String,
|
||||||
|
classification: OrchestratorQueuedClassification,
|
||||||
|
waiting_reason: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
enum OrchestratorQueuedClassification {
|
||||||
|
NewQueued,
|
||||||
|
PlannedQueued,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OrchestratorQueuedClassification {
|
||||||
|
fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::NewQueued => "new_queued",
|
||||||
|
Self::PlannedQueued => "planned_queued",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct OrchestratorQueueAttentionFreshness {
|
||||||
|
fingerprint: String,
|
||||||
|
updated_at: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct OrchestratorQueueAttentionNotice {
|
||||||
|
message: String,
|
||||||
|
fingerprint: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct OrchestratorQueueAttentionNoticeRequest {
|
||||||
|
pod_name: String,
|
||||||
|
socket_path: PathBuf,
|
||||||
|
notice: OrchestratorQueueAttentionNotice,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
struct OrchestratorQueueAttentionNoticeResult {
|
||||||
|
fingerprint: String,
|
||||||
|
updated_at: String,
|
||||||
|
error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OrchestratorQueueAttentionNoticeResult {
|
||||||
|
fn sent(fingerprint: String, updated_at: String) -> Self {
|
||||||
|
Self {
|
||||||
|
fingerprint,
|
||||||
|
updated_at,
|
||||||
|
error: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn failed(fingerprint: String, error: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
fingerprint,
|
||||||
|
updated_at: String::new(),
|
||||||
|
error: Some(error.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct OrchestratorQueueTemplateContext {
|
||||||
|
workspace: String,
|
||||||
|
actionable_tickets: Vec<OrchestratorQueueTemplateTicket>,
|
||||||
|
waiting_tickets: Vec<OrchestratorQueueTemplateTicket>,
|
||||||
|
omitted_ticket_count: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct OrchestratorQueueTemplateTicket {
|
||||||
|
id: String,
|
||||||
|
title: String,
|
||||||
|
classification: &'static str,
|
||||||
|
waiting_reason: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) struct MultiPodApp {
|
pub(crate) struct MultiPodApp {
|
||||||
pub(crate) list: PodList,
|
pub(crate) list: PodList,
|
||||||
pub(crate) panel: WorkspacePanelViewModel,
|
pub(crate) panel: WorkspacePanelViewModel,
|
||||||
|
|
@ -621,6 +749,8 @@ pub(crate) struct MultiPodApp {
|
||||||
last_companion_lifecycle_failure: Option<CompanionPanelState>,
|
last_companion_lifecycle_failure: Option<CompanionPanelState>,
|
||||||
last_orchestrator_lifecycle_failure: Option<OrchestratorPanelState>,
|
last_orchestrator_lifecycle_failure: Option<OrchestratorPanelState>,
|
||||||
companion_progress: Option<CompanionProgressFreshness>,
|
companion_progress: Option<CompanionProgressFreshness>,
|
||||||
|
orchestrator_work_set: OrchestratorWorkSet,
|
||||||
|
orchestrator_queue_attention: Option<OrchestratorQueueAttentionFreshness>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MultiPodApp {
|
impl MultiPodApp {
|
||||||
|
|
@ -655,6 +785,8 @@ impl MultiPodApp {
|
||||||
last_companion_lifecycle_failure: None,
|
last_companion_lifecycle_failure: None,
|
||||||
last_orchestrator_lifecycle_failure: None,
|
last_orchestrator_lifecycle_failure: None,
|
||||||
companion_progress: None,
|
companion_progress: None,
|
||||||
|
orchestrator_work_set: OrchestratorWorkSet::default(),
|
||||||
|
orchestrator_queue_attention: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -705,9 +837,64 @@ impl MultiPodApp {
|
||||||
self.selected_row = previous_row.filter(|key| self.panel.row(key).is_some());
|
self.selected_row = previous_row.filter(|key| self.panel.row(key).is_some());
|
||||||
self.ensure_selection_visible();
|
self.ensure_selection_visible();
|
||||||
self.ensure_composer_target_available();
|
self.ensure_composer_target_available();
|
||||||
|
self.refresh_orchestrator_work_set();
|
||||||
|
self.apply_orchestrator_work_set_detail();
|
||||||
self.apply_companion_progress_freshness();
|
self.apply_companion_progress_freshness();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn prepare_orchestrator_queue_attention_notice(
|
||||||
|
&mut self,
|
||||||
|
) -> Option<OrchestratorQueueAttentionNoticeRequest> {
|
||||||
|
let target = orchestrator_queue_attention_notice_target(&self.panel, &self.list)?;
|
||||||
|
if self.orchestrator_work_set.is_empty() {
|
||||||
|
self.refresh_orchestrator_work_set();
|
||||||
|
}
|
||||||
|
let notice = orchestrator_queue_attention_notice(&self.panel, &self.orchestrator_work_set)?;
|
||||||
|
if self
|
||||||
|
.orchestrator_queue_attention
|
||||||
|
.as_ref()
|
||||||
|
.is_some_and(|freshness| freshness.fingerprint == notice.fingerprint)
|
||||||
|
{
|
||||||
|
self.apply_orchestrator_work_set_detail();
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(OrchestratorQueueAttentionNoticeRequest {
|
||||||
|
pod_name: target.pod_name,
|
||||||
|
socket_path: target.socket_path,
|
||||||
|
notice,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish_orchestrator_queue_attention_notice(
|
||||||
|
&mut self,
|
||||||
|
result: OrchestratorQueueAttentionNoticeResult,
|
||||||
|
) {
|
||||||
|
if let Some(error) = result.error {
|
||||||
|
self.notice = Some(format!(
|
||||||
|
"Orchestrator queued-work attention not delivered: {error}"
|
||||||
|
));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.orchestrator_queue_attention = Some(OrchestratorQueueAttentionFreshness {
|
||||||
|
fingerprint: result.fingerprint,
|
||||||
|
updated_at: result.updated_at,
|
||||||
|
});
|
||||||
|
self.apply_orchestrator_work_set_detail();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn refresh_orchestrator_work_set(&mut self) {
|
||||||
|
let previous_planned = self.orchestrator_work_set.planned_queued_ids();
|
||||||
|
self.orchestrator_work_set = derive_orchestrator_work_set(&self.panel, previous_planned);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn apply_orchestrator_work_set_detail(&mut self) {
|
||||||
|
let detail = orchestrator_work_set_detail(
|
||||||
|
&self.orchestrator_work_set,
|
||||||
|
self.orchestrator_queue_attention.as_ref(),
|
||||||
|
);
|
||||||
|
apply_orchestrator_detail(&mut self.panel, detail);
|
||||||
|
}
|
||||||
|
|
||||||
fn prepare_companion_progress_notice(&mut self) -> Option<CompanionProgressNoticeRequest> {
|
fn prepare_companion_progress_notice(&mut self) -> Option<CompanionProgressNoticeRequest> {
|
||||||
let target = companion_progress_notice_target(&self.panel, &self.list)?;
|
let target = companion_progress_notice_target(&self.panel, &self.list)?;
|
||||||
let notice = companion_progress_notice(&self.panel, &self.list)?;
|
let notice = companion_progress_notice(&self.panel, &self.list)?;
|
||||||
|
|
@ -2417,6 +2604,312 @@ struct OrchestratorNotifyTarget {
|
||||||
socket_path: PathBuf,
|
socket_path: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn orchestrator_queue_attention_notice_target(
|
||||||
|
panel: &WorkspacePanelViewModel,
|
||||||
|
list: &PodList,
|
||||||
|
) -> Option<OrchestratorNotifyTarget> {
|
||||||
|
let orchestrator = panel.header.orchestrator.as_ref()?;
|
||||||
|
if !matches!(orchestrator.status, OrchestratorPanelStatus::Live) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let entry = list
|
||||||
|
.entries
|
||||||
|
.iter()
|
||||||
|
.find(|entry| entry.name == orchestrator.pod_name)?;
|
||||||
|
if !entry.actions.can_open {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let live = entry.live.as_ref()?;
|
||||||
|
if !live.reachable || live.status != Some(PodStatus::Idle) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(OrchestratorNotifyTarget {
|
||||||
|
pod_name: orchestrator.pod_name.clone(),
|
||||||
|
socket_path: live.socket_path.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn derive_orchestrator_work_set(
|
||||||
|
panel: &WorkspacePanelViewModel,
|
||||||
|
previous_planned: BTreeSet<String>,
|
||||||
|
) -> OrchestratorWorkSet {
|
||||||
|
let active_inprogress = panel
|
||||||
|
.rows
|
||||||
|
.iter()
|
||||||
|
.filter_map(|row| {
|
||||||
|
let ticket = row.ticket.as_ref()?;
|
||||||
|
if ticket.workflow_state == TicketWorkflowState::InProgress {
|
||||||
|
Some(OrchestratorActiveWorkItem {
|
||||||
|
id: ticket.id.clone(),
|
||||||
|
title: ticket.title.clone(),
|
||||||
|
status: ticket.workflow_state.as_str().to_string(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let active_wait = if active_inprogress.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(format!(
|
||||||
|
"waiting for active_inprogress: {}",
|
||||||
|
active_inprogress
|
||||||
|
.iter()
|
||||||
|
.map(|item| item.id.as_str())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ")
|
||||||
|
))
|
||||||
|
};
|
||||||
|
let queued = panel
|
||||||
|
.rows
|
||||||
|
.iter()
|
||||||
|
.filter_map(|row| {
|
||||||
|
let ticket = row.ticket.as_ref()?;
|
||||||
|
if ticket.workflow_state != TicketWorkflowState::Queued {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let duplicate_guard = queued_duplicate_guard(ticket, row);
|
||||||
|
let waiting_reason = active_wait
|
||||||
|
.clone()
|
||||||
|
.or_else(|| {
|
||||||
|
ticket
|
||||||
|
.blocked_reason
|
||||||
|
.as_ref()
|
||||||
|
.map(|reason| format!("blocked by Ticket relation diagnostics: {reason}"))
|
||||||
|
})
|
||||||
|
.or(duplicate_guard);
|
||||||
|
let classification =
|
||||||
|
if waiting_reason.is_some() || previous_planned.contains(&ticket.id) {
|
||||||
|
OrchestratorQueuedClassification::PlannedQueued
|
||||||
|
} else {
|
||||||
|
OrchestratorQueuedClassification::NewQueued
|
||||||
|
};
|
||||||
|
Some(OrchestratorQueuedWorkItem {
|
||||||
|
id: ticket.id.clone(),
|
||||||
|
title: ticket.title.clone(),
|
||||||
|
classification,
|
||||||
|
waiting_reason,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let fingerprint = orchestrator_work_set_fingerprint(&active_inprogress, &queued);
|
||||||
|
OrchestratorWorkSet {
|
||||||
|
active_inprogress,
|
||||||
|
queued,
|
||||||
|
fingerprint,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn queued_duplicate_guard(
|
||||||
|
ticket: &crate::workspace_panel::TicketPanelEntry,
|
||||||
|
row: &PanelRow,
|
||||||
|
) -> Option<String> {
|
||||||
|
let mut guards = Vec::new();
|
||||||
|
if let Some(claim) = ticket.local_claim.as_ref().filter(|claim| {
|
||||||
|
matches!(
|
||||||
|
claim.status,
|
||||||
|
TicketLocalClaimStatus::Live | TicketLocalClaimStatus::Restorable
|
||||||
|
)
|
||||||
|
}) {
|
||||||
|
guards.push(format!(
|
||||||
|
"local {} claim {} ({})",
|
||||||
|
claim.role,
|
||||||
|
claim.pod_name,
|
||||||
|
claim.status.label()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
for pod in ticket.related_pods.iter().chain(row.related_pods.iter()) {
|
||||||
|
if !guards.iter().any(|guard| guard.contains(pod)) {
|
||||||
|
guards.push(format!("related pod/worktree {pod}"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if guards.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(format!(
|
||||||
|
"waiting on existing role/session or visible pod/worktree before duplicate start: {}",
|
||||||
|
guards.join(", ")
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn orchestrator_work_set_fingerprint(
|
||||||
|
active: &[OrchestratorActiveWorkItem],
|
||||||
|
queued: &[OrchestratorQueuedWorkItem],
|
||||||
|
) -> String {
|
||||||
|
let active = active
|
||||||
|
.iter()
|
||||||
|
.map(|item| format!("active:{}:{}", item.id, item.status))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("|");
|
||||||
|
let queued = queued
|
||||||
|
.iter()
|
||||||
|
.map(|item| {
|
||||||
|
format!(
|
||||||
|
"queued:{}:{}:{}",
|
||||||
|
item.id,
|
||||||
|
item.classification.as_str(),
|
||||||
|
item.waiting_reason.as_deref().unwrap_or("actionable")
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("|");
|
||||||
|
format!("active=[{active}];queued=[{queued}]")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn orchestrator_queue_attention_notice(
|
||||||
|
panel: &WorkspacePanelViewModel,
|
||||||
|
work_set: &OrchestratorWorkSet,
|
||||||
|
) -> Option<OrchestratorQueueAttentionNotice> {
|
||||||
|
if work_set.has_active_inprogress() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let actionable = work_set.actionable_queued();
|
||||||
|
if actionable.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let waiting = work_set
|
||||||
|
.queued
|
||||||
|
.iter()
|
||||||
|
.filter(|item| item.waiting_reason.is_some())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let ticket_count = actionable.len() + waiting.len();
|
||||||
|
let actionable_tickets = actionable
|
||||||
|
.iter()
|
||||||
|
.take(ORCHESTRATOR_QUEUE_ATTENTION_MAX_TICKETS)
|
||||||
|
.map(|item| orchestrator_queue_template_ticket(item))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let remaining_capacity =
|
||||||
|
ORCHESTRATOR_QUEUE_ATTENTION_MAX_TICKETS.saturating_sub(actionable_tickets.len());
|
||||||
|
let waiting_tickets = waiting
|
||||||
|
.iter()
|
||||||
|
.take(remaining_capacity)
|
||||||
|
.map(|item| orchestrator_queue_template_ticket(item))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let rendered =
|
||||||
|
render_orchestrator_queue_attention_template(&OrchestratorQueueTemplateContext {
|
||||||
|
workspace: bounded_progress_text(
|
||||||
|
&panel.header.workspace_label,
|
||||||
|
ORCHESTRATOR_QUEUE_ATTENTION_MAX_TEXT_CHARS,
|
||||||
|
),
|
||||||
|
actionable_tickets,
|
||||||
|
waiting_tickets,
|
||||||
|
omitted_ticket_count: ticket_count
|
||||||
|
.saturating_sub(ORCHESTRATOR_QUEUE_ATTENTION_MAX_TICKETS),
|
||||||
|
})
|
||||||
|
.ok()?;
|
||||||
|
let message = bounded_progress_text(&rendered, ORCHESTRATOR_QUEUE_ATTENTION_MAX_MESSAGE_CHARS);
|
||||||
|
let fingerprint = format!("idle-queue:{}", work_set.fingerprint);
|
||||||
|
Some(OrchestratorQueueAttentionNotice {
|
||||||
|
message,
|
||||||
|
fingerprint,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn orchestrator_queue_template_ticket(
|
||||||
|
item: &&OrchestratorQueuedWorkItem,
|
||||||
|
) -> OrchestratorQueueTemplateTicket {
|
||||||
|
OrchestratorQueueTemplateTicket {
|
||||||
|
id: bounded_progress_text(&item.id, ORCHESTRATOR_QUEUE_ATTENTION_MAX_TEXT_CHARS),
|
||||||
|
title: bounded_progress_text(&item.title, ORCHESTRATOR_QUEUE_ATTENTION_MAX_TEXT_CHARS),
|
||||||
|
classification: item.classification.as_str(),
|
||||||
|
waiting_reason: item.waiting_reason.as_ref().map(|reason| {
|
||||||
|
bounded_progress_text(reason, ORCHESTRATOR_QUEUE_ATTENTION_MAX_TEXT_CHARS)
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn render_orchestrator_queue_attention_template(
|
||||||
|
context: &OrchestratorQueueTemplateContext,
|
||||||
|
) -> Result<String, minijinja::Error> {
|
||||||
|
let mut env = minijinja::Environment::new();
|
||||||
|
env.set_undefined_behavior(minijinja::UndefinedBehavior::Strict);
|
||||||
|
env.add_template(
|
||||||
|
"orchestrator_idle_queue_notice",
|
||||||
|
ORCHESTRATOR_IDLE_QUEUE_NOTICE_TEMPLATE,
|
||||||
|
)?;
|
||||||
|
env.get_template("orchestrator_idle_queue_notice")?
|
||||||
|
.render(context)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn orchestrator_work_set_detail(
|
||||||
|
work_set: &OrchestratorWorkSet,
|
||||||
|
freshness: Option<&OrchestratorQueueAttentionFreshness>,
|
||||||
|
) -> Option<String> {
|
||||||
|
if work_set.is_empty() {
|
||||||
|
return freshness.map(|freshness| {
|
||||||
|
format!(
|
||||||
|
"queued-work attention last sent at {} (idle auto-run notify)",
|
||||||
|
freshness.updated_at
|
||||||
|
)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if work_set.has_active_inprogress() {
|
||||||
|
let active = work_set
|
||||||
|
.active_inprogress
|
||||||
|
.iter()
|
||||||
|
.take(3)
|
||||||
|
.map(|item| item.id.as_str())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
let queued = work_set
|
||||||
|
.queued
|
||||||
|
.iter()
|
||||||
|
.take(3)
|
||||||
|
.map(|item| match item.waiting_reason.as_deref() {
|
||||||
|
Some(reason) => format!("{} ({reason})", item.id),
|
||||||
|
None => item.id.clone(),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
return Some(bounded_panel_diagnostic(format!(
|
||||||
|
"queued-work attention suppressed; active_inprogress: {active}; planned queued: {queued}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let actionable = work_set.actionable_queued();
|
||||||
|
let waiting = work_set.queued.len().saturating_sub(actionable.len());
|
||||||
|
if !actionable.is_empty() {
|
||||||
|
let classes = actionable
|
||||||
|
.iter()
|
||||||
|
.take(3)
|
||||||
|
.map(|item| format!("{} ({})", item.id, item.classification.as_str()))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
let sent = freshness
|
||||||
|
.map(|freshness| format!("; last sent at {}", freshness.updated_at))
|
||||||
|
.unwrap_or_default();
|
||||||
|
return Some(bounded_panel_diagnostic(format!(
|
||||||
|
"queued-work attention pending: {classes}; waiting queued: {waiting}{sent}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let waiting = work_set
|
||||||
|
.queued
|
||||||
|
.iter()
|
||||||
|
.take(3)
|
||||||
|
.map(|item| match item.waiting_reason.as_deref() {
|
||||||
|
Some(reason) => format!("{} ({reason})", item.id),
|
||||||
|
None => item.id.clone(),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(", ");
|
||||||
|
Some(bounded_panel_diagnostic(format!(
|
||||||
|
"queued-work attention waiting: {waiting}"
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn apply_orchestrator_detail(panel: &mut WorkspacePanelViewModel, detail: Option<String>) {
|
||||||
|
let Some(orchestrator) = panel.header.orchestrator.as_mut() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
if matches!(orchestrator.status, OrchestratorPanelStatus::Unavailable) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if let Some(detail) = detail {
|
||||||
|
orchestrator.detail = Some(detail);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
struct CompanionProgressNoticeTarget {
|
struct CompanionProgressNoticeTarget {
|
||||||
pod_name: String,
|
pod_name: String,
|
||||||
|
|
@ -2571,6 +3064,21 @@ async fn dispatch_companion_progress_notice(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn dispatch_orchestrator_queue_attention_notice(
|
||||||
|
request: OrchestratorQueueAttentionNoticeRequest,
|
||||||
|
) -> OrchestratorQueueAttentionNoticeResult {
|
||||||
|
let fingerprint = request.notice.fingerprint.clone();
|
||||||
|
match send_notify_only(&request.socket_path, request.notice.message, true).await {
|
||||||
|
Ok(()) => {
|
||||||
|
OrchestratorQueueAttentionNoticeResult::sent(fingerprint, progress_notice_timestamp())
|
||||||
|
}
|
||||||
|
Err(err) => OrchestratorQueueAttentionNoticeResult::failed(
|
||||||
|
fingerprint,
|
||||||
|
format!("{}: {}", request.pod_name, err),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub(crate) struct TicketActionOutcome {
|
pub(crate) struct TicketActionOutcome {
|
||||||
notice: String,
|
notice: String,
|
||||||
|
|
@ -6476,6 +6984,189 @@ mod tests {
|
||||||
assert!(app.notice.as_deref().unwrap().contains("cannot be opened"));
|
assert!(app.notice.as_deref().unwrap().contains("cannot be opened"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn idle_orchestrator_gets_bounded_attention_for_new_queued_work() {
|
||||||
|
let mut app = ticket_enabled_app(vec![live_info("test-orchestrator", PodStatus::Idle)]);
|
||||||
|
app.panel.rows = vec![panel_test_ticket_row(
|
||||||
|
"00001QUEUE",
|
||||||
|
"Queued work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"queued",
|
||||||
|
)];
|
||||||
|
app.refresh_orchestrator_work_set();
|
||||||
|
|
||||||
|
let request = app
|
||||||
|
.prepare_orchestrator_queue_attention_notice()
|
||||||
|
.expect("idle orchestrator should receive queued-work attention");
|
||||||
|
|
||||||
|
assert_eq!(request.pod_name, "test-orchestrator");
|
||||||
|
assert!(request.notice.message.contains("00001QUEUE"));
|
||||||
|
assert!(request.notice.message.contains("new_queued"));
|
||||||
|
assert!(request.notice.message.contains("queued -> inprogress"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn active_inprogress_suppresses_queued_attention_and_retains_waiting_reason() {
|
||||||
|
let mut app = ticket_enabled_app(vec![live_info("test-orchestrator", PodStatus::Idle)]);
|
||||||
|
app.panel.rows = vec![
|
||||||
|
panel_test_ticket_row(
|
||||||
|
"00001ACTIVE",
|
||||||
|
"Active work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"inprogress",
|
||||||
|
),
|
||||||
|
panel_test_ticket_row(
|
||||||
|
"00001QUEUE",
|
||||||
|
"Queued work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"queued",
|
||||||
|
),
|
||||||
|
];
|
||||||
|
app.refresh_orchestrator_work_set();
|
||||||
|
app.apply_orchestrator_work_set_detail();
|
||||||
|
|
||||||
|
assert!(app.prepare_orchestrator_queue_attention_notice().is_none());
|
||||||
|
let queued = app
|
||||||
|
.orchestrator_work_set
|
||||||
|
.queued
|
||||||
|
.iter()
|
||||||
|
.find(|item| item.id == "00001QUEUE")
|
||||||
|
.expect("queued item retained");
|
||||||
|
assert_eq!(
|
||||||
|
queued.classification,
|
||||||
|
OrchestratorQueuedClassification::PlannedQueued
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
queued
|
||||||
|
.waiting_reason
|
||||||
|
.as_deref()
|
||||||
|
.unwrap()
|
||||||
|
.contains("active_inprogress")
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
app.panel
|
||||||
|
.header
|
||||||
|
.orchestrator
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.detail
|
||||||
|
.as_deref()
|
||||||
|
.unwrap()
|
||||||
|
.contains("suppressed")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn planned_queued_prompts_when_active_work_clears() {
|
||||||
|
let mut app = ticket_enabled_app(vec![live_info("test-orchestrator", PodStatus::Idle)]);
|
||||||
|
app.panel.rows = vec![
|
||||||
|
panel_test_ticket_row(
|
||||||
|
"00001ACTIVE",
|
||||||
|
"Active work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"inprogress",
|
||||||
|
),
|
||||||
|
panel_test_ticket_row(
|
||||||
|
"00001QUEUE",
|
||||||
|
"Queued work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"queued",
|
||||||
|
),
|
||||||
|
];
|
||||||
|
app.refresh_orchestrator_work_set();
|
||||||
|
assert!(app.prepare_orchestrator_queue_attention_notice().is_none());
|
||||||
|
|
||||||
|
app.panel.rows = vec![panel_test_ticket_row(
|
||||||
|
"00001QUEUE",
|
||||||
|
"Queued work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"queued",
|
||||||
|
)];
|
||||||
|
app.refresh_orchestrator_work_set();
|
||||||
|
let request = app
|
||||||
|
.prepare_orchestrator_queue_attention_notice()
|
||||||
|
.expect("planned queued work should prompt after active work clears");
|
||||||
|
|
||||||
|
assert!(request.notice.message.contains("planned_queued"));
|
||||||
|
assert!(
|
||||||
|
!request
|
||||||
|
.notice
|
||||||
|
.message
|
||||||
|
.contains("waiting for active_inprogress")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn queued_attention_is_suppressed_when_existing_claim_prevents_duplicate_start() {
|
||||||
|
let mut app = ticket_enabled_app(vec![live_info("test-orchestrator", PodStatus::Idle)]);
|
||||||
|
let mut row = panel_test_ticket_row(
|
||||||
|
"00001QUEUE",
|
||||||
|
"Queued work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"queued",
|
||||||
|
);
|
||||||
|
row.ticket.as_mut().unwrap().local_claim =
|
||||||
|
Some(crate::workspace_panel::TicketLocalClaimEntry {
|
||||||
|
pod_name: "coder-00001QUEUE".to_string(),
|
||||||
|
role: "coder".to_string(),
|
||||||
|
status: TicketLocalClaimStatus::Live,
|
||||||
|
});
|
||||||
|
row.related_pods.push("reviewer-00001QUEUE".to_string());
|
||||||
|
app.panel.rows = vec![row];
|
||||||
|
app.refresh_orchestrator_work_set();
|
||||||
|
app.apply_orchestrator_work_set_detail();
|
||||||
|
|
||||||
|
assert!(app.prepare_orchestrator_queue_attention_notice().is_none());
|
||||||
|
let waiting = app.orchestrator_work_set.queued[0]
|
||||||
|
.waiting_reason
|
||||||
|
.as_deref()
|
||||||
|
.unwrap();
|
||||||
|
assert!(waiting.contains("duplicate start"));
|
||||||
|
assert!(waiting.contains("coder-00001QUEUE"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rediscovered_queued_work_is_actionable_when_session_work_set_is_empty() {
|
||||||
|
let mut app = ticket_enabled_app(vec![live_info("test-orchestrator", PodStatus::Idle)]);
|
||||||
|
app.orchestrator_work_set = OrchestratorWorkSet::default();
|
||||||
|
app.panel.rows = vec![panel_test_ticket_row(
|
||||||
|
"00001QUEUE",
|
||||||
|
"Queued work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"queued",
|
||||||
|
)];
|
||||||
|
|
||||||
|
let request = app
|
||||||
|
.prepare_orchestrator_queue_attention_notice()
|
||||||
|
.expect("queued ticket state should be rediscovered safely");
|
||||||
|
|
||||||
|
assert!(request.notice.message.contains("new_queued"));
|
||||||
|
assert!(request.notice.message.contains("00001QUEUE"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn queued_attention_requires_idle_orchestrator_to_avoid_duplicate_rekick() {
|
||||||
|
let mut app = ticket_enabled_app(vec![live_info("test-orchestrator", PodStatus::Running)]);
|
||||||
|
app.panel.rows = vec![panel_test_ticket_row(
|
||||||
|
"00001QUEUE",
|
||||||
|
"Queued work",
|
||||||
|
ActionPriority::Background,
|
||||||
|
NextUserAction::Wait,
|
||||||
|
"queued",
|
||||||
|
)];
|
||||||
|
app.refresh_orchestrator_work_set();
|
||||||
|
|
||||||
|
assert!(app.prepare_orchestrator_queue_attention_notice().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
fn test_app(live: Vec<LivePodInfo>) -> MultiPodApp {
|
fn test_app(live: Vec<LivePodInfo>) -> MultiPodApp {
|
||||||
app_with_list(PodList::from_sources(
|
app_with_list(PodList::from_sources(
|
||||||
PodVisibilitySource::ResumePicker,
|
PodVisibilitySource::ResumePicker,
|
||||||
|
|
@ -6564,9 +7255,13 @@ mod tests {
|
||||||
last_companion_lifecycle_failure,
|
last_companion_lifecycle_failure,
|
||||||
last_orchestrator_lifecycle_failure,
|
last_orchestrator_lifecycle_failure,
|
||||||
companion_progress: None,
|
companion_progress: None,
|
||||||
|
orchestrator_work_set: OrchestratorWorkSet::default(),
|
||||||
|
orchestrator_queue_attention: None,
|
||||||
};
|
};
|
||||||
app.ensure_selection_visible();
|
app.ensure_selection_visible();
|
||||||
app.ensure_composer_target_available();
|
app.ensure_composer_target_available();
|
||||||
|
app.refresh_orchestrator_work_set();
|
||||||
|
app.apply_orchestrator_work_set_detail();
|
||||||
app
|
app
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
24
resources/prompts/panel/orchestrator_idle_queue_notice.md
Normal file
24
resources/prompts/panel/orchestrator_idle_queue_notice.md
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
<system-reminder>
|
||||||
|
Workspace panel observed that this Orchestrator Pod is idle while queued Ticket work is present.
|
||||||
|
|
||||||
|
This is bounded attention only, not scheduler authority. Do not drain the queue automatically. Before implementation side effects, verify the Ticket state and record the normal `queued -> inprogress` acceptance through Ticket tools.
|
||||||
|
|
||||||
|
Workspace: {{ workspace }}
|
||||||
|
|
||||||
|
Actionable queued Tickets:
|
||||||
|
{% for ticket in actionable_tickets -%}
|
||||||
|
- {{ ticket.id }} — {{ ticket.title }} [{{ ticket.classification }}]
|
||||||
|
{% endfor -%}
|
||||||
|
|
||||||
|
{% if waiting_tickets | length > 0 -%}
|
||||||
|
Queued Tickets retained in the session work set but currently waiting:
|
||||||
|
{% for ticket in waiting_tickets -%}
|
||||||
|
- {{ ticket.id }} — {{ ticket.title }} [{{ ticket.classification }}]: {{ ticket.waiting_reason }}
|
||||||
|
{% endfor -%}
|
||||||
|
{% endif -%}
|
||||||
|
{% if omitted_ticket_count > 0 -%}
|
||||||
|
Additional queued Tickets omitted from this bounded notice: {{ omitted_ticket_count }}
|
||||||
|
{% endif -%}
|
||||||
|
|
||||||
|
Preserve the existing human gate, dependency/conflict/capacity/dirty-workspace checks, and duplicate-start checks using actual Ticket state, role/session claims, visible Pods, and worktrees.
|
||||||
|
</system-reminder>
|
||||||
Loading…
Reference in New Issue
Block a user