mas_storage/queue/job.rs
1// Copyright 2024, 2025 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
4// Please see LICENSE files in the repository root for full details.
5
6//! Repository to interact with jobs in the job queue
7
8use async_trait::async_trait;
9use chrono::{DateTime, Duration, Utc};
10use mas_data_model::Clock;
11use opentelemetry::trace::TraceContextExt;
12use rand_core::RngCore;
13use serde::{Deserialize, Serialize};
14use tracing_opentelemetry::OpenTelemetrySpanExt;
15use ulid::Ulid;
16
17use super::Worker;
18use crate::repository_impl;
19
20/// Represents a job in the job queue
21pub struct Job {
22 /// The ID of the job
23 pub id: Ulid,
24
25 /// The queue on which the job was placed
26 pub queue_name: String,
27
28 /// The payload of the job
29 pub payload: serde_json::Value,
30
31 /// Arbitrary metadata about the job
32 pub metadata: JobMetadata,
33
34 /// Which attempt it is
35 pub attempt: usize,
36}
37
38/// Metadata stored alongside the job
39#[derive(Serialize, Deserialize, Default, Clone, Debug)]
40pub struct JobMetadata {
41 #[serde(default)]
42 trace_id: String,
43
44 #[serde(default)]
45 span_id: String,
46
47 #[serde(default)]
48 trace_flags: u8,
49}
50
51impl JobMetadata {
52 fn new(span_context: &opentelemetry::trace::SpanContext) -> Self {
53 Self {
54 trace_id: span_context.trace_id().to_string(),
55 span_id: span_context.span_id().to_string(),
56 trace_flags: span_context.trace_flags().to_u8(),
57 }
58 }
59
60 /// Get the [`opentelemetry::trace::SpanContext`] from this [`JobMetadata`]
61 #[must_use]
62 pub fn span_context(&self) -> opentelemetry::trace::SpanContext {
63 use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
64 SpanContext::new(
65 TraceId::from_hex(&self.trace_id).unwrap_or(TraceId::INVALID),
66 SpanId::from_hex(&self.span_id).unwrap_or(SpanId::INVALID),
67 TraceFlags::new(self.trace_flags),
68 // Trace context is remote, as it comes from another service/from the database
69 true,
70 TraceState::NONE,
71 )
72 }
73}
74
75/// A trait that represents a job which can be inserted into a queue
76pub trait InsertableJob: Serialize + Send {
77 /// The name of the queue this job belongs to
78 const QUEUE_NAME: &'static str;
79}
80
81/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a
82/// worker.
83#[async_trait]
84pub trait QueueJobRepository: Send + Sync {
85 /// The error type returned by the repository.
86 type Error;
87
88 /// Schedule a job to be executed as soon as possible by a worker.
89 ///
90 /// # Parameters
91 ///
92 /// * `rng` - The random number generator used to generate a new job ID
93 /// * `clock` - The clock used to generate timestamps
94 /// * `queue_name` - The name of the queue to schedule the job on
95 /// * `payload` - The payload of the job
96 /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
97 ///
98 /// # Errors
99 ///
100 /// Returns an error if the underlying repository fails.
101 async fn schedule(
102 &mut self,
103 rng: &mut (dyn RngCore + Send),
104 clock: &dyn Clock,
105 queue_name: &str,
106 payload: serde_json::Value,
107 metadata: serde_json::Value,
108 ) -> Result<(), Self::Error>;
109
110 /// Schedule a job to be executed at a later date by a worker.
111 ///
112 /// # Parameters
113 ///
114 /// * `rng` - The random number generator used to generate a new job ID
115 /// * `clock` - The clock used to generate timestamps
116 /// * `queue_name` - The name of the queue to schedule the job on
117 /// * `payload` - The payload of the job
118 /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
119 /// * `scheduled_at` - The date and time to schedule the job for
120 /// * `schedule_name` - The name of the recurring schedule which scheduled
121 /// this job
122 ///
123 /// # Errors
124 ///
125 /// Returns an error if the underlying repository fails.
126 #[allow(clippy::too_many_arguments)]
127 async fn schedule_later(
128 &mut self,
129 rng: &mut (dyn RngCore + Send),
130 clock: &dyn Clock,
131 queue_name: &str,
132 payload: serde_json::Value,
133 metadata: serde_json::Value,
134 scheduled_at: DateTime<Utc>,
135 schedule_name: Option<&str>,
136 ) -> Result<(), Self::Error>;
137
138 /// Reserve multiple jobs from multiple queues
139 ///
140 /// # Parameters
141 ///
142 /// * `clock` - The clock used to generate timestamps
143 /// * `worker` - The worker that is reserving the jobs
144 /// * `queues` - The queues to reserve jobs from
145 /// * `count` - The number of jobs to reserve
146 ///
147 /// # Errors
148 ///
149 /// Returns an error if the underlying repository fails.
150 async fn reserve(
151 &mut self,
152 clock: &dyn Clock,
153 worker: &Worker,
154 queues: &[&str],
155 count: usize,
156 ) -> Result<Vec<Job>, Self::Error>;
157
158 /// Mark a job as completed
159 ///
160 /// # Parameters
161 ///
162 /// * `clock` - The clock used to generate timestamps
163 /// * `id` - The ID of the job to mark as completed
164 ///
165 /// # Errors
166 ///
167 /// Returns an error if the underlying repository fails.
168 async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
169
170 /// Marks a job as failed.
171 ///
172 /// # Parameters
173 ///
174 /// * `clock` - The clock used to generate timestamps
175 /// * `id` - The ID of the job to mark as failed
176 /// * `reason` - The reason for the failure
177 ///
178 /// # Errors
179 ///
180 /// Returns an error if the underlying repository fails.
181 async fn mark_as_failed(
182 &mut self,
183 clock: &dyn Clock,
184 id: Ulid,
185 reason: &str,
186 ) -> Result<(), Self::Error>;
187
188 /// Retry a job.
189 ///
190 /// # Parameters
191 ///
192 /// * `rng` - The random number generator used to generate a new job ID
193 /// * `clock` - The clock used to generate timestamps
194 /// * `id` - The ID of the job to reschedule
195 ///
196 /// # Errors
197 ///
198 /// Returns an error if the underlying repository fails.
199 async fn retry(
200 &mut self,
201 rng: &mut (dyn RngCore + Send),
202 clock: &dyn Clock,
203 id: Ulid,
204 delay: Duration,
205 ) -> Result<(), Self::Error>;
206
207 /// Mark all scheduled jobs past their scheduled date as available to be
208 /// executed.
209 ///
210 /// Returns the number of jobs that were marked as available.
211 ///
212 /// # Errors
213 ///
214 /// Returns an error if the underlying repository fails.
215 async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
216}
217
// Invoke the crate's `repository_impl!` macro (defined elsewhere) for
// `QueueJobRepository`. The list below mirrors every method of the trait above,
// with the same signatures.
// NOTE(review): presumably the macro generates forwarding implementations of the
// trait for wrapper types, so the list must be kept in sync with the trait.
// Confirm against the macro definition.
// NOTE(review): only plain `//` comments belong inside this invocation. `///` doc
// comments become `#[doc]` attributes and would likely not match the macro's
// pattern.
repository_impl!(QueueJobRepository:
    async fn schedule(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
    ) -> Result<(), Self::Error>;

    async fn schedule_later(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
        scheduled_at: DateTime<Utc>,
        schedule_name: Option<&str>,
    ) -> Result<(), Self::Error>;

    async fn reserve(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
        queues: &[&str],
        count: usize,
    ) -> Result<Vec<Job>, Self::Error>;

    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;

    async fn mark_as_failed(&mut self,
        clock: &dyn Clock,
        id: Ulid,
        reason: &str,
    ) -> Result<(), Self::Error>;

    async fn retry(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        id: Ulid,
        delay: Duration,
    ) -> Result<(), Self::Error>;

    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
);
265
266/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
267/// through the [`InsertableJob`] trait. This isn't in the
268/// [`QueueJobRepository`] trait to keep it object safe.
269#[async_trait]
270pub trait QueueJobRepositoryExt: QueueJobRepository {
271 /// Schedule a job to be executed as soon as possible by a worker.
272 ///
273 /// # Parameters
274 ///
275 /// * `rng` - The random number generator used to generate a new job ID
276 /// * `clock` - The clock used to generate timestamps
277 /// * `job` - The job to schedule
278 ///
279 /// # Errors
280 ///
281 /// Returns an error if the underlying repository fails.
282 async fn schedule_job<J: InsertableJob>(
283 &mut self,
284 rng: &mut (dyn RngCore + Send),
285 clock: &dyn Clock,
286 job: J,
287 ) -> Result<(), Self::Error>;
288
289 /// Schedule a job to be executed at a later date by a worker.
290 ///
291 /// # Parameters
292 ///
293 /// * `rng` - The random number generator used to generate a new job ID
294 /// * `clock` - The clock used to generate timestamps
295 /// * `job` - The job to schedule
296 /// * `scheduled_at` - The date and time to schedule the job for
297 ///
298 /// # Errors
299 ///
300 /// Returns an error if the underlying repository fails.
301 async fn schedule_job_later<J: InsertableJob>(
302 &mut self,
303 rng: &mut (dyn RngCore + Send),
304 clock: &dyn Clock,
305 job: J,
306 scheduled_at: DateTime<Utc>,
307 ) -> Result<(), Self::Error>;
308}
309
310#[async_trait]
311impl<T> QueueJobRepositoryExt for T
312where
313 T: QueueJobRepository,
314{
315 #[tracing::instrument(
316 name = "db.queue_job.schedule_job",
317 fields(
318 queue_job.queue_name = J::QUEUE_NAME,
319 ),
320 skip_all,
321 )]
322 async fn schedule_job<J: InsertableJob>(
323 &mut self,
324 rng: &mut (dyn RngCore + Send),
325 clock: &dyn Clock,
326 job: J,
327 ) -> Result<(), Self::Error> {
328 // Grab the span context from the current span
329 let span = tracing::Span::current();
330 let ctx = span.context();
331 let span = ctx.span();
332 let span_context = span.span_context();
333
334 let metadata = JobMetadata::new(span_context);
335 let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
336
337 let payload = serde_json::to_value(job).expect("Could not serialize job");
338 self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
339 .await
340 }
341
342 #[tracing::instrument(
343 name = "db.queue_job.schedule_job_later",
344 fields(
345 queue_job.queue_name = J::QUEUE_NAME,
346 ),
347 skip_all,
348 )]
349 async fn schedule_job_later<J: InsertableJob>(
350 &mut self,
351 rng: &mut (dyn RngCore + Send),
352 clock: &dyn Clock,
353 job: J,
354 scheduled_at: DateTime<Utc>,
355 ) -> Result<(), Self::Error> {
356 // Grab the span context from the current span
357 let span = tracing::Span::current();
358 let ctx = span.context();
359 let span = ctx.span();
360 let span_context = span.span_context();
361
362 let metadata = JobMetadata::new(span_context);
363 let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
364
365 let payload = serde_json::to_value(job).expect("Could not serialize job");
366 self.schedule_later(
367 rng,
368 clock,
369 J::QUEUE_NAME,
370 payload,
371 metadata,
372 scheduled_at,
373 None,
374 )
375 .await
376 }
377}