From a0a9df11c060e15286694b625d841406fb7b2196 Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 14 Apr 2026 03:13:36 +0900 Subject: [PATCH] cargo fmt --- crates/daemon/src/lib.rs | 1 + crates/llm-worker/examples/worker_cli.rs | 2 +- crates/llm-worker/src/interceptor.rs | 2 +- crates/llm-worker/src/lib.rs | 4 +- .../llm_client/scheme/anthropic/request.rs | 5 +- .../src/llm_client/scheme/gemini/request.rs | 5 +- .../src/llm_client/scheme/openai/request.rs | 5 +- crates/llm-worker/src/prune.rs | 5 +- crates/llm-worker/src/tool.rs | 8 +- crates/llm-worker/src/tool_server.rs | 11 +- crates/llm-worker/src/worker.rs | 107 +++++-------- .../tests/parallel_execution_test.rs | 4 +- crates/llm-worker/tests/tool_macro_test.rs | 22 ++- crates/llm-worker/tests/worker_state_test.rs | 2 +- crates/manifest/src/lib.rs | 12 +- crates/pod/examples/pod_protocol.rs | 10 +- crates/pod/src/compact_interceptor.rs | 2 +- crates/pod/src/controller.rs | 6 +- crates/pod/src/hook.rs | 2 +- crates/pod/src/hook_interceptor.rs | 2 +- crates/pod/src/lib.rs | 2 +- crates/pod/src/main.rs | 12 +- crates/pod/src/pod.rs | 89 ++++++----- crates/pod/src/shared_state.rs | 8 +- crates/pod/src/socket_server.rs | 5 +- crates/pod/src/token_counter.rs | 17 +-- crates/pod/tests/controller_test.rs | 8 +- crates/protocol/src/stream.rs | 2 +- crates/provider/src/lib.rs | 10 +- crates/session-store/src/fs_store.rs | 33 ++-- crates/session-store/src/lib.rs | 6 +- crates/session-store/src/session.rs | 142 ++++++++++++------ crates/session-store/src/session_log.rs | 10 +- crates/session-store/src/store.rs | 11 +- crates/session-store/tests/session_test.rs | 17 ++- crates/tools/src/edit.rs | 3 +- crates/tools/src/grep.rs | 32 ++-- crates/tools/src/lib.rs | 5 +- crates/tools/src/scoped_fs.rs | 8 +- crates/tools/src/write.rs | 10 +- crates/tools/tests/edge_cases.rs | 15 +- crates/tools/tests/integration.rs | 29 ++-- crates/tui/src/app.rs | 24 +-- crates/tui/src/main.rs | 13 +- crates/tui/src/ui.rs | 12 +- 45 files changed, 389 insertions(+), 351 deletions(-) diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index e69de29b..8b137891 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -0,0 +1 @@ + diff --git a/crates/llm-worker/examples/worker_cli.rs b/crates/llm-worker/examples/worker_cli.rs index 54f302ab..a63acec2 100644 --- a/crates/llm-worker/examples/worker_cli.rs +++ b/crates/llm-worker/examples/worker_cli.rs @@ -41,6 +41,7 @@ use tracing_subscriber::EnvFilter; use clap::{Parser, ValueEnum}; use llm_worker::{ Worker, + interceptor::{Interceptor, PostToolAction, ToolResultInfo}, llm_client::{ LlmClient, providers::{ @@ -48,7 +49,6 @@ use llm_worker::{ openai::OpenAIClient, }, }, - interceptor::{Interceptor, PostToolAction, ToolResultInfo}, timeline::{Handler, TextBlockEvent, TextBlockKind, ToolUseBlockEvent, ToolUseBlockKind}, }; use llm_worker_macros::tool_registry; diff --git a/crates/llm-worker/src/interceptor.rs b/crates/llm-worker/src/interceptor.rs index 2b1d7078..6d8a3e1a 100644 --- a/crates/llm-worker/src/interceptor.rs +++ b/crates/llm-worker/src/interceptor.rs @@ -9,8 +9,8 @@ use std::sync::Arc; use async_trait::async_trait; -use crate::tool::{Tool, ToolCall, ToolMeta, ToolResult}; use crate::Item; +use crate::tool::{Tool, ToolCall, ToolMeta, ToolResult}; // ============================================================================= // Action Enums diff --git a/crates/llm-worker/src/lib.rs b/crates/llm-worker/src/lib.rs index 3c4f1e2a..29893327 100644 --- a/crates/llm-worker/src/lib.rs +++ b/crates/llm-worker/src/lib.rs @@ -43,8 +43,8 @@ mod worker; pub(crate) mod callback; pub mod event; -pub mod llm_client; pub mod interceptor; +pub mod llm_client; pub mod prune; pub mod state; pub mod timeline; @@ -53,7 +53,7 @@ pub mod tool_server; pub use callback::{TextBlockScope, ToolUseBlockScope}; pub use handler::ToolUseBlockStart; -pub use message::{ContentPart, Item, Message, Role}; pub use interceptor::Interceptor; +pub use message::{ContentPart, Item, Message, Role}; pub use tool::{ToolCall, ToolResult}; pub use worker::{RunOutput, ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult}; diff --git a/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs b/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs index b51cc4f4..ff9f748e 100644 --- a/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs +++ b/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs @@ -182,7 +182,10 @@ impl AnthropicScheme { } Item::ToolResult { - call_id, summary, content, .. + call_id, + summary, + content, + .. } => { // Flush pending assistant parts first if !pending_assistant_parts.is_empty() { diff --git a/crates/llm-worker/src/llm_client/scheme/gemini/request.rs b/crates/llm-worker/src/llm_client/scheme/gemini/request.rs index c83ad593..822d8d23 100644 --- a/crates/llm-worker/src/llm_client/scheme/gemini/request.rs +++ b/crates/llm-worker/src/llm_client/scheme/gemini/request.rs @@ -257,7 +257,10 @@ impl GeminiScheme { } Item::ToolResult { - call_id, summary, content, .. + call_id, + summary, + content, + .. } => { // Flush pending model parts first if !pending_model_parts.is_empty() { diff --git a/crates/llm-worker/src/llm_client/scheme/openai/request.rs b/crates/llm-worker/src/llm_client/scheme/openai/request.rs index 9935a3d6..7b25a04c 100644 --- a/crates/llm-worker/src/llm_client/scheme/openai/request.rs +++ b/crates/llm-worker/src/llm_client/scheme/openai/request.rs @@ -212,7 +212,10 @@ impl OpenAIScheme { } Item::ToolResult { - call_id, summary, content, .. + call_id, + summary, + content, + .. } => { // Flush pending tool calls before tool result self.flush_pending_assistant( diff --git a/crates/llm-worker/src/prune.rs b/crates/llm-worker/src/prune.rs index 5530aaa9..7cfc819a 100644 --- a/crates/llm-worker/src/prune.rs +++ b/crates/llm-worker/src/prune.rs @@ -191,7 +191,10 @@ mod tests { assert_eq!(count, 2); for item in &items { - if let Item::ToolResult { summary, content, .. } = item { + if let Item::ToolResult { + summary, content, .. + } = item + { if summary == "s1" || summary == "s2" { assert!(content.is_none(), "old content should be projected out"); } else { diff --git a/crates/llm-worker/src/tool.rs b/crates/llm-worker/src/tool.rs index d1737f67..e5abd09f 100644 --- a/crates/llm-worker/src/tool.rs +++ b/crates/llm-worker/src/tool.rs @@ -56,13 +56,7 @@ impl From for ToolOutput { } } else { let lines = s.lines().count(); - let first_line: String = s - .lines() - .next() - .unwrap_or("") - .chars() - .take(80) - .collect(); + let first_line: String = s.lines().next().unwrap_or("").chars().take(80).collect(); let summary = format!("{lines} lines | {first_line}…"); ToolOutput { summary, diff --git a/crates/llm-worker/src/tool_server.rs b/crates/llm-worker/src/tool_server.rs index a056290a..06722c3c 100644 --- a/crates/llm-worker/src/tool_server.rs +++ b/crates/llm-worker/src/tool_server.rs @@ -65,10 +65,7 @@ impl ToolServerHandle { } /// Queue many tool factories for deferred initialization. - pub(crate) fn register_tools( - &self, - factories: impl IntoIterator, - ) { + pub(crate) fn register_tools(&self, factories: impl IntoIterator) { let mut guard = self.pending.lock().unwrap_or_else(|e| e.into_inner()); guard.extend(factories); } @@ -110,7 +107,11 @@ impl ToolServerHandle { } /// Execute a tool by name. - pub async fn call_tool(&self, name: &str, input_json: &str) -> Result { + pub async fn call_tool( + &self, + name: &str, + input_json: &str, + ) -> Result { let tool = { let guard = self.tools.lock().unwrap_or_else(|e| e.into_inner()); let (_, tool) = guard diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 22215d79..7ae6c941 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -7,24 +7,23 @@ use tracing::{debug, info, trace, warn}; use crate::{ Item, - llm_client::{ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition}, - interceptor::{ - DefaultInterceptor, Interceptor, PostToolAction, PreRequestAction, PreToolAction, - PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction, - }, - state::{Locked, Mutable, WorkerState}, callback::{ ClosureMetaHandler, ClosureTextBlockHandler, ClosureToolUseBlockHandler, TextBlockScope, ToolUseBlockScope, }, handler::{ErrorKind, StatusKind, ToolUseBlockStart, UsageKind}, - timeline::{TextBlockCollector, Timeline, ToolCallCollector}, + interceptor::{ + DefaultInterceptor, Interceptor, PostToolAction, PreRequestAction, PreToolAction, + PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction, + }, + llm_client::{ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition}, + state::{Locked, Mutable, WorkerState}, timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, + timeline::{TextBlockCollector, Timeline, ToolCallCollector}, tool::{ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolResult}, tool_server::{ToolServer, ToolServerHandle}, }; - /// Worker errors #[derive(Debug, thiserror::Error)] pub enum WorkerError { @@ -53,7 +52,6 @@ pub enum ToolRegistryError { DuplicateName(String), } - /// Worker configuration #[derive(Debug, Clone, Default)] pub struct WorkerConfig { @@ -61,7 +59,6 @@ pub struct WorkerConfig { _private: (), } - /// Worker execution result (status) #[derive(Debug)] pub enum WorkerResult { @@ -95,7 +92,6 @@ enum ToolExecutionResult { Paused, } - /// Central component for managing LLM interactions /// /// Receives input from the user, sends requests to the LLM, and @@ -172,7 +168,6 @@ pub struct Worker { _state: PhantomData, } - impl Worker { fn reset_interruption_state(&mut self) { self.last_run_interrupted = false; @@ -214,10 +209,9 @@ impl Worker { &mut self, setup: impl FnMut(&mut TextBlockScope) + Send + Sync + 'static, ) { - self.timeline - .on_text_block(ClosureTextBlockHandler { - setup: Box::new(setup), - }); + self.timeline.on_text_block(ClosureTextBlockHandler { + setup: Box::new(setup), + }); } /// Register a tool use block observer with scoped callbacks. @@ -240,17 +234,13 @@ impl Worker { &mut self, setup: impl FnMut(&ToolUseBlockStart, &mut ToolUseBlockScope) + Send + Sync + 'static, ) { - self.timeline - .on_tool_use_block(ClosureToolUseBlockHandler { - setup: Box::new(setup), - }); + self.timeline.on_tool_use_block(ClosureToolUseBlockHandler { + setup: Box::new(setup), + }); } /// Register a usage event callback. - pub fn on_usage( - &mut self, - callback: impl FnMut(&UsageEvent) + Send + Sync + 'static, - ) { + pub fn on_usage(&mut self, callback: impl FnMut(&UsageEvent) + Send + Sync + 'static) { self.timeline.on_usage(ClosureMetaHandler { callback, _kind: PhantomData::, @@ -258,10 +248,7 @@ impl Worker { } /// Register a status event callback. - pub fn on_status( - &mut self, - callback: impl FnMut(&StatusEvent) + Send + Sync + 'static, - ) { + pub fn on_status(&mut self, callback: impl FnMut(&StatusEvent) + Send + Sync + 'static) { self.timeline.on_status(ClosureMetaHandler { callback, _kind: PhantomData::, @@ -269,10 +256,7 @@ impl Worker { } /// Register an error event callback. - pub fn on_error( - &mut self, - callback: impl FnMut(&ErrorEvent) + Send + Sync + 'static, - ) { + pub fn on_error(&mut self, callback: impl FnMut(&ErrorEvent) + Send + Sync + 'static) { self.timeline.on_error(ClosureMetaHandler { callback, _kind: PhantomData::, @@ -280,18 +264,12 @@ impl Worker { } /// Register a turn-start callback (receives 0-based turn number). - pub fn on_turn_start( - &mut self, - callback: impl Fn(usize) + Send + Sync + 'static, - ) { + pub fn on_turn_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) { self.turn_start_cbs.push(Box::new(callback)); } /// Register a turn-end callback (receives 0-based turn number). - pub fn on_turn_end( - &mut self, - callback: impl Fn(usize) + Send + Sync + 'static, - ) { + pub fn on_turn_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) { self.turn_end_cbs.push(Box::new(callback)); } @@ -735,9 +713,7 @@ impl Worker { // prunable candidates whose estimated savings meet the // threshold. Worker does not own usage history itself; the // estimator is injected by the layer that does. - if let (Some(config), Some(estimator)) = - (&self.prune_config, &self.savings_estimator) - { + if let (Some(config), Some(estimator)) = (&self.prune_config, &self.savings_estimator) { let candidates = crate::prune::prunable_indices(&request_context, config.protected_turns); if !candidates.is_empty() { @@ -745,8 +721,7 @@ impl Worker { let last = *candidates.last().unwrap() + 1; let savings = estimator(&request_context, first..last); if savings >= config.min_savings { - let pruned = - crate::prune::project(&mut request_context, &candidates); + let pruned = crate::prune::project(&mut request_context, &candidates); if pruned > 0 { debug!( pruned, @@ -817,7 +792,11 @@ impl Worker { if let Some(max) = self.max_turns { if self.turn_count >= max as usize { - info!(turn_count = self.turn_count, max_turns = max, "Turn limit reached"); + info!( + turn_count = self.turn_count, + max_turns = max, + "Turn limit reached" + ); self.last_run_interrupted = false; return Ok(WorkerResult::LimitReached); } @@ -911,10 +890,8 @@ impl Worker { content, )); } else { - self.history.push(Item::tool_result( - &result.tool_use_id, - &result.summary, - )); + self.history + .push(Item::tool_result(&result.tool_use_id, &result.summary)); } } Ok(None) @@ -925,10 +902,8 @@ impl Worker { } } } - } - impl Worker { /// Create a new Worker (in Mutable state) pub fn new(client: C) -> Self { @@ -975,10 +950,7 @@ impl Worker { } /// Register multiple tool factories for deferred initialization. - pub fn register_tools( - &mut self, - factories: impl IntoIterator, - ) { + pub fn register_tools(&mut self, factories: impl IntoIterator) { self.tool_server.register_tools(factories); } @@ -1086,45 +1058,38 @@ impl Worker { /// /// Available only in Mutable state. pub fn history_mut(&mut self) -> &mut Vec { - &mut self.history } /// Set history pub fn set_history(&mut self, items: Vec) { - self.history = items; } /// Add an item to history (builder pattern) pub fn with_item(mut self, item: Item) -> Self { - self.history.push(item); self } /// Add an item to history pub fn push_item(&mut self, item: Item) { - self.history.push(item); } /// Add multiple items to history (builder pattern) pub fn with_items(mut self, items: impl IntoIterator) -> Self { - self.history.extend(items); self } /// Add multiple items to history pub fn extend_history(&mut self, items: impl IntoIterator) { - self.history.extend(items); } /// Clear history pub fn clear_history(&mut self) { - self.history.clear(); } @@ -1156,13 +1121,13 @@ impl Worker { /// /// Subsequent runs can use [`Worker::run()`] directly. /// To edit state between turns, call [`unlock()`](Worker::unlock) first. - pub async fn run( - self, - user_input: impl Into, - ) -> Result, WorkerError> { + pub async fn run(self, user_input: impl Into) -> Result, WorkerError> { let mut locked = self.lock(); let result = locked.run(user_input).await?; - Ok(RunOutput { worker: locked, result }) + Ok(RunOutput { + worker: locked, + result, + }) } /// Resume from Paused, consuming self and transitioning to Locked. @@ -1171,7 +1136,10 @@ impl Worker { pub async fn resume(self) -> Result, WorkerError> { let mut locked = self.lock(); let result = locked.resume().await?; - Ok(RunOutput { worker: locked, result }) + Ok(RunOutput { + worker: locked, + result, + }) } /// Lock and transition to Locked state @@ -1216,7 +1184,6 @@ impl Worker { } } - impl Worker { /// Execute a turn /// diff --git a/crates/llm-worker/tests/parallel_execution_test.rs b/crates/llm-worker/tests/parallel_execution_test.rs index b78ad2f7..d3ff86a2 100644 --- a/crates/llm-worker/tests/parallel_execution_test.rs +++ b/crates/llm-worker/tests/parallel_execution_test.rs @@ -8,8 +8,10 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use llm_worker::Worker; +use llm_worker::interceptor::{ + Interceptor, PostToolAction, PreToolAction, ToolCallInfo, ToolResultInfo, +}; use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; -use llm_worker::interceptor::{Interceptor, PostToolAction, PreToolAction, ToolCallInfo, ToolResultInfo}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; mod common; diff --git a/crates/llm-worker/tests/tool_macro_test.rs b/crates/llm-worker/tests/tool_macro_test.rs index 86d8bee0..3d326a98 100644 --- a/crates/llm-worker/tests/tool_macro_test.rs +++ b/crates/llm-worker/tests/tool_macro_test.rs @@ -77,8 +77,14 @@ async fn test_basic_tool_generation() { let result = tool.execute(r#"{"message": "World"}"#).await; assert!(result.is_ok(), "Should execute successfully"); let output = result.unwrap(); - assert!(output.summary.contains("Hello"), "Output should contain prefix"); - assert!(output.summary.contains("World"), "Output should contain message"); + assert!( + output.summary.contains("Hello"), + "Output should contain prefix" + ); + assert!( + output.summary.contains("World"), + "Output should contain message" + ); } #[tokio::test] @@ -94,7 +100,11 @@ async fn test_multiple_arguments() { let result = tool.execute(r#"{"a": 10, "b": 20}"#).await; assert!(result.is_ok()); let output = result.unwrap(); - assert!(output.summary.contains("30"), "Should contain sum: {:?}", output); + assert!( + output.summary.contains("30"), + "Should contain sum: {:?}", + output + ); } #[tokio::test] @@ -168,7 +178,11 @@ async fn test_result_return_type_success() { let result = tool.execute(r#"{"value": 42}"#).await; assert!(result.is_ok(), "Should succeed for positive value"); let output = result.unwrap(); - assert!(output.summary.contains("Valid"), "Should contain Valid: {:?}", output); + assert!( + output.summary.contains("Valid"), + "Should contain Valid: {:?}", + output + ); } #[tokio::test] diff --git a/crates/llm-worker/tests/worker_state_test.rs b/crates/llm-worker/tests/worker_state_test.rs index 73176ae7..310d883b 100644 --- a/crates/llm-worker/tests/worker_state_test.rs +++ b/crates/llm-worker/tests/worker_state_test.rs @@ -11,9 +11,9 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use async_trait::async_trait; use common::MockLlmClient; use llm_worker::Item; -use llm_worker::{Worker, WorkerError}; use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use llm_worker::{Worker, WorkerError}; // ============================================================================= // Mutable State Tests diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index 6dc0244f..91404d92 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -113,9 +113,15 @@ pub struct CompactionConfig { pub provider: Option, } -fn default_prune_protected_turns() -> usize { 3 } -fn default_prune_min_savings() -> u64 { 4096 } -fn default_compact_retained_turns() -> usize { 2 } +fn default_prune_protected_turns() -> usize { + 3 +} +fn default_prune_min_savings() -> u64 { + 4096 +} +fn default_compact_retained_turns() -> usize { + 2 +} impl Default for CompactionConfig { fn default() -> Self { diff --git a/crates/pod/examples/pod_protocol.rs b/crates/pod/examples/pod_protocol.rs index bcd8707f..30e21453 100644 --- a/crates/pod/examples/pod_protocol.rs +++ b/crates/pod/examples/pod_protocol.rs @@ -88,8 +88,14 @@ async fn main() -> Result<(), Box> { // Wait for completion tokio::time::sleep(std::time::Duration::from_secs(15)).await; - println!("\n[shared_state] final: {}", handle.shared_state.status_json()); - println!("[history] {} bytes", handle.shared_state.history_json().len()); + println!( + "\n[shared_state] final: {}", + handle.shared_state.status_json() + ); + println!( + "[history] {} bytes", + handle.shared_state.history_json().len() + ); drop(handle); let _ = listener.await; diff --git a/crates/pod/src/compact_interceptor.rs b/crates/pod/src/compact_interceptor.rs index ce5ca1a1..839fe604 100644 --- a/crates/pod/src/compact_interceptor.rs +++ b/crates/pod/src/compact_interceptor.rs @@ -9,11 +9,11 @@ use std::sync::Arc; use async_trait::async_trait; +use llm_worker::Item; use llm_worker::interceptor::{ Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction, }; -use llm_worker::Item; use tracing::info; use crate::compact_state::CompactState; diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 5b58cab6..c57b764b 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -1,16 +1,16 @@ use std::path::Path; use std::sync::Arc; -use llm_worker::llm_client::client::LlmClient; use llm_worker::WorkerError; +use llm_worker::llm_client::client::LlmClient; use session_store::Store; use tokio::sync::{broadcast, mpsc}; -use crate::pod::{Pod, PodRunResult, PodError}; -use protocol::{ErrorCode, Event, Method, RunResult, TurnResult}; +use crate::pod::{Pod, PodError, PodRunResult}; use crate::runtime_dir::RuntimeDir; use crate::shared_state::{PodSharedState, PodStatus}; use crate::socket_server::SocketServer; +use protocol::{ErrorCode, Event, Method, RunResult, TurnResult}; // --------------------------------------------------------------------------- // PodHandle — client-facing, Clone-able diff --git a/crates/pod/src/hook.rs b/crates/pod/src/hook.rs index e3163a6a..4ca90a08 100644 --- a/crates/pod/src/hook.rs +++ b/crates/pod/src/hook.rs @@ -8,11 +8,11 @@ //! concerns belong. use async_trait::async_trait; +use llm_worker::Item; use llm_worker::interceptor::{ PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction, }; -use llm_worker::Item; // ============================================================================= // Hook Event Kinds diff --git a/crates/pod/src/hook_interceptor.rs b/crates/pod/src/hook_interceptor.rs index 4d15550d..07f10e25 100644 --- a/crates/pod/src/hook_interceptor.rs +++ b/crates/pod/src/hook_interceptor.rs @@ -3,11 +3,11 @@ use std::sync::Arc; use async_trait::async_trait; +use llm_worker::Item; use llm_worker::interceptor::{ Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction, }; -use llm_worker::Item; use crate::hook::HookRegistry; diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 3f8fa2fe..5c34bfb7 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -15,8 +15,8 @@ mod usage_tracker; pub use token_counter::{EstimateSource, SplitPoint, TokenEstimate}; pub use controller::{PodController, PodHandle}; -pub use manifest::{PodManifest, ProviderConfig, ProviderKind, Scope}; pub use hook::{Hook, HookEventKind, HookRegistryBuilder}; +pub use manifest::{PodManifest, ProviderConfig, ProviderKind, Scope}; pub use pod::{Pod, PodError, PodRunResult, apply_worker_manifest}; pub use protocol::{ErrorCode, Event, Method, TurnResult}; pub use provider::{ProviderError, build_client}; diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index fb1442cb..3aa3687f 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -2,8 +2,8 @@ use std::path::{Path, PathBuf}; use std::process::ExitCode; use clap::Parser; -use session_store::FsStore; use pod::{Pod, PodController}; +use session_store::FsStore; #[derive(Parser)] #[command(name = "pod", about = "Run a Pod process from a manifest file")] @@ -18,9 +18,8 @@ struct Cli { } fn default_store_dir() -> Result { - let home = std::env::var("HOME").map_err(|_| { - std::io::Error::new(std::io::ErrorKind::NotFound, "HOME is not set") - })?; + let home = std::env::var("HOME") + .map_err(|_| std::io::Error::new(std::io::ErrorKind::NotFound, "HOME is not set"))?; Ok(PathBuf::from(home).join(".insomnia").join("sessions")) } @@ -111,7 +110,10 @@ async fn main() -> ExitCode { } }; - eprintln!("pod: {pod_name} listening on {:?}", handle.runtime_dir.socket_path()); + eprintln!( + "pod: {pod_name} listening on {:?}", + handle.runtime_dir.socket_path() + ); // Wait for shutdown signal match tokio::signal::ctrl_c().await { diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 88d60726..a96cc8d8 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -2,8 +2,8 @@ use std::path::PathBuf; use std::sync::{Arc, Mutex}; use llm_worker::Item; -use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::RequestConfig; +use llm_worker::llm_client::client::LlmClient; use llm_worker::state::Mutable; use llm_worker::{Worker, WorkerError, WorkerResult}; use session_store::{ @@ -21,8 +21,8 @@ use crate::hook::{ }; use crate::hook_interceptor::HookInterceptor; use crate::usage_tracker::UsageTracker; -use llm_worker::interceptor::PreRequestAction; use async_trait::async_trait; +use llm_worker::interceptor::PreRequestAction; /// Pre-LLM-request hook that records `history.len()` at send time into a /// shared `UsageTracker`. The on_usage callback later pairs this with the @@ -205,7 +205,10 @@ impl Pod { /// Returns a clone since the underlying vector is shared with hooks /// running on the Worker. pub fn usage_history(&self) -> Vec { - self.usage_history.lock().expect("usage_history poisoned").clone() + self.usage_history + .lock() + .expect("usage_history poisoned") + .clone() } /// Shared handle to the cumulative Usage history. @@ -292,10 +295,9 @@ impl Pod { // Pre-LLM-request hook: capture history.len() into the // UsageTracker so the upcoming on_usage callback can pair // it with the measured input_tokens. - self.hook_builder - .add_pre_llm_request(UsageTrackingHook { - tracker: self.usage_tracker.clone(), - }); + self.hook_builder.add_pre_llm_request(UsageTrackingHook { + tracker: self.usage_tracker.clone(), + }); let builder = std::mem::take(&mut self.hook_builder); let registry = Arc::new(builder.build()); @@ -430,8 +432,9 @@ impl Pod { /// async layout cycle (`run → handle_worker_result → do_compact_and_resume → resume`). fn do_compact_and_resume( &mut self, - ) -> std::pin::Pin> + Send + '_>> - { + ) -> std::pin::Pin< + Box> + Send + '_>, + > { Box::pin(async move { // Thrash detection: if we just compacted and hit the threshold again, // something is wrong. @@ -475,9 +478,7 @@ impl Pod { /// Best-effort: failures are logged but do not propagate. pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> { let state = match self.compact_state.as_ref() { - Some(s) if !s.is_disabled() && s.exceeds_post_run() && !s.just_compacted() => { - s.clone() - } + Some(s) if !s.is_disabled() && s.exceeds_post_run() && !s.just_compacted() => s.clone(), _ => return Ok(()), }; @@ -509,13 +510,8 @@ impl Pod { // head_hash mutable). let w = self.worker.as_ref().unwrap(); let new_items = &w.history()[history_before..]; - session_store::save_delta( - &self.store, - self.session_id, - &mut self.head_hash, - new_items, - ) - .await?; + session_store::save_delta(&self.store, self.session_id, &mut self.head_hash, new_items) + .await?; let turn_count = self.worker.as_ref().unwrap().turn_count(); session_store::save_turn_end( @@ -544,7 +540,10 @@ impl Pod { record.output_tokens, ) .await?; - self.usage_history.lock().expect("usage_history poisoned").push(record); + self.usage_history + .lock() + .expect("usage_history poisoned") + .push(record); } let interrupted = self.worker.as_ref().unwrap().last_run_interrupted(); @@ -578,10 +577,7 @@ impl Pod { /// - a clone of the main LlmClient via `clone_boxed()`. /// /// Returns the new session ID. - pub async fn compact( - &mut self, - retained_turns: usize, - ) -> Result { + pub async fn compact(&mut self, retained_turns: usize) -> Result { let worker = self.worker.as_ref().expect("worker taken during run"); let history = worker.history(); @@ -612,13 +608,20 @@ impl Pod { .temperature(0.0); summary_worker.set_max_tokens(2048); - let out = summary_worker.run(summary_prompt).await + let out = summary_worker + .run(summary_prompt) + .await .map_err(PodError::Worker)?; - let summary_text = out.worker + let summary_text = out + .worker .history() .iter() .filter_map(|item| { - if item.is_assistant_message() { item.as_text().map(String::from) } else { None } + if item.is_assistant_message() { + item.as_text().map(String::from) + } else { + None + } }) .collect::>() .join("\n"); @@ -632,7 +635,9 @@ impl Pod { // Persist as a new compacted session. let old_session_id = self.session_id; - let old_head_hash = self.head_hash.clone() + let old_head_hash = self + .head_hash + .clone() .expect("head_hash should be set after at least one entry"); let w = self.worker.as_ref().unwrap(); @@ -655,7 +660,10 @@ impl Pod { self.session_id = new_session_id; self.head_hash = Some(new_head_hash); self.worker.as_mut().unwrap().set_history(new_history); - self.usage_history.lock().expect("usage_history poisoned").clear(); + self.usage_history + .lock() + .expect("usage_history poisoned") + .clear(); Ok(new_session_id) } @@ -715,7 +723,6 @@ impl Pod, St> { pod.apply_prune_from_manifest(); Ok(pod) } - } /// Apply worker-level manifest settings to a Worker. @@ -769,18 +776,24 @@ fn build_summary_prompt(items: &[Item]) -> String { llm_worker::Role::Assistant => "Assistant", llm_worker::Role::System => "System", }; - let text: String = content.iter().map(|p| p.as_text()).collect::>().join(""); + let text: String = content + .iter() + .map(|p| p.as_text()) + .collect::>() + .join(""); lines.push(format!("[{role_label}] {text}")); } - Item::ToolCall { name, arguments, .. } => { + Item::ToolCall { + name, arguments, .. + } => { lines.push(format!("[ToolCall] {name}({arguments})")); } - Item::ToolResult { summary, content, .. } => { - match content { - Some(c) => lines.push(format!("[ToolResult] {summary}\n{c}")), - None => lines.push(format!("[ToolResult] {summary}")), - } - } + Item::ToolResult { + summary, content, .. + } => match content { + Some(c) => lines.push(format!("[ToolResult] {summary}\n{c}")), + None => lines.push(format!("[ToolResult] {summary}")), + }, Item::Reasoning { text, .. } => { lines.push(format!("[Reasoning] {text}")); } diff --git a/crates/pod/src/shared_state.rs b/crates/pod/src/shared_state.rs index 7f94c239..f28316dd 100644 --- a/crates/pod/src/shared_state.rs +++ b/crates/pod/src/shared_state.rs @@ -1,8 +1,8 @@ use std::sync::RwLock; use llm_worker::llm_client::types::Item; -use session_store::SessionId; use serde::{Deserialize, Serialize}; +use session_store::SessionId; /// Shared state between PodController and runtime directory. /// @@ -25,11 +25,7 @@ pub enum PodStatus { } impl PodSharedState { - pub fn new( - pod_name: String, - session_id: SessionId, - manifest_toml: String, - ) -> Self { + pub fn new(pod_name: String, session_id: SessionId, manifest_toml: String) -> Self { Self { pod_name, session_id, diff --git a/crates/pod/src/socket_server.rs b/crates/pod/src/socket_server.rs index b22f8b0f..fdbf271b 100644 --- a/crates/pod/src/socket_server.rs +++ b/crates/pod/src/socket_server.rs @@ -42,10 +42,7 @@ impl SocketServer { } }); - Ok(Self { - _accept_task, - path, - }) + Ok(Self { _accept_task, path }) } /// The socket file path. diff --git a/crates/pod/src/token_counter.rs b/crates/pod/src/token_counter.rs index 3ab95d04..876f39ca 100644 --- a/crates/pod/src/token_counter.rs +++ b/crates/pod/src/token_counter.rs @@ -140,9 +140,7 @@ fn tokens_at( let up_bytes = prefix[up.history_len.min(cap)]; let at_bytes = prefix[index]; let span_bytes = up_bytes.saturating_sub(lo_bytes); - let span_tokens = up - .input_total_tokens - .saturating_sub(lo.input_total_tokens); + let span_tokens = up.input_total_tokens.saturating_sub(lo.input_total_tokens); if span_bytes == 0 || span_tokens == 0 { return TokenEstimate { tokens: lo.input_total_tokens, @@ -198,11 +196,7 @@ fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate tokens_at(history, records, history.len(), &prefix) } -fn split_for_retained_impl( - history: &[Item], - records: &[UsageRecord], - retained: u64, -) -> SplitPoint { +fn split_for_retained_impl(history: &[Item], records: &[UsageRecord], retained: u64) -> SplitPoint { let prefix = prefix_bytes(history); let current = tokens_at(history, records, history.len(), &prefix); if current.tokens <= retained { @@ -351,12 +345,7 @@ mod tests { #[test] fn split_interpolated_between_measurements() { - let history = vec![ - msg("aaaaaa"), - msg("bbbbbb"), - msg("cccccc"), - msg("dddddd"), - ]; + let history = vec![msg("aaaaaa"), msg("bbbbbb"), msg("cccccc"), msg("dddddd")]; let records = vec![record(1, 50), record(4, 400)]; let cut = split_for_retained_impl(&history, &records, 250); assert!(cut.index > 1 && cut.index <= 4); diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 9d810adb..6c74a533 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -1,17 +1,15 @@ use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use async_trait::async_trait; use futures::Stream; +use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; -use llm_worker::Worker; use session_store::FsStore; -use pod::{ - Event, Method, Pod, PodController, PodManifest, PodStatus, -}; +use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus}; // --------------------------------------------------------------------------- // Mock LLM Client diff --git a/crates/protocol/src/stream.rs b/crates/protocol/src/stream.rs index 1916b0a5..cf612168 100644 --- a/crates/protocol/src/stream.rs +++ b/crates/protocol/src/stream.rs @@ -1,7 +1,7 @@ use std::io; -use serde::de::DeserializeOwned; use serde::Serialize; +use serde::de::DeserializeOwned; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; /// JSONL line reader over an async byte stream. diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 48534a8a..7666b17d 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -38,7 +38,10 @@ fn resolve_api_key( if let Some(ref raw_path) = config.api_key_file { let path = expand_key_path(raw_path, manifest_dir)?; let contents = std::fs::read_to_string(&path).map_err(|e| { - ProviderError::Config(format!("failed to read api_key_file {}: {e}", path.display())) + ProviderError::Config(format!( + "failed to read api_key_file {}: {e}", + path.display() + )) })?; return Ok(Some(contents.trim().to_owned())); } @@ -47,10 +50,7 @@ fn resolve_api_key( } /// Expand `~` and resolve relative paths against `manifest_dir`. -fn expand_key_path( - raw: &Path, - manifest_dir: Option<&Path>, -) -> Result { +fn expand_key_path(raw: &Path, manifest_dir: Option<&Path>) -> Result { let path = if raw.starts_with("~") { let home = std::env::var("HOME") .map_err(|_| ProviderError::Config("HOME is not set for ~ expansion".into()))?; diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs index af308ec8..bdf7dc91 100644 --- a/crates/session-store/src/fs_store.rs +++ b/crates/session-store/src/fs_store.rs @@ -4,10 +4,10 @@ //! - Session log: `{root}/{session_id}.jsonl` //! - Event trace: `{root}/{session_id}.trace.jsonl` +use crate::SessionId; use crate::event_trace::TraceEntry; use crate::session_log::{EntryHash, HashedEntry}; use crate::store::{Store, StoreError}; -use crate::SessionId; use std::path::{Path, PathBuf}; use tokio::fs; use tokio::io::AsyncWriteExt; @@ -50,19 +50,16 @@ impl FsStore { Ok(()) } - fn parse_jsonl( - content: &str, - ) -> Result, StoreError> { + fn parse_jsonl(content: &str) -> Result, StoreError> { let mut entries = Vec::new(); for (i, line) in content.lines().enumerate() { if line.trim().is_empty() { continue; } - let entry: T = - serde_json::from_str(line).map_err(|e| StoreError::Corrupt { - line: i + 1, - message: e.to_string(), - })?; + let entry: T = serde_json::from_str(line).map_err(|e| StoreError::Corrupt { + line: i + 1, + message: e.to_string(), + })?; entries.push(entry); } Ok(entries) @@ -122,10 +119,7 @@ impl Store for FsStore { Ok(self.log_path(id).exists()) } - async fn read_head_hash( - &self, - id: SessionId, - ) -> Result, StoreError> { + async fn read_head_hash(&self, id: SessionId) -> Result, StoreError> { let path = self.log_path(id); if !path.exists() { return Err(StoreError::NotFound(id)); @@ -134,23 +128,18 @@ impl Store for FsStore { let last_line = content.lines().rev().find(|l| !l.trim().is_empty()); match last_line { Some(line) => { - let entry: HashedEntry = serde_json::from_str(line).map_err(|e| { - StoreError::Corrupt { + let entry: HashedEntry = + serde_json::from_str(line).map_err(|e| StoreError::Corrupt { line: content.lines().count(), message: e.to_string(), - } - })?; + })?; Ok(Some(entry.hash)) } None => Ok(None), } } - async fn append_trace( - &self, - id: SessionId, - entry: &TraceEntry, - ) -> Result<(), StoreError> { + async fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; self.append_line(&self.trace_path(id), &line).await } diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 3d7d260c..52cf9f38 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -35,9 +35,9 @@ pub mod store; pub use event_trace::TraceEntry; pub use fs_store::FsStore; pub use session::{ - SessionStartState, create_compacted_session, create_session, ensure_head_or_fork, fork, fork_at, - restore, save_cache_locked, save_cache_unlocked, save_config_changed, save_delta, save_outcome, - save_turn_end, save_usage, + SessionStartState, create_compacted_session, create_session, ensure_head_or_fork, fork, + fork_at, restore, save_cache_locked, save_cache_unlocked, save_config_changed, save_delta, + save_outcome, save_turn_end, save_usage, }; pub use session_log::{ EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, UsageRecord, diff --git a/crates/session-store/src/session.rs b/crates/session-store/src/session.rs index 380665ea..5a07e57a 100644 --- a/crates/session-store/src/session.rs +++ b/crates/session-store/src/session.rs @@ -4,11 +4,11 @@ //! The caller (typically Pod) holds the Worker directly and calls these //! functions after state-mutating operations. +use crate::SessionId; use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome, SessionOrigin}; use crate::store::{Store, StoreError}; -use crate::SessionId; -use llm_worker::llm_client::types::Item; use llm_worker::llm_client::RequestConfig; +use llm_worker::llm_client::types::Item; /// State snapshot for creating a SessionStart entry. pub struct SessionStartState<'a> { @@ -142,10 +142,15 @@ pub async fn save_delta( while i < new_items.len() { let item = &new_items[i]; if item.is_user_message() { - append_entry(store, session_id, head_hash, LogEntry::UserInput { - ts, - item: new_items[i].clone(), - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::UserInput { + ts, + item: new_items[i].clone(), + }, + ) .await?; i += 1; } else if item.is_tool_result() { @@ -153,10 +158,15 @@ pub async fn save_delta( while i < new_items.len() && new_items[i].is_tool_result() { i += 1; } - append_entry(store, session_id, head_hash, LogEntry::ToolResults { - ts, - items: new_items[start..i].to_vec(), - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::ToolResults { + ts, + items: new_items[start..i].to_vec(), + }, + ) .await?; } else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() { let start = i; @@ -167,16 +177,26 @@ pub async fn save_delta( { i += 1; } - append_entry(store, session_id, head_hash, LogEntry::AssistantItems { - ts, - items: new_items[start..i].to_vec(), - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::AssistantItems { + ts, + items: new_items[start..i].to_vec(), + }, + ) .await?; } else { - append_entry(store, session_id, head_hash, LogEntry::HookInjectedItems { - ts, - items: vec![new_items[i].clone()], - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::HookInjectedItems { + ts, + items: vec![new_items[i].clone()], + }, + ) .await?; i += 1; } @@ -191,10 +211,15 @@ pub async fn save_turn_end( head_hash: &mut Option, turn_count: usize, ) -> Result<(), StoreError> { - append_entry(store, session_id, head_hash, LogEntry::TurnEnd { - ts: session_log::now_millis(), - turn_count, - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::TurnEnd { + ts: session_log::now_millis(), + turn_count, + }, + ) .await } @@ -206,11 +231,16 @@ pub async fn save_outcome( outcome: Outcome, interrupted: bool, ) -> Result<(), StoreError> { - append_entry(store, session_id, head_hash, LogEntry::RunOutcome { - ts: session_log::now_millis(), - outcome, - interrupted, - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::RunOutcome { + ts: session_log::now_millis(), + outcome, + interrupted, + }, + ) .await } @@ -230,14 +260,19 @@ pub async fn save_usage( cache_write_tokens: u64, output_tokens: u64, ) -> Result<(), StoreError> { - append_entry(store, session_id, head_hash, LogEntry::LlmUsage { - ts: session_log::now_millis(), - history_len, - input_total_tokens, - cache_read_tokens, - cache_write_tokens, - output_tokens, - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::LlmUsage { + ts: session_log::now_millis(), + history_len, + input_total_tokens, + cache_read_tokens, + cache_write_tokens, + output_tokens, + }, + ) .await } @@ -248,10 +283,15 @@ pub async fn save_cache_locked( head_hash: &mut Option, locked_prefix_len: usize, ) -> Result<(), StoreError> { - append_entry(store, session_id, head_hash, LogEntry::Locked { - ts: session_log::now_millis(), - locked_prefix_len, - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::Locked { + ts: session_log::now_millis(), + locked_prefix_len, + }, + ) .await } @@ -261,9 +301,14 @@ pub async fn save_cache_unlocked( session_id: SessionId, head_hash: &mut Option, ) -> Result<(), StoreError> { - append_entry(store, session_id, head_hash, LogEntry::CacheUnlocked { - ts: session_log::now_millis(), - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::CacheUnlocked { + ts: session_log::now_millis(), + }, + ) .await } @@ -274,10 +319,15 @@ pub async fn save_config_changed( head_hash: &mut Option, config: &RequestConfig, ) -> Result<(), StoreError> { - append_entry(store, session_id, head_hash, LogEntry::ConfigChanged { - ts: session_log::now_millis(), - config: config.clone(), - }) + append_entry( + store, + session_id, + head_hash, + LogEntry::ConfigChanged { + ts: session_log::now_millis(), + config: config.clone(), + }, + ) .await } diff --git a/crates/session-store/src/session_log.rs b/crates/session-store/src/session_log.rs index 98e0c957..ccfbf6df 100644 --- a/crates/session-store/src/session_log.rs +++ b/crates/session-store/src/session_log.rs @@ -184,7 +184,9 @@ pub enum Outcome { /// Worker yielded control to the caller for external processing. /// Distinct from `Paused`: caller handles internally and resumes. Yielded, - Error { message: String }, + Error { + message: String, + }, } /// State collected from log entries. @@ -409,7 +411,11 @@ mod tests { }, LogEntry::AssistantItems { ts: 3000, - items: vec![Item::tool_call("call_1", "get_weather", r#"{"city":"Tokyo"}"#)], + items: vec![Item::tool_call( + "call_1", + "get_weather", + r#"{"city":"Tokyo"}"#, + )], }, LogEntry::ToolResults { ts: 3500, diff --git a/crates/session-store/src/store.rs b/crates/session-store/src/store.rs index 9565af2a..74a8066e 100644 --- a/crates/session-store/src/store.rs +++ b/crates/session-store/src/store.rs @@ -3,9 +3,9 @@ //! [`Store`] defines the async interface for reading and writing session logs. //! Implementations handle the physical storage (filesystem, database, etc.). +use crate::SessionId; use crate::event_trace::TraceEntry; use crate::session_log::{EntryHash, HashedEntry}; -use crate::SessionId; use std::future::Future; /// Errors from the persistence store. @@ -43,9 +43,7 @@ pub trait Store: Send + Sync { ) -> impl Future, StoreError>> + Send; /// List all session IDs, most recent first. - fn list_sessions( - &self, - ) -> impl Future, StoreError>> + Send; + fn list_sessions(&self) -> impl Future, StoreError>> + Send; /// Create a new session with initial entries. fn create_session( @@ -55,10 +53,7 @@ pub trait Store: Send + Sync { ) -> impl Future> + Send; /// Check if a session exists. - fn exists( - &self, - id: SessionId, - ) -> impl Future> + Send; + fn exists(&self, id: SessionId) -> impl Future> + Send; /// Read the hash of the last entry in a session (the head). /// diff --git a/crates/session-store/tests/session_test.rs b/crates/session-store/tests/session_test.rs index e1980b3f..d593e192 100644 --- a/crates/session-store/tests/session_test.rs +++ b/crates/session-store/tests/session_test.rs @@ -4,11 +4,11 @@ use std::sync::Arc; use async_trait::async_trait; use common::MockLlmClient; +use llm_worker::Worker; use llm_worker::interceptor::{Interceptor, TurnEndAction}; use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; -use llm_worker::Worker; use session_store::{ EntryHash, FsStore, LogEntry, Outcome, SessionStartState, Store, collect_state, }; @@ -124,9 +124,15 @@ async fn run_and_persist( message: e.to_string(), }, }; - session_store::save_outcome(store, session_id, head_hash, outcome, worker.last_run_interrupted()) - .await - .unwrap(); + session_store::save_outcome( + store, + session_id, + head_hash, + outcome, + worker.last_run_interrupted(), + ) + .await + .unwrap(); let r = result.unwrap(); (worker, r) @@ -245,7 +251,8 @@ async fn session_run_with_tool_call() { .unwrap(); let mut head_hash = Some(head_hash); - let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await; + let (_worker, _) = + run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await; let entries = store.read_all(sid).await.unwrap(); diff --git a/crates/tools/src/edit.rs b/crates/tools/src/edit.rs index 5425abdf..c1194353 100644 --- a/crates/tools/src/edit.rs +++ b/crates/tools/src/edit.rs @@ -91,8 +91,7 @@ impl Tool for EditTool { let occurrences = if params.replace_all { count } else { 1 }; self.fs.write(¶ms.file_path, new_text.as_bytes())?; - self.tracker - .record(¶ms.file_path, new_text.as_bytes()); + self.tracker.record(¶ms.file_path, new_text.as_bytes()); let summary = format!( "Edited {} ({} replacement{})", diff --git a/crates/tools/src/grep.rs b/crates/tools/src/grep.rs index 2adb6106..9a3dcea7 100644 --- a/crates/tools/src/grep.rs +++ b/crates/tools/src/grep.rs @@ -6,9 +6,7 @@ use std::sync::Arc; use async_trait::async_trait; use grep_regex::RegexMatcherBuilder; use grep_searcher::sinks::UTF8 as UTF8Sink; -use grep_searcher::{ - BinaryDetection, Searcher, SearcherBuilder, Sink, SinkContext, SinkMatch, -}; +use grep_searcher::{BinaryDetection, Searcher, SearcherBuilder, Sink, SinkContext, SinkMatch}; use ignore::WalkBuilder; use ignore::overrides::OverrideBuilder; use ignore::types::TypesBuilder; @@ -94,10 +92,9 @@ impl Tool for GrepTool { ); let default_base = self.fs.scope().root().to_path_buf(); - let report = - tokio::task::spawn_blocking(move || run_grep(default_base, params)) - .await - .map_err(|e| ToolError::Internal(format!("spawn_blocking failed: {e}")))??; + let report = tokio::task::spawn_blocking(move || run_grep(default_base, params)) + .await + .map_err(|e| ToolError::Internal(format!("spawn_blocking failed: {e}")))??; Ok(report.render()) } @@ -212,12 +209,7 @@ impl GrepReport { continue; } } - body.push_str(&format!( - "{}{}{}\n", - line.path.display(), - sep, - line.text - )); + body.push_str(&format!("{}{}{}\n", line.path.display(), sep, line.text)); } let mut summary = format!( "{} matching line(s) in {} file(s)", @@ -285,7 +277,8 @@ fn run_grep(default_base: PathBuf, p: GrepParams) -> Result { impl Sink for ContentSink<'_> { type Error = std::io::Error; - fn matched( - &mut self, - _searcher: &Searcher, - mat: &SinkMatch<'_>, - ) -> Result { + fn matched(&mut self, _searcher: &Searcher, mat: &SinkMatch<'_>) -> Result { let idx = *self.matches_seen; *self.matches_seen += 1; @@ -589,10 +578,7 @@ mod tests { #[tokio::test] async fn grep_multiline() { let (dir, fs) = setup(); - touch( - &dir.path().join("a.txt"), - "start\nfoo\nbar\nend\n", - ); + touch(&dir.path().join("a.txt"), "start\nfoo\nbar\nend\n"); let def = grep_tool(fs); let (_, tool) = def(); diff --git a/crates/tools/src/lib.rs b/crates/tools/src/lib.rs index 5b78dfc9..a5bbfede 100644 --- a/crates/tools/src/lib.rs +++ b/crates/tools/src/lib.rs @@ -39,10 +39,7 @@ pub use write::write_tool; /// All returned factories share the same tracker instance so that /// `Read` / `Write` / `Edit` see a consistent history across tool /// invocations within a single session. -pub fn builtin_tools( - fs: ScopedFs, - tracker: Tracker, -) -> Vec { +pub fn builtin_tools(fs: ScopedFs, tracker: Tracker) -> Vec { vec![ read_tool(fs.clone(), tracker.clone()), write_tool(fs.clone(), tracker.clone()), diff --git a/crates/tools/src/scoped_fs.rs b/crates/tools/src/scoped_fs.rs index 19bea963..40fc6f92 100644 --- a/crates/tools/src/scoped_fs.rs +++ b/crates/tools/src/scoped_fs.rs @@ -103,10 +103,7 @@ impl ScopedFs { let existed = path.exists(); let parent = path.parent().ok_or_else(|| { - ToolsError::InvalidArgument(format!( - "path has no parent directory: {}", - path.display() - )) + ToolsError::InvalidArgument(format!("path has no parent directory: {}", path.display())) })?; if !parent.as_os_str().is_empty() && !parent.exists() { std::fs::create_dir_all(parent).map_err(|e| ToolsError::io(parent, e))?; @@ -119,7 +116,8 @@ impl ScopedFs { }; let mut tmp = tempfile::NamedTempFile::new_in(tmp_parent) .map_err(|e| ToolsError::io(tmp_parent, e))?; - tmp.write_all(content).map_err(|e| ToolsError::io(path, e))?; + tmp.write_all(content) + .map_err(|e| ToolsError::io(path, e))?; tmp.as_file() .sync_all() .map_err(|e| ToolsError::io(path, e))?; diff --git a/crates/tools/src/write.rs b/crates/tools/src/write.rs index 03f6aff5..9290ae9d 100644 --- a/crates/tools/src/write.rs +++ b/crates/tools/src/write.rs @@ -48,7 +48,9 @@ impl Tool for WriteTool { self.tracker.verify(¶ms.file_path, ¤t)?; } - let outcome = self.fs.write(¶ms.file_path, params.content.as_bytes())?; + let outcome = self + .fs + .write(¶ms.file_path, params.content.as_bytes())?; // Refresh the history entry to reflect the newly-written content, // so a subsequent Edit / Write can proceed without a re-read. @@ -57,7 +59,11 @@ impl Tool for WriteTool { let summary = format!( "{} {} ({} bytes)", - if outcome.created { "Created" } else { "Overwrote" }, + if outcome.created { + "Created" + } else { + "Overwrote" + }, params.file_path.display(), outcome.bytes_written ); diff --git a/crates/tools/tests/edge_cases.rs b/crates/tools/tests/edge_cases.rs index dcb58255..7dc6b39b 100644 --- a/crates/tools/tests/edge_cases.rs +++ b/crates/tools/tests/edge_cases.rs @@ -6,7 +6,7 @@ use llm_worker::tool::{Tool, ToolDefinition}; use manifest::Scope; use serde_json::json; use tempfile::TempDir; -use tools::{Tracker, ScopedFs, builtin_tools}; +use tools::{ScopedFs, Tracker, builtin_tools}; struct Registry { entries: Vec<(llm_worker::tool::ToolMeta, Arc)>, @@ -54,10 +54,7 @@ async fn unicode_path_and_content() { let read = reg.get("Read"); let out = read - .execute( - &json!({ "file_path": file.to_str().unwrap() }) - .to_string(), - ) + .execute(&json!({ "file_path": file.to_str().unwrap() }).to_string()) .await .unwrap(); let body = out.content.unwrap(); @@ -81,11 +78,9 @@ async fn symlink_to_outside_scope_is_rejected_for_write() { // Read tool must work against the symlink (read is unrestricted). let read = reg.get("Read"); - read.execute( - &json!({ "file_path": link.to_str().unwrap() }).to_string(), - ) - .await - .unwrap(); + read.execute(&json!({ "file_path": link.to_str().unwrap() }).to_string()) + .await + .unwrap(); // Write through the symlink must be rejected because canonicalization // resolves it to outside the scope. diff --git a/crates/tools/tests/integration.rs b/crates/tools/tests/integration.rs index 9fb83104..115f33d9 100644 --- a/crates/tools/tests/integration.rs +++ b/crates/tools/tests/integration.rs @@ -11,7 +11,7 @@ use llm_worker::tool::{Tool, ToolDefinition, ToolMeta}; use manifest::Scope; use serde_json::json; use tempfile::TempDir; -use tools::{Tracker, ScopedFs, builtin_tools}; +use tools::{ScopedFs, Tracker, builtin_tools}; struct Registry { entries: Vec<(ToolMeta, Arc)>, @@ -50,10 +50,7 @@ async fn call(tool: &Arc, input: serde_json::Value) -> llm_worker::too .expect("tool execution failed") } -async fn call_err( - tool: &Arc, - input: serde_json::Value, -) -> llm_worker::tool::ToolError { +async fn call_err(tool: &Arc, input: serde_json::Value) -> llm_worker::tool::ToolError { tool.execute(&input.to_string()) .await .expect_err("expected error") @@ -71,7 +68,11 @@ fn builtin_tools_registers_all_five() { fn meta_has_description_and_schema() { let (_dir, reg) = setup(); for (meta, _) in ®.entries { - assert!(!meta.description.is_empty(), "{} missing description", meta.name); + assert!( + !meta.description.is_empty(), + "{} missing description", + meta.name + ); // Input schema must be a JSON object assert!( meta.input_schema.is_object(), @@ -283,7 +284,11 @@ async fn tracker_recent_files_tracks_read_write_edit() { std::fs::write(&a, "one\n").unwrap(); // Read `a` — should appear in recency. - call(®.get("Read"), json!({ "file_path": a.to_str().unwrap() })).await; + call( + ®.get("Read"), + json!({ "file_path": a.to_str().unwrap() }), + ) + .await; // Write `b` (new file) — should appear ahead of `a`. call( ®.get("Write"), @@ -303,8 +308,14 @@ async fn tracker_recent_files_tracks_read_write_edit() { let recent = tracker.recent_files(10); assert_eq!(recent.len(), 2); - assert!(recent[0].ends_with("a.txt"), "front should be a.txt: {recent:?}"); - assert!(recent[1].ends_with("b.txt"), "second should be b.txt: {recent:?}"); + assert!( + recent[0].ends_with("a.txt"), + "front should be a.txt: {recent:?}" + ); + assert!( + recent[1].ends_with("b.txt"), + "second should be b.txt: {recent:?}" + ); } // Sanity: unused Path import guard diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 3d9739ac..65071f3e 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -102,8 +102,10 @@ impl App { } Event::ToolCallStart { name, .. } => { self.current_tool = Some(name.clone()); - self.output_queue - .push(OutputItem::Padded(MessageKind::Tool, format!("[tool] {name}"))); + self.output_queue.push(OutputItem::Padded( + MessageKind::Tool, + format!("[tool] {name}"), + )); } Event::ToolCallDone { name, arguments, .. @@ -117,7 +119,11 @@ impl App { Event::ToolResult { output, is_error, .. } => { - let prefix = if is_error { "[tool error]" } else { "[tool result]" }; + let prefix = if is_error { + "[tool error]" + } else { + "[tool result]" + }; let display = if output.len() > 200 { format!("{}...", &output[..200]) } else { @@ -242,10 +248,8 @@ impl App { "user" => { self.turn_index += 1; self.output_queue.push(OutputItem::Blank); - self.output_queue.push(OutputItem::TurnHeader(format!( - "#{}", - self.turn_index - ))); + self.output_queue + .push(OutputItem::TurnHeader(format!("#{}", self.turn_index))); MessageKind::User } "assistant" => MessageKind::Assistant, @@ -265,8 +269,10 @@ impl App { } "tool_call" => { let name = item["name"].as_str().unwrap_or("?"); - self.output_queue - .push(OutputItem::Padded(MessageKind::Tool, format!("[tool] {name}"))); + self.output_queue.push(OutputItem::Padded( + MessageKind::Tool, + format!("[tool] {name}"), + )); } "tool_result" => { let output = item["output"].as_str().unwrap_or(""); diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 0f9789ca..636e17b5 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -19,7 +19,10 @@ fn resolve_socket(pod_name: &str, override_path: Option) -> PathBuf { return p; } if let Ok(rd) = std::env::var("XDG_RUNTIME_DIR") { - PathBuf::from(rd).join("insomnia").join(pod_name).join("sock") + PathBuf::from(rd) + .join("insomnia") + .join(pod_name) + .join("sock") } else if let Ok(home) = std::env::var("HOME") { PathBuf::from(home) .join(".insomnia") @@ -163,12 +166,8 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option { app.quit = true; None } - KeyCode::Char('r') if key.modifiers.contains(KeyModifiers::CONTROL) => { - Some(Method::Resume) - } - KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => { - Some(Method::Cancel) - } + KeyCode::Char('r') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Resume), + KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Cancel), KeyCode::Enter => app.submit_input(), KeyCode::Backspace => { app.delete_char_before(); diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index 4e41fbeb..c4444a2f 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -1,10 +1,10 @@ +use ratatui::Frame; use ratatui::layout::{Alignment, Constraint, Layout, Position, Rect}; use ratatui::style::{Color, Modifier, Style}; use ratatui::text::{Line, Span}; use ratatui::widgets::{Block, Padding, Paragraph, Wrap}; -use ratatui::Frame; -use crate::app::{fmt_tokens, App, MessageKind, OutputItem}; +use crate::app::{App, MessageKind, OutputItem, fmt_tokens}; /// Draw the fixed viewport (3 lines: separator, status, input). pub fn draw(frame: &mut Frame, app: &App) { @@ -44,8 +44,7 @@ pub fn flush_output( OutputItem::TurnHeader(text) => { terminal.insert_before(1, |buf| { let style = kind_style(&MessageKind::TurnHeader); - Paragraph::new(Line::from(Span::styled(text, style))) - .render(buf.area, buf); + Paragraph::new(Line::from(Span::styled(text, style))).render(buf.area, buf); })?; } OutputItem::Padded(kind, text) => { @@ -119,10 +118,7 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) { let mut spans = vec![ conn, Span::raw(" "), - Span::styled( - &app.pod_name, - Style::default().add_modifier(Modifier::BOLD), - ), + Span::styled(&app.pod_name, Style::default().add_modifier(Modifier::BOLD)), ]; if app.running {