update: SystemItem1本化

This commit is contained in:
Keisuke Hirata 2026-05-14 14:36:29 +09:00
parent b6b158a244
commit 904ea6e326
14 changed files with 752 additions and 379 deletions

View File

@ -8,7 +8,6 @@
- Pod: 任意ターンからの Fork複数ターン巻き戻しを汎用化 → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
- Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md)
- Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md)
- Pod: System 注入経路 (Notify / PodEvent / HookInjectedItems) を SystemItem 一本に統合 → [tickets/system-item-unify.md](tickets/system-item-unify.md)
- 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md)
- Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md)
- llm-worker のエラー耐性

View File

@ -411,24 +411,17 @@ where
let Some(entry) = classify_history_item(item) else {
continue;
};
let mut head = ctx.session_head.lock().await;
match session_store::append_entry_with_hash(
&ctx.store,
head.session_id,
&mut head.head_hash,
entry.clone(),
)
.await
{
Ok(_) => {
// Publish under the same critical section view
// a `subscribe_with_snapshot` would observe.
ctx.sink.publish(entry);
}
Err(e) => {
tracing::warn!(error = %e, "drain: append_entry failed; entry dropped");
}
commit_via_drain(&ctx, entry).await;
}
LogCommand::SystemItems(items) => {
if items.is_empty() {
continue;
}
let entry = LogEntry::SystemItems {
ts: session_log::now_millis(),
items,
};
commit_via_drain(&ctx, entry).await;
}
LogCommand::Flush(ack) => {
let _ = ack.send(());
@ -437,15 +430,52 @@ where
}
}
/// Map a single worker-history `Item` to its corresponding `LogEntry`
/// classification. `None` is the skip signal for `user_message` items —
/// those are committed via `LogEntry::UserInput` by `Pod::run` at
/// submit time and would otherwise produce a duplicate entry here.
async fn commit_via_drain<St>(ctx: &LogDrainHandle<St>, entry: LogEntry)
where
St: session_store::Store + Clone + Send + 'static,
{
let mut head = ctx.session_head.lock().await;
match session_store::append_entry_with_hash(
&ctx.store,
head.session_id,
&mut head.head_hash,
entry.clone(),
)
.await
{
Ok(_) => {
// Publish under the same critical section view a
// `subscribe_with_snapshot` would observe.
ctx.sink.publish(entry);
}
Err(e) => {
tracing::warn!(error = %e, "drain: append_entry failed; entry dropped");
}
}
}
/// Map one LLM-driven worker-history append to its `LogEntry` form.
///
/// `None` is the skip signal for items that the drain must not commit:
/// - `user_message` items are committed by `Pod::run` up-front as
/// `LogEntry::UserInput { segments }`.
/// - `system_message` items are committed by `PodInterceptor` as part
/// of a `LogEntry::SystemItems` batch (with typed kind metadata)
/// before they reach the worker's history.
fn classify_history_item(item: Item) -> Option<LogEntry> {
let ts = session_log::now_millis();
if item.is_user_message() {
return None;
}
if matches!(
item,
Item::Message {
role: llm_worker::Role::System,
..
}
) {
return None;
}
if item.is_tool_result() {
return Some(LogEntry::ToolResults {
ts,
@ -458,7 +488,9 @@ fn classify_history_item(item: Item) -> Option<LogEntry> {
items: vec![session_store::LoggedItem::from(&item)],
});
}
Some(LogEntry::HookInjectedItems {
// Defensive: anything else (future Item kinds) routes through
// AssistantItems rather than getting silently dropped.
Some(LogEntry::AssistantItems {
ts,
items: vec![session_store::LoggedItem::from(&item)],
})
@ -696,9 +728,11 @@ async fn controller_loop<C, St>(
}
Method::Notify { message } => {
let _ = event_tx.send(Event::Notify {
message: message.clone(),
});
// Client-side live echo is delivered as `Event::SystemItem`
// once the interceptor commits the corresponding
// `LogEntry::SystemItems` entry — drained out of the
// notify buffer + broadcast through the sink. No
// separate echo here.
pod.push_notify(message);
// RUNNING / Paused: the buffer push is the entire
// operation; an in-flight turn (or the next
@ -751,10 +785,12 @@ async fn controller_loop<C, St>(
Method::ListCompletions { .. } => {}
Method::PodEvent(event) => {
// Echo the received event to all subscribers so every
// client sees the input that drove any following
// auto-kicked turn.
let _ = event_tx.send(Event::PodEvent(event.clone()));
// Live echo travels through the SystemItem lane: once
// the interceptor drains the notify buffer, the
// typed `SystemItem::PodEvent` lands as a
// `LogEntry::SystemItems` entry and the sink fans it
// out to clients as `Event::SystemItem`.
//
// (1) system side effects — idempotent and tolerant of
// out-of-order delivery (e.g. `TurnEnded` arriving
// after `ShutDown`).
@ -765,11 +801,10 @@ async fn controller_loop<C, St>(
&self_parent_socket,
)
.await;
// (2) render a one-line summary and push it into the
// notification buffer; the next LLM request will
// inject it as a system message via
// `PodInterceptor::pre_llm_request`.
pod.push_notify(crate::ipc::event::render_event(&event));
// (2) queue the typed event in the notification buffer;
// the next LLM request will inject it as a typed
// `SystemItem::PodEvent` via the interceptor drain.
pod.push_pod_event_notify(event);
// Auto-kick a turn if the Pod is idle so the
// notification is not stranded. Matches the
// `Method::Notify` idle path.
@ -902,23 +937,21 @@ where
});
}
Some(Method::Notify { message }) => {
let _ = event_tx.send(Event::Notify {
message: message.clone(),
});
// Route into the buffer; the in-flight turn will
// drain it at its next pre_llm_request.
notify_buffer.push(message);
// Live echo arrives via `Event::SystemItem` once
// the in-flight turn's next `pre_llm_request`
// drains this entry through the interceptor.
notify_buffer.push_notify(message);
}
Some(Method::ListCompletions { .. }) => {}
Some(Method::PodEvent(event)) => {
let _ = event_tx.send(Event::PodEvent(event.clone()));
// mpsc is consume-once, so we cannot defer this
// to the next main-loop iteration — drop here
// would lose the event entirely (children fire
// and forget). Apply the side effects inline
// and stage the rendered string on the
// notification buffer so the in-flight turn's
// next `pre_llm_request` surfaces it.
// and stage the typed event on the notification
// buffer so the in-flight turn's next
// `pre_llm_request` surfaces it as a typed
// `SystemItem::PodEvent`.
let self_parent_socket = parent_socket.cloned();
crate::ipc::event::apply_event_side_effects(
&event,
@ -927,7 +960,7 @@ where
&self_parent_socket,
)
.await;
notify_buffer.push(crate::ipc::event::render_event(&event));
notify_buffer.push_pod_event(event);
}
None => {
let _ = cancel_tx.try_send(());

View File

@ -22,11 +22,15 @@ use tracing::info;
use tracing::warn;
use crate::compact::state::CompactState;
use session_store::SystemItem;
use tokio::sync::mpsc;
use crate::hook::{
AbortInfo, HookPromptAction, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary,
ToolResultSummary, TurnEndInfo,
};
use crate::ipc::notify_buffer::{NotifyBuffer, format_notify};
use crate::ipc::notify_buffer::{NotifyBuffer, build_system_item};
use crate::pod::LogCommand;
use crate::prompt::catalog::PromptCatalog;
use llm_worker::token_counter::total_tokens;
@ -45,13 +49,20 @@ pub(crate) struct PodInterceptor {
/// request. The Worker `extend`s these into its persistent history
/// so the LLM has a visible trigger for any reaction it commits.
pending_notifies: NotifyBuffer,
/// Submit-scoped stash of resolver-produced system messages.
/// Drained inside `on_prompt_submit` and returned via
/// `PromptAction::ContinueWith`. Populated by `Pod::run` immediately
/// before handing off to the worker.
pending_attachments: Arc<Mutex<Vec<Item>>>,
/// Submit-scoped stash of resolver-produced typed system items.
/// Drained inside `on_prompt_submit`, committed as a
/// `LogEntry::SystemItems` through `log_cmd_tx`, and returned to
/// the worker as `Item::system_message` via
/// `PromptAction::ContinueWith`. Populated by `Pod::run`
/// immediately before handing off to the worker.
pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
/// Prompt catalog used to render the injected notification wrapper.
prompts: Arc<PromptCatalog>,
/// Sender into the Pod's history-drain task. The interceptor uses
/// it to commit `LogCommand::SystemItems` batches before returning
/// the corresponding `Item::system_message`s up to the worker.
/// `None` in tests / `Pod::new` paths where no drain is wired.
log_cmd_tx: Option<mpsc::UnboundedSender<LogCommand>>,
/// Next turn index assigned by `on_prompt_submit`.
next_turn_index: AtomicUsize,
/// Tool calls observed in the current turn (reset on each new prompt).
@ -64,8 +75,9 @@ impl PodInterceptor {
compact_state: Option<Arc<CompactState>>,
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
pending_notifies: NotifyBuffer,
pending_attachments: Arc<Mutex<Vec<Item>>>,
pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
prompts: Arc<PromptCatalog>,
log_cmd_tx: Option<mpsc::UnboundedSender<LogCommand>>,
) -> Self {
Self {
registry,
@ -74,11 +86,26 @@ impl PodInterceptor {
pending_notifies,
pending_attachments,
prompts,
log_cmd_tx,
next_turn_index: AtomicUsize::new(0),
tool_calls_this_turn: AtomicUsize::new(0),
}
}
/// Send a `LogCommand::SystemItems` batch down the drain channel
/// (no-op if no drain is wired). The drain task commits the entry
/// before the corresponding `Item::system_message`s reach the
/// worker via `ContinueWith` / `pending_history_appends`, so the
/// drain barrier in `persist_turn` covers system commits too.
fn send_system_items(&self, items: Vec<SystemItem>) {
if items.is_empty() {
return;
}
if let Some(tx) = self.log_cmd_tx.as_ref() {
let _ = tx.send(LogCommand::SystemItems(items));
}
}
fn current_turn_index(&self) -> usize {
self.next_turn_index
.load(Ordering::Relaxed)
@ -111,7 +138,7 @@ impl Interceptor for PodInterceptor {
return action.into();
}
}
let extras = std::mem::take(
let extras: Vec<SystemItem> = std::mem::take(
&mut *self
.pending_attachments
.lock()
@ -120,7 +147,14 @@ impl Interceptor for PodInterceptor {
if extras.is_empty() {
PromptAction::Continue
} else {
PromptAction::ContinueWith(extras)
// Commit the typed system items first, then hand the
// matching `Item::system_message`s to the worker. The
// drain task processes the `SystemItems` command BEFORE
// any subsequent `Item` commands from `on_history_append`,
// so on-disk order matches worker-history order.
let items: Vec<Item> = extras.iter().map(SystemItem::to_history_item).collect();
self.send_system_items(extras);
PromptAction::ContinueWith(items)
}
}
@ -129,19 +163,31 @@ impl Interceptor for PodInterceptor {
if drained.is_empty() {
return Vec::new();
}
let mut items = Vec::with_capacity(drained.len());
for n in drained {
match format_notify(&n, &self.prompts) {
Ok(item) => items.push(item),
let mut system_items: Vec<SystemItem> = Vec::with_capacity(drained.len());
let mut items: Vec<Item> = Vec::with_capacity(drained.len());
for entry in drained {
match build_system_item(&entry, &self.prompts) {
Ok(system_item) => {
items.push(system_item.to_history_item());
system_items.push(system_item);
}
Err(e) => {
// A render failure here would starve the LLM of
// the notify text. Fall back to the raw message
// so the trigger still lands in history.
// the notify text. Fall back to a raw item so the
// trigger still lands in history; the entry will
// simply be skipped from the SystemItems batch.
warn!(error = %e, "failed to render notify_wrapper; using raw message");
items.push(Item::system_message(n.message.clone()));
let fallback = match &entry {
super::notify_buffer::PendingNotify::Notify { message } => message.clone(),
super::notify_buffer::PendingNotify::PodEvent { event } => {
session_store::render_pod_event(event)
}
};
items.push(Item::system_message(fallback));
}
}
}
self.send_system_items(system_items);
items
}
@ -321,6 +367,7 @@ mod tests {
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
None,
);
let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await;
@ -346,6 +393,7 @@ mod tests {
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
None,
);
let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await;
@ -372,6 +420,7 @@ mod tests {
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
None,
);
let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await;
@ -392,6 +441,7 @@ mod tests {
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
None,
);
let mut ctx: Vec<Item> = Vec::new();
let action = interceptor.pre_llm_request(&mut ctx).await;
@ -414,8 +464,8 @@ mod tests {
async fn pending_history_appends_drains_buffer_into_items() {
let registry = Arc::new(HookRegistryBuilder::new().build());
let buffer = NotifyBuffer::new();
buffer.push("first".into());
buffer.push("second".into());
buffer.push_notify("first".into());
buffer.push_notify("second".into());
let interceptor = PodInterceptor::new(
registry,
@ -424,6 +474,7 @@ mod tests {
buffer.clone(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
None,
);
let items = interceptor.pending_history_appends().await;
@ -451,7 +502,7 @@ mod tests {
// anything itself.
let registry = Arc::new(HookRegistryBuilder::new().build());
let buffer = NotifyBuffer::new();
buffer.push("msg".into());
buffer.push_notify("msg".into());
let interceptor = PodInterceptor::new(
registry,
@ -460,6 +511,7 @@ mod tests {
buffer.clone(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
None,
);
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
let action = interceptor.pre_llm_request(&mut ctx).await;
@ -489,6 +541,7 @@ mod tests {
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
None,
);
let mut ctx: Vec<Item> = Vec::new();
let action = interceptor.pre_llm_request(&mut ctx).await;

View File

@ -3,39 +3,48 @@
//! Entries are queued here by the Controller (on receipt of the
//! corresponding IPC method) and drained by
//! `PodInterceptor::pending_history_appends`, which the Worker calls
//! at the head of each turn loop iteration to `extend` them into the
//! persistent `worker.history`. Each queued entry becomes one
//! `Item::system_message`.
//! at the head of each turn loop iteration. The drain renders each
//! pending entry into a typed `SystemItem` (with the `notify_wrapper`
//! prompt applied), commits a `LogEntry::SystemItems` through the
//! session-log sink, and returns the corresponding
//! `Item::system_message`s for the worker to append to its
//! persistent history.
//!
//! This is the **single lane** for "system messages produced by Pod
//! state that should land in the next LLM request": Notify, PodEvent,
//! and any future `<system-reminder>` injection all ride this queue
//! (or a sibling queue with the same lifecycle). Per
//! `tickets/notify-history-persist.md` and `AGENTS.md` (LLM コンテキスト
//! の加工原則), there is **no** "transient, history-skipping" lane —
//! everything injected into a request is also committed to history so
//! that any LLM reaction has a visible trigger across turns, resume,
//! and compaction, and so the Anthropic prompt cache prefix stays
//! stable across requests.
//! and any future `<system-reminder>` injection all ride this queue.
//! Per `tickets/notify-history-persist.md` and `AGENTS.md` (LLM
//! context の加工原則), there is **no** "transient, history-skipping"
//! lane — everything injected into a request is also committed to
//! history so any LLM reaction has a visible trigger across turns,
//! resume, and compaction, and so the Anthropic prompt cache prefix
//! stays stable across requests.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use llm_worker::Item;
use protocol::PodEvent;
use session_store::SystemItem;
use tracing::warn;
use crate::prompt::catalog::{CatalogError, PromptCatalog};
/// Maximum queued notify entries. Oldest entries are dropped beyond this.
/// Maximum queued pending entries. Oldest entries are dropped beyond this.
const CAPACITY: usize = 128;
/// One pending notify entry awaiting injection into the next LLM request.
/// One pending entry awaiting drain into the next LLM request.
///
/// The buffer keeps the raw input shape so the drain step can decide
/// the right `SystemItem` kind (and apply `notify_wrapper` to the
/// rendered body) at the moment of commit, when the prompt catalog
/// is available.
#[derive(Debug, Clone)]
pub struct PendingNotify {
pub message: String,
pub enum PendingNotify {
Notify { message: String },
PodEvent { event: PodEvent },
}
/// Shared, mutex-guarded buffer of pending notify entries.
/// Shared, mutex-guarded buffer of pending entries.
///
/// Cloned between the Pod (producer) and PodInterceptor (consumer).
#[derive(Clone, Default)]
@ -51,26 +60,35 @@ impl NotifyBuffer {
/// Push a notify entry onto the queue. If the queue is full, the
/// oldest entry is dropped and a `tracing::warn` is emitted — the
/// caller should never hit this in normal operation.
pub fn push(&self, message: String) {
pub fn push_notify(&self, message: String) {
self.push_entry(PendingNotify::Notify { message });
}
/// Push a typed pod-event entry onto the queue.
pub fn push_pod_event(&self, event: PodEvent) {
self.push_entry(PendingNotify::PodEvent { event });
}
fn push_entry(&self, entry: PendingNotify) {
let mut q = self.inner.lock().expect("notify buffer poisoned");
if q.len() >= CAPACITY {
let dropped = q.pop_front();
warn!(
capacity = CAPACITY,
dropped_message = dropped.as_ref().map(|n| n.message.as_str()),
dropped = ?dropped,
"notify buffer overflow; dropped oldest"
);
}
q.push_back(PendingNotify { message });
q.push_back(entry);
}
/// Remove and return all pending notify entries in FIFO order.
/// Remove and return all pending entries in FIFO order.
pub fn drain(&self) -> Vec<PendingNotify> {
let mut q = self.inner.lock().expect("notify buffer poisoned");
q.drain(..).collect()
}
/// Number of pending notify entries. Primarily for tests.
/// Number of pending entries. Primarily for tests.
pub fn len(&self) -> usize {
self.inner.lock().expect("notify buffer poisoned").len()
}
@ -80,17 +98,30 @@ impl NotifyBuffer {
}
}
/// Format a single pending notify entry into the `Item::system_message`
/// that gets appended to `worker.history` just before the next LLM
/// request. The wrapper body comes from `PodPrompt::NotifyWrapper` so
/// the surrounding phrasing can be customised via a prompt pack
/// (translation, tone, ...).
pub(crate) fn format_notify(
n: &PendingNotify,
/// Render one pending entry into a typed `SystemItem`. The
/// `notify_wrapper` prompt produces the LLM-context body for both
/// `Notify` (raw message) and `PodEvent` (rendered event line).
pub(crate) fn build_system_item(
entry: &PendingNotify,
prompts: &PromptCatalog,
) -> Result<Item, CatalogError> {
let text = prompts.notify_wrapper(&n.message)?;
Ok(Item::system_message(text))
) -> Result<SystemItem, CatalogError> {
match entry {
PendingNotify::Notify { message } => {
let body = prompts.notify_wrapper(message)?;
Ok(SystemItem::Notification {
message: message.clone(),
body,
})
}
PendingNotify::PodEvent { event } => {
let rendered = session_store::render_pod_event(event);
let body = prompts.notify_wrapper(&rendered)?;
Ok(SystemItem::PodEvent {
event: event.clone(),
body,
})
}
}
}
#[cfg(test)]
@ -100,12 +131,14 @@ mod tests {
#[test]
fn push_then_drain_preserves_order() {
let buf = NotifyBuffer::new();
buf.push("one".into());
buf.push("two".into());
buf.push_notify("one".into());
buf.push_notify("two".into());
let drained = buf.drain();
assert_eq!(drained.len(), 2);
assert_eq!(drained[0].message, "one");
assert_eq!(drained[1].message, "two");
match &drained[0] {
PendingNotify::Notify { message } => assert_eq!(message, "one"),
other => panic!("unexpected: {other:?}"),
}
assert!(buf.is_empty());
}
@ -113,28 +146,50 @@ mod tests {
fn capacity_drops_oldest() {
let buf = NotifyBuffer::new();
for i in 0..(CAPACITY + 5) {
buf.push(format!("msg{i}"));
buf.push_notify(format!("msg{i}"));
}
let drained = buf.drain();
assert_eq!(drained.len(), CAPACITY);
// Oldest 5 were dropped; first retained is msg5.
assert_eq!(drained[0].message, "msg5");
assert_eq!(
drained[CAPACITY - 1].message,
format!("msg{}", CAPACITY + 4)
);
match &drained[0] {
PendingNotify::Notify { message } => assert_eq!(message, "msg5"),
other => panic!("unexpected: {other:?}"),
}
}
#[test]
fn format_notify_includes_message_and_nonblocking_hint() {
let n = PendingNotify {
fn build_system_item_for_notify_carries_wrapper_body() {
let entry = PendingNotify::Notify {
message: "hello".into(),
};
let catalog = PromptCatalog::builtins_only().unwrap();
let item = format_notify(&n, &catalog).unwrap();
let text = item.as_text().unwrap_or_default().to_string();
assert!(text.contains("[Notification]"));
assert!(text.contains("hello"));
assert!(text.contains("not a blocking request"));
let item = build_system_item(&entry, &catalog).unwrap();
match item {
SystemItem::Notification { message, body } => {
assert_eq!(message, "hello");
assert!(body.contains("[Notification]"));
assert!(body.contains("hello"));
assert!(body.contains("not a blocking request"));
}
other => panic!("unexpected: {other:?}"),
}
}
#[test]
fn build_system_item_for_pod_event_wraps_rendered_event_text() {
let entry = PendingNotify::PodEvent {
event: PodEvent::TurnEnded {
pod_name: "child".into(),
},
};
let catalog = PromptCatalog::builtins_only().unwrap();
let item = build_system_item(&entry, &catalog).unwrap();
match item {
SystemItem::PodEvent { event, body } => {
assert!(matches!(event, PodEvent::TurnEnded { ref pod_name } if pod_name == "child"));
assert!(body.contains("[Notification]"));
assert!(body.contains("`child`"));
}
other => panic!("unexpected: {other:?}"),
}
}
}

View File

@ -104,22 +104,39 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
entry = entry_rx.recv() => {
match entry {
Ok(entry) => {
let value = serde_json::to_value(&entry)
.expect("LogEntry is Serialize");
let outbound = match &entry {
let outbound = match entry {
session_store::LogEntry::SessionStart { .. } => {
Some(Event::SessionRotated { entry: value })
let value = serde_json::to_value(&entry)
.expect("LogEntry is Serialize");
vec![Event::SessionRotated { entry: value }]
}
session_store::LogEntry::HookInjectedItems { .. } => {
Some(Event::HookInjectedItems { entry: value })
session_store::LogEntry::SystemItems { items, .. } => {
// Fan out per-item so each `SystemItem`
// arrives as its own `Event::SystemItem`
// on the wire. Batching on disk is an
// implementation detail of the drain
// task; clients see them one at a time.
items
.into_iter()
.map(|si| {
let value = serde_json::to_value(&si)
.expect("SystemItem is Serialize");
Event::SystemItem { item: value }
})
.collect()
}
// Defensive: should never reach here per
// `SessionLogSink::is_live_relevant`.
_ => None,
_ => Vec::new(),
};
if let Some(event) = outbound
&& writer.write(&event).await.is_err()
{
let mut hit_error = false;
for event in outbound {
if writer.write(&event).await.is_err() {
hit_error = true;
break;
}
}
if hit_error {
break;
}
}

View File

@ -9,8 +9,8 @@ use llm_worker::llm_client::client::LlmClient;
use llm_worker::state::Mutable;
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use session_store::{
EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, session_log,
to_logged,
EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem,
session_log, to_logged,
};
use tracing::{info, warn};
@ -18,16 +18,21 @@ use crate::session_log_sink::SessionLogSink;
/// Command sent to the per-Pod history-drain task.
///
/// `Item` carries one worker-history append observed via
/// `Worker::on_history_append`; the drain classifies it into a
/// `LogEntry::AssistantItems` / `LogEntry::ToolResults` /
/// `LogEntry::HookInjectedItems` and commits it through the sink.
/// `Flush(ack)` is the barrier used by `persist_turn` to ensure every
/// in-flight item is committed before the trailing `TurnEnd` entry
/// lands.
/// - `Item`: one worker-history append observed via
/// `Worker::on_history_append`; the drain classifies it into
/// `LogEntry::AssistantItems` / `LogEntry::ToolResults` and commits
/// through the sink. `role:system` items are explicitly skipped
/// because they are committed up-front through `SystemItems`.
/// - `SystemItems`: typed agent-injected items committed as a single
/// `LogEntry::SystemItems` entry. Used by the interceptor when it
/// drains the notify buffer or pending attachments.
/// - `Flush(ack)`: barrier used by `persist_turn` to ensure every
/// queued command has been processed before the trailing `TurnEnd`
/// entry lands.
#[derive(Debug)]
pub enum LogCommand {
Item(Item),
SystemItems(Vec<SystemItem>),
Flush(tokio::sync::oneshot::Sender<()>),
}
@ -158,7 +163,7 @@ pub struct Pod<C: LlmClient, St: Store> {
/// before handing off to the worker; `PodInterceptor::on_prompt_submit`
/// drains it and returns `ContinueWith` so the items land in
/// history right after the user message that referenced them.
pending_attachments: Arc<Mutex<Vec<Item>>>,
pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
/// Scope allocation in the machine-wide lock file. `Some` for
/// Pods built via `from_manifest` / `from_manifest_spawned` /
/// `restore_from_manifest` (production paths); `None` for the
@ -279,7 +284,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
alerter: self.alerter.clone(),
event_tx: self.event_tx.clone(),
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::new())),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: None,
callback_socket: None,
prompts: self.prompts.clone(),
@ -378,7 +383,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
alerter: None,
event_tx: None,
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::new())),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: None,
callback_socket: None,
prompts,
@ -760,7 +765,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// `PodInterceptor::pending_history_appends`. See [`NotifyBuffer`]
/// for overflow behaviour and the lane-of-record rationale.
pub fn push_notify(&self, message: String) {
self.pending_notifies.push(message);
self.pending_notifies.push_notify(message);
}
/// Push a typed `PodEvent` entry onto the pending buffer.
///
/// Same lifecycle as [`push_notify`](Self::push_notify) but
/// preserves the typed `PodEvent` payload so the IPC layer can
/// emit `SystemItem::PodEvent { event, body }` with structured
/// data for clients.
pub fn push_pod_event_notify(&self, event: protocol::PodEvent) {
self.pending_notifies.push_pod_event(event);
}
/// Shared handle to the pending notification buffer.
@ -892,6 +907,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.pending_notifies.clone(),
self.pending_attachments.clone(),
self.prompts.clone(),
self.log_cmd_tx.clone(),
);
self.worker_mut().set_interceptor(interceptor);
self.interceptor_installed = true;
@ -1099,7 +1115,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// directory) surface as `AlertLevel::Warn` Alerts and are skipped — the
/// unresolved placeholder stays in the flattened user message so the LLM
/// still sees the intent.
fn resolve_file_refs(&self, segments: &[Segment]) -> Vec<Item> {
fn resolve_file_refs(&self, segments: &[Segment]) -> Vec<SystemItem> {
let view = crate::fs_view::PodFsView::new(tools::ScopedFs::with_shared_scope(
self.scope.clone(),
self.pwd.clone(),
@ -1110,7 +1126,19 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
continue;
};
match view.resolve_file_ref(path, self.manifest.worker.file_upload.max_bytes) {
Ok(item) => out.push(item),
Ok(item) => {
// `resolve_file_ref` returns an `Item::system_message`
// whose text already carries the `[File: <path>]` or
// `[Dir: <path>]` header (plus any truncation hint).
// Persist that body verbatim — it is what the LLM
// actually saw, so resume produces byte-identical
// history.
let body = item.as_text().unwrap_or_default().to_string();
out.push(SystemItem::FileAttachment {
path: path.clone(),
body,
});
}
Err(e) => {
self.alert(
AlertLevel::Warn,
@ -1123,7 +1151,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
out
}
fn resolve_knowledge_refs(&self, segments: &[Segment]) -> Vec<Item> {
fn resolve_knowledge_refs(&self, segments: &[Segment]) -> Vec<SystemItem> {
let Some(layout) = self.memory_layout.as_ref() else {
return Vec::new();
};
@ -1156,7 +1184,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
}
};
let raw = String::from_utf8_lossy(&bytes).into_owned();
let body = match memory::schema::split_frontmatter(&raw) {
let body_text = match memory::schema::split_frontmatter(&raw) {
Ok((_yaml, body)) => body,
Err(e) => {
self.alert(
@ -1173,11 +1201,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
&bytes,
);
self.append_memory_use_event(memory::UsageSource::KnowledgeRef, vec![snapshot]);
out.push(Item::system_message(format!(
"[Knowledge #{}]\n{}",
slug,
body.trim_end()
)));
let body = format!("[Knowledge #{}]\n{}", slug, body_text.trim_end());
out.push(SystemItem::Knowledge {
slug: slug.clone(),
body,
});
}
out
}
@ -1247,7 +1275,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
fn resolve_workflow_invocations(
&self,
segments: &[Segment],
) -> Result<Vec<Item>, WorkflowResolveError> {
) -> Result<Vec<SystemItem>, WorkflowResolveError> {
let Some(layout) = self.memory_layout.as_ref() else {
if let Some(slug) = segments.iter().find_map(|seg| match seg {
Segment::WorkflowInvoke { slug } => Some(slug.clone()),
@ -1282,7 +1310,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
warn!(workflow = %slug, error = %err, "failed to snapshot workflow usage");
}
}
out.extend(items);
// `resolve_workflow_invocation` returns Item::system_message
// entries (potentially multiple — body + dependency knowledge
// bodies). Persist each as a SystemItem::Workflow keyed on
// the invocation slug.
for item in items {
let body = item.as_text().unwrap_or_default().to_string();
out.push(SystemItem::Workflow {
slug: slug.clone(),
body,
});
}
}
Ok(out)
}
@ -2635,7 +2673,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
alerter: None,
event_tx: None,
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::new())),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: Some(scope_allocation),
callback_socket: None,
prompts: common.prompts,
@ -2708,7 +2746,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
alerter: None,
event_tx: None,
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::new())),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: Some(scope_allocation),
callback_socket: Some(callback_socket),
prompts: common.prompts,
@ -2852,7 +2890,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
alerter: None,
event_tx: None,
pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::new())),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
scope_allocation: Some(scope_allocation),
callback_socket: None,
prompts: common.prompts,

View File

@ -120,7 +120,7 @@ impl SessionLogSink {
fn is_live_relevant(entry: &LogEntry) -> bool {
matches!(
entry,
LogEntry::SessionStart { .. } | LogEntry::HookInjectedItems { .. }
LogEntry::SessionStart { .. } | LogEntry::SystemItems { .. }
)
}
@ -427,12 +427,13 @@ mod tests {
assert!(rx.try_recv().is_err());
}
fn hook_injected(text: &str) -> LogEntry {
LogEntry::HookInjectedItems {
fn notification_entry(text: &str) -> LogEntry {
LogEntry::SystemItems {
ts: now_millis(),
items: vec![session_store::LoggedItem::from(
&llm_worker::Item::system_message(text),
)],
items: vec![session_store::SystemItem::Notification {
message: text.to_owned(),
body: format!("[Notification] {text}"),
}],
}
}
@ -448,11 +449,11 @@ mod tests {
sink.publish(turn_end(1));
assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live");
// HookInjectedItems is live-relevant.
sink.publish(hook_injected("[Notify] hi"));
// SystemItems is live-relevant.
sink.publish(notification_entry("hi"));
match rx.try_recv() {
Ok(LogEntry::HookInjectedItems { .. }) => {}
other => panic!("expected HookInjectedItems, got {other:?}"),
Ok(LogEntry::SystemItems { .. }) => {}
other => panic!("expected SystemItems, got {other:?}"),
}
// Mirror still grew with both entries (snapshot completeness).
@ -465,11 +466,11 @@ mod tests {
let sink = SessionLogSink::new();
sink.publish(session_start());
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
sink.publish(hook_injected("post-snapshot"));
sink.publish(notification_entry("post-snapshot"));
assert_eq!(snapshot.len(), 1);
match rx.try_recv() {
Ok(LogEntry::HookInjectedItems { .. }) => {}
Ok(LogEntry::SystemItems { .. }) => {}
other => panic!("unexpected: {other:?}"),
}
assert!(rx.try_recv().is_err());

View File

@ -34,6 +34,9 @@ fn history_from_sink(handle: &PodHandle) -> Vec<Item> {
| LogEntry::HookInjectedItems { items: i, .. } => {
items.extend(i.into_iter().map(Item::from));
}
LogEntry::SystemItems { items: si, .. } => {
items.extend(si.iter().map(|s| s.to_history_item()));
}
_ => {}
}
}
@ -745,16 +748,12 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
.unwrap();
// Wait for the auto-started turn to complete.
let mut saw_notify_echo = false;
let mut saw_turn_end = false;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
tokio::select! {
event = rx.recv() => {
match event {
Ok(Event::Notify { ref message }) if message == "turn finished" => {
saw_notify_echo = true;
}
Ok(Event::TurnEnd { .. }) => { saw_turn_end = true; break; }
Err(_) => break,
_ => {}
@ -763,14 +762,28 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
_ = tokio::time::sleep_until(deadline) => break,
}
}
assert!(
saw_notify_echo,
"Method::Notify on idle Pod should be echoed as Event::Notify"
);
assert!(saw_turn_end, "auto-triggered turn should complete");
// Status flips back to Idle on the controller thread after RunEnd.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
// Wait for the post-run persist_turn (Flush + TurnEnd + RunCompleted
// commits) to finish; the controller flips status to Idle right
// after that.
wait_for_status(&handle, PodStatus::Idle).await;
// The live echo arrives via the sink's `Event::SystemItem` lane,
// not on the `event_tx` broadcast that `handle.subscribe()` taps.
// Verify the notification landed on the sink mirror instead.
let (entries, _) = handle.sink.subscribe_with_snapshot();
let saw_notify_in_mirror = entries.iter().any(|e| matches!(
e,
session_store::LogEntry::SystemItems { items, .. }
if items.iter().any(|si| matches!(
si,
session_store::SystemItem::Notification { message, .. }
if message == "turn finished"
))
));
assert!(
saw_notify_in_mirror,
"Method::Notify should commit a SystemItem::Notification entry; mirror = {entries:?}"
);
// Exactly one request was made; it must contain the formatted
// notification as one of the items (committed to history by
@ -825,18 +838,12 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
.await
.unwrap();
let mut saw_pod_event_echo = false;
let mut saw_turn_end = false;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
tokio::select! {
event = rx.recv() => {
match event {
Ok(Event::PodEvent(protocol::PodEvent::TurnEnded { ref pod_name }))
if pod_name == "child" =>
{
saw_pod_event_echo = true;
}
Ok(Event::TurnEnd { .. }) => { saw_turn_end = true; break; }
Err(_) => break,
_ => {}
@ -845,15 +852,28 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
_ = tokio::time::sleep_until(deadline) => break,
}
}
assert!(
saw_pod_event_echo,
"Method::PodEvent on idle Pod should be echoed as Event::PodEvent"
);
assert!(
saw_turn_end,
"PodEvent::TurnEnded on idle Pod should auto-start a turn"
);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Wait for the post-run persist_turn to complete before reading the
// mirror — TurnEnd fires inside the worker loop, persist_turn (and
// its Flush of the drain queue) runs afterwards.
wait_for_status(&handle, PodStatus::Idle).await;
let (entries, _) = handle.sink.subscribe_with_snapshot();
let saw_pod_event_in_mirror = entries.iter().any(|e| matches!(
e,
session_store::LogEntry::SystemItems { items, .. }
if items.iter().any(|si| matches!(
si,
session_store::SystemItem::PodEvent { event: protocol::PodEvent::TurnEnded { pod_name }, .. }
if pod_name == "child"
))
));
assert!(
saw_pod_event_in_mirror,
"Method::PodEvent should commit a SystemItem::PodEvent entry"
);
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
let requests = client_for_assert.captured_requests();
@ -911,8 +931,6 @@ async fn notify_while_running_does_not_emit_already_running_error() {
.unwrap();
// Drain events until the run ends; AlreadyRunning must never appear.
// The in-flight branch must still echo the Notify as a log element.
let mut saw_notify_echo = false;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
tokio::select! {
@ -921,9 +939,6 @@ async fn notify_while_running_does_not_emit_already_running_error() {
Ok(Event::Error { code, .. }) if code == pod::ErrorCode::AlreadyRunning => {
panic!("Notify while running must not produce AlreadyRunning");
}
Ok(Event::Notify { ref message }) if message == "ping" => {
saw_notify_echo = true;
}
Ok(Event::TurnEnd { .. }) => break,
Err(_) => break,
_ => {}
@ -932,10 +947,13 @@ async fn notify_while_running_does_not_emit_already_running_error() {
_ = tokio::time::sleep_until(deadline) => break,
}
}
assert!(
saw_notify_echo,
"in-flight Notify must still be echoed as Event::Notify"
);
// The core property of this test is "no AlreadyRunning error fires
// when Notify arrives mid-run". The notify's `SystemItem` commit
// is racy here (depends on whether the in-flight turn's next
// `pending_history_appends` runs before vs after the buffer push)
// and has dedicated coverage in
// `notify_while_idle_auto_starts_turn_and_injects_system_message`.
wait_for_status(&handle, PodStatus::Idle).await;
}
#[tokio::test]
@ -1032,19 +1050,29 @@ async fn socket_pod_event_turn_ended_while_idle_auto_starts_turn() {
let mut saw_turn_end = false;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
// The SystemItem and TurnEnd events arrive through independent
// broadcast lanes (sink fan-out vs `event_tx`), so their relative
// order on the wire is non-deterministic. Keep reading until both
// are observed (or the deadline trips), rather than breaking on
// the first TurnEnd.
loop {
if saw_pod_event_echo && saw_turn_end {
break;
}
tokio::select! {
event = reader.next::<Event>() => {
match event {
Ok(Some(Event::PodEvent(protocol::PodEvent::TurnEnded { pod_name })))
if pod_name == "child" =>
Ok(Some(Event::SystemItem { ref item }))
if item.get("kind").and_then(|k| k.as_str()) == Some("pod_event")
&& item
.pointer("/event/pod_name")
.and_then(|v| v.as_str()) == Some("child") =>
{
saw_pod_event_echo = true;
}
Ok(Some(Event::TurnStart { .. })) => saw_turn_start = true,
Ok(Some(Event::TurnEnd { .. })) => {
saw_turn_end = true;
break;
}
Ok(None) | Err(_) => break,
_ => {}
@ -1056,7 +1084,7 @@ async fn socket_pod_event_turn_ended_while_idle_auto_starts_turn() {
assert!(
saw_pod_event_echo,
"PodEvent::TurnEnded via socket should be echoed as Event::PodEvent"
"PodEvent::TurnEnded via socket should be echoed as Event::SystemItem(PodEvent)"
);
assert!(
saw_turn_start,

View File

@ -214,20 +214,23 @@ pub enum Event {
UserMessage {
segments: Vec<Segment>,
},
/// Echo of `Method::Notify` received by this Pod. Broadcast on
/// receipt so subscribers can render the external input as a log
/// element. The same `message` is independently pushed into the
/// notification buffer for LLM injection (with prompt-pack
/// wrapping); this echo carries the raw payload and does not
/// imply any turn-boundary semantics.
Notify {
message: String,
/// One agent-injected system item committed to history.
///
/// Carries the JSON form of `session_store::SystemItem`. Covers
/// `Method::Notify` echoes, child-Pod lifecycle events from
/// `Method::PodEvent`, `@<path>` / `#<slug>` / `/<slug>`
/// resolution payloads, and any future agent-side injection kind.
/// Clients dispatch on the `kind` tag for typed rendering instead
/// of parsing free-text prefixes like `[Notification] …` or
/// `[File: …]`.
///
/// Fired per-item, even when the underlying
/// `LogEntry::SystemItems` entry batched several together — the
/// IPC layer fans the batch out at broadcast time so subscribers
/// observe one event per item.
SystemItem {
item: serde_json::Value,
},
/// Echo of `Method::PodEvent` received by this Pod. Same rationale
/// as `Notify`: subscribers render the event as a log element,
/// while a rendered summary is independently injected into the LLM
/// context via the notification buffer.
PodEvent(PodEvent),
TurnStart {
turn: usize,
},
@ -335,17 +338,6 @@ pub enum Event {
SessionRotated {
entry: serde_json::Value,
},
/// A non-LLM-driven history append landed in the worker history.
///
/// Carries the JSON form of `session_store::LogEntry::HookInjectedItems`.
/// This is the live counterpart of items that the streaming lane
/// never broadcasts — `Method::Notify` echoes, `@<path>` attachment
/// resolutions, `<system-reminder>` injections — so a connected
/// client can render them in time order without waiting for the
/// next reconnect's `Snapshot`.
HookInjectedItems {
entry: serde_json::Value,
},
/// Current Pod controller status. Broadcast on every controller-level
/// transition and included in `History` snapshots for late attach.
Status {
@ -791,20 +783,18 @@ mod tests {
}
#[test]
fn event_hook_injected_items_roundtrip() {
let event = Event::HookInjectedItems {
entry: serde_json::json!({"kind": "hook_injected_items", "ts": 42, "items": []}),
fn event_system_item_roundtrip() {
let event = Event::SystemItem {
item: serde_json::json!({"kind": "notification", "message": "hello"}),
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "hook_injected_items");
assert_eq!(parsed["data"]["entry"]["kind"], "hook_injected_items");
assert_eq!(parsed["event"], "system_item");
assert_eq!(parsed["data"]["item"]["kind"], "notification");
let decoded: Event = serde_json::from_str(&json).unwrap();
match decoded {
Event::HookInjectedItems { entry } => {
assert_eq!(entry["kind"], "hook_injected_items")
}
other => panic!("expected HookInjectedItems, got {other:?}"),
Event::SystemItem { item } => assert_eq!(item["kind"], "notification"),
other => panic!("expected SystemItem, got {other:?}"),
}
}
@ -1066,43 +1056,6 @@ mod tests {
assert_eq!(parsed["data"]["code"], "already_running");
}
#[test]
fn event_notify_roundtrip() {
let event = Event::Notify {
message: "child-pod finished".into(),
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "notify");
assert_eq!(parsed["data"]["message"], "child-pod finished");
let decoded: Event = serde_json::from_str(&json).unwrap();
match decoded {
Event::Notify { message } => assert_eq!(message, "child-pod finished"),
other => panic!("expected Notify, got {other:?}"),
}
}
#[test]
fn event_pod_event_roundtrip() {
let event = Event::PodEvent(PodEvent::TurnEnded {
pod_name: "child".into(),
});
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "pod_event");
assert_eq!(parsed["data"]["kind"], "turn_ended");
assert_eq!(parsed["data"]["pod_name"], "child");
let decoded: Event = serde_json::from_str(&json).unwrap();
match decoded {
Event::PodEvent(PodEvent::TurnEnded { pod_name }) => {
assert_eq!(pod_name, "child");
}
other => panic!("expected PodEvent::TurnEnded, got {other:?}"),
}
}
#[test]
fn event_user_message_roundtrip() {
let event = Event::UserMessage {

View File

@ -32,6 +32,7 @@ pub mod logged_item;
pub mod session;
pub mod session_log;
pub mod store;
pub mod system_item;
pub use event_trace::TraceEntry;
pub use fs_store::FsStore;
@ -48,6 +49,7 @@ pub use session_log::{
EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState,
SessionOrigin, build_chain, collect_state, compute_hash,
};
pub use system_item::{SystemItem, render_pod_event};
pub use store::{Store, StoreError};
/// Session identifier. UUID v7 (time-ordered, lexicographically sortable).

View File

@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::logged_item::LoggedItem;
use crate::system_item::SystemItem;
/// SHA-256 hash identifying a specific log entry in the chain.
///
@ -125,7 +126,18 @@ pub enum LogEntry {
/// Tool execution results added to history (worker.rs:897-900, 1072-1076).
ToolResults { ts: u64, items: Vec<LoggedItem> },
/// Items injected by `on_turn_end` hook via `ContinueWithMessages` (worker.rs:1055).
/// Typed agent-injected system items: notifications, child-Pod
/// lifecycle events, `@<path>` / `#<slug>` / `/<slug>` resolution
/// payloads. Each `SystemItem` carries kind metadata that the LLM
/// itself never sees (the LLM gets `Item::system_message` with the
/// item's `history_text()`), but live clients and replay paths
/// dispatch on `kind` for typed rendering.
SystemItems { ts: u64, items: Vec<SystemItem> },
/// Legacy pre-`SystemItems` form. Deserialize-only — new writes
/// always use `SystemItems`. Items are flattened to
/// `Item::system_message` on replay, matching how the original
/// path worked.
HookInjectedItems { ts: u64, items: Vec<LoggedItem> },
/// Turn boundary. Records the turn count after increment.
@ -276,6 +288,11 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
LogEntry::ToolResults { items, .. } => {
state.history.extend(items.iter().cloned().map(Item::from));
}
LogEntry::SystemItems { items, .. } => {
state
.history
.extend(items.iter().map(|si| si.to_history_item()));
}
LogEntry::HookInjectedItems { items, .. } => {
state.history.extend(items.iter().cloned().map(Item::from));
}

View File

@ -0,0 +1,198 @@
//! Typed system-message items injected by the agent system.
//!
//! Items in worker history with `role:system` are never produced by the
//! LLM — they are always inserted by the Pod itself (notifications,
//! file/knowledge/workflow ref resolutions, child-pod lifecycle events,
//! future `<system-reminder>` tags, …). [`SystemItem`] carries the
//! typed shape of each such injection so clients can dispatch on
//! `kind` instead of parsing text prefixes like `[Notification] …` or
//! `[File: …]`.
//!
//! Persisted as the payload of [`crate::LogEntry::SystemItems`], and
//! broadcast live as the payload of `Event::SystemItem` on the wire.
//!
//! For LLM context replay, each `SystemItem` reduces to an
//! `Item::system_message(...)` whose body matches the legacy free-text
//! shape (see [`SystemItem::history_text`]). The kind metadata is
//! preserved only on the log/wire side; the LLM still sees plain
//! system-message text.
use llm_worker::llm_client::types::Item;
use protocol::PodEvent;
use serde::{Deserialize, Serialize};
/// One agent-injected system item, tagged by origin.
///
/// Each variant carries the kind-specific raw data clients use for
/// typed rendering (`Notification.message`, `PodEvent.event`, file
/// path / knowledge slug / workflow slug / etc.), plus a pre-rendered
/// `body` (where applicable) that is the exact `role:system` text the
/// LLM actually saw at commit time. `body` is denormalised so that
/// session log replay reconstructs worker history byte-identical to
/// what was on the wire — even when prompt overrides (e.g. custom
/// `notify_wrapper` template) re-shape the live rendering on a later
/// resume.
///
/// New variants get added here as fresh injection kinds come online
/// (e.g. `Reminder`). The `kind` JSON tag is the snake_case form of
/// the variant name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SystemItem {
/// Free-form notification sent in by an external caller via
/// `Method::Notify`. `message` is the raw caller-supplied text;
/// `body` is the wrapped LLM-context form (Pod renders it via
/// `notify_wrapper` at commit time).
Notification { message: String, body: String },
/// Lifecycle event reported by a child Pod via `Method::PodEvent`.
/// `event` is the typed payload (so the TUI can render per-child
/// banners without re-parsing); `body` is the wrapped LLM-context
/// form (same `notify_wrapper` path as `Notification`).
PodEvent { event: PodEvent, body: String },
/// `@<path>` file reference resolution. `body` is the rendered
/// LLM-context text (`[File: <path>]\n…` for regular files,
/// `[Dir: <path>]\n…` for directory listings, possibly with a
/// truncation hint) so replay reconstructs worker history
/// byte-identical to what was sent.
FileAttachment { path: String, body: String },
/// `#<slug>` Knowledge reference resolution. `body` is the
/// rendered text the LLM saw (Pod composes the `[Knowledge: …]`
/// header + body).
Knowledge { slug: String, body: String },
/// `/<slug>` Workflow invocation. `body` is the workflow's
/// prompt body materialized into the LLM context.
Workflow { slug: String, body: String },
}
impl SystemItem {
/// Free-text body the LLM sees inside its `role:system` message
/// for this item. Returns the variant's stored `body` verbatim.
pub fn history_text(&self) -> String {
match self {
SystemItem::Notification { body, .. } => body.clone(),
SystemItem::PodEvent { body, .. } => body.clone(),
SystemItem::FileAttachment { body, .. } => body.clone(),
SystemItem::Knowledge { body, .. } => body.clone(),
SystemItem::Workflow { body, .. } => body.clone(),
}
}
/// Materialize this `SystemItem` as the `Item::system_message`
/// form that lands in worker history.
pub fn to_history_item(&self) -> Item {
Item::system_message(self.history_text())
}
/// Short human-readable label used for diagnostics. Not on the
/// wire — keep flexible.
pub fn kind_label(&self) -> &'static str {
match self {
SystemItem::Notification { .. } => "notification",
SystemItem::PodEvent { .. } => "pod_event",
SystemItem::FileAttachment { .. } => "file_attachment",
SystemItem::Knowledge { .. } => "knowledge",
SystemItem::Workflow { .. } => "workflow",
}
}
}
/// Render a `PodEvent` as the one-line notification text the agent
/// sees. Centralised here (rather than at the controller's render
/// site) so persistence and broadcast share the same rendering.
pub fn render_pod_event(event: &PodEvent) -> String {
match event {
PodEvent::TurnEnded { pod_name } => format!("pod `{pod_name}` finished a turn"),
PodEvent::Errored { pod_name, message } => {
format!("pod `{pod_name}` errored: {message}")
}
PodEvent::ShutDown { pod_name } => format!("pod `{pod_name}` shut down"),
PodEvent::ScopeSubDelegated {
parent_pod,
sub_pod,
..
} => {
format!("pod `{parent_pod}` sub-delegated scope to `{sub_pod}`")
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn notification_history_text_returns_stored_body() {
let item = SystemItem::Notification {
message: "child done".into(),
body: "[Notification]\nchild done\n\n(non-blocking hint…)".into(),
};
assert_eq!(
item.history_text(),
"[Notification]\nchild done\n\n(non-blocking hint…)"
);
}
#[test]
fn pod_event_history_text_returns_stored_body() {
let item = SystemItem::PodEvent {
event: PodEvent::TurnEnded {
pod_name: "child".into(),
},
body: "[Notification]\npod `child` finished a turn\n\n(non-blocking hint…)".into(),
};
assert!(item.history_text().starts_with("[Notification]\n"));
assert!(item.history_text().contains("`child`"));
}
#[test]
fn file_attachment_history_text_returns_stored_body() {
let item = SystemItem::FileAttachment {
path: "src/main.rs".into(),
body: "[File: src/main.rs]\nfn main() {}".into(),
};
assert_eq!(item.history_text(), "[File: src/main.rs]\nfn main() {}");
}
#[test]
fn round_trip_via_json() {
let item = SystemItem::FileAttachment {
path: "src/main.rs".into(),
body: "[File: src/main.rs]\nfn main() {}".into(),
};
let json = serde_json::to_string(&item).unwrap();
let parsed: SystemItem = serde_json::from_str(&json).unwrap();
match parsed {
SystemItem::FileAttachment { path, body } => {
assert_eq!(path, "src/main.rs");
assert_eq!(body, "[File: src/main.rs]\nfn main() {}");
}
other => panic!("unexpected: {other:?}"),
}
}
#[test]
fn round_trip_pod_event() {
let item = SystemItem::PodEvent {
event: PodEvent::TurnEnded {
pod_name: "child".into(),
},
body: "[Notification] pod `child` finished a turn".into(),
};
let json = serde_json::to_string(&item).unwrap();
let parsed: SystemItem = serde_json::from_str(&json).unwrap();
match parsed {
SystemItem::PodEvent {
event: PodEvent::TurnEnded { pod_name },
body,
} => {
assert_eq!(pod_name, "child");
assert!(body.contains("`child`"));
}
other => panic!("unexpected: {other:?}"),
}
}
}

View File

@ -483,21 +483,13 @@ impl App {
self.blocks.push(Block::UserMessage { segments });
self.assistant_streaming = false;
}
Event::Notify { message } => {
self.blocks.push(Block::Notify { message });
self.assistant_streaming = false;
}
Event::PodEvent(event) => {
self.blocks.push(Block::PodEvent { event });
self.assistant_streaming = false;
}
Event::SessionRotated { entry } => {
self.reset_for_rotation();
self.apply_log_entry_raw(&entry);
self.assistant_streaming = false;
}
Event::HookInjectedItems { entry } => {
self.apply_log_entry_raw(&entry);
Event::SystemItem { item } => {
self.apply_system_item(&item);
self.assistant_streaming = false;
}
Event::TurnStart { .. } => {
@ -984,11 +976,51 @@ impl App {
self.push_history_item(&item_value);
}
}
session_store::LogEntry::SystemItems { items, .. } => {
for system_item in items {
let value =
serde_json::to_value(&system_item).expect("SystemItem is Serialize");
self.apply_system_item(&value);
}
}
// Non-history-bearing variants don't affect the block view.
_ => {}
}
}
/// Dispatch one `SystemItem` JSON value into the appropriate block.
///
/// Kind-based routing replaces the old free-text `[Notification]` /
/// `[File: …]` parsing path: each kind maps directly to a typed
/// block (`Block::Notify`, `Block::PodEvent`, …).
fn apply_system_item(&mut self, value: &serde_json::Value) {
let Ok(item) = serde_json::from_value::<session_store::SystemItem>(value.clone()) else {
// Unknown / forward-compat shape: fall back to rendering the
// raw text payload (if any) as a generic system message.
if let Some(text) = value.get("body").and_then(|b| b.as_str()) {
self.task_store.apply_system_message_text(text);
self.blocks.push(Block::SystemMessage {
text: text.to_owned(),
});
}
return;
};
match item {
session_store::SystemItem::Notification { message, .. } => {
self.blocks.push(Block::Notify { message });
}
session_store::SystemItem::PodEvent { event, .. } => {
self.blocks.push(Block::PodEvent { event });
}
session_store::SystemItem::FileAttachment { body, .. }
| session_store::SystemItem::Knowledge { body, .. }
| session_store::SystemItem::Workflow { body, .. } => {
self.task_store.apply_system_message_text(&body);
self.blocks.push(Block::SystemMessage { text: body });
}
}
}
/// Sweep all current tool-call blocks: any that never resolved into
/// a Done / Error state get marked Incomplete. Called after a
/// snapshot replay so dangling in-flight tool calls in the seed
@ -1422,18 +1454,14 @@ mod completion_flow_tests {
}
#[test]
fn live_hook_injected_items_event_appends_system_message_block() {
fn live_system_item_workflow_appends_system_message_block() {
let mut app = App::new("test".into());
let entry = serde_json::json!({
"kind": "hook_injected_items",
"ts": 1,
"items": [{
"kind": "message",
"role": "system",
"content": [{ "kind": "text", "text": "[Workflow /build]\nRun the build" }],
}],
let item = serde_json::json!({
"kind": "workflow",
"slug": "build",
"body": "[Workflow /build]\nRun the build",
});
app.handle_pod_event(Event::HookInjectedItems { entry });
app.handle_pod_event(Event::SystemItem { item });
assert!(matches!(
app.blocks.as_slice(),
@ -1441,6 +1469,39 @@ mod completion_flow_tests {
));
}
#[test]
fn live_system_item_notification_appends_notify_block() {
let mut app = App::new("test".into());
let item = serde_json::json!({
"kind": "notification",
"message": "hi",
"body": "[Notification] hi",
});
app.handle_pod_event(Event::SystemItem { item });
assert!(matches!(
app.blocks.as_slice(),
[Block::Notify { message }] if message == "hi"
));
}
#[test]
fn live_system_item_pod_event_appends_pod_event_block() {
let mut app = App::new("test".into());
let item = serde_json::json!({
"kind": "pod_event",
"event": { "kind": "turn_ended", "pod_name": "child" },
"body": "[Notification] pod `child` finished a turn",
});
app.handle_pod_event(Event::SystemItem { item });
assert_eq!(app.blocks.len(), 1);
match &app.blocks[0] {
Block::PodEvent {
event: protocol::PodEvent::TurnEnded { pod_name },
} => assert_eq!(pod_name, "child"),
_ => panic!("expected a PodEvent block"),
}
}
#[test]
fn compact_done_replaces_live_block() {
let mut app = App::new("test".into());
@ -1577,15 +1638,13 @@ mod completion_flow_tests {
```json\n{\n \"tasks\": [\n {\n \"taskid\": 4,\n \
\"status\": \"inprogress\",\n \"subject\": \"from snapshot\",\n \
\"description\": \"d\"\n }\n ]\n}\n```\n";
app.handle_pod_event(Event::HookInjectedItems {
entry: serde_json::json!({
"kind": "hook_injected_items",
"ts": 1,
"items": [{
"kind": "message",
"role": "system",
"content": [{ "kind": "text", "text": snapshot }],
}],
// Snapshot text injected as a workflow body (kind doesn't matter
// for task-store parsing, only the text contents do).
app.handle_pod_event(Event::SystemItem {
item: serde_json::json!({
"kind": "workflow",
"slug": "task-snapshot",
"body": snapshot,
}),
});

View File

@ -1,80 +0,0 @@
# Event / LogEntry: System 注入経路を SystemItem 一本に統合する
## 背景
エージェントシステム (= ユーザー由来でも LLM 由来でもない、Pod 自身) が LLM context に注入する `role:system``Item::Message` は、現状 3 系統の ad-hoc 経路で並走している:
1. **`Method::Notify`** — 外部からの非同期メッセージ
- controller → `Event::Notify { message }` (生 message echo)
- `pod.push_notify(message)``NotifyBuffer``pending_history_appends``[Notification] <msg>` の system_message として history に commit
2. **`Method::PodEvent`** — 子 pod のライフサイクル通知
- controller → `Event::PodEvent(event)` (typed echo)
- `render_event` で 1 行整形 → `NotifyBuffer` (Notify と合流) → 同じく `[Notification] <rendered>` として commit
3. **Interceptor 内部注入**`@<path>` / `#<slug>` / `/<slug>` の解決結果
- `PodInterceptor::on_prompt_submit``ContinueWith``[File: <path>]` / `[Knowledge: <slug>]` / workflow 本文の system_message を history に append
- wire echo は無し
これらは全部 「**人でも LLM でもなく、エージェントシステムが LLM に与えた情報**」 という同一カテゴリで、history への commit 形 (`role:system` の `Item::Message`) もほぼ同じだが、wire event 側は echo/typed/未送信が混在し、TUI 側のブロックも `Block::Notify` / `Block::PodEvent` / `Block::SystemMessage` の 3 つに分かれている。
加えて `LogEntry::HookInjectedItems` という命名が誤称: 実際に注入しているのは公開 `Hook` ではなく **`Interceptor`** で、内部機構専用の経路。`hook.rs` モジュール doc でも 「Hook は read-only な公開 extension surface」 「内部機構は Interceptor を使う」 と明確に分離されている。
このばらつきの結果:
- wire 上、同じ通知が `Event::Notify` (生) + `Event::HookInjectedItems` (整形版) の 2 重に流れて TUI が重複描画した (`pod-state-from-session-log` 改修中に表面化)
- kind 判別がテキストプレフィックス (`[Notification] ...` / `[File: ...]`) 頼みで脆い
- 新しい注入種 (`<system-reminder>` 等) を足すたびに 1 系統増える設計圧力
- `Method::Notify` の "Notify" 語感が view-only な `Alerter` (本来 "Notification" 寄り) とぶつかっている
LLM は `role:system` を生成しないため、worker.history 中の `role:system` 項目は構造的にすべてこのエージェント注入経路に由来する。この性質を型として表に出す。
## 方針
`Tool` パターンに倣って 「**1 つの concept + kind ベース dispatch**」 に統合する。
- wire event は 1 種類: `Event::SystemItem { kind, payload }` (1 件ずつライブ配信)
- LogEntry は kind 揃いで batch する単一バリアントに置き換え、`Hook` 命名を捨てる: `LogEntry::SystemItems { ts, items: Vec<SystemItem> }`
- Pod 内部の注入路 (NotifyBuffer / `format_notify` / `render_event` / Interceptor.ContinueWith) は **全部「kind 付き `SystemItem` を作って worker.history に commit」 という単一形式に合流**
- TUI は kind 別に Block を出し分け (現 `ToolCallBlock` がツール別に見た目を出すのと同じ構造)
単数/複数の使い分けは既存パターンに揃える:
- 1 件単位の wire event は `Event::SystemItem` (`Event::TextDelta` と同じ呼吸)
- 永続バッチは `LogEntry::SystemItems``Vec<SystemItem>` を内包 (`LogEntry::AssistantItems` / `ToolResults` と同じ呼吸)
`Method::Notify` / `Method::PodEvent` は外部 API としてはそのまま残す (入口の意味付けは別)。 中で `SystemItem::Notification` / `SystemItem::PodEvent` に変換されて以後は単一経路、という整理。
`Event::Alert` (= LLM context に乗らない純 UI 通知) は **別経路として明確に残す**。 view-only な persistent stream (Alerter の subscribe_with_snapshot) としてすでに正しく機能している。 "Notification" 語感の衝突は、本チケットで context 注入側を `SystemItem` に rename することで解消する (Notification は `SystemItem` の一 kind に格下げ、`Alerter` が "Notification" 語感の本来のオーナーに戻る)。
## 要件
- wire event は 1 種類: `Event::SystemItem { kind, payload }` で全注入が乗る。 `Event::Notify` / `Event::PodEvent` / `Event::HookInjectedItems` は protocol から削除
- LogEntry は `HookInjectedItems` を rename + items を kind 付き typed shape に置換。 新名 `LogEntry::SystemItems { ts, items: Vec<SystemItem> }` で wire tag は `system_items`
- `SystemItem` の kind 列挙は最低限以下を含む:
- `Notification { message }` (`Method::Notify` 由来)
- `PodEvent { event: PodEvent }` (子 pod ライフサイクル)
- `FileAttachment { path, content }` (`@<path>` 解決)
- `Knowledge { slug, body }` (`#<slug>` 解決)
- `Workflow { slug, body }` (`/<slug>` 解決)
- 将来追加可能 (`Reminder` 等) を見越した拡張点
- Pod 側の `NotifyBuffer` / `format_notify` / `render_event` / `Interceptor::on_prompt_submit ContinueWith``SystemItem` を中間表現として通る。 worker.history への append は最終的に `Item::system_message` + 対応する `SystemItem` 1 件を `LogEntry::SystemItems` として commit
- TUI は `Event::SystemItem` を kind で dispatch して描画する。 既存 `Block::Notify` / `Block::PodEvent` / `Block::SystemMessage``Block::SystemItem(SystemItemBlock)` に集約 (or 既存 Block を再利用しつつ駆動イベントだけ統一)
- `Method::Notify` / `Method::PodEvent` (外部入口 API) は名前を維持し、内部で `SystemItem::Notification` / `SystemItem::PodEvent` に変換される
- `Event::Alert` / `Alerter` は無変更
## 完了条件
- `Event::Notify` / `Event::PodEvent` / `Event::HookInjectedItems` が protocol から削除されている
- `LogEntry::HookInjectedItems` が削除され、`LogEntry::SystemItems` に置き換わっている (旧 wire tag を deserialize alias で残すかは実装判断)
- TUI が `Event::SystemItem` 駆動で system 系ブロックを構築している。 ライブ通知の二重描画が起きない
- `Method::Notify``Method::PodEvent` は外部 API としては変わらず動く
- `Event::Alert` / `Alerter` 経路は無変更
## 範囲外
- `Method::Notify` / `Method::PodEvent` の rename (入口名の整理は別の話)
- `Event::Alert` / `Alerter` 系の変更
- 旧 session log (`hook_injected_items` を含む) のファイル変換: deserialize alias で読めるところまでで、ファイル書き換えは行わない
- TUI 内の `Block::SystemItem` 詳細な視覚設計
## 関連
- 前提となる `tickets/pod-state-from-session-log.md` (state 正本を session log に統合) の後続。 同チケット内で `Event::HookInjectedItems` を導入したが、 直後に「Hook 命名は誤り」「Notify/PodEvent と二重」と判明したため本チケットで整理する
- CLAUDE.md の 「context に乗せる前に history に commit する」 加工原則に整合する整理 (現実装の経路を統一形にするだけで、原則自体は変わらない)