feat: add session stream event trace flag
This commit is contained in:
parent
77e2ad0c40
commit
9405ffc633
|
|
@ -18,7 +18,8 @@ use crate::{
|
|||
},
|
||||
llm_client::{
|
||||
ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ResponseStream,
|
||||
ToolDefinition, error::is_retryable, retry::RetryPolicy, types::parse_tool_arguments,
|
||||
ToolDefinition, error::is_retryable, event::Event, retry::RetryPolicy,
|
||||
types::parse_tool_arguments,
|
||||
},
|
||||
state::{Locked, Mutable, WorkerState},
|
||||
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
|
||||
|
|
@ -200,6 +201,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
|||
llm_retry_cbs: Vec<Box<dyn Fn(usize, &LlmRetryNotice) + Send + Sync>>,
|
||||
/// Stream continuation callbacks for a specific LlmCall.
|
||||
llm_continuation_cbs: Vec<Box<dyn Fn(usize, u32, u32, &str) + Send + Sync>>,
|
||||
/// Stream event callbacks. Fired for every normalized provider stream
|
||||
/// event before it enters the Timeline.
|
||||
stream_event_cbs: Vec<Box<dyn Fn(usize, usize, &Event) + Send + Sync>>,
|
||||
/// Non-fatal warning callbacks. Invoked when the Worker wants to
|
||||
/// surface an advisory message to the upper layer (e.g. Pod) so it
|
||||
/// can be forwarded to the user — distinct from `tracing::warn!`,
|
||||
|
|
@ -408,6 +412,20 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Register a raw normalized stream event callback.
|
||||
pub fn on_stream_event(
|
||||
&mut self,
|
||||
callback: impl Fn(usize, usize, &Event) + Send + Sync + 'static,
|
||||
) {
|
||||
self.stream_event_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Fan a single stream event out to every registered stream-event
/// callback, in registration order, passing `turn` and `llm_call` through
/// unchanged.
fn emit_stream_event(&self, turn: usize, llm_call: usize, event: &Event) {
    self.stream_event_cbs
        .iter()
        .for_each(|notify| notify(turn, llm_call, event));
}
|
||||
|
||||
/// Register a non-fatal warning callback.
|
||||
///
|
||||
/// The callback is invoked with a short human-readable message
|
||||
|
|
@ -1137,7 +1155,9 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
|
||||
// Stream LLM response
|
||||
let request = self.build_request(&tool_definitions, &request_context);
|
||||
let stream_outcome = self.stream_response(request, current_llm_call).await?;
|
||||
let stream_outcome = self
|
||||
.stream_response(request, current_turn, current_llm_call)
|
||||
.await?;
|
||||
|
||||
for cb in &self.llm_call_end_cbs {
|
||||
cb(current_llm_call);
|
||||
|
|
@ -1312,6 +1332,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
async fn stream_response(
|
||||
&mut self,
|
||||
request: Request,
|
||||
turn: usize,
|
||||
llm_call: usize,
|
||||
) -> Result<StreamCompletion, WorkerError> {
|
||||
debug!(
|
||||
|
|
@ -1349,6 +1370,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
});
|
||||
}
|
||||
};
|
||||
self.emit_stream_event(turn, llm_call, &event);
|
||||
self.timeline.dispatch(&event);
|
||||
}
|
||||
None => break,
|
||||
|
|
@ -1441,6 +1463,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
llm_call_end_cbs: Vec::new(),
|
||||
llm_retry_cbs: Vec::new(),
|
||||
llm_continuation_cbs: Vec::new(),
|
||||
stream_event_cbs: Vec::new(),
|
||||
warning_cbs: Vec::new(),
|
||||
tool_result_cbs: Vec::new(),
|
||||
history_append_cbs: Vec::new(),
|
||||
|
|
@ -1703,6 +1726,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||
llm_retry_cbs: self.llm_retry_cbs,
|
||||
llm_continuation_cbs: self.llm_continuation_cbs,
|
||||
stream_event_cbs: self.stream_event_cbs,
|
||||
warning_cbs: self.warning_cbs,
|
||||
tool_result_cbs: self.tool_result_cbs,
|
||||
history_append_cbs: self.history_append_cbs,
|
||||
|
|
@ -1793,6 +1817,7 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||
llm_retry_cbs: self.llm_retry_cbs,
|
||||
llm_continuation_cbs: self.llm_continuation_cbs,
|
||||
stream_event_cbs: self.stream_event_cbs,
|
||||
warning_cbs: self.warning_cbs,
|
||||
tool_result_cbs: self.tool_result_cbs,
|
||||
history_append_cbs: self.history_append_cbs,
|
||||
|
|
|
|||
|
|
@ -17,7 +17,8 @@ use crate::defaults;
|
|||
use crate::model::{AuthRef, ModelManifest, ReasoningControl};
|
||||
use crate::{
|
||||
CompactionConfig, FileUploadLimits, MemoryConfig, PodManifest, PodMeta, ScopeConfig,
|
||||
SkillsConfig, ToolOutputLimits, ToolPermissionConfig, ToolPermissionRule, WorkerManifest,
|
||||
SessionConfig, SkillsConfig, ToolOutputLimits, ToolPermissionConfig, ToolPermissionRule,
|
||||
WorkerManifest,
|
||||
};
|
||||
|
||||
/// Partial-form Pod manifest. Every field is optional; one or more
|
||||
|
|
@ -37,6 +38,8 @@ pub struct PodManifestConfig {
|
|||
pub worker: WorkerManifestConfig,
|
||||
#[serde(default)]
|
||||
pub scope: ScopeConfig,
|
||||
#[serde(default)]
|
||||
pub session: Option<SessionConfigPartial>,
|
||||
/// Optional `[permissions]` section. `None` means the permission layer
|
||||
/// is disabled; `Some` requires `default_action` during final resolve.
|
||||
#[serde(default)]
|
||||
|
|
@ -102,6 +105,12 @@ pub struct FileUploadLimitsPartial {
|
|||
pub max_bytes: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct SessionConfigPartial {
|
||||
#[serde(default)]
|
||||
pub record_event_trace: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct PermissionConfigPartial {
|
||||
#[serde(default)]
|
||||
|
|
@ -260,6 +269,7 @@ impl PodManifestConfig {
|
|||
model: self.model.merge(upper.model),
|
||||
worker: self.worker.merge(upper.worker),
|
||||
scope: merge_scope(self.scope, upper.scope),
|
||||
session: merge_option(self.session, upper.session, SessionConfigPartial::merge),
|
||||
permissions: merge_option(
|
||||
self.permissions,
|
||||
upper.permissions,
|
||||
|
|
@ -353,6 +363,14 @@ impl FileUploadLimitsPartial {
|
|||
}
|
||||
}
|
||||
|
||||
impl SessionConfigPartial {
|
||||
fn merge(self, upper: Self) -> Self {
|
||||
Self {
|
||||
record_event_trace: upper.record_event_trace.or(self.record_event_trace),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PermissionConfigPartial {
|
||||
fn merge(mut self, upper: Self) -> Self {
|
||||
self.rules.extend(upper.rules);
|
||||
|
|
@ -496,6 +514,12 @@ impl TryFrom<PodManifestConfig> for PodManifest {
|
|||
for rule in &cfg.scope.deny {
|
||||
ensure_absolute("scope.deny.target", &rule.target)?;
|
||||
}
|
||||
let session = SessionConfig {
|
||||
record_event_trace: cfg
|
||||
.session
|
||||
.and_then(|s| s.record_event_trace)
|
||||
.unwrap_or(false),
|
||||
};
|
||||
|
||||
let permissions = cfg
|
||||
.permissions
|
||||
|
|
@ -550,6 +574,7 @@ impl TryFrom<PodManifestConfig> for PodManifest {
|
|||
model: cfg.model,
|
||||
worker,
|
||||
scope: cfg.scope,
|
||||
session,
|
||||
permissions,
|
||||
compaction,
|
||||
memory: cfg.memory,
|
||||
|
|
@ -596,6 +621,7 @@ mod tests {
|
|||
deny: Vec::new(),
|
||||
},
|
||||
permissions: None,
|
||||
session: None,
|
||||
compaction: None,
|
||||
memory: None,
|
||||
skills: None,
|
||||
|
|
@ -610,6 +636,17 @@ mod tests {
|
|||
assert!(manifest.permissions.is_none());
|
||||
}
|
||||
|
||||
// An explicit `record_event_trace = true` in the partial session config must
// survive resolution into the final manifest.
#[test]
fn resolve_session_record_event_trace() {
    let session = SessionConfigPartial {
        record_event_trace: Some(true),
    };
    let mut cfg = minimal_valid();
    cfg.session = Some(session);

    let manifest: PodManifest = cfg.try_into().unwrap();

    assert!(manifest.session.record_event_trace);
}
|
||||
|
||||
#[test]
|
||||
fn resolve_permissions_requires_default_action_when_present() {
|
||||
let mut cfg = minimal_valid();
|
||||
|
|
|
|||
|
|
@ -37,6 +37,9 @@ pub struct PodManifest {
|
|||
pub model: ModelManifest,
|
||||
pub worker: WorkerManifest,
|
||||
pub scope: ScopeConfig,
|
||||
/// Session/debug persistence settings. Defaults keep extra traces off.
|
||||
#[serde(default)]
|
||||
pub session: SessionConfig,
|
||||
/// Optional manifest-level tool permission policy. Absent means the
|
||||
/// permission layer is disabled and tool calls run as before.
|
||||
#[serde(default)]
|
||||
|
|
@ -300,6 +303,15 @@ pub struct ScopeConfig {
|
|||
pub deny: Vec<ScopeRule>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
|
||||
pub struct SessionConfig {
|
||||
/// Persist every provider stream event directly to `trace.jsonl` next to the
|
||||
/// segment log. Intended for debugging stalls between stream requests; off
|
||||
/// by default because it can be verbose.
|
||||
#[serde(default)]
|
||||
pub record_event_trace: bool,
|
||||
}
|
||||
|
||||
/// Manifest-level pattern-based tool permission policy.
|
||||
///
|
||||
/// Presence of `[permissions]` enables this layer. Rules are evaluated
|
||||
|
|
|
|||
|
|
@ -167,6 +167,15 @@ where
|
|||
self.sink.publish(entry);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Append a debug trace record alongside the current segment log. Trace
|
||||
/// writes deliberately do not affect the segment entry counter or live
|
||||
/// replay sink because they are not conversation history.
|
||||
pub fn append_trace(&self, entry: &session_store::TraceEntry) -> Result<(), StoreError> {
|
||||
let loc = self.state.location();
|
||||
self.store
|
||||
.append_trace(loc.session_id, loc.segment_id, entry)
|
||||
}
|
||||
}
|
||||
|
||||
/// Type-erased commit handle for the interceptor. Lets the
|
||||
|
|
@ -502,6 +511,20 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
warn!(error = %err, "history append commit failed; dropping");
|
||||
}
|
||||
});
|
||||
if self.manifest.session.record_event_trace {
|
||||
let writer = self.log_writer_handle();
|
||||
self.worker_mut()
|
||||
.on_stream_event(move |turn, _llm_call, event| {
|
||||
let entry = session_store::TraceEntry {
|
||||
ts: segment_log::now_millis(),
|
||||
turn,
|
||||
event: event.clone(),
|
||||
};
|
||||
if let Err(err) = writer.append_trace(&entry) {
|
||||
warn!(error = %err, "stream event trace commit failed; dropping");
|
||||
}
|
||||
});
|
||||
}
|
||||
self.history_persistence_wired = true;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user