Skip to content

Commit d48fcbc

Browse files
committed
properly implement finalized stream
1 parent 6f7a572 commit d48fcbc

File tree

4 files changed

+101
-113
lines changed

4 files changed

+101
-113
lines changed

processor/batch-processor/src/run.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class Processor<B extends BlockBase, S> {
6969
private db: Database<S>,
7070
private handler: (ctx: DataHandlerContext<B, S>) => Promise<void>
7171
) {
72-
this.chainHeight = new Throttler(() => this.src.getHead()?.then((r) => r?.number ?? -1), 30_000)
72+
this.chainHeight = new Throttler(() => this.src.getFinalizedHead()?.then((r) => r?.number ?? -1), 30_000)
7373
}
7474

7575
async run(): Promise<void> {
@@ -95,10 +95,9 @@ class Processor<B extends BlockBase, S> {
9595
let prevBlockNumber = head.height
9696
let prevBlockHash = head.height < 0 ? undefined : head.hash
9797

98-
let stream = this.src.getStream({
99-
range: {from: prevBlockNumber + 1},
100-
parentHash: prevBlockHash,
101-
})
98+
let stream = this.db.supportsHotBlocks
99+
? this.src.getStream({range: {from: prevBlockNumber + 1}, parentHash: prevBlockHash})
100+
: this.src.getFinalizedStream({range: {from: prevBlockNumber + 1}, parentHash: prevBlockHash})
102101

103102
for await (let data of stream) {
104103
let finalizedHead: HashAndHeight =

solana/solana-stream/src/archive/source.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,15 @@ export class PortalDataSource implements DataSource<PartialBlock> {
6262
return blocks[0].header as BlockHeader
6363
}
6464

65-
getFinalizedStream(req: DataSourceStreamOptions): DataSourceStream<PartialBlock> {
66-
throw new Error('Method not implemented.')
65+
getFinalizedStream(opts?: DataSourceStreamOptions): DataSourceStream<PartialBlock> {
66+
return this._getStream(opts, true)
6767
}
6868

69-
async *getStream(opts?: DataSourceStreamOptions) {
69+
getStream(opts?: DataSourceStreamOptions): DataSourceStream<PartialBlock> {
70+
return this._getStream(opts, false)
71+
}
72+
73+
private async *_getStream(opts?: DataSourceStreamOptions, finalized?: boolean): DataSourceStream<PartialBlock> {
7074
let requests = applyRangeBound(this.requests, opts?.range)
7175

7276
let archiveRequests = mapRangeRequestList(requests, (req) => {
@@ -110,7 +114,9 @@ export class PortalDataSource implements DataSource<PartialBlock> {
110114
},
111115
}
112116

113-
for await (let data of this.client.getStream(q)) {
117+
let stream = finalized ? this.client.getFinalizedStream(q) : this.client.getStream(q)
118+
119+
for await (let data of stream) {
114120
let finalizedHead = data.finalizedHead
115121
if (finalizedHead != null && finalizedHead.number >= parentBlockNumber) {
116122
// the gap is already finalized, we can skip it

solana/solana-stream/src/source.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,6 @@ class SolanaDataSource implements DataSource<PartialBlock> {
218218
) {
219219
this.ranges = this.requests.map((req) => req.range)
220220
}
221-
getFinalizedStream(req: DataSourceStreamOptions): DataSourceStream<PartialBlock> {
222-
throw new Error('Method not implemented.')
223-
}
224221

225222
getHead(): Promise<BlockRef | undefined> {
226223
return this.createArchive().getHead()
@@ -230,18 +227,35 @@ class SolanaDataSource implements DataSource<PartialBlock> {
230227
return this.createArchive().getFinalizedHead()
231228
}
232229

233-
async *getStream(opts?: DataSourceStreamOptions) {
230+
getFinalizedStream(opts: DataSourceStreamOptions): DataSourceStream<PartialBlock> {
231+
return this._getStream(opts, true)
232+
}
233+
234+
getStream(opts?: DataSourceStreamOptions): DataSourceStream<PartialBlock> {
235+
return this._getStream(opts, false)
236+
}
237+
238+
private async *_getStream(opts?: DataSourceStreamOptions, finalized?: boolean): DataSourceStream<PartialBlock> {
234239
let from = opts?.range?.from ?? 0
235240
let parentHash = opts?.parentHash
236241
if (this.archiveSettings) {
237242
let agent = new HttpAgent({keepAlive: true})
238243
try {
239244
let archive = this.createArchive(agent)
240-
for await (let batch of archive.getStream({
241-
range: {from, to: opts?.range?.to},
242-
stopOnHead: opts?.stopOnHead,
243-
parentHash,
244-
})) {
245+
246+
let stream = finalized
247+
? archive.getFinalizedStream({
248+
range: {from, to: opts?.range?.to},
249+
stopOnHead: opts?.stopOnHead,
250+
parentHash,
251+
})
252+
: archive.getStream({
253+
range: {from, to: opts?.range?.to},
254+
stopOnHead: opts?.stopOnHead,
255+
parentHash,
256+
})
257+
258+
for await (let batch of stream) {
245259
yield batch
246260
from = last(batch.blocks).header.number + 1
247261
parentHash = last(batch.blocks).header.hash

util/portal-client/src/client.ts

Lines changed: 64 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {HttpClient, HttpError} from '@subsquid/http-client'
1+
import {HttpClient, HttpError, HttpResponse} from '@subsquid/http-client'
22
import {
33
addErrorContext,
44
createFuture,
@@ -224,42 +224,7 @@ export class PortalClient {
224224
request,
225225
stopOnHead,
226226
},
227-
async (q, o) => {
228-
// NOTE: we emulate the same behavior as will be implemented for hot blocks stream,
229-
// but unfortunately we don't have any information about finalized block hash at the moment
230-
let finalizedHead = {
231-
number: await this.getFinalizedHeight(o),
232-
hash: '',
233-
}
234-
235-
let res = await this.client
236-
.request<Readable>('POST', this.getDatasetUrl('finalized-stream'), {
237-
...o,
238-
json: q,
239-
stream: true,
240-
})
241-
.catch(
242-
withErrorContext({
243-
query: q,
244-
})
245-
)
246-
247-
switch (res.status) {
248-
case 200:
249-
let stream = Readable.toWeb(res.body) as ReadableStream<Uint8Array>
250-
251-
return {
252-
finalizedHead,
253-
stream: stream
254-
.pipeThrough(new TextDecoderStream('utf8'))
255-
.pipeThrough(new LineSplitStream('\n')),
256-
}
257-
case 204:
258-
return undefined
259-
default:
260-
throw unexpectedCase(res.status)
261-
}
262-
}
227+
async (q, o) => this.getStreamRequest('finalized-stream', q, o)
263228
)
264229
}
265230

@@ -288,67 +253,59 @@ export class PortalClient {
288253
request,
289254
stopOnHead,
290255
},
291-
async (q, o) => {
292-
try {
293-
let res = await this.client
294-
.request<Readable | undefined>('POST', this.getDatasetUrl('stream'), {
295-
...o,
296-
json: q,
297-
stream: true,
298-
})
299-
.catch(
300-
withErrorContext({
301-
query: q,
302-
})
303-
)
304-
305-
switch (res.status) {
306-
case 200:
307-
let finalizedHeadHash = res.headers.get('X-Sqd-Finalized-Head-Hash')
308-
let finalizedHeadNumber = res.headers.get('X-Sqd-Finalized-Head-Number')
309-
310-
let finalizedHead: BlockRef | undefined =
311-
finalizedHeadHash != null && finalizedHeadNumber != null
312-
? {
313-
hash: finalizedHeadHash,
314-
number: parseInt(finalizedHeadNumber),
315-
}
316-
: undefined
317-
318-
let stream = res.body ? (Readable.toWeb(res.body) as ReadableStream<Uint8Array>) : undefined
319-
320-
return {
321-
finalizedHead,
322-
stream: stream
323-
?.pipeThrough(new TextDecoderStream('utf8'))
324-
?.pipeThrough(new LineSplitStream('\n')),
325-
}
326-
case 204:
327-
return undefined
328-
default:
329-
throw unexpectedCase(res.status)
330-
}
331-
} catch (e: unknown) {
332-
if (
333-
e instanceof HttpError &&
334-
e.response.status === 409 &&
335-
q.fromBlock &&
336-
q.parentBlockHash &&
337-
e.response.body.lastBlocks
338-
) {
339-
let blocks = e.response.body.lastBlocks as BlockRef[]
340-
e = new ForkException(blocks, {
341-
fromBlock: q.fromBlock,
342-
parentBlockHash: q.parentBlockHash,
343-
})
344-
}
256+
async (q, o) => this.getStreamRequest('stream', q, o)
257+
)
258+
}
345259

346-
throw addErrorContext(e as any, {
347-
query,
260+
private async getStreamRequest(path: string, query: PortalQuery, options?: PortalRequestOptions) {
261+
try {
262+
let res = await this.client
263+
.request<Readable | undefined>('POST', this.getDatasetUrl(path), {
264+
...options,
265+
json: query,
266+
stream: true,
267+
})
268+
.catch(
269+
withErrorContext({
270+
query: query,
348271
})
349-
}
272+
)
273+
274+
switch (res.status) {
275+
case 200:
276+
let finalizedHead = getFinalizedHeadHeader(res.headers)
277+
let stream = res.body ? (Readable.toWeb(res.body) as ReadableStream<Uint8Array>) : undefined
278+
279+
return {
280+
finalizedHead,
281+
stream: stream
282+
?.pipeThrough(new TextDecoderStream('utf8'))
283+
?.pipeThrough(new LineSplitStream('\n')),
284+
}
285+
case 204:
286+
return undefined
287+
default:
288+
throw unexpectedCase(res.status)
350289
}
351-
)
290+
} catch (e: unknown) {
291+
if (
292+
e instanceof HttpError &&
293+
e.response.status === 409 &&
294+
query.fromBlock &&
295+
query.parentBlockHash &&
296+
e.response.body.lastBlocks
297+
) {
298+
let blocks = e.response.body.lastBlocks as BlockRef[]
299+
e = new ForkException(blocks, {
300+
fromBlock: query.fromBlock,
301+
parentBlockHash: query.parentBlockHash,
302+
})
303+
}
304+
305+
throw addErrorContext(e as any, {
306+
query,
307+
})
308+
}
352309
}
353310
}
354311

@@ -660,3 +617,15 @@ export function isForkException(err: unknown): err is ForkException {
660617
if (err instanceof Error && err.name === 'ForkError') return true
661618
return false
662619
}
620+
621+
function getFinalizedHeadHeader(headers: HttpResponse['headers']) {
622+
let finalizedHeadHash = headers.get('X-Sqd-Finalized-Head-Hash')
623+
let finalizedHeadNumber = headers.get('X-Sqd-Finalized-Head-Number')
624+
625+
return finalizedHeadHash != null && finalizedHeadNumber != null
626+
? {
627+
hash: finalizedHeadHash,
628+
number: parseInt(finalizedHeadNumber),
629+
}
630+
: undefined
631+
}

0 commit comments

Comments
 (0)