Skip to content

Commit 5b5f05d

Browse files
committed
Merge branch 'dreibh/tcp-graceful-shutdown'
2 parents 08efe30 + 4ffd618 commit 5b5f05d

File tree

6 files changed

+71
-63
lines changed

6 files changed

+71
-63
lines changed

src/control.cc

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ bool performNetPerfMeterStart(MessageReader* messageReader,
408408
}
409409

410410
// ====== Start flows ====================================================
411-
const bool success = FlowManager::getFlowManager()->startMeasurement(
411+
const bool success = FlowManager::getFlowManager()->beginMeasurement(
412412
controlSocket, measurementID, getMicroTime(),
413413
vectorNamePattern, vectorFileFormat,
414414
scalarNamePattern, scalarFileFormat);
@@ -434,7 +434,7 @@ bool performNetPerfMeterStart(MessageReader* messageReader,
434434
}
435435

436436
LOG_INFO
437-
stdlog << format("Starting Measurement $%llx on socket %d ...",
437+
stdlog << format("Starting measurement $%llx on socket %d ...",
438438
(unsigned long long)measurementID, controlSocket) << "\n";
439439
LOG_END
440440
if(ext_send(controlSocket, &startMsg, sizeof(startMsg), 0) < 0) {
@@ -493,7 +493,12 @@ bool performNetPerfMeterStop(MessageReader* messageReader,
493493
{
494494
// ====== Stop flows =====================================================
495495
FlowManager::getFlowManager()->lock();
496-
FlowManager::getFlowManager()->stopMeasurement(controlSocket, measurementID);
496+
std::vector<Flow*>::iterator iterator = FlowManager::getFlowManager()->getFlowSet().begin();
497+
while(iterator != FlowManager::getFlowManager()->getFlowSet().end()) {
498+
Flow* flow = *iterator;
499+
flow->deactivate(true);
500+
iterator++;
501+
}
497502
Measurement* measurement = FlowManager::getFlowManager()->findMeasurement(controlSocket,
498503
measurementID);
499504
assure(measurement != nullptr);
@@ -560,8 +565,10 @@ bool performNetPerfMeterStop(MessageReader* messageReader,
560565

561566
// ====== Download flow results and remove the flows =====================
562567
FlowManager::getFlowManager()->lock();
563-
std::vector<Flow*>::iterator iterator = FlowManager::getFlowManager()->getFlowSet().begin();
564-
while(iterator != FlowManager::getFlowManager()->getFlowSet().end()) {
568+
569+
for(std::vector<Flow*>::iterator iterator = FlowManager::getFlowManager()->getFlowSet().begin();
570+
iterator != FlowManager::getFlowManager()->getFlowSet().end();
571+
iterator++) {
565572
Flow* flow = *iterator;
566573
if(flow->getMeasurementID() == measurementID) {
567574
if(sendNetPerfMeterRemoveFlow(messageReader, controlSocket,
@@ -572,13 +579,23 @@ bool performNetPerfMeterStop(MessageReader* messageReader,
572579
LOG_INFO
573580
flow->print(stdlog, true);
574581
LOG_END
582+
}
583+
}
584+
585+
iterator = FlowManager::getFlowManager()->getFlowSet().begin();
586+
while(iterator != FlowManager::getFlowManager()->getFlowSet().end()) {
587+
Flow* flow = *iterator;
588+
if(flow->getMeasurementID() == measurementID) {
589+
flow->deactivate(false);
575590
delete flow;
576-
// Invalidated iterator. Is there a better solution?
577591
iterator = FlowManager::getFlowManager()->getFlowSet().begin();
578592
continue;
579593
}
580594
iterator++;
581595
}
596+
597+
FlowManager::getFlowManager()->finishMeasurement(controlSocket, measurementID);
598+
582599
FlowManager::getFlowManager()->unlock();
583600

584601
// ====== Remove the Measurement object =================================
@@ -850,7 +867,7 @@ static bool handleNetPerfMeterRemoveFlow(MessageReader* me
850867
Flow* flow = FlowManager::getFlowManager()->findFlow(measurementID, flowID, streamID);
851868
if(flow == nullptr) {
852869
LOG_WARNING
853-
stdlog << format("NETPERFMETER_ADD_REMOVE tried to remove not-existing flow on socket %d!",
870+
stdlog << format("NETPERFMETER_REMOVE_FLOW tried to remove not-existing flow on socket %d!",
854871
controlSocket) << "\n";
855872
LOG_END
856873
return(sendNetPerfMeterAcknowledge(controlSocket,
@@ -888,7 +905,7 @@ static bool handleNetPerfMeterStart(MessageReader* messageReade
888905
}
889906
const uint64_t measurementID = ntoh64(startMsg->MeasurementID);
890907
LOG_INFO
891-
stdlog << format("Starting Measurement $%llx on socket %d ...",
908+
stdlog << format("Starting measurement $%llx on socket %d ...",
892909
(unsigned long long)measurementID, controlSocket) << "\n";
893910
LOG_END
894911

@@ -908,7 +925,7 @@ static bool handleNetPerfMeterStart(MessageReader* messageReade
908925
}
909926

910927
const unsigned long long now = getMicroTime();
911-
bool success = FlowManager::getFlowManager()->startMeasurement(
928+
bool success = FlowManager::getFlowManager()->beginMeasurement(
912929
controlSocket, measurementID, now,
913930
nullptr, vectorFileFormat,
914931
nullptr, scalarFileFormat);
@@ -936,13 +953,13 @@ static bool handleNetPerfMeterStop(MessageReader* messageReader,
936953
}
937954
const uint64_t measurementID = ntoh64(stopMsg->MeasurementID);
938955
LOG_INFO
939-
stdlog << format("Stopping Measurement $%llx on socket %d ...",
956+
stdlog << format("Stopping measurement $%llx on socket %d ...",
940957
(unsigned long long)measurementID, controlSocket) << "\n";
941958
LOG_END
942959

943960
// ====== Stop flows =====================================================
944961
FlowManager::getFlowManager()->lock();
945-
FlowManager::getFlowManager()->stopMeasurement(controlSocket, measurementID);
962+
946963
bool success = false;
947964
Measurement* measurement =
948965
FlowManager::getFlowManager()->findMeasurement(controlSocket, measurementID);
@@ -960,8 +977,10 @@ static bool handleNetPerfMeterStop(MessageReader* messageReader,
960977
Flow* flow = *iterator;
961978
if(flow->getMeasurement() == measurement) {
962979
flow->setMeasurement(nullptr);
980+
flow->deactivate(true);
963981
}
964982
}
983+
FlowManager::getFlowManager()->finishMeasurement(controlSocket, measurementID);
965984
FlowManager::getFlowManager()->unlock();
966985

967986
// ====== Acknowledge result =============================================

src/flow.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,17 +284,17 @@ void Flow::deactivate(const bool asyncStop)
284284
OutputStatus = Off;
285285
PollFDEntry = nullptr; // Poll FD entry is now invalid!
286286
unlock();
287-
stop();
288287
if(SocketDescriptor >= 0) {
289288
if(TrafficSpec.Protocol == IPPROTO_UDP) {
290289
// NOTE: There is only one UDP socket. We cannot close it here!
291290
// The thread will notice the need to finish after the poll()
292291
// timeout.
293292
}
294293
else {
295-
ext_shutdown(SocketDescriptor, SHUT_RDWR);
294+
ext_shutdown(SocketDescriptor, (TrafficSpec.Protocol == IPPROTO_TCP) ? SHUT_WR : SHUT_RDWR);
296295
}
297296
}
297+
stop();
298298
if(!asyncStop) {
299299
waitForFinish();
300300
FlowManager::getFlowManager()->getMessageReader()->deregisterSocket(SocketDescriptor);

src/flowmanager.cc

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,8 @@ Flow* FlowManager::findFlow(const struct sockaddr* from)
222222
}
223223

224224

225-
// ###### Start measurement #################################################
226-
bool FlowManager::startMeasurement(const int controlSocket,
225+
// ###### Begin measurement #################################################
226+
bool FlowManager::beginMeasurement(const int controlSocket,
227227
const uint64_t measurementID,
228228
const unsigned long long now,
229229
const char* vectorNamePattern,
@@ -274,7 +274,7 @@ bool FlowManager::startMeasurement(const int controlSocket,
274274

275275
if(success) {
276276
LOG_INFO
277-
stdlog << format("Started Measurement $%llx on socket %d:",
277+
stdlog << format("Prepared measurement $%llx on socket %d:",
278278
measurementID, controlSocket)
279279
<< "\n" << ss.str();
280280
LOG_END
@@ -283,43 +283,16 @@ bool FlowManager::startMeasurement(const int controlSocket,
283283
}
284284

285285

286-
// ###### Stop measurement ##################################################
287-
void FlowManager::stopMeasurement(const int controlSocket,
288-
const uint64_t measurementID,
289-
const unsigned long long now)
286+
// ###### Finish measurement ##################################################
287+
void FlowManager::finishMeasurement(const int controlSocket,
288+
const uint64_t measurementID,
289+
const unsigned long long now)
290290
{
291-
std::stringstream ss;
292-
293291
CPULoadStats.update();
294-
lock();
295-
296-
// We make a two-staged stopping process here:
297-
// In stage 0, the flows' sender threads are told to stop.
298-
// => all threads can perform shutdown simultaneously
299-
// => much faster if there are many flows
300-
// In stage 1, we wait until the threads have stopped.
301-
for(unsigned int stage = 0;stage < 2;stage++) {
302-
for(std::vector<Flow*>::iterator iterator = FlowSet.begin();
303-
iterator != FlowSet.end();iterator++) {
304-
Flow* flow = *iterator;
305-
if(flow->MeasurementID == measurementID) {
306-
// ====== Stop flow ================================================
307-
if(stage == 0) {
308-
flow->deactivate(true);
309-
}
310-
else {
311-
flow->deactivate(false);
312-
flow->print(ss);
313-
}
314-
}
315-
}
316-
}
317-
unlock();
318292

319293
LOG_INFO
320-
stdlog << format("Stopped Measurement $%llx on socket %d:",
321-
measurementID, controlSocket)
322-
<< "\n" << ss.str();
294+
stdlog << format("Stopped measurement $%llx on socket %d\n",
295+
measurementID, controlSocket);
323296
LOG_END
324297
}
325298

src/flowmanager.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,16 @@ class FlowManager : public Thread
8989
void printFlows(std::ostream& os,
9090
const bool printStatistics);
9191

92-
bool startMeasurement(const int controlSocket,
92+
bool beginMeasurement(const int controlSocket,
9393
const uint64_t measurementID,
9494
const unsigned long long now,
9595
const char* vectorNamePattern,
9696
const OutputFileFormat vectorFileFormat,
9797
const char* scalarNamePattern,
9898
const OutputFileFormat scalarFileFormat);
99-
void stopMeasurement(const int controlSocket,
100-
const uint64_t measurementID,
101-
const unsigned long long now = getMicroTime());
99+
void finishMeasurement(const int controlSocket,
100+
const uint64_t measurementID,
101+
const unsigned long long now = getMicroTime());
102102

103103
void writeScalarStatistics(const uint64_t measurementID,
104104
const unsigned long long now,

src/netperfmeter.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,10 +1473,14 @@ void passiveMode(const uint16_t localPort)
14731473

14741474

14751475
// ====== Clean up =======================================================
1476-
gMessageReader.deregisterSocket(gControlSocketTCP);
1477-
ext_close(gControlSocketTCP);
1478-
gMessageReader.deregisterSocket(gControlSocket);
1479-
ext_close(gControlSocket);
1476+
if(gControlSocketTCP >= 0) {
1477+
gMessageReader.deregisterSocket(gControlSocketTCP);
1478+
ext_close(gControlSocketTCP);
1479+
}
1480+
if(gControlSocket>= 0) {
1481+
gMessageReader.deregisterSocket(gControlSocket);
1482+
ext_close(gControlSocket);
1483+
}
14801484
ext_close(gTCPSocket);
14811485
#ifdef HAVE_MPTCP
14821486
if(gMPTCPSocket >= 0) {
@@ -1485,7 +1489,9 @@ void passiveMode(const uint16_t localPort)
14851489
#endif
14861490
FlowManager::getFlowManager()->removeUnidentifiedSocket(gUDPSocket, false);
14871491
ext_close(gUDPSocket);
1488-
ext_close(gSCTPSocket);
1492+
if(gSCTPSocket >= 0) {
1493+
ext_close(gSCTPSocket);
1494+
}
14891495
#ifdef HAVE_DCCP
14901496
if(gDCCPSocket >= 0) {
14911497
ext_close(gDCCPSocket);

src/transfer.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -309,10 +309,20 @@ bool handleNetPerfMeterData(const bool isActiveMode,
309309
Flow* flow = FlowManager::getFlowManager()->findFlow(sd, sinfo.sinfo_stream);
310310
if(flow) {
311311
flow->lock();
312-
LOG_WARNING
313-
stdlog << format("End of input for flow #%u on socket %d!",
314-
flow->getFlowID(), sd) << "\n";
315-
LOG_END
312+
if(!flow->isAcceptedIncomingFlow()) {
313+
// The outgoing flow on the active side is closed:
314+
LOG_WARNING
315+
stdlog << format("End of input for flow #%u on socket %d!",
316+
flow->getFlowID(), sd) << "\n";
317+
LOG_END
318+
}
319+
else {
320+
// This is probably just the regular connection shutdown:
321+
LOG_DEBUG
322+
stdlog << format("End of input for flow #%u on socket %d!",
323+
flow->getFlowID(), sd) << "\n";
324+
LOG_END
325+
}
316326
flow->unlock();
317327
flow->endOfInput();
318328
}
@@ -325,7 +335,7 @@ bool handleNetPerfMeterData(const bool isActiveMode,
325335
if(protocol != IPPROTO_UDP) {
326336
ext_shutdown(sd, SHUT_RDWR);
327337
}
328-
return false;
338+
return flow->isAcceptedIncomingFlow(); // No error for incoming flow!
329339
}
330340

331341
return true;

0 commit comments

Comments
 (0)