mas_storage/queue/
job.rs

1// Copyright 2024, 2025 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
4// Please see LICENSE files in the repository root for full details.
5
6//! Repository to interact with jobs in the job queue
7
8use async_trait::async_trait;
9use chrono::{DateTime, Duration, Utc};
10use mas_data_model::Clock;
11use opentelemetry::trace::TraceContextExt;
12use rand_core::RngCore;
13use serde::{Deserialize, Serialize};
14use tracing_opentelemetry::OpenTelemetrySpanExt;
15use ulid::Ulid;
16
17use super::Worker;
18use crate::repository_impl;
19
/// Represents a job in the job queue
///
/// This is the type handed back by [`QueueJobRepository::reserve`].
pub struct Job {
    /// The ID of the job
    pub id: Ulid,

    /// The name of the queue on which the job was placed
    pub queue_name: String,

    /// The payload of the job, as a JSON value
    pub payload: serde_json::Value,

    /// Metadata stored alongside the job, see [`JobMetadata`]
    pub metadata: JobMetadata,

    /// Which attempt it is
    pub attempt: usize,
}
37
/// Metadata stored alongside the job
///
/// It holds the pieces of an [`opentelemetry::trace::SpanContext`], which can
/// be rebuilt with [`JobMetadata::span_context`]. Every field falls back to
/// its default value when it is missing from the serialized form.
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
pub struct JobMetadata {
    /// String form of the trace ID, parsed back with `TraceId::from_hex`
    #[serde(default)]
    trace_id: String,

    /// String form of the span ID, parsed back with `SpanId::from_hex`
    #[serde(default)]
    span_id: String,

    /// Raw trace flags, as given by `TraceFlags::to_u8`
    #[serde(default)]
    trace_flags: u8,
}
50
51impl JobMetadata {
52    fn new(span_context: &opentelemetry::trace::SpanContext) -> Self {
53        Self {
54            trace_id: span_context.trace_id().to_string(),
55            span_id: span_context.span_id().to_string(),
56            trace_flags: span_context.trace_flags().to_u8(),
57        }
58    }
59
60    /// Get the [`opentelemetry::trace::SpanContext`] from this [`JobMetadata`]
61    #[must_use]
62    pub fn span_context(&self) -> opentelemetry::trace::SpanContext {
63        use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
64        SpanContext::new(
65            TraceId::from_hex(&self.trace_id).unwrap_or(TraceId::INVALID),
66            SpanId::from_hex(&self.span_id).unwrap_or(SpanId::INVALID),
67            TraceFlags::new(self.trace_flags),
68            // Trace context is remote, as it comes from another service/from the database
69            true,
70            TraceState::NONE,
71        )
72    }
73}
74
/// A trait that represents a job which can be inserted into a queue
///
/// The [`Serialize`] bound is what lets [`QueueJobRepositoryExt`] turn the job
/// into its JSON payload.
pub trait InsertableJob: Serialize + Send {
    /// The name of the queue this job belongs to
    const QUEUE_NAME: &'static str;
}
80
/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a
/// worker.
///
/// It also lets a worker reserve jobs and record whether they completed,
/// failed or need to be retried.
#[async_trait]
pub trait QueueJobRepository: Send + Sync {
    /// The error type returned by the repository.
    type Error;

    /// Schedule a job to be executed as soon as possible by a worker.
    ///
    /// # Parameters
    ///
    /// * `rng` - The random number generator used to generate a new job ID
    /// * `clock` - The clock used to generate timestamps
    /// * `queue_name` - The name of the queue to schedule the job on
    /// * `payload` - The payload of the job
    /// * `metadata` - Arbitrary metadata about the job
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn schedule(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
    ) -> Result<(), Self::Error>;

    /// Schedule a job to be executed at a later date by a worker.
    ///
    /// # Parameters
    ///
    /// * `rng` - The random number generator used to generate a new job ID
    /// * `clock` - The clock used to generate timestamps
    /// * `queue_name` - The name of the queue to schedule the job on
    /// * `payload` - The payload of the job
    /// * `metadata` - Arbitrary metadata about the job
    /// * `scheduled_at` - The date and time to schedule the job for
    /// * `schedule_name` - The name of the recurring schedule which scheduled
    ///   this job, if any
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    #[allow(clippy::too_many_arguments)]
    async fn schedule_later(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
        scheduled_at: DateTime<Utc>,
        schedule_name: Option<&str>,
    ) -> Result<(), Self::Error>;

    /// Reserve multiple jobs from multiple queues
    ///
    /// # Parameters
    ///
    /// * `clock` - The clock used to generate timestamps
    /// * `worker` - The worker that is reserving the jobs
    /// * `queues` - The queues to reserve jobs from
    /// * `count` - The number of jobs to reserve
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn reserve(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
        queues: &[&str],
        count: usize,
    ) -> Result<Vec<Job>, Self::Error>;

    /// Mark a job as completed
    ///
    /// # Parameters
    ///
    /// * `clock` - The clock used to generate timestamps
    /// * `id` - The ID of the job to mark as completed
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;

    /// Marks a job as failed.
    ///
    /// # Parameters
    ///
    /// * `clock` - The clock used to generate timestamps
    /// * `id` - The ID of the job to mark as failed
    /// * `reason` - The reason for the failure
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn mark_as_failed(
        &mut self,
        clock: &dyn Clock,
        id: Ulid,
        reason: &str,
    ) -> Result<(), Self::Error>;

    /// Retry a job.
    ///
    /// # Parameters
    ///
    /// * `rng` - The random number generator used to generate a new job ID
    /// * `clock` - The clock used to generate timestamps
    /// * `id` - The ID of the job to reschedule
    /// * `delay` - The delay to apply before the job is retried
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn retry(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        id: Ulid,
        delay: Duration,
    ) -> Result<(), Self::Error>;

    /// Mark all scheduled jobs past their scheduled date as available to be
    /// executed.
    ///
    /// Returns the number of jobs that were marked as available.
    ///
    /// # Parameters
    ///
    /// * `clock` - The clock used to generate timestamps
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
}
217
// Invoke the crate's `repository_impl!` macro for `QueueJobRepository`. The
// signatures listed below mirror the methods declared on the trait, one for
// one.
// NOTE(review): what the macro expands to is defined elsewhere in the crate,
// presumably forwarding implementations for wrapper types; confirm there
// before changing a signature on one side only.
repository_impl!(QueueJobRepository:
    async fn schedule(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
    ) -> Result<(), Self::Error>;

    async fn schedule_later(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
        scheduled_at: DateTime<Utc>,
        schedule_name: Option<&str>,
    ) -> Result<(), Self::Error>;

    async fn reserve(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
        queues: &[&str],
        count: usize,
    ) -> Result<Vec<Job>, Self::Error>;

    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;

    async fn mark_as_failed(&mut self,
        clock: &dyn Clock,
        id: Ulid,
        reason: &str,
    ) -> Result<(), Self::Error>;

    async fn retry(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        id: Ulid,
        delay: Duration,
    ) -> Result<(), Self::Error>;

    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
);
265
/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
/// through the [`InsertableJob`] trait. This isn't in the
/// [`QueueJobRepository`] trait to keep it object safe.
///
/// The blanket implementation in this module serializes the job as the JSON
/// payload, uses [`InsertableJob::QUEUE_NAME`] as the queue name, and stores
/// the span context of the current tracing span as the job metadata.
#[async_trait]
pub trait QueueJobRepositoryExt: QueueJobRepository {
    /// Schedule a job to be executed as soon as possible by a worker.
    ///
    /// # Parameters
    ///
    /// * `rng` - The random number generator used to generate a new job ID
    /// * `clock` - The clock used to generate timestamps
    /// * `job` - The job to schedule
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    ///
    /// # Panics
    ///
    /// The blanket implementation panics if the job or its metadata cannot be
    /// serialized to JSON.
    async fn schedule_job<J: InsertableJob>(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        job: J,
    ) -> Result<(), Self::Error>;

    /// Schedule a job to be executed at a later date by a worker.
    ///
    /// # Parameters
    ///
    /// * `rng` - The random number generator used to generate a new job ID
    /// * `clock` - The clock used to generate timestamps
    /// * `job` - The job to schedule
    /// * `scheduled_at` - The date and time to schedule the job for
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    ///
    /// # Panics
    ///
    /// The blanket implementation panics if the job or its metadata cannot be
    /// serialized to JSON.
    async fn schedule_job_later<J: InsertableJob>(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        job: J,
        scheduled_at: DateTime<Utc>,
    ) -> Result<(), Self::Error>;
}
309
310#[async_trait]
311impl<T> QueueJobRepositoryExt for T
312where
313    T: QueueJobRepository,
314{
315    #[tracing::instrument(
316        name = "db.queue_job.schedule_job",
317        fields(
318            queue_job.queue_name = J::QUEUE_NAME,
319        ),
320        skip_all,
321    )]
322    async fn schedule_job<J: InsertableJob>(
323        &mut self,
324        rng: &mut (dyn RngCore + Send),
325        clock: &dyn Clock,
326        job: J,
327    ) -> Result<(), Self::Error> {
328        // Grab the span context from the current span
329        let span = tracing::Span::current();
330        let ctx = span.context();
331        let span = ctx.span();
332        let span_context = span.span_context();
333
334        let metadata = JobMetadata::new(span_context);
335        let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
336
337        let payload = serde_json::to_value(job).expect("Could not serialize job");
338        self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
339            .await
340    }
341
342    #[tracing::instrument(
343        name = "db.queue_job.schedule_job_later",
344        fields(
345            queue_job.queue_name = J::QUEUE_NAME,
346        ),
347        skip_all,
348    )]
349    async fn schedule_job_later<J: InsertableJob>(
350        &mut self,
351        rng: &mut (dyn RngCore + Send),
352        clock: &dyn Clock,
353        job: J,
354        scheduled_at: DateTime<Utc>,
355    ) -> Result<(), Self::Error> {
356        // Grab the span context from the current span
357        let span = tracing::Span::current();
358        let ctx = span.context();
359        let span = ctx.span();
360        let span_context = span.span_context();
361
362        let metadata = JobMetadata::new(span_context);
363        let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
364
365        let payload = serde_json::to_value(job).expect("Could not serialize job");
366        self.schedule_later(
367            rng,
368            clock,
369            J::QUEUE_NAME,
370            payload,
371            metadata,
372            scheduled_at,
373            None,
374        )
375        .await
376    }
377}