Skip to content

Commit 1a421e7

Browse files
committed
Merge branch 'develop' into feature/st-prover-3
# Conflicts: # packages/persistance/src/services/prisma/PrismaBlockStorage.ts # packages/sequencer/src/protocol/production/BatchProducerModule.ts # packages/sequencer/src/protocol/production/BlockTaskFlowService.ts # packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts # packages/sequencer/src/protocol/production/sequencing/BlockResultService.ts # packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts # packages/sequencer/src/storage/repositories/BlockStorage.ts # packages/sequencer/test/integration/BlockProduction.test.ts
2 parents bb3409b + a5843d2 commit 1a421e7

33 files changed

+492
-184
lines changed

.github/workflows/pull-request-develop.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ jobs:
9191

9292
- name: "Test"
9393
run: npm run test:ci
94+
env:
95+
IN_CI: true
9496

9597
integration:
9698
runs-on: ubuntu-latest

packages/api/src/graphql/modules/MempoolResolver.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ export class MempoolResolver extends GraphqlModule {
137137
return decoded.hash().toString();
138138
}
139139

140+
// TODO Add retrieval of pending messages somewhere as well
140141
@Query(() => InclusionStatus, {
141142
description: "Returns the state of a given transaction",
142143
})
@@ -146,13 +147,6 @@ export class MempoolResolver extends GraphqlModule {
146147
})
147148
hash: string
148149
): Promise<InclusionStatus> {
149-
const txs = await this.mempool.getTxs();
150-
const tx = txs.find((x) => x.hash().toString() === hash);
151-
152-
if (tx) {
153-
return InclusionStatus.PENDING;
154-
}
155-
156150
const dbTx = await this.transactionStorage.findTransaction(hash);
157151

158152
if (dbTx !== undefined) {
@@ -162,6 +156,7 @@ export class MempoolResolver extends GraphqlModule {
162156
if (dbTx.block !== undefined) {
163157
return InclusionStatus.INCLUDED;
164158
}
159+
return InclusionStatus.PENDING;
165160
}
166161

167162
return InclusionStatus.UNKNOWN;
@@ -172,7 +167,7 @@ export class MempoolResolver extends GraphqlModule {
172167
"Returns the hashes of all transactions that are currently inside the mempool",
173168
})
174169
public async transactions() {
175-
const txs = await this.mempool.getTxs();
170+
const txs = await this.transactionStorage.getPendingUserTransactions();
176171
return txs.map((x) => x.hash().toString());
177172
}
178173
}

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ export * from "./compiling/AtomicCompileHelper";
2323
export * from "./compiling/CompileRegistry";
2424
export * from "./compiling/CompilableModule";
2525
export * from "./compiling/services/ChildVerificationKeyService";
26+
export * from "./config/injectAlias";

packages/common/src/log.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ function logProvable(
2525
}
2626
/* eslint-enable */
2727

28+
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
29+
if (process.env?.IN_CI ?? false) {
30+
loglevel.setLevel("ERROR");
31+
}
32+
2833
const timeMap: Record<string, number> = {};
2934

3035
function time(label = "time") {
@@ -111,7 +116,10 @@ export const log = {
111116
},
112117

113118
setLevel: (level: LogLevelDesc) => {
114-
loglevel.setLevel(level);
119+
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
120+
if (!(process.env?.IN_CI ?? false)) {
121+
loglevel.setLevel(level);
122+
}
115123
},
116124

117125
get levels() {

packages/common/src/utils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ export function requireTrue(
2020
}
2121
}
2222

23+
/**
24+
* Utility function to split an array of type T into a record <K, T[]> based on a
25+
* function T => K that determines the key of each record
26+
*/
2327
export function splitArray<T, K extends string | number>(
2428
arr: T[],
2529
split: (t: T) => K

packages/deployment/src/queue/BullQueue.ts

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ export interface BullQueueConfig {
2222
retryAttempts?: number;
2323
}
2424

25-
interface BullWorker extends Closeable {
26-
get worker(): Worker;
27-
}
28-
2925
/**
3026
* TaskQueue implementation for BullMQ
3127
*/
@@ -36,10 +32,6 @@ export class BullQueue
3632
{
3733
private activePromise?: Promise<void>;
3834

39-
private activeWorkers: Record<string, BullWorker> = {};
40-
41-
private activeJobs = 0;
42-
4335
public createWorker(
4436
name: string,
4537
executor: (data: TaskPayload) => Promise<TaskPayload>,
@@ -52,7 +44,6 @@ export class BullQueue
5244
// This is by far not optimal - since it still picks up 1 task per queue but waits until
5345
// computing them, so that leads to bad performance over multiple workers.
5446
// For that we need to restructure tasks to be flowing through a single queue however
55-
this.activeJobs += 1;
5647

5748
// TODO Use worker.pause()
5849
while (this.activePromise !== undefined) {
@@ -66,27 +57,10 @@ export class BullQueue
6657
});
6758
this.activePromise = promise;
6859

69-
// Pause all other workers
70-
const workersToPause = Object.entries(this.activeWorkers).filter(
71-
([key]) => key !== name
72-
);
73-
await Promise.all(
74-
workersToPause.map(([, workerToPause]) =>
75-
workerToPause.worker.pause(true)
76-
)
77-
);
78-
7960
const result = await executor(job.data);
8061
this.activePromise = undefined;
8162
void resOutside();
8263

83-
this.activeJobs -= 1;
84-
if (this.activeJobs === 0) {
85-
Object.entries(this.activeWorkers).forEach(([, resumingWorker]) =>
86-
resumingWorker.worker.resume()
87-
);
88-
}
89-
9064
return result;
9165
},
9266
{
@@ -105,16 +79,11 @@ export class BullQueue
10579
log.error(error);
10680
});
10781

108-
const instantiatedWorker = {
82+
return {
10983
async close() {
11084
await worker.close();
11185
},
112-
get worker() {
113-
return worker;
114-
},
11586
};
116-
this.activeWorkers[name] = instantiatedWorker;
117-
return instantiatedWorker;
11887
}
11988

12089
public async getQueue(queueName: string): Promise<InstantiatedQueue> {

packages/persistance/src/PrismaDatabaseConnection.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,8 @@ export class PrismaDatabaseConnection
137137
public async close() {
138138
await this.prismaClient.$disconnect();
139139
}
140+
141+
public async executeInTransaction(f: () => Promise<void>) {
142+
await this.prismaClient.$transaction(f);
143+
}
140144
}

packages/persistance/src/PrismaRedisDatabase.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
RedisConnection,
1919
RedisConnectionConfig,
2020
RedisConnectionModule,
21+
RedisTransaction,
2122
} from "./RedisConnection";
2223

2324
export interface PrismaRedisCombinedConfig {
@@ -49,6 +50,10 @@ export class PrismaRedisDatabase
4950
return this.redis.redisClient;
5051
}
5152

53+
public get currentMulti(): RedisTransaction {
54+
return this.redis.currentMulti;
55+
}
56+
5257
public create(childContainerProvider: ChildContainerProvider) {
5358
super.create(childContainerProvider);
5459
this.prisma.create(childContainerProvider);
@@ -79,4 +84,12 @@ export class PrismaRedisDatabase
7984
await this.prisma.pruneDatabase();
8085
await this.redis.pruneDatabase();
8186
}
87+
88+
public async executeInTransaction(f: () => Promise<void>) {
89+
// TODO Long-term we want to somehow make sure we can rollback one data source
90+
// if commiting the other one's transaction fails
91+
await this.prisma.executeInTransaction(async () => {
92+
await this.redis.executeInTransaction(f);
93+
});
94+
}
8295
}

packages/persistance/src/RedisConnection.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ export interface RedisConnectionConfig {
1414
username?: string;
1515
}
1616

17+
export type RedisTransaction = ReturnType<RedisClientType["multi"]>;
18+
1719
export interface RedisConnection {
1820
get redisClient(): RedisClientType;
21+
get currentMulti(): RedisTransaction;
1922
}
2023

2124
export class RedisConnectionModule
@@ -82,4 +85,20 @@ export class RedisConnectionModule
8285
public async pruneDatabase() {
8386
await this.redisClient.flushDb();
8487
}
88+
89+
private multi?: RedisTransaction;
90+
91+
public get currentMulti() {
92+
if (this.multi === undefined) {
93+
throw new Error("Redis multi was access outside of a transaction");
94+
}
95+
return this.multi;
96+
}
97+
98+
public async executeInTransaction(f: () => Promise<void>) {
99+
this.multi = this.redisClient.multi();
100+
await f();
101+
await this.multi.exec();
102+
this.multi = undefined;
103+
}
85104
}

packages/persistance/src/services/prisma/PrismaBlockStorage.ts

Lines changed: 59 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
BlockQueue,
77
BlockStorage,
88
BlockWithResult,
9+
BlockWithMaybeResult,
910
} from "@proto-kit/sequencer";
1011
import { log } from "@proto-kit/common";
1112
import {
@@ -37,7 +38,7 @@ export class PrismaBlockStorage
3738

3839
private async getBlockByQuery(
3940
where: { height: number } | { hash: string }
40-
): Promise<BlockWithResult | undefined> {
41+
): Promise<BlockWithMaybeResult | undefined> {
4142
const dbResult = await this.connection.prismaClient.block.findFirst({
4243
where,
4344
include: {
@@ -55,18 +56,15 @@ export class PrismaBlockStorage
5556
const transactions = dbResult.transactions.map<TransactionExecutionResult>(
5657
(txresult) => this.transactionResultMapper.mapIn([txresult, txresult.tx])
5758
);
58-
if (dbResult.result === undefined || dbResult.result === null) {
59-
throw new Error(
60-
`No Metadata has been set for block ${JSON.stringify(where)} yet`
61-
);
62-
}
6359

6460
return {
6561
block: {
6662
...this.blockMapper.mapIn(dbResult),
6763
transactions,
6864
},
69-
result: this.blockResultMapper.mapIn(dbResult.result),
65+
result: dbResult.result
66+
? this.blockResultMapper.mapIn(dbResult.result)
67+
: undefined,
7068
};
7169
}
7270

@@ -98,45 +96,43 @@ export class PrismaBlockStorage
9896

9997
const { prismaClient } = this.connection;
10098

101-
await prismaClient.$transaction([
102-
prismaClient.transaction.createMany({
103-
data: block.transactions.map((txr) =>
104-
this.transactionMapper.mapOut(txr.tx)
105-
),
106-
skipDuplicates: true,
107-
}),
108-
109-
prismaClient.block.create({
110-
data: {
111-
...encodedBlock,
112-
beforeBlockStateTransitions:
113-
encodedBlock.beforeBlockStateTransitions as Prisma.InputJsonArray,
114-
beforeNetworkState:
115-
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
116-
duringNetworkState:
117-
encodedBlock.duringNetworkState as Prisma.InputJsonObject,
118-
119-
transactions: {
120-
createMany: {
121-
data: transactions.map((tx) => {
122-
return {
123-
status: tx.status,
124-
statusMessage: tx.statusMessage,
125-
txHash: tx.txHash,
126-
127-
stateTransitions:
128-
tx.stateTransitions as Prisma.InputJsonArray,
129-
events: tx.events as Prisma.InputJsonArray,
130-
};
131-
}),
132-
skipDuplicates: true,
133-
},
134-
},
99+
await prismaClient.transaction.createMany({
100+
data: block.transactions.map((txr) =>
101+
this.transactionMapper.mapOut(txr.tx)
102+
),
103+
skipDuplicates: true,
104+
});
105+
106+
await prismaClient.block.create({
107+
data: {
108+
...encodedBlock,
109+
beforeBlockStateTransitions:
110+
encodedBlock.beforeBlockStateTransitions as Prisma.InputJsonArray,
111+
beforeNetworkState:
112+
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
113+
duringNetworkState:
114+
encodedBlock.duringNetworkState as Prisma.InputJsonObject,
135115

136-
batchHeight: undefined,
116+
transactions: {
117+
createMany: {
118+
data: transactions.map((tx) => {
119+
return {
120+
status: tx.status,
121+
statusMessage: tx.statusMessage,
122+
txHash: tx.txHash,
123+
124+
stateTransitions:
125+
tx.stateTransitions as Prisma.InputJsonArray,
126+
events: tx.events as Prisma.InputJsonArray,
127+
};
128+
}),
129+
skipDuplicates: true,
130+
},
137131
},
138-
}),
139-
]);
132+
133+
batchHeight: undefined,
134+
},
135+
});
140136
}
141137

142138
public async pushResult(result: BlockResult): Promise<void> {
@@ -167,7 +163,9 @@ export class PrismaBlockStorage
167163
return (result?._max.height ?? -1) + 1;
168164
}
169165

170-
public async getLatestBlock(): Promise<BlockWithResult | undefined> {
166+
public async getLatestBlockAndResult(): Promise<
167+
BlockWithMaybeResult | undefined
168+
> {
171169
const latestBlock = await this.connection.prismaClient.$queryRaw<
172170
{ hash: string }[]
173171
>`SELECT b1."hash" FROM "Block" b1
@@ -183,6 +181,22 @@ export class PrismaBlockStorage
183181
});
184182
}
185183

184+
public async getLatestBlock(): Promise<BlockWithResult | undefined> {
185+
const result = await this.getLatestBlockAndResult();
186+
if (result !== undefined) {
187+
if (result.result === undefined) {
188+
throw new Error(
189+
`Block result for block ${result.block.height.toString()} not found`
190+
);
191+
}
192+
return {
193+
block: result.block,
194+
result: result.result,
195+
};
196+
}
197+
return result;
198+
}
199+
186200
public async getNewBlocks(): Promise<BlockWithResult[]> {
187201
const blocks = await this.connection.prismaClient.block.findMany({
188202
where: {

0 commit comments

Comments
 (0)