diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp
index 9f72cd1bd..360bc0373 100644
--- a/src/xccl/ProcessGroupXCCL.cpp
+++ b/src/xccl/ProcessGroupXCCL.cpp
@@ -433,6 +433,10 @@ void ProcessGroupXCCL::setCompletedPgStatus(
   pgStatus_->lastCompletedWorkName = opTypeToString(work->opType_);
   pgStatus_->lastCompletedNumelIn = work->numelIn_;
   pgStatus_->lastCompletedNumelOut = work->numelOut_;
+  while (!work->isCompleted()) {
+    std::this_thread::sleep_for(
+        std::chrono::milliseconds(kSynchronizeBusyWaitMillis));
+  }
   // To avoid complexity, we're not computing duration.
   FlightRecorderXCCL::get()->retire_id(
       work->trace_id_, /*compute_duration*/ false);
@@ -455,7 +459,8 @@ c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> ProcessGroupXCCL::initWork(
     bool isP2P,
     const char* profilingTitle,
     const std::vector<at::Tensor>& inputs,
-    const std::vector<at::Tensor>& outputs) {
+    const std::vector<at::Tensor>& outputs,
+    bool record) {
   auto r = c10::make_intrusive<ProcessGroupXCCL::WorkXCCL>(
       device,
       rank,
@@ -466,20 +471,22 @@ c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> ProcessGroupXCCL::initWork(
       profilingTitle != nullptr
           ? std::optional<std::vector<at::Tensor>>(inputs)
           : std::nullopt);
-  r->trace_id_ = FlightRecorderXCCL::get()->record(
-      local_id_,
-      std::make_tuple(pg_uid_, pg_desc_), // PG name tuple
-      seqCollective_,
-      seqP2P_,
-      op_id_,
-      profilingTitle ? profilingTitle : "",
-      inputs,
-      outputs,
-      nullptr,
-      r->xcclEndEvent_.get(),
-      options_->timeout,
-      pgStatus_,
-      isP2P);
+  if (record) {
+    r->trace_id_ = FlightRecorderXCCL::get()->record(
+        local_id_,
+        std::make_tuple(pg_uid_, pg_desc_), // PG name tuple
+        seqCollective_,
+        seqP2P_,
+        op_id_,
+        profilingTitle ? profilingTitle : "",
+        inputs,
+        outputs,
+        nullptr,
+        r->xcclEndEvent_.get(),
+        options_->timeout,
+        pgStatus_,
+        isP2P);
+  }
   return r;
 }
 
@@ -660,16 +667,19 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
     const char* profilingTitle,
     bool nanCheck) {
   nanCheck &= enableNanCheck_;
-  seqCollective_++;
   auto device = inputs[0].device();
   const auto key = std::to_string(device.index());
   auto comm = getXCCLComm(key, device, opType);
 
+  if (!coalescing_state_) {
+    seqCollective_++;
+  }
+  op_id_++;
+
   if (coalescing_state_ & CoalActive) {
     if ((coalescing_state_ & CoalColl) == 0) {
       seqCollective_++;
     }
-    op_id_++;
     coalescing_state_ |= CoalColl;
     if (coalescedDevice_.index() < 0) {
       coalescedDevice_ = device;
@@ -710,8 +720,15 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
   }
 
   c10::intrusive_ptr<Work> work;
-  work =
-      initWork(device, rank_, opType, false, profilingTitle, inputs, outputs);
+  work = initWork(
+      device,
+      rank_,
+      opType,
+      false,
+      profilingTitle,
+      inputs,
+      outputs,
+      !coalescing_state_);
   if (coalescing_state_) {
     FlightRecorderXCCL::get()->record(
         local_id_,
diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp
index e7aa39c82..dc7552452 100644
--- a/src/xccl/ProcessGroupXCCL.hpp
+++ b/src/xccl/ProcessGroupXCCL.hpp
@@ -123,9 +123,6 @@ class TORCH_API ProcessGroupXCCL : public Backend {
     static c10::intrusive_ptr<Options> create() {
      return c10::make_intrusive<Options>();
     }
-
-    std::vector<uint64_t> global_ranks_in_group;
-    std::string group_name;
   };
 
   ProcessGroupXCCL(
@@ -171,7 +168,8 @@ class TORCH_API ProcessGroupXCCL : public Backend {
       bool isP2P,
       const char* profilingTitle = nullptr,
       const std::vector<at::Tensor>& inputs = {},
-      const std::vector<at::Tensor>& outputs = {});
+      const std::vector<at::Tensor>& outputs = {},
+      bool record = false);
 
   template <typename Fn>
   c10::intrusive_ptr<Work> collective(
diff --git a/test/xpu/distributed/test_c10d_xccl.py b/test/xpu/distributed/test_c10d_xccl.py
index 916524073..f8a0389e4 100644
--- a/test/xpu/distributed/test_c10d_xccl.py
+++ b/test/xpu/distributed/test_c10d_xccl.py
@@ -1,16 +1,21 @@
 # Owner(s): ["oncall: distributed"]
 
+import json
 import math
 import os
+import pickle
 import random
 import signal
 import sys
+import tempfile
+import threading
 import time
-from datetime import timedelta
+from datetime import datetime, timedelta
 from enum import auto, Enum
 from unittest import mock
 
 import torch
+import torch._C._distributed_c10d
 import torch.distributed as c10d
 
 if not c10d.is_available() or not c10d.is_xccl_available():
@@ -627,6 +632,564 @@ def test_all_gather_into_tensor(self):
         )
 
+
+class XCCLTraceTestBase(MultiProcessTestCase):
+    def setUp(self):
+        super().setUp()
+        os.environ["TORCH_FR_BUFFER_SIZE"] = "1000"
+        self.tempdir = tempfile.TemporaryDirectory()
+        os.environ["TORCH_FR_DUMP_TEMP_FILE"] = self._trace_basename()
+        os.environ["TORCH_FR_DEBUG_INFO_PIPE_FILE"] = self._trace_basename()
+        self._spawn_processes()
+
+    @classmethod
+    def _run(
+        cls,
+        parent_conn,
+        rank: int,
+        test_name: str,
+        file_name: str,
+        parent_pipe,
+        **kwargs,
+    ) -> None:
+        cls.parent = parent_conn
+        super()._run(rank, test_name, file_name, parent_pipe)
+
+    @property
+    def local_device(self):
+        return torch.device("xpu", self.rank_to_GPU[self.rank][0])
+
+    def _join_processes(self, fn):
+        # We need to patch sys.exit() as skip_if will use sys.exit() and
+        # the exit code from this process will not be caught.
+        with mock.patch("sys.exit"):
+            fn()
+        super()._join_processes(fn)
+
+    def _spawn_processes(self) -> None:
+        proc = torch.multiprocessing.get_context("spawn").Process
+        self.children_pipes = []
+        parent_pipes = []
+        for _ in range(self.world_size):
+            parent_conn, child_conn = torch.multiprocessing.Pipe()
+            self.children_pipes.append(child_conn)
+            parent_pipes.append(parent_conn)
+        piter = iter(parent_pipes)
+
+        def wrap(*positional, args, **kwargs):
+            args = (next(piter), *args)
+            return proc(*positional, args=args, **kwargs)
+
+        self._start_processes(wrap)
+
+    def _create_process_group_xccl(
+        self, timeout=timedelta(seconds=600), device_id=None
+    ):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        c10d.init_process_group(
+            "xccl",
+            world_size=self.world_size,
+            rank=self.rank,
+            store=store,
+            timeout=timeout,
+            device_id=device_id,
+        )
+        pg = c10d.distributed_c10d._get_default_group()
+        return pg
+
+    def tearDown(self):
+        super().tearDown()
+        try:
+            os.remove(self.file_name)
+        except OSError:
+            pass
+
+    @property
+    def world_size(self):
+        return 2
+
+    @property
+    def rank_to_GPU(self):
+        # return rank to GPU map
+        return init_multigpu_helper(self.world_size, "xccl")
+
+    def _trace_basename(self):
+        # we pass the base to the env, and the dump util will append rank
+        return os.path.join(self.tempdir.name, "trace_")
+
+    def _trace_name(self, rank):
+        return self._trace_basename() + str(rank)
+
+    def started_or_scheduled(self, timing_enabled=False):
+        return "started" if timing_enabled else "scheduled"
+
+
+class XCCLTraceTest(XCCLTraceTestBase):
+    def _verify_trace(self, t, include_collectives, is_json, timing_enabled=False):
+        ver = t["version"]
+        self.assertEqual(ver, "2.10")
+        comm_lib_version = t["comm_lib_version"]
+        torch_comm_lib_version = torch._C._distributed_c10d.get_xccl_version()
+        self.assertEqual(comm_lib_version, torch_comm_lib_version)
+        pg_config = t["pg_config"]
+        self.assertEqual(len(pg_config), 1)
+        default_pg_info = pg_config["0"]
+        self.assertIn("name", default_pg_info)
+        self.assertIn("desc", default_pg_info)
+        self.assertIn("ranks", default_pg_info)
+        pg_status = t["pg_status"]
+        self.assertEqual(len(pg_status), 1)
+        self.assertEqual(str(pg_status["0"]["last_enqueued_collective"]), "2")
+        self.assertEqual(str(pg_status["0"]["last_completed_collective"]), "2")
+        self.assertEqual(
+            str(pg_status["0"]["last_started_collective"]),
+            "2" if timing_enabled else "-1",
+        )
+        global_ranks = pg_config["0"]["ranks"]
+        self.assertEqual(len(json.loads(global_ranks)), self.world_size)
+        if include_collectives:
+            self.assertEqual(len(t["entries"]), 2)
+            t = t["entries"]
+            last = t[-1]
+            self.assertEqual(last["thread_id"], str(threading.current_thread().ident))
+            self.assertEqual(last["thread_name"], "fr_test_thread")
+            self.assertEqual(last["process_group"], ("0", "default_pg"))
+            self.assertEqual(last["state"], "completed")
+            s = last["time_discovered_started_ns"]
+            f = last["time_discovered_completed_ns"]
+            self.assertEqual(last["record_id"], 1)
+            self.assertIsNotNone(f)
+            if timing_enabled:
+                self.assertIsNotNone(s)
+                self.assertTrue(s <= f)
+            # we don't collect stack traces in JSON at the moment
+            if not is_json:
+                self.assertIn("test_c10d_xccl.py", str(last["frames"]))
+            self.assertEqual(last["input_sizes"], ((3, 4),))
+            self.assertEqual(last["input_dtypes"], ["Float"])
+            self.assertEqual(last["output_sizes"], ((3, 4),))
+            self.assertEqual(last["output_dtypes"], ["Float"])
+            self.assertEqual(last["collective_seq_id"], 2)
+            self.assertEqual(last["timeout_ms"], 600000)
+            now = datetime.now()
+            event_created_time = datetime.fromtimestamp(
+                last["time_created_ns"] / 1000000000
+            )
+            before_test = now - timedelta(minutes=1)
+            self.assertTrue(before_test < event_created_time < now)
+            if timing_enabled:
+                # very loose bounds, measured 0.036 ms on devgpu
+                self.assertTrue(0 < last["duration_ms"] < 100)
+            else:
+                self.assertTrue("duration_ms" not in last)
+        else:
+            self.assertTrue("entries" not in t)
+
+    def load_libpthread_or_libc(self):
+        import ctypes.util
+
+        for base in ("pthread", "c"):
+            path = ctypes.util.find_library(base)
+            if path:
+                try:
+                    return ctypes.CDLL(path)
+                except OSError:
+                    continue
+        raise RuntimeError("Could not load pthread or libc")
+
+    # Setting the thread name directly via threading.current_thread().name does not work
+    # because we use pthread_getname_np to get the thread's OS-level name in C++
+    def set_thread_name(self, name):
+        import ctypes
+
+        lib = self.load_libpthread_or_libc()
+        pthread_self = lib.pthread_self
+        pthread_self.restype = ctypes.c_void_p
+        pthread_setname_np = lib.pthread_setname_np
+        pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
+
+        # Get current pthread handle
+        tid = pthread_self()
+
+        # Set name
+        pthread_setname_np(tid, name.encode())
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    @parametrize("include_collectives", [True, False])
+    def test_short_pickle(self, include_collectives, timing_enabled=False):
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        if timing_enabled:
+            pg._enable_collectives_timing()
+        device = self.local_device
+        self.set_thread_name("fr_test_thread")
+        a = torch.full((3, 4), float(self.rank), device=device)
+        for _ in range(2):
+            f = pg.allreduce(a)
+        f.wait()
+        torch.xpu.synchronize(device=device)
+        # duration_ms is populated best-effort, since it can only be computed outside the dump() API
+        time.sleep(1)
+        t = pickle.loads(
+            torch._C._distributed_c10d._dump_xccl_trace(
+                includeCollectives=include_collectives
+            )
+        )
+        self._verify_trace(
+            t,
+            include_collectives=include_collectives,
+            is_json=True,
+            timing_enabled=timing_enabled,
+        )
+        dist.destroy_process_group()
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    def test_dump_pipe(self):
+        def open_file_with_timeout(file_path, mode, timeout=1.0):
+            start_time = time.time()
+            while time.time() - start_time < timeout:
+                if os.path.exists(file_path):
+                    return open(file_path, mode)
+                time.sleep(0.1)
+            raise FileNotFoundError
+
+        if self.rank == self.MAIN_PROCESS_RANK:
+            for c in self.children_pipes:
+                self.assertEqual(c.recv(), "next")
+
+            dump_file = self._trace_name(rank=0)
+            pipe_file = dump_file + ".pipe"
+            with open_file_with_timeout(pipe_file, "w") as f:
+                f.write("1\n")
+            with open_file_with_timeout(dump_file, "rb", timeout=10.0) as f:
+                self.assertTrue("all_reduce" in str(pickle.load(f)))
+
+            for c in self.children_pipes:
+                c.send("next")
+            return
+
+        pg = self._create_process_group_xccl()
+        device = self.local_device
+        a = torch.full((3, 4), float(self.rank), device=device)
+        for _ in range(2):
+            f = pg.allreduce(a)
+        f.wait()
+        torch.xpu.synchronize(device=device)
+        self.parent.send("next")
+        self.parent.recv()
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    def test_long(self):
+        os.environ["TORCH_FR_BUFFER_SIZE"] = "10"
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        device = self.local_device
+        a = torch.full((3, 4), float(self.rank), device=device)
+        for _ in range(2):
+            # test some other primitives to make sure
+            # their strings are valid
+            xs = [torch.ones(3, 4, device=device)]
+            pg.broadcast(xs).wait()
+            pg.allreduce(xs).wait()
+            # pg.reduce(xs).wait()  # currently failing on XPU
+            ys = [[torch.empty(3, 4, device=device) for _ in range(self.world_size)]]
+            pg.allgather(ys, xs).wait()
+            pg.reduce_scatter(xs, ys).wait()
+            f = pg.allreduce(a)
+        f.wait()
+        torch.xpu.synchronize(device=device)
+        t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace())
+        t = t["entries"]
+        self.assertEqual(len(t), 10)
+        first = t[0]
+        last = t[-1]
+        self.assertEqual(last["profiling_name"], "xccl:all_reduce")
+        self.assertEqual(last["state"], "completed")
+        self.assertIn("test_c10d_xccl.py", str(last["frames"]))
+        self.assertEqual(last["input_sizes"], ((3, 4),))
+        self.assertEqual(last["input_dtypes"], ["Float"])
+        self.assertEqual(last["output_sizes"], ((3, 4),))
+        self.assertEqual(last["output_dtypes"], ["Float"])
+        self.assertEqual(last["timeout_ms"], 600000)
+        self.assertEqual(last["collective_seq_id"] - first["collective_seq_id"], 9)
+        dist.destroy_process_group()
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    def test_barrier_profiling(self):
+        os.environ["TORCH_FR_BUFFER_SIZE"] = "10"
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        device = self.local_device
+        a = torch.full((3, 4), float(self.rank), device=device)
+        f = pg.barrier()
+        f = pg.allreduce(a)
+        f.wait()
+        torch.xpu.synchronize(device=device)
+        t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace())
+        t = t["entries"]
+        self.assertEqual(len(t), 2)
+        first = t[0]
+        last = t[-1]
+        self.assertEqual(first["profiling_name"], "xccl:all_reduce_barrier")
+        self.assertEqual(last["profiling_name"], "xccl:all_reduce")
+        dist.destroy_process_group()
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    @parametrize(
+        "op_sizes_per_coalesce",
+        [
+            [(2, 3)],
+            [(2, 3), (5, 5), (1,)],
+        ],
+    )
+    @parametrize("timing_enabled", [False])
+    def test_batched_send_recv(self, op_sizes_per_coalesce, timing_enabled):
+        """
+        'WorkEnqueue' was skipped for isend/irecv, leading to segfault on dump_entries when update_state tried to use
+        a destructed Work obj's cuda events
+        """
+
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        if timing_enabled:
+            pg._enable_collectives_timing()
+
+        num_coalesced_ops = 20
+        ops_per_coalesce = len(op_sizes_per_coalesce)
+        for _ in range(num_coalesced_ops):
+            ops = []
+            for input_sizes in op_sizes_per_coalesce:
+                tensor = torch.zeros(input_sizes).to(self.local_device)
+                if self.rank == 0:
+                    ops.append(dist.P2POp(dist.irecv, tensor, 1))
+                elif self.rank == 1:
+                    tensor *= 2
+                    ops.append(dist.P2POp(dist.isend, tensor, 0))
+
+            dist.batch_isend_irecv(ops).pop().wait()
+
+        torch.xpu.synchronize(device=self.local_device)
+
+        if timing_enabled:
+            # wait for watchdog thread to process the queue of works
+            time.sleep(1)
+
+        t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace())
+        self.assertEqual(len(t["entries"]), num_coalesced_ops * ops_per_coalesce)
+
+        expected_record_id = 0
+        expected_seq = 1
+        expected_op_id = 1
+        for seq in range(num_coalesced_ops):
+            first_op = seq * (ops_per_coalesce)
+            coalesced_op = first_op + ops_per_coalesce
+            for p2p_op_idx, input_sizes in zip(
+                range(first_op, coalesced_op, 1), op_sizes_per_coalesce
+            ):
+                # the individual ops inside the coalescing group record the individual op metadata,
+                # but not the timing info coming from the actual coalesced kernel
+                profiling_name = (
+                    "xccl:recv 0<-1" if self.rank == 0 else "xccl:send 1->0"
+                )
+                self.assertEqual(
+                    t["entries"][p2p_op_idx]["record_id"], expected_record_id
+                )
+                expected_record_id += 1
+                self.assertEqual(
+                    t["entries"][p2p_op_idx]["profiling_name"], profiling_name
+                )
+                # we don't increment collective_seq_id for p2p ops.
+                self.assertEqual(t["entries"][p2p_op_idx]["collective_seq_id"], 0)
+                self.assertEqual(t["entries"][p2p_op_idx]["p2p_seq_id"], expected_seq)
+                self.assertEqual(t["entries"][p2p_op_idx]["op_id"], expected_op_id)
+                expected_op_id += 1
+                self.assertEqual(t["entries"][p2p_op_idx]["input_sizes"], [input_sizes])
+                self.assertEqual(
+                    t["entries"][p2p_op_idx]["output_sizes"], [input_sizes]
+                )
+                # duration doesn't get tagged onto individual ops yet, nor is their state updated
+                self.assertEqual(t["entries"][p2p_op_idx]["state"], "scheduled")
+                self.assertTrue("duration_ms" not in t["entries"][p2p_op_idx])
+
+            # coalesced ops not yet supported in FR
+            expected_seq += 1
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    @parametrize(
+        "op_sizes",
+        [
+            [(2, 3)],
+            [(2, 3), (5, 5), (1,)],
+        ],
+    )
+    @parametrize("timing_enabled", [False])
+    def test_individual_send_recv(self, op_sizes, timing_enabled):
+        """
+        'WorkEnqueue' was skipped for isend/irecv, leading to segfault on dump_entries when update_state tried to use
+        a destructed Work obj's cuda events
+        """
+
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        if timing_enabled:
+            pg._enable_collectives_timing()
+        num_repeats = 10
+        ops_per_repeat = len(op_sizes)
+        for _ in range(num_repeats):
+            for input_sizes in op_sizes:
+                tensor = torch.zeros(input_sizes).to(self.local_device)
+                if self.rank == 0:
+                    dist.recv(tensor, 1)
+                elif self.rank == 1:
+                    tensor *= 2
+                    dist.send(tensor, 0)
+
+        torch.xpu.synchronize(device=self.local_device)
+        if timing_enabled:
+            # wait for watchdog thread to process the queue of works
+            time.sleep(1)
+
+        t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace())
+        self.assertEqual(len(t["entries"]), num_repeats * (ops_per_repeat))
+        expected_seq = 1
+        expected_op_id = 1
+        for seq in range(num_repeats * ops_per_repeat):
+            input_sizes = op_sizes[seq % ops_per_repeat]
+            profiling_name = "xccl:recv 0<-1" if self.rank == 0 else "xccl:send 1->0"
+            self.assertEqual(t["entries"][seq]["profiling_name"], profiling_name)
+            # we don't increment collective_seq_id for p2p ops.
+            self.assertEqual(t["entries"][seq]["collective_seq_id"], 0)
+            self.assertEqual(t["entries"][seq]["p2p_seq_id"], expected_seq)
+            expected_seq += 1
+            self.assertEqual(t["entries"][seq]["op_id"], expected_op_id)
+            expected_op_id += 1
+            self.assertEqual(t["entries"][seq]["input_sizes"], [input_sizes])
+            self.assertEqual(t["entries"][seq]["output_sizes"], [input_sizes])
+            self.assertEqual(t["entries"][seq]["state"], "completed")
+
+            if timing_enabled:
+                duration = t["entries"][seq]["duration_ms"]
+                self.assertTrue(0.001 < duration < 10000, duration)
+            else:
+                self.assertTrue("duration_ms" not in t["entries"][seq])
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    @parametrize("timing_enabled", [False])
+    def test_allgather_uneven(self, timing_enabled):
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        if timing_enabled:
+            pg._enable_collectives_timing()
+
+        output_split_sizes = [i + 1 for i in range(self.world_size)]
+        sum_len = sum(output_split_sizes)
+        output_tensor = torch.zeros(sum_len, 2).to(self.rank)
+        expected_tensor = torch.ones(sum_len, 2).to(self.rank)
+        input_tensor = torch.ones(output_split_sizes[self.rank], 2).to(self.rank)
+
+        dist.all_gather(
+            list(torch.split(output_tensor, output_split_sizes)), input_tensor
+        )
+        torch.xpu.synchronize(device=self.rank)
+        self.assertEqual(output_tensor, expected_tensor)
+        if timing_enabled:
+            # wait for watchdog thread to process the queue of works
+            time.sleep(1)
+
+        t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace())
+        self.assertEqual(len(t["entries"]), self.world_size)
+        for i in range(self.world_size):
+            self.assertEqual(t["entries"][i]["profiling_name"], "xccl:_broadcast_oop")
+            # collective_seq_id should be incremented once.
+            self.assertEqual(t["entries"][i]["collective_seq_id"], 1)
+            self.assertEqual(t["entries"][i]["input_sizes"], [[i + 1, 2]])
+            self.assertEqual(
+                t["entries"][i]["output_sizes"],
+                [[i + 1, 2]],
+            )
+            self.assertEqual(t["entries"][i]["state"], "scheduled")
+            # No event is recorded for individual ops
+            self.assertTrue("time_discovered_completed_ns" in t["entries"][i])
+        # TODO: (frost-intel) Add coalesced op recording for FR
+
+    # TODO(whc) test out other ops (and combinations of ops, if that's valid?)
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    @parametrize("timing_enabled", [False])
+    def test_coalescing_manager_collective(self, timing_enabled):
+        """
+        The coalescing manager api works by accumulating operations in python via a contextmanager, and then making
+        one call into c++ to a _coalesced API. It has limited support for ops and has been added recently to
+        avoid overheads of making individual py-cpp calls. This complicates flight recording.
+
+        For now, flight recording of coalescing_manager collectives is less detailed than cpp coalesced collectives.
+ """ + if self.rank == self.MAIN_PROCESS_RANK: + return + pg = self._create_process_group_xccl() + if timing_enabled: + pg._enable_collectives_timing() + + output_tensors = torch.zeros(2, 2).to(self.rank) + input_tensors = [torch.ones(2, 2).to(self.rank) for _ in range(self.world_size)] + + # TODO(whc) make this work with bigger world or something + self.assertEqual(self.world_size, 2, self.world_size) + + with dist._coalescing_manager(): + for i in range(self.world_size): + dist.reduce_scatter_tensor(output_tensors[i], input_tensors[i]) + self.assertEqual(output_tensors, input_tensors[self.rank] * self.world_size) + + torch.xpu.synchronize(device=self.rank) + + if timing_enabled: + # wait for watchdog thread to process the queue of works + time.sleep(1) + + t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) + + self.assertEqual( + len(t["entries"]), 1 + ) # one for the reduce_scatter_tensor_coalesced + self.assertEqual( + t["entries"][0]["profiling_name"], "xccl:reduce_scatter_tensor_coalesced" + ) + # collective_seq_id should be incremented once. + self.assertEqual(t["entries"][0]["collective_seq_id"], 1) + self.assertEqual(t["entries"][0]["input_sizes"], [[2, 2], [2, 2]]) + self.assertEqual( + t["entries"][0]["output_sizes"], + [ + [ + 2, + ], + [ + 2, + ], + ], + ) + self.assertEqual(t["entries"][0]["state"], "completed") + if timing_enabled: + duration = t["entries"][0]["duration_ms"] + self.assertTrue(0.001 < duration < 10000, duration) + else: + self.assertTrue("duration_ms" not in t["entries"][0]) + + +instantiate_parametrized_tests(XCCLTraceTest) instantiate_parametrized_tests(ProcessGroupXCCLTest)