From e49817c2d532c4fa03be9112912e484ca7d79882 Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 26 May 2026 21:05:45 +0900 Subject: [PATCH] feat: trace pre-stream lifecycle --- crates/llm-worker/src/worker.rs | 154 +++++++++++++++++++- crates/pod/src/pod.rs | 23 ++- crates/session-store/src/event_trace.rs | 31 +++- crates/session-store/src/lib.rs | 2 +- crates/session-store/tests/fs_store_test.rs | 9 +- 5 files changed, 204 insertions(+), 15 deletions(-) diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 0d4e16f2..b65036be 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::{marker::PhantomData, time::Instant}; use futures::StreamExt; +use serde_json::{Value, json}; use tokio::sync::mpsc; use tracing::{debug, info, trace, warn}; @@ -204,6 +205,9 @@ pub struct Worker { /// Stream event callbacks. Fired for every normalized provider stream /// event before it enters the Timeline. stream_event_cbs: Vec>, + /// Pre-stream lifecycle callbacks for debugging stalls before provider + /// stream events become visible. + lifecycle_trace_cbs: Vec>, /// Non-fatal warning callbacks. Invoked when the Worker wants to /// surface an advisory message to the upper layer (e.g. Pod) so it /// can be forwarded to the user — distinct from `tracing::warn!`, @@ -426,6 +430,20 @@ impl Worker { } } + /// Register a pre-stream lifecycle trace callback. + pub fn on_lifecycle_trace( + &mut self, + callback: impl Fn(usize, usize, &str, &Value) + Send + Sync + 'static, + ) { + self.lifecycle_trace_cbs.push(Box::new(callback)); + } + + fn emit_lifecycle_trace(&self, turn: usize, llm_call: usize, label: &str, data: Value) { + for cb in &self.lifecycle_trace_cbs { + cb(turn, llm_call, label, &data); + } + } + /// Register a non-fatal warning callback. /// /// The callback is invoked with a short human-readable message @@ -478,6 +496,15 @@ impl Worker { } } + fn request_trace_payload(&self, request: &Request) -> Value { + items_trace_payload( + &request.items, + request.tools.len(), + request.cache_anchor, + request.cache_key.is_some(), + ) + } + /// Register an AgentTurn-end callback. See [`on_turn_start`](Self::on_turn_start) /// for the 1:1-vs-N relation with `LlmCall*`. pub fn on_turn_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) { @@ -1154,7 +1181,19 @@ impl Worker { } // Stream LLM response + self.emit_lifecycle_trace( + current_turn, + current_llm_call, + "build_request_start", + items_trace_payload(&request_context, tool_definitions.len(), None, false), + ); let request = self.build_request(&tool_definitions, &request_context); + self.emit_lifecycle_trace( + current_turn, + current_llm_call, + "build_request_done", + self.request_trace_payload(&request), + ); let stream_outcome = self .stream_response(request, current_turn, current_llm_call) .await?; @@ -1255,6 +1294,7 @@ impl Worker { async fn open_stream_with_retry( &mut self, request: Request, + turn: usize, llm_call: usize, ) -> Result { let policy = self.retry_policy.clone(); @@ -1262,12 +1302,32 @@ impl Worker { let mut failed_attempt: u32 = 0; loop { + let attempt = failed_attempt + 1; + self.emit_lifecycle_trace( + turn, + llm_call, + "stream_open_start", + json!({ + "attempt": attempt, + "request": self.request_trace_payload(&request), + }), + ); + let stream_started = Instant::now(); let stream_result = tokio::select! { stream_result = self.client.stream(request.clone()) => stream_result, cancel = self.cancel_rx.recv() => { if cancel.is_some() { info!("Cancelled before stream started"); } + self.emit_lifecycle_trace( + turn, + llm_call, + "stream_open_cancelled", + json!({ + "attempt": attempt, + "elapsed_ms": stream_started.elapsed().as_millis() as u64, + }), + ); self.timeline.abort_current_block(); self.last_run_interrupted = true; return Err(WorkerError::Cancelled); @@ -1275,8 +1335,31 @@ impl Worker { }; match stream_result { - Ok(stream) => return Ok(stream), + Ok(stream) => { + self.emit_lifecycle_trace( + turn, + llm_call, + "stream_open_success", + json!({ + "attempt": attempt, + "elapsed_ms": stream_started.elapsed().as_millis() as u64, + }), + ); + return Ok(stream); + } Err(err) => { + self.emit_lifecycle_trace( + turn, + llm_call, + "stream_open_error", + json!({ + "attempt": attempt, + "elapsed_ms": stream_started.elapsed().as_millis() as u64, + "retryable": is_retryable(&err), + "status": err.status(), + "error": err.to_string(), + }), + ); let next_failed_attempt = failed_attempt + 1; if next_failed_attempt >= policy.max_attempts || !is_retryable(&err) { self.last_run_interrupted = true; @@ -1342,7 +1425,7 @@ impl Worker { "Sending request to LLM" ); - let mut stream = self.open_stream_with_retry(request, llm_call).await?; + let mut stream = self.open_stream_with_retry(request, turn, llm_call).await?; let mut event_count: usize = 0; loop { @@ -1370,6 +1453,14 @@ impl Worker { }); } }; + if event_count == 1 { + self.emit_lifecycle_trace( + turn, + llm_call, + "stream_first_event", + json!({}), + ); + } self.emit_stream_event(turn, llm_call, &event); self.timeline.dispatch(&event); } @@ -1464,6 +1555,7 @@ impl Worker { llm_retry_cbs: Vec::new(), llm_continuation_cbs: Vec::new(), stream_event_cbs: Vec::new(), + lifecycle_trace_cbs: Vec::new(), warning_cbs: Vec::new(), tool_result_cbs: Vec::new(), history_append_cbs: Vec::new(), @@ -1727,6 +1819,7 @@ impl Worker { llm_retry_cbs: self.llm_retry_cbs, llm_continuation_cbs: self.llm_continuation_cbs, stream_event_cbs: self.stream_event_cbs, + lifecycle_trace_cbs: self.lifecycle_trace_cbs, warning_cbs: self.warning_cbs, tool_result_cbs: self.tool_result_cbs, history_append_cbs: self.history_append_cbs, @@ -1818,6 +1911,7 @@ impl Worker { llm_retry_cbs: self.llm_retry_cbs, llm_continuation_cbs: self.llm_continuation_cbs, stream_event_cbs: self.stream_event_cbs, + lifecycle_trace_cbs: self.lifecycle_trace_cbs, warning_cbs: self.warning_cbs, tool_result_cbs: self.tool_result_cbs, history_append_cbs: self.history_append_cbs, @@ -1838,6 +1932,62 @@ impl Worker { } } +fn items_trace_payload( + items: &[Item], + tools_len: usize, + cache_anchor: Option, + cache_key_present: bool, +) -> Value { + let last = items.last(); + let last_tool_result = match last { + Some(Item::ToolResult { + call_id, + summary, + content, + is_error, + .. + }) => { + let tool_name = items.iter().rev().find_map(|item| match item { + Item::ToolCall { + call_id: candidate, + name, + .. + } if candidate == call_id => Some(name.as_str()), + _ => None, + }); + Some(json!({ + "call_id": call_id, + "tool_name": tool_name, + "summary": summary, + "summary_bytes": summary.len(), + "content_bytes": content.as_ref().map(|s| s.len()).unwrap_or(0), + "is_error": is_error, + })) + } + _ => None, + }; + + json!({ + "items_len": items.len(), + "items_json_bytes": serde_json::to_vec(items).map(|bytes| bytes.len()).ok(), + "tools_len": tools_len, + "cache_anchor": cache_anchor, + "cache_key_present": cache_key_present, + "last_item_kind": last.map(item_kind), + "last_item_json_bytes": last.and_then(|item| serde_json::to_vec(item).ok().map(|bytes| bytes.len())), + "last_tool_result": last_tool_result, + }) +} + +fn item_kind(item: &Item) -> &'static str { + match item { + Item::Message { .. } => "message", + Item::ToolCall { .. } => "tool_call", + Item::ToolResult { .. } => "tool_result", + Item::Reasoning { .. } => "reasoning", + } +} + #[cfg(test)] mod tests { // Basic tests only. Tests using LlmClient are done in integration tests. diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 630ef87f..44b97ff7 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -514,16 +514,35 @@ impl Pod { if self.manifest.session.record_event_trace { let writer = self.log_writer_handle(); self.worker_mut() - .on_stream_event(move |turn, _llm_call, event| { + .on_stream_event(move |turn, llm_call, event| { let entry = session_store::TraceEntry { ts: segment_log::now_millis(), turn, - event: event.clone(), + llm_call: Some(llm_call), + payload: session_store::TracePayload::StreamEvent { + event: event.clone(), + }, }; if let Err(err) = writer.append_trace(&entry) { warn!(error = %err, "stream event trace commit failed; dropping"); } }); + let writer = self.log_writer_handle(); + self.worker_mut() + .on_lifecycle_trace(move |turn, llm_call, label, data| { + let entry = session_store::TraceEntry { + ts: segment_log::now_millis(), + turn, + llm_call: Some(llm_call), + payload: session_store::TracePayload::Lifecycle { + label: label.to_string(), + data: data.clone(), + }, + }; + if let Err(err) = writer.append_trace(&entry) { + warn!(error = %err, "lifecycle trace commit failed; dropping"); + } + }); } self.history_persistence_wired = true; } diff --git a/crates/session-store/src/event_trace.rs b/crates/session-store/src/event_trace.rs index 7f152520..75445da0 100644 --- a/crates/session-store/src/event_trace.rs +++ b/crates/session-store/src/event_trace.rs @@ -1,21 +1,38 @@ -//! Debug-only raw stream event recording. +//! Debug-only LLM request/stream trace recording. //! -//! [`TraceEntry`] captures every LLM stream event verbatim for debugging -//! and post-hoc analysis. Written to a separate `.trace.jsonl` file, +//! [`TraceEntry`] captures stream lifecycle markers and raw provider stream +//! events for debugging stalls. Written to a separate `.trace.jsonl` file, //! completely independent of the segment log used for state restoration. //! //! Disabled by default. Enable via `SessionConfig::record_event_trace`. use llm_worker::llm_client::event::Event; use serde::{Deserialize, Serialize}; +use serde_json::Value; -/// A single trace entry recording a raw stream event. -#[derive(Debug, Clone, Serialize, Deserialize)] +/// A single trace entry recording either a lifecycle marker or raw stream event. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TraceEntry { /// Timestamp in milliseconds since Unix epoch. pub ts: u64, /// Turn number at the time of recording. pub turn: usize, - /// The raw stream event. - pub event: Event, + /// LLM call index within the worker, when known. + #[serde(skip_serializing_if = "Option::is_none")] + pub llm_call: Option, + #[serde(flatten)] + pub payload: TracePayload, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum TracePayload { + /// Normalized provider stream event. + StreamEvent { event: Event }, + /// Marker for code that runs before/around provider stream events. + Lifecycle { + label: String, + #[serde(default, skip_serializing_if = "Value::is_null")] + data: Value, + }, } diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 3dd80450..da11e4dc 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -39,7 +39,7 @@ pub mod segment_log; pub mod store; pub mod system_item; -pub use event_trace::TraceEntry; +pub use event_trace::{TraceEntry, TracePayload}; pub use fs_store::FsStore; pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index c920b8c6..5c6d7327 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -174,9 +174,12 @@ fn trace_entries_in_separate_file() { let trace = TraceEntry { ts: 1500, turn: 0, - event: llm_worker::llm_client::event::Event::Ping( - llm_worker::llm_client::event::PingEvent { timestamp: None }, - ), + llm_call: Some(0), + payload: session_store::TracePayload::StreamEvent { + event: llm_worker::llm_client::event::Event::Ping( + llm_worker::llm_client::event::PingEvent { timestamp: None }, + ), + }, }; store.append_trace(sid, segid, &trace).unwrap();