yoi/crates/llm-worker/src/llm_client/transport.rs
Hare 7183847ee5
feat(llm-worker): HTTP transient エラーへのリトライを追加
`transport.rs` の HTTP 送信〜ステータスチェック区間に指数バックオフ
+ フルジッターのリトライループを追加する。SSE 読み出し開始後 (
`bytes_stream()` 以降) のエラーは従来どおりそのまま流す。

- `is_retryable(&ClientError)`: 408/425/429/500/502/503/504/529 と
  reqwest の connect/timeout のみ true
- `RetryPolicy` (default: base 500ms / cap 10s / max_attempts 4 /
  total_timeout 30s)
- `Retry-After` ヘッダ (秒数) があればバックオフを上書き
- リトライ発火ごとに warn! でステータス・attempt・wait を出す

ref: tickets/llm-worker-transient-retry.md
2026-05-04 12:45:33 +09:00

296 lines
11 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! `HttpTransport<S: Scheme>`: すべての LLM wire scheme を共通の 1 本の
//! HTTP クライアントで扱う。
//!
//! 旧 `providers/{anthropic,openai,gemini,ollama}.rs` を置き換える。
//! scheme 固有の差分は [`Scheme`] trait 実装に委譲する。
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use eventsource_stream::Eventsource;
use futures::{Stream, StreamExt, TryStreamExt};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue, RETRY_AFTER};
use tokio::time::Instant;
use tracing::warn;
use super::auth::{AuthProvider, AuthRequirement};
use super::capability::ModelCapability;
use super::client::{ConfigWarning, LlmClient};
use super::error::{ClientError, is_retryable};
use super::event::Event;
use super::retry::RetryPolicy;
use super::scheme::Scheme;
use super::types::{Request, RequestConfig};
/// `AuthRef` を解決したランタイム表現。`crates/provider` が構築する。
///
/// - `None`: 認証ヘッダを送らないOllama 等の opt-out
/// - `ApiKey`: 静的な API key 文字列
/// - `Custom`: リクエスト毎に動的にヘッダを組み立てるCodex OAuth 等)
#[derive(Debug, Clone)]
pub enum ResolvedAuth {
None,
ApiKey(String),
Custom(Arc<dyn AuthProvider>),
}
impl ResolvedAuth {
/// 認証要件と実際の解決値が噛み合うか検査する。構築時検証用。
///
/// - `ResolvedAuth::None` は認証を付けない宣言なので、どの
/// `AuthRequirement` でも受け入れるOllama の Anthropic scheme
/// 流用は `required_auth = XApiKey` だが認証ヘッダなしで動く)
/// - `ResolvedAuth::Custom` は「ヘッダ組立を全部こちらで行う」
/// 宣言なので、scheme が要求する形式によらず受け入れる
pub fn matches(&self, req: AuthRequirement) -> bool {
match (self, req) {
(Self::None, _) => true,
(Self::Custom(_), _) => true,
(
Self::ApiKey(_),
AuthRequirement::Bearer
| AuthRequirement::XApiKey
| AuthRequirement::QueryParam { .. },
) => true,
_ => false,
}
}
}
/// scheme 共通の HTTP 通信層。
pub struct HttpTransport<S: Scheme> {
http_client: reqwest::Client,
scheme: S,
model_id: String,
base_url: String,
auth: ResolvedAuth,
capability: ModelCapability,
retry_policy: RetryPolicy,
}
impl<S: Scheme> HttpTransport<S> {
/// 新しい transport を作る。`base_url` は末尾スラッシュの有無を
/// どちらでも受け付ける(内部で正規化)。
pub fn new(
scheme: S,
model_id: impl Into<String>,
base_url: impl Into<String>,
auth: ResolvedAuth,
capability: ModelCapability,
) -> Self {
let base_url = base_url.into();
let base_url = base_url.trim_end_matches('/').to_string();
Self {
http_client: reqwest::Client::new(),
scheme,
model_id: model_id.into(),
base_url,
auth,
capability,
retry_policy: RetryPolicy::default(),
}
}
/// カスタム HTTP クライアントを差し込む(テスト等)。
pub fn with_http_client(mut self, client: reqwest::Client) -> Self {
self.http_client = client;
self
}
/// リトライポリシーを差し替える(テスト用 / 将来の manifest 化フック)。
pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
self.retry_policy = policy;
self
}
fn build_url(&self) -> String {
let path = self.scheme.path(&self.model_id);
let url = format!("{}{}", self.base_url, path);
// Gemini のようにクエリパラメータで認証する場合は URL にキーを追記する
if let (AuthRequirement::QueryParam { name }, ResolvedAuth::ApiKey(key)) =
(self.scheme.required_auth(), &self.auth)
{
let sep = if url.contains('?') { '&' } else { '?' };
format!("{url}{sep}{name}={key}")
} else {
url
}
}
async fn build_headers(&self) -> Result<HeaderMap, ClientError> {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
match (&self.auth, self.scheme.required_auth()) {
(ResolvedAuth::None, _) | (_, AuthRequirement::None) => {}
(ResolvedAuth::Custom(provider), _) => {
for (name, mut value) in provider.headers().await? {
value.set_sensitive(true);
headers.insert(name, value);
}
}
(ResolvedAuth::ApiKey(key), AuthRequirement::Bearer) => {
let mut val = HeaderValue::from_str(&format!("Bearer {key}"))
.map_err(|e| ClientError::Config(format!("invalid api key: {e}")))?;
val.set_sensitive(true);
headers.insert("Authorization", val);
}
(ResolvedAuth::ApiKey(key), AuthRequirement::XApiKey) => {
let mut val = HeaderValue::from_str(key.as_str())
.map_err(|e| ClientError::Config(format!("invalid api key: {e}")))?;
val.set_sensitive(true);
headers.insert("x-api-key", val);
}
(_, AuthRequirement::QueryParam { .. }) => {
// クエリパラメータは `build_url` で付与済み
}
(ResolvedAuth::ApiKey(_), AuthRequirement::Custom) => {
// scheme が Custom を要求する組合せに ApiKey は流れてこない想定
// `matches()` で弾かれる)。安全側で何もしない
}
}
for (name, value) in self.scheme.additional_headers() {
let hv = HeaderValue::from_str(&value)
.map_err(|e| ClientError::Config(format!("invalid header {name}: {e}")))?;
headers.insert(name, hv);
}
Ok(headers)
}
}
impl<S: Scheme + Clone> Clone for HttpTransport<S> {
fn clone(&self) -> Self {
Self {
http_client: self.http_client.clone(),
scheme: self.scheme.clone(),
model_id: self.model_id.clone(),
base_url: self.base_url.clone(),
auth: self.auth.clone(),
capability: self.capability.clone(),
retry_policy: self.retry_policy.clone(),
}
}
}
/// エラーレスポンスを `ClientError::Api` に変換し、`Retry-After` の秒数を
/// 同時に取り出す。リトライループで wait の上書きに使う。
async fn classify_error_response(resp: reqwest::Response) -> (ClientError, Option<Duration>) {
let status = resp.status().as_u16();
let retry_after = resp
.headers()
.get(RETRY_AFTER)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.trim().parse::<u64>().ok())
.map(Duration::from_secs);
let text = resp.text().await.unwrap_or_default();
let err = if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
let error = json.get("error").unwrap_or(&json);
let code = error.get("type").and_then(|v| v.as_str()).map(String::from);
let message = error
.get("message")
.and_then(|v| v.as_str())
.unwrap_or(&text)
.to_string();
ClientError::Api {
status: Some(status),
code,
message,
}
} else {
ClientError::Api {
status: Some(status),
code: None,
message: text,
}
};
(err, retry_after)
}
#[async_trait]
impl<S: Scheme + Clone + 'static> LlmClient for HttpTransport<S> {
fn clone_boxed(&self) -> Box<dyn LlmClient> {
Box::new(self.clone())
}
fn validate_config(&self, config: &RequestConfig) -> Vec<ConfigWarning> {
self.scheme.validate_config(config)
}
async fn stream(
&self,
request: Request,
) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError> {
let url = self.build_url();
let headers = self.build_headers().await?;
let body = self
.scheme
.build_request_body(&self.model_id, &request, &self.capability);
let policy = &self.retry_policy;
let started = Instant::now();
let mut attempt: u32 = 0;
let response = loop {
let send_result = self
.http_client
.post(&url)
.headers(headers.clone())
.json(&body)
.send()
.await;
let (err, retry_after) = match send_result {
Ok(resp) if resp.status().is_success() => break resp,
Ok(resp) => classify_error_response(resp).await,
Err(e) => (ClientError::Http(e), None),
};
let next_attempt = attempt + 1;
if next_attempt >= policy.max_attempts || !is_retryable(&err) {
return Err(err);
}
let wait = retry_after.unwrap_or_else(|| policy.backoff(attempt));
if started.elapsed() + wait > policy.total_timeout {
return Err(err);
}
warn!(
error = %err,
attempt = next_attempt,
wait_ms = wait.as_millis() as u64,
"transient HTTP error, retrying"
);
tokio::time::sleep(wait).await;
attempt = next_attempt;
};
let scheme = self.scheme.clone();
let byte_stream = response.bytes_stream().map_err(std::io::Error::other);
let event_stream = byte_stream.eventsource();
// scheme 固有のパース状態をストリーム単位で保持する
let mut state = <S::State as Default>::default();
let stream = event_stream
.map(move |result| match result {
Ok(frame) => match scheme.parse_sse(&frame.event, &frame.data, &mut state) {
Ok(events) => Ok(events),
Err(e) => Err(e),
},
Err(e) => Err(ClientError::Sse(e.to_string())),
})
.map(|res| {
let s: Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>> = match res {
Ok(events) => Box::pin(futures::stream::iter(events.into_iter().map(Ok))),
Err(e) => Box::pin(futures::stream::once(async move { Err(e) })),
};
s
})
.flatten();
Ok(Box::pin(stream))
}
}