13
13
# limitations under the License.
14
14
15
15
import functools
16
+ import inspect
16
17
import threading
17
18
from typing import List
18
19
20
+ from google .cloud import bigquery
21
+ import pandas
22
+
19
23
_lock = threading .Lock ()
20
24
21
25
# The limit is 64 (https://cloud.google.com/bigquery/docs/labels-intro#requirements),
22
26
# but leave a few spare for internal labels to be added.
23
27
# See internal issue 386825477.
24
28
MAX_LABELS_COUNT = 64 - 8
29
+ PANDAS_API_TRACKING_TASK = "pandas_api_tracking"
30
+ PANDAS_PARAM_TRACKING_TASK = "pandas_param_tracking"
25
31
26
32
_api_methods : List = []
27
33
_excluded_methods = ["__setattr__" , "__getattr__" ]
30
36
_call_stack : List = []
31
37
32
38
39
def submit_pandas_labels(
    bq_client: bigquery.Client,
    class_name: str,
    method_name: str,
    args=(),
    kwargs=None,
    task: str = PANDAS_API_TRACKING_TASK,
):
    """
    Submits usage of API to BigQuery using a simulated failed query.

    The usage details are attached as job labels: the task type, the lower-cased
    class and method names, the count of positional arguments, and the names of any
    keyword arguments that match the signature of the corresponding pandas method.
    To avoid incurring costs, the labels are sent with a query that contains a
    deliberate syntax error.

    Nothing is submitted when:
      - ``pandas`` has no attribute named ``class_name``, or that attribute has no
        attribute named ``method_name``; or
      - ``task`` is ``PANDAS_PARAM_TRACKING_TASK`` and no keyword argument matched a
        parameter of the pandas method.

    Args:
        bq_client (bigquery.Client): The client used to interact with BigQuery.
        class_name (str): The name of the pandas class being used.
        method_name (str): The name of the method being invoked.
        args (tuple): The positional arguments passed to the method. Only their
            count is recorded.
        kwargs (dict | None): The keyword arguments passed to the method. Only the
            names are recorded. ``None`` is treated as "no keyword arguments".
        task (str): The specific task type for the logging event:
            - 'PANDAS_API_TRACKING_TASK': Indicates that the unimplemented feature
              is a method.
            - 'PANDAS_PARAM_TRACKING_TASK': Indicates that the unimplemented feature
              is a parameter of a method.
    """
    # Only report methods that actually exist on the pandas counterpart.
    if not hasattr(pandas, class_name):
        return
    pandas_cls = getattr(pandas, class_name)

    if not hasattr(pandas_cls, method_name):
        return
    pandas_method = getattr(pandas_cls, method_name)

    # NOTE(review): "args_count" is an int while the other label values are
    # strings; confirm the BigQuery client accepts non-string label values.
    labels_dict = {
        "task": task,
        "class_name": class_name.lower(),
        "method_name": method_name.lower(),
        "args_count": len(args),
    }

    # Number of keyword-argument labels added so far; doubles as the label index.
    kwargs_logged = 0
    if kwargs:
        try:
            pandas_params = set(inspect.signature(pandas_method).parameters)
        except (TypeError, ValueError):
            # The attribute may not be introspectable (e.g. a property or a
            # builtin without signature metadata). Usage tracking must never
            # raise into the caller, so just record no keyword arguments.
            pandas_params = set()

        # Record keyword arguments that are real pandas parameters, stopping once
        # the label budget is exhausted.
        for key in kwargs:
            if len(labels_dict) >= MAX_LABELS_COUNT:
                break
            if key in pandas_params:
                labels_dict[f"kwargs_{kwargs_logged}"] = key.lower()
                kwargs_logged += 1

    # If this log is for tracking unimplemented parameters and no matching keyword
    # arguments were provided, there is nothing worth reporting.
    if kwargs_logged == 0 and task == PANDAS_PARAM_TRACKING_TASK:
        return

    # Run a query with syntax error to avoid cost.
    query = "SELECT COUNT(x FROM data_table—"
    job_config = bigquery.QueryJobConfig(labels=labels_dict)
    bq_client.query(query, job_config=job_config)
106
+
107
+
33
108
def class_logger (decorated_cls ):
34
109
"""Decorator that adds logging functionality to each method of the class."""
35
110
for attr_name , attr_value in decorated_cls .__dict__ .items ():
@@ -46,7 +121,7 @@ def method_logger(method, decorated_cls):
46
121
"""Decorator that adds logging functionality to a method."""
47
122
48
123
@functools .wraps (method )
49
- def wrapper (* args , ** kwargs ):
124
+ def wrapper (self , * args , ** kwargs ):
50
125
class_name = decorated_cls .__name__ # Access decorated class name
51
126
api_method_name = str (method .__name__ )
52
127
full_method_name = f"{ class_name .lower ()} -{ api_method_name } "
@@ -58,7 +133,23 @@ def wrapper(*args, **kwargs):
58
133
_call_stack .append (full_method_name )
59
134
60
135
try :
61
- return method (* args , ** kwargs )
136
+ return method (self , * args , ** kwargs )
137
+ except (NotImplementedError , TypeError ) as e :
138
+ # Log method parameters that are implemented in pandas but either missing (TypeError)
139
+ # or not fully supported (NotImplementedError) in BigFrames.
140
+ # Logging is currently supported only when we can access the bqclient through
141
+ # self._block.expr.session.bqclient. Also, to avoid generating multiple queries
142
+ # because of internal calls, we log only when the method is directly invoked.
143
+ if hasattr (self , "_block" ) and len (_call_stack ) == 1 :
144
+ submit_pandas_labels (
145
+ self ._block .expr .session .bqclient ,
146
+ class_name ,
147
+ api_method_name ,
148
+ args ,
149
+ kwargs ,
150
+ task = PANDAS_PARAM_TRACKING_TASK ,
151
+ )
152
+ raise e
62
153
finally :
63
154
_call_stack .pop ()
64
155
0 commit comments