Skip to content

Commit 58d69b9

Browse files
authored
Merge pull request #233 from proto-kit/fix/atomic-block-production-db
Created transactions on Database to do atomic writes for blocks, results and batches
2 parents 6f782d5 + dfab89c commit 58d69b9

File tree

12 files changed

+257
-77
lines changed

12 files changed

+257
-77
lines changed

packages/persistance/src/PrismaDatabaseConnection.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,8 @@ export class PrismaDatabaseConnection
137137
public async close() {
138138
await this.prismaClient.$disconnect();
139139
}
140+
141+
public async executeInTransaction(f: () => Promise<void>) {
142+
await this.prismaClient.$transaction(f);
143+
}
140144
}

packages/persistance/src/PrismaRedisDatabase.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
RedisConnection,
1818
RedisConnectionConfig,
1919
RedisConnectionModule,
20+
RedisTransaction,
2021
} from "./RedisConnection";
2122

2223
export interface PrismaRedisCombinedConfig {
@@ -47,6 +48,10 @@ export class PrismaRedisDatabase
4748
return this.redis.redisClient;
4849
}
4950

51+
public get currentMulti(): RedisTransaction {
52+
return this.redis.currentMulti;
53+
}
54+
5055
public create(childContainerProvider: ChildContainerProvider) {
5156
super.create(childContainerProvider);
5257
this.prisma.create(childContainerProvider);
@@ -77,4 +82,12 @@ export class PrismaRedisDatabase
7782
await this.prisma.pruneDatabase();
7883
await this.redis.pruneDatabase();
7984
}
85+
86+
public async executeInTransaction(f: () => Promise<void>) {
87+
// TODO Long-term we want to somehow make sure we can rollback one data source
88+
// if commiting the other one's transaction fails
89+
await this.prisma.executeInTransaction(async () => {
90+
await this.redis.executeInTransaction(f);
91+
});
92+
}
8093
}

packages/persistance/src/RedisConnection.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ export interface RedisConnectionConfig {
1414
username?: string;
1515
}
1616

17+
export type RedisTransaction = ReturnType<RedisClientType["multi"]>;
18+
1719
export interface RedisConnection {
1820
get redisClient(): RedisClientType;
21+
get currentMulti(): RedisTransaction;
1922
}
2023

2124
export class RedisConnectionModule
@@ -82,4 +85,20 @@ export class RedisConnectionModule
8285
public async pruneDatabase() {
8386
await this.redisClient.flushDb();
8487
}
88+
89+
private multi?: RedisTransaction;
90+
91+
public get currentMulti() {
92+
if (this.multi === undefined) {
93+
throw new Error("Redis multi was access outside of a transaction");
94+
}
95+
return this.multi;
96+
}
97+
98+
public async executeInTransaction(f: () => Promise<void>) {
99+
this.multi = this.redisClient.multi();
100+
await f();
101+
await this.multi.exec();
102+
this.multi = undefined;
103+
}
85104
}

packages/persistance/src/services/prisma/PrismaBlockStorage.ts

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -98,45 +98,42 @@ export class PrismaBlockStorage
9898

9999
const { prismaClient } = this.connection;
100100

101-
await prismaClient.$transaction([
102-
prismaClient.transaction.createMany({
103-
data: block.transactions.map((txr) =>
104-
this.transactionMapper.mapOut(txr.tx)
105-
),
106-
skipDuplicates: true,
107-
}),
101+
await prismaClient.transaction.createMany({
102+
data: block.transactions.map((txr) =>
103+
this.transactionMapper.mapOut(txr.tx)
104+
),
105+
skipDuplicates: true,
106+
});
108107

109-
prismaClient.block.create({
110-
data: {
111-
...encodedBlock,
112-
beforeNetworkState:
113-
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
114-
duringNetworkState:
115-
encodedBlock.duringNetworkState as Prisma.InputJsonObject,
108+
await prismaClient.block.create({
109+
data: {
110+
...encodedBlock,
111+
beforeNetworkState:
112+
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
113+
duringNetworkState:
114+
encodedBlock.duringNetworkState as Prisma.InputJsonObject,
116115

117-
transactions: {
118-
createMany: {
119-
data: transactions.map((tx) => {
120-
return {
121-
status: tx.status,
122-
statusMessage: tx.statusMessage,
123-
txHash: tx.txHash,
116+
transactions: {
117+
createMany: {
118+
data: transactions.map((tx) => {
119+
return {
120+
status: tx.status,
121+
statusMessage: tx.statusMessage,
122+
txHash: tx.txHash,
124123

125-
stateTransitions:
126-
tx.stateTransitions as Prisma.InputJsonArray,
127-
protocolTransitions:
128-
tx.protocolTransitions as Prisma.InputJsonArray,
129-
events: tx.events as Prisma.InputJsonArray,
130-
};
131-
}),
132-
skipDuplicates: true,
133-
},
124+
stateTransitions: tx.stateTransitions as Prisma.InputJsonArray,
125+
protocolTransitions:
126+
tx.protocolTransitions as Prisma.InputJsonArray,
127+
events: tx.events as Prisma.InputJsonArray,
128+
};
129+
}),
130+
skipDuplicates: true,
134131
},
135-
136-
batchHeight: undefined,
137132
},
138-
}),
139-
]);
133+
134+
batchHeight: undefined,
135+
},
136+
});
140137
}
141138

142139
public async pushResult(result: BlockResult): Promise<void> {

packages/persistance/src/services/prisma/PrismaMessageStorage.ts

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,25 +51,24 @@ export class PrismaMessageStorage implements MessageStorage {
5151
);
5252

5353
const { prismaClient } = this.connection;
54-
await prismaClient.$transaction([
55-
prismaClient.transaction.createMany({
56-
data: transactions,
57-
skipDuplicates: true,
58-
}),
5954

60-
prismaClient.incomingMessageBatch.create({
61-
data: {
62-
fromMessageHash,
63-
toMessageHash,
64-
messages: {
65-
createMany: {
66-
data: transactions.map((transaction) => ({
67-
transactionHash: transaction.hash,
68-
})),
69-
},
55+
await prismaClient.transaction.createMany({
56+
data: transactions,
57+
skipDuplicates: true,
58+
});
59+
60+
await prismaClient.incomingMessageBatch.create({
61+
data: {
62+
fromMessageHash,
63+
toMessageHash,
64+
messages: {
65+
createMany: {
66+
data: transactions.map((transaction) => ({
67+
transactionHash: transaction.hash,
68+
})),
7069
},
7170
},
72-
}),
73-
]);
71+
},
72+
});
7473
}
7574
}

packages/persistance/src/services/prisma/PrismaStateService.ts

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,17 @@ export class PrismaStateService implements AsyncStateService {
3636
mask: this.mask,
3737
}));
3838

39-
await prismaClient.$transaction([
40-
prismaClient.state.deleteMany({
41-
where: {
42-
path: {
43-
in: this.cache.map((x) => new Decimal(x.key.toString())),
44-
},
45-
mask: this.mask,
39+
await prismaClient.state.deleteMany({
40+
where: {
41+
path: {
42+
in: this.cache.map((x) => new Decimal(x.key.toString())),
4643
},
47-
}),
48-
prismaClient.state.createMany({
49-
data,
50-
}),
51-
]);
44+
mask: this.mask,
45+
},
46+
});
47+
await prismaClient.state.createMany({
48+
data,
49+
});
5250

5351
this.cache = [];
5452
}

packages/persistance/src/services/redis/RedisMerkleTreeStore.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class RedisMerkleTreeStore implements AsyncMerkleTreeStore {
3434
}
3535

3636
try {
37-
await this.connection.redisClient.mSet(array.flat(1));
37+
this.connection.currentMulti.mSet(array.flat(1));
3838
} catch (error) {
3939
log.error(error);
4040
}
@@ -62,8 +62,8 @@ export class RedisMerkleTreeStore implements AsyncMerkleTreeStore {
6262
public writeNodes(nodes: MerkleTreeNode[]): void {
6363
this.cache = this.cache.concat(nodes);
6464
// TODO Filter distinct
65-
// We might not even need this, since the distinctness filter might already
66-
// be implicitely done by the layer above (i.e. cachedmtstore)
65+
// We might not even need this, since the distinctness filter might already
66+
// be implicitely done by the layer above (i.e. cachedmtstore)
6767

6868
// Leaving this for now until I get to implementing it
6969
// const concat = this.cache.concat(nodes);

packages/sequencer/src/protocol/production/BatchProducerModule.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { AsyncStateService } from "../../state/async/AsyncStateService";
2222
import { AsyncMerkleTreeStore } from "../../state/async/AsyncMerkleTreeStore";
2323
import { BlockResult, BlockWithResult } from "../../storage/model/Block";
2424
import { VerificationKeyService } from "../runtime/RuntimeVerificationKeyService";
25+
import type { Database } from "../../storage/Database";
2526

2627
import { BlockProverParameters } from "./tasks/BlockProvingTask";
2728
import { StateTransitionProofParameters } from "./tasks/StateTransitionTaskParameters";
@@ -81,6 +82,8 @@ export class BatchProducerModule extends SequencerModule {
8182
@inject("BatchStorage") private readonly batchStorage: BatchStorage,
8283
@inject("BlockTreeStore")
8384
private readonly blockTreeStore: AsyncMerkleTreeStore,
85+
@inject("Database")
86+
private readonly database: Database,
8487
private readonly traceService: TransactionTraceService,
8588
private readonly blockFlowService: BlockTaskFlowService,
8689
private readonly blockProofSerializer: BlockProofSerializer,
@@ -90,8 +93,11 @@ export class BatchProducerModule extends SequencerModule {
9093
}
9194

9295
private async applyStateChanges(batch: BatchMetadata) {
93-
await batch.stateService.mergeIntoParent();
94-
await batch.merkleStore.mergeIntoParent();
96+
// TODO Introduce Proven and Unproven BlockHashTree stores - for rollbacks
97+
await this.database.executeInTransaction(async () => {
98+
await batch.stateService.mergeIntoParent();
99+
await batch.merkleStore.mergeIntoParent();
100+
});
95101
}
96102

97103
/**

packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
} from "../../../storage/model/Block";
2626
import { CachedStateService } from "../../../state/state/CachedStateService";
2727
import { MessageStorage } from "../../../storage/repositories/MessageStorage";
28+
import { Database } from "../../../storage/Database";
2829

2930
import { TransactionExecutionService } from "./TransactionExecutionService";
3031

@@ -51,7 +52,8 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
5152
private readonly executionService: TransactionExecutionService,
5253
@inject("MethodIdResolver")
5354
private readonly methodIdResolver: MethodIdResolver,
54-
@inject("Runtime") private readonly runtime: Runtime<RuntimeModulesRecord>
55+
@inject("Runtime") private readonly runtime: Runtime<RuntimeModulesRecord>,
56+
@inject("Database") private readonly database: Database
5557
) {
5658
super();
5759
}
@@ -111,10 +113,12 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
111113
this.blockTreeStore
112114
);
113115

114-
await blockHashTreeStore.mergeIntoParent();
115-
await treeStore.mergeIntoParent();
116+
await this.database.executeInTransaction(async () => {
117+
await blockHashTreeStore.mergeIntoParent();
118+
await treeStore.mergeIntoParent();
116119

117-
await this.blockQueue.pushResult(result);
120+
await this.blockQueue.pushResult(result);
121+
});
118122

119123
return result;
120124
}
@@ -216,17 +220,18 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
216220
);
217221

218222
if (block !== undefined) {
219-
await cachedStateService.mergeIntoParent();
220-
221-
await this.blockQueue.pushBlock(block);
223+
await this.database.executeInTransaction(async () => {
224+
await cachedStateService.mergeIntoParent();
225+
await this.blockQueue.pushBlock(block);
226+
});
222227
}
223228

224229
this.productionInProgress = false;
225230

226231
return block;
227232
}
228233

229-
public async start() {
234+
public async blockResultCompleteCheck() {
230235
// Check if metadata height is behind block production.
231236
// This can happen when the sequencer crashes after a block has been produced
232237
// but before the metadata generation has finished
@@ -240,4 +245,8 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
240245
}
241246
// If we reach here, its a genesis startup, no blocks exist yet
242247
}
248+
249+
public async start() {
250+
await this.blockResultCompleteCheck();
251+
}
243252
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { StorageDependencyFactory } from "./StorageDependencyFactory";
1+
import type { StorageDependencyFactory } from "./StorageDependencyFactory";
22

33
export interface Database extends StorageDependencyFactory {
44
/**
@@ -7,4 +7,6 @@ export interface Database extends StorageDependencyFactory {
77
* everything else will lead to unexpected behaviour and errors
88
*/
99
pruneDatabase(): Promise<void>;
10+
11+
executeInTransaction(f: () => Promise<void>): Promise<void>;
1012
}

0 commit comments

Comments
 (0)