diff --git a/Package.swift b/Package.swift index 7709d98..5ba4f02 100644 --- a/Package.swift +++ b/Package.swift @@ -7,9 +7,11 @@ import PackageDescription let externalDependencies: [String: Range] = [ "https://github.com/ordo-one/flatbuffers": .upToNextMajor(from: "25.2.10-ordo.2"), "https://github.com/apple/swift-argument-parser": .upToNextMajor(from: "1.1.0"), - "https://github.com/apple/swift-nio": .upToNextMajor(from: "2.83.0"), + "https://github.com/apple/swift-distributed-tracing": .upToNextMajor(from: "1.0.0"), "https://github.com/apple/swift-log": .upToNextMajor(from: "1.4.4"), - "https://github.com/apple/swift-service-discovery.git" : .upToNextMajor(from: "1.0.0"), + "https://github.com/apple/swift-nio": .upToNextMajor(from: "2.83.0"), + "https://github.com/apple/swift-service-context.git": .upToNextMajor(from: "1.2.1"), + "https://github.com/apple/swift-service-discovery.git": .upToNextMajor(from: "1.0.0") ] let internalDependencies: [String: Range] = [ @@ -70,10 +72,12 @@ let package = Package( name: "DistributedSystem", dependencies: [ .product(name: "ConsulServiceDiscovery", package: "package-consul"), + .product(name: "Instrumentation", package: "swift-distributed-tracing"), .product(name: "Logging", package: "swift-log"), .product(name: "lz4", package: "package-lz4"), .product(name: "NIOCore", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"), + .product(name: "ServiceContextModule", package: "swift-service-context"), .product(name: "ServiceDiscovery", package: "swift-service-discovery"), ], swiftSettings:[ @@ -118,6 +122,8 @@ let package = Package( dependencies: [ "DistributedSystem", "TestMessages", + .product(name: "InMemoryTracing", package: "swift-distributed-tracing"), + .product(name: "Tracing", package: "swift-distributed-tracing") ], resources: [ .process("Resources") diff --git a/Sources/DistributedSystem/ChannelHandshake.swift b/Sources/DistributedSystem/ChannelHandshake.swift index 367f75b..eb3a9f8 100644 --- a/Sources/DistributedSystem/ChannelHandshake.swift +++ b/Sources/DistributedSystem/ChannelHandshake.swift @@ -96,7 +96,11 @@ final class ChannelHandshakeServer: ChannelInboundHandler, RemovableChannelHandl // If we would close TCP connection now then client not necessary will receive it. } } else { - logger.info("\(context.channel.addressDescription): invalid handshake request received, closing connection\n\(unwrapInboundIn(data).hexDump(format: ByteBuffer.HexDumpFormat.detailed(maxBytes: Self.hexDumpMaxBytes)))") + logger.info(""" + \(context.channel.addressDescription): invalid handshake request received, closing connection + \(unwrapInboundIn(data).hexDump(format: ByteBuffer.HexDumpFormat.detailed(maxBytes: Self.hexDumpMaxBytes))) + """ + ) context.close(promise: nil) } } diff --git a/Sources/DistributedSystem/DistributedSystem.swift b/Sources/DistributedSystem/DistributedSystem.swift index c0f026a..c19bc66 100644 --- a/Sources/DistributedSystem/DistributedSystem.swift +++ b/Sources/DistributedSystem/DistributedSystem.swift @@ -12,10 +12,12 @@ import Atomics import ConsulServiceDiscovery import Dispatch import Distributed +import Instrumentation import Logging import struct Foundation.Data import struct Foundation.UUID import NIOCore +import ServiceContextModule import Synchronization internal import NIOPosix @@ -783,10 +785,10 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable { let stateSize = MemoryLayout.size let bufferSize = ULEB128.size(UInt(stateSize)) + stateSize var buffer = ByteBufferAllocator().buffer(capacity: bufferSize) - buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { ptr in ULEB128.encode(UInt(stateSize), to: ptr.baseAddress!) } - _ = res.state.rawValue.withUnsafeBytesSerialization { bytes in buffer.writeBytes(bytes) } + ULEB128.encode(UInt(stateSize), to: &buffer) + _ = res.state.rawValue.withUnsafeBytesSerialization { buffer.writeBytes($0) } - let envelope = InvocationEnvelope(0, "", [], buffer) + let envelope = InvocationEnvelope(0, [:], "", [], buffer) for entry in res.actors { dispatchInvocation(envelope, for: entry.0, channel, entry.1, entry.2) } @@ -1141,18 +1143,31 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable { } } + let serviceContext: ServiceContext? = (callID == 0) ? nil : .current let payloadSize = MemoryLayout.size + ULEB128.size(actor.id.instanceID) - + InvocationEnvelope.wireSize(callID, invocation.genericSubstitutions, invocation.arguments, target) + + InvocationEnvelope.wireSize( + callID, + serviceContext, + invocation.genericSubstitutions, + invocation.arguments, + target) + // Even if we carefully calculated the capacity of the desired buffer and know it to the nearest byte, // swift-nio still allocates a buffer with a storage capacity rounded up to nearest power of 2... // Weird... var buffer = ByteBufferAllocator().buffer(capacity: MemoryLayout.size + payloadSize) buffer.writeInteger(UInt32(payloadSize)) buffer.writeInteger(SessionMessage.invocationEnvelope.rawValue) - buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { ptr in ULEB128.encode(actor.id.instanceID, to: ptr.baseAddress!) } - let targetOffset = InvocationEnvelope.encode(callID, invocation.genericSubstitutions, &invocation.arguments, to: &buffer) + ULEB128.encode(actor.id.instanceID, to: &buffer) + let targetOffset = InvocationEnvelope.encode( + callID, + serviceContext, + invocation.genericSubstitutions, + &invocation.arguments, + to: &buffer + ) let targetIdentifier = target.identifier channel.eventLoop.execute { [buffer] in @@ -1193,7 +1208,7 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable { func channelRead(_ channelID: UInt32, _ channel: Channel, _ buffer: inout ByteBuffer, _ targetFuncs: inout [String]) { let bytesReceived = buffer.readableBytes - guard buffer.readInteger(as: UInt32.self) != nil, // skip message size + guard let _ = buffer.readInteger(as: UInt32.self), let rawMessageType = buffer.readInteger(as: SessionMessage.RawValue.self) else { logger.error("\(channel.addressDescription): invalid message received (\(bytesReceived)), closing connection") @@ -1402,15 +1417,55 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable { } } else { let resultHandler = ResultHandler(loggerBox, envelope.targetFunc) - try await executeDistributedTarget(on: actor, - target: RemoteCallTarget(envelope.targetFunc), - invocationDecoder: &decoder, - handler: resultHandler) - if resultHandler.hasResult { - try resultHandler.sendTo(channel, for: envelope.callID) + + if envelope.serviceContext.isEmpty { + try await executeDistributedTarget( + on: actor, + target: RemoteCallTarget(envelope.targetFunc), + invocationDecoder: &decoder, + handler: resultHandler + ) + + if envelope.callID == 0 { + if resultHandler.hasResult { + logger.error("internal error: unexpected result for \(envelope.targetFunc)") + } + } else { + if resultHandler.hasResult { + try resultHandler.sendTo(channel, for: envelope.callID) + } else { + logger.error("internal error: missing result for \(envelope.targetFunc)") + } + } } else { - if envelope.callID != 0 { - logger.error("internal error: missing result") + var requestBaggage = ServiceContext.current ?? .topLevel + + struct ServiceContextExtractor: Extractor { + typealias Carrier = [String: String] + func extract(key: String, from carrier: Carrier) -> String? { + carrier[key] + } + } + + InstrumentationSystem.instrument.extract( + envelope.serviceContext, + into: &requestBaggage, + using: ServiceContextExtractor(), + ) + + try await ServiceContext.withValue(requestBaggage) { + try await executeDistributedTarget( + on: actor, + target: RemoteCallTarget(envelope.targetFunc), + invocationDecoder: &decoder, + handler: resultHandler + ) + } + + if resultHandler.hasResult { + try resultHandler.sendTo(channel, for: envelope.callID) + } else { + logger.error("internal error: missing result for \(envelope.targetFunc)") } } } @@ -1425,7 +1480,7 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable { while true { let oldSize = (oldState & sizeMask) assert(oldSize >= envelope.size) - var newState = (oldState - envelope.size) + var newState = (oldState - UInt64(envelope.size)) let newSize = (newState & sizeMask) if (newSize < endpointQueueLowWatermark) && (oldSize >= endpointQueueLowWatermark) && ((oldState & Self.endpointQueueSuspendIndicator) != 0) { newState -= Self.endpointQueueSuspendIndicator @@ -1498,7 +1553,7 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable { let sizeMask = ((UInt64(1) << sizeBits) - 1) var oldState = queueState.load(ordering: .relaxed) while true { - var newState = (oldState + envelope.size) + var newState = (oldState + UInt64(envelope.size)) let oldSize = (oldState & sizeMask) let newSize = (newState & sizeMask) let warningSize = (endpointQueueWarningSize << ((oldState >> sizeBits) & 0x7F)) diff --git a/Sources/DistributedSystem/InvocationEnvelope.swift b/Sources/DistributedSystem/InvocationEnvelope.swift index e8e5b7d..a8f89a4 100644 --- a/Sources/DistributedSystem/InvocationEnvelope.swift +++ b/Sources/DistributedSystem/InvocationEnvelope.swift @@ -7,27 +7,73 @@ // http://www.apache.org/licenses/LICENSE-2.0 import Distributed +import Instrumentation import NIOCore +import ServiceContextModule + +#if canImport(Darwin) +import Darwin +#elseif canImport(Glibc) +import Glibc +#endif + +extension ULEB128 { + static func encode(_ value: T, to buffer: inout ByteBuffer) { + buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { + ULEB128.encode(value, to: $0.baseAddress!) + } + } + + static func decode(_ type: T.Type = T.self, from buffer: inout ByteBuffer) throws -> T { + try buffer.readWithUnsafeReadableBytes { + try ULEB128.decode($0, as: T.self) + } + } +} public struct InvocationEnvelope: Sendable { public let callID: UInt64 + public let serviceContext: [String: String] public let targetFunc: String public let genericSubstitutions: [Any.Type] public let arguments: ByteBuffer + public let size: Int - public var size: UInt64 { - UInt64(MemoryLayout.size + targetFunc.count + arguments.readableBytes) - } - - public init(_ callID: UInt64, _ targetFunc: String, _ genericSubstitutions: [Any.Type], _ arguments: ByteBuffer) { + init( + _ callID: UInt64, + _ serviceContext: [String: String], + _ targetFunc: String, + _ genericSubstitutions: [Any.Type], + _ arguments: ByteBuffer + ) { self.callID = callID + self.serviceContext = serviceContext self.targetFunc = targetFunc self.genericSubstitutions = genericSubstitutions self.arguments = arguments + self.size = arguments.readableBytes } - public init(from buffer: inout ByteBuffer, _ targetFuncs: inout [String]) throws { - callID = try buffer.readWithUnsafeReadableBytes { ptr in try ULEB128.decode(ptr, as: UInt64.self) } + init(from buffer: inout ByteBuffer, _ targetFuncs: inout [String]) throws { + let startIndex = buffer.readerIndex + self.callID = try ULEB128.decode(from: &buffer) + + var serviceContext = [String: String]() + while true { + let keySize = try ULEB128.decode(UInt.self, from: &buffer) + if keySize == 0 { + break + } + guard let key = buffer.readString(length: Int(keySize)) else { + throw DistributedSystemErrors.error("Failed to decode InvocationEnvelope (service context key)") + } + let valueSize = try ULEB128.decode(UInt.self, from: &buffer) + guard let value = buffer.readString(length: Int(valueSize)) else { + throw DistributedSystemErrors.error("Failed to decode InvocationEnvelope (service context value)") + } + serviceContext[key] = value + } + self.serviceContext = serviceContext var genericSubstitutions = [Any.Type]() while true { @@ -65,21 +111,52 @@ public struct InvocationEnvelope: Sendable { let funcId = try buffer.readWithUnsafeReadableBytes { ptr in try ULEB128.decode(ptr, as: UInt32.self) } self.targetFunc = targetFuncs[Int(funcId)] } + self.size = (buffer.readerIndex - startIndex) } public static func wireSize( _ callID: UInt64, + _ serviceContext: ServiceContext?, _ genericSubstitutions: [String], _ arguments: ByteBuffer, _ targetFunc: RemoteCallTarget ) -> Int { var wireSize = 0 wireSize += ULEB128.size(callID) + + if let serviceContext { + struct ServiceContextSizeCalculatorInjector: Injector { + typealias Carrier = Int + + func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { + key.withCString { + let count = strlen($0) + carrier += ULEB128.size(UInt(count)) + carrier += count + } + value.withCString { + let count = strlen($0) + carrier += ULEB128.size(UInt(count)) + carrier += count + } + } + } + + InstrumentationSystem.instrument.inject( + serviceContext, + into: &wireSize, + using: ServiceContextSizeCalculatorInjector() + ) + } + wireSize += ULEB128.size(UInt(0)) + for typeName in genericSubstitutions { - wireSize += ULEB128.size(UInt(typeName.count)) - wireSize += typeName.count + typeName.utf8.withContiguousStorageIfAvailable { + wireSize += ULEB128.size(UInt($0.count)) + wireSize += $0.count + } } - wireSize += MemoryLayout.size + wireSize += ULEB128.size(UInt(0)) wireSize += ULEB128.size(UInt(arguments.readableBytes)) + arguments.readableBytes // The target funcion name first time will be sent as a string, @@ -87,7 +164,10 @@ public struct InvocationEnvelope: Sendable { // When we encode envelope we do not know is it first time or not, // so just reserve space large enough for any of them. wireSize += MemoryLayout.size // target type - let stringTargetSize = ULEB128.size(UInt(targetFunc.identifier.count)) + targetFunc.identifier.count + let stringTargetSize = targetFunc.identifier.withCString { + let count = strlen($0) + return ULEB128.size(UInt(count)) + count + } let idxTargetSize = ULEB128.size(UInt32.max) wireSize += max(stringTargetSize, idxTargetSize) return wireSize @@ -95,23 +175,47 @@ public struct InvocationEnvelope: Sendable { public static func encode( _ callID: UInt64, + _ serviceContext: ServiceContext?, _ genericSubstitutions: [String], _ arguments: inout ByteBuffer, to buffer: inout ByteBuffer ) -> Int { - buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { ptr in ULEB128.encode(callID, to: ptr.baseAddress!) } + ULEB128.encode(callID, to: &buffer) + + if let serviceContext { + struct ServiceContextInjector: Injector { + typealias Carrier = ByteBuffer + + func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { + key.withCString { + let count = strlen($0) + ULEB128.encode(UInt(count), to: &carrier) + carrier.writeBytes(UnsafeRawBufferPointer(start: $0, count: count)) + } + + value.withCString { + let count = strlen($0) + ULEB128.encode(UInt(count), to: &carrier) + carrier.writeBytes(UnsafeRawBufferPointer(start: $0, count: count)) + } + } + } + + InstrumentationSystem.instrument.inject( + serviceContext, + into: &buffer, + using: ServiceContextInjector() + ) + } + ULEB128.encode(UInt(0), to: &buffer) for typeName in genericSubstitutions { - buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { - ptr in ULEB128.encode(UInt(typeName.count), to: ptr.baseAddress!) - } + ULEB128.encode(UInt(typeName.count), to: &buffer) buffer.writeString(typeName) } - buffer.writeInteger(UInt8(0)) // generic substitution end indicator + ULEB128.encode(UInt(0), to: &buffer) - buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { - ptr in ULEB128.encode(UInt(arguments.readableBytes), to: ptr.baseAddress!) - } + ULEB128.encode(UInt(arguments.readableBytes), to: &buffer) buffer.writeBuffer(&arguments) return buffer.writerIndex // offset where target should be encoded @@ -120,13 +224,16 @@ public struct InvocationEnvelope: Sendable { public static func setTargetId(_ name: String, in buffer: inout ByteBuffer, at offs: Int) { buffer.moveWriterIndex(to: offs) buffer.writeInteger(UInt8(0)) // target type = string - buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { ptr in ULEB128.encode(UInt(name.count), to: ptr.baseAddress!) } - buffer.writeString(name) + name.withCString { + let count = strlen($0) + ULEB128.encode(UInt(count), to: &buffer) + buffer.writeBytes(UnsafeRawBufferPointer(start: $0, count: count)) + } } public static func setTargetId(_ id: UInt32, in buffer: inout ByteBuffer, at offs: Int) { buffer.moveWriterIndex(to: offs) buffer.writeInteger(UInt8(1)) // target type = index - buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { ptr in ULEB128.encode(id, to: ptr.baseAddress!) } + ULEB128.encode(id, to: &buffer) } } diff --git a/Tests/DistributedSystemTests/DistributedSystemTests.swift b/Tests/DistributedSystemTests/DistributedSystemTests.swift index c206d8d..cabe5af 100644 --- a/Tests/DistributedSystemTests/DistributedSystemTests.swift +++ b/Tests/DistributedSystemTests/DistributedSystemTests.swift @@ -5,6 +5,8 @@ import Distributed import Logging import NIOCore import Synchronization +import InMemoryTracing +import Tracing @testable import TestMessages import XCTest @@ -1360,13 +1362,67 @@ final class DistributedSystemTests: XCTestCase { serverSystem.stop() } + func testDistributedTracing() async throws { + distributed actor TestServiceEndpoint: ServiceEndpoint { + public typealias ActorSystem = DistributedSystem + public typealias SerializationRequirement = Transferable + + public static var serviceName: String { "test_service" } + + init(_ actorSystem: DistributedSystem) { + self.actorSystem = actorSystem + } + + distributed func handleConnectionState(_ state: ConnectionState) async throws { + } + + distributed func foo() async throws -> Bool { + var result = false + ServiceContext.current?.forEach { _, value in + if let context = value as? InMemorySpanContext { + logger.info("context=\(context)") + result = true + } + } + return result + } + } + + InstrumentationSystem.bootstrap(InMemoryTracer()) + let processInfo = ProcessInfo.processInfo + let systemName = "\(processInfo.hostName)-ts-\(processInfo.processIdentifier)-\(#line)" + + let serverSystem = DistributedSystemServer(name: systemName) + try await serverSystem.start() + try await serverSystem.addService(ofType: TestServiceEndpoint.self) { actorSystem in + TestServiceEndpoint(actorSystem) + } + + let clientSystem = DistributedSystem(name: systemName) + try clientSystem.start() + + let serviceEndpoint = try await clientSystem.connectToService( + TestServiceEndpoint.self, + withFilter: { _ in true } + ) + + let result = try await withSpan("remote call") { _ in + try await serviceEndpoint.foo() + } + + XCTAssertTrue(result) + + clientSystem.stop() + serverSystem.stop() + } + func testInvocationEncoding() { let callID = UInt64(42) var arguments = ByteBuffer() let remoteCallTarget = RemoteCallTarget("does not matter") - let wireSize = InvocationEnvelope.wireSize(callID, [], arguments, remoteCallTarget) + let wireSize = InvocationEnvelope.wireSize(callID, nil, [], arguments, remoteCallTarget) var buffer = ByteBufferAllocator().buffer(capacity: wireSize) - let targetOffset = InvocationEnvelope.encode(callID, [], &arguments, to: &buffer) + let targetOffset = InvocationEnvelope.encode(callID, nil, [], &arguments, to: &buffer) var copy = ByteBuffer() copy.writeImmutableBuffer(buffer) InvocationEnvelope.setTargetId(remoteCallTarget.identifier, in: &buffer, at: targetOffset)