From bb73dc6a45a7b7e3c646ee381ad9a74f85ee749e Mon Sep 17 00:00:00 2001 From: Hare Date: Wed, 7 Jan 2026 22:04:32 +0900 Subject: [PATCH] fix: inject missing block_type into Anthropic BlockStop events --- worker/src/llm_client/providers/anthropic.rs | 41 ++++++++++++++------ 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/worker/src/llm_client/providers/anthropic.rs b/worker/src/llm_client/providers/anthropic.rs index 27d37d0..70fd74e 100644 --- a/worker/src/llm_client/providers/anthropic.rs +++ b/worker/src/llm_client/providers/anthropic.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use async_trait::async_trait; use eventsource_stream::Eventsource; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{future::ready, Stream, StreamExt, TryStreamExt}; use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue}; use worker_types::Event; @@ -137,22 +137,41 @@ impl LlmClient for AnthropicClient { .map_err(|e| std::io::Error::other(e)); let event_stream = byte_stream.eventsource(); - let stream = event_stream.map(move |result| { - match result { + // AnthropicはBlockStopイベントに正しいblock_typeを含まないため、 + // クライアント側で状態を追跡して補完する + let mut current_block_type = None; + + let stream = event_stream.filter_map(move |result| { + ready(match result { Ok(event) => { // SSEイベントをパース match scheme.parse_event(&event.event, &event.data) { - Ok(Some(evt)) => Ok(evt), - Ok(None) => { - // イベントを無視(空のStatusで代用し、後でフィルタ) - // 実際にはOptionを返すべきだが、Stream型の都合上こうする - Ok(Event::Ping(worker_types::PingEvent { timestamp: None })) + Ok(Some(mut evt)) => { + // ブロックタイプの追跡と修正 + match &evt { + Event::BlockStart(start) => { + current_block_type = Some(start.block_type); + } + Event::BlockStop(stop) => { + if let Some(block_type) = current_block_type.take() { + // 正しいブロックタイプで上書き + // (Event::BlockStopの中身を置換) + evt = Event::BlockStop(worker_types::BlockStop { + block_type, + ..stop.clone() + }); + } + } + _ => {} + } + Some(Ok(evt)) } - Err(e) => Err(e), + Ok(None) => None, + Err(e) => Some(Err(e)), } } - Err(e) => Err(ClientError::Sse(e.to_string())), - } + Err(e) => Some(Err(ClientError::Sse(e.to_string()))), + }) }); Ok(Box::pin(stream))