diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/AccumulateFunction.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/AccumulateFunction.swift index dc6a4641..458205c7 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/AccumulateFunction.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/AccumulateFunction.swift @@ -20,7 +20,7 @@ public struct OperandTuple: Codable { public var authorizerTrace: Data } -public struct DeferredTransfers: Codable { +public struct DeferredTransfers: Codable, Sendable { // s public var sender: ServiceIndex // d @@ -43,7 +43,7 @@ public struct DeferredTransfers: Codable { /// Characterization (i.e. values capable of representing) of state components /// which are both needed and mutable by the accumulation process. -public struct AccumulateState { +public struct AccumulateState: Sendable { /// d (all service accounts) public var accounts: ServiceAccountsMutRef /// i diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift index fd6751c9..40614f65 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift @@ -53,7 +53,7 @@ public struct ParallelAccumulationOutput { /// single-service accumulation function ∆1 output public typealias SingleAccumulationOutput = AccumulationResult -public struct ServicePreimagePair: Hashable { +public struct ServicePreimagePair: Hashable, Sendable { public var serviceIndex: ServiceIndex public var preimage: Data @@ -63,7 +63,7 @@ public struct ServicePreimagePair: Hashable { } } -public struct AccumulationResult { +public struct AccumulationResult: Sendable { // e public var state: AccumulateState // t @@ -76,21 +76,47 @@ public struct AccumulationResult { public var provide: Set } -public struct AccountChanges { +public struct AccountChanges: Sendable { public var newAccounts: [ServiceIndex: ServiceAccount] public var altered: Set - public var alterations: [(ServiceAccountsMutRef) async throws -> Void] + public var accountUpdates: [ServiceIndex: ServiceAccountDetails] + public var storageUpdates: [ServiceIndex: [Data: Data?]] + public var preimageUpdates: [ServiceIndex: [Data32: Data?]] + public var preimageInfoUpdates: [ServiceIndex: [Data32: (UInt32, StateKeys.ServiceAccountPreimageInfoKey.Value?)]] public var removed: Set public init() { newAccounts = [:] - alterations = [] altered = [] + accountUpdates = [:] + storageUpdates = [:] + preimageUpdates = [:] + preimageInfoUpdates = [:] removed = [] } - public mutating func addAlteration(index: ServiceIndex, _ alteration: @escaping (ServiceAccountsMutRef) async throws -> Void) { - alterations.append(alteration) + public mutating func addAccountUpdate(index: ServiceIndex, account: ServiceAccountDetails) { + accountUpdates[index] = account + altered.insert(index) + } + + public mutating func addStorageUpdate(index: ServiceIndex, key: Data, value: Data?) { + storageUpdates[index, default: [:]][key] = value + altered.insert(index) + } + + public mutating func addPreimageUpdate(index: ServiceIndex, hash: Data32, value: Data?) { + preimageUpdates[index, default: [:]][hash] = value + altered.insert(index) + } + + public mutating func addPreimageInfoUpdate( + index: ServiceIndex, + hash: Data32, + length: UInt32, + value: StateKeys.ServiceAccountPreimageInfoKey.Value? + ) { + preimageInfoUpdates[index, default: [:]][hash] = (length, value) altered.insert(index) } @@ -101,8 +127,23 @@ public struct AccountChanges { for index in removed { accounts.remove(serviceAccount: index) } - for alteration in alterations { - try await alteration(accounts) + for (index, account) in accountUpdates { + accounts.set(serviceAccount: index, account: account) + } + for (index, storage) in storageUpdates { + for (key, value) in storage { + try await accounts.set(serviceAccount: index, storageKey: key, value: value) + } + } + for (index, preimages) in preimageUpdates { + for (hash, value) in preimages { + accounts.set(serviceAccount: index, preimageHash: hash, value: value) + } + } + for (index, preimageInfos) in preimageInfoUpdates { + for (hash, (length, value)) in preimageInfos { + try await accounts.set(serviceAccount: index, preimageHash: hash, length: length, value: value) + } } } @@ -125,6 +166,25 @@ public struct AccountChanges { } altered.formUnion(other.altered) removed.formUnion(other.removed) + + for (index, account) in other.accountUpdates { + accountUpdates[index] = account + } + for (index, storage) in other.storageUpdates { + for (key, value) in storage { + storageUpdates[index, default: [:]][key] = value + } + } + for (index, preimages) in other.preimageUpdates { + for (hash, value) in preimages { + preimageUpdates[index, default: [:]][hash] = value + } + } + for (index, preimageInfos) in other.preimageInfoUpdates { + for (hash, info) in preimageInfos { + preimageInfoUpdates[index, default: [:]][hash] = info + } + } } } @@ -150,14 +210,14 @@ public typealias TransfersStats = [ServiceIndex: (UInt32, Gas)] extension Accumulation { /// single-service accumulate function ∆1 - private mutating func singleAccumulate( + private static func singleAccumulate( config: ProtocolConfigRef, state: AccumulateState, workReports: [WorkReport], service: ServiceIndex, alwaysAcc: [ServiceIndex: Gas], timeslot: TimeslotIndex - ) async throws -> SingleAccumulationOutput { + ) async throws -> (ServiceIndex, AccumulationResult) { var gas = Gas(0) var arguments: [OperandTuple] = [] @@ -178,7 +238,7 @@ extension Accumulation { } } - logger.debug("[single] accumulate arguments: \(arguments)") + logger.debug("[∆1] service: \(service), arguments: \(arguments)") let result = try await accumulate( config: config, @@ -189,9 +249,9 @@ extension Accumulation { timeslot: timeslot ) - logger.debug("[single] accumulate result: gasUsed=\(result.gasUsed), commitment=\(String(describing: result.commitment))") + logger.debug("[∆1] service: \(service), gasUsed: \(result.gasUsed), commitment: \(String(describing: result.commitment))") - return result + return (service, result) } /// parallelized accumulate function ∆* @@ -206,14 +266,10 @@ extension Accumulation { var gasUsed: [(serviceIndex: ServiceIndex, gas: Gas)] = [] var transfers: [DeferredTransfers] = [] var commitments = Set() - var newValidatorQueue: ConfigFixedSizeArray< - ValidatorKey, ProtocolConfig.TotalNumberOfValidators - >? - var newAuthorizationQueue = authorizationQueue + var currentState = state + var servicePreimageSet = Set() + var overallAccountChanges = AccountChanges() - var tempPrivilegedServices: PrivilegedServices? - var newDelegator: ServiceIndex? - var newAssigners: ConfigFixedSizeArray? for report in workReports { for digest in report.digests { @@ -225,29 +281,114 @@ extension Accumulation { services.append(service) } - logger.debug("[parallel] services to accumulate: \(services)") + let uniqueServices = Set(services) + logger.debug("[∆*] services to accumulate: \(Array(uniqueServices))") + + let serviceBatches = sortServicesToBatches(services: uniqueServices) + + for serviceBatch in serviceBatches { + logger.debug("[∆*] processing batch: \(serviceBatch)") + + let batchState = currentState + + let batchResults = try await withThrowingTaskGroup( + of: (ServiceIndex, AccumulationResult).self, + returning: [(ServiceIndex, AccumulationResult)].self + ) { group in + for service in serviceBatch { + group.addTask { [batchState] in + let parallelState = batchState.copy() + + return try await Self.singleAccumulate( + config: config, + state: parallelState, + workReports: workReports, + service: service, + alwaysAcc: alwaysAcc, + timeslot: timeslot + ) + } + } - var accountsRef = ServiceAccountsMutRef(state.accounts.value) - var servicePreimageSet = Set() + var results: [(ServiceIndex, AccumulationResult)] = [] + for try await result in group { + results.append(result) + } + return results + } - for service in Set(services) { - let singleOutput = try await singleAccumulate( - config: config, - state: AccumulateState( - accounts: accountsRef, - validatorQueue: state.validatorQueue, - authorizationQueue: state.authorizationQueue, - manager: state.manager, - assigners: state.assigners, - delegator: state.delegator, - alwaysAcc: state.alwaysAcc, - entropy: state.entropy - ), - workReports: workReports, - service: service, - alwaysAcc: alwaysAcc, - timeslot: timeslot + try await mergeParallelBatchResults( + batchResults: batchResults, + currentState: ¤tState, + overallAccountChanges: &overallAccountChanges, + gasUsed: &gasUsed, + commitments: &commitments, + transfers: &transfers, + servicePreimageSet: &servicePreimageSet ) + } + + try await preimageIntegration( + servicePreimageSet: servicePreimageSet, + accounts: currentState.accounts, + timeslot: timeslot + ) + + return ParallelAccumulationOutput( + state: currentState, + transfers: transfers, + commitments: commitments, + gasUsed: gasUsed + ) + } + + private func sortServicesToBatches(services: Set) -> [[ServiceIndex]] { + var batches: [[ServiceIndex]] = [] + var remainingServices = services + + // Batch 1: Manager service (if present) + if remainingServices.contains(privilegedServices.manager) { + batches.append([privilegedServices.manager]) + remainingServices.remove(privilegedServices.manager) + } + + // Batch 2: Services that depend on a* and v* + var dependentServices: [ServiceIndex] = [] + if remainingServices.contains(privilegedServices.delegator) { + dependentServices.append(privilegedServices.delegator) + remainingServices.remove(privilegedServices.delegator) + } + for assigner in privilegedServices.assigners + where remainingServices.contains(assigner) + { + dependentServices.append(assigner) + remainingServices.remove(assigner) + } + + if !dependentServices.isEmpty { + batches.append(dependentServices) + } + + // Batch 3: All remaining services + if !remainingServices.isEmpty { + batches.append(Array(remainingServices)) + } + + return batches + } + + private mutating func mergeParallelBatchResults( + batchResults: [(ServiceIndex, AccumulationResult)], + currentState: inout AccumulateState, + overallAccountChanges: inout AccountChanges, + gasUsed: inout [(serviceIndex: ServiceIndex, gas: Gas)], + commitments: inout Set, + transfers: inout [DeferredTransfers], + servicePreimageSet: inout Set + ) async throws { + var batchAccountChanges = AccountChanges() + + for (service, singleOutput) in batchResults { gasUsed.append((service, singleOutput.gasUsed)) if let commitment = singleOutput.commitment { @@ -260,65 +401,39 @@ extension Accumulation { servicePreimageSet.formUnion(singleOutput.provide) - // m' + try batchAccountChanges.checkAndMerge(with: singleOutput.state.accounts.changes) + try overallAccountChanges.checkAndMerge(with: singleOutput.state.accounts.changes) + + // m' - Manager service establishes new privileged services if service == privilegedServices.manager { - tempPrivilegedServices = PrivilegedServices( - manager: singleOutput.state.manager, - assigners: singleOutput.state.assigners, - delegator: singleOutput.state.delegator, - alwaysAcc: singleOutput.state.alwaysAcc - ) + currentState.manager = singleOutput.state.manager + currentState.assigners = singleOutput.state.assigners + currentState.delegator = singleOutput.state.delegator + currentState.alwaysAcc = singleOutput.state.alwaysAcc } - // i' + // i' - Current delegator service can update validator queue if service == privilegedServices.delegator { - newValidatorQueue = singleOutput.state.validatorQueue + currentState.validatorQueue = singleOutput.state.validatorQueue } - // v' - if service == tempPrivilegedServices?.delegator { - newDelegator = singleOutput.state.delegator - } - - // q' + // q' - Current assigners update authorization queue if let index = privilegedServices.assigners.firstIndex(of: service) { - newAuthorizationQueue[index] = singleOutput.state.authorizationQueue[index] + currentState.authorizationQueue[index] = singleOutput.state.authorizationQueue[index] } - // a' - if let index = tempPrivilegedServices?.assigners.firstIndex(of: service) { - if newAssigners == nil { - newAssigners = tempPrivilegedServices?.assigners - } - newAssigners![index] = singleOutput.state.assigners[index] + // v' - New delegator + if service == currentState.delegator { + currentState.delegator = singleOutput.state.delegator } - accountsRef = singleOutput.state.accounts - try overallAccountChanges.checkAndMerge(with: accountsRef.changes) - accountsRef.clearRecordedChanges() + // a' - New assigners + if let index = currentState.assigners.firstIndex(of: service) { + currentState.assigners[index] = singleOutput.state.assigners[index] + } } - try await preimageIntegration( - servicePreimageSet: servicePreimageSet, - accounts: accountsRef, - timeslot: timeslot - ) - - return ParallelAccumulationOutput( - state: AccumulateState( - accounts: accountsRef, - validatorQueue: newValidatorQueue ?? validatorQueue, - authorizationQueue: newAuthorizationQueue, - manager: tempPrivilegedServices?.manager ?? privilegedServices.manager, - assigners: newAssigners ?? tempPrivilegedServices?.assigners ?? privilegedServices.assigners, - delegator: newDelegator ?? tempPrivilegedServices?.delegator ?? privilegedServices.delegator, - alwaysAcc: tempPrivilegedServices?.alwaysAcc ?? privilegedServices.alwaysAcc, - entropy: state.entropy - ), - transfers: transfers, - commitments: commitments, - gasUsed: gasUsed - ) + try await batchAccountChanges.apply(to: currentState.accounts) } /// outer accumulate function ∆+ @@ -354,7 +469,7 @@ extension Accumulation { gasUsed: [] ) } else { - logger.debug("[outer] can accumulate until index: \(i)") + logger.debug("[∆+] can accumulate until index: \(i)") let parallelOutput = try await parallelizedAccumulate( config: config, @@ -466,7 +581,7 @@ extension Accumulation { let rightQueueItems = accumulationQueue.array[index...] let leftQueueItems = accumulationQueue.array[0 ..< index] - var allQueueItems = rightQueueItems.flatMap(\.self) + leftQueueItems.flatMap(\.self) + newQueueItems + var allQueueItems = rightQueueItems.flatMap { $0 } + leftQueueItems.flatMap { $0 } + newQueueItems editQueue(items: &allQueueItems, accumulatedPackages: Set(zeroPrereqReports.map(\.packageSpecification.workPackageHash))) @@ -495,8 +610,6 @@ extension Accumulation { } /// Accumulate execution, state integration and deferred transfers - /// - /// Return accumulation-result merkle tree root public mutating func update( config: ProtocolConfigRef, availableReports: [WorkReport], @@ -596,7 +709,7 @@ extension Accumulation { for (service, _) in accumulateOutput.gasUsed { if accumulateStats[service] != nil { continue } - let digests = accumulated.compactMap(\.digests).flatMap(\.self) + let digests = accumulated.compactMap(\.digests).flatMap { $0 } let num = digests.filter { $0.serviceIndex == service }.count if num == 0 { continue } diff --git a/Blockchain/Sources/Blockchain/State/ServiceAccounts.swift b/Blockchain/Sources/Blockchain/State/ServiceAccounts.swift index 38a21863..f4677a68 100644 --- a/Blockchain/Sources/Blockchain/State/ServiceAccounts.swift +++ b/Blockchain/Sources/Blockchain/State/ServiceAccounts.swift @@ -24,7 +24,7 @@ public protocol ServiceAccounts: Sendable { public class ServiceAccountsRef: Ref, @unchecked Sendable {} -public class ServiceAccountsMutRef { +public class ServiceAccountsMutRef: @unchecked Sendable { public let ref: RefMut public private(set) var changes: AccountChanges @@ -41,17 +41,17 @@ public class ServiceAccountsMutRef { public func set(serviceAccount index: ServiceIndex, account: ServiceAccountDetails) { ref.value.set(serviceAccount: index, account: account) - changes.addAlteration(index: index) { $0.set(serviceAccount: index, account: account) } + changes.addAccountUpdate(index: index, account: account) } public func set(serviceAccount index: ServiceIndex, storageKey key: Data, value: Data?) async throws { try await ref.value.set(serviceAccount: index, storageKey: key, value: value) - changes.addAlteration(index: index) { try await $0.set(serviceAccount: index, storageKey: key, value: value) } + changes.addStorageUpdate(index: index, key: key, value: value) } public func set(serviceAccount index: ServiceIndex, preimageHash hash: Data32, value: Data?) { ref.value.set(serviceAccount: index, preimageHash: hash, value: value) - changes.addAlteration(index: index) { $0.set(serviceAccount: index, preimageHash: hash, value: value) } + changes.addPreimageUpdate(index: index, hash: hash, value: value) } public func set( @@ -61,7 +61,7 @@ public class ServiceAccountsMutRef { value: LimitedSizeArray? ) async throws { try await ref.value.set(serviceAccount: index, preimageHash: hash, length: length, value: value) - changes.addAlteration(index: index) { try await $0.set(serviceAccount: index, preimageHash: hash, length: length, value: value) } + changes.addPreimageInfoUpdate(index: index, hash: hash, length: length, value: value) } public func addNew(serviceAccount index: ServiceIndex, account: ServiceAccount) async throws { diff --git a/PolkaVM/Sources/PolkaVM/Engine.swift b/PolkaVM/Sources/PolkaVM/Engine.swift index 23b30e1b..e1b16854 100644 --- a/PolkaVM/Sources/PolkaVM/Engine.swift +++ b/PolkaVM/Sources/PolkaVM/Engine.swift @@ -8,10 +8,12 @@ public class Engine { let config: PvmConfig let invocationContext: (any InvocationContext)? private var stepCounter: Int = 0 + private let enableStepLogging: Bool public init(config: PvmConfig, invocationContext: (any InvocationContext)? = nil) { self.config = config self.invocationContext = invocationContext + enableStepLogging = ProcessInfo.processInfo.environment["PVM_STEP_LOGGING"] != nil } public func execute(state: any VMState) async -> ExitReason { @@ -75,7 +77,9 @@ public class Engine { // context.state.consumeGas(blockGas) // } - logStep(pc: pc, instruction: inst, context: context) + if enableStepLogging { + logStep(pc: pc, instruction: inst, context: context) + } return inst.execute(context: context, skip: skip) }