diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 1cf822c5..c9f9f140 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -401,10 +401,8 @@ fn wire_event_bridges_on_worker( /// off the channel and commits each item as a typed `LogEntry` through /// the supplied store + sink. Lives as long as the controller; exits /// when the sender is dropped (controller shutdown). -async fn run_log_drain( - mut rx: mpsc::UnboundedReceiver, - ctx: LogDrainHandle, -) where +async fn run_log_drain(mut rx: mpsc::UnboundedReceiver, ctx: LogDrainHandle) +where St: session_store::Store + Clone + Send + 'static, { while let Some(cmd) = rx.recv().await { diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 653276e7..674efee0 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -511,10 +511,7 @@ impl Pod { /// broadcast sink. Holds the session-head async lock across the /// disk write and the sink publish so subscribers see a gap-free /// `(snapshot, live)` stream consistent with what's on disk. - pub(crate) async fn commit_entry( - &self, - entry: LogEntry, - ) -> Result { + pub(crate) async fn commit_entry(&self, entry: LogEntry) -> Result { let mut head = self.session_head.lock().await; let hash = session_store::append_entry_with_hash( &self.store, @@ -1643,10 +1640,9 @@ impl Pod { .iter() .map(session_store::LoggedItem::from) .collect(); - self.commit_entry(LogEntry::ToolResults { ts, items }).await?; - } else if item.is_assistant_message() - || item.is_tool_call() - || item.is_reasoning() + self.commit_entry(LogEntry::ToolResults { ts, items }) + .await?; + } else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() { let start = i; while i < new_items.len() @@ -2766,8 +2762,7 @@ impl Pod, St> { if state.head_hash.is_none() { return Err(PodError::SessionEmpty { session_id }); } - let mirror_entries: Vec = - raw_entries.iter().map(|e| e.entry.clone()).collect(); + let mirror_entries: Vec = raw_entries.iter().map(|e| e.entry.clone()).collect(); let scope_snapshot = state .pod_scope .clone() diff --git a/crates/pod/src/session_log_sink.rs b/crates/pod/src/session_log_sink.rs index 683230d4..4e35a046 100644 --- a/crates/pod/src/session_log_sink.rs +++ b/crates/pod/src/session_log_sink.rs @@ -420,7 +420,10 @@ mod tests { let (snapshot, mut rx) = sink.subscribe_with_snapshot(); assert_eq!(snapshot.len(), 2); assert!(matches!(snapshot[0], LogEntry::SessionStart { .. })); - assert!(matches!(snapshot[1], LogEntry::TurnEnd { turn_count: 1, .. })); + assert!(matches!( + snapshot[1], + LogEntry::TurnEnd { turn_count: 1, .. } + )); assert!(rx.try_recv().is_err()); } @@ -443,10 +446,7 @@ mod tests { // TurnEnd is mirror-only — no live broadcast. sink.publish(turn_end(1)); - assert!( - rx.try_recv().is_err(), - "TurnEnd must not be broadcast live" - ); + assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live"); // HookInjectedItems is live-relevant. sink.publish(hook_injected("[Notify] hi")); diff --git a/crates/pod/src/shared_state.rs b/crates/pod/src/shared_state.rs index c29cfe6a..5ecccbfe 100644 --- a/crates/pod/src/shared_state.rs +++ b/crates/pod/src/shared_state.rs @@ -188,7 +188,6 @@ mod tests { assert_eq!(parsed["state"], "running"); } - #[test] fn knowledge_completions_empty_when_unset() { let state = test_state(); diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index 8823b408..7b827938 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -179,7 +179,12 @@ fn drain(rx: &mut broadcast::Receiver) -> Vec { /// Collect every system-message text that the post-compaction /// `SessionStart.history` carries, by reading the sink mirror directly. -fn system_texts_in_sink_session_start(pod: &pod::Pod) -> Vec { +fn system_texts_in_sink_session_start( + pod: &pod::Pod< + impl llm_worker::llm_client::client::LlmClient + Clone + 'static, + impl session_store::Store + Clone + 'static, + >, +) -> Vec { let (entries, _rx) = pod.sink().subscribe_with_snapshot(); for entry in entries.into_iter().rev() { if let session_store::LogEntry::SessionStart { history, .. } = entry { diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index c4629bcd..af1fb9c2 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -287,10 +287,7 @@ async fn snapshot_includes_user_input_for_in_flight_turn() { } } } - assert!( - found, - "snapshot must carry the in-flight UserInput entry" - ); + assert!(found, "snapshot must carry the in-flight UserInput entry"); return; } Event::Alert(_) => continue,