Skip to content

Commit e2bd8c4

Browse files
MacOMNImackunxlc
authored
update shardDistribution (#336)
* update Work-report distribution * update workReportRequest * update WorkReportRef * update blockchain * update workreport * update swift testing * update package * fix xcode udpate issues * update tests * update guaranteedWorkReport * update work report * update db * update OnSyncCompleted * update networkmanager * update block request * update more tests * update open rpc * fix some unstable tests * fix some issues * fix open rpc * update OpenRPC * update OpenRPC * update open rpc * update swiftlint * update OpenRPC * update OpenRPC * update vapor * update vapor * fix unstable tests * fix unstable tests * fix unstable tests * update some issues * update networkmanager * update vapor * update vapor * update rpc package * update swift test * update swift pm * update package * update swift testing * update more tests * update swift testing * update local * update package * update handleShardDistributionReceived * update networkmanager * update shard * update data service * update service * update data service * update TODO * update rpc * fix some issues * remove bad test --------- Co-authored-by: mackun <[email protected]> Co-authored-by: Bryan Chen <[email protected]>
1 parent 5999c93 commit e2bd8c4

File tree

16 files changed

+280
-92
lines changed

16 files changed

+280
-92
lines changed

Blockchain/Package.resolved

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,14 @@ extension BlockchainDataProvider {
172172

173173
public func remove(hash: Data32) async throws {
174174
logger.debug("removing block: \(hash)")
175-
176175
try await dataProvider.remove(hash: hash)
177176
}
178177

178+
public func remove(workReportHash: Data32) async throws {
179+
logger.debug("removing workReportHash: \(workReportHash)")
180+
try await dataProvider.remove(workReportHash: workReportHash)
181+
}
182+
179183
public nonisolated var genesisBlockHash: Data32 {
180184
dataProvider.genesisBlockHash
181185
}

Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,8 @@ public protocol BlockchainDataProviderProtocol: Sendable {
4040
/// remove header, block, workReport, state
4141
func remove(hash: Data32) async throws
4242

43+
/// remove workReport
44+
func remove(workReportHash: Data32) async throws
45+
4346
var genesisBlockHash: Data32 { get }
4447
}

Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol {
129129
heads.insert(hash)
130130
}
131131

132+
public func remove(workReportHash hash: Data32) {
133+
guaranteedWorkReports.removeValue(forKey: hash)
134+
}
135+
132136
public func remove(hash: Data32) {
133137
let timeslot = blockByHash[hash]?.header.timeslot ?? stateByBlockHash[hash]?.value.timeslot
134138
stateByBlockHash.removeValue(forKey: hash)

Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,9 +261,9 @@ public enum RuntimeEvents {
261261

262262
public struct ShardDistributionReceived: Event {
263263
public var erasureRoot: Data32
264-
public var shardIndex: UInt32
264+
public var shardIndex: UInt16
265265

266-
public init(erasureRoot: Data32, shardIndex: UInt32) {
266+
public init(erasureRoot: Data32, shardIndex: UInt16) {
267267
self.erasureRoot = erasureRoot
268268
self.shardIndex = shardIndex
269269
}
@@ -275,7 +275,7 @@ public enum RuntimeEvents {
275275

276276
// Response to shard distribution
277277
public struct ShardDistributionReceivedResponse: Event {
278-
public var requestId: Data32
278+
public let requestId: Data32
279279

280280
public let result: Result<(bundleShard: Data, segmentShards: [Data], justification: Justification), Error>
281281

Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift

Lines changed: 112 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ public enum DataAvailabilityError: Error {
1919
case invalidWorkReportSlot
2020
case invalidWorkReport
2121
case insufficientSignatures
22+
case invalidMerklePath
23+
case emptySegmentShards
24+
case invalidJustificationFormat
2225
}
2326

2427
public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, OnSyncCompleted {
@@ -47,12 +50,21 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O
4750
await subscribe(RuntimeEvents.WorkReportReceived.self, id: "DataAvailabilityService.WorkReportReceived") { [weak self] event in
4851
await self?.handleWorkReportReceived(event)
4952
}
53+
await subscribe(RuntimeEvents.ShardDistributionReceived.self,
54+
id: "DataAvailabilityService.ShardDistributionReceived")
55+
{ [weak self] event in
56+
await self?.handleShardDistributionReceived(event)
57+
}
5058
}
5159

5260
public func handleWorkReportReceived(_ event: RuntimeEvents.WorkReportReceived) async {
5361
await workReportDistribution(workReport: event.workReport, slot: event.slot, signatures: event.signatures)
5462
}
5563

64+
public func handleShardDistributionReceived(_ event: RuntimeEvents.ShardDistributionReceived) async {
65+
try? await shardDistribution(erasureRoot: event.erasureRoot, shardIndex: event.shardIndex)
66+
}
67+
5668
/// Purge old data from the data availability stores
5769
/// - Parameter epoch: The current epoch index
5870
public func purge(epoch _: EpochIndex) async {
@@ -130,21 +142,18 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O
130142
/// - Parameter bundle: The bundle to export
131143
/// - Returns: The erasure root and length of the bundle
132144
public func exportWorkpackageBundle(bundle: WorkPackageBundle) async throws -> (erasureRoot: Data32, length: DataLength) {
133-
// 1. Serialize the bundle
145+
// Serialize the bundle
134146
let serializedData = try JamEncoder.encode(bundle)
135147
let dataLength = DataLength(UInt32(serializedData.count))
136148

137-
// 2. Calculate the erasure root
138-
// TODO: replace this with real implementation
139-
let erasureRoot = serializedData.blake2b256hash()
140-
141-
// 3. Extract the work package hash from the bundle
142-
let workPackageHash = bundle.workPackage.hash()
143-
144-
// 4. Store the serialized bundle in the audit store (short-term storage)
145-
146-
// chunk the bundle into segments
147-
149+
// Calculate the erasure root
150+
// Work-package bundle shard hash
151+
let bundleShards = try ErasureCoding.chunk(
152+
data: serializedData,
153+
basicSize: config.value.erasureCodedPieceSize,
154+
recoveryCount: config.value.totalNumberOfValidators
155+
)
156+
// Chunk the bundle into segments
148157
let segmentCount = serializedData.count / 4104
149158
var segments = [Data4104]()
150159
for i in 0 ..< segmentCount {
@@ -159,19 +168,32 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O
159168
segments.append(Data4104(segment)!)
160169
}
161170

171+
// Calculate the segments root
172+
let segmentsRoot = Merklization.constantDepthMerklize(segments.map(\.data))
173+
174+
var nodes = [Data]()
175+
// workpackage bundle shard hash + segment shard root
176+
for i in 0 ..< bundleShards.count {
177+
let shardHash = bundleShards[i].blake2b256hash()
178+
try nodes.append(JamEncoder.encode(shardHash) + JamEncoder.encode(segmentsRoot))
179+
}
180+
181+
// ErasureRoot
182+
let erasureRoot = Merklization.binaryMerklize(nodes)
183+
184+
// Extract the work package hash from the bundle
185+
let workPackageHash = bundle.workPackage.hash()
186+
187+
// Store the serialized bundle in the audit store (short-term storage)
162188
// Store the segment in the data store
163189
for (i, segment) in segments.enumerated() {
164190
try await dataStore.set(data: segment, erasureRoot: erasureRoot, index: UInt16(i))
165191
}
166192

167-
// 5. Calculate the segments root
168-
// TODO: replace this with real implementation
169-
let segmentsRoot = serializedData.blake2b256hash()
170-
171-
// 6. Map the work package hash to the segments root
193+
// Map the work package hash to the segments root
172194
try await dataStore.setSegmentRoot(segmentRoot: segmentsRoot, forWorkPackageHash: workPackageHash)
173195

174-
// 7. Set the timestamp for retention tracking
196+
// Set the timestamp for retention tracking
175197
// As per GP 14.3.1, items in the audit store are kept until finality (approx. 1 hour)
176198
let currentTimestamp = Date()
177199
try await dataStore.setTimestamp(erasureRoot: erasureRoot, timestamp: currentTimestamp)
@@ -238,23 +260,79 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O
238260

239261
// MARK: - Shard Distribution (CE 137)
240262

241-
/// Distribute shards to validators
242-
/// - Parameters:
243-
/// - shards: The shards to distribute
244-
/// - erasureRoot: The erasure root of the data
245-
/// - validators: The validators to distribute to
246-
/// - Returns: Success status of the distribution
247-
public func distributeShards(
248-
shards _: [Data4104],
263+
public func shardDistribution(
264+
erasureRoot: Data32,
265+
shardIndex: UInt16
266+
) async throws {
267+
// Generate request ID
268+
let requestId = try JamEncoder.encode(erasureRoot, shardIndex).blake2b256hash()
269+
do {
270+
// TODO: Fetch shard data from local storage
271+
let (bundleShard, segmentShards) = (Data(), [Data()])
272+
273+
// Generate Merkle proof justification
274+
let justification = try await generateJustification(
275+
erasureRoot: erasureRoot,
276+
shardIndex: shardIndex,
277+
bundleShard: bundleShard,
278+
segmentShards: segmentShards
279+
)
280+
281+
// Respond with shards + proof
282+
publish(RuntimeEvents.ShardDistributionReceivedResponse(
283+
requestId: requestId,
284+
bundleShard: bundleShard,
285+
segmentShards: segmentShards,
286+
justification: justification
287+
))
288+
289+
} catch {
290+
publish(RuntimeEvents.ShardDistributionReceivedResponse(
291+
requestId: requestId,
292+
error: error
293+
))
294+
}
295+
}
296+
297+
private func generateJustification(
249298
erasureRoot _: Data32,
250-
validators _: [ValidatorIndex]
251-
) async throws -> Bool {
252-
// TODO: Implement shard distribution to validators
253-
// 1. Determine which shards go to which validators
254-
// 2. Send shards to validators over the network
255-
// 3. Track distribution status
256-
// 4. Return success status
257-
throw DataAvailabilityError.distributionError
299+
shardIndex: UInt16,
300+
bundleShard _: Data,
301+
segmentShards: [Data]
302+
) async throws -> Justification {
303+
guard !segmentShards.isEmpty else {
304+
throw DataAvailabilityError.emptySegmentShards
305+
}
306+
307+
// GP T(s,i,H)
308+
let merklePath = Merklization.trace(
309+
segmentShards,
310+
index: Int(shardIndex),
311+
hasher: Blake2b256.self
312+
)
313+
314+
// TODO: Got Justification
315+
switch merklePath.count {
316+
case 1:
317+
// 0 ++ Hash
318+
guard case let .right(hash) = merklePath.first! else {
319+
throw DataAvailabilityError.invalidMerklePath
320+
}
321+
return .singleHash(hash)
322+
323+
case 2:
324+
// 1 ++ Hash ++ Hash
325+
guard case let .right(hash1) = merklePath[0],
326+
case let .right(hash2) = merklePath[1]
327+
else {
328+
throw DataAvailabilityError.invalidMerklePath
329+
}
330+
return .doubleHash(hash1, hash2)
331+
332+
default:
333+
// TODO: 2 ++ Segment Shard (12 bytes)
334+
return .segmentShard(Data12())
335+
}
258336
}
259337

260338
// MARK: - Audit Shard Requests (CE 138)

Database/Sources/Database/RocksDBBackend.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
225225
// TODO: batch delete
226226

227227
try blocks.delete(key: hash)
228-
try guaranteedWorkReports.delete(key: hash)
229228
if let block = try await getBlock(hash: hash) {
230229
try blockHashByTimeslot.delete(key: block.header.timeslot)
231230
}
@@ -235,6 +234,11 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
235234
}
236235
try blockNumberByHash.delete(key: hash)
237236
}
237+
238+
public func remove(workReportHash: Data32) async throws {
239+
logger.trace("remove() \(workReportHash)")
240+
try guaranteedWorkReports.delete(key: workReportHash)
241+
}
238242
}
239243

240244
extension RocksDBBackend: StateBackendProtocol {

Networking/Tests/MsQuicSwiftTests/NetAddrTests.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import Foundation
2-
import MsQuicSwift
3-
@testable import Networking
42
import Testing
3+
import TracingUtils
4+
import Utils
5+
6+
@testable import MsQuicSwift
57

68
struct NetAddrTests {
79
@Test

Node/Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ let package = Package(
2222
.package(path: "../TracingUtils"),
2323
.package(path: "../Utils"),
2424
.package(path: "../Database"),
25-
.package(url: "https://github.com/apple/swift-testing.git", branch: "6.0.3"),
25+
.package(url: "https://github.com/apple/swift-testing.git", branch: "6.0.0"),
2626
.package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.2"),
2727
],
2828
targets: [

Node/Sources/Node/NetworkingProtocol/CommonEphemeral/ShardDistributionMessage.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import Utils
55

66
public struct ShardDistributionMessage: Codable, Sendable, Equatable, Hashable {
77
public var erasureRoot: Data32
8-
public var shardIndex: UInt32
8+
public var shardIndex: UInt16
99

10-
public init(erasureRoot: Data32, shardIndex: UInt32) {
10+
public init(erasureRoot: Data32, shardIndex: UInt16) {
1111
self.erasureRoot = erasureRoot
1212
self.shardIndex = shardIndex
1313
}

0 commit comments

Comments
 (0)