fix: 実態にそぐわないEvent::Entryを実装した構造を訂正

This commit is contained in:
Keisuke Hirata 2026-05-14 03:35:52 +09:00
parent e7064878c2
commit f73e648929
4 changed files with 152 additions and 63 deletions

View File

@ -96,13 +96,30 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
loop {
tokio::select! {
// Live session-log entries → this client as Event::Entry.
// Live session-log entries → dispatched as the role-specific
// wire events. The sink only broadcasts entries that the
// streaming-event lane doesn't cover; everything else is
// already on the wire via TextDelta / ToolCall* / etc., so we
// never see (and never need to forward) other variants here.
entry = entry_rx.recv() => {
match entry {
Ok(entry) => {
let value = serde_json::to_value(&entry)
.expect("LogEntry is Serialize");
if writer.write(&Event::Entry { entry: value }).await.is_err() {
let outbound = match &entry {
session_store::LogEntry::SessionStart { .. } => {
Some(Event::SessionRotated { entry: value })
}
session_store::LogEntry::HookInjectedItems { .. } => {
Some(Event::HookInjectedItems { entry: value })
}
// Defensive: should never reach here per
// `SessionLogSink::is_live_relevant`.
_ => None,
};
if let Some(event) = outbound
&& writer.write(&event).await.is_err()
{
break;
}
}

View File

@ -83,11 +83,23 @@ impl SessionLogSink {
}
}
/// Push `entry` to the mirror and broadcast it.
/// Push `entry` to the mirror; selectively broadcast it.
///
/// MUST be called only after the Pod has successfully persisted the
/// entry to the underlying `Store` — disk write is the gate. Failed
/// disk writes must not call `publish`.
///
/// Live broadcast fires only for entries that the streaming-event
/// lane does not cover:
/// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire.
/// - `LogEntry::HookInjectedItems` → `Event::HookInjectedItems`.
/// Everything else (AssistantItems, ToolResults, UserInput, TurnEnd,
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
/// reflected in the mirror so reconnect snapshots stay accurate,
/// but is not sent live — the streaming events (TextDelta /
/// ToolCallStart / ToolResult / UserMessage / TurnEnd / etc.)
/// already provide that data, and re-broadcasting it as a typed
/// entry would just double-render every block on the client side.
pub fn publish(&self, entry: LogEntry) {
let mut mirror = self
.inner
@ -95,10 +107,21 @@ impl SessionLogSink {
.lock()
.expect("session log mirror mutex poisoned");
mirror.push(entry.clone());
// SendError means there are zero subscribers; harmless. We hold
// the mirror lock across `send` so that `subscribe_with_snapshot`
// cannot observe an inconsistent (snapshot, receiver) pair.
let _ = self.inner.broadcast_tx.send(entry);
if Self::is_live_relevant(&entry) {
// SendError means there are zero subscribers; harmless. The
// mirror lock is held across `send` so subscribers cannot
// observe an inconsistent (snapshot, receiver) pair.
let _ = self.inner.broadcast_tx.send(entry);
}
}
/// `true` for entry kinds that the IPC layer forwards to clients
/// as a typed live event.
fn is_live_relevant(entry: &LogEntry) -> bool {
matches!(
entry,
LogEntry::SessionStart { .. } | LogEntry::HookInjectedItems { .. }
)
}
/// Atomically swap the mirror to `[initial]` and broadcast the new
@ -401,20 +424,40 @@ mod tests {
assert!(rx.try_recv().is_err());
}
fn hook_injected(text: &str) -> LogEntry {
LogEntry::HookInjectedItems {
ts: now_millis(),
items: vec![session_store::LoggedItem::from(
&llm_worker::Item::system_message(text),
)],
}
}
#[test]
fn subscribe_then_publish_delivers_live_entries() {
fn subscribe_then_publish_delivers_only_live_relevant_entries() {
let sink = SessionLogSink::new();
sink.publish(session_start());
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
assert_eq!(snapshot.len(), 1);
// TurnEnd is mirror-only — no live broadcast.
sink.publish(turn_end(1));
assert!(
rx.try_recv().is_err(),
"TurnEnd must not be broadcast live"
);
// HookInjectedItems is live-relevant.
sink.publish(hook_injected("[Notify] hi"));
match rx.try_recv() {
Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {}
other => panic!("unexpected: {other:?}"),
Ok(LogEntry::HookInjectedItems { .. }) => {}
other => panic!("expected HookInjectedItems, got {other:?}"),
}
assert!(rx.try_recv().is_err());
// Mirror still grew with both entries (snapshot completeness).
let (after_snapshot, _) = sink.subscribe_with_snapshot();
assert_eq!(after_snapshot.len(), 3);
}
#[test]
@ -422,11 +465,11 @@ mod tests {
let sink = SessionLogSink::new();
sink.publish(session_start());
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
sink.publish(turn_end(1));
sink.publish(hook_injected("post-snapshot"));
assert_eq!(snapshot.len(), 1);
match rx.try_recv() {
Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {}
Ok(LogEntry::HookInjectedItems { .. }) => {}
other => panic!("unexpected: {other:?}"),
}
assert!(rx.try_recv().is_err());

View File

@ -305,29 +305,45 @@ pub enum Event {
/// Sent exactly once at the start of every client connection.
///
/// `entries` is the session-log mirror at subscribe time, serialised
/// as the JSON form of `session_store::LogEntry`. Late attachers
/// reconstruct view state by replaying entries through their own
/// `LogEntry → block` mapping, then continue applying live
/// `Event::Entry` updates received after the snapshot.
/// as the JSON form of `session_store::LogEntry`. This is the
/// bulk-reconstruction lane: clients walk the entries to seed their
/// derived view.
///
/// `greeting` and `status` accompany the snapshot so clients render
/// pod identity and current controller state without an extra round
/// trip.
///
/// Live updates after the snapshot arrive through the streaming
/// events (`TextDelta` / `ToolCall*` / `ToolResult` / etc.) plus
/// the two role-specific entry events
/// (`SessionRotated` / `HookInjectedItems`) — there is no generic
/// "every committed entry" broadcast.
Snapshot {
entries: Vec<serde_json::Value>,
greeting: Greeting,
#[serde(default)]
status: PodStatus,
},
/// A single session-log entry committed atomically with the disk
/// write. Streamed as the suffix following the connect-time
/// `Snapshot`; the prefix/suffix boundary is gap-free and
/// duplicate-free per `SessionLogSink` semantics.
/// Server-side session log rotated to a fresh `SessionStart`.
///
/// Payload is the JSON form of `session_store::LogEntry`. Clients
/// deserialize as needed to render typed atoms (e.g.
/// `UserInput.segments`).
Entry {
/// Fires on compaction and on auto-fork when the store head drifts
/// from the live writer's cached head. Clients drop their derived
/// view and reseed from `entry.history` exactly the way they would
/// from a connect-time `Snapshot`.
///
/// Payload is the JSON form of `session_store::LogEntry::SessionStart`.
SessionRotated {
entry: serde_json::Value,
},
/// A non-LLM-driven history append landed in the worker history.
///
/// Carries the JSON form of `session_store::LogEntry::HookInjectedItems`.
/// This is the live counterpart of items that the streaming lane
/// never broadcasts — `Method::Notify` echoes, `@<path>` attachment
/// resolutions, `<system-reminder>` injections — so a connected
/// client can render them in time order without waiting for the
/// next reconnect's `Snapshot`.
HookInjectedItems {
entry: serde_json::Value,
},
/// Current Pod controller status. Broadcast on every controller-level
@ -759,18 +775,36 @@ mod tests {
}
#[test]
fn event_entry_roundtrip() {
let event = Event::Entry {
entry: serde_json::json!({"kind": "assistant_items", "ts": 42, "items": []}),
fn event_session_rotated_roundtrip() {
let event = Event::SessionRotated {
entry: serde_json::json!({"kind": "session_start", "ts": 1, "history": []}),
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "entry");
assert_eq!(parsed["data"]["entry"]["kind"], "assistant_items");
assert_eq!(parsed["event"], "session_rotated");
assert_eq!(parsed["data"]["entry"]["kind"], "session_start");
let decoded: Event = serde_json::from_str(&json).unwrap();
match decoded {
Event::Entry { entry } => assert_eq!(entry["kind"], "assistant_items"),
other => panic!("expected Entry, got {other:?}"),
Event::SessionRotated { entry } => assert_eq!(entry["kind"], "session_start"),
other => panic!("expected SessionRotated, got {other:?}"),
}
}
#[test]
fn event_hook_injected_items_roundtrip() {
let event = Event::HookInjectedItems {
entry: serde_json::json!({"kind": "hook_injected_items", "ts": 42, "items": []}),
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "hook_injected_items");
assert_eq!(parsed["data"]["entry"]["kind"], "hook_injected_items");
let decoded: Event = serde_json::from_str(&json).unwrap();
match decoded {
Event::HookInjectedItems { entry } => {
assert_eq!(entry["kind"], "hook_injected_items")
}
other => panic!("expected HookInjectedItems, got {other:?}"),
}
}

View File

@ -491,8 +491,14 @@ impl App {
self.blocks.push(Block::PodEvent { event });
self.assistant_streaming = false;
}
Event::Entry { entry } => {
self.apply_log_entry(&entry);
Event::SessionRotated { entry } => {
self.reset_for_rotation();
self.apply_log_entry_raw(&entry);
self.assistant_streaming = false;
}
Event::HookInjectedItems { entry } => {
self.apply_log_entry_raw(&entry);
self.assistant_streaming = false;
}
Event::TurnStart { .. } => {
self.set_pod_status(PodStatus::Running);
@ -926,33 +932,22 @@ impl App {
self.mark_orphan_tool_calls_incomplete_pass();
}
/// Apply a single live `Event::Entry`.
///
/// `SessionStart` entries that arrive live (compaction / fork)
/// reset the block list to a freshly seeded view, matching what a
/// reconnect's `Event::Snapshot` would produce.
fn apply_log_entry(&mut self, entry: &serde_json::Value) {
if entry.get("kind").and_then(|k| k.as_str()) == Some("session_start") {
// Compaction / fork on the server side. Reset our derived
// view but keep the greeting (identity hasn't changed).
let greeting = self
.blocks
.iter()
.find_map(|b| match b {
Block::Greeting(g) => Some(g.clone()),
_ => None,
});
self.turn_index = 0;
self.blocks.clear();
self.cache = FileCache::new();
self.task_store = TaskStore::new();
self.task_pane_scroll = 0;
if let Some(g) = greeting {
self.blocks.push(Block::Greeting(g));
}
/// Drop the derived view in preparation for replaying a new
/// `SessionStart` (compaction / fork). Greeting is preserved
/// because the Pod identity hasn't changed.
fn reset_for_rotation(&mut self) {
let greeting = self.blocks.iter().find_map(|b| match b {
Block::Greeting(g) => Some(g.clone()),
_ => None,
});
self.turn_index = 0;
self.blocks.clear();
self.cache = FileCache::new();
self.task_store = TaskStore::new();
self.task_pane_scroll = 0;
if let Some(g) = greeting {
self.blocks.push(Block::Greeting(g));
}
self.apply_log_entry_raw(entry);
self.assistant_streaming = false;
}
/// Walk a single `LogEntry` JSON value and translate it into blocks
@ -1427,7 +1422,7 @@ mod completion_flow_tests {
}
#[test]
fn live_entry_routes_system_message_via_hook_injected_items() {
fn live_hook_injected_items_event_appends_system_message_block() {
let mut app = App::new("test".into());
let entry = serde_json::json!({
"kind": "hook_injected_items",
@ -1438,7 +1433,7 @@ mod completion_flow_tests {
"content": [{ "kind": "text", "text": "[Workflow /build]\nRun the build" }],
}],
});
app.handle_pod_event(Event::Entry { entry });
app.handle_pod_event(Event::HookInjectedItems { entry });
assert!(matches!(
app.blocks.as_slice(),
@ -1582,7 +1577,7 @@ mod completion_flow_tests {
```json\n{\n \"tasks\": [\n {\n \"taskid\": 4,\n \
\"status\": \"inprogress\",\n \"subject\": \"from snapshot\",\n \
\"description\": \"d\"\n }\n ]\n}\n```\n";
app.handle_pod_event(Event::Entry {
app.handle_pod_event(Event::HookInjectedItems {
entry: serde_json::json!({
"kind": "hook_injected_items",
"ts": 1,