Compare commits
7 Commits
8dc23183c1
...
46765404bf
| Author | SHA1 | Date | |
|---|---|---|---|
| 46765404bf | |||
| 3d3db8b6ac | |||
| 3f750668ba | |||
| fe9b12aa65 | |||
| a414655366 | |||
| 0b6f09c112 | |||
| e5b918283a |
7
TODO.md
7
TODO.md
|
|
@ -7,13 +7,16 @@
|
|||
- Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
|
||||
- Pod/TUI: 手動 rollback 導線 → [tickets/manual-turn-rollback.md](tickets/manual-turn-rollback.md)
|
||||
- Pod: Inbound PodEvent ハンドリングの重複を統合 → [tickets/pod-inbound-pod-event-dedup.md](tickets/pod-inbound-pod-event-dedup.md)
|
||||
- llm-worker のエラー耐性
|
||||
- ストリーム途中失敗時の継続 → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md)
|
||||
- SpawnPod 初回 task delivery の受理確認 → [tickets/spawnpod-initial-run-confirmation.md](tickets/spawnpod-initial-run-confirmation.md)
|
||||
- E2E テストハーネス(`tests/e2e/`、opt-in) → [tickets/e2e-harness.md](tickets/e2e-harness.md)
|
||||
- メモリ機構
|
||||
- consolidation skip 表示と invalid staging の観測性 → [tickets/memory-consolidation-skip-observability.md](tickets/memory-consolidation-skip-observability.md)
|
||||
|
||||
- TUI 拡充
|
||||
- navigation mode / block focus の設計 → [tickets/tui-navigation-mode-design.md](tickets/tui-navigation-mode-design.md)
|
||||
- spawned child Pod の一覧と一時 attach → [tickets/tui-spawned-pod-panel.md](tickets/tui-spawned-pod-panel.md)
|
||||
- actionbar transient notice API → [tickets/tui-actionbar-transient-notice-api.md](tickets/tui-actionbar-transient-notice-api.md)
|
||||
- tui -r picker で live pending Pod が表示から漏れる → [tickets/tui-picker-live-pending-pods.md](tickets/tui-picker-live-pending-pods.md)
|
||||
- user manifest env override 時の spawn scope overlay 前提ズレ → [tickets/tui-user-manifest-env-overlay.md](tickets/tui-user-manifest-env-overlay.md)
|
||||
- ユーザーマニフェストのモデル設定 wizard → [tickets/tui-user-model-setup.md](tickets/tui-user-model-setup.md)
|
||||
- セッション内 Task ツールの注意機構(無アクティビティで `<system-reminder>` ナッジ) → [tickets/session-todo-reminder.md](tickets/session-todo-reminder.md)
|
||||
|
|
|
|||
|
|
@ -59,4 +59,6 @@ pub use interceptor::Interceptor;
|
|||
pub use message::{ContentPart, Item, Message, Role};
|
||||
pub use tool::{ToolCall, ToolOutputLimits, ToolResult};
|
||||
pub use usage_record::UsageRecord;
|
||||
pub use worker::{RunOutput, ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult};
|
||||
pub use worker::{
|
||||
LlmRetryNotice, RunOutput, ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -36,6 +36,8 @@ impl std::fmt::Display for ConfigWarning {
|
|||
}
|
||||
}
|
||||
|
||||
pub type ResponseStream = Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>;
|
||||
|
||||
/// LLMクライアントのtrait
|
||||
///
|
||||
/// 各プロバイダはこのtraitを実装し、統一されたインターフェースを提供する。
|
||||
|
|
@ -49,10 +51,7 @@ pub trait LlmClient: Send + Sync {
|
|||
/// # Returns
|
||||
/// * `Ok(Stream)` - イベントストリーム
|
||||
/// * `Err(ClientError)` - エラー
|
||||
async fn stream(
|
||||
&self,
|
||||
request: Request,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError>;
|
||||
async fn stream(&self, request: Request) -> Result<ResponseStream, ClientError>;
|
||||
|
||||
/// Clone this client into a new `Box<dyn LlmClient>`.
|
||||
///
|
||||
|
|
@ -85,10 +84,7 @@ impl Clone for Box<dyn LlmClient> {
|
|||
/// これにより、動的ディスパッチを使用するクライアントも `Worker` で利用可能になる。
|
||||
#[async_trait]
|
||||
impl LlmClient for Box<dyn LlmClient> {
|
||||
async fn stream(
|
||||
&self,
|
||||
request: Request,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError> {
|
||||
async fn stream(&self, request: Request) -> Result<ResponseStream, ClientError> {
|
||||
(**self).stream(request).await
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
//! LLMクライアントエラー型
|
||||
|
||||
use std::fmt;
|
||||
use std::{fmt, time::Duration};
|
||||
|
||||
/// LLMクライアントのエラー
|
||||
#[derive(Debug)]
|
||||
|
|
@ -16,6 +16,7 @@ pub enum ClientError {
|
|||
status: Option<u16>,
|
||||
code: Option<String>,
|
||||
message: String,
|
||||
retry_after: Option<Duration>,
|
||||
},
|
||||
/// 設定エラー
|
||||
Config(String),
|
||||
|
|
@ -31,6 +32,7 @@ impl fmt::Display for ClientError {
|
|||
status,
|
||||
code,
|
||||
message,
|
||||
..
|
||||
} => {
|
||||
write!(f, "API error")?;
|
||||
if let Some(s) = status {
|
||||
|
|
@ -68,6 +70,22 @@ impl From<serde_json::Error> for ClientError {
|
|||
}
|
||||
}
|
||||
|
||||
impl ClientError {
|
||||
pub fn status(&self) -> Option<u16> {
|
||||
match self {
|
||||
ClientError::Api { status, .. } => *status,
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn retry_after(&self) -> Option<Duration> {
|
||||
match self {
|
||||
ClientError::Api { retry_after, .. } => *retry_after,
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// transient な失敗としてリトライ対象になるかを判定する。
|
||||
///
|
||||
/// 対象:
|
||||
|
|
@ -97,6 +115,7 @@ mod tests {
|
|||
status,
|
||||
code: None,
|
||||
message: String::new(),
|
||||
retry_after: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
//! HTTP transient エラー向けリトライポリシー。
|
||||
//! LLM response stream を開く前の transient error 向けリトライポリシー。
|
||||
//!
|
||||
//! `transport.rs` の HTTP 送信〜ステータスチェック区間で `is_retryable`
|
||||
//! が true を返した失敗をリトライする際に、待ち時間と打ち切り条件を
|
||||
//! 提供する。SSE 読み出し開始後の失敗は対象外。
|
||||
//! Worker が `LlmClient::stream` の open error に対して `is_retryable` を見て
|
||||
//! retry / backoff / TUI event / cancellation をまとめて管理する。
|
||||
//! SSE 読み出し開始後の失敗は対象外。
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
|
|
|
|||
|
|
@ -131,6 +131,7 @@ impl GeminiScheme {
|
|||
status: None,
|
||||
code: Some("parse_error".to_string()),
|
||||
message: format!("Failed to parse Gemini SSE data: {} -> {}", e, data),
|
||||
retry_after: None,
|
||||
})?;
|
||||
|
||||
let mut events = Vec::new();
|
||||
|
|
|
|||
|
|
@ -75,6 +75,7 @@ impl OpenAIScheme {
|
|||
status: None,
|
||||
code: Some("parse_error".to_string()),
|
||||
message: format!("Failed to parse SSE data: {} -> {}", e, data),
|
||||
retry_after: None,
|
||||
})?;
|
||||
|
||||
let mut events = Vec::new();
|
||||
|
|
|
|||
|
|
@ -597,6 +597,7 @@ fn from_json<T: for<'de> Deserialize<'de>>(data: &str) -> Result<T, ClientError>
|
|||
status: None,
|
||||
code: Some("parse_error".to_string()),
|
||||
message: format!("Failed to parse SSE data: {e}"),
|
||||
retry_after: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,15 +12,12 @@ use async_trait::async_trait;
|
|||
use eventsource_stream::Eventsource;
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue, RETRY_AFTER};
|
||||
use tokio::time::Instant;
|
||||
use tracing::warn;
|
||||
|
||||
use super::auth::{AuthProvider, AuthRequirement};
|
||||
use super::capability::ModelCapability;
|
||||
use super::client::{ConfigWarning, LlmClient};
|
||||
use super::error::{ClientError, is_retryable};
|
||||
use super::client::{ConfigWarning, LlmClient, ResponseStream};
|
||||
use super::error::ClientError;
|
||||
use super::event::Event;
|
||||
use super::retry::RetryPolicy;
|
||||
use super::scheme::Scheme;
|
||||
use super::types::{Request, RequestConfig};
|
||||
|
||||
|
|
@ -67,7 +64,6 @@ pub struct HttpTransport<S: Scheme> {
|
|||
base_url: String,
|
||||
auth: ResolvedAuth,
|
||||
capability: ModelCapability,
|
||||
retry_policy: RetryPolicy,
|
||||
}
|
||||
|
||||
impl<S: Scheme> HttpTransport<S> {
|
||||
|
|
@ -89,7 +85,6 @@ impl<S: Scheme> HttpTransport<S> {
|
|||
base_url,
|
||||
auth,
|
||||
capability,
|
||||
retry_policy: RetryPolicy::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -99,12 +94,6 @@ impl<S: Scheme> HttpTransport<S> {
|
|||
self
|
||||
}
|
||||
|
||||
/// リトライポリシーを差し替える(テスト用 / 将来の manifest 化フック)。
|
||||
pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
|
||||
self.retry_policy = policy;
|
||||
self
|
||||
}
|
||||
|
||||
fn build_url(&self) -> String {
|
||||
let path = self.scheme.path(&self.model_id);
|
||||
let url = format!("{}{}", self.base_url, path);
|
||||
|
|
@ -171,14 +160,12 @@ impl<S: Scheme + Clone> Clone for HttpTransport<S> {
|
|||
base_url: self.base_url.clone(),
|
||||
auth: self.auth.clone(),
|
||||
capability: self.capability.clone(),
|
||||
retry_policy: self.retry_policy.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// エラーレスポンスを `ClientError::Api` に変換し、`Retry-After` の秒数を
|
||||
/// 同時に取り出す。リトライループで wait の上書きに使う。
|
||||
async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Option<Duration>) {
|
||||
/// エラーレスポンスを `ClientError::Api` に変換する。
|
||||
async fn classify_error_response(resp: reqwest::Response) -> ClientError {
|
||||
let status = resp.status().as_u16();
|
||||
let retry_after = resp
|
||||
.headers()
|
||||
|
|
@ -187,7 +174,7 @@ async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Optio
|
|||
.and_then(|s| s.trim().parse::<u64>().ok())
|
||||
.map(Duration::from_secs);
|
||||
let text = resp.text().await.unwrap_or_default();
|
||||
let err = if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
|
||||
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
|
||||
let error = json.get("error").unwrap_or(&json);
|
||||
let code = error.get("type").and_then(|v| v.as_str()).map(String::from);
|
||||
let message = error
|
||||
|
|
@ -199,15 +186,16 @@ async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Optio
|
|||
status: Some(status),
|
||||
code,
|
||||
message,
|
||||
retry_after,
|
||||
}
|
||||
} else {
|
||||
ClientError::Api {
|
||||
status: Some(status),
|
||||
code: None,
|
||||
message: text,
|
||||
retry_after,
|
||||
}
|
||||
}
|
||||
};
|
||||
(err, retry_after)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
@ -220,51 +208,25 @@ impl<S: Scheme + Clone + 'static> LlmClient for HttpTransport<S> {
|
|||
self.scheme.validate_config(config)
|
||||
}
|
||||
|
||||
async fn stream(
|
||||
&self,
|
||||
request: Request,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError> {
|
||||
async fn stream(&self, request: Request) -> Result<ResponseStream, ClientError> {
|
||||
let url = self.build_url();
|
||||
let headers = self.build_headers().await?;
|
||||
let body = self
|
||||
.scheme
|
||||
.build_request_body(&self.model_id, &request, &self.capability);
|
||||
|
||||
let policy = &self.retry_policy;
|
||||
let started = Instant::now();
|
||||
let mut attempt: u32 = 0;
|
||||
let response = loop {
|
||||
let send_result = self
|
||||
let response = self
|
||||
.http_client
|
||||
.post(&url)
|
||||
.headers(headers.clone())
|
||||
.headers(headers)
|
||||
.json(&body)
|
||||
.send()
|
||||
.await;
|
||||
.await
|
||||
.map_err(ClientError::Http)?;
|
||||
|
||||
let (err, retry_after) = match send_result {
|
||||
Ok(resp) if resp.status().is_success() => break resp,
|
||||
Ok(resp) => classify_error_response(resp).await,
|
||||
Err(e) => (ClientError::Http(e), None),
|
||||
};
|
||||
|
||||
let next_attempt = attempt + 1;
|
||||
if next_attempt >= policy.max_attempts || !is_retryable(&err) {
|
||||
return Err(err);
|
||||
if !response.status().is_success() {
|
||||
return Err(classify_error_response(response).await);
|
||||
}
|
||||
let wait = retry_after.unwrap_or_else(|| policy.backoff(attempt));
|
||||
if started.elapsed() + wait > policy.total_timeout {
|
||||
return Err(err);
|
||||
}
|
||||
warn!(
|
||||
error = %err,
|
||||
attempt = next_attempt,
|
||||
wait_ms = wait.as_millis() as u64,
|
||||
"transient HTTP error, retrying"
|
||||
);
|
||||
tokio::time::sleep(wait).await;
|
||||
attempt = next_attempt;
|
||||
};
|
||||
|
||||
let scheme = self.scheme.clone();
|
||||
let byte_stream = response.bytes_stream().map_err(std::io::Error::other);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::{marker::PhantomData, time::Instant};
|
||||
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::mpsc;
|
||||
|
|
@ -17,8 +17,8 @@ use crate::{
|
|||
PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction,
|
||||
},
|
||||
llm_client::{
|
||||
ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition,
|
||||
types::parse_tool_arguments,
|
||||
ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ResponseStream,
|
||||
ToolDefinition, error::is_retryable, retry::RetryPolicy, types::parse_tool_arguments,
|
||||
},
|
||||
state::{Locked, Mutable, WorkerState},
|
||||
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
|
||||
|
|
@ -99,6 +99,8 @@ enum ToolExecutionResult {
|
|||
Paused,
|
||||
}
|
||||
|
||||
const MAX_STREAM_CONTINUATIONS: u32 = 3;
|
||||
|
||||
/// Central component for managing LLM interactions
|
||||
///
|
||||
/// Receives input from the user, sends requests to the LLM, and
|
||||
|
|
@ -131,9 +133,28 @@ enum ToolExecutionResult {
|
|||
/// let out = worker.run("Continue").await?;
|
||||
/// let mut worker = out.worker;
|
||||
/// ```
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct LlmRetryNotice {
|
||||
/// 直近で失敗した attempt 番号。1 origin。
|
||||
pub failed_attempt: u32,
|
||||
pub max_attempts: u32,
|
||||
pub wait: std::time::Duration,
|
||||
pub elapsed: std::time::Duration,
|
||||
pub status: Option<u16>,
|
||||
pub error: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum StreamCompletion {
|
||||
Complete,
|
||||
Interrupted { reason: String },
|
||||
}
|
||||
|
||||
pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
||||
/// LLM client
|
||||
client: C,
|
||||
/// Retry policy for opening an LLM response stream.
|
||||
retry_policy: RetryPolicy,
|
||||
/// Event timeline
|
||||
timeline: Timeline,
|
||||
/// Text block collector (Timeline handler)
|
||||
|
|
@ -175,6 +196,10 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
|||
llm_call_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// LlmCall-end callbacks
|
||||
llm_call_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
|
||||
/// Transport-level retry callbacks for a specific LlmCall.
|
||||
llm_retry_cbs: Vec<Box<dyn Fn(usize, &LlmRetryNotice) + Send + Sync>>,
|
||||
/// Stream continuation callbacks for a specific LlmCall.
|
||||
llm_continuation_cbs: Vec<Box<dyn Fn(usize, u32, u32, &str) + Send + Sync>>,
|
||||
/// Non-fatal warning callbacks. Invoked when the Worker wants to
|
||||
/// surface an advisory message to the upper layer (e.g. Pod) so it
|
||||
/// can be forwarded to the user — distinct from `tracing::warn!`,
|
||||
|
|
@ -355,6 +380,34 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
self.llm_call_end_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Register a transport-level retry callback.
|
||||
pub fn on_llm_retry(
|
||||
&mut self,
|
||||
callback: impl Fn(usize, &LlmRetryNotice) + Send + Sync + 'static,
|
||||
) {
|
||||
self.llm_retry_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
/// Register a stream continuation callback.
|
||||
pub fn on_llm_continuation(
|
||||
&mut self,
|
||||
callback: impl Fn(usize, u32, u32, &str) + Send + Sync + 'static,
|
||||
) {
|
||||
self.llm_continuation_cbs.push(Box::new(callback));
|
||||
}
|
||||
|
||||
fn emit_llm_continuation(
|
||||
&self,
|
||||
llm_call: usize,
|
||||
attempt: u32,
|
||||
max_attempts: u32,
|
||||
reason: &str,
|
||||
) {
|
||||
for cb in &self.llm_continuation_cbs {
|
||||
cb(llm_call, attempt, max_attempts, reason);
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a non-fatal warning callback.
|
||||
///
|
||||
/// The callback is invoked with a short human-readable message
|
||||
|
|
@ -964,6 +1017,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
}
|
||||
}
|
||||
|
||||
let mut stream_continuations: u32 = 0;
|
||||
let mut continuing_stream = false;
|
||||
loop {
|
||||
if self.try_cancelled() {
|
||||
info!("Execution cancelled");
|
||||
|
|
@ -973,10 +1028,12 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
}
|
||||
|
||||
let current_turn = self.turn_count;
|
||||
if !continuing_stream {
|
||||
debug!(turn = current_turn, "Turn start");
|
||||
for cb in &self.turn_start_cbs {
|
||||
cb(current_turn);
|
||||
}
|
||||
}
|
||||
|
||||
// Drain interceptor-side inputs that are meant to land in
|
||||
// history (notifications, cross-Pod events, system
|
||||
|
|
@ -1080,13 +1137,50 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
|
||||
// Stream LLM response
|
||||
let request = self.build_request(&tool_definitions, &request_context);
|
||||
self.stream_response(request).await?;
|
||||
let stream_outcome = self.stream_response(request, current_llm_call).await?;
|
||||
|
||||
for cb in &self.llm_call_end_cbs {
|
||||
cb(current_llm_call);
|
||||
}
|
||||
self.llm_call_count += 1;
|
||||
|
||||
if let StreamCompletion::Interrupted { reason } = stream_outcome {
|
||||
stream_continuations += 1;
|
||||
if stream_continuations > MAX_STREAM_CONTINUATIONS {
|
||||
self.last_run_interrupted = true;
|
||||
return Err(WorkerError::Client(ClientError::Api {
|
||||
status: None,
|
||||
code: None,
|
||||
message: format!("LLM stream interrupted too many times: {reason}"),
|
||||
retry_after: None,
|
||||
}));
|
||||
}
|
||||
|
||||
self.timeline.abort_current_block();
|
||||
self.timeline.flush_usage();
|
||||
let reasoning_items = self.reasoning_item_collector.take_collected();
|
||||
let text_blocks = self.text_block_collector.take_collected();
|
||||
// Do not recover tool calls from an interrupted stream. A completed
|
||||
// tool_use is executable only when the provider finishes the stream.
|
||||
let _dropped_tool_calls = self.tool_call_collector.take_collected();
|
||||
let assistant_items =
|
||||
self.build_assistant_items(&reasoning_items, &text_blocks, &[]);
|
||||
if !assistant_items.is_empty() {
|
||||
self.append_history_items(assistant_items);
|
||||
}
|
||||
self.emit_llm_continuation(
|
||||
current_llm_call,
|
||||
stream_continuations,
|
||||
MAX_STREAM_CONTINUATIONS,
|
||||
&reason,
|
||||
);
|
||||
continuing_stream = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
stream_continuations = 0;
|
||||
continuing_stream = false;
|
||||
|
||||
for cb in &self.turn_end_cbs {
|
||||
cb(current_turn);
|
||||
}
|
||||
|
|
@ -1138,18 +1232,18 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Open a stream, dispatch all events to the timeline, handle cancellation.
|
||||
async fn stream_response(&mut self, request: Request) -> Result<(), WorkerError> {
|
||||
debug!(
|
||||
item_count = request.items.len(),
|
||||
tool_count = request.tools.len(),
|
||||
has_system = request.system_prompt.is_some(),
|
||||
"Sending request to LLM"
|
||||
);
|
||||
async fn open_stream_with_retry(
|
||||
&mut self,
|
||||
request: Request,
|
||||
llm_call: usize,
|
||||
) -> Result<ResponseStream, WorkerError> {
|
||||
let policy = self.retry_policy.clone();
|
||||
let started = Instant::now();
|
||||
let mut failed_attempt: u32 = 0;
|
||||
|
||||
let mut stream = tokio::select! {
|
||||
stream_result = self.client.stream(request) => stream_result
|
||||
.inspect_err(|_| self.last_run_interrupted = true)?,
|
||||
loop {
|
||||
let stream_result = tokio::select! {
|
||||
stream_result = self.client.stream(request.clone()) => stream_result,
|
||||
cancel = self.cancel_rx.recv() => {
|
||||
if cancel.is_some() {
|
||||
info!("Cancelled before stream started");
|
||||
|
|
@ -1160,6 +1254,75 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
}
|
||||
};
|
||||
|
||||
match stream_result {
|
||||
Ok(stream) => return Ok(stream),
|
||||
Err(err) => {
|
||||
let next_failed_attempt = failed_attempt + 1;
|
||||
if next_failed_attempt >= policy.max_attempts || !is_retryable(&err) {
|
||||
self.last_run_interrupted = true;
|
||||
return Err(WorkerError::Client(err));
|
||||
}
|
||||
|
||||
let wait = err
|
||||
.retry_after()
|
||||
.unwrap_or_else(|| policy.backoff(failed_attempt));
|
||||
let elapsed = started.elapsed();
|
||||
if elapsed + wait > policy.total_timeout {
|
||||
self.last_run_interrupted = true;
|
||||
return Err(WorkerError::Client(err));
|
||||
}
|
||||
|
||||
warn!(
|
||||
error = %err,
|
||||
failed_attempt = next_failed_attempt,
|
||||
wait_ms = wait.as_millis() as u64,
|
||||
"transient LLM request error, retrying"
|
||||
);
|
||||
let notice = LlmRetryNotice {
|
||||
failed_attempt: next_failed_attempt,
|
||||
max_attempts: policy.max_attempts,
|
||||
wait,
|
||||
elapsed,
|
||||
status: err.status(),
|
||||
error: err.to_string(),
|
||||
};
|
||||
for cb in &self.llm_retry_cbs {
|
||||
cb(llm_call, ¬ice);
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(wait) => {}
|
||||
cancel = self.cancel_rx.recv() => {
|
||||
if cancel.is_some() {
|
||||
info!("Cancelled during LLM retry backoff");
|
||||
}
|
||||
self.timeline.abort_current_block();
|
||||
self.last_run_interrupted = true;
|
||||
return Err(WorkerError::Cancelled);
|
||||
}
|
||||
}
|
||||
|
||||
failed_attempt = next_failed_attempt;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Open a stream, dispatch all events to the timeline, handle cancellation.
|
||||
async fn stream_response(
|
||||
&mut self,
|
||||
request: Request,
|
||||
llm_call: usize,
|
||||
) -> Result<StreamCompletion, WorkerError> {
|
||||
debug!(
|
||||
item_count = request.items.len(),
|
||||
tool_count = request.tools.len(),
|
||||
has_system = request.system_prompt.is_some(),
|
||||
"Sending request to LLM"
|
||||
);
|
||||
|
||||
let mut stream = self.open_stream_with_retry(request, llm_call).await?;
|
||||
|
||||
let mut event_count: usize = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
|
|
@ -1175,12 +1338,17 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
warn!(error = %e, "Stream error");
|
||||
}
|
||||
}
|
||||
let event = result
|
||||
.inspect_err(|_| {
|
||||
let event = match result {
|
||||
Ok(event) => event,
|
||||
Err(err) => {
|
||||
self.last_run_interrupted = true;
|
||||
// 部分情報でも発火しておく(料金会計用)
|
||||
self.timeline.flush_usage();
|
||||
})?;
|
||||
return Ok(StreamCompletion::Interrupted {
|
||||
reason: err.to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
self.timeline.dispatch(&event);
|
||||
}
|
||||
None => break,
|
||||
|
|
@ -1200,7 +1368,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
// ストリーム完了時に集約済み Usage を 1 度だけ発火
|
||||
self.timeline.flush_usage();
|
||||
debug!(event_count = event_count, "Stream completed");
|
||||
Ok(())
|
||||
Ok(StreamCompletion::Complete)
|
||||
}
|
||||
|
||||
/// Execute tools and push results to history.
|
||||
|
|
@ -1254,6 +1422,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
|
||||
Self {
|
||||
client,
|
||||
retry_policy: RetryPolicy::default(),
|
||||
timeline,
|
||||
text_block_collector,
|
||||
tool_call_collector,
|
||||
|
|
@ -1270,6 +1439,8 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
turn_end_cbs: Vec::new(),
|
||||
llm_call_start_cbs: Vec::new(),
|
||||
llm_call_end_cbs: Vec::new(),
|
||||
llm_retry_cbs: Vec::new(),
|
||||
llm_continuation_cbs: Vec::new(),
|
||||
warning_cbs: Vec::new(),
|
||||
tool_result_cbs: Vec::new(),
|
||||
history_append_cbs: Vec::new(),
|
||||
|
|
@ -1385,6 +1556,12 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
self
|
||||
}
|
||||
|
||||
/// Set the retry policy used when opening an LLM response stream.
|
||||
pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
|
||||
self.retry_policy = retry_policy;
|
||||
self
|
||||
}
|
||||
|
||||
/// Validate current configuration against the provider
|
||||
///
|
||||
/// Returns an error if there are unsupported settings.
|
||||
|
|
@ -1507,6 +1684,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
let locked_prefix_len = self.history.len();
|
||||
Worker {
|
||||
client: self.client,
|
||||
retry_policy: self.retry_policy,
|
||||
timeline: self.timeline,
|
||||
text_block_collector: self.text_block_collector,
|
||||
tool_call_collector: self.tool_call_collector,
|
||||
|
|
@ -1523,6 +1701,8 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
turn_end_cbs: self.turn_end_cbs,
|
||||
llm_call_start_cbs: self.llm_call_start_cbs,
|
||||
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||
llm_retry_cbs: self.llm_retry_cbs,
|
||||
llm_continuation_cbs: self.llm_continuation_cbs,
|
||||
warning_cbs: self.warning_cbs,
|
||||
tool_result_cbs: self.tool_result_cbs,
|
||||
history_append_cbs: self.history_append_cbs,
|
||||
|
|
@ -1594,6 +1774,7 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
pub fn unlock(self) -> Worker<C, Mutable> {
|
||||
Worker {
|
||||
client: self.client,
|
||||
retry_policy: self.retry_policy,
|
||||
timeline: self.timeline,
|
||||
text_block_collector: self.text_block_collector,
|
||||
tool_call_collector: self.tool_call_collector,
|
||||
|
|
@ -1610,6 +1791,8 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
turn_end_cbs: self.turn_end_cbs,
|
||||
llm_call_start_cbs: self.llm_call_start_cbs,
|
||||
llm_call_end_cbs: self.llm_call_end_cbs,
|
||||
llm_retry_cbs: self.llm_retry_cbs,
|
||||
llm_continuation_cbs: self.llm_continuation_cbs,
|
||||
warning_cbs: self.warning_cbs,
|
||||
tool_result_cbs: self.tool_result_cbs,
|
||||
history_append_cbs: self.history_append_cbs,
|
||||
|
|
|
|||
|
|
@ -4,17 +4,77 @@
|
|||
|
||||
mod common;
|
||||
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common::MockLlmClient;
|
||||
use llm_worker::Worker;
|
||||
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent as ClientStatusEvent};
|
||||
use llm_worker::llm_client::retry::RetryPolicy;
|
||||
use llm_worker::llm_client::{ClientError, LlmClient, Request, ResponseStream};
|
||||
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||
|
||||
// =============================================================================
|
||||
// Tests
|
||||
// =============================================================================
|
||||
#[derive(Clone)]
|
||||
struct FailOnceClient {
|
||||
calls: Arc<AtomicUsize>,
|
||||
events: Vec<Event>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LlmClient for FailOnceClient {
|
||||
async fn stream(&self, _request: Request) -> Result<ResponseStream, ClientError> {
|
||||
if self.calls.fetch_add(1, Ordering::SeqCst) == 0 {
|
||||
return Err(ClientError::Api {
|
||||
status: Some(504),
|
||||
code: None,
|
||||
message: "gateway timeout".into(),
|
||||
retry_after: None,
|
||||
});
|
||||
}
|
||||
Ok(Box::pin(futures::stream::iter(
|
||||
self.events.clone().into_iter().map(Ok),
|
||||
)))
|
||||
}
|
||||
|
||||
fn clone_boxed(&self) -> Box<dyn LlmClient> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_callback_llm_retry_event() {
|
||||
let events = vec![Event::Status(ClientStatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
})];
|
||||
let client = FailOnceClient {
|
||||
calls: Arc::new(AtomicUsize::new(0)),
|
||||
events,
|
||||
};
|
||||
let mut worker = Worker::new(client).with_retry_policy(RetryPolicy {
|
||||
base: Duration::from_millis(1),
|
||||
cap: Duration::from_millis(1),
|
||||
max_attempts: 2,
|
||||
total_timeout: Duration::from_secs(1),
|
||||
});
|
||||
|
||||
let notices = Arc::new(Mutex::new(Vec::new()));
|
||||
let sink = notices.clone();
|
||||
worker.on_llm_retry(move |llm_call, notice| {
|
||||
sink.lock().unwrap().push((llm_call, notice.clone()));
|
||||
});
|
||||
|
||||
let result = worker.run("retry once").await;
|
||||
assert!(result.is_ok(), "worker should succeed after one retry");
|
||||
|
||||
let notices = notices.lock().unwrap();
|
||||
assert_eq!(notices.len(), 1);
|
||||
assert_eq!(notices[0].0, 0);
|
||||
assert_eq!(notices[0].1.failed_attempt, 1);
|
||||
assert_eq!(notices[0].1.max_attempts, 2);
|
||||
assert_eq!(notices[0].1.status, Some(504));
|
||||
}
|
||||
|
||||
/// Verify that on_text_block correctly receives delta and stop events
|
||||
#[tokio::test]
|
||||
|
|
|
|||
|
|
@ -59,6 +59,7 @@ impl LlmClient for MockLlmClient {
|
|||
status: Some(500),
|
||||
code: Some("mock_error".to_string()),
|
||||
message: "No more mock responses".to_string(),
|
||||
retry_after: None,
|
||||
});
|
||||
}
|
||||
let events = self.responses[count].clone();
|
||||
|
|
|
|||
|
|
@ -1,12 +1,7 @@
|
|||
//! HTTP transport の transient エラーリトライ挙動の integration テスト。
|
||||
//! HTTP transport の単発 request / error classification テスト。
|
||||
//!
|
||||
//! 対応チケット: `tickets/llm-worker-transient-retry.md`。
|
||||
//! - 503 / 529 / connect refused でリトライ発火
|
||||
//! - max_attempts 上限到達でエラー
|
||||
//! - `Retry-After` ヘッダで指数バックオフを上書き
|
||||
//! - `parse_sse` 由来の `ClientError::Sse`(mid-stream 想定)はリトライしない
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
//! Retry/backoff は Worker の lifecycle 管理に属するため、transport は 1 回だけ
|
||||
//! request を送り、HTTP status / Retry-After を `ClientError` に載せて返す。
|
||||
|
||||
use futures::StreamExt;
|
||||
use llm_worker::llm_client::LlmClient;
|
||||
|
|
@ -14,16 +9,16 @@ use llm_worker::llm_client::auth::AuthRequirement;
|
|||
use llm_worker::llm_client::capability::ModelCapability;
|
||||
use llm_worker::llm_client::error::ClientError;
|
||||
use llm_worker::llm_client::event::Event;
|
||||
use llm_worker::llm_client::retry::RetryPolicy;
|
||||
use llm_worker::llm_client::scheme::Scheme;
|
||||
use llm_worker::llm_client::transport::{HttpTransport, ResolvedAuth};
|
||||
use llm_worker::llm_client::types::Request;
|
||||
use serde_json::Value;
|
||||
use std::time::Duration;
|
||||
use wiremock::matchers::{method, path};
|
||||
use wiremock::{Mock, MockServer, ResponseTemplate};
|
||||
|
||||
/// SSE 本体は触らないテスト用 scheme。`parse_fail` を立てると
|
||||
/// stream 消費中(= retry loop の外)で `ClientError::Sse` を返す。
|
||||
/// stream 消費中で `ClientError::Sse` を返す。
|
||||
#[derive(Clone)]
|
||||
struct DummyScheme {
|
||||
parse_fail: bool,
|
||||
|
|
@ -31,18 +26,23 @@ struct DummyScheme {
|
|||
|
||||
impl Scheme for DummyScheme {
|
||||
type State = ();
|
||||
|
||||
fn default_base_url(&self) -> &'static str {
|
||||
""
|
||||
}
|
||||
|
||||
fn path(&self, _: &str) -> String {
|
||||
"/v1/chat".into()
|
||||
}
|
||||
|
||||
fn required_auth(&self) -> AuthRequirement {
|
||||
AuthRequirement::None
|
||||
}
|
||||
|
||||
fn build_request_body(&self, _: &str, _: &Request, _: &ModelCapability) -> Value {
|
||||
serde_json::json!({})
|
||||
}
|
||||
|
||||
fn parse_sse(&self, _: &str, _: &str, _: &mut ()) -> Result<Vec<Event>, ClientError> {
|
||||
if self.parse_fail {
|
||||
Err(ClientError::Sse(
|
||||
|
|
@ -52,25 +52,13 @@ impl Scheme for DummyScheme {
|
|||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
|
||||
fn default_capability(&self) -> ModelCapability {
|
||||
ModelCapability::minimal()
|
||||
}
|
||||
}
|
||||
|
||||
fn fast_policy(max_attempts: u32) -> RetryPolicy {
|
||||
RetryPolicy {
|
||||
base: Duration::from_millis(1),
|
||||
cap: Duration::from_millis(1),
|
||||
max_attempts,
|
||||
total_timeout: Duration::from_secs(60),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_transport(
|
||||
base_url: impl Into<String>,
|
||||
parse_fail: bool,
|
||||
policy: RetryPolicy,
|
||||
) -> HttpTransport<DummyScheme> {
|
||||
fn build_transport(base_url: impl Into<String>, parse_fail: bool) -> HttpTransport<DummyScheme> {
|
||||
HttpTransport::new(
|
||||
DummyScheme { parse_fail },
|
||||
"test-model",
|
||||
|
|
@ -78,7 +66,6 @@ fn build_transport(
|
|||
ResolvedAuth::None,
|
||||
ModelCapability::minimal(),
|
||||
)
|
||||
.with_retry_policy(policy)
|
||||
}
|
||||
|
||||
fn ok_sse() -> ResponseTemplate {
|
||||
|
|
@ -88,78 +75,11 @@ fn ok_sse() -> ResponseTemplate {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn retries_503_then_succeeds() {
|
||||
async fn retryable_status_returns_api_error_without_retrying() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat"))
|
||||
.respond_with(ResponseTemplate::new(503).set_body_string("upstream connect error"))
|
||||
.up_to_n_times(2)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat"))
|
||||
.respond_with(ok_sse())
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let transport = build_transport(server.uri(), false, fast_policy(5));
|
||||
let mut stream = transport
|
||||
.stream(Request::default())
|
||||
.await
|
||||
.expect("stream should succeed after retries");
|
||||
while stream.next().await.is_some() {}
|
||||
|
||||
let received = server.received_requests().await.unwrap();
|
||||
assert_eq!(received.len(), 3, "two failures plus one success expected");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn retries_529_then_exhausts() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat"))
|
||||
.respond_with(ResponseTemplate::new(529).set_body_string("overloaded"))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let transport = build_transport(server.uri(), false, fast_policy(3));
|
||||
match transport.stream(Request::default()).await {
|
||||
Err(ClientError::Api {
|
||||
status: Some(529), ..
|
||||
}) => {}
|
||||
Err(other) => panic!("expected Api(529), got {other:?}"),
|
||||
Ok(_) => panic!("expected error after exhausting retries"),
|
||||
}
|
||||
|
||||
let received = server.received_requests().await.unwrap();
|
||||
assert_eq!(received.len(), 3, "should hit max_attempts and stop");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_refused_retries_then_fails() {
|
||||
// 接続不能なローカルアドレスを使う。Linux では `Connection refused` で
|
||||
// 即時失敗するため、`fast_policy` ならテストが秒以下で終わる。
|
||||
let unreachable = "http://127.0.0.1:1";
|
||||
|
||||
let transport = build_transport(unreachable, false, fast_policy(3));
|
||||
match transport.stream(Request::default()).await {
|
||||
Err(ClientError::Http(e)) => {
|
||||
assert!(
|
||||
e.is_connect() || e.is_timeout(),
|
||||
"expected connect/timeout, got {e:?}"
|
||||
);
|
||||
}
|
||||
Err(other) => panic!("expected Http error, got {other:?}"),
|
||||
Ok(_) => panic!("expected error connecting to closed port"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn retry_after_header_overrides_backoff() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat"))
|
||||
.respond_with(ResponseTemplate::new(503).insert_header("retry-after", "1"))
|
||||
.up_to_n_times(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
|
@ -169,34 +89,48 @@ async fn retry_after_header_overrides_backoff() {
|
|||
.mount(&server)
|
||||
.await;
|
||||
|
||||
// base/cap を 1ms に絞った policy で `Retry-After: 1` を観察すると、
|
||||
// 指数バックオフ単独なら 1ms 程度で終わるはずが Retry-After に従って
|
||||
// 1 秒待つ → 経過時間で override を検証できる。
|
||||
let policy = RetryPolicy {
|
||||
base: Duration::from_millis(1),
|
||||
cap: Duration::from_millis(1),
|
||||
max_attempts: 3,
|
||||
total_timeout: Duration::from_secs(10),
|
||||
};
|
||||
let transport = build_transport(server.uri(), false, policy);
|
||||
let transport = build_transport(server.uri(), false);
|
||||
match transport.stream(Request::default()).await {
|
||||
Err(ClientError::Api {
|
||||
status: Some(503), ..
|
||||
}) => {}
|
||||
Err(other) => panic!("expected Api(503), got {other:?}"),
|
||||
Ok(_) => panic!("transport must not retry internally"),
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
let mut stream = transport.stream(Request::default()).await.expect("ok");
|
||||
while stream.next().await.is_some() {}
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
assert!(
|
||||
elapsed >= Duration::from_secs(1),
|
||||
"Retry-After=1 should make us wait >=1s, elapsed={elapsed:?}"
|
||||
);
|
||||
assert!(
|
||||
elapsed < Duration::from_secs(3),
|
||||
"Retry-After=1 should not balloon, elapsed={elapsed:?}"
|
||||
let received = server.received_requests().await.unwrap();
|
||||
assert_eq!(
|
||||
received.len(),
|
||||
1,
|
||||
"transport should send exactly one request"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mid_stream_sse_error_does_not_retry() {
|
||||
async fn retry_after_header_is_preserved_on_api_error() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat"))
|
||||
.respond_with(ResponseTemplate::new(503).insert_header("retry-after", "1"))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let transport = build_transport(server.uri(), false);
|
||||
match transport.stream(Request::default()).await {
|
||||
Err(
|
||||
err @ ClientError::Api {
|
||||
status: Some(503), ..
|
||||
},
|
||||
) => {
|
||||
assert_eq!(err.retry_after(), Some(Duration::from_secs(1)));
|
||||
}
|
||||
Err(other) => panic!("expected Api(503), got {other:?}"),
|
||||
Ok(_) => panic!("expected error"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mid_stream_sse_error_is_stream_item_error() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat"))
|
||||
|
|
@ -211,11 +145,11 @@ async fn mid_stream_sse_error_does_not_retry() {
|
|||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let transport = build_transport(server.uri(), true, fast_policy(5));
|
||||
let transport = build_transport(server.uri(), true);
|
||||
let mut stream = transport
|
||||
.stream(Request::default())
|
||||
.await
|
||||
.expect("status 200 should bypass retry loop");
|
||||
.expect("status 200 should open stream");
|
||||
let mut saw_sse_err = false;
|
||||
while let Some(item) = stream.next().await {
|
||||
if matches!(item, Err(ClientError::Sse(_))) {
|
||||
|
|
@ -225,11 +159,11 @@ async fn mid_stream_sse_error_does_not_retry() {
|
|||
assert!(saw_sse_err, "expected Sse error from stream consumer");
|
||||
|
||||
let received = server.received_requests().await.unwrap();
|
||||
assert_eq!(received.len(), 1, "mid-stream Sse must not retry");
|
||||
assert_eq!(received.len(), 1, "mid-stream Sse must not reopen stream");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn non_retryable_status_returns_immediately() {
|
||||
async fn non_retryable_status_returns_api_error() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat"))
|
||||
|
|
@ -237,7 +171,7 @@ async fn non_retryable_status_returns_immediately() {
|
|||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let transport = build_transport(server.uri(), false, fast_policy(5));
|
||||
let transport = build_transport(server.uri(), false);
|
||||
match transport.stream(Request::default()).await {
|
||||
Err(ClientError::Api {
|
||||
status: Some(401), ..
|
||||
|
|
@ -247,5 +181,5 @@ async fn non_retryable_status_returns_immediately() {
|
|||
}
|
||||
|
||||
let received = server.received_requests().await.unwrap();
|
||||
assert_eq!(received.len(), 1, "401 must not retry");
|
||||
assert_eq!(received.len(), 1);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -330,6 +330,29 @@ fn wire_event_bridges_on_worker<C, St>(
|
|||
let _ = tx.send(Event::LlmCallEnd { llm_call });
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
worker.on_llm_retry(move |llm_call, notice| {
|
||||
let _ = tx.send(Event::LlmRetry {
|
||||
llm_call,
|
||||
failed_attempt: notice.failed_attempt,
|
||||
max_attempts: notice.max_attempts,
|
||||
wait_ms: notice.wait.as_millis() as u64,
|
||||
elapsed_ms: notice.elapsed.as_millis() as u64,
|
||||
status: notice.status,
|
||||
error: notice.error.clone(),
|
||||
});
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
worker.on_llm_continuation(move |llm_call, attempt, max_attempts, reason| {
|
||||
let _ = tx.send(Event::LlmContinuation {
|
||||
llm_call,
|
||||
attempt,
|
||||
max_attempts,
|
||||
reason: reason.to_owned(),
|
||||
});
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
let activity = ai_activity.clone();
|
||||
worker.on_text_block(move |block| {
|
||||
|
|
|
|||
|
|
@ -101,6 +101,7 @@ impl LlmClient for MockClient {
|
|||
status: Some(500),
|
||||
code: Some("mock".into()),
|
||||
message: "No more responses".into(),
|
||||
retry_after: None,
|
||||
});
|
||||
}
|
||||
let response = self.responses[count].clone();
|
||||
|
|
|
|||
|
|
@ -298,6 +298,29 @@ pub enum Event {
|
|||
LlmCallEnd {
|
||||
llm_call: usize,
|
||||
},
|
||||
/// A transport-level LLM request retry has been scheduled.
|
||||
///
|
||||
/// This is operational state for clients to render while the worker is
|
||||
/// waiting in backoff. It is not part of conversation history.
|
||||
LlmRetry {
|
||||
llm_call: usize,
|
||||
/// The attempt that just failed. 1 origin.
|
||||
failed_attempt: u32,
|
||||
max_attempts: u32,
|
||||
wait_ms: u64,
|
||||
elapsed_ms: u64,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
status: Option<u16>,
|
||||
error: String,
|
||||
},
|
||||
/// Stream generation was interrupted after events had begun and the worker
|
||||
/// is continuing with a follow-up LLM request.
|
||||
LlmContinuation {
|
||||
llm_call: usize,
|
||||
attempt: u32,
|
||||
max_attempts: u32,
|
||||
reason: String,
|
||||
},
|
||||
TextDelta {
|
||||
text: String,
|
||||
},
|
||||
|
|
@ -867,6 +890,69 @@ mod tests {
|
|||
assert_eq!(parsed["data"]["llm_call"], 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_llm_retry_roundtrip() {
|
||||
let event = Event::LlmRetry {
|
||||
llm_call: 3,
|
||||
failed_attempt: 1,
|
||||
max_attempts: 4,
|
||||
wait_ms: 800,
|
||||
elapsed_ms: 120,
|
||||
status: Some(504),
|
||||
error: "API error (status: 504): gateway timeout".into(),
|
||||
};
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["event"], "llm_retry");
|
||||
assert_eq!(parsed["data"]["status"], 504);
|
||||
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||
match decoded {
|
||||
Event::LlmRetry {
|
||||
llm_call,
|
||||
failed_attempt,
|
||||
max_attempts,
|
||||
wait_ms,
|
||||
status,
|
||||
..
|
||||
} => {
|
||||
assert_eq!(llm_call, 3);
|
||||
assert_eq!(failed_attempt, 1);
|
||||
assert_eq!(max_attempts, 4);
|
||||
assert_eq!(wait_ms, 800);
|
||||
assert_eq!(status, Some(504));
|
||||
}
|
||||
other => panic!("expected LlmRetry, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_llm_continuation_roundtrip() {
|
||||
let event = Event::LlmContinuation {
|
||||
llm_call: 4,
|
||||
attempt: 1,
|
||||
max_attempts: 3,
|
||||
reason: "SSE parse error: closed".into(),
|
||||
};
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["event"], "llm_continuation");
|
||||
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||
match decoded {
|
||||
Event::LlmContinuation {
|
||||
llm_call,
|
||||
attempt,
|
||||
max_attempts,
|
||||
reason,
|
||||
} => {
|
||||
assert_eq!(llm_call, 4);
|
||||
assert_eq!(attempt, 1);
|
||||
assert_eq!(max_attempts, 3);
|
||||
assert_eq!(reason, "SSE parse error: closed");
|
||||
}
|
||||
other => panic!("expected LlmContinuation, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn method_notify_json_roundtrip() {
|
||||
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;
|
||||
|
|
|
|||
|
|
@ -56,6 +56,7 @@ impl CodexAuthError {
|
|||
status: None,
|
||||
code: Some("refresh_transient".into()),
|
||||
message: msg,
|
||||
retry_after: None,
|
||||
},
|
||||
CodexAuthError::RefreshPermanent { reason, message } => ClientError::Api {
|
||||
status: Some(401),
|
||||
|
|
@ -66,6 +67,7 @@ impl CodexAuthError {
|
|||
PermanentReason::Other => "refresh_token_failed".into(),
|
||||
}),
|
||||
message: format!("{message}. Please run `codex login` again."),
|
||||
retry_after: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ impl LlmClient for MockLlmClient {
|
|||
status: Some(500),
|
||||
code: Some("mock_error".to_string()),
|
||||
message: "No more mock responses".to_string(),
|
||||
retry_after: None,
|
||||
});
|
||||
}
|
||||
let events = self.responses[count].clone();
|
||||
|
|
|
|||
|
|
@ -89,6 +89,8 @@ pub struct App {
|
|||
pub context_window: u64,
|
||||
pub turn_index: usize,
|
||||
pub current_tool: Option<String>,
|
||||
/// Latest LLM wait/retry lifecycle event for actionbar observability.
|
||||
pub latest_llm_wait_event: Option<String>,
|
||||
/// Latest memory extract/consolidation lifecycle event for actionbar observability.
|
||||
pub latest_memory_worker_event: Option<String>,
|
||||
/// Normal composer input that is submitted as `Method::Run`.
|
||||
|
|
@ -150,6 +152,7 @@ impl App {
|
|||
context_window: 0,
|
||||
turn_index: 0,
|
||||
current_tool: None,
|
||||
latest_llm_wait_event: None,
|
||||
latest_memory_worker_event: None,
|
||||
input: InputBuffer::new(),
|
||||
command_input: InputBuffer::new(),
|
||||
|
|
@ -608,20 +611,52 @@ impl App {
|
|||
self.set_pod_status(PodStatus::Running);
|
||||
self.run_requests += 1;
|
||||
self.current_tool = None;
|
||||
self.latest_llm_wait_event = None;
|
||||
self.assistant_streaming = false;
|
||||
}
|
||||
// UI consumers of Invoke / LlmCall semantics are out of scope
|
||||
// for `tickets/invoke-turn-llmcall-semantics.md`; events flow
|
||||
// through to subscribers but the TUI currently derives its
|
||||
// turn header from `UserMessage` / `SystemItem` arrivals.
|
||||
Event::InvokeStart { .. } | Event::LlmCallStart { .. } | Event::LlmCallEnd { .. } => {}
|
||||
Event::InvokeStart { .. } | Event::LlmCallStart { .. } | Event::LlmCallEnd { .. } => {
|
||||
self.latest_llm_wait_event = None;
|
||||
}
|
||||
Event::LlmRetry {
|
||||
failed_attempt,
|
||||
max_attempts,
|
||||
wait_ms,
|
||||
status,
|
||||
error,
|
||||
..
|
||||
} => {
|
||||
let next_attempt = failed_attempt.saturating_add(1).min(max_attempts);
|
||||
let reason = status
|
||||
.map(|code| format!("HTTP {code}"))
|
||||
.unwrap_or_else(|| error);
|
||||
self.latest_llm_wait_event = Some(format!(
|
||||
"retrying LLM request after {reason} (attempt {next_attempt}/{max_attempts} in {})",
|
||||
fmt_millis(wait_ms)
|
||||
));
|
||||
}
|
||||
Event::LlmContinuation {
|
||||
attempt,
|
||||
max_attempts,
|
||||
reason,
|
||||
..
|
||||
} => {
|
||||
self.latest_llm_wait_event = Some(format!(
|
||||
"LLM stream interrupted; continuing generation ({attempt}/{max_attempts}): {reason}"
|
||||
));
|
||||
}
|
||||
Event::TextDelta { text } => {
|
||||
self.latest_llm_wait_event = None;
|
||||
self.append_assistant_text(&text);
|
||||
}
|
||||
Event::TextDone { .. } => {
|
||||
self.assistant_streaming = false;
|
||||
}
|
||||
Event::ThinkingStart => {
|
||||
self.latest_llm_wait_event = None;
|
||||
self.assistant_streaming = false;
|
||||
self.blocks.push(Block::Thinking(ThinkingBlock {
|
||||
text: String::new(),
|
||||
|
|
@ -661,6 +696,7 @@ impl App {
|
|||
self.current_tool = None;
|
||||
}
|
||||
Event::ToolCallStart { id, name } => {
|
||||
self.latest_llm_wait_event = None;
|
||||
self.current_tool = Some(name.clone());
|
||||
self.assistant_streaming = false;
|
||||
self.blocks.push(Block::ToolCall(ToolCallBlock {
|
||||
|
|
@ -702,6 +738,7 @@ impl App {
|
|||
output,
|
||||
is_error,
|
||||
} => {
|
||||
self.latest_llm_wait_event = None;
|
||||
// Pull the name / args out first so we can look at the
|
||||
// (immutable) cache before taking the mutable block
|
||||
// borrow below.
|
||||
|
|
@ -776,6 +813,7 @@ impl App {
|
|||
self.push_error(format!("[{code:?}] {message}"));
|
||||
}
|
||||
Event::RunEnd { result } => {
|
||||
self.latest_llm_wait_event = None;
|
||||
if matches!(result, RunResult::RolledBack) {
|
||||
self.handle_rolled_back_run();
|
||||
} else {
|
||||
|
|
@ -889,6 +927,7 @@ impl App {
|
|||
self.run_upload_tokens = 0;
|
||||
self.run_output_tokens = 0;
|
||||
self.current_tool = None;
|
||||
self.latest_llm_wait_event = None;
|
||||
self.assistant_streaming = false;
|
||||
}
|
||||
|
||||
|
|
@ -1291,6 +1330,14 @@ pub fn fmt_tokens(n: u64) -> String {
|
|||
}
|
||||
}
|
||||
|
||||
fn fmt_millis(ms: u64) -> String {
|
||||
if ms >= 1_000 {
|
||||
format!("{:.1}s", ms as f64 / 1_000.0)
|
||||
} else {
|
||||
format!("{ms}ms")
|
||||
}
|
||||
}
|
||||
|
||||
fn message_text(item: &serde_json::Value) -> String {
|
||||
item["content"]
|
||||
.as_array()
|
||||
|
|
@ -1356,6 +1403,47 @@ pub fn alert_source_label(source: AlertSource) -> &'static str {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod llm_wait_event_tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn llm_retry_updates_and_progress_clears_transient_status() {
|
||||
let mut app = App::new("test".into());
|
||||
app.handle_pod_event(Event::LlmRetry {
|
||||
llm_call: 2,
|
||||
failed_attempt: 1,
|
||||
max_attempts: 4,
|
||||
wait_ms: 1_200,
|
||||
elapsed_ms: 50,
|
||||
status: Some(504),
|
||||
error: "gateway timeout".into(),
|
||||
});
|
||||
assert_eq!(
|
||||
app.latest_llm_wait_event.as_deref(),
|
||||
Some("retrying LLM request after HTTP 504 (attempt 2/4 in 1.2s)")
|
||||
);
|
||||
|
||||
app.handle_pod_event(Event::TextDelta { text: "ok".into() });
|
||||
assert!(app.latest_llm_wait_event.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn llm_continuation_updates_transient_status() {
|
||||
let mut app = App::new("test".into());
|
||||
app.handle_pod_event(Event::LlmContinuation {
|
||||
llm_call: 3,
|
||||
attempt: 1,
|
||||
max_attempts: 3,
|
||||
reason: "SSE parse error: closed".into(),
|
||||
});
|
||||
assert_eq!(
|
||||
app.latest_llm_wait_event.as_deref(),
|
||||
Some("LLM stream interrupted; continuing generation (1/3): SSE parse error: closed")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod completion_flow_tests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -1158,7 +1158,14 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) {
|
|||
];
|
||||
|
||||
if app.running {
|
||||
let status = if let Some(tool) = &app.current_tool {
|
||||
let status = if let Some(wait_event) = &app.latest_llm_wait_event {
|
||||
format!(
|
||||
"request: {} | ↑{}/↓{} | {wait_event}",
|
||||
app.run_requests,
|
||||
fmt_tokens(app.run_upload_tokens),
|
||||
fmt_tokens(app.run_output_tokens),
|
||||
)
|
||||
} else if let Some(tool) = &app.current_tool {
|
||||
format!(
|
||||
"request: {} | ↑{}/↓{} | tool: {tool}",
|
||||
app.run_requests,
|
||||
|
|
@ -1218,6 +1225,11 @@ fn draw_actionbar(frame: &mut Frame, app: &App, area: Rect) {
|
|||
"Alt-q edit queued Alt-c clear queued",
|
||||
Style::default().fg(Color::DarkGray),
|
||||
));
|
||||
} else if let Some(llm_event) = app.latest_llm_wait_event.as_deref() {
|
||||
left.push(Span::styled(
|
||||
truncate_with_ellipsis(llm_event, 96),
|
||||
Style::default().fg(Color::Yellow),
|
||||
));
|
||||
} else if let Some(memory_event) = app.latest_memory_worker_event.as_deref() {
|
||||
left.push(Span::styled(
|
||||
truncate_with_ellipsis(memory_event, 72),
|
||||
|
|
|
|||
116
docs/report/2026-05-24-compact-retained-tail-oversize.md
Normal file
116
docs/report/2026-05-24-compact-retained-tail-oversize.md
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
# Compact 後 retained tail が巨大化して次 turn が壊れた疑い
|
||||
|
||||
## 概要
|
||||
|
||||
2026-05-24 の開発セッション中、手動 compact 後に「続けられる?」と入力した turn が、ユーザー側では context length 超過系のエラーで切れたように見えた。セッション永続化を確認すると、manual compact 自体は成功扱いで新 segment を作っているが、その post-compact `SegmentStart.history` が 892 entries / 約 2.3MB と巨大なままだった。
|
||||
|
||||
その直後の turn は `invoke` / `user_input` / `turn_end` / `run_completed result=finished` が残っている一方で、`assistant_item` と `llm_usage` が残っていない。つまり Pod 永続化上は正常終了扱いだが、実際には LLM 応答が生成されていない空 turn になっている。
|
||||
|
||||
次の turn の前には自動 pre-run compact がもう一度走り、今度は `SegmentStart.history` が 24 entries まで縮んだ。その後の turn は正常に `llm_usage input_total_tokens=20182` を記録して進行している。
|
||||
|
||||
## 観測したセッション
|
||||
|
||||
親 session:
|
||||
|
||||
- `/home/hare/.insomnia/sessions/019e5769-73fa-72a0-b501-b657a8976dd3`
|
||||
|
||||
関連 segment:
|
||||
|
||||
- 元 segment: `019e5769-73fa-72a0-b501-b665cb0ce470`
|
||||
- 1回目 compact 後: `019e5c2f-a23c-7a00-b5db-fb31fccec9fb`
|
||||
- 2回目 compact 後: `019e5c34-cea7-7e72-8565-2d5447fa0b70`
|
||||
|
||||
1回目 compact 後の `SegmentStart.history` は 892 entries。内訳は概ね以下。
|
||||
|
||||
- `tool_call`: 338
|
||||
- `tool_result`: 338
|
||||
- `reasoning`: 171
|
||||
- `message`: 45
|
||||
|
||||
これは 1 turn で大量 tool call したというより、前回 compact 以後の長い履歴 suffix が retained tail として残ったものと考える方が自然。
|
||||
|
||||
## 現時点の推定
|
||||
|
||||
### 1. manual compact は「400K 未満にする」処理ではない
|
||||
|
||||
400K は compact 発火閾値であり、compact 後の post-condition ではない。manual compact の履歴分割は `compact_retained_tokens` を目標に末尾履歴を残すが、その結果が compact 閾値未満かどうかを強く検証していないように見える。
|
||||
|
||||
### 2. retained tail の見積もりが実際の persisted history サイズと乖離している可能性
|
||||
|
||||
`split_for_retained` / `token_estimates_for_prune_impl` 周辺を見る限り、retained split は usage records や token estimate に依存する。LLM request 時には tool result pruning / projection が関わるため、LLM に実際に投げた context の usage と、session log に永続化されている unpruned history の大きさが一致しない可能性がある。
|
||||
|
||||
その場合、manual compact 前の retained split では「小さく見える」が、compact 後に usage records がない状態で byte fallback 的な推定を使うと「大きく見える」ため、直後に auto compact が必要になる、という挙動を説明できる。
|
||||
|
||||
この点はまだ確定ではない。次に `019e5c2f` の retained tail が旧 history のどの index から始まったか、当時の usage records がどの history_len に対応していたかを再現コードか追加ログで詰める必要がある。
|
||||
|
||||
### 3. `just_compacted` が safety net を一時的に止める
|
||||
|
||||
compact 成功後は `compact_state.record_compact_success()` により `just_compacted = true` になる。`prepare_for_run()` の pre-run compact と interceptor の between-request compact は `!just_compacted` を条件にしているため、compact 直後の次 turn では再 compact が抑止される。
|
||||
|
||||
今回、1回目 compact の post-compact history が巨大なままでも、直後の turn ではその safety net が効かなかった。その turn が実質失敗しているのに `run_completed result=finished` として扱われたことで `just_compacted` が解除され、さらに次の turn の pre-run compact が走ったと考えられる。
|
||||
|
||||
### 4. context length 系失敗が `run_errored` として残っていない
|
||||
|
||||
問題の turn には `assistant_item` / `llm_usage` が無いにもかかわらず `run_completed finished` が残っている。ユーザー体感では context length 超過で切れている。LLM worker 側、stream continuation 側、または rollback/empty turn 処理で、エラーが正常終了扱いに丸められている可能性がある。
|
||||
|
||||
これは compact 問題とは別に、永続化と UI observability のバグとして調べるべき。
|
||||
|
||||
## 追加で気になった点
|
||||
|
||||
runtime state に segment 表示の不一致があった。
|
||||
|
||||
- `/run/user/1000/insomnia/insomnia/status.json` は古い segment id を指していた
|
||||
- `/run/user/1000/insomnia/pods.json` は新しい segment id を指していた
|
||||
|
||||
今回の context 超過の主因ではなさそうだが、attach/restore 時に混乱要因になり得る。
|
||||
|
||||
## 修正候補
|
||||
|
||||
1. compact 成功後に post-compact context estimate を検査する。
|
||||
- `new_history_len`
|
||||
- estimated prompt tokens
|
||||
- retained item count
|
||||
- retained byte size
|
||||
- threshold に対する比率
|
||||
|
||||
2. post-compact history が threshold を超える場合、単純な成功扱いにしない。
|
||||
- `CompactFailed` 相当にする
|
||||
- あるいは `just_compacted` を立てず、pre-run safety net を有効のままにする
|
||||
- ただし infinite compact loop / thrash を避けるため、失敗理由を明示する必要がある
|
||||
|
||||
3. retained split の token estimate を、LLM に投げた pruned context ではなく、persisted history の実サイズに近い形で検証する。
|
||||
- 少なくとも post-condition は usage record だけに依存しない
|
||||
- byte fallback と usage-based estimate の乖離を metrics に出す
|
||||
|
||||
4. tool call / tool result boundary 保護が retained budget をどれだけ破ったかを可視化する。
|
||||
- `initial_cut`
|
||||
- `balanced_cut`
|
||||
- `items_pulled_back`
|
||||
- `bytes_pulled_back`
|
||||
- `estimated_tokens_pulled_back`
|
||||
|
||||
5. compact metrics を session log に残す。
|
||||
- source: manual / pre_run / between_requests
|
||||
- old_segment_id / new_segment_id
|
||||
- old_history_len / new_history_len
|
||||
- retained_from index
|
||||
- retained_items
|
||||
- estimated_tokens_before / after
|
||||
- estimate source: usage_records / byte_fallback / mixed
|
||||
|
||||
6. context length 系エラーが `run_completed finished` になる経路を調べる。
|
||||
- `assistant_item` と `llm_usage` が無い run を正常終了扱いにしてよいか
|
||||
- LLM request 前の context-build error と upstream error の永続化
|
||||
- TUI に `RunEnd(Finished)` だけが届く経路がないか
|
||||
|
||||
## 次にやる調査
|
||||
|
||||
- `019e5c2f` の 892-entry history が、旧 segment history の何番目から retained されたものか特定する。
|
||||
- 旧 segment の `LogEntry::LlmUsage { history_len, input_total_tokens, ... }` と retained cut の対応を確認する。
|
||||
- `split_for_retained` を当時の history / usage records に対して再実行し、見積もりと実 persisted size の差を出す。
|
||||
- `llm-worker` の prune threshold / protected area / min savings を確認し、pruning がこの乖離にどの程度寄与したかを確定する。
|
||||
- 空 turn が `run_completed finished` になった理由を追う。
|
||||
|
||||
## 注意
|
||||
|
||||
このレポートは調査途中の暫定まとめ。特に「pruning が retained estimate を小さく見せた」という仮説はまだ未確定。確実に言えるのは、manual compact 後の `SegmentStart.history` が 892 entries と巨大で、その直後の turn が assistant/usage 無しに finished 扱いになり、次 turn で再 compact されて復旧した、という観測事実。
|
||||
|
|
@ -1,145 +0,0 @@
|
|||
# llm-worker: ストリーム途中失敗時の継続
|
||||
|
||||
## 背景
|
||||
|
||||
LLM 応答の SSE ストリームを読んでいる途中で upstream が切れると、
|
||||
`crates/llm-worker/src/llm_client/transport.rs:231` で
|
||||
`ClientError::Sse(...)`(中身は `eventsource_stream::Error::Transport`、
|
||||
さらに reqwest の `error decoding response body`)として上に投げられ、
|
||||
`worker.rs:933 stream_response` が `WorkerError` に変換して Run 全体が中断する。
|
||||
|
||||
実例: セッション `019de419-6f8b-71a0-9e56-f0b9a6c7098b.jsonl:236`。
|
||||
直前まで text / tool_use を Timeline に積み続けていたが、
|
||||
最後の SSE フレームが届く前に接続が切れ、Run が落ちた。
|
||||
|
||||
ここで重要な点:
|
||||
|
||||
- 出力トークンは upstream 側で既に発生しており、課金済み。
|
||||
単純に同じ request を再送すると二重出力 + 二重課金。
|
||||
- Anthropic / OpenAI のいずれの API も、途切れた SSE を
|
||||
resume するエンドポイントは持たない。継続したい場合は
|
||||
「assistant turn として部分出力を history に置いた状態で再リクエスト」
|
||||
という形を自作する必要がある。
|
||||
- 部分出力の質は内容依存:
|
||||
- 完成した text ブロックは原則そのまま history に置ける
|
||||
- 未完成の通常 text ブロックも、LLM の仕様上は assistant partial として置けば続きを生成させられる
|
||||
- tool_use の `input_json` が途中で切れたブロックは破損 JSON で、そのままは置けない
|
||||
- reasoning / thinking ブロックも provider 依存の扱いが要る
|
||||
|
||||
このため、これは「リトライ」ではなく「継続 (continuation)」。
|
||||
`history` を編集する責務であり、`transport.rs` には収まらず、
|
||||
`worker.rs` 層(または上位)の機能になる。
|
||||
`feedback_llm_worker_scope.md` の方針(llm-worker は低レベル基盤に留める)にも合致する。
|
||||
|
||||
なお `worker.rs:973` 付近で部分 `flush_usage()` だけは既に行っており、
|
||||
半分くらいは継続を意識した作りになっている。あとは
|
||||
「壊れていないブロックの確定」と「次 call の起動条件」を足す形。
|
||||
|
||||
## 決定した方針
|
||||
|
||||
stream 開始後に transport / SSE error で落ちた場合、同じ request をそのまま再送しない。
|
||||
Timeline に積まれた部分生成を安全な範囲で assistant history として確定し、その history を前提に continuation call を起動する。
|
||||
|
||||
- 通常 text は partial でも残す。
|
||||
- 完成済み text block はそのまま確定する。
|
||||
- 未完成 text block も text として確定し、次の LLMCall で続きを生成させる。
|
||||
- tool_use は壊れていないものだけ残す。
|
||||
- 完成済み tool_use は通常通り確定する。
|
||||
- 未完成 tool_use / partial JSON は history に入れず破棄する。
|
||||
- 破棄した事実は structured diagnostic event として記録する。
|
||||
- reasoning / thinking block は初期実装では保守的に扱う。
|
||||
- provider が history に安全に戻せる完成 block として扱えるものだけ残す。
|
||||
- 未完成または復元不能な thinking/reasoning は破棄し、diagnostic event に残す。
|
||||
- continuation は自動で最大 5 回試す。
|
||||
- backoff は attempt ごとに伸ばす。例: 1s, 2s, 4s, 8s, 16s。
|
||||
- 5 回失敗したら turn を中断し、通常の失敗として上位へ返す。
|
||||
- `Cancelled` / `Aborted` / interceptor `Yield` は continuation より優先する。
|
||||
- 明示的な user cancel や上位制御を transport error retry で覆さない。
|
||||
- 明示的な safety / content filter stop reason が provider event として返る場合は retry 対象外にする。
|
||||
- transport / SSE error としてしか見えない場合は continuation 対象にする。
|
||||
- 同じ箇所で繰り返し切られる場合は最大 5 回で exhausted する。
|
||||
|
||||
## 失敗ログ / 統計
|
||||
|
||||
この機能は実運用での発生頻度と回復率を見たいので、continuation lifecycle を structured log として残す。
|
||||
ログは統計・デバッグ用であり、通常の LLM context へ暗黙注入しない。
|
||||
|
||||
最低限記録する event:
|
||||
|
||||
- `llm_stream_interrupted`
|
||||
- provider / model
|
||||
- run_id / turn_id 相当
|
||||
- original attempt / continuation attempt
|
||||
- error kind: `sse_transport`, `sse_parse`, `body_decode`, `unknown` 等
|
||||
- error message
|
||||
- committed text block count
|
||||
- committed partial text の有無
|
||||
- discarded partial tool_use count
|
||||
- discarded thinking/reasoning count
|
||||
- usage flush の有無
|
||||
- `llm_stream_continuation_started`
|
||||
- continuation attempt
|
||||
- backoff duration
|
||||
- history に確定した block summary
|
||||
- `llm_stream_continuation_completed`
|
||||
- continuation attempt
|
||||
- completion reason
|
||||
- `llm_stream_continuation_failed`
|
||||
- continuation attempt
|
||||
- error kind / message
|
||||
- `llm_stream_continuation_exhausted`
|
||||
- attempts
|
||||
- final reason
|
||||
|
||||
未完成 tool_use を破棄した場合は、可能な範囲で以下も残す。
|
||||
|
||||
```json
|
||||
{
|
||||
"event": "discarded_partial_tool_use",
|
||||
"tool_name": "Bash",
|
||||
"partial_input_bytes": 1234,
|
||||
"reason": "sse_transport_error"
|
||||
}
|
||||
```
|
||||
|
||||
## 要件
|
||||
|
||||
- ストリーム開始後の transport / SSE error を `worker.rs` 層で捕捉し、continuation 対象か判定する。
|
||||
- pre-stream の transient retry とは別枠にする。
|
||||
- 同じ request の単純再送はしない。
|
||||
- Timeline に積まれた安全な block を assistant history として確定する。
|
||||
- 完成済み text block を残す。
|
||||
- 未完成 text block も残す。
|
||||
- 完成済み tool_use を残す。
|
||||
- 未完成 tool_use / partial JSON は破棄し、diagnostic event を記録する。
|
||||
- 未完成または復元不能な thinking/reasoning は破棄し、diagnostic event を記録する。
|
||||
- continuation call を最大 5 回まで自動実行する。
|
||||
- attempt ごとに backoff を伸ばす。
|
||||
- 成功したら通常の LLMCall 完了として扱う。
|
||||
- exhausted したら turn を中断する。
|
||||
- `Cancelled` / `Aborted` / interceptor `Yield` がある場合は continuation しない。
|
||||
- provider が明示的な safety / content filter stop reason を正常 event として返した場合は continuation しない。
|
||||
- continuation lifecycle と破棄した partial block の概要を structured log に残す。
|
||||
- 統計として provider/model 別の失敗頻度、回復率、partial tool_use 発生有無を後から集計できること。
|
||||
- continuation のために context へ一時的な system message を暗黙注入しない。
|
||||
- もし LLM に中断事実を知らせる必要が出た場合は、history に残る明示 event/message として設計する。
|
||||
- 初期実装では壊れた tool_use は LLM に知らせず、ログにだけ残す。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- stream 途中で transport / SSE error を起こすモック integration test がある。
|
||||
- text-only partial response では、未完成 text が history に残り、continuation call が続きを生成する。
|
||||
- partial tool_use response では、壊れた tool_use が history に入らず、discard diagnostic が記録される。
|
||||
- completed tool_use は破棄されず、通常通り history に残る。
|
||||
- continuation が最大 5 回で exhausted し、turn が中断される test がある。
|
||||
- `Cancelled` / `Aborted` / `Yield` が continuation より優先される test がある。
|
||||
- structured log から interrupted / started / completed / failed / exhausted が確認できる。
|
||||
- 課金重複が起きないこと(過去ターンの単純再生成ではなく partial assistant history からの continuation であること)が test または手動手順で確認されている。
|
||||
- `cargo check` / `cargo test` が `llm-worker` で通る。
|
||||
|
||||
## 範囲外
|
||||
|
||||
- pre-stream の transient リトライ → `llm-worker-transient-retry`
|
||||
- ストリーム resume API の実装(プロバイダ側に存在しないので不可能)
|
||||
- 課金額の自動上限制御
|
||||
- 壊れた partial tool_use を system message 等で LLM に説明して復旧させる高度な戦略
|
||||
48
tickets/memory-consolidation-skip-observability.md
Normal file
48
tickets/memory-consolidation-skip-observability.md
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
# メモリ機構: consolidation skip 表示と invalid staging の観測性
|
||||
|
||||
## 背景
|
||||
|
||||
`memory-audit-log` 実装後、TUI actionbar に `memory consolidation skipped: no_staging_entries` が表示されるようになった。これは一見すると「staging が無い」状態に見えるが、実際の workspace には `.insomnia/memory/_staging/*.json` が残っている場合がある。
|
||||
|
||||
現状の原因は二つある。
|
||||
|
||||
1. `run_consolidate_once` が成功後に backlog drain のため loop し、直後の空確認で `skipped: no_staging_entries` を emit する。これにより、直前の `completed_record_changes` 表示が actionbar 上で上書きされる。
|
||||
2. 旧 schema の staging file は `source.session_id` を持ち、現行 schema が要求する `source.segment_id` を持たないため parse で skip される。結果として、ファイルは存在するのに valid entry が 0 件となり、`no_staging_entries` と記録される。
|
||||
|
||||
この状態は機能停止ではないが、観測面として misleading であり、memory 機構の稼働状況を人間が誤解しやすい。
|
||||
|
||||
## 方針
|
||||
|
||||
Audit log には worker の skip / no-op を残しつつ、actionbar には人間が見る価値のある状態だけを出す。特に successful consolidation の直後に発生する drain 終端確認の `no_staging_entries` は actionbar に出さない。
|
||||
|
||||
また、staging directory にファイルがあるが current schema として読めない場合は、`no_staging_entries` ではなく invalid staging が存在することを audit log から分かるようにする。
|
||||
|
||||
## 要件
|
||||
|
||||
- consolidation が `completed` した直後の drain 確認で発生する idle skip が、直前の actionbar 表示を上書きしない。
|
||||
- 例: `completed_record_changes` の直後に `no_staging_entries` を actionbar へ出さない。
|
||||
- audit log へ残すかどうかは実装判断でよいが、UI event としては抑制する。
|
||||
- 通常の post-run check で staging が本当に空の場合も、actionbar に `no_staging_entries` を毎回出さない。
|
||||
- idle/no-op 状態は audit log 側で確認できればよい。
|
||||
- `list_staging_entries` あるいはその呼び出し側で、invalid / parse-failed staging file の存在を区別できるようにする。
|
||||
- 少なくとも invalid count が audit reason または structured field から分かる。
|
||||
- 例: `no_valid_staging_entries invalid=6`。
|
||||
- `threshold_not_reached` と `no_valid_staging_entries` / `invalid_staging_entries` は区別される。
|
||||
- 既存の old schema staging を自動 migration / delete しない。
|
||||
- `source.session_id` と `source.segment_id` は意味が違う可能性があるため、この ticket では観測性改善に留める。
|
||||
- 必要なら後続で archive/drop/migration 方針を決める。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- successful consolidation の直後に actionbar が `no_staging_entries` で上書きされない。
|
||||
- staging directory に parse 不能な `.json` がある場合、audit log が `no_staging_entries` ではなく invalid staging の存在を示す。
|
||||
- staging が本当に空の periodic/post-run check は actionbar にノイズを出さない。
|
||||
- consolidation skip / invalid staging の挙動を確認する test がある。
|
||||
- `cargo fmt --check` と関連 crate の test が通る。
|
||||
|
||||
## 範囲外
|
||||
|
||||
- old schema staging file の自動 migration。
|
||||
- old schema staging file の自動削除 / archive。
|
||||
- actionbar transient notice 汎用 API の実装。
|
||||
- memory audit log の保存形式の大幅変更。
|
||||
53
tickets/spawnpod-initial-run-confirmation.md
Normal file
53
tickets/spawnpod-initial-run-confirmation.md
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
# SpawnPod: initial Run delivery confirmation
|
||||
|
||||
## 背景
|
||||
|
||||
`SpawnPod` は child Pod を起動し、初回 task を `Method::Run` として送る。しかし、実例として `impl-llm-worker-stream-continuation` を再作成した際、runtime registry / socket / process は生きている一方で、初回 task の session log が materialize されず、Pod は `idle` のままだった。
|
||||
|
||||
確認された状態:
|
||||
|
||||
- `/run/user/1000/insomnia/pods.json` に live allocation がある
|
||||
- `/run/user/1000/insomnia/<pod>/status.json` は `state: "idle"` と runtime `segment_id` を持つ
|
||||
- `/home/hare/.insomnia/sessions/pods/<pod>/metadata.json` は pending segment のまま
|
||||
- 対応する session / segment `.jsonl` が存在しない
|
||||
- `ReadPodOutput` は no new assistant text
|
||||
|
||||
`SpawnPod` の送信側は `send_run` で `Method::Run` を write してすぐ切断し、`TurnStart` 等の ack を待っていない。一方 server 側は接続直後に `Snapshot` を書いてから method を読むため、client がすぐ close すると server が snapshot write で失敗し、method を読む前に connection handler が終了する race があり得る。
|
||||
|
||||
この場合 `SpawnPod` は成功を返すが、child Pod は初回 task を実行していない。
|
||||
|
||||
同種の問題は child Pod の通知経路でも既に踏んでおり、送信側が write 後にすぐ切断せず、receiver 側の acknowledgement / observable event を待つ形にして解消している。`SpawnPod` の初回 task delivery も同じ性質の race と見なす。
|
||||
|
||||
## 方針
|
||||
|
||||
`SpawnPod` は child process / socket の起動だけでなく、初回 task が controller に受理され、少なくとも `UserMessage` または `TurnStart` が観測できるまで確認してから成功を返す。
|
||||
|
||||
既存の `SendToPod` が使う `send_run_and_confirm` と同等の acknowledgement を `SpawnPod` の初回 task 送信にも適用する。
|
||||
|
||||
## 要件
|
||||
|
||||
- `SpawnPod` の初回 task 送信は fire-and-forget にしない。
|
||||
- `Method::Run` 送信後、`UserMessage` / `TurnStart` / `InvokeStart` など、run が受理されたことを示す event を待つ。
|
||||
- timeout 時は `SpawnPod` を失敗扱いにする。
|
||||
- 初回 task delivery に失敗した場合、process / registry / delegated scope の扱いを明確にする。
|
||||
- cleanup するか、attach 可能な idle Pod として残すかを実装で決める。
|
||||
- 少なくとも成功扱いで返さない。
|
||||
- Server が connection 開始時に `Snapshot` を書く設計と競合しない。
|
||||
- client 側が snapshot/event を読みながら `Method::Run` ack を待つ形にする。
|
||||
- `SpawnPod` 成功後は、child Pod の metadata が pending でも、初回 run が開始済みであることを確認できる。
|
||||
- session log materialization のタイミングそのものは別設計でもよい。
|
||||
- `SendToPod` と `SpawnPod` の run delivery confirmation ロジックを可能な範囲で共通化する。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- `SpawnPod` が初回 task の受理確認を待つ。
|
||||
- 初回 task が実行されない race を再現する test または regression test がある。
|
||||
- `SpawnPod` が success を返した後、child Pod が idle pending のまま task 未実行になる状態が起きない。
|
||||
- delivery timeout / failure 時の error message が人間に分かる。
|
||||
- `cargo fmt --check` と関連 crate の test が通る。
|
||||
|
||||
## 範囲外
|
||||
|
||||
- `tui -r` picker に live pending Pod を表示する修正。
|
||||
- session log の SegmentStart materialization 方針変更。
|
||||
- spawned child Pod panel UI。
|
||||
62
tickets/tui-picker-live-pending-pods.md
Normal file
62
tickets/tui-picker-live-pending-pods.md
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
# TUI picker: live pending Pod の表示優先と状態補完
|
||||
|
||||
## 背景
|
||||
|
||||
`tui -r` の Pod picker は session store の name-keyed Pod metadata と runtime registry の live allocation を合わせて表示している。しかし、spawned child Pod がまだ最初の user turn / SegmentStart を materialize していない場合、Pod metadata は pending segment のままになり、session log も存在しない。
|
||||
|
||||
実例として、`impl-llm-worker-stream-continuation` は live socket と runtime registry 上の segment_id を持っていたが、metadata は以下のように `session_id` のみだった。
|
||||
|
||||
```json
|
||||
{
|
||||
"pod_name": "impl-llm-worker-stream-continuation",
|
||||
"active": {
|
||||
"session_id": "019e5bc6-c3f3-7193-98a1-d64c635f86a1"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
一方で runtime 側には segment_id が存在する。
|
||||
|
||||
```json
|
||||
{
|
||||
"pod_name": "impl-llm-worker-stream-continuation",
|
||||
"segment_id": "019e5bc6-c3f3-7193-98a1-d6559bdc9cd6",
|
||||
"state": "idle"
|
||||
}
|
||||
```
|
||||
|
||||
この状態の Pod は attach 可能だが、session log がないため `updated_at = 0` になり、picker の `updated_at desc` sort と `MAX_ROWS = 10` truncate によって一覧から漏れやすい。
|
||||
|
||||
## 方針
|
||||
|
||||
Live socket が reachable な Pod は、session log / metadata active segment が未確定でも attach 可能な対象として picker に表示する。restore 可能性と attach 可能性を分け、live pending Pod は restore 不能でも live attach 対象として扱う。
|
||||
|
||||
## 要件
|
||||
|
||||
- `tui -r` picker は reachable live Pod を stopped Pod より優先して表示する。
|
||||
- `updated_at = 0` でも live row が `MAX_ROWS` truncate で落ちない。
|
||||
- sort key は少なくとも live first, updated_at desc, pod_name になる。
|
||||
- Live Pod の metadata が pending segment の場合でも picker row に表示する。
|
||||
- preview は `[live, pending segment]` など、人間が状態を理解できる文言にする。
|
||||
- debug id 表示では runtime registry の segment_id を可能なら表示する。
|
||||
- Runtime registry / live status に segment_id があり、metadata に segment_id が無い場合、表示上は runtime segment_id を補完できるようにする。
|
||||
- ただし session log が存在しない限り restore 可能とは扱わない。
|
||||
- attach は live socket に対して行う。
|
||||
- Existing stopped / corrupt Pod metadata rows の表示を壊さない。
|
||||
- `ListVisiblePods` / discovery 側にも同様の pending live 表示不整合がある場合、必要なら後続 ticket に切り出す。
|
||||
- この ticket の主対象は `tui -r` picker。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- live pending Pod が `tui -r` に表示される。
|
||||
- live pending Pod を選択すると live socket に attach する。
|
||||
- live pending Pod が多数の stopped Pod によって `MAX_ROWS` truncate から漏れない。
|
||||
- picker の sort / row build の unit test が追加または更新されている。
|
||||
- `cargo fmt --check` と `cargo test -p tui picker` あるいは関連 TUI test が通る。
|
||||
|
||||
## 範囲外
|
||||
|
||||
- pending Pod metadata を runtime segment_id で永続的に書き換えること。
|
||||
- session log が無い Pod を restore 可能にすること。
|
||||
- spawned child Pod の first turn / SegmentStart materialization 方針の変更。
|
||||
- 汎用 spawned Pod panel UI。
|
||||
Loading…
Reference in New Issue
Block a user