diff --git a/TODO.md b/TODO.md index 9a3ff87f..2867aad1 100644 --- a/TODO.md +++ b/TODO.md @@ -11,6 +11,7 @@ - [ ] サブミット入力 - [ ] protocol Segment 化 → [tickets/submit-segment-protocol.md](tickets/submit-segment-protocol.md) - [ ] TUI 補完 + 型付き atom 化 → [tickets/submit-tui-completion.md](tickets/submit-tui-completion.md) +- [ ] Notification 用語リネーム (Event::Alert / Method::Notify) → [tickets/notification-naming-cleanup.md](tickets/notification-naming-cleanup.md) - [ ] メモリ機構 - [ ] ファイル形式 + Linter 土台 → [tickets/memory-file-format.md](tickets/memory-file-format.md) - [ ] memory / Knowledge 検索ツール → [tickets/memory-search-tools.md](tickets/memory-search-tools.md) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index a57627a5..97595e47 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -6,8 +6,8 @@ use llm_worker::llm_client::client::LlmClient; use session_store::Store; use tokio::sync::{broadcast, mpsc, oneshot}; -use crate::ipc::notification_buffer::NotificationBuffer; -use crate::ipc::notifier::Notifier; +use crate::ipc::alerter::Alerter; +use crate::ipc::notify_buffer::NotifyBuffer; use crate::pod::{Pod, PodError, PodRunResult}; use crate::spawn::comm_tools::{ list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, @@ -17,7 +17,7 @@ use crate::shared_state::{PodSharedState, PodStatus}; use crate::ipc::server::SocketServer; use crate::spawn::tool::spawn_pod_tool; use crate::spawn::registry::SpawnedPodRegistry; -use protocol::{ErrorCode, Event, Method, NotificationLevel, NotificationSource, RunResult, TurnResult}; +use protocol::{ErrorCode, Event, Method, AlertLevel, AlertSource, RunResult, TurnResult}; // --------------------------------------------------------------------------- // PodHandle — client-facing, Clone-able @@ -29,7 +29,7 @@ pub struct PodHandle { event_tx: broadcast::Sender, pub shared_state: Arc, pub runtime_dir: Arc, - pub notifier: Notifier, + pub alerter: Alerter, } impl PodHandle { @@ -46,9 +46,9 @@ impl PodHandle { self.event_tx.send(event) } - /// Emit a user-facing notification. Thin wrapper over `Notifier::notify`. - pub fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) { - self.notifier.notify(level, source, message); + /// Emit a user-facing alert. Thin wrapper over `Alerter::alert`. + pub fn alert(&self, level: AlertLevel, source: AlertSource, message: String) { + self.alerter.alert(level, source, message); } } @@ -72,7 +72,7 @@ impl PodController { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let (method_tx, mut method_rx) = mpsc::channel::(32); let (event_tx, _) = broadcast::channel::(256); - let notifier = Notifier::new(event_tx.clone()); + let alerter = Alerter::new(event_tx.clone()); let manifest_toml = toml::to_string_pretty(pod.manifest()).unwrap_or_default(); let greeting = build_greeting(&pod); @@ -95,13 +95,13 @@ impl PodController { event_tx: event_tx.clone(), shared_state: shared_state.clone(), runtime_dir: runtime_dir.clone(), - notifier: notifier.clone(), + alerter: alerter.clone(), }; - // Hand the notifier to the Pod so internal operations (compaction, + // Hand the alerter to the Pod so internal operations (compaction, // AGENTS.md ingestion during the first turn) can emit user-facing // notifications on the same channel. - pod.attach_notifier(notifier.clone()); + pod.attach_alerter(alerter.clone()); // Also hand the raw broadcast sender so Pod-internal operations // can emit typed lifecycle `Event`s (currently: compact progress). pod.attach_event_tx(event_tx.clone()); @@ -212,11 +212,11 @@ impl PodController { }); }); - let notifier_for_worker = notifier.clone(); + let alerter_for_worker = alerter.clone(); worker.on_warning(move |message| { - notifier_for_worker.notify( - NotificationLevel::Warn, - NotificationSource::Worker, + alerter_for_worker.alert( + AlertLevel::Warn, + AlertSource::Worker, message.to_owned(), ); }); @@ -257,7 +257,7 @@ impl PodController { // `Method::Notify` into the buffer even while `pod` is held by // an in-flight `run_for_notification` / `run` future. let cancel_tx = pod.worker_mut().cancel_sender(); - let notification_buffer = pod.notification_buffer_handle(); + let notify_buffer = pod.notify_buffer_handle(); tokio::spawn(async move { // Hold socket server alive for the lifetime of the controller task @@ -303,7 +303,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, - ¬ification_buffer, + ¬ify_buffer, self_parent_socket.as_ref(), &spawner_name, &spawned_registry, @@ -313,9 +313,9 @@ impl PodController { if new_status == PodStatus::Idle { if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); - notifier.notify( - NotificationLevel::Warn, - NotificationSource::Compactor, + alerter.alert( + AlertLevel::Warn, + AlertSource::Compactor, format!("post-run compaction error: {e}"), ); } @@ -334,7 +334,7 @@ impl PodController { } Method::Notify { message } => { - pod.push_notification(message); + pod.push_notify(message); if shared_state.get_status() != PodStatus::Idle { // RUNNING / Paused: the buffer push is the // entire operation; the in-flight turn (or @@ -353,7 +353,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, - ¬ification_buffer, + ¬ify_buffer, self_parent_socket.as_ref(), &spawner_name, &spawned_registry, @@ -363,9 +363,9 @@ impl PodController { if new_status == PodStatus::Idle { if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); - notifier.notify( - NotificationLevel::Warn, - NotificationSource::Compactor, + alerter.alert( + AlertLevel::Warn, + AlertSource::Compactor, format!("post-run compaction error: {e}"), ); } @@ -400,7 +400,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, - ¬ification_buffer, + ¬ify_buffer, self_parent_socket.as_ref(), &spawner_name, &spawned_registry, @@ -410,9 +410,9 @@ impl PodController { if new_status == PodStatus::Idle { if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); - notifier.notify( - NotificationLevel::Warn, - NotificationSource::Compactor, + alerter.alert( + AlertLevel::Warn, + AlertSource::Compactor, format!("post-run compaction error: {e}"), ); } @@ -475,7 +475,7 @@ impl PodController { // request will inject it as a system message // via `PodInterceptor::pre_llm_request`. let text = crate::ipc::event::render_event(&event); - pod.push_notification(text); + pod.push_notify(text); // Auto-kick a turn if the Pod is idle so the // notification is not stranded. Matches the // `Method::Notify` idle path. @@ -489,7 +489,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, - ¬ification_buffer, + ¬ify_buffer, self_parent_socket.as_ref(), &spawner_name, &spawned_registry, @@ -499,9 +499,9 @@ impl PodController { if new_status == PodStatus::Idle { if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); - notifier.notify( - NotificationLevel::Warn, - NotificationSource::Compactor, + alerter.alert( + AlertLevel::Warn, + AlertSource::Compactor, format!("post-run compaction error: {e}"), ); } @@ -563,7 +563,7 @@ async fn run_with_cancel_support( event_tx: &broadcast::Sender, cancel_tx: &mpsc::Sender<()>, shared_state: &Arc, - notification_buffer: &NotificationBuffer, + notify_buffer: &NotifyBuffer, parent_socket: Option<&std::path::PathBuf>, self_name: &str, spawned_registry: &Arc, @@ -645,7 +645,7 @@ where Some(Method::Notify { message }) => { // Route into the buffer; the in-flight turn will // drain it at its next pre_llm_request. - notification_buffer.push(message); + notify_buffer.push(message); } Some(Method::GetHistory) => {} Some(Method::PodEvent(event)) => { @@ -664,7 +664,7 @@ where &self_parent_socket, ) .await; - notification_buffer.push(crate::ipc::event::render_event(&event)); + notify_buffer.push(crate::ipc::event::render_event(&event)); } None => { let _ = cancel_tx.try_send(()); diff --git a/crates/pod/src/ipc/alerter.rs b/crates/pod/src/ipc/alerter.rs new file mode 100644 index 00000000..390e3158 --- /dev/null +++ b/crates/pod/src/ipc/alerter.rs @@ -0,0 +1,168 @@ +//! User-facing alert channel for Pod → client. +//! +//! Separate from `tracing` (which is for developer logs). Alerts +//! are short human-readable messages the Pod layer wants a client to +//! see — for example "compaction failed", "tool output truncated". +//! +//! Each alert is broadcast on the shared `Event` channel and +//! also appended to an in-memory buffer so that clients connecting +//! after the fact still see everything emitted during the session. + +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use tokio::sync::broadcast; + +use protocol::{Alert, AlertLevel, AlertSource, Event}; + +/// Upper bound on buffered alerts. When exceeded, the oldest +/// entries are discarded so a long-running session cannot leak +/// memory through a pathological loop of recurring alerts +/// (e.g. compaction failing every turn). +const MAX_BUFFERED_ALERTS: usize = 512; + +#[derive(Clone)] +pub struct Alerter { + inner: Arc, +} + +struct Inner { + event_tx: broadcast::Sender, + buffer: Mutex>, +} + +impl Alerter { + pub fn new(event_tx: broadcast::Sender) -> Self { + Self { + inner: Arc::new(Inner { + event_tx, + buffer: Mutex::new(VecDeque::with_capacity(MAX_BUFFERED_ALERTS)), + }), + } + } + + /// Record and broadcast an alert. + /// + /// The broadcast may have no subscribers (e.g. during Pod + /// construction before any client has connected); the buffer + /// guarantees the message is still delivered once a client + /// attaches. + /// + /// The buffer mutex is held across `broadcast::send` to make + /// `subscribe_with_snapshot` race-free — a client that snapshots + /// the buffer while holding the same lock sees every alert + /// exactly once: older ones from the snapshot, newer ones from + /// the freshly-subscribed receiver. + pub fn alert(&self, level: AlertLevel, source: AlertSource, message: String) { + let alert = Alert { + level, + source, + message, + timestamp_ms: now_ms(), + }; + if let Ok(mut buf) = self.inner.buffer.lock() { + if buf.len() >= MAX_BUFFERED_ALERTS { + buf.pop_front(); + } + buf.push_back(alert.clone()); + let _ = self.inner.event_tx.send(Event::Alert(alert)); + } + } + + /// Subscribe and atomically snapshot the current buffer. + /// + /// The returned snapshot contains alerts emitted before + /// this call; the receiver will deliver alerts emitted + /// after. An alert cannot appear in both. + pub fn subscribe_with_snapshot(&self) -> (Vec, broadcast::Receiver) { + let buf = self + .inner + .buffer + .lock() + .expect("alerter buffer mutex poisoned"); + let rx = self.inner.event_tx.subscribe(); + let snapshot: Vec = buf.iter().cloned().collect(); + (snapshot, rx) + } +} + +fn now_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn alert_broadcasts_to_existing_subscriber() { + let (tx, _keep) = broadcast::channel::(8); + let alerter = Alerter::new(tx); + let (_snapshot, mut rx) = alerter.subscribe_with_snapshot(); + + alerter.alert( + AlertLevel::Warn, + AlertSource::Compactor, + "test message".into(), + ); + + match rx.try_recv() { + Ok(Event::Alert(a)) => assert_eq!(a.message, "test message"), + other => panic!("unexpected event: {other:?}"), + } + } + + #[test] + fn late_subscriber_sees_earlier_alerts_via_snapshot() { + let (tx, _keep) = broadcast::channel::(8); + let alerter = Alerter::new(tx); + + alerter.alert(AlertLevel::Error, AlertSource::Pod, "first".into()); + alerter.alert(AlertLevel::Warn, AlertSource::AgentsMd, "second".into()); + + let (snapshot, mut rx) = alerter.subscribe_with_snapshot(); + assert_eq!(snapshot.len(), 2); + assert_eq!(snapshot[0].message, "first"); + assert_eq!(snapshot[1].message, "second"); + assert!(rx.try_recv().is_err()); // nothing pending on the receiver + } + + #[test] + fn buffer_discards_oldest_past_cap() { + let (tx, _keep) = broadcast::channel::(1024); + let alerter = Alerter::new(tx); + + for i in 0..(MAX_BUFFERED_ALERTS + 50) { + alerter.alert(AlertLevel::Warn, AlertSource::Worker, format!("msg-{i}")); + } + + let (snapshot, _rx) = alerter.subscribe_with_snapshot(); + assert_eq!(snapshot.len(), MAX_BUFFERED_ALERTS); + // First 50 were evicted; the oldest remaining is msg-50. + assert_eq!(snapshot.first().unwrap().message, "msg-50"); + let last = format!("msg-{}", MAX_BUFFERED_ALERTS + 49); + assert_eq!(snapshot.last().unwrap().message, last); + } + + #[test] + fn subscribe_snapshot_and_live_do_not_overlap() { + let (tx, _keep) = broadcast::channel::(8); + let alerter = Alerter::new(tx); + + alerter.alert(AlertLevel::Warn, AlertSource::Worker, "historic".into()); + let (snapshot, mut rx) = alerter.subscribe_with_snapshot(); + alerter.alert(AlertLevel::Error, AlertSource::Worker, "live".into()); + + assert_eq!(snapshot.len(), 1); + assert_eq!(snapshot[0].message, "historic"); + match rx.try_recv() { + Ok(Event::Alert(a)) => assert_eq!(a.message, "live"), + other => panic!("unexpected: {other:?}"), + } + assert!(rx.try_recv().is_err()); + } +} diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index 6c8de01b..c014a390 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -25,7 +25,7 @@ use crate::hook::{ AbortInfo, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary, TurnEndInfo, }; -use crate::ipc::notification_buffer::{NotificationBuffer, format_notification}; +use crate::ipc::notify_buffer::{NotifyBuffer, format_notify}; use crate::prompt::catalog::PromptCatalog; use crate::compact::token_counter::total_tokens_impl; use tracing::warn; @@ -42,7 +42,7 @@ pub(crate) struct PodInterceptor { usage_history: Option>>>, /// Pending-notification buffer drained into the per-request /// context at the head of `pre_llm_request`. - pending_notifications: NotificationBuffer, + pending_notifies: NotifyBuffer, /// Prompt catalog used to render the injected notification wrapper. prompts: Arc, /// Next turn index assigned by `on_prompt_submit`. @@ -56,14 +56,14 @@ impl PodInterceptor { registry: Arc, compact_state: Option>, usage_history: Option>>>, - pending_notifications: NotificationBuffer, + pending_notifies: NotifyBuffer, prompts: Arc, ) -> Self { Self { registry, compact_state, usage_history, - pending_notifications, + pending_notifies, prompts, next_turn_index: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0), @@ -127,16 +127,16 @@ impl Interceptor for PodInterceptor { // into the per-request context as transient system messages. // These are not persisted to the Worker history; they exist only // for this single LLM request. - for notification in self.pending_notifications.drain() { - match format_notification(¬ification, &self.prompts) { + for n in self.pending_notifies.drain() { + match format_notify(&n, &self.prompts) { Ok(item) => context.push(item), Err(e) => { // A render failure here would starve the LLM of the - // notification text. Fall back to the raw message — + // notify text. Fall back to the raw message — // it still carries the intent, just without the // wrapper phrasing. warn!(error = %e, "failed to render notify_wrapper; using raw message"); - context.push(Item::system_message(notification.message.clone())); + context.push(Item::system_message(n.message.clone())); } } } @@ -296,7 +296,7 @@ mod tests { registry, Some(state), Some(history), - NotificationBuffer::new(), + NotifyBuffer::new(), PromptCatalog::builtins_only().unwrap(), ); let mut ctx = ctx_items; @@ -320,7 +320,7 @@ mod tests { registry, Some(state), Some(history), - NotificationBuffer::new(), + NotifyBuffer::new(), PromptCatalog::builtins_only().unwrap(), ); let mut ctx = ctx_items; @@ -345,7 +345,7 @@ mod tests { registry, Some(state), Some(history), - NotificationBuffer::new(), + NotifyBuffer::new(), PromptCatalog::builtins_only().unwrap(), ); let mut ctx = ctx_items; @@ -364,7 +364,7 @@ mod tests { registry, None, None, - NotificationBuffer::new(), + NotifyBuffer::new(), PromptCatalog::builtins_only().unwrap(), ); let mut ctx: Vec = Vec::new(); @@ -385,9 +385,9 @@ mod tests { } #[tokio::test] - async fn pre_llm_request_drains_pending_notifications_into_context() { + async fn pre_llm_request_drains_pending_notifies_into_context() { let registry = Arc::new(HookRegistryBuilder::new().build()); - let buffer = NotificationBuffer::new(); + let buffer = NotifyBuffer::new(); buffer.push("first".into()); buffer.push("second".into()); @@ -419,7 +419,7 @@ mod tests { // When compaction yields, notifications remain in the buffer for // the next pre_llm_request (after compaction + resume). let registry = Arc::new(HookRegistryBuilder::new().build()); - let buffer = NotificationBuffer::new(); + let buffer = NotifyBuffer::new(); buffer.push("msg".into()); let state = Arc::new(CompactState::new(None, Some(100), 2)); @@ -455,7 +455,7 @@ mod tests { registry, None, None, - NotificationBuffer::new(), + NotifyBuffer::new(), PromptCatalog::builtins_only().unwrap(), ); let mut ctx: Vec = Vec::new(); diff --git a/crates/pod/src/ipc/mod.rs b/crates/pod/src/ipc/mod.rs index 833b3aea..7072fb0a 100644 --- a/crates/pod/src/ipc/mod.rs +++ b/crates/pod/src/ipc/mod.rs @@ -1,6 +1,6 @@ +pub mod alerter; pub mod event; -pub mod notifier; pub mod server; pub(crate) mod interceptor; -pub(crate) mod notification_buffer; +pub(crate) mod notify_buffer; diff --git a/crates/pod/src/ipc/notifier.rs b/crates/pod/src/ipc/notifier.rs deleted file mode 100644 index 40d215d0..00000000 --- a/crates/pod/src/ipc/notifier.rs +++ /dev/null @@ -1,191 +0,0 @@ -//! User-facing notification channel for Pod → client. -//! -//! Separate from `tracing` (which is for developer logs). Notifications -//! are short human-readable messages the Pod layer wants a client to -//! see — for example "compaction failed", "tool output truncated". -//! -//! Each notification is broadcast on the shared `Event` channel and -//! also appended to an in-memory buffer so that clients connecting -//! after the fact still see everything emitted during the session. - -use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; -use std::time::{SystemTime, UNIX_EPOCH}; - -use tokio::sync::broadcast; - -use protocol::{Event, Notification, NotificationLevel, NotificationSource}; - -/// Upper bound on buffered notifications. When exceeded, the oldest -/// entries are discarded so a long-running session cannot leak -/// memory through a pathological loop of recurring notifications -/// (e.g. compaction failing every turn). -const MAX_BUFFERED_NOTIFICATIONS: usize = 512; - -#[derive(Clone)] -pub struct Notifier { - inner: Arc, -} - -struct Inner { - event_tx: broadcast::Sender, - buffer: Mutex>, -} - -impl Notifier { - pub fn new(event_tx: broadcast::Sender) -> Self { - Self { - inner: Arc::new(Inner { - event_tx, - buffer: Mutex::new(VecDeque::with_capacity(MAX_BUFFERED_NOTIFICATIONS)), - }), - } - } - - /// Record and broadcast a notification. - /// - /// The broadcast may have no subscribers (e.g. during Pod - /// construction before any client has connected); the buffer - /// guarantees the message is still delivered once a client - /// attaches. - /// - /// The buffer mutex is held across `broadcast::send` to make - /// `subscribe_with_snapshot` race-free — a client that snapshots - /// the buffer while holding the same lock sees every notification - /// exactly once: older ones from the snapshot, newer ones from - /// the freshly-subscribed receiver. - pub fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) { - let notification = Notification { - level, - source, - message, - timestamp_ms: now_ms(), - }; - if let Ok(mut buf) = self.inner.buffer.lock() { - if buf.len() >= MAX_BUFFERED_NOTIFICATIONS { - buf.pop_front(); - } - buf.push_back(notification.clone()); - let _ = self - .inner - .event_tx - .send(Event::Notification(notification)); - } - } - - /// Subscribe and atomically snapshot the current buffer. - /// - /// The returned snapshot contains notifications emitted before - /// this call; the receiver will deliver notifications emitted - /// after. A notification cannot appear in both. - pub fn subscribe_with_snapshot(&self) -> (Vec, broadcast::Receiver) { - let buf = self - .inner - .buffer - .lock() - .expect("notifier buffer mutex poisoned"); - let rx = self.inner.event_tx.subscribe(); - let snapshot: Vec = buf.iter().cloned().collect(); - (snapshot, rx) - } -} - -fn now_ms() -> i64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_millis() as i64) - .unwrap_or(0) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn notify_broadcasts_to_existing_subscriber() { - let (tx, _keep) = broadcast::channel::(8); - let notifier = Notifier::new(tx); - let (_snapshot, mut rx) = notifier.subscribe_with_snapshot(); - - notifier.notify( - NotificationLevel::Warn, - NotificationSource::Compactor, - "test message".into(), - ); - - match rx.try_recv() { - Ok(Event::Notification(n)) => assert_eq!(n.message, "test message"), - other => panic!("unexpected event: {other:?}"), - } - } - - #[test] - fn late_subscriber_sees_earlier_notifications_via_snapshot() { - let (tx, _keep) = broadcast::channel::(8); - let notifier = Notifier::new(tx); - - notifier.notify( - NotificationLevel::Error, - NotificationSource::Pod, - "first".into(), - ); - notifier.notify( - NotificationLevel::Warn, - NotificationSource::AgentsMd, - "second".into(), - ); - - let (snapshot, mut rx) = notifier.subscribe_with_snapshot(); - assert_eq!(snapshot.len(), 2); - assert_eq!(snapshot[0].message, "first"); - assert_eq!(snapshot[1].message, "second"); - assert!(rx.try_recv().is_err()); // nothing pending on the receiver - } - - #[test] - fn buffer_discards_oldest_past_cap() { - let (tx, _keep) = broadcast::channel::(1024); - let notifier = Notifier::new(tx); - - for i in 0..(MAX_BUFFERED_NOTIFICATIONS + 50) { - notifier.notify( - NotificationLevel::Warn, - NotificationSource::Worker, - format!("msg-{i}"), - ); - } - - let (snapshot, _rx) = notifier.subscribe_with_snapshot(); - assert_eq!(snapshot.len(), MAX_BUFFERED_NOTIFICATIONS); - // First 50 were evicted; the oldest remaining is msg-50. - assert_eq!(snapshot.first().unwrap().message, "msg-50"); - let last = format!("msg-{}", MAX_BUFFERED_NOTIFICATIONS + 49); - assert_eq!(snapshot.last().unwrap().message, last); - } - - #[test] - fn subscribe_snapshot_and_live_do_not_overlap() { - let (tx, _keep) = broadcast::channel::(8); - let notifier = Notifier::new(tx); - - notifier.notify( - NotificationLevel::Warn, - NotificationSource::Worker, - "historic".into(), - ); - let (snapshot, mut rx) = notifier.subscribe_with_snapshot(); - notifier.notify( - NotificationLevel::Error, - NotificationSource::Worker, - "live".into(), - ); - - assert_eq!(snapshot.len(), 1); - assert_eq!(snapshot[0].message, "historic"); - match rx.try_recv() { - Ok(Event::Notification(n)) => assert_eq!(n.message, "live"), - other => panic!("unexpected: {other:?}"), - } - assert!(rx.try_recv().is_err()); - } -} diff --git a/crates/pod/src/ipc/notification_buffer.rs b/crates/pod/src/ipc/notify_buffer.rs similarity index 64% rename from crates/pod/src/ipc/notification_buffer.rs rename to crates/pod/src/ipc/notify_buffer.rs index 19506759..63009248 100644 --- a/crates/pod/src/ipc/notification_buffer.rs +++ b/crates/pod/src/ipc/notify_buffer.rs @@ -1,6 +1,6 @@ -//! Pending-notification buffer for `Method::Notify`. +//! Pending-notify buffer for `Method::Notify`. //! -//! Notifications are queued here by the Controller and drained by +//! Notify entries are queued here by the Controller and drained by //! `PodInterceptor::pre_llm_request` into the per-request context //! (never into the Worker's persistent history). Each queued entry //! becomes one `Item::system_message` in the outgoing request. @@ -13,56 +13,53 @@ use tracing::warn; use crate::prompt::catalog::{CatalogError, PromptCatalog}; -/// Maximum queued notifications. Oldest entries are dropped beyond this. +/// Maximum queued notify entries. Oldest entries are dropped beyond this. const CAPACITY: usize = 128; -/// One pending notification awaiting injection into the next LLM request. +/// One pending notify entry awaiting injection into the next LLM request. #[derive(Debug, Clone)] -pub struct PendingNotification { +pub struct PendingNotify { pub message: String, } -/// Shared, mutex-guarded buffer of pending notifications. +/// Shared, mutex-guarded buffer of pending notify entries. /// /// Cloned between the Pod (producer) and PodInterceptor (consumer). #[derive(Clone, Default)] -pub struct NotificationBuffer { - inner: Arc>>, +pub struct NotifyBuffer { + inner: Arc>>, } -impl NotificationBuffer { +impl NotifyBuffer { pub fn new() -> Self { Self::default() } - /// Push a notification onto the queue. If the queue is full, the + /// Push a notify entry onto the queue. If the queue is full, the /// oldest entry is dropped and a `tracing::warn` is emitted — the /// caller should never hit this in normal operation. pub fn push(&self, message: String) { - let mut q = self.inner.lock().expect("notification buffer poisoned"); + let mut q = self.inner.lock().expect("notify buffer poisoned"); if q.len() >= CAPACITY { let dropped = q.pop_front(); warn!( capacity = CAPACITY, dropped_message = dropped.as_ref().map(|n| n.message.as_str()), - "notification buffer overflow; dropped oldest" + "notify buffer overflow; dropped oldest" ); } - q.push_back(PendingNotification { message }); + q.push_back(PendingNotify { message }); } - /// Remove and return all pending notifications in FIFO order. - pub fn drain(&self) -> Vec { - let mut q = self.inner.lock().expect("notification buffer poisoned"); + /// Remove and return all pending notify entries in FIFO order. + pub fn drain(&self) -> Vec { + let mut q = self.inner.lock().expect("notify buffer poisoned"); q.drain(..).collect() } - /// Number of pending notifications. Primarily for tests. + /// Number of pending notify entries. Primarily for tests. pub fn len(&self) -> usize { - self.inner - .lock() - .expect("notification buffer poisoned") - .len() + self.inner.lock().expect("notify buffer poisoned").len() } pub fn is_empty(&self) -> bool { @@ -70,12 +67,12 @@ impl NotificationBuffer { } } -/// Format a single pending notification into the `Item::system_message` +/// Format a single pending notify entry into the `Item::system_message` /// that gets injected into the per-request context. The wrapper body /// comes from `PodPrompt::NotifyWrapper` so the surrounding phrasing /// can be customised via a prompt pack (translation, tone, ...). -pub(crate) fn format_notification( - n: &PendingNotification, +pub(crate) fn format_notify( + n: &PendingNotify, prompts: &PromptCatalog, ) -> Result { let text = prompts.notify_wrapper(&n.message)?; @@ -88,7 +85,7 @@ mod tests { #[test] fn push_then_drain_preserves_order() { - let buf = NotificationBuffer::new(); + let buf = NotifyBuffer::new(); buf.push("one".into()); buf.push("two".into()); let drained = buf.drain(); @@ -100,7 +97,7 @@ mod tests { #[test] fn capacity_drops_oldest() { - let buf = NotificationBuffer::new(); + let buf = NotifyBuffer::new(); for i in 0..(CAPACITY + 5) { buf.push(format!("msg{i}")); } @@ -112,12 +109,12 @@ mod tests { } #[test] - fn format_notification_includes_message_and_nonblocking_hint() { - let n = PendingNotification { + fn format_notify_includes_message_and_nonblocking_hint() { + let n = PendingNotify { message: "hello".into(), }; let catalog = PromptCatalog::builtins_only().unwrap(); - let item = format_notification(&n, &catalog).unwrap(); + let item = format_notify(&n, &catalog).unwrap(); let text = item.as_text().unwrap_or_default().to_string(); assert!(text.contains("[Notification]")); assert!(text.contains("hello")); diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index d81cab66..e090e805 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -62,17 +62,13 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { let mut reader = JsonLineReader::new(reader); let mut writer = JsonLineWriter::new(writer); - // Atomically subscribe and snapshot buffered notifications so that + // Atomically subscribe and snapshot buffered alerts so that // warnings emitted before this client connected are replayed - // exactly once — they appear in the snapshot, and any notification + // exactly once — they appear in the snapshot, and any alert // arriving afterwards reaches us through `rx`. - let (notification_snapshot, mut rx) = handle.notifier.subscribe_with_snapshot(); - for notification in notification_snapshot { - if writer - .write(&Event::Notification(notification)) - .await - .is_err() - { + let (alert_snapshot, mut rx) = handle.alerter.subscribe_with_snapshot(); + for alert in alert_snapshot { + if writer.write(&Event::Alert(alert)).await.is_err() { return; } } diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 736feae6..1d84a370 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -15,7 +15,7 @@ pub use compact::token_counter::{EstimateSource, SplitPoint, TokenEstimate}; pub use controller::{PodController, PodHandle, ShutdownReceiver}; pub use factory::{FactoryError, PodFactory}; pub use hook::{Hook, HookEventKind, HookRegistryBuilder}; -pub use ipc::notifier::Notifier; +pub use ipc::alerter::Alerter; pub use ipc::server::SocketServer; pub use manifest::{ AuthRef, ModelManifest, PodManifest, PodManifestConfig, PodMetaConfig, Scope, SchemeKind, diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index d36d69ea..f99e354d 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -19,8 +19,8 @@ use crate::hook::{ Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest, PreRequestInfo, PreToolCall, }; -use crate::ipc::notification_buffer::NotificationBuffer; -use crate::ipc::notifier::Notifier; +use crate::ipc::alerter::Alerter; +use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::interceptor::PodInterceptor; use crate::prompt::loader::PromptLoader; use crate::prompt::catalog::{CatalogError, PromptCatalog}; @@ -28,7 +28,7 @@ use crate::runtime::dir; use crate::runtime::scope_lock::{self, ScopeAllocationGuard, ScopeLockError}; use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate}; use crate::compact::usage_tracker::UsageTracker; -use protocol::{Event, NotificationLevel, NotificationSource}; +use protocol::{Event, AlertLevel, AlertSource}; use tokio::sync::broadcast; use async_trait::async_trait; use llm_worker::interceptor::PreRequestAction; @@ -90,16 +90,16 @@ pub struct Pod { system_prompt_template: Option, /// User-facing notification sink attached by the Controller at /// spawn time. `None` in tests / direct `Pod::new` usage. - notifier: Option, + alerter: Option, /// Broadcast sender for typed lifecycle `Event`s (compact progress, - /// etc.). Attached by the Controller alongside `notifier`. Unlike + /// etc.). Attached by the Controller alongside `alerter`. Unlike /// notifications, events sent here are NOT replayed to clients that /// connect after the fact — they are fire-and-forget broadcasts. event_tx: Option>, /// Queue of pending `Method::Notify` notifications awaiting /// injection into the next LLM request. Shared with the /// PodInterceptor installed in `ensure_interceptor_installed`. - pending_notifications: NotificationBuffer, + pending_notifies: NotifyBuffer, /// Scope allocation in the machine-wide lock file. `Some` for /// Pods built via `from_manifest` (production path); `None` for /// lower-level constructors (`Pod::new`, `Pod::restore`) that @@ -158,9 +158,9 @@ impl Pod { usage_history: Arc::new(Mutex::new(Vec::::new())), tracker: None, system_prompt_template: None, - notifier: None, + alerter: None, event_tx: None, - pending_notifications: NotificationBuffer::new(), + pending_notifies: NotifyBuffer::new(), scope_allocation: None, callback_socket: None, prompts, @@ -231,9 +231,9 @@ impl Pod { usage_history: Arc::new(Mutex::new(state.usage_history)), tracker: None, system_prompt_template: None, - notifier: None, + alerter: None, event_tx: None, - pending_notifications: NotificationBuffer::new(), + pending_notifies: NotifyBuffer::new(), scope_allocation: None, callback_socket: None, prompts, @@ -332,22 +332,22 @@ impl Pod { /// Called by the Controller immediately after spawning so that /// Pod-internal operations (compaction failures, AGENTS.md /// ingestion warnings) can surface messages to connected clients. - pub fn attach_notifier(&mut self, notifier: Notifier) { - self.notifier = Some(notifier); + pub fn attach_alerter(&mut self, alerter: Alerter) { + self.alerter = Some(alerter); } /// Attach the broadcast sender used for typed lifecycle `Event`s. /// - /// The Controller wires this alongside [`attach_notifier`] so that + /// The Controller wires this alongside [`attach_alerter`] so that /// Pod-internal operations (currently: compaction) can surface /// progress to connected clients. pub fn attach_event_tx(&mut self, event_tx: broadcast::Sender) { self.event_tx = Some(event_tx); } - fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) { - if let Some(n) = self.notifier.as_ref() { - n.notify(level, source, message); + fn alert(&self, level: AlertLevel, source: AlertSource, message: String) { + if let Some(n) = self.alerter.as_ref() { + n.alert(level, source, message); } } @@ -364,17 +364,17 @@ impl Pod { /// /// The notification will be injected as an `Item::system_message` /// into the next outgoing LLM request context (not into history). - /// See [`NotificationBuffer`] for overflow behaviour. - pub fn push_notification(&self, message: String) { - self.pending_notifications.push(message); + /// See [`NotifyBuffer`] for overflow behaviour. + pub fn push_notify(&self, message: String) { + self.pending_notifies.push(message); } /// Shared handle to the pending notification buffer. /// /// The Controller holds a clone so that `Method::Notify` arriving /// while `pod.run()` is in flight can still reach the interceptor. - pub fn notification_buffer_handle(&self) -> NotificationBuffer { - self.pending_notifications.clone() + pub fn notify_buffer_handle(&self) -> NotifyBuffer { + self.pending_notifies.clone() } /// Parent callback socket set by `from_manifest_spawned`. @@ -497,7 +497,7 @@ impl Pod { registry, compact_state, usage_history_handle, - self.pending_notifications.clone(), + self.pending_notifies.clone(), self.prompts.clone(), ); self.worker_mut().set_interceptor(interceptor); @@ -516,7 +516,7 @@ impl Pod { let Some(template) = self.system_prompt_template.take() else { return Ok(()); }; - let notifier = self.notifier.clone(); + let alerter = self.alerter.clone(); let worker = self.worker.as_mut().expect("worker present"); // Materialise any pending tool factories so the template sees the // full list of tool names. Redundant with the flush inside @@ -530,10 +530,10 @@ impl Pod { .collect(); let agents_md_read = read_agents_md(&self.pwd); for warning in agents_md_read.warnings { - if let Some(n) = notifier.as_ref() { - n.notify( - NotificationLevel::Warn, - NotificationSource::AgentsMd, + if let Some(n) = alerter.as_ref() { + n.alert( + AlertLevel::Warn, + AlertSource::AgentsMd, warning, ); } @@ -713,9 +713,9 @@ impl Pod { self.send_event(Event::CompactFailed { error: e.to_string(), }); - self.notify( - NotificationLevel::Error, - NotificationSource::Compactor, + self.alert( + AlertLevel::Error, + AlertSource::Compactor, format!("mid-run compaction failed: {e}"), ); if let Some(ref state) = self.compact_state { @@ -757,9 +757,9 @@ impl Pod { self.send_event(Event::CompactFailed { error: e.to_string(), }); - self.notify( - NotificationLevel::Warn, - NotificationSource::Compactor, + self.alert( + AlertLevel::Warn, + AlertSource::Compactor, format!("post-run compaction failed: {e}"), ); state.record_compact_failure(); @@ -1171,9 +1171,9 @@ impl Pod, St> { usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, system_prompt_template, - notifier: None, + alerter: None, event_tx: None, - pending_notifications: NotificationBuffer::new(), + pending_notifies: NotifyBuffer::new(), scope_allocation: Some(scope_allocation), callback_socket: None, prompts, @@ -1234,9 +1234,9 @@ impl Pod, St> { usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, system_prompt_template, - notifier: None, + alerter: None, event_tx: None, - pending_notifications: NotificationBuffer::new(), + pending_notifies: NotifyBuffer::new(), scope_allocation: Some(scope_allocation), callback_socket: Some(callback_socket), prompts, diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index 0571d333..f9d4292f 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -348,7 +348,7 @@ enum SendRunError { /// Write `Method::Run` to the target and read back events until we see /// either `TurnStart` (accepted) or `Error { AlreadyRunning }` -/// (rejected). Any replayed notifications that precede the response are +/// (rejected). Any replayed alerts that precede the response are /// skipped. Times out per-read so a stuck Pod doesn't hang the tool. async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> { let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) @@ -373,9 +373,9 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu .. }) => return Err(SendRunError::AlreadyRunning), Some(Event::TurnStart { .. }) => return Ok(()), - // Notifications and other pre-turn events are replayed to - // new subscribers; keep reading until the controller's - // response to our `Run` shows up. + // Alerts and other pre-turn events are replayed to new + // subscribers; keep reading until the controller's response + // to our `Run` shows up. Some(_) => continue, None => return Err(SendRunError::Io("connection closed before response".into())), } @@ -383,7 +383,7 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu } /// Connect and ask the Pod for its conversation history. Skips -/// pre-History events (such as buffered notifications replayed to new +/// pre-History events (such as buffered alerts replayed to new /// clients). Returns the raw JSON items as `serde_json::Value` since /// the pod crate already round-trips via `Value` on the wire. async fn fetch_history(socket: &Path) -> std::io::Result> { diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index a5957327..87ce0c1c 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -148,7 +148,7 @@ pub enum Event { items: Vec, greeting: Greeting, }, - Notification(Notification), + Alert(Alert), /// Pod has started compacting the current session. /// /// Fired immediately before a compaction run. Success is signalled by @@ -169,16 +169,16 @@ pub enum Event { Shutdown, } -/// User-facing notification emitted from the Pod layer. +/// User-facing alert emitted from the Pod layer. /// /// This is a separate channel from `tracing` (developer logs): entries /// here are assembled explicitly by the Pod when a condition should be /// surfaced to the person driving the client. Keep messages short and /// human-readable. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Notification { - pub level: NotificationLevel, - pub source: NotificationSource, +pub struct Alert { + pub level: AlertLevel, + pub source: AlertSource, pub message: String, /// Milliseconds since the Unix epoch. pub timestamp_ms: i64, @@ -186,14 +186,14 @@ pub struct Notification { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] -pub enum NotificationLevel { +pub enum AlertLevel { Warn, Error, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] -pub enum NotificationSource { +pub enum AlertSource { Pod, Worker, Compactor, @@ -462,16 +462,16 @@ mod tests { } #[test] - fn event_notification_format() { - let event = Event::Notification(Notification { - level: NotificationLevel::Warn, - source: NotificationSource::Compactor, + fn event_alert_format() { + let event = Event::Alert(Alert { + level: AlertLevel::Warn, + source: AlertSource::Compactor, message: "compaction failed".into(), timestamp_ms: 1_700_000_000_000, }); let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["event"], "notification"); + assert_eq!(parsed["event"], "alert"); assert_eq!(parsed["data"]["level"], "warn"); assert_eq!(parsed["data"]["source"], "compactor"); assert_eq!(parsed["data"]["message"], "compaction failed"); diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 533ba792..835605a7 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1,4 +1,4 @@ -use protocol::{Event, Method, NotificationLevel, NotificationSource, RunResult}; +use protocol::{Event, Method, AlertLevel, AlertSource, RunResult}; use crate::block::{Block, CompactEvent, ToolCallBlock, ToolCallState}; use crate::cache::FileCache; @@ -81,9 +81,9 @@ impl App { } pub fn push_error(&mut self, message: impl Into) { - self.blocks.push(Block::Notification { - level: NotificationLevel::Error, - source: NotificationSource::Pod, + self.blocks.push(Block::Alert { + level: AlertLevel::Error, + source: AlertSource::Pod, message: message.into(), }); } @@ -194,16 +194,16 @@ impl App { apply_cache_update(&mut self.cache, &name, args.as_deref(), output.as_deref()); } } else { - // Result for an unknown tool call. Surface it as a - // notification so it isn't silently dropped. + // Result for an unknown tool call. Surface it as an + // alert so it isn't silently dropped. let level = if is_error { - NotificationLevel::Error + AlertLevel::Error } else { - NotificationLevel::Warn + AlertLevel::Warn }; - self.blocks.push(Block::Notification { + self.blocks.push(Block::Alert { level, - source: NotificationSource::Pod, + source: AlertSource::Pod, message: format!("orphan tool result ({id}): {summary}"), }); } @@ -243,11 +243,11 @@ impl App { self.blocks .push(Block::Compact(CompactEvent::Failed { error })); } - Event::Notification(notification) => { - self.blocks.push(Block::Notification { - level: notification.level, - source: notification.source, - message: notification.message, + Event::Alert(alert) => { + self.blocks.push(Block::Alert { + level: alert.level, + source: alert.source, + message: alert.message, }); } Event::History { items, greeting } => { @@ -488,12 +488,12 @@ fn strip_cat_n_prefix(formatted: &str) -> String { out } -pub fn notification_source_label(source: NotificationSource) -> &'static str { +pub fn alert_source_label(source: AlertSource) -> &'static str { match source { - NotificationSource::Pod => "pod", - NotificationSource::Worker => "worker", - NotificationSource::Compactor => "compactor", - NotificationSource::AgentsMd => "AGENTS.md", + AlertSource::Pod => "pod", + AlertSource::Worker => "worker", + AlertSource::Compactor => "compactor", + AlertSource::AgentsMd => "AGENTS.md", } } diff --git a/crates/tui/src/block.rs b/crates/tui/src/block.rs index f825b45b..5c3201fc 100644 --- a/crates/tui/src/block.rs +++ b/crates/tui/src/block.rs @@ -7,7 +7,7 @@ #![allow(dead_code)] // Phase 5 will consume `output` in detail mode. -use protocol::{Greeting, NotificationLevel, NotificationSource}; +use protocol::{Greeting, AlertLevel, AlertSource}; pub enum Block { Greeting(Greeting), @@ -21,9 +21,9 @@ pub enum Block { text: String, }, ToolCall(ToolCallBlock), - Notification { - level: NotificationLevel, - source: NotificationSource, + Alert { + level: AlertLevel, + source: AlertSource, message: String, }, Compact(CompactEvent), diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index 2290bc9d..57fc03b8 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -20,9 +20,9 @@ use ratatui::text::{Line, Span}; use ratatui::widgets::{Block as UiBlock, BorderType, Borders, Padding, Paragraph, Widget, Wrap}; use unicode_width::{UnicodeWidthChar, UnicodeWidthStr}; -use protocol::{Greeting, NotificationLevel}; +use protocol::{Greeting, AlertLevel}; -use crate::app::{App, fmt_tokens, notification_source_label}; +use crate::app::{App, fmt_tokens, alert_source_label}; use crate::block::{Block, CompactEvent}; /// Display density for the history view. @@ -313,20 +313,20 @@ fn render_block_into( // ToolCall is dispatched in `compute_history` via `tool::render_tool` // so it can consume multiple adjacent blocks (Read aggregation). Block::ToolCall(_) => unreachable!("ToolCall handled by compute_history"), - Block::Notification { + Block::Alert { level, source, message, } => { let kind = match level { - NotificationLevel::Warn => MessageKind::NoticeWarn, - NotificationLevel::Error => MessageKind::NoticeError, + AlertLevel::Warn => MessageKind::NoticeWarn, + AlertLevel::Error => MessageKind::NoticeError, }; let prefix = match level { - NotificationLevel::Warn => "[notice]", - NotificationLevel::Error => "[notice error]", + AlertLevel::Warn => "[notice]", + AlertLevel::Error => "[notice error]", }; - let label = notification_source_label(*source); + let label = alert_source_label(*source); let text = format!("{prefix} {label}: {message}"); match mode { Mode::Overview => push_overview_line(lines, &text, width, kind, ""), diff --git a/tickets/notification-naming-cleanup.md b/tickets/notification-naming-cleanup.md new file mode 100644 index 00000000..71b1579e --- /dev/null +++ b/tickets/notification-naming-cleanup.md @@ -0,0 +1,70 @@ +# Notification 用語のリネーム + +## 背景 + +Pod 周辺に二系統の「notification」があり、用語が衝突して読みづらい: + +- **outbound** (Pod → Client): `Event::Notification` / `Notifier::notify` — ユーザー向けの運用診断(compaction 失敗、tool 出力 truncated 等) +- **inbound** (External → Pod): `Method::Notify` / `NotificationBuffer` — 外部から Pod の LLM context に「メモを置く」経路 + +両者とも `Notification` / `Notify` を共有しており、コードを読む側が毎回どちらの経路かを文脈から判別する必要がある。 + +`Method::Notify` の動詞名は「呼び出し側がやる行為」として意味的に正しいので残し、**outbound 側の名詞**を `Alert` に倒して非対称を入れることで区別する。 + +## 要件 + +### outbound (Pod → Client) — `Alert` にリネーム + +protocol: + +- `Event::Notification(Notification)` → `Event::Alert(Alert)` +- `protocol::Notification` 構造体 → `protocol::Alert` +- `NotificationLevel` → `AlertLevel` +- `NotificationSource` → `AlertSource` + +pod: + +- `crates/pod/src/ipc/notifier.rs` → `alerter.rs` +- `Notifier` → `Alerter` +- `Notifier::notify(...)` → `Alerter::alert(...)` +- `Notifier::subscribe_with_snapshot()` の戻り値 `Vec` → `Vec` + +tui: + +- `Block::Notification` → `Block::Alert`(描画スタイルは変更しない) + +### inbound (External → Pod) — `Notify` 据置 + 衛生整理 + +- `Method::Notify` 据置(動詞として正しい) +- `NotificationBuffer` → `NotifyBuffer`(メソッド名と揃える) +- `PendingNotification` → `PendingNotify` +- `notify_wrapper` プロンプト名 据置 +- LLM 向け wrapper ラベル `[Notification]` 据置(LLM への見え方は変えない) + +### protocol wire 互換 + +開発初期段階のため wire 互換は意図的に切る。client / pod を同時更新する前提で、旧名を吸収する `#[serde(rename)]` 等は入れない。 + +## 範囲外 + +- 通知の content / level / source の意味論変更(純粋な rename のみ) +- `Notifier` / `NotificationBuffer` の buffer 容量や snapshot 挙動の変更 +- TUI 側 `Block::Alert` の描画スタイルの変更 + +## 完了条件 + +- 上記 rename がすべて反映されている +- ビルドが通り、既存テストが通る +- grep で旧名(outbound 文脈の `Notification` / `Notifier` / 動詞 `notify`)が消えている + +## 参照 + +- `crates/protocol/src/lib.rs`(`Event::Notification`, `Notification` struct, `NotificationLevel/Source`) +- `crates/pod/src/ipc/notifier.rs`(outbound、`Notifier`) +- `crates/pod/src/ipc/notification_buffer.rs`(inbound、`NotificationBuffer`, `PendingNotification`) +- `crates/tui/`(`Block::Notification` 描画) + +## Review +- 状態: Approve with follow-up +- レビュー詳細: [./notification-naming-cleanup.review.md](./notification-naming-cleanup.review.md) +- 日付: 2026-04-26 diff --git a/tickets/notification-naming-cleanup.review.md b/tickets/notification-naming-cleanup.review.md new file mode 100644 index 00000000..c85e0492 --- /dev/null +++ b/tickets/notification-naming-cleanup.review.md @@ -0,0 +1,42 @@ +# Review: Notification 用語のリネーム + +## 前提・要件の確認 + +### outbound (Pod → Client) — Alert 系 +- `Event::Notification(Notification)` → `Event::Alert(Alert)`: `crates/protocol/src/lib.rs:151`, `crates/protocol/src/lib.rs:178-185` で完了。 +- `NotificationLevel/Source` → `AlertLevel/AlertSource`: `crates/protocol/src/lib.rs:187-201` で完了。serde の `rename_all = "snake_case"` も維持されており wire 名は `warn` / `error` / `pod` / `worker` / `compactor` / `agents_md` のまま (元と一致)。 +- `notifier.rs` → `alerter.rs`、`Notifier` → `Alerter`、`notify()` → `alert()`: `crates/pod/src/ipc/alerter.rs:26-87` で完了。`subscribe_with_snapshot` の戻り値も `(Vec, …)` (l.78)。 +- `Block::Notification` → `Block::Alert`: `crates/tui/src/block.rs:24-28`。レンダリング側 (`crates/tui/src/ui.rs:316-335`) も match arm が更新されており、style (`MessageKind::NoticeWarn/Error` と prefix `[notice]` / `[notice error]`) は据置 — 範囲外要件を尊重している。 +- イベント名のテスト: `event_alert_format` (`crates/protocol/src/lib.rs:464-479`) で `event=alert` の wire 形を確認。 + +### inbound (External → Pod) — Notify 据置 + 衛生整理 +- `Method::Notify` 据置: `crates/protocol/src/lib.rs:18` 健在、`method_notify_json_roundtrip` (l.343) も保持。 +- `notification_buffer.rs` → `notify_buffer.rs`、`NotificationBuffer/PendingNotification` → `NotifyBuffer/PendingNotify`: `crates/pod/src/ipc/notify_buffer.rs:20-67`。 +- `format_notification` → `format_notify`: 同 l.74。 +- `notify_wrapper` プロンプト名と `[Notification]` ラベル据置: `crates/pod/src/ipc/notify_buffer.rs:78,119` および `crates/pod/src/prompt/catalog.rs:447,484` の test asserts、`crates/pod/tests/controller_test.rs:413` の test も `[Notification]` を期待 — 範囲外要件と整合。 + +### protocol wire 互換 +- 旧 `notification` event 名を吸収する `#[serde(rename)]` 等は入っていない (`crates/protocol/src/lib.rs:151` の `Alert(Alert)` は `rename_all = "snake_case"` 経由で wire 上 `event=alert` に変わる)。チケットの「wire 互換は意図的に切る」方針通り。 + +### 完了条件 +- ビルド通過 (cargo build): pod / protocol / tui で warning は既存の `end_scope` のみ、本変更に起因しない。 +- テスト: `cargo test -p pod -p protocol -p tui` で 205 件 PASS (ユニット+統合+doc 含む。0 fail / 0 ignored)。 +- grep: `Notifier|NotificationLevel|NotificationSource|NotificationBuffer|PendingNotification|notification_buffer|push_notification|attach_notifier|notifier` がコード上で 0 ヒット。残る `Notification` 一致は LLM 向け wrapper ラベル `[Notification]` (据置) と inbound 概念を指す散在コメントのみ。 + +## アーキテクチャ・スコープ +- レイヤ境界尊重: 変更は protocol / pod (ipc, controller, pod 本体) / tui に限定。llm-worker は触れていない。 +- 命名方針: `Alerter` / `alert()` の対 `Notify` (動詞) という非対称設計はチケット背景と一致しており、用語衝突を解消する最小限の手当てになっている。 +- 不必要な変更の有無: outbound 一式の rename と inbound 側の衛生 (`NotifyBuffer` / `PendingNotify` / `format_notify`) は要件に明記されている範囲のみ。`pending_notifications` フィールド名 → `pending_notifies` や `push_notification` → `push_notify` まで踏み込んだのは、フィールド型自体が `NotifyBuffer` / `PendingNotify` に変わっていることを踏まえると naming consistency 上妥当。`run_for_notification` メソッド名は `Method::Notify` 由来の "notify-driven run" を表すので残しても問題ない (今回触っていない)。 +- ファイル rename: ticket 通り `notifier.rs` → `alerter.rs`、`notification_buffer.rs` → `notify_buffer.rs` の 2 つだけで、過剰な移動なし。`mv` を使った旨 (history 保全) も妥当。 + +## 指摘事項 + +### Non-blocking / Follow-up +- `crates/pod/src/controller.rs:49` の doc comment が `Thin wrapper over `Alerter::notify`` のまま (実体は `Alerter::alert`)。1 行修正で解消可能。 + +### Nits +- `crates/protocol/src/lib.rs:39` の "via the notification buffer" や `crates/pod/src/controller.rs:255,258,346,474,480,657` の "notification buffer" 等の散在コメントは、文脈上 inbound (Notify) の話なので意味は通る。ただし型名が `NotifyBuffer` に揃ったので "notify buffer" に揃えると読み手のノイズが減る。ticket の「衛生整理」を保守的に解釈すれば任意項目。 +- `crates/pod/src/pod.rs:91,330,363,365,372,581,585`、`crates/pod/src/ipc/interceptor.rs:4,43,46`、`crates/pod/src/ipc/event.rs:10`、`crates/pod/src/prompt/agents_md.rs:23` 等の doc comment の "notification" も同様 — inbound 側は "notify" 動詞・"alert" 名詞 (outbound) の使い分けが定義されたので、コメントもこの軸で揃え直すと表面の用語衝突がさらに減る。本リネームの影響範囲外と見るなら据置で問題ない。 + +## 判断 +Approve with follow-up — 要件は完全に満たされ、ビルド/テストもクリーン。`controller.rs:49` の doc comment 修正は trivial なので、別チケット化せず本作業の最後に直しておくのが望ましい。コメント表面の用語ノイズは Nits で残るが、ticket の rename スコープ内で必要な分は果たされている。 diff --git a/tickets/submit-segment-protocol.md b/tickets/submit-segment-protocol.md index 664cc392..c8104445 100644 --- a/tickets/submit-segment-protocol.md +++ b/tickets/submit-segment-protocol.md @@ -47,15 +47,16 @@ resolver の trait 化と memory / workflow 用 resolver 実装は別チケッ text しか作れない client が引き続き存在しても良いことを protocol 仕様に明記する(`vec![Segment::Text(_)]` のみで動く)。 -### unknown variant / 未登録 resolver の扱い(要決定) +### unknown variant / 未登録 resolver の扱い -新 variant を持つ client が古い Pod に投げる、または resolver 未登録の variant を Pod が受けるケースの挙動を本チケットで決める。候補: +新 variant を持つ client が古い Pod に投げる、または resolver 未登録の variant を Pod が受けた場合は **2 経路に同時に流す**: -- (a) 黙って drop — 静かに情報が落ちて user/LLM 双方が気付けない -- (b) `[unknown input: kind=foo]` 相当の placeholder を LLM context に差し込み、LLM が気づいて指摘できる -- (c) hard error で submit 拒否 +1. **LLM context** に `[unknown input: kind=foo]` 相当の placeholder を `Item::system_message` で差し込む。LLM が「ユーザーは何かを送ろうとしたが Pod が解釈できなかった」と気づき、ユーザーに聞き返せる状態を作る。 +2. **ユーザー向け通知チャネル**(`Event::Alert` / リネーム前は `Event::Notification`)にも同時に送る。client 側で「このサーバはこの input type を解釈できません」とユーザーに直接出せる。 -(b) を仮の第一候補として実装方針を詰める。serde 側は unknown variant を吸収する形を初期から入れる。 +両経路に流すことで「user も LLM も気付けない silent drop」を避ける。serde 側は unknown variant を吸収する形(`#[serde(other)]` 相当)を初期から入れる。 + +`Event::Alert` への rename は `tickets/notification-naming-cleanup.md` で扱う。本チケットの実装時点での名称(`Event::Notification` か `Event::Alert` か)はその時の repo 状態に従う。 ## 範囲外