Skip to content

Commit b87cbd1

Browse files
authored
[Bugfix] allow pending data to send before exit (#98)
1 parent bc83671 commit b87cbd1

File tree

5 files changed

+18
-10
lines changed

5 files changed

+18
-10
lines changed

skywalking/agent/__init__.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
# limitations under the License.
1616
#
1717

18-
from skywalking.loggings import logger
18+
import atexit
1919
from queue import Queue
2020
from threading import Thread, Event
2121
from typing import TYPE_CHECKING
2222

23-
from skywalking import config, plugins
23+
from skywalking import config, plugins, loggings
24+
from skywalking.loggings import logger
2425
from skywalking.agent.protocol import Protocol
2526

2627
if TYPE_CHECKING:
@@ -66,20 +67,27 @@ def __init():
6667
plugins.install()
6768

6869

70+
def __fini():
71+
__protocol.report(__queue, False)
72+
__queue.join()
73+
74+
6975
def start():
7076
global __started
7177
if __started:
7278
raise RuntimeError('the agent can only be started once')
73-
from skywalking import loggings
7479
loggings.init()
7580
config.finalize()
7681
__started = True
7782
__init()
7883
__heartbeat_thread.start()
7984
__report_thread.start()
85+
atexit.register(__fini)
8086

8187

8288
def stop():
89+
atexit.unregister(__fini)
90+
__fini()
8391
__finished.set()
8492

8593

skywalking/agent/protocol/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ def connected(self):
2626
def heartbeat(self):
2727
raise NotImplementedError()
2828

29-
def report(self, queue: Queue):
29+
def report(self, queue: Queue, block: bool = True):
3030
raise NotImplementedError()

skywalking/agent/protocol/grpc.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ def on_error(self):
6767
self.channel.unsubscribe(self._cb)
6868
self.channel.subscribe(self._cb, try_to_connect=True)
6969

70-
def report(self, queue: Queue):
70+
def report(self, queue: Queue, block: bool = True):
7171
def generator():
7272
while True:
73-
segment = queue.get() # type: Segment
73+
segment = queue.get(block=block) # type: Segment
7474

7575
logger.debug('reporting segment %s', segment)
7676

skywalking/agent/protocol/http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ def heartbeat(self):
3838
def connected(self):
3939
return True
4040

41-
def report(self, queue: Queue):
41+
def report(self, queue: Queue, block: bool = True):
4242
def generator():
4343
while True:
44-
segment = queue.get() # type: Segment
44+
segment = queue.get(block=block) # type: Segment
4545

4646
logger.debug('reporting segment %s', segment)
4747

skywalking/agent/protocol/kafka.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ def connected(self):
4242
def heartbeat(self):
4343
self.service_management.send_heart_beat()
4444

45-
def report(self, queue: Queue):
45+
def report(self, queue: Queue, block: bool = True):
4646
def generator():
4747
while True:
48-
segment = queue.get() # type: Segment
48+
segment = queue.get(block=block) # type: Segment
4949

5050
logger.debug('reporting segment %s', segment)
5151

0 commit comments

Comments
 (0)