feat: add session stream event trace flag

This commit is contained in:
Keisuke Hirata 2026-05-26 19:57:47 +09:00
parent 5ccfdea7c8
commit 372a99bc0b
4 changed files with 100 additions and 3 deletions

View File

@ -18,7 +18,8 @@ use crate::{
}, },
llm_client::{ llm_client::{
ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ResponseStream, ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ResponseStream,
ToolDefinition, error::is_retryable, retry::RetryPolicy, types::parse_tool_arguments, ToolDefinition, error::is_retryable, event::Event, retry::RetryPolicy,
types::parse_tool_arguments,
}, },
state::{Locked, Mutable, WorkerState}, state::{Locked, Mutable, WorkerState},
timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
@ -200,6 +201,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
llm_retry_cbs: Vec<Box<dyn Fn(usize, &LlmRetryNotice) + Send + Sync>>, llm_retry_cbs: Vec<Box<dyn Fn(usize, &LlmRetryNotice) + Send + Sync>>,
/// Stream continuation callbacks for a specific LlmCall. /// Stream continuation callbacks for a specific LlmCall.
llm_continuation_cbs: Vec<Box<dyn Fn(usize, u32, u32, &str) + Send + Sync>>, llm_continuation_cbs: Vec<Box<dyn Fn(usize, u32, u32, &str) + Send + Sync>>,
/// Stream event callbacks. Fired for every normalized provider stream
/// event before it enters the Timeline.
stream_event_cbs: Vec<Box<dyn Fn(usize, usize, &Event) + Send + Sync>>,
/// Non-fatal warning callbacks. Invoked when the Worker wants to /// Non-fatal warning callbacks. Invoked when the Worker wants to
/// surface an advisory message to the upper layer (e.g. Pod) so it /// surface an advisory message to the upper layer (e.g. Pod) so it
/// can be forwarded to the user — distinct from `tracing::warn!`, /// can be forwarded to the user — distinct from `tracing::warn!`,
@ -408,6 +412,20 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
} }
} }
/// Subscribe to raw normalized stream events.
///
/// The callback receives `(turn, llm_call, event)`. Callbacks are stored in
/// registration order and stay registered for the lifetime of the worker's
/// callback list (there is no unregister hook in this block).
pub fn on_stream_event(
    &mut self,
    callback: impl Fn(usize, usize, &Event) + Send + Sync + 'static,
) {
    // Erase the concrete closure type so it can live in the shared callback list.
    let boxed: Box<dyn Fn(usize, usize, &Event) + Send + Sync> = Box::new(callback);
    self.stream_event_cbs.push(boxed);
}
/// Fan one stream event out to every registered stream-event callback,
/// in registration order, passing the turn and LLM-call indices through
/// unchanged.
fn emit_stream_event(&self, turn: usize, llm_call: usize, event: &Event) {
    self.stream_event_cbs
        .iter()
        .for_each(|notify| notify(turn, llm_call, event));
}
/// Register a non-fatal warning callback. /// Register a non-fatal warning callback.
/// ///
/// The callback is invoked with a short human-readable message /// The callback is invoked with a short human-readable message
@ -1137,7 +1155,9 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
// Stream LLM response // Stream LLM response
let request = self.build_request(&tool_definitions, &request_context); let request = self.build_request(&tool_definitions, &request_context);
let stream_outcome = self.stream_response(request, current_llm_call).await?; let stream_outcome = self
.stream_response(request, current_turn, current_llm_call)
.await?;
for cb in &self.llm_call_end_cbs { for cb in &self.llm_call_end_cbs {
cb(current_llm_call); cb(current_llm_call);
@ -1312,6 +1332,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
async fn stream_response( async fn stream_response(
&mut self, &mut self,
request: Request, request: Request,
turn: usize,
llm_call: usize, llm_call: usize,
) -> Result<StreamCompletion, WorkerError> { ) -> Result<StreamCompletion, WorkerError> {
debug!( debug!(
@ -1349,6 +1370,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}); });
} }
}; };
self.emit_stream_event(turn, llm_call, &event);
self.timeline.dispatch(&event); self.timeline.dispatch(&event);
} }
None => break, None => break,
@ -1441,6 +1463,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
llm_call_end_cbs: Vec::new(), llm_call_end_cbs: Vec::new(),
llm_retry_cbs: Vec::new(), llm_retry_cbs: Vec::new(),
llm_continuation_cbs: Vec::new(), llm_continuation_cbs: Vec::new(),
stream_event_cbs: Vec::new(),
warning_cbs: Vec::new(), warning_cbs: Vec::new(),
tool_result_cbs: Vec::new(), tool_result_cbs: Vec::new(),
history_append_cbs: Vec::new(), history_append_cbs: Vec::new(),
@ -1703,6 +1726,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
llm_call_end_cbs: self.llm_call_end_cbs, llm_call_end_cbs: self.llm_call_end_cbs,
llm_retry_cbs: self.llm_retry_cbs, llm_retry_cbs: self.llm_retry_cbs,
llm_continuation_cbs: self.llm_continuation_cbs, llm_continuation_cbs: self.llm_continuation_cbs,
stream_event_cbs: self.stream_event_cbs,
warning_cbs: self.warning_cbs, warning_cbs: self.warning_cbs,
tool_result_cbs: self.tool_result_cbs, tool_result_cbs: self.tool_result_cbs,
history_append_cbs: self.history_append_cbs, history_append_cbs: self.history_append_cbs,
@ -1793,6 +1817,7 @@ impl<C: LlmClient> Worker<C, Locked> {
llm_call_end_cbs: self.llm_call_end_cbs, llm_call_end_cbs: self.llm_call_end_cbs,
llm_retry_cbs: self.llm_retry_cbs, llm_retry_cbs: self.llm_retry_cbs,
llm_continuation_cbs: self.llm_continuation_cbs, llm_continuation_cbs: self.llm_continuation_cbs,
stream_event_cbs: self.stream_event_cbs,
warning_cbs: self.warning_cbs, warning_cbs: self.warning_cbs,
tool_result_cbs: self.tool_result_cbs, tool_result_cbs: self.tool_result_cbs,
history_append_cbs: self.history_append_cbs, history_append_cbs: self.history_append_cbs,

View File

@ -17,7 +17,8 @@ use crate::defaults;
use crate::model::{AuthRef, ModelManifest, ReasoningControl}; use crate::model::{AuthRef, ModelManifest, ReasoningControl};
use crate::{ use crate::{
CompactionConfig, FileUploadLimits, MemoryConfig, PodManifest, PodMeta, ScopeConfig, CompactionConfig, FileUploadLimits, MemoryConfig, PodManifest, PodMeta, ScopeConfig,
SkillsConfig, ToolOutputLimits, ToolPermissionConfig, ToolPermissionRule, WorkerManifest, SessionConfig, SkillsConfig, ToolOutputLimits, ToolPermissionConfig, ToolPermissionRule,
WorkerManifest,
}; };
/// Partial-form Pod manifest. Every field is optional; one or more /// Partial-form Pod manifest. Every field is optional; one or more
@ -37,6 +38,8 @@ pub struct PodManifestConfig {
pub worker: WorkerManifestConfig, pub worker: WorkerManifestConfig,
#[serde(default)] #[serde(default)]
pub scope: ScopeConfig, pub scope: ScopeConfig,
#[serde(default)]
pub session: Option<SessionConfigPartial>,
/// Optional `[permissions]` section. `None` means the permission layer /// Optional `[permissions]` section. `None` means the permission layer
/// is disabled; `Some` requires `default_action` during final resolve. /// is disabled; `Some` requires `default_action` during final resolve.
#[serde(default)] #[serde(default)]
@ -102,6 +105,12 @@ pub struct FileUploadLimitsPartial {
pub max_bytes: Option<usize>, pub max_bytes: Option<usize>,
} }
/// Partial-form session settings, used for the `session` field of
/// `PodManifestConfig`. Every field is optional so that several layers can
/// be merged before the final resolve.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionConfigPartial {
/// Whether provider stream events should be recorded as a debug trace.
/// `None` means this layer does not set the flag; the final resolve
/// falls back to `false` when no layer sets it.
#[serde(default)]
pub record_event_trace: Option<bool>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PermissionConfigPartial { pub struct PermissionConfigPartial {
#[serde(default)] #[serde(default)]
@ -260,6 +269,7 @@ impl PodManifestConfig {
model: self.model.merge(upper.model), model: self.model.merge(upper.model),
worker: self.worker.merge(upper.worker), worker: self.worker.merge(upper.worker),
scope: merge_scope(self.scope, upper.scope), scope: merge_scope(self.scope, upper.scope),
session: merge_option(self.session, upper.session, SessionConfigPartial::merge),
permissions: merge_option( permissions: merge_option(
self.permissions, self.permissions,
upper.permissions, upper.permissions,
@ -353,6 +363,14 @@ impl FileUploadLimitsPartial {
} }
} }
impl SessionConfigPartial {
    /// Layer `upper` on top of `self`.
    ///
    /// A value set in `upper` wins; when `upper` leaves the field unset the
    /// value from `self` (possibly also unset) is kept.
    fn merge(self, upper: Self) -> Self {
        let Self {
            record_event_trace: base,
        } = self;
        let Self {
            record_event_trace: overlay,
        } = upper;
        Self {
            record_event_trace: overlay.or(base),
        }
    }
}
impl PermissionConfigPartial { impl PermissionConfigPartial {
fn merge(mut self, upper: Self) -> Self { fn merge(mut self, upper: Self) -> Self {
self.rules.extend(upper.rules); self.rules.extend(upper.rules);
@ -496,6 +514,12 @@ impl TryFrom<PodManifestConfig> for PodManifest {
for rule in &cfg.scope.deny { for rule in &cfg.scope.deny {
ensure_absolute("scope.deny.target", &rule.target)?; ensure_absolute("scope.deny.target", &rule.target)?;
} }
let session = SessionConfig {
record_event_trace: cfg
.session
.and_then(|s| s.record_event_trace)
.unwrap_or(false),
};
let permissions = cfg let permissions = cfg
.permissions .permissions
@ -550,6 +574,7 @@ impl TryFrom<PodManifestConfig> for PodManifest {
model: cfg.model, model: cfg.model,
worker, worker,
scope: cfg.scope, scope: cfg.scope,
session,
permissions, permissions,
compaction, compaction,
memory: cfg.memory, memory: cfg.memory,
@ -596,6 +621,7 @@ mod tests {
deny: Vec::new(), deny: Vec::new(),
}, },
permissions: None, permissions: None,
session: None,
compaction: None, compaction: None,
memory: None, memory: None,
skills: None, skills: None,
@ -610,6 +636,17 @@ mod tests {
assert!(manifest.permissions.is_none()); assert!(manifest.permissions.is_none());
} }
#[test]
fn resolve_session_record_event_trace() {
    // Opt in explicitly through the partial config and check that the flag
    // survives resolution into the final manifest.
    let enabled = SessionConfigPartial {
        record_event_trace: Some(true),
    };
    let mut cfg = minimal_valid();
    cfg.session = Some(enabled);

    let manifest = PodManifest::try_from(cfg).unwrap();
    assert!(manifest.session.record_event_trace);
}
#[test] #[test]
fn resolve_permissions_requires_default_action_when_present() { fn resolve_permissions_requires_default_action_when_present() {
let mut cfg = minimal_valid(); let mut cfg = minimal_valid();

View File

@ -37,6 +37,9 @@ pub struct PodManifest {
pub model: ModelManifest, pub model: ModelManifest,
pub worker: WorkerManifest, pub worker: WorkerManifest,
pub scope: ScopeConfig, pub scope: ScopeConfig,
/// Session/debug persistence settings. Defaults keep extra traces off.
#[serde(default)]
pub session: SessionConfig,
/// Optional manifest-level tool permission policy. Absent means the /// Optional manifest-level tool permission policy. Absent means the
/// permission layer is disabled and tool calls run as before. /// permission layer is disabled and tool calls run as before.
#[serde(default)] #[serde(default)]
@ -300,6 +303,15 @@ pub struct ScopeConfig {
pub deny: Vec<ScopeRule>, pub deny: Vec<ScopeRule>,
} }
/// Resolved session settings carried on `PodManifest::session`.
///
/// The derived `Default` leaves `record_event_trace` at `false`, so extra
/// tracing stays off unless it is explicitly enabled.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct SessionConfig {
/// Persist every provider stream event directly to `trace.jsonl` next to the
/// segment log. Intended for debugging stalls between stream requests; off
/// by default because it can be verbose.
#[serde(default)]
pub record_event_trace: bool,
}
/// Manifest-level pattern-based tool permission policy. /// Manifest-level pattern-based tool permission policy.
/// ///
/// Presence of `[permissions]` enables this layer. Rules are evaluated /// Presence of `[permissions]` enables this layer. Rules are evaluated

View File

@ -167,6 +167,15 @@ where
self.sink.publish(entry); self.sink.publish(entry);
Ok(()) Ok(())
} }
/// Write one debug trace record for the segment this writer currently
/// points at.
///
/// The record is handed straight to the store, keyed by the current
/// session and segment ids. Trace writes deliberately bypass the segment
/// entry counter and the live replay sink, because trace records are not
/// conversation history.
///
/// # Errors
///
/// Returns whatever `StoreError` the underlying store reports.
pub fn append_trace(&self, entry: &session_store::TraceEntry) -> Result<(), StoreError> {
    let location = self.state.location();
    let (session_id, segment_id) = (location.session_id, location.segment_id);
    self.store.append_trace(session_id, segment_id, entry)
}
} }
/// Type-erased commit handle for the interceptor. Lets the /// Type-erased commit handle for the interceptor. Lets the
@ -502,6 +511,20 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
warn!(error = %err, "history append commit failed; dropping"); warn!(error = %err, "history append commit failed; dropping");
} }
}); });
if self.manifest.session.record_event_trace {
let writer = self.log_writer_handle();
self.worker_mut()
.on_stream_event(move |turn, _llm_call, event| {
let entry = session_store::TraceEntry {
ts: segment_log::now_millis(),
turn,
event: event.clone(),
};
if let Err(err) = writer.append_trace(&entry) {
warn!(error = %err, "stream event trace commit failed; dropping");
}
});
}
self.history_persistence_wired = true; self.history_persistence_wired = true;
} }