Method::NotifyとEvent::Notificationが紛らわしい問題
This commit is contained in:
parent
82f08b966b
commit
2ee536ed71
1
TODO.md
1
TODO.md
|
|
@ -11,6 +11,7 @@
|
||||||
- [ ] サブミット入力
|
- [ ] サブミット入力
|
||||||
- [ ] protocol Segment 化 → [tickets/submit-segment-protocol.md](tickets/submit-segment-protocol.md)
|
- [ ] protocol Segment 化 → [tickets/submit-segment-protocol.md](tickets/submit-segment-protocol.md)
|
||||||
- [ ] TUI 補完 + 型付き atom 化 → [tickets/submit-tui-completion.md](tickets/submit-tui-completion.md)
|
- [ ] TUI 補完 + 型付き atom 化 → [tickets/submit-tui-completion.md](tickets/submit-tui-completion.md)
|
||||||
|
- [ ] Notification 用語リネーム (Event::Alert / Method::Notify) → [tickets/notification-naming-cleanup.md](tickets/notification-naming-cleanup.md)
|
||||||
- [ ] メモリ機構
|
- [ ] メモリ機構
|
||||||
- [ ] ファイル形式 + Linter 土台 → [tickets/memory-file-format.md](tickets/memory-file-format.md)
|
- [ ] ファイル形式 + Linter 土台 → [tickets/memory-file-format.md](tickets/memory-file-format.md)
|
||||||
- [ ] memory / Knowledge 検索ツール → [tickets/memory-search-tools.md](tickets/memory-search-tools.md)
|
- [ ] memory / Knowledge 検索ツール → [tickets/memory-search-tools.md](tickets/memory-search-tools.md)
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,8 @@ use llm_worker::llm_client::client::LlmClient;
|
||||||
use session_store::Store;
|
use session_store::Store;
|
||||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||||
|
|
||||||
use crate::ipc::notification_buffer::NotificationBuffer;
|
use crate::ipc::alerter::Alerter;
|
||||||
use crate::ipc::notifier::Notifier;
|
use crate::ipc::notify_buffer::NotifyBuffer;
|
||||||
use crate::pod::{Pod, PodError, PodRunResult};
|
use crate::pod::{Pod, PodError, PodRunResult};
|
||||||
use crate::spawn::comm_tools::{
|
use crate::spawn::comm_tools::{
|
||||||
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
|
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
|
||||||
|
|
@ -17,7 +17,7 @@ use crate::shared_state::{PodSharedState, PodStatus};
|
||||||
use crate::ipc::server::SocketServer;
|
use crate::ipc::server::SocketServer;
|
||||||
use crate::spawn::tool::spawn_pod_tool;
|
use crate::spawn::tool::spawn_pod_tool;
|
||||||
use crate::spawn::registry::SpawnedPodRegistry;
|
use crate::spawn::registry::SpawnedPodRegistry;
|
||||||
use protocol::{ErrorCode, Event, Method, NotificationLevel, NotificationSource, RunResult, TurnResult};
|
use protocol::{ErrorCode, Event, Method, AlertLevel, AlertSource, RunResult, TurnResult};
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// PodHandle — client-facing, Clone-able
|
// PodHandle — client-facing, Clone-able
|
||||||
|
|
@ -29,7 +29,7 @@ pub struct PodHandle {
|
||||||
event_tx: broadcast::Sender<Event>,
|
event_tx: broadcast::Sender<Event>,
|
||||||
pub shared_state: Arc<PodSharedState>,
|
pub shared_state: Arc<PodSharedState>,
|
||||||
pub runtime_dir: Arc<RuntimeDir>,
|
pub runtime_dir: Arc<RuntimeDir>,
|
||||||
pub notifier: Notifier,
|
pub alerter: Alerter,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PodHandle {
|
impl PodHandle {
|
||||||
|
|
@ -46,9 +46,9 @@ impl PodHandle {
|
||||||
self.event_tx.send(event)
|
self.event_tx.send(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Emit a user-facing notification. Thin wrapper over `Notifier::notify`.
|
/// Emit a user-facing alert. Thin wrapper over `Alerter::alert`.
|
||||||
pub fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) {
|
pub fn alert(&self, level: AlertLevel, source: AlertSource, message: String) {
|
||||||
self.notifier.notify(level, source, message);
|
self.alerter.alert(level, source, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -72,7 +72,7 @@ impl PodController {
|
||||||
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
||||||
let (method_tx, mut method_rx) = mpsc::channel::<Method>(32);
|
let (method_tx, mut method_rx) = mpsc::channel::<Method>(32);
|
||||||
let (event_tx, _) = broadcast::channel::<Event>(256);
|
let (event_tx, _) = broadcast::channel::<Event>(256);
|
||||||
let notifier = Notifier::new(event_tx.clone());
|
let alerter = Alerter::new(event_tx.clone());
|
||||||
|
|
||||||
let manifest_toml = toml::to_string_pretty(pod.manifest()).unwrap_or_default();
|
let manifest_toml = toml::to_string_pretty(pod.manifest()).unwrap_or_default();
|
||||||
let greeting = build_greeting(&pod);
|
let greeting = build_greeting(&pod);
|
||||||
|
|
@ -95,13 +95,13 @@ impl PodController {
|
||||||
event_tx: event_tx.clone(),
|
event_tx: event_tx.clone(),
|
||||||
shared_state: shared_state.clone(),
|
shared_state: shared_state.clone(),
|
||||||
runtime_dir: runtime_dir.clone(),
|
runtime_dir: runtime_dir.clone(),
|
||||||
notifier: notifier.clone(),
|
alerter: alerter.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Hand the notifier to the Pod so internal operations (compaction,
|
// Hand the alerter to the Pod so internal operations (compaction,
|
||||||
// AGENTS.md ingestion during the first turn) can emit user-facing
|
// AGENTS.md ingestion during the first turn) can emit user-facing
|
||||||
// notifications on the same channel.
|
// notifications on the same channel.
|
||||||
pod.attach_notifier(notifier.clone());
|
pod.attach_alerter(alerter.clone());
|
||||||
// Also hand the raw broadcast sender so Pod-internal operations
|
// Also hand the raw broadcast sender so Pod-internal operations
|
||||||
// can emit typed lifecycle `Event`s (currently: compact progress).
|
// can emit typed lifecycle `Event`s (currently: compact progress).
|
||||||
pod.attach_event_tx(event_tx.clone());
|
pod.attach_event_tx(event_tx.clone());
|
||||||
|
|
@ -212,11 +212,11 @@ impl PodController {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
let notifier_for_worker = notifier.clone();
|
let alerter_for_worker = alerter.clone();
|
||||||
worker.on_warning(move |message| {
|
worker.on_warning(move |message| {
|
||||||
notifier_for_worker.notify(
|
alerter_for_worker.alert(
|
||||||
NotificationLevel::Warn,
|
AlertLevel::Warn,
|
||||||
NotificationSource::Worker,
|
AlertSource::Worker,
|
||||||
message.to_owned(),
|
message.to_owned(),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
@ -257,7 +257,7 @@ impl PodController {
|
||||||
// `Method::Notify` into the buffer even while `pod` is held by
|
// `Method::Notify` into the buffer even while `pod` is held by
|
||||||
// an in-flight `run_for_notification` / `run` future.
|
// an in-flight `run_for_notification` / `run` future.
|
||||||
let cancel_tx = pod.worker_mut().cancel_sender();
|
let cancel_tx = pod.worker_mut().cancel_sender();
|
||||||
let notification_buffer = pod.notification_buffer_handle();
|
let notify_buffer = pod.notify_buffer_handle();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Hold socket server alive for the lifetime of the controller task
|
// Hold socket server alive for the lifetime of the controller task
|
||||||
|
|
@ -303,7 +303,7 @@ impl PodController {
|
||||||
&event_tx,
|
&event_tx,
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
&shared_state,
|
&shared_state,
|
||||||
¬ification_buffer,
|
¬ify_buffer,
|
||||||
self_parent_socket.as_ref(),
|
self_parent_socket.as_ref(),
|
||||||
&spawner_name,
|
&spawner_name,
|
||||||
&spawned_registry,
|
&spawned_registry,
|
||||||
|
|
@ -313,9 +313,9 @@ impl PodController {
|
||||||
if new_status == PodStatus::Idle {
|
if new_status == PodStatus::Idle {
|
||||||
if let Err(e) = pod.try_post_run_compact().await {
|
if let Err(e) = pod.try_post_run_compact().await {
|
||||||
tracing::warn!(error = %e, "Post-run compaction error");
|
tracing::warn!(error = %e, "Post-run compaction error");
|
||||||
notifier.notify(
|
alerter.alert(
|
||||||
NotificationLevel::Warn,
|
AlertLevel::Warn,
|
||||||
NotificationSource::Compactor,
|
AlertSource::Compactor,
|
||||||
format!("post-run compaction error: {e}"),
|
format!("post-run compaction error: {e}"),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -334,7 +334,7 @@ impl PodController {
|
||||||
}
|
}
|
||||||
|
|
||||||
Method::Notify { message } => {
|
Method::Notify { message } => {
|
||||||
pod.push_notification(message);
|
pod.push_notify(message);
|
||||||
if shared_state.get_status() != PodStatus::Idle {
|
if shared_state.get_status() != PodStatus::Idle {
|
||||||
// RUNNING / Paused: the buffer push is the
|
// RUNNING / Paused: the buffer push is the
|
||||||
// entire operation; the in-flight turn (or
|
// entire operation; the in-flight turn (or
|
||||||
|
|
@ -353,7 +353,7 @@ impl PodController {
|
||||||
&event_tx,
|
&event_tx,
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
&shared_state,
|
&shared_state,
|
||||||
¬ification_buffer,
|
¬ify_buffer,
|
||||||
self_parent_socket.as_ref(),
|
self_parent_socket.as_ref(),
|
||||||
&spawner_name,
|
&spawner_name,
|
||||||
&spawned_registry,
|
&spawned_registry,
|
||||||
|
|
@ -363,9 +363,9 @@ impl PodController {
|
||||||
if new_status == PodStatus::Idle {
|
if new_status == PodStatus::Idle {
|
||||||
if let Err(e) = pod.try_post_run_compact().await {
|
if let Err(e) = pod.try_post_run_compact().await {
|
||||||
tracing::warn!(error = %e, "Post-run compaction error");
|
tracing::warn!(error = %e, "Post-run compaction error");
|
||||||
notifier.notify(
|
alerter.alert(
|
||||||
NotificationLevel::Warn,
|
AlertLevel::Warn,
|
||||||
NotificationSource::Compactor,
|
AlertSource::Compactor,
|
||||||
format!("post-run compaction error: {e}"),
|
format!("post-run compaction error: {e}"),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -400,7 +400,7 @@ impl PodController {
|
||||||
&event_tx,
|
&event_tx,
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
&shared_state,
|
&shared_state,
|
||||||
¬ification_buffer,
|
¬ify_buffer,
|
||||||
self_parent_socket.as_ref(),
|
self_parent_socket.as_ref(),
|
||||||
&spawner_name,
|
&spawner_name,
|
||||||
&spawned_registry,
|
&spawned_registry,
|
||||||
|
|
@ -410,9 +410,9 @@ impl PodController {
|
||||||
if new_status == PodStatus::Idle {
|
if new_status == PodStatus::Idle {
|
||||||
if let Err(e) = pod.try_post_run_compact().await {
|
if let Err(e) = pod.try_post_run_compact().await {
|
||||||
tracing::warn!(error = %e, "Post-run compaction error");
|
tracing::warn!(error = %e, "Post-run compaction error");
|
||||||
notifier.notify(
|
alerter.alert(
|
||||||
NotificationLevel::Warn,
|
AlertLevel::Warn,
|
||||||
NotificationSource::Compactor,
|
AlertSource::Compactor,
|
||||||
format!("post-run compaction error: {e}"),
|
format!("post-run compaction error: {e}"),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -475,7 +475,7 @@ impl PodController {
|
||||||
// request will inject it as a system message
|
// request will inject it as a system message
|
||||||
// via `PodInterceptor::pre_llm_request`.
|
// via `PodInterceptor::pre_llm_request`.
|
||||||
let text = crate::ipc::event::render_event(&event);
|
let text = crate::ipc::event::render_event(&event);
|
||||||
pod.push_notification(text);
|
pod.push_notify(text);
|
||||||
// Auto-kick a turn if the Pod is idle so the
|
// Auto-kick a turn if the Pod is idle so the
|
||||||
// notification is not stranded. Matches the
|
// notification is not stranded. Matches the
|
||||||
// `Method::Notify` idle path.
|
// `Method::Notify` idle path.
|
||||||
|
|
@ -489,7 +489,7 @@ impl PodController {
|
||||||
&event_tx,
|
&event_tx,
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
&shared_state,
|
&shared_state,
|
||||||
¬ification_buffer,
|
¬ify_buffer,
|
||||||
self_parent_socket.as_ref(),
|
self_parent_socket.as_ref(),
|
||||||
&spawner_name,
|
&spawner_name,
|
||||||
&spawned_registry,
|
&spawned_registry,
|
||||||
|
|
@ -499,9 +499,9 @@ impl PodController {
|
||||||
if new_status == PodStatus::Idle {
|
if new_status == PodStatus::Idle {
|
||||||
if let Err(e) = pod.try_post_run_compact().await {
|
if let Err(e) = pod.try_post_run_compact().await {
|
||||||
tracing::warn!(error = %e, "Post-run compaction error");
|
tracing::warn!(error = %e, "Post-run compaction error");
|
||||||
notifier.notify(
|
alerter.alert(
|
||||||
NotificationLevel::Warn,
|
AlertLevel::Warn,
|
||||||
NotificationSource::Compactor,
|
AlertSource::Compactor,
|
||||||
format!("post-run compaction error: {e}"),
|
format!("post-run compaction error: {e}"),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -563,7 +563,7 @@ async fn run_with_cancel_support<F>(
|
||||||
event_tx: &broadcast::Sender<Event>,
|
event_tx: &broadcast::Sender<Event>,
|
||||||
cancel_tx: &mpsc::Sender<()>,
|
cancel_tx: &mpsc::Sender<()>,
|
||||||
shared_state: &Arc<PodSharedState>,
|
shared_state: &Arc<PodSharedState>,
|
||||||
notification_buffer: &NotificationBuffer,
|
notify_buffer: &NotifyBuffer,
|
||||||
parent_socket: Option<&std::path::PathBuf>,
|
parent_socket: Option<&std::path::PathBuf>,
|
||||||
self_name: &str,
|
self_name: &str,
|
||||||
spawned_registry: &Arc<SpawnedPodRegistry>,
|
spawned_registry: &Arc<SpawnedPodRegistry>,
|
||||||
|
|
@ -645,7 +645,7 @@ where
|
||||||
Some(Method::Notify { message }) => {
|
Some(Method::Notify { message }) => {
|
||||||
// Route into the buffer; the in-flight turn will
|
// Route into the buffer; the in-flight turn will
|
||||||
// drain it at its next pre_llm_request.
|
// drain it at its next pre_llm_request.
|
||||||
notification_buffer.push(message);
|
notify_buffer.push(message);
|
||||||
}
|
}
|
||||||
Some(Method::GetHistory) => {}
|
Some(Method::GetHistory) => {}
|
||||||
Some(Method::PodEvent(event)) => {
|
Some(Method::PodEvent(event)) => {
|
||||||
|
|
@ -664,7 +664,7 @@ where
|
||||||
&self_parent_socket,
|
&self_parent_socket,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
notification_buffer.push(crate::ipc::event::render_event(&event));
|
notify_buffer.push(crate::ipc::event::render_event(&event));
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
let _ = cancel_tx.try_send(());
|
let _ = cancel_tx.try_send(());
|
||||||
|
|
|
||||||
168
crates/pod/src/ipc/alerter.rs
Normal file
168
crates/pod/src/ipc/alerter.rs
Normal file
|
|
@ -0,0 +1,168 @@
|
||||||
|
//! User-facing alert channel for Pod → client.
|
||||||
|
//!
|
||||||
|
//! Separate from `tracing` (which is for developer logs). Alerts
|
||||||
|
//! are short human-readable messages the Pod layer wants a client to
|
||||||
|
//! see — for example "compaction failed", "tool output truncated".
|
||||||
|
//!
|
||||||
|
//! Each alert is broadcast on the shared `Event` channel and
|
||||||
|
//! also appended to an in-memory buffer so that clients connecting
|
||||||
|
//! after the fact still see everything emitted during the session.
|
||||||
|
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
use protocol::{Alert, AlertLevel, AlertSource, Event};
|
||||||
|
|
||||||
|
/// Upper bound on buffered alerts. When exceeded, the oldest
|
||||||
|
/// entries are discarded so a long-running session cannot leak
|
||||||
|
/// memory through a pathological loop of recurring alerts
|
||||||
|
/// (e.g. compaction failing every turn).
|
||||||
|
const MAX_BUFFERED_ALERTS: usize = 512;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Alerter {
|
||||||
|
inner: Arc<Inner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Inner {
|
||||||
|
event_tx: broadcast::Sender<Event>,
|
||||||
|
buffer: Mutex<VecDeque<Alert>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Alerter {
|
||||||
|
pub fn new(event_tx: broadcast::Sender<Event>) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(Inner {
|
||||||
|
event_tx,
|
||||||
|
buffer: Mutex::new(VecDeque::with_capacity(MAX_BUFFERED_ALERTS)),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record and broadcast an alert.
|
||||||
|
///
|
||||||
|
/// The broadcast may have no subscribers (e.g. during Pod
|
||||||
|
/// construction before any client has connected); the buffer
|
||||||
|
/// guarantees the message is still delivered once a client
|
||||||
|
/// attaches.
|
||||||
|
///
|
||||||
|
/// The buffer mutex is held across `broadcast::send` to make
|
||||||
|
/// `subscribe_with_snapshot` race-free — a client that snapshots
|
||||||
|
/// the buffer while holding the same lock sees every alert
|
||||||
|
/// exactly once: older ones from the snapshot, newer ones from
|
||||||
|
/// the freshly-subscribed receiver.
|
||||||
|
pub fn alert(&self, level: AlertLevel, source: AlertSource, message: String) {
|
||||||
|
let alert = Alert {
|
||||||
|
level,
|
||||||
|
source,
|
||||||
|
message,
|
||||||
|
timestamp_ms: now_ms(),
|
||||||
|
};
|
||||||
|
if let Ok(mut buf) = self.inner.buffer.lock() {
|
||||||
|
if buf.len() >= MAX_BUFFERED_ALERTS {
|
||||||
|
buf.pop_front();
|
||||||
|
}
|
||||||
|
buf.push_back(alert.clone());
|
||||||
|
let _ = self.inner.event_tx.send(Event::Alert(alert));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribe and atomically snapshot the current buffer.
|
||||||
|
///
|
||||||
|
/// The returned snapshot contains alerts emitted before
|
||||||
|
/// this call; the receiver will deliver alerts emitted
|
||||||
|
/// after. An alert cannot appear in both.
|
||||||
|
pub fn subscribe_with_snapshot(&self) -> (Vec<Alert>, broadcast::Receiver<Event>) {
|
||||||
|
let buf = self
|
||||||
|
.inner
|
||||||
|
.buffer
|
||||||
|
.lock()
|
||||||
|
.expect("alerter buffer mutex poisoned");
|
||||||
|
let rx = self.inner.event_tx.subscribe();
|
||||||
|
let snapshot: Vec<Alert> = buf.iter().cloned().collect();
|
||||||
|
(snapshot, rx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn now_ms() -> i64 {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_millis() as i64)
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn alert_broadcasts_to_existing_subscriber() {
|
||||||
|
let (tx, _keep) = broadcast::channel::<Event>(8);
|
||||||
|
let alerter = Alerter::new(tx);
|
||||||
|
let (_snapshot, mut rx) = alerter.subscribe_with_snapshot();
|
||||||
|
|
||||||
|
alerter.alert(
|
||||||
|
AlertLevel::Warn,
|
||||||
|
AlertSource::Compactor,
|
||||||
|
"test message".into(),
|
||||||
|
);
|
||||||
|
|
||||||
|
match rx.try_recv() {
|
||||||
|
Ok(Event::Alert(a)) => assert_eq!(a.message, "test message"),
|
||||||
|
other => panic!("unexpected event: {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn late_subscriber_sees_earlier_alerts_via_snapshot() {
|
||||||
|
let (tx, _keep) = broadcast::channel::<Event>(8);
|
||||||
|
let alerter = Alerter::new(tx);
|
||||||
|
|
||||||
|
alerter.alert(AlertLevel::Error, AlertSource::Pod, "first".into());
|
||||||
|
alerter.alert(AlertLevel::Warn, AlertSource::AgentsMd, "second".into());
|
||||||
|
|
||||||
|
let (snapshot, mut rx) = alerter.subscribe_with_snapshot();
|
||||||
|
assert_eq!(snapshot.len(), 2);
|
||||||
|
assert_eq!(snapshot[0].message, "first");
|
||||||
|
assert_eq!(snapshot[1].message, "second");
|
||||||
|
assert!(rx.try_recv().is_err()); // nothing pending on the receiver
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn buffer_discards_oldest_past_cap() {
|
||||||
|
let (tx, _keep) = broadcast::channel::<Event>(1024);
|
||||||
|
let alerter = Alerter::new(tx);
|
||||||
|
|
||||||
|
for i in 0..(MAX_BUFFERED_ALERTS + 50) {
|
||||||
|
alerter.alert(AlertLevel::Warn, AlertSource::Worker, format!("msg-{i}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let (snapshot, _rx) = alerter.subscribe_with_snapshot();
|
||||||
|
assert_eq!(snapshot.len(), MAX_BUFFERED_ALERTS);
|
||||||
|
// First 50 were evicted; the oldest remaining is msg-50.
|
||||||
|
assert_eq!(snapshot.first().unwrap().message, "msg-50");
|
||||||
|
let last = format!("msg-{}", MAX_BUFFERED_ALERTS + 49);
|
||||||
|
assert_eq!(snapshot.last().unwrap().message, last);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn subscribe_snapshot_and_live_do_not_overlap() {
|
||||||
|
let (tx, _keep) = broadcast::channel::<Event>(8);
|
||||||
|
let alerter = Alerter::new(tx);
|
||||||
|
|
||||||
|
alerter.alert(AlertLevel::Warn, AlertSource::Worker, "historic".into());
|
||||||
|
let (snapshot, mut rx) = alerter.subscribe_with_snapshot();
|
||||||
|
alerter.alert(AlertLevel::Error, AlertSource::Worker, "live".into());
|
||||||
|
|
||||||
|
assert_eq!(snapshot.len(), 1);
|
||||||
|
assert_eq!(snapshot[0].message, "historic");
|
||||||
|
match rx.try_recv() {
|
||||||
|
Ok(Event::Alert(a)) => assert_eq!(a.message, "live"),
|
||||||
|
other => panic!("unexpected: {other:?}"),
|
||||||
|
}
|
||||||
|
assert!(rx.try_recv().is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -25,7 +25,7 @@ use crate::hook::{
|
||||||
AbortInfo, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary,
|
AbortInfo, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary,
|
||||||
TurnEndInfo,
|
TurnEndInfo,
|
||||||
};
|
};
|
||||||
use crate::ipc::notification_buffer::{NotificationBuffer, format_notification};
|
use crate::ipc::notify_buffer::{NotifyBuffer, format_notify};
|
||||||
use crate::prompt::catalog::PromptCatalog;
|
use crate::prompt::catalog::PromptCatalog;
|
||||||
use crate::compact::token_counter::total_tokens_impl;
|
use crate::compact::token_counter::total_tokens_impl;
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
|
@ -42,7 +42,7 @@ pub(crate) struct PodInterceptor {
|
||||||
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
|
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
|
||||||
/// Pending-notification buffer drained into the per-request
|
/// Pending-notification buffer drained into the per-request
|
||||||
/// context at the head of `pre_llm_request`.
|
/// context at the head of `pre_llm_request`.
|
||||||
pending_notifications: NotificationBuffer,
|
pending_notifies: NotifyBuffer,
|
||||||
/// Prompt catalog used to render the injected notification wrapper.
|
/// Prompt catalog used to render the injected notification wrapper.
|
||||||
prompts: Arc<PromptCatalog>,
|
prompts: Arc<PromptCatalog>,
|
||||||
/// Next turn index assigned by `on_prompt_submit`.
|
/// Next turn index assigned by `on_prompt_submit`.
|
||||||
|
|
@ -56,14 +56,14 @@ impl PodInterceptor {
|
||||||
registry: Arc<HookRegistry>,
|
registry: Arc<HookRegistry>,
|
||||||
compact_state: Option<Arc<CompactState>>,
|
compact_state: Option<Arc<CompactState>>,
|
||||||
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
|
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
|
||||||
pending_notifications: NotificationBuffer,
|
pending_notifies: NotifyBuffer,
|
||||||
prompts: Arc<PromptCatalog>,
|
prompts: Arc<PromptCatalog>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
registry,
|
registry,
|
||||||
compact_state,
|
compact_state,
|
||||||
usage_history,
|
usage_history,
|
||||||
pending_notifications,
|
pending_notifies,
|
||||||
prompts,
|
prompts,
|
||||||
next_turn_index: AtomicUsize::new(0),
|
next_turn_index: AtomicUsize::new(0),
|
||||||
tool_calls_this_turn: AtomicUsize::new(0),
|
tool_calls_this_turn: AtomicUsize::new(0),
|
||||||
|
|
@ -127,16 +127,16 @@ impl Interceptor for PodInterceptor {
|
||||||
// into the per-request context as transient system messages.
|
// into the per-request context as transient system messages.
|
||||||
// These are not persisted to the Worker history; they exist only
|
// These are not persisted to the Worker history; they exist only
|
||||||
// for this single LLM request.
|
// for this single LLM request.
|
||||||
for notification in self.pending_notifications.drain() {
|
for n in self.pending_notifies.drain() {
|
||||||
match format_notification(¬ification, &self.prompts) {
|
match format_notify(&n, &self.prompts) {
|
||||||
Ok(item) => context.push(item),
|
Ok(item) => context.push(item),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// A render failure here would starve the LLM of the
|
// A render failure here would starve the LLM of the
|
||||||
// notification text. Fall back to the raw message —
|
// notify text. Fall back to the raw message —
|
||||||
// it still carries the intent, just without the
|
// it still carries the intent, just without the
|
||||||
// wrapper phrasing.
|
// wrapper phrasing.
|
||||||
warn!(error = %e, "failed to render notify_wrapper; using raw message");
|
warn!(error = %e, "failed to render notify_wrapper; using raw message");
|
||||||
context.push(Item::system_message(notification.message.clone()));
|
context.push(Item::system_message(n.message.clone()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -296,7 +296,7 @@ mod tests {
|
||||||
registry,
|
registry,
|
||||||
Some(state),
|
Some(state),
|
||||||
Some(history),
|
Some(history),
|
||||||
NotificationBuffer::new(),
|
NotifyBuffer::new(),
|
||||||
PromptCatalog::builtins_only().unwrap(),
|
PromptCatalog::builtins_only().unwrap(),
|
||||||
);
|
);
|
||||||
let mut ctx = ctx_items;
|
let mut ctx = ctx_items;
|
||||||
|
|
@ -320,7 +320,7 @@ mod tests {
|
||||||
registry,
|
registry,
|
||||||
Some(state),
|
Some(state),
|
||||||
Some(history),
|
Some(history),
|
||||||
NotificationBuffer::new(),
|
NotifyBuffer::new(),
|
||||||
PromptCatalog::builtins_only().unwrap(),
|
PromptCatalog::builtins_only().unwrap(),
|
||||||
);
|
);
|
||||||
let mut ctx = ctx_items;
|
let mut ctx = ctx_items;
|
||||||
|
|
@ -345,7 +345,7 @@ mod tests {
|
||||||
registry,
|
registry,
|
||||||
Some(state),
|
Some(state),
|
||||||
Some(history),
|
Some(history),
|
||||||
NotificationBuffer::new(),
|
NotifyBuffer::new(),
|
||||||
PromptCatalog::builtins_only().unwrap(),
|
PromptCatalog::builtins_only().unwrap(),
|
||||||
);
|
);
|
||||||
let mut ctx = ctx_items;
|
let mut ctx = ctx_items;
|
||||||
|
|
@ -364,7 +364,7 @@ mod tests {
|
||||||
registry,
|
registry,
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
NotificationBuffer::new(),
|
NotifyBuffer::new(),
|
||||||
PromptCatalog::builtins_only().unwrap(),
|
PromptCatalog::builtins_only().unwrap(),
|
||||||
);
|
);
|
||||||
let mut ctx: Vec<Item> = Vec::new();
|
let mut ctx: Vec<Item> = Vec::new();
|
||||||
|
|
@ -385,9 +385,9 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn pre_llm_request_drains_pending_notifications_into_context() {
|
async fn pre_llm_request_drains_pending_notifies_into_context() {
|
||||||
let registry = Arc::new(HookRegistryBuilder::new().build());
|
let registry = Arc::new(HookRegistryBuilder::new().build());
|
||||||
let buffer = NotificationBuffer::new();
|
let buffer = NotifyBuffer::new();
|
||||||
buffer.push("first".into());
|
buffer.push("first".into());
|
||||||
buffer.push("second".into());
|
buffer.push("second".into());
|
||||||
|
|
||||||
|
|
@ -419,7 +419,7 @@ mod tests {
|
||||||
// When compaction yields, notifications remain in the buffer for
|
// When compaction yields, notifications remain in the buffer for
|
||||||
// the next pre_llm_request (after compaction + resume).
|
// the next pre_llm_request (after compaction + resume).
|
||||||
let registry = Arc::new(HookRegistryBuilder::new().build());
|
let registry = Arc::new(HookRegistryBuilder::new().build());
|
||||||
let buffer = NotificationBuffer::new();
|
let buffer = NotifyBuffer::new();
|
||||||
buffer.push("msg".into());
|
buffer.push("msg".into());
|
||||||
|
|
||||||
let state = Arc::new(CompactState::new(None, Some(100), 2));
|
let state = Arc::new(CompactState::new(None, Some(100), 2));
|
||||||
|
|
@ -455,7 +455,7 @@ mod tests {
|
||||||
registry,
|
registry,
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
NotificationBuffer::new(),
|
NotifyBuffer::new(),
|
||||||
PromptCatalog::builtins_only().unwrap(),
|
PromptCatalog::builtins_only().unwrap(),
|
||||||
);
|
);
|
||||||
let mut ctx: Vec<Item> = Vec::new();
|
let mut ctx: Vec<Item> = Vec::new();
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
|
pub mod alerter;
|
||||||
pub mod event;
|
pub mod event;
|
||||||
pub mod notifier;
|
|
||||||
pub mod server;
|
pub mod server;
|
||||||
|
|
||||||
pub(crate) mod interceptor;
|
pub(crate) mod interceptor;
|
||||||
pub(crate) mod notification_buffer;
|
pub(crate) mod notify_buffer;
|
||||||
|
|
|
||||||
|
|
@ -1,191 +0,0 @@
|
||||||
//! User-facing notification channel for Pod → client.
|
|
||||||
//!
|
|
||||||
//! Separate from `tracing` (which is for developer logs). Notifications
|
|
||||||
//! are short human-readable messages the Pod layer wants a client to
|
|
||||||
//! see — for example "compaction failed", "tool output truncated".
|
|
||||||
//!
|
|
||||||
//! Each notification is broadcast on the shared `Event` channel and
|
|
||||||
//! also appended to an in-memory buffer so that clients connecting
|
|
||||||
//! after the fact still see everything emitted during the session.
|
|
||||||
|
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
|
||||||
|
|
||||||
use tokio::sync::broadcast;
|
|
||||||
|
|
||||||
use protocol::{Event, Notification, NotificationLevel, NotificationSource};
|
|
||||||
|
|
||||||
/// Upper bound on buffered notifications. When exceeded, the oldest
|
|
||||||
/// entries are discarded so a long-running session cannot leak
|
|
||||||
/// memory through a pathological loop of recurring notifications
|
|
||||||
/// (e.g. compaction failing every turn).
|
|
||||||
const MAX_BUFFERED_NOTIFICATIONS: usize = 512;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Notifier {
|
|
||||||
inner: Arc<Inner>,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Inner {
|
|
||||||
event_tx: broadcast::Sender<Event>,
|
|
||||||
buffer: Mutex<VecDeque<Notification>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Notifier {
|
|
||||||
pub fn new(event_tx: broadcast::Sender<Event>) -> Self {
|
|
||||||
Self {
|
|
||||||
inner: Arc::new(Inner {
|
|
||||||
event_tx,
|
|
||||||
buffer: Mutex::new(VecDeque::with_capacity(MAX_BUFFERED_NOTIFICATIONS)),
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Record and broadcast a notification.
|
|
||||||
///
|
|
||||||
/// The broadcast may have no subscribers (e.g. during Pod
|
|
||||||
/// construction before any client has connected); the buffer
|
|
||||||
/// guarantees the message is still delivered once a client
|
|
||||||
/// attaches.
|
|
||||||
///
|
|
||||||
/// The buffer mutex is held across `broadcast::send` to make
|
|
||||||
/// `subscribe_with_snapshot` race-free — a client that snapshots
|
|
||||||
/// the buffer while holding the same lock sees every notification
|
|
||||||
/// exactly once: older ones from the snapshot, newer ones from
|
|
||||||
/// the freshly-subscribed receiver.
|
|
||||||
pub fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) {
|
|
||||||
let notification = Notification {
|
|
||||||
level,
|
|
||||||
source,
|
|
||||||
message,
|
|
||||||
timestamp_ms: now_ms(),
|
|
||||||
};
|
|
||||||
if let Ok(mut buf) = self.inner.buffer.lock() {
|
|
||||||
if buf.len() >= MAX_BUFFERED_NOTIFICATIONS {
|
|
||||||
buf.pop_front();
|
|
||||||
}
|
|
||||||
buf.push_back(notification.clone());
|
|
||||||
let _ = self
|
|
||||||
.inner
|
|
||||||
.event_tx
|
|
||||||
.send(Event::Notification(notification));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Subscribe and atomically snapshot the current buffer.
|
|
||||||
///
|
|
||||||
/// The returned snapshot contains notifications emitted before
|
|
||||||
/// this call; the receiver will deliver notifications emitted
|
|
||||||
/// after. A notification cannot appear in both.
|
|
||||||
pub fn subscribe_with_snapshot(&self) -> (Vec<Notification>, broadcast::Receiver<Event>) {
|
|
||||||
let buf = self
|
|
||||||
.inner
|
|
||||||
.buffer
|
|
||||||
.lock()
|
|
||||||
.expect("notifier buffer mutex poisoned");
|
|
||||||
let rx = self.inner.event_tx.subscribe();
|
|
||||||
let snapshot: Vec<Notification> = buf.iter().cloned().collect();
|
|
||||||
(snapshot, rx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn now_ms() -> i64 {
|
|
||||||
SystemTime::now()
|
|
||||||
.duration_since(UNIX_EPOCH)
|
|
||||||
.map(|d| d.as_millis() as i64)
|
|
||||||
.unwrap_or(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn notify_broadcasts_to_existing_subscriber() {
|
|
||||||
let (tx, _keep) = broadcast::channel::<Event>(8);
|
|
||||||
let notifier = Notifier::new(tx);
|
|
||||||
let (_snapshot, mut rx) = notifier.subscribe_with_snapshot();
|
|
||||||
|
|
||||||
notifier.notify(
|
|
||||||
NotificationLevel::Warn,
|
|
||||||
NotificationSource::Compactor,
|
|
||||||
"test message".into(),
|
|
||||||
);
|
|
||||||
|
|
||||||
match rx.try_recv() {
|
|
||||||
Ok(Event::Notification(n)) => assert_eq!(n.message, "test message"),
|
|
||||||
other => panic!("unexpected event: {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn late_subscriber_sees_earlier_notifications_via_snapshot() {
|
|
||||||
let (tx, _keep) = broadcast::channel::<Event>(8);
|
|
||||||
let notifier = Notifier::new(tx);
|
|
||||||
|
|
||||||
notifier.notify(
|
|
||||||
NotificationLevel::Error,
|
|
||||||
NotificationSource::Pod,
|
|
||||||
"first".into(),
|
|
||||||
);
|
|
||||||
notifier.notify(
|
|
||||||
NotificationLevel::Warn,
|
|
||||||
NotificationSource::AgentsMd,
|
|
||||||
"second".into(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (snapshot, mut rx) = notifier.subscribe_with_snapshot();
|
|
||||||
assert_eq!(snapshot.len(), 2);
|
|
||||||
assert_eq!(snapshot[0].message, "first");
|
|
||||||
assert_eq!(snapshot[1].message, "second");
|
|
||||||
assert!(rx.try_recv().is_err()); // nothing pending on the receiver
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn buffer_discards_oldest_past_cap() {
|
|
||||||
let (tx, _keep) = broadcast::channel::<Event>(1024);
|
|
||||||
let notifier = Notifier::new(tx);
|
|
||||||
|
|
||||||
for i in 0..(MAX_BUFFERED_NOTIFICATIONS + 50) {
|
|
||||||
notifier.notify(
|
|
||||||
NotificationLevel::Warn,
|
|
||||||
NotificationSource::Worker,
|
|
||||||
format!("msg-{i}"),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let (snapshot, _rx) = notifier.subscribe_with_snapshot();
|
|
||||||
assert_eq!(snapshot.len(), MAX_BUFFERED_NOTIFICATIONS);
|
|
||||||
// First 50 were evicted; the oldest remaining is msg-50.
|
|
||||||
assert_eq!(snapshot.first().unwrap().message, "msg-50");
|
|
||||||
let last = format!("msg-{}", MAX_BUFFERED_NOTIFICATIONS + 49);
|
|
||||||
assert_eq!(snapshot.last().unwrap().message, last);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn subscribe_snapshot_and_live_do_not_overlap() {
|
|
||||||
let (tx, _keep) = broadcast::channel::<Event>(8);
|
|
||||||
let notifier = Notifier::new(tx);
|
|
||||||
|
|
||||||
notifier.notify(
|
|
||||||
NotificationLevel::Warn,
|
|
||||||
NotificationSource::Worker,
|
|
||||||
"historic".into(),
|
|
||||||
);
|
|
||||||
let (snapshot, mut rx) = notifier.subscribe_with_snapshot();
|
|
||||||
notifier.notify(
|
|
||||||
NotificationLevel::Error,
|
|
||||||
NotificationSource::Worker,
|
|
||||||
"live".into(),
|
|
||||||
);
|
|
||||||
|
|
||||||
assert_eq!(snapshot.len(), 1);
|
|
||||||
assert_eq!(snapshot[0].message, "historic");
|
|
||||||
match rx.try_recv() {
|
|
||||||
Ok(Event::Notification(n)) => assert_eq!(n.message, "live"),
|
|
||||||
other => panic!("unexpected: {other:?}"),
|
|
||||||
}
|
|
||||||
assert!(rx.try_recv().is_err());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
//! Pending-notification buffer for `Method::Notify`.
|
//! Pending-notify buffer for `Method::Notify`.
|
||||||
//!
|
//!
|
||||||
//! Notifications are queued here by the Controller and drained by
|
//! Notify entries are queued here by the Controller and drained by
|
||||||
//! `PodInterceptor::pre_llm_request` into the per-request context
|
//! `PodInterceptor::pre_llm_request` into the per-request context
|
||||||
//! (never into the Worker's persistent history). Each queued entry
|
//! (never into the Worker's persistent history). Each queued entry
|
||||||
//! becomes one `Item::system_message` in the outgoing request.
|
//! becomes one `Item::system_message` in the outgoing request.
|
||||||
|
|
@ -13,56 +13,53 @@ use tracing::warn;
|
||||||
|
|
||||||
use crate::prompt::catalog::{CatalogError, PromptCatalog};
|
use crate::prompt::catalog::{CatalogError, PromptCatalog};
|
||||||
|
|
||||||
/// Maximum queued notifications. Oldest entries are dropped beyond this.
|
/// Maximum queued notify entries. Oldest entries are dropped beyond this.
|
||||||
const CAPACITY: usize = 128;
|
const CAPACITY: usize = 128;
|
||||||
|
|
||||||
/// One pending notification awaiting injection into the next LLM request.
|
/// One pending notify entry awaiting injection into the next LLM request.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct PendingNotification {
|
pub struct PendingNotify {
|
||||||
pub message: String,
|
pub message: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shared, mutex-guarded buffer of pending notifications.
|
/// Shared, mutex-guarded buffer of pending notify entries.
|
||||||
///
|
///
|
||||||
/// Cloned between the Pod (producer) and PodInterceptor (consumer).
|
/// Cloned between the Pod (producer) and PodInterceptor (consumer).
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
pub struct NotificationBuffer {
|
pub struct NotifyBuffer {
|
||||||
inner: Arc<Mutex<VecDeque<PendingNotification>>>,
|
inner: Arc<Mutex<VecDeque<PendingNotify>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NotificationBuffer {
|
impl NotifyBuffer {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self::default()
|
Self::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a notification onto the queue. If the queue is full, the
|
/// Push a notify entry onto the queue. If the queue is full, the
|
||||||
/// oldest entry is dropped and a `tracing::warn` is emitted — the
|
/// oldest entry is dropped and a `tracing::warn` is emitted — the
|
||||||
/// caller should never hit this in normal operation.
|
/// caller should never hit this in normal operation.
|
||||||
pub fn push(&self, message: String) {
|
pub fn push(&self, message: String) {
|
||||||
let mut q = self.inner.lock().expect("notification buffer poisoned");
|
let mut q = self.inner.lock().expect("notify buffer poisoned");
|
||||||
if q.len() >= CAPACITY {
|
if q.len() >= CAPACITY {
|
||||||
let dropped = q.pop_front();
|
let dropped = q.pop_front();
|
||||||
warn!(
|
warn!(
|
||||||
capacity = CAPACITY,
|
capacity = CAPACITY,
|
||||||
dropped_message = dropped.as_ref().map(|n| n.message.as_str()),
|
dropped_message = dropped.as_ref().map(|n| n.message.as_str()),
|
||||||
"notification buffer overflow; dropped oldest"
|
"notify buffer overflow; dropped oldest"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
q.push_back(PendingNotification { message });
|
q.push_back(PendingNotify { message });
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove and return all pending notifications in FIFO order.
|
/// Remove and return all pending notify entries in FIFO order.
|
||||||
pub fn drain(&self) -> Vec<PendingNotification> {
|
pub fn drain(&self) -> Vec<PendingNotify> {
|
||||||
let mut q = self.inner.lock().expect("notification buffer poisoned");
|
let mut q = self.inner.lock().expect("notify buffer poisoned");
|
||||||
q.drain(..).collect()
|
q.drain(..).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Number of pending notifications. Primarily for tests.
|
/// Number of pending notify entries. Primarily for tests.
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.inner
|
self.inner.lock().expect("notify buffer poisoned").len()
|
||||||
.lock()
|
|
||||||
.expect("notification buffer poisoned")
|
|
||||||
.len()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool {
|
pub fn is_empty(&self) -> bool {
|
||||||
|
|
@ -70,12 +67,12 @@ impl NotificationBuffer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Format a single pending notification into the `Item::system_message`
|
/// Format a single pending notify entry into the `Item::system_message`
|
||||||
/// that gets injected into the per-request context. The wrapper body
|
/// that gets injected into the per-request context. The wrapper body
|
||||||
/// comes from `PodPrompt::NotifyWrapper` so the surrounding phrasing
|
/// comes from `PodPrompt::NotifyWrapper` so the surrounding phrasing
|
||||||
/// can be customised via a prompt pack (translation, tone, ...).
|
/// can be customised via a prompt pack (translation, tone, ...).
|
||||||
pub(crate) fn format_notification(
|
pub(crate) fn format_notify(
|
||||||
n: &PendingNotification,
|
n: &PendingNotify,
|
||||||
prompts: &PromptCatalog,
|
prompts: &PromptCatalog,
|
||||||
) -> Result<Item, CatalogError> {
|
) -> Result<Item, CatalogError> {
|
||||||
let text = prompts.notify_wrapper(&n.message)?;
|
let text = prompts.notify_wrapper(&n.message)?;
|
||||||
|
|
@ -88,7 +85,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn push_then_drain_preserves_order() {
|
fn push_then_drain_preserves_order() {
|
||||||
let buf = NotificationBuffer::new();
|
let buf = NotifyBuffer::new();
|
||||||
buf.push("one".into());
|
buf.push("one".into());
|
||||||
buf.push("two".into());
|
buf.push("two".into());
|
||||||
let drained = buf.drain();
|
let drained = buf.drain();
|
||||||
|
|
@ -100,7 +97,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn capacity_drops_oldest() {
|
fn capacity_drops_oldest() {
|
||||||
let buf = NotificationBuffer::new();
|
let buf = NotifyBuffer::new();
|
||||||
for i in 0..(CAPACITY + 5) {
|
for i in 0..(CAPACITY + 5) {
|
||||||
buf.push(format!("msg{i}"));
|
buf.push(format!("msg{i}"));
|
||||||
}
|
}
|
||||||
|
|
@ -112,12 +109,12 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn format_notification_includes_message_and_nonblocking_hint() {
|
fn format_notify_includes_message_and_nonblocking_hint() {
|
||||||
let n = PendingNotification {
|
let n = PendingNotify {
|
||||||
message: "hello".into(),
|
message: "hello".into(),
|
||||||
};
|
};
|
||||||
let catalog = PromptCatalog::builtins_only().unwrap();
|
let catalog = PromptCatalog::builtins_only().unwrap();
|
||||||
let item = format_notification(&n, &catalog).unwrap();
|
let item = format_notify(&n, &catalog).unwrap();
|
||||||
let text = item.as_text().unwrap_or_default().to_string();
|
let text = item.as_text().unwrap_or_default().to_string();
|
||||||
assert!(text.contains("[Notification]"));
|
assert!(text.contains("[Notification]"));
|
||||||
assert!(text.contains("hello"));
|
assert!(text.contains("hello"));
|
||||||
|
|
@ -62,17 +62,13 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
||||||
let mut reader = JsonLineReader::new(reader);
|
let mut reader = JsonLineReader::new(reader);
|
||||||
let mut writer = JsonLineWriter::new(writer);
|
let mut writer = JsonLineWriter::new(writer);
|
||||||
|
|
||||||
// Atomically subscribe and snapshot buffered notifications so that
|
// Atomically subscribe and snapshot buffered alerts so that
|
||||||
// warnings emitted before this client connected are replayed
|
// warnings emitted before this client connected are replayed
|
||||||
// exactly once — they appear in the snapshot, and any notification
|
// exactly once — they appear in the snapshot, and any alert
|
||||||
// arriving afterwards reaches us through `rx`.
|
// arriving afterwards reaches us through `rx`.
|
||||||
let (notification_snapshot, mut rx) = handle.notifier.subscribe_with_snapshot();
|
let (alert_snapshot, mut rx) = handle.alerter.subscribe_with_snapshot();
|
||||||
for notification in notification_snapshot {
|
for alert in alert_snapshot {
|
||||||
if writer
|
if writer.write(&Event::Alert(alert)).await.is_err() {
|
||||||
.write(&Event::Notification(notification))
|
|
||||||
.await
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ pub use compact::token_counter::{EstimateSource, SplitPoint, TokenEstimate};
|
||||||
pub use controller::{PodController, PodHandle, ShutdownReceiver};
|
pub use controller::{PodController, PodHandle, ShutdownReceiver};
|
||||||
pub use factory::{FactoryError, PodFactory};
|
pub use factory::{FactoryError, PodFactory};
|
||||||
pub use hook::{Hook, HookEventKind, HookRegistryBuilder};
|
pub use hook::{Hook, HookEventKind, HookRegistryBuilder};
|
||||||
pub use ipc::notifier::Notifier;
|
pub use ipc::alerter::Alerter;
|
||||||
pub use ipc::server::SocketServer;
|
pub use ipc::server::SocketServer;
|
||||||
pub use manifest::{
|
pub use manifest::{
|
||||||
AuthRef, ModelManifest, PodManifest, PodManifestConfig, PodMetaConfig, Scope, SchemeKind,
|
AuthRef, ModelManifest, PodManifest, PodManifestConfig, PodMetaConfig, Scope, SchemeKind,
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,8 @@ use crate::hook::{
|
||||||
Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
|
Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
|
||||||
PreRequestInfo, PreToolCall,
|
PreRequestInfo, PreToolCall,
|
||||||
};
|
};
|
||||||
use crate::ipc::notification_buffer::NotificationBuffer;
|
use crate::ipc::alerter::Alerter;
|
||||||
use crate::ipc::notifier::Notifier;
|
use crate::ipc::notify_buffer::NotifyBuffer;
|
||||||
use crate::ipc::interceptor::PodInterceptor;
|
use crate::ipc::interceptor::PodInterceptor;
|
||||||
use crate::prompt::loader::PromptLoader;
|
use crate::prompt::loader::PromptLoader;
|
||||||
use crate::prompt::catalog::{CatalogError, PromptCatalog};
|
use crate::prompt::catalog::{CatalogError, PromptCatalog};
|
||||||
|
|
@ -28,7 +28,7 @@ use crate::runtime::dir;
|
||||||
use crate::runtime::scope_lock::{self, ScopeAllocationGuard, ScopeLockError};
|
use crate::runtime::scope_lock::{self, ScopeAllocationGuard, ScopeLockError};
|
||||||
use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate};
|
use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate};
|
||||||
use crate::compact::usage_tracker::UsageTracker;
|
use crate::compact::usage_tracker::UsageTracker;
|
||||||
use protocol::{Event, NotificationLevel, NotificationSource};
|
use protocol::{Event, AlertLevel, AlertSource};
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use llm_worker::interceptor::PreRequestAction;
|
use llm_worker::interceptor::PreRequestAction;
|
||||||
|
|
@ -90,16 +90,16 @@ pub struct Pod<C: LlmClient, St: Store> {
|
||||||
system_prompt_template: Option<SystemPromptTemplate>,
|
system_prompt_template: Option<SystemPromptTemplate>,
|
||||||
/// User-facing notification sink attached by the Controller at
|
/// User-facing notification sink attached by the Controller at
|
||||||
/// spawn time. `None` in tests / direct `Pod::new` usage.
|
/// spawn time. `None` in tests / direct `Pod::new` usage.
|
||||||
notifier: Option<Notifier>,
|
alerter: Option<Alerter>,
|
||||||
/// Broadcast sender for typed lifecycle `Event`s (compact progress,
|
/// Broadcast sender for typed lifecycle `Event`s (compact progress,
|
||||||
/// etc.). Attached by the Controller alongside `notifier`. Unlike
|
/// etc.). Attached by the Controller alongside `alerter`. Unlike
|
||||||
/// notifications, events sent here are NOT replayed to clients that
|
/// notifications, events sent here are NOT replayed to clients that
|
||||||
/// connect after the fact — they are fire-and-forget broadcasts.
|
/// connect after the fact — they are fire-and-forget broadcasts.
|
||||||
event_tx: Option<broadcast::Sender<Event>>,
|
event_tx: Option<broadcast::Sender<Event>>,
|
||||||
/// Queue of pending `Method::Notify` notifications awaiting
|
/// Queue of pending `Method::Notify` notifications awaiting
|
||||||
/// injection into the next LLM request. Shared with the
|
/// injection into the next LLM request. Shared with the
|
||||||
/// PodInterceptor installed in `ensure_interceptor_installed`.
|
/// PodInterceptor installed in `ensure_interceptor_installed`.
|
||||||
pending_notifications: NotificationBuffer,
|
pending_notifies: NotifyBuffer,
|
||||||
/// Scope allocation in the machine-wide lock file. `Some` for
|
/// Scope allocation in the machine-wide lock file. `Some` for
|
||||||
/// Pods built via `from_manifest` (production path); `None` for
|
/// Pods built via `from_manifest` (production path); `None` for
|
||||||
/// lower-level constructors (`Pod::new`, `Pod::restore`) that
|
/// lower-level constructors (`Pod::new`, `Pod::restore`) that
|
||||||
|
|
@ -158,9 +158,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
|
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
system_prompt_template: None,
|
system_prompt_template: None,
|
||||||
notifier: None,
|
alerter: None,
|
||||||
event_tx: None,
|
event_tx: None,
|
||||||
pending_notifications: NotificationBuffer::new(),
|
pending_notifies: NotifyBuffer::new(),
|
||||||
scope_allocation: None,
|
scope_allocation: None,
|
||||||
callback_socket: None,
|
callback_socket: None,
|
||||||
prompts,
|
prompts,
|
||||||
|
|
@ -231,9 +231,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
usage_history: Arc::new(Mutex::new(state.usage_history)),
|
usage_history: Arc::new(Mutex::new(state.usage_history)),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
system_prompt_template: None,
|
system_prompt_template: None,
|
||||||
notifier: None,
|
alerter: None,
|
||||||
event_tx: None,
|
event_tx: None,
|
||||||
pending_notifications: NotificationBuffer::new(),
|
pending_notifies: NotifyBuffer::new(),
|
||||||
scope_allocation: None,
|
scope_allocation: None,
|
||||||
callback_socket: None,
|
callback_socket: None,
|
||||||
prompts,
|
prompts,
|
||||||
|
|
@ -332,22 +332,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
/// Called by the Controller immediately after spawning so that
|
/// Called by the Controller immediately after spawning so that
|
||||||
/// Pod-internal operations (compaction failures, AGENTS.md
|
/// Pod-internal operations (compaction failures, AGENTS.md
|
||||||
/// ingestion warnings) can surface messages to connected clients.
|
/// ingestion warnings) can surface messages to connected clients.
|
||||||
pub fn attach_notifier(&mut self, notifier: Notifier) {
|
pub fn attach_alerter(&mut self, alerter: Alerter) {
|
||||||
self.notifier = Some(notifier);
|
self.alerter = Some(alerter);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attach the broadcast sender used for typed lifecycle `Event`s.
|
/// Attach the broadcast sender used for typed lifecycle `Event`s.
|
||||||
///
|
///
|
||||||
/// The Controller wires this alongside [`attach_notifier`] so that
|
/// The Controller wires this alongside [`attach_alerter`] so that
|
||||||
/// Pod-internal operations (currently: compaction) can surface
|
/// Pod-internal operations (currently: compaction) can surface
|
||||||
/// progress to connected clients.
|
/// progress to connected clients.
|
||||||
pub fn attach_event_tx(&mut self, event_tx: broadcast::Sender<Event>) {
|
pub fn attach_event_tx(&mut self, event_tx: broadcast::Sender<Event>) {
|
||||||
self.event_tx = Some(event_tx);
|
self.event_tx = Some(event_tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn notify(&self, level: NotificationLevel, source: NotificationSource, message: String) {
|
fn alert(&self, level: AlertLevel, source: AlertSource, message: String) {
|
||||||
if let Some(n) = self.notifier.as_ref() {
|
if let Some(n) = self.alerter.as_ref() {
|
||||||
n.notify(level, source, message);
|
n.alert(level, source, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -364,17 +364,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
///
|
///
|
||||||
/// The notification will be injected as an `Item::system_message`
|
/// The notification will be injected as an `Item::system_message`
|
||||||
/// into the next outgoing LLM request context (not into history).
|
/// into the next outgoing LLM request context (not into history).
|
||||||
/// See [`NotificationBuffer`] for overflow behaviour.
|
/// See [`NotifyBuffer`] for overflow behaviour.
|
||||||
pub fn push_notification(&self, message: String) {
|
pub fn push_notify(&self, message: String) {
|
||||||
self.pending_notifications.push(message);
|
self.pending_notifies.push(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shared handle to the pending notification buffer.
|
/// Shared handle to the pending notification buffer.
|
||||||
///
|
///
|
||||||
/// The Controller holds a clone so that `Method::Notify` arriving
|
/// The Controller holds a clone so that `Method::Notify` arriving
|
||||||
/// while `pod.run()` is in flight can still reach the interceptor.
|
/// while `pod.run()` is in flight can still reach the interceptor.
|
||||||
pub fn notification_buffer_handle(&self) -> NotificationBuffer {
|
pub fn notify_buffer_handle(&self) -> NotifyBuffer {
|
||||||
self.pending_notifications.clone()
|
self.pending_notifies.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parent callback socket set by `from_manifest_spawned`.
|
/// Parent callback socket set by `from_manifest_spawned`.
|
||||||
|
|
@ -497,7 +497,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
registry,
|
registry,
|
||||||
compact_state,
|
compact_state,
|
||||||
usage_history_handle,
|
usage_history_handle,
|
||||||
self.pending_notifications.clone(),
|
self.pending_notifies.clone(),
|
||||||
self.prompts.clone(),
|
self.prompts.clone(),
|
||||||
);
|
);
|
||||||
self.worker_mut().set_interceptor(interceptor);
|
self.worker_mut().set_interceptor(interceptor);
|
||||||
|
|
@ -516,7 +516,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
let Some(template) = self.system_prompt_template.take() else {
|
let Some(template) = self.system_prompt_template.take() else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
let notifier = self.notifier.clone();
|
let alerter = self.alerter.clone();
|
||||||
let worker = self.worker.as_mut().expect("worker present");
|
let worker = self.worker.as_mut().expect("worker present");
|
||||||
// Materialise any pending tool factories so the template sees the
|
// Materialise any pending tool factories so the template sees the
|
||||||
// full list of tool names. Redundant with the flush inside
|
// full list of tool names. Redundant with the flush inside
|
||||||
|
|
@ -530,10 +530,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
.collect();
|
.collect();
|
||||||
let agents_md_read = read_agents_md(&self.pwd);
|
let agents_md_read = read_agents_md(&self.pwd);
|
||||||
for warning in agents_md_read.warnings {
|
for warning in agents_md_read.warnings {
|
||||||
if let Some(n) = notifier.as_ref() {
|
if let Some(n) = alerter.as_ref() {
|
||||||
n.notify(
|
n.alert(
|
||||||
NotificationLevel::Warn,
|
AlertLevel::Warn,
|
||||||
NotificationSource::AgentsMd,
|
AlertSource::AgentsMd,
|
||||||
warning,
|
warning,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -713,9 +713,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
self.send_event(Event::CompactFailed {
|
self.send_event(Event::CompactFailed {
|
||||||
error: e.to_string(),
|
error: e.to_string(),
|
||||||
});
|
});
|
||||||
self.notify(
|
self.alert(
|
||||||
NotificationLevel::Error,
|
AlertLevel::Error,
|
||||||
NotificationSource::Compactor,
|
AlertSource::Compactor,
|
||||||
format!("mid-run compaction failed: {e}"),
|
format!("mid-run compaction failed: {e}"),
|
||||||
);
|
);
|
||||||
if let Some(ref state) = self.compact_state {
|
if let Some(ref state) = self.compact_state {
|
||||||
|
|
@ -757,9 +757,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
self.send_event(Event::CompactFailed {
|
self.send_event(Event::CompactFailed {
|
||||||
error: e.to_string(),
|
error: e.to_string(),
|
||||||
});
|
});
|
||||||
self.notify(
|
self.alert(
|
||||||
NotificationLevel::Warn,
|
AlertLevel::Warn,
|
||||||
NotificationSource::Compactor,
|
AlertSource::Compactor,
|
||||||
format!("post-run compaction failed: {e}"),
|
format!("post-run compaction failed: {e}"),
|
||||||
);
|
);
|
||||||
state.record_compact_failure();
|
state.record_compact_failure();
|
||||||
|
|
@ -1171,9 +1171,9 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
usage_history: Arc::new(Mutex::new(Vec::new())),
|
usage_history: Arc::new(Mutex::new(Vec::new())),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
system_prompt_template,
|
system_prompt_template,
|
||||||
notifier: None,
|
alerter: None,
|
||||||
event_tx: None,
|
event_tx: None,
|
||||||
pending_notifications: NotificationBuffer::new(),
|
pending_notifies: NotifyBuffer::new(),
|
||||||
scope_allocation: Some(scope_allocation),
|
scope_allocation: Some(scope_allocation),
|
||||||
callback_socket: None,
|
callback_socket: None,
|
||||||
prompts,
|
prompts,
|
||||||
|
|
@ -1234,9 +1234,9 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
usage_history: Arc::new(Mutex::new(Vec::new())),
|
usage_history: Arc::new(Mutex::new(Vec::new())),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
system_prompt_template,
|
system_prompt_template,
|
||||||
notifier: None,
|
alerter: None,
|
||||||
event_tx: None,
|
event_tx: None,
|
||||||
pending_notifications: NotificationBuffer::new(),
|
pending_notifies: NotifyBuffer::new(),
|
||||||
scope_allocation: Some(scope_allocation),
|
scope_allocation: Some(scope_allocation),
|
||||||
callback_socket: Some(callback_socket),
|
callback_socket: Some(callback_socket),
|
||||||
prompts,
|
prompts,
|
||||||
|
|
|
||||||
|
|
@ -348,7 +348,7 @@ enum SendRunError {
|
||||||
|
|
||||||
/// Write `Method::Run` to the target and read back events until we see
|
/// Write `Method::Run` to the target and read back events until we see
|
||||||
/// either `TurnStart` (accepted) or `Error { AlreadyRunning }`
|
/// either `TurnStart` (accepted) or `Error { AlreadyRunning }`
|
||||||
/// (rejected). Any replayed notifications that precede the response are
|
/// (rejected). Any replayed alerts that precede the response are
|
||||||
/// skipped. Times out per-read so a stuck Pod doesn't hang the tool.
|
/// skipped. Times out per-read so a stuck Pod doesn't hang the tool.
|
||||||
async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> {
|
async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> {
|
||||||
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
||||||
|
|
@ -373,9 +373,9 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu
|
||||||
..
|
..
|
||||||
}) => return Err(SendRunError::AlreadyRunning),
|
}) => return Err(SendRunError::AlreadyRunning),
|
||||||
Some(Event::TurnStart { .. }) => return Ok(()),
|
Some(Event::TurnStart { .. }) => return Ok(()),
|
||||||
// Notifications and other pre-turn events are replayed to
|
// Alerts and other pre-turn events are replayed to new
|
||||||
// new subscribers; keep reading until the controller's
|
// subscribers; keep reading until the controller's response
|
||||||
// response to our `Run` shows up.
|
// to our `Run` shows up.
|
||||||
Some(_) => continue,
|
Some(_) => continue,
|
||||||
None => return Err(SendRunError::Io("connection closed before response".into())),
|
None => return Err(SendRunError::Io("connection closed before response".into())),
|
||||||
}
|
}
|
||||||
|
|
@ -383,7 +383,7 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connect and ask the Pod for its conversation history. Skips
|
/// Connect and ask the Pod for its conversation history. Skips
|
||||||
/// pre-History events (such as buffered notifications replayed to new
|
/// pre-History events (such as buffered alerts replayed to new
|
||||||
/// clients). Returns the raw JSON items as `serde_json::Value` since
|
/// clients). Returns the raw JSON items as `serde_json::Value` since
|
||||||
/// the pod crate already round-trips via `Value` on the wire.
|
/// the pod crate already round-trips via `Value` on the wire.
|
||||||
async fn fetch_history(socket: &Path) -> std::io::Result<Vec<serde_json::Value>> {
|
async fn fetch_history(socket: &Path) -> std::io::Result<Vec<serde_json::Value>> {
|
||||||
|
|
|
||||||
|
|
@ -148,7 +148,7 @@ pub enum Event {
|
||||||
items: Vec<serde_json::Value>,
|
items: Vec<serde_json::Value>,
|
||||||
greeting: Greeting,
|
greeting: Greeting,
|
||||||
},
|
},
|
||||||
Notification(Notification),
|
Alert(Alert),
|
||||||
/// Pod has started compacting the current session.
|
/// Pod has started compacting the current session.
|
||||||
///
|
///
|
||||||
/// Fired immediately before a compaction run. Success is signalled by
|
/// Fired immediately before a compaction run. Success is signalled by
|
||||||
|
|
@ -169,16 +169,16 @@ pub enum Event {
|
||||||
Shutdown,
|
Shutdown,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// User-facing notification emitted from the Pod layer.
|
/// User-facing alert emitted from the Pod layer.
|
||||||
///
|
///
|
||||||
/// This is a separate channel from `tracing` (developer logs): entries
|
/// This is a separate channel from `tracing` (developer logs): entries
|
||||||
/// here are assembled explicitly by the Pod when a condition should be
|
/// here are assembled explicitly by the Pod when a condition should be
|
||||||
/// surfaced to the person driving the client. Keep messages short and
|
/// surfaced to the person driving the client. Keep messages short and
|
||||||
/// human-readable.
|
/// human-readable.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct Notification {
|
pub struct Alert {
|
||||||
pub level: NotificationLevel,
|
pub level: AlertLevel,
|
||||||
pub source: NotificationSource,
|
pub source: AlertSource,
|
||||||
pub message: String,
|
pub message: String,
|
||||||
/// Milliseconds since the Unix epoch.
|
/// Milliseconds since the Unix epoch.
|
||||||
pub timestamp_ms: i64,
|
pub timestamp_ms: i64,
|
||||||
|
|
@ -186,14 +186,14 @@ pub struct Notification {
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum NotificationLevel {
|
pub enum AlertLevel {
|
||||||
Warn,
|
Warn,
|
||||||
Error,
|
Error,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum NotificationSource {
|
pub enum AlertSource {
|
||||||
Pod,
|
Pod,
|
||||||
Worker,
|
Worker,
|
||||||
Compactor,
|
Compactor,
|
||||||
|
|
@ -462,16 +462,16 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn event_notification_format() {
|
fn event_alert_format() {
|
||||||
let event = Event::Notification(Notification {
|
let event = Event::Alert(Alert {
|
||||||
level: NotificationLevel::Warn,
|
level: AlertLevel::Warn,
|
||||||
source: NotificationSource::Compactor,
|
source: AlertSource::Compactor,
|
||||||
message: "compaction failed".into(),
|
message: "compaction failed".into(),
|
||||||
timestamp_ms: 1_700_000_000_000,
|
timestamp_ms: 1_700_000_000_000,
|
||||||
});
|
});
|
||||||
let json = serde_json::to_string(&event).unwrap();
|
let json = serde_json::to_string(&event).unwrap();
|
||||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
assert_eq!(parsed["event"], "notification");
|
assert_eq!(parsed["event"], "alert");
|
||||||
assert_eq!(parsed["data"]["level"], "warn");
|
assert_eq!(parsed["data"]["level"], "warn");
|
||||||
assert_eq!(parsed["data"]["source"], "compactor");
|
assert_eq!(parsed["data"]["source"], "compactor");
|
||||||
assert_eq!(parsed["data"]["message"], "compaction failed");
|
assert_eq!(parsed["data"]["message"], "compaction failed");
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
use protocol::{Event, Method, NotificationLevel, NotificationSource, RunResult};
|
use protocol::{Event, Method, AlertLevel, AlertSource, RunResult};
|
||||||
|
|
||||||
use crate::block::{Block, CompactEvent, ToolCallBlock, ToolCallState};
|
use crate::block::{Block, CompactEvent, ToolCallBlock, ToolCallState};
|
||||||
use crate::cache::FileCache;
|
use crate::cache::FileCache;
|
||||||
|
|
@ -81,9 +81,9 @@ impl App {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push_error(&mut self, message: impl Into<String>) {
|
pub fn push_error(&mut self, message: impl Into<String>) {
|
||||||
self.blocks.push(Block::Notification {
|
self.blocks.push(Block::Alert {
|
||||||
level: NotificationLevel::Error,
|
level: AlertLevel::Error,
|
||||||
source: NotificationSource::Pod,
|
source: AlertSource::Pod,
|
||||||
message: message.into(),
|
message: message.into(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -194,16 +194,16 @@ impl App {
|
||||||
apply_cache_update(&mut self.cache, &name, args.as_deref(), output.as_deref());
|
apply_cache_update(&mut self.cache, &name, args.as_deref(), output.as_deref());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Result for an unknown tool call. Surface it as a
|
// Result for an unknown tool call. Surface it as an
|
||||||
// notification so it isn't silently dropped.
|
// alert so it isn't silently dropped.
|
||||||
let level = if is_error {
|
let level = if is_error {
|
||||||
NotificationLevel::Error
|
AlertLevel::Error
|
||||||
} else {
|
} else {
|
||||||
NotificationLevel::Warn
|
AlertLevel::Warn
|
||||||
};
|
};
|
||||||
self.blocks.push(Block::Notification {
|
self.blocks.push(Block::Alert {
|
||||||
level,
|
level,
|
||||||
source: NotificationSource::Pod,
|
source: AlertSource::Pod,
|
||||||
message: format!("orphan tool result ({id}): {summary}"),
|
message: format!("orphan tool result ({id}): {summary}"),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -243,11 +243,11 @@ impl App {
|
||||||
self.blocks
|
self.blocks
|
||||||
.push(Block::Compact(CompactEvent::Failed { error }));
|
.push(Block::Compact(CompactEvent::Failed { error }));
|
||||||
}
|
}
|
||||||
Event::Notification(notification) => {
|
Event::Alert(alert) => {
|
||||||
self.blocks.push(Block::Notification {
|
self.blocks.push(Block::Alert {
|
||||||
level: notification.level,
|
level: alert.level,
|
||||||
source: notification.source,
|
source: alert.source,
|
||||||
message: notification.message,
|
message: alert.message,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Event::History { items, greeting } => {
|
Event::History { items, greeting } => {
|
||||||
|
|
@ -488,12 +488,12 @@ fn strip_cat_n_prefix(formatted: &str) -> String {
|
||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn notification_source_label(source: NotificationSource) -> &'static str {
|
pub fn alert_source_label(source: AlertSource) -> &'static str {
|
||||||
match source {
|
match source {
|
||||||
NotificationSource::Pod => "pod",
|
AlertSource::Pod => "pod",
|
||||||
NotificationSource::Worker => "worker",
|
AlertSource::Worker => "worker",
|
||||||
NotificationSource::Compactor => "compactor",
|
AlertSource::Compactor => "compactor",
|
||||||
NotificationSource::AgentsMd => "AGENTS.md",
|
AlertSource::AgentsMd => "AGENTS.md",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@
|
||||||
|
|
||||||
#![allow(dead_code)] // Phase 5 will consume `output` in detail mode.
|
#![allow(dead_code)] // Phase 5 will consume `output` in detail mode.
|
||||||
|
|
||||||
use protocol::{Greeting, NotificationLevel, NotificationSource};
|
use protocol::{Greeting, AlertLevel, AlertSource};
|
||||||
|
|
||||||
pub enum Block {
|
pub enum Block {
|
||||||
Greeting(Greeting),
|
Greeting(Greeting),
|
||||||
|
|
@ -21,9 +21,9 @@ pub enum Block {
|
||||||
text: String,
|
text: String,
|
||||||
},
|
},
|
||||||
ToolCall(ToolCallBlock),
|
ToolCall(ToolCallBlock),
|
||||||
Notification {
|
Alert {
|
||||||
level: NotificationLevel,
|
level: AlertLevel,
|
||||||
source: NotificationSource,
|
source: AlertSource,
|
||||||
message: String,
|
message: String,
|
||||||
},
|
},
|
||||||
Compact(CompactEvent),
|
Compact(CompactEvent),
|
||||||
|
|
|
||||||
|
|
@ -20,9 +20,9 @@ use ratatui::text::{Line, Span};
|
||||||
use ratatui::widgets::{Block as UiBlock, BorderType, Borders, Padding, Paragraph, Widget, Wrap};
|
use ratatui::widgets::{Block as UiBlock, BorderType, Borders, Padding, Paragraph, Widget, Wrap};
|
||||||
use unicode_width::{UnicodeWidthChar, UnicodeWidthStr};
|
use unicode_width::{UnicodeWidthChar, UnicodeWidthStr};
|
||||||
|
|
||||||
use protocol::{Greeting, NotificationLevel};
|
use protocol::{Greeting, AlertLevel};
|
||||||
|
|
||||||
use crate::app::{App, fmt_tokens, notification_source_label};
|
use crate::app::{App, fmt_tokens, alert_source_label};
|
||||||
use crate::block::{Block, CompactEvent};
|
use crate::block::{Block, CompactEvent};
|
||||||
|
|
||||||
/// Display density for the history view.
|
/// Display density for the history view.
|
||||||
|
|
@ -313,20 +313,20 @@ fn render_block_into(
|
||||||
// ToolCall is dispatched in `compute_history` via `tool::render_tool`
|
// ToolCall is dispatched in `compute_history` via `tool::render_tool`
|
||||||
// so it can consume multiple adjacent blocks (Read aggregation).
|
// so it can consume multiple adjacent blocks (Read aggregation).
|
||||||
Block::ToolCall(_) => unreachable!("ToolCall handled by compute_history"),
|
Block::ToolCall(_) => unreachable!("ToolCall handled by compute_history"),
|
||||||
Block::Notification {
|
Block::Alert {
|
||||||
level,
|
level,
|
||||||
source,
|
source,
|
||||||
message,
|
message,
|
||||||
} => {
|
} => {
|
||||||
let kind = match level {
|
let kind = match level {
|
||||||
NotificationLevel::Warn => MessageKind::NoticeWarn,
|
AlertLevel::Warn => MessageKind::NoticeWarn,
|
||||||
NotificationLevel::Error => MessageKind::NoticeError,
|
AlertLevel::Error => MessageKind::NoticeError,
|
||||||
};
|
};
|
||||||
let prefix = match level {
|
let prefix = match level {
|
||||||
NotificationLevel::Warn => "[notice]",
|
AlertLevel::Warn => "[notice]",
|
||||||
NotificationLevel::Error => "[notice error]",
|
AlertLevel::Error => "[notice error]",
|
||||||
};
|
};
|
||||||
let label = notification_source_label(*source);
|
let label = alert_source_label(*source);
|
||||||
let text = format!("{prefix} {label}: {message}");
|
let text = format!("{prefix} {label}: {message}");
|
||||||
match mode {
|
match mode {
|
||||||
Mode::Overview => push_overview_line(lines, &text, width, kind, ""),
|
Mode::Overview => push_overview_line(lines, &text, width, kind, ""),
|
||||||
|
|
|
||||||
70
tickets/notification-naming-cleanup.md
Normal file
70
tickets/notification-naming-cleanup.md
Normal file
|
|
@ -0,0 +1,70 @@
|
||||||
|
# Notification 用語のリネーム
|
||||||
|
|
||||||
|
## 背景
|
||||||
|
|
||||||
|
Pod 周辺に二系統の「notification」があり、用語が衝突して読みづらい:
|
||||||
|
|
||||||
|
- **outbound** (Pod → Client): `Event::Notification` / `Notifier::notify` — ユーザー向けの運用診断(compaction 失敗、tool 出力 truncated 等)
|
||||||
|
- **inbound** (External → Pod): `Method::Notify` / `NotificationBuffer` — 外部から Pod の LLM context に「メモを置く」経路
|
||||||
|
|
||||||
|
両者とも `Notification` / `Notify` を共有しており、コードを読む側が毎回どちらの経路かを文脈から判別する必要がある。
|
||||||
|
|
||||||
|
`Method::Notify` の動詞名は「呼び出し側がやる行為」として意味的に正しいので残し、**outbound 側の名詞**を `Alert` に倒して非対称を入れることで区別する。
|
||||||
|
|
||||||
|
## 要件
|
||||||
|
|
||||||
|
### outbound (Pod → Client) — `Alert` にリネーム
|
||||||
|
|
||||||
|
protocol:
|
||||||
|
|
||||||
|
- `Event::Notification(Notification)` → `Event::Alert(Alert)`
|
||||||
|
- `protocol::Notification` 構造体 → `protocol::Alert`
|
||||||
|
- `NotificationLevel` → `AlertLevel`
|
||||||
|
- `NotificationSource` → `AlertSource`
|
||||||
|
|
||||||
|
pod:
|
||||||
|
|
||||||
|
- `crates/pod/src/ipc/notifier.rs` → `alerter.rs`
|
||||||
|
- `Notifier` → `Alerter`
|
||||||
|
- `Notifier::notify(...)` → `Alerter::alert(...)`
|
||||||
|
- `Notifier::subscribe_with_snapshot()` の戻り値 `Vec<Notification>` → `Vec<Alert>`
|
||||||
|
|
||||||
|
tui:
|
||||||
|
|
||||||
|
- `Block::Notification` → `Block::Alert`(描画スタイルは変更しない)
|
||||||
|
|
||||||
|
### inbound (External → Pod) — `Notify` 据置 + 衛生整理
|
||||||
|
|
||||||
|
- `Method::Notify` 据置(動詞として正しい)
|
||||||
|
- `NotificationBuffer` → `NotifyBuffer`(メソッド名と揃える)
|
||||||
|
- `PendingNotification` → `PendingNotify`
|
||||||
|
- `notify_wrapper` プロンプト名 据置
|
||||||
|
- LLM 向け wrapper ラベル `[Notification]` 据置(LLM への見え方は変えない)
|
||||||
|
|
||||||
|
### protocol wire 互換
|
||||||
|
|
||||||
|
開発初期段階のため wire 互換は意図的に切る。client / pod を同時更新する前提で、旧名を吸収する `#[serde(rename)]` 等は入れない。
|
||||||
|
|
||||||
|
## 範囲外
|
||||||
|
|
||||||
|
- 通知の content / level / source の意味論変更(純粋な rename のみ)
|
||||||
|
- `Notifier` / `NotificationBuffer` の buffer 容量や snapshot 挙動の変更
|
||||||
|
- TUI 側 `Block::Alert` の描画スタイルの変更
|
||||||
|
|
||||||
|
## 完了条件
|
||||||
|
|
||||||
|
- 上記 rename がすべて反映されている
|
||||||
|
- ビルドが通り、既存テストが通る
|
||||||
|
- grep で旧名(outbound 文脈の `Notification` / `Notifier` / 動詞 `notify`)が消えている
|
||||||
|
|
||||||
|
## 参照
|
||||||
|
|
||||||
|
- `crates/protocol/src/lib.rs`(`Event::Notification`, `Notification` struct, `NotificationLevel/Source`)
|
||||||
|
- `crates/pod/src/ipc/notifier.rs`(outbound、`Notifier`)
|
||||||
|
- `crates/pod/src/ipc/notification_buffer.rs`(inbound、`NotificationBuffer`, `PendingNotification`)
|
||||||
|
- `crates/tui/`(`Block::Notification` 描画)
|
||||||
|
|
||||||
|
## Review
|
||||||
|
- 状態: Approve with follow-up
|
||||||
|
- レビュー詳細: [./notification-naming-cleanup.review.md](./notification-naming-cleanup.review.md)
|
||||||
|
- 日付: 2026-04-26
|
||||||
42
tickets/notification-naming-cleanup.review.md
Normal file
42
tickets/notification-naming-cleanup.review.md
Normal file
|
|
@ -0,0 +1,42 @@
|
||||||
|
# Review: Notification 用語のリネーム
|
||||||
|
|
||||||
|
## 前提・要件の確認
|
||||||
|
|
||||||
|
### outbound (Pod → Client) — Alert 系
|
||||||
|
- `Event::Notification(Notification)` → `Event::Alert(Alert)`: `crates/protocol/src/lib.rs:151`, `crates/protocol/src/lib.rs:178-185` で完了。
|
||||||
|
- `NotificationLevel/Source` → `AlertLevel/AlertSource`: `crates/protocol/src/lib.rs:187-201` で完了。serde の `rename_all = "snake_case"` も維持されており wire 名は `warn` / `error` / `pod` / `worker` / `compactor` / `agents_md` のまま (元と一致)。
|
||||||
|
- `notifier.rs` → `alerter.rs`、`Notifier` → `Alerter`、`notify()` → `alert()`: `crates/pod/src/ipc/alerter.rs:26-87` で完了。`subscribe_with_snapshot` の戻り値も `(Vec<Alert>, …)` (l.78)。
|
||||||
|
- `Block::Notification` → `Block::Alert`: `crates/tui/src/block.rs:24-28`。レンダリング側 (`crates/tui/src/ui.rs:316-335`) も match arm が更新されており、style (`MessageKind::NoticeWarn/Error` と prefix `[notice]` / `[notice error]`) は据置 — 範囲外要件を尊重している。
|
||||||
|
- イベント名のテスト: `event_alert_format` (`crates/protocol/src/lib.rs:464-479`) で `event=alert` の wire 形を確認。
|
||||||
|
|
||||||
|
### inbound (External → Pod) — Notify 据置 + 衛生整理
|
||||||
|
- `Method::Notify` 据置: `crates/protocol/src/lib.rs:18` 健在、`method_notify_json_roundtrip` (l.343) も保持。
|
||||||
|
- `notification_buffer.rs` → `notify_buffer.rs`、`NotificationBuffer/PendingNotification` → `NotifyBuffer/PendingNotify`: `crates/pod/src/ipc/notify_buffer.rs:20-67`。
|
||||||
|
- `format_notification` → `format_notify`: 同 l.74。
|
||||||
|
- `notify_wrapper` プロンプト名と `[Notification]` ラベル据置: `crates/pod/src/ipc/notify_buffer.rs:78,119` および `crates/pod/src/prompt/catalog.rs:447,484` の test asserts、`crates/pod/tests/controller_test.rs:413` の test も `[Notification]` を期待 — 範囲外要件と整合。
|
||||||
|
|
||||||
|
### protocol wire 互換
|
||||||
|
- 旧 `notification` event 名を吸収する `#[serde(rename)]` 等は入っていない (`crates/protocol/src/lib.rs:151` の `Alert(Alert)` は `rename_all = "snake_case"` 経由で wire 上 `event=alert` に変わる)。チケットの「wire 互換は意図的に切る」方針通り。
|
||||||
|
|
||||||
|
### 完了条件
|
||||||
|
- ビルド通過 (cargo build): pod / protocol / tui で warning は既存の `end_scope` のみ、本変更に起因しない。
|
||||||
|
- テスト: `cargo test -p pod -p protocol -p tui` で 205 件 PASS (ユニット+統合+doc 含む。0 fail / 0 ignored)。
|
||||||
|
- grep: `Notifier|NotificationLevel|NotificationSource|NotificationBuffer|PendingNotification|notification_buffer|push_notification|attach_notifier|notifier` がコード上で 0 ヒット。残る `Notification` 一致は LLM 向け wrapper ラベル `[Notification]` (据置) と inbound 概念を指す散在コメントのみ。
|
||||||
|
|
||||||
|
## アーキテクチャ・スコープ
|
||||||
|
- レイヤ境界尊重: 変更は protocol / pod (ipc, controller, pod 本体) / tui に限定。llm-worker は触れていない。
|
||||||
|
- 命名方針: `Alerter` / `alert()` の対 `Notify` (動詞) という非対称設計はチケット背景と一致しており、用語衝突を解消する最小限の手当てになっている。
|
||||||
|
- 不必要な変更の有無: outbound 一式の rename と inbound 側の衛生 (`NotifyBuffer` / `PendingNotify` / `format_notify`) は要件に明記されている範囲のみ。`pending_notifications` フィールド名 → `pending_notifies` や `push_notification` → `push_notify` まで踏み込んだのは、フィールド型自体が `NotifyBuffer` / `PendingNotify` に変わっていることを踏まえると naming consistency 上妥当。`run_for_notification` メソッド名は `Method::Notify` 由来の "notify-driven run" を表すので残しても問題ない (今回触っていない)。
|
||||||
|
- ファイル rename: ticket 通り `notifier.rs` → `alerter.rs`、`notification_buffer.rs` → `notify_buffer.rs` の 2 つだけで、過剰な移動なし。`mv` を使った旨 (history 保全) も妥当。
|
||||||
|
|
||||||
|
## 指摘事項
|
||||||
|
|
||||||
|
### Non-blocking / Follow-up
|
||||||
|
- `crates/pod/src/controller.rs:49` の doc comment が `Thin wrapper over `Alerter::notify`` のまま (実体は `Alerter::alert`)。1 行修正で解消可能。
|
||||||
|
|
||||||
|
### Nits
|
||||||
|
- `crates/protocol/src/lib.rs:39` の "via the notification buffer" や `crates/pod/src/controller.rs:255,258,346,474,480,657` の "notification buffer" 等の散在コメントは、文脈上 inbound (Notify) の話なので意味は通る。ただし型名が `NotifyBuffer` に揃ったので "notify buffer" に揃えると読み手のノイズが減る。ticket の「衛生整理」を保守的に解釈すれば任意項目。
|
||||||
|
- `crates/pod/src/pod.rs:91,330,363,365,372,581,585`、`crates/pod/src/ipc/interceptor.rs:4,43,46`、`crates/pod/src/ipc/event.rs:10`、`crates/pod/src/prompt/agents_md.rs:23` 等の doc comment の "notification" も同様 — inbound 側は "notify" 動詞・"alert" 名詞 (outbound) の使い分けが定義されたので、コメントもこの軸で揃え直すと表面の用語衝突がさらに減る。本リネームの影響範囲外と見るなら据置で問題ない。
|
||||||
|
|
||||||
|
## 判断
|
||||||
|
Approve with follow-up — 要件は完全に満たされ、ビルド/テストもクリーン。`controller.rs:49` の doc comment 修正は trivial なので、別チケット化せず本作業の最後に直しておくのが望ましい。コメント表面の用語ノイズは Nits で残るが、ticket の rename スコープ内で必要な分は果たされている。
|
||||||
|
|
@ -47,15 +47,16 @@ resolver の trait 化と memory / workflow 用 resolver 実装は別チケッ
|
||||||
|
|
||||||
text しか作れない client が引き続き存在しても良いことを protocol 仕様に明記する(`vec![Segment::Text(_)]` のみで動く)。
|
text しか作れない client が引き続き存在しても良いことを protocol 仕様に明記する(`vec![Segment::Text(_)]` のみで動く)。
|
||||||
|
|
||||||
### unknown variant / 未登録 resolver の扱い(要決定)
|
### unknown variant / 未登録 resolver の扱い
|
||||||
|
|
||||||
新 variant を持つ client が古い Pod に投げる、または resolver 未登録の variant を Pod が受けるケースの挙動を本チケットで決める。候補:
|
新 variant を持つ client が古い Pod に投げる、または resolver 未登録の variant を Pod が受けた場合は **2 経路に同時に流す**:
|
||||||
|
|
||||||
- (a) 黙って drop — 静かに情報が落ちて user/LLM 双方が気付けない
|
1. **LLM context** に `[unknown input: kind=foo]` 相当の placeholder を `Item::system_message` で差し込む。LLM が「ユーザーは何かを送ろうとしたが Pod が解釈できなかった」と気づき、ユーザーに聞き返せる状態を作る。
|
||||||
- (b) `[unknown input: kind=foo]` 相当の placeholder を LLM context に差し込み、LLM が気づいて指摘できる
|
2. **ユーザー向け通知チャネル**(`Event::Alert` / リネーム前は `Event::Notification`)にも同時に送る。client 側で「このサーバはこの input type を解釈できません」とユーザーに直接出せる。
|
||||||
- (c) hard error で submit 拒否
|
|
||||||
|
|
||||||
(b) を仮の第一候補として実装方針を詰める。serde 側は unknown variant を吸収する形を初期から入れる。
|
両経路に流すことで「user も LLM も気付けない silent drop」を避ける。serde 側は unknown variant を吸収する形(`#[serde(other)]` 相当)を初期から入れる。
|
||||||
|
|
||||||
|
`Event::Alert` への rename は `tickets/notification-naming-cleanup.md` で扱う。本チケットの実装時点での名称(`Event::Notification` か `Event::Alert` か)はその時の repo 状態に従う。
|
||||||
|
|
||||||
## 範囲外
|
## 範囲外
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user