style: run cargo fmt

This commit is contained in:
Keisuke Hirata 2026-05-22 22:03:27 +09:00
parent d08ea1734e
commit 08dc6b29f8
15 changed files with 94 additions and 62 deletions

View File

@ -639,9 +639,7 @@ async fn controller_loop<C, St>(
// sees the buffered notification(s) without a human // sees the buffered notification(s) without a human
// Run. // Run.
if shared_state.get_status() == PodStatus::Idle { if shared_state.get_status() == PodStatus::Idle {
pending = Some(PendingRun::RunForNotification( pending = Some(PendingRun::RunForNotification(protocol::InvokeKind::Notify));
protocol::InvokeKind::Notify,
));
} }
} }

View File

@ -11,9 +11,9 @@
//! happen at the front of `Pod::run` when //! happen at the front of `Pod::run` when
//! `worker.last_run_interrupted()` is set; see `Pod::apply_interrupt_prep`. //! `worker.last_run_interrupted()` is set; see `Pod::apply_interrupt_prep`.
use llm_worker::Item;
#[cfg(test)] #[cfg(test)]
use crate::prompt::catalog::PromptCatalog; use crate::prompt::catalog::PromptCatalog;
use llm_worker::Item;
/// Build synthetic `Item::ToolResult` items for every unanswered /// Build synthetic `Item::ToolResult` items for every unanswered
/// `Item::ToolCall` in `history`, preserving order. /// `Item::ToolCall` in `history`, preserving order.

View File

@ -185,7 +185,9 @@ mod tests {
let item = build_system_item(&entry, &catalog).unwrap(); let item = build_system_item(&entry, &catalog).unwrap();
match item { match item {
SystemItem::PodEvent { event, body } => { SystemItem::PodEvent { event, body } => {
assert!(matches!(event, PodEvent::TurnEnded { ref pod_name } if pod_name == "child")); assert!(
matches!(event, PodEvent::TurnEnded { ref pod_name } if pod_name == "child")
);
assert!(body.contains("[Notification]")); assert!(body.contains("[Notification]"));
assert!(body.contains("`child`")); assert!(body.contains("`child`"));
} }

View File

@ -1227,8 +1227,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.commit_entry(LogEntry::UserInput { self.commit_entry(LogEntry::UserInput {
ts: segment_log::now_millis(), ts: segment_log::now_millis(),
segments: input.clone(), segments: input.clone(),
}) })?;
?;
self.user_segments.push(input.clone()); self.user_segments.push(input.clone());
// Resolve `@<path>` refs, `#<slug>` Knowledge refs, and `/<slug>` // Resolve `@<path>` refs, `#<slug>` Knowledge refs, and `/<slug>`
@ -1881,8 +1880,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.commit_entry(LogEntry::TurnEnd { self.commit_entry(LogEntry::TurnEnd {
ts: segment_log::now_millis(), ts: segment_log::now_millis(),
turn_count, turn_count,
}) })?;
?;
// Flush any sync-buffered metrics from this run first // Flush any sync-buffered metrics from this run first
// (currently `prune.fire` / `prune.skip` from the prune observer). // (currently `prune.fire` / `prune.skip` from the prune observer).
@ -1922,8 +1920,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
cache_read_tokens: record.cache_read_tokens, cache_read_tokens: record.cache_read_tokens,
cache_write_tokens: record.cache_write_tokens, cache_write_tokens: record.cache_write_tokens,
output_tokens: record.output_tokens, output_tokens: record.output_tokens,
}) })?;
?;
if let Some(id) = correlation_id { if let Some(id) = correlation_id {
let metric = session_metrics::Metric::now("prune.post_request") let metric = session_metrics::Metric::now("prune.post_request")
.with_correlation_id(&id) .with_correlation_id(&id)
@ -1945,16 +1942,14 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
ts: segment_log::now_millis(), ts: segment_log::now_millis(),
interrupted, interrupted,
result: r.clone(), result: r.clone(),
}) })?;
?;
} }
Err(e) => { Err(e) => {
self.commit_entry(LogEntry::RunErrored { self.commit_entry(LogEntry::RunErrored {
ts: segment_log::now_millis(), ts: segment_log::now_millis(),
interrupted, interrupted,
message: e.to_string(), message: e.to_string(),
}) })?;
?;
} }
} }

View File

@ -119,9 +119,7 @@ impl SegmentLogSink {
fn is_live_relevant(entry: &LogEntry) -> bool { fn is_live_relevant(entry: &LogEntry) -> bool {
matches!( matches!(
entry, entry,
LogEntry::SegmentStart { .. } LogEntry::SegmentStart { .. } | LogEntry::SystemItem { .. } | LogEntry::Invoke { .. }
| LogEntry::SystemItem { .. }
| LogEntry::Invoke { .. }
) )
} }

View File

@ -240,7 +240,10 @@ max_tokens = 100
target = "./" target = "./"
permission = "write" permission = "write"
"#; "#;
let client = MockClient::new(vec![single_text_events("first"), single_text_events("second")]); let client = MockClient::new(vec![
single_text_events("first"),
single_text_events("second"),
]);
let mut pod = make_pod_with_manifest(NO_COMPACT_MANIFEST_TOML, client).await; let mut pod = make_pod_with_manifest(NO_COMPACT_MANIFEST_TOML, client).await;
pod.run_text("first").await.unwrap(); pod.run_text("first").await.unwrap();

View File

@ -777,13 +777,15 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
// not on the `event_tx` broadcast that `handle.subscribe()` taps. // not on the `event_tx` broadcast that `handle.subscribe()` taps.
// Verify the notification landed on the sink mirror instead. // Verify the notification landed on the sink mirror instead.
let (entries, _) = handle.sink.subscribe_with_snapshot(); let (entries, _) = handle.sink.subscribe_with_snapshot();
let saw_notify_in_mirror = entries.iter().any(|e| matches!( let saw_notify_in_mirror = entries.iter().any(|e| {
matches!(
e, e,
session_store::LogEntry::SystemItem { session_store::LogEntry::SystemItem {
item: session_store::SystemItem::Notification { message, .. }, item: session_store::SystemItem::Notification { message, .. },
.. ..
} if message == "turn finished" } if message == "turn finished"
)); )
});
assert!( assert!(
saw_notify_in_mirror, saw_notify_in_mirror,
"Method::Notify should commit a SystemItem::Notification entry; mirror = {entries:?}" "Method::Notify should commit a SystemItem::Notification entry; mirror = {entries:?}"
@ -865,7 +867,8 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
// its Flush of the drain queue) runs afterwards. // its Flush of the drain queue) runs afterwards.
wait_for_status(&handle, PodStatus::Idle).await; wait_for_status(&handle, PodStatus::Idle).await;
let (entries, _) = handle.sink.subscribe_with_snapshot(); let (entries, _) = handle.sink.subscribe_with_snapshot();
let saw_pod_event_in_mirror = entries.iter().any(|e| matches!( let saw_pod_event_in_mirror = entries.iter().any(|e| {
matches!(
e, e,
session_store::LogEntry::SystemItem { session_store::LogEntry::SystemItem {
item: session_store::SystemItem::PodEvent { item: session_store::SystemItem::PodEvent {
@ -874,7 +877,8 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
}, },
.. ..
} if pod_name == "child" } if pod_name == "child"
)); )
});
assert!( assert!(
saw_pod_event_in_mirror, saw_pod_event_in_mirror,
"Method::PodEvent should commit a SystemItem::PodEvent entry" "Method::PodEvent should commit a SystemItem::PodEvent entry"

View File

@ -78,8 +78,13 @@ async fn restore_from_manifest_rejects_empty_segment_log() {
std::fs::create_dir_all(&dir).unwrap(); std::fs::create_dir_all(&dir).unwrap();
std::fs::write(dir.join(format!("{segid}.jsonl")), b"").unwrap(); std::fs::write(dir.join(format!("{segid}.jsonl")), b"").unwrap();
let result = let result = Pod::restore_from_manifest(
Pod::restore_from_manifest(sid, segid, manifest, store, pod::PromptLoader::builtins_only()) sid,
segid,
manifest,
store,
pod::PromptLoader::builtins_only(),
)
.await; .await;
match result { match result {
@ -106,8 +111,13 @@ async fn restore_from_manifest_rejects_segment_without_scope_snapshot() {
}; };
session_store::create_segment_with_ids(&store, sid, segid, state).unwrap(); session_store::create_segment_with_ids(&store, sid, segid, state).unwrap();
let result = let result = Pod::restore_from_manifest(
Pod::restore_from_manifest(sid, segid, manifest, store, pod::PromptLoader::builtins_only()) sid,
segid,
manifest,
store,
pod::PromptLoader::builtins_only(),
)
.await; .await;
match result { match result {

View File

@ -74,7 +74,10 @@ impl FsStore {
if let Some(parent) = path.parent() { if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?; fs::create_dir_all(parent)?;
} }
let mut file = fs::OpenOptions::new().create(true).append(true).open(path)?; let mut file = fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
file.write_all(line.as_bytes())?; file.write_all(line.as_bytes())?;
file.write_all(b"\n")?; file.write_all(b"\n")?;
// Append-mode write is the durability boundary; an explicit // Append-mode write is the durability boundary; an explicit

View File

@ -56,8 +56,8 @@ pub use segment_log::{
LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SegmentOrigin, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SegmentOrigin,
collect_state, collect_state,
}; };
pub use system_item::{SystemItem, render_pod_event};
pub use store::{Store, StoreError}; pub use store::{Store, StoreError};
pub use system_item::{SystemItem, render_pod_event};
/// Session identifier — the fork-tree root. UUID v7 (time-ordered). /// Session identifier — the fork-tree root. UUID v7 (time-ordered).
/// ///

View File

@ -4,11 +4,11 @@
//! The caller (typically Pod) holds the Worker directly and calls these //! The caller (typically Pod) holds the Worker directly and calls these
//! functions after state-mutating operations. //! functions after state-mutating operations.
use crate::{SegmentId, SessionId};
use crate::logged_item::{LoggedItem, to_logged}; use crate::logged_item::{LoggedItem, to_logged};
use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin}; use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin};
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use crate::system_item::SystemItem; use crate::system_item::SystemItem;
use crate::{SegmentId, SessionId};
use llm_worker::WorkerResult; use llm_worker::WorkerResult;
use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::types::Item; use llm_worker::llm_client::types::Item;

View File

@ -112,9 +112,15 @@ fn list_sessions_and_segments() {
let seg_a2 = new_segment_id(); let seg_a2 = new_segment_id();
let seg_b1 = new_segment_id(); let seg_b1 = new_segment_id();
store.append(sid_a, seg_a1, &nil_session_start(1, sid_a)).unwrap(); store
store.append(sid_a, seg_a2, &nil_session_start(2, sid_a)).unwrap(); .append(sid_a, seg_a1, &nil_session_start(1, sid_a))
store.append(sid_b, seg_b1, &nil_session_start(3, sid_b)).unwrap(); .unwrap();
store
.append(sid_a, seg_a2, &nil_session_start(2, sid_a))
.unwrap();
store
.append(sid_b, seg_b1, &nil_session_start(3, sid_b))
.unwrap();
let sessions = store.list_sessions().unwrap(); let sessions = store.list_sessions().unwrap();
assert_eq!(sessions, vec![sid_b, sid_a]); // newest first assert_eq!(sessions, vec![sid_b, sid_a]); // newest first
@ -136,7 +142,9 @@ fn exists_returns_correct_state() {
assert!(!store.exists(sid, segid).unwrap()); assert!(!store.exists(sid, segid).unwrap());
store.append(sid, segid, &nil_session_start(1000, sid)).unwrap(); store
.append(sid, segid, &nil_session_start(1000, sid))
.unwrap();
assert!(store.exists(sid, segid).unwrap()); assert!(store.exists(sid, segid).unwrap());
} }
@ -159,7 +167,9 @@ fn trace_entries_in_separate_file() {
let sid = new_session_id(); let sid = new_session_id();
let segid = new_segment_id(); let segid = new_segment_id();
store.append(sid, segid, &nil_session_start(1000, sid)).unwrap(); store
.append(sid, segid, &nil_session_start(1000, sid))
.unwrap();
let trace = TraceEntry { let trace = TraceEntry {
ts: 1500, ts: 1500,
@ -221,7 +231,9 @@ fn lookup_session_of_finds_owning_session() {
assert_eq!(store.lookup_session_of(segid).unwrap(), None); assert_eq!(store.lookup_session_of(segid).unwrap(), None);
store.append(sid, segid, &nil_session_start(1, sid)).unwrap(); store
.append(sid, segid, &nil_session_start(1, sid))
.unwrap();
assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid)); assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid));
} }

View File

@ -475,7 +475,11 @@ async fn session_auto_forks_on_conflict() {
let fork_entries = store.read_all(sid, segment_id).unwrap(); let fork_entries = store.read_all(sid, segment_id).unwrap();
assert!(!fork_entries.is_empty()); assert!(!fork_entries.is_empty());
let fork_state = collect_state(&fork_entries); let fork_state = collect_state(&fork_entries);
assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session"); assert_eq!(
fork_state.session_id,
Some(sid),
"auto-fork inherits Session"
);
// The new segment records its lineage forward via forked_from; the // The new segment records its lineage forward via forked_from; the
// source segment is left immutable (no terminal marker written back). // source segment is left immutable (no terminal marker written back).
@ -542,9 +546,16 @@ async fn nested_past_fork_leaves_ancestors_immutable() {
} }
// The root and fork1 are untouched by forking their descendants. // The root and fork1 are untouched by forking their descendants.
assert_eq!(store.read_all(sid, root_segid).unwrap().len(), root_before.len()); assert_eq!(
store.read_all(sid, root_segid).unwrap().len(),
root_before.len()
);
let fork1_entries = store.read_all(sid, fork1).unwrap(); let fork1_entries = store.read_all(sid, fork1).unwrap();
assert_eq!(fork1_entries.len(), 1, "fork1 is just its SegmentStart seed"); assert_eq!(
fork1_entries.len(),
1,
"fork1 is just its SegmentStart seed"
);
// fork2's lineage points at fork1, not the root. // fork2's lineage points at fork1, not the root.
match &store.read_all(sid, fork2).unwrap()[0] { match &store.read_all(sid, fork2).unwrap()[0] {

View File

@ -502,9 +502,7 @@ impl App {
// for `tickets/invoke-turn-llmcall-semantics.md`; events flow // for `tickets/invoke-turn-llmcall-semantics.md`; events flow
// through to subscribers but the TUI currently derives its // through to subscribers but the TUI currently derives its
// turn header from `UserMessage` / `SystemItem` arrivals. // turn header from `UserMessage` / `SystemItem` arrivals.
Event::InvokeStart { .. } Event::InvokeStart { .. } | Event::LlmCallStart { .. } | Event::LlmCallEnd { .. } => {}
| Event::LlmCallStart { .. }
| Event::LlmCallEnd { .. } => {}
Event::TextDelta { text } => { Event::TextDelta { text } => {
self.append_assistant_text(&text); self.append_assistant_text(&text);
} }

View File

@ -65,7 +65,9 @@ pub enum PickerOutcome {
/// User picked a session; resume at its leaf segment. The pod-cli /// User picked a session; resume at its leaf segment. The pod-cli
/// rehydrates `session_id` via `Store::lookup_session_of` so we only /// rehydrates `session_id` via `Store::lookup_session_of` so we only
/// need to surface the segment here. /// need to surface the segment here.
Picked { segment_id: SegmentId }, Picked {
segment_id: SegmentId,
},
Cancelled, Cancelled,
} }
@ -92,11 +94,7 @@ pub async fn run() -> Result<PickerOutcome, PickerError> {
} }
let mut rows: Vec<Row> = Vec::with_capacity(MAX_ROWS); let mut rows: Vec<Row> = Vec::with_capacity(MAX_ROWS);
for session_id in sessions.into_iter().take(MAX_ROWS) { for session_id in sessions.into_iter().take(MAX_ROWS) {
let Some(leaf_segment_id) = store let Some(leaf_segment_id) = store.list_segments(session_id)?.into_iter().next() else {
.list_segments(session_id)?
.into_iter()
.next()
else {
continue; continue;
}; };
let preview = build_preview(&store, session_id, leaf_segment_id); let preview = build_preview(&store, session_id, leaf_segment_id);