diff --git a/crates/llm-worker/src/llm_client/transport.rs b/crates/llm-worker/src/llm_client/transport.rs index 9954e214..63661332 100644 --- a/crates/llm-worker/src/llm_client/transport.rs +++ b/crates/llm-worker/src/llm_client/transport.rs @@ -63,7 +63,7 @@ impl ResolvedAuth { } } -fn header_value_for_diagnostics(headers: &HeaderMap, name: &'static HeaderName) -> Option { +fn header_value_for_diagnostics(headers: &HeaderMap, name: &str) -> Option { headers .get(name) .and_then(|value| value.to_str().ok()) @@ -74,10 +74,25 @@ fn header_value_for_diagnostics(headers: &HeaderMap, name: &'static HeaderName) fn response_header_diagnostics(headers: &HeaderMap) -> serde_json::Value { serde_json::json!({ - "content_type": header_value_for_diagnostics(headers, &CONTENT_TYPE), - "content_encoding": header_value_for_diagnostics(headers, &CONTENT_ENCODING), - "transfer_encoding": header_value_for_diagnostics(headers, &TRANSFER_ENCODING), - "content_length": header_value_for_diagnostics(headers, &CONTENT_LENGTH), + "content_type": header_value_for_diagnostics(headers, CONTENT_TYPE.as_str()), + "content_encoding": header_value_for_diagnostics(headers, CONTENT_ENCODING.as_str()), + "transfer_encoding": header_value_for_diagnostics(headers, TRANSFER_ENCODING.as_str()), + "content_length": header_value_for_diagnostics(headers, CONTENT_LENGTH.as_str()), + }) +} + +fn request_header_diagnostics(headers: &HeaderMap) -> serde_json::Value { + serde_json::json!({ + "content_type": header_value_for_diagnostics(headers, CONTENT_TYPE.as_str()), + "content_encoding": header_value_for_diagnostics(headers, CONTENT_ENCODING.as_str()), + "accept": header_value_for_diagnostics(headers, ACCEPT.as_str()), + "openai_beta": header_value_for_diagnostics(headers, "openai-beta"), + "session_id_present": headers.contains_key("session-id"), + "thread_id_present": headers.contains_key("thread-id"), + "legacy_session_id_present": headers.contains_key("session_id"), + "legacy_thread_id_present": headers.contains_key("thread_id"), + "x_client_request_id_present": headers.contains_key("x-client-request-id"), + "chatgpt_account_id_present": headers.contains_key("chatgpt-account-id"), }) } @@ -211,6 +226,11 @@ impl HttpTransport { let value = HeaderValue::from_str(cache_key).map_err(|e| { ClientError::Config(format!("invalid Codex conversation header: {e}")) })?; + // Codex CLI sends hyphenated session/thread headers to the + // ChatGPT Codex backend. Keep the legacy underscore header for + // existing traces/backends while exposing the current Codex shape. + headers.insert(HeaderName::from_static("session-id"), value.clone()); + headers.insert(HeaderName::from_static("thread-id"), value.clone()); headers.insert(HeaderName::from_static("session_id"), value.clone()); headers.insert(HeaderName::from_static("x-client-request-id"), value); } @@ -451,6 +471,7 @@ impl LlmClient for HttpTransport { json!({ "elapsed_ms": headers_started.elapsed().as_millis() as u64, "headers_len": headers.len(), + "headers": request_header_diagnostics(&headers), }), ); headers @@ -486,6 +507,7 @@ impl LlmClient for HttpTransport { json!({ "elapsed_ms": stream_headers_started.elapsed().as_millis() as u64, "headers_len": headers.len(), + "headers": request_header_diagnostics(&headers), }), ); @@ -520,15 +542,19 @@ impl LlmClient for HttpTransport { return Err(error); } }; + let final_request_headers = request_header_diagnostics(&headers); + let body_compression = request_body.encoding().to_string(); emit_transport_trace( &request, "transport_body_encode_done", json!({ "elapsed_ms": encode_started.elapsed().as_millis() as u64, - "encoding": request_body.encoding(), + "encoding": body_compression.as_str(), + "body_compression": body_compression.as_str(), "raw_json_bytes": request_body.raw_json_bytes(), "wire_bytes": request_body.wire_bytes(), "request_shape": body_shape.clone(), + "headers": final_request_headers.clone(), }), ); @@ -586,6 +612,8 @@ impl LlmClient for HttpTransport { "context_length_exceeded": context_length_exceeded, "provider_usage_absent": context_length_exceeded, "request_shape": body_shape.clone(), + "request_headers": final_request_headers.clone(), + "body_compression": body_compression.as_str(), }), ); return Err(error); @@ -817,10 +845,26 @@ mod tests { let encoded = transport.encode_request_body(&body, &mut headers).unwrap(); assert_eq!(headers.get(ACCEPT).unwrap(), "text/event-stream"); + assert_eq!(headers.get("session-id").unwrap(), "segment-123"); + assert_eq!(headers.get("thread-id").unwrap(), "segment-123"); assert_eq!(headers.get("session_id").unwrap(), "segment-123"); assert_eq!(headers.get("x-client-request-id").unwrap(), "segment-123"); assert_eq!(headers.get(CONTENT_ENCODING).unwrap(), "zstd"); + let diagnostics = request_header_diagnostics(&headers); + assert_eq!(diagnostics["content_type"], "application/json"); + assert_eq!(diagnostics["content_encoding"], "zstd"); + assert_eq!(diagnostics["accept"], "text/event-stream"); + assert!(diagnostics["session_id_present"].as_bool().unwrap()); + assert!(diagnostics["thread_id_present"].as_bool().unwrap()); + assert!(diagnostics["legacy_session_id_present"].as_bool().unwrap()); + assert!( + diagnostics["x_client_request_id_present"] + .as_bool() + .unwrap() + ); + assert!(diagnostics["chatgpt_account_id_present"].as_bool().unwrap()); + let RequestBody::CompressedJson { bytes: compressed, raw_json_bytes, @@ -850,6 +894,8 @@ mod tests { let encoded = transport.encode_request_body(&body, &mut headers).unwrap(); assert_eq!(headers.get(ACCEPT).unwrap(), "text/event-stream"); + assert!(headers.get("session-id").is_none()); + assert!(headers.get("thread-id").is_none()); assert!(headers.get("session_id").is_none()); assert!(headers.get("x-client-request-id").is_none()); assert!(headers.get(CONTENT_ENCODING).is_none()); diff --git a/crates/pod/src/in_flight.rs b/crates/pod/src/in_flight.rs index b8ca6c58..029f487a 100644 --- a/crates/pod/src/in_flight.rs +++ b/crates/pod/src/in_flight.rs @@ -235,8 +235,15 @@ impl InFlightInner { self.remove_first_text_matching(&text); } } - LoggedItem::Reasoning { text, .. } => { - self.remove_first_thinking_matching(text); + LoggedItem::Reasoning { text, summary, .. } => { + if !text.is_empty() { + self.remove_first_thinking_matching(text); + } + for summary_text in summary { + if !summary_text.is_empty() { + self.remove_first_thinking_matching(summary_text); + } + } } LoggedItem::ToolCall { call_id, .. } => { self.remove_tool_call(call_id); @@ -474,4 +481,29 @@ mod tests { let guard = in_flight.snapshot_guard(); assert!(snapshot_from_guard(&guard).is_empty()); } + + #[test] + fn committed_reasoning_summary_clears_matching_in_flight_thinking_blocks() { + let (event_tx, _) = broadcast::channel(16); + let in_flight = InFlightEvents::new(event_tx); + let first = in_flight.thinking_start(); + in_flight.thinking_delta(first, "summary A".into()); + in_flight.thinking_done(first, "".into()); + let second = in_flight.thinking_start(); + in_flight.thinking_delta(second, "summary B".into()); + in_flight.thinking_done(second, "".into()); + + in_flight.clear_for_committed_item_then( + &LoggedItem::Reasoning { + text: String::new(), + summary: vec!["summary A".into(), "summary B".into()], + encrypted_content: Some("opaque".into()), + signature: None, + }, + || (), + ); + + let guard = in_flight.snapshot_guard(); + assert!(snapshot_from_guard(&guard).is_empty()); + } }