3.0 KiB
3.0 KiB
非同期キャンセル設計
Workerの非同期キャンセル機構についての設計ドキュメント。
概要
tokio_util::sync::CancellationTokenを用いて、別タスクからWorkerの実行を安全にキャンセルできる。
let worker = Arc::new(Mutex::new(Worker::new(client)));
// 実行タスク
let w = worker.clone();
let handle = tokio::spawn(async move {
w.lock().await.run("prompt").await
});
// キャンセル
worker.lock().await.cancel();
キャンセルポイント
キャンセルは以下のタイミングでチェックされる:
- ターンループ先頭 —
is_cancelled()で即座にチェック - ストリーム開始前 —
client.stream()呼び出し時 - ストリーム受信中 —
tokio::select!で各イベント受信と並行監視 - ツール実行中 —
join_all()と並行監視
キャンセル時の処理フロー
キャンセル検知
↓
timeline.abort_current_block() // 進行中ブロックの終端処理
↓
run_on_abort_hooks("Cancelled") // on_abort フック呼び出し
↓
Err(WorkerError::Cancelled) // エラー返却
API
| メソッド | 説明 |
|---|---|
cancel() |
キャンセルをトリガー |
is_cancelled() |
キャンセル状態を確認 |
cancellation_token() |
トークンへの参照を取得(clone()してタスク間で共有可能) |
on_abort フック
Hook::on_abort(&self, reason: &str)がキャンセル時に呼ばれる。
クリーンアップ処理やログ記録に使用できる。
async fn on_abort(&self, reason: &str) -> Result<(), HookError> {
log::info!("Aborted: {}", reason);
Ok(())
}
呼び出しタイミング:
WorkerError::Cancelled— reason:"Cancelled"ControlFlow::Abort(reason)— reason: フックが指定した理由
既知の問題
1. キャンセルトークンの再利用不可
CancellationTokenは一度キャンセルされると永続的にキャンセル状態になる。
同じWorkerインスタンスで再度run()を呼ぶと即座にCancelledエラーになる。
対応案:
run()開始時に新しいトークンを生成するreset_cancellation()メソッドを提供する
2. Sync バウンドの追加(破壊的変更)
tokio::select!使用のため、Handler/Scope型にSyncバウンドを追加した。
既存のユーザーコードでSync未実装の型を使用している場合、コンパイルエラーになる。
3. エラー時のon_abort呼び出し
現在、on_abortはキャンセルとフックAbort時のみ呼ばれる。
ストリームエラー等のその他エラー時には呼ばれないため、一貫性に欠ける可能性がある。