Skip to content

Commit b539021

Browse files
holgerd77ScottyPoiacolytec3
authored
Client: Fetcher Small Bugfixes and Log Improvements (#3024)
* Client -> Execution: less frequent cache stats * Client -> Fetcher: log improvements * Client -> Fetcher: fix unwanted job execution on skip case, added some no-result guards * Client -> Fetcher: more explicit push to Readable stream * Client -> Fetcher: add initialization debug msg * Client -> Fetcher: tie job skipping to finished jobs instead of processed to avoid to large bulk import chunks * Client: a bit less frequent memory stats * Client -> Fetcher: fix occasional re-enqueue bug when length of an object is not defined, minor log improvements * Lighsync hot fix for fetcher reenqueue logic switches * add explicit boolean checks --------- Co-authored-by: Scotty <[email protected]> Co-authored-by: acolytec3 <[email protected]>
1 parent 3bd1573 commit b539021

File tree

4 files changed

+52
-23
lines changed

4 files changed

+52
-23
lines changed

packages/client/src/service/service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export class Service {
5757
*
5858
* (for info there will be somewhat reduced output)
5959
*/
60-
private STATS_INTERVAL = 1000 * 20 // 20 seconds
60+
private STATS_INTERVAL = 1000 * 30 // 30 seconds
6161

6262
/**
6363
* Shutdown the client when memory threshold is reached (in percent)

packages/client/src/sync/fetcher/blockfetcher.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,14 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
104104
process(job: Job<JobTask, Block[], Block>, result: Block[]) {
105105
result = (job.partialResult ?? []).concat(result)
106106
job.partialResult = undefined
107-
if (result.length === job.task.count) {
108-
return result
109-
} else if (result.length > 0 && result.length < job.task.count) {
110-
// Save partial result to re-request missing items.
111-
job.partialResult = result
112-
this.debug(`Partial result received=${result.length} expected=${job.task.count}`)
107+
if (result !== undefined) {
108+
if (result.length === job.task.count) {
109+
return result
110+
} else if (result.length > 0 && result.length < job.task.count) {
111+
// Save partial result to re-request missing items.
112+
job.partialResult = result
113+
this.debug(`Partial result received=${result.length} expected=${job.task.count}`)
114+
}
113115
}
114116
return
115117
}

packages/client/src/sync/fetcher/blockfetcherbase.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
114114
this.debug(`Enqueued num=${tasks.length} tasks`)
115115
} else {
116116
this.debug(
117-
`Fetcher skipping nextTasks in=${this.in.length} count=${this.count} processed=${this.processed} finished=${this.finished}`
117+
`No new tasks enqueued in=${this.in.length} count=${this.count} processed=${this.processed} finished=${this.finished}`
118118
)
119119
}
120120
}
@@ -245,6 +245,7 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
245245
if ('reverse' in this) {
246246
str += ` reverse=${this.reverse}`
247247
}
248+
str += ` state=${job.state}`
248249
return str
249250
}
250251
}

packages/client/src/sync/fetcher/fetcher.ts

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
8888
this.banTime = options.banTime ?? 60000
8989
this.maxQueue = options.maxQueue ?? 4
9090

91+
this.debug(
92+
`Fetcher initialized timeout=${this.timeout} interval=${this.interval} banTime=${this.banTime} maxQueue=${this.maxQueue}`
93+
)
94+
9195
this.in = new Heap({
9296
comparBefore: (
9397
a: Job<JobTask, JobResult, StorageItem>,
@@ -183,7 +187,9 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
183187
for (let f = this.out.peek(); f && f.index <= this.processed; ) {
184188
this.processed++
185189
const job = this.out.remove()
186-
if (!this.push(job)) {
190+
// Push the job to the Readable stream
191+
const success = this.push(job)
192+
if (!success) {
187193
return
188194
}
189195
f = this.out.peek()
@@ -226,17 +232,30 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
226232
* @param result job result
227233
*/
228234
private success(job: Job<JobTask, JobResult, StorageItem>, result?: JobResult) {
229-
if (job.state !== 'active') return
230235
let jobStr = this.jobStr(job, true)
236+
if (job.state !== 'active') return
237+
231238
let reenqueue = false
232239
let resultSet = ''
233240
if (result === undefined) {
234241
resultSet = 'undefined'
235242
reenqueue = true
236243
}
237-
if (result !== undefined && (result as any).length === 0) {
238-
resultSet = 'empty'
239-
reenqueue = true
244+
if (result !== undefined) {
245+
if ('length' in (result as any)) {
246+
if ((result as any).length === 0) {
247+
resultSet = 'empty'
248+
reenqueue = true
249+
}
250+
} else {
251+
// Hot-Fix for lightsync, 2023-12-29
252+
// (delete (only the if clause) in case lightsync code
253+
// has been removed at some point)
254+
if (!('reqId' in (result as any))) {
255+
resultSet = 'unknown'
256+
reenqueue = true
257+
}
258+
}
240259
}
241260
if (reenqueue) {
242261
this.debug(
@@ -253,7 +272,8 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
253272
job.peer!.idle = true
254273
job.result = this.process(job, result)
255274
jobStr = this.jobStr(job, true)
256-
if (job.result) {
275+
if (job.result !== undefined) {
276+
this.debug(`Successful job completion job ${jobStr}, writing to out and dequeue`)
257277
this.out.insert(job)
258278
this.dequeue()
259279
} else {
@@ -317,11 +337,11 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
317337
next() {
318338
this.nextTasks()
319339
const job = this.in.peek()
320-
if (!job) {
340+
if (job === undefined) {
321341
if (this.finished !== this.total) {
322342
// There are still jobs waiting to be processed out in the writer pipe
323343
this.debug(
324-
`No job found on next task, skip next job execution processed=${this.processed} finished=${this.finished} total=${this.total}`
344+
`No job found as next task, skip next job execution processed=${this.processed} finished=${this.finished} total=${this.total}`
325345
)
326346
} else {
327347
// There are no more jobs in the fetcher, so its better to resolve
@@ -331,19 +351,21 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
331351
}
332352
return false
333353
}
334-
if (this._readableState!.length > this.maxQueue) {
354+
const jobStr = this.jobStr(job)
355+
if (this._readableState === undefined || this._readableState!.length > this.maxQueue) {
335356
this.debug(
336357
`Readable state length=${this._readableState!.length} exceeds max queue size=${
337358
this.maxQueue
338-
}, skip next job execution.`
359+
}, skip job ${jobStr} execution.`
339360
)
340361
return false
341362
}
342-
if (job.index > this.processed + this.maxQueue) {
343-
this.debug(`Job index greater than processed + max queue size, skip next job execution.`)
363+
if (job.index > this.finished + this.maxQueue) {
364+
this.debug(`Job index greater than finished + max queue size, skip job ${jobStr} execution.`)
365+
return false
344366
}
345367
if (this.processed === this.total) {
346-
this.debug(`Total number of tasks reached, skip next job execution.`)
368+
this.debug(`Total number of tasks reached, skip job ${jobStr} execution.`)
347369
return false
348370
}
349371
const peer = this.peer()
@@ -355,16 +377,19 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
355377
const timeout = setTimeout(() => {
356378
this.expire(job)
357379
}, this.timeout)
380+
this.debug(`All requirements met for job ${jobStr}, start requesting.`)
358381
this.request(job, peer)
359-
.then((result?: JobResult) => this.success(job, result))
382+
.then((result?: JobResult) => {
383+
this.success(job, result)
384+
})
360385
.catch((error: Error) => {
361386
const { banPeer } = this.processStoreError(error, job.task)
362387
this.failure(job, error, false, false, banPeer)
363388
})
364389
.finally(() => clearTimeout(timeout))
365390
return job
366391
} else {
367-
this.debug(`No idle peer available, skip next job execution.`)
392+
this.debug(`No idle peer available, skip execution for job ${jobStr}.`)
368393
return false
369394
}
370395
}
@@ -415,6 +440,7 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
415440
cb: Function
416441
) => {
417442
const jobItems = job instanceof Array ? job : [job]
443+
this.debug(`Starting write for ${jobItems.length} jobs...`)
418444
try {
419445
for (const jobItem of jobItems) {
420446
await this.store(jobItem.result as StorageItem[])

0 commit comments

Comments
 (0)