Skip to content

Commit c033996

Browse files
authored
Merge IC zero copy socket send in to stream-nb-24-4 (#26095)
1 parent 2c109ac commit c033996

31 files changed

+1211
-99
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,17 @@ static TInterconnectSettings GetInterconnectSettings(const NKikimrConfig::TInter
586586
result.EventDelay = TDuration::MicroSeconds(config.GetEventDelayMicrosec());
587587
}
588588

589+
if (config.HasSocketSendOptimization()) {
590+
switch (config.GetSocketSendOptimization()) {
591+
case NKikimrConfig::TInterconnectConfig::IC_SO_DISABLED:
592+
result.SocketSendOptimization = ESocketSendOptimization::DISABLED;
593+
break;
594+
case NKikimrConfig::TInterconnectConfig::IC_SO_MSG_ZEROCOPY:
595+
result.SocketSendOptimization = ESocketSendOptimization::IC_MSG_ZEROCOPY;
596+
break;
597+
}
598+
}
599+
589600
return result;
590601
}
591602

ydb/core/protos/config.proto

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,11 @@ message TInterconnectConfig {
407407
REQUIRED = 2;
408408
};
409409

410+
enum ESocketSendOptimization {
411+
IC_SO_DISABLED = 0;
412+
IC_SO_MSG_ZEROCOPY = 1;
413+
};
414+
410415
repeated TChannel Channel = 1;
411416
optional bool FirstTryBeforePoll = 2; // DEPRECATED
412417
optional bool StartTcp = 3 [default = false];
@@ -436,9 +441,10 @@ message TInterconnectConfig {
436441
optional bool SuppressConnectivityCheck = 39 [default = false];
437442
optional uint32 PreallocatedBufferSize = 40;
438443
optional uint32 NumPreallocatedBuffers = 41;
439-
optional bool EnableExternalDataChannel = 42;
444+
optional bool EnableExternalDataChannel = 42 [default = false];
440445
optional bool ValidateIncomingPeerViaDirectLookup = 44;
441446
optional uint32 SocketBacklogSize = 45; // SOMAXCONN if not set or zero
447+
optional ESocketSendOptimization SocketSendOptimization = 51 [default = IC_SO_DISABLED];
442448

443449
// ballast is added to IC handshake frames to ensure correctness of jumbo frames transmission over network
444450
optional uint32 HandshakeBallastSize = 14;

ydb/library/actors/interconnect/channel_scheduler.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ namespace NActors {
1212
std::array<std::optional<TEventOutputChannel>, 16> ChannelArray;
1313
THashMap<ui16, TEventOutputChannel> ChannelMap;
1414
std::shared_ptr<IInterconnectMetrics> Metrics;
15-
TEventHolderPool& Pool;
1615
const ui32 MaxSerializedEventSize;
1716
const TSessionParams Params;
1817

@@ -29,11 +28,10 @@ namespace NActors {
2928

3029
public:
3130
TChannelScheduler(ui32 peerNodeId, const TChannelsConfig& predefinedChannels,
32-
std::shared_ptr<IInterconnectMetrics> metrics, TEventHolderPool& pool, ui32 maxSerializedEventSize,
31+
std::shared_ptr<IInterconnectMetrics> metrics, ui32 maxSerializedEventSize,
3332
TSessionParams params)
3433
: PeerNodeId(peerNodeId)
3534
, Metrics(std::move(metrics))
36-
, Pool(pool)
3735
, MaxSerializedEventSize(maxSerializedEventSize)
3836
, Params(std::move(params))
3937
{
@@ -72,15 +70,15 @@ namespace NActors {
7270
if (channel < ChannelArray.size()) {
7371
auto& res = ChannelArray[channel];
7472
if (Y_UNLIKELY(!res)) {
75-
res.emplace(Pool, channel, PeerNodeId, MaxSerializedEventSize, Metrics,
73+
res.emplace(channel, PeerNodeId, MaxSerializedEventSize, Metrics,
7674
Params);
7775
}
7876
return *res;
7977
} else {
8078
auto it = ChannelMap.find(channel);
8179
if (Y_UNLIKELY(it == ChannelMap.end())) {
8280
it = ChannelMap.emplace(std::piecewise_construct, std::forward_as_tuple(channel),
83-
std::forward_as_tuple(Pool, channel, PeerNodeId, MaxSerializedEventSize,
81+
std::forward_as_tuple(channel, PeerNodeId, MaxSerializedEventSize,
8482
Metrics, Params)).first;
8583
}
8684
return it->second;

ydb/library/actors/interconnect/interconnect_channel.cpp

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "interconnect_channel.h"
2+
#include "interconnect_zc_processor.h"
23

34
#include <ydb/library/actors/core/events.h>
45
#include <ydb/library/actors/core/executor_thread.h>
@@ -56,10 +57,10 @@ namespace NActors {
5657
return true;
5758
}
5859

59-
void TEventOutputChannel::DropConfirmed(ui64 confirm) {
60+
void TEventOutputChannel::DropConfirmed(ui64 confirm, TEventHolderPool& pool) {
6061
LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages");
6162
for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) {
62-
Pool.Release(NotYetConfirmed, it++);
63+
pool.Release(NotYetConfirmed, it++);
6364
}
6465
}
6566

@@ -185,7 +186,7 @@ namespace NActors {
185186
if (allowCopy && (reinterpret_cast<uintptr_t>(data) & 63) + len <= 64) {
186187
task.Write<External>(data, len);
187188
} else {
188-
task.Append<External>(data, len);
189+
task.Append<External>(data, len, &event.ZcTransferId);
189190
}
190191
*bytesSerialized += len;
191192
Y_DEBUG_ABORT_UNLESS(len <= PartLenRemain);
@@ -314,17 +315,19 @@ namespace NActors {
314315
};
315316
char *ptr = reinterpret_cast<char*>(part + 1);
316317
*ptr++ = static_cast<ui8>(EXdcCommand::PUSH_DATA);
317-
*reinterpret_cast<ui16*>(ptr) = bytesSerialized;
318+
319+
WriteUnaligned<ui16>(ptr, bytesSerialized);
318320
ptr += sizeof(ui16);
319321
if (task.ChecksummingXxhash()) {
320322
XXH3_state_t state;
321323
XXH3_64bits_reset(&state);
322324
task.XdcStream.ScanLastBytes(bytesSerialized, [&state](TContiguousSpan span) {
323325
XXH3_64bits_update(&state, span.data(), span.size());
324326
});
325-
*reinterpret_cast<ui32*>(ptr) = XXH3_64bits_digest(&state);
327+
const ui32 cs = XXH3_64bits_digest(&state);
328+
WriteUnaligned<ui32>(ptr, cs);
326329
} else if (task.ChecksummingCrc32c()) {
327-
*reinterpret_cast<ui32*>(ptr) = task.ExternalChecksum;
330+
WriteUnaligned<ui32>(ptr, task.ExternalChecksum);
328331
}
329332

330333
task.WriteBookmark(std::move(partBookmark), buffer, partSize);
@@ -335,7 +338,7 @@ namespace NActors {
335338
return complete;
336339
}
337340

338-
void TEventOutputChannel::NotifyUndelivered() {
341+
void TEventOutputChannel::ProcessUndelivered(TEventHolderPool& pool, NInterconnect::IZcGuard* zg) {
339342
LOG_DEBUG_IC_SESSION("ICOCH89", "Notyfying about Undelivered messages! NotYetConfirmed size: %zu, Queue size: %zu", NotYetConfirmed.size(), Queue.size());
340343
if (State == EState::BODY && Queue.front().Event) {
341344
Y_ABORT_UNLESS(!Chunker.IsComplete()); // chunk must have an event being serialized
@@ -350,11 +353,17 @@ namespace NActors {
350353
item.ForwardOnNondelivery(true);
351354
}
352355
}
353-
Pool.Release(NotYetConfirmed);
356+
357+
// Events in NotYetConfirmed may not have actually been sent by the kernel yet.
358+
// When ZC is enabled, we must wait for the kernel send task to complete before reusing the buffers
359+
if (zg) {
360+
zg->ExtractToSafeTermination(NotYetConfirmed);
361+
}
362+
pool.Release(NotYetConfirmed);
354363
for (auto& item : Queue) {
355364
item.ForwardOnNondelivery(false);
356365
}
357-
Pool.Release(Queue);
366+
pool.Release(Queue);
358367
}
359368

360369
}

ydb/library/actors/interconnect/interconnect_channel.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
#include "packet.h"
1616
#include "event_holder_pool.h"
1717

18+
namespace NInterconnect {
19+
class IZcGuard;
20+
}
21+
1822
namespace NActors {
1923
#pragma pack(push, 1)
2024

@@ -59,10 +63,9 @@ namespace NActors {
5963

6064
class TEventOutputChannel : public TInterconnectLoggingBase {
6165
public:
62-
TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
66+
TEventOutputChannel(ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
6367
std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
6468
: TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId))
65-
, Pool(pool)
6669
, PeerNodeId(peerNodeId)
6770
, ChannelId(id)
6871
, Metrics(std::move(metrics))
@@ -73,8 +76,8 @@ namespace NActors {
7376
~TEventOutputChannel() {
7477
}
7578

76-
std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
77-
TEventHolder& event = Pool.Allocate(Queue);
79+
std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TEventHolderPool& pool) {
80+
TEventHolder& event = pool.Allocate(Queue);
7881
const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr2);
7982
OutputQueueSize += bytes;
8083
if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
@@ -85,7 +88,7 @@ namespace NActors {
8588
return std::make_pair(bytes, &event);
8689
}
8790

88-
void DropConfirmed(ui64 confirm);
91+
void DropConfirmed(ui64 confirm, TEventHolderPool& pool);
8992

9093
bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed);
9194

@@ -105,9 +108,8 @@ namespace NActors {
105108
return OutputQueueSize;
106109
}
107110

108-
void NotifyUndelivered();
111+
void ProcessUndelivered(TEventHolderPool& pool, NInterconnect::IZcGuard* zg);
109112

110-
TEventHolderPool& Pool;
111113
const ui32 PeerNodeId;
112114
const ui16 ChannelId;
113115
std::shared_ptr<IInterconnectMetrics> Metrics;

ydb/library/actors/interconnect/interconnect_common.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ namespace NActors {
2222
REQUIRED, // encryption is mandatory
2323
};
2424

25+
enum class ESocketSendOptimization {
26+
DISABLED,
27+
IC_MSG_ZEROCOPY,
28+
};
29+
2530
struct TInterconnectSettings {
2631
TDuration Handshake;
2732
TDuration DeadPeer;
@@ -48,13 +53,14 @@ namespace NActors {
4853
ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;
4954
ui32 PreallocatedBufferSize = 8 << 10; // 8 KB
5055
ui32 NumPreallocatedBuffers = 16;
51-
bool EnableExternalDataChannel = false;
56+
bool EnableExternalDataChannel = true;
5257
bool ValidateIncomingPeerViaDirectLookup = false;
5358
ui32 SocketBacklogSize = 0; // SOMAXCONN if zero
5459
TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
5560
TDuration MaxErrorSleep = TDuration::Seconds(1);
5661
double ErrorSleepRetryMultiplier = 4.0;
5762
TDuration EventDelay = TDuration::Zero();
63+
ESocketSendOptimization SocketSendOptimization = ESocketSendOptimization::DISABLED;
5864
};
5965

6066
struct TWhiteboardSessionStatus {

ydb/library/actors/interconnect/interconnect_mon.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ namespace NInterconnect {
130130
}
131131
TABLED() { str << kv.second.TotalOutputQueueSize; }
132132
TABLED() { str << (kv.second.Connected ? "yes" : "<strong>no</strong>"); }
133-
TABLED() { str << (kv.second.ExternalDataChannel ? "yes" : "no"); }
133+
TABLED() { str << (kv.second.ExternalDataChannel ? "yes" : "no")
134+
<< " (" << (kv.second.XDCFlags & TInterconnectProxyTCP::TProxyStats::XDCFlags::MSG_ZERO_COPY_SEND ? "MSG_ZC_SEND" : "_") << ")"; }
134135
TABLED() { str << kv.second.Host; }
135136
TABLED() { str << kv.second.Port; }
136137
TABLED() {

ydb/library/actors/interconnect/interconnect_stream.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,29 @@ namespace NInterconnect {
112112

113113
ssize_t
114114
TStreamSocket::Send(const void* msg, size_t len, TString* /*err*/) const {
115-
const auto ret = ::send(Descriptor, static_cast<const char*>(msg), int(len), 0);
115+
return SendWithFlags(msg, len, 0);
116+
}
117+
118+
ssize_t
119+
TStreamSocket::SendWithFlags(const void* msg, size_t len, int flags) const {
120+
const auto ret = ::send(Descriptor, static_cast<const char*>(msg), int(len), flags);
116121
if (ret < 0)
117122
return -LastSocketError();
118123

119124
return ret;
120125
}
121126

127+
#if defined(__linux__)
128+
ssize_t
129+
TStreamSocket::RecvErrQueue(struct msghdr* msg) const {
130+
const auto ret = ::recvmsg(Descriptor, msg, MSG_ERRQUEUE);
131+
if (ret < 0)
132+
return -LastSocketError();
133+
134+
return ret;
135+
}
136+
#endif
137+
122138
ssize_t
123139
TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const {
124140
const auto ret = ::recv(Descriptor, static_cast<char*>(buf), int(len), 0);

ydb/library/actors/interconnect/interconnect_stream.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ namespace NInterconnect {
5858
virtual ssize_t WriteV(const struct iovec* iov, int iovcnt) const;
5959
virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const;
6060

61+
ssize_t SendWithFlags(const void* msg, size_t len, int flags) const;
62+
#if defined(__linux__)
63+
ssize_t RecvErrQueue(struct msghdr* msg) const;
64+
#endif
65+
6166
int Connect(const TAddress& addr) const;
6267
int Connect(const NAddr::IRemoteAddr* addr) const;
6368
int Listen(int backlog) const;

ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,14 +575,14 @@ namespace NActors {
575575
throw TExDestroySession{TDisconnectReason::FormatError()};
576576
}
577577

578-
auto size = *reinterpret_cast<const ui16*>(ptr);
578+
const ui16 size = ReadUnaligned<ui16>(ptr);
579579
if (!size) {
580580
LOG_CRIT_IC_SESSION("ICIS03", "XDC empty payload");
581581
throw TExDestroySession{TDisconnectReason::FormatError()};
582582
}
583583

584584
if (!Params.Encryption) {
585-
const ui32 checksumExpected = *reinterpret_cast<const ui32*>(ptr + sizeof(ui16));
585+
const ui32 checksumExpected = ReadUnaligned<ui32>(ptr + sizeof(ui16));
586586
XdcChecksumQ.emplace_back(size, checksumExpected);
587587
}
588588

0 commit comments

Comments
 (0)