11# -*- coding: utf-8 -*-
22
3+ import socket
34import threading
45from queue import Queue , Full , Empty
56
@@ -81,11 +82,7 @@ def close(self, flush=True):
8182 return
8283 self ._closed = True
8384 if not flush :
84- while True :
85- try :
86- self ._queue .get (block = False )
87- except Empty :
88- break
85+ self ._clear_queue ()
8986 self ._queue .put (_TOMBSTONE )
9087 self ._send_thread .join ()
9188
@@ -101,6 +98,13 @@ def queue_blocking(self):
10198 def queue_circular (self ):
10299 return self ._queue_circular
103100
101+ def _clear_queue (self ):
102+ while True :
103+ try :
104+ self ._queue .get (block = False )
105+ except Empty :
106+ break
107+
104108 def _send (self , bytes_ ):
105109 with self .lock :
106110 if self ._closed :
@@ -120,8 +124,20 @@ def _send(self, bytes_):
120124
121125 return True
122126
127+ def _send_internal (self , bytes_ ):
128+ send_internal_result = super (FluentSender , self )._send_internal (bytes_ )
129+ if send_internal_result is False :
130+ # when send_result is False, super() caught socket.error
131+ # and assigned the error to self.last_error
132+ if isinstance (self .last_error , socket .gaierror ):
133+ # clear the queue to avoid blocking and print the log
134+ self ._clear_queue ()
135+ print ("%s. Please check address: (%s, %s)" % (str (self .last_error ), self .host , self .port ))
136+
137+ return send_internal_result
138+
123139 def _send_loop (self ):
124- send_internal = super ( FluentSender , self ) ._send_internal
140+ send_internal = self ._send_internal
125141
126142 try :
127143 while True :
0 commit comments