Compare commits
4 Commits
e00e284d8c
...
be5e413b55
| Author | SHA1 | Date | |
|---|---|---|---|
| be5e413b55 | |||
| 58c2a51ae1 | |||
| 9304b52f17 | |||
| d0dbac109d |
1
TODO.md
1
TODO.md
|
|
@ -9,7 +9,6 @@
|
|||
- Pod: Inbound PodEvent ハンドリングの重複を統合 → [tickets/pod-inbound-pod-event-dedup.md](tickets/pod-inbound-pod-event-dedup.md)
|
||||
- Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md)
|
||||
- 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md)
|
||||
- Invoke / Turn / LlmCall セマンティクス整理 → [tickets/invoke-turn-llmcall-semantics.md](tickets/invoke-turn-llmcall-semantics.md)
|
||||
- llm-worker のエラー耐性
|
||||
- ストリーム途中失敗時の継続 → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md)
|
||||
- llm-worker: history append を callback 経由の単一経路に閉じる → [tickets/worker-history-append-contract.md](tickets/worker-history-append-contract.md)
|
||||
|
|
|
|||
|
|
@ -153,14 +153,28 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
|||
history: Vec<Item>,
|
||||
/// History length at lock time (only meaningful in Locked state)
|
||||
locked_prefix_len: usize,
|
||||
/// Turn count
|
||||
/// AgentTurn count.
|
||||
///
|
||||
/// Once retry (`llm-worker-stream-continuation`) is implemented, an
|
||||
/// AgentTurn collapses N retried `LlmCall`s with identical input;
|
||||
/// today retry is not implemented so AgentTurn and LlmCall fire 1:1
|
||||
/// and the increment site (the LLM-call loop) is shared.
|
||||
/// `max_turns` is interpreted as a per-`run()` AgentTurn cap.
|
||||
turn_count: usize,
|
||||
/// Maximum number of turns (None = unlimited)
|
||||
/// LlmCall count (per-Worker running counter, monotonic). Unlike
|
||||
/// `turn_count` this never collapses retries.
|
||||
llm_call_count: usize,
|
||||
/// Maximum number of AgentTurns (None = unlimited)
|
||||
max_turns: Option<u32>,
|
||||
/// Turn-start callbacks
|
||||
/// AgentTurn-start callbacks (1:1 with LlmCall today)
|
||||
turn_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// Turn-end callbacks
|
||||
/// AgentTurn-end callbacks (1:1 with LlmCall today)
|
||||
turn_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// LlmCall-start callbacks (per individual LLM generation request,
|
||||
/// retries included once retry lands)
|
||||
llm_call_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// LlmCall-end callbacks
|
||||
llm_call_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// Non-fatal warning callbacks. Invoked when the Worker wants to
|
||||
/// surface an advisory message to the upper layer (e.g. Pod) so it
|
||||
/// can be forwarded to the user — distinct from `tracing::warn!`,
|
||||
|
|
@ -315,11 +329,28 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
});
|
||||
}
|
||||
|
||||
/// Register a turn-start callback (receives 0-based turn number).
|
||||
/// Register an AgentTurn-start callback (receives the AgentTurn
|
||||
/// index from `turn_count`).
|
||||
///
|
||||
/// Today fires 1:1 with the per-LLM-call boundary because retry is
|
||||
/// not yet implemented. Once retry lands, this will fire only once
|
||||
/// per AgentTurn (= retried LlmCall group with identical input).
|
||||
pub fn on_turn_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||
self.turn_start_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Register an LlmCall-start callback (receives the LlmCall index
|
||||
/// from `llm_call_count`). Fires once per LLM generation request,
|
||||
/// retries included.
|
||||
pub fn on_llm_call_start(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||
self.llm_call_start_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Register an LlmCall-end callback.
|
||||
pub fn on_llm_call_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||
self.llm_call_end_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Register a non-fatal warning callback.
|
||||
///
|
||||
/// The callback is invoked with a short human-readable message
|
||||
|
|
@ -372,7 +403,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Register a turn-end callback (receives 0-based turn number).
|
||||
/// Register an AgentTurn-end callback. See [`on_turn_start`](Self::on_turn_start)
|
||||
/// for the 1:1-vs-N relation with `LlmCall*`.
|
||||
pub fn on_turn_end(&mut self, callback: impl Fn(usize) + Send + Sync + 'static) {
|
||||
self.turn_end_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
|
@ -463,11 +495,22 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
self.system_prompt.as_deref()
|
||||
}
|
||||
|
||||
/// Get the current turn count
|
||||
/// Get the current AgentTurn count.
|
||||
///
|
||||
/// AgentTurn is a maximal run of LLM generation calls with identical
|
||||
/// input; today retry is unimplemented so this is also the LLM call
|
||||
/// count. Use [`llm_call_count`](Self::llm_call_count) when the
|
||||
/// caller specifically needs the per-LLM-call number.
|
||||
pub fn turn_count(&self) -> usize {
|
||||
self.turn_count
|
||||
}
|
||||
|
||||
/// Get the current LlmCall count (per-Worker running counter, never
|
||||
/// collapsed by retry).
|
||||
pub fn llm_call_count(&self) -> usize {
|
||||
self.llm_call_count
|
||||
}
|
||||
|
||||
/// Get a reference to the current request configuration
|
||||
pub fn request_config(&self) -> &RequestConfig {
|
||||
&self.request_config
|
||||
|
|
@ -1004,10 +1047,23 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
PreRequestAction::Continue => {}
|
||||
}
|
||||
|
||||
// LlmCall boundary fires per LLM generation request — today
|
||||
// 1:1 with AgentTurn, but retry (`llm-worker-stream-continuation`)
|
||||
// will multiply this within a single AgentTurn.
|
||||
let current_llm_call = self.llm_call_count;
|
||||
for cb in &self.llm_call_start_cbs {
|
||||
cb(current_llm_call);
|
||||
}
|
||||
|
||||
// Stream LLM response
|
||||
let request = self.build_request(&tool_definitions, &request_context);
|
||||
self.stream_response(request).await?;
|
||||
|
||||
for cb in &self.llm_call_end_cbs {
|
||||
cb(current_llm_call);
|
||||
}
|
||||
self.llm_call_count += 1;
|
||||
|
||||
for cb in &self.turn_end_cbs {
|
||||
cb(current_turn);
|
||||
}
|
||||
|
|
@ -1185,9 +1241,12 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
history: Vec::new(),
|
||||
locked_prefix_len: 0,
|
||||
turn_count: 0,
|
||||
llm_call_count: 0,
|
||||
max_turns: None,
|
||||
turn_start_cbs: Vec::new(),
|
||||
turn_end_cbs: Vec::new(),
|
||||
llm_call_start_cbs: Vec::new(),
|
||||
llm_call_end_cbs: Vec::new(),
|
||||
warning_cbs: Vec::new(),
|
||||
tool_result_cbs: Vec::new(),
|
||||
history_append_cbs: Vec::new(),
|
||||
|
|
@ -1444,9 +1503,12 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
history: self.history,
|
||||
locked_prefix_len,
|
||||
turn_count: self.turn_count,
|
||||
llm_call_count: self.llm_call_count,
|
||||
max_turns: self.max_turns,
|
||||
turn_start_cbs: self.turn_start_cbs,
|
||||
turn_end_cbs: self.turn_end_cbs,
|
||||
llm_call_start_cbs: self.llm_call_start_cbs,
|
||||
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||
warning_cbs: self.warning_cbs,
|
||||
tool_result_cbs: self.tool_result_cbs,
|
||||
history_append_cbs: self.history_append_cbs,
|
||||
|
|
@ -1527,9 +1589,12 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
history: self.history,
|
||||
locked_prefix_len: 0,
|
||||
turn_count: self.turn_count,
|
||||
llm_call_count: self.llm_call_count,
|
||||
max_turns: self.max_turns,
|
||||
turn_start_cbs: self.turn_start_cbs,
|
||||
turn_end_cbs: self.turn_end_cbs,
|
||||
llm_call_start_cbs: self.llm_call_start_cbs,
|
||||
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||
warning_cbs: self.warning_cbs,
|
||||
tool_result_cbs: self.tool_result_cbs,
|
||||
history_append_cbs: self.history_append_cbs,
|
||||
|
|
|
|||
|
|
@ -352,14 +352,18 @@ async fn test_turn_count_increment() -> Result<(), WorkerError> {
|
|||
let worker = Worker::new(client);
|
||||
|
||||
assert_eq!(worker.turn_count(), 0);
|
||||
assert_eq!(worker.llm_call_count(), 0);
|
||||
|
||||
// First run consumes Mutable, returns RunOutput
|
||||
let mut worker = worker.run("First").await?.worker;
|
||||
assert_eq!(worker.turn_count(), 1);
|
||||
// Retry not yet implemented → AgentTurn:LlmCall is 1:1.
|
||||
assert_eq!(worker.llm_call_count(), 1);
|
||||
|
||||
// Subsequent runs on Locked take &mut self
|
||||
worker.run("Second").await?;
|
||||
assert_eq!(worker.turn_count(), 2);
|
||||
assert_eq!(worker.llm_call_count(), 2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -94,7 +94,11 @@ async fn finish_controller_run<C, St>(
|
|||
/// `pod.run_for_notification()` drains the NotifyBuffer on its own.
|
||||
enum PendingRun {
|
||||
Run(Vec<Segment>),
|
||||
RunForNotification,
|
||||
/// Self-initiated turn kicked from the notify buffer. The carried
|
||||
/// `InvokeKind` is the trigger that flipped the Pod from IDLE
|
||||
/// (Notify or PodEvent) and is recorded by the Invoke marker
|
||||
/// committed at the start of `pod.run_for_notification`.
|
||||
RunForNotification(protocol::InvokeKind),
|
||||
Resume,
|
||||
}
|
||||
|
||||
|
|
@ -108,7 +112,7 @@ impl PendingRun {
|
|||
fn is_parent_originated(&self) -> bool {
|
||||
match self {
|
||||
PendingRun::Run(_) | PendingRun::Resume => true,
|
||||
PendingRun::RunForNotification => false,
|
||||
PendingRun::RunForNotification(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -298,6 +302,16 @@ fn wire_event_bridges_on_worker<C, St>(
|
|||
});
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
worker.on_llm_call_start(move |llm_call| {
|
||||
let _ = tx.send(Event::LlmCallStart { llm_call });
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
worker.on_llm_call_end(move |llm_call| {
|
||||
let _ = tx.send(Event::LlmCallEnd { llm_call });
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
worker.on_text_block(move |block| {
|
||||
let tx_d = tx.clone();
|
||||
|
|
@ -535,9 +549,9 @@ async fn controller_loop<C, St>(
|
|||
)
|
||||
.await
|
||||
}
|
||||
PendingRun::RunForNotification => {
|
||||
PendingRun::RunForNotification(kind) => {
|
||||
drive_turn(
|
||||
pod.run_for_notification(),
|
||||
pod.run_for_notification(kind),
|
||||
&mut method_rx,
|
||||
&event_tx,
|
||||
&cancel_tx,
|
||||
|
|
@ -625,7 +639,9 @@ async fn controller_loop<C, St>(
|
|||
// sees the buffered notification(s) without a human
|
||||
// Run.
|
||||
if shared_state.get_status() == PodStatus::Idle {
|
||||
pending = Some(PendingRun::RunForNotification);
|
||||
pending = Some(PendingRun::RunForNotification(
|
||||
protocol::InvokeKind::Notify,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -693,7 +709,9 @@ async fn controller_loop<C, St>(
|
|||
// notification is not stranded. Matches the
|
||||
// `Method::Notify` idle path.
|
||||
if shared_state.get_status() == PodStatus::Idle {
|
||||
pending = Some(PendingRun::RunForNotification);
|
||||
pending = Some(PendingRun::RunForNotification(
|
||||
protocol::InvokeKind::PodEvent,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -941,7 +959,9 @@ mod tests {
|
|||
fn pending_run_parent_origin_table() {
|
||||
assert!(PendingRun::Run(Vec::new()).is_parent_originated());
|
||||
assert!(PendingRun::Resume.is_parent_originated());
|
||||
assert!(!PendingRun::RunForNotification.is_parent_originated());
|
||||
assert!(
|
||||
!PendingRun::RunForNotification(protocol::InvokeKind::Notify).is_parent_originated()
|
||||
);
|
||||
}
|
||||
|
||||
struct DriveTurnEnv {
|
||||
|
|
|
|||
|
|
@ -115,6 +115,9 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
|||
.expect("SystemItem is Serialize");
|
||||
Some(Event::SystemItem { item: value })
|
||||
}
|
||||
session_store::LogEntry::Invoke { trigger, .. } => {
|
||||
Some(Event::InvokeStart { kind: trigger })
|
||||
}
|
||||
other => {
|
||||
// `SessionLogSink::is_live_relevant` keeps
|
||||
// non-live-relevant variants off the
|
||||
|
|
|
|||
|
|
@ -1158,10 +1158,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
self.prepare_for_run().await?;
|
||||
|
||||
// IDLE → active marker. Commits first so the next UserInput entry
|
||||
// is contained inside this Invoke range. See `tickets/invoke-turn-llmcall-semantics.md`.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
self.commit_entry(LogEntry::Invoke {
|
||||
ts: session_log::now_millis(),
|
||||
trigger: protocol::InvokeKind::UserSend,
|
||||
})?;
|
||||
|
||||
// Persist the user input as typed segments before the worker
|
||||
// pushes its flattened copy into history. save_delta deliberately
|
||||
// skips the resulting `is_user_message()` item to avoid double-write.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
self.commit_entry(LogEntry::UserInput {
|
||||
ts: session_log::now_millis(),
|
||||
segments: input.clone(),
|
||||
|
|
@ -1525,9 +1532,31 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// `Item::system_message` into the per-request context, then the
|
||||
/// Worker's resume path issues the LLM request without a new
|
||||
/// user turn.
|
||||
pub async fn run_for_notification(&mut self) -> Result<PodRunResult, PodError> {
|
||||
pub async fn run_for_notification(
|
||||
&mut self,
|
||||
kind: protocol::InvokeKind,
|
||||
) -> Result<PodRunResult, PodError> {
|
||||
debug_assert!(
|
||||
matches!(
|
||||
kind,
|
||||
protocol::InvokeKind::Notify
|
||||
| protocol::InvokeKind::PodEvent
|
||||
| protocol::InvokeKind::SystemReminder
|
||||
| protocol::InvokeKind::Wakeup
|
||||
),
|
||||
"run_for_notification expects a non-UserSend InvokeKind; got {kind:?}"
|
||||
);
|
||||
self.prepare_for_run().await?;
|
||||
|
||||
// IDLE → active marker for the buffered notification / pod-event
|
||||
// drain. The trailing SystemItem entries (drained by the
|
||||
// PodInterceptor) carry the actual payload.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
self.commit_entry(LogEntry::Invoke {
|
||||
ts: session_log::now_millis(),
|
||||
trigger: kind,
|
||||
})?;
|
||||
|
||||
let history_before = self.worker.as_ref().unwrap().history().len();
|
||||
|
||||
let worker = self.worker.take().expect("worker taken during run");
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ impl SessionLogSink {
|
|||
/// lane does not cover:
|
||||
/// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire.
|
||||
/// - `LogEntry::SystemItem` → `Event::SystemItem`.
|
||||
/// - `LogEntry::Invoke` → `Event::InvokeStart`.
|
||||
/// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd,
|
||||
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
|
||||
/// reflected in the mirror so reconnect snapshots stay accurate,
|
||||
|
|
@ -118,7 +119,9 @@ impl SessionLogSink {
|
|||
fn is_live_relevant(entry: &LogEntry) -> bool {
|
||||
matches!(
|
||||
entry,
|
||||
LogEntry::SessionStart { .. } | LogEntry::SystemItem { .. }
|
||||
LogEntry::SessionStart { .. }
|
||||
| LogEntry::SystemItem { .. }
|
||||
| LogEntry::Invoke { .. }
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -209,7 +209,8 @@ pub enum Event {
|
|||
/// submitting clients would see tool calls and assistant text
|
||||
/// appear without any preceding user message.
|
||||
///
|
||||
/// Fires exactly once per accepted `Method::Run`, before
|
||||
/// Fires exactly once per accepted `Method::Run`, after
|
||||
/// `InvokeStart { kind: UserSend }` and before the first
|
||||
/// `TurnStart`. Rejected runs (e.g. `AlreadyRunning`) do not emit.
|
||||
UserMessage {
|
||||
segments: Vec<Segment>,
|
||||
|
|
@ -231,13 +232,57 @@ pub enum Event {
|
|||
SystemItem {
|
||||
item: serde_json::Value,
|
||||
},
|
||||
/// A new self-driving cycle has begun (IDLE → active transition).
|
||||
///
|
||||
/// Marker event for the start of an Invoke range; the range extends
|
||||
/// implicitly until the next `InvokeStart`. Fires for every accepted
|
||||
/// `Method::Run` (kind=`UserSend`), `Method::Notify` (kind=`Notify`),
|
||||
/// `Method::PodEvent` re-injection (kind=`PodEvent`), and any other
|
||||
/// IDLE-breaking trigger. Mid-run interrupts (e.g. hook output,
|
||||
/// `<system-reminder>` injection that doesn't break IDLE) do not
|
||||
/// emit `InvokeStart` — they appear as `SystemItem` only.
|
||||
///
|
||||
/// Carries `kind` only; the payload (user text / notify message /
|
||||
/// pod event body) is delivered separately via the immediately
|
||||
/// following `UserMessage` / `SystemItem` event.
|
||||
InvokeStart {
|
||||
kind: InvokeKind,
|
||||
},
|
||||
/// One AgentTurn boundary opened. An AgentTurn is a maximal run of
|
||||
/// LLM generation calls whose input messages are identical (i.e.
|
||||
/// retries from network errors / 5xx / stream disconnects collapse
|
||||
/// into the same AgentTurn). When the input changes (a new tool
|
||||
/// result lands, a user interrupts, etc.), the next LLM call belongs
|
||||
/// to a new AgentTurn.
|
||||
///
|
||||
/// `turn` is the AgentTurn index from `Worker::turn_count`.
|
||||
///
|
||||
/// Currently retry is not yet implemented (`llm-worker-stream-continuation`)
|
||||
/// so AgentTurn and `LlmCall` fire 1:1, but the contract here is
|
||||
/// the AgentTurn boundary; consumers that want per-LLM-call signals
|
||||
/// must subscribe to `LlmCallStart` / `LlmCallEnd` instead.
|
||||
TurnStart {
|
||||
turn: usize,
|
||||
},
|
||||
/// AgentTurn closed.
|
||||
TurnEnd {
|
||||
turn: usize,
|
||||
result: TurnResult,
|
||||
},
|
||||
/// One LLM generation call started (1 request → 1 generation, retry
|
||||
/// included). Multiple `LlmCall*` pairs may fire inside a single
|
||||
/// `TurnStart` / `TurnEnd` pair when a request is retried; today
|
||||
/// they fire 1:1 because retry is not implemented.
|
||||
///
|
||||
/// `llm_call` is the worker-wide running counter from
|
||||
/// `Worker::llm_call_count`.
|
||||
LlmCallStart {
|
||||
llm_call: usize,
|
||||
},
|
||||
/// LLM generation call ended.
|
||||
LlmCallEnd {
|
||||
llm_call: usize,
|
||||
},
|
||||
TextDelta {
|
||||
text: String,
|
||||
},
|
||||
|
|
@ -466,6 +511,31 @@ pub enum TurnResult {
|
|||
Paused,
|
||||
}
|
||||
|
||||
/// Kind of trigger that opened a new Invoke (IDLE → active) range.
|
||||
///
|
||||
/// One Invoke groups all entries from this trigger up to the next
|
||||
/// `Invoke` marker. The kind is the only payload — content (user text,
|
||||
/// notify message, pod event body) is delivered by the immediately
|
||||
/// following Turn entry, not by the marker itself.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum InvokeKind {
|
||||
/// `Method::Run` — a user submission.
|
||||
UserSend,
|
||||
/// `Method::Notify` — free-text notification injected into history.
|
||||
Notify,
|
||||
/// `Method::PodEvent` — typed lifecycle report from a child Pod.
|
||||
PodEvent,
|
||||
/// `<system-reminder>` etc. that crosses an IDLE boundary (mid-run
|
||||
/// reminders that don't break IDLE are SystemItem-only and do not
|
||||
/// open a new Invoke).
|
||||
SystemReminder,
|
||||
/// Cron / RemoteTrigger style scheduled wake-up. Reserved; no
|
||||
/// producer is wired today but the variant is part of the initial
|
||||
/// set so future schedulers don't have to amend the wire enum.
|
||||
Wakeup,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum RunResult {
|
||||
|
|
@ -678,6 +748,59 @@ mod tests {
|
|||
assert_eq!(parsed["data"]["result"], "limit_reached");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_invoke_start_roundtrip() {
|
||||
for kind in [
|
||||
InvokeKind::UserSend,
|
||||
InvokeKind::Notify,
|
||||
InvokeKind::PodEvent,
|
||||
InvokeKind::SystemReminder,
|
||||
InvokeKind::Wakeup,
|
||||
] {
|
||||
let event = Event::InvokeStart { kind };
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["event"], "invoke_start");
|
||||
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||
match decoded {
|
||||
Event::InvokeStart { kind: k } => assert_eq!(k, kind),
|
||||
other => panic!("expected InvokeStart, got {other:?}"),
|
||||
}
|
||||
}
|
||||
let parsed: serde_json::Value = serde_json::from_str(
|
||||
&serde_json::to_string(&Event::InvokeStart {
|
||||
kind: InvokeKind::UserSend,
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(parsed["data"]["kind"], "user_send");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_llm_call_start_end_roundtrip() {
|
||||
for event in [
|
||||
Event::LlmCallStart { llm_call: 0 },
|
||||
Event::LlmCallEnd { llm_call: 7 },
|
||||
] {
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||
match (&event, &decoded) {
|
||||
(Event::LlmCallStart { llm_call: a }, Event::LlmCallStart { llm_call: b })
|
||||
| (Event::LlmCallEnd { llm_call: a }, Event::LlmCallEnd { llm_call: b }) => {
|
||||
assert_eq!(a, b);
|
||||
}
|
||||
_ => panic!("variant mismatch: {event:?} vs {decoded:?}"),
|
||||
}
|
||||
}
|
||||
let parsed: serde_json::Value = serde_json::from_str(
|
||||
&serde_json::to_string(&Event::LlmCallStart { llm_call: 3 }).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(parsed["event"], "llm_call_start");
|
||||
assert_eq!(parsed["data"]["llm_call"], 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn method_notify_json_roundtrip() {
|
||||
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||
use llm_worker::{UsageRecord, WorkerResult};
|
||||
use protocol::{ScopeRule, Segment};
|
||||
use protocol::{InvokeKind, ScopeRule, Segment};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
|
|
@ -91,8 +91,13 @@ pub struct HashedEntry {
|
|||
///
|
||||
/// Variants correspond to specific mutation points in `Worker`:
|
||||
/// - `SessionStart` — always the first entry; captures initial state
|
||||
/// - `Invoke` — IDLE → active marker (start of a new self-driving cycle)
|
||||
/// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends
|
||||
/// - `TurnEnd` — turn boundary marker
|
||||
/// - `TurnEnd` — AgentTurn boundary marker; carries the post-increment
|
||||
/// `turn_count`. With retry unimplemented today this fires once per
|
||||
/// `run()`/`resume()` (current callers persist a single TurnEnd at
|
||||
/// run completion); the fork-point seq for `at_turn_index` is the
|
||||
/// preceding `Invoke` entry, not the TurnEnd.
|
||||
/// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call
|
||||
/// - `ConfigChanged` — `RequestConfig` mutation
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -113,6 +118,23 @@ pub enum LogEntry {
|
|||
compacted_from: Option<SessionOrigin>,
|
||||
},
|
||||
|
||||
/// IDLE → active marker. Records the start of a new self-driving
|
||||
/// cycle (Invoke range). The range extends implicitly until the
|
||||
/// next `Invoke` entry; this entry carries the trigger only — the
|
||||
/// actual payload (user text / notify message / pod event body) is
|
||||
/// in the immediately following Turn entry (`UserInput` / `SystemItem`).
|
||||
///
|
||||
/// Used by `pod-session-fork` style operations: the fork-point seq
|
||||
/// (`at_turn_index` in persistence-semantics) points at one of these
|
||||
/// `Invoke` entries so "back to N-th send" maps cleanly to the
|
||||
/// IDLE-break boundary the user sees.
|
||||
///
|
||||
/// Field name is `trigger` (not `kind`) because the LogEntry
|
||||
/// serde tag already occupies `"kind"`.
|
||||
///
|
||||
/// Marker only — replay does not mutate `RestoredState`.
|
||||
Invoke { ts: u64, trigger: InvokeKind },
|
||||
|
||||
/// User input accepted at submit time. Carries the original typed
|
||||
/// `Vec<Segment>` so clients can re-render typed atoms (paste chips,
|
||||
/// file/knowledge refs, workflow invocations) on session restore.
|
||||
|
|
@ -290,6 +312,11 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
|
|||
state.config = config.clone();
|
||||
state.history = history.iter().cloned().map(Item::from).collect();
|
||||
}
|
||||
LogEntry::Invoke { .. } => {
|
||||
// Marker only; no state mutation. The trailing
|
||||
// UserInput / SystemItem / TurnEnd entries carry all
|
||||
// replay-relevant data.
|
||||
}
|
||||
LogEntry::UserInput { segments, .. } => {
|
||||
let text = Segment::flatten_to_text(segments);
|
||||
state.history.push(Item::user_message(text));
|
||||
|
|
@ -655,6 +682,59 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invoke_entry_round_trip_via_json() {
|
||||
let entry = LogEntry::Invoke {
|
||||
ts: 12345,
|
||||
trigger: InvokeKind::UserSend,
|
||||
};
|
||||
let json = serde_json::to_string(&entry).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["kind"], "invoke");
|
||||
assert_eq!(parsed["trigger"], "user_send");
|
||||
let decoded: LogEntry = serde_json::from_str(&json).unwrap();
|
||||
match decoded {
|
||||
LogEntry::Invoke { ts, trigger } => {
|
||||
assert_eq!(ts, 12345);
|
||||
assert_eq!(trigger, InvokeKind::UserSend);
|
||||
}
|
||||
other => panic!("expected Invoke, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_invoke_marker_does_not_mutate_state() {
|
||||
let entries = build_chain(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 0,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
LogEntry::Invoke {
|
||||
ts: 100,
|
||||
trigger: InvokeKind::UserSend,
|
||||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 101,
|
||||
segments: vec![Segment::text("hi")],
|
||||
},
|
||||
LogEntry::TurnEnd {
|
||||
ts: 200,
|
||||
turn_count: 1,
|
||||
},
|
||||
LogEntry::Invoke {
|
||||
ts: 300,
|
||||
trigger: InvokeKind::Notify,
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
assert_eq!(state.history.len(), 1);
|
||||
assert_eq!(state.turn_count, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_extension_collects_domain_payload_pairs() {
|
||||
let entries = build_chain(&[
|
||||
|
|
|
|||
|
|
@ -498,6 +498,13 @@ impl App {
|
|||
self.current_tool = None;
|
||||
self.assistant_streaming = false;
|
||||
}
|
||||
// UI consumers of Invoke / LlmCall semantics are out of scope
|
||||
// for `tickets/invoke-turn-llmcall-semantics.md`; events flow
|
||||
// through to subscribers but the TUI currently derives its
|
||||
// turn header from `UserMessage` / `SystemItem` arrivals.
|
||||
Event::InvokeStart { .. }
|
||||
| Event::LlmCallStart { .. }
|
||||
| Event::LlmCallEnd { .. } => {}
|
||||
Event::TextDelta { text } => {
|
||||
self.append_assistant_text(&text);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,155 +0,0 @@
|
|||
# Invoke / Turn / LlmCall セマンティクス整理
|
||||
|
||||
## 背景
|
||||
|
||||
現在のコード・protocol・UI では `turn` / `run` / `request` の意味が混ざり始めている。
|
||||
|
||||
特に `llm-worker` では、`run()` によって user input を history に append し、その後 LLM 呼び出し、tool 実行、再度 LLM 呼び出し、という自走 loop が完了するまでを扱う。一方で `turn_count` や `TurnStart` / `TurnEnd` は、実態としては loop 全体ではなく、loop 内の 1 回の LLM 生成境界に近い。
|
||||
|
||||
界隈の慣習でも、Anthropic / OpenAI 系 API では「1 input → 1 generation」をおおむね 1 turn と呼ぶ用例が多く、外側の自走完結単位に turn を当てると読み手の期待と逆走する。
|
||||
|
||||
加えて `turn` は本来「順番」「誰の番か」を表す語であり、user → assistant → tool → assistant ... の各 actor 区間に当てる用途と、外側の自走完結単位に当てる用途を兼ねさせると衝突する。
|
||||
|
||||
今後、永続化層 (`tickets/persistence-semantics.md`) の Session / Segment 整理、compaction、fork、resume、usage accounting、TUI 表示を進めるにあたり、これらを簡潔に区別する必要がある。
|
||||
|
||||
## 方針
|
||||
|
||||
中心語を以下に整理する。
|
||||
|
||||
- **`Invoke`**: IDLE → active 遷移を示す **marker entry**。「ここから新しい自走サイクルが始まった」という区切りを記録するのみ。range (まとまり) は Invoke から次の Invoke までとして暗黙に表現する。kind 以外の payload は持たない。
|
||||
- **`InvokeKind`**: Invoke の種別。`UserSend` / `Notify` / `PodEvent` / `SystemReminder` / `Wakeup` / ...
|
||||
- **`Turn`**: 1 actor の発話・行動区間。慣習に従い、actor 視点の「番」を素直に表す。
|
||||
- `UserTurn`: user の発話
|
||||
- `AgentTurn`: assistant の応答区間 (内部に 1 または複数の `LlmCall`)
|
||||
- `ToolTurn`: tool 実行区間
|
||||
- `SystemTurn`: Hook / Notify payload / `<system-reminder>` injection など、システム介入のキャッチオール
|
||||
- **`LlmCall`**: LLM を 1 回呼び、1 回の generation を得る単位。AgentTurn の内部に 1:N で属する。retry を含む。
|
||||
- **`Request`**: protocol / provider / HTTP などの I/O 要求。会話上の単位には使わない。
|
||||
- **`Run`**: 実装上の関数名・runtime 制御語としては残してよいが、ユーザー向け・永続化上の中心概念にはしない。
|
||||
|
||||
外側の自走完結単位を 1 概念として閉じ込めない (= `Exchange` / `Round` 等の新語は導入しない)。それは `Invoke` から次の `Invoke` までの range として暗黙に扱う。
|
||||
|
||||
### 構造
|
||||
|
||||
Segment 内 entry 列 (flat) として表すと以下:
|
||||
|
||||
```text
|
||||
Invoke(kind=UserSend) ← marker のみ (payload なし)
|
||||
UserTurn { text }
|
||||
AgentTurn
|
||||
LlmCall { usage, content (tool_use 含む) }
|
||||
LlmCall ← retry (同一 input、network error / 5xx 等)
|
||||
ToolTurn { tool_call_id, result }
|
||||
AgentTurn
|
||||
LlmCall
|
||||
Invoke(kind=Notify)
|
||||
SystemTurn { notify payload }
|
||||
AgentTurn
|
||||
LlmCall
|
||||
(自走中の割り込み: Invoke なし、SystemTurn のみ)
|
||||
SystemTurn { hook output }
|
||||
```
|
||||
|
||||
ポイント:
|
||||
|
||||
- `Invoke` は seq に挟まる「---」相当の marker。kind のみを持ち、内容 (user の入力テキスト等) は直後の Turn entry に書く。actor 分類 (Turn) と起動 trigger (Invoke) が直交した役割を保つ。
|
||||
- 自走中の割り込み (Hook 等) は IDLE → active 遷移ではないので Invoke を伴わず、SystemTurn のみが entry 列に現れる。
|
||||
- TUI の "Send #N" 表示は `kind=UserSend` の Invoke を数えた連番。
|
||||
|
||||
### retry の境界
|
||||
|
||||
AgentTurn 内に LlmCall を 1:N で持たせる際、retry の境界線を以下で定義する:
|
||||
|
||||
- **同 AgentTurn 内の LlmCall 連続** = input messages 列が **完全に同一** な再呼び出し (network error / 5xx / rate limit / stream 切断後の再接続)
|
||||
- **新 AgentTurn** = input messages 列が変化したとき (tool result が増えた、user が割り込んだ、など)
|
||||
|
||||
この基準は判別が明快で、usage / cost 集計も「LlmCall 単位で取り、AgentTurn 単位で sum」の 2 段集計で済む。stream 切断 → 再接続のケースが「同じ turn の継続」として自然に扱える。
|
||||
|
||||
### System actor の範囲
|
||||
|
||||
`SystemTurn` は actor 分類のキャッチオール枠として、以下を全て含む:
|
||||
|
||||
- Hook 出力の context 注入
|
||||
- Notify payload の history 記録 (Invoke(Notify) と組で現れる)
|
||||
- `<system-reminder>` injection
|
||||
- pod.scope 変更などシステム由来の追記
|
||||
- その他、user でも assistant でも tool でもない context 介入
|
||||
|
||||
これにより actor 分類は 4 値 (User / Agent / Tool / System) に収まる。
|
||||
|
||||
## 既存コードへの影響
|
||||
|
||||
### Event 名
|
||||
|
||||
現在の `Event::TurnStart` / `Event::TurnEnd` は実態として **LLM call 境界** で発火している。以下のいずれかで段階移行する:
|
||||
|
||||
- (案 A) 新規イベント `LlmCallStart` / `LlmCallEnd` を導入し、現状の TurnStart/TurnEnd の発火タイミングはそちらに移す。`TurnStart` / `TurnEnd` は AgentTurn 境界 (= retry を含むまとまり) の意味で再定義。
|
||||
- (案 B) 旧 event 名は alias として残しつつ、新名 (`LlmCallStart` / `LlmCallEnd` および `AgentTurnStart` / `AgentTurnEnd` 等) で完全分離。
|
||||
|
||||
どちらにせよ、AgentTurn と LlmCall の発火頻度は retry 集約により異なる点だけは明示する。
|
||||
|
||||
### Usage / cost accounting
|
||||
|
||||
- Usage / prompt cache hit / provider request metrics は **LlmCall に紐づく**。
|
||||
- AgentTurn の usage は内部 LlmCall の sum。
|
||||
- Invoke 単位の usage が必要なら、Invoke → 次の Invoke までの全 LlmCall を sum。
|
||||
|
||||
### TUI
|
||||
|
||||
- Invoke 境界に対応するヘッダー (`Send #1` / `Notify` / `Event` を kind に応じて表示) を新設または既存 `TurnHeader` を意味繰り上げ。
|
||||
- actor ごとの表示は UserTurn / AgentTurn / ToolTurn / SystemTurn それぞれに対応するブロックとして描画。
|
||||
- LlmCall 境界を UI に出すかは別判断 (デフォルトは AgentTurn にまとめる)。
|
||||
|
||||
### Worker / Protocol
|
||||
|
||||
- llm-worker は IDLE → active 遷移時に Invoke marker を history に追記する責務を持つ。
|
||||
- `WorkerResult` / `RunResult` / `TurnResult` などの命名は、責務 (Invoke 範囲の結果 / AgentTurn の結果 / LlmCall の結果) を区別して整理する。
|
||||
|
||||
## persistence-semantics との関係
|
||||
|
||||
`tickets/persistence-semantics.md` の Segment 内 `(segment_id, seq)` PK と整合する:
|
||||
|
||||
- Invoke marker / Turn 境界 / LlmCall 境界 / content entry は全て同じ entry 列に flat に並ぶ。Tree を畳む必要なし。
|
||||
- fork 起点の `at_turn_index` は **Invoke marker の seq** に揃える。TUI で見る "Send #N" 境界と fork 起点が一致し、ユーザーが "N 回目の send まで戻る" と素直に認識できる。
|
||||
- compaction の境界も Invoke 境界で取るのが自然 (途中 LlmCall や AgentTurn の中間では切らない)。
|
||||
|
||||
## 決定事項
|
||||
|
||||
- **Event 移行**: 案 A 採用。`Event::TurnStart` / `Event::TurnEnd` を **AgentTurn 境界** (retry 集約後) の意味に doc 再定義し、LLM call 境界の発火タイミングは **新規** `Event::LlmCallStart` / `Event::LlmCallEnd` に移す。protocol は新 variant 追加のみで既存 variant は破壊しない (互換維持)。
|
||||
- **`Worker::turn_count` の意味**: **AgentTurn 数**。現状 retry が実装されていないため LLM call 数と 1:1 だが、`llm-worker-stream-continuation` 等で retry が入った時に意味が分岐する想定。
|
||||
- **LlmCall 通し番号**: 新規 `llm_call_count: usize` を `Worker` に追加し、`Event::LlmCallStart` / `End` で通知。
|
||||
- **`InvokeKind`**: `UserSend` / `Notify` / `PodEvent` / `SystemReminder` / `Wakeup` の 5 variants を初期セットに含める。Wakeup は実体未登場だが将来追加コストを下げるため最初から枠を切る。配置は `protocol` crate (`session-store` が依存する向き)。
|
||||
- **`LogEntry::TurnEnd` の扱い**: **位置・形式そのまま維持**。新規 `LogEntry::Invoke { ts, kind: InvokeKind }` を **追加** (run / run_for_notification の開始時に commit)。TurnEnd の commit 位置を AgentTurn 単位に動かすのは互換破壊になるため、本チケット範囲外とする。`persistence-semantics` の `at_turn_index` は新 `Invoke` entry の seq を指すように後続で整合させる。
|
||||
- **`SystemTurn` の sub-kind 化**: 行わない。SystemTurn は actor キャッチオール枠の単一分類として平坦に扱う。Hook / Notify / system-reminder の区別は **対応する Invoke の `kind`** で取れるため重複しない。
|
||||
- **`Worker::history` への追記タイミング**: 実装中に Invoke 境界 (= run / run_for_notification 開始) と Invoke marker commit が同タイミングであることを確認する。ずれていれば marker 側を Worker 入口に揃える。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- `Invoke` / `Turn` / `LlmCall` / `Request` / `Run` の定義が文書化されている。
|
||||
- AgentTurn における retry の境界線が明確化されている。
|
||||
- SystemTurn が actor キャッチオールとして定義され、Hook / Notify / system-reminder がそこに含まれることが示されている。
|
||||
- 既存 `Event::TurnStart` / `Event::TurnEnd` の段階移行方針 (案 A) が決まっている。
|
||||
- persistence-semantics の `at_turn_index` 等が Invoke seq を指すことが整合している。
|
||||
- **コード反映**: 以下が実装されている。
|
||||
- `protocol::Event::InvokeStart { kind: InvokeKind }` / `Event::LlmCallStart` / `Event::LlmCallEnd` が追加されている。
|
||||
- `protocol::InvokeKind` enum が定義されている。
|
||||
- `Worker` に `llm_call_count` フィールドと `on_llm_call_start` / `on_llm_call_end` callback が追加されている。
|
||||
- `Worker::turn_count` の doc が AgentTurn 数の意味に更新されている。
|
||||
- `session_store::LogEntry::Invoke { ts, kind }` が追加され、run / run_for_notification 開始時に commit されている。
|
||||
- controller が新 callback を wire し、新 Event を broadcast している。
|
||||
|
||||
## 範囲外
|
||||
|
||||
- このチケット単体での大規模 rename 実装 (具体的には `Hook::OnTurnEnd` の名前変更、`TUI Block::TurnHeader` の名前変更、`Pod::add_on_turn_end_hook` 等の API rename)。
|
||||
- `LogEntry::TurnEnd` の commit 位置を AgentTurn 単位に動かすこと (互換破壊)。
|
||||
- retry の実体実装 (`llm-worker-stream-continuation` の領分)。
|
||||
- 永続化 DB backend の実装。
|
||||
- TUI の詳細 UX 設計 (Invoke 境界での kind 別表示、LlmCall の UI 露出など)。
|
||||
- protocol の互換破壊的変更 (既存 variant の変更・削除)。
|
||||
|
||||
## 関連
|
||||
|
||||
- `tickets/persistence-semantics.md` — Segment / Entry の永続化単位を定義する別チケット。本チケットは論理単位 (Invoke / Turn / LlmCall) の定義に閉じる。
|
||||
- `crates/llm-worker/`
|
||||
- `crates/protocol/`
|
||||
- `crates/tui/`
|
||||
Loading…
Reference in New Issue
Block a user