Skip to content

Commit fdd0393

Browse files
authored
Merge pull request #109 from bonitoo-io/feat/retry_improvements
feat: retry strategy improvements
2 parents 90bcb58 + 86ccd69 commit fdd0393

File tree

5 files changed

+84
-35
lines changed

5 files changed

+84
-35
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22
## 3.5.0 [in progress]
33
### Features
44
- [#107](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/107) - Added possibility to set default tags. Use `WriteOptions::addDefaultTag()` to add a tag that will be added to each written point using the `writePoint()` function.
5+
- [#109](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/109) - Retry strategy improvements:
6+
- Added `canSendRequest()` function to check if retry strategy is applied
7+
- Added `getRemaingRetryTime()` function to get wait time before another request (write/query) can be sent
8+
- Removed applying retry wait time in case of network error
9+
- Better explanatory error message when a request is about to be sent in the retry wait state
510

611
### Documentation
712

src/InfluxDbClient.cpp

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ static const char UserAgent[] PROGMEM = "influxdb-client-arduino/" INFLUXDB_CLIE
4545
#include "util/debug.h"
4646

4747
static const char UnitialisedMessage[] PROGMEM = "Unconfigured instance";
48-
static const char TooEarlyMessage[] PROGMEM = "Too early request";
48+
static const char TooEarlyMessage[] PROGMEM = "Cannot send request yet because of applied retry strategy. Remaining ";
4949
// This cannot be put to PROGMEM due to the way how it is used
5050
static const char RetryAfter[] = "Retry-After";
5151
static const char TransferEnconding[] = "Transfer-Encoding";
@@ -398,10 +398,6 @@ bool InfluxDBClient::writeRecord(String &record) {
398398
if(_bufferPointer == _writeBufferSize) { // writeBuffer is full
399399
_bufferPointer = 0;
400400
INFLUXDB_CLIENT_DEBUG("[W] Reached write buffer size, old points will be overwritten\n");
401-
// if(isBufferFull()) {
402-
// //if already isBufferFull
403-
// _batchPointer = 0;
404-
// }
405401
}
406402

407403
if(_bufferCeiling < _writeBufferSize) {
@@ -443,16 +439,25 @@ bool InfluxDBClient::flushBuffer() {
443439
return flushBufferInternal(false);
444440
}
445441

446-
bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
442+
uint32_t InfluxDBClient::getRemaingRetryTime() {
443+
uint32_t rem = 0;
447444
if(_lastRetryAfter > 0) {
448-
uint32_t diff = (millis()-_lastRequestTime)/1000 ;
449-
if(diff < _lastRetryAfter) {
450-
INFLUXDB_CLIENT_DEBUG("[W] Cannot write yet, pause %ds, so far %ds\n", _lastRetryAfter, diff);
451-
// retry after period didn't run out yet
452-
_lastStatusCode = 0;
453-
_lastErrorResponse = FPSTR(TooEarlyMessage);
454-
return false;
455-
}
445+
int32_t diff = _lastRetryAfter - (millis()-_lastRequestTime)/1000;
446+
rem = diff<0?0:(uint32_t)diff;
447+
}
448+
return rem;
449+
}
450+
451+
bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
452+
uint32_t rwt = getRemaingRetryTime();
453+
if(rwt > 0) {
454+
INFLUXDB_CLIENT_DEBUG("[W] Cannot write yet, pause %ds, %ds yet\n", _lastRetryAfter, rwt);
455+
// retry after period didn't run out yet
456+
_lastStatusCode = 0;
457+
_lastErrorResponse = FPSTR(TooEarlyMessage);
458+
_lastErrorResponse += String(rwt);
459+
_lastErrorResponse += "s";
460+
return false;
456461
}
457462
char *data;
458463
bool success = true;
@@ -483,7 +488,7 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
483488
INFLUXDB_CLIENT_DEBUG("[D] Reached max retry count, dropping batch\n");
484489
dropCurrentBatch();
485490
}
486-
if(!_lastRetryAfter) {
491+
if(!_lastRetryAfter && statusCode > 0) {
487492
_lastRetryAfter = _writeOptions._retryInterval;
488493
if(_writeBuffer[_batchPointer]) {
489494
for(int i=1;i<_writeBuffer[_batchPointer]->retryCount;i++) {
@@ -534,7 +539,7 @@ bool InfluxDBClient::validateConnection() {
534539

535540
_lastErrorResponse = "";
536541

537-
afterRequest(200);
542+
afterRequest(200, false);
538543

539544
_httpClient.end();
540545

@@ -591,9 +596,14 @@ static const char QueryDialect[] PROGMEM = "\
591596
}}";
592597

593598
FluxQueryResult InfluxDBClient::query(String fluxQuery) {
594-
if(_lastRetryAfter > 0 && (millis()-_lastRequestTime)/1000 < _lastRetryAfter) {
599+
uint32_t rwt = getRemaingRetryTime();
600+
if(rwt > 0) {
601+
INFLUXDB_CLIENT_DEBUG("[W] Cannot query yet, pause %ds, %ds yet\n", _lastRetryAfter, rwt);
595602
// retry after period didn't run out yet
596-
return FluxQueryResult(FPSTR(TooEarlyMessage));
603+
String mess = FPSTR(TooEarlyMessage);
604+
mess += String(rwt);
605+
mess += "s";
606+
return FluxQueryResult(mess);
597607
}
598608
if(!_wifiClient && !init()) {
599609
_lastStatusCode = 0;
@@ -635,17 +645,17 @@ FluxQueryResult InfluxDBClient::query(String fluxQuery) {
635645
}
636646
}
637647

638-
void InfluxDBClient::afterRequest(int expectedStatusCode) {
639-
_lastRequestTime = millis();
640-
INFLUXDB_CLIENT_DEBUG("[D] HTTP status code - %d\n", _lastStatusCode);
641-
if(_lastStatusCode >= 429) { //retryable server errors
648+
void InfluxDBClient::afterRequest(int expectedStatusCode, bool modifyLastConnStatus) {
649+
if(modifyLastConnStatus) {
650+
_lastRequestTime = millis();
651+
INFLUXDB_CLIENT_DEBUG("[D] HTTP status code - %d\n", _lastStatusCode);
642652
_lastRetryAfter = 0;
643-
if(_httpClient.hasHeader(RetryAfter)) {
644-
_lastRetryAfter = _httpClient.header(RetryAfter).toInt();
645-
INFLUXDB_CLIENT_DEBUG("[D] Reply after - %d\n", _lastRetryAfter);
653+
if(_lastStatusCode >= 429) { //retryable server errors
654+
if(_httpClient.hasHeader(RetryAfter)) {
655+
_lastRetryAfter = _httpClient.header(RetryAfter).toInt();
656+
INFLUXDB_CLIENT_DEBUG("[D] Reply after - %d\n", _lastRetryAfter);
657+
}
646658
}
647-
} else {
648-
_lastRetryAfter = 0;
649659
}
650660
_lastErrorResponse = "";
651661
if(_lastStatusCode != expectedStatusCode) {

src/InfluxDbClient.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,20 @@ class InfluxDBClient {
148148
String getLastErrorMessage() const { return _lastErrorResponse; }
149149
// Returns server url
150150
String getServerUrl() const { return _serverUrl; }
151+
// Check if it is possible to send write/query request to server.
152+
// Returns true if write or query can be send, or false, if server is overloaded and retry strategy is applied.
153+
// Use getRemaingRetryTime() to get wait time in such case.
154+
bool canSendRequest() { return getRemaingRetryTime() == 0; }
155+
// Returns remaining wait time in seconds when retry strategy is applied.
156+
uint32_t getRemaingRetryTime();
151157
protected:
152158
// Checks params and sets up security, if needed.
153159
// Returns true in case of success, otherwise false
154160
bool init();
155161
// Sets request params
156162
void beforeRequest();
157163
// Handles response
158-
void afterRequest(int expectedStatusCode);
164+
void afterRequest(int expectedStatusCode, bool modifyLastConnStatus = true);
159165
// Cleans instances
160166
void clean();
161167
protected:

src/util/debug.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
#include <Arduino.h>
3131

3232
#ifdef INFLUXDB_CLIENT_DEBUG_ENABLE
33-
# define INFLUXDB_CLIENT_DEBUG(fmt, ...) Serial.printf_P( (PGM_P)PSTR(fmt), ## __VA_ARGS__ )
33+
# define INFLUXDB_CLIENT_DEBUG(fmt, ...) Serial.printf("%.03f ",millis()/1000.0f);Serial.printf_P( (PGM_P)PSTR(fmt), ## __VA_ARGS__ )
3434
#else
3535
# define INFLUXDB_CLIENT_DEBUG(fmt, ...)
3636
#endif //INFLUXDB_CLIENT_DEBUG

test/test.ino

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ void setup() {
5151
class Test {
5252
public:
5353
static void run();
54-
private:
54+
private: //helpers
55+
static void setServerUrl(InfluxDBClient &client, String serverUrl);
56+
private: // tests
5557
static void testOptions();
5658
static void testEcaping();
5759
static void testPoint();
@@ -481,16 +483,29 @@ void Test::testRetryOnFailedConnection() {
481483

482484
InfluxDBClient clientOk(INFLUXDB_CLIENT_TESTING_URL, INFLUXDB_CLIENT_TESTING_ORG, INFLUXDB_CLIENT_TESTING_BUC, INFLUXDB_CLIENT_TESTING_TOK);
483485
clientOk.setWriteOptions(WriteOptions().batchSize(1).bufferSize(5));
486+
waitServer(clientOk, true);
487+
TEST_ASSERT(clientOk.validateConnection());
488+
Point *p = createPoint("test1");
489+
TEST_ASSERT(clientOk.writePoint(*p));
490+
delete p;
491+
p = createPoint("test1");
492+
TEST_ASSERT(clientOk.writePoint(*p));
493+
delete p;
494+
TEST_ASSERT(clientOk.isBufferEmpty());
495+
484496
clientOk.setHTTPOptions(HTTPOptions().httpReadTimeout(500));
485497

486498
Serial.println("Stop server!");
487499
waitServer(clientOk, false);
488500
TEST_ASSERT(!clientOk.validateConnection());
489-
Point *p = createPoint("test1");
501+
TEST_ASSERTM(clientOk._lastRetryAfter == 0, String(clientOk._lastRetryAfter));
502+
p = createPoint("test1");
490503
TEST_ASSERT(!clientOk.writePoint(*p));
504+
TEST_ASSERTM(clientOk._lastRetryAfter == 0, String(clientOk._lastRetryAfter));
491505
delete p;
492506
p = createPoint("test1");
493507
TEST_ASSERT(!clientOk.writePoint(*p));
508+
TEST_ASSERTM(clientOk._lastRetryAfter == 0, String(clientOk._lastRetryAfter));
494509
delete p;
495510

496511
Serial.println("Start server!");
@@ -499,6 +514,7 @@ void Test::testRetryOnFailedConnection() {
499514
TEST_ASSERT(clientOk.validateConnection());
500515
p = createPoint("test1");
501516
TEST_ASSERT(clientOk.writePoint(*p));
517+
TEST_ASSERTM(clientOk._lastRetryAfter == 0, String(clientOk._lastRetryAfter));
502518
delete p;
503519
TEST_ASSERT(clientOk.isBufferEmpty());
504520
String query = "select";
@@ -525,8 +541,8 @@ void Test::testBufferOverwriteBatchsize1() {
525541
TEST_ASSERT(client.isBufferFull());
526542
TEST_ASSERTM(client._writeBuffer[0]->buffer[0].indexOf("index=10i") > 0, client._writeBuffer[0]->buffer[0]);
527543

528-
client._serverUrl = INFLUXDB_CLIENT_TESTING_URL;
529-
client.setUrls();
544+
setServerUrl(client,INFLUXDB_CLIENT_TESTING_URL );
545+
530546
waitServer(client, true);
531547
client.setHTTPOptions(HTTPOptions().httpReadTimeout(5000));
532548
Point *p = createPoint("test1");
@@ -566,8 +582,7 @@ void Test::testBufferOverwriteBatchsize5() {
566582
TEST_ASSERT(client.isBufferFull());
567583
TEST_ASSERTM(client._writeBuffer[0]->buffer[0].indexOf("index=20i") > 0, client._writeBuffer[0]->buffer[0]);
568584

569-
client._serverUrl = INFLUXDB_CLIENT_TESTING_URL;
570-
client.setUrls();
585+
setServerUrl(client,INFLUXDB_CLIENT_TESTING_URL );
571586

572587
waitServer(client, true);
573588
client.setHTTPOptions(HTTPOptions().httpReadTimeout(5000));
@@ -1685,34 +1700,42 @@ void Test::testRetryInterval() {
16851700

16861701
String rec = "test1,direction=permanent-set,x-code=502,SSID=bonitoo.io,device_name=ESP32,device_id=4272205360 temperature=28.60,humidity=86i,code=69i,door=false,status=\"failed\",index=0";
16871702
TEST_ASSERT(!client.writeRecord(rec));
1703+
TEST_ASSERT(!client.canSendRequest());
16881704
TEST_ASSERTM(client._lastRetryAfter == 2, String(client._lastRetryAfter));
16891705
TEST_ASSERTM(client._writeBuffer[0]->retryCount == 1, String(client._writeBuffer[0]->retryCount));
16901706
delay(2000);
16911707
rec = "test1,direction=permanent-unset,SSID=bonitoo.io,device_name=ESP32,device_id=4272205360 temperature=28.60,humidity=86i,code=69i,door=false,status=\"failed\",index=2";
16921708
TEST_ASSERT(!client.writeRecord(rec));
1709+
TEST_ASSERT(!client.canSendRequest());
16931710
TEST_ASSERTM(client._lastRetryAfter == 4, String(client._lastRetryAfter));
16941711
TEST_ASSERTM(client._writeBuffer[0]->retryCount == 2, String(client._writeBuffer[0]->retryCount));
16951712
delay(4000);
16961713
rec = "test1,SSID=bonitoo.io,device_name=ESP32,device_id=4272205360 temperature=28.60,humidity=86i,code=69i,door=false,status=\"failed\",index=3";
16971714
TEST_ASSERT(!client.writeRecord(rec));
1715+
TEST_ASSERT(!client.canSendRequest());
16981716
TEST_ASSERTM(client._lastRetryAfter == 8, String(client._lastRetryAfter));
16991717
TEST_ASSERTM(client._writeBuffer[0]->retryCount == 3, String(client._writeBuffer[0]->retryCount));
17001718
delay(8000);
17011719
rec = "test1,SSID=bonitoo.io,device_name=ESP32,device_id=4272205360 temperature=28.60,humidity=86i,code=69i,door=false,status=\"failed\",index=4";
17021720
TEST_ASSERT(!client.writeRecord(rec));
1721+
TEST_ASSERT(!client.canSendRequest());
17031722
TEST_ASSERTM(client._lastRetryAfter == 2, String(client._lastRetryAfter));
17041723
TEST_ASSERT(!client._writeBuffer[0]);
17051724
TEST_ASSERTM(client._writeBuffer[1]->retryCount == 0, String(client._writeBuffer[1]->retryCount));
17061725

17071726
delay(2000);
17081727
rec = "test1,SSID=bonitoo.io,device_name=ESP32,device_id=4272205360 temperature=28.60,humidity=86i,code=69i,door=false,status=\"failed\",index=5";
17091728
TEST_ASSERT(!client.writeRecord(rec));
1729+
TEST_ASSERT(!client.canSendRequest());
17101730
TEST_ASSERTM(client._lastRetryAfter == 2, String(client._lastRetryAfter));
17111731
TEST_ASSERT(!client._writeBuffer[0]);
17121732
TEST_ASSERTM(client._writeBuffer[1]->retryCount == 1, String(client._writeBuffer[1]->retryCount));
17131733

17141734
delay(2000);
1735+
TEST_ASSERT(client.canSendRequest());
17151736
TEST_ASSERTM(client.flushBuffer(), client.getLastErrorMessage());
1737+
TEST_ASSERT(client.isBufferEmpty());
1738+
TEST_ASSERT(!client.isBufferFull());
17161739
String query = "select";
17171740
FluxQueryResult q = client.query(query);
17181741
TEST_ASSERT(countLines(q) == 3); //point with the direction tag is skipped
@@ -1782,6 +1805,11 @@ void Test::testDefaultTags() {
17821805
deleteAll(INFLUXDB_CLIENT_TESTING_URL);
17831806
}
17841807

1808+
void Test::setServerUrl(InfluxDBClient &client, String serverUrl) {
1809+
client._serverUrl = serverUrl;
1810+
client.setUrls();
1811+
}
1812+
17851813
Point *createPoint(String measurement) {
17861814
Point *point = new Point(measurement);
17871815
point->addTag("SSID", WiFi.SSID());

0 commit comments

Comments
 (0)