Skip to content

Commit c507f3e

Browse files
jochem-brouwerholgerd77
authored andcommitted
client: fetcher: add more docs, make store method explicitly convert
1 parent d5bd1ac commit c507f3e

File tree

4 files changed

+44
-30
lines changed

4 files changed

+44
-30
lines changed

packages/client/lib/sync/fetcher/blockfetcher.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
4040
* Requests blocks associated with this job
4141
* @param job
4242
*/
43-
async request(job: Job<JobTask, Block[]>): Promise<Block[]> {
43+
async request(job: Job<JobTask, Block[], Block>): Promise<Block[]> {
4444
const { task, peer } = job
4545
const { first, count } = task
4646
const headers = await (peer!.eth as EthProtocolMethods).getBlockHeaders({
@@ -62,12 +62,12 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
6262
* @param result fetch result
6363
* @return {*} results of processing job or undefined if job not finished
6464
*/
65-
process(job: Job<JobTask, Block[]>, result: Block[]): Block[] | null {
65+
process(job: Job<JobTask, Block[], Block>, result: Block[]) {
6666
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
6767
if (result && result.length === job.task.count) {
6868
return result
6969
}
70-
return null
70+
return
7171
}
7272

7373
/**

packages/client/lib/sync/fetcher/fetcher.ts

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export interface FetcherOptions {
3535
* request(), process() and store() methods. Tasks can be arbitrary objects whose structure
3636
* is defined by subclasses. A priority queue is used to ensure tasks are fetched
3737
* inorder. Three types need to be provided: the JobTask, which describes a task the job should perform,
38-
* a JobResult, which is the direct result when a Peer replies to a Task, and a StorageItem, which
38+
* a JobResult, which is the direct result when a Peer replies to a Task, and a StorageItem, which
3939
* represents the to-be-stored items.
4040
* @memberof module:sync/fetcher
4141
*/
@@ -48,8 +48,8 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
4848
protected banTime: number
4949
protected maxQueue: number
5050
protected maxPerRequest: number
51-
protected in: QHeap<Job<JobTask, JobResult>>
52-
protected out: QHeap<Job<JobTask, JobResult>>
51+
protected in: QHeap<Job<JobTask, JobResult, StorageItem>>
52+
protected out: QHeap<Job<JobTask, JobResult, StorageItem>>
5353
protected total: number
5454
protected processed: number // number of processed tasks, awaiting the write job
5555
protected finished: number // number of tasks which are both processed and also finished writing
@@ -76,10 +76,16 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
7676
this.maxPerRequest = options.maxPerRequest ?? 128
7777

7878
this.in = new Heap({
79-
comparBefore: (a: Job<JobTask, JobResult>, b: Job<JobTask, JobResult>) => a.index < b.index,
79+
comparBefore: (
80+
a: Job<JobTask, JobResult, StorageItem>,
81+
b: Job<JobTask, JobResult, StorageItem>
82+
) => a.index < b.index,
8083
})
8184
this.out = new Heap({
82-
comparBefore: (a: Job<JobTask, JobResult>, b: Job<JobTask, JobResult>) => a.index < b.index,
85+
comparBefore: (
86+
a: Job<JobTask, JobResult, StorageItem>,
87+
b: Job<JobTask, JobResult, StorageItem>
88+
) => a.index < b.index,
8389
})
8490
this.total = 0
8591
this.processed = 0
@@ -89,26 +95,34 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
8995
}
9096

9197
/**
92-
* Request results from peer for the given job. Resolves with the raw result.
98+
* Request results from peer for the given job. Resolves with the raw result. If `undefined` is returned,
99+
* re-queue the job.
93100
* @param job
94101
* @param peer
95102
* @return {Promise}
96103
*/
97-
abstract request(_job?: Job<JobTask, JobResult>, _peer?: Peer): Promise<JobResult | undefined>
104+
abstract request(
105+
_job?: Job<JobTask, JobResult, StorageItem>,
106+
_peer?: Peer
107+
): Promise<JobResult | undefined>
98108

99109
/**
100-
* Process the reply for the given job
110+
* Process the reply for the given job. If the reply contains unexpected data, return `undefined`, this
111+
* re-queues the job.
101112
* @param job fetch job
102113
* @param result result data
103114
*/
104-
abstract process(_job?: Job<JobTask, JobResult>, _result?: JobResult): JobResult | null
115+
abstract process(
116+
_job?: Job<JobTask, JobResult, StorageItem>,
117+
_result?: JobResult
118+
): StorageItem[] | undefined
105119

106-
/**
120+
/**
107121
* Store fetch result. Resolves once store operation is complete.
108122
* @param result fetch result
109123
* @return {Promise}
110124
*/
111-
abstract async store(_result?: StorageItem[]): Promise<void>
125+
abstract async store(_result: StorageItem[]): Promise<void>
112126

113127
/**
114128
* Generate list of tasks to fetch
@@ -122,13 +136,12 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
122136
* Enqueue job
123137
* @param job
124138
*/
125-
enqueue(job: Job<JobTask, JobResult>) {
139+
enqueue(job: Job<JobTask, JobResult, StorageItem>) {
126140
if (this.running) {
127141
this.in.insert({
128142
...job,
129143
time: Date.now(),
130144
state: 'idle',
131-
result: null,
132145
})
133146
}
134147
}
@@ -160,7 +173,7 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
160173
* @param job successful job
161174
* @param result job result
162175
*/
163-
success(job: Job<JobTask, JobResult>, result?: JobResult) {
176+
success(job: Job<JobTask, JobResult, StorageItem>, result?: JobResult) {
164177
if (job.state !== 'active') return
165178
if (result === undefined) {
166179
this.enqueue(job)
@@ -188,7 +201,7 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
188201
* @param job failed job
189202
* @param [error] error
190203
*/
191-
failure(job: Job<JobTask, JobResult>, error?: Error) {
204+
failure(job: Job<JobTask, JobResult, StorageItem>, error?: Error) {
192205
if (job.state !== 'active') return
193206
job.peer!.idle = true
194207
this.pool.ban(job.peer!, this.banTime)
@@ -235,7 +248,7 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
235248
* @param {Error} error error object
236249
* @param {Object} job task
237250
*/
238-
error(error: Error, job?: Job<JobTask, JobResult>) {
251+
error(error: Error, job?: Job<JobTask, JobResult, StorageItem>) {
239252
if (this.running) {
240253
this.emit('error', error, job && job.task, job && job.peer)
241254
}
@@ -293,11 +306,10 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
293306
}
294307
this.write()
295308
this.tasks().forEach((task: JobTask) => {
296-
const job: Job<JobTask, JobResult> = {
309+
const job: Job<JobTask, JobResult, StorageItem> = {
297310
task,
298311
time: Date.now(),
299312
index: this.total++,
300-
result: null,
301313
state: 'idle',
302314
peer: null,
303315
}
@@ -321,15 +333,15 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
321333
* @return {Peer}
322334
*/
323335
// TODO: what is job supposed to be?
324-
peer(_job?: Job<JobTask, JobResult>) {
336+
peer(_job?: Job<JobTask, JobResult, StorageItem>) {
325337
return this.pool.idle()
326338
}
327339

328340
/**
329341
* Expire job that has timed out and ban associated peer. Timed out tasks will
330342
* be re-inserted into the queue.
331343
*/
332-
expire(job: Job<JobTask, JobResult>) {
344+
expire(job: Job<JobTask, JobResult, StorageItem>) {
333345
job.state = 'expired'
334346
if (this.pool.contains(job.peer!)) {
335347
this.config.logger.debug(

packages/client/lib/sync/fetcher/headerfetcher.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export class HeaderFetcher extends BlockFetcherBase<BlockHeaderResult, BlockHead
3333
* Requests block headers for the given task
3434
* @param job
3535
*/
36-
async request(job: Job<JobTask, BlockHeaderResult>) {
36+
async request(job: Job<JobTask, BlockHeaderResult, BlockHeader>) {
3737
const { task, peer } = job
3838
if (this.flow.maxRequestCount(peer!, 'GetBlockHeaders') < this.maxPerRequest) {
3939
// we reached our request limit. try with a different peer.
@@ -56,19 +56,21 @@ export class HeaderFetcher extends BlockFetcherBase<BlockHeaderResult, BlockHead
5656
* @param result fetch result
5757
* @return {*} results of processing job or undefined if job not finished
5858
*/
59-
process(job: any, result: any) {
60-
this.flow.handleReply(job.peer, result.bv)
59+
process(job: Job<JobTask, BlockHeaderResult, BlockHeader>, result: BlockHeaderResult) {
60+
this.flow.handleReply(job.peer!, result.bv.toNumber())
61+
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
6162
if (result.headers && result.headers.length === job.task.count) {
6263
return result.headers
6364
}
65+
return
6466
}
6567

6668
/**
6769
* Store fetch result. Resolves once store operation is complete.
6870
* @param {Header[]} headers fetch result
6971
* @return {Promise}
7072
*/
71-
async store(headers: any[]) {
73+
async store(headers: BlockHeader[]) {
7274
await this.chain.putHeaders(headers)
7375
}
7476

@@ -78,7 +80,7 @@ export class HeaderFetcher extends BlockFetcherBase<BlockHeaderResult, BlockHead
7880
* @return {Peer}
7981
*/
8082
// TODO: what is job supposed to be?
81-
peer(_job: any): Peer {
83+
peer(_job: Job<JobTask, BlockHeaderResult, BlockHeader>): Peer {
8284
return this.pool.idle((p: any) => p.les && p.les.status.serveHeaders)
8385
}
8486
}

packages/client/lib/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ export interface QHeap<T> {
3535
gc(opts: { minLength: number; maxLength: number }): void
3636
}
3737

38-
export type Job<JobTask, JobResult> = {
38+
export type Job<JobTask, JobResult, StorageItem> = {
3939
task: JobTask
4040
time: number
4141
index: number
42-
result: JobResult | null
42+
result?: JobResult | StorageItem[]
4343
state: 'idle' | 'expired' | 'active'
4444
peer: Peer | null
4545
}

0 commit comments

Comments
 (0)