feat: trace pre-stream lifecycle

This commit is contained in:
Keisuke Hirata 2026-05-26 21:05:45 +09:00
parent 9405ffc633
commit e49817c2d5
5 changed files with 204 additions and 15 deletions

View File

@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::{marker::PhantomData, time::Instant};
use futures::StreamExt;
use serde_json::{Value, json};
use tokio::sync::mpsc;
use tracing::{debug, info, trace, warn};
@ -204,6 +205,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
/// Stream event callbacks. Fired for every normalized provider stream
/// event before it enters the Timeline.
stream_event_cbs: Vec<Box<dyn Fn(usize, usize, &Event) + Send + Sync>>,
/// Pre-stream lifecycle callbacks for debugging stalls before provider
/// stream events become visible.
lifecycle_trace_cbs: Vec<Box<dyn Fn(usize, usize, &str, &Value) + Send + Sync>>,
/// Non-fatal warning callbacks. Invoked when the Worker wants to
/// surface an advisory message to the upper layer (e.g. Pod) so it
/// can be forwarded to the user — distinct from `tracing::warn!`,
@ -426,6 +430,20 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
}
/// Register a pre-stream lifecycle trace callback.
pub fn on_lifecycle_trace(
&mut self,
callback: impl Fn(usize, usize, &str, &Value) + Send + Sync + 'static,
) {
self.lifecycle_trace_cbs.push(Box::new(callback));
}
fn emit_lifecycle_trace(&self, turn: usize, llm_call: usize, label: &str, data: Value) {
for cb in &self.lifecycle_trace_cbs {
cb(turn, llm_call, label, &data);
}
}
/// Register a non-fatal warning callback.
///
/// The callback is invoked with a short human-readable message
@ -478,6 +496,15 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
}
fn request_trace_payload(&self, request: &Request) -> Value {
items_trace_payload(
&request.items,
request.tools.len(),
request.cache_anchor,
request.cache_key.is_some(),
)
}
/// Register an AgentTurn-end callback. See [`on_turn_start`](Self::on_turn_start)
/// for the 1:1-vs-N relation with `LlmCall*`.
pub fn on_turn_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
@ -1154,7 +1181,19 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
// Stream LLM response
self.emit_lifecycle_trace(
current_turn,
current_llm_call,
"build_request_start",
items_trace_payload(&request_context, tool_definitions.len(), None, false),
);
let request = self.build_request(&tool_definitions, &request_context);
self.emit_lifecycle_trace(
current_turn,
current_llm_call,
"build_request_done",
self.request_trace_payload(&request),
);
let stream_outcome = self
.stream_response(request, current_turn, current_llm_call)
.await?;
@ -1255,6 +1294,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
async fn open_stream_with_retry(
&mut self,
request: Request,
turn: usize,
llm_call: usize,
) -> Result<ResponseStream, WorkerError> {
let policy = self.retry_policy.clone();
@ -1262,12 +1302,32 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
let mut failed_attempt: u32 = 0;
loop {
let attempt = failed_attempt + 1;
self.emit_lifecycle_trace(
turn,
llm_call,
"stream_open_start",
json!({
"attempt": attempt,
"request": self.request_trace_payload(&request),
}),
);
let stream_started = Instant::now();
let stream_result = tokio::select! {
stream_result = self.client.stream(request.clone()) => stream_result,
cancel = self.cancel_rx.recv() => {
if cancel.is_some() {
info!("Cancelled before stream started");
}
self.emit_lifecycle_trace(
turn,
llm_call,
"stream_open_cancelled",
json!({
"attempt": attempt,
"elapsed_ms": stream_started.elapsed().as_millis() as u64,
}),
);
self.timeline.abort_current_block();
self.last_run_interrupted = true;
return Err(WorkerError::Cancelled);
@ -1275,8 +1335,31 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
};
match stream_result {
Ok(stream) => return Ok(stream),
Ok(stream) => {
self.emit_lifecycle_trace(
turn,
llm_call,
"stream_open_success",
json!({
"attempt": attempt,
"elapsed_ms": stream_started.elapsed().as_millis() as u64,
}),
);
return Ok(stream);
}
Err(err) => {
self.emit_lifecycle_trace(
turn,
llm_call,
"stream_open_error",
json!({
"attempt": attempt,
"elapsed_ms": stream_started.elapsed().as_millis() as u64,
"retryable": is_retryable(&err),
"status": err.status(),
"error": err.to_string(),
}),
);
let next_failed_attempt = failed_attempt + 1;
if next_failed_attempt >= policy.max_attempts || !is_retryable(&err) {
self.last_run_interrupted = true;
@ -1342,7 +1425,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
"Sending request to LLM"
);
let mut stream = self.open_stream_with_retry(request, llm_call).await?;
let mut stream = self.open_stream_with_retry(request, turn, llm_call).await?;
let mut event_count: usize = 0;
loop {
@ -1370,6 +1453,14 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
});
}
};
if event_count == 1 {
self.emit_lifecycle_trace(
turn,
llm_call,
"stream_first_event",
json!({}),
);
}
self.emit_stream_event(turn, llm_call, &event);
self.timeline.dispatch(&event);
}
@ -1464,6 +1555,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
llm_retry_cbs: Vec::new(),
llm_continuation_cbs: Vec::new(),
stream_event_cbs: Vec::new(),
lifecycle_trace_cbs: Vec::new(),
warning_cbs: Vec::new(),
tool_result_cbs: Vec::new(),
history_append_cbs: Vec::new(),
@ -1727,6 +1819,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
llm_retry_cbs: self.llm_retry_cbs,
llm_continuation_cbs: self.llm_continuation_cbs,
stream_event_cbs: self.stream_event_cbs,
lifecycle_trace_cbs: self.lifecycle_trace_cbs,
warning_cbs: self.warning_cbs,
tool_result_cbs: self.tool_result_cbs,
history_append_cbs: self.history_append_cbs,
@ -1818,6 +1911,7 @@ impl<C: LlmClient> Worker<C, Locked> {
llm_retry_cbs: self.llm_retry_cbs,
llm_continuation_cbs: self.llm_continuation_cbs,
stream_event_cbs: self.stream_event_cbs,
lifecycle_trace_cbs: self.lifecycle_trace_cbs,
warning_cbs: self.warning_cbs,
tool_result_cbs: self.tool_result_cbs,
history_append_cbs: self.history_append_cbs,
@ -1838,6 +1932,62 @@ impl<C: LlmClient> Worker<C, Locked> {
}
}
fn items_trace_payload(
items: &[Item],
tools_len: usize,
cache_anchor: Option<usize>,
cache_key_present: bool,
) -> Value {
let last = items.last();
let last_tool_result = match last {
Some(Item::ToolResult {
call_id,
summary,
content,
is_error,
..
}) => {
let tool_name = items.iter().rev().find_map(|item| match item {
Item::ToolCall {
call_id: candidate,
name,
..
} if candidate == call_id => Some(name.as_str()),
_ => None,
});
Some(json!({
"call_id": call_id,
"tool_name": tool_name,
"summary": summary,
"summary_bytes": summary.len(),
"content_bytes": content.as_ref().map(|s| s.len()).unwrap_or(0),
"is_error": is_error,
}))
}
_ => None,
};
json!({
"items_len": items.len(),
"items_json_bytes": serde_json::to_vec(items).map(|bytes| bytes.len()).ok(),
"tools_len": tools_len,
"cache_anchor": cache_anchor,
"cache_key_present": cache_key_present,
"last_item_kind": last.map(item_kind),
"last_item_json_bytes": last.and_then(|item| serde_json::to_vec(item).ok().map(|bytes| bytes.len())),
"last_tool_result": last_tool_result,
})
}
fn item_kind(item: &Item) -> &'static str {
match item {
Item::Message { .. } => "message",
Item::ToolCall { .. } => "tool_call",
Item::ToolResult { .. } => "tool_result",
Item::Reasoning { .. } => "reasoning",
}
}
#[cfg(test)]
mod tests {
// Basic tests only. Tests using LlmClient are done in integration tests.

View File

@ -514,16 +514,35 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
if self.manifest.session.record_event_trace {
let writer = self.log_writer_handle();
self.worker_mut()
.on_stream_event(move |turn, _llm_call, event| {
.on_stream_event(move |turn, llm_call, event| {
let entry = session_store::TraceEntry {
ts: segment_log::now_millis(),
turn,
event: event.clone(),
llm_call: Some(llm_call),
payload: session_store::TracePayload::StreamEvent {
event: event.clone(),
},
};
if let Err(err) = writer.append_trace(&entry) {
warn!(error = %err, "stream event trace commit failed; dropping");
}
});
let writer = self.log_writer_handle();
self.worker_mut()
.on_lifecycle_trace(move |turn, llm_call, label, data| {
let entry = session_store::TraceEntry {
ts: segment_log::now_millis(),
turn,
llm_call: Some(llm_call),
payload: session_store::TracePayload::Lifecycle {
label: label.to_string(),
data: data.clone(),
},
};
if let Err(err) = writer.append_trace(&entry) {
warn!(error = %err, "lifecycle trace commit failed; dropping");
}
});
}
self.history_persistence_wired = true;
}

View File

@ -1,21 +1,38 @@
//! Debug-only raw stream event recording.
//! Debug-only LLM request/stream trace recording.
//!
//! [`TraceEntry`] captures every LLM stream event verbatim for debugging
//! and post-hoc analysis. Written to a separate `.trace.jsonl` file,
//! [`TraceEntry`] captures stream lifecycle markers and raw provider stream
//! events for debugging stalls. Written to a separate `.trace.jsonl` file,
//! completely independent of the segment log used for state restoration.
//!
//! Disabled by default. Enable via `SessionConfig::record_event_trace`.
use llm_worker::llm_client::event::Event;
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// A single trace entry recording a raw stream event.
#[derive(Debug, Clone, Serialize, Deserialize)]
/// A single trace entry recording either a lifecycle marker or raw stream event.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TraceEntry {
/// Timestamp in milliseconds since Unix epoch.
pub ts: u64,
/// Turn number at the time of recording.
pub turn: usize,
/// The raw stream event.
pub event: Event,
/// LLM call index within the worker, when known.
#[serde(skip_serializing_if = "Option::is_none")]
pub llm_call: Option<usize>,
#[serde(flatten)]
pub payload: TracePayload,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TracePayload {
/// Normalized provider stream event.
StreamEvent { event: Event },
/// Marker for code that runs before/around provider stream events.
Lifecycle {
label: String,
#[serde(default, skip_serializing_if = "Value::is_null")]
data: Value,
},
}

View File

@ -39,7 +39,7 @@ pub mod segment_log;
pub mod store;
pub mod system_item;
pub use event_trace::TraceEntry;
pub use event_trace::{TraceEntry, TracePayload};
pub use fs_store::FsStore;
pub use llm_worker::UsageRecord;
pub use llm_worker::llm_client::types::{ContentPart, Item, Role};

View File

@ -174,9 +174,12 @@ fn trace_entries_in_separate_file() {
let trace = TraceEntry {
ts: 1500,
turn: 0,
event: llm_worker::llm_client::event::Event::Ping(
llm_worker::llm_client::event::PingEvent { timestamp: None },
),
llm_call: Some(0),
payload: session_store::TracePayload::StreamEvent {
event: llm_worker::llm_client::event::Event::Ping(
llm_worker::llm_client::event::PingEvent { timestamp: None },
),
},
};
store.append_trace(sid, segid, &trace).unwrap();