@@ -84,15 +84,23 @@ def _file_logging_enabled():
8484 'True' )) and not _is_running_on_app_engine () and not _is_running_on_k8s ()
8585
8686
87+ def _fluentd_logging_enabled ():
88+ """Return bool True where fluentd logging is enabled.
89+ This is enabled by default.
90+ This is disabled for local development and if we are running in app engine or
91+ kubernetes as these have their dedicated loggers, see configure_appengine()
92+ and configure_k8s()."""
93+ return bool (os .getenv ('LOG_TO_FLUENTD' , 'True' )) and not _is_local (
94+ ) and not _is_running_on_app_engine () and not _is_running_on_k8s ()
95+
96+
8797def _cloud_logging_enabled ():
8898 """Return bool True where Google Cloud Logging is enabled.
89- This is enabled by default.
9099 This is disabled for local development and if we are running in a app engine
91100 or kubernetes as these have their dedicated loggers, see
92101 configure_appengine() and configure_k8s()."""
93- return (bool (os .getenv ('LOG_TO_GCP' , 'True' )) and
94- not os .getenv ("PY_UNITTESTS" ) and not _is_local () and
95- not _is_running_on_app_engine () and not _is_running_on_k8s ())
102+ return bool (os .getenv ('LOG_TO_GCP' )) and not _is_local (
103+ ) and not _is_running_on_app_engine () and not _is_running_on_k8s ()
96104
97105
98106def suppress_unwanted_warnings ():
@@ -176,57 +184,53 @@ def truncate(msg, limit):
176184 ])
177185
178186
179- class JsonFormatter (logging .Formatter ):
180- """Formats log records as JSON."""
181-
182- def format (self , record : logging .LogRecord ) -> str :
183- """Format LogEntry into JSON string."""
184- entry = {
185- 'message' :
186- truncate (record .getMessage (), STACKDRIVER_LOG_MESSAGE_LIMIT ),
187- 'created' : (
188- datetime .datetime .utcfromtimestamp (record .created ).isoformat () + 'Z'
189- ),
190- 'severity' :
191- record .levelname ,
192- 'bot_name' :
193- os .getenv ('BOT_NAME' ),
194- 'task_payload' :
195- os .getenv ('TASK_PAYLOAD' ),
196- 'name' :
197- record .name ,
198- 'pid' :
199- os .getpid (),
200- 'task_id' :
201- os .getenv ('CF_TASK_ID' , 'null' ),
202- }
187+ def format_record (record : logging .LogRecord ) -> str :
188+ """Format LogEntry into JSON string."""
189+ entry = {
190+ 'message' :
191+ truncate (record .getMessage (), STACKDRIVER_LOG_MESSAGE_LIMIT ),
192+ 'created' : (
193+ datetime .datetime .utcfromtimestamp (record .created ).isoformat () + 'Z' ),
194+ 'severity' :
195+ record .levelname ,
196+ 'bot_name' :
197+ os .getenv ('BOT_NAME' ),
198+ 'task_payload' :
199+ os .getenv ('TASK_PAYLOAD' ),
200+ 'name' :
201+ record .name ,
202+ 'pid' :
203+ os .getpid (),
204+ 'task_id' :
205+ os .getenv ('CF_TASK_ID' , 'null' ),
206+ }
203207
204- initial_payload = os .getenv ('INITIAL_TASK_PAYLOAD' )
205- if initial_payload :
206- entry ['actual_task_payload' ] = entry ['task_payload' ]
207- entry ['task_payload' ] = initial_payload
208+ initial_payload = os .getenv ('INITIAL_TASK_PAYLOAD' )
209+ if initial_payload :
210+ entry ['actual_task_payload' ] = entry ['task_payload' ]
211+ entry ['task_payload' ] = initial_payload
208212
209- entry ['location' ] = getattr (record , 'location' , {'error' : True })
210- entry ['extras' ] = getattr (record , 'extras' , {})
211- update_entry_with_exc (entry , record .exc_info )
213+ entry ['location' ] = getattr (record , 'location' , {'error' : True })
214+ entry ['extras' ] = getattr (record , 'extras' , {})
215+ update_entry_with_exc (entry , record .exc_info )
212216
213- if not entry ['extras' ]:
214- del entry ['extras' ]
217+ if not entry ['extras' ]:
218+ del entry ['extras' ]
215219
216- worker_bot_name = os .environ .get ('WORKER_BOT_NAME' )
217- if worker_bot_name :
218- entry ['worker_bot_name' ] = worker_bot_name
220+ worker_bot_name = os .environ .get ('WORKER_BOT_NAME' )
221+ if worker_bot_name :
222+ entry ['worker_bot_name' ] = worker_bot_name
219223
220- fuzz_target = os .getenv ('FUZZ_TARGET' )
221- if fuzz_target :
222- entry ['fuzz_target' ] = fuzz_target
224+ fuzz_target = os .getenv ('FUZZ_TARGET' )
225+ if fuzz_target :
226+ entry ['fuzz_target' ] = fuzz_target
223227
224- # Log bot shutdown cases as WARNINGs ( this is expected for preemptibles) .
225- if (entry ['severity' ] in ['ERROR' , 'CRITICAL' ] and
226- 'IOError: [Errno 4] Interrupted function call' in entry ['message' ]):
227- entry ['severity' ] = 'WARNING'
228+ # Log bot shutdown cases as WARNINGs since this is expected for preemptibles.
229+ if (entry ['severity' ] in ['ERROR' , 'CRITICAL' ] and
230+ 'IOError: [Errno 4] Interrupted function call' in entry ['message' ]):
231+ entry ['severity' ] = 'WARNING'
228232
229- return json .dumps (entry , default = _handle_unserializable )
233+ return json .dumps (entry , default = _handle_unserializable )
230234
231235
232236def _handle_unserializable (unserializable : Any ) -> str :
@@ -267,6 +271,16 @@ def update_entry_with_exc(entry, exc_info):
267271 }
268272
269273
274+ class JsonSocketHandler (logging .handlers .SocketHandler ):
275+ """Format log into JSON string before sending it to fluentd. We need this
276+ because SocketHandler doesn't respect the formatter attribute."""
277+
278+ def makePickle (self , record : logging .LogRecord ):
279+ """Format LogEntry into JSON string."""
280+ # \n is the recognized delimiter by fluentd's in_tcp. Don't remove.
281+ return (format_record (record ) + '\n ' ).encode ('utf-8' )
282+
283+
270284def uncaught_exception_handler (exception_type , exception_value ,
271285 exception_traceback ):
272286 """Handles any exception that are uncaught by logging an error and calling
@@ -344,6 +358,15 @@ def record_factory(*args, **kwargs):
344358 logging .getLogger ().setLevel (logging .INFO )
345359
346360
361+ def configure_fluentd_logging ():
362+ fluentd_handler = JsonSocketHandler (
363+ host = '127.0.0.1' ,
364+ port = 5170 ,
365+ )
366+ fluentd_handler .setLevel (logging .INFO )
367+ logging .getLogger ().addHandler (fluentd_handler )
368+
369+
347370def configure_cloud_logging ():
348371 """ Configure Google cloud logging, for bots not running on appengine nor k8s.
349372 """
@@ -389,8 +412,6 @@ def cloud_label_filter(record):
389412
390413 handler .addFilter (cloud_label_filter )
391414 handler .setLevel (logging .INFO )
392- formatter = JsonFormatter ()
393- handler .setFormatter (formatter )
394415
395416 logging .getLogger ().addHandler (handler )
396417
@@ -413,6 +434,8 @@ def configure(name, extras=None):
413434 logging .basicConfig (level = logging .INFO )
414435 if _file_logging_enabled ():
415436 config .dictConfig (get_logging_config_dict (name ))
437+ if _fluentd_logging_enabled ():
438+ configure_fluentd_logging ()
416439 if _cloud_logging_enabled ():
417440 configure_cloud_logging ()
418441 logger = logging .getLogger (name )
0 commit comments