feat: Invoke marker と LlmCall callback を導入し AgentTurn セマンティクスを明確化
- protocol: InvokeKind enum、Event::InvokeStart / LlmCallStart / LlmCallEnd 追加
- llm-worker: Worker.llm_call_count と on_llm_call_start/end callback、turn_count を AgentTurn 数として doc 更新
- session-store: LogEntry::Invoke { ts, trigger } 追加 (replay は marker のみで state 不変)
- pod: run/run_for_notification 開始時に Invoke marker commit、PendingRun::RunForNotification(InvokeKind) で kind を伝搬
- pod ipc: sink + server で Invoke エントリーを Event::InvokeStart として broadcast
- tui: 新 Event 3種を no-op で受理 (UI 設計はチケット範囲外)
This commit is contained in:
parent
fd8526799b
commit
79b8336a14
|
|
@ -153,14 +153,28 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
|||
history: Vec<Item>,
|
||||
/// History length at lock time (only meaningful in Locked state)
|
||||
locked_prefix_len: usize,
|
||||
/// Turn count
|
||||
/// AgentTurn count.
|
||||
///
|
||||
/// Once retry (`llm-worker-stream-continuation`) is implemented, an
|
||||
/// AgentTurn collapses N retried `LlmCall`s with identical input;
|
||||
/// today retry is not implemented so AgentTurn and LlmCall fire 1:1
|
||||
/// and the increment site (the LLM-call loop) is shared.
|
||||
/// `max_turns` is interpreted as a per-`run()` AgentTurn cap.
|
||||
turn_count: usize,
|
||||
/// Maximum number of turns (None = unlimited)
|
||||
/// LlmCall count (per-Worker running counter, monotonic). Unlike
|
||||
/// `turn_count` this never collapses retries.
|
||||
llm_call_count: usize,
|
||||
/// Maximum number of AgentTurns (None = unlimited)
|
||||
max_turns: Option<u32>,
|
||||
/// Turn-start callbacks
|
||||
/// AgentTurn-start callbacks (1:1 with LlmCall today)
|
||||
turn_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// Turn-end callbacks
|
||||
/// AgentTurn-end callbacks (1:1 with LlmCall today)
|
||||
turn_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// LlmCall-start callbacks (per individual LLM generation request,
|
||||
/// retries included once retry lands)
|
||||
llm_call_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// LlmCall-end callbacks
|
||||
llm_call_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// Non-fatal warning callbacks. Invoked when the Worker wants to
|
||||
/// surface an advisory message to the upper layer (e.g. Pod) so it
|
||||
/// can be forwarded to the user — distinct from `tracing::warn!`,
|
||||
|
|
@ -315,11 +329,28 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
});
|
||||
}
|
||||
|
||||
/// Register a turn-start callback (receives 0-based turn number).
|
||||
/// Register an AgentTurn-start callback (receives the AgentTurn
|
||||
/// index from `turn_count`).
|
||||
///
|
||||
/// Today fires 1:1 with the per-LLM-call boundary because retry is
|
||||
/// not yet implemented. Once retry lands, this will fire only once
|
||||
/// per AgentTurn (= retried LlmCall group with identical input).
|
||||
pub fn on_turn_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||
self.turn_start_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Register an LlmCall-start callback (receives the LlmCall index
|
||||
/// from `llm_call_count`). Fires once per LLM generation request,
|
||||
/// retries included.
|
||||
pub fn on_llm_call_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||
self.llm_call_start_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Register an LlmCall-end callback.
|
||||
pub fn on_llm_call_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||
self.llm_call_end_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Register a non-fatal warning callback.
|
||||
///
|
||||
/// The callback is invoked with a short human-readable message
|
||||
|
|
@ -372,7 +403,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Register a turn-end callback (receives 0-based turn number).
|
||||
/// Register an AgentTurn-end callback. See [`on_turn_start`](Self::on_turn_start)
|
||||
/// for the 1:1-vs-N relation with `LlmCall*`.
|
||||
pub fn on_turn_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||
self.turn_end_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
|
@ -463,11 +495,22 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
self.system_prompt.as_deref()
|
||||
}
|
||||
|
||||
/// Get the current turn count
|
||||
/// Get the current AgentTurn count.
|
||||
///
|
||||
/// AgentTurn is a maximal run of LLM generation calls with identical
|
||||
/// input; today retry is unimplemented so this is also the LLM call
|
||||
/// count. Use [`llm_call_count`](Self::llm_call_count) when the
|
||||
/// caller specifically needs the per-LLM-call number.
|
||||
pub fn turn_count(&self) -> usize {
|
||||
self.turn_count
|
||||
}
|
||||
|
||||
/// Get the current LlmCall count (per-Worker running counter, never
|
||||
/// collapsed by retry).
|
||||
pub fn llm_call_count(&self) -> usize {
|
||||
self.llm_call_count
|
||||
}
|
||||
|
||||
/// Get a reference to the current request configuration
|
||||
pub fn request_config(&self) -> &RequestConfig {
|
||||
&self.request_config
|
||||
|
|
@ -1004,10 +1047,23 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
PreRequestAction::Continue => {}
|
||||
}
|
||||
|
||||
// LlmCall boundary fires per LLM generation request — today
|
||||
// 1:1 with AgentTurn, but retry (`llm-worker-stream-continuation`)
|
||||
// will multiply this within a single AgentTurn.
|
||||
let current_llm_call = self.llm_call_count;
|
||||
for cb in &self.llm_call_start_cbs {
|
||||
cb(current_llm_call);
|
||||
}
|
||||
|
||||
// Stream LLM response
|
||||
let request = self.build_request(&tool_definitions, &request_context);
|
||||
self.stream_response(request).await?;
|
||||
|
||||
for cb in &self.llm_call_end_cbs {
|
||||
cb(current_llm_call);
|
||||
}
|
||||
self.llm_call_count += 1;
|
||||
|
||||
for cb in &self.turn_end_cbs {
|
||||
cb(current_turn);
|
||||
}
|
||||
|
|
@ -1185,9 +1241,12 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
history: Vec::new(),
|
||||
locked_prefix_len: 0,
|
||||
turn_count: 0,
|
||||
llm_call_count: 0,
|
||||
max_turns: None,
|
||||
turn_start_cbs: Vec::new(),
|
||||
turn_end_cbs: Vec::new(),
|
||||
llm_call_start_cbs: Vec::new(),
|
||||
llm_call_end_cbs: Vec::new(),
|
||||
warning_cbs: Vec::new(),
|
||||
tool_result_cbs: Vec::new(),
|
||||
history_append_cbs: Vec::new(),
|
||||
|
|
@ -1444,9 +1503,12 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
history: self.history,
|
||||
locked_prefix_len,
|
||||
turn_count: self.turn_count,
|
||||
llm_call_count: self.llm_call_count,
|
||||
max_turns: self.max_turns,
|
||||
turn_start_cbs: self.turn_start_cbs,
|
||||
turn_end_cbs: self.turn_end_cbs,
|
||||
llm_call_start_cbs: self.llm_call_start_cbs,
|
||||
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||
warning_cbs: self.warning_cbs,
|
||||
tool_result_cbs: self.tool_result_cbs,
|
||||
history_append_cbs: self.history_append_cbs,
|
||||
|
|
@ -1527,9 +1589,12 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
history: self.history,
|
||||
locked_prefix_len: 0,
|
||||
turn_count: self.turn_count,
|
||||
llm_call_count: self.llm_call_count,
|
||||
max_turns: self.max_turns,
|
||||
turn_start_cbs: self.turn_start_cbs,
|
||||
turn_end_cbs: self.turn_end_cbs,
|
||||
llm_call_start_cbs: self.llm_call_start_cbs,
|
||||
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||
warning_cbs: self.warning_cbs,
|
||||
tool_result_cbs: self.tool_result_cbs,
|
||||
history_append_cbs: self.history_append_cbs,
|
||||
|
|
|
|||
|
|
@ -352,14 +352,18 @@ async fn test_turn_count_increment() -> Result<(), WorkerError> {
|
|||
let worker = Worker::new(client);
|
||||
|
||||
assert_eq!(worker.turn_count(), 0);
|
||||
assert_eq!(worker.llm_call_count(), 0);
|
||||
|
||||
// First run consumes Mutable, returns RunOutput
|
||||
let mut worker = worker.run("First").await?.worker;
|
||||
assert_eq!(worker.turn_count(), 1);
|
||||
// Retry not yet implemented → AgentTurn:LlmCall is 1:1.
|
||||
assert_eq!(worker.llm_call_count(), 1);
|
||||
|
||||
// Subsequent runs on Locked take &mut self
|
||||
worker.run("Second").await?;
|
||||
assert_eq!(worker.turn_count(), 2);
|
||||
assert_eq!(worker.llm_call_count(), 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,7 +95,11 @@ async fn finish_controller_run<C, St>(
|
|||
enum PendingRun {
|
||||
Run(Vec<Segment>),
|
||||
InterruptAndRun(Vec<Segment>),
|
||||
RunForNotification,
|
||||
/// Self-initiated turn kicked from the notify buffer. The carried
|
||||
/// `InvokeKind` is the trigger that flipped the Pod from IDLE
|
||||
/// (Notify or PodEvent) and is recorded by the Invoke marker
|
||||
/// committed at the start of `pod.run_for_notification`.
|
||||
RunForNotification(protocol::InvokeKind),
|
||||
Resume,
|
||||
}
|
||||
|
||||
|
|
@ -109,7 +113,7 @@ impl PendingRun {
|
|||
fn is_parent_originated(&self) -> bool {
|
||||
match self {
|
||||
PendingRun::Run(_) | PendingRun::InterruptAndRun(_) | PendingRun::Resume => true,
|
||||
PendingRun::RunForNotification => false,
|
||||
PendingRun::RunForNotification(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -299,6 +303,16 @@ fn wire_event_bridges_on_worker<C, St>(
|
|||
});
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
worker.on_llm_call_start(move |llm_call| {
|
||||
let _ = tx.send(Event::LlmCallStart { llm_call });
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
worker.on_llm_call_end(move |llm_call| {
|
||||
let _ = tx.send(Event::LlmCallEnd { llm_call });
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
worker.on_text_block(move |block| {
|
||||
let tx_d = tx.clone();
|
||||
|
|
@ -551,9 +565,9 @@ async fn controller_loop<C, St>(
|
|||
)
|
||||
.await
|
||||
}
|
||||
PendingRun::RunForNotification => {
|
||||
PendingRun::RunForNotification(kind) => {
|
||||
drive_turn(
|
||||
pod.run_for_notification(),
|
||||
pod.run_for_notification(kind),
|
||||
&mut method_rx,
|
||||
&event_tx,
|
||||
&cancel_tx,
|
||||
|
|
@ -643,7 +657,9 @@ async fn controller_loop<C, St>(
|
|||
// sees the buffered notification(s) without a human
|
||||
// Run.
|
||||
if shared_state.get_status() == PodStatus::Idle {
|
||||
pending = Some(PendingRun::RunForNotification);
|
||||
pending = Some(PendingRun::RunForNotification(
|
||||
protocol::InvokeKind::Notify,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -711,7 +727,9 @@ async fn controller_loop<C, St>(
|
|||
// notification is not stranded. Matches the
|
||||
// `Method::Notify` idle path.
|
||||
if shared_state.get_status() == PodStatus::Idle {
|
||||
pending = Some(PendingRun::RunForNotification);
|
||||
pending = Some(PendingRun::RunForNotification(
|
||||
protocol::InvokeKind::PodEvent,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -960,7 +978,9 @@ mod tests {
|
|||
assert!(PendingRun::Run(Vec::new()).is_parent_originated());
|
||||
assert!(PendingRun::InterruptAndRun(Vec::new()).is_parent_originated());
|
||||
assert!(PendingRun::Resume.is_parent_originated());
|
||||
assert!(!PendingRun::RunForNotification.is_parent_originated());
|
||||
assert!(
|
||||
!PendingRun::RunForNotification(protocol::InvokeKind::Notify).is_parent_originated()
|
||||
);
|
||||
}
|
||||
|
||||
struct DriveTurnEnv {
|
||||
|
|
|
|||
|
|
@ -115,6 +115,9 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
|||
.expect("SystemItem is Serialize");
|
||||
Some(Event::SystemItem { item: value })
|
||||
}
|
||||
session_store::LogEntry::Invoke { trigger, .. } => {
|
||||
Some(Event::InvokeStart { kind: trigger })
|
||||
}
|
||||
other => {
|
||||
// `SessionLogSink::is_live_relevant` keeps
|
||||
// non-live-relevant variants off the
|
||||
|
|
|
|||
|
|
@ -1144,10 +1144,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
self.prepare_for_run().await?;
|
||||
|
||||
// IDLE → active marker. Commits first so the next UserInput entry
|
||||
// is contained inside this Invoke range. See `tickets/invoke-turn-llmcall-semantics.md`.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
self.commit_entry(LogEntry::Invoke {
|
||||
ts: session_log::now_millis(),
|
||||
trigger: protocol::InvokeKind::UserSend,
|
||||
})?;
|
||||
|
||||
// Persist the user input as typed segments before the worker
|
||||
// pushes its flattened copy into history. save_delta deliberately
|
||||
// skips the resulting `is_user_message()` item to avoid double-write.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
self.commit_entry(LogEntry::UserInput {
|
||||
ts: session_log::now_millis(),
|
||||
segments: input.clone(),
|
||||
|
|
@ -1482,9 +1489,31 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// `Item::system_message` into the per-request context, then the
|
||||
/// Worker's resume path issues the LLM request without a new
|
||||
/// user turn.
|
||||
pub async fn run_for_notification(&mut self) -> Result<PodRunResult, PodError> {
|
||||
pub async fn run_for_notification(
|
||||
&mut self,
|
||||
kind: protocol::InvokeKind,
|
||||
) -> Result<PodRunResult, PodError> {
|
||||
debug_assert!(
|
||||
matches!(
|
||||
kind,
|
||||
protocol::InvokeKind::Notify
|
||||
| protocol::InvokeKind::PodEvent
|
||||
| protocol::InvokeKind::SystemReminder
|
||||
| protocol::InvokeKind::Wakeup
|
||||
),
|
||||
"run_for_notification expects a non-UserSend InvokeKind; got {kind:?}"
|
||||
);
|
||||
self.prepare_for_run().await?;
|
||||
|
||||
// IDLE → active marker for the buffered notification / pod-event
|
||||
// drain. The trailing SystemItem entries (drained by the
|
||||
// PodInterceptor) carry the actual payload.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
self.commit_entry(LogEntry::Invoke {
|
||||
ts: session_log::now_millis(),
|
||||
trigger: kind,
|
||||
})?;
|
||||
|
||||
let history_before = self.worker.as_ref().unwrap().history().len();
|
||||
|
||||
let worker = self.worker.take().expect("worker taken during run");
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ impl SessionLogSink {
|
|||
/// lane does not cover:
|
||||
/// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire.
|
||||
/// - `LogEntry::SystemItem` → `Event::SystemItem`.
|
||||
/// - `LogEntry::Invoke` → `Event::InvokeStart`.
|
||||
/// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd,
|
||||
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
|
||||
/// reflected in the mirror so reconnect snapshots stay accurate,
|
||||
|
|
@ -118,7 +119,9 @@ impl SessionLogSink {
|
|||
fn is_live_relevant(entry: &LogEntry) -> bool {
|
||||
matches!(
|
||||
entry,
|
||||
LogEntry::SessionStart { .. } | LogEntry::SystemItem { .. }
|
||||
LogEntry::SessionStart { .. }
|
||||
| LogEntry::SystemItem { .. }
|
||||
| LogEntry::Invoke { .. }
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -209,7 +209,8 @@ pub enum Event {
|
|||
/// submitting clients would see tool calls and assistant text
|
||||
/// appear without any preceding user message.
|
||||
///
|
||||
/// Fires exactly once per accepted `Method::Run`, before
|
||||
/// Fires exactly once per accepted `Method::Run`, after
|
||||
/// `InvokeStart { kind: UserSend }` and before the first
|
||||
/// `TurnStart`. Rejected runs (e.g. `AlreadyRunning`) do not emit.
|
||||
UserMessage {
|
||||
segments: Vec<Segment>,
|
||||
|
|
@ -231,13 +232,57 @@ pub enum Event {
|
|||
SystemItem {
|
||||
item: serde_json::Value,
|
||||
},
|
||||
/// A new self-driving cycle has begun (IDLE → active transition).
|
||||
///
|
||||
/// Marker event for the start of an Invoke range; the range extends
|
||||
/// implicitly until the next `InvokeStart`. Fires for every accepted
|
||||
/// `Method::Run` (kind=`UserSend`), `Method::Notify` (kind=`Notify`),
|
||||
/// `Method::PodEvent` re-injection (kind=`PodEvent`), and any other
|
||||
/// IDLE-breaking trigger. Mid-run interrupts (e.g. hook output,
|
||||
/// `<system-reminder>` injection that doesn't break IDLE) do not
|
||||
/// emit `InvokeStart` — they appear as `SystemItem` only.
|
||||
///
|
||||
/// Carries `kind` only; the payload (user text / notify message /
|
||||
/// pod event body) is delivered separately via the immediately
|
||||
/// following `UserMessage` / `SystemItem` event.
|
||||
InvokeStart {
|
||||
kind: InvokeKind,
|
||||
},
|
||||
/// One AgentTurn boundary opened. An AgentTurn is a maximal run of
|
||||
/// LLM generation calls whose input messages are identical (i.e.
|
||||
/// retries from network errors / 5xx / stream disconnects collapse
|
||||
/// into the same AgentTurn). When the input changes (a new tool
|
||||
/// result lands, a user interrupts, etc.), the next LLM call belongs
|
||||
/// to a new AgentTurn.
|
||||
///
|
||||
/// `turn` is the AgentTurn index from `Worker::turn_count`.
|
||||
///
|
||||
/// Currently retry is not yet implemented (`llm-worker-stream-continuation`)
|
||||
/// so AgentTurn and `LlmCall` fire 1:1, but the contract here is
|
||||
/// the AgentTurn boundary; consumers that want per-LLM-call signals
|
||||
/// must subscribe to `LlmCallStart` / `LlmCallEnd` instead.
|
||||
TurnStart {
|
||||
turn: usize,
|
||||
},
|
||||
/// AgentTurn closed.
|
||||
TurnEnd {
|
||||
turn: usize,
|
||||
result: TurnResult,
|
||||
},
|
||||
/// One LLM generation call started (1 request → 1 generation, retry
|
||||
/// included). Multiple `LlmCall*` pairs may fire inside a single
|
||||
/// `TurnStart` / `TurnEnd` pair when a request is retried; today
|
||||
/// they fire 1:1 because retry is not implemented.
|
||||
///
|
||||
/// `llm_call` is the worker-wide running counter from
|
||||
/// `Worker::llm_call_count`.
|
||||
LlmCallStart {
|
||||
llm_call: usize,
|
||||
},
|
||||
/// LLM generation call ended.
|
||||
LlmCallEnd {
|
||||
llm_call: usize,
|
||||
},
|
||||
TextDelta {
|
||||
text: String,
|
||||
},
|
||||
|
|
@ -466,6 +511,31 @@ pub enum TurnResult {
|
|||
Paused,
|
||||
}
|
||||
|
||||
/// Kind of trigger that opened a new Invoke (IDLE → active) range.
|
||||
///
|
||||
/// One Invoke groups all entries from this trigger up to the next
|
||||
/// `Invoke` marker. The kind is the only payload — content (user text,
|
||||
/// notify message, pod event body) is delivered by the immediately
|
||||
/// following Turn entry, not by the marker itself.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum InvokeKind {
|
||||
/// `Method::Run` — a user submission.
|
||||
UserSend,
|
||||
/// `Method::Notify` — free-text notification injected into history.
|
||||
Notify,
|
||||
/// `Method::PodEvent` — typed lifecycle report from a child Pod.
|
||||
PodEvent,
|
||||
/// `<system-reminder>` etc. that crosses an IDLE boundary (mid-run
|
||||
/// reminders that don't break IDLE are SystemItem-only and do not
|
||||
/// open a new Invoke).
|
||||
SystemReminder,
|
||||
/// Cron / RemoteTrigger style scheduled wake-up. Reserved; no
|
||||
/// producer is wired today but the variant is part of the initial
|
||||
/// set so future schedulers don't have to amend the wire enum.
|
||||
Wakeup,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum RunResult {
|
||||
|
|
@ -678,6 +748,59 @@ mod tests {
|
|||
assert_eq!(parsed["data"]["result"], "limit_reached");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_invoke_start_roundtrip() {
|
||||
for kind in [
|
||||
InvokeKind::UserSend,
|
||||
InvokeKind::Notify,
|
||||
InvokeKind::PodEvent,
|
||||
InvokeKind::SystemReminder,
|
||||
InvokeKind::Wakeup,
|
||||
] {
|
||||
let event = Event::InvokeStart { kind };
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["event"], "invoke_start");
|
||||
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||
match decoded {
|
||||
Event::InvokeStart { kind: k } => assert_eq!(k, kind),
|
||||
other => panic!("expected InvokeStart, got {other:?}"),
|
||||
}
|
||||
}
|
||||
let parsed: serde_json::Value = serde_json::from_str(
|
||||
&serde_json::to_string(&Event::InvokeStart {
|
||||
kind: InvokeKind::UserSend,
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(parsed["data"]["kind"], "user_send");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_llm_call_start_end_roundtrip() {
|
||||
for event in [
|
||||
Event::LlmCallStart { llm_call: 0 },
|
||||
Event::LlmCallEnd { llm_call: 7 },
|
||||
] {
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||
match (&event, &decoded) {
|
||||
(Event::LlmCallStart { llm_call: a }, Event::LlmCallStart { llm_call: b })
|
||||
| (Event::LlmCallEnd { llm_call: a }, Event::LlmCallEnd { llm_call: b }) => {
|
||||
assert_eq!(a, b);
|
||||
}
|
||||
_ => panic!("variant mismatch: {event:?} vs {decoded:?}"),
|
||||
}
|
||||
}
|
||||
let parsed: serde_json::Value = serde_json::from_str(
|
||||
&serde_json::to_string(&Event::LlmCallStart { llm_call: 3 }).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(parsed["event"], "llm_call_start");
|
||||
assert_eq!(parsed["data"]["llm_call"], 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn method_notify_json_roundtrip() {
|
||||
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||
use llm_worker::{UsageRecord, WorkerResult};
|
||||
use protocol::{ScopeRule, Segment};
|
||||
use protocol::{InvokeKind, ScopeRule, Segment};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
|
|
@ -91,8 +91,13 @@ pub struct HashedEntry {
|
|||
///
|
||||
/// Variants correspond to specific mutation points in `Worker`:
|
||||
/// - `SessionStart` — always the first entry; captures initial state
|
||||
/// - `Invoke` — IDLE → active marker (start of a new self-driving cycle)
|
||||
/// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends
|
||||
/// - `TurnEnd` — turn boundary marker
|
||||
/// - `TurnEnd` — AgentTurn boundary marker; carries the post-increment
|
||||
/// `turn_count`. With retry unimplemented today this fires once per
|
||||
/// `run()`/`resume()` (current callers persist a single TurnEnd at
|
||||
/// run completion); the fork-point seq for `at_turn_index` is the
|
||||
/// preceding `Invoke` entry, not the TurnEnd.
|
||||
/// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call
|
||||
/// - `ConfigChanged` — `RequestConfig` mutation
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -113,6 +118,23 @@ pub enum LogEntry {
|
|||
compacted_from: Option<SessionOrigin>,
|
||||
},
|
||||
|
||||
/// IDLE → active marker. Records the start of a new self-driving
|
||||
/// cycle (Invoke range). The range extends implicitly until the
|
||||
/// next `Invoke` entry; this entry carries the trigger only — the
|
||||
/// actual payload (user text / notify message / pod event body) is
|
||||
/// in the immediately following Turn entry (`UserInput` / `SystemItem`).
|
||||
///
|
||||
/// Used by `pod-session-fork` style operations: the fork-point seq
|
||||
/// (`at_turn_index` in persistence-semantics) points at one of these
|
||||
/// `Invoke` entries so "back to N-th send" maps cleanly to the
|
||||
/// IDLE-break boundary the user sees.
|
||||
///
|
||||
/// Field name is `trigger` (not `kind`) because the LogEntry
|
||||
/// serde tag already occupies `"kind"`.
|
||||
///
|
||||
/// Marker only — replay does not mutate `RestoredState`.
|
||||
Invoke { ts: u64, trigger: InvokeKind },
|
||||
|
||||
/// User input accepted at submit time. Carries the original typed
|
||||
/// `Vec<Segment>` so clients can re-render typed atoms (paste chips,
|
||||
/// file/knowledge refs, workflow invocations) on session restore.
|
||||
|
|
@ -290,6 +312,11 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
|
|||
state.config = config.clone();
|
||||
state.history = history.iter().cloned().map(Item::from).collect();
|
||||
}
|
||||
LogEntry::Invoke { .. } => {
|
||||
// Marker only; no state mutation. The trailing
|
||||
// UserInput / SystemItem / TurnEnd entries carry all
|
||||
// replay-relevant data.
|
||||
}
|
||||
LogEntry::UserInput { segments, .. } => {
|
||||
let text = Segment::flatten_to_text(segments);
|
||||
state.history.push(Item::user_message(text));
|
||||
|
|
@ -655,6 +682,59 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invoke_entry_round_trip_via_json() {
|
||||
let entry = LogEntry::Invoke {
|
||||
ts: 12345,
|
||||
trigger: InvokeKind::UserSend,
|
||||
};
|
||||
let json = serde_json::to_string(&entry).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["kind"], "invoke");
|
||||
assert_eq!(parsed["trigger"], "user_send");
|
||||
let decoded: LogEntry = serde_json::from_str(&json).unwrap();
|
||||
match decoded {
|
||||
LogEntry::Invoke { ts, trigger } => {
|
||||
assert_eq!(ts, 12345);
|
||||
assert_eq!(trigger, InvokeKind::UserSend);
|
||||
}
|
||||
other => panic!("expected Invoke, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_invoke_marker_does_not_mutate_state() {
|
||||
let entries = build_chain(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 0,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
LogEntry::Invoke {
|
||||
ts: 100,
|
||||
trigger: InvokeKind::UserSend,
|
||||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 101,
|
||||
segments: vec![Segment::text("hi")],
|
||||
},
|
||||
LogEntry::TurnEnd {
|
||||
ts: 200,
|
||||
turn_count: 1,
|
||||
},
|
||||
LogEntry::Invoke {
|
||||
ts: 300,
|
||||
trigger: InvokeKind::Notify,
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
assert_eq!(state.history.len(), 1);
|
||||
assert_eq!(state.turn_count, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_extension_collects_domain_payload_pairs() {
|
||||
let entries = build_chain(&[
|
||||
|
|
|
|||
|
|
@ -498,6 +498,13 @@ impl App {
|
|||
self.current_tool = None;
|
||||
self.assistant_streaming = false;
|
||||
}
|
||||
// UI consumers of Invoke / LlmCall semantics are out of scope
|
||||
// for `tickets/invoke-turn-llmcall-semantics.md`; events flow
|
||||
// through to subscribers but the TUI currently derives its
|
||||
// turn header from `UserMessage` / `SystemItem` arrivals.
|
||||
Event::InvokeStart { .. }
|
||||
| Event::LlmCallStart { .. }
|
||||
| Event::LlmCallEnd { .. } => {}
|
||||
Event::TextDelta { text } => {
|
||||
self.append_assistant_text(&text);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user