cargo fmt

This commit is contained in:
Keisuke Hirata 2026-04-14 03:13:36 +09:00
parent 7ec6e88605
commit a0a9df11c0
45 changed files with 389 additions and 351 deletions

View File

@ -0,0 +1 @@

View File

@ -41,6 +41,7 @@ use tracing_subscriber::EnvFilter;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use llm_worker::{ use llm_worker::{
Worker, Worker,
interceptor::{Interceptor, PostToolAction, ToolResultInfo},
llm_client::{ llm_client::{
LlmClient, LlmClient,
providers::{ providers::{
@ -48,7 +49,6 @@ use llm_worker::{
openai::OpenAIClient, openai::OpenAIClient,
}, },
}, },
interceptor::{Interceptor, PostToolAction, ToolResultInfo},
timeline::{Handler, TextBlockEvent, TextBlockKind, ToolUseBlockEvent, ToolUseBlockKind}, timeline::{Handler, TextBlockEvent, TextBlockKind, ToolUseBlockEvent, ToolUseBlockKind},
}; };
use llm_worker_macros::tool_registry; use llm_worker_macros::tool_registry;

View File

@ -9,8 +9,8 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use crate::tool::{Tool, ToolCall, ToolMeta, ToolResult};
use crate::Item; use crate::Item;
use crate::tool::{Tool, ToolCall, ToolMeta, ToolResult};
// ============================================================================= // =============================================================================
// Action Enums // Action Enums

View File

@ -43,8 +43,8 @@ mod worker;
pub(crate) mod callback; pub(crate) mod callback;
pub mod event; pub mod event;
pub mod llm_client;
pub mod interceptor; pub mod interceptor;
pub mod llm_client;
pub mod prune; pub mod prune;
pub mod state; pub mod state;
pub mod timeline; pub mod timeline;
@ -53,7 +53,7 @@ pub mod tool_server;
pub use callback::{TextBlockScope, ToolUseBlockScope}; pub use callback::{TextBlockScope, ToolUseBlockScope};
pub use handler::ToolUseBlockStart; pub use handler::ToolUseBlockStart;
pub use message::{ContentPart, Item, Message, Role};
pub use interceptor::Interceptor; pub use interceptor::Interceptor;
pub use message::{ContentPart, Item, Message, Role};
pub use tool::{ToolCall, ToolResult}; pub use tool::{ToolCall, ToolResult};
pub use worker::{RunOutput, ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult}; pub use worker::{RunOutput, ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult};

View File

@ -182,7 +182,10 @@ impl AnthropicScheme {
} }
Item::ToolResult { Item::ToolResult {
call_id, summary, content, .. call_id,
summary,
content,
..
} => { } => {
// Flush pending assistant parts first // Flush pending assistant parts first
if !pending_assistant_parts.is_empty() { if !pending_assistant_parts.is_empty() {

View File

@ -257,7 +257,10 @@ impl GeminiScheme {
} }
Item::ToolResult { Item::ToolResult {
call_id, summary, content, .. call_id,
summary,
content,
..
} => { } => {
// Flush pending model parts first // Flush pending model parts first
if !pending_model_parts.is_empty() { if !pending_model_parts.is_empty() {

View File

@ -212,7 +212,10 @@ impl OpenAIScheme {
} }
Item::ToolResult { Item::ToolResult {
call_id, summary, content, .. call_id,
summary,
content,
..
} => { } => {
// Flush pending tool calls before tool result // Flush pending tool calls before tool result
self.flush_pending_assistant( self.flush_pending_assistant(

View File

@ -191,7 +191,10 @@ mod tests {
assert_eq!(count, 2); assert_eq!(count, 2);
for item in &items { for item in &items {
if let Item::ToolResult { summary, content, .. } = item { if let Item::ToolResult {
summary, content, ..
} = item
{
if summary == "s1" || summary == "s2" { if summary == "s1" || summary == "s2" {
assert!(content.is_none(), "old content should be projected out"); assert!(content.is_none(), "old content should be projected out");
} else { } else {

View File

@ -56,13 +56,7 @@ impl From<String> for ToolOutput {
} }
} else { } else {
let lines = s.lines().count(); let lines = s.lines().count();
let first_line: String = s let first_line: String = s.lines().next().unwrap_or("").chars().take(80).collect();
.lines()
.next()
.unwrap_or("")
.chars()
.take(80)
.collect();
let summary = format!("{lines} lines | {first_line}"); let summary = format!("{lines} lines | {first_line}");
ToolOutput { ToolOutput {
summary, summary,

View File

@ -65,10 +65,7 @@ impl ToolServerHandle {
} }
/// Queue many tool factories for deferred initialization. /// Queue many tool factories for deferred initialization.
pub(crate) fn register_tools( pub(crate) fn register_tools(&self, factories: impl IntoIterator<Item = WorkerToolDefinition>) {
&self,
factories: impl IntoIterator<Item = WorkerToolDefinition>,
) {
let mut guard = self.pending.lock().unwrap_or_else(|e| e.into_inner()); let mut guard = self.pending.lock().unwrap_or_else(|e| e.into_inner());
guard.extend(factories); guard.extend(factories);
} }
@ -110,7 +107,11 @@ impl ToolServerHandle {
} }
/// Execute a tool by name. /// Execute a tool by name.
pub async fn call_tool(&self, name: &str, input_json: &str) -> Result<ToolOutput, ToolServerError> { pub async fn call_tool(
&self,
name: &str,
input_json: &str,
) -> Result<ToolOutput, ToolServerError> {
let tool = { let tool = {
let guard = self.tools.lock().unwrap_or_else(|e| e.into_inner()); let guard = self.tools.lock().unwrap_or_else(|e| e.into_inner());
let (_, tool) = guard let (_, tool) = guard

View File

@ -7,24 +7,23 @@ use tracing::{debug, info, trace, warn};
use crate::{ use crate::{
Item, Item,
llm_client::{ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition},
interceptor::{
DefaultInterceptor, Interceptor, PostToolAction, PreRequestAction, PreToolAction,
PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction,
},
state::{Locked, Mutable, WorkerState},
callback::{ callback::{
ClosureMetaHandler, ClosureTextBlockHandler, ClosureToolUseBlockHandler, TextBlockScope, ClosureMetaHandler, ClosureTextBlockHandler, ClosureToolUseBlockHandler, TextBlockScope,
ToolUseBlockScope, ToolUseBlockScope,
}, },
handler::{ErrorKind, StatusKind, ToolUseBlockStart, UsageKind}, handler::{ErrorKind, StatusKind, ToolUseBlockStart, UsageKind},
timeline::{TextBlockCollector, Timeline, ToolCallCollector}, interceptor::{
DefaultInterceptor, Interceptor, PostToolAction, PreRequestAction, PreToolAction,
PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction,
},
llm_client::{ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition},
state::{Locked, Mutable, WorkerState},
timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
timeline::{TextBlockCollector, Timeline, ToolCallCollector},
tool::{ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolResult}, tool::{ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolResult},
tool_server::{ToolServer, ToolServerHandle}, tool_server::{ToolServer, ToolServerHandle},
}; };
/// Worker errors /// Worker errors
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum WorkerError { pub enum WorkerError {
@ -53,7 +52,6 @@ pub enum ToolRegistryError {
DuplicateName(String), DuplicateName(String),
} }
/// Worker configuration /// Worker configuration
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct WorkerConfig { pub struct WorkerConfig {
@ -61,7 +59,6 @@ pub struct WorkerConfig {
_private: (), _private: (),
} }
/// Worker execution result (status) /// Worker execution result (status)
#[derive(Debug)] #[derive(Debug)]
pub enum WorkerResult { pub enum WorkerResult {
@ -95,7 +92,6 @@ enum ToolExecutionResult {
Paused, Paused,
} }
/// Central component for managing LLM interactions /// Central component for managing LLM interactions
/// ///
/// Receives input from the user, sends requests to the LLM, and /// Receives input from the user, sends requests to the LLM, and
@ -172,7 +168,6 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
_state: PhantomData<S>, _state: PhantomData<S>,
} }
impl<C: LlmClient, S: WorkerState> Worker<C, S> { impl<C: LlmClient, S: WorkerState> Worker<C, S> {
fn reset_interruption_state(&mut self) { fn reset_interruption_state(&mut self) {
self.last_run_interrupted = false; self.last_run_interrupted = false;
@ -214,10 +209,9 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
&mut self, &mut self,
setup: impl FnMut(&mut TextBlockScope) + Send + Sync + 'static, setup: impl FnMut(&mut TextBlockScope) + Send + Sync + 'static,
) { ) {
self.timeline self.timeline.on_text_block(ClosureTextBlockHandler {
.on_text_block(ClosureTextBlockHandler { setup: Box::new(setup),
setup: Box::new(setup), });
});
} }
/// Register a tool use block observer with scoped callbacks. /// Register a tool use block observer with scoped callbacks.
@ -240,17 +234,13 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
&mut self, &mut self,
setup: impl FnMut(&ToolUseBlockStart, &mut ToolUseBlockScope) + Send + Sync + 'static, setup: impl FnMut(&ToolUseBlockStart, &mut ToolUseBlockScope) + Send + Sync + 'static,
) { ) {
self.timeline self.timeline.on_tool_use_block(ClosureToolUseBlockHandler {
.on_tool_use_block(ClosureToolUseBlockHandler { setup: Box::new(setup),
setup: Box::new(setup), });
});
} }
/// Register a usage event callback. /// Register a usage event callback.
pub fn on_usage( pub fn on_usage(&mut self, callback: impl FnMut(&UsageEvent) + Send + Sync + 'static) {
&mut self,
callback: impl FnMut(&UsageEvent) + Send + Sync + 'static,
) {
self.timeline.on_usage(ClosureMetaHandler { self.timeline.on_usage(ClosureMetaHandler {
callback, callback,
_kind: PhantomData::<UsageKind>, _kind: PhantomData::<UsageKind>,
@ -258,10 +248,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
} }
/// Register a status event callback. /// Register a status event callback.
pub fn on_status( pub fn on_status(&mut self, callback: impl FnMut(&StatusEvent) + Send + Sync + 'static) {
&mut self,
callback: impl FnMut(&StatusEvent) + Send + Sync + 'static,
) {
self.timeline.on_status(ClosureMetaHandler { self.timeline.on_status(ClosureMetaHandler {
callback, callback,
_kind: PhantomData::<StatusKind>, _kind: PhantomData::<StatusKind>,
@ -269,10 +256,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
} }
/// Register an error event callback. /// Register an error event callback.
pub fn on_error( pub fn on_error(&mut self, callback: impl FnMut(&ErrorEvent) + Send + Sync + 'static) {
&mut self,
callback: impl FnMut(&ErrorEvent) + Send + Sync + 'static,
) {
self.timeline.on_error(ClosureMetaHandler { self.timeline.on_error(ClosureMetaHandler {
callback, callback,
_kind: PhantomData::<ErrorKind>, _kind: PhantomData::<ErrorKind>,
@ -280,18 +264,12 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
} }
/// Register a turn-start callback (receives 0-based turn number). /// Register a turn-start callback (receives 0-based turn number).
pub fn on_turn_start( pub fn on_turn_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
&mut self,
callback: impl Fn(usize) + Send + Sync + 'static,
) {
self.turn_start_cbs.push(Box::new(callback)); self.turn_start_cbs.push(Box::new(callback));
} }
/// Register a turn-end callback (receives 0-based turn number). /// Register a turn-end callback (receives 0-based turn number).
pub fn on_turn_end( pub fn on_turn_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
&mut self,
callback: impl Fn(usize) + Send + Sync + 'static,
) {
self.turn_end_cbs.push(Box::new(callback)); self.turn_end_cbs.push(Box::new(callback));
} }
@ -735,9 +713,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
// prunable candidates whose estimated savings meet the // prunable candidates whose estimated savings meet the
// threshold. Worker does not own usage history itself; the // threshold. Worker does not own usage history itself; the
// estimator is injected by the layer that does. // estimator is injected by the layer that does.
if let (Some(config), Some(estimator)) = if let (Some(config), Some(estimator)) = (&self.prune_config, &self.savings_estimator) {
(&self.prune_config, &self.savings_estimator)
{
let candidates = let candidates =
crate::prune::prunable_indices(&request_context, config.protected_turns); crate::prune::prunable_indices(&request_context, config.protected_turns);
if !candidates.is_empty() { if !candidates.is_empty() {
@ -745,8 +721,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
let last = *candidates.last().unwrap() + 1; let last = *candidates.last().unwrap() + 1;
let savings = estimator(&request_context, first..last); let savings = estimator(&request_context, first..last);
if savings >= config.min_savings { if savings >= config.min_savings {
let pruned = let pruned = crate::prune::project(&mut request_context, &candidates);
crate::prune::project(&mut request_context, &candidates);
if pruned > 0 { if pruned > 0 {
debug!( debug!(
pruned, pruned,
@ -817,7 +792,11 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
if let Some(max) = self.max_turns { if let Some(max) = self.max_turns {
if self.turn_count >= max as usize { if self.turn_count >= max as usize {
info!(turn_count = self.turn_count, max_turns = max, "Turn limit reached"); info!(
turn_count = self.turn_count,
max_turns = max,
"Turn limit reached"
);
self.last_run_interrupted = false; self.last_run_interrupted = false;
return Ok(WorkerResult::LimitReached); return Ok(WorkerResult::LimitReached);
} }
@ -911,10 +890,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
content, content,
)); ));
} else { } else {
self.history.push(Item::tool_result( self.history
&result.tool_use_id, .push(Item::tool_result(&result.tool_use_id, &result.summary));
&result.summary,
));
} }
} }
Ok(None) Ok(None)
@ -925,10 +902,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
} }
} }
} }
} }
impl<C: LlmClient> Worker<C, Mutable> { impl<C: LlmClient> Worker<C, Mutable> {
/// Create a new Worker (in Mutable state) /// Create a new Worker (in Mutable state)
pub fn new(client: C) -> Self { pub fn new(client: C) -> Self {
@ -975,10 +950,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
} }
/// Register multiple tool factories for deferred initialization. /// Register multiple tool factories for deferred initialization.
pub fn register_tools( pub fn register_tools(&mut self, factories: impl IntoIterator<Item = WorkerToolDefinition>) {
&mut self,
factories: impl IntoIterator<Item = WorkerToolDefinition>,
) {
self.tool_server.register_tools(factories); self.tool_server.register_tools(factories);
} }
@ -1086,45 +1058,38 @@ impl<C: LlmClient> Worker<C, Mutable> {
/// ///
/// Available only in Mutable state. /// Available only in Mutable state.
pub fn history_mut(&mut self) -> &mut Vec<Item> { pub fn history_mut(&mut self) -> &mut Vec<Item> {
&mut self.history &mut self.history
} }
/// Set history /// Set history
pub fn set_history(&mut self, items: Vec<Item>) { pub fn set_history(&mut self, items: Vec<Item>) {
self.history = items; self.history = items;
} }
/// Add an item to history (builder pattern) /// Add an item to history (builder pattern)
pub fn with_item(mut self, item: Item) -> Self { pub fn with_item(mut self, item: Item) -> Self {
self.history.push(item); self.history.push(item);
self self
} }
/// Add an item to history /// Add an item to history
pub fn push_item(&mut self, item: Item) { pub fn push_item(&mut self, item: Item) {
self.history.push(item); self.history.push(item);
} }
/// Add multiple items to history (builder pattern) /// Add multiple items to history (builder pattern)
pub fn with_items(mut self, items: impl IntoIterator<Item = Item>) -> Self { pub fn with_items(mut self, items: impl IntoIterator<Item = Item>) -> Self {
self.history.extend(items); self.history.extend(items);
self self
} }
/// Add multiple items to history /// Add multiple items to history
pub fn extend_history(&mut self, items: impl IntoIterator<Item = Item>) { pub fn extend_history(&mut self, items: impl IntoIterator<Item = Item>) {
self.history.extend(items); self.history.extend(items);
} }
/// Clear history /// Clear history
pub fn clear_history(&mut self) { pub fn clear_history(&mut self) {
self.history.clear(); self.history.clear();
} }
@ -1156,13 +1121,13 @@ impl<C: LlmClient> Worker<C, Mutable> {
/// ///
/// Subsequent runs can use [`Worker<C, Locked>::run()`] directly. /// Subsequent runs can use [`Worker<C, Locked>::run()`] directly.
/// To edit state between turns, call [`unlock()`](Worker::unlock) first. /// To edit state between turns, call [`unlock()`](Worker::unlock) first.
pub async fn run( pub async fn run(self, user_input: impl Into<String>) -> Result<RunOutput<C>, WorkerError> {
self,
user_input: impl Into<String>,
) -> Result<RunOutput<C>, WorkerError> {
let mut locked = self.lock(); let mut locked = self.lock();
let result = locked.run(user_input).await?; let result = locked.run(user_input).await?;
Ok(RunOutput { worker: locked, result }) Ok(RunOutput {
worker: locked,
result,
})
} }
/// Resume from Paused, consuming self and transitioning to Locked. /// Resume from Paused, consuming self and transitioning to Locked.
@ -1171,7 +1136,10 @@ impl<C: LlmClient> Worker<C, Mutable> {
pub async fn resume(self) -> Result<RunOutput<C>, WorkerError> { pub async fn resume(self) -> Result<RunOutput<C>, WorkerError> {
let mut locked = self.lock(); let mut locked = self.lock();
let result = locked.resume().await?; let result = locked.resume().await?;
Ok(RunOutput { worker: locked, result }) Ok(RunOutput {
worker: locked,
result,
})
} }
/// Lock and transition to Locked state /// Lock and transition to Locked state
@ -1216,7 +1184,6 @@ impl<C: LlmClient> Worker<C, Mutable> {
} }
} }
impl<C: LlmClient> Worker<C, Locked> { impl<C: LlmClient> Worker<C, Locked> {
/// Execute a turn /// Execute a turn
/// ///

View File

@ -8,8 +8,10 @@ use std::time::{Duration, Instant};
use async_trait::async_trait; use async_trait::async_trait;
use llm_worker::Worker; use llm_worker::Worker;
use llm_worker::interceptor::{
Interceptor, PostToolAction, PreToolAction, ToolCallInfo, ToolResultInfo,
};
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent};
use llm_worker::interceptor::{Interceptor, PostToolAction, PreToolAction, ToolCallInfo, ToolResultInfo};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
mod common; mod common;

View File

@ -77,8 +77,14 @@ async fn test_basic_tool_generation() {
let result = tool.execute(r#"{"message": "World"}"#).await; let result = tool.execute(r#"{"message": "World"}"#).await;
assert!(result.is_ok(), "Should execute successfully"); assert!(result.is_ok(), "Should execute successfully");
let output = result.unwrap(); let output = result.unwrap();
assert!(output.summary.contains("Hello"), "Output should contain prefix"); assert!(
assert!(output.summary.contains("World"), "Output should contain message"); output.summary.contains("Hello"),
"Output should contain prefix"
);
assert!(
output.summary.contains("World"),
"Output should contain message"
);
} }
#[tokio::test] #[tokio::test]
@ -94,7 +100,11 @@ async fn test_multiple_arguments() {
let result = tool.execute(r#"{"a": 10, "b": 20}"#).await; let result = tool.execute(r#"{"a": 10, "b": 20}"#).await;
assert!(result.is_ok()); assert!(result.is_ok());
let output = result.unwrap(); let output = result.unwrap();
assert!(output.summary.contains("30"), "Should contain sum: {:?}", output); assert!(
output.summary.contains("30"),
"Should contain sum: {:?}",
output
);
} }
#[tokio::test] #[tokio::test]
@ -168,7 +178,11 @@ async fn test_result_return_type_success() {
let result = tool.execute(r#"{"value": 42}"#).await; let result = tool.execute(r#"{"value": 42}"#).await;
assert!(result.is_ok(), "Should succeed for positive value"); assert!(result.is_ok(), "Should succeed for positive value");
let output = result.unwrap(); let output = result.unwrap();
assert!(output.summary.contains("Valid"), "Should contain Valid: {:?}", output); assert!(
output.summary.contains("Valid"),
"Should contain Valid: {:?}",
output
);
} }
#[tokio::test] #[tokio::test]

View File

@ -11,9 +11,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait; use async_trait::async_trait;
use common::MockLlmClient; use common::MockLlmClient;
use llm_worker::Item; use llm_worker::Item;
use llm_worker::{Worker, WorkerError};
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use llm_worker::{Worker, WorkerError};
// ============================================================================= // =============================================================================
// Mutable State Tests // Mutable State Tests

View File

@ -113,9 +113,15 @@ pub struct CompactionConfig {
pub provider: Option<ProviderConfig>, pub provider: Option<ProviderConfig>,
} }
fn default_prune_protected_turns() -> usize { 3 } fn default_prune_protected_turns() -> usize {
fn default_prune_min_savings() -> u64 { 4096 } 3
fn default_compact_retained_turns() -> usize { 2 } }
fn default_prune_min_savings() -> u64 {
4096
}
fn default_compact_retained_turns() -> usize {
2
}
impl Default for CompactionConfig { impl Default for CompactionConfig {
fn default() -> Self { fn default() -> Self {

View File

@ -88,8 +88,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Wait for completion // Wait for completion
tokio::time::sleep(std::time::Duration::from_secs(15)).await; tokio::time::sleep(std::time::Duration::from_secs(15)).await;
println!("\n[shared_state] final: {}", handle.shared_state.status_json()); println!(
println!("[history] {} bytes", handle.shared_state.history_json().len()); "\n[shared_state] final: {}",
handle.shared_state.status_json()
);
println!(
"[history] {} bytes",
handle.shared_state.history_json().len()
);
drop(handle); drop(handle);
let _ = listener.await; let _ = listener.await;

View File

@ -9,11 +9,11 @@
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use llm_worker::Item;
use llm_worker::interceptor::{ use llm_worker::interceptor::{
Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo,
ToolResultInfo, TurnEndAction, ToolResultInfo, TurnEndAction,
}; };
use llm_worker::Item;
use tracing::info; use tracing::info;
use crate::compact_state::CompactState; use crate::compact_state::CompactState;

View File

@ -1,16 +1,16 @@
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::WorkerError; use llm_worker::WorkerError;
use llm_worker::llm_client::client::LlmClient;
use session_store::Store; use session_store::Store;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
use crate::pod::{Pod, PodRunResult, PodError}; use crate::pod::{Pod, PodError, PodRunResult};
use protocol::{ErrorCode, Event, Method, RunResult, TurnResult};
use crate::runtime_dir::RuntimeDir; use crate::runtime_dir::RuntimeDir;
use crate::shared_state::{PodSharedState, PodStatus}; use crate::shared_state::{PodSharedState, PodStatus};
use crate::socket_server::SocketServer; use crate::socket_server::SocketServer;
use protocol::{ErrorCode, Event, Method, RunResult, TurnResult};
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// PodHandle — client-facing, Clone-able // PodHandle — client-facing, Clone-able

View File

@ -8,11 +8,11 @@
//! concerns belong. //! concerns belong.
use async_trait::async_trait; use async_trait::async_trait;
use llm_worker::Item;
use llm_worker::interceptor::{ use llm_worker::interceptor::{
PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, ToolResultInfo, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, ToolResultInfo,
TurnEndAction, TurnEndAction,
}; };
use llm_worker::Item;
// ============================================================================= // =============================================================================
// Hook Event Kinds // Hook Event Kinds

View File

@ -3,11 +3,11 @@
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use llm_worker::Item;
use llm_worker::interceptor::{ use llm_worker::interceptor::{
Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo,
ToolResultInfo, TurnEndAction, ToolResultInfo, TurnEndAction,
}; };
use llm_worker::Item;
use crate::hook::HookRegistry; use crate::hook::HookRegistry;

View File

@ -15,8 +15,8 @@ mod usage_tracker;
pub use token_counter::{EstimateSource, SplitPoint, TokenEstimate}; pub use token_counter::{EstimateSource, SplitPoint, TokenEstimate};
pub use controller::{PodController, PodHandle}; pub use controller::{PodController, PodHandle};
pub use manifest::{PodManifest, ProviderConfig, ProviderKind, Scope};
pub use hook::{Hook, HookEventKind, HookRegistryBuilder}; pub use hook::{Hook, HookEventKind, HookRegistryBuilder};
pub use manifest::{PodManifest, ProviderConfig, ProviderKind, Scope};
pub use pod::{Pod, PodError, PodRunResult, apply_worker_manifest}; pub use pod::{Pod, PodError, PodRunResult, apply_worker_manifest};
pub use protocol::{ErrorCode, Event, Method, TurnResult}; pub use protocol::{ErrorCode, Event, Method, TurnResult};
pub use provider::{ProviderError, build_client}; pub use provider::{ProviderError, build_client};

View File

@ -2,8 +2,8 @@ use std::path::{Path, PathBuf};
use std::process::ExitCode; use std::process::ExitCode;
use clap::Parser; use clap::Parser;
use session_store::FsStore;
use pod::{Pod, PodController}; use pod::{Pod, PodController};
use session_store::FsStore;
#[derive(Parser)] #[derive(Parser)]
#[command(name = "pod", about = "Run a Pod process from a manifest file")] #[command(name = "pod", about = "Run a Pod process from a manifest file")]
@ -18,9 +18,8 @@ struct Cli {
} }
fn default_store_dir() -> Result<PathBuf, std::io::Error> { fn default_store_dir() -> Result<PathBuf, std::io::Error> {
let home = std::env::var("HOME").map_err(|_| { let home = std::env::var("HOME")
std::io::Error::new(std::io::ErrorKind::NotFound, "HOME is not set") .map_err(|_| std::io::Error::new(std::io::ErrorKind::NotFound, "HOME is not set"))?;
})?;
Ok(PathBuf::from(home).join(".insomnia").join("sessions")) Ok(PathBuf::from(home).join(".insomnia").join("sessions"))
} }
@ -111,7 +110,10 @@ async fn main() -> ExitCode {
} }
}; };
eprintln!("pod: {pod_name} listening on {:?}", handle.runtime_dir.socket_path()); eprintln!(
"pod: {pod_name} listening on {:?}",
handle.runtime_dir.socket_path()
);
// Wait for shutdown signal // Wait for shutdown signal
match tokio::signal::ctrl_c().await { match tokio::signal::ctrl_c().await {

View File

@ -2,8 +2,8 @@ use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use llm_worker::Item; use llm_worker::Item;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::state::Mutable; use llm_worker::state::Mutable;
use llm_worker::{Worker, WorkerError, WorkerResult}; use llm_worker::{Worker, WorkerError, WorkerResult};
use session_store::{ use session_store::{
@ -21,8 +21,8 @@ use crate::hook::{
}; };
use crate::hook_interceptor::HookInterceptor; use crate::hook_interceptor::HookInterceptor;
use crate::usage_tracker::UsageTracker; use crate::usage_tracker::UsageTracker;
use llm_worker::interceptor::PreRequestAction;
use async_trait::async_trait; use async_trait::async_trait;
use llm_worker::interceptor::PreRequestAction;
/// Pre-LLM-request hook that records `history.len()` at send time into a /// Pre-LLM-request hook that records `history.len()` at send time into a
/// shared `UsageTracker`. The on_usage callback later pairs this with the /// shared `UsageTracker`. The on_usage callback later pairs this with the
@ -205,7 +205,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// Returns a clone since the underlying vector is shared with hooks /// Returns a clone since the underlying vector is shared with hooks
/// running on the Worker. /// running on the Worker.
pub fn usage_history(&self) -> Vec<UsageRecord> { pub fn usage_history(&self) -> Vec<UsageRecord> {
self.usage_history.lock().expect("usage_history poisoned").clone() self.usage_history
.lock()
.expect("usage_history poisoned")
.clone()
} }
/// Shared handle to the cumulative Usage history. /// Shared handle to the cumulative Usage history.
@ -292,10 +295,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// Pre-LLM-request hook: capture history.len() into the // Pre-LLM-request hook: capture history.len() into the
// UsageTracker so the upcoming on_usage callback can pair // UsageTracker so the upcoming on_usage callback can pair
// it with the measured input_tokens. // it with the measured input_tokens.
self.hook_builder self.hook_builder.add_pre_llm_request(UsageTrackingHook {
.add_pre_llm_request(UsageTrackingHook { tracker: self.usage_tracker.clone(),
tracker: self.usage_tracker.clone(), });
});
let builder = std::mem::take(&mut self.hook_builder); let builder = std::mem::take(&mut self.hook_builder);
let registry = Arc::new(builder.build()); let registry = Arc::new(builder.build());
@ -430,8 +432,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// async layout cycle (`run → handle_worker_result → do_compact_and_resume → resume`). /// async layout cycle (`run → handle_worker_result → do_compact_and_resume → resume`).
fn do_compact_and_resume( fn do_compact_and_resume(
&mut self, &mut self,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<PodRunResult, PodError>> + Send + '_>> ) -> std::pin::Pin<
{ Box<dyn std::future::Future<Output = Result<PodRunResult, PodError>> + Send + '_>,
> {
Box::pin(async move { Box::pin(async move {
// Thrash detection: if we just compacted and hit the threshold again, // Thrash detection: if we just compacted and hit the threshold again,
// something is wrong. // something is wrong.
@ -475,9 +478,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// Best-effort: failures are logged but do not propagate. /// Best-effort: failures are logged but do not propagate.
pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> { pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> {
let state = match self.compact_state.as_ref() { let state = match self.compact_state.as_ref() {
Some(s) if !s.is_disabled() && s.exceeds_post_run() && !s.just_compacted() => { Some(s) if !s.is_disabled() && s.exceeds_post_run() && !s.just_compacted() => s.clone(),
s.clone()
}
_ => return Ok(()), _ => return Ok(()),
}; };
@ -509,13 +510,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// head_hash mutable). // head_hash mutable).
let w = self.worker.as_ref().unwrap(); let w = self.worker.as_ref().unwrap();
let new_items = &w.history()[history_before..]; let new_items = &w.history()[history_before..];
session_store::save_delta( session_store::save_delta(&self.store, self.session_id, &mut self.head_hash, new_items)
&self.store, .await?;
self.session_id,
&mut self.head_hash,
new_items,
)
.await?;
let turn_count = self.worker.as_ref().unwrap().turn_count(); let turn_count = self.worker.as_ref().unwrap().turn_count();
session_store::save_turn_end( session_store::save_turn_end(
@ -544,7 +540,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
record.output_tokens, record.output_tokens,
) )
.await?; .await?;
self.usage_history.lock().expect("usage_history poisoned").push(record); self.usage_history
.lock()
.expect("usage_history poisoned")
.push(record);
} }
let interrupted = self.worker.as_ref().unwrap().last_run_interrupted(); let interrupted = self.worker.as_ref().unwrap().last_run_interrupted();
@ -578,10 +577,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// - a clone of the main LlmClient via `clone_boxed()`. /// - a clone of the main LlmClient via `clone_boxed()`.
/// ///
/// Returns the new session ID. /// Returns the new session ID.
pub async fn compact( pub async fn compact(&mut self, retained_turns: usize) -> Result<SessionId, PodError> {
&mut self,
retained_turns: usize,
) -> Result<SessionId, PodError> {
let worker = self.worker.as_ref().expect("worker taken during run"); let worker = self.worker.as_ref().expect("worker taken during run");
let history = worker.history(); let history = worker.history();
@ -612,13 +608,20 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.temperature(0.0); .temperature(0.0);
summary_worker.set_max_tokens(2048); summary_worker.set_max_tokens(2048);
let out = summary_worker.run(summary_prompt).await let out = summary_worker
.run(summary_prompt)
.await
.map_err(PodError::Worker)?; .map_err(PodError::Worker)?;
let summary_text = out.worker let summary_text = out
.worker
.history() .history()
.iter() .iter()
.filter_map(|item| { .filter_map(|item| {
if item.is_assistant_message() { item.as_text().map(String::from) } else { None } if item.is_assistant_message() {
item.as_text().map(String::from)
} else {
None
}
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join("\n"); .join("\n");
@ -632,7 +635,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// Persist as a new compacted session. // Persist as a new compacted session.
let old_session_id = self.session_id; let old_session_id = self.session_id;
let old_head_hash = self.head_hash.clone() let old_head_hash = self
.head_hash
.clone()
.expect("head_hash should be set after at least one entry"); .expect("head_hash should be set after at least one entry");
let w = self.worker.as_ref().unwrap(); let w = self.worker.as_ref().unwrap();
@ -655,7 +660,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.session_id = new_session_id; self.session_id = new_session_id;
self.head_hash = Some(new_head_hash); self.head_hash = Some(new_head_hash);
self.worker.as_mut().unwrap().set_history(new_history); self.worker.as_mut().unwrap().set_history(new_history);
self.usage_history.lock().expect("usage_history poisoned").clear(); self.usage_history
.lock()
.expect("usage_history poisoned")
.clear();
Ok(new_session_id) Ok(new_session_id)
} }
@ -715,7 +723,6 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
pod.apply_prune_from_manifest(); pod.apply_prune_from_manifest();
Ok(pod) Ok(pod)
} }
} }
/// Apply worker-level manifest settings to a Worker. /// Apply worker-level manifest settings to a Worker.
@ -769,18 +776,24 @@ fn build_summary_prompt(items: &[Item]) -> String {
llm_worker::Role::Assistant => "Assistant", llm_worker::Role::Assistant => "Assistant",
llm_worker::Role::System => "System", llm_worker::Role::System => "System",
}; };
let text: String = content.iter().map(|p| p.as_text()).collect::<Vec<_>>().join(""); let text: String = content
.iter()
.map(|p| p.as_text())
.collect::<Vec<_>>()
.join("");
lines.push(format!("[{role_label}] {text}")); lines.push(format!("[{role_label}] {text}"));
} }
Item::ToolCall { name, arguments, .. } => { Item::ToolCall {
name, arguments, ..
} => {
lines.push(format!("[ToolCall] {name}({arguments})")); lines.push(format!("[ToolCall] {name}({arguments})"));
} }
Item::ToolResult { summary, content, .. } => { Item::ToolResult {
match content { summary, content, ..
Some(c) => lines.push(format!("[ToolResult] {summary}\n{c}")), } => match content {
None => lines.push(format!("[ToolResult] {summary}")), Some(c) => lines.push(format!("[ToolResult] {summary}\n{c}")),
} None => lines.push(format!("[ToolResult] {summary}")),
} },
Item::Reasoning { text, .. } => { Item::Reasoning { text, .. } => {
lines.push(format!("[Reasoning] {text}")); lines.push(format!("[Reasoning] {text}"));
} }

View File

@ -1,8 +1,8 @@
use std::sync::RwLock; use std::sync::RwLock;
use llm_worker::llm_client::types::Item; use llm_worker::llm_client::types::Item;
use session_store::SessionId;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session_store::SessionId;
/// Shared state between PodController and runtime directory. /// Shared state between PodController and runtime directory.
/// ///
@ -25,11 +25,7 @@ pub enum PodStatus {
} }
impl PodSharedState { impl PodSharedState {
pub fn new( pub fn new(pod_name: String, session_id: SessionId, manifest_toml: String) -> Self {
pod_name: String,
session_id: SessionId,
manifest_toml: String,
) -> Self {
Self { Self {
pod_name, pod_name,
session_id, session_id,

View File

@ -42,10 +42,7 @@ impl SocketServer {
} }
}); });
Ok(Self { Ok(Self { _accept_task, path })
_accept_task,
path,
})
} }
/// The socket file path. /// The socket file path.

View File

@ -140,9 +140,7 @@ fn tokens_at(
let up_bytes = prefix[up.history_len.min(cap)]; let up_bytes = prefix[up.history_len.min(cap)];
let at_bytes = prefix[index]; let at_bytes = prefix[index];
let span_bytes = up_bytes.saturating_sub(lo_bytes); let span_bytes = up_bytes.saturating_sub(lo_bytes);
let span_tokens = up let span_tokens = up.input_total_tokens.saturating_sub(lo.input_total_tokens);
.input_total_tokens
.saturating_sub(lo.input_total_tokens);
if span_bytes == 0 || span_tokens == 0 { if span_bytes == 0 || span_tokens == 0 {
return TokenEstimate { return TokenEstimate {
tokens: lo.input_total_tokens, tokens: lo.input_total_tokens,
@ -198,11 +196,7 @@ fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate
tokens_at(history, records, history.len(), &prefix) tokens_at(history, records, history.len(), &prefix)
} }
fn split_for_retained_impl( fn split_for_retained_impl(history: &[Item], records: &[UsageRecord], retained: u64) -> SplitPoint {
history: &[Item],
records: &[UsageRecord],
retained: u64,
) -> SplitPoint {
let prefix = prefix_bytes(history); let prefix = prefix_bytes(history);
let current = tokens_at(history, records, history.len(), &prefix); let current = tokens_at(history, records, history.len(), &prefix);
if current.tokens <= retained { if current.tokens <= retained {
@ -351,12 +345,7 @@ mod tests {
#[test] #[test]
fn split_interpolated_between_measurements() { fn split_interpolated_between_measurements() {
let history = vec![ let history = vec![msg("aaaaaa"), msg("bbbbbb"), msg("cccccc"), msg("dddddd")];
msg("aaaaaa"),
msg("bbbbbb"),
msg("cccccc"),
msg("dddddd"),
];
let records = vec![record(1, 50), record(4, 400)]; let records = vec![record(1, 50), record(4, 400)];
let cut = split_for_retained_impl(&history, &records, 250); let cut = split_for_retained_impl(&history, &records, 250);
assert!(cut.index > 1 && cut.index <= 4); assert!(cut.index > 1 && cut.index <= 4);

View File

@ -1,17 +1,15 @@
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait; use async_trait::async_trait;
use futures::Stream; use futures::Stream;
use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::Worker;
use session_store::FsStore; use session_store::FsStore;
use pod::{ use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus};
Event, Method, Pod, PodController, PodManifest, PodStatus,
};
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Mock LLM Client // Mock LLM Client

View File

@ -1,7 +1,7 @@
use std::io; use std::io;
use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
/// JSONL line reader over an async byte stream. /// JSONL line reader over an async byte stream.

View File

@ -38,7 +38,10 @@ fn resolve_api_key(
if let Some(ref raw_path) = config.api_key_file { if let Some(ref raw_path) = config.api_key_file {
let path = expand_key_path(raw_path, manifest_dir)?; let path = expand_key_path(raw_path, manifest_dir)?;
let contents = std::fs::read_to_string(&path).map_err(|e| { let contents = std::fs::read_to_string(&path).map_err(|e| {
ProviderError::Config(format!("failed to read api_key_file {}: {e}", path.display())) ProviderError::Config(format!(
"failed to read api_key_file {}: {e}",
path.display()
))
})?; })?;
return Ok(Some(contents.trim().to_owned())); return Ok(Some(contents.trim().to_owned()));
} }
@ -47,10 +50,7 @@ fn resolve_api_key(
} }
/// Expand `~` and resolve relative paths against `manifest_dir`. /// Expand `~` and resolve relative paths against `manifest_dir`.
fn expand_key_path( fn expand_key_path(raw: &Path, manifest_dir: Option<&Path>) -> Result<PathBuf, ProviderError> {
raw: &Path,
manifest_dir: Option<&Path>,
) -> Result<PathBuf, ProviderError> {
let path = if raw.starts_with("~") { let path = if raw.starts_with("~") {
let home = std::env::var("HOME") let home = std::env::var("HOME")
.map_err(|_| ProviderError::Config("HOME is not set for ~ expansion".into()))?; .map_err(|_| ProviderError::Config("HOME is not set for ~ expansion".into()))?;

View File

@ -4,10 +4,10 @@
//! - Session log: `{root}/{session_id}.jsonl` //! - Session log: `{root}/{session_id}.jsonl`
//! - Event trace: `{root}/{session_id}.trace.jsonl` //! - Event trace: `{root}/{session_id}.trace.jsonl`
use crate::SessionId;
use crate::event_trace::TraceEntry; use crate::event_trace::TraceEntry;
use crate::session_log::{EntryHash, HashedEntry}; use crate::session_log::{EntryHash, HashedEntry};
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use crate::SessionId;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@ -50,19 +50,16 @@ impl FsStore {
Ok(()) Ok(())
} }
fn parse_jsonl<T: serde::de::DeserializeOwned>( fn parse_jsonl<T: serde::de::DeserializeOwned>(content: &str) -> Result<Vec<T>, StoreError> {
content: &str,
) -> Result<Vec<T>, StoreError> {
let mut entries = Vec::new(); let mut entries = Vec::new();
for (i, line) in content.lines().enumerate() { for (i, line) in content.lines().enumerate() {
if line.trim().is_empty() { if line.trim().is_empty() {
continue; continue;
} }
let entry: T = let entry: T = serde_json::from_str(line).map_err(|e| StoreError::Corrupt {
serde_json::from_str(line).map_err(|e| StoreError::Corrupt { line: i + 1,
line: i + 1, message: e.to_string(),
message: e.to_string(), })?;
})?;
entries.push(entry); entries.push(entry);
} }
Ok(entries) Ok(entries)
@ -122,10 +119,7 @@ impl Store for FsStore {
Ok(self.log_path(id).exists()) Ok(self.log_path(id).exists())
} }
async fn read_head_hash( async fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> {
&self,
id: SessionId,
) -> Result<Option<EntryHash>, StoreError> {
let path = self.log_path(id); let path = self.log_path(id);
if !path.exists() { if !path.exists() {
return Err(StoreError::NotFound(id)); return Err(StoreError::NotFound(id));
@ -134,23 +128,18 @@ impl Store for FsStore {
let last_line = content.lines().rev().find(|l| !l.trim().is_empty()); let last_line = content.lines().rev().find(|l| !l.trim().is_empty());
match last_line { match last_line {
Some(line) => { Some(line) => {
let entry: HashedEntry = serde_json::from_str(line).map_err(|e| { let entry: HashedEntry =
StoreError::Corrupt { serde_json::from_str(line).map_err(|e| StoreError::Corrupt {
line: content.lines().count(), line: content.lines().count(),
message: e.to_string(), message: e.to_string(),
} })?;
})?;
Ok(Some(entry.hash)) Ok(Some(entry.hash))
} }
None => Ok(None), None => Ok(None),
} }
} }
async fn append_trace( async fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> {
&self,
id: SessionId,
entry: &TraceEntry,
) -> Result<(), StoreError> {
let line = serde_json::to_string(entry)?; let line = serde_json::to_string(entry)?;
self.append_line(&self.trace_path(id), &line).await self.append_line(&self.trace_path(id), &line).await
} }

View File

@ -35,9 +35,9 @@ pub mod store;
pub use event_trace::TraceEntry; pub use event_trace::TraceEntry;
pub use fs_store::FsStore; pub use fs_store::FsStore;
pub use session::{ pub use session::{
SessionStartState, create_compacted_session, create_session, ensure_head_or_fork, fork, fork_at, SessionStartState, create_compacted_session, create_session, ensure_head_or_fork, fork,
restore, save_cache_locked, save_cache_unlocked, save_config_changed, save_delta, save_outcome, fork_at, restore, save_cache_locked, save_cache_unlocked, save_config_changed, save_delta,
save_turn_end, save_usage, save_outcome, save_turn_end, save_usage,
}; };
pub use session_log::{ pub use session_log::{
EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, UsageRecord, EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, UsageRecord,

View File

@ -4,11 +4,11 @@
//! The caller (typically Pod) holds the Worker directly and calls these //! The caller (typically Pod) holds the Worker directly and calls these
//! functions after state-mutating operations. //! functions after state-mutating operations.
use crate::SessionId;
use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome, SessionOrigin}; use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome, SessionOrigin};
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use crate::SessionId;
use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::types::Item;
/// State snapshot for creating a SessionStart entry. /// State snapshot for creating a SessionStart entry.
pub struct SessionStartState<'a> { pub struct SessionStartState<'a> {
@ -142,10 +142,15 @@ pub async fn save_delta(
while i < new_items.len() { while i < new_items.len() {
let item = &new_items[i]; let item = &new_items[i];
if item.is_user_message() { if item.is_user_message() {
append_entry(store, session_id, head_hash, LogEntry::UserInput { append_entry(
ts, store,
item: new_items[i].clone(), session_id,
}) head_hash,
LogEntry::UserInput {
ts,
item: new_items[i].clone(),
},
)
.await?; .await?;
i += 1; i += 1;
} else if item.is_tool_result() { } else if item.is_tool_result() {
@ -153,10 +158,15 @@ pub async fn save_delta(
while i < new_items.len() && new_items[i].is_tool_result() { while i < new_items.len() && new_items[i].is_tool_result() {
i += 1; i += 1;
} }
append_entry(store, session_id, head_hash, LogEntry::ToolResults { append_entry(
ts, store,
items: new_items[start..i].to_vec(), session_id,
}) head_hash,
LogEntry::ToolResults {
ts,
items: new_items[start..i].to_vec(),
},
)
.await?; .await?;
} else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() { } else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() {
let start = i; let start = i;
@ -167,16 +177,26 @@ pub async fn save_delta(
{ {
i += 1; i += 1;
} }
append_entry(store, session_id, head_hash, LogEntry::AssistantItems { append_entry(
ts, store,
items: new_items[start..i].to_vec(), session_id,
}) head_hash,
LogEntry::AssistantItems {
ts,
items: new_items[start..i].to_vec(),
},
)
.await?; .await?;
} else { } else {
append_entry(store, session_id, head_hash, LogEntry::HookInjectedItems { append_entry(
ts, store,
items: vec![new_items[i].clone()], session_id,
}) head_hash,
LogEntry::HookInjectedItems {
ts,
items: vec![new_items[i].clone()],
},
)
.await?; .await?;
i += 1; i += 1;
} }
@ -191,10 +211,15 @@ pub async fn save_turn_end(
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
turn_count: usize, turn_count: usize,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::TurnEnd { append_entry(
ts: session_log::now_millis(), store,
turn_count, session_id,
}) head_hash,
LogEntry::TurnEnd {
ts: session_log::now_millis(),
turn_count,
},
)
.await .await
} }
@ -206,11 +231,16 @@ pub async fn save_outcome(
outcome: Outcome, outcome: Outcome,
interrupted: bool, interrupted: bool,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::RunOutcome { append_entry(
ts: session_log::now_millis(), store,
outcome, session_id,
interrupted, head_hash,
}) LogEntry::RunOutcome {
ts: session_log::now_millis(),
outcome,
interrupted,
},
)
.await .await
} }
@ -230,14 +260,19 @@ pub async fn save_usage(
cache_write_tokens: u64, cache_write_tokens: u64,
output_tokens: u64, output_tokens: u64,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::LlmUsage { append_entry(
ts: session_log::now_millis(), store,
history_len, session_id,
input_total_tokens, head_hash,
cache_read_tokens, LogEntry::LlmUsage {
cache_write_tokens, ts: session_log::now_millis(),
output_tokens, history_len,
}) input_total_tokens,
cache_read_tokens,
cache_write_tokens,
output_tokens,
},
)
.await .await
} }
@ -248,10 +283,15 @@ pub async fn save_cache_locked(
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
locked_prefix_len: usize, locked_prefix_len: usize,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::Locked { append_entry(
ts: session_log::now_millis(), store,
locked_prefix_len, session_id,
}) head_hash,
LogEntry::Locked {
ts: session_log::now_millis(),
locked_prefix_len,
},
)
.await .await
} }
@ -261,9 +301,14 @@ pub async fn save_cache_unlocked(
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::CacheUnlocked { append_entry(
ts: session_log::now_millis(), store,
}) session_id,
head_hash,
LogEntry::CacheUnlocked {
ts: session_log::now_millis(),
},
)
.await .await
} }
@ -274,10 +319,15 @@ pub async fn save_config_changed(
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
config: &RequestConfig, config: &RequestConfig,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::ConfigChanged { append_entry(
ts: session_log::now_millis(), store,
config: config.clone(), session_id,
}) head_hash,
LogEntry::ConfigChanged {
ts: session_log::now_millis(),
config: config.clone(),
},
)
.await .await
} }

View File

@ -184,7 +184,9 @@ pub enum Outcome {
/// Worker yielded control to the caller for external processing. /// Worker yielded control to the caller for external processing.
/// Distinct from `Paused`: caller handles internally and resumes. /// Distinct from `Paused`: caller handles internally and resumes.
Yielded, Yielded,
Error { message: String }, Error {
message: String,
},
} }
/// State collected from log entries. /// State collected from log entries.
@ -409,7 +411,11 @@ mod tests {
}, },
LogEntry::AssistantItems { LogEntry::AssistantItems {
ts: 3000, ts: 3000,
items: vec![Item::tool_call("call_1", "get_weather", r#"{"city":"Tokyo"}"#)], items: vec![Item::tool_call(
"call_1",
"get_weather",
r#"{"city":"Tokyo"}"#,
)],
}, },
LogEntry::ToolResults { LogEntry::ToolResults {
ts: 3500, ts: 3500,

View File

@ -3,9 +3,9 @@
//! [`Store`] defines the async interface for reading and writing session logs. //! [`Store`] defines the async interface for reading and writing session logs.
//! Implementations handle the physical storage (filesystem, database, etc.). //! Implementations handle the physical storage (filesystem, database, etc.).
use crate::SessionId;
use crate::event_trace::TraceEntry; use crate::event_trace::TraceEntry;
use crate::session_log::{EntryHash, HashedEntry}; use crate::session_log::{EntryHash, HashedEntry};
use crate::SessionId;
use std::future::Future; use std::future::Future;
/// Errors from the persistence store. /// Errors from the persistence store.
@ -43,9 +43,7 @@ pub trait Store: Send + Sync {
) -> impl Future<Output = Result<Vec<HashedEntry>, StoreError>> + Send; ) -> impl Future<Output = Result<Vec<HashedEntry>, StoreError>> + Send;
/// List all session IDs, most recent first. /// List all session IDs, most recent first.
fn list_sessions( fn list_sessions(&self) -> impl Future<Output = Result<Vec<SessionId>, StoreError>> + Send;
&self,
) -> impl Future<Output = Result<Vec<SessionId>, StoreError>> + Send;
/// Create a new session with initial entries. /// Create a new session with initial entries.
fn create_session( fn create_session(
@ -55,10 +53,7 @@ pub trait Store: Send + Sync {
) -> impl Future<Output = Result<(), StoreError>> + Send; ) -> impl Future<Output = Result<(), StoreError>> + Send;
/// Check if a session exists. /// Check if a session exists.
fn exists( fn exists(&self, id: SessionId) -> impl Future<Output = Result<bool, StoreError>> + Send;
&self,
id: SessionId,
) -> impl Future<Output = Result<bool, StoreError>> + Send;
/// Read the hash of the last entry in a session (the head). /// Read the hash of the last entry in a session (the head).
/// ///

View File

@ -4,11 +4,11 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use common::MockLlmClient; use common::MockLlmClient;
use llm_worker::Worker;
use llm_worker::interceptor::{Interceptor, TurnEndAction}; use llm_worker::interceptor::{Interceptor, TurnEndAction};
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent};
use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use llm_worker::Worker;
use session_store::{ use session_store::{
EntryHash, FsStore, LogEntry, Outcome, SessionStartState, Store, collect_state, EntryHash, FsStore, LogEntry, Outcome, SessionStartState, Store, collect_state,
}; };
@ -124,9 +124,15 @@ async fn run_and_persist(
message: e.to_string(), message: e.to_string(),
}, },
}; };
session_store::save_outcome(store, session_id, head_hash, outcome, worker.last_run_interrupted()) session_store::save_outcome(
.await store,
.unwrap(); session_id,
head_hash,
outcome,
worker.last_run_interrupted(),
)
.await
.unwrap();
let r = result.unwrap(); let r = result.unwrap();
(worker, r) (worker, r)
@ -245,7 +251,8 @@ async fn session_run_with_tool_call() {
.unwrap(); .unwrap();
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await; let (_worker, _) =
run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await;
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).await.unwrap();

View File

@ -91,8 +91,7 @@ impl Tool for EditTool {
let occurrences = if params.replace_all { count } else { 1 }; let occurrences = if params.replace_all { count } else { 1 };
self.fs.write(&params.file_path, new_text.as_bytes())?; self.fs.write(&params.file_path, new_text.as_bytes())?;
self.tracker self.tracker.record(&params.file_path, new_text.as_bytes());
.record(&params.file_path, new_text.as_bytes());
let summary = format!( let summary = format!(
"Edited {} ({} replacement{})", "Edited {} ({} replacement{})",

View File

@ -6,9 +6,7 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use grep_regex::RegexMatcherBuilder; use grep_regex::RegexMatcherBuilder;
use grep_searcher::sinks::UTF8 as UTF8Sink; use grep_searcher::sinks::UTF8 as UTF8Sink;
use grep_searcher::{ use grep_searcher::{BinaryDetection, Searcher, SearcherBuilder, Sink, SinkContext, SinkMatch};
BinaryDetection, Searcher, SearcherBuilder, Sink, SinkContext, SinkMatch,
};
use ignore::WalkBuilder; use ignore::WalkBuilder;
use ignore::overrides::OverrideBuilder; use ignore::overrides::OverrideBuilder;
use ignore::types::TypesBuilder; use ignore::types::TypesBuilder;
@ -94,10 +92,9 @@ impl Tool for GrepTool {
); );
let default_base = self.fs.scope().root().to_path_buf(); let default_base = self.fs.scope().root().to_path_buf();
let report = let report = tokio::task::spawn_blocking(move || run_grep(default_base, params))
tokio::task::spawn_blocking(move || run_grep(default_base, params)) .await
.await .map_err(|e| ToolError::Internal(format!("spawn_blocking failed: {e}")))??;
.map_err(|e| ToolError::Internal(format!("spawn_blocking failed: {e}")))??;
Ok(report.render()) Ok(report.render())
} }
@ -212,12 +209,7 @@ impl GrepReport {
continue; continue;
} }
} }
body.push_str(&format!( body.push_str(&format!("{}{}{}\n", line.path.display(), sep, line.text));
"{}{}{}\n",
line.path.display(),
sep,
line.text
));
} }
let mut summary = format!( let mut summary = format!(
"{} matching line(s) in {} file(s)", "{} matching line(s) in {} file(s)",
@ -285,7 +277,8 @@ fn run_grep(default_base: PathBuf, p: GrepParams) -> Result<GrepReport, ToolsErr
} }
if let Some(g) = p.glob.as_deref() { if let Some(g) = p.glob.as_deref() {
let mut ob = OverrideBuilder::new(&base); let mut ob = OverrideBuilder::new(&base);
ob.add(g).map_err(|e| ToolsError::InvalidGlob(e.to_string()))?; ob.add(g)
.map_err(|e| ToolsError::InvalidGlob(e.to_string()))?;
let ov = ob let ov = ob
.build() .build()
.map_err(|e| ToolsError::InvalidGlob(e.to_string()))?; .map_err(|e| ToolsError::InvalidGlob(e.to_string()))?;
@ -414,11 +407,7 @@ struct ContentSink<'a> {
impl Sink for ContentSink<'_> { impl Sink for ContentSink<'_> {
type Error = std::io::Error; type Error = std::io::Error;
fn matched( fn matched(&mut self, _searcher: &Searcher, mat: &SinkMatch<'_>) -> Result<bool, Self::Error> {
&mut self,
_searcher: &Searcher,
mat: &SinkMatch<'_>,
) -> Result<bool, Self::Error> {
let idx = *self.matches_seen; let idx = *self.matches_seen;
*self.matches_seen += 1; *self.matches_seen += 1;
@ -589,10 +578,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn grep_multiline() { async fn grep_multiline() {
let (dir, fs) = setup(); let (dir, fs) = setup();
touch( touch(&dir.path().join("a.txt"), "start\nfoo\nbar\nend\n");
&dir.path().join("a.txt"),
"start\nfoo\nbar\nend\n",
);
let def = grep_tool(fs); let def = grep_tool(fs);
let (_, tool) = def(); let (_, tool) = def();

View File

@ -39,10 +39,7 @@ pub use write::write_tool;
/// All returned factories share the same tracker instance so that /// All returned factories share the same tracker instance so that
/// `Read` / `Write` / `Edit` see a consistent history across tool /// `Read` / `Write` / `Edit` see a consistent history across tool
/// invocations within a single session. /// invocations within a single session.
pub fn builtin_tools( pub fn builtin_tools(fs: ScopedFs, tracker: Tracker) -> Vec<llm_worker::tool::ToolDefinition> {
fs: ScopedFs,
tracker: Tracker,
) -> Vec<llm_worker::tool::ToolDefinition> {
vec![ vec![
read_tool(fs.clone(), tracker.clone()), read_tool(fs.clone(), tracker.clone()),
write_tool(fs.clone(), tracker.clone()), write_tool(fs.clone(), tracker.clone()),

View File

@ -103,10 +103,7 @@ impl ScopedFs {
let existed = path.exists(); let existed = path.exists();
let parent = path.parent().ok_or_else(|| { let parent = path.parent().ok_or_else(|| {
ToolsError::InvalidArgument(format!( ToolsError::InvalidArgument(format!("path has no parent directory: {}", path.display()))
"path has no parent directory: {}",
path.display()
))
})?; })?;
if !parent.as_os_str().is_empty() && !parent.exists() { if !parent.as_os_str().is_empty() && !parent.exists() {
std::fs::create_dir_all(parent).map_err(|e| ToolsError::io(parent, e))?; std::fs::create_dir_all(parent).map_err(|e| ToolsError::io(parent, e))?;
@ -119,7 +116,8 @@ impl ScopedFs {
}; };
let mut tmp = tempfile::NamedTempFile::new_in(tmp_parent) let mut tmp = tempfile::NamedTempFile::new_in(tmp_parent)
.map_err(|e| ToolsError::io(tmp_parent, e))?; .map_err(|e| ToolsError::io(tmp_parent, e))?;
tmp.write_all(content).map_err(|e| ToolsError::io(path, e))?; tmp.write_all(content)
.map_err(|e| ToolsError::io(path, e))?;
tmp.as_file() tmp.as_file()
.sync_all() .sync_all()
.map_err(|e| ToolsError::io(path, e))?; .map_err(|e| ToolsError::io(path, e))?;

View File

@ -48,7 +48,9 @@ impl Tool for WriteTool {
self.tracker.verify(&params.file_path, &current)?; self.tracker.verify(&params.file_path, &current)?;
} }
let outcome = self.fs.write(&params.file_path, params.content.as_bytes())?; let outcome = self
.fs
.write(&params.file_path, params.content.as_bytes())?;
// Refresh the history entry to reflect the newly-written content, // Refresh the history entry to reflect the newly-written content,
// so a subsequent Edit / Write can proceed without a re-read. // so a subsequent Edit / Write can proceed without a re-read.
@ -57,7 +59,11 @@ impl Tool for WriteTool {
let summary = format!( let summary = format!(
"{} {} ({} bytes)", "{} {} ({} bytes)",
if outcome.created { "Created" } else { "Overwrote" }, if outcome.created {
"Created"
} else {
"Overwrote"
},
params.file_path.display(), params.file_path.display(),
outcome.bytes_written outcome.bytes_written
); );

View File

@ -6,7 +6,7 @@ use llm_worker::tool::{Tool, ToolDefinition};
use manifest::Scope; use manifest::Scope;
use serde_json::json; use serde_json::json;
use tempfile::TempDir; use tempfile::TempDir;
use tools::{Tracker, ScopedFs, builtin_tools}; use tools::{ScopedFs, Tracker, builtin_tools};
struct Registry { struct Registry {
entries: Vec<(llm_worker::tool::ToolMeta, Arc<dyn Tool>)>, entries: Vec<(llm_worker::tool::ToolMeta, Arc<dyn Tool>)>,
@ -54,10 +54,7 @@ async fn unicode_path_and_content() {
let read = reg.get("Read"); let read = reg.get("Read");
let out = read let out = read
.execute( .execute(&json!({ "file_path": file.to_str().unwrap() }).to_string())
&json!({ "file_path": file.to_str().unwrap() })
.to_string(),
)
.await .await
.unwrap(); .unwrap();
let body = out.content.unwrap(); let body = out.content.unwrap();
@ -81,11 +78,9 @@ async fn symlink_to_outside_scope_is_rejected_for_write() {
// Read tool must work against the symlink (read is unrestricted). // Read tool must work against the symlink (read is unrestricted).
let read = reg.get("Read"); let read = reg.get("Read");
read.execute( read.execute(&json!({ "file_path": link.to_str().unwrap() }).to_string())
&json!({ "file_path": link.to_str().unwrap() }).to_string(), .await
) .unwrap();
.await
.unwrap();
// Write through the symlink must be rejected because canonicalization // Write through the symlink must be rejected because canonicalization
// resolves it to outside the scope. // resolves it to outside the scope.

View File

@ -11,7 +11,7 @@ use llm_worker::tool::{Tool, ToolDefinition, ToolMeta};
use manifest::Scope; use manifest::Scope;
use serde_json::json; use serde_json::json;
use tempfile::TempDir; use tempfile::TempDir;
use tools::{Tracker, ScopedFs, builtin_tools}; use tools::{ScopedFs, Tracker, builtin_tools};
struct Registry { struct Registry {
entries: Vec<(ToolMeta, Arc<dyn Tool>)>, entries: Vec<(ToolMeta, Arc<dyn Tool>)>,
@ -50,10 +50,7 @@ async fn call(tool: &Arc<dyn Tool>, input: serde_json::Value) -> llm_worker::too
.expect("tool execution failed") .expect("tool execution failed")
} }
async fn call_err( async fn call_err(tool: &Arc<dyn Tool>, input: serde_json::Value) -> llm_worker::tool::ToolError {
tool: &Arc<dyn Tool>,
input: serde_json::Value,
) -> llm_worker::tool::ToolError {
tool.execute(&input.to_string()) tool.execute(&input.to_string())
.await .await
.expect_err("expected error") .expect_err("expected error")
@ -71,7 +68,11 @@ fn builtin_tools_registers_all_five() {
fn meta_has_description_and_schema() { fn meta_has_description_and_schema() {
let (_dir, reg) = setup(); let (_dir, reg) = setup();
for (meta, _) in &reg.entries { for (meta, _) in &reg.entries {
assert!(!meta.description.is_empty(), "{} missing description", meta.name); assert!(
!meta.description.is_empty(),
"{} missing description",
meta.name
);
// Input schema must be a JSON object // Input schema must be a JSON object
assert!( assert!(
meta.input_schema.is_object(), meta.input_schema.is_object(),
@ -283,7 +284,11 @@ async fn tracker_recent_files_tracks_read_write_edit() {
std::fs::write(&a, "one\n").unwrap(); std::fs::write(&a, "one\n").unwrap();
// Read `a` — should appear in recency. // Read `a` — should appear in recency.
call(&reg.get("Read"), json!({ "file_path": a.to_str().unwrap() })).await; call(
&reg.get("Read"),
json!({ "file_path": a.to_str().unwrap() }),
)
.await;
// Write `b` (new file) — should appear ahead of `a`. // Write `b` (new file) — should appear ahead of `a`.
call( call(
&reg.get("Write"), &reg.get("Write"),
@ -303,8 +308,14 @@ async fn tracker_recent_files_tracks_read_write_edit() {
let recent = tracker.recent_files(10); let recent = tracker.recent_files(10);
assert_eq!(recent.len(), 2); assert_eq!(recent.len(), 2);
assert!(recent[0].ends_with("a.txt"), "front should be a.txt: {recent:?}"); assert!(
assert!(recent[1].ends_with("b.txt"), "second should be b.txt: {recent:?}"); recent[0].ends_with("a.txt"),
"front should be a.txt: {recent:?}"
);
assert!(
recent[1].ends_with("b.txt"),
"second should be b.txt: {recent:?}"
);
} }
// Sanity: unused Path import guard // Sanity: unused Path import guard

View File

@ -102,8 +102,10 @@ impl App {
} }
Event::ToolCallStart { name, .. } => { Event::ToolCallStart { name, .. } => {
self.current_tool = Some(name.clone()); self.current_tool = Some(name.clone());
self.output_queue self.output_queue.push(OutputItem::Padded(
.push(OutputItem::Padded(MessageKind::Tool, format!("[tool] {name}"))); MessageKind::Tool,
format!("[tool] {name}"),
));
} }
Event::ToolCallDone { Event::ToolCallDone {
name, arguments, .. name, arguments, ..
@ -117,7 +119,11 @@ impl App {
Event::ToolResult { Event::ToolResult {
output, is_error, .. output, is_error, ..
} => { } => {
let prefix = if is_error { "[tool error]" } else { "[tool result]" }; let prefix = if is_error {
"[tool error]"
} else {
"[tool result]"
};
let display = if output.len() > 200 { let display = if output.len() > 200 {
format!("{}...", &output[..200]) format!("{}...", &output[..200])
} else { } else {
@ -242,10 +248,8 @@ impl App {
"user" => { "user" => {
self.turn_index += 1; self.turn_index += 1;
self.output_queue.push(OutputItem::Blank); self.output_queue.push(OutputItem::Blank);
self.output_queue.push(OutputItem::TurnHeader(format!( self.output_queue
"#{}", .push(OutputItem::TurnHeader(format!("#{}", self.turn_index)));
self.turn_index
)));
MessageKind::User MessageKind::User
} }
"assistant" => MessageKind::Assistant, "assistant" => MessageKind::Assistant,
@ -265,8 +269,10 @@ impl App {
} }
"tool_call" => { "tool_call" => {
let name = item["name"].as_str().unwrap_or("?"); let name = item["name"].as_str().unwrap_or("?");
self.output_queue self.output_queue.push(OutputItem::Padded(
.push(OutputItem::Padded(MessageKind::Tool, format!("[tool] {name}"))); MessageKind::Tool,
format!("[tool] {name}"),
));
} }
"tool_result" => { "tool_result" => {
let output = item["output"].as_str().unwrap_or(""); let output = item["output"].as_str().unwrap_or("");

View File

@ -19,7 +19,10 @@ fn resolve_socket(pod_name: &str, override_path: Option<PathBuf>) -> PathBuf {
return p; return p;
} }
if let Ok(rd) = std::env::var("XDG_RUNTIME_DIR") { if let Ok(rd) = std::env::var("XDG_RUNTIME_DIR") {
PathBuf::from(rd).join("insomnia").join(pod_name).join("sock") PathBuf::from(rd)
.join("insomnia")
.join(pod_name)
.join("sock")
} else if let Ok(home) = std::env::var("HOME") { } else if let Ok(home) = std::env::var("HOME") {
PathBuf::from(home) PathBuf::from(home)
.join(".insomnia") .join(".insomnia")
@ -163,12 +166,8 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
app.quit = true; app.quit = true;
None None
} }
KeyCode::Char('r') if key.modifiers.contains(KeyModifiers::CONTROL) => { KeyCode::Char('r') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Resume),
Some(Method::Resume) KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Cancel),
}
KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => {
Some(Method::Cancel)
}
KeyCode::Enter => app.submit_input(), KeyCode::Enter => app.submit_input(),
KeyCode::Backspace => { KeyCode::Backspace => {
app.delete_char_before(); app.delete_char_before();

View File

@ -1,10 +1,10 @@
use ratatui::Frame;
use ratatui::layout::{Alignment, Constraint, Layout, Position, Rect}; use ratatui::layout::{Alignment, Constraint, Layout, Position, Rect};
use ratatui::style::{Color, Modifier, Style}; use ratatui::style::{Color, Modifier, Style};
use ratatui::text::{Line, Span}; use ratatui::text::{Line, Span};
use ratatui::widgets::{Block, Padding, Paragraph, Wrap}; use ratatui::widgets::{Block, Padding, Paragraph, Wrap};
use ratatui::Frame;
use crate::app::{fmt_tokens, App, MessageKind, OutputItem}; use crate::app::{App, MessageKind, OutputItem, fmt_tokens};
/// Draw the fixed viewport (3 lines: separator, status, input). /// Draw the fixed viewport (3 lines: separator, status, input).
pub fn draw(frame: &mut Frame, app: &App) { pub fn draw(frame: &mut Frame, app: &App) {
@ -44,8 +44,7 @@ pub fn flush_output(
OutputItem::TurnHeader(text) => { OutputItem::TurnHeader(text) => {
terminal.insert_before(1, |buf| { terminal.insert_before(1, |buf| {
let style = kind_style(&MessageKind::TurnHeader); let style = kind_style(&MessageKind::TurnHeader);
Paragraph::new(Line::from(Span::styled(text, style))) Paragraph::new(Line::from(Span::styled(text, style))).render(buf.area, buf);
.render(buf.area, buf);
})?; })?;
} }
OutputItem::Padded(kind, text) => { OutputItem::Padded(kind, text) => {
@ -119,10 +118,7 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) {
let mut spans = vec![ let mut spans = vec![
conn, conn,
Span::raw(" "), Span::raw(" "),
Span::styled( Span::styled(&app.pod_name, Style::default().add_modifier(Modifier::BOLD)),
&app.pod_name,
Style::default().add_modifier(Modifier::BOLD),
),
]; ];
if app.running { if app.running {