Skip to content

Commit dd0fcae

Browse files
OTLegendOTLegend
authored andcommitted
fix: split publish/finality handling with phased flow and timeouts
- add phased DKG adapter and use publish+mint phases, enqueue finality jobs - introduce job defaults for publish/finality timeouts (3m publish, 15m finality) - add mint_submitted status migration and update setup to include it
1 parent 43a7b0f commit dd0fcae

File tree

15 files changed

+1604
-218
lines changed

15 files changed

+1604
-218
lines changed
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/* eslint-disable @typescript-eslint/no-explicit-any */
2+
3+
import { describe, it, beforeEach, afterEach } from "mocha";
4+
import { expect } from "chai";
5+
import { PublishingService } from "../../../../../packages/plugin-dkg-publisher/src/services/PublishingService";
6+
import { DkgService } from "../../../../../packages/plugin-dkg-publisher/src/services/DkgService";
7+
import { QueueService } from "../../../../../packages/plugin-dkg-publisher/src/services/QueueService";
8+
9+
// Minimal fake DB that mimics the chained drizzle calls used in PublishingService
10+
class FakeDb {
11+
public assetRow: any;
12+
public updates: any[];
13+
14+
constructor(assetRow: any) {
15+
this.assetRow = assetRow;
16+
this.updates = [];
17+
}
18+
19+
select() {
20+
return {
21+
from: () => ({
22+
where: () => ({
23+
limit: async () => [this.assetRow],
24+
}),
25+
}),
26+
};
27+
}
28+
29+
update() {
30+
const self = this;
31+
return {
32+
set(values: any) {
33+
return {
34+
where: async () => {
35+
self.updates.push(values);
36+
return [{ affectedRows: 1 }];
37+
},
38+
};
39+
},
40+
};
41+
}
42+
}
43+
44+
// Fake phased client to avoid hitting real dkg.js
45+
class FakePhasedClient {
46+
public publishCalled: boolean;
47+
public mintCalled: boolean;
48+
public ual: string;
49+
public txHash: string;
50+
51+
constructor(ual: string, txHash: string) {
52+
this.ual = ual;
53+
this.txHash = txHash;
54+
this.publishCalled = false;
55+
this.mintCalled = false;
56+
}
57+
58+
async publishPhase() {
59+
this.publishCalled = true;
60+
return {
61+
readyForMint: true,
62+
publishOperationId: "op-123",
63+
};
64+
}
65+
66+
async mintPhase() {
67+
this.mintCalled = true;
68+
return {
69+
UAL: this.ual,
70+
mintKnowledgeCollectionReceipt: { transactionHash: this.txHash },
71+
};
72+
}
73+
}
74+
75+
describe("PublishingService phased publish", () => {
76+
const wallet = {
77+
id: 1,
78+
address: "0xabc",
79+
privateKey: "0xkey",
80+
blockchain: "chain",
81+
};
82+
const assetRow = {
83+
id: 42,
84+
contentUrl: "http://example.com/test.json",
85+
epochs: 2,
86+
replications: 1,
87+
privacy: "private",
88+
status: "queued",
89+
};
90+
91+
let fakeDb: FakeDb;
92+
let fakeClient: FakePhasedClient;
93+
let publishingService: PublishingService;
94+
let originalFetch: any;
95+
96+
beforeEach(() => {
97+
fakeDb = new FakeDb(assetRow);
98+
fakeClient = new FakePhasedClient("did:dkg:ual123", "0xtxhash");
99+
100+
// Stub DkgService to return our fake phased client
101+
const dkgServiceStub = {
102+
createWalletPhasedClient: () => fakeClient,
103+
} as unknown as DkgService;
104+
105+
publishingService = new PublishingService(fakeDb as any, dkgServiceStub);
106+
107+
// Stub fetch used for loading content
108+
originalFetch = (global as any).fetch;
109+
(global as any).fetch = async () => ({
110+
ok: true,
111+
json: async () => ({ foo: "bar" }),
112+
});
113+
});
114+
115+
afterEach(() => {
116+
(global as any).fetch = originalFetch as any;
117+
});
118+
119+
it("runs publish+mint phases and sets status mint_submitted", async () => {
120+
const result = await publishingService.publishAsset(assetRow.id, wallet);
121+
122+
expect(result.success).to.equal(true);
123+
expect(result.ual).to.equal("did:dkg:ual123");
124+
expect(fakeClient.publishCalled).to.equal(true);
125+
expect(fakeClient.mintCalled).to.equal(true);
126+
127+
// Last DB update should set status to mint_submitted and store UAL/tx
128+
const lastUpdate = fakeDb.updates[fakeDb.updates.length - 1];
129+
expect(lastUpdate.status).to.equal("mint_submitted");
130+
expect(lastUpdate.ual).to.equal("did:dkg:ual123");
131+
expect(lastUpdate.transactionHash).to.equal("0xtxhash");
132+
});
133+
});
134+
135+
describe("QueueService finality-check handler", () => {
136+
it("marks asset published when finality is reached", async () => {
137+
// Fake asset/DB/wallet/DKG services
138+
const assetService = {
139+
getAsset: async () => ({ id: 7, ual: "did:dkg:ual123" }),
140+
updateAssetStatus: async (_id: number, status: string) => {
141+
expect(status).to.equal("published");
142+
},
143+
};
144+
const walletService = {
145+
getWalletForQueries: async () => ({
146+
id: 1,
147+
address: "0xabc",
148+
privateKey: "0xkey",
149+
}),
150+
};
151+
const dkgService = {
152+
createWalletPhasedClient: () => ({
153+
finalityPhase: async () => ({
154+
finality: { status: "FINALIZED" },
155+
numberOfConfirmations: 3,
156+
requiredConfirmations: 1,
157+
}),
158+
}),
159+
} as unknown as DkgService;
160+
161+
const queueService = Object.create(QueueService.prototype) as any;
162+
queueService.assetService = assetService;
163+
queueService.walletService = walletService;
164+
queueService.dkgService = dkgService;
165+
166+
const job = {
167+
id: "job-1",
168+
data: { assetId: 7, ual: "did:dkg:ual123" },
169+
updateProgress: async () => {},
170+
} as any;
171+
172+
const res = await queueService.processFinalityJob(job, 0);
173+
expect(res.success).to.equal(true);
174+
expect(res.assetId).to.equal(7);
175+
});
176+
177+
it("bubbles error when finality is not reached (timeout/lag)", async () => {
178+
let updateCalled = false;
179+
180+
const assetService = {
181+
getAsset: async () => ({ id: 8, ual: "did:dkg:ual999" }),
182+
updateAssetStatus: async () => {
183+
updateCalled = true;
184+
},
185+
};
186+
const walletService = {
187+
getWalletForQueries: async () => ({
188+
id: 2,
189+
address: "0xdef",
190+
privateKey: "0xkey2",
191+
}),
192+
};
193+
const dkgService = {
194+
createWalletPhasedClient: () => ({
195+
finalityPhase: async () => ({
196+
finality: { status: "NOT FINALIZED" },
197+
numberOfConfirmations: 0,
198+
requiredConfirmations: 3,
199+
}),
200+
}),
201+
} as unknown as DkgService;
202+
203+
const queueService = Object.create(QueueService.prototype) as any;
204+
queueService.assetService = assetService;
205+
queueService.walletService = walletService;
206+
queueService.dkgService = dkgService;
207+
208+
const job = {
209+
id: "job-2",
210+
data: { assetId: 8, ual: "did:dkg:ual999" },
211+
updateProgress: async () => {},
212+
} as any;
213+
214+
let caught = false;
215+
try {
216+
await queueService.processFinalityJob(job, 0);
217+
} catch (error: any) {
218+
caught = true;
219+
expect(error.message).to.include("Finality not reached");
220+
}
221+
222+
expect(caught).to.equal(true);
223+
expect(updateCalled).to.equal(false);
224+
});
225+
226+
it("enqueues a finality-check job after successful publish+mint", async () => {
227+
let finalityCalled = false;
228+
let finalityArgs: any = null;
229+
230+
const queueService = Object.create(QueueService.prototype) as any;
231+
queueService.assetService = {
232+
claimAssetForProcessing: async () => true,
233+
createPublishingAttempt: async () => 99,
234+
updatePublishingAttempt: async () => {},
235+
handleAssetFailure: async () => {},
236+
};
237+
queueService.walletService = {
238+
assignWalletToAsset: async () => ({
239+
id: 1,
240+
address: "0xabc",
241+
privateKey: "0xkey",
242+
}),
243+
releaseWallet: async () => {},
244+
};
245+
queueService.publishingService = {
246+
publishAsset: async () => ({
247+
success: true,
248+
ual: "did:dkg:ual123",
249+
transactionHash: "0xtxhash",
250+
}),
251+
};
252+
queueService.enqueueFinalityJob = async (
253+
assetId: number,
254+
priority: number,
255+
ual: string,
256+
transactionHash: string,
257+
) => {
258+
finalityCalled = true;
259+
finalityArgs = { assetId, priority, ual, transactionHash };
260+
};
261+
262+
const job = {
263+
id: "job-publish",
264+
data: { assetId: 123 },
265+
opts: { priority: 50 },
266+
timestamp: Date.now(),
267+
updateProgress: async () => {},
268+
} as any;
269+
270+
const res = await queueService.processPublishJob(job, 0);
271+
272+
expect(res.success).to.equal(true);
273+
expect(finalityCalled).to.equal(true);
274+
expect(finalityArgs).to.deep.equal({
275+
assetId: 123,
276+
priority: 50,
277+
ual: "did:dkg:ual123",
278+
transactionHash: "0xtxhash",
279+
});
280+
});
281+
});

packages/plugin-dkg-publisher/tests/dkg-publisher.spec.ts renamed to apps/agent/tests/integration/workflows/publisher-plugin.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import { describe, it, beforeEach, afterEach } from "mocha";
44
import { expect } from "chai";
5-
import dkgPublisherPlugin from "../dist/index.mjs";
5+
import dkgPublisherPlugin from "../../../../../packages/plugin-dkg-publisher/src/index";
66
import express from "express";
77
import request from "supertest";
88
import {
@@ -362,4 +362,4 @@ describe("@dkg/plugin-dkg-publisher checks", () => {
362362
}
363363
});
364364
});
365-
});
365+
});

packages/plugin-dkg-publisher/setup.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ volumes:
812812
max_attempts INTEGER DEFAULT 3,
813813
814814
-- Status and attempts
815-
status ENUM('pending', 'queued', 'assigned', 'publishing', 'published', 'failed') NOT NULL DEFAULT 'pending',
815+
status ENUM('pending', 'queued', 'assigned', 'publishing', 'mint_submitted', 'published', 'failed') NOT NULL DEFAULT 'pending',
816816
status_message TEXT,
817817
attempt_count INTEGER DEFAULT 0,
818818
retry_count INTEGER DEFAULT 0,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE `assets` MODIFY COLUMN `status` enum('pending','queued','assigned','publishing','mint_submitted','published','failed') NOT NULL DEFAULT 'pending';

0 commit comments

Comments
 (0)