// mas_storage/queue/worker.rs

// Copyright 2024, 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.

//! Repository to interact with workers in the job queue

use async_trait::async_trait;
use chrono::Duration;
use mas_data_model::Clock;
use rand_core::RngCore;
use ulid::Ulid;

use crate::repository_impl;

16/// A worker is an entity which can execute jobs.
17pub struct Worker {
18    /// The ID of the worker.
19    pub id: Ulid,
20}
21
22/// A [`QueueWorkerRepository`] is used to schedule jobs to be executed by a
23/// worker.
24#[async_trait]
25pub trait QueueWorkerRepository: Send + Sync {
26    /// The error type returned by the repository.
27    type Error;
28
29    /// Register a new worker.
30    ///
31    /// Returns a reference to the worker.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if the underlying repository fails.
36    async fn register(
37        &mut self,
38        rng: &mut (dyn RngCore + Send),
39        clock: &dyn Clock,
40    ) -> Result<Worker, Self::Error>;
41
42    /// Send a heartbeat for the given worker.
43    ///
44    /// # Errors
45    ///
46    /// Returns an error if the underlying repository fails or if the worker was
47    /// shutdown.
48    async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
49
50    /// Mark the given worker as shutdown.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error if the underlying repository fails.
55    async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
56
57    /// Find dead workers and shut them down.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if the underlying repository fails.
62    async fn shutdown_dead_workers(
63        &mut self,
64        clock: &dyn Clock,
65        threshold: Duration,
66    ) -> Result<(), Self::Error>;
67
68    /// Remove the leader lease if it is expired, sending a notification to
69    /// trigger a new leader election.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the underlying repository fails.
74    async fn remove_leader_lease_if_expired(
75        &mut self,
76        clock: &dyn Clock,
77    ) -> Result<(), Self::Error>;
78
79    /// Try to get the leader lease, renewing it if we already have it
80    ///
81    /// Returns `true` if we got the leader lease, `false` if we didn't
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the underlying repository fails.
86    async fn try_get_leader_lease(
87        &mut self,
88        clock: &dyn Clock,
89        worker: &Worker,
90    ) -> Result<bool, Self::Error>;
91}
92
93repository_impl!(QueueWorkerRepository:
94    async fn register(
95        &mut self,
96        rng: &mut (dyn RngCore + Send),
97        clock: &dyn Clock,
98    ) -> Result<Worker, Self::Error>;
99
100    async fn heartbeat(
101        &mut self,
102        clock: &dyn Clock,
103        worker: &Worker,
104    ) -> Result<(), Self::Error>;
105
106    async fn shutdown(
107        &mut self,
108        clock: &dyn Clock,
109        worker: &Worker,
110    ) -> Result<(), Self::Error>;
111
112    async fn shutdown_dead_workers(
113        &mut self,
114        clock: &dyn Clock,
115        threshold: Duration,
116    ) -> Result<(), Self::Error>;
117
118    async fn remove_leader_lease_if_expired(
119        &mut self,
120        clock: &dyn Clock,
121    ) -> Result<(), Self::Error>;
122
123    async fn try_get_leader_lease(
124        &mut self,
125        clock: &dyn Clock,
126        worker: &Worker,
127    ) -> Result<bool, Self::Error>;
128);