mas_storage_pg/
app_session.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7//! A module containing PostgreSQL implementation of repositories for sessions
8
9use async_trait::async_trait;
10use mas_data_model::{
11    Clock, CompatSession, CompatSessionState, Device, Session, SessionState, User,
12};
13use mas_storage::{
14    Page, Pagination,
15    app_session::{AppSession, AppSessionFilter, AppSessionRepository, AppSessionState},
16    compat::CompatSessionFilter,
17    oauth2::OAuth2SessionFilter,
18};
19use oauth2_types::scope::{Scope, ScopeToken};
20use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
21use sea_query::{
22    Alias, ColumnRef, CommonTableExpression, Expr, PostgresQueryBuilder, Query, UnionType,
23};
24use sea_query_binder::SqlxBinder;
25use sqlx::PgConnection;
26use tracing::Instrument;
27use ulid::Ulid;
28use uuid::Uuid;
29
30use crate::{
31    DatabaseError, ExecuteExt,
32    errors::DatabaseInconsistencyError,
33    filter::StatementExt,
34    iden::{CompatSessions, OAuth2Sessions},
35    pagination::QueryBuilderExt,
36};
37
/// An implementation of [`AppSessionRepository`] for a PostgreSQL connection
///
/// The repository holds a mutable borrow of the connection for the lifetime
/// `'c`; every query issued by the repository is executed on it.
pub struct PgAppSessionRepository<'c> {
    /// The borrowed connection on which all queries are run
    conn: &'c mut PgConnection,
}
42
impl<'c> PgAppSessionRepository<'c> {
    /// Create a new [`PgAppSessionRepository`] from an active PostgreSQL
    /// connection
    ///
    /// The connection stays mutably borrowed for as long as the returned
    /// repository is alive.
    pub fn new(conn: &'c mut PgConnection) -> Self {
        Self { conn }
    }
}
50
mod priv_ {
    // The enum_def macro generates a public enum, which we don't want, because it
    // triggers the missing docs warning

    use std::net::IpAddr;

    use chrono::{DateTime, Utc};
    use sea_query::enum_def;
    use uuid::Uuid;

    // One row of the `sessions` union built by the repository queries. A row
    // describes either a compat session or an OAuth 2.0 session; the columns
    // which only exist for the other kind are selected as NULL, which is why
    // most fields are optional.
    //
    // `enum_def` derives the `AppSessionLookupIden` identifiers from the field
    // names, and those identifiers are used as column aliases in the queries.
    #[derive(sqlx::FromRow)]
    #[enum_def]
    pub(super) struct AppSessionLookup {
        // ID of the underlying session, whichever table it comes from; this is
        // the column the listing is paginated on
        pub(super) cursor: Uuid,
        // Only set on rows coming from `compat_sessions`
        pub(super) compat_session_id: Option<Uuid>,
        // Only set on rows coming from `oauth2_sessions`
        pub(super) oauth2_session_id: Option<Uuid>,
        // Only set on rows coming from `oauth2_sessions`
        pub(super) oauth2_client_id: Option<Uuid>,
        // Selected from both tables
        pub(super) user_session_id: Option<Uuid>,
        // Selected from both tables; the conversion to `AppSession` requires it
        // for compat sessions and treats it as optional for OAuth 2.0 sessions
        pub(super) user_id: Option<Uuid>,
        // Only set on rows coming from `oauth2_sessions`
        pub(super) scope_list: Option<Vec<String>>,
        // Only selected from `compat_sessions`
        pub(super) device_id: Option<String>,
        pub(super) human_name: Option<String>,
        pub(super) created_at: DateTime<Utc>,
        // `None` means the session is still valid
        pub(super) finished_at: Option<DateTime<Utc>>,
        // Only set on rows coming from `compat_sessions`
        pub(super) is_synapse_admin: Option<bool>,
        pub(super) user_agent: Option<String>,
        pub(super) last_active_at: Option<DateTime<Utc>>,
        pub(super) last_active_ip: Option<IpAddr>,
    }
}
81
82use priv_::{AppSessionLookup, AppSessionLookupIden};
83
84impl TryFrom<AppSessionLookup> for AppSession {
85    type Error = DatabaseError;
86
87    fn try_from(value: AppSessionLookup) -> Result<Self, Self::Error> {
88        // This is annoying to do, but we have to match on all the fields to determine
89        // whether it's a compat session or an oauth2 session
90        let AppSessionLookup {
91            cursor,
92            compat_session_id,
93            oauth2_session_id,
94            oauth2_client_id,
95            user_session_id,
96            user_id,
97            scope_list,
98            device_id,
99            human_name,
100            created_at,
101            finished_at,
102            is_synapse_admin,
103            user_agent,
104            last_active_at,
105            last_active_ip,
106        } = value;
107
108        let user_session_id = user_session_id.map(Ulid::from);
109
110        match (
111            compat_session_id,
112            oauth2_session_id,
113            oauth2_client_id,
114            user_id,
115            scope_list,
116            device_id,
117            is_synapse_admin,
118        ) {
119            (
120                Some(compat_session_id),
121                None,
122                None,
123                Some(user_id),
124                None,
125                device_id_opt,
126                Some(is_synapse_admin),
127            ) => {
128                let id = compat_session_id.into();
129                let device = device_id_opt
130                    .map(Device::try_from)
131                    .transpose()
132                    .map_err(|e| {
133                        DatabaseInconsistencyError::on("compat_sessions")
134                            .column("device_id")
135                            .row(id)
136                            .source(e)
137                    })?;
138
139                let state = match finished_at {
140                    None => CompatSessionState::Valid,
141                    Some(finished_at) => CompatSessionState::Finished { finished_at },
142                };
143
144                let session = CompatSession {
145                    id,
146                    state,
147                    user_id: user_id.into(),
148                    device,
149                    human_name,
150                    user_session_id,
151                    created_at,
152                    is_synapse_admin,
153                    user_agent,
154                    last_active_at,
155                    last_active_ip,
156                };
157
158                Ok(AppSession::Compat(Box::new(session)))
159            }
160
161            (
162                None,
163                Some(oauth2_session_id),
164                Some(oauth2_client_id),
165                user_id,
166                Some(scope_list),
167                None,
168                None,
169            ) => {
170                let id = oauth2_session_id.into();
171                let scope: Result<Scope, _> =
172                    scope_list.iter().map(|s| s.parse::<ScopeToken>()).collect();
173                let scope = scope.map_err(|e| {
174                    DatabaseInconsistencyError::on("oauth2_sessions")
175                        .column("scope")
176                        .row(id)
177                        .source(e)
178                })?;
179
180                let state = match value.finished_at {
181                    None => SessionState::Valid,
182                    Some(finished_at) => SessionState::Finished { finished_at },
183                };
184
185                let session = Session {
186                    id,
187                    state,
188                    created_at,
189                    client_id: oauth2_client_id.into(),
190                    user_id: user_id.map(Ulid::from),
191                    user_session_id,
192                    scope,
193                    user_agent,
194                    last_active_at,
195                    last_active_ip,
196                    human_name,
197                };
198
199                Ok(AppSession::OAuth2(Box::new(session)))
200            }
201
202            _ => Err(DatabaseInconsistencyError::on("sessions")
203                .row(cursor.into())
204                .into()),
205        }
206    }
207}
208
209/// Split a [`AppSessionFilter`] into two separate filters: a
210/// [`CompatSessionFilter`] and an [`OAuth2SessionFilter`].
211fn split_filter(
212    filter: AppSessionFilter<'_>,
213) -> (CompatSessionFilter<'_>, OAuth2SessionFilter<'_>) {
214    let mut compat_filter = CompatSessionFilter::new();
215    let mut oauth2_filter = OAuth2SessionFilter::new();
216
217    if let Some(user) = filter.user() {
218        compat_filter = compat_filter.for_user(user);
219        oauth2_filter = oauth2_filter.for_user(user);
220    }
221
222    match filter.state() {
223        Some(AppSessionState::Active) => {
224            compat_filter = compat_filter.active_only();
225            oauth2_filter = oauth2_filter.active_only();
226        }
227        Some(AppSessionState::Finished) => {
228            compat_filter = compat_filter.finished_only();
229            oauth2_filter = oauth2_filter.finished_only();
230        }
231        None => {}
232    }
233
234    if let Some(device) = filter.device() {
235        compat_filter = compat_filter.for_device(device);
236        oauth2_filter = oauth2_filter.for_device(device);
237    }
238
239    if let Some(browser_session) = filter.browser_session() {
240        compat_filter = compat_filter.for_browser_session(browser_session);
241        oauth2_filter = oauth2_filter.for_browser_session(browser_session);
242    }
243
244    if let Some(last_active_before) = filter.last_active_before() {
245        compat_filter = compat_filter.with_last_active_before(last_active_before);
246        oauth2_filter = oauth2_filter.with_last_active_before(last_active_before);
247    }
248
249    if let Some(last_active_after) = filter.last_active_after() {
250        compat_filter = compat_filter.with_last_active_after(last_active_after);
251        oauth2_filter = oauth2_filter.with_last_active_after(last_active_after);
252    }
253
254    (compat_filter, oauth2_filter)
255}
256
257#[async_trait]
258impl AppSessionRepository for PgAppSessionRepository<'_> {
259    type Error = DatabaseError;
260
    /// List the compat and OAuth 2.0 sessions matching `filter`, one page at a
    /// time.
    ///
    /// The query defines a `sessions` common table expression as the
    /// `UNION ALL` of a select on `oauth2_sessions` and a select on
    /// `compat_sessions`, then selects every column from it, paginated on the
    /// `cursor` column.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails, or if one of the returned rows
    /// cannot be converted into an [`AppSession`].
    #[tracing::instrument(
        name = "db.app_session.list",
        fields(
            db.query.text,
        ),
        skip_all,
        err,
    )]
    async fn list(
        &mut self,
        filter: AppSessionFilter<'_>,
        pagination: Pagination,
    ) -> Result<Page<AppSession>, Self::Error> {
        let (compat_filter, oauth2_filter) = split_filter(filter);

        // OAuth 2.0 half of the union. Both halves must select the same
        // columns in the same order, so the compat-only columns are filled
        // with NULL here.
        let mut oauth2_session_select = Query::select()
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
                AppSessionLookupIden::Cursor,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::CompatSessionId)
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
                AppSessionLookupIden::Oauth2SessionId,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)),
                AppSessionLookupIden::Oauth2ClientId,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserSessionId)),
                AppSessionLookupIden::UserSessionId,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)),
                AppSessionLookupIden::UserId,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)),
                AppSessionLookupIden::ScopeList,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::HumanName)),
                AppSessionLookupIden::HumanName,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
                AppSessionLookupIden::CreatedAt,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::FinishedAt)),
                AppSessionLookupIden::FinishedAt,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::IsSynapseAdmin)
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserAgent)),
                AppSessionLookupIden::UserAgent,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)),
                AppSessionLookupIden::LastActiveAt,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveIp)),
                AppSessionLookupIden::LastActiveIp,
            )
            .from(OAuth2Sessions::Table)
            .apply_filter(oauth2_filter)
            .clone();

        // Compat half of the union: same column list, with the OAuth 2.0-only
        // columns filled with NULL.
        let compat_session_select = Query::select()
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
                AppSessionLookupIden::Cursor,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
                AppSessionLookupIden::CompatSessionId,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2SessionId)
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2ClientId)
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
                AppSessionLookupIden::UserSessionId,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
                AppSessionLookupIden::UserId,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::ScopeList)
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
                AppSessionLookupIden::DeviceId,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
                AppSessionLookupIden::HumanName,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
                AppSessionLookupIden::CreatedAt,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
                AppSessionLookupIden::FinishedAt,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
                AppSessionLookupIden::IsSynapseAdmin,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
                AppSessionLookupIden::UserAgent,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
                AppSessionLookupIden::LastActiveAt,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
                AppSessionLookupIden::LastActiveIp,
            )
            .from(CompatSessions::Table)
            .apply_filter(compat_filter)
            .clone();

        // Expose the union of both selects as a CTE named `sessions`
        let common_table_expression = CommonTableExpression::new()
            .query(
                oauth2_session_select
                    .union(UnionType::All, compat_session_select)
                    .clone(),
            )
            .table_name(Alias::new("sessions"))
            .clone();

        let with_clause = Query::with().cte(common_table_expression).clone();

        // The outer query reads from the CTE and paginates on the session ID
        let select = Query::select()
            .column(ColumnRef::Asterisk)
            .from(Alias::new("sessions"))
            .generate_pagination(AppSessionLookupIden::Cursor, pagination)
            .clone();

        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);

        let edges: Vec<AppSessionLookup> = sqlx::query_as_with(&sql, arguments)
            .traced()
            .fetch_all(&mut *self.conn)
            .await?;

        // Build the page out of the raw rows, then convert each row into an
        // `AppSession`, failing on the first row which is inconsistent
        let page = pagination.process(edges).try_map(TryFrom::try_from)?;

        Ok(page)
    }
416
417    #[tracing::instrument(
418        name = "db.app_session.count",
419        fields(
420            db.query.text,
421        ),
422        skip_all,
423        err,
424    )]
425    async fn count(&mut self, filter: AppSessionFilter<'_>) -> Result<usize, Self::Error> {
426        let (compat_filter, oauth2_filter) = split_filter(filter);
427        let mut oauth2_session_select = Query::select()
428            .expr(Expr::cust("1"))
429            .from(OAuth2Sessions::Table)
430            .apply_filter(oauth2_filter)
431            .clone();
432
433        let compat_session_select = Query::select()
434            .expr(Expr::cust("1"))
435            .from(CompatSessions::Table)
436            .apply_filter(compat_filter)
437            .clone();
438
439        let common_table_expression = CommonTableExpression::new()
440            .query(
441                oauth2_session_select
442                    .union(UnionType::All, compat_session_select)
443                    .clone(),
444            )
445            .table_name(Alias::new("sessions"))
446            .clone();
447
448        let with_clause = Query::with().cte(common_table_expression).clone();
449
450        let select = Query::select()
451            .expr(Expr::cust("COUNT(*)"))
452            .from(Alias::new("sessions"))
453            .clone();
454
455        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
456
457        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
458            .traced()
459            .fetch_one(&mut *self.conn)
460            .await?;
461
462        count
463            .try_into()
464            .map_err(DatabaseError::to_invalid_operation)
465    }
466
    /// Finish every active session of `user` which is bound to `device`.
    ///
    /// Two updates are issued on the connection, both setting `finished_at`
    /// to the same timestamp, read once from `clock`:
    ///
    ///  - on `compat_sessions`, for unfinished rows of the user with this
    ///    device ID;
    ///  - on `oauth2_sessions`, for unfinished rows of the user whose scope
    ///    list contains one of the two scope tokens derived from the device.
    ///
    /// If the device cannot be converted into scope tokens, the OAuth 2.0
    /// update is skipped and only compat sessions are finished.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the two queries fails.
    #[tracing::instrument(
        name = "db.app_session.finish_sessions_to_replace_device",
        fields(
            db.query.text,
            %user.id,
            %device_id = device.as_str()
        ),
        skip_all,
        err,
    )]
    async fn finish_sessions_to_replace_device(
        &mut self,
        clock: &dyn Clock,
        user: &User,
        device: &Device,
    ) -> Result<(), Self::Error> {
        // TODO need to invoke this from all the oauth2 login sites
        // Each update gets its own child span, on which the query text is
        // recorded
        let span = tracing::info_span!(
            "db.app_session.finish_sessions_to_replace_device.compat_sessions",
            { DB_QUERY_TEXT } = tracing::field::Empty,
        );
        // Read the clock once so both updates use the same timestamp
        let finished_at = clock.now();
        sqlx::query!(
            "
                UPDATE compat_sessions SET finished_at = $3 WHERE user_id = $1 AND device_id = $2 AND finished_at IS NULL
            ",
            Uuid::from(user.id),
            device.as_str(),
            finished_at
        )
        .record(&span)
        .execute(&mut *self.conn)
        .instrument(span)
        .await?;

        // OAuth 2.0 sessions reference the device through their scope list, so
        // they are matched on the scope tokens derived from the device. A
        // conversion failure is deliberately not treated as an error.
        if let Ok([stable_device_as_scope_token, unstable_device_as_scope_token]) =
            device.to_scope_token()
        {
            let span = tracing::info_span!(
                "db.app_session.finish_sessions_to_replace_device.oauth2_sessions",
                { DB_QUERY_TEXT } = tracing::field::Empty,
            );
            sqlx::query!(
                "
                    UPDATE oauth2_sessions
                    SET finished_at = $4
                    WHERE user_id = $1
                      AND ($2 = ANY(scope_list) OR $3 = ANY(scope_list))
                      AND finished_at IS NULL
                ",
                Uuid::from(user.id),
                stable_device_as_scope_token.as_str(),
                unstable_device_as_scope_token.as_str(),
                finished_at
            )
            .record(&span)
            .execute(&mut *self.conn)
            .instrument(span)
            .await?;
        }

        Ok(())
    }
530}
531
#[cfg(test)]
mod tests {
    use chrono::Duration;
    use mas_data_model::{Device, clock::MockClock};
    use mas_storage::{
        Pagination, RepositoryAccess,
        app_session::{AppSession, AppSessionFilter},
        oauth2::OAuth2SessionRepository,
    };
    use oauth2_types::{
        requests::GrantType,
        scope::{OPENID, Scope},
    };
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use sqlx::PgPool;

    use crate::PgRepository;

    /// Exercise `count` and `list` of the app session repository through the
    /// lifecycle of a compat session and of an OAuth 2.0 session, with the
    /// user, state and device filters.
    #[sqlx::test(migrator = "crate::MIGRATOR")]
    async fn test_app_repo(pool: PgPool) {
        let mut rng = ChaChaRng::seed_from_u64(42);
        let clock = MockClock::default();
        let mut repo = PgRepository::from_pool(&pool).await.unwrap();

        // Create a user
        let user = repo
            .user()
            .add(&mut rng, &clock, "john".to_owned())
            .await
            .unwrap();

        let all = AppSessionFilter::new().for_user(&user);
        let active = all.active_only();
        let finished = all.finished_only();
        let pagination = Pagination::first(10);

        // No session exists yet
        assert_eq!(repo.app_session().count(all).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert!(full_list.edges.is_empty());
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert!(finished_list.edges.is_empty());

        // Start a compat session for that user
        let device = Device::generate(&mut rng);
        let compat_session = repo
            .compat_session()
            .add(&mut rng, &clock, &user, device.clone(), None, false, None)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 1);
        assert_eq!(
            full_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert_eq!(active_list.edges.len(), 1);
        assert_eq!(
            active_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert!(finished_list.edges.is_empty());

        // Finish the session
        let compat_session = repo
            .compat_session()
            .finish(&clock, compat_session)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 1);
        assert_eq!(
            full_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 1);
        assert_eq!(
            finished_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        // Start an OAuth2 session
        let client = repo
            .oauth2_client()
            .add(
                &mut rng,
                &clock,
                vec!["https://example.com/redirect".parse().unwrap()],
                None,
                None,
                None,
                vec![GrantType::AuthorizationCode],
                Some("First client".to_owned()),
                Some("https://example.com/logo.png".parse().unwrap()),
                Some("https://example.com/".parse().unwrap()),
                Some("https://example.com/policy".parse().unwrap()),
                Some("https://example.com/tos".parse().unwrap()),
                Some("https://example.com/jwks.json".parse().unwrap()),
                None,
                None,
                None,
                None,
                None,
                Some("https://example.com/login".parse().unwrap()),
            )
            .await
            .unwrap();

        // The OAuth2 session is bound to a second device through its scope
        let device2 = Device::generate(&mut rng);
        let scope: Scope = [OPENID]
            .into_iter()
            .chain(device2.to_scope_token().unwrap().into_iter())
            .collect();

        // We're moving the clock forward by 1 minute between each session to ensure
        // we're getting consistent ordering in lists.
        clock.advance(Duration::try_minutes(1).unwrap());

        let oauth_session = repo
            .oauth2_session()
            .add(&mut rng, &clock, &client, Some(&user), None, scope)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 2);
        assert_eq!(
            full_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        assert_eq!(
            full_list.edges[1],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert_eq!(active_list.edges.len(), 1);
        assert_eq!(
            active_list.edges[0],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 1);
        assert_eq!(
            finished_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        // Finish the session
        let oauth_session = repo
            .oauth2_session()
            .finish(&clock, oauth_session)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 2);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 2);
        assert_eq!(
            full_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        assert_eq!(
            full_list.edges[1],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());

        // Both sessions are now finished, and must both show up in the
        // finished list, in creation order
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 2);
        assert_eq!(
            finished_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        assert_eq!(
            finished_list.edges[1],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        // Query by device
        let filter = AppSessionFilter::new().for_device(&device);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert_eq!(list.edges.len(), 1);
        assert_eq!(
            list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        let filter = AppSessionFilter::new().for_device(&device2);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert_eq!(list.edges.len(), 1);
        assert_eq!(
            list.edges[0],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        // Create a second user
        let user2 = repo
            .user()
            .add(&mut rng, &clock, "alice".to_owned())
            .await
            .unwrap();

        // If we list/count for this user, we should get nothing
        let filter = AppSessionFilter::new().for_user(&user2);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 0);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert!(list.edges.is_empty());
    }
}