Skip to content

Commit 5d70992

Browse files
author
Vasileios Karakasis
authored
Merge pull request #2214 from teojgo/dummy/spark
[testlib] Add a library test for Apache Spark
2 parents a048a99 + 66eda64 commit 5d70992

File tree

3 files changed

+141
-52
lines changed

3 files changed

+141
-52
lines changed

cscs-checks/apps/spark/spark_check.py

Lines changed: 13 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,61 +3,22 @@
33
#
44
# SPDX-License-Identifier: BSD-3-Clause
55

6-
import math
7-
86
import reframe as rfm
9-
import reframe.utility.sanity as sn
10-
from reframe.core.backends import getlauncher
117

8+
from hpctestlib.data_analytics.spark.compute_pi import compute_pi_check
129

13-
@rfm.simple_test
14-
class SparkCheck(rfm.RunOnlyRegressionTest):
15-
variant = parameter(['spark', 'pyspark'])
1610

17-
def __init__(self):
18-
self.descr = f'Simple calculation of pi with {self.variant}'
19-
self.valid_systems = ['daint:gpu', 'daint:mc',
20-
'dom:gpu', 'dom:mc']
21-
self.valid_prog_environs = ['builtin']
22-
self.modules = ['Spark']
23-
self.prerun_cmds = ['start-all.sh']
24-
self.postrun_cmds = ['stop-all.sh']
25-
self.num_tasks = 3
26-
self.num_tasks_per_node = 1
27-
pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
28-
self.stdout, 'pi', float)
29-
self.sanity_patterns = sn.assert_lt(sn.abs(pi_value - math.pi), 0.01)
30-
self.maintainers = ['TM', 'RS']
31-
self.tags = {'production'}
11+
@rfm.simple_test
class cscs_compute_pi_check(compute_pi_check):
    '''CSCS site-specific variant of the Apache Spark pi check.

    Specializes the generic :class:`compute_pi_check` library test for the
    Piz Daint and Dom systems: selects the EasyBuild Spark module, points
    the test at the EasyBuild installation prefix and sizes the Spark
    workers/executors from the node's core count.
    '''

    valid_systems = ['daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc']
    valid_prog_environs = ['builtin']
    modules = ['Spark']
    spark_prefix = '$EBROOTSPARK'
    executor_memory = '15g'
    maintainers = ['TM', 'RS']
    tags |= {'production'}

    @run_before('run')
    def set_num_workers_and_cores(self):
        # Use every core of the node as a Spark worker core; the processor
        # info comes from the site configuration of the current partition.
        self.num_workers = self.current_partition.processor.num_cores

        # One executor gets a quarter of the cores (e.g. 12 -> 3 on the GPU
        # nodes, 36 -> 9 on the MC nodes).  Clamp to at least 1 so that
        # `spark.executor.cores` never becomes 0 on small nodes, which Spark
        # rejects.
        self.exec_cores = max(1, self.num_workers // 4)
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
2+
# ReFrame Project Developers. See the top-level LICENSE file for details.
3+
#
4+
# SPDX-License-Identifier: BSD-3-Clause
5+
6+
import math
7+
8+
9+
import reframe as rfm
10+
import reframe.utility.sanity as sn
11+
12+
from reframe.core.backends import getlauncher
13+
14+
15+
@rfm.simple_test
class compute_pi_check(rfm.RunOnlyRegressionTest, pin_prefix=True):
    '''Test Apache Spark by computing PI.

    Apache Spark is a unified analytics engine for large-scale data
    processing. It provides high-level APIs in Java, Scala, Python
    and R, and an optimized engine that supports general execution
    graphs. It also supports a rich set of higher-level tools including
    Spark SQL for SQL and structured data processing, MLlib for machine
    learning, GraphX for graph processing, and Structured Streaming for
    incremental computation and stream processing (see spark.apache.org).

    This test checks that Spark is functioning correctly. To do this, it is
    necessary to define the tolerance of acceptable deviation. The tolerance
    is used to check that the computations are executed correctly, by
    comparing the value of pi calculated to the one obtained from the math
    library. The default assumption is that Spark is already installed on the
    system under test.
    '''

    #: Parameter encoding the variant of the test.
    #:
    #: :type:`str`
    #: :values: ``['spark', 'pyspark']``
    variant = parameter(['spark', 'pyspark'])

    #: The absolute tolerance of the computed value of PI
    #:
    #: :type: :class:`float`
    #: :required: No
    #: :default: `0.01`
    tolerance = variable(float, value=0.01)

    #: The Spark installation prefix path
    #:
    #: :type: :class:`str`
    #: :required: Yes
    spark_prefix = variable(str)

    #: The local directories used by Spark
    #:
    #: :type: :class:`str`
    #: :required: No
    #: :default: `'/tmp'`
    spark_local_dirs = variable(str, value='/tmp')

    #: Amount of memory to use per executor process, following the JVM memory
    #: strings convention, i.e a number with a size unit suffix
    #: ("k", "m", "g" or "t") (e.g. 512m, 2g)
    #:
    #: :type: :class:`str`
    #: :required: Yes
    executor_memory = variable(str)

    #: The number of Spark workers per node
    #:
    #: :type: :class:`int`
    #: :required: No
    #: :default: `1`
    num_workers = variable(int, value=1)

    #: The number of cores per each Spark executor
    #:
    #: :type: :class:`int`
    #: :required: No
    #: :default: `1`
    exec_cores = variable(int, value=1)

    num_tasks = 3
    num_tasks_per_node = 1
    prerun_cmds = ['start-all.sh']
    postrun_cmds = ['stop-all.sh']
    executable = 'spark-submit'
    executable_opts = required
    tags = {'data-science', 'big-data'}

    @run_after('init')
    def set_description(self):
        # BUG FIX: the description must go to ReFrame's `descr` attribute;
        # `mydescr` was a dead attribute that nothing else reads.
        self.descr = f'Simple calculation of pi with {self.variant}'

    @run_before('run')
    def set_job_launcher(self):
        # The job launcher has to be changed since the `spark-submit`
        # script is not used with srun.
        self.job.launcher = getlauncher('local')()

    @run_before('run')
    def prepare_run(self):
        # Export the worker sizing and scratch location to the Spark
        # start-up scripts invoked in `prerun_cmds`.
        self.variables = {
            'SPARK_WORKER_CORES': str(self.num_workers),
            'SPARK_LOCAL_DIRS': self.spark_local_dirs,
        }
        self.executable_opts = [
            f'--conf spark.default.parallelism={self.num_workers}',
            f'--conf spark.executor.cores={self.exec_cores}',
            f'--conf spark.executor.memory={self.executor_memory}',
            '--master $SPARKURL'
        ]
        if self.variant == 'spark':
            self.executable_opts += [
                '--class org.apache.spark.examples.SparkPi',
                f'{self.spark_prefix}/examples/jars/spark-examples*.jar 10000'
            ]
        elif self.variant == 'pyspark':
            self.executable_opts += ['spark_pi.py']

    @sanity_function
    def assert_pi_readout(self):
        '''Assert that the obtained pi value meets the specified tolerances.'''

        pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
                                    self.stdout, 'pi', float)
        return sn.assert_lt(sn.abs(pi_value - math.pi), self.tolerance)

0 commit comments

Comments
 (0)