Skip to content

Commit 694c1db

Browse files
committed
Modify high water level callback
1 parent 31c0988 commit 694c1db

File tree

3 files changed

+54
-18
lines changed

3 files changed

+54
-18
lines changed

trantor/net/TcpConnection.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,12 @@ class TRANTOR_EXPORT TcpConnection
359359
{
360360
sslErrorCallback_ = std::move(cb);
361361
}
362-
362+
/**
363+
* @brief Get the data length in the sending buffer. The sending buffer is
364+
* in the user memory space.
365+
* @note This method should be called in the right event loop.
366+
*/
367+
virtual size_t getBufferedDataLength() const = 0;
363368
// TODO: These should be internal APIs
364369
virtual void connectEstablished() = 0;
365370
virtual void connectDestroyed() = 0;

trantor/net/inner/TcpConnectionImpl.cc

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,18 @@ void TcpConnectionImpl::setTcpNoDelay(bool on)
320320
{
321321
socketPtr_->setTcpNoDelay(on);
322322
}
323+
void TcpConnectionImpl::checkBufferedDataSize()
324+
{
325+
loop_->assertInLoopThread();
326+
if (highWaterMarkCallback_)
327+
{
328+
auto bufferedDataSize = getBufferedDataLength();
329+
if (bufferedDataSize > highWaterMarkLen_)
330+
{
331+
highWaterMarkCallback_(shared_from_this(), bufferedDataSize);
332+
}
333+
}
334+
}
323335
void TcpConnectionImpl::connectDestroyed()
324336
{
325337
loop_->assertInLoopThread();
@@ -415,21 +427,7 @@ void TcpConnectionImpl::sendInLoop(const char *buffer, size_t length)
415427
writeBufferList_.back()->append(static_cast<const char *>(buffer) +
416428
sendLen,
417429
length);
418-
if (highWaterMarkCallback_ &&
419-
writeBufferList_.back()->remainingBytes() >
420-
static_cast<long long>(highWaterMarkLen_))
421-
{
422-
highWaterMarkCallback_(shared_from_this(),
423-
writeBufferList_.back()->remainingBytes());
424-
}
425-
if (highWaterMarkCallback_ && tlsProviderPtr_ &&
426-
tlsProviderPtr_->getBufferedData().readableBytes() >
427-
highWaterMarkLen_)
428-
{
429-
highWaterMarkCallback_(
430-
shared_from_this(),
431-
tlsProviderPtr_->getBufferedData().readableBytes());
432-
}
430+
checkBufferedDataSize();
433431
}
434432
}
435433
// The order of data sending should be same as the order of calls of send()
@@ -598,12 +596,16 @@ void TcpConnectionImpl::sendFile(BufferNodePtr &&fileNode)
598596
{
599597
auto n = sendNodeInLoop(fileNode);
600598
if (fileNode->remainingBytes() > 0 && n >= 0)
599+
{
601600
writeBufferList_.push_back(std::move(fileNode));
601+
checkBufferedDataSize();
602+
}
602603
return;
603604
}
604605
else
605606
{
606607
writeBufferList_.push_back(std::move(fileNode));
608+
checkBufferedDataSize();
607609
}
608610
}
609611
else
@@ -614,11 +616,15 @@ void TcpConnectionImpl::sendFile(BufferNodePtr &&fileNode)
614616
{
615617
auto n = thisPtr->sendNodeInLoop(node);
616618
if (node->remainingBytes() > 0 && n >= 0)
619+
{
617620
thisPtr->writeBufferList_.push_back(std::move(node));
621+
thisPtr->checkBufferedDataSize();
622+
}
618623
}
619624
else
620625
{
621626
thisPtr->writeBufferList_.push_back(std::move(node));
627+
thisPtr->checkBufferedDataSize();
622628
}
623629
});
624630
}
@@ -634,7 +640,10 @@ void TcpConnectionImpl::sendStream(
634640
{
635641
auto n = sendNodeInLoop(node);
636642
if (node->remainingBytes() > 0 && n >= 0)
643+
{
637644
writeBufferList_.push_back(std::move(node));
645+
checkBufferedDataSize();
646+
}
638647
return;
639648
}
640649
}
@@ -647,11 +656,15 @@ void TcpConnectionImpl::sendStream(
647656
{
648657
auto n = thisPtr->sendNodeInLoop(node);
649658
if (node->remainingBytes() > 0 && n >= 0)
659+
{
650660
thisPtr->writeBufferList_.push_back(std::move(node));
661+
thisPtr->checkBufferedDataSize();
662+
}
651663
}
652664
else
653665
{
654666
thisPtr->writeBufferList_.push_back(std::move(node));
667+
thisPtr->checkBufferedDataSize();
655668
}
656669
});
657670
}
@@ -959,8 +972,8 @@ AsyncStreamPtr TcpConnectionImpl::sendAsyncStream(bool disableKickoff)
959972
idleTimeoutBackup_ = idleTimeout_;
960973
idleTimeout_ = 0;
961974
}
962-
963975
writeBufferList_.push_back(asyncStreamNode);
976+
checkBufferedDataSize();
964977
}
965978
else
966979
{
@@ -984,11 +997,15 @@ AsyncStreamPtr TcpConnectionImpl::sendAsyncStream(bool disableKickoff)
984997
{
985998
auto n = thisPtr->sendNodeInLoop(node);
986999
if (n >= 0 && (node->remainingBytes() > 0 || node->available()))
1000+
{
9871001
thisPtr->writeBufferList_.push_back(std::move(node));
1002+
thisPtr->checkBufferedDataSize();
1003+
}
9881004
}
9891005
else
9901006
{
9911007
thisPtr->writeBufferList_.push_back(std::move(node));
1008+
thisPtr->checkBufferedDataSize();
9921009
}
9931010
});
9941011
}

trantor/net/inner/TcpConnectionImpl.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,17 +202,31 @@ class TcpConnectionImpl : public TcpConnection,
202202
idleTimeout_ = timeout;
203203
timingWheel->insertEntry(timeout, entry);
204204
}
205+
size_t getBufferedDataLength() const override
206+
{
207+
loop_->assertInLoopThread();
208+
size_t len = 0;
209+
if (tlsProviderPtr_)
210+
{
211+
len += tlsProviderPtr_->getBufferedData().readableBytes();
212+
}
213+
for (auto &node : writeBufferList_)
214+
{
215+
len += node->remainingBytes();
216+
}
217+
return len;
218+
}
205219

206220
private:
207221
/// Internal use only.
208-
209222
std::weak_ptr<KickoffEntry> kickoffEntry_;
210223
std::weak_ptr<TimingWheel> timingWheelWeakPtr_;
211224
size_t idleTimeout_{0};
212225
size_t idleTimeoutBackup_{0};
213226
Date lastTimingWheelUpdateTime_;
214227
void extendLife();
215228
void sendFile(BufferNodePtr &&fileNode);
229+
void checkBufferedDataSize();
216230

217231
protected:
218232
enum class ConnStatus

0 commit comments

Comments
 (0)