diff --git a/TODO.md b/TODO.md index c6ac7089..0b538aa6 100644 --- a/TODO.md +++ b/TODO.md @@ -8,7 +8,6 @@ - Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md) - Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md) - Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md) -- Pod: System 注入経路 (Notify / PodEvent / HookInjectedItems) を SystemItem 一本に統合 → [tickets/system-item-unify.md](tickets/system-item-unify.md) - 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md) - Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md) - llm-worker のエラー耐性 diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index c9f9f140..c5b65e94 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -411,24 +411,17 @@ where let Some(entry) = classify_history_item(item) else { continue; }; - let mut head = ctx.session_head.lock().await; - match session_store::append_entry_with_hash( - &ctx.store, - head.session_id, - &mut head.head_hash, - entry.clone(), - ) - .await - { - Ok(_) => { - // Publish under the same critical section view - // a `subscribe_with_snapshot` would observe. - ctx.sink.publish(entry); - } - Err(e) => { - tracing::warn!(error = %e, "drain: append_entry failed; entry dropped"); - } + commit_via_drain(&ctx, entry).await; + } + LogCommand::SystemItems(items) => { + if items.is_empty() { + continue; } + let entry = LogEntry::SystemItems { + ts: session_log::now_millis(), + items, + }; + commit_via_drain(&ctx, entry).await; } LogCommand::Flush(ack) => { let _ = ack.send(()); @@ -437,15 +430,52 @@ where } } -/// Map a single worker-history `Item` to its corresponding `LogEntry` -/// classification. 
`None` is the skip signal for `user_message` items — -/// those are committed via `LogEntry::UserInput` by `Pod::run` at -/// submit time and would otherwise produce a duplicate entry here. +async fn commit_via_drain(ctx: &LogDrainHandle, entry: LogEntry) +where + St: session_store::Store + Clone + Send + 'static, +{ + let mut head = ctx.session_head.lock().await; + match session_store::append_entry_with_hash( + &ctx.store, + head.session_id, + &mut head.head_hash, + entry.clone(), + ) + .await + { + Ok(_) => { + // Publish under the same critical section view a + // `subscribe_with_snapshot` would observe. + ctx.sink.publish(entry); + } + Err(e) => { + tracing::warn!(error = %e, "drain: append_entry failed; entry dropped"); + } + } +} + +/// Map one LLM-driven worker-history append to its `LogEntry` form. +/// +/// `None` is the skip signal for items that the drain must not commit: +/// - `user_message` items are committed by `Pod::run` up-front as +/// `LogEntry::UserInput { segments }`. +/// - `system_message` items are committed by `PodInterceptor` as part +/// of a `LogEntry::SystemItems` batch (with typed kind metadata) +/// before they reach the worker's history. fn classify_history_item(item: Item) -> Option { let ts = session_log::now_millis(); if item.is_user_message() { return None; } + if matches!( + item, + Item::Message { + role: llm_worker::Role::System, + .. + } + ) { + return None; + } if item.is_tool_result() { return Some(LogEntry::ToolResults { ts, @@ -458,7 +488,9 @@ fn classify_history_item(item: Item) -> Option { items: vec![session_store::LoggedItem::from(&item)], }); } - Some(LogEntry::HookInjectedItems { + // Defensive: anything else (future Item kinds) routes through + // AssistantItems rather than getting silently dropped. 
+ Some(LogEntry::AssistantItems { ts, items: vec![session_store::LoggedItem::from(&item)], }) @@ -696,9 +728,11 @@ async fn controller_loop( } Method::Notify { message } => { - let _ = event_tx.send(Event::Notify { - message: message.clone(), - }); + // Client-side live echo is delivered as `Event::SystemItem` + // once the interceptor commits the corresponding + // `LogEntry::SystemItems` entry — drained out of the + // notify buffer + broadcast through the sink. No + // separate echo here. pod.push_notify(message); // RUNNING / Paused: the buffer push is the entire // operation; an in-flight turn (or the next @@ -751,10 +785,12 @@ async fn controller_loop( Method::ListCompletions { .. } => {} Method::PodEvent(event) => { - // Echo the received event to all subscribers so every - // client sees the input that drove any following - // auto-kicked turn. - let _ = event_tx.send(Event::PodEvent(event.clone())); + // Live echo travels through the SystemItem lane: once + // the interceptor drains the notify buffer, the + // typed `SystemItem::PodEvent` lands as a + // `LogEntry::SystemItems` entry and the sink fans it + // out to clients as `Event::SystemItem`. + // // (1) system side effects — idempotent and tolerant of // out-of-order delivery (e.g. `TurnEnded` arriving // after `ShutDown`). @@ -765,11 +801,10 @@ async fn controller_loop( &self_parent_socket, ) .await; - // (2) render a one-line summary and push it into the - // notification buffer; the next LLM request will - // inject it as a system message via - // `PodInterceptor::pre_llm_request`. - pod.push_notify(crate::ipc::event::render_event(&event)); + // (2) queue the typed event in the notification buffer; + // the next LLM request will inject it as a typed + // `SystemItem::PodEvent` via the interceptor drain. + pod.push_pod_event_notify(event); // Auto-kick a turn if the Pod is idle so the // notification is not stranded. Matches the // `Method::Notify` idle path. 
@@ -902,23 +937,21 @@ where }); } Some(Method::Notify { message }) => { - let _ = event_tx.send(Event::Notify { - message: message.clone(), - }); - // Route into the buffer; the in-flight turn will - // drain it at its next pre_llm_request. - notify_buffer.push(message); + // Live echo arrives via `Event::SystemItem` once + // the in-flight turn's next `pre_llm_request` + // drains this entry through the interceptor. + notify_buffer.push_notify(message); } Some(Method::ListCompletions { .. }) => {} Some(Method::PodEvent(event)) => { - let _ = event_tx.send(Event::PodEvent(event.clone())); // mpsc is consume-once, so we cannot defer this // to the next main-loop iteration — drop here // would lose the event entirely (children fire // and forget). Apply the side effects inline - // and stage the rendered string on the - // notification buffer so the in-flight turn's - // next `pre_llm_request` surfaces it. + // and stage the typed event on the notification + // buffer so the in-flight turn's next + // `pre_llm_request` surfaces it as a typed + // `SystemItem::PodEvent`. 
let self_parent_socket = parent_socket.cloned(); crate::ipc::event::apply_event_side_effects( &event, @@ -927,7 +960,7 @@ where &self_parent_socket, ) .await; - notify_buffer.push(crate::ipc::event::render_event(&event)); + notify_buffer.push_pod_event(event); } None => { let _ = cancel_tx.try_send(()); diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index 39a70fb4..3403468f 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -22,11 +22,15 @@ use tracing::info; use tracing::warn; use crate::compact::state::CompactState; +use session_store::SystemItem; +use tokio::sync::mpsc; + use crate::hook::{ AbortInfo, HookPromptAction, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary, TurnEndInfo, }; -use crate::ipc::notify_buffer::{NotifyBuffer, format_notify}; +use crate::ipc::notify_buffer::{NotifyBuffer, build_system_item}; +use crate::pod::LogCommand; use crate::prompt::catalog::PromptCatalog; use llm_worker::token_counter::total_tokens; @@ -45,13 +49,20 @@ pub(crate) struct PodInterceptor { /// request. The Worker `extend`s these into its persistent history /// so the LLM has a visible trigger for any reaction it commits. pending_notifies: NotifyBuffer, - /// Submit-scoped stash of resolver-produced system messages. - /// Drained inside `on_prompt_submit` and returned via - /// `PromptAction::ContinueWith`. Populated by `Pod::run` immediately - /// before handing off to the worker. - pending_attachments: Arc>>, + /// Submit-scoped stash of resolver-produced typed system items. + /// Drained inside `on_prompt_submit`, committed as a + /// `LogEntry::SystemItems` through `log_cmd_tx`, and returned to + /// the worker as `Item::system_message` via + /// `PromptAction::ContinueWith`. Populated by `Pod::run` + /// immediately before handing off to the worker. + pending_attachments: Arc>>, /// Prompt catalog used to render the injected notification wrapper. 
prompts: Arc, + /// Sender into the Pod's history-drain task. The interceptor uses + /// it to commit `LogCommand::SystemItems` batches before returning + /// the corresponding `Item::system_message`s up to the worker. + /// `None` in tests / `Pod::new` paths where no drain is wired. + log_cmd_tx: Option>, /// Next turn index assigned by `on_prompt_submit`. next_turn_index: AtomicUsize, /// Tool calls observed in the current turn (reset on each new prompt). @@ -64,8 +75,9 @@ impl PodInterceptor { compact_state: Option>, usage_history: Option>>>, pending_notifies: NotifyBuffer, - pending_attachments: Arc>>, + pending_attachments: Arc>>, prompts: Arc, + log_cmd_tx: Option>, ) -> Self { Self { registry, @@ -74,11 +86,26 @@ impl PodInterceptor { pending_notifies, pending_attachments, prompts, + log_cmd_tx, next_turn_index: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0), } } + /// Send a `LogCommand::SystemItems` batch down the drain channel + /// (no-op if no drain is wired). The drain task commits the entry + /// before the corresponding `Item::system_message`s reach the + /// worker via `ContinueWith` / `pending_history_appends`, so the + /// drain barrier in `persist_turn` covers system commits too. + fn send_system_items(&self, items: Vec) { + if items.is_empty() { + return; + } + if let Some(tx) = self.log_cmd_tx.as_ref() { + let _ = tx.send(LogCommand::SystemItems(items)); + } + } + fn current_turn_index(&self) -> usize { self.next_turn_index .load(Ordering::Relaxed) @@ -111,7 +138,7 @@ impl Interceptor for PodInterceptor { return action.into(); } } - let extras = std::mem::take( + let extras: Vec = std::mem::take( &mut *self .pending_attachments .lock() @@ -120,7 +147,14 @@ impl Interceptor for PodInterceptor { if extras.is_empty() { PromptAction::Continue } else { - PromptAction::ContinueWith(extras) + // Commit the typed system items first, then hand the + // matching `Item::system_message`s to the worker. 
The + // drain task processes the `SystemItems` command BEFORE + // any subsequent `Item` commands from `on_history_append`, + // so on-disk order matches worker-history order. + let items: Vec = extras.iter().map(SystemItem::to_history_item).collect(); + self.send_system_items(extras); + PromptAction::ContinueWith(items) } } @@ -129,19 +163,31 @@ impl Interceptor for PodInterceptor { if drained.is_empty() { return Vec::new(); } - let mut items = Vec::with_capacity(drained.len()); - for n in drained { - match format_notify(&n, &self.prompts) { - Ok(item) => items.push(item), + let mut system_items: Vec = Vec::with_capacity(drained.len()); + let mut items: Vec = Vec::with_capacity(drained.len()); + for entry in drained { + match build_system_item(&entry, &self.prompts) { + Ok(system_item) => { + items.push(system_item.to_history_item()); + system_items.push(system_item); + } Err(e) => { // A render failure here would starve the LLM of - // the notify text. Fall back to the raw message - // so the trigger still lands in history. + // the notify text. Fall back to a raw item so the + // trigger still lands in history; the entry will + // simply be skipped from the SystemItems batch. 
warn!(error = %e, "failed to render notify_wrapper; using raw message"); - items.push(Item::system_message(n.message.clone())); + let fallback = match &entry { + super::notify_buffer::PendingNotify::Notify { message } => message.clone(), + super::notify_buffer::PendingNotify::PodEvent { event } => { + session_store::render_pod_event(event) + } + }; + items.push(Item::system_message(fallback)); } } } + self.send_system_items(system_items); items } @@ -321,6 +367,7 @@ mod tests { NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), + None, ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -346,6 +393,7 @@ mod tests { NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), + None, ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -372,6 +420,7 @@ mod tests { NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), + None, ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -392,6 +441,7 @@ mod tests { NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), + None, ); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; @@ -414,8 +464,8 @@ mod tests { async fn pending_history_appends_drains_buffer_into_items() { let registry = Arc::new(HookRegistryBuilder::new().build()); let buffer = NotifyBuffer::new(); - buffer.push("first".into()); - buffer.push("second".into()); + buffer.push_notify("first".into()); + buffer.push_notify("second".into()); let interceptor = PodInterceptor::new( registry, @@ -424,6 +474,7 @@ mod tests { buffer.clone(), Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), + None, ); let items = interceptor.pending_history_appends().await; @@ -451,7 +502,7 @@ mod tests { // anything itself. 
let registry = Arc::new(HookRegistryBuilder::new().build()); let buffer = NotifyBuffer::new(); - buffer.push("msg".into()); + buffer.push_notify("msg".into()); let interceptor = PodInterceptor::new( registry, @@ -460,6 +511,7 @@ mod tests { buffer.clone(), Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), + None, ); let mut ctx: Vec = vec![Item::user_message("hi")]; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -489,6 +541,7 @@ mod tests { NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), + None, ); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; diff --git a/crates/pod/src/ipc/notify_buffer.rs b/crates/pod/src/ipc/notify_buffer.rs index 2700f2eb..f5e50477 100644 --- a/crates/pod/src/ipc/notify_buffer.rs +++ b/crates/pod/src/ipc/notify_buffer.rs @@ -3,39 +3,48 @@ //! Entries are queued here by the Controller (on receipt of the //! corresponding IPC method) and drained by //! `PodInterceptor::pending_history_appends`, which the Worker calls -//! at the head of each turn loop iteration to `extend` them into the -//! persistent `worker.history`. Each queued entry becomes one -//! `Item::system_message`. +//! at the head of each turn loop iteration. The drain renders each +//! pending entry into a typed `SystemItem` (with the `notify_wrapper` +//! prompt applied), commits a `LogEntry::SystemItems` through the +//! session-log sink, and returns the corresponding +//! `Item::system_message`s for the worker to append to its +//! persistent history. //! //! This is the **single lane** for "system messages produced by Pod //! state that should land in the next LLM request": Notify, PodEvent, -//! and any future `` injection all ride this queue -//! (or a sibling queue with the same lifecycle). Per -//! `tickets/notify-history-persist.md` and `AGENTS.md` (LLM コンテキスト -//! の加工原則), there is **no** "transient, history-skipping" lane — -//! 
everything injected into a request is also committed to history so -//! that any LLM reaction has a visible trigger across turns, resume, -//! and compaction, and so the Anthropic prompt cache prefix stays -//! stable across requests. +//! and any future `` injection all ride this queue. +//! Per `tickets/notify-history-persist.md` and `AGENTS.md` (LLM +//! context の加工原則), there is **no** "transient, history-skipping" +//! lane — everything injected into a request is also committed to +//! history so any LLM reaction has a visible trigger across turns, +//! resume, and compaction, and so the Anthropic prompt cache prefix +//! stays stable across requests. use std::collections::VecDeque; use std::sync::{Arc, Mutex}; -use llm_worker::Item; +use protocol::PodEvent; +use session_store::SystemItem; use tracing::warn; use crate::prompt::catalog::{CatalogError, PromptCatalog}; -/// Maximum queued notify entries. Oldest entries are dropped beyond this. +/// Maximum queued pending entries. Oldest entries are dropped beyond this. const CAPACITY: usize = 128; -/// One pending notify entry awaiting injection into the next LLM request. +/// One pending entry awaiting drain into the next LLM request. +/// +/// The buffer keeps the raw input shape so the drain step can decide +/// the right `SystemItem` kind (and apply `notify_wrapper` to the +/// rendered body) at the moment of commit, when the prompt catalog +/// is available. #[derive(Debug, Clone)] -pub struct PendingNotify { - pub message: String, +pub enum PendingNotify { + Notify { message: String }, + PodEvent { event: PodEvent }, } -/// Shared, mutex-guarded buffer of pending notify entries. +/// Shared, mutex-guarded buffer of pending entries. /// /// Cloned between the Pod (producer) and PodInterceptor (consumer). #[derive(Clone, Default)] @@ -51,26 +60,35 @@ impl NotifyBuffer { /// Push a notify entry onto the queue. 
If the queue is full, the /// oldest entry is dropped and a `tracing::warn` is emitted — the /// caller should never hit this in normal operation. - pub fn push(&self, message: String) { + pub fn push_notify(&self, message: String) { + self.push_entry(PendingNotify::Notify { message }); + } + + /// Push a typed pod-event entry onto the queue. + pub fn push_pod_event(&self, event: PodEvent) { + self.push_entry(PendingNotify::PodEvent { event }); + } + + fn push_entry(&self, entry: PendingNotify) { let mut q = self.inner.lock().expect("notify buffer poisoned"); if q.len() >= CAPACITY { let dropped = q.pop_front(); warn!( capacity = CAPACITY, - dropped_message = dropped.as_ref().map(|n| n.message.as_str()), + dropped = ?dropped, "notify buffer overflow; dropped oldest" ); } - q.push_back(PendingNotify { message }); + q.push_back(entry); } - /// Remove and return all pending notify entries in FIFO order. + /// Remove and return all pending entries in FIFO order. pub fn drain(&self) -> Vec { let mut q = self.inner.lock().expect("notify buffer poisoned"); q.drain(..).collect() } - /// Number of pending notify entries. Primarily for tests. + /// Number of pending entries. Primarily for tests. pub fn len(&self) -> usize { self.inner.lock().expect("notify buffer poisoned").len() } @@ -80,17 +98,30 @@ impl NotifyBuffer { } } -/// Format a single pending notify entry into the `Item::system_message` -/// that gets appended to `worker.history` just before the next LLM -/// request. The wrapper body comes from `PodPrompt::NotifyWrapper` so -/// the surrounding phrasing can be customised via a prompt pack -/// (translation, tone, ...). -pub(crate) fn format_notify( - n: &PendingNotify, +/// Render one pending entry into a typed `SystemItem`. The +/// `notify_wrapper` prompt produces the LLM-context body for both +/// `Notify` (raw message) and `PodEvent` (rendered event line). 
+pub(crate) fn build_system_item( + entry: &PendingNotify, prompts: &PromptCatalog, -) -> Result { - let text = prompts.notify_wrapper(&n.message)?; - Ok(Item::system_message(text)) +) -> Result { + match entry { + PendingNotify::Notify { message } => { + let body = prompts.notify_wrapper(message)?; + Ok(SystemItem::Notification { + message: message.clone(), + body, + }) + } + PendingNotify::PodEvent { event } => { + let rendered = session_store::render_pod_event(event); + let body = prompts.notify_wrapper(&rendered)?; + Ok(SystemItem::PodEvent { + event: event.clone(), + body, + }) + } + } } #[cfg(test)] @@ -100,12 +131,14 @@ mod tests { #[test] fn push_then_drain_preserves_order() { let buf = NotifyBuffer::new(); - buf.push("one".into()); - buf.push("two".into()); + buf.push_notify("one".into()); + buf.push_notify("two".into()); let drained = buf.drain(); assert_eq!(drained.len(), 2); - assert_eq!(drained[0].message, "one"); - assert_eq!(drained[1].message, "two"); + match &drained[0] { + PendingNotify::Notify { message } => assert_eq!(message, "one"), + other => panic!("unexpected: {other:?}"), + } assert!(buf.is_empty()); } @@ -113,28 +146,50 @@ mod tests { fn capacity_drops_oldest() { let buf = NotifyBuffer::new(); for i in 0..(CAPACITY + 5) { - buf.push(format!("msg{i}")); + buf.push_notify(format!("msg{i}")); } let drained = buf.drain(); assert_eq!(drained.len(), CAPACITY); - // Oldest 5 were dropped; first retained is msg5. 
- assert_eq!(drained[0].message, "msg5"); - assert_eq!( - drained[CAPACITY - 1].message, - format!("msg{}", CAPACITY + 4) - ); + match &drained[0] { + PendingNotify::Notify { message } => assert_eq!(message, "msg5"), + other => panic!("unexpected: {other:?}"), + } } #[test] - fn format_notify_includes_message_and_nonblocking_hint() { - let n = PendingNotify { + fn build_system_item_for_notify_carries_wrapper_body() { + let entry = PendingNotify::Notify { message: "hello".into(), }; let catalog = PromptCatalog::builtins_only().unwrap(); - let item = format_notify(&n, &catalog).unwrap(); - let text = item.as_text().unwrap_or_default().to_string(); - assert!(text.contains("[Notification]")); - assert!(text.contains("hello")); - assert!(text.contains("not a blocking request")); + let item = build_system_item(&entry, &catalog).unwrap(); + match item { + SystemItem::Notification { message, body } => { + assert_eq!(message, "hello"); + assert!(body.contains("[Notification]")); + assert!(body.contains("hello")); + assert!(body.contains("not a blocking request")); + } + other => panic!("unexpected: {other:?}"), + } + } + + #[test] + fn build_system_item_for_pod_event_wraps_rendered_event_text() { + let entry = PendingNotify::PodEvent { + event: PodEvent::TurnEnded { + pod_name: "child".into(), + }, + }; + let catalog = PromptCatalog::builtins_only().unwrap(); + let item = build_system_item(&entry, &catalog).unwrap(); + match item { + SystemItem::PodEvent { event, body } => { + assert!(matches!(event, PodEvent::TurnEnded { ref pod_name } if pod_name == "child")); + assert!(body.contains("[Notification]")); + assert!(body.contains("`child`")); + } + other => panic!("unexpected: {other:?}"), + } } } diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index 23b29825..3f2a03e4 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -104,22 +104,39 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { entry 
= entry_rx.recv() => { match entry { Ok(entry) => { - let value = serde_json::to_value(&entry) - .expect("LogEntry is Serialize"); - let outbound = match &entry { + let outbound = match entry { session_store::LogEntry::SessionStart { .. } => { - Some(Event::SessionRotated { entry: value }) + let value = serde_json::to_value(&entry) + .expect("LogEntry is Serialize"); + vec![Event::SessionRotated { entry: value }] } - session_store::LogEntry::HookInjectedItems { .. } => { - Some(Event::HookInjectedItems { entry: value }) + session_store::LogEntry::SystemItems { items, .. } => { + // Fan out per-item so each `SystemItem` + // arrives as its own `Event::SystemItem` + // on the wire. Batching on disk is an + // implementation detail of the drain + // task; clients see them one at a time. + items + .into_iter() + .map(|si| { + let value = serde_json::to_value(&si) + .expect("SystemItem is Serialize"); + Event::SystemItem { item: value } + }) + .collect() } // Defensive: should never reach here per // `SessionLogSink::is_live_relevant`. 
- _ => None, + _ => Vec::new(), }; - if let Some(event) = outbound - && writer.write(&event).await.is_err() - { + let mut hit_error = false; + for event in outbound { + if writer.write(&event).await.is_err() { + hit_error = true; + break; + } + } + if hit_error { break; } } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 674efee0..c4e35c64 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,8 +9,8 @@ use llm_worker::llm_client::client::LlmClient; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; use session_store::{ - EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, session_log, - to_logged, + EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem, + session_log, to_logged, }; use tracing::{info, warn}; @@ -18,16 +18,21 @@ use crate::session_log_sink::SessionLogSink; /// Command sent to the per-Pod history-drain task. /// -/// `Item` carries one worker-history append observed via -/// `Worker::on_history_append`; the drain classifies it into a -/// `LogEntry::AssistantItems` / `LogEntry::ToolResults` / -/// `LogEntry::HookInjectedItems` and commits it through the sink. -/// `Flush(ack)` is the barrier used by `persist_turn` to ensure every -/// in-flight item is committed before the trailing `TurnEnd` entry -/// lands. +/// - `Item`: one worker-history append observed via +/// `Worker::on_history_append`; the drain classifies it into +/// `LogEntry::AssistantItems` / `LogEntry::ToolResults` and commits +/// through the sink. `role:system` items are explicitly skipped +/// because they are committed up-front through `SystemItems`. +/// - `SystemItems`: typed agent-injected items committed as a single +/// `LogEntry::SystemItems` entry. Used by the interceptor when it +/// drains the notify buffer or pending attachments. 
+/// - `Flush(ack)`: barrier used by `persist_turn` to ensure every +/// queued command has been processed before the trailing `TurnEnd` +/// entry lands. #[derive(Debug)] pub enum LogCommand { Item(Item), + SystemItems(Vec), Flush(tokio::sync::oneshot::Sender<()>), } @@ -158,7 +163,7 @@ pub struct Pod { /// before handing off to the worker; `PodInterceptor::on_prompt_submit` /// drains it and returns `ContinueWith` so the items land in /// history right after the user message that referenced them. - pending_attachments: Arc>>, + pending_attachments: Arc>>, /// Scope allocation in the machine-wide lock file. `Some` for /// Pods built via `from_manifest` / `from_manifest_spawned` / /// `restore_from_manifest` (production paths); `None` for the @@ -279,7 +284,7 @@ impl Pod { alerter: self.alerter.clone(), event_tx: self.event_tx.clone(), pending_notifies: NotifyBuffer::new(), - pending_attachments: Arc::new(Mutex::new(Vec::new())), + pending_attachments: Arc::new(Mutex::new(Vec::::new())), scope_allocation: None, callback_socket: None, prompts: self.prompts.clone(), @@ -378,7 +383,7 @@ impl Pod { alerter: None, event_tx: None, pending_notifies: NotifyBuffer::new(), - pending_attachments: Arc::new(Mutex::new(Vec::new())), + pending_attachments: Arc::new(Mutex::new(Vec::::new())), scope_allocation: None, callback_socket: None, prompts, @@ -760,7 +765,17 @@ impl Pod { /// `PodInterceptor::pending_history_appends`. See [`NotifyBuffer`] /// for overflow behaviour and the lane-of-record rationale. pub fn push_notify(&self, message: String) { - self.pending_notifies.push(message); + self.pending_notifies.push_notify(message); + } + + /// Push a typed `PodEvent` entry onto the pending buffer. + /// + /// Same lifecycle as [`push_notify`](Self::push_notify) but + /// preserves the typed `PodEvent` payload so the IPC layer can + /// emit `SystemItem::PodEvent { event, body }` with structured + /// data for clients. 
+ pub fn push_pod_event_notify(&self, event: protocol::PodEvent) { + self.pending_notifies.push_pod_event(event); } /// Shared handle to the pending notification buffer. @@ -892,6 +907,7 @@ impl Pod { self.pending_notifies.clone(), self.pending_attachments.clone(), self.prompts.clone(), + self.log_cmd_tx.clone(), ); self.worker_mut().set_interceptor(interceptor); self.interceptor_installed = true; @@ -1099,7 +1115,7 @@ impl Pod { /// directory) surface as `AlertLevel::Warn` Alerts and are skipped — the /// unresolved placeholder stays in the flattened user message so the LLM /// still sees the intent. - fn resolve_file_refs(&self, segments: &[Segment]) -> Vec { + fn resolve_file_refs(&self, segments: &[Segment]) -> Vec { let view = crate::fs_view::PodFsView::new(tools::ScopedFs::with_shared_scope( self.scope.clone(), self.pwd.clone(), @@ -1110,7 +1126,19 @@ impl Pod { continue; }; match view.resolve_file_ref(path, self.manifest.worker.file_upload.max_bytes) { - Ok(item) => out.push(item), + Ok(item) => { + // `resolve_file_ref` returns an `Item::system_message` + // whose text already carries the `[File: ]` or + // `[Dir: ]` header (plus any truncation hint). + // Persist that body verbatim — it is what the LLM + // actually saw, so resume produces byte-identical + // history. 
+ let body = item.as_text().unwrap_or_default().to_string(); + out.push(SystemItem::FileAttachment { + path: path.clone(), + body, + }); + } Err(e) => { self.alert( AlertLevel::Warn, @@ -1123,7 +1151,7 @@ impl Pod { out } - fn resolve_knowledge_refs(&self, segments: &[Segment]) -> Vec { + fn resolve_knowledge_refs(&self, segments: &[Segment]) -> Vec { let Some(layout) = self.memory_layout.as_ref() else { return Vec::new(); }; @@ -1156,7 +1184,7 @@ impl Pod { } }; let raw = String::from_utf8_lossy(&bytes).into_owned(); - let body = match memory::schema::split_frontmatter(&raw) { + let body_text = match memory::schema::split_frontmatter(&raw) { Ok((_yaml, body)) => body, Err(e) => { self.alert( @@ -1173,11 +1201,11 @@ impl Pod { &bytes, ); self.append_memory_use_event(memory::UsageSource::KnowledgeRef, vec![snapshot]); - out.push(Item::system_message(format!( - "[Knowledge #{}]\n{}", - slug, - body.trim_end() - ))); + let body = format!("[Knowledge #{}]\n{}", slug, body_text.trim_end()); + out.push(SystemItem::Knowledge { + slug: slug.clone(), + body, + }); } out } @@ -1247,7 +1275,7 @@ impl Pod { fn resolve_workflow_invocations( &self, segments: &[Segment], - ) -> Result, WorkflowResolveError> { + ) -> Result, WorkflowResolveError> { let Some(layout) = self.memory_layout.as_ref() else { if let Some(slug) = segments.iter().find_map(|seg| match seg { Segment::WorkflowInvoke { slug } => Some(slug.clone()), @@ -1282,7 +1310,17 @@ impl Pod { warn!(workflow = %slug, error = %err, "failed to snapshot workflow usage"); } } - out.extend(items); + // `resolve_workflow_invocation` returns Item::system_message + // entries (potentially multiple — body + dependency knowledge + // bodies). Persist each as a SystemItem::Workflow keyed on + // the invocation slug. 
+ for item in items { + let body = item.as_text().unwrap_or_default().to_string(); + out.push(SystemItem::Workflow { + slug: slug.clone(), + body, + }); + } } Ok(out) } @@ -2635,7 +2673,7 @@ impl Pod, St> { alerter: None, event_tx: None, pending_notifies: NotifyBuffer::new(), - pending_attachments: Arc::new(Mutex::new(Vec::new())), + pending_attachments: Arc::new(Mutex::new(Vec::::new())), scope_allocation: Some(scope_allocation), callback_socket: None, prompts: common.prompts, @@ -2708,7 +2746,7 @@ impl Pod, St> { alerter: None, event_tx: None, pending_notifies: NotifyBuffer::new(), - pending_attachments: Arc::new(Mutex::new(Vec::new())), + pending_attachments: Arc::new(Mutex::new(Vec::::new())), scope_allocation: Some(scope_allocation), callback_socket: Some(callback_socket), prompts: common.prompts, @@ -2852,7 +2890,7 @@ impl Pod, St> { alerter: None, event_tx: None, pending_notifies: NotifyBuffer::new(), - pending_attachments: Arc::new(Mutex::new(Vec::new())), + pending_attachments: Arc::new(Mutex::new(Vec::::new())), scope_allocation: Some(scope_allocation), callback_socket: None, prompts: common.prompts, diff --git a/crates/pod/src/session_log_sink.rs b/crates/pod/src/session_log_sink.rs index 4e35a046..1f72deaf 100644 --- a/crates/pod/src/session_log_sink.rs +++ b/crates/pod/src/session_log_sink.rs @@ -120,7 +120,7 @@ impl SessionLogSink { fn is_live_relevant(entry: &LogEntry) -> bool { matches!( entry, - LogEntry::SessionStart { .. } | LogEntry::HookInjectedItems { .. } + LogEntry::SessionStart { .. } | LogEntry::SystemItems { .. 
} ) } @@ -427,12 +427,13 @@ mod tests { assert!(rx.try_recv().is_err()); } - fn hook_injected(text: &str) -> LogEntry { - LogEntry::HookInjectedItems { + fn notification_entry(text: &str) -> LogEntry { + LogEntry::SystemItems { ts: now_millis(), - items: vec![session_store::LoggedItem::from( - &llm_worker::Item::system_message(text), - )], + items: vec![session_store::SystemItem::Notification { + message: text.to_owned(), + body: format!("[Notification] {text}"), + }], } } @@ -448,11 +449,11 @@ mod tests { sink.publish(turn_end(1)); assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live"); - // HookInjectedItems is live-relevant. - sink.publish(hook_injected("[Notify] hi")); + // SystemItems is live-relevant. + sink.publish(notification_entry("hi")); match rx.try_recv() { - Ok(LogEntry::HookInjectedItems { .. }) => {} - other => panic!("expected HookInjectedItems, got {other:?}"), + Ok(LogEntry::SystemItems { .. }) => {} + other => panic!("expected SystemItems, got {other:?}"), } // Mirror still grew with both entries (snapshot completeness). @@ -465,11 +466,11 @@ mod tests { let sink = SessionLogSink::new(); sink.publish(session_start()); let (snapshot, mut rx) = sink.subscribe_with_snapshot(); - sink.publish(hook_injected("post-snapshot")); + sink.publish(notification_entry("post-snapshot")); assert_eq!(snapshot.len(), 1); match rx.try_recv() { - Ok(LogEntry::HookInjectedItems { .. }) => {} + Ok(LogEntry::SystemItems { .. }) => {} other => panic!("unexpected: {other:?}"), } assert!(rx.try_recv().is_err()); diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index af1fb9c2..b4859641 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -34,6 +34,9 @@ fn history_from_sink(handle: &PodHandle) -> Vec { | LogEntry::HookInjectedItems { items: i, .. } => { items.extend(i.into_iter().map(Item::from)); } + LogEntry::SystemItems { items: si, .. 
} => { + items.extend(si.iter().map(|s| s.to_history_item())); + } _ => {} } } @@ -745,16 +748,12 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { .unwrap(); // Wait for the auto-started turn to complete. - let mut saw_notify_echo = false; let mut saw_turn_end = false; let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); loop { tokio::select! { event = rx.recv() => { match event { - Ok(Event::Notify { ref message }) if message == "turn finished" => { - saw_notify_echo = true; - } Ok(Event::TurnEnd { .. }) => { saw_turn_end = true; break; } Err(_) => break, _ => {} @@ -763,14 +762,28 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { _ = tokio::time::sleep_until(deadline) => break, } } - assert!( - saw_notify_echo, - "Method::Notify on idle Pod should be echoed as Event::Notify" - ); assert!(saw_turn_end, "auto-triggered turn should complete"); - // Status flips back to Idle on the controller thread after RunEnd. - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); + // Wait for the post-run persist_turn (Flush + TurnEnd + RunCompleted + // commits) to finish; the controller flips status to Idle right + // after that. + wait_for_status(&handle, PodStatus::Idle).await; + // The live echo arrives via the sink's `Event::SystemItem` lane, + // not on the `event_tx` broadcast that `handle.subscribe()` taps. + // Verify the notification landed on the sink mirror instead. + let (entries, _) = handle.sink.subscribe_with_snapshot(); + let saw_notify_in_mirror = entries.iter().any(|e| matches!( + e, + session_store::LogEntry::SystemItems { items, .. } + if items.iter().any(|si| matches!( + si, + session_store::SystemItem::Notification { message, .. 
} + if message == "turn finished" + )) + )); + assert!( + saw_notify_in_mirror, + "Method::Notify should commit a SystemItem::Notification entry; mirror = {entries:?}" + ); // Exactly one request was made; it must contain the formatted // notification as one of the items (committed to history by @@ -825,18 +838,12 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes .await .unwrap(); - let mut saw_pod_event_echo = false; let mut saw_turn_end = false; let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); loop { tokio::select! { event = rx.recv() => { match event { - Ok(Event::PodEvent(protocol::PodEvent::TurnEnded { ref pod_name })) - if pod_name == "child" => - { - saw_pod_event_echo = true; - } Ok(Event::TurnEnd { .. }) => { saw_turn_end = true; break; } Err(_) => break, _ => {} @@ -845,15 +852,28 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes _ = tokio::time::sleep_until(deadline) => break, } } - assert!( - saw_pod_event_echo, - "Method::PodEvent on idle Pod should be echoed as Event::PodEvent" - ); assert!( saw_turn_end, "PodEvent::TurnEnded on idle Pod should auto-start a turn" ); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + // Wait for the post-run persist_turn to complete before reading the + // mirror — TurnEnd fires inside the worker loop, persist_turn (and + // its Flush of the drain queue) runs afterwards. + wait_for_status(&handle, PodStatus::Idle).await; + let (entries, _) = handle.sink.subscribe_with_snapshot(); + let saw_pod_event_in_mirror = entries.iter().any(|e| matches!( + e, + session_store::LogEntry::SystemItems { items, .. } + if items.iter().any(|si| matches!( + si, + session_store::SystemItem::PodEvent { event: protocol::PodEvent::TurnEnded { pod_name }, .. 
} + if pod_name == "child" + )) + )); + assert!( + saw_pod_event_in_mirror, + "Method::PodEvent should commit a SystemItem::PodEvent entry" + ); assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); let requests = client_for_assert.captured_requests(); @@ -911,8 +931,6 @@ async fn notify_while_running_does_not_emit_already_running_error() { .unwrap(); // Drain events until the run ends; AlreadyRunning must never appear. - // The in-flight branch must still echo the Notify as a log element. - let mut saw_notify_echo = false; let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); loop { tokio::select! { @@ -921,9 +939,6 @@ async fn notify_while_running_does_not_emit_already_running_error() { Ok(Event::Error { code, .. }) if code == pod::ErrorCode::AlreadyRunning => { panic!("Notify while running must not produce AlreadyRunning"); } - Ok(Event::Notify { ref message }) if message == "ping" => { - saw_notify_echo = true; - } Ok(Event::TurnEnd { .. }) => break, Err(_) => break, _ => {} @@ -932,10 +947,13 @@ async fn notify_while_running_does_not_emit_already_running_error() { _ = tokio::time::sleep_until(deadline) => break, } } - assert!( - saw_notify_echo, - "in-flight Notify must still be echoed as Event::Notify" - ); + // The core property of this test is "no AlreadyRunning error fires + // when Notify arrives mid-run". The notify's `SystemItem` commit + // is racy here (depends on whether the in-flight turn's next + // `pending_history_appends` runs before vs after the buffer push) + // and has dedicated coverage in + // `notify_while_idle_auto_starts_turn_and_injects_system_message`. 
+ wait_for_status(&handle, PodStatus::Idle).await; } #[tokio::test] @@ -1032,19 +1050,29 @@ async fn socket_pod_event_turn_ended_while_idle_auto_starts_turn() { let mut saw_turn_end = false; let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + // The SystemItem and TurnEnd events arrive through independent + // broadcast lanes (sink fan-out vs `event_tx`), so their relative + // order on the wire is non-deterministic. Keep reading until both + // are observed (or the deadline trips), rather than breaking on + // the first TurnEnd. loop { + if saw_pod_event_echo && saw_turn_end { + break; + } tokio::select! { event = reader.next::() => { match event { - Ok(Some(Event::PodEvent(protocol::PodEvent::TurnEnded { pod_name }))) - if pod_name == "child" => + Ok(Some(Event::SystemItem { ref item })) + if item.get("kind").and_then(|k| k.as_str()) == Some("pod_event") + && item + .pointer("/event/pod_name") + .and_then(|v| v.as_str()) == Some("child") => { saw_pod_event_echo = true; } Ok(Some(Event::TurnStart { .. })) => saw_turn_start = true, Ok(Some(Event::TurnEnd { .. })) => { saw_turn_end = true; - break; } Ok(None) | Err(_) => break, _ => {} @@ -1056,7 +1084,7 @@ async fn socket_pod_event_turn_ended_while_idle_auto_starts_turn() { assert!( saw_pod_event_echo, - "PodEvent::TurnEnded via socket should be echoed as Event::PodEvent" + "PodEvent::TurnEnded via socket should be echoed as Event::SystemItem(PodEvent)" ); assert!( saw_turn_start, diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 262a6a7b..eb51e148 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -214,20 +214,23 @@ pub enum Event { UserMessage { segments: Vec, }, - /// Echo of `Method::Notify` received by this Pod. Broadcast on - /// receipt so subscribers can render the external input as a log - /// element. 
The same `message` is independently pushed into the - /// notification buffer for LLM injection (with prompt-pack - /// wrapping); this echo carries the raw payload and does not - /// imply any turn-boundary semantics. - Notify { - message: String, + /// One agent-injected system item committed to history. + /// + /// Carries the JSON form of `session_store::SystemItem`. Covers + /// `Method::Notify` echoes, child-Pod lifecycle events from + /// `Method::PodEvent`, `@` / `#` / `/` + /// resolution payloads, and any future agent-side injection kind. + /// Clients dispatch on the `kind` tag for typed rendering instead + /// of parsing free-text prefixes like `[Notification] …` or + /// `[File: …]`. + /// + /// Fired per-item, even when the underlying + /// `LogEntry::SystemItems` entry batched several together — the + /// IPC layer fans the batch out at broadcast time so subscribers + /// observe one event per item. + SystemItem { + item: serde_json::Value, }, - /// Echo of `Method::PodEvent` received by this Pod. Same rationale - /// as `Notify`: subscribers render the event as a log element, - /// while a rendered summary is independently injected into the LLM - /// context via the notification buffer. - PodEvent(PodEvent), TurnStart { turn: usize, }, @@ -335,17 +338,6 @@ pub enum Event { SessionRotated { entry: serde_json::Value, }, - /// A non-LLM-driven history append landed in the worker history. - /// - /// Carries the JSON form of `session_store::LogEntry::HookInjectedItems`. - /// This is the live counterpart of items that the streaming lane - /// never broadcasts — `Method::Notify` echoes, `@` attachment - /// resolutions, `` injections — so a connected - /// client can render them in time order without waiting for the - /// next reconnect's `Snapshot`. - HookInjectedItems { - entry: serde_json::Value, - }, /// Current Pod controller status. Broadcast on every controller-level /// transition and included in `History` snapshots for late attach. 
Status { @@ -791,20 +783,18 @@ mod tests { } #[test] - fn event_hook_injected_items_roundtrip() { - let event = Event::HookInjectedItems { - entry: serde_json::json!({"kind": "hook_injected_items", "ts": 42, "items": []}), + fn event_system_item_roundtrip() { + let event = Event::SystemItem { + item: serde_json::json!({"kind": "notification", "message": "hello"}), }; let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["event"], "hook_injected_items"); - assert_eq!(parsed["data"]["entry"]["kind"], "hook_injected_items"); + assert_eq!(parsed["event"], "system_item"); + assert_eq!(parsed["data"]["item"]["kind"], "notification"); let decoded: Event = serde_json::from_str(&json).unwrap(); match decoded { - Event::HookInjectedItems { entry } => { - assert_eq!(entry["kind"], "hook_injected_items") - } - other => panic!("expected HookInjectedItems, got {other:?}"), + Event::SystemItem { item } => assert_eq!(item["kind"], "notification"), + other => panic!("expected SystemItem, got {other:?}"), } } @@ -1066,43 +1056,6 @@ mod tests { assert_eq!(parsed["data"]["code"], "already_running"); } - #[test] - fn event_notify_roundtrip() { - let event = Event::Notify { - message: "child-pod finished".into(), - }; - let json = serde_json::to_string(&event).unwrap(); - let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["event"], "notify"); - assert_eq!(parsed["data"]["message"], "child-pod finished"); - - let decoded: Event = serde_json::from_str(&json).unwrap(); - match decoded { - Event::Notify { message } => assert_eq!(message, "child-pod finished"), - other => panic!("expected Notify, got {other:?}"), - } - } - - #[test] - fn event_pod_event_roundtrip() { - let event = Event::PodEvent(PodEvent::TurnEnded { - pod_name: "child".into(), - }); - let json = serde_json::to_string(&event).unwrap(); - let parsed: serde_json::Value = 
serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["event"], "pod_event"); - assert_eq!(parsed["data"]["kind"], "turn_ended"); - assert_eq!(parsed["data"]["pod_name"], "child"); - - let decoded: Event = serde_json::from_str(&json).unwrap(); - match decoded { - Event::PodEvent(PodEvent::TurnEnded { pod_name }) => { - assert_eq!(pod_name, "child"); - } - other => panic!("expected PodEvent::TurnEnded, got {other:?}"), - } - } - #[test] fn event_user_message_roundtrip() { let event = Event::UserMessage { diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 697c0ae6..62f0d872 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -32,6 +32,7 @@ pub mod logged_item; pub mod session; pub mod session_log; pub mod store; +pub mod system_item; pub use event_trace::TraceEntry; pub use fs_store::FsStore; @@ -48,6 +49,7 @@ pub use session_log::{ EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SessionOrigin, build_chain, collect_state, compute_hash, }; +pub use system_item::{SystemItem, render_pod_event}; pub use store::{Store, StoreError}; /// Session identifier. UUID v7 (time-ordered, lexicographically sortable). diff --git a/crates/session-store/src/session_log.rs b/crates/session-store/src/session_log.rs index 4e6dcae7..056ef115 100644 --- a/crates/session-store/src/session_log.rs +++ b/crates/session-store/src/session_log.rs @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use crate::logged_item::LoggedItem; +use crate::system_item::SystemItem; /// SHA-256 hash identifying a specific log entry in the chain. /// @@ -125,7 +126,18 @@ pub enum LogEntry { /// Tool execution results added to history (worker.rs:897-900, 1072-1076). ToolResults { ts: u64, items: Vec }, - /// Items injected by `on_turn_end` hook via `ContinueWithMessages` (worker.rs:1055). 
+ /// Typed agent-injected system items: notifications, child-Pod + /// lifecycle events, `@` / `#` / `/` resolution + /// payloads. Each `SystemItem` carries kind metadata that the LLM + /// itself never sees (the LLM gets `Item::system_message` with the + /// item's `history_text()`), but live clients and replay paths + /// dispatch on `kind` for typed rendering. + SystemItems { ts: u64, items: Vec }, + + /// Legacy pre-`SystemItems` form. Deserialize-only — new writes + /// always use `SystemItems`. Items are flattened to + /// `Item::system_message` on replay, matching how the original + /// path worked. HookInjectedItems { ts: u64, items: Vec }, /// Turn boundary. Records the turn count after increment. @@ -276,6 +288,11 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState { LogEntry::ToolResults { items, .. } => { state.history.extend(items.iter().cloned().map(Item::from)); } + LogEntry::SystemItems { items, .. } => { + state + .history + .extend(items.iter().map(|si| si.to_history_item())); + } LogEntry::HookInjectedItems { items, .. } => { state.history.extend(items.iter().cloned().map(Item::from)); } diff --git a/crates/session-store/src/system_item.rs b/crates/session-store/src/system_item.rs new file mode 100644 index 00000000..8849f2b8 --- /dev/null +++ b/crates/session-store/src/system_item.rs @@ -0,0 +1,198 @@ +//! Typed system-message items injected by the agent system. +//! +//! Items in worker history with `role:system` are never produced by the +//! LLM — they are always inserted by the Pod itself (notifications, +//! file/knowledge/workflow ref resolutions, child-pod lifecycle events, +//! future `` tags, …). [`SystemItem`] carries the +//! typed shape of each such injection so clients can dispatch on +//! `kind` instead of parsing text prefixes like `[Notification] …` or +//! `[File: …]`. +//! +//! Persisted as the payload of [`crate::LogEntry::SystemItems`], and +//! broadcast live as the payload of `Event::SystemItem` on the wire. 
+//! +//! For LLM context replay, each `SystemItem` reduces to an +//! `Item::system_message(...)` whose body matches the legacy free-text +//! shape (see [`SystemItem::history_text`]). The kind metadata is +//! preserved only on the log/wire side; the LLM still sees plain +//! system-message text. + +use llm_worker::llm_client::types::Item; +use protocol::PodEvent; +use serde::{Deserialize, Serialize}; + +/// One agent-injected system item, tagged by origin. +/// +/// Each variant carries the kind-specific raw data clients use for +/// typed rendering (`Notification.message`, `PodEvent.event`, file +/// path / knowledge slug / workflow slug / etc.), plus a pre-rendered +/// `body` (where applicable) that is the exact `role:system` text the +/// LLM actually saw at commit time. `body` is denormalised so that +/// session log replay reconstructs worker history byte-identical to +/// what was on the wire — even when prompt overrides (e.g. custom +/// `notify_wrapper` template) re-shape the live rendering on a later +/// resume. +/// +/// New variants get added here as fresh injection kinds come online +/// (e.g. `Reminder`). The `kind` JSON tag is the snake_case form of +/// the variant name. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum SystemItem { + /// Free-form notification sent in by an external caller via + /// `Method::Notify`. `message` is the raw caller-supplied text; + /// `body` is the wrapped LLM-context form (Pod renders it via + /// `notify_wrapper` at commit time). + Notification { message: String, body: String }, + + /// Lifecycle event reported by a child Pod via `Method::PodEvent`. + /// `event` is the typed payload (so the TUI can render per-child + /// banners without re-parsing); `body` is the wrapped LLM-context + /// form (same `notify_wrapper` path as `Notification`). + PodEvent { event: PodEvent, body: String }, + + /// `@` file reference resolution. 
`body` is the rendered + /// LLM-context text (`[File: <path>]\n…` for regular files, + /// `[Dir: <path>]\n…` for directory listings, possibly with a + /// truncation hint) so replay reconstructs worker history + /// byte-identical to what was sent. + FileAttachment { path: String, body: String }, + + /// `#` Knowledge reference resolution. `body` is the + /// rendered text the LLM saw (Pod composes the `[Knowledge #<slug>]` + /// header + body). + Knowledge { slug: String, body: String }, + + /// `/` Workflow invocation. `body` is the workflow's + /// prompt body materialized into the LLM context. + Workflow { slug: String, body: String }, +} + +impl SystemItem { + /// Free-text body the LLM sees inside its `role:system` message + /// for this item. Returns the variant's stored `body` verbatim. + pub fn history_text(&self) -> String { + match self { + SystemItem::Notification { body, .. } => body.clone(), + SystemItem::PodEvent { body, .. } => body.clone(), + SystemItem::FileAttachment { body, .. } => body.clone(), + SystemItem::Knowledge { body, .. } => body.clone(), + SystemItem::Workflow { body, .. } => body.clone(), + } + } + + /// Materialize this `SystemItem` as the `Item::system_message` + /// form that lands in worker history. + pub fn to_history_item(&self) -> Item { + Item::system_message(self.history_text()) + } + + /// Short human-readable label used for diagnostics. Not on the + /// wire — keep flexible. + pub fn kind_label(&self) -> &'static str { + match self { + SystemItem::Notification { .. } => "notification", + SystemItem::PodEvent { .. } => "pod_event", + SystemItem::FileAttachment { .. } => "file_attachment", + SystemItem::Knowledge { .. } => "knowledge", + SystemItem::Workflow { .. } => "workflow", + } + } +} + +/// Render a `PodEvent` as the one-line notification text the agent +/// sees. Centralised here (rather than at the controller's render +/// site) so persistence and broadcast share the same rendering. 
+pub fn render_pod_event(event: &PodEvent) -> String { + match event { + PodEvent::TurnEnded { pod_name } => format!("pod `{pod_name}` finished a turn"), + PodEvent::Errored { pod_name, message } => { + format!("pod `{pod_name}` errored: {message}") + } + PodEvent::ShutDown { pod_name } => format!("pod `{pod_name}` shut down"), + PodEvent::ScopeSubDelegated { + parent_pod, + sub_pod, + .. + } => { + format!("pod `{parent_pod}` sub-delegated scope to `{sub_pod}`") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn notification_history_text_returns_stored_body() { + let item = SystemItem::Notification { + message: "child done".into(), + body: "[Notification]\nchild done\n\n(non-blocking hint…)".into(), + }; + assert_eq!( + item.history_text(), + "[Notification]\nchild done\n\n(non-blocking hint…)" + ); + } + + #[test] + fn pod_event_history_text_returns_stored_body() { + let item = SystemItem::PodEvent { + event: PodEvent::TurnEnded { + pod_name: "child".into(), + }, + body: "[Notification]\npod `child` finished a turn\n\n(non-blocking hint…)".into(), + }; + assert!(item.history_text().starts_with("[Notification]\n")); + assert!(item.history_text().contains("`child`")); + } + + #[test] + fn file_attachment_history_text_returns_stored_body() { + let item = SystemItem::FileAttachment { + path: "src/main.rs".into(), + body: "[File: src/main.rs]\nfn main() {}".into(), + }; + assert_eq!(item.history_text(), "[File: src/main.rs]\nfn main() {}"); + } + + #[test] + fn round_trip_via_json() { + let item = SystemItem::FileAttachment { + path: "src/main.rs".into(), + body: "[File: src/main.rs]\nfn main() {}".into(), + }; + let json = serde_json::to_string(&item).unwrap(); + let parsed: SystemItem = serde_json::from_str(&json).unwrap(); + match parsed { + SystemItem::FileAttachment { path, body } => { + assert_eq!(path, "src/main.rs"); + assert_eq!(body, "[File: src/main.rs]\nfn main() {}"); + } + other => panic!("unexpected: {other:?}"), + } + } + + 
#[test] + fn round_trip_pod_event() { + let item = SystemItem::PodEvent { + event: PodEvent::TurnEnded { + pod_name: "child".into(), + }, + body: "[Notification] pod `child` finished a turn".into(), + }; + let json = serde_json::to_string(&item).unwrap(); + let parsed: SystemItem = serde_json::from_str(&json).unwrap(); + match parsed { + SystemItem::PodEvent { + event: PodEvent::TurnEnded { pod_name }, + body, + } => { + assert_eq!(pod_name, "child"); + assert!(body.contains("`child`")); + } + other => panic!("unexpected: {other:?}"), + } + } +} diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 2eb75ce5..607ef82c 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -483,21 +483,13 @@ impl App { self.blocks.push(Block::UserMessage { segments }); self.assistant_streaming = false; } - Event::Notify { message } => { - self.blocks.push(Block::Notify { message }); - self.assistant_streaming = false; - } - Event::PodEvent(event) => { - self.blocks.push(Block::PodEvent { event }); - self.assistant_streaming = false; - } Event::SessionRotated { entry } => { self.reset_for_rotation(); self.apply_log_entry_raw(&entry); self.assistant_streaming = false; } - Event::HookInjectedItems { entry } => { - self.apply_log_entry_raw(&entry); + Event::SystemItem { item } => { + self.apply_system_item(&item); self.assistant_streaming = false; } Event::TurnStart { .. } => { @@ -984,11 +976,51 @@ impl App { self.push_history_item(&item_value); } } + session_store::LogEntry::SystemItems { items, .. } => { + for system_item in items { + let value = + serde_json::to_value(&system_item).expect("SystemItem is Serialize"); + self.apply_system_item(&value); + } + } // Non-history-bearing variants don't affect the block view. _ => {} } } + /// Dispatch one `SystemItem` JSON value into the appropriate block. 
+ /// + /// Kind-based routing replaces the old free-text `[Notification]` / + /// `[File: …]` parsing path: each kind maps directly to a typed + /// block (`Block::Notify`, `Block::PodEvent`, …). + fn apply_system_item(&mut self, value: &serde_json::Value) { + let Ok(item) = serde_json::from_value::(value.clone()) else { + // Unknown / forward-compat shape: fall back to rendering the + // raw text payload (if any) as a generic system message. + if let Some(text) = value.get("body").and_then(|b| b.as_str()) { + self.task_store.apply_system_message_text(text); + self.blocks.push(Block::SystemMessage { + text: text.to_owned(), + }); + } + return; + }; + match item { + session_store::SystemItem::Notification { message, .. } => { + self.blocks.push(Block::Notify { message }); + } + session_store::SystemItem::PodEvent { event, .. } => { + self.blocks.push(Block::PodEvent { event }); + } + session_store::SystemItem::FileAttachment { body, .. } + | session_store::SystemItem::Knowledge { body, .. } + | session_store::SystemItem::Workflow { body, .. } => { + self.task_store.apply_system_message_text(&body); + self.blocks.push(Block::SystemMessage { text: body }); + } + } + } + /// Sweep all current tool-call blocks: any that never resolved into /// a Done / Error state get marked Incomplete. 
Called after a /// snapshot replay so dangling in-flight tool calls in the seed @@ -1422,18 +1454,14 @@ mod completion_flow_tests { } #[test] - fn live_hook_injected_items_event_appends_system_message_block() { + fn live_system_item_workflow_appends_system_message_block() { let mut app = App::new("test".into()); - let entry = serde_json::json!({ - "kind": "hook_injected_items", - "ts": 1, - "items": [{ - "kind": "message", - "role": "system", - "content": [{ "kind": "text", "text": "[Workflow /build]\nRun the build" }], - }], + let item = serde_json::json!({ + "kind": "workflow", + "slug": "build", + "body": "[Workflow /build]\nRun the build", }); - app.handle_pod_event(Event::HookInjectedItems { entry }); + app.handle_pod_event(Event::SystemItem { item }); assert!(matches!( app.blocks.as_slice(), @@ -1441,6 +1469,39 @@ mod completion_flow_tests { )); } + #[test] + fn live_system_item_notification_appends_notify_block() { + let mut app = App::new("test".into()); + let item = serde_json::json!({ + "kind": "notification", + "message": "hi", + "body": "[Notification] hi", + }); + app.handle_pod_event(Event::SystemItem { item }); + assert!(matches!( + app.blocks.as_slice(), + [Block::Notify { message }] if message == "hi" + )); + } + + #[test] + fn live_system_item_pod_event_appends_pod_event_block() { + let mut app = App::new("test".into()); + let item = serde_json::json!({ + "kind": "pod_event", + "event": { "kind": "turn_ended", "pod_name": "child" }, + "body": "[Notification] pod `child` finished a turn", + }); + app.handle_pod_event(Event::SystemItem { item }); + assert_eq!(app.blocks.len(), 1); + match &app.blocks[0] { + Block::PodEvent { + event: protocol::PodEvent::TurnEnded { pod_name }, + } => assert_eq!(pod_name, "child"), + _ => panic!("expected a PodEvent block"), + } + } + #[test] fn compact_done_replaces_live_block() { let mut app = App::new("test".into()); @@ -1577,15 +1638,13 @@ mod completion_flow_tests { ```json\n{\n \"tasks\": [\n {\n \"taskid\": 
4,\n \ \"status\": \"inprogress\",\n \"subject\": \"from snapshot\",\n \ \"description\": \"d\"\n }\n ]\n}\n```\n"; - app.handle_pod_event(Event::HookInjectedItems { - entry: serde_json::json!({ - "kind": "hook_injected_items", - "ts": 1, - "items": [{ - "kind": "message", - "role": "system", - "content": [{ "kind": "text", "text": snapshot }], - }], + // Snapshot text injected as a workflow body (kind doesn't matter + // for task-store parsing, only the text contents do). + app.handle_pod_event(Event::SystemItem { + item: serde_json::json!({ + "kind": "workflow", + "slug": "task-snapshot", + "body": snapshot, }), }); diff --git a/tickets/system-item-unify.md b/tickets/system-item-unify.md deleted file mode 100644 index 6b1f539a..00000000 --- a/tickets/system-item-unify.md +++ /dev/null @@ -1,80 +0,0 @@ -# Event / LogEntry: System 注入経路を SystemItem 一本に統合する - -## 背景 - -エージェントシステム (= ユーザー由来でも LLM 由来でもない、Pod 自身) が LLM context に注入する `role:system` の `Item::Message` は、現状 3 系統の ad-hoc 経路で並走している: - -1. **`Method::Notify`** — 外部からの非同期メッセージ - - controller → `Event::Notify { message }` (生 message echo) - - `pod.push_notify(message)` → `NotifyBuffer` → `pending_history_appends` で `[Notification] ` の system_message として history に commit -2. **`Method::PodEvent`** — 子 pod のライフサイクル通知 - - controller → `Event::PodEvent(event)` (typed echo) - - `render_event` で 1 行整形 → `NotifyBuffer` (Notify と合流) → 同じく `[Notification] ` として commit -3. 
**Interceptor 内部注入** — `@` / `#` / `/` の解決結果 - - `PodInterceptor::on_prompt_submit` の `ContinueWith` で `[File: ]` / `[Knowledge: ]` / workflow 本文の system_message を history に append - - wire echo は無し - -これらは全部 「**人でも LLM でもなく、エージェントシステムが LLM に与えた情報**」 という同一カテゴリで、history への commit 形 (`role:system` の `Item::Message`) もほぼ同じだが、wire event 側は echo/typed/未送信が混在し、TUI 側のブロックも `Block::Notify` / `Block::PodEvent` / `Block::SystemMessage` の 3 つに分かれている。 - -加えて `LogEntry::HookInjectedItems` という命名が誤称: 実際に注入しているのは公開 `Hook` ではなく **`Interceptor`** で、内部機構専用の経路。`hook.rs` モジュール doc でも 「Hook は read-only な公開 extension surface」 「内部機構は Interceptor を使う」 と明確に分離されている。 - -このばらつきの結果: -- wire 上、同じ通知が `Event::Notify` (生) + `Event::HookInjectedItems` (整形版) の 2 重に流れて TUI が重複描画した (`pod-state-from-session-log` 改修中に表面化) -- kind 判別がテキストプレフィックス (`[Notification] ...` / `[File: ...]`) 頼みで脆い -- 新しい注入種 (`` 等) を足すたびに 1 系統増える設計圧力 -- `Method::Notify` の "Notify" 語感が view-only な `Alerter` (本来 "Notification" 寄り) とぶつかっている - -LLM は `role:system` を生成しないため、worker.history 中の `role:system` 項目は構造的にすべてこのエージェント注入経路に由来する。この性質を型として表に出す。 - -## 方針 - -`Tool` パターンに倣って 「**1 つの concept + kind ベース dispatch**」 に統合する。 - -- wire event は 1 種類: `Event::SystemItem { kind, payload }` (1 件ずつライブ配信) -- LogEntry は kind 揃いで batch する単一バリアントに置き換え、`Hook` 命名を捨てる: `LogEntry::SystemItems { ts, items: Vec }` -- Pod 内部の注入路 (NotifyBuffer / `format_notify` / `render_event` / Interceptor.ContinueWith) は **全部「kind 付き `SystemItem` を作って worker.history に commit」 という単一形式に合流** -- TUI は kind 別に Block を出し分け (現 `ToolCallBlock` がツール別に見た目を出すのと同じ構造) - -単数/複数の使い分けは既存パターンに揃える: -- 1 件単位の wire event は `Event::SystemItem` (`Event::TextDelta` と同じ呼吸) -- 永続バッチは `LogEntry::SystemItems` で `Vec` を内包 (`LogEntry::AssistantItems` / `ToolResults` と同じ呼吸) - -`Method::Notify` / `Method::PodEvent` は外部 API としてはそのまま残す (入口の意味付けは別)。 中で `SystemItem::Notification` / `SystemItem::PodEvent` に変換されて以後は単一経路、という整理。 - -`Event::Alert` (= LLM context に乗らない純 UI 通知) は **別経路として明確に残す**。 view-only な 
persistent stream (Alerter の subscribe_with_snapshot) としてすでに正しく機能している。 "Notification" 語感の衝突は、本チケットで context 注入側を `SystemItem` に rename することで解消する (Notification は `SystemItem` の一 kind に格下げ、`Alerter` が "Notification" 語感の本来のオーナーに戻る)。 - -## 要件 - -- wire event は 1 種類: `Event::SystemItem { kind, payload }` で全注入が乗る。 `Event::Notify` / `Event::PodEvent` / `Event::HookInjectedItems` は protocol から削除 -- LogEntry は `HookInjectedItems` を rename + items を kind 付き typed shape に置換。 新名 `LogEntry::SystemItems { ts, items: Vec }` で wire tag は `system_items` -- `SystemItem` の kind 列挙は最低限以下を含む: - - `Notification { message }` (`Method::Notify` 由来) - - `PodEvent { event: PodEvent }` (子 pod ライフサイクル) - - `FileAttachment { path, content }` (`@` 解決) - - `Knowledge { slug, body }` (`#` 解決) - - `Workflow { slug, body }` (`/` 解決) - - 将来追加可能 (`Reminder` 等) を見越した拡張点 -- Pod 側の `NotifyBuffer` / `format_notify` / `render_event` / `Interceptor::on_prompt_submit ContinueWith` は `SystemItem` を中間表現として通る。 worker.history への append は最終的に `Item::system_message` + 対応する `SystemItem` 1 件を `LogEntry::SystemItems` として commit -- TUI は `Event::SystemItem` を kind で dispatch して描画する。 既存 `Block::Notify` / `Block::PodEvent` / `Block::SystemMessage` を `Block::SystemItem(SystemItemBlock)` に集約 (or 既存 Block を再利用しつつ駆動イベントだけ統一) -- `Method::Notify` / `Method::PodEvent` (外部入口 API) は名前を維持し、内部で `SystemItem::Notification` / `SystemItem::PodEvent` に変換される -- `Event::Alert` / `Alerter` は無変更 - -## 完了条件 - -- `Event::Notify` / `Event::PodEvent` / `Event::HookInjectedItems` が protocol から削除されている -- `LogEntry::HookInjectedItems` が削除され、`LogEntry::SystemItems` に置き換わっている (旧 wire tag を deserialize alias で残すかは実装判断) -- TUI が `Event::SystemItem` 駆動で system 系ブロックを構築している。 ライブ通知の二重描画が起きない -- `Method::Notify` と `Method::PodEvent` は外部 API としては変わらず動く -- `Event::Alert` / `Alerter` 経路は無変更 - -## 範囲外 - -- `Method::Notify` / `Method::PodEvent` の rename (入口名の整理は別の話) -- `Event::Alert` / `Alerter` 系の変更 -- 旧 session log (`hook_injected_items` を含む) のファイル変換: 
deserialize alias で読めるところまでで、ファイル書き換えは行わない -- TUI 内の `Block::SystemItem` 詳細な視覚設計 - -## 関連 - -- 前提となる `tickets/pod-state-from-session-log.md` (state 正本を session log に統合) の後続。 同チケット内で `Event::HookInjectedItems` を導入したが、 直後に「Hook 命名は誤り」「Notify/PodEvent と二重」と判明したため本チケットで整理する -- CLAUDE.md の 「context に乗せる前に history に commit する」 加工原則に整合する整理 (現実装の経路を統一形にするだけで、原則自体は変わらない)