fix: 実態にそぐわないEvent::Entryを実装した構造を訂正
This commit is contained in:
parent
0f76142993
commit
350bb1afd8
|
|
@ -96,13 +96,30 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
// Live session-log entries → this client as Event::Entry.
|
// Live session-log entries → dispatched as the role-specific
|
||||||
|
// wire events. The sink only broadcasts entries that the
|
||||||
|
// streaming-event lane doesn't cover; everything else is
|
||||||
|
// already on the wire via TextDelta / ToolCall* / etc., so we
|
||||||
|
// never see (and never need to forward) other variants here.
|
||||||
entry = entry_rx.recv() => {
|
entry = entry_rx.recv() => {
|
||||||
match entry {
|
match entry {
|
||||||
Ok(entry) => {
|
Ok(entry) => {
|
||||||
let value = serde_json::to_value(&entry)
|
let value = serde_json::to_value(&entry)
|
||||||
.expect("LogEntry is Serialize");
|
.expect("LogEntry is Serialize");
|
||||||
if writer.write(&Event::Entry { entry: value }).await.is_err() {
|
let outbound = match &entry {
|
||||||
|
session_store::LogEntry::SessionStart { .. } => {
|
||||||
|
Some(Event::SessionRotated { entry: value })
|
||||||
|
}
|
||||||
|
session_store::LogEntry::HookInjectedItems { .. } => {
|
||||||
|
Some(Event::HookInjectedItems { entry: value })
|
||||||
|
}
|
||||||
|
// Defensive: should never reach here per
|
||||||
|
// `SessionLogSink::is_live_relevant`.
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
if let Some(event) = outbound
|
||||||
|
&& writer.write(&event).await.is_err()
|
||||||
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -83,11 +83,23 @@ impl SessionLogSink {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push `entry` to the mirror and broadcast it.
|
/// Push `entry` to the mirror; selectively broadcast it.
|
||||||
///
|
///
|
||||||
/// MUST be called only after the Pod has successfully persisted the
|
/// MUST be called only after the Pod has successfully persisted the
|
||||||
/// entry to the underlying `Store` — disk write is the gate. Failed
|
/// entry to the underlying `Store` — disk write is the gate. Failed
|
||||||
/// disk writes must not call `publish`.
|
/// disk writes must not call `publish`.
|
||||||
|
///
|
||||||
|
/// Live broadcast fires only for entries that the streaming-event
|
||||||
|
/// lane does not cover:
|
||||||
|
/// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire.
|
||||||
|
/// - `LogEntry::HookInjectedItems` → `Event::HookInjectedItems`.
|
||||||
|
/// Everything else (AssistantItems, ToolResults, UserInput, TurnEnd,
|
||||||
|
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
|
||||||
|
/// reflected in the mirror so reconnect snapshots stay accurate,
|
||||||
|
/// but is not sent live — the streaming events (TextDelta /
|
||||||
|
/// ToolCallStart / ToolResult / UserMessage / TurnEnd / etc.)
|
||||||
|
/// already provide that data, and re-broadcasting it as a typed
|
||||||
|
/// entry would just double-render every block on the client side.
|
||||||
pub fn publish(&self, entry: LogEntry) {
|
pub fn publish(&self, entry: LogEntry) {
|
||||||
let mut mirror = self
|
let mut mirror = self
|
||||||
.inner
|
.inner
|
||||||
|
|
@ -95,10 +107,21 @@ impl SessionLogSink {
|
||||||
.lock()
|
.lock()
|
||||||
.expect("session log mirror mutex poisoned");
|
.expect("session log mirror mutex poisoned");
|
||||||
mirror.push(entry.clone());
|
mirror.push(entry.clone());
|
||||||
// SendError means there are zero subscribers; harmless. We hold
|
if Self::is_live_relevant(&entry) {
|
||||||
// the mirror lock across `send` so that `subscribe_with_snapshot`
|
// SendError means there are zero subscribers; harmless. The
|
||||||
// cannot observe an inconsistent (snapshot, receiver) pair.
|
// mirror lock is held across `send` so subscribers cannot
|
||||||
let _ = self.inner.broadcast_tx.send(entry);
|
// observe an inconsistent (snapshot, receiver) pair.
|
||||||
|
let _ = self.inner.broadcast_tx.send(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `true` for entry kinds that the IPC layer forwards to clients
|
||||||
|
/// as a typed live event.
|
||||||
|
fn is_live_relevant(entry: &LogEntry) -> bool {
|
||||||
|
matches!(
|
||||||
|
entry,
|
||||||
|
LogEntry::SessionStart { .. } | LogEntry::HookInjectedItems { .. }
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Atomically swap the mirror to `[initial]` and broadcast the new
|
/// Atomically swap the mirror to `[initial]` and broadcast the new
|
||||||
|
|
@ -401,20 +424,40 @@ mod tests {
|
||||||
assert!(rx.try_recv().is_err());
|
assert!(rx.try_recv().is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn hook_injected(text: &str) -> LogEntry {
|
||||||
|
LogEntry::HookInjectedItems {
|
||||||
|
ts: now_millis(),
|
||||||
|
items: vec![session_store::LoggedItem::from(
|
||||||
|
&llm_worker::Item::system_message(text),
|
||||||
|
)],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn subscribe_then_publish_delivers_live_entries() {
|
fn subscribe_then_publish_delivers_only_live_relevant_entries() {
|
||||||
let sink = SessionLogSink::new();
|
let sink = SessionLogSink::new();
|
||||||
sink.publish(session_start());
|
sink.publish(session_start());
|
||||||
|
|
||||||
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
|
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
|
||||||
assert_eq!(snapshot.len(), 1);
|
assert_eq!(snapshot.len(), 1);
|
||||||
|
|
||||||
|
// TurnEnd is mirror-only — no live broadcast.
|
||||||
sink.publish(turn_end(1));
|
sink.publish(turn_end(1));
|
||||||
|
assert!(
|
||||||
|
rx.try_recv().is_err(),
|
||||||
|
"TurnEnd must not be broadcast live"
|
||||||
|
);
|
||||||
|
|
||||||
|
// HookInjectedItems is live-relevant.
|
||||||
|
sink.publish(hook_injected("[Notify] hi"));
|
||||||
match rx.try_recv() {
|
match rx.try_recv() {
|
||||||
Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {}
|
Ok(LogEntry::HookInjectedItems { .. }) => {}
|
||||||
other => panic!("unexpected: {other:?}"),
|
other => panic!("expected HookInjectedItems, got {other:?}"),
|
||||||
}
|
}
|
||||||
assert!(rx.try_recv().is_err());
|
|
||||||
|
// Mirror still grew with both entries (snapshot completeness).
|
||||||
|
let (after_snapshot, _) = sink.subscribe_with_snapshot();
|
||||||
|
assert_eq!(after_snapshot.len(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
@ -422,11 +465,11 @@ mod tests {
|
||||||
let sink = SessionLogSink::new();
|
let sink = SessionLogSink::new();
|
||||||
sink.publish(session_start());
|
sink.publish(session_start());
|
||||||
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
|
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
|
||||||
sink.publish(turn_end(1));
|
sink.publish(hook_injected("post-snapshot"));
|
||||||
|
|
||||||
assert_eq!(snapshot.len(), 1);
|
assert_eq!(snapshot.len(), 1);
|
||||||
match rx.try_recv() {
|
match rx.try_recv() {
|
||||||
Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {}
|
Ok(LogEntry::HookInjectedItems { .. }) => {}
|
||||||
other => panic!("unexpected: {other:?}"),
|
other => panic!("unexpected: {other:?}"),
|
||||||
}
|
}
|
||||||
assert!(rx.try_recv().is_err());
|
assert!(rx.try_recv().is_err());
|
||||||
|
|
|
||||||
|
|
@ -305,29 +305,45 @@ pub enum Event {
|
||||||
/// Sent exactly once at the start of every client connection.
|
/// Sent exactly once at the start of every client connection.
|
||||||
///
|
///
|
||||||
/// `entries` is the session-log mirror at subscribe time, serialised
|
/// `entries` is the session-log mirror at subscribe time, serialised
|
||||||
/// as the JSON form of `session_store::LogEntry`. Late attachers
|
/// as the JSON form of `session_store::LogEntry`. This is the
|
||||||
/// reconstruct view state by replaying entries through their own
|
/// bulk-reconstruction lane: clients walk the entries to seed their
|
||||||
/// `LogEntry → block` mapping, then continue applying live
|
/// derived view.
|
||||||
/// `Event::Entry` updates received after the snapshot.
|
|
||||||
///
|
///
|
||||||
/// `greeting` and `status` accompany the snapshot so clients render
|
/// `greeting` and `status` accompany the snapshot so clients render
|
||||||
/// pod identity and current controller state without an extra round
|
/// pod identity and current controller state without an extra round
|
||||||
/// trip.
|
/// trip.
|
||||||
|
///
|
||||||
|
/// Live updates after the snapshot arrive through the streaming
|
||||||
|
/// events (`TextDelta` / `ToolCall*` / `ToolResult` / etc.) plus
|
||||||
|
/// the two role-specific entry events
|
||||||
|
/// (`SessionRotated` / `HookInjectedItems`) — there is no generic
|
||||||
|
/// "every committed entry" broadcast.
|
||||||
Snapshot {
|
Snapshot {
|
||||||
entries: Vec<serde_json::Value>,
|
entries: Vec<serde_json::Value>,
|
||||||
greeting: Greeting,
|
greeting: Greeting,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
status: PodStatus,
|
status: PodStatus,
|
||||||
},
|
},
|
||||||
/// A single session-log entry committed atomically with the disk
|
/// Server-side session log rotated to a fresh `SessionStart`.
|
||||||
/// write. Streamed as the suffix following the connect-time
|
|
||||||
/// `Snapshot`; the prefix/suffix boundary is gap-free and
|
|
||||||
/// duplicate-free per `SessionLogSink` semantics.
|
|
||||||
///
|
///
|
||||||
/// Payload is the JSON form of `session_store::LogEntry`. Clients
|
/// Fires on compaction and on auto-fork when the store head drifts
|
||||||
/// deserialize as needed to render typed atoms (e.g.
|
/// from the live writer's cached head. Clients drop their derived
|
||||||
/// `UserInput.segments`).
|
/// view and reseed from `entry.history` exactly the way they would
|
||||||
Entry {
|
/// from a connect-time `Snapshot`.
|
||||||
|
///
|
||||||
|
/// Payload is the JSON form of `session_store::LogEntry::SessionStart`.
|
||||||
|
SessionRotated {
|
||||||
|
entry: serde_json::Value,
|
||||||
|
},
|
||||||
|
/// A non-LLM-driven history append landed in the worker history.
|
||||||
|
///
|
||||||
|
/// Carries the JSON form of `session_store::LogEntry::HookInjectedItems`.
|
||||||
|
/// This is the live counterpart of items that the streaming lane
|
||||||
|
/// never broadcasts — `Method::Notify` echoes, `@<path>` attachment
|
||||||
|
/// resolutions, `<system-reminder>` injections — so a connected
|
||||||
|
/// client can render them in time order without waiting for the
|
||||||
|
/// next reconnect's `Snapshot`.
|
||||||
|
HookInjectedItems {
|
||||||
entry: serde_json::Value,
|
entry: serde_json::Value,
|
||||||
},
|
},
|
||||||
/// Current Pod controller status. Broadcast on every controller-level
|
/// Current Pod controller status. Broadcast on every controller-level
|
||||||
|
|
@ -759,18 +775,36 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn event_entry_roundtrip() {
|
fn event_session_rotated_roundtrip() {
|
||||||
let event = Event::Entry {
|
let event = Event::SessionRotated {
|
||||||
entry: serde_json::json!({"kind": "assistant_items", "ts": 42, "items": []}),
|
entry: serde_json::json!({"kind": "session_start", "ts": 1, "history": []}),
|
||||||
};
|
};
|
||||||
let json = serde_json::to_string(&event).unwrap();
|
let json = serde_json::to_string(&event).unwrap();
|
||||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
assert_eq!(parsed["event"], "entry");
|
assert_eq!(parsed["event"], "session_rotated");
|
||||||
assert_eq!(parsed["data"]["entry"]["kind"], "assistant_items");
|
assert_eq!(parsed["data"]["entry"]["kind"], "session_start");
|
||||||
let decoded: Event = serde_json::from_str(&json).unwrap();
|
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||||
match decoded {
|
match decoded {
|
||||||
Event::Entry { entry } => assert_eq!(entry["kind"], "assistant_items"),
|
Event::SessionRotated { entry } => assert_eq!(entry["kind"], "session_start"),
|
||||||
other => panic!("expected Entry, got {other:?}"),
|
other => panic!("expected SessionRotated, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_hook_injected_items_roundtrip() {
|
||||||
|
let event = Event::HookInjectedItems {
|
||||||
|
entry: serde_json::json!({"kind": "hook_injected_items", "ts": 42, "items": []}),
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&event).unwrap();
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
|
assert_eq!(parsed["event"], "hook_injected_items");
|
||||||
|
assert_eq!(parsed["data"]["entry"]["kind"], "hook_injected_items");
|
||||||
|
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||||
|
match decoded {
|
||||||
|
Event::HookInjectedItems { entry } => {
|
||||||
|
assert_eq!(entry["kind"], "hook_injected_items")
|
||||||
|
}
|
||||||
|
other => panic!("expected HookInjectedItems, got {other:?}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -491,8 +491,14 @@ impl App {
|
||||||
self.blocks.push(Block::PodEvent { event });
|
self.blocks.push(Block::PodEvent { event });
|
||||||
self.assistant_streaming = false;
|
self.assistant_streaming = false;
|
||||||
}
|
}
|
||||||
Event::Entry { entry } => {
|
Event::SessionRotated { entry } => {
|
||||||
self.apply_log_entry(&entry);
|
self.reset_for_rotation();
|
||||||
|
self.apply_log_entry_raw(&entry);
|
||||||
|
self.assistant_streaming = false;
|
||||||
|
}
|
||||||
|
Event::HookInjectedItems { entry } => {
|
||||||
|
self.apply_log_entry_raw(&entry);
|
||||||
|
self.assistant_streaming = false;
|
||||||
}
|
}
|
||||||
Event::TurnStart { .. } => {
|
Event::TurnStart { .. } => {
|
||||||
self.set_pod_status(PodStatus::Running);
|
self.set_pod_status(PodStatus::Running);
|
||||||
|
|
@ -926,33 +932,22 @@ impl App {
|
||||||
self.mark_orphan_tool_calls_incomplete_pass();
|
self.mark_orphan_tool_calls_incomplete_pass();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Apply a single live `Event::Entry`.
|
/// Drop the derived view in preparation for replaying a new
|
||||||
///
|
/// `SessionStart` (compaction / fork). Greeting is preserved
|
||||||
/// `SessionStart` entries that arrive live (compaction / fork)
|
/// because the Pod identity hasn't changed.
|
||||||
/// reset the block list to a freshly seeded view, matching what a
|
fn reset_for_rotation(&mut self) {
|
||||||
/// reconnect's `Event::Snapshot` would produce.
|
let greeting = self.blocks.iter().find_map(|b| match b {
|
||||||
fn apply_log_entry(&mut self, entry: &serde_json::Value) {
|
Block::Greeting(g) => Some(g.clone()),
|
||||||
if entry.get("kind").and_then(|k| k.as_str()) == Some("session_start") {
|
_ => None,
|
||||||
// Compaction / fork on the server side. Reset our derived
|
});
|
||||||
// view but keep the greeting (identity hasn't changed).
|
self.turn_index = 0;
|
||||||
let greeting = self
|
self.blocks.clear();
|
||||||
.blocks
|
self.cache = FileCache::new();
|
||||||
.iter()
|
self.task_store = TaskStore::new();
|
||||||
.find_map(|b| match b {
|
self.task_pane_scroll = 0;
|
||||||
Block::Greeting(g) => Some(g.clone()),
|
if let Some(g) = greeting {
|
||||||
_ => None,
|
self.blocks.push(Block::Greeting(g));
|
||||||
});
|
|
||||||
self.turn_index = 0;
|
|
||||||
self.blocks.clear();
|
|
||||||
self.cache = FileCache::new();
|
|
||||||
self.task_store = TaskStore::new();
|
|
||||||
self.task_pane_scroll = 0;
|
|
||||||
if let Some(g) = greeting {
|
|
||||||
self.blocks.push(Block::Greeting(g));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
self.apply_log_entry_raw(entry);
|
|
||||||
self.assistant_streaming = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Walk a single `LogEntry` JSON value and translate it into blocks
|
/// Walk a single `LogEntry` JSON value and translate it into blocks
|
||||||
|
|
@ -1427,7 +1422,7 @@ mod completion_flow_tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn live_entry_routes_system_message_via_hook_injected_items() {
|
fn live_hook_injected_items_event_appends_system_message_block() {
|
||||||
let mut app = App::new("test".into());
|
let mut app = App::new("test".into());
|
||||||
let entry = serde_json::json!({
|
let entry = serde_json::json!({
|
||||||
"kind": "hook_injected_items",
|
"kind": "hook_injected_items",
|
||||||
|
|
@ -1438,7 +1433,7 @@ mod completion_flow_tests {
|
||||||
"content": [{ "kind": "text", "text": "[Workflow /build]\nRun the build" }],
|
"content": [{ "kind": "text", "text": "[Workflow /build]\nRun the build" }],
|
||||||
}],
|
}],
|
||||||
});
|
});
|
||||||
app.handle_pod_event(Event::Entry { entry });
|
app.handle_pod_event(Event::HookInjectedItems { entry });
|
||||||
|
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
app.blocks.as_slice(),
|
app.blocks.as_slice(),
|
||||||
|
|
@ -1582,7 +1577,7 @@ mod completion_flow_tests {
|
||||||
```json\n{\n \"tasks\": [\n {\n \"taskid\": 4,\n \
|
```json\n{\n \"tasks\": [\n {\n \"taskid\": 4,\n \
|
||||||
\"status\": \"inprogress\",\n \"subject\": \"from snapshot\",\n \
|
\"status\": \"inprogress\",\n \"subject\": \"from snapshot\",\n \
|
||||||
\"description\": \"d\"\n }\n ]\n}\n```\n";
|
\"description\": \"d\"\n }\n ]\n}\n```\n";
|
||||||
app.handle_pod_event(Event::Entry {
|
app.handle_pod_event(Event::HookInjectedItems {
|
||||||
entry: serde_json::json!({
|
entry: serde_json::json!({
|
||||||
"kind": "hook_injected_items",
|
"kind": "hook_injected_items",
|
||||||
"ts": 1,
|
"ts": 1,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user