diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index aee5b0b8..54c72795 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -361,17 +361,15 @@ impl Runtime { state.ensure_running()?; state.ensure_worker_ref(worker_ref)?; - let already_status = { + { let worker = state.worker(worker_ref)?; - worker.status == status - }; - if already_status { - let worker = state.worker(worker_ref)?; - return Ok(WorkerLifecycleAck { - worker_ref: worker_ref.clone(), - status: worker.status, - event_id: worker.last_event_id, - }); + if !worker.status.is_active() { + return Ok(WorkerLifecycleAck { + worker_ref: worker_ref.clone(), + status: worker.status, + event_id: worker.last_event_id, + }); + } } let event_id = state.push_event(Some(worker_ref.clone()), event_kind, reason); @@ -736,6 +734,96 @@ mod tests { assert_eq!(summary.cancelled_worker_count, 1); } + #[test] + fn stop_then_cancel_preserves_stopped_terminal_state() { + let runtime = Runtime::new_memory(); + let cursor = runtime.event_cursor_from_start().unwrap(); + let worker = runtime + .create_worker(task_request("stable stopped")) + .unwrap(); + + let stop_ack = runtime + .stop_worker(&worker.worker_ref, Some("done".to_string())) + .unwrap(); + let cancel_ack = runtime + .cancel_worker(&worker.worker_ref, Some("late cancel".to_string())) + .unwrap(); + + assert_eq!(stop_ack.status, WorkerStatus::Stopped); + assert_eq!(cancel_ack.status, WorkerStatus::Stopped); + assert_eq!(cancel_ack.event_id, stop_ack.event_id); + assert_eq!( + runtime.worker_detail(&worker.worker_ref).unwrap().status, + WorkerStatus::Stopped + ); + + let summary = runtime.summary().unwrap(); + assert_eq!(summary.active_worker_count, 0); + assert_eq!(summary.stopped_worker_count, 1); + assert_eq!(summary.cancelled_worker_count, 0); + + let events = runtime.read_events(&cursor, 10).unwrap().events; + assert_eq!( + events + .iter() + .filter(|event| event.kind == RuntimeEventKind::WorkerStopped) + .count(), + 1 + ); + assert_eq!( + events + .iter() + .filter(|event| event.kind == RuntimeEventKind::WorkerCancelled) + .count(), + 0 + ); + } + + #[test] + fn cancel_then_stop_preserves_cancelled_terminal_state() { + let runtime = Runtime::new_memory(); + let cursor = runtime.event_cursor_from_start().unwrap(); + let worker = runtime + .create_worker(task_request("stable cancelled")) + .unwrap(); + + let cancel_ack = runtime + .cancel_worker(&worker.worker_ref, Some("abort".to_string())) + .unwrap(); + let stop_ack = runtime + .stop_worker(&worker.worker_ref, Some("late stop".to_string())) + .unwrap(); + + assert_eq!(cancel_ack.status, WorkerStatus::Cancelled); + assert_eq!(stop_ack.status, WorkerStatus::Cancelled); + assert_eq!(stop_ack.event_id, cancel_ack.event_id); + assert_eq!( + runtime.worker_detail(&worker.worker_ref).unwrap().status, + WorkerStatus::Cancelled + ); + + let summary = runtime.summary().unwrap(); + assert_eq!(summary.active_worker_count, 0); + assert_eq!(summary.stopped_worker_count, 0); + assert_eq!(summary.cancelled_worker_count, 1); + + let events = runtime.read_events(&cursor, 10).unwrap().events; + assert_eq!( + events + .iter() + .filter(|event| event.kind == RuntimeEventKind::WorkerCancelled) + .count(), + 1 + ); + assert_eq!( + events + .iter() + .filter(|event| event.kind == RuntimeEventKind::WorkerStopped) + .count(), + 0 + ); + } + #[test] fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() { let runtime = Runtime::new_memory();