Skip to content

Commit d177e02

Browse files
Treehugger RobotGerrit Code Review
authored andcommitted
Merge changes I50e1fcd7,I32f4eadb
* changes: btp: augment query exception with metadata about trace btp: add support for incrementing stats on query/execute as well
2 parents 1336284 + 88dd422 commit d177e02

File tree

2 files changed

+89
-43
lines changed

2 files changed

+89
-43
lines changed

python/perfetto/batch_trace_processor/api.py

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,44 +38,55 @@
3838
PLATFORM_DELEGATE = PlatformDelegate
3939

4040
TraceListReference = registry.TraceListReference
41+
Metadata = Dict[str, str]
4142

4243

43-
# Enum encoding how errors while loading traces in BatchTraceProcessor should
44-
# be handled.
45-
class LoadFailureHandling(Enum):
46-
# If any trace fails to load, raises an exception causing the entire batch
47-
# trace processor to fail.
44+
# Enum encoding how errors while loading/querying traces in BatchTraceProcessor
45+
# should be handled.
46+
class FailureHandling(Enum):
47+
# If any trace fails to load or be queried, raises an exception causing the
48+
# entire batch trace processor to fail.
4849
# This is the default behaviour and the method which should be preferred for
4950
# any interactive use of BatchTraceProcessor.
5051
RAISE_EXCEPTION = 0
5152

52-
# If a trace fails to load, the trace processor for that trace is dropped but
53-
# loading of other traces is unaffected. |load_failures| is incremented in the
54-
# Stats class for the batch trace processor instance.
53+
# If a trace fails to load or be queried, the trace processor for that trace
54+
# is dropped but loading of other traces is unaffected. A failure integer is
55+
# incremented in the Stats class for the batch trace processor instance.
5556
INCREMENT_STAT = 1
5657

5758

5859
@dc.dataclass
5960
class BatchTraceProcessorConfig:
6061
tp_config: TraceProcessorConfig
61-
load_failure_handling: LoadFailureHandling
62-
63-
def __init__(self,
64-
tp_config: TraceProcessorConfig = TraceProcessorConfig(),
65-
load_failure_handling: LoadFailureHandling = LoadFailureHandling
66-
.RAISE_EXCEPTION):
62+
load_failure_handling: FailureHandling
63+
query_failure_handling: FailureHandling
64+
65+
def __init__(
66+
self,
67+
tp_config: TraceProcessorConfig = TraceProcessorConfig(),
68+
load_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION,
69+
execute_failure_handling: FailureHandling = FailureHandling
70+
.RAISE_EXCEPTION,
71+
):
6772
self.tp_config = tp_config
6873
self.load_failure_handling = load_failure_handling
74+
self.execute_failure_handling = execute_failure_handling
6975

7076

7177
# Contains stats about the events which happened during the use of
7278
# BatchTraceProcessor.
7379
@dc.dataclass
7480
class Stats:
7581
# The number of traces which failed to load; only non-zero if
76-
# LoadFailureHandling.INCREMENT_STAT is chosen as the handling type.
82+
# FailureHanding.INCREMENT_STAT is chosen as the load failure handling type.
7783
load_failures: int = 0
7884

85+
# The number of traces which failed while executing (query, metric or
86+
# arbitary function); only non-zero if FailureHanding.INCREMENT_STAT is
87+
# chosen as the execute failure handling type.
88+
execute_failures: int = 0
89+
7990

8091
class BatchTraceProcessor:
8192
"""Run ad-hoc SQL queries across many Perfetto traces.
@@ -121,7 +132,7 @@ def __init__(self,
121132
trace processor and underlying trace processors.
122133
"""
123134

124-
self.tps = None
135+
self.tps_and_metadata = None
125136
self.closed = False
126137
self._stats = Stats()
127138

@@ -148,8 +159,7 @@ def __init__(self,
148159
len(resolved)) or query_executor
149160

150161
self.query_executor = query_executor
151-
self.metadata = [t.metadata for t in resolved]
152-
self.tps = [
162+
self.tps_and_metadata = [
153163
x for x in load_exectuor.map(self._create_tp, resolved) if x is not None
154164
]
155165

@@ -266,7 +276,12 @@ def execute(self, fn: Callable[[TraceProcessor], Any]) -> List[Any]:
266276
A list of values with the result of executing the fucntion (one per
267277
trace).
268278
"""
269-
return list(self.query_executor.map(fn, self.tps))
279+
280+
def wrapped(pair: Tuple[TraceProcessor, Metadata]):
281+
(tp, metadata) = pair
282+
return self._execute_handling_failure(fn, tp, metadata)
283+
284+
return list(self.query_executor.map(wrapped, self.tps_and_metadata))
270285

271286
def execute_and_flatten(self, fn: Callable[[TraceProcessor], pd.DataFrame]
272287
) -> pd.DataFrame:
@@ -285,15 +300,15 @@ def execute_and_flatten(self, fn: Callable[[TraceProcessor], pd.DataFrame]
285300
be added to the dataframe (see |query_and_flatten| for details).
286301
"""
287302

288-
def wrapped(pair: Tuple[TraceProcessor, Dict[str, str]]):
303+
def wrapped(pair: Tuple[TraceProcessor, Metadata]):
289304
(tp, metadata) = pair
290-
df = fn(tp)
305+
df = self._execute_handling_failure(fn, tp, metadata)
291306
for key, value in metadata.items():
292307
df[key] = value
293308
return df
294309

295310
df = pd.concat(
296-
list(self.query_executor.map(wrapped, zip(self.tps, self.metadata))))
311+
list(self.query_executor.map(wrapped, self.tps_and_metadata)))
297312
return df.reset_index(drop=True)
298313

299314
def close(self):
@@ -309,8 +324,8 @@ def close(self):
309324
return
310325
self.closed = True
311326

312-
if self.tps:
313-
for tp in self.tps:
327+
if self.tps_and_metadata:
328+
for tp, _ in self.tps_and_metadata:
314329
tp.close()
315330

316331
def stats(self):
@@ -319,17 +334,28 @@ def stats(self):
319334
See |Stats| class definition for the list of the statistics available."""
320335
return self._stats
321336

322-
def _create_tp(self,
323-
trace: ResolverRegistry.Result) -> Optional[TraceProcessor]:
337+
def _create_tp(self, trace: ResolverRegistry.Result
338+
) -> Optional[Tuple[TraceProcessor, Metadata]]:
324339
try:
325-
return TraceProcessor(trace=trace.generator, config=self.config.tp_config)
340+
return TraceProcessor(
341+
trace=trace.generator, config=self.config.tp_config), trace.metadata
326342
except TraceProcessorException as ex:
327-
if self.config.load_failure_handling == \
328-
LoadFailureHandling.RAISE_EXCEPTION:
343+
if self.config.load_failure_handling == FailureHandling.RAISE_EXCEPTION:
329344
raise ex
330345
self._stats.load_failures += 1
331346
return None
332347

348+
def _execute_handling_failure(self, fn: Callable[[TraceProcessor], Any],
349+
tp: TraceProcessor, metadata: Metadata):
350+
try:
351+
return fn(tp)
352+
except TraceProcessorException as ex:
353+
if self.config.execute_failure_handling == \
354+
FailureHandling.RAISE_EXCEPTION:
355+
raise TraceProcessorException(f'{metadata} {ex}') from None
356+
self._stats.execute_failures += 1
357+
return pd.DataFrame()
358+
333359
def __enter__(self):
334360
return self
335361

python/test/api_integrationtest.py

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import pandas as pd
2121

2222
from perfetto.batch_trace_processor.api import BatchTraceProcessor
23-
from perfetto.batch_trace_processor.api import LoadFailureHandling
2423
from perfetto.batch_trace_processor.api import BatchTraceProcessorConfig
24+
from perfetto.batch_trace_processor.api import FailureHandling
2525
from perfetto.batch_trace_processor.api import TraceListReference
2626
from perfetto.trace_processor.api import PLATFORM_DELEGATE
2727
from perfetto.trace_processor.api import TraceProcessor
@@ -90,17 +90,18 @@ def resolve(self):
9090

9191
def create_batch_tp(
9292
traces: TraceListReference,
93-
failure_handling: LoadFailureHandling = LoadFailureHandling.RAISE_EXCEPTION
93+
load_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION,
94+
execute_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION,
9495
):
95-
default = PLATFORM_DELEGATE().default_resolver_registry()
96-
default.register(SimpleResolver)
97-
default.register(RecursiveResolver)
98-
return BatchTraceProcessor(
99-
traces=traces,
100-
config=BatchTraceProcessorConfig(
101-
load_failure_handling=failure_handling,
102-
tp_config=TraceProcessorConfig(
103-
bin_path=os.environ["SHELL_PATH"], resolver_registry=default)))
96+
registry = PLATFORM_DELEGATE().default_resolver_registry()
97+
registry.register(SimpleResolver)
98+
registry.register(RecursiveResolver)
99+
config = BatchTraceProcessorConfig(
100+
load_failure_handling=load_failure_handling,
101+
execute_failure_handling=execute_failure_handling,
102+
tp_config=TraceProcessorConfig(
103+
bin_path=os.environ["SHELL_PATH"], resolver_registry=registry))
104+
return BatchTraceProcessor(traces=traces, config=config)
104105

105106

106107
def create_tp(trace: TraceReference):
@@ -229,13 +230,32 @@ def test_recursive_resolver(self):
229230
df = btp.query_and_flatten('select dur from slice limit 1')
230231
pd.testing.assert_frame_equal(df, expected, check_dtype=False)
231232

232-
def test_btp_failure(self):
233+
def test_btp_load_failure(self):
233234
f = io.BytesIO(b'<foo></foo>')
234235
with self.assertRaises(TraceProcessorException):
235236
_ = create_batch_tp(traces=f)
236237

237-
def test_btp_failure_increment_stat(self):
238+
def test_btp_load_failure_increment_stat(self):
238239
f = io.BytesIO(b'<foo></foo>')
239240
btp = create_batch_tp(
240-
traces=f, failure_handling=LoadFailureHandling.INCREMENT_STAT)
241+
traces=f, load_failure_handling=FailureHandling.INCREMENT_STAT)
241242
self.assertEqual(btp.stats().load_failures, 1)
243+
244+
def test_btp_query_failure(self):
245+
btp = create_batch_tp(traces=example_android_trace_path())
246+
with self.assertRaises(TraceProcessorException):
247+
_ = btp.query('select * from sl')
248+
249+
def test_btp_query_failure_increment_stat(self):
250+
btp = create_batch_tp(
251+
traces=example_android_trace_path(),
252+
execute_failure_handling=FailureHandling.INCREMENT_STAT)
253+
_ = btp.query('select * from sl')
254+
self.assertEqual(btp.stats().execute_failures, 1)
255+
256+
def test_btp_query_failure_message(self):
257+
btp = create_batch_tp(
258+
traces='simple:path={}'.format(example_android_trace_path()))
259+
with self.assertRaisesRegex(
260+
TraceProcessorException, expected_regex='.*source.*generator.*'):
261+
_ = btp.query('select * from sl')

0 commit comments

Comments
 (0)