Method::NotifyとEvent::Notificationが紛らわしい問題

This commit is contained in:
Keisuke Hirata 2026-04-26 23:25:50 +09:00
parent 123fc3b0ad
commit e44d49e80f
18 changed files with 461 additions and 377 deletions

View File

@ -11,6 +11,7 @@
- [ ] サブミット入力
- [ ] protocol Segment 化 → [tickets/submit-segment-protocol.md](tickets/submit-segment-protocol.md)
- [ ] TUI 補完 + 型付き atom 化 → [tickets/submit-tui-completion.md](tickets/submit-tui-completion.md)
- [ ] Notification 用語リネーム (Event::Alert / Method::Notify) → [tickets/notification-naming-cleanup.md](tickets/notification-naming-cleanup.md)
- [ ] メモリ機構
- [ ] ファイル形式 + Linter 土台 → [tickets/memory-file-format.md](tickets/memory-file-format.md)
- [ ] memory / Knowledge 検索ツール → [tickets/memory-search-tools.md](tickets/memory-search-tools.md)

View File

@ -6,8 +6,8 @@ use llm_worker::llm_client::client::LlmClient;
use session_store::Store;
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::ipc::notification_buffer::NotificationBuffer;
use crate::ipc::notifier::Notifier;
use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::pod::{Pod, PodError, PodRunResult};
use crate::spawn::comm_tools::{
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
@ -17,7 +17,7 @@ use crate::shared_state::{PodSharedState, PodStatus};
use crate::ipc::server::SocketServer;
use crate::spawn::tool::spawn_pod_tool;
use crate::spawn::registry::SpawnedPodRegistry;
use protocol::{ErrorCode, Event, Method, NotificationLevel, NotificationSource, RunResult, TurnResult};
use protocol::{ErrorCode, Event, Method, AlertLevel, AlertSource, RunResult, TurnResult};
// ---------------------------------------------------------------------------
// PodHandle — client-facing, Clone-able
@ -29,7 +29,7 @@ pub struct PodHandle {
event_tx: broadcast::Sender<Event>,
pub shared_state: Arc<PodSharedState>,
pub runtime_dir: Arc<RuntimeDir>,
pub notifier: Notifier,
pub alerter: Alerter,
}
impl PodHandle {
@ -46,9 +46,9 @@ impl PodHandle {
self.event_tx.send(event)
}
/// Emit a user-facing notification. Thin wrapper over `Notifier::notify`.
pub fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) {
self.notifier.notify(level, source, message);
/// Emit a user-facing alert. Thin wrapper over `Alerter::alert`.
pub fn alert(&self, level: AlertLevel, source: AlertSource, message: String) {
self.alerter.alert(level, source, message);
}
}
@ -72,7 +72,7 @@ impl PodController {
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (method_tx, mut method_rx) = mpsc::channel::<Method>(32);
let (event_tx, _) = broadcast::channel::<Event>(256);
let notifier = Notifier::new(event_tx.clone());
let alerter = Alerter::new(event_tx.clone());
let manifest_toml = toml::to_string_pretty(pod.manifest()).unwrap_or_default();
let greeting = build_greeting(&pod);
@ -95,13 +95,13 @@ impl PodController {
event_tx: event_tx.clone(),
shared_state: shared_state.clone(),
runtime_dir: runtime_dir.clone(),
notifier: notifier.clone(),
alerter: alerter.clone(),
};
// Hand the notifier to the Pod so internal operations (compaction,
// Hand the alerter to the Pod so internal operations (compaction,
// AGENTS.md ingestion during the first turn) can emit user-facing
// notifications on the same channel.
pod.attach_notifier(notifier.clone());
pod.attach_alerter(alerter.clone());
// Also hand the raw broadcast sender so Pod-internal operations
// can emit typed lifecycle `Event`s (currently: compact progress).
pod.attach_event_tx(event_tx.clone());
@ -212,11 +212,11 @@ impl PodController {
});
});
let notifier_for_worker = notifier.clone();
let alerter_for_worker = alerter.clone();
worker.on_warning(move |message| {
notifier_for_worker.notify(
NotificationLevel::Warn,
NotificationSource::Worker,
alerter_for_worker.alert(
AlertLevel::Warn,
AlertSource::Worker,
message.to_owned(),
);
});
@ -257,7 +257,7 @@ impl PodController {
// `Method::Notify` into the buffer even while `pod` is held by
// an in-flight `run_for_notification` / `run` future.
let cancel_tx = pod.worker_mut().cancel_sender();
let notification_buffer = pod.notification_buffer_handle();
let notify_buffer = pod.notify_buffer_handle();
tokio::spawn(async move {
// Hold socket server alive for the lifetime of the controller task
@ -303,7 +303,7 @@ impl PodController {
&event_tx,
&cancel_tx,
&shared_state,
&notification_buffer,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
@ -313,9 +313,9 @@ impl PodController {
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
notifier.notify(
NotificationLevel::Warn,
NotificationSource::Compactor,
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
@ -334,7 +334,7 @@ impl PodController {
}
Method::Notify { message } => {
pod.push_notification(message);
pod.push_notify(message);
if shared_state.get_status() != PodStatus::Idle {
// RUNNING / Paused: the buffer push is the
// entire operation; the in-flight turn (or
@ -353,7 +353,7 @@ impl PodController {
&event_tx,
&cancel_tx,
&shared_state,
&notification_buffer,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
@ -363,9 +363,9 @@ impl PodController {
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
notifier.notify(
NotificationLevel::Warn,
NotificationSource::Compactor,
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
@ -400,7 +400,7 @@ impl PodController {
&event_tx,
&cancel_tx,
&shared_state,
&notification_buffer,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
@ -410,9 +410,9 @@ impl PodController {
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
notifier.notify(
NotificationLevel::Warn,
NotificationSource::Compactor,
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
@ -475,7 +475,7 @@ impl PodController {
// request will inject it as a system message
// via `PodInterceptor::pre_llm_request`.
let text = crate::ipc::event::render_event(&event);
pod.push_notification(text);
pod.push_notify(text);
// Auto-kick a turn if the Pod is idle so the
// notification is not stranded. Matches the
// `Method::Notify` idle path.
@ -489,7 +489,7 @@ impl PodController {
&event_tx,
&cancel_tx,
&shared_state,
&notification_buffer,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
@ -499,9 +499,9 @@ impl PodController {
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
notifier.notify(
NotificationLevel::Warn,
NotificationSource::Compactor,
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
@ -563,7 +563,7 @@ async fn run_with_cancel_support<F>(
event_tx: &broadcast::Sender<Event>,
cancel_tx: &mpsc::Sender<()>,
shared_state: &Arc<PodSharedState>,
notification_buffer: &NotificationBuffer,
notify_buffer: &NotifyBuffer,
parent_socket: Option<&std::path::PathBuf>,
self_name: &str,
spawned_registry: &Arc<SpawnedPodRegistry>,
@ -645,7 +645,7 @@ where
Some(Method::Notify { message }) => {
// Route into the buffer; the in-flight turn will
// drain it at its next pre_llm_request.
notification_buffer.push(message);
notify_buffer.push(message);
}
Some(Method::GetHistory) => {}
Some(Method::PodEvent(event)) => {
@ -664,7 +664,7 @@ where
&self_parent_socket,
)
.await;
notification_buffer.push(crate::ipc::event::render_event(&event));
notify_buffer.push(crate::ipc::event::render_event(&event));
}
None => {
let _ = cancel_tx.try_send(());

View File

@ -0,0 +1,168 @@
//! User-facing alert channel for Pod → client.
//!
//! Separate from `tracing` (which is for developer logs). Alerts
//! are short human-readable messages the Pod layer wants a client to
//! see — for example "compaction failed", "tool output truncated".
//!
//! Each alert is broadcast on the shared `Event` channel and
//! also appended to an in-memory buffer so that clients connecting
//! after the fact still see everything emitted during the session.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast;
use protocol::{Alert, AlertLevel, AlertSource, Event};
/// Upper bound on buffered alerts. When exceeded, the oldest
/// entries are discarded so a long-running session cannot leak
/// memory through a pathological loop of recurring alerts
/// (e.g. compaction failing every turn).
const MAX_BUFFERED_ALERTS: usize = 512;
#[derive(Clone)]
pub struct Alerter {
inner: Arc<Inner>,
}
struct Inner {
event_tx: broadcast::Sender<Event>,
buffer: Mutex<VecDeque<Alert>>,
}
impl Alerter {
pub fn new(event_tx: broadcast::Sender<Event>) -> Self {
Self {
inner: Arc::new(Inner {
event_tx,
buffer: Mutex::new(VecDeque::with_capacity(MAX_BUFFERED_ALERTS)),
}),
}
}
/// Record and broadcast an alert.
///
/// The broadcast may have no subscribers (e.g. during Pod
/// construction before any client has connected); the buffer
/// guarantees the message is still delivered once a client
/// attaches.
///
/// The buffer mutex is held across `broadcast::send` to make
/// `subscribe_with_snapshot` race-free — a client that snapshots
/// the buffer while holding the same lock sees every alert
/// exactly once: older ones from the snapshot, newer ones from
/// the freshly-subscribed receiver.
pub fn alert(&self, level: AlertLevel, source: AlertSource, message: String) {
let alert = Alert {
level,
source,
message,
timestamp_ms: now_ms(),
};
if let Ok(mut buf) = self.inner.buffer.lock() {
if buf.len() >= MAX_BUFFERED_ALERTS {
buf.pop_front();
}
buf.push_back(alert.clone());
let _ = self.inner.event_tx.send(Event::Alert(alert));
}
}
/// Subscribe and atomically snapshot the current buffer.
///
/// The returned snapshot contains alerts emitted before
/// this call; the receiver will deliver alerts emitted
/// after. An alert cannot appear in both.
pub fn subscribe_with_snapshot(&self) -> (Vec<Alert>, broadcast::Receiver<Event>) {
let buf = self
.inner
.buffer
.lock()
.expect("alerter buffer mutex poisoned");
let rx = self.inner.event_tx.subscribe();
let snapshot: Vec<Alert> = buf.iter().cloned().collect();
(snapshot, rx)
}
}
fn now_ms() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn alert_broadcasts_to_existing_subscriber() {
let (tx, _keep) = broadcast::channel::<Event>(8);
let alerter = Alerter::new(tx);
let (_snapshot, mut rx) = alerter.subscribe_with_snapshot();
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
"test message".into(),
);
match rx.try_recv() {
Ok(Event::Alert(a)) => assert_eq!(a.message, "test message"),
other => panic!("unexpected event: {other:?}"),
}
}
#[test]
fn late_subscriber_sees_earlier_alerts_via_snapshot() {
let (tx, _keep) = broadcast::channel::<Event>(8);
let alerter = Alerter::new(tx);
alerter.alert(AlertLevel::Error, AlertSource::Pod, "first".into());
alerter.alert(AlertLevel::Warn, AlertSource::AgentsMd, "second".into());
let (snapshot, mut rx) = alerter.subscribe_with_snapshot();
assert_eq!(snapshot.len(), 2);
assert_eq!(snapshot[0].message, "first");
assert_eq!(snapshot[1].message, "second");
assert!(rx.try_recv().is_err()); // nothing pending on the receiver
}
#[test]
fn buffer_discards_oldest_past_cap() {
let (tx, _keep) = broadcast::channel::<Event>(1024);
let alerter = Alerter::new(tx);
for i in 0..(MAX_BUFFERED_ALERTS + 50) {
alerter.alert(AlertLevel::Warn, AlertSource::Worker, format!("msg-{i}"));
}
let (snapshot, _rx) = alerter.subscribe_with_snapshot();
assert_eq!(snapshot.len(), MAX_BUFFERED_ALERTS);
// First 50 were evicted; the oldest remaining is msg-50.
assert_eq!(snapshot.first().unwrap().message, "msg-50");
let last = format!("msg-{}", MAX_BUFFERED_ALERTS + 49);
assert_eq!(snapshot.last().unwrap().message, last);
}
#[test]
fn subscribe_snapshot_and_live_do_not_overlap() {
let (tx, _keep) = broadcast::channel::<Event>(8);
let alerter = Alerter::new(tx);
alerter.alert(AlertLevel::Warn, AlertSource::Worker, "historic".into());
let (snapshot, mut rx) = alerter.subscribe_with_snapshot();
alerter.alert(AlertLevel::Error, AlertSource::Worker, "live".into());
assert_eq!(snapshot.len(), 1);
assert_eq!(snapshot[0].message, "historic");
match rx.try_recv() {
Ok(Event::Alert(a)) => assert_eq!(a.message, "live"),
other => panic!("unexpected: {other:?}"),
}
assert!(rx.try_recv().is_err());
}
}

View File

@ -25,7 +25,7 @@ use crate::hook::{
AbortInfo, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary,
TurnEndInfo,
};
use crate::ipc::notification_buffer::{NotificationBuffer, format_notification};
use crate::ipc::notify_buffer::{NotifyBuffer, format_notify};
use crate::prompt::catalog::PromptCatalog;
use crate::compact::token_counter::total_tokens_impl;
use tracing::warn;
@ -42,7 +42,7 @@ pub(crate) struct PodInterceptor {
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
/// Pending-notification buffer drained into the per-request
/// context at the head of `pre_llm_request`.
pending_notifications: NotificationBuffer,
pending_notifies: NotifyBuffer,
/// Prompt catalog used to render the injected notification wrapper.
prompts: Arc<PromptCatalog>,
/// Next turn index assigned by `on_prompt_submit`.
@ -56,14 +56,14 @@ impl PodInterceptor {
registry: Arc<HookRegistry>,
compact_state: Option<Arc<CompactState>>,
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
pending_notifications: NotificationBuffer,
pending_notifies: NotifyBuffer,
prompts: Arc<PromptCatalog>,
) -> Self {
Self {
registry,
compact_state,
usage_history,
pending_notifications,
pending_notifies,
prompts,
next_turn_index: AtomicUsize::new(0),
tool_calls_this_turn: AtomicUsize::new(0),
@ -127,16 +127,16 @@ impl Interceptor for PodInterceptor {
// into the per-request context as transient system messages.
// These are not persisted to the Worker history; they exist only
// for this single LLM request.
for notification in self.pending_notifications.drain() {
match format_notification(&notification, &self.prompts) {
for n in self.pending_notifies.drain() {
match format_notify(&n, &self.prompts) {
Ok(item) => context.push(item),
Err(e) => {
// A render failure here would starve the LLM of the
// notification text. Fall back to the raw message —
// notify text. Fall back to the raw message —
// it still carries the intent, just without the
// wrapper phrasing.
warn!(error = %e, "failed to render notify_wrapper; using raw message");
context.push(Item::system_message(notification.message.clone()));
context.push(Item::system_message(n.message.clone()));
}
}
}
@ -296,7 +296,7 @@ mod tests {
registry,
Some(state),
Some(history),
NotificationBuffer::new(),
NotifyBuffer::new(),
PromptCatalog::builtins_only().unwrap(),
);
let mut ctx = ctx_items;
@ -320,7 +320,7 @@ mod tests {
registry,
Some(state),
Some(history),
NotificationBuffer::new(),
NotifyBuffer::new(),
PromptCatalog::builtins_only().unwrap(),
);
let mut ctx = ctx_items;
@ -345,7 +345,7 @@ mod tests {
registry,
Some(state),
Some(history),
NotificationBuffer::new(),
NotifyBuffer::new(),
PromptCatalog::builtins_only().unwrap(),
);
let mut ctx = ctx_items;
@ -364,7 +364,7 @@ mod tests {
registry,
None,
None,
NotificationBuffer::new(),
NotifyBuffer::new(),
PromptCatalog::builtins_only().unwrap(),
);
let mut ctx: Vec<Item> = Vec::new();
@ -385,9 +385,9 @@ mod tests {
}
#[tokio::test]
async fn pre_llm_request_drains_pending_notifications_into_context() {
async fn pre_llm_request_drains_pending_notifies_into_context() {
let registry = Arc::new(HookRegistryBuilder::new().build());
let buffer = NotificationBuffer::new();
let buffer = NotifyBuffer::new();
buffer.push("first".into());
buffer.push("second".into());
@ -419,7 +419,7 @@ mod tests {
// When compaction yields, notifications remain in the buffer for
// the next pre_llm_request (after compaction + resume).
let registry = Arc::new(HookRegistryBuilder::new().build());
let buffer = NotificationBuffer::new();
let buffer = NotifyBuffer::new();
buffer.push("msg".into());
let state = Arc::new(CompactState::new(None, Some(100), 2));
@ -455,7 +455,7 @@ mod tests {
registry,
None,
None,
NotificationBuffer::new(),
NotifyBuffer::new(),
PromptCatalog::builtins_only().unwrap(),
);
let mut ctx: Vec<Item> = Vec::new();

View File

@ -1,6 +1,6 @@
pub mod alerter;
pub mod event;
pub mod notifier;
pub mod server;
pub(crate) mod interceptor;
pub(crate) mod notification_buffer;
pub(crate) mod notify_buffer;

View File

@ -1,191 +0,0 @@
//! User-facing notification channel for Pod → client.
//!
//! Separate from `tracing` (which is for developer logs). Notifications
//! are short human-readable messages the Pod layer wants a client to
//! see — for example "compaction failed", "tool output truncated".
//!
//! Each notification is broadcast on the shared `Event` channel and
//! also appended to an in-memory buffer so that clients connecting
//! after the fact still see everything emitted during the session.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast;
use protocol::{Event, Notification, NotificationLevel, NotificationSource};
/// Upper bound on buffered notifications. When exceeded, the oldest
/// entries are discarded so a long-running session cannot leak
/// memory through a pathological loop of recurring notifications
/// (e.g. compaction failing every turn).
const MAX_BUFFERED_NOTIFICATIONS: usize = 512;
#[derive(Clone)]
pub struct Notifier {
inner: Arc<Inner>,
}
struct Inner {
event_tx: broadcast::Sender<Event>,
buffer: Mutex<VecDeque<Notification>>,
}
impl Notifier {
pub fn new(event_tx: broadcast::Sender<Event>) -> Self {
Self {
inner: Arc::new(Inner {
event_tx,
buffer: Mutex::new(VecDeque::with_capacity(MAX_BUFFERED_NOTIFICATIONS)),
}),
}
}
/// Record and broadcast a notification.
///
/// The broadcast may have no subscribers (e.g. during Pod
/// construction before any client has connected); the buffer
/// guarantees the message is still delivered once a client
/// attaches.
///
/// The buffer mutex is held across `broadcast::send` to make
/// `subscribe_with_snapshot` race-free — a client that snapshots
/// the buffer while holding the same lock sees every notification
/// exactly once: older ones from the snapshot, newer ones from
/// the freshly-subscribed receiver.
pub fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) {
let notification = Notification {
level,
source,
message,
timestamp_ms: now_ms(),
};
if let Ok(mut buf) = self.inner.buffer.lock() {
if buf.len() >= MAX_BUFFERED_NOTIFICATIONS {
buf.pop_front();
}
buf.push_back(notification.clone());
let _ = self
.inner
.event_tx
.send(Event::Notification(notification));
}
}
/// Subscribe and atomically snapshot the current buffer.
///
/// The returned snapshot contains notifications emitted before
/// this call; the receiver will deliver notifications emitted
/// after. A notification cannot appear in both.
pub fn subscribe_with_snapshot(&self) -> (Vec<Notification>, broadcast::Receiver<Event>) {
let buf = self
.inner
.buffer
.lock()
.expect("notifier buffer mutex poisoned");
let rx = self.inner.event_tx.subscribe();
let snapshot: Vec<Notification> = buf.iter().cloned().collect();
(snapshot, rx)
}
}
fn now_ms() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn notify_broadcasts_to_existing_subscriber() {
let (tx, _keep) = broadcast::channel::<Event>(8);
let notifier = Notifier::new(tx);
let (_snapshot, mut rx) = notifier.subscribe_with_snapshot();
notifier.notify(
NotificationLevel::Warn,
NotificationSource::Compactor,
"test message".into(),
);
match rx.try_recv() {
Ok(Event::Notification(n)) => assert_eq!(n.message, "test message"),
other => panic!("unexpected event: {other:?}"),
}
}
#[test]
fn late_subscriber_sees_earlier_notifications_via_snapshot() {
let (tx, _keep) = broadcast::channel::<Event>(8);
let notifier = Notifier::new(tx);
notifier.notify(
NotificationLevel::Error,
NotificationSource::Pod,
"first".into(),
);
notifier.notify(
NotificationLevel::Warn,
NotificationSource::AgentsMd,
"second".into(),
);
let (snapshot, mut rx) = notifier.subscribe_with_snapshot();
assert_eq!(snapshot.len(), 2);
assert_eq!(snapshot[0].message, "first");
assert_eq!(snapshot[1].message, "second");
assert!(rx.try_recv().is_err()); // nothing pending on the receiver
}
#[test]
fn buffer_discards_oldest_past_cap() {
let (tx, _keep) = broadcast::channel::<Event>(1024);
let notifier = Notifier::new(tx);
for i in 0..(MAX_BUFFERED_NOTIFICATIONS + 50) {
notifier.notify(
NotificationLevel::Warn,
NotificationSource::Worker,
format!("msg-{i}"),
);
}
let (snapshot, _rx) = notifier.subscribe_with_snapshot();
assert_eq!(snapshot.len(), MAX_BUFFERED_NOTIFICATIONS);
// First 50 were evicted; the oldest remaining is msg-50.
assert_eq!(snapshot.first().unwrap().message, "msg-50");
let last = format!("msg-{}", MAX_BUFFERED_NOTIFICATIONS + 49);
assert_eq!(snapshot.last().unwrap().message, last);
}
#[test]
fn subscribe_snapshot_and_live_do_not_overlap() {
let (tx, _keep) = broadcast::channel::<Event>(8);
let notifier = Notifier::new(tx);
notifier.notify(
NotificationLevel::Warn,
NotificationSource::Worker,
"historic".into(),
);
let (snapshot, mut rx) = notifier.subscribe_with_snapshot();
notifier.notify(
NotificationLevel::Error,
NotificationSource::Worker,
"live".into(),
);
assert_eq!(snapshot.len(), 1);
assert_eq!(snapshot[0].message, "historic");
match rx.try_recv() {
Ok(Event::Notification(n)) => assert_eq!(n.message, "live"),
other => panic!("unexpected: {other:?}"),
}
assert!(rx.try_recv().is_err());
}
}

View File

@ -1,6 +1,6 @@
//! Pending-notification buffer for `Method::Notify`.
//! Pending-notify buffer for `Method::Notify`.
//!
//! Notifications are queued here by the Controller and drained by
//! Notify entries are queued here by the Controller and drained by
//! `PodInterceptor::pre_llm_request` into the per-request context
//! (never into the Worker's persistent history). Each queued entry
//! becomes one `Item::system_message` in the outgoing request.
@ -13,56 +13,53 @@ use tracing::warn;
use crate::prompt::catalog::{CatalogError, PromptCatalog};
/// Maximum queued notifications. Oldest entries are dropped beyond this.
/// Maximum queued notify entries. Oldest entries are dropped beyond this.
const CAPACITY: usize = 128;
/// One pending notification awaiting injection into the next LLM request.
/// One pending notify entry awaiting injection into the next LLM request.
#[derive(Debug, Clone)]
pub struct PendingNotification {
pub struct PendingNotify {
pub message: String,
}
/// Shared, mutex-guarded buffer of pending notifications.
/// Shared, mutex-guarded buffer of pending notify entries.
///
/// Cloned between the Pod (producer) and PodInterceptor (consumer).
#[derive(Clone, Default)]
pub struct NotificationBuffer {
inner: Arc<Mutex<VecDeque<PendingNotification>>>,
pub struct NotifyBuffer {
inner: Arc<Mutex<VecDeque<PendingNotify>>>,
}
impl NotificationBuffer {
impl NotifyBuffer {
pub fn new() -> Self {
Self::default()
}
/// Push a notification onto the queue. If the queue is full, the
/// Push a notify entry onto the queue. If the queue is full, the
/// oldest entry is dropped and a `tracing::warn` is emitted — the
/// caller should never hit this in normal operation.
pub fn push(&self, message: String) {
let mut q = self.inner.lock().expect("notification buffer poisoned");
let mut q = self.inner.lock().expect("notify buffer poisoned");
if q.len() >= CAPACITY {
let dropped = q.pop_front();
warn!(
capacity = CAPACITY,
dropped_message = dropped.as_ref().map(|n| n.message.as_str()),
"notification buffer overflow; dropped oldest"
"notify buffer overflow; dropped oldest"
);
}
q.push_back(PendingNotification { message });
q.push_back(PendingNotify { message });
}
/// Remove and return all pending notifications in FIFO order.
pub fn drain(&self) -> Vec<PendingNotification> {
let mut q = self.inner.lock().expect("notification buffer poisoned");
/// Remove and return all pending notify entries in FIFO order.
pub fn drain(&self) -> Vec<PendingNotify> {
let mut q = self.inner.lock().expect("notify buffer poisoned");
q.drain(..).collect()
}
/// Number of pending notifications. Primarily for tests.
/// Number of pending notify entries. Primarily for tests.
pub fn len(&self) -> usize {
self.inner
.lock()
.expect("notification buffer poisoned")
.len()
self.inner.lock().expect("notify buffer poisoned").len()
}
pub fn is_empty(&self) -> bool {
@ -70,12 +67,12 @@ impl NotificationBuffer {
}
}
/// Format a single pending notification into the `Item::system_message`
/// Format a single pending notify entry into the `Item::system_message`
/// that gets injected into the per-request context. The wrapper body
/// comes from `PodPrompt::NotifyWrapper` so the surrounding phrasing
/// can be customised via a prompt pack (translation, tone, ...).
pub(crate) fn format_notification(
n: &PendingNotification,
pub(crate) fn format_notify(
n: &PendingNotify,
prompts: &PromptCatalog,
) -> Result<Item, CatalogError> {
let text = prompts.notify_wrapper(&n.message)?;
@ -88,7 +85,7 @@ mod tests {
#[test]
fn push_then_drain_preserves_order() {
let buf = NotificationBuffer::new();
let buf = NotifyBuffer::new();
buf.push("one".into());
buf.push("two".into());
let drained = buf.drain();
@ -100,7 +97,7 @@ mod tests {
#[test]
fn capacity_drops_oldest() {
let buf = NotificationBuffer::new();
let buf = NotifyBuffer::new();
for i in 0..(CAPACITY + 5) {
buf.push(format!("msg{i}"));
}
@ -112,12 +109,12 @@ mod tests {
}
#[test]
fn format_notification_includes_message_and_nonblocking_hint() {
let n = PendingNotification {
fn format_notify_includes_message_and_nonblocking_hint() {
let n = PendingNotify {
message: "hello".into(),
};
let catalog = PromptCatalog::builtins_only().unwrap();
let item = format_notification(&n, &catalog).unwrap();
let item = format_notify(&n, &catalog).unwrap();
let text = item.as_text().unwrap_or_default().to_string();
assert!(text.contains("[Notification]"));
assert!(text.contains("hello"));

View File

@ -62,17 +62,13 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
let mut reader = JsonLineReader::new(reader);
let mut writer = JsonLineWriter::new(writer);
// Atomically subscribe and snapshot buffered notifications so that
// Atomically subscribe and snapshot buffered alerts so that
// warnings emitted before this client connected are replayed
// exactly once — they appear in the snapshot, and any notification
// exactly once — they appear in the snapshot, and any alert
// arriving afterwards reaches us through `rx`.
let (notification_snapshot, mut rx) = handle.notifier.subscribe_with_snapshot();
for notification in notification_snapshot {
if writer
.write(&Event::Notification(notification))
.await
.is_err()
{
let (alert_snapshot, mut rx) = handle.alerter.subscribe_with_snapshot();
for alert in alert_snapshot {
if writer.write(&Event::Alert(alert)).await.is_err() {
return;
}
}

View File

@ -15,7 +15,7 @@ pub use compact::token_counter::{EstimateSource, SplitPoint, TokenEstimate};
pub use controller::{PodController, PodHandle, ShutdownReceiver};
pub use factory::{FactoryError, PodFactory};
pub use hook::{Hook, HookEventKind, HookRegistryBuilder};
pub use ipc::notifier::Notifier;
pub use ipc::alerter::Alerter;
pub use ipc::server::SocketServer;
pub use manifest::{
AuthRef, ModelManifest, PodManifest, PodManifestConfig, PodMetaConfig, Scope, SchemeKind,

View File

@ -19,8 +19,8 @@ use crate::hook::{
Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
PreRequestInfo, PreToolCall,
};
use crate::ipc::notification_buffer::NotificationBuffer;
use crate::ipc::notifier::Notifier;
use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::interceptor::PodInterceptor;
use crate::prompt::loader::PromptLoader;
use crate::prompt::catalog::{CatalogError, PromptCatalog};
@ -28,7 +28,7 @@ use crate::runtime::dir;
use crate::runtime::scope_lock::{self, ScopeAllocationGuard, ScopeLockError};
use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate};
use crate::compact::usage_tracker::UsageTracker;
use protocol::{Event, NotificationLevel, NotificationSource};
use protocol::{Event, AlertLevel, AlertSource};
use tokio::sync::broadcast;
use async_trait::async_trait;
use llm_worker::interceptor::PreRequestAction;
@ -90,16 +90,16 @@ pub struct Pod<C: LlmClient, St: Store> {
system_prompt_template: Option<SystemPromptTemplate>,
/// User-facing notification sink attached by the Controller at
/// spawn time. `None` in tests / direct `Pod::new` usage.
notifier: Option<Notifier>,
alerter: Option<Alerter>,
/// Broadcast sender for typed lifecycle `Event`s (compact progress,
/// etc.). Attached by the Controller alongside `notifier`. Unlike
/// etc.). Attached by the Controller alongside `alerter`. Unlike
/// notifications, events sent here are NOT replayed to clients that
/// connect after the fact — they are fire-and-forget broadcasts.
event_tx: Option<broadcast::Sender<Event>>,
/// Queue of pending `Method::Notify` notifications awaiting
/// injection into the next LLM request. Shared with the
/// PodInterceptor installed in `ensure_interceptor_installed`.
pending_notifications: NotificationBuffer,
pending_notifies: NotifyBuffer,
/// Scope allocation in the machine-wide lock file. `Some` for
/// Pods built via `from_manifest` (production path); `None` for
/// lower-level constructors (`Pod::new`, `Pod::restore`) that
@ -158,9 +158,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
tracker: None,
system_prompt_template: None,
notifier: None,
alerter: None,
event_tx: None,
pending_notifications: NotificationBuffer::new(),
pending_notifies: NotifyBuffer::new(),
scope_allocation: None,
callback_socket: None,
prompts,
@ -231,9 +231,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
usage_history: Arc::new(Mutex::new(state.usage_history)),
tracker: None,
system_prompt_template: None,
notifier: None,
alerter: None,
event_tx: None,
pending_notifications: NotificationBuffer::new(),
pending_notifies: NotifyBuffer::new(),
scope_allocation: None,
callback_socket: None,
prompts,
@ -332,22 +332,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// Called by the Controller immediately after spawning so that
/// Pod-internal operations (compaction failures, AGENTS.md
/// ingestion warnings) can surface messages to connected clients.
pub fn attach_notifier(&mut self, notifier: Notifier) {
self.notifier = Some(notifier);
pub fn attach_alerter(&mut self, alerter: Alerter) {
self.alerter = Some(alerter);
}
/// Attach the broadcast sender used for typed lifecycle `Event`s.
///
/// The Controller wires this alongside [`attach_notifier`] so that
/// The Controller wires this alongside [`attach_alerter`] so that
/// Pod-internal operations (currently: compaction) can surface
/// progress to connected clients.
pub fn attach_event_tx(&mut self, event_tx: broadcast::Sender<Event>) {
self.event_tx = Some(event_tx);
}
fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) {
if let Some(n) = self.notifier.as_ref() {
n.notify(level, source, message);
fn alert(&self, level: AlertLevel, source: AlertSource, message: String) {
if let Some(n) = self.alerter.as_ref() {
n.alert(level, source, message);
}
}
@ -364,17 +364,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
///
/// The notification will be injected as an `Item::system_message`
/// into the next outgoing LLM request context (not into history).
/// See [`NotificationBuffer`] for overflow behaviour.
pub fn push_notification(&self, message: String) {
self.pending_notifications.push(message);
/// See [`NotifyBuffer`] for overflow behaviour.
pub fn push_notify(&self, message: String) {
self.pending_notifies.push(message);
}
/// Shared handle to the pending notification buffer.
///
/// The Controller holds a clone so that `Method::Notify` arriving
/// while `pod.run()` is in flight can still reach the interceptor.
pub fn notification_buffer_handle(&self) -> NotificationBuffer {
self.pending_notifications.clone()
pub fn notify_buffer_handle(&self) -> NotifyBuffer {
self.pending_notifies.clone()
}
/// Parent callback socket set by `from_manifest_spawned`.
@ -497,7 +497,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
registry,
compact_state,
usage_history_handle,
self.pending_notifications.clone(),
self.pending_notifies.clone(),
self.prompts.clone(),
);
self.worker_mut().set_interceptor(interceptor);
@ -516,7 +516,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let Some(template) = self.system_prompt_template.take() else {
return Ok(());
};
let notifier = self.notifier.clone();
let alerter = self.alerter.clone();
let worker = self.worker.as_mut().expect("worker present");
// Materialise any pending tool factories so the template sees the
// full list of tool names. Redundant with the flush inside
@ -530,10 +530,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.collect();
let agents_md_read = read_agents_md(&self.pwd);
for warning in agents_md_read.warnings {
if let Some(n) = notifier.as_ref() {
n.notify(
NotificationLevel::Warn,
NotificationSource::AgentsMd,
if let Some(n) = alerter.as_ref() {
n.alert(
AlertLevel::Warn,
AlertSource::AgentsMd,
warning,
);
}
@ -713,9 +713,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.send_event(Event::CompactFailed {
error: e.to_string(),
});
self.notify(
NotificationLevel::Error,
NotificationSource::Compactor,
self.alert(
AlertLevel::Error,
AlertSource::Compactor,
format!("mid-run compaction failed: {e}"),
);
if let Some(ref state) = self.compact_state {
@ -757,9 +757,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.send_event(Event::CompactFailed {
error: e.to_string(),
});
self.notify(
NotificationLevel::Warn,
NotificationSource::Compactor,
self.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction failed: {e}"),
);
state.record_compact_failure();
@ -1171,9 +1171,9 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None,
system_prompt_template,
notifier: None,
alerter: None,
event_tx: None,
pending_notifications: NotificationBuffer::new(),
pending_notifies: NotifyBuffer::new(),
scope_allocation: Some(scope_allocation),
callback_socket: None,
prompts,
@ -1234,9 +1234,9 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None,
system_prompt_template,
notifier: None,
alerter: None,
event_tx: None,
pending_notifications: NotificationBuffer::new(),
pending_notifies: NotifyBuffer::new(),
scope_allocation: Some(scope_allocation),
callback_socket: Some(callback_socket),
prompts,

View File

@ -348,7 +348,7 @@ enum SendRunError {
/// Write `Method::Run` to the target and read back events until we see
/// either `TurnStart` (accepted) or `Error { AlreadyRunning }`
/// (rejected). Any replayed notifications that precede the response are
/// (rejected). Any replayed alerts that precede the response are
/// skipped. Times out per-read so a stuck Pod doesn't hang the tool.
async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> {
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
@ -373,9 +373,9 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu
..
}) => return Err(SendRunError::AlreadyRunning),
Some(Event::TurnStart { .. }) => return Ok(()),
// Notifications and other pre-turn events are replayed to
// new subscribers; keep reading until the controller's
// response to our `Run` shows up.
// Alerts and other pre-turn events are replayed to new
// subscribers; keep reading until the controller's response
// to our `Run` shows up.
Some(_) => continue,
None => return Err(SendRunError::Io("connection closed before response".into())),
}
@ -383,7 +383,7 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu
}
/// Connect and ask the Pod for its conversation history. Skips
/// pre-History events (such as buffered notifications replayed to new
/// pre-History events (such as buffered alerts replayed to new
/// clients). Returns the raw JSON items as `serde_json::Value` since
/// the pod crate already round-trips via `Value` on the wire.
async fn fetch_history(socket: &Path) -> std::io::Result<Vec<serde_json::Value>> {

View File

@ -148,7 +148,7 @@ pub enum Event {
items: Vec<serde_json::Value>,
greeting: Greeting,
},
Notification(Notification),
Alert(Alert),
/// Pod has started compacting the current session.
///
/// Fired immediately before a compaction run. Success is signalled by
@ -169,16 +169,16 @@ pub enum Event {
Shutdown,
}
/// User-facing notification emitted from the Pod layer.
/// User-facing alert emitted from the Pod layer.
///
/// This is a separate channel from `tracing` (developer logs): entries
/// here are assembled explicitly by the Pod when a condition should be
/// surfaced to the person driving the client. Keep messages short and
/// human-readable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notification {
pub level: NotificationLevel,
pub source: NotificationSource,
pub struct Alert {
pub level: AlertLevel,
pub source: AlertSource,
pub message: String,
/// Milliseconds since the Unix epoch.
pub timestamp_ms: i64,
@ -186,14 +186,14 @@ pub struct Notification {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NotificationLevel {
pub enum AlertLevel {
Warn,
Error,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NotificationSource {
pub enum AlertSource {
Pod,
Worker,
Compactor,
@ -462,16 +462,16 @@ mod tests {
}
#[test]
fn event_notification_format() {
let event = Event::Notification(Notification {
level: NotificationLevel::Warn,
source: NotificationSource::Compactor,
fn event_alert_format() {
let event = Event::Alert(Alert {
level: AlertLevel::Warn,
source: AlertSource::Compactor,
message: "compaction failed".into(),
timestamp_ms: 1_700_000_000_000,
});
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "notification");
assert_eq!(parsed["event"], "alert");
assert_eq!(parsed["data"]["level"], "warn");
assert_eq!(parsed["data"]["source"], "compactor");
assert_eq!(parsed["data"]["message"], "compaction failed");

View File

@ -1,4 +1,4 @@
use protocol::{Event, Method, NotificationLevel, NotificationSource, RunResult};
use protocol::{Event, Method, AlertLevel, AlertSource, RunResult};
use crate::block::{Block, CompactEvent, ToolCallBlock, ToolCallState};
use crate::cache::FileCache;
@ -81,9 +81,9 @@ impl App {
}
pub fn push_error(&mut self, message: impl Into<String>) {
self.blocks.push(Block::Notification {
level: NotificationLevel::Error,
source: NotificationSource::Pod,
self.blocks.push(Block::Alert {
level: AlertLevel::Error,
source: AlertSource::Pod,
message: message.into(),
});
}
@ -194,16 +194,16 @@ impl App {
apply_cache_update(&mut self.cache, &name, args.as_deref(), output.as_deref());
}
} else {
// Result for an unknown tool call. Surface it as a
// notification so it isn't silently dropped.
// Result for an unknown tool call. Surface it as an
// alert so it isn't silently dropped.
let level = if is_error {
NotificationLevel::Error
AlertLevel::Error
} else {
NotificationLevel::Warn
AlertLevel::Warn
};
self.blocks.push(Block::Notification {
self.blocks.push(Block::Alert {
level,
source: NotificationSource::Pod,
source: AlertSource::Pod,
message: format!("orphan tool result ({id}): {summary}"),
});
}
@ -243,11 +243,11 @@ impl App {
self.blocks
.push(Block::Compact(CompactEvent::Failed { error }));
}
Event::Notification(notification) => {
self.blocks.push(Block::Notification {
level: notification.level,
source: notification.source,
message: notification.message,
Event::Alert(alert) => {
self.blocks.push(Block::Alert {
level: alert.level,
source: alert.source,
message: alert.message,
});
}
Event::History { items, greeting } => {
@ -488,12 +488,12 @@ fn strip_cat_n_prefix(formatted: &str) -> String {
out
}
pub fn notification_source_label(source: NotificationSource) -> &'static str {
pub fn alert_source_label(source: AlertSource) -> &'static str {
match source {
NotificationSource::Pod => "pod",
NotificationSource::Worker => "worker",
NotificationSource::Compactor => "compactor",
NotificationSource::AgentsMd => "AGENTS.md",
AlertSource::Pod => "pod",
AlertSource::Worker => "worker",
AlertSource::Compactor => "compactor",
AlertSource::AgentsMd => "AGENTS.md",
}
}

View File

@ -7,7 +7,7 @@
#![allow(dead_code)] // Phase 5 will consume `output` in detail mode.
use protocol::{Greeting, NotificationLevel, NotificationSource};
use protocol::{Greeting, AlertLevel, AlertSource};
pub enum Block {
Greeting(Greeting),
@ -21,9 +21,9 @@ pub enum Block {
text: String,
},
ToolCall(ToolCallBlock),
Notification {
level: NotificationLevel,
source: NotificationSource,
Alert {
level: AlertLevel,
source: AlertSource,
message: String,
},
Compact(CompactEvent),

View File

@ -20,9 +20,9 @@ use ratatui::text::{Line, Span};
use ratatui::widgets::{Block as UiBlock, BorderType, Borders, Padding, Paragraph, Widget, Wrap};
use unicode_width::{UnicodeWidthChar, UnicodeWidthStr};
use protocol::{Greeting, NotificationLevel};
use protocol::{Greeting, AlertLevel};
use crate::app::{App, fmt_tokens, notification_source_label};
use crate::app::{App, fmt_tokens, alert_source_label};
use crate::block::{Block, CompactEvent};
/// Display density for the history view.
@ -313,20 +313,20 @@ fn render_block_into(
// ToolCall is dispatched in `compute_history` via `tool::render_tool`
// so it can consume multiple adjacent blocks (Read aggregation).
Block::ToolCall(_) => unreachable!("ToolCall handled by compute_history"),
Block::Notification {
Block::Alert {
level,
source,
message,
} => {
let kind = match level {
NotificationLevel::Warn => MessageKind::NoticeWarn,
NotificationLevel::Error => MessageKind::NoticeError,
AlertLevel::Warn => MessageKind::NoticeWarn,
AlertLevel::Error => MessageKind::NoticeError,
};
let prefix = match level {
NotificationLevel::Warn => "[notice]",
NotificationLevel::Error => "[notice error]",
AlertLevel::Warn => "[notice]",
AlertLevel::Error => "[notice error]",
};
let label = notification_source_label(*source);
let label = alert_source_label(*source);
let text = format!("{prefix} {label}: {message}");
match mode {
Mode::Overview => push_overview_line(lines, &text, width, kind, ""),

View File

@ -0,0 +1,70 @@
# Notification 用語のリネーム
## 背景
Pod 周辺に二系統の「notification」があり、用語が衝突して読みづらい:
- **outbound** (Pod → Client): `Event::Notification` / `Notifier::notify` — ユーザー向けの運用診断compaction 失敗、tool 出力 truncated 等)
- **inbound** (External → Pod): `Method::Notify` / `NotificationBuffer` — 外部から Pod の LLM context に「メモを置く」経路
両者とも `Notification` / `Notify` を共有しており、コードを読む側が毎回どちらの経路かを文脈から判別する必要がある。
`Method::Notify` の動詞名は「呼び出し側がやる行為」として意味的に正しいので残し、**outbound 側の名詞**を `Alert` に倒して非対称を入れることで区別する。
## 要件
### outbound (Pod → Client) — `Alert` にリネーム
protocol:
- `Event::Notification(Notification)``Event::Alert(Alert)`
- `protocol::Notification` 構造体 → `protocol::Alert`
- `NotificationLevel``AlertLevel`
- `NotificationSource``AlertSource`
pod:
- `crates/pod/src/ipc/notifier.rs``alerter.rs`
- `Notifier``Alerter`
- `Notifier::notify(...)``Alerter::alert(...)`
- `Notifier::subscribe_with_snapshot()` の戻り値 `Vec<Notification>``Vec<Alert>`
tui:
- `Block::Notification``Block::Alert`(描画スタイルは変更しない)
### inbound (External → Pod) — `Notify` 据置 + 衛生整理
- `Method::Notify` 据置(動詞として正しい)
- `NotificationBuffer``NotifyBuffer`(メソッド名と揃える)
- `PendingNotification``PendingNotify`
- `notify_wrapper` プロンプト名 据置
- LLM 向け wrapper ラベル `[Notification]` 据置LLM への見え方は変えない)
### protocol wire 互換
開発初期段階のため wire 互換は意図的に切る。client / pod を同時更新する前提で、旧名を吸収する `#[serde(rename)]` 等は入れない。
## 範囲外
- 通知の content / level / source の意味論変更(純粋な rename のみ)
- `Notifier` / `NotificationBuffer` の buffer 容量や snapshot 挙動の変更
- TUI 側 `Block::Alert` の描画スタイルの変更
## 完了条件
- 上記 rename がすべて反映されている
- ビルドが通り、既存テストが通る
- grep で旧名outbound 文脈の `Notification` / `Notifier` / 動詞 `notify`)が消えている
## 参照
- `crates/protocol/src/lib.rs``Event::Notification`, `Notification` struct, `NotificationLevel/Source`
- `crates/pod/src/ipc/notifier.rs`outbound、`Notifier`
- `crates/pod/src/ipc/notification_buffer.rs`inbound、`NotificationBuffer`, `PendingNotification`
- `crates/tui/``Block::Notification` 描画)
## Review
- 状態: Approve with follow-up
- レビュー詳細: [./notification-naming-cleanup.review.md](./notification-naming-cleanup.review.md)
- 日付: 2026-04-26

View File

@ -0,0 +1,42 @@
# Review: Notification 用語のリネーム
## 前提・要件の確認
### outbound (Pod → Client) — Alert 系
- `Event::Notification(Notification)``Event::Alert(Alert)`: `crates/protocol/src/lib.rs:151`, `crates/protocol/src/lib.rs:178-185` で完了。
- `NotificationLevel/Source``AlertLevel/AlertSource`: `crates/protocol/src/lib.rs:187-201` で完了。serde の `rename_all = "snake_case"` も維持されており wire 名は `warn` / `error` / `pod` / `worker` / `compactor` / `agents_md` のまま (元と一致)。
- `notifier.rs``alerter.rs`、`Notifier` → `Alerter`、`notify()` → `alert()`: `crates/pod/src/ipc/alerter.rs:26-87` で完了。`subscribe_with_snapshot` の戻り値も `(Vec<Alert>, …)` (l.78)。
- `Block::Notification``Block::Alert`: `crates/tui/src/block.rs:24-28`。レンダリング側 (`crates/tui/src/ui.rs:316-335`) も match arm が更新されており、style (`MessageKind::NoticeWarn/Error` と prefix `[notice]` / `[notice error]`) は据置 — 範囲外要件を尊重している。
- イベント名のテスト: `event_alert_format` (`crates/protocol/src/lib.rs:464-479`) で `event=alert` の wire 形を確認。
### inbound (External → Pod) — Notify 据置 + 衛生整理
- `Method::Notify` 据置: `crates/protocol/src/lib.rs:18` 健在、`method_notify_json_roundtrip` (l.343) も保持。
- `notification_buffer.rs``notify_buffer.rs`、`NotificationBuffer/PendingNotification` → `NotifyBuffer/PendingNotify`: `crates/pod/src/ipc/notify_buffer.rs:20-67`
- `format_notification``format_notify`: 同 l.74。
- `notify_wrapper` プロンプト名と `[Notification]` ラベル据置: `crates/pod/src/ipc/notify_buffer.rs:78,119` および `crates/pod/src/prompt/catalog.rs:447,484` の test asserts、`crates/pod/tests/controller_test.rs:413` の test も `[Notification]` を期待 — 範囲外要件と整合。
### protocol wire 互換
- 旧 `notification` event 名を吸収する `#[serde(rename)]` 等は入っていない (`crates/protocol/src/lib.rs:151` の `Alert(Alert)``rename_all = "snake_case"` 経由で wire 上 `event=alert` に変わる)。チケットの「wire 互換は意図的に切る」方針通り。
### 完了条件
- ビルド通過 (cargo build): pod / protocol / tui で warning は既存の `end_scope` のみ、本変更に起因しない。
- テスト: `cargo test -p pod -p protocol -p tui` で 205 件 PASS (ユニット+統合+doc 含む。0 fail / 0 ignored)。
- grep: `Notifier|NotificationLevel|NotificationSource|NotificationBuffer|PendingNotification|notification_buffer|push_notification|attach_notifier|notifier` がコード上で 0 ヒット。残る `Notification` 一致は LLM 向け wrapper ラベル `[Notification]` (据置) と inbound 概念を指す散在コメントのみ。
## アーキテクチャ・スコープ
- レイヤ境界尊重: 変更は protocol / pod (ipc, controller, pod 本体) / tui に限定。llm-worker は触れていない。
- 命名方針: `Alerter` / `alert()` の対 `Notify` (動詞) という非対称設計はチケット背景と一致しており、用語衝突を解消する最小限の手当てになっている。
- 不必要な変更の有無: outbound 一式の rename と inbound 側の衛生 (`NotifyBuffer` / `PendingNotify` / `format_notify`) は要件に明記されている範囲のみ。`pending_notifications` フィールド名 → `pending_notifies``push_notification``push_notify` まで踏み込んだのは、フィールド型自体が `NotifyBuffer` / `PendingNotify` に変わっていることを踏まえると naming consistency 上妥当。`run_for_notification` メソッド名は `Method::Notify` 由来の "notify-driven run" を表すので残しても問題ない (今回触っていない)。
- ファイル rename: ticket 通り `notifier.rs``alerter.rs`、`notification_buffer.rs` → `notify_buffer.rs` の 2 つだけで、過剰な移動なし。`mv` を使った旨 (history 保全) も妥当。
## 指摘事項
### Non-blocking / Follow-up
- `crates/pod/src/controller.rs:49` の doc comment が `Thin wrapper over `Alerter::notify`` のまま (実体は `Alerter::alert`)。1 行修正で解消可能。
### Nits
- `crates/protocol/src/lib.rs:39` の "via the notification buffer" や `crates/pod/src/controller.rs:255,258,346,474,480,657` の "notification buffer" 等の散在コメントは、文脈上 inbound (Notify) の話なので意味は通る。ただし型名が `NotifyBuffer` に揃ったので "notify buffer" に揃えると読み手のイズが減る。ticket の「衛生整理」を保守的に解釈すれば任意項目。
- `crates/pod/src/pod.rs:91,330,363,365,372,581,585`、`crates/pod/src/ipc/interceptor.rs:4,43,46`、`crates/pod/src/ipc/event.rs:10`、`crates/pod/src/prompt/agents_md.rs:23` 等の doc comment の "notification" も同様 — inbound 側は "notify" 動詞・"alert" 名詞 (outbound) の使い分けが定義されたので、コメントもこの軸で揃え直すと表面の用語衝突がさらに減る。本リネームの影響範囲外と見るなら据置で問題ない。
## 判断
Approve with follow-up — 要件は完全に満たされ、ビルド/テストもクリーン。`controller.rs:49` の doc comment 修正は trivial なので、別チケット化せず本作業の最後に直しておくのが望ましい。コメント表面の用語ノイズは Nits で残るが、ticket の rename スコープ内で必要な分は果たされている。

View File

@ -47,15 +47,16 @@ resolver の trait 化と memory / workflow 用 resolver 実装は別チケッ
text しか作れない client が引き続き存在しても良いことを protocol 仕様に明記する(`vec![Segment::Text(_)]` のみで動く)。
### unknown variant / 未登録 resolver の扱い(要決定)
### unknown variant / 未登録 resolver の扱い
新 variant を持つ client が古い Pod に投げる、または resolver 未登録の variant を Pod が受けるケースの挙動を本チケットで決める。候補:
新 variant を持つ client が古い Pod に投げる、または resolver 未登録の variant を Pod が受けた場合は **2 経路に同時に流す**:
- (a) 黙って drop — 静かに情報が落ちて user/LLM 双方が気付けない
- (b) `[unknown input: kind=foo]` 相当の placeholder を LLM context に差し込み、LLM が気づいて指摘できる
- (c) hard error で submit 拒否
1. **LLM context**`[unknown input: kind=foo]` 相当の placeholder を `Item::system_message` で差し込む。LLM が「ユーザーは何かを送ろうとしたが Pod が解釈できなかった」と気づき、ユーザーに聞き返せる状態を作る。
2. **ユーザー向け通知チャネル**`Event::Alert` / リネーム前は `Event::Notification`にも同時に送る。client 側で「このサーバはこの input type を解釈できません」とユーザーに直接出せる。
(b) を仮の第一候補として実装方針を詰める。serde 側は unknown variant を吸収する形を初期から入れる。
両経路に流すことで「user も LLM も気付けない silent drop」を避ける。serde 側は unknown variant を吸収する形(`#[serde(other)]` 相当)を初期から入れる。
`Event::Alert` への rename は `tickets/notification-naming-cleanup.md` で扱う。本チケットの実装時点での名称(`Event::Notification` か `Event::Alert` か)はその時の repo 状態に従う。
## 範囲外