@@ -4,7 +4,7 @@ const Heap = require('qheap')
4
4
import { PeerPool } from '../../net/peerpool'
5
5
import { Config } from '../../config'
6
6
7
- import { QHeap , Job } from '../../types'
7
+ import { QHeap , Job } from '../../types'
8
8
import { Peer } from '../../net/peer'
9
9
10
10
export interface FetcherOptions {
@@ -37,7 +37,7 @@ export interface FetcherOptions {
37
37
* inorder.
38
38
* @memberof module:sync/fetcher
39
39
*/
40
- export abstract class Fetcher < JobTask , JobResult > extends Readable {
40
+ export abstract class Fetcher < JobTask , JobResult , StorageItem > extends Readable {
41
41
public config : Config
42
42
43
43
protected pool : PeerPool
@@ -53,7 +53,8 @@ export abstract class Fetcher<JobTask, JobResult> extends Readable {
53
53
protected finished : number // number of tasks which are both processed and also finished writing
54
54
protected running : boolean
55
55
protected reading : boolean
56
- private _readableState ?: { // This property is inherited from Readable. We only need `length`.
56
+ private _readableState ?: {
57
+ // This property is inherited from Readable. We only need `length`.
57
58
length : number
58
59
}
59
60
@@ -72,8 +73,12 @@ export abstract class Fetcher<JobTask, JobResult> extends Readable {
72
73
this . maxQueue = options . maxQueue ?? 16
73
74
this . maxPerRequest = options . maxPerRequest ?? 128
74
75
75
- this . in = new Heap ( { comparBefore : ( a : Job < JobTask , JobResult > , b : Job < JobTask , JobResult > ) => a . index < b . index } )
76
- this . out = new Heap ( { comparBefore : ( a : Job < JobTask , JobResult > , b : Job < JobTask , JobResult > ) => a . index < b . index } )
76
+ this . in = new Heap ( {
77
+ comparBefore : ( a : Job < JobTask , JobResult > , b : Job < JobTask , JobResult > ) => a . index < b . index ,
78
+ } )
79
+ this . out = new Heap ( {
80
+ comparBefore : ( a : Job < JobTask , JobResult > , b : Job < JobTask , JobResult > ) => a . index < b . index ,
81
+ } )
77
82
this . total = 0
78
83
this . processed = 0
79
84
this . finished = 0
@@ -131,7 +136,7 @@ export abstract class Fetcher<JobTask, JobResult> extends Readable {
131
136
* @param job successful job
132
137
* @param result job result
133
138
*/
134
- success ( job : Job < JobTask , JobResult > , result ?: JobResult [ ] ) {
139
+ success ( job : Job < JobTask , JobResult > , result ?: JobResult ) {
135
140
if ( job . state !== 'active' ) return
136
141
if ( result === undefined ) {
137
142
this . enqueue ( job )
@@ -194,7 +199,7 @@ export abstract class Fetcher<JobTask, JobResult> extends Readable {
194
199
this . expire ( job )
195
200
} , this . timeout )
196
201
this . request ( job , peer )
197
- . then ( ( result : JobResult [ ] ) => this . success ( job , result ) )
202
+ . then ( ( result ? : JobResult ) => this . success ( job , result ) )
198
203
. catch ( ( error : Error ) => this . failure ( job , error ) )
199
204
. finally ( ( ) => clearTimeout ( timeout ) )
200
205
return job
@@ -217,7 +222,7 @@ export abstract class Fetcher<JobTask, JobResult> extends Readable {
217
222
* to support backpressure from storing results.
218
223
*/
219
224
write ( ) {
220
- const _write = async ( result : JobResult [ ] , encoding : string | null , cb : Function ) => {
225
+ const _write = async ( result : StorageItem [ ] , encoding : string | null , cb : Function ) => {
221
226
try {
222
227
await this . store ( result )
223
228
this . finished ++
@@ -230,8 +235,14 @@ export abstract class Fetcher<JobTask, JobResult> extends Readable {
230
235
const writer = new Writable ( {
231
236
objectMode : true ,
232
237
write : _write ,
233
- writev : ( many : { chunk : JobResult , encoding : string } [ ] , cb : Function ) =>
234
- _write ( ( < JobResult [ ] > [ ] ) . concat ( ...many . map ( ( x : { chunk : JobResult , encoding : string } ) => x . chunk ) ) , null , cb ) ,
238
+ writev : ( many : { chunk : StorageItem ; encoding : string } [ ] , cb : Function ) =>
239
+ _write (
240
+ ( < StorageItem [ ] > [ ] ) . concat (
241
+ ...many . map ( ( x : { chunk : StorageItem ; encoding : string } ) => x . chunk )
242
+ ) ,
243
+ null ,
244
+ cb
245
+ ) ,
235
246
} )
236
247
this . on ( 'close' , ( ) => {
237
248
this . running = false
@@ -296,14 +307,14 @@ export abstract class Fetcher<JobTask, JobResult> extends Readable {
296
307
* @param peer
297
308
* @return {Promise }
298
309
*/
299
- abstract request ( _job ?: Job < JobTask , JobResult > , _peer ?: Peer ) : Promise < JobResult [ ] >
310
+ abstract request ( _job ?: Job < JobTask , JobResult > , _peer ?: Peer ) : Promise < JobResult | undefined >
300
311
301
312
/**
302
313
* Process the reply for the given job
303
314
* @param job fetch job
304
315
* @param result result data
305
316
*/
306
- abstract process ( _job ?: Job < JobTask , JobResult > , _result ?: JobResult [ ] ) : JobResult [ ] | null
317
+ abstract process ( _job ?: Job < JobTask , JobResult > , _result ?: JobResult ) : JobResult | null
307
318
308
319
/**
309
320
* Expire job that has timed out and ban associated peer. Timed out tasks will
@@ -329,7 +340,7 @@ export abstract class Fetcher<JobTask, JobResult> extends Readable {
329
340
* @param result fetch result
330
341
* @return {Promise }
331
342
*/
332
- abstract async store ( _result ?: JobResult [ ] ) : Promise < void >
343
+ abstract async store ( _result ?: StorageItem [ ] ) : Promise < void >
333
344
334
345
async wait ( delay ?: number ) {
335
346
await new Promise ( ( resolve ) => setTimeout ( resolve , delay || this . interval ) )
0 commit comments