2121
2222import logging
2323import time
24+ from typing import Any , Callable , Dict , Optional , Tuple
2425
25- from google .cloud import logging
26- from grpc import UnaryUnaryClientInterceptor , UnaryStreamClientInterceptor
26+ from google .cloud import logging as google_cloud_logging
27+ from grpc import (
28+ Call ,
29+ Future ,
30+ UnaryUnaryClientInterceptor ,
31+ UnaryStreamClientInterceptor ,
32+ )
33+ from grpc ._interceptor import _ClientCallDetails
2734
2835from google .ads .googleads .interceptors import LoggingInterceptor , mask_message
2936
@@ -40,27 +47,29 @@ class CloudLoggingInterceptor(LoggingInterceptor):
4047 this is to inherit from the Interceptor class instead, and selectively copy whatever
4148 logic is needed from the LoggingInterceptor class."""
4249
43- def __init__ (self , api_version ):
50+ def __init__ (self , api_version : str ):
4451 """Initializer for the CloudLoggingInterceptor.
4552
4653 Args:
4754 api_version: a str of the API version of the request.
4855 """
4956 super ().__init__ (logger = None , api_version = api_version )
5057 # Instantiate the Cloud Logging client.
51- logging_client = logging .Client ()
52- self .logger = logging_client .logger ("cloud_logging" )
58+ logging_client : google_cloud_logging .Client = google_cloud_logging .Client ()
59+ self .logger : google_cloud_logging .Logger = logging_client .logger ("cloud_logging" )
60+ self .rpc_start : float
61+ self .rpc_end : float
5362
5463 def log_successful_request (
5564 self ,
56- method ,
57- customer_id ,
58- metadata_json ,
59- request_id ,
60- request ,
61- trailing_metadata_json ,
62- response ,
63- ):
65+ method : str ,
66+ customer_id : Optional [ str ] ,
67+ metadata_json : str ,
68+ request_id : str ,
69+ request : Any , # google.ads.googleads.vX.services.types.SearchGoogleAdsRequest or SearchGoogleAdsStreamRequest
70+ trailing_metadata_json : str ,
71+ response : Any , # grpc.Call or grpc.Future
72+ ) -> None :
6473 """Handles logging of a successful request.
6574
6675 Args:
@@ -78,15 +87,15 @@ def log_successful_request(
7887 # The response result could contain up to 10,000 rows of data,
7988 # so consider truncating this value before logging it, to save
8089 # on data storage costs and maintain readability.
81- result = self .retrieve_and_mask_result (response )
90+ result : Any = self .retrieve_and_mask_result (response )
8291
8392 # elapsed_ms is the approximate elapsed time of the RPC, in milliseconds.
8493 # There are different ways to define and measure elapsed time, so use
8594 # whatever approach makes sense for your monitoring purposes.
8695 # rpc_start and rpc_end are set in the intercept_unary_* methods below.
87- elapsed_ms = (self .rpc_end - self .rpc_start ) * 1000
96+ elapsed_ms : float = (self .rpc_end - self .rpc_start ) * 1000
8897
89- debug_log = {
98+ debug_log : Dict [ str , Any ] = {
9099 "method" : method ,
91100 "host" : metadata_json ,
92101 "request_id" : request_id ,
@@ -98,7 +107,7 @@ def log_successful_request(
98107 }
99108 self .logger .log_struct (debug_log , severity = "DEBUG" )
100109
101- info_log = {
110+ info_log : Dict [ str , Any ] = {
102111 "customer_id" : customer_id ,
103112 "method" : method ,
104113 "request_id" : request_id ,
@@ -110,14 +119,14 @@ def log_successful_request(
110119
111120 def log_failed_request (
112121 self ,
113- method ,
114- customer_id ,
115- metadata_json ,
116- request_id ,
117- request ,
118- trailing_metadata_json ,
119- response ,
120- ):
122+ method : str ,
123+ customer_id : Optional [ str ] ,
124+ metadata_json : str ,
125+ request_id : str ,
126+ request : Any , # google.ads.googleads.vX.services.types.SearchGoogleAdsRequest or SearchGoogleAdsStreamRequest
127+ trailing_metadata_json : str ,
128+ response : Any , # grpc.Call or grpc.Future
129+ ) -> None :
121130 """Handles logging of a failed request.
122131
123132 Args:
@@ -129,11 +138,11 @@ def log_failed_request(
129138 trailing_metadata_json: A JSON str of trailing_metadata.
130139 response: A JSON str of the response message.
131140 """
132- exception = self ._get_error_from_response (response )
133- exception_str = self ._parse_exception_to_str (exception )
134- fault_message = self ._get_fault_message (exception )
141+ exception : Any = self ._get_error_from_response (response )
142+ exception_str : str = self ._parse_exception_to_str (exception )
143+ fault_message : str = self ._get_fault_message (exception )
135144
136- info_log = {
145+ info_log : Dict [ str , Any ] = {
137146 "method" : method ,
138147 "endpoint" : self .endpoint ,
139148 "host" : metadata_json ,
@@ -145,7 +154,7 @@ def log_failed_request(
145154 }
146155 self .logger .log_struct (info_log , severity = "INFO" )
147156
148- error_log = {
157+ error_log : Dict [ str , Any ] = {
149158 "method" : method ,
150159 "endpoint" : self .endpoint ,
151160 "request_id" : request_id ,
@@ -155,7 +164,12 @@ def log_failed_request(
155164 }
156165 self .logger .log_struct (error_log , severity = "ERROR" )
157166
158- def intercept_unary_unary (self , continuation , client_call_details , request ):
167+ def intercept_unary_unary (
168+ self ,
169+ continuation : Callable [[_ClientCallDetails , Any ], Any ], # Any is request type
170+ client_call_details : _ClientCallDetails ,
171+ request : Any , # google.ads.googleads.vX.services.types.SearchGoogleAdsRequest
172+ ) -> Any : # grpc.Call or grpc.Future
159173 """Intercepts and logs API interactions.
160174
161175 Overrides abstract method defined in grpc.UnaryUnaryClientInterceptor.
@@ -171,15 +185,15 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
171185 A grpc.Call/grpc.Future instance representing a service response.
172186 """
173187 # Set the rpc_end value to current time when RPC completes.
174- def update_rpc_end (response_future ):
188+ def update_rpc_end (response_future : Any ) -> None : # response_future is grpc.Future
175189 self .rpc_end = time .perf_counter ()
176190
177191 # Capture precise clock time to later calculate approximate elapsed
178192 # time of the RPC.
179193 self .rpc_start = time .perf_counter ()
180194
181195 # The below call is REQUIRED.
182- response = continuation (client_call_details , request )
196+ response : Any = continuation (client_call_details , request ) # response is grpc.Call or grpc.Future
183197
184198 response .add_done_callback (update_rpc_end )
185199
@@ -189,8 +203,11 @@ def update_rpc_end(response_future):
189203 return response
190204
191205 def intercept_unary_stream (
192- self , continuation , client_call_details , request
193- ):
206+ self ,
207+ continuation : Callable [[_ClientCallDetails , Any ], Any ], # Any is request type
208+ client_call_details : _ClientCallDetails ,
209+ request : Any , # google.ads.googleads.vX.services.types.SearchGoogleAdsStreamRequest
210+ ) -> Any : # grpc.Call or grpc.Future
194211 """Intercepts and logs API interactions for Unary-Stream requests.
195212
196213 Overrides abstract method defined in grpc.UnaryStreamClientInterceptor.
@@ -206,7 +223,7 @@ def intercept_unary_stream(
206223 A grpc.Call/grpc.Future instance representing a service response.
207224 """
208225
209- def on_rpc_complete (response_future ):
226+ def on_rpc_complete (response_future : Any ) -> None : # response_future is grpc.Future
210227 self .rpc_end = time .perf_counter ()
211228 self .log_request (client_call_details , request , response_future )
212229
@@ -215,7 +232,7 @@ def on_rpc_complete(response_future):
215232 self .rpc_start = time .perf_counter ()
216233
217234 # The below call is REQUIRED.
218- response = continuation (client_call_details , request )
235+ response : Any = continuation (client_call_details , request ) # response is grpc.Call or grpc.Future
219236
220237 # Set self._cache to the cache on the response wrapper in order to
221238 # access the streaming logs. This is REQUIRED in order to log streaming
0 commit comments