chore: cargo fmt
This commit is contained in:
parent
350bb1afd8
commit
63e27b2dee
|
|
@ -401,10 +401,8 @@ fn wire_event_bridges_on_worker<C, St>(
|
||||||
/// off the channel and commits each item as a typed `LogEntry` through
|
/// off the channel and commits each item as a typed `LogEntry` through
|
||||||
/// the supplied store + sink. Lives as long as the controller; exits
|
/// the supplied store + sink. Lives as long as the controller; exits
|
||||||
/// when the sender is dropped (controller shutdown).
|
/// when the sender is dropped (controller shutdown).
|
||||||
async fn run_log_drain<St>(
|
async fn run_log_drain<St>(mut rx: mpsc::UnboundedReceiver<LogCommand>, ctx: LogDrainHandle<St>)
|
||||||
mut rx: mpsc::UnboundedReceiver<LogCommand>,
|
where
|
||||||
ctx: LogDrainHandle<St>,
|
|
||||||
) where
|
|
||||||
St: session_store::Store + Clone + Send + 'static,
|
St: session_store::Store + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
while let Some(cmd) = rx.recv().await {
|
while let Some(cmd) = rx.recv().await {
|
||||||
|
|
|
||||||
|
|
@ -511,10 +511,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
/// broadcast sink. Holds the session-head async lock across the
|
/// broadcast sink. Holds the session-head async lock across the
|
||||||
/// disk write and the sink publish so subscribers see a gap-free
|
/// disk write and the sink publish so subscribers see a gap-free
|
||||||
/// `(snapshot, live)` stream consistent with what's on disk.
|
/// `(snapshot, live)` stream consistent with what's on disk.
|
||||||
pub(crate) async fn commit_entry(
|
pub(crate) async fn commit_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> {
|
||||||
&self,
|
|
||||||
entry: LogEntry,
|
|
||||||
) -> Result<EntryHash, StoreError> {
|
|
||||||
let mut head = self.session_head.lock().await;
|
let mut head = self.session_head.lock().await;
|
||||||
let hash = session_store::append_entry_with_hash(
|
let hash = session_store::append_entry_with_hash(
|
||||||
&self.store,
|
&self.store,
|
||||||
|
|
@ -1643,10 +1640,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
.iter()
|
.iter()
|
||||||
.map(session_store::LoggedItem::from)
|
.map(session_store::LoggedItem::from)
|
||||||
.collect();
|
.collect();
|
||||||
self.commit_entry(LogEntry::ToolResults { ts, items }).await?;
|
self.commit_entry(LogEntry::ToolResults { ts, items })
|
||||||
} else if item.is_assistant_message()
|
.await?;
|
||||||
|| item.is_tool_call()
|
} else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning()
|
||||||
|| item.is_reasoning()
|
|
||||||
{
|
{
|
||||||
let start = i;
|
let start = i;
|
||||||
while i < new_items.len()
|
while i < new_items.len()
|
||||||
|
|
@ -2766,8 +2762,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||||
if state.head_hash.is_none() {
|
if state.head_hash.is_none() {
|
||||||
return Err(PodError::SessionEmpty { session_id });
|
return Err(PodError::SessionEmpty { session_id });
|
||||||
}
|
}
|
||||||
let mirror_entries: Vec<LogEntry> =
|
let mirror_entries: Vec<LogEntry> = raw_entries.iter().map(|e| e.entry.clone()).collect();
|
||||||
raw_entries.iter().map(|e| e.entry.clone()).collect();
|
|
||||||
let scope_snapshot = state
|
let scope_snapshot = state
|
||||||
.pod_scope
|
.pod_scope
|
||||||
.clone()
|
.clone()
|
||||||
|
|
|
||||||
|
|
@ -420,7 +420,10 @@ mod tests {
|
||||||
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
|
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
|
||||||
assert_eq!(snapshot.len(), 2);
|
assert_eq!(snapshot.len(), 2);
|
||||||
assert!(matches!(snapshot[0], LogEntry::SessionStart { .. }));
|
assert!(matches!(snapshot[0], LogEntry::SessionStart { .. }));
|
||||||
assert!(matches!(snapshot[1], LogEntry::TurnEnd { turn_count: 1, .. }));
|
assert!(matches!(
|
||||||
|
snapshot[1],
|
||||||
|
LogEntry::TurnEnd { turn_count: 1, .. }
|
||||||
|
));
|
||||||
assert!(rx.try_recv().is_err());
|
assert!(rx.try_recv().is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -443,10 +446,7 @@ mod tests {
|
||||||
|
|
||||||
// TurnEnd is mirror-only — no live broadcast.
|
// TurnEnd is mirror-only — no live broadcast.
|
||||||
sink.publish(turn_end(1));
|
sink.publish(turn_end(1));
|
||||||
assert!(
|
assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live");
|
||||||
rx.try_recv().is_err(),
|
|
||||||
"TurnEnd must not be broadcast live"
|
|
||||||
);
|
|
||||||
|
|
||||||
// HookInjectedItems is live-relevant.
|
// HookInjectedItems is live-relevant.
|
||||||
sink.publish(hook_injected("[Notify] hi"));
|
sink.publish(hook_injected("[Notify] hi"));
|
||||||
|
|
|
||||||
|
|
@ -188,7 +188,6 @@ mod tests {
|
||||||
assert_eq!(parsed["state"], "running");
|
assert_eq!(parsed["state"], "running");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn knowledge_completions_empty_when_unset() {
|
fn knowledge_completions_empty_when_unset() {
|
||||||
let state = test_state();
|
let state = test_state();
|
||||||
|
|
|
||||||
|
|
@ -179,7 +179,12 @@ fn drain(rx: &mut broadcast::Receiver<Event>) -> Vec<Event> {
|
||||||
|
|
||||||
/// Collect every system-message text that the post-compaction
|
/// Collect every system-message text that the post-compaction
|
||||||
/// `SessionStart.history` carries, by reading the sink mirror directly.
|
/// `SessionStart.history` carries, by reading the sink mirror directly.
|
||||||
fn system_texts_in_sink_session_start(pod: &pod::Pod<impl llm_worker::llm_client::client::LlmClient + Clone + 'static, impl session_store::Store + Clone + 'static>) -> Vec<String> {
|
fn system_texts_in_sink_session_start(
|
||||||
|
pod: &pod::Pod<
|
||||||
|
impl llm_worker::llm_client::client::LlmClient + Clone + 'static,
|
||||||
|
impl session_store::Store + Clone + 'static,
|
||||||
|
>,
|
||||||
|
) -> Vec<String> {
|
||||||
let (entries, _rx) = pod.sink().subscribe_with_snapshot();
|
let (entries, _rx) = pod.sink().subscribe_with_snapshot();
|
||||||
for entry in entries.into_iter().rev() {
|
for entry in entries.into_iter().rev() {
|
||||||
if let session_store::LogEntry::SessionStart { history, .. } = entry {
|
if let session_store::LogEntry::SessionStart { history, .. } = entry {
|
||||||
|
|
|
||||||
|
|
@ -287,10 +287,7 @@ async fn snapshot_includes_user_input_for_in_flight_turn() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert!(
|
assert!(found, "snapshot must carry the in-flight UserInput entry");
|
||||||
found,
|
|
||||||
"snapshot must carry the in-flight UserInput entry"
|
|
||||||
);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Event::Alert(_) => continue,
|
Event::Alert(_) => continue,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user