2222# sunset it
2323from __future__ import annotations
2424
25+ import datetime
2526import hashlib
27+ import json
2628import logging
2729import os
2830import platform
4244from apache_beam .transforms import environments
4345from apache_beam .utils import shared
4446from apache_beam .utils import subprocess_server
47+ from apache_beam .utils .subprocess_server import _LOGGER as subprocess_server_logger
4548from apache_beam .version import __version__ as beam_version
4649
4750# pytype: skip-file
@@ -111,11 +114,53 @@ def _rename_if_different(src, dst):
111114 os .rename (src , dst )
112115
113116
class PrismRunnerLogFilter(logging.Filter):
  """Logging filter that unpacks JSON log lines into the ``LogRecord``.

  Only records whose ``funcName`` is ``'log_stdout'`` are inspected. When the
  record's message is a JSON object carrying at least ``level`` and ``msg``,
  the record's level, source location, timestamp, logger name and message are
  replaced with the values found in the JSON. Anything that cannot be parsed
  is passed through completely unmodified: the rewrite is all-or-nothing, so
  a record never ends up with a new level but the old raw JSON message.

  The filter never drops a record; ``filter`` always returns ``True``.
  """
  # Top-level JSON keys that are mapped onto dedicated LogRecord attributes
  # and therefore not repeated in the "(key=value, ...)" suffix.
  COMMON_FIELDS = set(["level", "source", "msg", "time"])

  def filter(self, record):
    """Rewrite ``record`` in place from its JSON payload, if it has one.

    Args:
      record: the ``logging.LogRecord`` being processed.

    Returns:
      Always ``True`` so that the record is passed on to the handlers.
    """
    if record.funcName == 'log_stdout':
      try:
        updates = self._record_updates(record.getMessage())
      except (json.JSONDecodeError,
              KeyError,
              ValueError,
              TypeError,
              AttributeError):
        # The log parsing/filtering is best-effort. A logging filter must
        # not raise, so malformed payloads (wrong JSON type, unknown level,
        # missing keys, ...) leave the record exactly as it was.
        updates = {}

      # Apply only after everything has been parsed successfully, so that a
      # failure half-way through cannot leave a partially rewritten record.
      for attr, value in updates.items():
        setattr(record, attr, value)

    return True  # Always return True to allow the record to pass.

  @classmethod
  def _record_updates(cls, raw_message):
    """Return the LogRecord attributes to overwrite for one JSON log line.

    Raises ``ValueError`` (including ``json.JSONDecodeError``), ``KeyError``
    or ``TypeError`` when the line does not have the expected shape.
    """
    # TODO: Fix this error message from prism
    message = raw_message.replace(
        '"!ERROR:time.Time year outside of range [0,9999]"', '')
    json_record = json.loads(message)
    if not isinstance(json_record, dict):
      # Valid JSON that is not an object, e.g. a bare number or string.
      raise ValueError("log line is not a JSON object")

    level = json_record["level"]
    # getLevelName() maps a known level name to its number and returns a
    # "Level <name>" string for anything it does not know.
    levelno = logging.getLevelName(level) if isinstance(level, str) else None
    if not isinstance(levelno, int):
      raise ValueError("unknown log level: %r" % (level, ))

    updates = {
        "levelno": levelno,
        "levelname": logging.getLevelName(levelno),
    }

    if "source" in json_record:
      source = json_record["source"]
      pathname = source["file"]
      updates["funcName"] = source["function"]
      updates["pathname"] = pathname
      updates["filename"] = os.path.basename(pathname)
      updates["lineno"] = source["line"]

    # The timestamp is optional: if it is missing or unparsable the record
    # keeps the time at which it was created on the Python side.
    created = cls._parse_time(json_record.get("time"))
    if created is not None:
      updates["created"] = created
      # Keep the millisecond part used by Formatter.formatTime consistent
      # with the new timestamp.
      updates["msecs"] = (created - int(created)) * 1000.0

    if json_record["msg"] == "log from SDK worker":
      # TODO: Use location and time inside the nested message to set record
      updates["name"] = "SdkWorker" + "@" + json_record["worker"]["ID"]
      updates["msg"] = json_record["sdk"]["msg"]
    else:
      extras = ', '.join(
          f'{k}={v!r}' for k,
          v in json_record.items() if k not in cls.COMMON_FIELDS)
      updates["name"] = "PrismRunner"
      updates["msg"] = f"{json_record['msg']} ({extras})"

    # The new message is already fully formatted; stale %-style args from the
    # original logging call must not be applied to it by getMessage().
    updates["args"] = ()
    return updates

  @staticmethod
  def _parse_time(value):
    """Parse an ISO-8601 timestamp string into POSIX seconds, or ``None``."""
    if not isinstance(value, str):
      return None
    if value.endswith(("Z", "z")):
      # datetime.fromisoformat() only accepts the "Z" suffix from Python 3.11.
      value = value[:-1] + "+00:00"
    try:
      return datetime.datetime.fromisoformat(value).timestamp()
    except (ValueError, OverflowError, OSError):
      return None
156+
157+
114158class PrismJobServer (job_server .SubprocessJobServer ):
115159 BIN_CACHE = os .path .expanduser ("~/.apache_beam/cache/prism/bin" )
116160
117161 def __init__ (self , options ):
118162 super ().__init__ ()
163+
119164 prism_options = options .view_as (pipeline_options .PrismRunnerOptions )
120165 # Options flow:
121166 # If the path is set, always download and unzip the provided path,
@@ -131,6 +176,12 @@ def __init__(self, options):
131176 self ._job_port = job_options .job_port
132177
133178 self ._log_level = prism_options .prism_log_level
179+ self ._log_kind = prism_options .prism_log_kind
180+
181+ # override console to json with log filter enabled
182+ if self ._log_kind == "console" :
183+ self ._log_kind = "json"
184+ subprocess_server_logger .addFilter (PrismRunnerLogFilter ())
134185
135186 # the method is only kept for testing and backward compatibility
136187 @classmethod
@@ -429,6 +480,8 @@ def prism_arguments(self, job_port) -> typing.List[typing.Any]:
429480 job_port ,
430481 '--log_level' ,
431482 self ._log_level ,
483+ '--log_kind' ,
484+ self ._log_kind ,
432485 '--serve_http' ,
433486 False ,
434487 ]
0 commit comments