Skip to content

Commit af43eb9

Browse files
authored
Merge pull request #111 from bonitoo-io/feat/retry_improvements
fix: fixed blocked write after max retry count reached
2 parents fdd0393 + 2ea8a1f commit af43eb9

File tree

3 files changed

+116
-40
lines changed

3 files changed

+116
-40
lines changed

CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@
88
- Removed applying retry wait time in case of network error
99
- Better explanatory error message when a request is about to be sent in the retry wait state
1010

11-
### Documentation
12-
1311
### Fixes
1412
- [#108](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/108) - Added optional param for specifying decimal places of double.: `void Point::addField(String name, double value, int decimalPlaces = 2)`
13+
- [#111](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/110) - Fixed blocked writing after another point reached max retry count (#110)
1514

1615
## 3.4.0 [2020-10-02]
1716
### Features

src/InfluxDbClient.cpp

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,6 @@ bool InfluxDBClient::writeRecord(String &record) {
392392
_batchPointer = 0;
393393
}
394394
}
395-
INFLUXDB_CLIENT_DEBUG("[D] writeRecord: bufferPointer: %d, batchPointer: %d\n", _bufferPointer, _batchPointer);
396395
if(_writeBuffer[_bufferPointer]->append(record)) { //we reached batch size
397396
_bufferPointer++;
398397
if(_bufferPointer == _writeBufferSize) { // writeBuffer is full
@@ -404,7 +403,7 @@ bool InfluxDBClient::writeRecord(String &record) {
404403
_bufferCeiling++;
405404
}
406405
}
407-
406+
INFLUXDB_CLIENT_DEBUG("[D] writeRecord: bufferPointer: %d, batchPointer: %d, _bufferCeiling: %d\n", _bufferPointer, _batchPointer, _bufferCeiling);
408407
return checkBuffer();
409408
}
410409

@@ -421,20 +420,6 @@ bool InfluxDBClient::checkBuffer() {
421420
return true;
422421
}
423422

424-
void InfluxDBClient::dropCurrentBatch() {
425-
delete _writeBuffer[_batchPointer];
426-
_writeBuffer[_batchPointer] = nullptr;
427-
_batchPointer++;
428-
//did we got over top?
429-
if(_batchPointer == _writeBufferSize) {
430-
// restart _batchPointer in ring buffer from start
431-
_batchPointer = 0;
432-
// we reached buffer size, that means buffer was full and now lower ceiling
433-
_bufferCeiling = _bufferPointer;
434-
}
435-
INFLUXDB_CLIENT_DEBUG("[D] Droped batch, batchpointer: %d\n", _batchPointer);
436-
}
437-
438423
bool InfluxDBClient::flushBuffer() {
439424
return flushBufferInternal(false);
440425
}
@@ -464,8 +449,8 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
464449
// send all batches, It could happen there was long network outage and buffer is full
465450
while(_writeBuffer[_batchPointer] && (!flashOnlyFull || _writeBuffer[_batchPointer]->isFull())) {
466451
data = _writeBuffer[_batchPointer]->createData();
467-
if(!_writeBuffer[_batchPointer]->isFull()) {
468-
// increase _bufferPointer as it happen when buffer is flushed when is full
452+
if(!_writeBuffer[_batchPointer]->isFull() && _writeBuffer[_batchPointer]->retryCount == 0 ) { //do not increase pointer in case of retrying
453+
// points will be written so increase _bufferPointer as it happen when buffer is flushed when is full
469454
if(++_bufferPointer == _writeBufferSize) {
470455
_bufferPointer = 0;
471456
}
@@ -483,19 +468,21 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
483468
_lastFlushed = millis()/1000;
484469
dropCurrentBatch();
485470
} else if(retry) {
486-
_writeBuffer[_batchPointer]->retryCount++;
487-
if(_writeBuffer[_batchPointer]->retryCount > _writeOptions._maxRetryAttempts) {
488-
INFLUXDB_CLIENT_DEBUG("[D] Reached max retry count, dropping batch\n");
489-
dropCurrentBatch();
490-
}
491-
if(!_lastRetryAfter && statusCode > 0) {
492-
_lastRetryAfter = _writeOptions._retryInterval;
493-
if(_writeBuffer[_batchPointer]) {
494-
for(int i=1;i<_writeBuffer[_batchPointer]->retryCount;i++) {
495-
_lastRetryAfter *= _writeOptions._retryInterval;
496-
}
497-
if(_lastRetryAfter > _writeOptions._maxRetryInterval) {
498-
_lastRetryAfter = _writeOptions._maxRetryInterval;
471+
if(statusCode > 0) { //apply retry strategy only in case of HTTP errors
472+
_writeBuffer[_batchPointer]->retryCount++;
473+
if(_writeBuffer[_batchPointer]->retryCount > _writeOptions._maxRetryAttempts) {
474+
INFLUXDB_CLIENT_DEBUG("[D] Reached max retry count, dropping batch\n");
475+
dropCurrentBatch();
476+
}
477+
if(!_lastRetryAfter) {
478+
_lastRetryAfter = _writeOptions._retryInterval;
479+
if(_writeBuffer[_batchPointer]) {
480+
for(int i=1;i<_writeBuffer[_batchPointer]->retryCount;i++) {
481+
_lastRetryAfter *= _writeOptions._retryInterval;
482+
}
483+
if(_lastRetryAfter > _writeOptions._maxRetryInterval) {
484+
_lastRetryAfter = _writeOptions._maxRetryInterval;
485+
}
499486
}
500487
}
501488
}
@@ -507,18 +494,31 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
507494
yield();
508495
}
509496
//Have we emptied the buffer?
510-
if(success) {
511-
if(_batchPointer == _bufferPointer && !_writeBuffer[_bufferPointer]) {
512-
_bufferPointer = 0;
513-
_batchPointer = 0;
514-
_bufferCeiling = 0;
515-
INFLUXDB_CLIENT_DEBUG("[D] Buffer empty\n");
516-
}
497+
INFLUXDB_CLIENT_DEBUG("[D] Success: %d, _bufferPointer: %d, _batchPointer: %d, _writeBuffer[_bufferPointer]_%x\n",success,_bufferPointer,_batchPointer, _writeBuffer[_bufferPointer]);
498+
if(_batchPointer == _bufferPointer && !_writeBuffer[_bufferPointer]) {
499+
_bufferPointer = 0;
500+
_batchPointer = 0;
501+
_bufferCeiling = 0;
502+
INFLUXDB_CLIENT_DEBUG("[D] Buffer empty\n");
517503
}
518504
return success;
519505
}
520506

521507

508+
void InfluxDBClient::dropCurrentBatch() {
509+
delete _writeBuffer[_batchPointer];
510+
_writeBuffer[_batchPointer] = nullptr;
511+
_batchPointer++;
512+
//did we got over top?
513+
if(_batchPointer == _writeBufferSize) {
514+
// restart _batchPointer in ring buffer from start
515+
_batchPointer = 0;
516+
// we reached buffer size, that means buffer was full and now lower ceiling
517+
_bufferCeiling = _bufferPointer;
518+
}
519+
INFLUXDB_CLIENT_DEBUG("[D] Droped batch, batchpointer: %d\n", _batchPointer);
520+
}
521+
522522
bool InfluxDBClient::validateConnection() {
523523
if(!_wifiClient && !init()) {
524524
_lastStatusCode = 0;

test/test.ino

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ private: // tests
7575
static void testTimestamp();
7676
static void testHTTPReadTimeout();
7777
static void testRetryOnFailedConnection();
78+
static void testRetryOnFailedConnectionWithFlush();
7879
static void testBufferOverwriteBatchsize1();
7980
static void testBufferOverwriteBatchsize5();
8081
static void testServerTempDownBatchsize5();
@@ -117,6 +118,7 @@ void Test::run() {
117118
testFailedWrites();
118119
testTimestamp();
119120
testRetryOnFailedConnection();
121+
//testRetryOnFailedConnectionWithFlush();
120122
testBufferOverwriteBatchsize1();
121123
testBufferOverwriteBatchsize5();
122124
testServerTempDownBatchsize5();
@@ -525,6 +527,81 @@ void Test::testRetryOnFailedConnection() {
525527
deleteAll(INFLUXDB_CLIENT_TESTING_URL);
526528
}
527529

530+
void Test::testRetryOnFailedConnectionWithFlush() {
531+
TEST_INIT("testRetryOnFailedConnectionWithFlush");
532+
533+
InfluxDBClient clientOk(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK);
534+
clientOk.setWriteOptions(WriteOptions().batchSize(2).bufferSize(2).retryInterval(4));
535+
waitServer(clientOk, true);
536+
TEST_ASSERT(clientOk.validateConnection());
537+
Point *p = createPoint("test1");
538+
TEST_ASSERT(clientOk.writePoint(*p));
539+
delete p;
540+
TEST_ASSERT(clientOk.flushBuffer());
541+
TEST_ASSERT(clientOk.isBufferEmpty());
542+
543+
clientOk.setHTTPOptions(HTTPOptions().httpReadTimeout(500));
544+
545+
Serial.println("Stop server!");
546+
waitServer(clientOk, false);
547+
// test dropping batch on max retry count
548+
TEST_ASSERT(!clientOk.validateConnection());
549+
p = createPoint("test1");
550+
TEST_ASSERT(clientOk.writePoint(*p));
551+
delete p;
552+
553+
Serial.print(millis()/1000.0f,3);
554+
Serial.println(" Write 1");
555+
556+
TEST_ASSERT(!clientOk.flushBuffer());
557+
TEST_ASSERT(!clientOk.isBufferEmpty());
558+
Serial.println(clientOk.getLastErrorMessage());
559+
560+
Serial.print(millis()/1000.0f,3);
561+
Serial.println(" Write 2");
562+
563+
TEST_ASSERT(!clientOk.flushBuffer());
564+
TEST_ASSERT(!clientOk.isBufferEmpty());
565+
Serial.println(clientOk.getLastErrorMessage());
566+
567+
Serial.print(millis()/1000.0f,3);
568+
Serial.println(" Write 3");
569+
570+
TEST_ASSERT(!clientOk.flushBuffer());
571+
TEST_ASSERT(!clientOk.isBufferEmpty());
572+
Serial.println(clientOk.getLastErrorMessage());
573+
574+
575+
Serial.print(millis()/1000.0f,3);
576+
Serial.println(" Write 4");
577+
578+
TEST_ASSERT(!clientOk.flushBuffer());
579+
TEST_ASSERT(!clientOk.isBufferEmpty());
580+
Serial.println(clientOk.getLastErrorMessage());
581+
582+
583+
Serial.println("Start server!");
584+
waitServer(clientOk, true);
585+
clientOk.setHTTPOptions(HTTPOptions().httpReadTimeout(5000));
586+
TEST_ASSERT(clientOk.validateConnection());
587+
588+
Serial.print(millis()/1000.0f,3);
589+
Serial.println(" Write 5");
590+
p = createPoint("test1");
591+
TEST_ASSERT(clientOk.writePoint(*p));
592+
delete p;
593+
TEST_ASSERT(clientOk.flushBuffer());
594+
TEST_ASSERT(clientOk.isBufferEmpty());
595+
596+
String query = "select";
597+
FluxQueryResult q = clientOk.query(query);
598+
TEST_ASSERT(countLines(q) == 2);
599+
600+
601+
TEST_END();
602+
deleteAll(INFLUXDB_CLIENT_TESTING_URL);
603+
}
604+
528605
void Test::testBufferOverwriteBatchsize1() {
529606
TEST_INIT("testBufferOverwriteBatchsize1");
530607
InfluxDBClient client(INFLUXDB_CLIENT_TESTING_BAD_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK);

0 commit comments

Comments
 (0)