From 9405ffc6332babdd72be98adc5da361aef07b54b Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 26 May 2026 19:57:47 +0900 Subject: [PATCH] feat: add session stream event trace flag --- crates/llm-worker/src/worker.rs | 29 ++++++++++++++++++++++-- crates/manifest/src/config.rs | 39 ++++++++++++++++++++++++++++++++- crates/manifest/src/lib.rs | 12 ++++++++++ crates/pod/src/pod.rs | 23 +++++++++++++++++++ 4 files changed, 100 insertions(+), 3 deletions(-) diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index ece8c1d1..0d4e16f2 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -18,7 +18,8 @@ use crate::{ }, llm_client::{ ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ResponseStream, - ToolDefinition, error::is_retryable, retry::RetryPolicy, types::parse_tool_arguments, + ToolDefinition, error::is_retryable, event::Event, retry::RetryPolicy, + types::parse_tool_arguments, }, state::{Locked, Mutable, WorkerState}, timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, @@ -200,6 +201,9 @@ pub struct Worker { llm_retry_cbs: Vec>, /// Stream continuation callbacks for a specific LlmCall. llm_continuation_cbs: Vec>, + /// Stream event callbacks. Fired for every normalized provider stream + /// event before it enters the Timeline. + stream_event_cbs: Vec>, /// Non-fatal warning callbacks. Invoked when the Worker wants to /// surface an advisory message to the upper layer (e.g. Pod) so it /// can be forwarded to the user — distinct from `tracing::warn!`, @@ -408,6 +412,20 @@ impl Worker { } } + /// Register a raw normalized stream event callback. + pub fn on_stream_event( + &mut self, + callback: impl Fn(usize, usize, &Event) + Send + Sync + 'static, + ) { + self.stream_event_cbs.push(Box::new(callback)); + } + + fn emit_stream_event(&self, turn: usize, llm_call: usize, event: &Event) { + for cb in &self.stream_event_cbs { + cb(turn, llm_call, event); + } + } + /// Register a non-fatal warning callback. /// /// The callback is invoked with a short human-readable message @@ -1137,7 +1155,9 @@ impl Worker { // Stream LLM response let request = self.build_request(&tool_definitions, &request_context); - let stream_outcome = self.stream_response(request, current_llm_call).await?; + let stream_outcome = self + .stream_response(request, current_turn, current_llm_call) + .await?; for cb in &self.llm_call_end_cbs { cb(current_llm_call); @@ -1312,6 +1332,7 @@ impl Worker { async fn stream_response( &mut self, request: Request, + turn: usize, llm_call: usize, ) -> Result { debug!( @@ -1349,6 +1370,7 @@ impl Worker { }); } }; + self.emit_stream_event(turn, llm_call, &event); self.timeline.dispatch(&event); } None => break, @@ -1441,6 +1463,7 @@ impl Worker { llm_call_end_cbs: Vec::new(), llm_retry_cbs: Vec::new(), llm_continuation_cbs: Vec::new(), + stream_event_cbs: Vec::new(), warning_cbs: Vec::new(), tool_result_cbs: Vec::new(), history_append_cbs: Vec::new(), @@ -1703,6 +1726,7 @@ impl Worker { llm_call_end_cbs: self.llm_call_end_cbs, llm_retry_cbs: self.llm_retry_cbs, llm_continuation_cbs: self.llm_continuation_cbs, + stream_event_cbs: self.stream_event_cbs, warning_cbs: self.warning_cbs, tool_result_cbs: self.tool_result_cbs, history_append_cbs: self.history_append_cbs, @@ -1793,6 +1817,7 @@ impl Worker { llm_call_end_cbs: self.llm_call_end_cbs, llm_retry_cbs: self.llm_retry_cbs, llm_continuation_cbs: self.llm_continuation_cbs, + stream_event_cbs: self.stream_event_cbs, warning_cbs: self.warning_cbs, tool_result_cbs: self.tool_result_cbs, history_append_cbs: self.history_append_cbs, diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index 67f9130a..cd362713 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -17,7 +17,8 @@ use crate::defaults; use crate::model::{AuthRef, ModelManifest, ReasoningControl}; use crate::{ CompactionConfig, FileUploadLimits, MemoryConfig, PodManifest, PodMeta, ScopeConfig, - SkillsConfig, ToolOutputLimits, ToolPermissionConfig, ToolPermissionRule, WorkerManifest, + SessionConfig, SkillsConfig, ToolOutputLimits, ToolPermissionConfig, ToolPermissionRule, + WorkerManifest, }; /// Partial-form Pod manifest. Every field is optional; one or more @@ -37,6 +38,8 @@ pub struct PodManifestConfig { pub worker: WorkerManifestConfig, #[serde(default)] pub scope: ScopeConfig, + #[serde(default)] + pub session: Option, /// Optional `[permissions]` section. `None` means the permission layer /// is disabled; `Some` requires `default_action` during final resolve. #[serde(default)] @@ -102,6 +105,12 @@ pub struct FileUploadLimitsPartial { pub max_bytes: Option, } +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct SessionConfigPartial { + #[serde(default)] + pub record_event_trace: Option, +} + #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct PermissionConfigPartial { #[serde(default)] @@ -260,6 +269,7 @@ impl PodManifestConfig { model: self.model.merge(upper.model), worker: self.worker.merge(upper.worker), scope: merge_scope(self.scope, upper.scope), + session: merge_option(self.session, upper.session, SessionConfigPartial::merge), permissions: merge_option( self.permissions, upper.permissions, @@ -353,6 +363,14 @@ impl FileUploadLimitsPartial { } } +impl SessionConfigPartial { + fn merge(self, upper: Self) -> Self { + Self { + record_event_trace: upper.record_event_trace.or(self.record_event_trace), + } + } +} + impl PermissionConfigPartial { fn merge(mut self, upper: Self) -> Self { self.rules.extend(upper.rules); @@ -496,6 +514,12 @@ impl TryFrom for PodManifest { for rule in &cfg.scope.deny { ensure_absolute("scope.deny.target", &rule.target)?; } + let session = SessionConfig { + record_event_trace: cfg + .session + .and_then(|s| s.record_event_trace) + .unwrap_or(false), + }; let permissions = cfg .permissions @@ -550,6 +574,7 @@ impl TryFrom for PodManifest { model: cfg.model, worker, scope: cfg.scope, + session, permissions, compaction, memory: cfg.memory, @@ -596,6 +621,7 @@ mod tests { deny: Vec::new(), }, permissions: None, + session: None, compaction: None, memory: None, skills: None, @@ -610,6 +636,17 @@ mod tests { assert!(manifest.permissions.is_none()); } + #[test] + fn resolve_session_record_event_trace() { + let mut cfg = minimal_valid(); + cfg.session = Some(SessionConfigPartial { + record_event_trace: Some(true), + }); + + let manifest: PodManifest = cfg.try_into().unwrap(); + assert!(manifest.session.record_event_trace); + } + #[test] fn resolve_permissions_requires_default_action_when_present() { let mut cfg = minimal_valid(); diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index 254df718..2ad132cf 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -37,6 +37,9 @@ pub struct PodManifest { pub model: ModelManifest, pub worker: WorkerManifest, pub scope: ScopeConfig, + /// Session/debug persistence settings. Defaults keep extra traces off. + #[serde(default)] + pub session: SessionConfig, /// Optional manifest-level tool permission policy. Absent means the /// permission layer is disabled and tool calls run as before. #[serde(default)] @@ -300,6 +303,15 @@ pub struct ScopeConfig { pub deny: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct SessionConfig { + /// Persist every provider stream event directly to `trace.jsonl` next to the + /// segment log. Intended for debugging stalls between stream requests; off + /// by default because it can be verbose. + #[serde(default)] + pub record_event_trace: bool, +} + /// Manifest-level pattern-based tool permission policy. /// /// Presence of `[permissions]` enables this layer. Rules are evaluated diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index e9434aa8..630ef87f 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -167,6 +167,15 @@ where self.sink.publish(entry); Ok(()) } + + /// Append a debug trace record alongside the current segment log. Trace + /// writes deliberately do not affect the segment entry counter or live + /// replay sink because they are not conversation history. + pub fn append_trace(&self, entry: &session_store::TraceEntry) -> Result<(), StoreError> { + let loc = self.state.location(); + self.store + .append_trace(loc.session_id, loc.segment_id, entry) + } } /// Type-erased commit handle for the interceptor. Lets the @@ -502,6 +511,20 @@ impl Pod { warn!(error = %err, "history append commit failed; dropping"); } }); + if self.manifest.session.record_event_trace { + let writer = self.log_writer_handle(); + self.worker_mut() + .on_stream_event(move |turn, _llm_call, event| { + let entry = session_store::TraceEntry { + ts: segment_log::now_millis(), + turn, + event: event.clone(), + }; + if let Err(err) = writer.append_trace(&entry) { + warn!(error = %err, "stream event trace commit failed; dropping"); + } + }); + } self.history_persistence_wired = true; }