Merge branch 'llm-worker-transient-retry' into develop

This commit is contained in:
Keisuke Hirata 2026-05-04 13:16:26 +09:00
commit 344dca6ffa
9 changed files with 504 additions and 102 deletions

1
Cargo.lock generated
View File

@ -1650,6 +1650,7 @@ dependencies = [
"tracing",
"tracing-subscriber",
"trybuild",
"wiremock",
]
[[package]]

View File

@ -7,7 +7,6 @@
- Pod: 任意ターンからの Fork複数ターン巻き戻しを汎用化 → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
- Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md)
- llm-worker のエラー耐性
- HTTP transient リトライ → [tickets/llm-worker-transient-retry.md](tickets/llm-worker-transient-retry.md)
- ストリーム途中失敗時の継続 → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md)
- ネイティブ GUI クライアント MVP → [tickets/native-gui-mvp.md](tickets/native-gui-mvp.md)
- TUI 拡充

View File

@ -25,3 +25,4 @@ tempfile = { workspace = true }
dotenv = "0.15"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
trybuild = "1.0.116"
wiremock = "0.6.5"

View File

@ -67,3 +67,69 @@ impl From<serde_json::Error> for ClientError {
ClientError::Json(err)
}
}
/// transient な失敗としてリトライ対象になるかを判定する。
///
/// 対象:
/// - `Api { status }` のうち 408 / 425 / 429 / 500 / 502 / 503 / 504 / 529
/// - `Http(reqwest::Error)` のうち `is_connect()` または `is_timeout()`
///
/// それ以外Json、Sse、Config、上記以外の Api ステータス)は false。
/// SSE 読み出し開始後の失敗は呼び出し側で `Sse` として上に流すため、
/// ここで対象外にしておけば自動的に弾かれる。
pub fn is_retryable(error: &ClientError) -> bool {
match error {
ClientError::Api {
status: Some(code), ..
} => matches!(*code, 408 | 425 | 429 | 500 | 502 | 503 | 504 | 529),
ClientError::Api { status: None, .. } => false,
ClientError::Http(e) => e.is_connect() || e.is_timeout(),
ClientError::Json(_) | ClientError::Sse(_) | ClientError::Config(_) => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
fn api_err(status: Option<u16>) -> ClientError {
ClientError::Api {
status,
code: None,
message: String::new(),
}
}
#[test]
fn retryable_status_codes() {
for code in [408u16, 425, 429, 500, 502, 503, 504, 529] {
assert!(
is_retryable(&api_err(Some(code))),
"status {code} should be retryable",
);
}
}
#[test]
fn non_retryable_status_codes() {
for code in [400u16, 401, 403, 404, 409, 410, 422, 501] {
assert!(
!is_retryable(&api_err(Some(code))),
"status {code} should not be retryable",
);
}
}
#[test]
fn api_without_status_not_retryable() {
assert!(!is_retryable(&api_err(None)));
}
#[test]
fn json_sse_config_not_retryable() {
let json_err = serde_json::from_str::<serde_json::Value>("not json").unwrap_err();
assert!(!is_retryable(&ClientError::Json(json_err)));
assert!(!is_retryable(&ClientError::Sse("boom".into())));
assert!(!is_retryable(&ClientError::Config("boom".into())));
}
}

View File

@ -23,6 +23,7 @@ pub mod error;
pub mod event;
pub mod types;
pub mod retry;
pub mod scheme;
pub mod transport;

View File

@ -0,0 +1,104 @@
//! HTTP transient エラー向けリトライポリシー。
//!
//! `transport.rs` の HTTP 送信〜ステータスチェック区間で `is_retryable`
//! が true を返した失敗をリトライする際に、待ち時間と打ち切り条件を
//! 提供する。SSE 読み出し開始後の失敗は対象外。
use std::time::Duration;
/// 指数バックオフ + ジッター + 累積タイムアウトを表すポリシー。
///
/// `Default` は llm-worker 全体の固定値を返す。manifest 経由の上書きが
/// 必要になったら拡張する(現状は不要 → `tickets/llm-worker-transient-retry.md`)。
#[derive(Debug, Clone)]
pub struct RetryPolicy {
/// 指数の基準値。`base * 2^attempt` を `cap` で頭打ちにした上限から
/// フルジッターで実際の wait を抽選する。
pub base: Duration,
/// 1 回あたりの wait の上限。
pub cap: Duration,
/// 試行の合計回数(初回 + リトライ)。`1` ならリトライしない。
pub max_attempts: u32,
/// 初回送信開始からの累積タイムアウト。これを超える wait は打ち切る。
pub total_timeout: Duration,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
base: Duration::from_millis(500),
cap: Duration::from_secs(10),
max_attempts: 4,
total_timeout: Duration::from_secs(30),
}
}
}
impl RetryPolicy {
/// `attempt` 回目の失敗0-indexed後に待つ時間を返す。
/// `Retry-After` で上書きしたい場合は呼び出さず、その値をそのまま使う。
pub fn backoff(&self, attempt: u32) -> Duration {
let shift = attempt.min(20);
let base_nanos = self.base.as_nanos() as u64;
let exp_nanos = base_nanos.saturating_mul(1u64 << shift);
let cap_nanos = self.cap.as_nanos() as u64;
let upper = exp_nanos.min(cap_nanos);
Duration::from_nanos(jitter_nanos(upper))
}
}
/// `[0, max_nanos]` から擬似乱数的に 1 つ取り出す。`SystemTime` の
/// 下位ビットを splitmix64 で攪拌するだけの軽量実装で、暗号的乱数性は
/// 持たないがフルジッターのぶつかり回避には十分。
fn jitter_nanos(max_nanos: u64) -> u64 {
if max_nanos == 0 {
return 0;
}
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let mut x = seed.wrapping_add(0x9E37_79B9_7F4A_7C15);
x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
x ^= x >> 31;
x % (max_nanos + 1)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_policy_values() {
let p = RetryPolicy::default();
assert_eq!(p.base, Duration::from_millis(500));
assert_eq!(p.cap, Duration::from_secs(10));
assert_eq!(p.max_attempts, 4);
assert_eq!(p.total_timeout, Duration::from_secs(30));
}
#[test]
fn backoff_respects_cap() {
let p = RetryPolicy::default();
for attempt in 0..30u32 {
assert!(
p.backoff(attempt) <= p.cap,
"attempt {attempt} exceeded cap",
);
}
}
#[test]
fn backoff_zero_when_base_zero() {
let p = RetryPolicy {
base: Duration::ZERO,
cap: Duration::from_secs(10),
max_attempts: 4,
total_timeout: Duration::from_secs(30),
};
for attempt in 0..5 {
assert_eq!(p.backoff(attempt), Duration::ZERO);
}
}
}

View File

@ -6,17 +6,21 @@
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use eventsource_stream::Eventsource;
use futures::{Stream, StreamExt, TryStreamExt};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue, RETRY_AFTER};
use tokio::time::Instant;
use tracing::warn;
use super::auth::{AuthProvider, AuthRequirement};
use super::capability::ModelCapability;
use super::client::{ConfigWarning, LlmClient};
use super::error::ClientError;
use super::error::{ClientError, is_retryable};
use super::event::Event;
use super::retry::RetryPolicy;
use super::scheme::Scheme;
use super::types::{Request, RequestConfig};
@ -63,6 +67,7 @@ pub struct HttpTransport<S: Scheme> {
base_url: String,
auth: ResolvedAuth,
capability: ModelCapability,
retry_policy: RetryPolicy,
}
impl<S: Scheme> HttpTransport<S> {
@ -84,6 +89,7 @@ impl<S: Scheme> HttpTransport<S> {
base_url,
auth,
capability,
retry_policy: RetryPolicy::default(),
}
}
@ -93,6 +99,12 @@ impl<S: Scheme> HttpTransport<S> {
self
}
/// リトライポリシーを差し替える(テスト用 / 将来の manifest 化フック)。
pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
self.retry_policy = policy;
self
}
fn build_url(&self) -> String {
let path = self.scheme.path(&self.model_id);
let url = format!("{}{}", self.base_url, path);
@ -159,10 +171,45 @@ impl<S: Scheme + Clone> Clone for HttpTransport<S> {
base_url: self.base_url.clone(),
auth: self.auth.clone(),
capability: self.capability.clone(),
retry_policy: self.retry_policy.clone(),
}
}
}
/// エラーレスポンスを `ClientError::Api` に変換し、`Retry-After` の秒数を
/// 同時に取り出す。リトライループで wait の上書きに使う。
async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Option<Duration>) {
let status = resp.status().as_u16();
let retry_after = resp
.headers()
.get(RETRY_AFTER)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.trim().parse::<u64>().ok())
.map(Duration::from_secs);
let text = resp.text().await.unwrap_or_default();
let err = if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
let error = json.get("error").unwrap_or(&json);
let code = error.get("type").and_then(|v| v.as_str()).map(String::from);
let message = error
.get("message")
.and_then(|v| v.as_str())
.unwrap_or(&text)
.to_string();
ClientError::Api {
status: Some(status),
code,
message,
}
} else {
ClientError::Api {
status: Some(status),
code: None,
message: text,
}
};
(err, retry_after)
}
#[async_trait]
impl<S: Scheme + Clone + 'static> LlmClient for HttpTransport<S> {
fn clone_boxed(&self) -> Box<dyn LlmClient> {
@ -183,37 +230,41 @@ impl<S: Scheme + Clone + 'static> LlmClient for HttpTransport<S> {
.scheme
.build_request_body(&self.model_id, &request, &self.capability);
let response = self
.http_client
.post(&url)
.headers(headers)
.json(&body)
.send()
.await?;
let policy = &self.retry_policy;
let started = Instant::now();
let mut attempt: u32 = 0;
let response = loop {
let send_result = self
.http_client
.post(&url)
.headers(headers.clone())
.json(&body)
.send()
.await;
if !response.status().is_success() {
let status = response.status().as_u16();
let text = response.text().await.unwrap_or_default();
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
let error = json.get("error").unwrap_or(&json);
let code = error.get("type").and_then(|v| v.as_str()).map(String::from);
let message = error
.get("message")
.and_then(|v| v.as_str())
.unwrap_or(&text)
.to_string();
return Err(ClientError::Api {
status: Some(status),
code,
message,
});
let (err, retry_after) = match send_result {
Ok(resp) if resp.status().is_success() => break resp,
Ok(resp) => classify_error_response(resp).await,
Err(e) => (ClientError::Http(e), None),
};
let next_attempt = attempt + 1;
if next_attempt >= policy.max_attempts || !is_retryable(&err) {
return Err(err);
}
return Err(ClientError::Api {
status: Some(status),
code: None,
message: text,
});
}
let wait = retry_after.unwrap_or_else(|| policy.backoff(attempt));
if started.elapsed() + wait > policy.total_timeout {
return Err(err);
}
warn!(
error = %err,
attempt = next_attempt,
wait_ms = wait.as_millis() as u64,
"transient HTTP error, retrying"
);
tokio::time::sleep(wait).await;
attempt = next_attempt;
};
let scheme = self.scheme.clone();
let byte_stream = response.bytes_stream().map_err(std::io::Error::other);

View File

@ -0,0 +1,249 @@
//! HTTP transport の transient エラーリトライ挙動の integration テスト。
//!
//! 対応チケット: `tickets/llm-worker-transient-retry.md`。
//! - 503 / 529 / connect refused でリトライ発火
//! - max_attempts 上限到達でエラー
//! - `Retry-After` ヘッダで指数バックオフを上書き
//! - `parse_sse` 由来の `ClientError::Sse`mid-stream 想定)はリトライしない
use std::time::{Duration, Instant};
use futures::StreamExt;
use llm_worker::llm_client::LlmClient;
use llm_worker::llm_client::auth::AuthRequirement;
use llm_worker::llm_client::capability::ModelCapability;
use llm_worker::llm_client::error::ClientError;
use llm_worker::llm_client::event::Event;
use llm_worker::llm_client::retry::RetryPolicy;
use llm_worker::llm_client::scheme::Scheme;
use llm_worker::llm_client::transport::{HttpTransport, ResolvedAuth};
use llm_worker::llm_client::types::Request;
use serde_json::Value;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// SSE 本体は触らないテスト用 scheme。`parse_fail` を立てると
/// stream 消費中(= retry loop の外)で `ClientError::Sse` を返す。
#[derive(Clone)]
struct DummyScheme {
parse_fail: bool,
}
impl Scheme for DummyScheme {
type State = ();
fn default_base_url(&self) -> &'static str {
""
}
fn path(&self, _: &str) -> String {
"/v1/chat".into()
}
fn required_auth(&self) -> AuthRequirement {
AuthRequirement::None
}
fn build_request_body(&self, _: &str, _: &Request, _: &ModelCapability) -> Value {
serde_json::json!({})
}
fn parse_sse(&self, _: &str, _: &str, _: &mut ()) -> Result<Vec<Event>, ClientError> {
if self.parse_fail {
Err(ClientError::Sse("simulated mid-stream parse failure".into()))
} else {
Ok(vec![])
}
}
fn default_capability(&self) -> ModelCapability {
ModelCapability::minimal()
}
}
fn fast_policy(max_attempts: u32) -> RetryPolicy {
RetryPolicy {
base: Duration::from_millis(1),
cap: Duration::from_millis(1),
max_attempts,
total_timeout: Duration::from_secs(60),
}
}
fn build_transport(
base_url: impl Into<String>,
parse_fail: bool,
policy: RetryPolicy,
) -> HttpTransport<DummyScheme> {
HttpTransport::new(
DummyScheme { parse_fail },
"test-model",
base_url,
ResolvedAuth::None,
ModelCapability::minimal(),
)
.with_retry_policy(policy)
}
fn ok_sse() -> ResponseTemplate {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(b"".to_vec(), "text/event-stream")
}
#[tokio::test]
async fn retries_503_then_succeeds() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ResponseTemplate::new(503).set_body_string("upstream connect error"))
.up_to_n_times(2)
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ok_sse())
.mount(&server)
.await;
let transport = build_transport(server.uri(), false, fast_policy(5));
let mut stream = transport
.stream(Request::default())
.await
.expect("stream should succeed after retries");
while stream.next().await.is_some() {}
let received = server.received_requests().await.unwrap();
assert_eq!(received.len(), 3, "two failures plus one success expected");
}
#[tokio::test]
async fn retries_529_then_exhausts() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ResponseTemplate::new(529).set_body_string("overloaded"))
.mount(&server)
.await;
let transport = build_transport(server.uri(), false, fast_policy(3));
match transport.stream(Request::default()).await {
Err(ClientError::Api {
status: Some(529), ..
}) => {}
Err(other) => panic!("expected Api(529), got {other:?}"),
Ok(_) => panic!("expected error after exhausting retries"),
}
let received = server.received_requests().await.unwrap();
assert_eq!(received.len(), 3, "should hit max_attempts and stop");
}
#[tokio::test]
async fn connect_refused_retries_then_fails() {
// 接続不能なローカルアドレスを使う。Linux では `Connection refused` で
// 即時失敗するため、`fast_policy` ならテストが秒以下で終わる。
let unreachable = "http://127.0.0.1:1";
let transport = build_transport(unreachable, false, fast_policy(3));
match transport.stream(Request::default()).await {
Err(ClientError::Http(e)) => {
assert!(
e.is_connect() || e.is_timeout(),
"expected connect/timeout, got {e:?}"
);
}
Err(other) => panic!("expected Http error, got {other:?}"),
Ok(_) => panic!("expected error connecting to closed port"),
}
}
#[tokio::test]
async fn retry_after_header_overrides_backoff() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ResponseTemplate::new(503).insert_header("retry-after", "1"))
.up_to_n_times(1)
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ok_sse())
.mount(&server)
.await;
// base/cap を 1ms に絞った policy で `Retry-After: 1` を観察すると、
// 指数バックオフ単独なら 1ms 程度で終わるはずが Retry-After に従って
// 1 秒待つ → 経過時間で override を検証できる。
let policy = RetryPolicy {
base: Duration::from_millis(1),
cap: Duration::from_millis(1),
max_attempts: 3,
total_timeout: Duration::from_secs(10),
};
let transport = build_transport(server.uri(), false, policy);
let start = Instant::now();
let mut stream = transport.stream(Request::default()).await.expect("ok");
while stream.next().await.is_some() {}
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_secs(1),
"Retry-After=1 should make us wait >=1s, elapsed={elapsed:?}"
);
assert!(
elapsed < Duration::from_secs(3),
"Retry-After=1 should not balloon, elapsed={elapsed:?}"
);
}
#[tokio::test]
async fn mid_stream_sse_error_does_not_retry() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(
b"event: data\ndata: payload\n\n".to_vec(),
"text/event-stream",
),
)
.mount(&server)
.await;
let transport = build_transport(server.uri(), true, fast_policy(5));
let mut stream = transport
.stream(Request::default())
.await
.expect("status 200 should bypass retry loop");
let mut saw_sse_err = false;
while let Some(item) = stream.next().await {
if matches!(item, Err(ClientError::Sse(_))) {
saw_sse_err = true;
}
}
assert!(saw_sse_err, "expected Sse error from stream consumer");
let received = server.received_requests().await.unwrap();
assert_eq!(received.len(), 1, "mid-stream Sse must not retry");
}
#[tokio::test]
async fn non_retryable_status_returns_immediately() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/chat"))
.respond_with(ResponseTemplate::new(401).set_body_string("unauthorized"))
.mount(&server)
.await;
let transport = build_transport(server.uri(), false, fast_policy(5));
match transport.stream(Request::default()).await {
Err(ClientError::Api {
status: Some(401), ..
}) => {}
Err(other) => panic!("expected Api(401), got {other:?}"),
Ok(_) => panic!("expected error"),
}
let received = server.received_requests().await.unwrap();
assert_eq!(received.len(), 1, "401 must not retry");
}

View File

@ -1,70 +0,0 @@
# llm-worker: HTTP transient リトライ
## 背景
`crates/llm-worker/src/llm_client/transport.rs` はリトライを持たず、
upstream が一時的に不調だったときのエラーがそのまま `WorkerError`
伝播して Run が中断する。実セッションでも以下が観測されている:
- セッション `019de419-6f8b-71a0-9e56-f0b9a6c7098b.jsonl:85`
`API error (status: 503): upstream connect error ... Connection refused`
これは `transport.rs:194-216``response.status().is_success()` 前に
返される pre-stream の経路。リクエストはまだ消費されていない。
- Anthropic の `overloaded_error` (529) や Codex backend の 503 も
同経路で観測される transient な事象。
これらは「ヘッダが返る前の段階」で出るため、SSE を読み始めて
出力トークンを発生させる前であり、素朴な再送でべき等に復旧できる典型ケース。
ストリームが途中で切れた場合のリカバリは別の話(→ `llm-worker-stream-continuation`)。
## 方針
`transport.rs` の HTTP 送信層に transient エラー向けの再送を追加する。
SSE 読み出し開始後 (`response.bytes_stream()` 以降) のエラーは対象外で、
従来どおり `ClientError::Sse` として上に流す。
schemeOpenAI / Anthropic / Responses 等)に依存しない共通処理として、
すべての client から同じ振る舞いで使える形にする。
## 要件
### リトライ対象
- HTTP ステータス: 408 / 425 / 429 / 500 / 502 / 503 / 504 / 529
- `reqwest::Error::is_connect()` / `is_timeout()` 由来の送信失敗
- それ以外の `ClientError::Api { status }` および `ClientError::Json`
ストリーム開始後の `ClientError::Sse` は対象外
判定は `is_retryable(&ClientError) -> bool``error.rs` に置いて一箇所に集約する。
### バックオフ
- フルジッタ付き指数base/cap は実装時に妥当な値で固定。後で manifest 化したくなったら別ticket
- `Retry-After` ヘッダがあれば指数バックオフを上書きしてその時間待つ
- 上限: 最大試行回数 + 累積タイムアウトの両方を持つ
### ログ
- リトライ発火ごとに `warn!`ステータス、attempt 番号、次の wait
### 既存挙動の温存
- ストリーム途中で切れた場合の挙動には手を入れない
`transport.rs:231` の `ClientError::Sse` 経路はそのまま)
- 成功時のレイテンシに観測可能なオーバヘッドを足さない
## 完了条件
- `is_retryable` のテーブル駆動 unit test
- 503 / 529 / connect refused をモックした unit test が、
規定回数までリトライして「最終的に成功」「上限到達でエラー」の両ケースを通る
- `Retry-After: 5` を返すモックでは指数を上書きして 5s 待っている
(仮想時間で検証)
- mid-stream で `ClientError::Sse` を起こすモックでリトライが発火しない
- `cargo check` / `cargo test``llm-worker` で通る
## 範囲外
- mid-streamSSE 読み中)の継続再開 → `llm-worker-stream-continuation`
- プロバイダ別の細かい retry policy共通既定で十分
- リトライ上限値の manifest からの上書き必要になったら別ticket