Skip to content

Commit f696264

Browse files
committed
Fix halting bug when executing with remote worker
1 parent 3ffc0c7 commit f696264

File tree

4 files changed

+176
-10
lines changed

4 files changed

+176
-10
lines changed

packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export class ReductionTaskFlow<Input, Output> {
4242
reductionTask: Task<PairTuple<Output>, Output>;
4343
mergableFunction: (a: Output, b: Output) => boolean;
4444
},
45-
private readonly flowCreator: FlowCreator
45+
flowCreator: FlowCreator
4646
) {
4747
this.flow = flowCreator.createFlow<ReductionState<Output>>(options.name, {
4848
numMergesCompleted: 0,
@@ -121,8 +121,10 @@ export class ReductionTaskFlow<Input, Output> {
121121
const { availableReductions, touchedIndizes } =
122122
this.resolveReducibleTasks(flow.state.queue, options.mergableFunction);
123123

124-
// I don't know exactly what this rule wants from me, I suspect
125-
// it complains bcs the function is called forEach
124+
flow.state.queue = flow.state.queue.filter(
125+
(ignored, index) => !touchedIndizes.includes(index)
126+
);
127+
126128
await flow.forEach(availableReductions, async (reduction) => {
127129
const taskParameters: PairTuple<Output> = [reduction.r1, reduction.r2];
128130
await flow.pushTask(
@@ -135,10 +137,6 @@ export class ReductionTaskFlow<Input, Output> {
135137
}
136138
);
137139
});
138-
139-
flow.state.queue = flow.state.queue.filter(
140-
(ignored, index) => !touchedIndizes.includes(index)
141-
);
142140
}
143141
}
144142

packages/sequencer/src/worker/queue/LocalTaskQueue.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { log, mapSequential, noop } from "@proto-kit/common";
22

3-
import { SequencerModule } from "../../sequencer/builder/SequencerModule";
3+
import {
4+
sequencerModule,
5+
SequencerModule,
6+
} from "../../sequencer/builder/SequencerModule";
47
import { TaskPayload } from "../flow/Task";
58

69
import { Closeable, InstantiatedQueue, TaskQueue } from "./TaskQueue";
@@ -20,6 +23,7 @@ export interface LocalTaskQueueConfig {
2023
simulatedDuration?: number;
2124
}
2225

26+
@sequencerModule()
2327
export class LocalTaskQueue
2428
extends SequencerModule<LocalTaskQueueConfig>
2529
implements TaskQueue

packages/sequencer/test-integration/workers/workers-proven.test.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import { ChildProcessWorker } from "./ChildProcessWorker";
2626

2727
const timeout = 300000;
2828

29+
const proofsEnabled = false;
30+
2931
describe("worker-proven", () => {
3032
describe("sequencer", () => {
3133
let test: BlockTestService;
@@ -87,7 +89,7 @@ describe("worker-proven", () => {
8789
try {
8890
// Start AppChain
8991
const childContainer = container.createChildContainer();
90-
await app.start(false, childContainer);
92+
await app.start(proofsEnabled, childContainer);
9193

9294
test = app.sequencer.dependencyContainer.resolve(BlockTestService);
9395

@@ -124,10 +126,33 @@ describe("worker-proven", () => {
124126

125127
console.log(batch.proof);
126128

127-
expect(batch.proof.proof.length).toBeGreaterThan(50);
129+
expect(batch.proof.proof.length).toBeGreaterThan(
130+
proofsEnabled ? 50 : 0
131+
);
128132
expect(batch.blockHashes).toHaveLength(1);
129133
},
130134
timeout
131135
);
136+
137+
it.each([5, 14, 20])(
138+
"should produce a batch of a %s of blocks",
139+
async (numBlocks) => {
140+
for (let i = 0; i < numBlocks; i++) {
141+
await test.produceBlock();
142+
}
143+
144+
const batch = await test.produceBatch();
145+
146+
expectDefined(batch);
147+
148+
console.log(batch.proof);
149+
150+
expect(batch.proof.proof.length).toBeGreaterThan(
151+
proofsEnabled ? 50 : 0
152+
);
153+
expect(batch.blockHashes).toHaveLength(numBlocks);
154+
},
155+
timeout
156+
);
132157
});
133158
});
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import "reflect-metadata";
2+
import { container, DependencyContainer } from "tsyringe";
3+
import { noop, sleep } from "@proto-kit/common";
4+
5+
import {
6+
FlowCreator,
7+
FlowTaskWorker,
8+
JSONTaskSerializer,
9+
LocalTaskQueue,
10+
PairTuple,
11+
ReductionTaskFlow,
12+
Task,
13+
TaskSerializer,
14+
TaskWorkerModule,
15+
} from "../../../../src";
16+
17+
type IndexNumber = {
18+
index: number;
19+
value: number;
20+
};
21+
22+
type RangeSum = {
23+
from: number;
24+
to: number;
25+
value: number;
26+
};
27+
28+
class PairedMulTask
29+
extends TaskWorkerModule
30+
implements Task<PairTuple<RangeSum>, RangeSum>
31+
{
32+
public name = "sum";
33+
34+
public inputSerializer(): TaskSerializer<PairTuple<RangeSum>> {
35+
return JSONTaskSerializer.fromType<PairTuple<RangeSum>>();
36+
}
37+
38+
public resultSerializer(): TaskSerializer<RangeSum> {
39+
return JSONTaskSerializer.fromType<RangeSum>();
40+
}
41+
42+
public async compute([a, b]: PairTuple<RangeSum>): Promise<RangeSum> {
43+
return {
44+
from: a.from,
45+
to: b.to,
46+
value: a.value + b.value,
47+
};
48+
}
49+
50+
public async prepare(): Promise<void> {
51+
noop();
52+
}
53+
}
54+
55+
class NumberIdentityTask
56+
extends TaskWorkerModule
57+
implements Task<IndexNumber, RangeSum>
58+
{
59+
public name = "numberIdentity";
60+
61+
public inputSerializer(): TaskSerializer<IndexNumber> {
62+
return JSONTaskSerializer.fromType<IndexNumber>();
63+
}
64+
65+
public resultSerializer(): TaskSerializer<RangeSum> {
66+
return JSONTaskSerializer.fromType<RangeSum>();
67+
}
68+
69+
public async compute(input: IndexNumber): Promise<RangeSum> {
70+
return {
71+
from: input.index,
72+
to: input.index + 1,
73+
value: input.value,
74+
};
75+
}
76+
77+
public async prepare(): Promise<void> {
78+
noop();
79+
}
80+
}
81+
82+
describe("ReductionTaskFlow", () => {
83+
let di: DependencyContainer;
84+
beforeAll(async () => {
85+
di = container.createChildContainer();
86+
87+
const queue = new LocalTaskQueue();
88+
queue.config = {};
89+
90+
di.register("TaskQueue", {
91+
useValue: queue,
92+
});
93+
94+
const worker = new FlowTaskWorker(di.resolve("TaskQueue"), [
95+
di.resolve(NumberIdentityTask),
96+
di.resolve(PairedMulTask),
97+
]);
98+
await worker.start();
99+
});
100+
101+
it("regressions - should work for parallel result stream", async () => {
102+
expect.assertions(1);
103+
104+
const creator = di.resolve(FlowCreator);
105+
const flow = new ReductionTaskFlow<IndexNumber, RangeSum>(
106+
{
107+
inputLength: 5,
108+
mappingTask: di.resolve(NumberIdentityTask),
109+
reductionTask: di.resolve(PairedMulTask),
110+
name: "test",
111+
mergableFunction: (a, b) => {
112+
return a.to === b.from;
113+
},
114+
},
115+
creator
116+
);
117+
118+
const result = await new Promise(async (res) => {
119+
flow.onCompletion(async (output) => res(output));
120+
121+
await flow.pushInput({ index: 0, value: 1 });
122+
await flow.pushInput({ index: 1, value: 2 });
123+
await flow.pushInput({ index: 2, value: 3 });
124+
125+
await sleep(100);
126+
127+
await flow.pushInput({ index: 3, value: 4 });
128+
await flow.pushInput({ index: 4, value: 0 });
129+
});
130+
131+
const expected: RangeSum = {
132+
from: 0,
133+
to: 5,
134+
value: 1 + 2 + 3 + 4,
135+
};
136+
137+
expect(result).toStrictEqual(expected);
138+
}, 1000000);
139+
});

0 commit comments

Comments
 (0)