Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import PackageDescription
let externalDependencies: [String: Range<Version>] = [
"https://github.com/ordo-one/flatbuffers": .upToNextMajor(from: "25.2.10-ordo.2"),
"https://github.com/apple/swift-argument-parser": .upToNextMajor(from: "1.1.0"),
"https://github.com/apple/swift-nio": .upToNextMajor(from: "2.83.0"),
"https://github.com/apple/swift-distributed-tracing": .upToNextMajor(from: "1.0.0"),
"https://github.com/apple/swift-log": .upToNextMajor(from: "1.4.4"),
"https://github.com/apple/swift-service-discovery.git" : .upToNextMajor(from: "1.0.0"),
"https://github.com/apple/swift-nio": .upToNextMajor(from: "2.83.0"),
"https://github.com/apple/swift-service-context.git": .upToNextMajor(from: "1.2.1"),
"https://github.com/apple/swift-service-discovery.git": .upToNextMajor(from: "1.0.0")
]

let internalDependencies: [String: Range<Version>] = [
Expand Down Expand Up @@ -70,10 +72,12 @@ let package = Package(
name: "DistributedSystem",
dependencies: [
.product(name: "ConsulServiceDiscovery", package: "package-consul"),
.product(name: "Instrumentation", package: "swift-distributed-tracing"),
.product(name: "Logging", package: "swift-log"),
.product(name: "lz4", package: "package-lz4"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "ServiceContextModule", package: "swift-service-context"),
.product(name: "ServiceDiscovery", package: "swift-service-discovery"),
],
swiftSettings:[
Expand Down Expand Up @@ -118,6 +122,8 @@ let package = Package(
dependencies: [
"DistributedSystem",
"TestMessages",
.product(name: "InMemoryTracing", package: "swift-distributed-tracing"),
.product(name: "Tracing", package: "swift-distributed-tracing")
],
resources: [
.process("Resources")
Expand Down
6 changes: 5 additions & 1 deletion Sources/DistributedSystem/ChannelHandshake.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ final class ChannelHandshakeServer: ChannelInboundHandler, RemovableChannelHandl
// If we would close TCP connection now then client not necessary will receive it.
}
} else {
logger.info("\(context.channel.addressDescription): invalid handshake request received, closing connection\n\(unwrapInboundIn(data).hexDump(format: ByteBuffer.HexDumpFormat.detailed(maxBytes: Self.hexDumpMaxBytes)))")
logger.info("""
\(context.channel.addressDescription): invalid handshake request received, closing connection
\(unwrapInboundIn(data).hexDump(format: ByteBuffer.HexDumpFormat.detailed(maxBytes: Self.hexDumpMaxBytes)))
"""
)
context.close(promise: nil)
}
}
Expand Down
91 changes: 73 additions & 18 deletions Sources/DistributedSystem/DistributedSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import Atomics
import ConsulServiceDiscovery
import Dispatch
import Distributed
import Instrumentation
import Logging
import struct Foundation.Data
import struct Foundation.UUID
import NIOCore
import ServiceContextModule
import Synchronization
internal import NIOPosix

Expand Down Expand Up @@ -783,10 +785,10 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable {
let stateSize = MemoryLayout<ConnectionState.RawValue>.size
let bufferSize = ULEB128.size(UInt(stateSize)) + stateSize
var buffer = ByteBufferAllocator().buffer(capacity: bufferSize)
buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { ptr in ULEB128.encode(UInt(stateSize), to: ptr.baseAddress!) }
_ = res.state.rawValue.withUnsafeBytesSerialization { bytes in buffer.writeBytes(bytes) }
ULEB128.encode(UInt(stateSize), to: &buffer)
_ = res.state.rawValue.withUnsafeBytesSerialization { buffer.writeBytes($0) }

let envelope = InvocationEnvelope(0, "", [], buffer)
let envelope = InvocationEnvelope(UInt32(stateSize), 0, [:], "", [], buffer)
for entry in res.actors {
dispatchInvocation(envelope, for: entry.0, channel, entry.1, entry.2)
}
Expand Down Expand Up @@ -1141,18 +1143,31 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable {
}
}

let serviceContext: ServiceContext? = (callID == 0) ? nil : .current
let payloadSize =
MemoryLayout<SessionMessage.RawValue>.size
+ ULEB128.size(actor.id.instanceID)
+ InvocationEnvelope.wireSize(callID, invocation.genericSubstitutions, invocation.arguments, target)
+ InvocationEnvelope.wireSize(
callID,
serviceContext,
invocation.genericSubstitutions,
invocation.arguments,
target)

// Even if we carefully calculated the capacity of the desired buffer and know it to the nearest byte,
// swift-nio still allocates a buffer with a storage capacity rounded up to nearest power of 2...
// Weird...
var buffer = ByteBufferAllocator().buffer(capacity: MemoryLayout<UInt32>.size + payloadSize)
buffer.writeInteger(UInt32(payloadSize))
buffer.writeInteger(SessionMessage.invocationEnvelope.rawValue)
buffer.writeWithUnsafeMutableBytes(minimumWritableBytes: 0) { ptr in ULEB128.encode(actor.id.instanceID, to: ptr.baseAddress!) }
let targetOffset = InvocationEnvelope.encode(callID, invocation.genericSubstitutions, &invocation.arguments, to: &buffer)
ULEB128.encode(actor.id.instanceID, to: &buffer)
let targetOffset = InvocationEnvelope.encode(
callID,
serviceContext,
invocation.genericSubstitutions,
&invocation.arguments,
to: &buffer
)
let targetIdentifier = target.identifier

channel.eventLoop.execute { [buffer] in
Expand Down Expand Up @@ -1193,7 +1208,7 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable {

func channelRead(_ channelID: UInt32, _ channel: Channel, _ buffer: inout ByteBuffer, _ targetFuncs: inout [String]) {
let bytesReceived = buffer.readableBytes
guard buffer.readInteger(as: UInt32.self) != nil, // skip message size
guard let messageSize = buffer.readInteger(as: UInt32.self),
let rawMessageType = buffer.readInteger(as: SessionMessage.RawValue.self)
else {
logger.error("\(channel.addressDescription): invalid message received (\(bytesReceived)), closing connection")
Expand Down Expand Up @@ -1225,7 +1240,7 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable {
case .invocationEnvelope:
let instanceID = try Self.readULEB128(from: &buffer, as: EndpointIdentifier.InstanceIdentifier.self)
let endpointID = EndpointIdentifier(channelID, instanceID)
let envelope = try InvocationEnvelope(from: &buffer, &targetFuncs)
let envelope = try InvocationEnvelope(from: &buffer, messageSize, &targetFuncs)
try invokeLocalCall(envelope, for: endpointID, channel)
case .invocationResult:
try syncCallManager.handleResult(&buffer)
Expand Down Expand Up @@ -1402,15 +1417,55 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable {
}
} else {
let resultHandler = ResultHandler(loggerBox, envelope.targetFunc)
try await executeDistributedTarget(on: actor,
target: RemoteCallTarget(envelope.targetFunc),
invocationDecoder: &decoder,
handler: resultHandler)
if resultHandler.hasResult {
try resultHandler.sendTo(channel, for: envelope.callID)

if envelope.serviceContext.isEmpty {
try await executeDistributedTarget(
on: actor,
target: RemoteCallTarget(envelope.targetFunc),
invocationDecoder: &decoder,
handler: resultHandler
)

if envelope.callID == 0 {
if resultHandler.hasResult {
logger.error("internal error: unexpected result for \(envelope.targetFunc)")
}
} else {
if resultHandler.hasResult {
try resultHandler.sendTo(channel, for: envelope.callID)
} else {
logger.error("internal error: missing result for \(envelope.targetFunc)")
}
}
} else {
if envelope.callID != 0 {
logger.error("internal error: missing result")
var requestBaggage = ServiceContext.current ?? .topLevel

struct ServiceContextExtractor: Extractor {
typealias Carrier = [String: String]
func extract(key: String, from carrier: Carrier) -> String? {
carrier[key]
}
}

InstrumentationSystem.instrument.extract(
envelope.serviceContext,
into: &requestBaggage,
using: ServiceContextExtractor(),
)

try await ServiceContext.withValue(requestBaggage) {
try await executeDistributedTarget(
on: actor,
target: RemoteCallTarget(envelope.targetFunc),
invocationDecoder: &decoder,
handler: resultHandler
)
}

if resultHandler.hasResult {
try resultHandler.sendTo(channel, for: envelope.callID)
} else {
logger.error("internal error: missing result for \(envelope.targetFunc)")
}
}
}
Expand All @@ -1425,7 +1480,7 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable {
while true {
let oldSize = (oldState & sizeMask)
assert(oldSize >= envelope.size)
var newState = (oldState - envelope.size)
var newState = (oldState - UInt64(envelope.size))
let newSize = (newState & sizeMask)
if (newSize < endpointQueueLowWatermark) && (oldSize >= endpointQueueLowWatermark) && ((oldState & Self.endpointQueueSuspendIndicator) != 0) {
newState -= Self.endpointQueueSuspendIndicator
Expand Down Expand Up @@ -1498,7 +1553,7 @@ public class DistributedSystem: DistributedActorSystem, @unchecked Sendable {
let sizeMask = ((UInt64(1) << sizeBits) - 1)
var oldState = queueState.load(ordering: .relaxed)
while true {
var newState = (oldState + envelope.size)
var newState = (oldState + UInt64(envelope.size))
let oldSize = (oldState & sizeMask)
let newSize = (newState & sizeMask)
let warningSize = (endpointQueueWarningSize << ((oldState >> sizeBits) & 0x7F))
Expand Down
Loading
Loading