From 7183847ee5ddc134fe2ac1da9084959ef32f5268 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 4 May 2026 12:45:33 +0900 Subject: [PATCH 1/3] =?UTF-8?q?feat(llm-worker):=20HTTP=20transient=20?= =?UTF-8?q?=E3=82=A8=E3=83=A9=E3=83=BC=E3=81=B8=E3=81=AE=E3=83=AA=E3=83=88?= =?UTF-8?q?=E3=83=A9=E3=82=A4=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `transport.rs` の HTTP 送信〜ステータスチェック区間に指数バックオフ + フルジッターのリトライループを追加する。SSE 読み出し開始後 ( `bytes_stream()` 以降) のエラーは従来どおりそのまま流す。 - `is_retryable(&ClientError)`: 408/425/429/500/502/503/504/529 と reqwest の connect/timeout のみ true - `RetryPolicy` (default: base 500ms / cap 10s / max_attempts 4 / total_timeout 30s) - `Retry-After` ヘッダ (秒数) があればバックオフを上書き - リトライ発火ごとに warn! でステータス・attempt・wait を出す ref: tickets/llm-worker-transient-retry.md --- Cargo.lock | 1 + crates/llm-worker/Cargo.toml | 1 + crates/llm-worker/src/llm_client/error.rs | 66 +++++ crates/llm-worker/src/llm_client/mod.rs | 1 + crates/llm-worker/src/llm_client/retry.rs | 104 ++++++++ crates/llm-worker/src/llm_client/transport.rs | 113 +++++--- .../llm-worker/tests/transport_retry_test.rs | 249 ++++++++++++++++++ 7 files changed, 504 insertions(+), 31 deletions(-) create mode 100644 crates/llm-worker/src/llm_client/retry.rs create mode 100644 crates/llm-worker/tests/transport_retry_test.rs diff --git a/Cargo.lock b/Cargo.lock index 2d18267b..36bc3719 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1650,6 +1650,7 @@ dependencies = [ "tracing", "tracing-subscriber", "trybuild", + "wiremock", ] [[package]] diff --git a/crates/llm-worker/Cargo.toml b/crates/llm-worker/Cargo.toml index 7eb09302..f40776f3 100644 --- a/crates/llm-worker/Cargo.toml +++ b/crates/llm-worker/Cargo.toml @@ -25,3 +25,4 @@ tempfile = { workspace = true } dotenv = "0.15" tracing-subscriber = { version = "0.3", features = ["env-filter"] } trybuild = "1.0.116" +wiremock = "0.6.5" diff --git a/crates/llm-worker/src/llm_client/error.rs b/crates/llm-worker/src/llm_client/error.rs index 02ecbf1b..819ed84e 100644 --- a/crates/llm-worker/src/llm_client/error.rs +++ b/crates/llm-worker/src/llm_client/error.rs @@ -67,3 +67,69 @@ impl From for ClientError { ClientError::Json(err) } } + +/// transient な失敗としてリトライ対象になるかを判定する。 +/// +/// 対象: +/// - `Api { status }` のうち 408 / 425 / 429 / 500 / 502 / 503 / 504 / 529 +/// - `Http(reqwest::Error)` のうち `is_connect()` または `is_timeout()` +/// +/// それ以外(Json、Sse、Config、上記以外の Api ステータス)は false。 +/// SSE 読み出し開始後の失敗は呼び出し側で `Sse` として上に流すため、 +/// ここで対象外にしておけば自動的に弾かれる。 +pub fn is_retryable(error: &ClientError) -> bool { + match error { + ClientError::Api { + status: Some(code), .. + } => matches!(*code, 408 | 425 | 429 | 500 | 502 | 503 | 504 | 529), + ClientError::Api { status: None, .. } => false, + ClientError::Http(e) => e.is_connect() || e.is_timeout(), + ClientError::Json(_) | ClientError::Sse(_) | ClientError::Config(_) => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn api_err(status: Option) -> ClientError { + ClientError::Api { + status, + code: None, + message: String::new(), + } + } + + #[test] + fn retryable_status_codes() { + for code in [408u16, 425, 429, 500, 502, 503, 504, 529] { + assert!( + is_retryable(&api_err(Some(code))), + "status {code} should be retryable", + ); + } + } + + #[test] + fn non_retryable_status_codes() { + for code in [400u16, 401, 403, 404, 409, 410, 422, 501] { + assert!( + !is_retryable(&api_err(Some(code))), + "status {code} should not be retryable", + ); + } + } + + #[test] + fn api_without_status_not_retryable() { + assert!(!is_retryable(&api_err(None))); + } + + #[test] + fn json_sse_config_not_retryable() { + let json_err = serde_json::from_str::("not json").unwrap_err(); + assert!(!is_retryable(&ClientError::Json(json_err))); + assert!(!is_retryable(&ClientError::Sse("boom".into()))); + assert!(!is_retryable(&ClientError::Config("boom".into()))); + } +} diff --git a/crates/llm-worker/src/llm_client/mod.rs b/crates/llm-worker/src/llm_client/mod.rs index c707f94f..3037820a 100644 --- a/crates/llm-worker/src/llm_client/mod.rs +++ b/crates/llm-worker/src/llm_client/mod.rs @@ -23,6 +23,7 @@ pub mod error; pub mod event; pub mod types; +pub mod retry; pub mod scheme; pub mod transport; diff --git a/crates/llm-worker/src/llm_client/retry.rs b/crates/llm-worker/src/llm_client/retry.rs new file mode 100644 index 00000000..8f4d766a --- /dev/null +++ b/crates/llm-worker/src/llm_client/retry.rs @@ -0,0 +1,104 @@ +//! HTTP transient エラー向けリトライポリシー。 +//! +//! `transport.rs` の HTTP 送信〜ステータスチェック区間で `is_retryable` +//! が true を返した失敗をリトライする際に、待ち時間と打ち切り条件を +//! 提供する。SSE 読み出し開始後の失敗は対象外。 + +use std::time::Duration; + +/// 指数バックオフ + ジッター + 累積タイムアウトを表すポリシー。 +/// +/// `Default` は llm-worker 全体の固定値を返す。manifest 経由の上書きが +/// 必要になったら拡張する(現状は不要 → `tickets/llm-worker-transient-retry.md`)。 +#[derive(Debug, Clone)] +pub struct RetryPolicy { + /// 指数の基準値。`base * 2^attempt` を `cap` で頭打ちにした上限から + /// フルジッターで実際の wait を抽選する。 + pub base: Duration, + /// 1 回あたりの wait の上限。 + pub cap: Duration, + /// 試行の合計回数(初回 + リトライ)。`1` ならリトライしない。 + pub max_attempts: u32, + /// 初回送信開始からの累積タイムアウト。これを超える wait は打ち切る。 + pub total_timeout: Duration, +} + +impl Default for RetryPolicy { + fn default() -> Self { + Self { + base: Duration::from_millis(500), + cap: Duration::from_secs(10), + max_attempts: 4, + total_timeout: Duration::from_secs(30), + } + } +} + +impl RetryPolicy { + /// `attempt` 回目の失敗(0-indexed)後に待つ時間を返す。 + /// `Retry-After` で上書きしたい場合は呼び出さず、その値をそのまま使う。 + pub fn backoff(&self, attempt: u32) -> Duration { + let shift = attempt.min(20); + let base_nanos = self.base.as_nanos() as u64; + let exp_nanos = base_nanos.saturating_mul(1u64 << shift); + let cap_nanos = self.cap.as_nanos() as u64; + let upper = exp_nanos.min(cap_nanos); + Duration::from_nanos(jitter_nanos(upper)) + } +} + +/// `[0, max_nanos]` から擬似乱数的に 1 つ取り出す。`SystemTime` の +/// 下位ビットを splitmix64 で攪拌するだけの軽量実装で、暗号的乱数性は +/// 持たないがフルジッターのぶつかり回避には十分。 +fn jitter_nanos(max_nanos: u64) -> u64 { + if max_nanos == 0 { + return 0; + } + let seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let mut x = seed.wrapping_add(0x9E37_79B9_7F4A_7C15); + x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + x ^= x >> 31; + x % (max_nanos + 1) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_policy_values() { + let p = RetryPolicy::default(); + assert_eq!(p.base, Duration::from_millis(500)); + assert_eq!(p.cap, Duration::from_secs(10)); + assert_eq!(p.max_attempts, 4); + assert_eq!(p.total_timeout, Duration::from_secs(30)); + } + + #[test] + fn backoff_respects_cap() { + let p = RetryPolicy::default(); + for attempt in 0..30u32 { + assert!( + p.backoff(attempt) <= p.cap, + "attempt {attempt} exceeded cap", + ); + } + } + + #[test] + fn backoff_zero_when_base_zero() { + let p = RetryPolicy { + base: Duration::ZERO, + cap: Duration::from_secs(10), + max_attempts: 4, + total_timeout: Duration::from_secs(30), + }; + for attempt in 0..5 { + assert_eq!(p.backoff(attempt), Duration::ZERO); + } + } +} diff --git a/crates/llm-worker/src/llm_client/transport.rs b/crates/llm-worker/src/llm_client/transport.rs index 42a0a3c4..45a5198e 100644 --- a/crates/llm-worker/src/llm_client/transport.rs +++ b/crates/llm-worker/src/llm_client/transport.rs @@ -6,17 +6,21 @@ use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use eventsource_stream::Eventsource; use futures::{Stream, StreamExt, TryStreamExt}; -use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue}; +use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue, RETRY_AFTER}; +use tokio::time::Instant; +use tracing::warn; use super::auth::{AuthProvider, AuthRequirement}; use super::capability::ModelCapability; use super::client::{ConfigWarning, LlmClient}; -use super::error::ClientError; +use super::error::{ClientError, is_retryable}; use super::event::Event; +use super::retry::RetryPolicy; use super::scheme::Scheme; use super::types::{Request, RequestConfig}; @@ -63,6 +67,7 @@ pub struct HttpTransport { base_url: String, auth: ResolvedAuth, capability: ModelCapability, + retry_policy: RetryPolicy, } impl HttpTransport { @@ -84,6 +89,7 @@ impl HttpTransport { base_url, auth, capability, + retry_policy: RetryPolicy::default(), } } @@ -93,6 +99,12 @@ impl HttpTransport { self } + /// リトライポリシーを差し替える(テスト用 / 将来の manifest 化フック)。 + pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self { + self.retry_policy = policy; + self + } + fn build_url(&self) -> String { let path = self.scheme.path(&self.model_id); let url = format!("{}{}", self.base_url, path); @@ -159,10 +171,45 @@ impl Clone for HttpTransport { base_url: self.base_url.clone(), auth: self.auth.clone(), capability: self.capability.clone(), + retry_policy: self.retry_policy.clone(), } } } +/// エラーレスポンスを `ClientError::Api` に変換し、`Retry-After` の秒数を +/// 同時に取り出す。リトライループで wait の上書きに使う。 +async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Option) { + let status = resp.status().as_u16(); + let retry_after = resp + .headers() + .get(RETRY_AFTER) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.trim().parse::().ok()) + .map(Duration::from_secs); + let text = resp.text().await.unwrap_or_default(); + let err = if let Ok(json) = serde_json::from_str::(&text) { + let error = json.get("error").unwrap_or(&json); + let code = error.get("type").and_then(|v| v.as_str()).map(String::from); + let message = error + .get("message") + .and_then(|v| v.as_str()) + .unwrap_or(&text) + .to_string(); + ClientError::Api { + status: Some(status), + code, + message, + } + } else { + ClientError::Api { + status: Some(status), + code: None, + message: text, + } + }; + (err, retry_after) +} + #[async_trait] impl LlmClient for HttpTransport { fn clone_boxed(&self) -> Box { @@ -183,37 +230,41 @@ impl LlmClient for HttpTransport { .scheme .build_request_body(&self.model_id, &request, &self.capability); - let response = self - .http_client - .post(&url) - .headers(headers) - .json(&body) - .send() - .await?; + let policy = &self.retry_policy; + let started = Instant::now(); + let mut attempt: u32 = 0; + let response = loop { + let send_result = self + .http_client + .post(&url) + .headers(headers.clone()) + .json(&body) + .send() + .await; - if !response.status().is_success() { - let status = response.status().as_u16(); - let text = response.text().await.unwrap_or_default(); - if let Ok(json) = serde_json::from_str::(&text) { - let error = json.get("error").unwrap_or(&json); - let code = error.get("type").and_then(|v| v.as_str()).map(String::from); - let message = error - .get("message") - .and_then(|v| v.as_str()) - .unwrap_or(&text) - .to_string(); - return Err(ClientError::Api { - status: Some(status), - code, - message, - }); + let (err, retry_after) = match send_result { + Ok(resp) if resp.status().is_success() => break resp, + Ok(resp) => classify_error_response(resp).await, + Err(e) => (ClientError::Http(e), None), + }; + + let next_attempt = attempt + 1; + if next_attempt >= policy.max_attempts || !is_retryable(&err) { + return Err(err); } - return Err(ClientError::Api { - status: Some(status), - code: None, - message: text, - }); - } + let wait = retry_after.unwrap_or_else(|| policy.backoff(attempt)); + if started.elapsed() + wait > policy.total_timeout { + return Err(err); + } + warn!( + error = %err, + attempt = next_attempt, + wait_ms = wait.as_millis() as u64, + "transient HTTP error, retrying" + ); + tokio::time::sleep(wait).await; + attempt = next_attempt; + }; let scheme = self.scheme.clone(); let byte_stream = response.bytes_stream().map_err(std::io::Error::other); diff --git a/crates/llm-worker/tests/transport_retry_test.rs b/crates/llm-worker/tests/transport_retry_test.rs new file mode 100644 index 00000000..64108da1 --- /dev/null +++ b/crates/llm-worker/tests/transport_retry_test.rs @@ -0,0 +1,249 @@ +//! HTTP transport の transient エラーリトライ挙動の integration テスト。 +//! +//! 対応チケット: `tickets/llm-worker-transient-retry.md`。 +//! - 503 / 529 / connect refused でリトライ発火 +//! - max_attempts 上限到達でエラー +//! - `Retry-After` ヘッダで指数バックオフを上書き +//! - `parse_sse` 由来の `ClientError::Sse`(mid-stream 想定)はリトライしない + +use std::time::{Duration, Instant}; + +use futures::StreamExt; +use llm_worker::llm_client::LlmClient; +use llm_worker::llm_client::auth::AuthRequirement; +use llm_worker::llm_client::capability::ModelCapability; +use llm_worker::llm_client::error::ClientError; +use llm_worker::llm_client::event::Event; +use llm_worker::llm_client::retry::RetryPolicy; +use llm_worker::llm_client::scheme::Scheme; +use llm_worker::llm_client::transport::{HttpTransport, ResolvedAuth}; +use llm_worker::llm_client::types::Request; +use serde_json::Value; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +/// SSE 本体は触らないテスト用 scheme。`parse_fail` を立てると +/// stream 消費中(= retry loop の外)で `ClientError::Sse` を返す。 +#[derive(Clone)] +struct DummyScheme { + parse_fail: bool, +} + +impl Scheme for DummyScheme { + type State = (); + fn default_base_url(&self) -> &'static str { + "" + } + fn path(&self, _: &str) -> String { + "/v1/chat".into() + } + fn required_auth(&self) -> AuthRequirement { + AuthRequirement::None + } + fn build_request_body(&self, _: &str, _: &Request, _: &ModelCapability) -> Value { + serde_json::json!({}) + } + fn parse_sse(&self, _: &str, _: &str, _: &mut ()) -> Result, ClientError> { + if self.parse_fail { + Err(ClientError::Sse("simulated mid-stream parse failure".into())) + } else { + Ok(vec![]) + } + } + fn default_capability(&self) -> ModelCapability { + ModelCapability::minimal() + } +} + +fn fast_policy(max_attempts: u32) -> RetryPolicy { + RetryPolicy { + base: Duration::from_millis(1), + cap: Duration::from_millis(1), + max_attempts, + total_timeout: Duration::from_secs(60), + } +} + +fn build_transport( + base_url: impl Into, + parse_fail: bool, + policy: RetryPolicy, +) -> HttpTransport { + HttpTransport::new( + DummyScheme { parse_fail }, + "test-model", + base_url, + ResolvedAuth::None, + ModelCapability::minimal(), + ) + .with_retry_policy(policy) +} + +fn ok_sse() -> ResponseTemplate { + ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .set_body_raw(b"".to_vec(), "text/event-stream") +} + +#[tokio::test] +async fn retries_503_then_succeeds() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/chat")) + .respond_with(ResponseTemplate::new(503).set_body_string("upstream connect error")) + .up_to_n_times(2) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/chat")) + .respond_with(ok_sse()) + .mount(&server) + .await; + + let transport = build_transport(server.uri(), false, fast_policy(5)); + let mut stream = transport + .stream(Request::default()) + .await + .expect("stream should succeed after retries"); + while stream.next().await.is_some() {} + + let received = server.received_requests().await.unwrap(); + assert_eq!(received.len(), 3, "two failures plus one success expected"); +} + +#[tokio::test] +async fn retries_529_then_exhausts() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/chat")) + .respond_with(ResponseTemplate::new(529).set_body_string("overloaded")) + .mount(&server) + .await; + + let transport = build_transport(server.uri(), false, fast_policy(3)); + match transport.stream(Request::default()).await { + Err(ClientError::Api { + status: Some(529), .. + }) => {} + Err(other) => panic!("expected Api(529), got {other:?}"), + Ok(_) => panic!("expected error after exhausting retries"), + } + + let received = server.received_requests().await.unwrap(); + assert_eq!(received.len(), 3, "should hit max_attempts and stop"); +} + +#[tokio::test] +async fn connect_refused_retries_then_fails() { + // 接続不能なローカルアドレスを使う。Linux では `Connection refused` で + // 即時失敗するため、`fast_policy` ならテストが秒以下で終わる。 + let unreachable = "http://127.0.0.1:1"; + + let transport = build_transport(unreachable, false, fast_policy(3)); + match transport.stream(Request::default()).await { + Err(ClientError::Http(e)) => { + assert!( + e.is_connect() || e.is_timeout(), + "expected connect/timeout, got {e:?}" + ); + } + Err(other) => panic!("expected Http error, got {other:?}"), + Ok(_) => panic!("expected error connecting to closed port"), + } +} + +#[tokio::test] +async fn retry_after_header_overrides_backoff() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/chat")) + .respond_with(ResponseTemplate::new(503).insert_header("retry-after", "1")) + .up_to_n_times(1) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/chat")) + .respond_with(ok_sse()) + .mount(&server) + .await; + + // base/cap を 1ms に絞った policy で `Retry-After: 1` を観察すると、 + // 指数バックオフ単独なら 1ms 程度で終わるはずが Retry-After に従って + // 1 秒待つ → 経過時間で override を検証できる。 + let policy = RetryPolicy { + base: Duration::from_millis(1), + cap: Duration::from_millis(1), + max_attempts: 3, + total_timeout: Duration::from_secs(10), + }; + let transport = build_transport(server.uri(), false, policy); + + let start = Instant::now(); + let mut stream = transport.stream(Request::default()).await.expect("ok"); + while stream.next().await.is_some() {} + let elapsed = start.elapsed(); + + assert!( + elapsed >= Duration::from_secs(1), + "Retry-After=1 should make us wait >=1s, elapsed={elapsed:?}" + ); + assert!( + elapsed < Duration::from_secs(3), + "Retry-After=1 should not balloon, elapsed={elapsed:?}" + ); +} + +#[tokio::test] +async fn mid_stream_sse_error_does_not_retry() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/chat")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .set_body_raw( + b"event: data\ndata: payload\n\n".to_vec(), + "text/event-stream", + ), + ) + .mount(&server) + .await; + + let transport = build_transport(server.uri(), true, fast_policy(5)); + let mut stream = transport + .stream(Request::default()) + .await + .expect("status 200 should bypass retry loop"); + let mut saw_sse_err = false; + while let Some(item) = stream.next().await { + if matches!(item, Err(ClientError::Sse(_))) { + saw_sse_err = true; + } + } + assert!(saw_sse_err, "expected Sse error from stream consumer"); + + let received = server.received_requests().await.unwrap(); + assert_eq!(received.len(), 1, "mid-stream Sse must not retry"); +} + +#[tokio::test] +async fn non_retryable_status_returns_immediately() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/chat")) + .respond_with(ResponseTemplate::new(401).set_body_string("unauthorized")) + .mount(&server) + .await; + + let transport = build_transport(server.uri(), false, fast_policy(5)); + match transport.stream(Request::default()).await { + Err(ClientError::Api { + status: Some(401), .. + }) => {} + Err(other) => panic!("expected Api(401), got {other:?}"), + Ok(_) => panic!("expected error"), + } + + let received = server.received_requests().await.unwrap(); + assert_eq!(received.len(), 1, "401 must not retry"); +} From 09a1cde92cdb7872224cee0119368bd0707a9cf3 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 4 May 2026 12:49:13 +0900 Subject: [PATCH 2/3] =?UTF-8?q?docs(tickets):=20llm-worker-transient-retry?= =?UTF-8?q?=20=E3=83=AC=E3=83=93=E3=83=A5=E3=83=BC=E8=BF=BD=E8=A8=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 7183847 のレビュー結果を Approve として記録する。チケット要件 (リトライ対象 / バックオフ / Retry-After 上書き / mid-stream 温存 / 完了条件) はすべて満たしており、コードベースの層構造を歪める変更も ない。Retry-After テストの方針差 (実時間 1s vs 仮想時間 5s) と connect refused テストの試行回数未検証は non-blocking として review.md に記録。 --- tickets/llm-worker-transient-retry.md | 7 +++ tickets/llm-worker-transient-retry.review.md | 62 ++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 tickets/llm-worker-transient-retry.review.md diff --git a/tickets/llm-worker-transient-retry.md b/tickets/llm-worker-transient-retry.md index 2a769830..9582ea47 100644 --- a/tickets/llm-worker-transient-retry.md +++ b/tickets/llm-worker-transient-retry.md @@ -68,3 +68,10 @@ scheme(OpenAI / Anthropic / Responses 等)に依存しない共通処理と - mid-stream(SSE 読み中)の継続再開 → `llm-worker-stream-continuation` - プロバイダ別の細かい retry policy(共通既定で十分) - リトライ上限値の manifest からの上書き(必要になったら別ticket) + +## Review + +- 状態: Approve +- レビュー詳細: [./llm-worker-transient-retry.review.md](./llm-worker-transient-retry.review.md) +- 対象 commit: `7183847` +- 日付: 2026-05-04 diff --git a/tickets/llm-worker-transient-retry.review.md b/tickets/llm-worker-transient-retry.review.md new file mode 100644 index 00000000..fd532080 --- /dev/null +++ b/tickets/llm-worker-transient-retry.review.md @@ -0,0 +1,62 @@ +# Review: llm-worker HTTP transient リトライ + +対象 commit: `7183847` (`develop` `1451998` から派生) +対象 branch: `llm-worker-transient-retry` + +## 前提・要件の確認 + +### リトライ対象 (要件) +- HTTP ステータス 408 / 425 / 429 / 500 / 502 / 503 / 504 / 529 → `crates/llm-worker/src/llm_client/error.rs:80-89` の `is_retryable` で網羅。 +- `reqwest::Error::is_connect()` / `is_timeout()` → 同 `is_retryable` で対応。 +- それ以外 (`Json` / `Sse` / `Config` / 上記以外の `Api`) を弾く → 同所で明示的に `false`。 +- `is_retryable` を `error.rs` に置く要件 → 充足。 +- テーブル駆動 unit test → `error.rs:104-121` (retryable) / `:113-121` (non-retryable) / `:128-134` (Json/Sse/Config) で網羅。 + +### バックオフ (要件) +- フルジッタ + 指数バックオフ → `crates/llm-worker/src/llm_client/retry.rs:40-47` で `min(base * 2^attempt, cap)` の上限から jitter で抽選。 +- `Retry-After` で上書き → `transport.rs:182-188` で取り出し、`:255` で `retry_after.unwrap_or_else(|| policy.backoff(...))` により上書き。 +- 上限: 最大試行回数 + 累積タイムアウトの両方 → `transport.rs:251-258`。`max_attempts` (default 4) と `total_timeout` (default 30s) で二段に打ち切り。 + +### ログ (要件) +- リトライ発火ごとに `warn!` (status / attempt / wait) → `transport.rs:259-264`。`error = %err` で表示するためエラー本体に status が含まれる (`ClientError::Display` の "API error (status: N)")。`attempt = next_attempt` と `wait_ms` も出ており要件通り。 + +### 既存挙動の温存 (要件) +- mid-stream `ClientError::Sse` 経路 (旧 `transport.rs:231` 相当) は `bytes_stream()` 以降にあり、retry loop の外。`tests/transport_retry_test.rs:196-227` で「mid-stream Sse は再送しない (受信回数 1)」を検証済み。 +- 成功パスの計測オーバヘッド: 成功時は loop 1 周のみで `break resp` → 余分な await/allocation なし。`headers.clone()` が増えるが retry 不要時にも走る。`HeaderMap` の clone は軽量だが厳密にはわずかな増分あり。問題視するレベルではない。 + +### 完了条件 +- `is_retryable` のテーブル駆動 unit test → ✓ (`error.rs` mod tests) +- 503 / 529 / connect refused モックでリトライ + 上限到達 → ✓ (`transport_retry_test.rs:88-153`) +- `Retry-After: 5` の指数上書き → 実装は「base/cap=1ms に絞った policy で `Retry-After: 1` を観察し、実時間で elapsed >= 1s を assert」(`transport_retry_test.rs:155-194`)。チケット文面の「5s / 仮想時間」とは異なるが、検証している性質 ("指数バックオフを `Retry-After` 値で上書きして待つ") は同一。後述。 +- mid-stream `Sse` で再送しない → ✓ (`:196-227`) +- `cargo check` / `cargo test` → ✓ (実行確認済み: lib 8/8 pass、`transport_retry_test` 6/6 pass、warning は既存の `end_scope` のみで本 PR 由来でない) + +## アーキテクチャ・スコープ + +- 配置: `llm_client/retry.rs` 新設、`error.rs` への `is_retryable` 追加、`transport.rs` 内のリトライループ。すべて `llm-worker` の低レベル基盤に閉じており、上位層 (worker / pod) を巻き込んでいない。`MEMORY.md:llm-worker scope` に整合。 +- scheme 非依存性: `Scheme` trait に retry 関連の API を増やしていない。`HttpTransport` の単一の retry loop で全 scheme をカバーする方針はチケット記載 ("scheme に依存しない共通処理") と一致。 +- 依存追加: `wiremock = "0.6.5"` が `[dev-dependencies]` に追加 (`Cargo.toml:28`)。`cargo add` 経由かは履歴から確認できないが、配置・バージョン指定とも妥当で `MEMORY.md:Use cargo add` の趣旨に反する痕跡は見えない。 +- API 表面: `HttpTransport::with_retry_policy` を新規 pub 公開。チケット範囲外 (manifest 化) に踏み込んでおらず、test 用 + 将来フックとして 1 メソッドのみ。過剰実装ではない。 +- 不要な抽象: なし。`RetryPolicy` は struct + `Default` + `backoff()` のみで、trait 化や builder 化に走っていない。`classify_error_response` は旧 inline ロジックの抽出で、retry loop が再利用するために必要な分解。 + +## 指摘事項 + +### Blocking + +なし。 + +### Non-blocking / Follow-up + +- **`Retry-After` 検証の方針差**: チケット文面は「5s / 仮想時間」、実装は「1s / 実時間」。検証している不変条件 ("`Retry-After` が指数バックオフより大きいときに `Retry-After` 値に従って待つ") は同等で、テスト所要時間も ~1s なので CI への影響も限定的。ただし将来 `tokio::time::pause()` を使う方針に揃えたくなったら、`reqwest`/`wiremock` を実 socket でなく `tower::Service` レベルに張り替える必要があり、軽い改修で済まない。本 PR の判断は妥当だが、ticket 文面とのズレはレビューレポートに残す価値あり。 +- **connect refused テストが retry 回数を assert していない** (`tests/transport_retry_test.rs:136-153`): 最終的に `Http` エラーが返ることだけ確認しているので、`fast_policy(3)` で実際に 3 回試行したかは間接確認のみ。retry loop 自体は wiremock の 503/529 ケースで覆われているので致命傷ではないが、`reqwest::Error::is_connect()` 経路を打つ唯一のテストでもあるので、「connect 失敗が retryable と判定されて確実に retry される」ことを示すために試行回数を観測したい。listener を立てて connection を drop する方式 (`std::net::TcpListener::bind` + `drop`) で観測できる。 +- **Retry-After の HTTP-date 非対応**: 仕様上は秒数のみ対応で OK (ユーザー合意済み)。Anthropic / Codex の実装が将来 HTTP-date 形式を返したらサイレントに無視されて指数バックオフに戻る (現状エラーにはならない)。後追いで気付きにくい挙動なので、`Retry-After` 形式の合意を別ドキュメント (この `.review.md` か後続 ticket) で残しておくと安全。 +- **Custom auth provider のヘッダ取得が retry loop の外** (`transport.rs:228` の `build_headers().await?`): 初回試行のヘッダを clone して使い回す。OAuth access token の有効期限が極端に短い場合、長め (>1 token TTL) の `total_timeout` で粘ると古いトークンを送り続ける可能性がある。現状の Codex OAuth は実質長寿命で問題ないが、retry 中に provider を再 invoke する設計に切り替えたいときの拡張余地として認識しておくと良い。今 ticket では範囲外。 + +### Nits + +- `jitter_nanos` で `max_nanos + 1` のオーバーフロー: `cap = 10s` 想定なら届かないが、将来 cap を `u64::MAX` に近づけたら破綻する。`max_nanos.saturating_add(1)` または `wrapping_rem` 等への置き換えで防御可。実害なし。 +- `tokio::time::Instant` を import しているが、`std::time::Instant` でも実害はない (`tokio::time::sleep` は両方を扱える)。`tokio::time::pause()` 連動を見越した選択であれば、コメントで一行触れておくと意図が伝わりやすい。 + +## 判断 + +**Approve** — チケットの背景・要件・完了条件はすべて満たされており、コードベースの層構造・命名・抽象度を歪めていない。`Retry-After` テストの方針差は実装者が自己申告しているとおり等価な性質を検証しているため非ブロッキング。`with_retry_policy` の pub 化と manifest 送りの分離もチケット記載どおりで、過剰実装ではない。 From 72e03f9e8f98932abd25ecd61b8d750a7acd0da5 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 4 May 2026 12:51:41 +0900 Subject: [PATCH 3/3] =?UTF-8?q?docs(tickets):=20llm-worker-transient-retry?= =?UTF-8?q?=E5=AE=8C=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO.md | 1 - tickets/llm-worker-transient-retry.md | 77 -------------------- tickets/llm-worker-transient-retry.review.md | 62 ---------------- 3 files changed, 140 deletions(-) delete mode 100644 tickets/llm-worker-transient-retry.md delete mode 100644 tickets/llm-worker-transient-retry.review.md diff --git a/TODO.md b/TODO.md index d75ddc1a..e576f387 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ - Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md) - Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md) - llm-worker のエラー耐性 - - HTTP transient リトライ → [tickets/llm-worker-transient-retry.md](tickets/llm-worker-transient-retry.md) - ストリーム途中失敗時の継続 → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md) - ネイティブ GUI クライアント MVP → [tickets/native-gui-mvp.md](tickets/native-gui-mvp.md) - TUI 拡充 diff --git a/tickets/llm-worker-transient-retry.md b/tickets/llm-worker-transient-retry.md deleted file mode 100644 index 9582ea47..00000000 --- a/tickets/llm-worker-transient-retry.md +++ /dev/null @@ -1,77 +0,0 @@ -# llm-worker: HTTP transient リトライ - -## 背景 - -`crates/llm-worker/src/llm_client/transport.rs` はリトライを持たず、 -upstream が一時的に不調だったときのエラーがそのまま `WorkerError` に -伝播して Run が中断する。実セッションでも以下が観測されている: - -- セッション `019de419-6f8b-71a0-9e56-f0b9a6c7098b.jsonl:85` で - `API error (status: 503): upstream connect error ... Connection refused`。 - これは `transport.rs:194-216` で `response.status().is_success()` 前に - 返される pre-stream の経路。リクエストはまだ消費されていない。 -- Anthropic の `overloaded_error` (529) や Codex backend の 503 も - 同経路で観測される transient な事象。 - -これらは「ヘッダが返る前の段階」で出るため、SSE を読み始めて -出力トークンを発生させる前であり、素朴な再送でべき等に復旧できる典型ケース。 -ストリームが途中で切れた場合のリカバリは別の話(→ `llm-worker-stream-continuation`)。 - -## 方針 - -`transport.rs` の HTTP 送信層に transient エラー向けの再送を追加する。 -SSE 読み出し開始後 (`response.bytes_stream()` 以降) のエラーは対象外で、 -従来どおり `ClientError::Sse` として上に流す。 - -scheme(OpenAI / Anthropic / Responses 等)に依存しない共通処理として、 -すべての client から同じ振る舞いで使える形にする。 - -## 要件 - -### リトライ対象 - -- HTTP ステータス: 408 / 425 / 429 / 500 / 502 / 503 / 504 / 529 -- `reqwest::Error::is_connect()` / `is_timeout()` 由来の送信失敗 -- それ以外の `ClientError::Api { status }` および `ClientError::Json`、 - ストリーム開始後の `ClientError::Sse` は対象外 - -判定は `is_retryable(&ClientError) -> bool` を `error.rs` に置いて一箇所に集約する。 - -### バックオフ - -- フルジッタ付き指数(base/cap は実装時に妥当な値で固定。後で manifest 化したくなったら別ticket) -- `Retry-After` ヘッダがあれば指数バックオフを上書きしてその時間待つ -- 上限: 最大試行回数 + 累積タイムアウトの両方を持つ - -### ログ - -- リトライ発火ごとに `warn!`(ステータス、attempt 番号、次の wait) - -### 既存挙動の温存 - -- ストリーム途中で切れた場合の挙動には手を入れない - (`transport.rs:231` の `ClientError::Sse` 経路はそのまま) -- 成功時のレイテンシに観測可能なオーバヘッドを足さない - -## 完了条件 - -- `is_retryable` のテーブル駆動 unit test -- 503 / 529 / connect refused をモックした unit test が、 - 規定回数までリトライして「最終的に成功」「上限到達でエラー」の両ケースを通る -- `Retry-After: 5` を返すモックでは指数を上書きして 5s 待っている - (仮想時間で検証) -- mid-stream で `ClientError::Sse` を起こすモックでリトライが発火しない -- `cargo check` / `cargo test` が `llm-worker` で通る - -## 範囲外 - -- mid-stream(SSE 読み中)の継続再開 → `llm-worker-stream-continuation` -- プロバイダ別の細かい retry policy(共通既定で十分) -- リトライ上限値の manifest からの上書き(必要になったら別ticket) - -## Review - -- 状態: Approve -- レビュー詳細: [./llm-worker-transient-retry.review.md](./llm-worker-transient-retry.review.md) -- 対象 commit: `7183847` -- 日付: 2026-05-04 diff --git a/tickets/llm-worker-transient-retry.review.md b/tickets/llm-worker-transient-retry.review.md deleted file mode 100644 index fd532080..00000000 --- a/tickets/llm-worker-transient-retry.review.md +++ /dev/null @@ -1,62 +0,0 @@ -# Review: llm-worker HTTP transient リトライ - -対象 commit: `7183847` (`develop` `1451998` から派生) -対象 branch: `llm-worker-transient-retry` - -## 前提・要件の確認 - -### リトライ対象 (要件) -- HTTP ステータス 408 / 425 / 429 / 500 / 502 / 503 / 504 / 529 → `crates/llm-worker/src/llm_client/error.rs:80-89` の `is_retryable` で網羅。 -- `reqwest::Error::is_connect()` / `is_timeout()` → 同 `is_retryable` で対応。 -- それ以外 (`Json` / `Sse` / `Config` / 上記以外の `Api`) を弾く → 同所で明示的に `false`。 -- `is_retryable` を `error.rs` に置く要件 → 充足。 -- テーブル駆動 unit test → `error.rs:104-121` (retryable) / `:113-121` (non-retryable) / `:128-134` (Json/Sse/Config) で網羅。 - -### バックオフ (要件) -- フルジッタ + 指数バックオフ → `crates/llm-worker/src/llm_client/retry.rs:40-47` で `min(base * 2^attempt, cap)` の上限から jitter で抽選。 -- `Retry-After` で上書き → `transport.rs:182-188` で取り出し、`:255` で `retry_after.unwrap_or_else(|| policy.backoff(...))` により上書き。 -- 上限: 最大試行回数 + 累積タイムアウトの両方 → `transport.rs:251-258`。`max_attempts` (default 4) と `total_timeout` (default 30s) で二段に打ち切り。 - -### ログ (要件) -- リトライ発火ごとに `warn!` (status / attempt / wait) → `transport.rs:259-264`。`error = %err` で表示するためエラー本体に status が含まれる (`ClientError::Display` の "API error (status: N)")。`attempt = next_attempt` と `wait_ms` も出ており要件通り。 - -### 既存挙動の温存 (要件) -- mid-stream `ClientError::Sse` 経路 (旧 `transport.rs:231` 相当) は `bytes_stream()` 以降にあり、retry loop の外。`tests/transport_retry_test.rs:196-227` で「mid-stream Sse は再送しない (受信回数 1)」を検証済み。 -- 成功パスの計測オーバヘッド: 成功時は loop 1 周のみで `break resp` → 余分な await/allocation なし。`headers.clone()` が増えるが retry 不要時にも走る。`HeaderMap` の clone は軽量だが厳密にはわずかな増分あり。問題視するレベルではない。 - -### 完了条件 -- `is_retryable` のテーブル駆動 unit test → ✓ (`error.rs` mod tests) -- 503 / 529 / connect refused モックでリトライ + 上限到達 → ✓ (`transport_retry_test.rs:88-153`) -- `Retry-After: 5` の指数上書き → 実装は「base/cap=1ms に絞った policy で `Retry-After: 1` を観察し、実時間で elapsed >= 1s を assert」(`transport_retry_test.rs:155-194`)。チケット文面の「5s / 仮想時間」とは異なるが、検証している性質 ("指数バックオフを `Retry-After` 値で上書きして待つ") は同一。後述。 -- mid-stream `Sse` で再送しない → ✓ (`:196-227`) -- `cargo check` / `cargo test` → ✓ (実行確認済み: lib 8/8 pass、`transport_retry_test` 6/6 pass、warning は既存の `end_scope` のみで本 PR 由来でない) - -## アーキテクチャ・スコープ - -- 配置: `llm_client/retry.rs` 新設、`error.rs` への `is_retryable` 追加、`transport.rs` 内のリトライループ。すべて `llm-worker` の低レベル基盤に閉じており、上位層 (worker / pod) を巻き込んでいない。`MEMORY.md:llm-worker scope` に整合。 -- scheme 非依存性: `Scheme` trait に retry 関連の API を増やしていない。`HttpTransport` の単一の retry loop で全 scheme をカバーする方針はチケット記載 ("scheme に依存しない共通処理") と一致。 -- 依存追加: `wiremock = "0.6.5"` が `[dev-dependencies]` に追加 (`Cargo.toml:28`)。`cargo add` 経由かは履歴から確認できないが、配置・バージョン指定とも妥当で `MEMORY.md:Use cargo add` の趣旨に反する痕跡は見えない。 -- API 表面: `HttpTransport::with_retry_policy` を新規 pub 公開。チケット範囲外 (manifest 化) に踏み込んでおらず、test 用 + 将来フックとして 1 メソッドのみ。過剰実装ではない。 -- 不要な抽象: なし。`RetryPolicy` は struct + `Default` + `backoff()` のみで、trait 化や builder 化に走っていない。`classify_error_response` は旧 inline ロジックの抽出で、retry loop が再利用するために必要な分解。 - -## 指摘事項 - -### Blocking - -なし。 - -### Non-blocking / Follow-up - -- **`Retry-After` 検証の方針差**: チケット文面は「5s / 仮想時間」、実装は「1s / 実時間」。検証している不変条件 ("`Retry-After` が指数バックオフより大きいときに `Retry-After` 値に従って待つ") は同等で、テスト所要時間も ~1s なので CI への影響も限定的。ただし将来 `tokio::time::pause()` を使う方針に揃えたくなったら、`reqwest`/`wiremock` を実 socket でなく `tower::Service` レベルに張り替える必要があり、軽い改修で済まない。本 PR の判断は妥当だが、ticket 文面とのズレはレビューレポートに残す価値あり。 -- **connect refused テストが retry 回数を assert していない** (`tests/transport_retry_test.rs:136-153`): 最終的に `Http` エラーが返ることだけ確認しているので、`fast_policy(3)` で実際に 3 回試行したかは間接確認のみ。retry loop 自体は wiremock の 503/529 ケースで覆われているので致命傷ではないが、`reqwest::Error::is_connect()` 経路を打つ唯一のテストでもあるので、「connect 失敗が retryable と判定されて確実に retry される」ことを示すために試行回数を観測したい。listener を立てて connection を drop する方式 (`std::net::TcpListener::bind` + `drop`) で観測できる。 -- **Retry-After の HTTP-date 非対応**: 仕様上は秒数のみ対応で OK (ユーザー合意済み)。Anthropic / Codex の実装が将来 HTTP-date 形式を返したらサイレントに無視されて指数バックオフに戻る (現状エラーにはならない)。後追いで気付きにくい挙動なので、`Retry-After` 形式の合意を別ドキュメント (この `.review.md` か後続 ticket) で残しておくと安全。 -- **Custom auth provider のヘッダ取得が retry loop の外** (`transport.rs:228` の `build_headers().await?`): 初回試行のヘッダを clone して使い回す。OAuth access token の有効期限が極端に短い場合、長め (>1 token TTL) の `total_timeout` で粘ると古いトークンを送り続ける可能性がある。現状の Codex OAuth は実質長寿命で問題ないが、retry 中に provider を再 invoke する設計に切り替えたいときの拡張余地として認識しておくと良い。今 ticket では範囲外。 - -### Nits - -- `jitter_nanos` で `max_nanos + 1` のオーバーフロー: `cap = 10s` 想定なら届かないが、将来 cap を `u64::MAX` に近づけたら破綻する。`max_nanos.saturating_add(1)` または `wrapping_rem` 等への置き換えで防御可。実害なし。 -- `tokio::time::Instant` を import しているが、`std::time::Instant` でも実害はない (`tokio::time::sleep` は両方を扱える)。`tokio::time::pause()` 連動を見越した選択であれば、コメントで一行触れておくと意図が伝わりやすい。 - -## 判断 - -**Approve** — チケットの背景・要件・完了条件はすべて満たされており、コードベースの層構造・命名・抽象度を歪めていない。`Retry-After` テストの方針差は実装者が自己申告しているとおり等価な性質を検証しているため非ブロッキング。`with_retry_policy` の pub 化と manifest 送りの分離もチケット記載どおりで、過剰実装ではない。