Skip to content

Simple parallel runner #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions examples/cuda/sepconv_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python
import numpy
from kernel_tuner import tune_kernel
from collections import OrderedDict


def tune():
with open("convolution.cu", "r") as f:
kernel_string = f.read()

# setup tunable parameters
tune_params = OrderedDict()
tune_params["filter_height"] = [i for i in range(3, 19, 2)]
tune_params["filter_width"] = [i for i in range(3, 19, 2)]
tune_params["block_size_x"] = [16 * i for i in range(1, 65)]
tune_params["block_size_y"] = [2**i for i in range(6)]
tune_params["tile_size_x"] = [i for i in range(1, 11)]
tune_params["tile_size_y"] = [i for i in range(1, 11)]

tune_params["use_padding"] = [0, 1] # toggle the insertion of padding in shared memory
tune_params["read_only"] = [0, 1] # toggle using the read-only cache

# limit the search to only use padding when its effective, and at least 32 threads in a block
restrict = ["use_padding==0 or (block_size_x % 32 != 0)", "block_size_x*block_size_y >= 32"]

# setup input and output dimensions
problem_size = (4096, 4096)
size = numpy.prod(problem_size)
largest_fh = max(tune_params["filter_height"])
largest_fw = max(tune_params["filter_width"])
input_size = (problem_size[0] + largest_fw - 1) * (problem_size[1] + largest_fh - 1)

# create input data
output_image = numpy.zeros(size).astype(numpy.float32)
input_image = numpy.random.randn(input_size).astype(numpy.float32)
filter_weights = numpy.random.randn(largest_fh * largest_fw).astype(numpy.float32)

# setup kernel arguments
cmem_args = {"d_filter": filter_weights}
args = [output_image, input_image, filter_weights]

# tell the Kernel Tuner how to compute grid dimensions
grid_div_x = ["block_size_x", "tile_size_x"]
grid_div_y = ["block_size_y", "tile_size_y"]

# start tuning separable convolution (row)
tune_params["filter_height"] = [1]
tune_params["tile_size_y"] = [1]
results_row = tune_kernel(
"convolution_kernel",
kernel_string,
problem_size,
args,
tune_params,
grid_div_y=grid_div_y,
grid_div_x=grid_div_x,
cmem_args=cmem_args,
verbose=False,
restrictions=restrict,
parallel_runner=1024,
cache="convolution_kernel_row",
)

# start tuning separable convolution (col)
tune_params["filter_height"] = tune_params["filter_width"][:]
tune_params["file_size_y"] = tune_params["tile_size_x"][:]
tune_params["filter_width"] = [1]
tune_params["tile_size_x"] = [1]
results_col = tune_kernel(
"convolution_kernel",
kernel_string,
problem_size,
args,
tune_params,
grid_div_y=grid_div_y,
grid_div_x=grid_div_x,
cmem_args=cmem_args,
verbose=False,
restrictions=restrict,
parallel_runner=1024,
cache="convolution_kernel_col",
)

return results_row, results_col


if __name__ == "__main__":
results_row, results_col = tune()
35 changes: 35 additions & 0 deletions examples/cuda/vector_add_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python

import numpy
from kernel_tuner import tune_kernel


def tune():
kernel_string = """
__global__ void vector_add(float *c, float *a, float *b, int n) {
int i = (blockIdx.x * block_size_x) + threadIdx.x;
if ( i < n ) {
c[i] = a[i] + b[i];
}
}
"""

size = 10000000

a = numpy.random.randn(size).astype(numpy.float32)
b = numpy.random.randn(size).astype(numpy.float32)
c = numpy.zeros_like(b)
n = numpy.int32(size)

args = [c, a, b, n]

tune_params = dict()
tune_params["block_size_x"] = [32 * i for i in range(1, 33)]

results, env = tune_kernel("vector_add", kernel_string, size, args, tune_params, parallel_runner=4)

return results


if __name__ == "__main__":
tune()
14 changes: 13 additions & 1 deletion kernel_tuner/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import kernel_tuner.util as util
from kernel_tuner.file_utils import get_input_file, get_t4_metadata, get_t4_results
from kernel_tuner.integration import get_objective_defaults
from kernel_tuner.runners.parallel import ParallelRunner
from kernel_tuner.runners.sequential import SequentialRunner
from kernel_tuner.runners.simulation import SimulationRunner
from kernel_tuner.searchspace import Searchspace
Expand Down Expand Up @@ -468,6 +469,7 @@ def __deepcopy__(self, _):
),
("metrics", ("specifies user-defined metrics, please see :ref:`metrics`.", "dict")),
("simulation_mode", ("Simulate an auto-tuning search from an existing cachefile", "bool")),
("parallel_runner", ("If the value is larger than 1 use that number as the number of parallel runners doing the tuning", "int")),
("observers", ("""A list of Observers to use during tuning, please see :ref:`observers`.""", "list")),
]
)
Expand Down Expand Up @@ -579,6 +581,7 @@ def tune_kernel(
cache=None,
metrics=None,
simulation_mode=False,
parallel_runner=1,
observers=None,
objective=None,
objective_higher_is_better=None,
Expand All @@ -605,6 +608,8 @@ def tune_kernel(

if iterations < 1:
raise ValueError("Iterations should be at least one!")
if parallel_runner < 1:
logging.warning("The number of parallel runners should be at least one!")

# sort all the options into separate dicts
opts = locals()
Expand Down Expand Up @@ -663,7 +668,14 @@ def tune_kernel(
strategy = brute_force

# select the runner for this job based on input
selected_runner = SimulationRunner if simulation_mode else SequentialRunner
# TODO: we could use the "match case" syntax when removing support for 3.9
if simulation_mode:
selected_runner = SimulationRunner
elif parallel_runner > 1:
selected_runner = ParallelRunner
tuning_options.parallel_runner = parallel_runner
else:
selected_runner = SequentialRunner
tuning_options.simulated_time = 0
runner = selected_runner(kernelsource, kernel_options, device_options, iterations, observers)

Expand Down
173 changes: 173 additions & 0 deletions kernel_tuner/runners/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""A specialized runner that tunes in parallel the parameter space."""
import logging
from time import perf_counter
from datetime import datetime, timezone
from itertools import chain

from ray import remote, get, put

from kernel_tuner.runners.runner import Runner
from kernel_tuner.core import DeviceInterface
from kernel_tuner.util import ErrorConfig, print_config_output, process_metrics, store_cache


class ParallelRunnerState:
"""This class represents the state of a parallel tuning run."""

def __init__(self, observers, iterations):
self.device_options = None
self.quiet = False
self.kernel_source = None
self.warmed_up = False
self.simulation_mode = False
self.start_time = None
self.last_strategy_start_time = None
self.last_strategy_time = 0
self.kernel_options = None
self.observers = observers
self.iterations = iterations


@remote(num_cpus=1, num_gpus=1)
def parallel_run(task_id: int, state: ParallelRunnerState, parameter_space, tuning_options):
dev = DeviceInterface(
state.kernel_source, iterations=state.iterations, observers=state.observers, **state.device_options
)
# move data to the GPU
gpu_args = dev.ready_argument_list(state.kernel_options.arguments)
# iterate over parameter space
results = []
elements_per_task = int(len(parameter_space) / tuning_options.parallel_runner)
first_element = int(task_id * elements_per_task)
last_element = int(
(task_id + 1) * elements_per_task if task_id + 1 < tuning_options.parallel_runner else len(parameter_space)
)
for element in parameter_space[first_element:last_element]:
params = dict(zip(tuning_options.tune_params.keys(), element))

result = None
warmup_time = 0

# check if configuration is in the cache
x_int = ",".join([str(i) for i in element])
if tuning_options.cache and x_int in tuning_options.cache:
params.update(tuning_options.cache[x_int])
params["compile_time"] = 0
params["verification_time"] = 0
params["benchmark_time"] = 0
else:
# attempt to warm up the GPU by running the first config in the parameter space and ignoring the result
if not state.warmed_up:
warmup_time = perf_counter()
dev.compile_and_benchmark(state.kernel_source, gpu_args, params, state.kernel_options, tuning_options)
state.warmed_up = True
warmup_time = 1e3 * (perf_counter() - warmup_time)

result = dev.compile_and_benchmark(
state.kernel_source, gpu_args, params, state.kernel_options, tuning_options
)

params.update(result)

if tuning_options.objective in result and isinstance(result[tuning_options.objective], ErrorConfig):
logging.debug("kernel configuration was skipped silently due to compile or runtime failure")

# only compute metrics on configs that have not errored
if tuning_options.metrics and not isinstance(params.get(tuning_options.objective), ErrorConfig):
params = process_metrics(params, tuning_options.metrics)

# get the framework time by estimating based on other times
total_time = 1000 * ((perf_counter() - state.start_time) - warmup_time)
params["strategy_time"] = state.last_strategy_time
params["framework_time"] = max(
total_time
- (
params["compile_time"]
+ params["verification_time"]
+ params["benchmark_time"]
+ params["strategy_time"]
),
0,
)
params["timestamp"] = str(datetime.now(timezone.utc))
state.start_time = perf_counter()

if result:
# print configuration to the console
print_config_output(tuning_options.tune_params, params, state.quiet, tuning_options.metrics, dev.units)

# add configuration to cache
store_cache(x_int, params, tuning_options)

# all visited configurations are added to results to provide a trace for optimization strategies
results.append(params)

return results


class ParallelRunner(Runner):
"""ParallelRunner is used to distribute configurations across multiple nodes."""

def __init__(self, kernel_source, kernel_options, device_options, iterations, observers):
"""Instantiate the ParallelRunner.

:param kernel_source: The kernel source
:type kernel_source: kernel_tuner.core.KernelSource

:param kernel_options: A dictionary with all options for the kernel.
:type kernel_options: kernel_tuner.interface.Options

:param device_options: A dictionary with all options for the device
on which the kernel should be tuned.
:type device_options: kernel_tuner.interface.Options

:param iterations: The number of iterations used for benchmarking
each kernel instance.
:type iterations: int
"""
self.state = ParallelRunnerState(observers, iterations)
self.state.device_options = device_options
self.state.quiet = device_options.quiet
self.state.kernel_source = kernel_source
self.state.warmed_up = False
self.state.simulation_mode = False
self.state.start_time = perf_counter()
self.state.last_strategy_start_time = self.state.start_time
self.state.last_strategy_time = 0
self.state.kernel_options = kernel_options
# fields used directly by strategies
self.last_strategy_time = perf_counter()
self.state.last_strategy_start_time = self.last_strategy_time
# define a dummy device interface on the master node
self.dev = DeviceInterface(kernel_source)

def get_environment(self, tuning_options):
# dummy environment
return self.dev.get_environment()

def run(self, parameter_space, tuning_options):
"""Iterate through the entire parameter space using a single Python process.

:param parameter_space: The parameter space as an iterable.
:type parameter_space: iterable

:param tuning_options: A dictionary with all options regarding the tuning process.
:type tuning_options: kernel_tuner.interface.Options

:returns: A list of dictionaries for executed kernel configurations and their execution times.
:rtype: dict()
"""
# given the parameter_space, distribute it over Ray tasks
logging.debug("parallel runner started for " + self.state.kernel_options.kernel_name)

results = []
tasks = []
parameter_space_ref = put(parameter_space)
state_ref = put(self.state)
tuning_options_ref = put(tuning_options)
for task_id in range(0, tuning_options.parallel_runner):
tasks.append(parallel_run.remote(task_id, state_ref, parameter_space_ref, tuning_options_ref))
for task in tasks:
results.append(get(task))

return list(chain.from_iterable(results))
2 changes: 1 addition & 1 deletion kernel_tuner/runners/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def __init__(
pass

@abstractmethod
def get_environment(self):
def get_environment(self, tuning_options):
pass

@abstractmethod
Expand Down
Loading
Loading