Skip to content

Commit ee3ee13

Browse files
authored
Exponential reconnection backoff (#157)
1 parent f9c5530 commit ee3ee13

File tree

5 files changed: +99 additions, -34 deletions

skywalking/agent/__init__.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,53 +42,73 @@
4242

4343

4444
def __heartbeat():
45+
wait = base = 30
46+
4547
while not __finished.is_set():
4648
try:
4749
__protocol.heartbeat()
50+
wait = base # reset to base wait time on success
4851
except Exception as exc:
4952
logger.error(str(exc))
53+
wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum
5054

51-
__finished.wait(30)
55+
__finished.wait(wait)
5256

5357

5458
def __report():
59+
wait = base = 0
60+
5561
while not __finished.is_set():
5662
try:
5763
__protocol.report(__queue) # is blocking actually, blocks for max config.QUEUE_TIMEOUT seconds
64+
wait = base
5865
except Exception as exc:
5966
logger.error(str(exc))
67+
wait = min(60, wait * 2 or 1)
6068

61-
__finished.wait(0)
69+
__finished.wait(wait)
6270

6371

6472
def __report_log():
73+
wait = base = 0
74+
6575
while not __finished.is_set():
6676
try:
6777
__protocol.report_log(__log_queue)
78+
wait = base
6879
except Exception as exc:
6980
logger.error(str(exc))
81+
wait = min(60, wait * 2 or 1)
7082

71-
__finished.wait(0)
83+
__finished.wait(wait)
7284

7385

7486
def __send_profile_snapshot():
87+
wait = base = 0.5
88+
7589
while not __finished.is_set():
7690
try:
7791
__protocol.send_snapshot(__snapshot_queue)
92+
wait = base
7893
except Exception as exc:
7994
logger.error(str(exc))
95+
wait = min(60, wait * 2 or 1)
8096

81-
__finished.wait(0.5)
97+
__finished.wait(wait)
8298

8399

84100
def __query_profile_command():
101+
wait = base = config.get_profile_task_interval
102+
85103
while not __finished.is_set():
86104
try:
87105
__protocol.query_profile_commands()
106+
wait = base
88107
except Exception as exc:
89108
logger.error(str(exc))
109+
wait = min(60, wait * 2 or 1)
90110

91-
__finished.wait(config.get_profile_task_interval)
111+
__finished.wait(wait)
92112

93113

94114
def __command_dispatch():

skywalking/agent/protocol/grpc.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,28 @@ def heartbeat(self):
7979

8080
except grpc.RpcError:
8181
self.on_error()
82+
raise
8283

8384
def on_error(self):
8485
traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None
8586
self.channel.unsubscribe(self._cb)
8687
self.channel.subscribe(self._cb, try_to_connect=True)
8788

8889
def report(self, queue: Queue, block: bool = True):
89-
start = time()
90+
start = None
9091

9192
def generator():
93+
nonlocal start
94+
9295
while True:
9396
try:
94-
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
95-
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
96-
return
97+
timeout = config.QUEUE_TIMEOUT # type: int
98+
if not start: # make sure first time through queue is always checked
99+
start = time()
100+
else:
101+
timeout -= int(time() - start)
102+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
103+
return
97104
segment = queue.get(block=block, timeout=timeout) # type: Segment
98105
except Empty:
99106
return
@@ -145,16 +152,23 @@ def generator():
145152
self.traces_reporter.report(generator())
146153
except grpc.RpcError:
147154
self.on_error()
155+
raise # reraise so that incremental reconnect wait can process
148156

149157
def report_log(self, queue: Queue, block: bool = True):
150-
start = time()
158+
start = None
151159

152160
def generator():
161+
nonlocal start
162+
153163
while True:
154164
try:
155-
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
156-
if timeout <= 0:
157-
return
165+
timeout = config.QUEUE_TIMEOUT # type: int
166+
if not start: # make sure first time through queue is always checked
167+
start = time()
168+
else:
169+
timeout -= int(time() - start)
170+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
171+
return
158172
log_data = queue.get(block=block, timeout=timeout) # type: LogData
159173
except Empty:
160174
return
@@ -169,16 +183,23 @@ def generator():
169183
self.log_reporter.report(generator())
170184
except grpc.RpcError:
171185
self.on_error()
186+
raise
172187

173188
def send_snapshot(self, queue: Queue, block: bool = True):
174-
start = time()
189+
start = None
175190

176191
def generator():
192+
nonlocal start
193+
177194
while True:
178195
try:
179-
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
180-
if timeout <= 0:
181-
return
196+
timeout = config.QUEUE_TIMEOUT # type: int
197+
if not start: # make sure first time through queue is always checked
198+
start = time()
199+
else:
200+
timeout -= int(time() - start)
201+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
202+
return
182203
snapshot = queue.get(block=block, timeout=timeout) # type: TracingThreadSnapshot
183204
except Empty:
184205
return
@@ -199,3 +220,4 @@ def generator():
199220
self.profile_channel.send(generator())
200221
except grpc.RpcError:
201222
self.on_error()
223+
raise

skywalking/agent/protocol/http.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,20 @@ def heartbeat(self):
4444
self.service_management.send_heart_beat()
4545

4646
def report(self, queue: Queue, block: bool = True):
47-
start = time()
47+
start = None
4848

4949
def generator():
50+
nonlocal start
51+
5052
while True:
5153
try:
52-
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
53-
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
54-
return
54+
timeout = config.QUEUE_TIMEOUT # type: int
55+
if not start: # make sure first time through queue is always checked
56+
start = time()
57+
else:
58+
timeout -= int(time() - start)
59+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
60+
return
5561
segment = queue.get(block=block, timeout=timeout) # type: Segment
5662
except Empty:
5763
return
@@ -68,14 +74,20 @@ def generator():
6874
pass
6975

7076
def report_log(self, queue: Queue, block: bool = True):
71-
start = time()
77+
start = None
7278

7379
def generator():
80+
nonlocal start
81+
7482
while True:
7583
try:
76-
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
77-
if timeout <= 0:
78-
return
84+
timeout = config.QUEUE_TIMEOUT # type: int
85+
if not start: # make sure first time through queue is always checked
86+
start = time()
87+
else:
88+
timeout -= int(time() - start)
89+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
90+
return
7991
log_data = queue.get(block=block, timeout=timeout) # type: LogData
8092
except Empty:
8193
return

skywalking/agent/protocol/kafka.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,20 @@ def heartbeat(self):
4545
self.service_management.send_heart_beat()
4646

4747
def report(self, queue: Queue, block: bool = True):
48-
start = time()
48+
start = None
4949

5050
def generator():
51+
nonlocal start
52+
5153
while True:
5254
try:
53-
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
54-
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
55-
return
55+
timeout = config.QUEUE_TIMEOUT # type: int
56+
if not start: # make sure first time through queue is always checked
57+
start = time()
58+
else:
59+
timeout -= int(time() - start)
60+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
61+
return
5662
segment = queue.get(block=block, timeout=timeout) # type: Segment
5763
except Empty:
5864
return
@@ -103,14 +109,20 @@ def generator():
103109
self.traces_reporter.report(generator())
104110

105111
def report_log(self, queue: Queue, block: bool = True):
106-
start = time()
112+
start = None
107113

108114
def generator():
115+
nonlocal start
116+
109117
while True:
110118
try:
111-
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
112-
if timeout <= 0:
113-
return
119+
timeout = config.QUEUE_TIMEOUT # type: int
120+
if not start: # make sure first time through queue is always checked
121+
start = time()
122+
else:
123+
timeout -= int(time() - start)
124+
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
125+
return
114126
log_data = queue.get(block=block, timeout=timeout) # type: LogData
115127
except Empty:
116128
return

skywalking/config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
profile_dump_max_stack_depth = int(os.getenv('SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH') or '500') # type: int
7575
profile_snapshot_transport_buffer_size = int(os.getenv('SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE') or '50')
7676

77-
# NOTE - Log reporting requires a separate channel, will merge in the future.
7877
log_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \
7978
os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') == 'True' else False # type: bool
8079
log_reporter_max_buffer_size = int(os.getenv('SW_AGENT_LOG_REPORTER_BUFFER_SIZE') or '10000') # type: int

0 commit comments

Comments
 (0)