Skip to content

Commit bff75d0

Browse files
committed
Ensure ordered ActivityPub message delivery
Add ordering keys to outgoing ActivityPub sends so related events for the same object are delivered in order across queue workers. This covers create, update, and delete flows for posts and articles, plus share/unshare, follow transitions, block/unblock, and reaction undo paths. Closes #162
1 parent fc038f8 commit bff75d0

File tree

7 files changed

+53
-8
lines changed

7 files changed

+53
-8
lines changed

federation/inbox/following.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,10 @@ export async function onFollowed(
215215
actor: fedCtx.getActorUri(followee.id),
216216
object: follow,
217217
}),
218-
{ excludeBaseUris: [new URL(fedCtx.canonicalOrigin)] },
218+
{
219+
orderingKey: rows[0].iri,
220+
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
221+
},
219222
);
220223
}
221224

models/article.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ export async function createArticle(
240240
object: articleObject,
241241
}),
242242
{
243+
orderingKey: post.iri,
243244
preferSharedInbox: true,
244245
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
245246
},
@@ -343,6 +344,7 @@ export async function updateArticle(
343344
object: articleObject,
344345
}),
345346
{
347+
orderingKey: post.iri,
346348
preferSharedInbox: true,
347349
excludeBaseUris: [
348350
new URL(fedCtx.origin),
@@ -563,11 +565,14 @@ export async function startArticleContentTranslation(
563565
ccs: articleObject.ccIds,
564566
object: articleObject,
565567
});
568+
const orderingKey = fedCtx.getObjectUri(vocab.Article, { id: article.id })
569+
.href;
566570
await fedCtx.sendActivity(
567571
{ identifier: article.accountId },
568572
"followers",
569573
update,
570574
{
575+
orderingKey,
571576
preferSharedInbox: true,
572577
excludeBaseUris: [
573578
new URL(fedCtx.origin),

models/blocking.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ export async function block(
102102
toRecipient(blockee),
103103
block,
104104
{
105+
orderingKey: rows[0].iri,
105106
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
106107
fanout: "skip",
107108
preferSharedInbox: false,
@@ -140,7 +141,10 @@ export async function unblock(
140141
object: new URL(blockee.iri),
141142
}),
142143
}),
143-
{ excludeBaseUris: [new URL(fedCtx.canonicalOrigin)] },
144+
{
145+
orderingKey: rows[0].iri,
146+
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
147+
},
144148
);
145149
}
146150
return rows[0];

models/following.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ export async function follow(
4848
actor: fedCtx.getActorUri(follower.id),
4949
object: new URL(followee.iri),
5050
}),
51-
{ excludeBaseUris: [new URL(fedCtx.canonicalOrigin)] },
51+
{
52+
orderingKey: rows[0].iri,
53+
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
54+
},
5255
);
5356
} else if (rows.length > 0 && followee.accountId != null) {
5457
await updateFolloweesCount(db, rows[0].followerId, 1);
@@ -131,7 +134,10 @@ export async function unfollow(
131134
object: new URL(followee.iri),
132135
}),
133136
}),
134-
{ excludeBaseUris: [new URL(fedCtx.canonicalOrigin)] },
137+
{
138+
orderingKey: rows[0].iri,
139+
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
140+
},
135141
);
136142
}
137143
if (rows.length > 0) {
@@ -180,6 +186,7 @@ export async function removeFollower(
180186
object: fedCtx.getActorUri(followee.id),
181187
}),
182188
}),
189+
{ orderingKey: rows[0].iri },
183190
);
184191
}
185192
return rows[0];

models/note.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ export async function createNote(
359359
ccs: noteObject.ccIds,
360360
object: noteObject,
361361
});
362+
const orderingKey = post.iri;
362363
if (post.mentions.length > 0) {
363364
const directRecipients: Recipient[] = post.mentions.map((m) => ({
364365
id: new URL(m.actor.iri),
@@ -372,6 +373,7 @@ export async function createNote(
372373
directRecipients,
373374
activity,
374375
{
376+
orderingKey,
375377
preferSharedInbox: false,
376378
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
377379
},
@@ -383,6 +385,7 @@ export async function createNote(
383385
"followers",
384386
activity,
385387
{
388+
orderingKey,
386389
preferSharedInbox: true,
387390
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
388391
},
@@ -501,6 +504,7 @@ export async function updateNote(
501504
object: noteObject,
502505
}),
503506
{
507+
orderingKey: post.iri,
504508
preferSharedInbox: true,
505509
excludeBaseUris: [
506510
new URL(fedCtx.origin),

models/post.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -890,6 +890,7 @@ export async function sharePost(
890890
"followers",
891891
announce,
892892
{
893+
orderingKey: share.iri,
893894
preferSharedInbox: true,
894895
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
895896
},
@@ -898,7 +899,10 @@ export async function sharePost(
898899
{ identifier: account.id },
899900
toRecipient(post.actor),
900901
announce,
901-
{ excludeBaseUris: [new URL(fedCtx.canonicalOrigin)] },
902+
{
903+
orderingKey: share.iri,
904+
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
905+
},
902906
);
903907
return share;
904908
}
@@ -948,6 +952,7 @@ export async function unsharePost(
948952
"followers",
949953
undo,
950954
{
955+
orderingKey: unshared[0].iri,
951956
preferSharedInbox: true,
952957
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
953958
},
@@ -956,7 +961,10 @@ export async function unsharePost(
956961
{ identifier: account.id },
957962
toRecipient(sharedPost.actor),
958963
undo,
959-
{ excludeBaseUris: [new URL(fedCtx.canonicalOrigin)] },
964+
{
965+
orderingKey: unshared[0].iri,
966+
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
967+
},
960968
);
961969
return unshared[0];
962970
}
@@ -1587,6 +1595,7 @@ export async function deletePost(
15871595
"followers",
15881596
activity,
15891597
{
1598+
orderingKey: post.iri,
15901599
preferSharedInbox: true,
15911600
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
15921601
},
@@ -1596,6 +1605,7 @@ export async function deletePost(
15961605
recipients,
15971606
activity,
15981607
{
1608+
orderingKey: post.iri,
15991609
preferSharedInbox: true,
16001610
excludeBaseUris: [new URL(fedCtx.canonicalOrigin)],
16011611
},

models/reaction.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ export async function react(
235235
post,
236236
});
237237
if (activity == null) return rows[0];
238+
const orderingKey = id.href;
238239
await ctx.sendActivity(
239240
{ identifier: account.id },
240241
{
@@ -245,13 +246,18 @@ export async function react(
245246
: { sharedInbox: new URL(post.actor.sharedInboxUrl) },
246247
},
247248
activity,
248-
{ excludeBaseUris: [new URL(ctx.canonicalOrigin)], fanout: "skip" },
249+
{
250+
orderingKey,
251+
excludeBaseUris: [new URL(ctx.canonicalOrigin)],
252+
fanout: "skip",
253+
},
249254
);
250255
await ctx.sendActivity(
251256
{ identifier: account.id },
252257
"followers",
253258
activity,
254259
{
260+
orderingKey,
255261
excludeBaseUris: [new URL(ctx.canonicalOrigin)],
256262
preferSharedInbox: true,
257263
},
@@ -292,6 +298,7 @@ export async function undoReaction(
292298
post,
293299
});
294300
if (activity?.id == null) return rows[0];
301+
const orderingKey = activity.id.href;
295302
const undo = new vocab.Undo({
296303
id: new URL("#undo", activity.id),
297304
actor: ctx.getActorUri(account.id),
@@ -310,13 +317,18 @@ export async function undoReaction(
310317
: { sharedInbox: new URL(post.actor.sharedInboxUrl) },
311318
},
312319
undo,
313-
{ excludeBaseUris: [new URL(ctx.canonicalOrigin)], fanout: "skip" },
320+
{
321+
orderingKey,
322+
excludeBaseUris: [new URL(ctx.canonicalOrigin)],
323+
fanout: "skip",
324+
},
314325
);
315326
await ctx.sendActivity(
316327
{ identifier: account.id },
317328
"followers",
318329
undo,
319330
{
331+
orderingKey,
320332
excludeBaseUris: [new URL(ctx.canonicalOrigin)],
321333
preferSharedInbox: true,
322334
},

0 commit comments

Comments
 (0)