44import backoff
55import json
66
7- from analytics .version import VERSION
87from analytics .request import post , APIError , DatetimeSerializer
98
109try :
@@ -23,8 +22,9 @@ class Consumer(Thread):
2322 """Consumes the messages from the client's queue."""
2423 log = logging .getLogger ('segment' )
2524
26- def __init__ (self , queue , write_key , flush_at = 100 , host = None , on_error = None ,
27- flush_interval = 0.5 , gzip = False , retries = 10 , timeout = 15 ):
25+ def __init__ (self , queue , write_key , flush_at = 100 , host = None ,
26+ on_error = None , flush_interval = 0.5 , gzip = False , retries = 10 ,
27+ timeout = 15 ):
2828 """Create a consumer thread."""
2929 Thread .__init__ (self )
3030 # Make consumer a daemon thread so that it doesn't block program exit
@@ -38,7 +38,8 @@ def __init__(self, queue, write_key, flush_at=100, host=None, on_error=None,
3838 self .gzip = gzip
3939 # It's important to set running in the constructor: if we are asked to
4040 # pause immediately after construction, we might set running to True in
41- # run() *after* we set it to False in pause... and keep running forever.
41+ # run() *after* we set it to False in pause... and keep running
42+ # forever.
4243 self .running = True
4344 self .retries = retries
4445 self .timeout = timeout
@@ -113,14 +114,19 @@ def request(self, batch):
113114
114115 def fatal_exception (exc ):
115116 if isinstance (exc , APIError ):
116- # retry on server errors and client errors with 429 status code (rate limited),
117+ # retry on server errors and client errors
118+ # with 429 status code (rate limited),
117119 # don't retry on other client errors
118120 return (400 <= exc .status < 500 ) and exc .status != 429
119121 else :
120122 # retry on all other errors (eg. network)
121123 return False
122124
123- @backoff .on_exception (backoff .expo , Exception , max_tries = self .retries + 1 , giveup = fatal_exception )
125+ @backoff .on_exception (
126+ backoff .expo ,
127+ Exception ,
128+ max_tries = self .retries + 1 ,
129+ giveup = fatal_exception )
124130 def send_request ():
125131 post (self .write_key , self .host , gzip = self .gzip ,
126132 timeout = self .timeout , batch = batch )
0 commit comments