
Commit 1e79731

Northbadge authored and mtrofin committed
Switch file_paths to ModuleSpec (#57)
Preliminary patch toward refactoring the file_paths tuple usage completely. Allows distributed ThinLTO corpora to be trained on.
1 parent 3004b74 · commit 1e79731

10 files changed: +126 −106 lines
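In short, the commit swaps the positional (.bc, .cmd) path tuple for a small frozen dataclass, corpus.ModuleSpec, threaded through the runner and collector APIs. A minimal sketch of the change in call shape (the module name 'foo' is a placeholder):

from compiler_opt.rl import corpus

# Before this commit, a module was identified by a bare tuple of file paths:
#   runner.collect_data(file_paths=('foo.bc', 'foo.cmd'), ...)

# After, a single immutable spec names the module and records corpus-level
# metadata, such as whether a ThinLTO index (.thinlto.bc) accompanies it:
spec = corpus.ModuleSpec(name='foo', has_thinlto=True)
#   runner.collect_data(module_spec=spec, ...)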

compiler_opt/rl/compilation_runner.py

Lines changed: 10 additions & 8 deletions
@@ -24,6 +24,7 @@
 from absl import flags
 from compiler_opt.distributed.worker import Worker, WorkerFuture
 from compiler_opt.rl import constant
+from compiler_opt.rl import corpus
 import tensorflow as tf
 
 _COMPILATION_TIMEOUT = flags.DEFINE_integer(
@@ -261,7 +262,7 @@ class CompilationRunnerStub(metaclass=abc.ABCMeta):
 
   @abc.abstractmethod
   def collect_data(
-      self, file_paths: Tuple[str, ...], tf_policy_path: str,
+      self, module_spec: corpus.ModuleSpec, tf_policy_path: str,
       reward_stat: Optional[Dict[str, RewardStat]]
   ) -> WorkerFuture[CompilationResult]:
     raise NotImplementedError()
@@ -313,12 +314,12 @@ def cancel_all_work(self):
     self._cancellation_manager.kill_all_processes()
 
   def collect_data(
-      self, file_paths: Tuple[str, ...], tf_policy_path: str,
+      self, module_spec: corpus.ModuleSpec, tf_policy_path: str,
       reward_stat: Optional[Dict[str, RewardStat]]) -> CompilationResult:
     """Collect data for the given IR file and policy.
 
     Args:
-      file_paths: path to files needed for inlining, Tuple of (.bc, .cmd).
+      module_spec: a ModuleSpec.
       tf_policy_path: path to the tensorflow policy.
       reward_stat: reward stat of this module, None if unknown.
       cancellation_token: a CancellationToken through which workers may be
@@ -336,7 +337,7 @@ def collect_data(
     """
     if reward_stat is None:
       default_result = self._compile_fn(
-          file_paths,
+          module_spec,
           tf_policy_path='',
           reward_only=bool(tf_policy_path),
           cancellation_manager=self._cancellation_manager)
@@ -346,7 +347,7 @@ def collect_data(
 
     if tf_policy_path:
       policy_result = self._compile_fn(
-          file_paths,
+          module_spec,
           tf_policy_path,
           reward_only=False,
           cancellation_manager=self._cancellation_manager)
@@ -362,7 +363,7 @@ def collect_data(
       if k not in reward_stat:
         raise ValueError(
             (f'Example {k} does not exist under default policy for '
-             'module {file_paths[0]}'))
+             f'module {module_spec.name}'))
       default_reward = reward_stat[k].default_reward
       moving_average_reward = reward_stat[k].moving_average_reward
       sequence_example = _overwrite_trajectory_reward(
@@ -384,13 +385,14 @@ def collect_data(
         keys=keys)
 
   def _compile_fn(
-      self, file_paths: Tuple[str, ...], tf_policy_path: str, reward_only: bool,
+      self, module_spec: corpus.ModuleSpec, tf_policy_path: str,
+      reward_only: bool,
       cancellation_manager: Optional[WorkerCancellationManager]
   ) -> Dict[str, Tuple[tf.train.SequenceExample, float]]:
     """Compiles for the given IR file under the given policy.
 
     Args:
-      file_paths: path to files needed for compilation.
+      module_spec: a ModuleSpec.
       tf_policy_path: path to TF policy directory on local disk.
       reward_only: whether only return reward.
       cancellation_manager: a WorkerCancellationManager to handle early
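A side effect of the error-message hunk above is a fix to a latent string-formatting bug: the second literal in the concatenation lacked the f prefix, so '{file_paths[0]}' was emitted verbatim rather than interpolated. A standalone illustration (the values are hypothetical):

k, name = 'main', 'libfoo/bar'

# Old: adjacent literals concatenate, but only the first was an f-string,
# so the second placeholder survived into the message verbatim.
old = (f'Example {k} does not exist under default policy for '
       'module {file_paths[0]}')
assert old.endswith('module {file_paths[0]}')

# New: both pieces are f-strings, so the module name is interpolated.
new = (f'Example {k} does not exist under default policy for '
       f'module {name}')
assert new.endswith('module libfoo/bar')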

compiler_opt/rl/compilation_runner_test.py

Lines changed: 7 additions & 4 deletions
@@ -28,6 +28,7 @@
 
 from compiler_opt.rl import compilation_runner
 from compiler_opt.rl import constant
+from compiler_opt.rl import corpus
 
 _DEFAULT_FEATURE_VALUE = 12
 _POLICY_FEATURE_VALUE = 34
@@ -107,7 +108,7 @@ def test_policy(self, mock_compile_fn):
     runner = compilation_runner.CompilationRunner(
         moving_average_decay_rate=_MOVING_AVERAGE_DECAY_RATE)
     data = runner.collect_data(
-        file_paths=('bc', 'cmd'),
+        module_spec=corpus.ModuleSpec(name='dummy'),
         tf_policy_path='policy_path',
         reward_stat=None)
     self.assertEqual(2, mock_compile_fn.call_count)
@@ -138,7 +139,9 @@ def test_default(self, mock_compile_fn):
         moving_average_decay_rate=_MOVING_AVERAGE_DECAY_RATE)
 
     data = runner.collect_data(
-        file_paths=('bc', 'cmd'), tf_policy_path='', reward_stat=None)
+        module_spec=corpus.ModuleSpec(name='dummy'),
+        tf_policy_path='',
+        reward_stat=None)
     # One call when we ask for the default policy, because it can provide both
     # trace and default size.
     self.assertEqual(1, mock_compile_fn.call_count)
@@ -167,7 +170,7 @@ def test_given_default_size(self, mock_compile_fn):
         moving_average_decay_rate=_MOVING_AVERAGE_DECAY_RATE)
 
     data = runner.collect_data(
-        file_paths=('bc', 'cmd'),
+        module_spec=corpus.ModuleSpec(name='dummy'),
         tf_policy_path='policy_path',
         reward_stat={
             'default':
@@ -204,7 +207,7 @@ def test_exception_handling(self, mock_compile_fn):
 
     with self.assertRaisesRegex(subprocess.CalledProcessError, 'error'):
       _ = runner.collect_data(
-          file_paths=('bc', 'cmd'),
+          module_spec=corpus.ModuleSpec(name='dummy'),
          tf_policy_path='policy_path',
          reward_stat=None)
     self.assertEqual(1, mock_compile_fn.call_count)

compiler_opt/rl/corpus.py

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+# coding=utf-8
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""ModuleSpec definition and utility command line parsing functions."""
+
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class ModuleSpec:
+  """Dataclass describing an input module and its compilation command options.
+  """
+  name: str
+  has_thinlto: bool = False
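For reference, this is how the new dataclass behaves in practice, as a minimal sketch (the module name 'lib/foo' is illustrative):

import dataclasses

from compiler_opt.rl import corpus

spec = corpus.ModuleSpec(name='lib/foo', has_thinlto=True)
assert spec.name == 'lib/foo'
assert corpus.ModuleSpec(name='lib/bar').has_thinlto is False  # default

# frozen=True makes instances immutable (and hashable), so specs can be
# used as dict keys or shared across worker threads safely.
try:
  spec.name = 'other'
except dataclasses.FrozenInstanceError:
  pass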

compiler_opt/rl/inlining/inlining_runner.py

Lines changed: 14 additions & 15 deletions
@@ -23,6 +23,7 @@
 import tensorflow as tf
 
 from compiler_opt.rl import compilation_runner
+from compiler_opt.rl import corpus
 
 _DEFAULT_IDENTIFIER = 'default'
 
@@ -45,14 +46,14 @@ def __init__(self, llvm_size_path: str, *args, **kwargs):
     self._llvm_size_path = llvm_size_path
 
   def _compile_fn(
-      self, file_paths: Tuple[str, str], tf_policy_path: str, reward_only: bool,
-      cancellation_manager: Optional[
+      self, module_spec: corpus.ModuleSpec, tf_policy_path: str,
+      reward_only: bool, cancellation_manager: Optional[
          compilation_runner.WorkerCancellationManager]
   ) -> Dict[str, Tuple[tf.train.SequenceExample, float]]:
     """Run inlining for the given IR file under the given policy.
 
     Args:
-      file_paths: path to files needed for inlining, Tuple of (.bc, .cmd).
+      module_spec: a ModuleSpec.
       tf_policy_path: path to TF policy direcoty on local disk.
       reward_only: whether only return native size.
       cancellation_manager: handler for early termination by killing any running
@@ -75,24 +76,22 @@ def _compile_fn(
     log_path = os.path.join(working_dir, 'log')
     output_native_path = os.path.join(working_dir, 'native')
 
-    input_ir_path, cmd_path = file_paths
-
     sequence_example = tf.train.SequenceExample()
     native_size = 0
     try:
       command_line = []
       if self._launcher_path:
         command_line.append(self._launcher_path)
-      command_line.extend([self._clang_path] +
-                          compilation_runner.get_command_line_for_bundle(
-                              cmd_path,
-                              input_ir_path,
-                              additional_flags=self._additional_flags,
-                              delete_flags=self._delete_flags) + [
-                                  '-mllvm', '-enable-ml-inliner=development',
-                                  '-mllvm', '-training-log=' +
-                                  log_path, '-o', output_native_path
-                              ])
+      command_line.extend(
+          [self._clang_path] + compilation_runner.get_command_line_for_bundle(
+              module_spec.name + '.cmd',
+              module_spec.name + '.bc', (module_spec.name + '.thinlto.bc'
+                                        ) if module_spec.has_thinlto else None,
+              additional_flags=self._additional_flags,
+              delete_flags=self._delete_flags) + [
+                  '-mllvm', '-enable-ml-inliner=development', '-mllvm',
+                  '-training-log=' + log_path, '-o', output_native_path
+              ])
       if tf_policy_path:
         command_line.extend(
             ['-mllvm', '-ml-inliner-model-under-training=' + tf_policy_path])
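The runner no longer receives explicit file paths; it derives them from the module name by suffix convention, passing the ThinLTO index only when the spec says one exists (the extra positional argument implies get_command_line_for_bundle grew a ThinLTO parameter in one of the other changed files). A sketch of that derivation; the helper below is illustrative, not part of the repo:

from typing import Optional, Tuple

from compiler_opt.rl import corpus


def derive_bundle_paths(
    spec: corpus.ModuleSpec) -> Tuple[str, str, Optional[str]]:
  """Hypothetical helper: the file paths a ModuleSpec implies, per the diff."""
  cmd_path = spec.name + '.cmd'
  ir_path = spec.name + '.bc'
  # The .thinlto.bc index is only present for distributed-ThinLTO corpora.
  thinlto_path = spec.name + '.thinlto.bc' if spec.has_thinlto else None
  return cmd_path, ir_path, thinlto_path


print(derive_bundle_paths(corpus.ModuleSpec(name='a/b', has_thinlto=True)))
# ('a/b.cmd', 'a/b.bc', 'a/b.thinlto.bc')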

compiler_opt/rl/local_data_collector.py

Lines changed: 15 additions & 17 deletions
@@ -25,6 +25,7 @@
 
 from compiler_opt.distributed import worker
 from compiler_opt.rl import compilation_runner
+from compiler_opt.rl import corpus
 from compiler_opt.rl import data_collector
 
 
@@ -33,7 +34,7 @@ class LocalDataCollector(data_collector.DataCollector):
 
   def __init__(
       self,
-      file_paths: Tuple[Tuple[str, ...], ...],
+      module_specs: List[corpus.ModuleSpec],
       num_modules: int,
       worker_pool: List[compilation_runner.CompilationRunnerStub],
       parser: Callable[[List[str]], Iterator[trajectory.Trajectory]],
@@ -43,7 +44,7 @@ def __init__(
     # TODO(mtrofin): type exit_checker_ctor when we get typing.Protocol support
     super().__init__()
 
-    self._file_paths = file_paths
+    self._module_specs = module_specs
     self._num_modules = num_modules
     self._parser = parser
     self._worker_pool = worker_pool
@@ -55,7 +56,7 @@ def __init__(
     # with the training phase - i.e. whatever happens between successive data
     # collection calls.
     self._reset_workers: concurrent.futures.Future = None
-    self._current_work: List[Tuple[Tuple[str, ...], worker.WorkerFuture]] = []
+    self._current_work: List[Tuple[corpus.ModuleSpec, worker.WorkerFuture]] = []
     self._pool = concurrent.futures.ThreadPoolExecutor()
 
   def close_pool(self):
@@ -77,14 +78,13 @@ def _join_pending_jobs(self):
                  time.time() - t1)
 
   def _schedule_jobs(
-      self, policy_path, sampled_file_paths
+      self, policy_path: str, sampled_modules: List[corpus.ModuleSpec]
   ) -> List[worker.WorkerFuture[compilation_runner.CompilationResult]]:
     # by now, all the pending work, which was signaled to cancel, must've
     # finished
     self._join_pending_jobs()
-    jobs = [(file_paths, policy_path,
-             self._reward_stat_map['-'.join(file_paths)])
-            for file_paths in sampled_file_paths]
+    jobs = [(module_spec, policy_path, self._reward_stat_map[module_spec.name])
+            for module_spec in sampled_modules]
 
     # Naive load balancing.
     ret = []
@@ -108,8 +108,8 @@ def collect_data(
       They will be reported using `tf.scalar.summary` by the trainer so these
       information is viewable in TensorBoard.
     """
-    sampled_file_paths = random.sample(self._file_paths, k=self._num_modules)
-    results = self._schedule_jobs(policy_path, sampled_file_paths)
+    sampled_modules = random.sample(self._module_specs, k=self._num_modules)
+    results = self._schedule_jobs(policy_path, sampled_modules)
 
     def wait_for_termination():
       early_exit = self._exit_checker_ctor(num_modules=self._num_modules)
@@ -121,12 +121,12 @@ def get_num_finished_work():
       return early_exit.wait(get_num_finished_work)
 
     wait_seconds = wait_for_termination()
-    self._current_work = list(zip(sampled_file_paths, results))
+    self._current_work = list(zip(sampled_modules, results))
     finished_work = [
-        (paths, res) for paths, res in self._current_work if res.done()
+        (spec, res) for spec, res in self._current_work if res.done()
     ]
-    successful_work = [(paths, res.result())
-                       for paths, res in finished_work
+    successful_work = [(spec, res.result())
+                       for spec, res in finished_work
                        if not worker.get_exception(res)]
     failures = len(finished_work) - len(successful_work)
 
@@ -149,10 +149,8 @@ def wrapup():
         itertools.chain.from_iterable(
            [res.serialized_sequence_examples for (_, res) in successful_work]))
     total_trajectory_length = sum(res.length for (_, res) in successful_work)
-    self._reward_stat_map.update({
-        '-'.join(file_paths): res.reward_stats
-        for (file_paths, res) in successful_work
-    })
+    self._reward_stat_map.update(
+        {spec.name: res.reward_stats for (spec, res) in successful_work})
 
     monitor_dict = {}
     monitor_dict['default'] = {
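Note the reward-stat cache rekeying: entries were previously keyed by '-'.join(file_paths), which baked the bundle's file list into the key; they are now keyed by the module name alone, so the key stays stable however many files a bundle grows to include (e.g. a ThinLTO index). A small sketch of the two schemes (values are placeholders):

from compiler_opt.rl import corpus

# Old key: the joined path tuple, dependent on the bundle's file count.
old_key = '-'.join(('foo.bc', 'foo.cmd'))      # 'foo.bc-foo.cmd'

# New key: the module name, identical with or without a .thinlto.bc file.
new_key = corpus.ModuleSpec(name='foo').name   # 'foo'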

compiler_opt/rl/local_data_collector_test.py

Lines changed: 7 additions & 6 deletions
@@ -25,6 +25,7 @@
 
 from compiler_opt.distributed.local.local_worker_manager import LocalWorkerPool
 from compiler_opt.rl import compilation_runner
+from compiler_opt.rl import corpus
 from compiler_opt.rl import data_collector
 from compiler_opt.rl import local_data_collector
 
@@ -46,8 +47,8 @@ def _get_sequence_example(feature_value):
   return text_format.Parse(sequence_example_text, tf.train.SequenceExample())
 
 
-def mock_collect_data(file_paths, tf_policy_dir, reward_stat):
-  assert file_paths == ('a', 'b')
+def mock_collect_data(module_spec, tf_policy_dir, reward_stat):
+  assert module_spec.name == 'dummy'
   assert tf_policy_dir == 'policy'
   assert reward_stat is None or reward_stat == {
       'default':
@@ -79,8 +80,8 @@ def mock_collect_data(file_paths, tf_policy_dir, reward_stat):
 class Sleeper(compilation_runner.CompilationRunner):
   """Test CompilationRunner that just sleeps."""
 
-  def collect_data(self, file_paths, tf_policy_path, reward_stat):
-    _ = file_paths, tf_policy_path, reward_stat
+  def collect_data(self, module_spec, tf_policy_path, reward_stat):
+    _ = module_spec, tf_policy_path, reward_stat
     compilation_runner.start_cancellable_process(['sleep', '3600s'], 3600,
                                                  self._cancellation_manager)
 
@@ -114,7 +115,7 @@ def _test_iterator_fn(data_list):
 
     with LocalWorkerPool(worker_class=MyRunner, count=4) as lwp:
       collector = local_data_collector.LocalDataCollector(
-          file_paths=tuple([('a', 'b')] * 100),
+          module_specs=[corpus.ModuleSpec(name='dummy')] * 100,
           num_modules=9,
           worker_pool=lwp,
           parser=create_test_iterator_fn(),
@@ -175,7 +176,7 @@ def wait(self, _):
 
     with LocalWorkerPool(worker_class=Sleeper, count=4) as lwp:
      collector = local_data_collector.LocalDataCollector(
-          file_paths=tuple([('a', 'b')] * 200),
+          module_specs=[corpus.ModuleSpec(name='dummy')] * 200,
          num_modules=4,
          worker_pool=lwp,
          parser=parser,
