Skip to content

Commit 930480b

Browse files
committed
[Distributed] Pass args from Invocation to HBuffer
1 parent 372ada0 commit 930480b

File tree

3 files changed

+199
-145
lines changed

3 files changed

+199
-145
lines changed

stdlib/public/Distributed/DistributedActorSystem.swift

Lines changed: 80 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,13 @@
99
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
1010
//
1111
//===----------------------------------------------------------------------===//
12-
1312
import Swift
1413
import _Concurrency
1514

1615
@available(SwiftStdlib 5.6, *)
1716
public protocol DistributedActorSystem: Sendable {
1817
/// The identity used by actors that communicate via this transport
1918
associatedtype ActorID: Sendable & Hashable & Codable // TODO: make Codable conditional here
20-
2119
/// The specific type of the argument builder to be used for remote calls.
2220
associatedtype Invocation: DistributedTargetInvocation
2321

@@ -26,7 +24,6 @@ public protocol DistributedActorSystem: Sendable {
2624

2725
// ==== ---------------------------------------------------------------------
2826
// - MARK: Resolving actors by identity
29-
3027
/// Resolve a local or remote actor address to a real actor instance, or throw if unable to.
3128
/// The returned value is either a local actor or proxy to a remote actor.
3229
///
@@ -48,12 +45,11 @@ public protocol DistributedActorSystem: Sendable {
4845
/// Detecting liveness of such remote actors shall be offered / by transport libraries
4946
/// by other means, such as "watching an actor for termination" or similar.
5047
func resolve<Act>(id: ActorID, as actorType: Act.Type) throws -> Act?
51-
where Act: DistributedActor,
52-
Act.ID == ActorID
48+
where Act: DistributedActor,
49+
Act.ID == ActorID
5350

5451
// ==== ---------------------------------------------------------------------
5552
// - MARK: Actor Lifecycle
56-
5753
/// Create an `ActorID` for the passed actor type.
5854
///
5955
/// This function is invoked by an distributed actor during its initialization,
@@ -65,8 +61,8 @@ public protocol DistributedActorSystem: Sendable {
6561
/// `system.resolve(id: addr1, as: Greeter.self)` MUST return a reference
6662
/// to the same actor.
6763
func assignID<Act>(_ actorType: Act.Type) -> ActorID
68-
where Act: DistributedActor,
69-
Act.ID == ActorID
64+
where Act: DistributedActor,
65+
Act.ID == ActorID
7066

7167
/// Invoked during a distributed actor's initialization, as soon as it becomes fully initialized.
7268
///
@@ -84,8 +80,8 @@ public protocol DistributedActorSystem: Sendable {
8480
///
8581
/// - Parameter actor: reference to the (local) actor that was just fully initialized.
8682
func actorReady<Act>(_ actor: Act)
87-
where Act: DistributedActor,
88-
Act.ID == ActorID
83+
where Act: DistributedActor,
84+
Act.ID == ActorID
8985

9086
/// Called during when a distributed actor is deinitialized, or fails to initialize completely (e.g. by throwing
9187
/// out of an `init` that did not completely initialize all of the the actors stored properties yet).
@@ -102,10 +98,9 @@ public protocol DistributedActorSystem: Sendable {
10298

10399
// ==== ---------------------------------------------------------------------
104100
// - MARK: Remote Method Invocations
105-
106101
/// Invoked by the Swift runtime when a distributed remote call is about to be made.
107102
///
108-
/// The returned DistributedTargetInvocation will be populated with all
103+
/// The returned `DistributedTargetInvocation` will be populated with all
109104
/// arguments, generic substitutions, and specific error and return types
110105
/// that are associated with this specific invocation.
111106
@inlinable
@@ -124,19 +119,17 @@ public protocol DistributedActorSystem: Sendable {
124119
// func remoteCall<Act, Err, Res>(
125120
// on actor: Act,
126121
// target: RemoteCallTarget,
127-
// arguments: Invocation,
122+
// invocation: inout Invocation,
128123
// throwing: Err.Type,
129124
// returning: Res.Type
130-
// ) async throws -> Res.Type
125+
// ) async throws -> Res
131126
// where Act: DistributedActor,
132127
// Act.ID == ActorID,
133128
// Res: SerializationRequirement
134-
135129
}
136130

137131
// ==== ----------------------------------------------------------------------------------------------------------------
138132
// MARK: Execute Distributed Methods
139-
140133
@available(SwiftStdlib 5.6, *)
141134
extension DistributedActorSystem {
142135

@@ -154,21 +147,20 @@ extension DistributedActorSystem {
154147
/// is that thanks to this approach it can avoid any existential boxing, and can serve the most
155148
/// latency sensitive-use-cases.
156149
public func executeDistributedTarget<Act, ResultHandler>(
157-
on actor: Act,
158-
mangledTargetName: String,
159-
invocation: Self.Invocation,
160-
handler: ResultHandler
150+
on actor: Act,
151+
mangledTargetName: String,
152+
invocation: inout Invocation,
153+
handler: ResultHandler
161154
) async throws where Act: DistributedActor,
162155
Act.ID == ActorID,
163156
ResultHandler: DistributedTargetInvocationResultHandler {
164157
// NOTE: this implementation is not the most efficient, nor final, version of this func
165158
// we end up demangling the name multiple times, perform more heap allocations than
166159
// we truly need to etc. We'll eventually move this implementation to a specialized one
167160
// avoiding these issues.
168-
169161
guard mangledTargetName.count > 0 && mangledTargetName.first == "$" else {
170162
throw ExecuteDistributedTargetError(
171-
message: "Illegal mangledTargetName detected, must start with '$'")
163+
message: "Illegal mangledTargetName detected, must start with '$'")
172164
}
173165

174166
// Get the expected parameter count of the func
@@ -179,36 +171,36 @@ extension DistributedActorSystem {
179171

180172
guard paramCount >= 0 else {
181173
throw ExecuteDistributedTargetError(
182-
message: """
183-
Failed to decode distributed invocation target expected parameter count,
184-
error code: \(paramCount)
185-
mangled name: \(mangledTargetName)
186-
""")
174+
message: """
175+
Failed to decode distributed invocation target expected parameter count,
176+
error code: \(paramCount)
177+
mangled name: \(mangledTargetName)
178+
""")
187179
}
188180

189181
// Prepare buffer for the parameter types to be decoded into:
190182
let paramTypesBuffer = UnsafeMutableRawBufferPointer
191-
.allocate(byteCount: MemoryLayout<Any.Type>.size * Int(paramCount),
192-
alignment: MemoryLayout<Any.Type>.alignment)
183+
.allocate(byteCount: MemoryLayout<Any.Type>.size * Int(paramCount),
184+
alignment: MemoryLayout<Any.Type>.alignment)
193185
defer {
194186
paramTypesBuffer.deallocate()
195187
}
196188

197189
// Demangle and write all parameter types into the prepared buffer
198190
let decodedNum = nameUTF8.withUnsafeBufferPointer { nameUTF8 in
199191
__getParameterTypeInfo(
200-
nameUTF8.baseAddress!, UInt(nameUTF8.endIndex),
201-
paramTypesBuffer.baseAddress!._rawValue, Int(paramCount))
192+
nameUTF8.baseAddress!, UInt(nameUTF8.endIndex),
193+
paramTypesBuffer.baseAddress!._rawValue, Int(paramCount))
202194
}
203195

204196
// Fail if the decoded parameter types count seems off and fishy
205197
guard decodedNum == paramCount else {
206198
throw ExecuteDistributedTargetError(
207-
message: """
208-
Failed to decode the expected number of params of distributed invocation target, error code: \(decodedNum)
209-
(decoded: \(decodedNum), expected params: \(paramCount)
210-
mangled name: \(mangledTargetName)
211-
""")
199+
message: """
200+
Failed to decode the expected number of params of distributed invocation target, error code: \(decodedNum)
201+
(decoded: \(decodedNum), expected params: \(paramCount)
202+
mangled name: \(mangledTargetName)
203+
""")
212204
}
213205

214206
// Copy the types from the buffer into a Swift Array
@@ -224,21 +216,23 @@ extension DistributedActorSystem {
224216
func allocateReturnTypeBuffer<R>(_: R.Type) -> UnsafeRawPointer? {
225217
return UnsafeRawPointer(UnsafeMutablePointer<R>.allocate(capacity: 1))
226218
}
227-
guard let returnType: Any.Type = _getReturnTypeInfo(mangledMethodName: mangledTargetName) else {
219+
220+
guard let returnTypeFromTypeInfo: Any.Type = _getReturnTypeInfo(mangledMethodName: mangledTargetName) else {
228221
throw ExecuteDistributedTargetError(
229-
message: "Failed to decode distributed target return type")
222+
message: "Failed to decode distributed target return type")
230223
}
231224

232-
guard let resultBuffer = _openExistential(returnType, do: allocateReturnTypeBuffer) else {
225+
guard let resultBuffer = _openExistential(returnTypeFromTypeInfo, do: allocateReturnTypeBuffer) else {
233226
throw ExecuteDistributedTargetError(
234-
message: "Failed to allocate buffer for distributed target return type")
227+
message: "Failed to allocate buffer for distributed target return type")
235228
}
236229

237230
func destroyReturnTypeBuffer<R>(_: R.Type) {
238231
resultBuffer.assumingMemoryBound(to: R.self).deallocate()
239232
}
233+
240234
defer {
241-
_openExistential(returnType, do: destroyReturnTypeBuffer)
235+
_openExistential(returnTypeFromTypeInfo, do: destroyReturnTypeBuffer)
242236
}
243237

244238
// Prepare the buffer to decode the argument values into
@@ -249,12 +243,37 @@ extension DistributedActorSystem {
249243
}
250244

251245
do {
246+
// Decode the invocation and pack arguments into the h-buffer
247+
// TODO(distributed): decode the generics info
248+
var argumentDecoder = invocation.makeArgumentDecoder()
249+
var paramIdx = 0
250+
for unsafeRawArgPointer in hargs {
251+
guard paramIdx < paramCount else {
252+
throw ExecuteDistributedTargetError(
253+
message: "Unexpected attempt to decode more parameters than expected: \(paramIdx + 1)")
254+
}
255+
let paramType = paramTypes[paramIdx]
256+
paramIdx += 1
257+
258+
// FIXME(distributed): func doDecode<Arg: SerializationRequirement>(_: Arg.Type) throws {
259+
// FIXME: but how would we call this...?
260+
// FIXME: > type 'Arg' constrained to non-protocol, non-class type 'Self.Invocation.SerializationRequirement'
261+
func doDecodeArgument<Arg>(_: Arg.Type) throws {
262+
let unsafeArgPointer = unsafeRawArgPointer
263+
.bindMemory(to: Arg.self, capacity: 1)
264+
try argumentDecoder.decodeNext(Arg.self, into: unsafeArgPointer)
265+
}
266+
try _openExistential(paramType, do: doDecodeArgument)
267+
}
268+
269+
let returnType = try invocation.decodeReturnType() ?? returnTypeFromTypeInfo
270+
// let errorType = try invocation.decodeErrorType() // TODO: decide how to use?
252271
// Execute the target!
253272
try await _executeDistributedTarget(
254-
on: actor,
255-
mangledTargetName, UInt(mangledTargetName.count),
256-
argumentBuffer: hargs.buffer._rawValue,
257-
resultBuffer: resultBuffer._rawValue
273+
on: actor,
274+
mangledTargetName, UInt(mangledTargetName.count),
275+
argumentBuffer: hargs.buffer._rawValue,
276+
resultBuffer: resultBuffer._rawValue
258277
)
259278

260279
func onReturn<R>(_ resultTy: R.Type) async throws {
@@ -279,15 +298,14 @@ func _executeDistributedTarget(
279298

280299
// ==== ----------------------------------------------------------------------------------------------------------------
281300
// MARK: Support types
282-
283301
/// A distributed 'target' can be a `distributed func` or `distributed` computed property.
284302
@available(SwiftStdlib 5.6, *)
285303
public struct RemoteCallTarget {
286304
let mangledName: String
287305

288306
// Only intended to be created by the _Distributed library.
289-
internal init(mangledName: String) {
290-
self.mangledName = mangledName
307+
public init(_mangledName: String) {
308+
self.mangledName = _mangledName
291309
}
292310

293311
// <module>.Base.hello(hi:)
@@ -333,38 +351,26 @@ public protocol DistributedTargetInvocation {
333351
associatedtype SerializationRequirement
334352

335353
// === Sending / recording -------------------------------------------------
336-
337-
// RECIPIENT:
338-
// when we get the gen args; we can check them against the where clause
339-
// and throw, rather than blow up... we have the ability to check the where clause
340-
// the same code as implements `as?` takes generic signature and checks type arguments against that
341-
//
342-
// accessor is not generic, gets the subs passed in, forms SubsMap and uses to look up other things
343-
// pass to call emission which would do the right thing...
344-
/// Ad-hoc requirement
345-
///
346354
/// The arguments must be encoded order-preserving, and once `decodeGenericSubstitutions`
347355
/// is called, the substitutions must be returned in the same order in which they were recorded.
348-
mutating func recordGenericSubstitution<T>(mangledType: T.Type) throws
356+
mutating func recordGenericSubstitution<T>(_ type: T.Type) throws
349357

350358
// /// Ad-hoc requirement
351359
// ///
352360
// /// Record an argument of `Argument` type in this arguments storage.
353361
// mutating func recordArgument<Argument: SerializationRequirement>(argument: Argument) throws
362+
mutating func recordErrorType<E: Error>(_ type: E.Type) throws
354363

355364
// /// Ad-hoc requirement
356365
// ///
357-
// mutating func recordReturnType<R: SerializationRequirement>(mangledType: R.Type) throws
358-
359-
mutating func recordErrorType<E: Error>(mangledType: E.Type) throws
360-
366+
// /// Record the return type of the distributed method.
367+
// mutating func recordReturnType<R: SerializationRequirement>(_ type: R.Type) throws
361368
mutating func doneRecording() throws
362369

363370
// === Receiving / decoding -------------------------------------------------
364-
365371
mutating func decodeGenericSubstitutions() throws -> [Any.Type]
366372

367-
mutating func argumentDecoder() -> Self.ArgumentDecoder
373+
func makeArgumentDecoder() -> Self.ArgumentDecoder
368374

369375
mutating func decodeReturnType() throws -> Any.Type?
370376

@@ -375,20 +381,6 @@ public protocol DistributedTargetInvocation {
375381
///
376382
/// It will be called exactly `N` times where `N` is the known number of arguments
377383
/// to the target invocation.
378-
///
379-
/// ## Ad-hoc protocol requirement
380-
///
381-
/// Adopters of this protocol must defined the following method:
382-
///
383-
/// ```
384-
/// mutating func decodeNext<Argument: SerializationRequirement>(
385-
/// into pointer: UnsafeMutablePointer<Argument>
386-
/// ) throws
387-
/// ```
388-
///
389-
/// which will be invoked with the specific `Argument` type that the target invocation is expecting.
390-
///
391-
/// This method is allowed to invoke
392384
@available(SwiftStdlib 5.6, *)
393385
public protocol DistributedTargetInvocationArgumentDecoder {
394386
associatedtype SerializationRequirement
@@ -409,11 +401,18 @@ public protocol DistributedTargetInvocationArgumentDecoder {
409401
// mutating func decodeNext<Argument: SerializationRequirement>(
410402
// into pointer: UnsafeMutablePointer<Argument> // pointer to our hbuffer
411403
// ) throws
404+
// FIXME(distributed): remove this since it must have the ': SerializationRequirement'
405+
mutating func decodeNext<Argument>(
406+
_ argumentType: Argument.Type,
407+
into pointer: UnsafeMutablePointer<Argument> // pointer to our hbuffer
408+
) throws
412409
}
413410

414411
@available(SwiftStdlib 5.6, *)
415412
public protocol DistributedTargetInvocationResultHandler {
416-
// FIXME: these must be ad-hoc protocol requirements, because Res: SerializationRequirement !!!
413+
associatedtype SerializationRequirement
414+
415+
// FIXME(distributed): these must be ad-hoc protocol requirements, because Res: SerializationRequirement !!!
417416
func onReturn<Res>(value: Res) async throws
418417
func onThrow<Err: Error>(error: Err) async throws
419418
}
@@ -443,7 +442,7 @@ public struct DistributedActorCodingError: DistributedActorSystemError {
443442
}
444443

445444
public static func missingActorSystemUserInfo<Act>(_ actorType: Act.Type) -> Self
446-
where Act: DistributedActor {
445+
where Act: DistributedActor {
447446
.init(message: "Missing DistributedActorSystem userInfo while decoding")
448447
}
449448
}

0 commit comments

Comments
 (0)