1919import logging
2020import re
2121from base64 import b64encode
22- from typing import Any , Mapping , MutableMapping , Optional , Sequence
22+ from functools import partial
23+ from typing import (
24+ Any ,
25+ Mapping ,
26+ MutableMapping ,
27+ Optional ,
28+ Sequence ,
29+ TextIO ,
30+ cast ,
31+ )
2332
2433import google .auth
2534from google .api .monitored_resource_pb2 import ( # pylint: disable = no-name-in-module
3645from google .logging .type .log_severity_pb2 import ( # pylint: disable = no-name-in-module
3746 LogSeverity ,
3847)
48+ from google .protobuf .json_format import MessageToDict
3949from google .protobuf .struct_pb2 import ( # pylint: disable = no-name-in-module
4050 Struct ,
4151)
5262from opentelemetry .sdk .resources import Resource
5363from opentelemetry .trace import format_span_id , format_trace_id
5464from opentelemetry .util .types import AnyValue
65+ from proto .datetime_helpers import ( # type: ignore[import]
66+ DatetimeWithNanoseconds ,
67+ )
5568
5669DEFAULT_MAX_ENTRY_SIZE = 256000 # 256 KB
5770DEFAULT_MAX_REQUEST_SIZE = 10000000 # 10 MB
@@ -205,24 +218,41 @@ def __init__(
205218 project_id : Optional [str ] = None ,
206219 default_log_name : Optional [str ] = None ,
207220 client : Optional [LoggingServiceV2Client ] = None ,
221+ * ,
222+ structured_json_file : Optional [TextIO ] = None ,
208223 ):
209224 self .project_id : str
210225 if not project_id :
211226 _ , default_project_id = google .auth .default ()
212227 self .project_id = str (default_project_id )
213228 else :
214229 self .project_id = project_id
230+
215231 if default_log_name :
216232 self .default_log_name = default_log_name
217233 else :
218234 self .default_log_name = "otel_python_inprocess_log_name_temp"
219- self .client = client or LoggingServiceV2Client (
220- transport = LoggingServiceV2GrpcTransport (
221- channel = LoggingServiceV2GrpcTransport .create_channel (
222- options = _OPTIONS ,
235+
236+ if client and structured_json_file :
237+ raise ValueError (
238+ "Cannot specify both client and structured_json_file"
239+ )
240+
241+ if structured_json_file :
242+ self ._write_log_entries = partial (
243+ self ._write_log_entries_to_file , structured_json_file
244+ )
245+ else :
246+ client = client or LoggingServiceV2Client (
247+ transport = LoggingServiceV2GrpcTransport (
248+ channel = LoggingServiceV2GrpcTransport .create_channel (
249+ options = _OPTIONS ,
250+ )
223251 )
224252 )
225- )
253+ self ._write_log_entries = partial (
254+ self ._write_log_entries_to_client , client
255+ )
226256
227257 def pick_log_id (self , log_name_attr : Any , event_name : str | None ) -> str :
228258 if log_name_attr and isinstance (log_name_attr , str ):
@@ -288,7 +318,58 @@ def export(self, batch: Sequence[LogData]):
288318
289319 self ._write_log_entries (log_entries )
290320
291- def _write_log_entries (self , log_entries : list [LogEntry ]):
@staticmethod
def _write_log_entries_to_file(file: TextIO, log_entries: list[LogEntry]):
    """Write log entries to ``file`` in the Cloud Logging structured log format.

    Each entry is serialized as one compact JSON object (sorted keys, no
    extra whitespace) terminated by a single newline, so the stream holds
    exactly one record per line.

    See https://cloud.google.com/logging/docs/structured-logging

    Args:
        file: Text stream that receives the JSON lines.
        log_entries: Entries to serialize; they are written in the given order.
    """
    # TODO: this is not resilient to exceptions which can cause recursion when using OTel's
    # logging handler. See
    # https://github.com/open-telemetry/opentelemetry-python/issues/4261 for outstanding
    # issue in OTel.

    for entry in log_entries:
        # These are not added in export() so not added to the JSON here.
        # - httpRequest
        # - logging.googleapis.com/sourceLocation
        # - logging.googleapis.com/operation
        # - logging.googleapis.com/insertId

        # https://cloud.google.com/logging/docs/agent/logging/configuration#timestamp-processing
        timestamp = cast(DatetimeWithNanoseconds, entry.timestamp)

        json_dict: dict[str, Any] = {
            "time": timestamp.rfc3339(),
            "severity": LogSeverity.Name(
                cast(LogSeverity.ValueType, entry.severity)
            ),
            "logging.googleapis.com/labels": dict(entry.labels),
            "logging.googleapis.com/spanId": entry.span_id,
            "logging.googleapis.com/trace_sampled": entry.trace_sampled,
            "logging.googleapis.com/trace": entry.trace,
        }

        if entry.text_payload:
            json_dict["message"] = entry.text_payload
        if entry.json_payload:
            # Payload fields are merged at the top level of the record, so a
            # payload key replaces any same-named key set above.
            json_dict.update(MessageToDict(LogEntry.pb(entry).json_payload))

        # Serialize completely before touching the stream so that a
        # serialization failure never leaves a partial JSON record behind.
        line = json.dumps(json_dict, separators=(",", ":"), sort_keys=True)

        # Terminate with a bare newline: any trailing characters after it
        # would become leading whitespace on the next record's line.
        file.write(line + "\n")
368+
369+ @staticmethod
370+ def _write_log_entries_to_client (
371+ client : LoggingServiceV2Client , log_entries : list [LogEntry ]
372+ ):
292373 batch : list [LogEntry ] = []
293374 batch_byte_size = 0
294375 for entry in log_entries :
@@ -302,7 +383,7 @@ def _write_log_entries(self, log_entries: list[LogEntry]):
302383 continue
303384 if msg_size + batch_byte_size > DEFAULT_MAX_REQUEST_SIZE :
304385 try :
305- self . client .write_log_entries (
386+ client .write_log_entries (
306387 WriteLogEntriesRequest (
307388 entries = batch , partial_success = True
308389 )
@@ -319,7 +400,7 @@ def _write_log_entries(self, log_entries: list[LogEntry]):
319400 batch_byte_size += msg_size
320401 if batch :
321402 try :
322- self . client .write_log_entries (
403+ client .write_log_entries (
323404 WriteLogEntriesRequest (entries = batch , partial_success = True )
324405 )
325406 # pylint: disable=broad-except
0 commit comments