Skip to content

Commit 2d9e7e2

Browse files
authored
WorkPackagePool refactor (#291)
* rename * rename * RefWithHash * WorkPackageRef * work package refactor
1 parent 73eab47 commit 2d9e7e2

File tree

12 files changed

+157
-127
lines changed

12 files changed

+157
-127
lines changed

Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ public enum RuntimeEvents {
5252
public let block: BlockRef
5353
}
5454

55-
// New WorkPackagesGenerated by Guaranteeing Service
56-
public struct WorkPackagesGenerated: Event {
57-
public let items: [WorkPackage]
55+
// New WorkPackagesReceived by Guaranteeing Service
56+
public struct WorkPackagesReceived: Event {
57+
public let items: [WorkPackageRef]
5858
}
5959

6060
// WorkPackages Finalize by WorkPackages Service
6161
public struct WorkPackagesFinalized: Event {
62-
public let items: [WorkPackage]
62+
public let items: [WorkPackageRef]
6363
}
6464

6565
// New WorkReportGenerated by Guaranteeing Service
@@ -69,6 +69,6 @@ public enum RuntimeEvents {
6969

7070
// New GuaranteeGenerated by Guaranteeing Service
7171
public struct GuaranteeGenerated: Event {
72-
public let items: [WorkPackage]
72+
public let items: [WorkPackageRef]
7373
}
7474
}

Blockchain/Sources/Blockchain/Types/Block.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,9 @@ extension Block: Dummy {
2727
}
2828

2929
extension Block: Validate {}
30+
31+
extension Block: Hashable32 {
32+
public func hash() -> Data32 {
33+
header.hash()
34+
}
35+
}

Blockchain/Sources/Blockchain/Types/BlockRef.swift

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,6 @@
11
import Utils
22

3-
public final class BlockRef: Ref<Block>, @unchecked Sendable {
4-
public required init(_ value: Block) {
5-
lazyHash = Lazy {
6-
Ref(value.header.hash())
7-
}
8-
9-
super.init(value)
10-
}
11-
12-
private let lazyHash: Lazy<Ref<Data32>>
13-
14-
public var hash: Data32 {
15-
lazyHash.value.value
16-
}
17-
3+
public final class BlockRef: RefWithHash<Block>, @unchecked Sendable {
184
public var header: Header { value.header }
195
public var extrinsic: Extrinsic { value.extrinsic }
206

Blockchain/Sources/Blockchain/Types/Header.swift

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -136,39 +136,14 @@ extension Header: Hashable {
136136
}
137137
}
138138

139-
extension Header {
140-
public func asRef() -> HeaderRef {
141-
HeaderRef(self)
142-
}
143-
}
144-
145-
public final class HeaderRef: Ref<Header>, @unchecked Sendable {
146-
public required init(_ value: Header) {
147-
lazyHash = Lazy {
148-
Ref(value.hash())
139+
extension Header: Hashable32 {
140+
public func hash() -> Data32 {
141+
do {
142+
return try JamEncoder.encode(self).blake2b256hash()
143+
} catch {
144+
logger.error("Failed to encode header, returning empty hash", metadata: ["error": "\(error)"])
145+
return Data32()
149146
}
150-
151-
super.init(value)
152-
}
153-
154-
private let lazyHash: Lazy<Ref<Data32>>
155-
156-
public var hash: Data32 {
157-
lazyHash.value.value
158-
}
159-
160-
override public var description: String {
161-
"Header(hash: \(hash), timeslot: \(value.timeslot))"
162-
}
163-
}
164-
165-
extension HeaderRef: Codable {
166-
public convenience init(from decoder: Decoder) throws {
167-
try self.init(.init(from: decoder))
168-
}
169-
170-
public func encode(to encoder: Encoder) throws {
171-
try value.encode(to: encoder)
172147
}
173148
}
174149

@@ -200,15 +175,6 @@ extension Header: Dummy {
200175
}
201176

202177
extension Header {
203-
public func hash() -> Data32 {
204-
do {
205-
return try JamEncoder.encode(self).blake2b256hash()
206-
} catch {
207-
logger.error("Failed to encode header, returning empty hash", metadata: ["error": "\(error)"])
208-
return Data32()
209-
}
210-
}
211-
212178
public var parentHash: Data32 { unsigned.parentHash }
213179
public var priorStateRoot: Data32 { unsigned.priorStateRoot }
214180
public var extrinsicsHash: Data32 { unsigned.extrinsicsHash }
@@ -231,3 +197,25 @@ extension Header: Validate {
231197
}
232198
}
233199
}
200+
201+
extension Header {
202+
public func asRef() -> HeaderRef {
203+
HeaderRef(self)
204+
}
205+
}
206+
207+
public final class HeaderRef: RefWithHash<Header>, @unchecked Sendable {
208+
override public var description: String {
209+
"Header(hash: \(hash), timeslot: \(value.timeslot))"
210+
}
211+
}
212+
213+
extension HeaderRef: Codable {
214+
public convenience init(from decoder: Decoder) throws {
215+
try self.init(.init(from: decoder))
216+
}
217+
218+
public func encode(to encoder: Encoder) throws {
219+
try value.encode(to: encoder)
220+
}
221+
}

Blockchain/Sources/Blockchain/Types/WorkPackage.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public struct WorkPackage: Comparable, Sendable, Equatable, Codable {
6060
}
6161
}
6262

63-
extension WorkPackage {
63+
extension WorkPackage: Hashable32 {
6464
public func hash() -> Data32 {
6565
try! JamEncoder.encode(self).blake2b256hash()
6666
}
@@ -96,3 +96,11 @@ extension WorkPackage {
9696
) ?? Data()
9797
}
9898
}
99+
100+
extension WorkPackage {
101+
public func asRef() -> WorkPackageRef {
102+
WorkPackageRef(self)
103+
}
104+
}
105+
106+
public typealias WorkPackageRef = RefWithHash<WorkPackage>
Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,22 @@
11
import TracingUtils
22
import Utils
33

4+
enum WorkPackageStatus {
5+
case pending
6+
// work report is generated and waiting for assursor to make it available
7+
case refined
8+
// work report is available and waiting for the work report to be accurmulated
9+
case assured
10+
}
11+
12+
private struct WorkPackageInfo {
13+
let workPackage: WorkPackageRef
14+
var status: WorkPackageStatus
15+
}
16+
417
private actor WorkPackageStorage {
518
var logger: Logger!
6-
var workPackages: SortedUniqueArray<WorkPackage> = .init()
19+
var workPackages: [Data32: WorkPackageInfo] = [:]
720

821
let ringContext: Bandersnatch.RingContext
922
var verifier: Bandersnatch.Verifier!
@@ -18,29 +31,29 @@ private actor WorkPackageStorage {
1831

1932
func update(state _: StateRef, config _: ProtocolConfigRef) throws {}
2033

21-
func add(packages: [WorkPackage], config: ProtocolConfigRef) {
34+
func add(packages: [WorkPackageRef], config: ProtocolConfigRef) {
2235
for package in packages {
2336
guard validatePackage(package, config: config) else {
2437
logger.warning("Invalid work package: \(package)")
2538
continue
2639
}
27-
workPackages.append(contentsOf: [package])
40+
workPackages[package.hash] = WorkPackageInfo(workPackage: package, status: .pending)
2841
}
2942
}
3043

31-
private func validatePackage(_: WorkPackage, config _: ProtocolConfigRef) -> Bool {
44+
private func validatePackage(_: WorkPackageRef, config _: ProtocolConfigRef) -> Bool {
3245
// TODO: add validate logic
3346
true
3447
}
3548

36-
func removeWorkPackages(_ packages: [WorkPackage]) {
37-
workPackages.remove { workPackage in
38-
packages.contains { $0 == workPackage }
49+
func packageRefined(packageHashes: [Data32]) {
50+
for hash in packageHashes {
51+
workPackages[hash]?.status = .refined
3952
}
4053
}
4154

42-
func getWorkPackages() -> SortedUniqueArray<WorkPackage> {
43-
workPackages
55+
func getPendingPackages() -> [WorkPackageRef] {
56+
workPackages.values.filter { $0.status == .pending }.map(\.workPackage)
4457
}
4558
}
4659

@@ -61,32 +74,24 @@ public final class WorkPackagePoolService: ServiceBase, @unchecked Sendable {
6174
super.init(id: "WorkPackagePoolService", config: config, eventBus: eventBus)
6275
await storage.setLogger(logger)
6376

64-
await subscribe(RuntimeEvents.WorkPackagesGenerated.self, id: "WorkPackagePool.WorkPackagesGenerated") { [weak self] event in
77+
await subscribe(RuntimeEvents.WorkPackagesReceived.self, id: "WorkPackagePool.WorkPackagesReceived") { [weak self] event in
78+
try await self?.on(workPackagesReceived: event)
79+
}
80+
81+
await subscribe(RuntimeEvents.WorkReportGenerated.self, id: "WorkPackagePool.WorkPackagesReceived") { [weak self] event in
6582
try await self?.on(workPackagesGenerated: event)
6683
}
67-
// TODO: add remove subscribe
68-
// TODO: add receive subscribe?
6984
}
7085

71-
private func on(workPackagesGenerated event: RuntimeEvents.WorkPackagesGenerated) async throws {
72-
let state = try await dataProvider.getBestState()
73-
try await storage.update(state: state, config: config)
86+
private func on(workPackagesReceived event: RuntimeEvents.WorkPackagesReceived) async throws {
7487
await storage.add(packages: event.items, config: config)
7588
}
7689

77-
public func update(state: StateRef, config: ProtocolConfigRef) async throws {
78-
try await storage.update(state: state, config: config)
79-
}
80-
81-
public func addWorkPackages(packages: [WorkPackage]) async throws {
82-
await storage.add(packages: packages, config: config)
83-
}
84-
85-
public func removeWorkPackages(packages: [WorkPackage]) async throws {
86-
await storage.removeWorkPackages(packages)
90+
private func on(workPackagesGenerated event: RuntimeEvents.WorkReportGenerated) async throws {
91+
await storage.packageRefined(packageHashes: event.items.map(\.packageSpecification.workPackageHash))
8792
}
8893

89-
public func getWorkPackages() async -> SortedUniqueArray<WorkPackage> {
90-
await storage.getWorkPackages()
94+
public func getPendingPackages() async -> [WorkPackageRef] {
95+
await storage.getPendingPackages()
9196
}
9297
}

Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,10 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
8383
throw GuaranteeingServiceError.invalidCoreIndex
8484
}
8585

86-
let workPackages = await workPackagePool.getWorkPackages()
87-
for workPackage in workPackages.array {
86+
let workPackages = await workPackagePool.getPendingPackages()
87+
for workPackage in workPackages {
8888
if try validate(workPackage: workPackage) {
8989
let workReport = try await createWorkReport(for: workPackage, coreIndex: coreIndex)
90-
try await workPackagePool.removeWorkPackages(packages: [workPackage])
9190
let event = RuntimeEvents.WorkReportGenerated(items: [workReport])
9291
publish(event)
9392
break
@@ -98,9 +97,9 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
9897
}
9998

10099
// workpackage -> workresult -> workreport
101-
private func createWorkReport(for workPackage: WorkPackage, coreIndex: CoreIndex) async throws -> WorkReport {
100+
private func createWorkReport(for workPackage: WorkPackageRef, coreIndex: CoreIndex) async throws -> WorkReport {
102101
let state = try await dataProvider.getState(hash: dataProvider.bestHead.hash)
103-
let packageHash = workPackage.hash()
102+
let packageHash = workPackage.hash
104103
let corePool = state.value.coreAuthorizationPool[coreIndex]
105104
let authorizerHash = try corePool.array.first.unwrap(orError: GuaranteeingServiceError.noAuthorizerHash)
106105
var exportSegmentOffset: UInt16 = 0
@@ -116,7 +115,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
116115

117116
var exportSegments = [Data4104]()
118117

119-
for item in workPackage.workItems {
118+
for item in workPackage.value.workItems {
120119
// TODO: make this lazy. only fetch when needed by PVM
121120
let importSegments = try await dataAvailability.fetchSegment(segments: item.inputs)
122121

@@ -128,12 +127,12 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
128127
.invoke(
129128
config: config,
130129
serviceAccounts: state.value,
131-
codeHash: workPackage.authorizationCodeHash,
130+
codeHash: workPackage.value.authorizationCodeHash,
132131
gas: item.refineGasLimit,
133132
service: item.serviceIndex,
134133
workPackageHash: packageHash,
135134
workPayload: item.payloadBlob,
136-
refinementCtx: workPackage.context,
135+
refinementCtx: workPackage.value.context,
137136
authorizerHash: authorizerHash,
138137
authorizationOutput: authorizationOutput,
139138
importSegments: importSegments,
@@ -144,7 +143,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
144143
exportSegmentOffset += item.outputDataSegmentsCount
145144
let workResult = WorkResult(
146145
serviceIndex: item.serviceIndex,
147-
codeHash: workPackage.authorizationCodeHash,
146+
codeHash: workPackage.value.authorizationCodeHash,
148147
payloadHash: item.payloadBlob.blake2b256hash(),
149148
gas: item.refineGasLimit,
150149
output: WorkOutput(refineRes.result)
@@ -159,7 +158,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
159158
}
160159

161160
let (erasureRoot, length) = try await dataAvailability.exportWorkpackageBundle(bundle: WorkPackageBundle(
162-
workPackage: workPackage,
161+
workPackage: workPackage.value,
163162
extrinsic: [], // TODO: get extrinsic data
164163
importSegments: [],
165164
justifications: []
@@ -184,7 +183,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
184183
authorizerHash: authorizerHash,
185184
coreIndex: coreIndex,
186185
authorizationOutput: authorizationOutput,
187-
refinementContext: workPackage.context,
186+
refinementContext: workPackage.value.context,
188187
packageSpecification: packageSpecification,
189188
lookup: oldLookups,
190189
results: ConfigLimitedSizeArray(config: config, array: workResults)
@@ -196,7 +195,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
196195
}
197196
}
198197

199-
private func validate(workPackage _: WorkPackage) throws -> Bool {
198+
private func validate(workPackage _: WorkPackageRef) throws -> Bool {
200199
// TODO: Add validate func
201200
true
202201
}

Blockchain/Tests/BlockchainTests/GuaranteeingServiceTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ struct GuaranteeingServiceTests {
4444
let storeMiddleware = services.storeMiddleware
4545
let scheduler = services.scheduler
4646

47-
var allWorkPackages = [WorkPackage]()
47+
var allWorkPackages = [WorkPackageRef]()
4848
for _ in 0 ..< services.config.value.totalNumberOfCores {
4949
let workpackage = WorkPackage(
5050
authorizationToken: Data(),
@@ -54,9 +54,9 @@ struct GuaranteeingServiceTests {
5454
context: RefinementContext.dummy(config: services.config),
5555
workItems: try! ConfigLimitedSizeArray(config: services.config, defaultValue: WorkItem.dummy(config: services.config))
5656
)
57-
allWorkPackages.append(workpackage)
57+
allWorkPackages.append(workpackage.asRef())
5858
}
59-
await services.eventBus.publish(RuntimeEvents.WorkPackagesGenerated(items: allWorkPackages))
59+
await services.eventBus.publish(RuntimeEvents.WorkPackagesReceived(items: allWorkPackages))
6060
await validatorService.on(genesis: genesisState)
6161
await storeMiddleware.wait()
6262
#expect(scheduler.taskCount == 1)

0 commit comments

Comments
 (0)