fix: keep worker terminal lifecycle stable
This commit is contained in:
parent
593db95175
commit
fbd358a195
|
|
@ -361,18 +361,16 @@ impl Runtime {
|
||||||
state.ensure_running()?;
|
state.ensure_running()?;
|
||||||
state.ensure_worker_ref(worker_ref)?;
|
state.ensure_worker_ref(worker_ref)?;
|
||||||
|
|
||||||
let already_status = {
|
{
|
||||||
let worker = state.worker(worker_ref)?;
|
|
||||||
worker.status == status
|
|
||||||
};
|
|
||||||
if already_status {
|
|
||||||
let worker = state.worker(worker_ref)?;
|
let worker = state.worker(worker_ref)?;
|
||||||
|
if !worker.status.is_active() {
|
||||||
return Ok(WorkerLifecycleAck {
|
return Ok(WorkerLifecycleAck {
|
||||||
worker_ref: worker_ref.clone(),
|
worker_ref: worker_ref.clone(),
|
||||||
status: worker.status,
|
status: worker.status,
|
||||||
event_id: worker.last_event_id,
|
event_id: worker.last_event_id,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let event_id = state.push_event(Some(worker_ref.clone()), event_kind, reason);
|
let event_id = state.push_event(Some(worker_ref.clone()), event_kind, reason);
|
||||||
let worker = state.worker_mut(worker_ref)?;
|
let worker = state.worker_mut(worker_ref)?;
|
||||||
|
|
@ -736,6 +734,96 @@ mod tests {
|
||||||
assert_eq!(summary.cancelled_worker_count, 1);
|
assert_eq!(summary.cancelled_worker_count, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stop_then_cancel_preserves_stopped_terminal_state() {
|
||||||
|
let runtime = Runtime::new_memory();
|
||||||
|
let cursor = runtime.event_cursor_from_start().unwrap();
|
||||||
|
let worker = runtime
|
||||||
|
.create_worker(task_request("stable stopped"))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let stop_ack = runtime
|
||||||
|
.stop_worker(&worker.worker_ref, Some("done".to_string()))
|
||||||
|
.unwrap();
|
||||||
|
let cancel_ack = runtime
|
||||||
|
.cancel_worker(&worker.worker_ref, Some("late cancel".to_string()))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(stop_ack.status, WorkerStatus::Stopped);
|
||||||
|
assert_eq!(cancel_ack.status, WorkerStatus::Stopped);
|
||||||
|
assert_eq!(cancel_ack.event_id, stop_ack.event_id);
|
||||||
|
assert_eq!(
|
||||||
|
runtime.worker_detail(&worker.worker_ref).unwrap().status,
|
||||||
|
WorkerStatus::Stopped
|
||||||
|
);
|
||||||
|
|
||||||
|
let summary = runtime.summary().unwrap();
|
||||||
|
assert_eq!(summary.active_worker_count, 0);
|
||||||
|
assert_eq!(summary.stopped_worker_count, 1);
|
||||||
|
assert_eq!(summary.cancelled_worker_count, 0);
|
||||||
|
|
||||||
|
let events = runtime.read_events(&cursor, 10).unwrap().events;
|
||||||
|
assert_eq!(
|
||||||
|
events
|
||||||
|
.iter()
|
||||||
|
.filter(|event| event.kind == RuntimeEventKind::WorkerStopped)
|
||||||
|
.count(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
events
|
||||||
|
.iter()
|
||||||
|
.filter(|event| event.kind == RuntimeEventKind::WorkerCancelled)
|
||||||
|
.count(),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cancel_then_stop_preserves_cancelled_terminal_state() {
|
||||||
|
let runtime = Runtime::new_memory();
|
||||||
|
let cursor = runtime.event_cursor_from_start().unwrap();
|
||||||
|
let worker = runtime
|
||||||
|
.create_worker(task_request("stable cancelled"))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let cancel_ack = runtime
|
||||||
|
.cancel_worker(&worker.worker_ref, Some("abort".to_string()))
|
||||||
|
.unwrap();
|
||||||
|
let stop_ack = runtime
|
||||||
|
.stop_worker(&worker.worker_ref, Some("late stop".to_string()))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(cancel_ack.status, WorkerStatus::Cancelled);
|
||||||
|
assert_eq!(stop_ack.status, WorkerStatus::Cancelled);
|
||||||
|
assert_eq!(stop_ack.event_id, cancel_ack.event_id);
|
||||||
|
assert_eq!(
|
||||||
|
runtime.worker_detail(&worker.worker_ref).unwrap().status,
|
||||||
|
WorkerStatus::Cancelled
|
||||||
|
);
|
||||||
|
|
||||||
|
let summary = runtime.summary().unwrap();
|
||||||
|
assert_eq!(summary.active_worker_count, 0);
|
||||||
|
assert_eq!(summary.stopped_worker_count, 0);
|
||||||
|
assert_eq!(summary.cancelled_worker_count, 1);
|
||||||
|
|
||||||
|
let events = runtime.read_events(&cursor, 10).unwrap().events;
|
||||||
|
assert_eq!(
|
||||||
|
events
|
||||||
|
.iter()
|
||||||
|
.filter(|event| event.kind == RuntimeEventKind::WorkerCancelled)
|
||||||
|
.count(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
events
|
||||||
|
.iter()
|
||||||
|
.filter(|event| event.kind == RuntimeEventKind::WorkerStopped)
|
||||||
|
.count(),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() {
|
fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() {
|
||||||
let runtime = Runtime::new_memory();
|
let runtime = Runtime::new_memory();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user