Skip to content

Commit 98c765c

Browse files
committed
storage: include PATs alongside personal sessions
1 parent 34b3462 commit 98c765c

File tree

5 files changed

+199
-27
lines changed

5 files changed

+199
-27
lines changed

crates/storage-pg/src/iden.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,18 @@ pub enum PersonalSessions {
125125
LastActiveIp,
126126
}
127127

128+
#[derive(sea_query::Iden)]
129+
#[iden = "personal_access_tokens"]
130+
pub enum PersonalAccessTokens {
131+
Table,
132+
PersonalAccessTokenId,
133+
PersonalSessionId,
134+
// AccessTokenSha256,
135+
CreatedAt,
136+
ExpiresAt,
137+
RevokedAt,
138+
}
139+
128140
#[derive(sea_query::Iden)]
129141
#[iden = "upstream_oauth_providers"]
130142
pub enum UpstreamOAuthProviders {

crates/storage-pg/src/personal/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,16 @@ mod tests {
105105

106106
let full_list = repo.personal_session().list(all, pagination).await.unwrap();
107107
assert_eq!(full_list.edges.len(), 1);
108-
assert_eq!(full_list.edges[0].node.id, session.id);
109-
assert!(full_list.edges[0].node.is_valid());
108+
assert_eq!(full_list.edges[0].node.0.id, session.id);
109+
assert!(full_list.edges[0].node.0.is_valid());
110110
let active_list = repo
111111
.personal_session()
112112
.list(active, pagination)
113113
.await
114114
.unwrap();
115115
assert_eq!(active_list.edges.len(), 1);
116-
assert_eq!(active_list.edges[0].node.id, session.id);
117-
assert!(active_list.edges[0].node.is_valid());
116+
assert_eq!(active_list.edges[0].node.0.id, session.id);
117+
assert!(active_list.edges[0].node.0.is_valid());
118118
let finished_list = repo
119119
.personal_session()
120120
.list(finished, pagination)
@@ -154,7 +154,7 @@ mod tests {
154154

155155
let full_list = repo.personal_session().list(all, pagination).await.unwrap();
156156
assert_eq!(full_list.edges.len(), 1);
157-
assert_eq!(full_list.edges[0].node.id, session.id);
157+
assert_eq!(full_list.edges[0].node.0.id, session.id);
158158
let active_list = repo
159159
.personal_session()
160160
.list(active, pagination)
@@ -167,8 +167,8 @@ mod tests {
167167
.await
168168
.unwrap();
169169
assert_eq!(finished_list.edges.len(), 1);
170-
assert_eq!(finished_list.edges[0].node.id, session.id);
171-
assert!(finished_list.edges[0].node.is_revoked());
170+
assert_eq!(finished_list.edges[0].node.0.id, session.id);
171+
assert!(finished_list.edges[0].node.0.is_revoked());
172172

173173
// Reload the session and check again
174174
let session_lookup = repo

crates/storage-pg/src/personal/session.rs

Lines changed: 138 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use async_trait::async_trait;
99
use chrono::{DateTime, Utc};
1010
use mas_data_model::{
1111
Clock, User,
12-
personal::session::{PersonalSession, PersonalSessionOwner, SessionState},
12+
personal::{
13+
PersonalAccessToken,
14+
session::{PersonalSession, PersonalSessionOwner, SessionState},
15+
},
1316
};
1417
use mas_storage::{
1518
Page, Pagination,
@@ -19,7 +22,7 @@ use mas_storage::{
1922
use oauth2_types::scope::Scope;
2023
use rand::RngCore;
2124
use sea_query::{
22-
Condition, Expr, PgFunc, PostgresQueryBuilder, Query, SimpleExpr, enum_def,
25+
Cond, Condition, Expr, PgFunc, PostgresQueryBuilder, Query, SimpleExpr, enum_def,
2326
extension::postgres::PgExpr as _,
2427
};
2528
use sea_query_binder::SqlxBinder as _;
@@ -31,7 +34,7 @@ use crate::{
3134
DatabaseError,
3235
errors::DatabaseInconsistencyError,
3336
filter::{Filter, StatementExt as _},
34-
iden::PersonalSessions,
37+
iden::{PersonalAccessTokens, PersonalSessions},
3538
pagination::QueryBuilderExt as _,
3639
tracing::ExecuteExt as _,
3740
};
@@ -116,6 +119,73 @@ impl TryFrom<PersonalSessionLookup> for PersonalSession {
116119
}
117120
}
118121

122+
#[derive(sqlx::FromRow)]
123+
#[enum_def]
124+
struct PersonalSessionAndAccessTokenLookup {
125+
personal_session_id: Uuid,
126+
owner_user_id: Option<Uuid>,
127+
owner_oauth2_client_id: Option<Uuid>,
128+
actor_user_id: Uuid,
129+
human_name: String,
130+
scope_list: Vec<String>,
131+
created_at: DateTime<Utc>,
132+
revoked_at: Option<DateTime<Utc>>,
133+
last_active_at: Option<DateTime<Utc>>,
134+
last_active_ip: Option<IpAddr>,
135+
136+
// tokens
137+
personal_access_token_id: Option<Uuid>,
138+
token_created_at: Option<DateTime<Utc>>,
139+
token_expires_at: Option<DateTime<Utc>>,
140+
}
141+
142+
impl Node<Ulid> for PersonalSessionAndAccessTokenLookup {
143+
fn cursor(&self) -> Ulid {
144+
self.personal_session_id.into()
145+
}
146+
}
147+
148+
impl TryFrom<PersonalSessionAndAccessTokenLookup>
149+
for (PersonalSession, Option<PersonalAccessToken>)
150+
{
151+
type Error = DatabaseInconsistencyError;
152+
153+
fn try_from(value: PersonalSessionAndAccessTokenLookup) -> Result<Self, Self::Error> {
154+
let session = PersonalSession::try_from(PersonalSessionLookup {
155+
personal_session_id: value.personal_session_id,
156+
owner_user_id: value.owner_user_id,
157+
owner_oauth2_client_id: value.owner_oauth2_client_id,
158+
actor_user_id: value.actor_user_id,
159+
human_name: value.human_name,
160+
scope_list: value.scope_list,
161+
created_at: value.created_at,
162+
revoked_at: value.revoked_at,
163+
last_active_at: value.last_active_at,
164+
last_active_ip: value.last_active_ip,
165+
})?;
166+
167+
let token_opt = if let Some(id) = value.personal_access_token_id {
168+
let id = Ulid::from(id);
169+
Some(PersonalAccessToken {
170+
id,
171+
session_id: session.id,
172+
// should not be possible
173+
created_at: value.token_created_at.ok_or(
174+
DatabaseInconsistencyError::on("personal_sessions")
175+
.column("created_at")
176+
.row(id),
177+
)?,
178+
expires_at: value.token_expires_at,
179+
revoked_at: None,
180+
})
181+
} else {
182+
None
183+
};
184+
185+
Ok((session, token_opt))
186+
}
187+
}
188+
119189
#[async_trait]
120190
impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
121191
type Error = DatabaseError;
@@ -274,60 +344,90 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
274344
&mut self,
275345
filter: PersonalSessionFilter<'_>,
276346
pagination: Pagination,
277-
) -> Result<Page<PersonalSession>, Self::Error> {
347+
) -> Result<Page<(PersonalSession, Option<PersonalAccessToken>)>, Self::Error> {
278348
let (sql, arguments) = Query::select()
279349
.expr_as(
280350
Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)),
281-
PersonalSessionLookupIden::PersonalSessionId,
351+
PersonalSessionAndAccessTokenLookupIden::PersonalSessionId,
282352
)
283353
.expr_as(
284354
Expr::col((PersonalSessions::Table, PersonalSessions::OwnerUserId)),
285-
PersonalSessionLookupIden::OwnerUserId,
355+
PersonalSessionAndAccessTokenLookupIden::OwnerUserId,
286356
)
287357
.expr_as(
288358
Expr::col((
289359
PersonalSessions::Table,
290360
PersonalSessions::OwnerOAuth2ClientId,
291361
)),
292-
PersonalSessionLookupIden::OwnerOauth2ClientId,
362+
PersonalSessionAndAccessTokenLookupIden::OwnerOauth2ClientId,
293363
)
294364
.expr_as(
295365
Expr::col((PersonalSessions::Table, PersonalSessions::ActorUserId)),
296-
PersonalSessionLookupIden::ActorUserId,
366+
PersonalSessionAndAccessTokenLookupIden::ActorUserId,
297367
)
298368
.expr_as(
299369
Expr::col((PersonalSessions::Table, PersonalSessions::HumanName)),
300-
PersonalSessionLookupIden::HumanName,
370+
PersonalSessionAndAccessTokenLookupIden::HumanName,
301371
)
302372
.expr_as(
303373
Expr::col((PersonalSessions::Table, PersonalSessions::ScopeList)),
304-
PersonalSessionLookupIden::ScopeList,
374+
PersonalSessionAndAccessTokenLookupIden::ScopeList,
305375
)
306376
.expr_as(
307377
Expr::col((PersonalSessions::Table, PersonalSessions::CreatedAt)),
308-
PersonalSessionLookupIden::CreatedAt,
378+
PersonalSessionAndAccessTokenLookupIden::CreatedAt,
309379
)
310380
.expr_as(
311381
Expr::col((PersonalSessions::Table, PersonalSessions::RevokedAt)),
312-
PersonalSessionLookupIden::RevokedAt,
382+
PersonalSessionAndAccessTokenLookupIden::RevokedAt,
313383
)
314384
.expr_as(
315385
Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveAt)),
316-
PersonalSessionLookupIden::LastActiveAt,
386+
PersonalSessionAndAccessTokenLookupIden::LastActiveAt,
317387
)
318388
.expr_as(
319389
Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveIp)),
320-
PersonalSessionLookupIden::LastActiveIp,
390+
PersonalSessionAndAccessTokenLookupIden::LastActiveIp,
391+
)
392+
.expr_as(
393+
Expr::col((
394+
PersonalAccessTokens::Table,
395+
PersonalAccessTokens::PersonalAccessTokenId,
396+
)),
397+
PersonalSessionAndAccessTokenLookupIden::PersonalAccessTokenId,
398+
)
399+
.expr_as(
400+
Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::CreatedAt)),
401+
PersonalSessionAndAccessTokenLookupIden::TokenCreatedAt,
402+
)
403+
.expr_as(
404+
Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::ExpiresAt)),
405+
PersonalSessionAndAccessTokenLookupIden::TokenExpiresAt,
321406
)
322407
.from(PersonalSessions::Table)
408+
.left_join(
409+
PersonalAccessTokens::Table,
410+
Cond::all()
411+
.add(
412+
Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId))
413+
.eq(Expr::col((
414+
PersonalAccessTokens::Table,
415+
PersonalAccessTokens::PersonalSessionId,
416+
))),
417+
)
418+
.add(
419+
Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::RevokedAt))
420+
.is_null(),
421+
),
422+
)
323423
.apply_filter(filter)
324424
.generate_pagination(
325425
(PersonalSessions::Table, PersonalSessions::PersonalSessionId),
326426
pagination,
327427
)
328428
.build_sqlx(PostgresQueryBuilder);
329429

330-
let edges: Vec<PersonalSessionLookup> = sqlx::query_as_with(&sql, arguments)
430+
let edges: Vec<PersonalSessionAndAccessTokenLookup> = sqlx::query_as_with(&sql, arguments)
331431
.traced()
332432
.fetch_all(&mut *self.conn)
333433
.await?;
@@ -349,6 +449,21 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
349449
let (sql, arguments) = Query::select()
350450
.expr(Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)).count())
351451
.from(PersonalSessions::Table)
452+
.left_join(
453+
PersonalAccessTokens::Table,
454+
Cond::all()
455+
.add(
456+
Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId))
457+
.eq(Expr::col((
458+
PersonalAccessTokens::Table,
459+
PersonalAccessTokens::PersonalSessionId,
460+
))),
461+
)
462+
.add(
463+
Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::RevokedAt))
464+
.is_null(),
465+
),
466+
)
352467
.apply_filter(filter)
353468
.build_sqlx(PostgresQueryBuilder);
354469

@@ -419,5 +534,13 @@ impl Filter for PersonalSessionFilter<'_> {
419534
Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveAt))
420535
.gt(last_active_after)
421536
}))
537+
.add_option(self.expires_before().map(|expires_before| {
538+
Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::ExpiresAt))
539+
.lt(expires_before)
540+
}))
541+
.add_option(self.expires_after().map(|expires_after| {
542+
Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::ExpiresAt))
543+
.gt(expires_after)
544+
}))
422545
}
423546
}

crates/storage/src/personal/session.rs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use async_trait::async_trait;
77
use chrono::{DateTime, Utc};
88
use mas_data_model::{
99
Client, Clock, Device, User,
10-
personal::session::{PersonalSession, PersonalSessionOwner},
10+
personal::{
11+
PersonalAccessToken,
12+
session::{PersonalSession, PersonalSessionOwner},
13+
},
1114
};
1215
use oauth2_types::scope::Scope;
1316
use rand_core::RngCore;
@@ -97,7 +100,7 @@ pub trait PersonalSessionRepository: Send + Sync {
97100
&mut self,
98101
filter: PersonalSessionFilter<'_>,
99102
pagination: Pagination,
100-
) -> Result<Page<PersonalSession>, Self::Error>;
103+
) -> Result<Page<(PersonalSession, Option<PersonalAccessToken>)>, Self::Error>;
101104

102105
/// Count [`PersonalSession`]s matching the given filter
103106
///
@@ -134,12 +137,13 @@ repository_impl!(PersonalSessionRepository:
134137
&mut self,
135138
filter: PersonalSessionFilter<'_>,
136139
pagination: Pagination,
137-
) -> Result<Page<PersonalSession>, Self::Error>;
140+
) -> Result<Page<(PersonalSession, Option<PersonalAccessToken>)>, Self::Error>;
138141

139142
async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result<usize, Self::Error>;
140143
);
141144

142-
/// Filter parameters for listing personal sessions
145+
/// Filter parameters for listing personal sessions alongside personal access
146+
/// tokens
143147
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
144148
pub struct PersonalSessionFilter<'a> {
145149
owner_user: Option<&'a User>,
@@ -150,6 +154,8 @@ pub struct PersonalSessionFilter<'a> {
150154
scope: Option<&'a Scope>,
151155
last_active_before: Option<DateTime<Utc>>,
152156
last_active_after: Option<DateTime<Utc>>,
157+
expires_before: Option<DateTime<Utc>>,
158+
expires_after: Option<DateTime<Utc>>,
153159
}
154160

155161
/// Filter for what state a personal session is in.
@@ -296,4 +302,34 @@ impl<'a> PersonalSessionFilter<'a> {
296302
pub fn device(&self) -> Option<&'a Device> {
297303
self.device
298304
}
305+
306+
/// Only return sessions whose access tokens expire before the given time
307+
#[must_use]
308+
pub fn with_expires_before(mut self, expires_before: DateTime<Utc>) -> Self {
309+
self.expires_before = Some(expires_before);
310+
self
311+
}
312+
313+
/// Get the expires before filter
314+
///
315+
/// Returns [`None`] if no expires before filter was set
316+
#[must_use]
317+
pub fn expires_before(&self) -> Option<DateTime<Utc>> {
318+
self.expires_before
319+
}
320+
321+
/// Only return sessions whose access tokens expire after the given time
322+
#[must_use]
323+
pub fn with_expires_after(mut self, expires_after: DateTime<Utc>) -> Self {
324+
self.expires_after = Some(expires_after);
325+
self
326+
}
327+
328+
/// Get the expires after filter
329+
///
330+
/// Returns [`None`] if no expires after filter was set
331+
#[must_use]
332+
pub fn expires_after(&self) -> Option<DateTime<Utc>> {
333+
self.expires_after
334+
}
299335
}

crates/tasks/src/matrix.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,8 @@ impl RunnableJob for SyncDevicesJob {
259259
.map_err(JobError::retry)?;
260260

261261
for edge in page.edges {
262-
for scope in &*edge.node.scope {
262+
let (session, _) = &edge.node;
263+
for scope in &*session.scope {
263264
if let Some(device) = Device::from_scope_token(scope) {
264265
devices.insert(device.as_str().to_owned());
265266
}

0 commit comments

Comments
 (0)