Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions ntcore/src/main/native/cpp/NetworkServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class NetworkServer::ServerConnection {
void SetupOutgoingTimer();
void UpdateOutgoingTimer(uint32_t repeatMs);
void ConnectionClosed();
uv::Loop& GetLoopRef() const { return m_outgoingTimer->GetLoopRef(); }

NetworkServer& m_server;
ConnectionInfo m_info;
Expand Down Expand Up @@ -239,13 +238,11 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) {
m_server.m_idle->Start();
}
m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count());
});
m_websocket->binary.connect([this](std::span<const uint8_t> data, bool) {
if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) {
m_server.m_idle->Start();
}
m_server.m_serverImpl.SendAllLocalOutgoing(GetLoopRef().Now().count());
});

SetupOutgoingTimer();
Expand Down Expand Up @@ -425,7 +422,6 @@ void NetworkServer::Init() {
DEBUG4("Stopping idle processing");
m_idle->Stop(); // go back to sleep
}
m_serverImpl.SendAllLocalOutgoing(m_loop.Now().count());
});
}

Expand Down
10 changes: 8 additions & 2 deletions ntcore/src/main/native/cpp/net/NetworkOutgoingQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ concept NetworkMessage =
std::same_as<typename MessageType::ValueMsg, ServerValueMsg> ||
std::same_as<typename MessageType::ValueMsg, ClientValueMsg>;

enum class ValueSendMode { kDisabled = 0, kAll, kNormal };
enum class ValueSendMode { kDisabled = 0, kAll, kNormal, kImm };

template <NetworkMessage MessageType>
class NetworkOutgoingQueue {
Expand Down Expand Up @@ -115,13 +115,19 @@ class NetworkOutgoingQueue {
}

void SendValue(int id, const Value& value, ValueSendMode mode) {
if (m_local) {
mode = ValueSendMode::kImm; // always send local immediately
}
// backpressure by stopping sending all if the buffer is too full
if (mode == ValueSendMode::kAll && m_totalSize >= kOutgoingLimit) {
mode = ValueSendMode::kNormal;
}
switch (mode) {
case ValueSendMode::kDisabled: // do nothing
break;
case ValueSendMode::kImm: // send immediately
m_wire.SendBinary([&](auto& os) { EncodeValue(os, id, value); });
break;
case ValueSendMode::kAll: { // append to outgoing
auto& info = m_idMap[id];
auto& queue = m_queues[info.queueIndex];
Expand Down Expand Up @@ -161,7 +167,7 @@ class NetworkOutgoingQueue {
return; // nothing to do
}

// rate limit frequency of transmissions
// rate limit frequency of transmissions for remote connections
if (!m_local && curTimeMs < (m_lastSendMs + kMinPeriodMs)) {
return;
}
Expand Down
8 changes: 0 additions & 8 deletions ntcore/src/main/native/cpp/server/ServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,6 @@ void ServerImpl::SendOutgoing(int clientId, uint64_t curTimeMs) {
}
}

void ServerImpl::SendAllLocalOutgoing(uint64_t curTimeMs) {
for (auto&& client : m_clients) {
if (client && client->IsLocal()) {
client->SendOutgoing(curTimeMs, true);
}
}
}

void ServerImpl::SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue) {
DEBUG4("SetLocal()");
Expand Down
1 change: 0 additions & 1 deletion ntcore/src/main/native/cpp/server/ServerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class ServerImpl final {

void SendAllOutgoing(uint64_t curTimeMs, bool flush);
void SendOutgoing(int clientId, uint64_t curTimeMs);
void SendAllLocalOutgoing(uint64_t curTimeMs);

void SetLocal(net::ServerMessageHandler* local,
net::ClientMessageQueue* queue);
Expand Down
Loading