Compare commits

...

7 Commits

25 changed files with 880 additions and 370 deletions

View File

@ -7,13 +7,16 @@
- Pod: 任意ターンからの Fork複数ターン巻き戻しを汎用化 → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
- Pod/TUI: 手動 rollback 導線 → [tickets/manual-turn-rollback.md](tickets/manual-turn-rollback.md)
- Pod: Inbound PodEvent ハンドリングの重複を統合 → [tickets/pod-inbound-pod-event-dedup.md](tickets/pod-inbound-pod-event-dedup.md)
- llm-worker のエラー耐性
- ストリーム途中失敗時の継続 → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md)
- SpawnPod 初回 task delivery の受理確認 → [tickets/spawnpod-initial-run-confirmation.md](tickets/spawnpod-initial-run-confirmation.md)
- E2E テストハーネス(`tests/e2e/`、opt-in → [tickets/e2e-harness.md](tickets/e2e-harness.md)
- メモリ機構
- consolidation skip 表示と invalid staging の観測性 → [tickets/memory-consolidation-skip-observability.md](tickets/memory-consolidation-skip-observability.md)
- TUI 拡充
- navigation mode / block focus の設計 → [tickets/tui-navigation-mode-design.md](tickets/tui-navigation-mode-design.md)
- spawned child Pod の一覧と一時 attach → [tickets/tui-spawned-pod-panel.md](tickets/tui-spawned-pod-panel.md)
- actionbar transient notice API → [tickets/tui-actionbar-transient-notice-api.md](tickets/tui-actionbar-transient-notice-api.md)
- tui -r picker で live pending Pod が表示から漏れる → [tickets/tui-picker-live-pending-pods.md](tickets/tui-picker-live-pending-pods.md)
- user manifest env override 時の spawn scope overlay 前提ズレ → [tickets/tui-user-manifest-env-overlay.md](tickets/tui-user-manifest-env-overlay.md)
- ユーザーマニフェストのモデル設定 wizard → [tickets/tui-user-model-setup.md](tickets/tui-user-model-setup.md)
- セッション内 Task ツールの注意機構(無アクティビティで `<system-reminder>` ナッジ) → [tickets/session-todo-reminder.md](tickets/session-todo-reminder.md)

View File

@ -59,4 +59,6 @@ pub use interceptor::Interceptor;
pub use message::{ContentPart, Item, Message, Role};
pub use tool::{ToolCall, ToolOutputLimits, ToolResult};
pub use usage_record::UsageRecord;
pub use worker::{RunOutput, ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult};
pub use worker::{
LlmRetryNotice, RunOutput, ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult,
};

View File

@ -36,6 +36,8 @@ impl std::fmt::Display for ConfigWarning {
}
}
pub type ResponseStream = Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>;
/// LLMクライアントのtrait
///
/// 各プロバイダはこのtraitを実装し、統一されたインターフェースを提供する。
@ -49,10 +51,7 @@ pub trait LlmClient: Send + Sync {
/// # Returns
/// * `Ok(Stream)` - イベントストリーム
/// * `Err(ClientError)` - エラー
async fn stream(
&self,
request: Request,
) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError>;
async fn stream(&self, request: Request) -> Result<ResponseStream, ClientError>;
/// Clone this client into a new `Box<dyn LlmClient>`.
///
@ -85,10 +84,7 @@ impl Clone for Box<dyn LlmClient> {
/// これにより、動的ディスパッチを使用するクライアントも `Worker` で利用可能になる。
#[async_trait]
impl LlmClient for Box<dyn LlmClient> {
async fn stream(
&self,
request: Request,
) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError> {
async fn stream(&self, request: Request) -> Result<ResponseStream, ClientError> {
(**self).stream(request).await
}

View File

@ -1,6 +1,6 @@
//! LLMクライアントエラー型
use std::fmt;
use std::{fmt, time::Duration};
/// LLMクライアントのエラー
#[derive(Debug)]
@ -16,6 +16,7 @@ pub enum ClientError {
status: Option<u16>,
code: Option<String>,
message: String,
retry_after: Option<Duration>,
},
/// 設定エラー
Config(String),
@ -31,6 +32,7 @@ impl fmt::Display for ClientError {
status,
code,
message,
..
} => {
write!(f, "API error")?;
if let Some(s) = status {
@ -68,6 +70,22 @@ impl From<serde_json::Error> for ClientError {
}
}
impl ClientError {
pub fn status(&self) -> Option<u16> {
match self {
ClientError::Api { status, .. } => *status,
_ => None,
}
}
pub fn retry_after(&self) -> Option<Duration> {
match self {
ClientError::Api { retry_after, .. } => *retry_after,
_ => None,
}
}
}
/// transient な失敗としてリトライ対象になるかを判定する。
///
/// 対象:
@ -97,6 +115,7 @@ mod tests {
status,
code: None,
message: String::new(),
retry_after: None,
}
}

View File

@ -1,8 +1,8 @@
//! HTTP transient エラー向けリトライポリシー。
//! LLM response stream を開く前の transient error 向けリトライポリシー。
//!
//! `transport.rs` の HTTP 送信〜ステータスチェック区間で `is_retryable`
//! が true を返した失敗をリトライする際に、待ち時間と打ち切り条件を
//! 提供する。SSE 読み出し開始後の失敗は対象外。
//! Worker が `LlmClient::stream` の open error に対して `is_retryable` を見て
//! retry / backoff / TUI event / cancellation をまとめて管理する。
//! SSE 読み出し開始後の失敗は対象外。
use std::time::Duration;

View File

@ -131,6 +131,7 @@ impl GeminiScheme {
status: None,
code: Some("parse_error".to_string()),
message: format!("Failed to parse Gemini SSE data: {} -> {}", e, data),
retry_after: None,
})?;
let mut events = Vec::new();

View File

@ -75,6 +75,7 @@ impl OpenAIScheme {
status: None,
code: Some("parse_error".to_string()),
message: format!("Failed to parse SSE data: {} -> {}", e, data),
retry_after: None,
})?;
let mut events = Vec::new();

View File

@ -597,6 +597,7 @@ fn from_json<T: for<'de> Deserialize<'de>>(data: &str) -> Result<T, ClientError>
status: None,
code: Some("parse_error".to_string()),
message: format!("Failed to parse SSE data: {e}"),
retry_after: None,
})
}

View File

@ -12,15 +12,12 @@ use async_trait::async_trait;
use eventsource_stream::Eventsource;
use futures::{Stream, StreamExt, TryStreamExt};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue, RETRY_AFTER};
use tokio::time::Instant;
use tracing::warn;
use super::auth::{AuthProvider, AuthRequirement};
use super::capability::ModelCapability;
use super::client::{ConfigWarning, LlmClient};
use super::error::{ClientError, is_retryable};
use super::client::{ConfigWarning, LlmClient, ResponseStream};
use super::error::ClientError;
use super::event::Event;
use super::retry::RetryPolicy;
use super::scheme::Scheme;
use super::types::{Request, RequestConfig};
@ -67,7 +64,6 @@ pub struct HttpTransport<S: Scheme> {
base_url: String,
auth: ResolvedAuth,
capability: ModelCapability,
retry_policy: RetryPolicy,
}
impl<S: Scheme> HttpTransport<S> {
@ -89,7 +85,6 @@ impl<S: Scheme> HttpTransport<S> {
base_url,
auth,
capability,
retry_policy: RetryPolicy::default(),
}
}
@ -99,12 +94,6 @@ impl<S: Scheme> HttpTransport<S> {
self
}
/// リトライポリシーを差し替える(テスト用 / 将来の manifest 化フック)。
pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
self.retry_policy = policy;
self
}
fn build_url(&self) -> String {
let path = self.scheme.path(&self.model_id);
let url = format!("{}{}", self.base_url, path);
@ -171,14 +160,12 @@ impl<S: Scheme + Clone> Clone for HttpTransport<S> {
base_url: self.base_url.clone(),
auth: self.auth.clone(),
capability: self.capability.clone(),
retry_policy: self.retry_policy.clone(),
}
}
}
/// エラーレスポンスを `ClientError::Api` に変換し、`Retry-After` の秒数を
/// 同時に取り出す。リトライループで wait の上書きに使う。
async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Option<Duration>) {
/// エラーレスポンスを `ClientError::Api` に変換する。
async fn classify_error_response(resp: reqwest::Response) -> ClientError {
let status = resp.status().as_u16();
let retry_after = resp
.headers()
@ -187,7 +174,7 @@ async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Optio
.and_then(|s| s.trim().parse::<u64>().ok())
.map(Duration::from_secs);
let text = resp.text().await.unwrap_or_default();
let err = if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
let error = json.get("error").unwrap_or(&json);
let code = error.get("type").and_then(|v| v.as_str()).map(String::from);
let message = error
@ -199,15 +186,16 @@ async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Optio
status: Some(status),
code,
message,
retry_after,
}
} else {
ClientError::Api {
status: Some(status),
code: None,
message: text,
retry_after,
}
};
(err, retry_after)
}
}
#[async_trait]
@ -220,51 +208,25 @@ impl<S: Scheme + Clone + 'static> LlmClient for HttpTransport<S> {
self.scheme.validate_config(config)
}
async fn stream(
&self,
request: Request,
) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError> {
async fn stream(&self, request: Request) -> Result<ResponseStream, ClientError> {
let url = self.build_url();
let headers = self.build_headers().await?;
let body = self
.scheme
.build_request_body(&self.model_id, &request, &self.capability);
let policy = &self.retry_policy;
let started = Instant::now();
let mut attempt: u32 = 0;
let response = loop {
let send_result = self
.http_client
.post(&url)
.headers(headers.clone())
.json(&body)
.send()
.await;
let response = self
.http_client
.post(&url)
.headers(headers)
.json(&body)
.send()
.await
.map_err(ClientError::Http)?;
let (err, retry_after) = match send_result {
Ok(resp) if resp.status().is_success() => break resp,
Ok(resp) => classify_error_response(resp).await,
Err(e) => (ClientError::Http(e), None),
};
let next_attempt = attempt + 1;
if next_attempt >= policy.max_attempts || !is_retryable(&err) {
return Err(err);
}
let wait = retry_after.unwrap_or_else(|| policy.backoff(attempt));
if started.elapsed() + wait > policy.total_timeout {
return Err(err);
}
warn!(
error = %err,
attempt = next_attempt,
wait_ms = wait.as_millis() as u64,
"transient HTTP error, retrying"
);
tokio::time::sleep(wait).await;
attempt = next_attempt;
};
if !response.status().is_success() {
return Err(classify_error_response(response).await);
}
let scheme = self.scheme.clone();
let byte_stream = response.bytes_stream().map_err(std::io::Error::other);

View File

@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::{marker::PhantomData, time::Instant};
use futures::StreamExt;
use tokio::sync::mpsc;
@ -17,8 +17,8 @@ use crate::{
PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction,
},
llm_client::{
ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition,
types::parse_tool_arguments,
ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ResponseStream,
ToolDefinition, error::is_retryable, retry::RetryPolicy, types::parse_tool_arguments,
},
state::{Locked, Mutable, WorkerState},
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
@ -99,6 +99,8 @@ enum ToolExecutionResult {
Paused,
}
const MAX_STREAM_CONTINUATIONS: u32 = 3;
/// Central component for managing LLM interactions
///
/// Receives input from the user, sends requests to the LLM, and
@ -131,9 +133,28 @@ enum ToolExecutionResult {
/// let out = worker.run("Continue").await?;
/// let mut worker = out.worker;
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmRetryNotice {
/// 直近で失敗した attempt 番号。1 origin。
pub failed_attempt: u32,
pub max_attempts: u32,
pub wait: std::time::Duration,
pub elapsed: std::time::Duration,
pub status: Option<u16>,
pub error: String,
}
#[derive(Debug)]
enum StreamCompletion {
Complete,
Interrupted { reason: String },
}
pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
/// LLM client
client: C,
/// Retry policy for opening an LLM response stream.
retry_policy: RetryPolicy,
/// Event timeline
timeline: Timeline,
/// Text block collector (Timeline handler)
@ -175,6 +196,10 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
llm_call_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
/// LlmCall-end callbacks
llm_call_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
/// Transport-level retry callbacks for a specific LlmCall.
llm_retry_cbs: Vec<Box<dyn Fn(usize, &LlmRetryNotice) + Send + Sync>>,
/// Stream continuation callbacks for a specific LlmCall.
llm_continuation_cbs: Vec<Box<dyn Fn(usize, u32, u32, &str) + Send + Sync>>,
/// Non-fatal warning callbacks. Invoked when the Worker wants to
/// surface an advisory message to the upper layer (e.g. Pod) so it
/// can be forwarded to the user — distinct from `tracing::warn!`,
@ -355,6 +380,34 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
self.llm_call_end_cbs.push(Box::new(callback));
}
/// Register a transport-level retry callback.
pub fn on_llm_retry(
&mut self,
callback: impl Fn(usize, &LlmRetryNotice) + Send + Sync + 'static,
) {
self.llm_retry_cbs.push(Box::new(callback));
}
/// Register a stream continuation callback.
pub fn on_llm_continuation(
&mut self,
callback: impl Fn(usize, u32, u32, &str) + Send + Sync + 'static,
) {
self.llm_continuation_cbs.push(Box::new(callback));
}
fn emit_llm_continuation(
&self,
llm_call: usize,
attempt: u32,
max_attempts: u32,
reason: &str,
) {
for cb in &self.llm_continuation_cbs {
cb(llm_call, attempt, max_attempts, reason);
}
}
/// Register a non-fatal warning callback.
///
/// The callback is invoked with a short human-readable message
@ -964,6 +1017,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
}
let mut stream_continuations: u32 = 0;
let mut continuing_stream = false;
loop {
if self.try_cancelled() {
info!("Execution cancelled");
@ -973,9 +1028,11 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
let current_turn = self.turn_count;
debug!(turn = current_turn, "Turn start");
for cb in &self.turn_start_cbs {
cb(current_turn);
if !continuing_stream {
debug!(turn = current_turn, "Turn start");
for cb in &self.turn_start_cbs {
cb(current_turn);
}
}
// Drain interceptor-side inputs that are meant to land in
@ -1080,13 +1137,50 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
// Stream LLM response
let request = self.build_request(&tool_definitions, &request_context);
self.stream_response(request).await?;
let stream_outcome = self.stream_response(request, current_llm_call).await?;
for cb in &self.llm_call_end_cbs {
cb(current_llm_call);
}
self.llm_call_count += 1;
if let StreamCompletion::Interrupted { reason } = stream_outcome {
stream_continuations += 1;
if stream_continuations > MAX_STREAM_CONTINUATIONS {
self.last_run_interrupted = true;
return Err(WorkerError::Client(ClientError::Api {
status: None,
code: None,
message: format!("LLM stream interrupted too many times: {reason}"),
retry_after: None,
}));
}
self.timeline.abort_current_block();
self.timeline.flush_usage();
let reasoning_items = self.reasoning_item_collector.take_collected();
let text_blocks = self.text_block_collector.take_collected();
// Do not recover tool calls from an interrupted stream. A completed
// tool_use is executable only when the provider finishes the stream.
let _dropped_tool_calls = self.tool_call_collector.take_collected();
let assistant_items =
self.build_assistant_items(&reasoning_items, &text_blocks, &[]);
if !assistant_items.is_empty() {
self.append_history_items(assistant_items);
}
self.emit_llm_continuation(
current_llm_call,
stream_continuations,
MAX_STREAM_CONTINUATIONS,
&reason,
);
continuing_stream = true;
continue;
}
stream_continuations = 0;
continuing_stream = false;
for cb in &self.turn_end_cbs {
cb(current_turn);
}
@ -1138,8 +1232,88 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
}
async fn open_stream_with_retry(
&mut self,
request: Request,
llm_call: usize,
) -> Result<ResponseStream, WorkerError> {
let policy = self.retry_policy.clone();
let started = Instant::now();
let mut failed_attempt: u32 = 0;
loop {
let stream_result = tokio::select! {
stream_result = self.client.stream(request.clone()) => stream_result,
cancel = self.cancel_rx.recv() => {
if cancel.is_some() {
info!("Cancelled before stream started");
}
self.timeline.abort_current_block();
self.last_run_interrupted = true;
return Err(WorkerError::Cancelled);
}
};
match stream_result {
Ok(stream) => return Ok(stream),
Err(err) => {
let next_failed_attempt = failed_attempt + 1;
if next_failed_attempt >= policy.max_attempts || !is_retryable(&err) {
self.last_run_interrupted = true;
return Err(WorkerError::Client(err));
}
let wait = err
.retry_after()
.unwrap_or_else(|| policy.backoff(failed_attempt));
let elapsed = started.elapsed();
if elapsed + wait > policy.total_timeout {
self.last_run_interrupted = true;
return Err(WorkerError::Client(err));
}
warn!(
error = %err,
failed_attempt = next_failed_attempt,
wait_ms = wait.as_millis() as u64,
"transient LLM request error, retrying"
);
let notice = LlmRetryNotice {
failed_attempt: next_failed_attempt,
max_attempts: policy.max_attempts,
wait,
elapsed,
status: err.status(),
error: err.to_string(),
};
for cb in &self.llm_retry_cbs {
cb(llm_call, &notice);
}
tokio::select! {
_ = tokio::time::sleep(wait) => {}
cancel = self.cancel_rx.recv() => {
if cancel.is_some() {
info!("Cancelled during LLM retry backoff");
}
self.timeline.abort_current_block();
self.last_run_interrupted = true;
return Err(WorkerError::Cancelled);
}
}
failed_attempt = next_failed_attempt;
}
}
}
}
/// Open a stream, dispatch all events to the timeline, handle cancellation.
async fn stream_response(&mut self, request: Request) -> Result<(), WorkerError> {
async fn stream_response(
&mut self,
request: Request,
llm_call: usize,
) -> Result<StreamCompletion, WorkerError> {
debug!(
item_count = request.items.len(),
tool_count = request.tools.len(),
@ -1147,18 +1321,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
"Sending request to LLM"
);
let mut stream = tokio::select! {
stream_result = self.client.stream(request) => stream_result
.inspect_err(|_| self.last_run_interrupted = true)?,
cancel = self.cancel_rx.recv() => {
if cancel.is_some() {
info!("Cancelled before stream started");
}
self.timeline.abort_current_block();
self.last_run_interrupted = true;
return Err(WorkerError::Cancelled);
}
};
let mut stream = self.open_stream_with_retry(request, llm_call).await?;
let mut event_count: usize = 0;
loop {
@ -1175,12 +1338,17 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
warn!(error = %e, "Stream error");
}
}
let event = result
.inspect_err(|_| {
let event = match result {
Ok(event) => event,
Err(err) => {
self.last_run_interrupted = true;
// 部分情報でも発火しておく(料金会計用)
self.timeline.flush_usage();
})?;
return Ok(StreamCompletion::Interrupted {
reason: err.to_string(),
});
}
};
self.timeline.dispatch(&event);
}
None => break,
@ -1200,7 +1368,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
// ストリーム完了時に集約済み Usage を 1 度だけ発火
self.timeline.flush_usage();
debug!(event_count = event_count, "Stream completed");
Ok(())
Ok(StreamCompletion::Complete)
}
/// Execute tools and push results to history.
@ -1254,6 +1422,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
Self {
client,
retry_policy: RetryPolicy::default(),
timeline,
text_block_collector,
tool_call_collector,
@ -1270,6 +1439,8 @@ impl<C: LlmClient> Worker<C, Mutable> {
turn_end_cbs: Vec::new(),
llm_call_start_cbs: Vec::new(),
llm_call_end_cbs: Vec::new(),
llm_retry_cbs: Vec::new(),
llm_continuation_cbs: Vec::new(),
warning_cbs: Vec::new(),
tool_result_cbs: Vec::new(),
history_append_cbs: Vec::new(),
@ -1385,6 +1556,12 @@ impl<C: LlmClient> Worker<C, Mutable> {
self
}
/// Set the retry policy used when opening an LLM response stream.
pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
self.retry_policy = retry_policy;
self
}
/// Validate current configuration against the provider
///
/// Returns an error if there are unsupported settings.
@ -1507,6 +1684,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
let locked_prefix_len = self.history.len();
Worker {
client: self.client,
retry_policy: self.retry_policy,
timeline: self.timeline,
text_block_collector: self.text_block_collector,
tool_call_collector: self.tool_call_collector,
@ -1523,6 +1701,8 @@ impl<C: LlmClient> Worker<C, Mutable> {
turn_end_cbs: self.turn_end_cbs,
llm_call_start_cbs: self.llm_call_start_cbs,
llm_call_end_cbs: self.llm_call_end_cbs,
llm_retry_cbs: self.llm_retry_cbs,
llm_continuation_cbs: self.llm_continuation_cbs,
warning_cbs: self.warning_cbs,
tool_result_cbs: self.tool_result_cbs,
history_append_cbs: self.history_append_cbs,
@ -1594,6 +1774,7 @@ impl<C: LlmClient> Worker<C, Locked> {
pub fn unlock(self) -> Worker<C, Mutable> {
Worker {
client: self.client,
retry_policy: self.retry_policy,
timeline: self.timeline,
text_block_collector: self.text_block_collector,
tool_call_collector: self.tool_call_collector,
@ -1610,6 +1791,8 @@ impl<C: LlmClient> Worker<C, Locked> {
turn_end_cbs: self.turn_end_cbs,
llm_call_start_cbs: self.llm_call_start_cbs,
llm_call_end_cbs: self.llm_call_end_cbs,
llm_retry_cbs: self.llm_retry_cbs,
llm_continuation_cbs: self.llm_continuation_cbs,
warning_cbs: self.warning_cbs,
tool_result_cbs: self.tool_result_cbs,
history_append_cbs: self.history_append_cbs,

View File

@ -4,17 +4,77 @@
mod common;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use common::MockLlmClient;
use llm_worker::Worker;
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent as ClientStatusEvent};
use llm_worker::llm_client::retry::RetryPolicy;
use llm_worker::llm_client::{ClientError, LlmClient, Request, ResponseStream};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
// =============================================================================
// Tests
// =============================================================================
#[derive(Clone)]
struct FailOnceClient {
calls: Arc<AtomicUsize>,
events: Vec<Event>,
}
#[async_trait]
impl LlmClient for FailOnceClient {
async fn stream(&self, _request: Request) -> Result<ResponseStream, ClientError> {
if self.calls.fetch_add(1, Ordering::SeqCst) == 0 {
return Err(ClientError::Api {
status: Some(504),
code: None,
message: "gateway timeout".into(),
retry_after: None,
});
}
Ok(Box::pin(futures::stream::iter(
self.events.clone().into_iter().map(Ok),
)))
}
fn clone_boxed(&self) -> Box<dyn LlmClient> {
Box::new(self.clone())
}
}
#[tokio::test]
async fn test_callback_llm_retry_event() {
let events = vec![Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
})];
let client = FailOnceClient {
calls: Arc::new(AtomicUsize::new(0)),
events,
};
let mut worker = Worker::new(client).with_retry_policy(RetryPolicy {
base: Duration::from_millis(1),
cap: Duration::from_millis(1),
max_attempts: 2,
total_timeout: Duration::from_secs(1),
});
let notices = Arc::new(Mutex::new(Vec::new()));
let sink = notices.clone();
worker.on_llm_retry(move |llm_call, notice| {
sink.lock().unwrap().push((llm_call, notice.clone()));
});
let result = worker.run("retry once").await;
assert!(result.is_ok(), "worker should succeed after one retry");
let notices = notices.lock().unwrap();
assert_eq!(notices.len(), 1);
assert_eq!(notices[0].0, 0);
assert_eq!(notices[0].1.failed_attempt, 1);
assert_eq!(notices[0].1.max_attempts, 2);
assert_eq!(notices[0].1.status, Some(504));
}
/// Verify that on_text_block correctly receives delta and stop events
#[tokio::test]

View File

@ -59,6 +59,7 @@ impl LlmClient for MockLlmClient {
status: Some(500),
code: Some("mock_error".to_string()),
message: "No more mock responses".to_string(),
retry_after: None,
});
}
let events = self.responses[count].clone();

View File

@ -1,12 +1,7 @@
//! HTTP transport の transient エラーリトライ挙動の integration テスト。
//! HTTP transport の単発 request / error classification テスト。
//!
//! 対応チケット: `tickets/llm-worker-transient-retry.md`。
//! - 503 / 529 / connect refused でリトライ発火
//! - max_attempts 上限到達でエラー
//! - `Retry-After` ヘッダで指数バックオフを上書き
//! - `parse_sse` 由来の `ClientError::Sse`mid-stream 想定)はリトライしない
use std::time::{Duration, Instant};
//! Retry/backoff は Worker の lifecycle 管理に属するため、transport は 1 回だけ
//! request を送り、HTTP status / Retry-After を `ClientError` に載せて返す。
use futures::StreamExt;
use llm_worker::llm_client::LlmClient;
@ -14,16 +9,16 @@ use llm_worker::llm_client::auth::AuthRequirement;
use llm_worker::llm_client::capability::ModelCapability;
use llm_worker::llm_client::error::ClientError;
use llm_worker::llm_client::event::Event;
use llm_worker::llm_client::retry::RetryPolicy;
use llm_worker::llm_client::scheme::Scheme;
use llm_worker::llm_client::transport::{HttpTransport, ResolvedAuth};
use llm_worker::llm_client::types::Request;
use serde_json::Value;
use std::time::Duration;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// SSE 本体は触らないテスト用 scheme。`parse_fail` を立てると
/// stream 消費中= retry loop の外)で `ClientError::Sse` を返す。
/// stream 消費中で `ClientError::Sse` を返す。
#[derive(Clone)]
struct DummyScheme {
parse_fail: bool,
@ -31,18 +26,23 @@ struct DummyScheme {
impl Scheme for DummyScheme {
type State = ();
fn default_base_url(&self) -> &'static str {
""
}
fn path(&self, _: &str) -> String {
"/v1/chat".into()
}
fn required_auth(&self) -> AuthRequirement {
AuthRequirement::None
}
fn build_request_body(&self, _: &str, _: &Request, _: &ModelCapability) -> Value {
serde_json::json!({})
}
fn parse_sse(&self, _: &str, _: &str, _: &mut ()) -> Result<Vec<Event>, ClientError> {
if self.parse_fail {
Err(ClientError::Sse(
@ -52,25 +52,13 @@ impl Scheme for DummyScheme {
Ok(vec![])
}
}
fn default_capability(&self) -> ModelCapability {
ModelCapability::minimal()
}
}
fn fast_policy(max_attempts: u32) -> RetryPolicy {
RetryPolicy {
base: Duration::from_millis(1),
cap: Duration::from_millis(1),
max_attempts,
total_timeout: Duration::from_secs(60),
}
}
fn build_transport(
base_url: impl Into<String>,
parse_fail: bool,
policy: RetryPolicy,
) -> HttpTransport<DummyScheme> {
fn build_transport(base_url: impl Into<String>, parse_fail: bool) -> HttpTransport<DummyScheme> {
HttpTransport::new(
DummyScheme { parse_fail },
"test-model",
@ -78,7 +66,6 @@ fn build_transport(
ResolvedAuth::None,
ModelCapability::minimal(),
)
.with_retry_policy(policy)
}
fn ok_sse() -> ResponseTemplate {
@ -88,78 +75,11 @@ fn ok_sse() -> ResponseTemplate {
}
#[tokio::test]
async fn retries_503_then_succeeds() {
async fn retryable_status_returns_api_error_without_retrying() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ResponseTemplate::new(503).set_body_string("upstream connect error"))
.up_to_n_times(2)
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ok_sse())
.mount(&server)
.await;
let transport = build_transport(server.uri(), false, fast_policy(5));
let mut stream = transport
.stream(Request::default())
.await
.expect("stream should succeed after retries");
while stream.next().await.is_some() {}
let received = server.received_requests().await.unwrap();
assert_eq!(received.len(), 3, "two failures plus one success expected");
}
#[tokio::test]
async fn retries_529_then_exhausts() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ResponseTemplate::new(529).set_body_string("overloaded"))
.mount(&server)
.await;
let transport = build_transport(server.uri(), false, fast_policy(3));
match transport.stream(Request::default()).await {
Err(ClientError::Api {
status: Some(529), ..
}) => {}
Err(other) => panic!("expected Api(529), got {other:?}"),
Ok(_) => panic!("expected error after exhausting retries"),
}
let received = server.received_requests().await.unwrap();
assert_eq!(received.len(), 3, "should hit max_attempts and stop");
}
#[tokio::test]
async fn connect_refused_retries_then_fails() {
// 接続不能なローカルアドレスを使う。Linux では `Connection refused` で
// 即時失敗するため、`fast_policy` ならテストが秒以下で終わる。
let unreachable = "http://127.0.0.1:1";
let transport = build_transport(unreachable, false, fast_policy(3));
match transport.stream(Request::default()).await {
Err(ClientError::Http(e)) => {
assert!(
e.is_connect() || e.is_timeout(),
"expected connect/timeout, got {e:?}"
);
}
Err(other) => panic!("expected Http error, got {other:?}"),
Ok(_) => panic!("expected error connecting to closed port"),
}
}
#[tokio::test]
async fn retry_after_header_overrides_backoff() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ResponseTemplate::new(503).insert_header("retry-after", "1"))
.up_to_n_times(1)
.mount(&server)
.await;
@ -169,34 +89,48 @@ async fn retry_after_header_overrides_backoff() {
.mount(&server)
.await;
// base/cap を 1ms に絞った policy で `Retry-After: 1` を観察すると、
// 指数バックオフ単独なら 1ms 程度で終わるはずが Retry-After に従って
// 1 秒待つ → 経過時間で override を検証できる。
let policy = RetryPolicy {
base: Duration::from_millis(1),
cap: Duration::from_millis(1),
max_attempts: 3,
total_timeout: Duration::from_secs(10),
};
let transport = build_transport(server.uri(), false, policy);
let transport = build_transport(server.uri(), false);
match transport.stream(Request::default()).await {
Err(ClientError::Api {
status: Some(503), ..
}) => {}
Err(other) => panic!("expected Api(503), got {other:?}"),
Ok(_) => panic!("transport must not retry internally"),
}
let start = Instant::now();
let mut stream = transport.stream(Request::default()).await.expect("ok");
while stream.next().await.is_some() {}
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_secs(1),
"Retry-After=1 should make us wait >=1s, elapsed={elapsed:?}"
);
assert!(
elapsed < Duration::from_secs(3),
"Retry-After=1 should not balloon, elapsed={elapsed:?}"
let received = server.received_requests().await.unwrap();
assert_eq!(
received.len(),
1,
"transport should send exactly one request"
);
}
#[tokio::test]
async fn mid_stream_sse_error_does_not_retry() {
async fn retry_after_header_is_preserved_on_api_error() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ResponseTemplate::new(503).insert_header("retry-after", "1"))
.mount(&server)
.await;
let transport = build_transport(server.uri(), false);
match transport.stream(Request::default()).await {
Err(
err @ ClientError::Api {
status: Some(503), ..
},
) => {
assert_eq!(err.retry_after(), Some(Duration::from_secs(1)));
}
Err(other) => panic!("expected Api(503), got {other:?}"),
Ok(_) => panic!("expected error"),
}
}
#[tokio::test]
async fn mid_stream_sse_error_is_stream_item_error() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
@ -211,11 +145,11 @@ async fn mid_stream_sse_error_does_not_retry() {
.mount(&server)
.await;
let transport = build_transport(server.uri(), true, fast_policy(5));
let transport = build_transport(server.uri(), true);
let mut stream = transport
.stream(Request::default())
.await
.expect("status 200 should bypass retry loop");
.expect("status 200 should open stream");
let mut saw_sse_err = false;
while let Some(item) = stream.next().await {
if matches!(item, Err(ClientError::Sse(_))) {
@ -225,11 +159,11 @@ async fn mid_stream_sse_error_does_not_retry() {
assert!(saw_sse_err, "expected Sse error from stream consumer");
let received = server.received_requests().await.unwrap();
assert_eq!(received.len(), 1, "mid-stream Sse must not retry");
assert_eq!(received.len(), 1, "mid-stream Sse must not reopen stream");
}
#[tokio::test]
async fn non_retryable_status_returns_immediately() {
async fn non_retryable_status_returns_api_error() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
@ -237,7 +171,7 @@ async fn non_retryable_status_returns_immediately() {
.mount(&server)
.await;
let transport = build_transport(server.uri(), false, fast_policy(5));
let transport = build_transport(server.uri(), false);
match transport.stream(Request::default()).await {
Err(ClientError::Api {
status: Some(401), ..
@ -247,5 +181,5 @@ async fn non_retryable_status_returns_immediately() {
}
let received = server.received_requests().await.unwrap();
assert_eq!(received.len(), 1, "401 must not retry");
assert_eq!(received.len(), 1);
}

View File

@ -330,6 +330,29 @@ fn wire_event_bridges_on_worker<C, St>(
let _ = tx.send(Event::LlmCallEnd { llm_call });
});
let tx = event_tx.clone();
worker.on_llm_retry(move |llm_call, notice| {
let _ = tx.send(Event::LlmRetry {
llm_call,
failed_attempt: notice.failed_attempt,
max_attempts: notice.max_attempts,
wait_ms: notice.wait.as_millis() as u64,
elapsed_ms: notice.elapsed.as_millis() as u64,
status: notice.status,
error: notice.error.clone(),
});
});
let tx = event_tx.clone();
worker.on_llm_continuation(move |llm_call, attempt, max_attempts, reason| {
let _ = tx.send(Event::LlmContinuation {
llm_call,
attempt,
max_attempts,
reason: reason.to_owned(),
});
});
let tx = event_tx.clone();
let activity = ai_activity.clone();
worker.on_text_block(move |block| {

View File

@ -101,6 +101,7 @@ impl LlmClient for MockClient {
status: Some(500),
code: Some("mock".into()),
message: "No more responses".into(),
retry_after: None,
});
}
let response = self.responses[count].clone();

View File

@ -298,6 +298,29 @@ pub enum Event {
LlmCallEnd {
llm_call: usize,
},
/// A transport-level LLM request retry has been scheduled.
///
/// This is operational state for clients to render while the worker is
/// waiting in backoff. It is not part of conversation history.
LlmRetry {
llm_call: usize,
/// The attempt that just failed. 1 origin.
failed_attempt: u32,
max_attempts: u32,
wait_ms: u64,
elapsed_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
status: Option<u16>,
error: String,
},
/// Stream generation was interrupted after events had begun and the worker
/// is continuing with a follow-up LLM request.
LlmContinuation {
llm_call: usize,
attempt: u32,
max_attempts: u32,
reason: String,
},
TextDelta {
text: String,
},
@ -867,6 +890,69 @@ mod tests {
assert_eq!(parsed["data"]["llm_call"], 3);
}
#[test]
fn event_llm_retry_roundtrip() {
let event = Event::LlmRetry {
llm_call: 3,
failed_attempt: 1,
max_attempts: 4,
wait_ms: 800,
elapsed_ms: 120,
status: Some(504),
error: "API error (status: 504): gateway timeout".into(),
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "llm_retry");
assert_eq!(parsed["data"]["status"], 504);
let decoded: Event = serde_json::from_str(&json).unwrap();
match decoded {
Event::LlmRetry {
llm_call,
failed_attempt,
max_attempts,
wait_ms,
status,
..
} => {
assert_eq!(llm_call, 3);
assert_eq!(failed_attempt, 1);
assert_eq!(max_attempts, 4);
assert_eq!(wait_ms, 800);
assert_eq!(status, Some(504));
}
other => panic!("expected LlmRetry, got {other:?}"),
}
}
#[test]
fn event_llm_continuation_roundtrip() {
let event = Event::LlmContinuation {
llm_call: 4,
attempt: 1,
max_attempts: 3,
reason: "SSE parse error: closed".into(),
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "llm_continuation");
let decoded: Event = serde_json::from_str(&json).unwrap();
match decoded {
Event::LlmContinuation {
llm_call,
attempt,
max_attempts,
reason,
} => {
assert_eq!(llm_call, 4);
assert_eq!(attempt, 1);
assert_eq!(max_attempts, 3);
assert_eq!(reason, "SSE parse error: closed");
}
other => panic!("expected LlmContinuation, got {other:?}"),
}
}
#[test]
fn method_notify_json_roundtrip() {
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;

View File

@ -56,6 +56,7 @@ impl CodexAuthError {
status: None,
code: Some("refresh_transient".into()),
message: msg,
retry_after: None,
},
CodexAuthError::RefreshPermanent { reason, message } => ClientError::Api {
status: Some(401),
@ -66,6 +67,7 @@ impl CodexAuthError {
PermanentReason::Other => "refresh_token_failed".into(),
}),
message: format!("{message}. Please run `codex login` again."),
retry_after: None,
},
}
}

View File

@ -45,6 +45,7 @@ impl LlmClient for MockLlmClient {
status: Some(500),
code: Some("mock_error".to_string()),
message: "No more mock responses".to_string(),
retry_after: None,
});
}
let events = self.responses[count].clone();

View File

@ -89,6 +89,8 @@ pub struct App {
pub context_window: u64,
pub turn_index: usize,
pub current_tool: Option<String>,
/// Latest LLM wait/retry lifecycle event for actionbar observability.
pub latest_llm_wait_event: Option<String>,
/// Latest memory extract/consolidation lifecycle event for actionbar observability.
pub latest_memory_worker_event: Option<String>,
/// Normal composer input that is submitted as `Method::Run`.
@ -150,6 +152,7 @@ impl App {
context_window: 0,
turn_index: 0,
current_tool: None,
latest_llm_wait_event: None,
latest_memory_worker_event: None,
input: InputBuffer::new(),
command_input: InputBuffer::new(),
@ -608,20 +611,52 @@ impl App {
self.set_pod_status(PodStatus::Running);
self.run_requests += 1;
self.current_tool = None;
self.latest_llm_wait_event = None;
self.assistant_streaming = false;
}
// UI consumers of Invoke / LlmCall semantics are out of scope
// for `tickets/invoke-turn-llmcall-semantics.md`; events flow
// through to subscribers but the TUI currently derives its
// turn header from `UserMessage` / `SystemItem` arrivals.
Event::InvokeStart { .. } | Event::LlmCallStart { .. } | Event::LlmCallEnd { .. } => {}
Event::InvokeStart { .. } | Event::LlmCallStart { .. } | Event::LlmCallEnd { .. } => {
self.latest_llm_wait_event = None;
}
Event::LlmRetry {
failed_attempt,
max_attempts,
wait_ms,
status,
error,
..
} => {
let next_attempt = failed_attempt.saturating_add(1).min(max_attempts);
let reason = status
.map(|code| format!("HTTP {code}"))
.unwrap_or_else(|| error);
self.latest_llm_wait_event = Some(format!(
"retrying LLM request after {reason} (attempt {next_attempt}/{max_attempts} in {})",
fmt_millis(wait_ms)
));
}
Event::LlmContinuation {
attempt,
max_attempts,
reason,
..
} => {
self.latest_llm_wait_event = Some(format!(
"LLM stream interrupted; continuing generation ({attempt}/{max_attempts}): {reason}"
));
}
Event::TextDelta { text } => {
self.latest_llm_wait_event = None;
self.append_assistant_text(&text);
}
Event::TextDone { .. } => {
self.assistant_streaming = false;
}
Event::ThinkingStart => {
self.latest_llm_wait_event = None;
self.assistant_streaming = false;
self.blocks.push(Block::Thinking(ThinkingBlock {
text: String::new(),
@ -661,6 +696,7 @@ impl App {
self.current_tool = None;
}
Event::ToolCallStart { id, name } => {
self.latest_llm_wait_event = None;
self.current_tool = Some(name.clone());
self.assistant_streaming = false;
self.blocks.push(Block::ToolCall(ToolCallBlock {
@ -702,6 +738,7 @@ impl App {
output,
is_error,
} => {
self.latest_llm_wait_event = None;
// Pull the name / args out first so we can look at the
// (immutable) cache before taking the mutable block
// borrow below.
@ -776,6 +813,7 @@ impl App {
self.push_error(format!("[{code:?}] {message}"));
}
Event::RunEnd { result } => {
self.latest_llm_wait_event = None;
if matches!(result, RunResult::RolledBack) {
self.handle_rolled_back_run();
} else {
@ -889,6 +927,7 @@ impl App {
self.run_upload_tokens = 0;
self.run_output_tokens = 0;
self.current_tool = None;
self.latest_llm_wait_event = None;
self.assistant_streaming = false;
}
@ -1291,6 +1330,14 @@ pub fn fmt_tokens(n: u64) -> String {
}
}
fn fmt_millis(ms: u64) -> String {
if ms >= 1_000 {
format!("{:.1}s", ms as f64 / 1_000.0)
} else {
format!("{ms}ms")
}
}
fn message_text(item: &serde_json::Value) -> String {
item["content"]
.as_array()
@ -1356,6 +1403,47 @@ pub fn alert_source_label(source: AlertSource) -> &'static str {
}
}
#[cfg(test)]
mod llm_wait_event_tests {
use super::*;
#[test]
fn llm_retry_updates_and_progress_clears_transient_status() {
let mut app = App::new("test".into());
app.handle_pod_event(Event::LlmRetry {
llm_call: 2,
failed_attempt: 1,
max_attempts: 4,
wait_ms: 1_200,
elapsed_ms: 50,
status: Some(504),
error: "gateway timeout".into(),
});
assert_eq!(
app.latest_llm_wait_event.as_deref(),
Some("retrying LLM request after HTTP 504 (attempt 2/4 in 1.2s)")
);
app.handle_pod_event(Event::TextDelta { text: "ok".into() });
assert!(app.latest_llm_wait_event.is_none());
}
#[test]
fn llm_continuation_updates_transient_status() {
let mut app = App::new("test".into());
app.handle_pod_event(Event::LlmContinuation {
llm_call: 3,
attempt: 1,
max_attempts: 3,
reason: "SSE parse error: closed".into(),
});
assert_eq!(
app.latest_llm_wait_event.as_deref(),
Some("LLM stream interrupted; continuing generation (1/3): SSE parse error: closed")
);
}
}
#[cfg(test)]
mod completion_flow_tests {
use super::*;

View File

@ -1158,7 +1158,14 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) {
];
if app.running {
let status = if let Some(tool) = &app.current_tool {
let status = if let Some(wait_event) = &app.latest_llm_wait_event {
format!(
"request: {} | ↑{}/↓{} | {wait_event}",
app.run_requests,
fmt_tokens(app.run_upload_tokens),
fmt_tokens(app.run_output_tokens),
)
} else if let Some(tool) = &app.current_tool {
format!(
"request: {} | ↑{}/↓{} | tool: {tool}",
app.run_requests,
@ -1218,6 +1225,11 @@ fn draw_actionbar(frame: &mut Frame, app: &App, area: Rect) {
"Alt-q edit queued Alt-c clear queued",
Style::default().fg(Color::DarkGray),
));
} else if let Some(llm_event) = app.latest_llm_wait_event.as_deref() {
left.push(Span::styled(
truncate_with_ellipsis(llm_event, 96),
Style::default().fg(Color::Yellow),
));
} else if let Some(memory_event) = app.latest_memory_worker_event.as_deref() {
left.push(Span::styled(
truncate_with_ellipsis(memory_event, 72),

View File

@ -0,0 +1,116 @@
# Compact 後 retained tail が巨大化して次 turn が壊れた疑い
## 概要
2026-05-24 の開発セッション中、手動 compact 後に「続けられる?」と入力した turn が、ユーザー側では context length 超過系のエラーで切れたように見えた。セッション永続化を確認すると、manual compact 自体は成功扱いで新 segment を作っているが、その post-compact `SegmentStart.history` が 892 entries / 約 2.3MB と巨大なままだった。
その直後の turn は `invoke` / `user_input` / `turn_end` / `run_completed result=finished` が残っている一方で、`assistant_item` と `llm_usage` が残っていない。つまり Pod 永続化上は正常終了扱いだが、実際には LLM 応答が生成されていない空 turn になっている。
次の turn の前には自動 pre-run compact がもう一度走り、今度は `SegmentStart.history` が 24 entries まで縮んだ。その後の turn は正常に `llm_usage input_total_tokens=20182` を記録して進行している。
## 観測したセッション
親 session:
- `/home/hare/.insomnia/sessions/019e5769-73fa-72a0-b501-b657a8976dd3`
関連 segment:
- 元 segment: `019e5769-73fa-72a0-b501-b665cb0ce470`
- 1回目 compact 後: `019e5c2f-a23c-7a00-b5db-fb31fccec9fb`
- 2回目 compact 後: `019e5c34-cea7-7e72-8565-2d5447fa0b70`
1回目 compact 後の `SegmentStart.history` は 892 entries。内訳は概ね以下。
- `tool_call`: 338
- `tool_result`: 338
- `reasoning`: 171
- `message`: 45
これは 1 turn で大量 tool call したというより、前回 compact 以後の長い履歴 suffix が retained tail として残ったものと考える方が自然。
## 現時点の推定
### 1. manual compact は「400K 未満にする」処理ではない
400K は compact 発火閾値であり、compact 後の post-condition ではない。manual compact の履歴分割は `compact_retained_tokens` を目標に末尾履歴を残すが、その結果が compact 閾値未満かどうかを強く検証していないように見える。
### 2. retained tail の見積もりが実際の persisted history サイズと乖離している可能性
`split_for_retained` / `token_estimates_for_prune_impl` 周辺を見る限り、retained split は usage records や token estimate に依存する。LLM request 時には tool result pruning / projection が関わるため、LLM に実際に投げた context の usage と、session log に永続化されている unpruned history の大きさが一致しない可能性がある。
その場合、manual compact 前の retained split では「小さく見える」が、compact 後に usage records がない状態で byte fallback 的な推定を使うと「大きく見える」ため、直後に auto compact が必要になる、という挙動を説明できる。
この点はまだ確定ではない。次に `019e5c2f` の retained tail が旧 history のどの index から始まったか、当時の usage records がどの history_len に対応していたかを再現コードか追加ログで詰める必要がある。
### 3. `just_compacted` が safety net を一時的に止める
compact 成功後は `compact_state.record_compact_success()` により `just_compacted = true` になる。`prepare_for_run()` の pre-run compact と interceptor の between-request compact は `!just_compacted` を条件にしているため、compact 直後の次 turn では再 compact が抑止される。
今回、1回目 compact の post-compact history が巨大なままでも、直後の turn ではその safety net が効かなかった。その turn が実質失敗しているのに `run_completed result=finished` として扱われたことで `just_compacted` が解除され、さらに次の turn の pre-run compact が走ったと考えられる。
### 4. context length 系失敗が `run_errored` として残っていない
問題の turn には `assistant_item` / `llm_usage` が無いにもかかわらず `run_completed finished` が残っている。ユーザー体感では context length 超過で切れている。LLM worker 側、stream continuation 側、または rollback/empty turn 処理で、エラーが正常終了扱いに丸められている可能性がある。
これは compact 問題とは別に、永続化と UI observability のバグとして調べるべき。
## 追加で気になった点
runtime state に segment 表示の不一致があった。
- `/run/user/1000/insomnia/insomnia/status.json` は古い segment id を指していた
- `/run/user/1000/insomnia/pods.json` は新しい segment id を指していた
今回の context 超過の主因ではなさそうだが、attach/restore 時に混乱要因になり得る。
## 修正候補
1. compact 成功後に post-compact context estimate を検査する。
- `new_history_len`
- estimated prompt tokens
- retained item count
- retained byte size
- threshold に対する比率
2. post-compact history が threshold を超える場合、単純な成功扱いにしない。
- `CompactFailed` 相当にする
- あるいは `just_compacted` を立てず、pre-run safety net を有効のままにする
- ただし infinite compact loop / thrash を避けるため、失敗理由を明示する必要がある
3. retained split の token estimate を、LLM に投げた pruned context ではなく、persisted history の実サイズに近い形で検証する。
- 少なくとも post-condition は usage record だけに依存しない
- byte fallback と usage-based estimate の乖離を metrics に出す
4. tool call / tool result boundary 保護が retained budget をどれだけ破ったかを可視化する。
- `initial_cut`
- `balanced_cut`
- `items_pulled_back`
- `bytes_pulled_back`
- `estimated_tokens_pulled_back`
5. compact metrics を session log に残す。
- source: manual / pre_run / between_requests
- old_segment_id / new_segment_id
- old_history_len / new_history_len
- retained_from index
- retained_items
- estimated_tokens_before / after
- estimate source: usage_records / byte_fallback / mixed
6. context length 系エラーが `run_completed finished` になる経路を調べる。
- `assistant_item``llm_usage` が無い run を正常終了扱いにしてよいか
- LLM request 前の context-build error と upstream error の永続化
- TUI に `RunEnd(Finished)` だけが届く経路がないか
## 次にやる調査
- `019e5c2f` の 892-entry history が、旧 segment history の何番目から retained されたものか特定する。
- 旧 segment の `LogEntry::LlmUsage { history_len, input_total_tokens, ... }` と retained cut の対応を確認する。
- `split_for_retained` を当時の history / usage records に対して再実行し、見積もりと実 persisted size の差を出す。
- `llm-worker` の prune threshold / protected area / min savings を確認し、pruning がこの乖離にどの程度寄与したかを確定する。
- 空 turn が `run_completed finished` になった理由を追う。
## 注意
このレポートは調査途中の暫定まとめ。特に「pruning が retained estimate を小さく見せた」という仮説はまだ未確定。確実に言えるのは、manual compact 後の `SegmentStart.history` が 892 entries と巨大で、その直後の turn が assistant/usage 無しに finished 扱いになり、次 turn で再 compact されて復旧した、という観測事実。

View File

@ -1,145 +0,0 @@
# llm-worker: ストリーム途中失敗時の継続
## 背景
LLM 応答の SSE ストリームを読んでいる途中で upstream が切れると、
`crates/llm-worker/src/llm_client/transport.rs:231`
`ClientError::Sse(...)`(中身は `eventsource_stream::Error::Transport`
さらに reqwest の `error decoding response body`)として上に投げられ、
`worker.rs:933 stream_response``WorkerError` に変換して Run 全体が中断する。
実例: セッション `019de419-6f8b-71a0-9e56-f0b9a6c7098b.jsonl:236`
直前まで text / tool_use を Timeline に積み続けていたが、
最後の SSE フレームが届く前に接続が切れ、Run が落ちた。
ここで重要な点:
- 出力トークンは upstream 側で既に発生しており、課金済み。
単純に同じ request を再送すると二重出力 + 二重課金。
- Anthropic / OpenAI のいずれの API も、途切れた SSE を
resume するエンドポイントは持たない。継続したい場合は
「assistant turn として部分出力を history に置いた状態で再リクエスト」
という形を自作する必要がある。
- 部分出力の質は内容依存:
- 完成した text ブロックは原則そのまま history に置ける
- 未完成の通常 text ブロックも、LLM の仕様上は assistant partial として置けば続きを生成させられる
- tool_use の `input_json` が途中で切れたブロックは破損 JSON で、そのままは置けない
- reasoning / thinking ブロックも provider 依存の扱いが要る
このため、これは「リトライ」ではなく「継続 (continuation)」。
`history` を編集する責務であり、`transport.rs` には収まらず、
`worker.rs` 層(または上位)の機能になる。
`feedback_llm_worker_scope.md` の方針llm-worker は低レベル基盤に留める)にも合致する。
なお `worker.rs:973` 付近で部分 `flush_usage()` だけは既に行っており、
半分くらいは継続を意識した作りになっている。あとは
「壊れていないブロックの確定」と「次 call の起動条件」を足す形。
## 決定した方針
stream 開始後に transport / SSE error で落ちた場合、同じ request をそのまま再送しない。
Timeline に積まれた部分生成を安全な範囲で assistant history として確定し、その history を前提に continuation call を起動する。
- 通常 text は partial でも残す。
- 完成済み text block はそのまま確定する。
- 未完成 text block も text として確定し、次の LLMCall で続きを生成させる。
- tool_use は壊れていないものだけ残す。
- 完成済み tool_use は通常通り確定する。
- 未完成 tool_use / partial JSON は history に入れず破棄する。
- 破棄した事実は structured diagnostic event として記録する。
- reasoning / thinking block は初期実装では保守的に扱う。
- provider が history に安全に戻せる完成 block として扱えるものだけ残す。
- 未完成または復元不能な thinking/reasoning は破棄し、diagnostic event に残す。
- continuation は自動で最大 5 回試す。
- backoff は attempt ごとに伸ばす。例: 1s, 2s, 4s, 8s, 16s。
- 5 回失敗したら turn を中断し、通常の失敗として上位へ返す。
- `Cancelled` / `Aborted` / interceptor `Yield` は continuation より優先する。
- 明示的な user cancel や上位制御を transport error retry で覆さない。
- 明示的な safety / content filter stop reason が provider event として返る場合は retry 対象外にする。
- transport / SSE error としてしか見えない場合は continuation 対象にする。
- 同じ箇所で繰り返し切られる場合は最大 5 回で exhausted する。
## 失敗ログ / 統計
この機能は実運用での発生頻度と回復率を見たいので、continuation lifecycle を structured log として残す。
ログは統計・デバッグ用であり、通常の LLM context へ暗黙注入しない。
最低限記録する event:
- `llm_stream_interrupted`
- provider / model
- run_id / turn_id 相当
- original attempt / continuation attempt
- error kind: `sse_transport`, `sse_parse`, `body_decode`, `unknown`
- error message
- committed text block count
- committed partial text の有無
- discarded partial tool_use count
- discarded thinking/reasoning count
- usage flush の有無
- `llm_stream_continuation_started`
- continuation attempt
- backoff duration
- history に確定した block summary
- `llm_stream_continuation_completed`
- continuation attempt
- completion reason
- `llm_stream_continuation_failed`
- continuation attempt
- error kind / message
- `llm_stream_continuation_exhausted`
- attempts
- final reason
未完成 tool_use を破棄した場合は、可能な範囲で以下も残す。
```json
{
"event": "discarded_partial_tool_use",
"tool_name": "Bash",
"partial_input_bytes": 1234,
"reason": "sse_transport_error"
}
```
## 要件
- ストリーム開始後の transport / SSE error を `worker.rs` 層で捕捉し、continuation 対象か判定する。
- pre-stream の transient retry とは別枠にする。
- 同じ request の単純再送はしない。
- Timeline に積まれた安全な block を assistant history として確定する。
- 完成済み text block を残す。
- 未完成 text block も残す。
- 完成済み tool_use を残す。
- 未完成 tool_use / partial JSON は破棄し、diagnostic event を記録する。
- 未完成または復元不能な thinking/reasoning は破棄し、diagnostic event を記録する。
- continuation call を最大 5 回まで自動実行する。
- attempt ごとに backoff を伸ばす。
- 成功したら通常の LLMCall 完了として扱う。
- exhausted したら turn を中断する。
- `Cancelled` / `Aborted` / interceptor `Yield` がある場合は continuation しない。
- provider が明示的な safety / content filter stop reason を正常 event として返した場合は continuation しない。
- continuation lifecycle と破棄した partial block の概要を structured log に残す。
- 統計として provider/model 別の失敗頻度、回復率、partial tool_use 発生有無を後から集計できること。
- continuation のために context へ一時的な system message を暗黙注入しない。
- もし LLM に中断事実を知らせる必要が出た場合は、history に残る明示 event/message として設計する。
- 初期実装では壊れた tool_use は LLM に知らせず、ログにだけ残す。
## 完了条件
- stream 途中で transport / SSE error を起こすモック integration test がある。
- text-only partial response では、未完成 text が history に残り、continuation call が続きを生成する。
- partial tool_use response では、壊れた tool_use が history に入らず、discard diagnostic が記録される。
- completed tool_use は破棄されず、通常通り history に残る。
- continuation が最大 5 回で exhausted し、turn が中断される test がある。
- `Cancelled` / `Aborted` / `Yield` が continuation より優先される test がある。
- structured log から interrupted / started / completed / failed / exhausted が確認できる。
- 課金重複が起きないこと(過去ターンの単純再生成ではなく partial assistant history からの continuation であること)が test または手動手順で確認されている。
- `cargo check` / `cargo test``llm-worker` で通る。
## 範囲外
- pre-stream の transient リトライ → `llm-worker-transient-retry`
- ストリーム resume API の実装(プロバイダ側に存在しないので不可能)
- 課金額の自動上限制御
- 壊れた partial tool_use を system message 等で LLM に説明して復旧させる高度な戦略

View File

@ -0,0 +1,48 @@
# メモリ機構: consolidation skip 表示と invalid staging の観測性
## 背景
`memory-audit-log` 実装後、TUI actionbar に `memory consolidation skipped: no_staging_entries` が表示されるようになった。これは一見すると「staging が無い」状態に見えるが、実際の workspace には `.insomnia/memory/_staging/*.json` が残っている場合がある。
現状の原因は二つある。
1. `run_consolidate_once` が成功後に backlog drain のため loop し、直後の空確認で `skipped: no_staging_entries` を emit する。これにより、直前の `completed_record_changes` 表示が actionbar 上で上書きされる。
2. 旧 schema の staging file は `source.session_id` を持ち、現行 schema が要求する `source.segment_id` を持たないため parse で skip される。結果として、ファイルは存在するのに valid entry が 0 件となり、`no_staging_entries` と記録される。
この状態は機能停止ではないが、観測面として misleading であり、memory 機構の稼働状況を人間が誤解しやすい。
## 方針
Audit log には worker の skip / no-op を残しつつ、actionbar には人間が見る価値のある状態だけを出す。特に successful consolidation の直後に発生する drain 終端確認の `no_staging_entries` は actionbar に出さない。
また、staging directory にファイルがあるが current schema として読めない場合は、`no_staging_entries` ではなく invalid staging が存在することを audit log から分かるようにする。
## 要件
- consolidation が `completed` した直後の drain 確認で発生する idle skip が、直前の actionbar 表示を上書きしない。
- 例: `completed_record_changes` の直後に `no_staging_entries` を actionbar へ出さない。
- audit log へ残すかどうかは実装判断でよいが、UI event としては抑制する。
- 通常の post-run check で staging が本当に空の場合も、actionbar に `no_staging_entries` を毎回出さない。
- idle/no-op 状態は audit log 側で確認できればよい。
- `list_staging_entries` あるいはその呼び出し側で、invalid / parse-failed staging file の存在を区別できるようにする。
- 少なくとも invalid count が audit reason または structured field から分かる。
- 例: `no_valid_staging_entries invalid=6`
- `threshold_not_reached``no_valid_staging_entries` / `invalid_staging_entries` は区別される。
- 既存の old schema staging を自動 migration / delete しない。
- `source.session_id``source.segment_id` は意味が違う可能性があるため、この ticket では観測性改善に留める。
- 必要なら後続で archive/drop/migration 方針を決める。
## 完了条件
- successful consolidation の直後に actionbar が `no_staging_entries` で上書きされない。
- staging directory に parse 不能な `.json` がある場合、audit log が `no_staging_entries` ではなく invalid staging の存在を示す。
- staging が本当に空の periodic/post-run check は actionbar にノイズを出さない。
- consolidation skip / invalid staging の挙動を確認する test がある。
- `cargo fmt --check` と関連 crate の test が通る。
## 範囲外
- old schema staging file の自動 migration。
- old schema staging file の自動削除 / archive。
- actionbar transient notice 汎用 API の実装。
- memory audit log の保存形式の大幅変更。

View File

@ -0,0 +1,53 @@
# SpawnPod: initial Run delivery confirmation
## 背景
`SpawnPod` は child Pod を起動し、初回 task を `Method::Run` として送る。しかし、実例として `impl-llm-worker-stream-continuation` を再作成した際、runtime registry / socket / process は生きている一方で、初回 task の session log が materialize されず、Pod は `idle` のままだった。
確認された状態:
- `/run/user/1000/insomnia/pods.json` に live allocation がある
- `/run/user/1000/insomnia/<pod>/status.json``state: "idle"` と runtime `segment_id` を持つ
- `/home/hare/.insomnia/sessions/pods/<pod>/metadata.json` は pending segment のまま
- 対応する session / segment `.jsonl` が存在しない
- `ReadPodOutput` は no new assistant text
`SpawnPod` の送信側は `send_run``Method::Run` を write してすぐ切断し、`TurnStart` 等の ack を待っていない。一方 server 側は接続直後に `Snapshot` を書いてから method を読むため、client がすぐ close すると server が snapshot write で失敗し、method を読む前に connection handler が終了する race があり得る。
この場合 `SpawnPod` は成功を返すが、child Pod は初回 task を実行していない。
同種の問題は child Pod の通知経路でも既に踏んでおり、送信側が write 後にすぐ切断せず、receiver 側の acknowledgement / observable event を待つ形にして解消している。`SpawnPod` の初回 task delivery も同じ性質の race と見なす。
## 方針
`SpawnPod` は child process / socket の起動だけでなく、初回 task が controller に受理され、少なくとも `UserMessage` または `TurnStart` が観測できるまで確認してから成功を返す。
既存の `SendToPod` が使う `send_run_and_confirm` と同等の acknowledgement を `SpawnPod` の初回 task 送信にも適用する。
## 要件
- `SpawnPod` の初回 task 送信は fire-and-forget にしない。
- `Method::Run` 送信後、`UserMessage` / `TurnStart` / `InvokeStart` など、run が受理されたことを示す event を待つ。
- timeout 時は `SpawnPod` を失敗扱いにする。
- 初回 task delivery に失敗した場合、process / registry / delegated scope の扱いを明確にする。
- cleanup するか、attach 可能な idle Pod として残すかを実装で決める。
- 少なくとも成功扱いで返さない。
- Server が connection 開始時に `Snapshot` を書く設計と競合しない。
- client 側が snapshot/event を読みながら `Method::Run` ack を待つ形にする。
- `SpawnPod` 成功後は、child Pod の metadata が pending でも、初回 run が開始済みであることを確認できる。
- session log materialization のタイミングそのものは別設計でもよい。
- `SendToPod``SpawnPod` の run delivery confirmation ロジックを可能な範囲で共通化する。
## 完了条件
- `SpawnPod` が初回 task の受理確認を待つ。
- 初回 task が実行されない race を再現する test または regression test がある。
- `SpawnPod` が success を返した後、child Pod が idle pending のまま task 未実行になる状態が起きない。
- delivery timeout / failure 時の error message が人間に分かる。
- `cargo fmt --check` と関連 crate の test が通る。
## 範囲外
- `tui -r` picker に live pending Pod を表示する修正。
- session log の SegmentStart materialization 方針変更。
- spawned child Pod panel UI。

View File

@ -0,0 +1,62 @@
# TUI picker: live pending Pod の表示優先と状態補完
## 背景
`tui -r` の Pod picker は session store の name-keyed Pod metadata と runtime registry の live allocation を合わせて表示している。しかし、spawned child Pod がまだ最初の user turn / SegmentStart を materialize していない場合、Pod metadata は pending segment のままになり、session log も存在しない。
実例として、`impl-llm-worker-stream-continuation` は live socket と runtime registry 上の segment_id を持っていたが、metadata は以下のように `session_id` のみだった。
```json
{
"pod_name": "impl-llm-worker-stream-continuation",
"active": {
"session_id": "019e5bc6-c3f3-7193-98a1-d64c635f86a1"
}
}
```
一方で runtime 側には segment_id が存在する。
```json
{
"pod_name": "impl-llm-worker-stream-continuation",
"segment_id": "019e5bc6-c3f3-7193-98a1-d6559bdc9cd6",
"state": "idle"
}
```
この状態の Pod は attach 可能だが、session log がないため `updated_at = 0` になり、picker の `updated_at desc` sort と `MAX_ROWS = 10` truncate によって一覧から漏れやすい。
## 方針
Live socket が reachable な Pod は、session log / metadata active segment が未確定でも attach 可能な対象として picker に表示する。restore 可能性と attach 可能性を分け、live pending Pod は restore 不能でも live attach 対象として扱う。
## 要件
- `tui -r` picker は reachable live Pod を stopped Pod より優先して表示する。
- `updated_at = 0` でも live row が `MAX_ROWS` truncate で落ちない。
- sort key は少なくとも live first, updated_at desc, pod_name になる。
- Live Pod の metadata が pending segment の場合でも picker row に表示する。
- preview は `[live, pending segment]` など、人間が状態を理解できる文言にする。
- debug id 表示では runtime registry の segment_id を可能なら表示する。
- Runtime registry / live status に segment_id があり、metadata に segment_id が無い場合、表示上は runtime segment_id を補完できるようにする。
- ただし session log が存在しない限り restore 可能とは扱わない。
- attach は live socket に対して行う。
- Existing stopped / corrupt Pod metadata rows の表示を壊さない。
- `ListVisiblePods` / discovery 側にも同様の pending live 表示不整合がある場合、必要なら後続 ticket に切り出す。
- この ticket の主対象は `tui -r` picker。
## 完了条件
- live pending Pod が `tui -r` に表示される。
- live pending Pod を選択すると live socket に attach する。
- live pending Pod が多数の stopped Pod によって `MAX_ROWS` truncate から漏れない。
- picker の sort / row build の unit test が追加または更新されている。
- `cargo fmt --check``cargo test -p tui picker` あるいは関連 TUI test が通る。
## 範囲外
- pending Pod metadata を runtime segment_id で永続的に書き換えること。
- session log が無い Pod を restore 可能にすること。
- spawned child Pod の first turn / SegmentStart materialization 方針の変更。
- 汎用 spawned Pod panel UI。