use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; use futures::StreamExt; use tokio::sync::mpsc; use tracing::{debug, info, trace, warn}; use crate::{ Item, llm_client::{ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition}, interceptor::{ DefaultInterceptor, Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction, }, state::{Locked, Mutable, WorkerState}, callback::{ ClosureMetaHandler, ClosureTextBlockHandler, ClosureToolUseBlockHandler, TextBlockScope, ToolUseBlockScope, }, handler::{ErrorKind, StatusKind, ToolUseBlockStart, UsageKind}, timeline::{TextBlockCollector, Timeline, ToolCallCollector}, timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, tool::{ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputProcessor, ToolResult}, tool_server::{ToolServer, ToolServerHandle}, }; /// Worker errors #[derive(Debug, thiserror::Error)] pub enum WorkerError { /// Client error #[error("Client error: {0}")] Client(#[from] ClientError), /// Tool error #[error("Tool error: {0}")] Tool(#[from] ToolError), /// Execution was aborted #[error("Aborted: {0}")] Aborted(String), /// Cancelled by CancellationToken #[error("Cancelled")] Cancelled, /// Config warnings (unsupported options) #[error("Config warnings: {}", .0.iter().map(|w| w.to_string()).collect::>().join(", "))] ConfigWarnings(Vec), } /// Tool registration error #[derive(Debug, thiserror::Error)] pub enum ToolRegistryError { /// A tool with the same name is already registered #[error("Tool with name '{0}' already registered")] DuplicateName(String), } /// Worker configuration #[derive(Debug, Clone, Default)] pub struct WorkerConfig { // Reserved for future extensions (currently empty) _private: (), } /// Worker execution result (status) #[derive(Debug)] pub enum WorkerResult { /// Completed (waiting for user input) Finished, /// Paused (can be resumed) Paused, /// Turn limit reached (max_turns exceeded) LimitReached, } /// Internal: tool execution result enum ToolExecutionResult { Completed(Vec), Paused, } /// Central component for managing LLM interactions /// /// Receives input from the user, sends requests to the LLM, and /// automatically executes tool calls if any, advancing the turn. /// /// # State Transitions (Type-state) /// /// - [`Mutable`]: Initial state. System prompt, history, and tools can be freely edited. /// - [`Locked`]: Cache-protected state. Prefix context is immutable; only `run()` / `resume()` are available. /// /// Calling `run()` on a `Mutable` Worker consumes it and returns a /// `Locked` Worker together with the result. This ensures the /// cache prefix is fixed for optimal KV cache hit rate. /// /// ```ignore /// let mut worker = Worker::new(client) /// .system_prompt("You are a helpful assistant."); /// worker.register_tool(my_tool); /// /// // Mutable::run() consumes self → Locked /// let (mut worker, _result) = worker.run("Hello").await?; /// /// // Locked::run() borrows &mut self /// worker.run("Follow-up").await?; /// /// // To edit between turns, unlock back to Mutable /// let mut worker = worker.unlock(); /// worker.history_mut().truncate(5); /// let (mut worker, _result) = worker.run("Continue").await?; /// ``` pub struct Worker { /// LLM client client: C, /// Event timeline timeline: Timeline, /// Text block collector (Timeline handler) text_block_collector: TextBlockCollector, /// Tool call collector (Timeline handler) tool_call_collector: ToolCallCollector, /// Tool server handle tool_server: ToolServerHandle, /// Interceptor for control-flow decisions interceptor: Box, /// System prompt system_prompt: Option, /// Item history (owned by Worker) history: Vec, /// History length at lock time (only meaningful in Locked state) locked_prefix_len: usize, /// Turn count turn_count: usize, /// Maximum number of turns (None = unlimited) max_turns: Option, /// Turn-start callbacks turn_start_cbs: Vec>, /// Turn-end callbacks turn_end_cbs: Vec>, /// Request configuration (max_tokens, temperature, etc.) request_config: RequestConfig, /// Whether the previous run was interrupted last_run_interrupted: bool, /// Optional processor for large tool outputs (stores externally, returns summary) output_processor: Option>, /// Cancel notification channel (for interrupting execution) cancel_tx: mpsc::Sender<()>, cancel_rx: mpsc::Receiver<()>, /// State marker _state: PhantomData, } impl Worker { fn reset_interruption_state(&mut self) { self.last_run_interrupted = false; } fn drain_cancel_queue(&mut self) { use tokio::sync::mpsc::error::TryRecvError; loop { match self.cancel_rx.try_recv() { Ok(()) => continue, Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break, } } } fn try_cancelled(&mut self) -> bool { use tokio::sync::mpsc::error::TryRecvError; match self.cancel_rx.try_recv() { Ok(()) => true, Err(TryRecvError::Empty) => false, Err(TryRecvError::Disconnected) => true, } } /// Register a text block observer with scoped callbacks. /// /// The setup closure is called once per text block. Inside it, register /// `on_delta` and/or `on_stop` callbacks on the provided scope. /// /// # Examples /// /// ```ignore /// worker.on_text_block(|block| { /// block.on_delta(|text| print!("{}", text)); /// block.on_stop(|full_text| println!("\n--- {} chars ---", full_text.len())); /// }); /// ``` pub fn on_text_block( &mut self, setup: impl FnMut(&mut TextBlockScope) + Send + Sync + 'static, ) { self.timeline .on_text_block(ClosureTextBlockHandler { setup: Box::new(setup), }); } /// Register a tool use block observer with scoped callbacks. /// /// The setup closure receives `&ToolUseBlockStart` (containing `id` and `name`) /// and a scope for registering `on_delta` and `on_stop` callbacks. /// /// `on_stop` receives a fully assembled `&ToolCall` with parsed JSON input. /// /// # Examples /// /// ```ignore /// worker.on_tool_use_block(|start, block| { /// println!("Tool: {} ({})", start.name, start.id); /// block.on_delta(|json| { /* streaming JSON fragment */ }); /// block.on_stop(|call| println!("Done: {}", call.name)); /// }); /// ``` pub fn on_tool_use_block( &mut self, setup: impl FnMut(&ToolUseBlockStart, &mut ToolUseBlockScope) + Send + Sync + 'static, ) { self.timeline .on_tool_use_block(ClosureToolUseBlockHandler { setup: Box::new(setup), }); } /// Register a usage event callback. pub fn on_usage( &mut self, callback: impl FnMut(&UsageEvent) + Send + Sync + 'static, ) { self.timeline.on_usage(ClosureMetaHandler { callback, _kind: PhantomData::, }); } /// Register a status event callback. pub fn on_status( &mut self, callback: impl FnMut(&StatusEvent) + Send + Sync + 'static, ) { self.timeline.on_status(ClosureMetaHandler { callback, _kind: PhantomData::, }); } /// Register an error event callback. pub fn on_error( &mut self, callback: impl FnMut(&ErrorEvent) + Send + Sync + 'static, ) { self.timeline.on_error(ClosureMetaHandler { callback, _kind: PhantomData::, }); } /// Register a turn-start callback (receives 0-based turn number). pub fn on_turn_start( &mut self, callback: impl Fn(usize) + Send + Sync + 'static, ) { self.turn_start_cbs.push(Box::new(callback)); } /// Register a turn-end callback (receives 0-based turn number). pub fn on_turn_end( &mut self, callback: impl Fn(usize) + Send + Sync + 'static, ) { self.turn_end_cbs.push(Box::new(callback)); } /// Get a shared tool server handle. pub fn tool_server_handle(&self) -> ToolServerHandle { self.tool_server.clone() } /// Set the interceptor for control-flow decisions. /// /// The interceptor governs approval, skip, pause, and abort decisions /// at key points in the execution loop. If not set, the default /// interceptor is used (all Continue / Finish). pub fn set_interceptor(&mut self, interceptor: impl Interceptor + 'static) { self.interceptor = Box::new(interceptor); } /// Get a mutable reference to the timeline (for additional handler registration) pub fn timeline_mut(&mut self) -> &mut Timeline { &mut self.timeline } /// Get a reference to the history pub fn history(&self) -> &[Item] { &self.history } /// Get a reference to the system prompt pub fn get_system_prompt(&self) -> Option<&str> { self.system_prompt.as_deref() } /// Get the current turn count pub fn turn_count(&self) -> usize { self.turn_count } /// Get a reference to the current request configuration pub fn request_config(&self) -> &RequestConfig { &self.request_config } /// Set maximum tokens /// /// This setting is independent of cache lock and applies to each request. /// /// # Examples /// /// ```ignore /// worker.set_max_tokens(4096); /// ``` pub fn set_max_tokens(&mut self, max_tokens: u32) { self.request_config.max_tokens = Some(max_tokens); } /// Set temperature /// /// Set in the range of 0.0 to 1.0 (or 2.0). /// Lower values produce more deterministic output, higher values produce more diverse output. /// /// # Examples /// /// ```ignore /// worker.set_temperature(0.7); /// ``` pub fn set_temperature(&mut self, temperature: f32) { self.request_config.temperature = Some(temperature); } /// Set top_p (nucleus sampling) /// /// # Examples /// /// ```ignore /// worker.set_top_p(0.9); /// ``` pub fn set_top_p(&mut self, top_p: f32) { self.request_config.top_p = Some(top_p); } /// Set top_k /// /// Specifies the top k tokens to consider when selecting tokens. /// /// # Examples /// /// ```ignore /// worker.set_top_k(40); /// ``` pub fn set_top_k(&mut self, top_k: u32) { self.request_config.top_k = Some(top_k); } /// Add a stop sequence /// /// # Examples /// /// ```ignore /// worker.add_stop_sequence("\n\n"); /// ``` pub fn add_stop_sequence(&mut self, sequence: impl Into) { self.request_config.stop_sequences.push(sequence.into()); } /// Clear stop sequences pub fn clear_stop_sequences(&mut self) { self.request_config.stop_sequences.clear(); } /// Get the cancel notification sender pub fn cancel_sender(&self) -> mpsc::Sender<()> { self.cancel_tx.clone() } /// Set request configuration at once pub fn set_request_config(&mut self, config: RequestConfig) { self.request_config = config; } /// Cancel execution /// /// Interrupts currently running streaming or tool execution. /// WorkerError::Cancelled is returned at the next event loop checkpoint. /// /// # Examples /// /// ```ignore /// use std::sync::Arc; /// let worker = Arc::new(Mutex::new(Worker::new(client))); /// /// // Run in another thread /// let worker_clone = worker.clone(); /// tokio::spawn(async move { /// let mut w = worker_clone.lock().unwrap(); /// w.run("Long task...").await /// }); /// /// // Cancel /// worker.lock().unwrap().cancel(); /// ``` pub fn cancel(&self) { let _ = self.cancel_tx.try_send(()); } /// Check if cancelled pub fn is_cancelled(&mut self) -> bool { self.try_cancelled() } /// Whether the previous run was interrupted pub fn last_run_interrupted(&self) -> bool { self.last_run_interrupted } /// Generate list of ToolDefinitions for LLM from registered tools fn build_tool_definitions(&self) -> Vec { self.tool_server.tool_definitions_sorted() } /// Build assistant response items from text blocks and tool calls fn build_assistant_items(&self, text_blocks: &[String], tool_calls: &[ToolCall]) -> Vec { let mut items = Vec::new(); // Add text as assistant message if present let text = text_blocks.join(""); if !text.is_empty() { items.push(Item::assistant_message(text)); } // Add tool calls as ToolCall items for call in tool_calls { items.push(Item::tool_call_json( &call.id, &call.name, call.input.clone(), )); } items } /// Build a request fn build_request(&self, tool_definitions: &[ToolDefinition], context: &[Item]) -> Request { let mut request = Request::new(); // Set system prompt if let Some(ref system) = self.system_prompt { request = request.system(system); } // Add items directly (Request now uses Items natively) request = request.items(context.iter().cloned()); // Add tool definitions for tool_def in tool_definitions { request = request.tool(tool_def.clone()); } // Apply request configuration request = request.config(self.request_config.clone()); request } /// Hooks: on_prompt_submit /// async fn finalize_interruption( &mut self, result: Result, ) -> Result { match result { Ok(value) => Ok(value), Err(err) => { self.last_run_interrupted = true; let reason = match &err { WorkerError::Aborted(reason) => reason.clone(), WorkerError::Cancelled => "Cancelled".to_string(), _ => err.to_string(), }; self.interceptor.on_abort(&reason).await; Err(err) } } } /// Check for pending tool calls (for resuming from Pause) fn get_pending_tool_calls(&self) -> Option> { // Find the last ToolCall items that don't have corresponding ToolResult let mut pending_calls = Vec::new(); let mut answered_call_ids = std::collections::HashSet::new(); // First pass: collect all answered call IDs for item in &self.history { if let Item::ToolResult { call_id, .. } = item { answered_call_ids.insert(call_id.clone()); } } // Second pass: find unanswered tool calls for item in &self.history { if let Item::ToolCall { call_id, name, arguments, .. } = item { if !answered_call_ids.contains(call_id) { let input = serde_json::from_str(arguments) .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new())); pending_calls.push(ToolCall { id: call_id.clone(), name: name.clone(), input, }); } } } if pending_calls.is_empty() { None } else { Some(pending_calls) } } /// Execute tools in parallel /// /// After running pre_tool_call hooks on all tools, /// executes approved tools in parallel and applies post_tool_call hooks to results. async fn execute_tools( &mut self, tool_calls: Vec, ) -> Result { use futures::future::join_all; // Map from tool call ID to (ToolCall, Meta, Tool) // Retained because it's needed for PostToolCall hooks let mut call_info_map = HashMap::new(); // Phase 1: Apply pre_tool_call interceptor (determine skip/abort) let mut approved_calls = Vec::new(); for mut tool_call in tool_calls { if let Some((meta, tool)) = self.tool_server.get_tool(&tool_call.name) { let mut info = ToolCallInfo { call: tool_call.clone(), meta, tool, }; match self.interceptor.pre_tool_call(&mut info).await { PreToolAction::Continue => {} PreToolAction::Skip => { continue; } PreToolAction::Abort(reason) => { self.last_run_interrupted = true; return Err(WorkerError::Aborted(reason)); } PreToolAction::Pause => { self.last_run_interrupted = true; return Ok(ToolExecutionResult::Paused); } } // Reflect changes made by interceptor tool_call = info.call; call_info_map.insert( tool_call.id.clone(), (tool_call.clone(), info.meta.clone(), info.tool.clone()), ); approved_calls.push(tool_call); } else { // Unknown tools go into approved list as-is (will error at execution) approved_calls.push(tool_call); } } // Phase 2: Execute approved tools in parallel (cancellable) let futures: Vec<_> = approved_calls .into_iter() .map(|tool_call| { let tool_server = self.tool_server.clone(); async move { let input_json = serde_json::to_string(&tool_call.input).unwrap_or_default(); match tool_server.call_tool(&tool_call.name, &input_json).await { Ok(content) => ToolResult::success(&tool_call.id, content), Err(e) => ToolResult::error(&tool_call.id, e.to_string()), } } }) .collect(); // Make tool execution cancellable let mut results = tokio::select! { results = join_all(futures) => results, cancel = self.cancel_rx.recv() => { if cancel.is_some() { info!("Tool execution cancelled"); } self.timeline.abort_current_block(); self.last_run_interrupted = true; return Err(WorkerError::Cancelled); } }; // Phase 2.5: Apply output processor (store large results externally) if let Some(ref processor) = self.output_processor { for tool_result in &mut results { if !tool_result.is_error { match processor.process(tool_result.content.clone()).await { Ok(processed) => tool_result.content = processed, Err(e) => { warn!(error = %e, "Output processor failed, keeping original content"); } } } } } // Phase 3: Apply post_tool_call interceptor for tool_result in &mut results { if let Some((tool_call, meta, tool)) = call_info_map.get(&tool_result.tool_use_id) { let mut info = ToolResultInfo { call: tool_call.clone(), result: tool_result.clone(), meta: meta.clone(), tool: tool.clone(), }; match self.interceptor.post_tool_call(&mut info).await { PostToolAction::Continue => {} PostToolAction::Abort(reason) => { self.last_run_interrupted = true; return Err(WorkerError::Aborted(reason)); } } // Reflect interceptor-modified results *tool_result = info.result; } } Ok(ToolExecutionResult::Completed(results)) } /// Internal turn execution logic async fn run_turn_loop(&mut self) -> Result { self.reset_interruption_state(); self.drain_cancel_queue(); let tool_definitions = self.build_tool_definitions(); info!( item_count = self.history.len(), tool_count = tool_definitions.len(), "Starting worker run" ); // Resume check: Pending tool calls if let Some(tool_calls) = self.get_pending_tool_calls() { info!("Resuming pending tool calls"); match self.execute_tools(tool_calls).await { Ok(ToolExecutionResult::Paused) => { self.last_run_interrupted = true; return Ok(WorkerResult::Paused); } Ok(ToolExecutionResult::Completed(results)) => { for result in results { self.history.push(Item::tool_result( &result.tool_use_id, &result.content, )); } // Continue to loop } Err(err) => { self.last_run_interrupted = true; return Err(err); } } } loop { // Check for cancellation if self.try_cancelled() { info!("Execution cancelled"); self.timeline.abort_current_block(); self.last_run_interrupted = true; return Err(WorkerError::Cancelled); } // Notify turn start let current_turn = self.turn_count; debug!(turn = current_turn, "Turn start"); for cb in &self.turn_start_cbs { cb(current_turn); } // Interceptor: pre_llm_request let mut request_context = self.history.clone(); match self.interceptor.pre_llm_request(&mut request_context).await { PreRequestAction::Cancel(reason) => { info!(reason = %reason, "Aborted by interceptor"); for cb in &self.turn_end_cbs { cb(current_turn); } self.last_run_interrupted = true; return Err(WorkerError::Aborted(reason)); } PreRequestAction::Continue => {} } // Build request let request = self.build_request(&tool_definitions, &request_context); debug!( item_count = request.items.len(), tool_count = request.tools.len(), has_system = request.system_prompt.is_some(), "Sending request to LLM" ); // Stream processing debug!("Starting stream..."); let mut event_count = 0; // Get stream (cancellable) let mut stream = tokio::select! { stream_result = self.client.stream(request) => stream_result .inspect_err(|_| self.last_run_interrupted = true)?, cancel = self.cancel_rx.recv() => { if cancel.is_some() { info!("Cancelled before stream started"); } self.timeline.abort_current_block(); self.last_run_interrupted = true; return Err(WorkerError::Cancelled); } }; loop { tokio::select! { // Receive event from stream event_result = stream.next() => { match event_result { Some(result) => { match &result { Ok(event) => { trace!(event = ?event, "Received event"); event_count += 1; } Err(e) => { warn!(error = %e, "Stream error"); } } let event = result .inspect_err(|_| self.last_run_interrupted = true)?; self.timeline.dispatch(&event); } None => break, // Stream ended } } // Wait for cancellation cancel = self.cancel_rx.recv() => { if cancel.is_some() { info!("Stream cancelled"); } self.timeline.abort_current_block(); self.last_run_interrupted = true; return Err(WorkerError::Cancelled); } } } debug!(event_count = event_count, "Stream completed"); // Notify turn end for cb in &self.turn_end_cbs { cb(current_turn); } self.turn_count += 1; // Get collected results let text_blocks = self.text_block_collector.take_collected(); let tool_calls = self.tool_call_collector.take_collected(); // Add assistant items to history let assistant_items = self.build_assistant_items(&text_blocks, &tool_calls); self.history.extend(assistant_items); if tool_calls.is_empty() { // No tool calls → determine turn end via interceptor match self.interceptor.on_turn_end(&self.history).await { TurnEndAction::Finish => { self.last_run_interrupted = false; return Ok(WorkerResult::Finished); } TurnEndAction::ContinueWithMessages(additional) => { self.history.extend(additional); continue; } TurnEndAction::Pause => { self.last_run_interrupted = true; return Ok(WorkerResult::Paused); } } } // Execute tools match self.execute_tools(tool_calls).await { Ok(ToolExecutionResult::Paused) => { self.last_run_interrupted = true; return Ok(WorkerResult::Paused); } Ok(ToolExecutionResult::Completed(results)) => { for result in results { self.history.push(Item::tool_result( &result.tool_use_id, &result.content, )); } } Err(err) => { self.last_run_interrupted = true; return Err(err); } } // Check turn limit (after assistant items and tool results are in history) if let Some(max) = self.max_turns { if self.turn_count >= max as usize { info!(turn_count = self.turn_count, max_turns = max, "Turn limit reached"); self.last_run_interrupted = false; return Ok(WorkerResult::LimitReached); } } } } } impl Worker { /// Create a new Worker (in Mutable state) pub fn new(client: C) -> Self { let text_block_collector = TextBlockCollector::new(); let tool_call_collector = ToolCallCollector::new(); let mut timeline = Timeline::new(); let (cancel_tx, cancel_rx) = mpsc::channel(1); // Register collectors with Timeline timeline.on_text_block(text_block_collector.clone()); timeline.on_tool_use_block(tool_call_collector.clone()); Self { client, timeline, text_block_collector, tool_call_collector, tool_server: ToolServer::new().handle(), interceptor: Box::new(DefaultInterceptor), system_prompt: None, history: Vec::new(), locked_prefix_len: 0, turn_count: 0, max_turns: None, turn_start_cbs: Vec::new(), turn_end_cbs: Vec::new(), request_config: RequestConfig::default(), last_run_interrupted: false, output_processor: None, cancel_tx, cancel_rx, _state: PhantomData, } } /// Register a tool factory for deferred initialization. /// /// The factory is queued and executed at the next `run()` or `resume()` call. /// Duplicate name detection occurs at that point and surfaces as /// [`WorkerError::ToolRegistry`]. pub fn register_tool(&mut self, factory: WorkerToolDefinition) { self.tool_server.register_tool(factory); } /// Register multiple tool factories for deferred initialization. pub fn register_tools( &mut self, factories: impl IntoIterator, ) { self.tool_server.register_tools(factories); } /// Set system prompt (builder pattern) pub fn system_prompt(mut self, prompt: impl Into) -> Self { self.system_prompt = Some(prompt.into()); self } /// Set system prompt (mutable reference version) pub fn set_system_prompt(&mut self, prompt: impl Into) { self.system_prompt = Some(prompt.into()); } /// Set maximum tokens (builder pattern) /// /// # Examples /// /// ```ignore /// let worker = Worker::new(client) /// .system_prompt("You are a helpful assistant.") /// .max_tokens(4096); /// ``` pub fn max_tokens(mut self, max_tokens: u32) -> Self { self.request_config.max_tokens = Some(max_tokens); self } /// Set temperature (builder pattern) /// /// # Examples /// /// ```ignore /// let worker = Worker::new(client) /// .temperature(0.7); /// ``` pub fn temperature(mut self, temperature: f32) -> Self { self.request_config.temperature = Some(temperature); self } /// Set top_p (builder pattern) pub fn top_p(mut self, top_p: f32) -> Self { self.request_config.top_p = Some(top_p); self } /// Set top_k (builder pattern) pub fn top_k(mut self, top_k: u32) -> Self { self.request_config.top_k = Some(top_k); self } /// Add stop sequence (builder pattern) pub fn stop_sequence(mut self, sequence: impl Into) -> Self { self.request_config.stop_sequences.push(sequence.into()); self } /// Set request configuration at once (builder pattern) /// /// # Examples /// /// ```ignore /// let config = RequestConfig::new() /// .with_max_tokens(4096) /// .with_temperature(0.7); /// /// let worker = Worker::new(client) /// .system_prompt("...") /// .with_config(config); /// ``` pub fn with_config(mut self, config: RequestConfig) -> Self { self.request_config = config; self } /// Validate current configuration against the provider /// /// Returns an error if there are unsupported settings. /// Call at the end of the chain to detect configuration issues early. /// /// # Examples /// /// ```ignore /// let worker = Worker::new(client) /// .temperature(0.7) /// .top_k(40) /// .validate()?; // Error if using OpenAI since top_k is not supported /// ``` /// /// # Returns /// * `Ok(Self)` - Validation successful /// * `Err(WorkerError::ConfigWarnings)` - Has unsupported settings pub fn validate(self) -> Result { let warnings = self.client.validate_config(&self.request_config); if warnings.is_empty() { Ok(self) } else { Err(WorkerError::ConfigWarnings(warnings)) } } /// Get a mutable reference to history /// /// Available only in Mutable state. pub fn history_mut(&mut self) -> &mut Vec { &mut self.history } /// Set history pub fn set_history(&mut self, items: Vec) { self.history = items; } /// Add an item to history (builder pattern) pub fn with_item(mut self, item: Item) -> Self { self.history.push(item); self } /// Add an item to history pub fn push_item(&mut self, item: Item) { self.history.push(item); } /// Add multiple items to history (builder pattern) pub fn with_items(mut self, items: impl IntoIterator) -> Self { self.history.extend(items); self } /// Add multiple items to history pub fn extend_history(&mut self, items: impl IntoIterator) { self.history.extend(items); } /// Clear history pub fn clear_history(&mut self) { self.history.clear(); } /// Set the turn count (for session restoration) pub fn set_turn_count(&mut self, count: usize) { self.turn_count = count; } /// Set the maximum number of turns. None means unlimited. pub fn set_max_turns(&mut self, max_turns: Option) { self.max_turns = max_turns; } /// Set the last_run_interrupted flag (for session restoration) pub fn set_last_run_interrupted(&mut self, interrupted: bool) { self.last_run_interrupted = interrupted; } /// Set a tool output processor for handling large tool results. /// /// When set, tool execution results are passed through this processor /// before being placed into conversation history. pub fn set_output_processor(&mut self, processor: Arc) { self.output_processor = Some(processor); } /// Apply configuration (reserved for future extensions) #[allow(dead_code)] pub fn config(self, _config: WorkerConfig) -> Self { self } /// Execute a turn, consuming self and transitioning to Locked. /// /// This is the primary entry point for first use. Equivalent to /// `self.lock()` followed by `locked.run(user_input)`. /// /// Subsequent runs can use [`Worker::run()`] directly. /// To edit state between turns, call [`unlock()`](Worker::unlock) first. pub async fn run( self, user_input: impl Into, ) -> Result<(Worker, WorkerResult), WorkerError> { let mut locked = self.lock(); let result = locked.run(user_input).await?; Ok((locked, result)) } /// Resume from Paused, consuming self and transitioning to Locked. /// /// Used after `unlock()` → edit → resume. pub async fn resume( self, ) -> Result<(Worker, WorkerResult), WorkerError> { let mut locked = self.lock(); let result = locked.resume().await?; Ok((locked, result)) } /// Lock and transition to Locked state /// /// Flushes pending tool factories, then fixes the current system prompt /// and history as a "committed prefix". After this, only `run()` / `resume()` /// may append to history, ensuring cache hits. /// /// Most callers should use [`run()`](Self::run) instead, which calls /// this internally. Use `lock()` directly only when you need the /// `Locked` worker back on error (e.g. in a persistence layer). /// /// # Panics /// /// Panics if a pending tool factory produces a duplicate name. pub fn lock(self) -> Worker { self.tool_server.flush_pending(); let locked_prefix_len = self.history.len(); Worker { client: self.client, timeline: self.timeline, text_block_collector: self.text_block_collector, tool_call_collector: self.tool_call_collector, tool_server: self.tool_server, interceptor: self.interceptor, system_prompt: self.system_prompt, history: self.history, locked_prefix_len, turn_count: self.turn_count, max_turns: self.max_turns, turn_start_cbs: self.turn_start_cbs, turn_end_cbs: self.turn_end_cbs, request_config: self.request_config, last_run_interrupted: self.last_run_interrupted, output_processor: self.output_processor, cancel_tx: self.cancel_tx, cancel_rx: self.cancel_rx, _state: PhantomData, } } } impl Worker { /// Execute a turn /// /// Adds a new user message to history and sends a request to the LLM. /// Automatically loops if there are tool calls. pub async fn run( &mut self, user_input: impl Into, ) -> Result { self.reset_interruption_state(); // Interceptor: on_prompt_submit let mut user_item = Item::user_message(user_input); match self.interceptor.on_prompt_submit(&mut user_item).await { PromptAction::Cancel(reason) => { self.last_run_interrupted = true; return self .finalize_interruption(Err(WorkerError::Aborted(reason))) .await; } PromptAction::Continue => {} } self.history.push(user_item); let result = self.run_turn_loop().await; self.finalize_interruption(result).await } /// Resume execution (from Paused state) /// /// Resumes turn processing from current state without adding a new user message. pub async fn resume(&mut self) -> Result { self.reset_interruption_state(); let result = self.run_turn_loop().await; self.finalize_interruption(result).await } /// Get the prefix length at lock time pub fn locked_prefix_len(&self) -> usize { self.locked_prefix_len } /// Unlock and return to Mutable state /// /// Note: After this operation, subsequent requests may not hit the cache. /// Use only when you need to edit history. pub fn unlock(self) -> Worker { Worker { client: self.client, timeline: self.timeline, text_block_collector: self.text_block_collector, tool_call_collector: self.tool_call_collector, tool_server: self.tool_server, interceptor: self.interceptor, system_prompt: self.system_prompt, history: self.history, locked_prefix_len: 0, turn_count: self.turn_count, max_turns: self.max_turns, turn_start_cbs: self.turn_start_cbs, turn_end_cbs: self.turn_end_cbs, request_config: self.request_config, last_run_interrupted: self.last_run_interrupted, output_processor: self.output_processor, cancel_tx: self.cancel_tx, cancel_rx: self.cancel_rx, _state: PhantomData, } } } #[cfg(test)] mod tests { // Basic tests only. Tests using LlmClient are done in integration tests. }