From cf4c454a03a8cdc797a1ad433ef661d09abfd0eb Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 28 Apr 2026 16:10:48 +0900 Subject: [PATCH] =?UTF-8?q?TUI=E3=81=ABThinking=E3=82=92=E8=A1=A8=E7=A4=BA?= =?UTF-8?q?=E3=81=99=E3=82=8B=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/llm-worker/src/callback.rs | 79 ++++++++++++++++++++++++- crates/llm-worker/src/lib.rs | 2 +- crates/llm-worker/src/worker.rs | 19 +++++- crates/pod/src/controller.rs | 20 +++++++ crates/protocol/src/lib.rs | 43 ++++++++++++++ crates/tui/src/app.rs | 90 +++++++++++++++++++++++++++- crates/tui/src/block.rs | 22 +++++++ crates/tui/src/ui.rs | 98 ++++++++++++++++++++++++++++++- 8 files changed, 366 insertions(+), 7 deletions(-) diff --git a/crates/llm-worker/src/callback.rs b/crates/llm-worker/src/callback.rs index 7d431bbd..644e0178 100644 --- a/crates/llm-worker/src/callback.rs +++ b/crates/llm-worker/src/callback.rs @@ -7,8 +7,8 @@ use std::marker::PhantomData; use crate::handler::{ - Handler, Kind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent, ToolUseBlockKind, - ToolUseBlockStart, + Handler, Kind, TextBlockEvent, TextBlockKind, ThinkingBlockEvent, ThinkingBlockKind, + ToolUseBlockEvent, ToolUseBlockKind, ToolUseBlockStart, }; use crate::tool::ToolCall; @@ -95,6 +95,81 @@ impl Handler for ClosureTextBlockHandler { } } +// ============================================================================= +// ThinkingBlock Closure Handler +// ============================================================================= + +/// Callback scope for a thinking block. +/// +/// Mirrors `TextBlockScope`. Some providers (or some configurations) +/// emit thinking metadata without plaintext deltas — in that case the +/// block fires `Start` and `Stop` with no `Delta` in between, which is +/// expected and not an error. +pub struct ThinkingBlockScope { + pub(crate) on_delta: Option>, + pub(crate) on_stop: Option>, +} + +impl ThinkingBlockScope { + fn new() -> Self { + Self { + on_delta: None, + on_stop: None, + } + } + + /// Register a callback for each thinking text delta (streaming fragment). + pub fn on_delta(&mut self, f: impl FnMut(&str) + Send + Sync + 'static) { + self.on_delta = Some(Box::new(f)); + } + + /// Register a callback invoked when the block completes. + /// + /// Receives the full accumulated thinking text. May be empty when + /// the provider didn't emit any plaintext deltas. + pub fn on_stop(&mut self, f: impl FnMut(&str) + Send + Sync + 'static) { + self.on_stop = Some(Box::new(f)); + } +} + +#[derive(Default)] +pub(crate) struct ThinkingBlockClosureState { + on_delta: Option>, + on_stop: Option>, + buffer: String, +} + +pub(crate) struct ClosureThinkingBlockHandler { + pub(crate) setup: Box, +} + +impl Handler for ClosureThinkingBlockHandler { + type Scope = ThinkingBlockClosureState; + + fn on_event(&mut self, scope: &mut Self::Scope, event: &ThinkingBlockEvent) { + match event { + ThinkingBlockEvent::Start(_) => { + scope.buffer.clear(); + let mut builder = ThinkingBlockScope::new(); + (self.setup)(&mut builder); + scope.on_delta = builder.on_delta; + scope.on_stop = builder.on_stop; + } + ThinkingBlockEvent::Delta(text) => { + scope.buffer.push_str(text); + if let Some(f) = &mut scope.on_delta { + f(text); + } + } + ThinkingBlockEvent::Stop(_) => { + if let Some(f) = &mut scope.on_stop { + f(&scope.buffer); + } + } + } + } +} + // ============================================================================= // ToolUseBlock Closure Handler // ============================================================================= diff --git a/crates/llm-worker/src/lib.rs b/crates/llm-worker/src/lib.rs index 0630334c..7815e96d 100644 --- a/crates/llm-worker/src/lib.rs +++ b/crates/llm-worker/src/lib.rs @@ -53,7 +53,7 @@ pub mod tool; pub mod tool_server; pub mod usage_record; -pub use callback::{TextBlockScope, ToolUseBlockScope}; +pub use callback::{TextBlockScope, ThinkingBlockScope, ToolUseBlockScope}; pub use handler::ToolUseBlockStart; pub use interceptor::Interceptor; pub use message::{ContentPart, Item, Message, Role}; diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 45465c3d..2285e217 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -8,8 +8,8 @@ use tracing::{debug, info, trace, warn}; use crate::{ Item, callback::{ - ClosureMetaHandler, ClosureTextBlockHandler, ClosureToolUseBlockHandler, TextBlockScope, - ToolUseBlockScope, + ClosureMetaHandler, ClosureTextBlockHandler, ClosureThinkingBlockHandler, + ClosureToolUseBlockHandler, TextBlockScope, ThinkingBlockScope, ToolUseBlockScope, }, handler::{ErrorKind, StatusKind, ToolUseBlockStart, UsageKind}, interceptor::{ @@ -237,6 +237,21 @@ impl Worker { }); } + /// Register a thinking block observer with scoped callbacks. + /// + /// Mirrors `on_text_block`. Some providers don't expose plaintext + /// reasoning content; in that case the block fires Start and Stop + /// with no Delta in between, and `on_stop` receives an empty string. + pub fn on_thinking_block( + &mut self, + setup: impl FnMut(&mut ThinkingBlockScope) + Send + Sync + 'static, + ) { + self.timeline + .on_thinking_block(ClosureThinkingBlockHandler { + setup: Box::new(setup), + }); + } + /// Register a tool use block observer with scoped callbacks. /// /// The setup closure receives `&ToolUseBlockStart` (containing `id` and `name`) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 7ae5637c..7a5370cb 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -141,6 +141,26 @@ impl PodController { }); }); + let tx = event_tx.clone(); + worker.on_thinking_block(move |block| { + // Start fires unconditionally so the TUI can show + // "Thinking..." even when the provider doesn't emit + // plaintext deltas. + let _ = tx.send(Event::ThinkingStart); + let tx_d = tx.clone(); + block.on_delta(move |text| { + let _ = tx_d.send(Event::ThinkingDelta { + text: text.to_owned(), + }); + }); + let tx_s = tx.clone(); + block.on_stop(move |text| { + let _ = tx_s.send(Event::ThinkingDone { + text: text.to_owned(), + }); + }); + }); + let tx = event_tx.clone(); worker.on_tool_use_block(move |start, block| { let _ = tx.send(Event::ToolCallStart { diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 1e0a496f..7f9a460e 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -178,6 +178,21 @@ pub enum Event { TextDone { text: String, }, + /// A reasoning / thinking block has started. + /// + /// Always paired with a `ThinkingDone`. `ThinkingDelta` is optional: + /// some providers (or some configurations) emit thinking metadata + /// without plaintext, in which case Start → Done arrive with no + /// deltas in between. Multiple thinking blocks per turn are allowed. + ThinkingStart, + ThinkingDelta { + text: String, + }, + /// Thinking block completed. `text` is the full accumulated body + /// (empty string when the provider didn't emit plaintext). + ThinkingDone { + text: String, + }, ToolCallStart { id: String, name: String, @@ -468,6 +483,34 @@ mod tests { assert_eq!(parsed["data"]["text"], "Hello"); } + #[test] + fn event_thinking_roundtrip() { + for event in [ + Event::ThinkingStart, + Event::ThinkingDelta { + text: "step 1".into(), + }, + Event::ThinkingDone { + text: "step 1\nstep 2".into(), + }, + ] { + let json = serde_json::to_string(&event).unwrap(); + let decoded: Event = serde_json::from_str(&json).unwrap(); + match (&event, &decoded) { + (Event::ThinkingStart, Event::ThinkingStart) => {} + (Event::ThinkingDelta { text: a }, Event::ThinkingDelta { text: b }) + | (Event::ThinkingDone { text: a }, Event::ThinkingDone { text: b }) => { + assert_eq!(a, b); + } + _ => panic!("variant mismatch: {event:?} vs {decoded:?}"), + } + } + + let parsed: serde_json::Value = + serde_json::from_str(&serde_json::to_string(&Event::ThinkingStart).unwrap()).unwrap(); + assert_eq!(parsed["event"], "thinking_start"); + } + #[test] fn event_run_end_format() { let event = Event::RunEnd { diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 5df5c6d0..2671bb0e 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1,6 +1,10 @@ +use std::time::Instant; + use protocol::{AlertLevel, AlertSource, Event, Method, RunResult, Segment}; -use crate::block::{Block, CompactEvent, ToolCallBlock, ToolCallState}; +use crate::block::{ + Block, CompactEvent, ThinkingBlock, ThinkingState, ToolCallBlock, ToolCallState, +}; use crate::cache::FileCache; use crate::input::InputBuffer; use crate::scroll::Scroll; @@ -111,9 +115,40 @@ impl App { Event::TextDone { .. } => { self.assistant_streaming = false; } + Event::ThinkingStart => { + self.assistant_streaming = false; + self.blocks.push(Block::Thinking(ThinkingBlock { + text: String::new(), + state: ThinkingState::Streaming { + started_at: Instant::now(), + }, + })); + } + Event::ThinkingDelta { text } => { + if let Some(b) = self.last_streaming_thinking_mut() { + b.text.push_str(&text); + } + } + Event::ThinkingDone { text } => { + if let Some(b) = self.last_streaming_thinking_mut() { + if b.text.is_empty() { + b.text = text; + } + let elapsed = match &b.state { + ThinkingState::Streaming { started_at } => { + Some(started_at.elapsed().as_secs()) + } + _ => None, + }; + b.state = ThinkingState::Finished { + elapsed_secs: elapsed, + }; + } + } Event::TurnEnd { .. } => { self.assistant_streaming = false; self.mark_orphan_tool_calls_incomplete(); + self.mark_orphan_thinking_incomplete(); self.current_tool = None; } Event::ToolCallStart { id, name } => { @@ -272,6 +307,39 @@ impl App { self.assistant_streaming = true; } + /// Walk the most recently pushed blocks looking for a thinking + /// block that's still in `Streaming`. Stops at the current + /// `TurnHeader` to avoid latching onto a thinking block from a + /// previous turn after it was somehow left dangling. + fn last_streaming_thinking_mut(&mut self) -> Option<&mut ThinkingBlock> { + for b in self.blocks.iter_mut().rev() { + match b { + Block::Thinking(t) if matches!(t.state, ThinkingState::Streaming { .. }) => { + return Some(t); + } + Block::TurnHeader { .. } => return None, + _ => continue, + } + } + None + } + + fn mark_orphan_thinking_incomplete(&mut self) { + for b in self.blocks.iter_mut().rev() { + match b { + Block::Thinking(t) => { + if let ThinkingState::Streaming { started_at } = t.state { + t.state = ThinkingState::Incomplete { + elapsed_secs: Some(started_at.elapsed().as_secs()), + }; + } + } + Block::TurnHeader { .. } => break, + _ => {} + } + } + } + fn find_tool_call_mut(&mut self, id: &str) -> Option<&mut ToolCallBlock> { for b in self.blocks.iter_mut().rev() { if let Block::ToolCall(tc) = b @@ -396,6 +464,26 @@ impl App { edit_snapshot: None, })); } + "reasoning" => { + let text = item["text"].as_str().unwrap_or("").to_owned(); + let body = if text.is_empty() { + item["summary"] + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str()) + .collect::>() + .join("\n") + }) + .unwrap_or_default() + } else { + text + }; + self.blocks.push(Block::Thinking(ThinkingBlock { + text: body, + state: ThinkingState::Finished { elapsed_secs: None }, + })); + } "tool_result" => { let id = item["call_id"].as_str().unwrap_or("").to_owned(); let summary = item["summary"].as_str().unwrap_or("").to_owned(); diff --git a/crates/tui/src/block.rs b/crates/tui/src/block.rs index 0d173829..dba41865 100644 --- a/crates/tui/src/block.rs +++ b/crates/tui/src/block.rs @@ -7,6 +7,8 @@ #![allow(dead_code)] // Phase 5 will consume `output` in detail mode. +use std::time::Instant; + use protocol::{AlertLevel, AlertSource, Greeting, Segment}; pub enum Block { @@ -20,6 +22,7 @@ pub enum Block { AssistantText { text: String, }, + Thinking(ThinkingBlock), ToolCall(ToolCallBlock), Alert { level: AlertLevel, @@ -34,6 +37,25 @@ pub enum Block { }, } +pub struct ThinkingBlock { + /// Accumulated reasoning body. Empty for providers that emit only + /// metadata (no plaintext deltas). + pub text: String, + pub state: ThinkingState, +} + +pub enum ThinkingState { + /// Live block: actively streaming. `started_at` is `None` only for + /// blocks materialised from `Event::History`, which never enter the + /// streaming state. + Streaming { started_at: Instant }, + /// Block ended cleanly with `ThinkingDone`. + Finished { elapsed_secs: Option }, + /// `TurnEnd` arrived before `ThinkingDone`. Elapsed time is frozen + /// at the last observed instant. + Incomplete { elapsed_secs: Option }, +} + pub enum CompactEvent { Start, Done { new_session_id: uuid::Uuid }, diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index badbb3ea..8de4032e 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -23,7 +23,7 @@ use unicode_width::{UnicodeWidthChar, UnicodeWidthStr}; use protocol::{AlertLevel, Greeting, Segment}; use crate::app::{App, alert_source_label, fmt_tokens}; -use crate::block::{Block, CompactEvent}; +use crate::block::{Block, CompactEvent, ThinkingBlock, ThinkingState}; /// Display density for the history view. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -298,6 +298,7 @@ fn render_block_into(lines: &mut Vec>, block: &Block, width: u16, Mode::Overview => push_overview_line(lines, text, width, MessageKind::Assistant, ""), _ => push_padded_lines(lines, text, MessageKind::Assistant), }, + Block::Thinking(t) => render_thinking(lines, t, width, mode), // ToolCall is dispatched in `compute_history` via `tool::render_tool` // so it can consume multiple adjacent blocks (Read aggregation). Block::ToolCall(_) => unreachable!("ToolCall handled by compute_history"), @@ -541,6 +542,97 @@ fn count_visual_rows(text: &str, width: u16) -> usize { total.max(1) } +fn render_thinking(lines: &mut Vec>, t: &ThinkingBlock, width: u16, mode: Mode) { + let header_style = kind_style(MessageKind::Thinking); + let body_style = Style::default().fg(Color::DarkGray); + + let header = match &t.state { + ThinkingState::Streaming { started_at } => { + let secs = started_at.elapsed().as_secs(); + format!("Thinking... ({})", fmt_elapsed(secs)) + } + ThinkingState::Finished { elapsed_secs } => match elapsed_secs { + Some(s) => format!("Thought for {}", fmt_elapsed(*s)), + None => "Thought".to_owned(), + }, + ThinkingState::Incomplete { elapsed_secs } => match elapsed_secs { + Some(s) => format!("Thinking interrupted ({})", fmt_elapsed(*s)), + None => "Thinking interrupted".to_owned(), + }, + }; + + if matches!(mode, Mode::Overview) { + push_overview_line(lines, &header, width, MessageKind::Thinking, ""); + return; + } + + lines.push(Line::from(Span::styled(header, header_style))); + + if t.text.is_empty() { + return; + } + + match mode { + Mode::Detail => { + for raw in t.text.lines() { + lines.push(Line::from(vec![ + Span::styled(" ", body_style), + Span::styled(raw.to_owned(), body_style), + ])); + } + } + Mode::Normal => { + // Streaming: show the *latest* tail to keep the cursor of + // attention near where new tokens are appearing. Finished: + // show the first line as a static preview — collapsing it + // entirely would lose the only context most users want + // ("what was it thinking about"). + let preview = match &t.state { + ThinkingState::Streaming { .. } => trailing_line_preview(&t.text), + _ => first_line_preview(&t.text), + }; + if !preview.is_empty() { + let budget = width.saturating_sub(2) as usize; + let truncated = truncate_with_ellipsis(&preview, budget); + lines.push(Line::from(vec![ + Span::styled(" ", body_style), + Span::styled(truncated, body_style), + ])); + } + } + Mode::Overview => unreachable!("handled above"), + } +} + +/// Last segment of `text` after the final newline (or the whole string +/// if it has no newline). Used as the live "what is it thinking now" +/// 1-liner. +fn trailing_line_preview(text: &str) -> String { + text.rsplit_once('\n') + .map(|(_, tail)| tail) + .unwrap_or(text) + .trim_end() + .to_owned() +} + +/// First non-empty line of `text`. Used as the static preview after a +/// thinking block finishes, mirroring the "first line + (+N lines)" +/// idiom of the overview mode. +fn first_line_preview(text: &str) -> String { + text.lines() + .find(|l| !l.trim().is_empty()) + .unwrap_or("") + .to_owned() +} + +fn fmt_elapsed(secs: u64) -> String { + if secs < 60 { + format!("{secs}s") + } else { + format!("{}m{:02}s", secs / 60, secs % 60) + } +} + fn render_compact(lines: &mut Vec>, evt: &CompactEvent, width: u16, mode: Mode) { let (text, kind) = match evt { CompactEvent::Start => ("[compact] starting".to_owned(), MessageKind::NoticeWarn), @@ -745,6 +837,7 @@ pub enum MessageKind { TurnHeader, User, Assistant, + Thinking, TurnStats, NoticeWarn, NoticeError, @@ -755,6 +848,9 @@ pub fn kind_style(kind: MessageKind) -> Style { MessageKind::TurnHeader => Style::default().fg(Color::DarkGray), MessageKind::User => Style::default().fg(Color::Green), MessageKind::Assistant => Style::default().fg(Color::White), + MessageKind::Thinking => Style::default() + .fg(Color::Magenta) + .add_modifier(Modifier::ITALIC), MessageKind::TurnStats => Style::default().fg(Color::DarkGray), MessageKind::NoticeWarn => Style::default() .fg(Color::Black)