fix: inject missing block_type into Anthropic BlockStop events

This commit is contained in:
Keisuke Hirata 2026-01-07 22:04:32 +09:00
parent 1fbd4c8380
commit bb73dc6a45

View File

@ -6,7 +6,7 @@ use std::pin::Pin;
use async_trait::async_trait;
use eventsource_stream::Eventsource;
use futures::{Stream, StreamExt, TryStreamExt};
use futures::{future::ready, Stream, StreamExt, TryStreamExt};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
use worker_types::Event;
@ -137,22 +137,41 @@ impl LlmClient for AnthropicClient {
.map_err(|e| std::io::Error::other(e));
let event_stream = byte_stream.eventsource();
let stream = event_stream.map(move |result| {
match result {
// AnthropicはBlockStopイベントに正しいblock_typeを含まないため、
// クライアント側で状態を追跡して補完する
let mut current_block_type = None;
let stream = event_stream.filter_map(move |result| {
ready(match result {
Ok(event) => {
// SSEイベントをパース
match scheme.parse_event(&event.event, &event.data) {
Ok(Some(evt)) => Ok(evt),
Ok(None) => {
// イベントを無視空のStatusで代用し、後でフィルタ
// 実際にはOption<Event>を返すべきだが、Stream型の都合上こうする
Ok(Event::Ping(worker_types::PingEvent { timestamp: None }))
Ok(Some(mut evt)) => {
// ブロックタイプの追跡と修正
match &evt {
Event::BlockStart(start) => {
current_block_type = Some(start.block_type);
}
Err(e) => Err(e),
Event::BlockStop(stop) => {
if let Some(block_type) = current_block_type.take() {
// 正しいブロックタイプで上書き
// (Event::BlockStopの中身を置換)
evt = Event::BlockStop(worker_types::BlockStop {
block_type,
..stop.clone()
});
}
}
Err(e) => Err(ClientError::Sse(e.to_string())),
_ => {}
}
Some(Ok(evt))
}
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
Err(e) => Some(Err(ClientError::Sse(e.to_string()))),
})
});
Ok(Box::pin(stream))