Skip to content

Commit b9b3f78

Browse files
committed
Merge branch 'develop' into refactor/sequencing
# Conflicts: # packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts # packages/sequencer/src/protocol/production/sequencing/TransactionExecutionService.ts
2 parents 9a23eec + 2f2900f commit b9b3f78

35 files changed

+487
-192
lines changed

.github/workflows/pull-request-develop.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ jobs:
9191

9292
- name: "Test"
9393
run: npm run test:ci
94+
env:
95+
IN_CI: true
9496

9597
integration:
9698
runs-on: ubuntu-latest

packages/api/src/graphql/modules/MempoolResolver.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ export class MempoolResolver extends GraphqlModule {
137137
return decoded.hash().toString();
138138
}
139139

140+
// TODO Add retrieval of pending messages somewhere as well
140141
@Query(() => InclusionStatus, {
141142
description: "Returns the state of a given transaction",
142143
})
@@ -146,13 +147,6 @@ export class MempoolResolver extends GraphqlModule {
146147
})
147148
hash: string
148149
): Promise<InclusionStatus> {
149-
const txs = await this.mempool.getTxs();
150-
const tx = txs.find((x) => x.hash().toString() === hash);
151-
152-
if (tx) {
153-
return InclusionStatus.PENDING;
154-
}
155-
156150
const dbTx = await this.transactionStorage.findTransaction(hash);
157151

158152
if (dbTx !== undefined) {
@@ -162,6 +156,7 @@ export class MempoolResolver extends GraphqlModule {
162156
if (dbTx.block !== undefined) {
163157
return InclusionStatus.INCLUDED;
164158
}
159+
return InclusionStatus.PENDING;
165160
}
166161

167162
return InclusionStatus.UNKNOWN;
@@ -172,7 +167,7 @@ export class MempoolResolver extends GraphqlModule {
172167
"Returns the hashes of all transactions that are currently inside the mempool",
173168
})
174169
public async transactions() {
175-
const txs = await this.mempool.getTxs();
170+
const txs = await this.transactionStorage.getPendingUserTransactions();
176171
return txs.map((x) => x.hash().toString());
177172
}
178173
}

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ export * from "./compiling/AtomicCompileHelper";
2222
export * from "./compiling/CompileRegistry";
2323
export * from "./compiling/CompilableModule";
2424
export * from "./compiling/services/ChildVerificationKeyService";
25+
export * from "./config/injectAlias";

packages/common/src/log.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ function logProvable(
2525
}
2626
/* eslint-enable */
2727

28+
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
29+
if (process.env?.IN_CI ?? false) {
30+
loglevel.setLevel("ERROR");
31+
}
32+
2833
const timeMap: Record<string, number> = {};
2934

3035
function time(label = "time") {
@@ -111,7 +116,10 @@ export const log = {
111116
},
112117

113118
setLevel: (level: LogLevelDesc) => {
114-
loglevel.setLevel(level);
119+
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
120+
if (!(process.env?.IN_CI ?? false)) {
121+
loglevel.setLevel(level);
122+
}
115123
},
116124

117125
get levels() {

packages/common/src/utils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ export function requireTrue(
1919
}
2020
}
2121

22+
/**
23+
* Utility function to split an array of type T into a record <K, T[]> based on a
24+
* function T => K that determines the key of each record
25+
*/
2226
export function splitArray<T, K extends string | number>(
2327
arr: T[],
2428
split: (t: T) => K

packages/deployment/src/queue/BullQueue.ts

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ export interface BullQueueConfig {
2222
retryAttempts?: number;
2323
}
2424

25-
interface BullWorker extends Closeable {
26-
get worker(): Worker;
27-
}
28-
2925
/**
3026
* TaskQueue implementation for BullMQ
3127
*/
@@ -36,10 +32,6 @@ export class BullQueue
3632
{
3733
private activePromise?: Promise<void>;
3834

39-
private activeWorkers: Record<string, BullWorker> = {};
40-
41-
private activeJobs = 0;
42-
4335
public createWorker(
4436
name: string,
4537
executor: (data: TaskPayload) => Promise<TaskPayload>,
@@ -52,7 +44,6 @@ export class BullQueue
5244
// This is by far not optimal - since it still picks up 1 task per queue but waits until
5345
// computing them, so that leads to bad performance over multiple workers.
5446
// For that we need to restructure tasks to be flowing through a single queue however
55-
this.activeJobs += 1;
5647

5748
// TODO Use worker.pause()
5849
while (this.activePromise !== undefined) {
@@ -65,27 +56,10 @@ export class BullQueue
6556
});
6657
this.activePromise = promise;
6758

68-
// Pause all other workers
69-
const workersToPause = Object.entries(this.activeWorkers).filter(
70-
([key]) => key !== name
71-
);
72-
await Promise.all(
73-
workersToPause.map(([, workerToPause]) =>
74-
workerToPause.worker.pause(true)
75-
)
76-
);
77-
7859
const result = await executor(job.data);
7960
this.activePromise = undefined;
8061
void resOutside();
8162

82-
this.activeJobs -= 1;
83-
if (this.activeJobs === 0) {
84-
Object.entries(this.activeWorkers).forEach(([, resumingWorker]) =>
85-
resumingWorker.worker.resume()
86-
);
87-
}
88-
8963
return result;
9064
},
9165
{
@@ -104,16 +78,11 @@ export class BullQueue
10478
log.error(error);
10579
});
10680

107-
const instantiatedWorker = {
81+
return {
10882
async close() {
10983
await worker.close();
11084
},
111-
get worker() {
112-
return worker;
113-
},
11485
};
115-
this.activeWorkers[name] = instantiatedWorker;
116-
return instantiatedWorker;
11786
}
11887

11988
public async getQueue(queueName: string): Promise<InstantiatedQueue> {

packages/persistance/src/PrismaDatabaseConnection.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,8 @@ export class PrismaDatabaseConnection
137137
public async close() {
138138
await this.prismaClient.$disconnect();
139139
}
140+
141+
public async executeInTransaction(f: () => Promise<void>) {
142+
await this.prismaClient.$transaction(f);
143+
}
140144
}

packages/persistance/src/PrismaRedisDatabase.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
RedisConnection,
1919
RedisConnectionConfig,
2020
RedisConnectionModule,
21+
RedisTransaction,
2122
} from "./RedisConnection";
2223

2324
export interface PrismaRedisCombinedConfig {
@@ -49,6 +50,10 @@ export class PrismaRedisDatabase
4950
return this.redis.redisClient;
5051
}
5152

53+
public get currentMulti(): RedisTransaction {
54+
return this.redis.currentMulti;
55+
}
56+
5257
public create(childContainerProvider: ChildContainerProvider) {
5358
super.create(childContainerProvider);
5459
this.prisma.create(childContainerProvider);
@@ -79,4 +84,12 @@ export class PrismaRedisDatabase
7984
await this.prisma.pruneDatabase();
8085
await this.redis.pruneDatabase();
8186
}
87+
88+
public async executeInTransaction(f: () => Promise<void>) {
89+
// TODO Long-term we want to somehow make sure we can rollback one data source
90+
// if commiting the other one's transaction fails
91+
await this.prisma.executeInTransaction(async () => {
92+
await this.redis.executeInTransaction(f);
93+
});
94+
}
8295
}

packages/persistance/src/RedisConnection.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ export interface RedisConnectionConfig {
1414
username?: string;
1515
}
1616

17+
export type RedisTransaction = ReturnType<RedisClientType["multi"]>;
18+
1719
export interface RedisConnection {
1820
get redisClient(): RedisClientType;
21+
get currentMulti(): RedisTransaction;
1922
}
2023

2124
export class RedisConnectionModule
@@ -82,4 +85,20 @@ export class RedisConnectionModule
8285
public async pruneDatabase() {
8386
await this.redisClient.flushDb();
8487
}
88+
89+
private multi?: RedisTransaction;
90+
91+
public get currentMulti() {
92+
if (this.multi === undefined) {
93+
throw new Error("Redis multi was access outside of a transaction");
94+
}
95+
return this.multi;
96+
}
97+
98+
public async executeInTransaction(f: () => Promise<void>) {
99+
this.multi = this.redisClient.multi();
100+
await f();
101+
await this.multi.exec();
102+
this.multi = undefined;
103+
}
85104
}

packages/persistance/src/services/prisma/PrismaBlockStorage.ts

Lines changed: 58 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
BlockStorage,
99
BlockWithResult,
1010
BlockWithPreviousResult,
11+
BlockWithMaybeResult,
1112
} from "@proto-kit/sequencer";
1213
import { filterNonNull, log } from "@proto-kit/common";
1314
import {
@@ -39,7 +40,7 @@ export class PrismaBlockStorage
3940

4041
private async getBlockByQuery(
4142
where: { height: number } | { hash: string }
42-
): Promise<BlockWithResult | undefined> {
43+
): Promise<BlockWithMaybeResult | undefined> {
4344
const dbResult = await this.connection.prismaClient.block.findFirst({
4445
where,
4546
include: {
@@ -57,18 +58,15 @@ export class PrismaBlockStorage
5758
const transactions = dbResult.transactions.map<TransactionExecutionResult>(
5859
(txresult) => this.transactionResultMapper.mapIn([txresult, txresult.tx])
5960
);
60-
if (dbResult.result === undefined || dbResult.result === null) {
61-
throw new Error(
62-
`No Metadata has been set for block ${JSON.stringify(where)} yet`
63-
);
64-
}
6561

6662
return {
6763
block: {
6864
...this.blockMapper.mapIn(dbResult),
6965
transactions,
7066
},
71-
result: this.blockResultMapper.mapIn(dbResult.result),
67+
result: dbResult.result
68+
? this.blockResultMapper.mapIn(dbResult.result)
69+
: undefined,
7270
};
7371
}
7472

@@ -100,45 +98,42 @@ export class PrismaBlockStorage
10098

10199
const { prismaClient } = this.connection;
102100

103-
await prismaClient.$transaction([
104-
prismaClient.transaction.createMany({
105-
data: block.transactions.map((txr) =>
106-
this.transactionMapper.mapOut(txr.tx)
107-
),
108-
skipDuplicates: true,
109-
}),
110-
111-
prismaClient.block.create({
112-
data: {
113-
...encodedBlock,
114-
beforeNetworkState:
115-
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
116-
duringNetworkState:
117-
encodedBlock.duringNetworkState as Prisma.InputJsonObject,
118-
119-
transactions: {
120-
createMany: {
121-
data: transactions.map((tx) => {
122-
return {
123-
status: tx.status,
124-
statusMessage: tx.statusMessage,
125-
txHash: tx.txHash,
126-
127-
stateTransitions:
128-
tx.stateTransitions as Prisma.InputJsonArray,
129-
protocolTransitions:
130-
tx.protocolTransitions as Prisma.InputJsonArray,
131-
events: tx.events as Prisma.InputJsonArray,
132-
};
133-
}),
134-
skipDuplicates: true,
135-
},
136-
},
101+
await prismaClient.transaction.createMany({
102+
data: block.transactions.map((txr) =>
103+
this.transactionMapper.mapOut(txr.tx)
104+
),
105+
skipDuplicates: true,
106+
});
107+
108+
await prismaClient.block.create({
109+
data: {
110+
...encodedBlock,
111+
beforeNetworkState:
112+
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
113+
duringNetworkState:
114+
encodedBlock.duringNetworkState as Prisma.InputJsonObject,
137115

138-
batchHeight: undefined,
116+
transactions: {
117+
createMany: {
118+
data: transactions.map((tx) => {
119+
return {
120+
status: tx.status,
121+
statusMessage: tx.statusMessage,
122+
txHash: tx.txHash,
123+
124+
stateTransitions: tx.stateTransitions as Prisma.InputJsonArray,
125+
protocolTransitions:
126+
tx.protocolTransitions as Prisma.InputJsonArray,
127+
events: tx.events as Prisma.InputJsonArray,
128+
};
129+
}),
130+
skipDuplicates: true,
131+
},
139132
},
140-
}),
141-
]);
133+
134+
batchHeight: undefined,
135+
},
136+
});
142137
}
143138

144139
public async pushResult(result: BlockResult): Promise<void> {
@@ -169,7 +164,9 @@ export class PrismaBlockStorage
169164
return (result?._max.height ?? -1) + 1;
170165
}
171166

172-
public async getLatestBlock(): Promise<BlockWithResult | undefined> {
167+
public async getLatestBlockAndResult(): Promise<
168+
BlockWithMaybeResult | undefined
169+
> {
173170
const latestBlock = await this.connection.prismaClient.$queryRaw<
174171
{ hash: string }[]
175172
>`SELECT b1."hash" FROM "Block" b1
@@ -185,6 +182,22 @@ export class PrismaBlockStorage
185182
});
186183
}
187184

185+
public async getLatestBlock(): Promise<BlockWithResult | undefined> {
186+
const result = await this.getLatestBlockAndResult();
187+
if (result !== undefined) {
188+
if (result.result === undefined) {
189+
throw new Error(
190+
`Block result for block ${result.block.height.toString()} not found`
191+
);
192+
}
193+
return {
194+
block: result.block,
195+
result: result.result,
196+
};
197+
}
198+
return result;
199+
}
200+
188201
public async getNewBlocks(): Promise<BlockWithPreviousResult[]> {
189202
const blocks = await this.connection.prismaClient.block.findMany({
190203
where: {

0 commit comments

Comments
 (0)