mas_storage/queue/worker.rs
1// Copyright 2024, 2025 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
4// Please see LICENSE files in the repository root for full details.
5
6//! Repository to interact with workers in the job queue
7
8use async_trait::async_trait;
9use chrono::Duration;
10use mas_data_model::Clock;
11use rand_core::RngCore;
12use ulid::Ulid;
13
14use crate::repository_impl;
15
16/// A worker is an entity which can execute jobs.
17pub struct Worker {
18 /// The ID of the worker.
19 pub id: Ulid,
20}
21
22/// A [`QueueWorkerRepository`] is used to schedule jobs to be executed by a
23/// worker.
24#[async_trait]
25pub trait QueueWorkerRepository: Send + Sync {
26 /// The error type returned by the repository.
27 type Error;
28
29 /// Register a new worker.
30 ///
31 /// Returns a reference to the worker.
32 ///
33 /// # Errors
34 ///
35 /// Returns an error if the underlying repository fails.
36 async fn register(
37 &mut self,
38 rng: &mut (dyn RngCore + Send),
39 clock: &dyn Clock,
40 ) -> Result<Worker, Self::Error>;
41
42 /// Send a heartbeat for the given worker.
43 ///
44 /// # Errors
45 ///
46 /// Returns an error if the underlying repository fails or if the worker was
47 /// shutdown.
48 async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
49
50 /// Mark the given worker as shutdown.
51 ///
52 /// # Errors
53 ///
54 /// Returns an error if the underlying repository fails.
55 async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
56
57 /// Find dead workers and shut them down.
58 ///
59 /// # Errors
60 ///
61 /// Returns an error if the underlying repository fails.
62 async fn shutdown_dead_workers(
63 &mut self,
64 clock: &dyn Clock,
65 threshold: Duration,
66 ) -> Result<(), Self::Error>;
67
68 /// Remove the leader lease if it is expired, sending a notification to
69 /// trigger a new leader election.
70 ///
71 /// # Errors
72 ///
73 /// Returns an error if the underlying repository fails.
74 async fn remove_leader_lease_if_expired(
75 &mut self,
76 clock: &dyn Clock,
77 ) -> Result<(), Self::Error>;
78
79 /// Try to get the leader lease, renewing it if we already have it
80 ///
81 /// Returns `true` if we got the leader lease, `false` if we didn't
82 ///
83 /// # Errors
84 ///
85 /// Returns an error if the underlying repository fails.
86 async fn try_get_leader_lease(
87 &mut self,
88 clock: &dyn Clock,
89 worker: &Worker,
90 ) -> Result<bool, Self::Error>;
91}
92
// Invoke `repository_impl!` for `QueueWorkerRepository`. The signatures listed
// here mirror, one for one, the methods declared on the trait.
//
// NOTE(review): `repository_impl!` is defined elsewhere in the crate;
// presumably it generates forwarding implementations of the trait for wrapper
// types, in which case this list must be kept in sync with the trait. Confirm
// against the macro definition.
repository_impl!(QueueWorkerRepository:
    async fn register(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
    ) -> Result<Worker, Self::Error>;

    async fn heartbeat(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<(), Self::Error>;

    async fn shutdown(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<(), Self::Error>;

    async fn shutdown_dead_workers(
        &mut self,
        clock: &dyn Clock,
        threshold: Duration,
    ) -> Result<(), Self::Error>;

    async fn remove_leader_lease_if_expired(
        &mut self,
        clock: &dyn Clock,
    ) -> Result<(), Self::Error>;

    async fn try_get_leader_lease(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<bool, Self::Error>;
);