feat: Invoke marker と LlmCall callback を導入し AgentTurn セマンティクスを明確化
- protocol: InvokeKind enum、Event::InvokeStart / LlmCallStart / LlmCallEnd 追加
- llm-worker: Worker.llm_call_count と on_llm_call_start/end callback、turn_count を AgentTurn 数として doc 更新
- session-store: LogEntry::Invoke { ts, trigger } 追加 (replay は marker のみで state 不変)
- pod: run/run_for_notification 開始時に Invoke marker commit、PendingRun::RunForNotification(InvokeKind) で kind を伝搬
- pod ipc: sink + server で Invoke エントリーを Event::InvokeStart として broadcast
- tui: 新 Event 3種を no-op で受理 (UI 設計はチケット範囲外)
This commit is contained in:
parent
d710cac879
commit
d0dbac109d
|
|
@ -153,14 +153,28 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
||||||
history: Vec<Item>,
|
history: Vec<Item>,
|
||||||
/// History length at lock time (only meaningful in Locked state)
|
/// History length at lock time (only meaningful in Locked state)
|
||||||
locked_prefix_len: usize,
|
locked_prefix_len: usize,
|
||||||
/// Turn count
|
/// AgentTurn count.
|
||||||
|
///
|
||||||
|
/// Once retry (`llm-worker-stream-continuation`) is implemented, an
|
||||||
|
/// AgentTurn collapses N retried `LlmCall`s with identical input;
|
||||||
|
/// today retry is not implemented so AgentTurn and LlmCall fire 1:1
|
||||||
|
/// and the increment site (the LLM-call loop) is shared.
|
||||||
|
/// `max_turns` is interpreted as a per-`run()` AgentTurn cap.
|
||||||
turn_count: usize,
|
turn_count: usize,
|
||||||
/// Maximum number of turns (None = unlimited)
|
/// LlmCall count (per-Worker running counter, monotonic). Unlike
|
||||||
|
/// `turn_count` this never collapses retries.
|
||||||
|
llm_call_count: usize,
|
||||||
|
/// Maximum number of AgentTurns (None = unlimited)
|
||||||
max_turns: Option<u32>,
|
max_turns: Option<u32>,
|
||||||
/// Turn-start callbacks
|
/// AgentTurn-start callbacks (1:1 with LlmCall today)
|
||||||
turn_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
turn_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||||
/// Turn-end callbacks
|
/// AgentTurn-end callbacks (1:1 with LlmCall today)
|
||||||
turn_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
turn_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||||
|
/// LlmCall-start callbacks (per individual LLM generation request,
|
||||||
|
/// retries included once retry lands)
|
||||||
|
llm_call_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||||
|
/// LlmCall-end callbacks
|
||||||
|
llm_call_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||||
/// Non-fatal warning callbacks. Invoked when the Worker wants to
|
/// Non-fatal warning callbacks. Invoked when the Worker wants to
|
||||||
/// surface an advisory message to the upper layer (e.g. Pod) so it
|
/// surface an advisory message to the upper layer (e.g. Pod) so it
|
||||||
/// can be forwarded to the user — distinct from `tracing::warn!`,
|
/// can be forwarded to the user — distinct from `tracing::warn!`,
|
||||||
|
|
@ -315,11 +329,28 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a turn-start callback (receives 0-based turn number).
|
/// Register an AgentTurn-start callback (receives the AgentTurn
|
||||||
|
/// index from `turn_count`).
|
||||||
|
///
|
||||||
|
/// Today fires 1:1 with the per-LLM-call boundary because retry is
|
||||||
|
/// not yet implemented. Once retry lands, this will fire only once
|
||||||
|
/// per AgentTurn (= retried LlmCall group with identical input).
|
||||||
pub fn on_turn_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
pub fn on_turn_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||||
self.turn_start_cbs.push(Box::new(callback));
|
self.turn_start_cbs.push(Box::new(callback));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register an LlmCall-start callback (receives the LlmCall index
|
||||||
|
/// from `llm_call_count`). Fires once per LLM generation request,
|
||||||
|
/// retries included.
|
||||||
|
pub fn on_llm_call_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||||
|
self.llm_call_start_cbs.push(Box::new(callback));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register an LlmCall-end callback.
|
||||||
|
pub fn on_llm_call_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||||
|
self.llm_call_end_cbs.push(Box::new(callback));
|
||||||
|
}
|
||||||
|
|
||||||
/// Register a non-fatal warning callback.
|
/// Register a non-fatal warning callback.
|
||||||
///
|
///
|
||||||
/// The callback is invoked with a short human-readable message
|
/// The callback is invoked with a short human-readable message
|
||||||
|
|
@ -372,7 +403,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a turn-end callback (receives 0-based turn number).
|
/// Register an AgentTurn-end callback. See [`on_turn_start`](Self::on_turn_start)
|
||||||
|
/// for the 1:1-vs-N relation with `LlmCall*`.
|
||||||
pub fn on_turn_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
pub fn on_turn_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||||
self.turn_end_cbs.push(Box::new(callback));
|
self.turn_end_cbs.push(Box::new(callback));
|
||||||
}
|
}
|
||||||
|
|
@ -463,11 +495,22 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
||||||
self.system_prompt.as_deref()
|
self.system_prompt.as_deref()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the current turn count
|
/// Get the current AgentTurn count.
|
||||||
|
///
|
||||||
|
/// AgentTurn is a maximal run of LLM generation calls with identical
|
||||||
|
/// input; today retry is unimplemented so this is also the LLM call
|
||||||
|
/// count. Use [`llm_call_count`](Self::llm_call_count) when the
|
||||||
|
/// caller specifically needs the per-LLM-call number.
|
||||||
pub fn turn_count(&self) -> usize {
|
pub fn turn_count(&self) -> usize {
|
||||||
self.turn_count
|
self.turn_count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the current LlmCall count (per-Worker running counter, never
|
||||||
|
/// collapsed by retry).
|
||||||
|
pub fn llm_call_count(&self) -> usize {
|
||||||
|
self.llm_call_count
|
||||||
|
}
|
||||||
|
|
||||||
/// Get a reference to the current request configuration
|
/// Get a reference to the current request configuration
|
||||||
pub fn request_config(&self) -> &RequestConfig {
|
pub fn request_config(&self) -> &RequestConfig {
|
||||||
&self.request_config
|
&self.request_config
|
||||||
|
|
@ -1004,10 +1047,23 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
||||||
PreRequestAction::Continue => {}
|
PreRequestAction::Continue => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LlmCall boundary fires per LLM generation request — today
|
||||||
|
// 1:1 with AgentTurn, but retry (`llm-worker-stream-continuation`)
|
||||||
|
// will multiply this within a single AgentTurn.
|
||||||
|
let current_llm_call = self.llm_call_count;
|
||||||
|
for cb in &self.llm_call_start_cbs {
|
||||||
|
cb(current_llm_call);
|
||||||
|
}
|
||||||
|
|
||||||
// Stream LLM response
|
// Stream LLM response
|
||||||
let request = self.build_request(&tool_definitions, &request_context);
|
let request = self.build_request(&tool_definitions, &request_context);
|
||||||
self.stream_response(request).await?;
|
self.stream_response(request).await?;
|
||||||
|
|
||||||
|
for cb in &self.llm_call_end_cbs {
|
||||||
|
cb(current_llm_call);
|
||||||
|
}
|
||||||
|
self.llm_call_count += 1;
|
||||||
|
|
||||||
for cb in &self.turn_end_cbs {
|
for cb in &self.turn_end_cbs {
|
||||||
cb(current_turn);
|
cb(current_turn);
|
||||||
}
|
}
|
||||||
|
|
@ -1185,9 +1241,12 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
||||||
history: Vec::new(),
|
history: Vec::new(),
|
||||||
locked_prefix_len: 0,
|
locked_prefix_len: 0,
|
||||||
turn_count: 0,
|
turn_count: 0,
|
||||||
|
llm_call_count: 0,
|
||||||
max_turns: None,
|
max_turns: None,
|
||||||
turn_start_cbs: Vec::new(),
|
turn_start_cbs: Vec::new(),
|
||||||
turn_end_cbs: Vec::new(),
|
turn_end_cbs: Vec::new(),
|
||||||
|
llm_call_start_cbs: Vec::new(),
|
||||||
|
llm_call_end_cbs: Vec::new(),
|
||||||
warning_cbs: Vec::new(),
|
warning_cbs: Vec::new(),
|
||||||
tool_result_cbs: Vec::new(),
|
tool_result_cbs: Vec::new(),
|
||||||
history_append_cbs: Vec::new(),
|
history_append_cbs: Vec::new(),
|
||||||
|
|
@ -1444,9 +1503,12 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
||||||
history: self.history,
|
history: self.history,
|
||||||
locked_prefix_len,
|
locked_prefix_len,
|
||||||
turn_count: self.turn_count,
|
turn_count: self.turn_count,
|
||||||
|
llm_call_count: self.llm_call_count,
|
||||||
max_turns: self.max_turns,
|
max_turns: self.max_turns,
|
||||||
turn_start_cbs: self.turn_start_cbs,
|
turn_start_cbs: self.turn_start_cbs,
|
||||||
turn_end_cbs: self.turn_end_cbs,
|
turn_end_cbs: self.turn_end_cbs,
|
||||||
|
llm_call_start_cbs: self.llm_call_start_cbs,
|
||||||
|
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||||
warning_cbs: self.warning_cbs,
|
warning_cbs: self.warning_cbs,
|
||||||
tool_result_cbs: self.tool_result_cbs,
|
tool_result_cbs: self.tool_result_cbs,
|
||||||
history_append_cbs: self.history_append_cbs,
|
history_append_cbs: self.history_append_cbs,
|
||||||
|
|
@ -1527,9 +1589,12 @@ impl<C: LlmClient> Worker<C, Locked> {
|
||||||
history: self.history,
|
history: self.history,
|
||||||
locked_prefix_len: 0,
|
locked_prefix_len: 0,
|
||||||
turn_count: self.turn_count,
|
turn_count: self.turn_count,
|
||||||
|
llm_call_count: self.llm_call_count,
|
||||||
max_turns: self.max_turns,
|
max_turns: self.max_turns,
|
||||||
turn_start_cbs: self.turn_start_cbs,
|
turn_start_cbs: self.turn_start_cbs,
|
||||||
turn_end_cbs: self.turn_end_cbs,
|
turn_end_cbs: self.turn_end_cbs,
|
||||||
|
llm_call_start_cbs: self.llm_call_start_cbs,
|
||||||
|
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||||
warning_cbs: self.warning_cbs,
|
warning_cbs: self.warning_cbs,
|
||||||
tool_result_cbs: self.tool_result_cbs,
|
tool_result_cbs: self.tool_result_cbs,
|
||||||
history_append_cbs: self.history_append_cbs,
|
history_append_cbs: self.history_append_cbs,
|
||||||
|
|
|
||||||
|
|
@ -352,14 +352,18 @@ async fn test_turn_count_increment() -> Result<(), WorkerError> {
|
||||||
let worker = Worker::new(client);
|
let worker = Worker::new(client);
|
||||||
|
|
||||||
assert_eq!(worker.turn_count(), 0);
|
assert_eq!(worker.turn_count(), 0);
|
||||||
|
assert_eq!(worker.llm_call_count(), 0);
|
||||||
|
|
||||||
// First run consumes Mutable, returns RunOutput
|
// First run consumes Mutable, returns RunOutput
|
||||||
let mut worker = worker.run("First").await?.worker;
|
let mut worker = worker.run("First").await?.worker;
|
||||||
assert_eq!(worker.turn_count(), 1);
|
assert_eq!(worker.turn_count(), 1);
|
||||||
|
// Retry not yet implemented → AgentTurn:LlmCall is 1:1.
|
||||||
|
assert_eq!(worker.llm_call_count(), 1);
|
||||||
|
|
||||||
// Subsequent runs on Locked take &mut self
|
// Subsequent runs on Locked take &mut self
|
||||||
worker.run("Second").await?;
|
worker.run("Second").await?;
|
||||||
assert_eq!(worker.turn_count(), 2);
|
assert_eq!(worker.turn_count(), 2);
|
||||||
|
assert_eq!(worker.llm_call_count(), 2);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,11 @@ async fn finish_controller_run<C, St>(
|
||||||
enum PendingRun {
|
enum PendingRun {
|
||||||
Run(Vec<Segment>),
|
Run(Vec<Segment>),
|
||||||
InterruptAndRun(Vec<Segment>),
|
InterruptAndRun(Vec<Segment>),
|
||||||
RunForNotification,
|
/// Self-initiated turn kicked from the notify buffer. The carried
|
||||||
|
/// `InvokeKind` is the trigger that flipped the Pod from IDLE
|
||||||
|
/// (Notify or PodEvent) and is recorded by the Invoke marker
|
||||||
|
/// committed at the start of `pod.run_for_notification`.
|
||||||
|
RunForNotification(protocol::InvokeKind),
|
||||||
Resume,
|
Resume,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -109,7 +113,7 @@ impl PendingRun {
|
||||||
fn is_parent_originated(&self) -> bool {
|
fn is_parent_originated(&self) -> bool {
|
||||||
match self {
|
match self {
|
||||||
PendingRun::Run(_) | PendingRun::InterruptAndRun(_) | PendingRun::Resume => true,
|
PendingRun::Run(_) | PendingRun::InterruptAndRun(_) | PendingRun::Resume => true,
|
||||||
PendingRun::RunForNotification => false,
|
PendingRun::RunForNotification(_) => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -299,6 +303,16 @@ fn wire_event_bridges_on_worker<C, St>(
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let tx = event_tx.clone();
|
||||||
|
worker.on_llm_call_start(move |llm_call| {
|
||||||
|
let _ = tx.send(Event::LlmCallStart { llm_call });
|
||||||
|
});
|
||||||
|
|
||||||
|
let tx = event_tx.clone();
|
||||||
|
worker.on_llm_call_end(move |llm_call| {
|
||||||
|
let _ = tx.send(Event::LlmCallEnd { llm_call });
|
||||||
|
});
|
||||||
|
|
||||||
let tx = event_tx.clone();
|
let tx = event_tx.clone();
|
||||||
worker.on_text_block(move |block| {
|
worker.on_text_block(move |block| {
|
||||||
let tx_d = tx.clone();
|
let tx_d = tx.clone();
|
||||||
|
|
@ -551,9 +565,9 @@ async fn controller_loop<C, St>(
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
PendingRun::RunForNotification => {
|
PendingRun::RunForNotification(kind) => {
|
||||||
drive_turn(
|
drive_turn(
|
||||||
pod.run_for_notification(),
|
pod.run_for_notification(kind),
|
||||||
&mut method_rx,
|
&mut method_rx,
|
||||||
&event_tx,
|
&event_tx,
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
|
|
@ -643,7 +657,9 @@ async fn controller_loop<C, St>(
|
||||||
// sees the buffered notification(s) without a human
|
// sees the buffered notification(s) without a human
|
||||||
// Run.
|
// Run.
|
||||||
if shared_state.get_status() == PodStatus::Idle {
|
if shared_state.get_status() == PodStatus::Idle {
|
||||||
pending = Some(PendingRun::RunForNotification);
|
pending = Some(PendingRun::RunForNotification(
|
||||||
|
protocol::InvokeKind::Notify,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -711,7 +727,9 @@ async fn controller_loop<C, St>(
|
||||||
// notification is not stranded. Matches the
|
// notification is not stranded. Matches the
|
||||||
// `Method::Notify` idle path.
|
// `Method::Notify` idle path.
|
||||||
if shared_state.get_status() == PodStatus::Idle {
|
if shared_state.get_status() == PodStatus::Idle {
|
||||||
pending = Some(PendingRun::RunForNotification);
|
pending = Some(PendingRun::RunForNotification(
|
||||||
|
protocol::InvokeKind::PodEvent,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -960,7 +978,9 @@ mod tests {
|
||||||
assert!(PendingRun::Run(Vec::new()).is_parent_originated());
|
assert!(PendingRun::Run(Vec::new()).is_parent_originated());
|
||||||
assert!(PendingRun::InterruptAndRun(Vec::new()).is_parent_originated());
|
assert!(PendingRun::InterruptAndRun(Vec::new()).is_parent_originated());
|
||||||
assert!(PendingRun::Resume.is_parent_originated());
|
assert!(PendingRun::Resume.is_parent_originated());
|
||||||
assert!(!PendingRun::RunForNotification.is_parent_originated());
|
assert!(
|
||||||
|
!PendingRun::RunForNotification(protocol::InvokeKind::Notify).is_parent_originated()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DriveTurnEnv {
|
struct DriveTurnEnv {
|
||||||
|
|
|
||||||
|
|
@ -115,6 +115,9 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
||||||
.expect("SystemItem is Serialize");
|
.expect("SystemItem is Serialize");
|
||||||
Some(Event::SystemItem { item: value })
|
Some(Event::SystemItem { item: value })
|
||||||
}
|
}
|
||||||
|
session_store::LogEntry::Invoke { trigger, .. } => {
|
||||||
|
Some(Event::InvokeStart { kind: trigger })
|
||||||
|
}
|
||||||
other => {
|
other => {
|
||||||
// `SessionLogSink::is_live_relevant` keeps
|
// `SessionLogSink::is_live_relevant` keeps
|
||||||
// non-live-relevant variants off the
|
// non-live-relevant variants off the
|
||||||
|
|
|
||||||
|
|
@ -1144,10 +1144,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
|
|
||||||
self.prepare_for_run().await?;
|
self.prepare_for_run().await?;
|
||||||
|
|
||||||
|
// IDLE → active marker. Commits first so the next UserInput entry
|
||||||
|
// is contained inside this Invoke range. See `tickets/invoke-turn-llmcall-semantics.md`.
|
||||||
|
self.session_id = self.session_head.lock().session_id;
|
||||||
|
self.commit_entry(LogEntry::Invoke {
|
||||||
|
ts: session_log::now_millis(),
|
||||||
|
trigger: protocol::InvokeKind::UserSend,
|
||||||
|
})?;
|
||||||
|
|
||||||
// Persist the user input as typed segments before the worker
|
// Persist the user input as typed segments before the worker
|
||||||
// pushes its flattened copy into history. save_delta deliberately
|
// pushes its flattened copy into history. save_delta deliberately
|
||||||
// skips the resulting `is_user_message()` item to avoid double-write.
|
// skips the resulting `is_user_message()` item to avoid double-write.
|
||||||
self.session_id = self.session_head.lock().session_id;
|
|
||||||
self.commit_entry(LogEntry::UserInput {
|
self.commit_entry(LogEntry::UserInput {
|
||||||
ts: session_log::now_millis(),
|
ts: session_log::now_millis(),
|
||||||
segments: input.clone(),
|
segments: input.clone(),
|
||||||
|
|
@ -1482,9 +1489,31 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
/// `Item::system_message` into the per-request context, then the
|
/// `Item::system_message` into the per-request context, then the
|
||||||
/// Worker's resume path issues the LLM request without a new
|
/// Worker's resume path issues the LLM request without a new
|
||||||
/// user turn.
|
/// user turn.
|
||||||
pub async fn run_for_notification(&mut self) -> Result<PodRunResult, PodError> {
|
pub async fn run_for_notification(
|
||||||
|
&mut self,
|
||||||
|
kind: protocol::InvokeKind,
|
||||||
|
) -> Result<PodRunResult, PodError> {
|
||||||
|
debug_assert!(
|
||||||
|
matches!(
|
||||||
|
kind,
|
||||||
|
protocol::InvokeKind::Notify
|
||||||
|
| protocol::InvokeKind::PodEvent
|
||||||
|
| protocol::InvokeKind::SystemReminder
|
||||||
|
| protocol::InvokeKind::Wakeup
|
||||||
|
),
|
||||||
|
"run_for_notification expects a non-UserSend InvokeKind; got {kind:?}"
|
||||||
|
);
|
||||||
self.prepare_for_run().await?;
|
self.prepare_for_run().await?;
|
||||||
|
|
||||||
|
// IDLE → active marker for the buffered notification / pod-event
|
||||||
|
// drain. The trailing SystemItem entries (drained by the
|
||||||
|
// PodInterceptor) carry the actual payload.
|
||||||
|
self.session_id = self.session_head.lock().session_id;
|
||||||
|
self.commit_entry(LogEntry::Invoke {
|
||||||
|
ts: session_log::now_millis(),
|
||||||
|
trigger: kind,
|
||||||
|
})?;
|
||||||
|
|
||||||
let history_before = self.worker.as_ref().unwrap().history().len();
|
let history_before = self.worker.as_ref().unwrap().history().len();
|
||||||
|
|
||||||
let worker = self.worker.take().expect("worker taken during run");
|
let worker = self.worker.take().expect("worker taken during run");
|
||||||
|
|
|
||||||
|
|
@ -91,6 +91,7 @@ impl SessionLogSink {
|
||||||
/// lane does not cover:
|
/// lane does not cover:
|
||||||
/// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire.
|
/// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire.
|
||||||
/// - `LogEntry::SystemItem` → `Event::SystemItem`.
|
/// - `LogEntry::SystemItem` → `Event::SystemItem`.
|
||||||
|
/// - `LogEntry::Invoke` → `Event::InvokeStart`.
|
||||||
/// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd,
|
/// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd,
|
||||||
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
|
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
|
||||||
/// reflected in the mirror so reconnect snapshots stay accurate,
|
/// reflected in the mirror so reconnect snapshots stay accurate,
|
||||||
|
|
@ -118,7 +119,9 @@ impl SessionLogSink {
|
||||||
fn is_live_relevant(entry: &LogEntry) -> bool {
|
fn is_live_relevant(entry: &LogEntry) -> bool {
|
||||||
matches!(
|
matches!(
|
||||||
entry,
|
entry,
|
||||||
LogEntry::SessionStart { .. } | LogEntry::SystemItem { .. }
|
LogEntry::SessionStart { .. }
|
||||||
|
| LogEntry::SystemItem { .. }
|
||||||
|
| LogEntry::Invoke { .. }
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -209,7 +209,8 @@ pub enum Event {
|
||||||
/// submitting clients would see tool calls and assistant text
|
/// submitting clients would see tool calls and assistant text
|
||||||
/// appear without any preceding user message.
|
/// appear without any preceding user message.
|
||||||
///
|
///
|
||||||
/// Fires exactly once per accepted `Method::Run`, before
|
/// Fires exactly once per accepted `Method::Run`, after
|
||||||
|
/// `InvokeStart { kind: UserSend }` and before the first
|
||||||
/// `TurnStart`. Rejected runs (e.g. `AlreadyRunning`) do not emit.
|
/// `TurnStart`. Rejected runs (e.g. `AlreadyRunning`) do not emit.
|
||||||
UserMessage {
|
UserMessage {
|
||||||
segments: Vec<Segment>,
|
segments: Vec<Segment>,
|
||||||
|
|
@ -231,13 +232,57 @@ pub enum Event {
|
||||||
SystemItem {
|
SystemItem {
|
||||||
item: serde_json::Value,
|
item: serde_json::Value,
|
||||||
},
|
},
|
||||||
|
/// A new self-driving cycle has begun (IDLE → active transition).
|
||||||
|
///
|
||||||
|
/// Marker event for the start of an Invoke range; the range extends
|
||||||
|
/// implicitly until the next `InvokeStart`. Fires for every accepted
|
||||||
|
/// `Method::Run` (kind=`UserSend`), `Method::Notify` (kind=`Notify`),
|
||||||
|
/// `Method::PodEvent` re-injection (kind=`PodEvent`), and any other
|
||||||
|
/// IDLE-breaking trigger. Mid-run interrupts (e.g. hook output,
|
||||||
|
/// `<system-reminder>` injection that doesn't break IDLE) do not
|
||||||
|
/// emit `InvokeStart` — they appear as `SystemItem` only.
|
||||||
|
///
|
||||||
|
/// Carries `kind` only; the payload (user text / notify message /
|
||||||
|
/// pod event body) is delivered separately via the immediately
|
||||||
|
/// following `UserMessage` / `SystemItem` event.
|
||||||
|
InvokeStart {
|
||||||
|
kind: InvokeKind,
|
||||||
|
},
|
||||||
|
/// One AgentTurn boundary opened. An AgentTurn is a maximal run of
|
||||||
|
/// LLM generation calls whose input messages are identical (i.e.
|
||||||
|
/// retries from network errors / 5xx / stream disconnects collapse
|
||||||
|
/// into the same AgentTurn). When the input changes (a new tool
|
||||||
|
/// result lands, a user interrupts, etc.), the next LLM call belongs
|
||||||
|
/// to a new AgentTurn.
|
||||||
|
///
|
||||||
|
/// `turn` is the AgentTurn index from `Worker::turn_count`.
|
||||||
|
///
|
||||||
|
/// Currently retry is not yet implemented (`llm-worker-stream-continuation`)
|
||||||
|
/// so AgentTurn and `LlmCall` fire 1:1, but the contract here is
|
||||||
|
/// the AgentTurn boundary; consumers that want per-LLM-call signals
|
||||||
|
/// must subscribe to `LlmCallStart` / `LlmCallEnd` instead.
|
||||||
TurnStart {
|
TurnStart {
|
||||||
turn: usize,
|
turn: usize,
|
||||||
},
|
},
|
||||||
|
/// AgentTurn closed.
|
||||||
TurnEnd {
|
TurnEnd {
|
||||||
turn: usize,
|
turn: usize,
|
||||||
result: TurnResult,
|
result: TurnResult,
|
||||||
},
|
},
|
||||||
|
/// One LLM generation call started (1 request → 1 generation, retry
|
||||||
|
/// included). Multiple `LlmCall*` pairs may fire inside a single
|
||||||
|
/// `TurnStart` / `TurnEnd` pair when a request is retried; today
|
||||||
|
/// they fire 1:1 because retry is not implemented.
|
||||||
|
///
|
||||||
|
/// `llm_call` is the worker-wide running counter from
|
||||||
|
/// `Worker::llm_call_count`.
|
||||||
|
LlmCallStart {
|
||||||
|
llm_call: usize,
|
||||||
|
},
|
||||||
|
/// LLM generation call ended.
|
||||||
|
LlmCallEnd {
|
||||||
|
llm_call: usize,
|
||||||
|
},
|
||||||
TextDelta {
|
TextDelta {
|
||||||
text: String,
|
text: String,
|
||||||
},
|
},
|
||||||
|
|
@ -466,6 +511,31 @@ pub enum TurnResult {
|
||||||
Paused,
|
Paused,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Kind of trigger that opened a new Invoke (IDLE → active) range.
|
||||||
|
///
|
||||||
|
/// One Invoke groups all entries from this trigger up to the next
|
||||||
|
/// `Invoke` marker. The kind is the only payload — content (user text,
|
||||||
|
/// notify message, pod event body) is delivered by the immediately
|
||||||
|
/// following Turn entry, not by the marker itself.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum InvokeKind {
|
||||||
|
/// `Method::Run` — a user submission.
|
||||||
|
UserSend,
|
||||||
|
/// `Method::Notify` — free-text notification injected into history.
|
||||||
|
Notify,
|
||||||
|
/// `Method::PodEvent` — typed lifecycle report from a child Pod.
|
||||||
|
PodEvent,
|
||||||
|
/// `<system-reminder>` etc. that crosses an IDLE boundary (mid-run
|
||||||
|
/// reminders that don't break IDLE are SystemItem-only and do not
|
||||||
|
/// open a new Invoke).
|
||||||
|
SystemReminder,
|
||||||
|
/// Cron / RemoteTrigger style scheduled wake-up. Reserved; no
|
||||||
|
/// producer is wired today but the variant is part of the initial
|
||||||
|
/// set so future schedulers don't have to amend the wire enum.
|
||||||
|
Wakeup,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum RunResult {
|
pub enum RunResult {
|
||||||
|
|
@ -678,6 +748,59 @@ mod tests {
|
||||||
assert_eq!(parsed["data"]["result"], "limit_reached");
|
assert_eq!(parsed["data"]["result"], "limit_reached");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_invoke_start_roundtrip() {
|
||||||
|
for kind in [
|
||||||
|
InvokeKind::UserSend,
|
||||||
|
InvokeKind::Notify,
|
||||||
|
InvokeKind::PodEvent,
|
||||||
|
InvokeKind::SystemReminder,
|
||||||
|
InvokeKind::Wakeup,
|
||||||
|
] {
|
||||||
|
let event = Event::InvokeStart { kind };
|
||||||
|
let json = serde_json::to_string(&event).unwrap();
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
|
assert_eq!(parsed["event"], "invoke_start");
|
||||||
|
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||||
|
match decoded {
|
||||||
|
Event::InvokeStart { kind: k } => assert_eq!(k, kind),
|
||||||
|
other => panic!("expected InvokeStart, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(
|
||||||
|
&serde_json::to_string(&Event::InvokeStart {
|
||||||
|
kind: InvokeKind::UserSend,
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(parsed["data"]["kind"], "user_send");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_llm_call_start_end_roundtrip() {
|
||||||
|
for event in [
|
||||||
|
Event::LlmCallStart { llm_call: 0 },
|
||||||
|
Event::LlmCallEnd { llm_call: 7 },
|
||||||
|
] {
|
||||||
|
let json = serde_json::to_string(&event).unwrap();
|
||||||
|
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||||
|
match (&event, &decoded) {
|
||||||
|
(Event::LlmCallStart { llm_call: a }, Event::LlmCallStart { llm_call: b })
|
||||||
|
| (Event::LlmCallEnd { llm_call: a }, Event::LlmCallEnd { llm_call: b }) => {
|
||||||
|
assert_eq!(a, b);
|
||||||
|
}
|
||||||
|
_ => panic!("variant mismatch: {event:?} vs {decoded:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(
|
||||||
|
&serde_json::to_string(&Event::LlmCallStart { llm_call: 3 }).unwrap(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(parsed["event"], "llm_call_start");
|
||||||
|
assert_eq!(parsed["data"]["llm_call"], 3);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn method_notify_json_roundtrip() {
|
fn method_notify_json_roundtrip() {
|
||||||
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;
|
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@
|
||||||
|
|
||||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||||
use llm_worker::{UsageRecord, WorkerResult};
|
use llm_worker::{UsageRecord, WorkerResult};
|
||||||
use protocol::{ScopeRule, Segment};
|
use protocol::{InvokeKind, ScopeRule, Segment};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
|
|
@ -91,8 +91,13 @@ pub struct HashedEntry {
|
||||||
///
|
///
|
||||||
/// Variants correspond to specific mutation points in `Worker`:
|
/// Variants correspond to specific mutation points in `Worker`:
|
||||||
/// - `SessionStart` — always the first entry; captures initial state
|
/// - `SessionStart` — always the first entry; captures initial state
|
||||||
|
/// - `Invoke` — IDLE → active marker (start of a new self-driving cycle)
|
||||||
/// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends
|
/// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends
|
||||||
/// - `TurnEnd` — turn boundary marker
|
/// - `TurnEnd` — AgentTurn boundary marker; carries the post-increment
|
||||||
|
/// `turn_count`. With retry unimplemented today this fires once per
|
||||||
|
/// `run()`/`resume()` (current callers persist a single TurnEnd at
|
||||||
|
/// run completion); the fork-point seq for `at_turn_index` is the
|
||||||
|
/// preceding `Invoke` entry, not the TurnEnd.
|
||||||
/// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call
|
/// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call
|
||||||
/// - `ConfigChanged` — `RequestConfig` mutation
|
/// - `ConfigChanged` — `RequestConfig` mutation
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|
@ -113,6 +118,23 @@ pub enum LogEntry {
|
||||||
compacted_from: Option<SessionOrigin>,
|
compacted_from: Option<SessionOrigin>,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// IDLE → active marker. Records the start of a new self-driving
|
||||||
|
/// cycle (Invoke range). The range extends implicitly until the
|
||||||
|
/// next `Invoke` entry; this entry carries the trigger only — the
|
||||||
|
/// actual payload (user text / notify message / pod event body) is
|
||||||
|
/// in the immediately following Turn entry (`UserInput` / `SystemItem`).
|
||||||
|
///
|
||||||
|
/// Used by `pod-session-fork` style operations: the fork-point seq
|
||||||
|
/// (`at_turn_index` in persistence-semantics) points at one of these
|
||||||
|
/// `Invoke` entries so "back to N-th send" maps cleanly to the
|
||||||
|
/// IDLE-break boundary the user sees.
|
||||||
|
///
|
||||||
|
/// Field name is `trigger` (not `kind`) because the LogEntry
|
||||||
|
/// serde tag already occupies `"kind"`.
|
||||||
|
///
|
||||||
|
/// Marker only — replay does not mutate `RestoredState`.
|
||||||
|
Invoke { ts: u64, trigger: InvokeKind },
|
||||||
|
|
||||||
/// User input accepted at submit time. Carries the original typed
|
/// User input accepted at submit time. Carries the original typed
|
||||||
/// `Vec<Segment>` so clients can re-render typed atoms (paste chips,
|
/// `Vec<Segment>` so clients can re-render typed atoms (paste chips,
|
||||||
/// file/knowledge refs, workflow invocations) on session restore.
|
/// file/knowledge refs, workflow invocations) on session restore.
|
||||||
|
|
@ -290,6 +312,11 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
|
||||||
state.config = config.clone();
|
state.config = config.clone();
|
||||||
state.history = history.iter().cloned().map(Item::from).collect();
|
state.history = history.iter().cloned().map(Item::from).collect();
|
||||||
}
|
}
|
||||||
|
LogEntry::Invoke { .. } => {
|
||||||
|
// Marker only; no state mutation. The trailing
|
||||||
|
// UserInput / SystemItem / TurnEnd entries carry all
|
||||||
|
// replay-relevant data.
|
||||||
|
}
|
||||||
LogEntry::UserInput { segments, .. } => {
|
LogEntry::UserInput { segments, .. } => {
|
||||||
let text = Segment::flatten_to_text(segments);
|
let text = Segment::flatten_to_text(segments);
|
||||||
state.history.push(Item::user_message(text));
|
state.history.push(Item::user_message(text));
|
||||||
|
|
@ -655,6 +682,59 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn invoke_entry_round_trip_via_json() {
|
||||||
|
let entry = LogEntry::Invoke {
|
||||||
|
ts: 12345,
|
||||||
|
trigger: InvokeKind::UserSend,
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&entry).unwrap();
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
|
assert_eq!(parsed["kind"], "invoke");
|
||||||
|
assert_eq!(parsed["trigger"], "user_send");
|
||||||
|
let decoded: LogEntry = serde_json::from_str(&json).unwrap();
|
||||||
|
match decoded {
|
||||||
|
LogEntry::Invoke { ts, trigger } => {
|
||||||
|
assert_eq!(ts, 12345);
|
||||||
|
assert_eq!(trigger, InvokeKind::UserSend);
|
||||||
|
}
|
||||||
|
other => panic!("expected Invoke, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn replay_invoke_marker_does_not_mutate_state() {
|
||||||
|
let entries = build_chain(&[
|
||||||
|
LogEntry::SessionStart {
|
||||||
|
ts: 0,
|
||||||
|
system_prompt: None,
|
||||||
|
config: RequestConfig::default(),
|
||||||
|
history: vec![],
|
||||||
|
forked_from: None,
|
||||||
|
compacted_from: None,
|
||||||
|
},
|
||||||
|
LogEntry::Invoke {
|
||||||
|
ts: 100,
|
||||||
|
trigger: InvokeKind::UserSend,
|
||||||
|
},
|
||||||
|
LogEntry::UserInput {
|
||||||
|
ts: 101,
|
||||||
|
segments: vec![Segment::text("hi")],
|
||||||
|
},
|
||||||
|
LogEntry::TurnEnd {
|
||||||
|
ts: 200,
|
||||||
|
turn_count: 1,
|
||||||
|
},
|
||||||
|
LogEntry::Invoke {
|
||||||
|
ts: 300,
|
||||||
|
trigger: InvokeKind::Notify,
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
let state = collect_state(&entries);
|
||||||
|
assert_eq!(state.history.len(), 1);
|
||||||
|
assert_eq!(state.turn_count, 1);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn replay_extension_collects_domain_payload_pairs() {
|
fn replay_extension_collects_domain_payload_pairs() {
|
||||||
let entries = build_chain(&[
|
let entries = build_chain(&[
|
||||||
|
|
|
||||||
|
|
@ -498,6 +498,13 @@ impl App {
|
||||||
self.current_tool = None;
|
self.current_tool = None;
|
||||||
self.assistant_streaming = false;
|
self.assistant_streaming = false;
|
||||||
}
|
}
|
||||||
|
// UI consumers of Invoke / LlmCall semantics are out of scope
|
||||||
|
// for `tickets/invoke-turn-llmcall-semantics.md`; events flow
|
||||||
|
// through to subscribers but the TUI currently derives its
|
||||||
|
// turn header from `UserMessage` / `SystemItem` arrivals.
|
||||||
|
Event::InvokeStart { .. }
|
||||||
|
| Event::LlmCallStart { .. }
|
||||||
|
| Event::LlmCallEnd { .. } => {}
|
||||||
Event::TextDelta { text } => {
|
Event::TextDelta { text } => {
|
||||||
self.append_assistant_text(&text);
|
self.append_assistant_text(&text);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user