Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5387b8c
Merge branch 'feature/sideloading_runtime' into feature/custom-token-…
rpanic Oct 15, 2024
25c6d20
Renamed ProtocolStartupModule to SequencerStartupModule
rpanic Oct 15, 2024
e7cfa11
Added timing log utils
rpanic Oct 15, 2024
a00bd14
Added bridge contract vk to compile task and registration args
rpanic Oct 16, 2024
83de537
Made LocalTaskQueue sequential
rpanic Oct 16, 2024
db3a52f
Moved CompileRegistry
rpanic Oct 29, 2024
32534b5
Added argument to app.start for proofs enabled
rpanic Oct 29, 2024
2587180
Renamed remaining appchain references to AreProofsEnabled
rpanic Oct 29, 2024
13ad8d2
Merge branch 'develop' into feature/enable-proving
rpanic Oct 29, 2024
658b1ff
Enabled ModuleContainers to also emit events themselves
rpanic Oct 29, 2024
b0ef48f
Made LocalTaskWorkerModule emit event when ready
rpanic Oct 29, 2024
ec36df6
LocalTaskQueue merge
rpanic Oct 29, 2024
0643231
Refactored BlockProduction Test
rpanic Oct 29, 2024
89699e7
Refactoring leftovers
rpanic Oct 29, 2024
95610b9
Created test for proven batch production
rpanic Oct 29, 2024
cffc909
Enabled proving for batches
rpanic Oct 29, 2024
8d3d613
Merge branch 'develop' into feature/enable-proving
rpanic Oct 29, 2024
36f2eba
Linting
rpanic Oct 30, 2024
d78b390
reduced timeout
rpanic Oct 30, 2024
d0d597d
Fixed bullmq worker implementation
rpanic Oct 30, 2024
4726371
Merge pull request #218 from proto-kit/feature/proving-workers
maht0rz Jan 20, 2025
b88afdb
Merge branch 'develop' into feature/enable-proving
rpanic Jan 20, 2025
90a1dd8
Merge branch 'develop' into feature/enable-proving
rpanic Jan 27, 2025
a627060
Fixed compile errors after merge
rpanic Jan 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
370 changes: 246 additions & 124 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/api/src/graphql/modules/BatchStorageResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class ComputedBlockModel {
blockHashes.map(
(blockHash) => blocks.find((block) => block?.hash === blockHash)!
),
proof.proof === MOCK_PROOF ? "mock-proof" : JSON.stringify(proof)
proof.proof === MOCK_PROOF ? MOCK_PROOF : JSON.stringify(proof)
);
}

Expand Down
30 changes: 27 additions & 3 deletions packages/common/src/events/EventEmitterProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import type {
import { StringKeyOf, UnionToIntersection } from "../types";

import { EventEmitter } from "./EventEmitter";
import { EventEmittingComponent, EventsRecord } from "./EventEmittingComponent";
import {
EventEmittingComponent,
EventEmittingContainer,
EventsRecord,
} from "./EventEmittingComponent";

export type CastToEventsRecord<Record> = Record extends EventsRecord
? Record
Expand All @@ -17,7 +21,13 @@ export type ModuleEvents<ModuleType extends BaseModuleType> =
? Events
: InstanceType<ModuleType> extends ModuleContainer<infer NestedModules>
? CastToEventsRecord<ContainerEvents<NestedModules>>
: EventsRecord;
: // &
// (InstanceType<ModuleType> extends EventEmittingContainer<
// infer ContainerEvents
// >
// ? ContainerEvents
// : {})
EventsRecord;

export type ContainerEvents<Modules extends ModulesRecord> = {
[Key in StringKeyOf<Modules>]: ModuleEvents<Modules[Key]>;
Expand All @@ -27,7 +37,7 @@ export type FlattenObject<Target extends Record<string, EventsRecord>> =
UnionToIntersection<Target[keyof Target]>;

export type FlattenedContainerEvents<Modules extends ModulesRecord> =
FlattenObject<ContainerEvents<Modules>>;
FlattenObject<ContainerEvents<Modules>>; // & FlattenObject<any>;

export class EventEmitterProxy<
Modules extends ModulesRecord,
Expand All @@ -45,10 +55,24 @@ export class EventEmitterProxy<
this.emit(events, ...args);
});
}
if (this.isEventEmittingContainer(module)) {
module.containerEvents.onAll((events: any, args: any[]) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
this.emit(events, ...args);
});
}
}
});
}

private isEventEmittingContainer(
module: any
): module is EventEmittingContainer<EventsRecord> {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const emitter = module.containerEvents;
return emitter !== undefined && emitter instanceof EventEmitter;
}

private isEventEmitter(
module: any
): module is EventEmittingComponent<EventsRecord> {
Expand Down
4 changes: 4 additions & 0 deletions packages/common/src/events/EventEmittingComponent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ export type EventsRecord = Record<string, unknown[]>;
export interface EventEmittingComponent<Events extends EventsRecord> {
events: EventEmitter<Events>;
}

export interface EventEmittingContainer<Events extends EventsRecord> {
containerEvents: EventEmitter<Events>;
}
38 changes: 38 additions & 0 deletions packages/common/src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@ function logProvable(
}
/* eslint-enable */

const timeMap: Record<string, number> = {};

function time(label = "time") {
timeMap[label] = Date.now();
}

function timeLog(label = "time"): string {
const prev = timeMap[label];
if (prev === undefined) {
return "Label not found";
}
return `${label} took ${Date.now() - prev}ms`;
}

function timeEnd(label = "time"): string {
const str = timeLog(label);
delete timeMap[label];
return str;
}

export const log = {
provable: {
info: (...args: unknown[]) => {
Expand All @@ -48,6 +68,24 @@ export const log = {
},
},

time,

timeLog: {
info: (label?: string) => loglevel.info(timeLog(label)),
debug: (label?: string) => loglevel.debug(timeLog(label)),
error: (label?: string) => loglevel.error(timeLog(label)),
trace: (label?: string) => loglevel.trace(timeLog(label)),
warn: (label?: string) => loglevel.warn(timeLog(label)),
},

timeEnd: {
info: (label?: string) => loglevel.info(timeEnd(label)),
debug: (label?: string) => loglevel.debug(timeEnd(label)),
error: (label?: string) => loglevel.error(timeEnd(label)),
trace: (label?: string) => loglevel.trace(timeEnd(label)),
warn: (label?: string) => loglevel.warn(timeEnd(label)),
},

info: (...args: unknown[]) => {
loglevel.info(...args);
},
Expand Down
24 changes: 15 additions & 9 deletions packages/common/src/zkProgrammable/ZkProgrammable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { dummyVerificationKey } from "../dummyVerificationKey";
import { MOCK_PROOF } from "./provableMethod";

const errors = {
appChainNotSet: (name: string) =>
new Error(`Appchain was not injected for: ${name}`),
areProofsEnabledNotSet: (name: string) =>
new Error(`AreProofsEnabled was not injected for: ${name}`),
};

export interface CompileArtifact {
Expand Down Expand Up @@ -72,6 +72,8 @@ export function verifyToMockable<PublicInput, PublicOutput>(
return verified;
}

console.log("VerifyMocked");

return proof.proof === MOCK_PROOF;
};
}
Expand All @@ -97,25 +99,29 @@ export abstract class ZkProgrammable<
PublicInput = undefined,
PublicOutput = void,
> {
public abstract get appChain(): AreProofsEnabled | undefined;
public abstract get areProofsEnabled(): AreProofsEnabled | undefined;

public abstract zkProgramFactory(): PlainZkProgram<
PublicInput,
PublicOutput
>[];

private zkProgramSingleton?: PlainZkProgram<PublicInput, PublicOutput>[];

@Memoize()
public get zkProgram(): PlainZkProgram<PublicInput, PublicOutput>[] {
const zkProgram = this.zkProgramFactory();
if (this.zkProgramSingleton === undefined) {
this.zkProgramSingleton = this.zkProgramFactory();
}

return zkProgram.map((bucket) => {
if (!this.appChain) {
throw errors.appChainNotSet(this.constructor.name);
return this.zkProgramSingleton.map((bucket) => {
if (!this.areProofsEnabled) {
throw errors.areProofsEnabledNotSet(this.constructor.name);
}
return {
...bucket,
verify: verifyToMockable(bucket.verify, this.appChain),
compile: compileToMockable(bucket.compile, this.appChain),
verify: verifyToMockable(bucket.verify, this.areProofsEnabled),
compile: compileToMockable(bucket.compile, this.areProofsEnabled),
};
});
}
Expand Down
28 changes: 14 additions & 14 deletions packages/common/src/zkProgrammable/provableMethod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ export function toProver(
...args: ArgumentTypes
) {
return async function prover(this: ZkProgrammable<any, any>) {
const areProofsEnabled = this.appChain?.areProofsEnabled;
if (areProofsEnabled ?? false) {
for (const prog of this.zkProgram) {
if (Object.keys(prog.methods).includes(methodName)) {
const programProvableMethod = prog.methods[methodName];
// eslint-disable-next-line no-await-in-loop
return await Reflect.apply(programProvableMethod, this, args);
}
}
const { areProofsEnabled } = this.areProofsEnabled!;

const zkProgram = this.zkProgram.find((prog) =>
Object.keys(prog.methods).includes(methodName)
);

if (zkProgram === undefined) {
throw new Error("Correct ZkProgram not found");
}

if (areProofsEnabled) {
const programProvableMethod = zkProgram.methods[methodName];
return await Reflect.apply(programProvableMethod, this, args);
}

// create a mock proof by simulating method execution in JS
// create a mock proof by simulating method> execution in JS
const publicOutput = await Reflect.apply(simulatedMethod, this, args);
const zkProgram =
this.zkProgram.find((prog) => {
return Object.keys(prog.methods).includes(methodName);
}) ?? this.zkProgram[0];

return new zkProgram.Proof({
proof: MOCK_PROOF,
Expand Down
6 changes: 3 additions & 3 deletions packages/common/test/zkProgrammable/ZkProgrammable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TestProgrammable extends ZkProgrammable<
TestPublicInput,
TestPublicOutput
> {
public appChain: AreProofsEnabled = appChainMock;
public areProofsEnabled: AreProofsEnabled = appChainMock;

@provableMethod()
public async foo(publicInput: TestPublicInput, bar: Balance) {
Expand Down Expand Up @@ -97,7 +97,7 @@ class TestProgrammable extends ZkProgrammable<
}

class OtherTestProgrammable extends ZkProgrammable<undefined, void> {
public appChain: AreProofsEnabled = appChainMock;
public areProofsEnabled: AreProofsEnabled = appChainMock;

public constructor(public testProgrammable: TestProgrammable) {
super();
Expand Down Expand Up @@ -183,7 +183,7 @@ describe("zkProgrammable", () => {
(areProofsEnabled, { verificationKey, shouldVerifyMockProofs }) => {
beforeAll(async () => {
testProgrammable = new TestProgrammable();
testProgrammable.appChain.setProofsEnabled(areProofsEnabled);
testProgrammable.areProofsEnabled.setProofsEnabled(areProofsEnabled);
zkProgramFactorySpy = jest.spyOn(testProgrammable, "zkProgramFactory");
artifact = await testProgrammable.zkProgram[0].compile();
}, 500_000);
Expand Down
43 changes: 40 additions & 3 deletions packages/deployment/src/queue/BullQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface BullQueueConfig {
port: number;
username?: string;
password?: string;
db?: number;
};
retryAttempts?: number;
}
Expand All @@ -25,17 +26,41 @@ export class BullQueue
extends SequencerModule<BullQueueConfig>
implements TaskQueue
{
private activePromise?: Promise<void>;

public createWorker(
name: string,
executor: (data: TaskPayload) => Promise<TaskPayload>,
options?: { concurrency?: number }
): Closeable {
const worker = new Worker<TaskPayload, TaskPayload>(
name,
async (job) => await executor(job.data),
async (job) => {
// This weird promise logic is needed to make sure the worker is not proving in parallel
// This is by far not optimal - since it still picks up 1 task per queue but waits until
// computing them, so that leads to bad performance over multiple workers.
// For that we need to restructure tasks to be flowing through a single queue however
while (this.activePromise !== undefined) {
// eslint-disable-next-line no-await-in-loop
await this.activePromise;
}
let resOutside: () => void = () => {};
const promise = new Promise<void>((res) => {
resOutside = res;
});
this.activePromise = promise;

const result = await executor(job.data);
this.activePromise = undefined;
void resOutside();

return result;
},
{
concurrency: options?.concurrency ?? 1,
connection: this.config.redis,
stalledInterval: 60000, // 1 minute
lockDuration: 60000, // 1 minute

metrics: { maxDataPoints: MetricsTime.ONE_HOUR * 24 },
}
Expand Down Expand Up @@ -68,6 +93,7 @@ export class BullQueue
name: queueName,

async addTask(payload: TaskPayload): Promise<{ taskId: string }> {
log.debug("Adding task: ", payload);
const job = await queue.add(queueName, payload, {
attempts: retryAttempts ?? 2,
});
Expand All @@ -76,14 +102,25 @@ export class BullQueue

async onCompleted(listener: (payload: TaskPayload) => Promise<void>) {
events.on("completed", async (result) => {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
await listener(JSON.parse(result.returnvalue) as TaskPayload);
log.debug("Completed task: ", result);
try {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
await listener(result.returnvalue as unknown as TaskPayload);
} catch (e) {
// Catch error explicitly since this promise is dangling,
// therefore any error will be voided as well
log.error(e);
}
});
events.on("error", async (error) => {
log.error("Error in worker", error);
});
await events.waitUntilReady();
},

async close(): Promise<void> {
await events.close();
await queue.drain();
await queue.close();
},
};
Expand Down
10 changes: 5 additions & 5 deletions packages/library/src/sequencer/SimpleSequencerModules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import {
BlockTrigger,
Database,
SequencerModule,
ProtocolStartupModule,
SequencerStartupModule,
} from "@proto-kit/sequencer";
import { TypedClass, ModulesConfig } from "@proto-kit/common";

type PreconfiguredSimpleSequencerModulesRecord = {
Mempool: typeof PrivateMempool;
BatchProducerModule: typeof BatchProducerModule;
BlockProducerModule: typeof BlockProducerModule;
ProtocolStartupModule: TypedClass<
ProtocolStartupModule & SequencerModule<unknown>
SequencerStartupModule: TypedClass<
SequencerStartupModule & SequencerModule<unknown>
>;
};

Expand Down Expand Up @@ -90,7 +90,7 @@ export class SimpleSequencerModules {
BlockTrigger,
TaskQueue,
...reducedModules,
ProtocolStartupModule,
SequencerStartupModule,
} satisfies SimpleSequencerModulesRecord;
}

Expand All @@ -102,7 +102,7 @@ export class SimpleSequencerModules {

Mempool: {},
BatchProducerModule: {},
ProtocolStartupModule: {},
SequencerStartupModule: {},
} satisfies ModulesConfig<PreconfiguredSimpleSequencerModulesRecord>;
}

Expand Down
6 changes: 3 additions & 3 deletions packages/module/src/runtime/Runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ export class RuntimeZkProgrammable<
super();
}

public get appChain() {
return this.runtime.appChain;
public get areProofsEnabled() {
return this.runtime.areProofsEnabled;
}

public zkProgramFactory(): PlainZkProgram<undefined, MethodPublicOutput>[] {
Expand Down Expand Up @@ -299,7 +299,7 @@ export class Runtime<Modules extends RuntimeModulesRecord>
this.useDependencyFactory(this.container.resolve(MethodIdFactory));
}

public get appChain(): AreProofsEnabled | undefined {
public get areProofsEnabled(): AreProofsEnabled | undefined {
return this.container.resolve<AreProofsEnabled>("AreProofsEnabled");
}

Expand Down
Loading
Loading