diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index 664be199..23b29825 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -96,13 +96,30 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { loop { tokio::select! { - // Live session-log entries → this client as Event::Entry. + // Live session-log entries → dispatched as the role-specific + // wire events. The sink only broadcasts entries that the + // streaming-event lane doesn't cover; everything else is + // already on the wire via TextDelta / ToolCall* / etc., so we + // never see (and never need to forward) other variants here. entry = entry_rx.recv() => { match entry { Ok(entry) => { let value = serde_json::to_value(&entry) .expect("LogEntry is Serialize"); - if writer.write(&Event::Entry { entry: value }).await.is_err() { + let outbound = match &entry { + session_store::LogEntry::SessionStart { .. } => { + Some(Event::SessionRotated { entry: value }) + } + session_store::LogEntry::HookInjectedItems { .. } => { + Some(Event::HookInjectedItems { entry: value }) + } + // Defensive: should never reach here per + // `SessionLogSink::is_live_relevant`. + _ => None, + }; + if let Some(event) = outbound + && writer.write(&event).await.is_err() + { break; } } diff --git a/crates/pod/src/session_log_sink.rs b/crates/pod/src/session_log_sink.rs index ff94b645..683230d4 100644 --- a/crates/pod/src/session_log_sink.rs +++ b/crates/pod/src/session_log_sink.rs @@ -83,11 +83,23 @@ impl SessionLogSink { } } - /// Push `entry` to the mirror and broadcast it. + /// Push `entry` to the mirror; selectively broadcast it. /// /// MUST be called only after the Pod has successfully persisted the /// entry to the underlying `Store` — disk write is the gate. Failed /// disk writes must not call `publish`. + /// + /// Live broadcast fires only for entries that the streaming-event + /// lane does not cover: + /// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire. + /// - `LogEntry::HookInjectedItems` → `Event::HookInjectedItems`. + /// Everything else (AssistantItems, ToolResults, UserInput, TurnEnd, + /// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is + /// reflected in the mirror so reconnect snapshots stay accurate, + /// but is not sent live — the streaming events (TextDelta / + /// ToolCallStart / ToolResult / UserMessage / TurnEnd / etc.) + /// already provide that data, and re-broadcasting it as a typed + /// entry would just double-render every block on the client side. pub fn publish(&self, entry: LogEntry) { let mut mirror = self .inner @@ -95,10 +107,21 @@ impl SessionLogSink { .lock() .expect("session log mirror mutex poisoned"); mirror.push(entry.clone()); - // SendError means there are zero subscribers; harmless. We hold - // the mirror lock across `send` so that `subscribe_with_snapshot` - // cannot observe an inconsistent (snapshot, receiver) pair. - let _ = self.inner.broadcast_tx.send(entry); + if Self::is_live_relevant(&entry) { + // SendError means there are zero subscribers; harmless. The + // mirror lock is held across `send` so subscribers cannot + // observe an inconsistent (snapshot, receiver) pair. + let _ = self.inner.broadcast_tx.send(entry); + } + } + + /// `true` for entry kinds that the IPC layer forwards to clients + /// as a typed live event. + fn is_live_relevant(entry: &LogEntry) -> bool { + matches!( + entry, + LogEntry::SessionStart { .. } | LogEntry::HookInjectedItems { .. } + ) } /// Atomically swap the mirror to `[initial]` and broadcast the new @@ -401,20 +424,40 @@ mod tests { assert!(rx.try_recv().is_err()); } + fn hook_injected(text: &str) -> LogEntry { + LogEntry::HookInjectedItems { + ts: now_millis(), + items: vec![session_store::LoggedItem::from( + &llm_worker::Item::system_message(text), + )], + } + } + #[test] - fn subscribe_then_publish_delivers_live_entries() { + fn subscribe_then_publish_delivers_only_live_relevant_entries() { let sink = SessionLogSink::new(); sink.publish(session_start()); let (snapshot, mut rx) = sink.subscribe_with_snapshot(); assert_eq!(snapshot.len(), 1); + // TurnEnd is mirror-only — no live broadcast. sink.publish(turn_end(1)); + assert!( + rx.try_recv().is_err(), + "TurnEnd must not be broadcast live" + ); + + // HookInjectedItems is live-relevant. + sink.publish(hook_injected("[Notify] hi")); match rx.try_recv() { - Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {} - other => panic!("unexpected: {other:?}"), + Ok(LogEntry::HookInjectedItems { .. }) => {} + other => panic!("expected HookInjectedItems, got {other:?}"), } - assert!(rx.try_recv().is_err()); + + // Mirror still grew with both entries (snapshot completeness). + let (after_snapshot, _) = sink.subscribe_with_snapshot(); + assert_eq!(after_snapshot.len(), 3); } #[test] @@ -422,11 +465,11 @@ mod tests { let sink = SessionLogSink::new(); sink.publish(session_start()); let (snapshot, mut rx) = sink.subscribe_with_snapshot(); - sink.publish(turn_end(1)); + sink.publish(hook_injected("post-snapshot")); assert_eq!(snapshot.len(), 1); match rx.try_recv() { - Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {} + Ok(LogEntry::HookInjectedItems { .. }) => {} other => panic!("unexpected: {other:?}"), } assert!(rx.try_recv().is_err()); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index fd5bed1f..262a6a7b 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -305,29 +305,45 @@ pub enum Event { /// Sent exactly once at the start of every client connection. /// /// `entries` is the session-log mirror at subscribe time, serialised - /// as the JSON form of `session_store::LogEntry`. Late attachers - /// reconstruct view state by replaying entries through their own - /// `LogEntry → block` mapping, then continue applying live - /// `Event::Entry` updates received after the snapshot. + /// as the JSON form of `session_store::LogEntry`. This is the + /// bulk-reconstruction lane: clients walk the entries to seed their + /// derived view. /// /// `greeting` and `status` accompany the snapshot so clients render /// pod identity and current controller state without an extra round /// trip. + /// + /// Live updates after the snapshot arrive through the streaming + /// events (`TextDelta` / `ToolCall*` / `ToolResult` / etc.) plus + /// the two role-specific entry events + /// (`SessionRotated` / `HookInjectedItems`) — there is no generic + /// "every committed entry" broadcast. Snapshot { entries: Vec, greeting: Greeting, #[serde(default)] status: PodStatus, }, - /// A single session-log entry committed atomically with the disk - /// write. Streamed as the suffix following the connect-time - /// `Snapshot`; the prefix/suffix boundary is gap-free and - /// duplicate-free per `SessionLogSink` semantics. + /// Server-side session log rotated to a fresh `SessionStart`. /// - /// Payload is the JSON form of `session_store::LogEntry`. Clients - /// deserialize as needed to render typed atoms (e.g. - /// `UserInput.segments`). - Entry { + /// Fires on compaction and on auto-fork when the store head drifts + /// from the live writer's cached head. Clients drop their derived + /// view and reseed from `entry.history` exactly the way they would + /// from a connect-time `Snapshot`. + /// + /// Payload is the JSON form of `session_store::LogEntry::SessionStart`. + SessionRotated { + entry: serde_json::Value, + }, + /// A non-LLM-driven history append landed in the worker history. + /// + /// Carries the JSON form of `session_store::LogEntry::HookInjectedItems`. + /// This is the live counterpart of items that the streaming lane + /// never broadcasts — `Method::Notify` echoes, `@` attachment + /// resolutions, `` injections — so a connected + /// client can render them in time order without waiting for the + /// next reconnect's `Snapshot`. + HookInjectedItems { entry: serde_json::Value, }, /// Current Pod controller status. Broadcast on every controller-level @@ -759,18 +775,36 @@ mod tests { } #[test] - fn event_entry_roundtrip() { - let event = Event::Entry { - entry: serde_json::json!({"kind": "assistant_items", "ts": 42, "items": []}), + fn event_session_rotated_roundtrip() { + let event = Event::SessionRotated { + entry: serde_json::json!({"kind": "session_start", "ts": 1, "history": []}), }; let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["event"], "entry"); - assert_eq!(parsed["data"]["entry"]["kind"], "assistant_items"); + assert_eq!(parsed["event"], "session_rotated"); + assert_eq!(parsed["data"]["entry"]["kind"], "session_start"); let decoded: Event = serde_json::from_str(&json).unwrap(); match decoded { - Event::Entry { entry } => assert_eq!(entry["kind"], "assistant_items"), - other => panic!("expected Entry, got {other:?}"), + Event::SessionRotated { entry } => assert_eq!(entry["kind"], "session_start"), + other => panic!("expected SessionRotated, got {other:?}"), + } + } + + #[test] + fn event_hook_injected_items_roundtrip() { + let event = Event::HookInjectedItems { + entry: serde_json::json!({"kind": "hook_injected_items", "ts": 42, "items": []}), + }; + let json = serde_json::to_string(&event).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["event"], "hook_injected_items"); + assert_eq!(parsed["data"]["entry"]["kind"], "hook_injected_items"); + let decoded: Event = serde_json::from_str(&json).unwrap(); + match decoded { + Event::HookInjectedItems { entry } => { + assert_eq!(entry["kind"], "hook_injected_items") + } + other => panic!("expected HookInjectedItems, got {other:?}"), } } diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 4d88f48b..2eb75ce5 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -491,8 +491,14 @@ impl App { self.blocks.push(Block::PodEvent { event }); self.assistant_streaming = false; } - Event::Entry { entry } => { - self.apply_log_entry(&entry); + Event::SessionRotated { entry } => { + self.reset_for_rotation(); + self.apply_log_entry_raw(&entry); + self.assistant_streaming = false; + } + Event::HookInjectedItems { entry } => { + self.apply_log_entry_raw(&entry); + self.assistant_streaming = false; } Event::TurnStart { .. } => { self.set_pod_status(PodStatus::Running); @@ -926,33 +932,22 @@ impl App { self.mark_orphan_tool_calls_incomplete_pass(); } - /// Apply a single live `Event::Entry`. - /// - /// `SessionStart` entries that arrive live (compaction / fork) - /// reset the block list to a freshly seeded view, matching what a - /// reconnect's `Event::Snapshot` would produce. - fn apply_log_entry(&mut self, entry: &serde_json::Value) { - if entry.get("kind").and_then(|k| k.as_str()) == Some("session_start") { - // Compaction / fork on the server side. Reset our derived - // view but keep the greeting (identity hasn't changed). - let greeting = self - .blocks - .iter() - .find_map(|b| match b { - Block::Greeting(g) => Some(g.clone()), - _ => None, - }); - self.turn_index = 0; - self.blocks.clear(); - self.cache = FileCache::new(); - self.task_store = TaskStore::new(); - self.task_pane_scroll = 0; - if let Some(g) = greeting { - self.blocks.push(Block::Greeting(g)); - } + /// Drop the derived view in preparation for replaying a new + /// `SessionStart` (compaction / fork). Greeting is preserved + /// because the Pod identity hasn't changed. + fn reset_for_rotation(&mut self) { + let greeting = self.blocks.iter().find_map(|b| match b { + Block::Greeting(g) => Some(g.clone()), + _ => None, + }); + self.turn_index = 0; + self.blocks.clear(); + self.cache = FileCache::new(); + self.task_store = TaskStore::new(); + self.task_pane_scroll = 0; + if let Some(g) = greeting { + self.blocks.push(Block::Greeting(g)); } - self.apply_log_entry_raw(entry); - self.assistant_streaming = false; } /// Walk a single `LogEntry` JSON value and translate it into blocks @@ -1427,7 +1422,7 @@ mod completion_flow_tests { } #[test] - fn live_entry_routes_system_message_via_hook_injected_items() { + fn live_hook_injected_items_event_appends_system_message_block() { let mut app = App::new("test".into()); let entry = serde_json::json!({ "kind": "hook_injected_items", @@ -1438,7 +1433,7 @@ mod completion_flow_tests { "content": [{ "kind": "text", "text": "[Workflow /build]\nRun the build" }], }], }); - app.handle_pod_event(Event::Entry { entry }); + app.handle_pod_event(Event::HookInjectedItems { entry }); assert!(matches!( app.blocks.as_slice(), @@ -1582,7 +1577,7 @@ mod completion_flow_tests { ```json\n{\n \"tasks\": [\n {\n \"taskid\": 4,\n \ \"status\": \"inprogress\",\n \"subject\": \"from snapshot\",\n \ \"description\": \"d\"\n }\n ]\n}\n```\n"; - app.handle_pod_event(Event::Entry { + app.handle_pod_event(Event::HookInjectedItems { entry: serde_json::json!({ "kind": "hook_injected_items", "ts": 1,