Commit 07a3d18

Merge pull request #411 from ldbc/emr-tweaks
Add option to control number of nodes & partitions
2 parents: 2337b27 + bb21709

1 file changed: tools/emr/submit_datagen_job.py (37 additions, 21 deletions)
@@ -9,23 +9,23 @@
 import __main__
 
 from math import ceil
-from botocore.credentials import subprocess
 from datagen import lib, util
-import subprocess
 
 import argparse
 
 from datagen.util import KeyValue, split_passthrough_args
 
 min_num_workers = 1
 max_num_workers = 1000
+min_num_threads = 1
 
 defaults = {
     'bucket': 'ldbc-snb-datagen-store',
     'use_spot': True,
     'master_instance_type': 'r6gd.2xlarge',
-    'instance_type': 'r6gd.4xlarge',
-    'sf_ratio': 100.0,  # ratio of SFs and machines. a ratio of 250.0 for SF1000 yields 4 machines
+    'instance_type': 'i3.4xlarge',
+    'sf_per_executor': 3e3,
+    'sf_per_partition': 10,
     'az': 'us-west-2c',
     'yes': False,
     'ec2_key': None,
@@ -43,15 +43,6 @@
     ec2_instances = [dict(row) for row in reader]
 
 
-def calculate_cluster_config(scale_factor, sf_ratio, vcpu):
-    num_workers = max(min_num_workers, min(max_num_workers, ceil(scale_factor / sf_ratio)))
-    num_threads = ceil(num_workers * vcpu * 2)
-    return {
-        'num_workers': num_workers,
-        'num_threads': num_threads
-    }
-
-
 def get_instance_info(instance_type):
     def parse_vcpu(col):
         return int(re.search(r'(\d+) .*', col).group(1))
@@ -76,7 +67,10 @@ def submit_datagen_job(name,
                        jar,
                        use_spot,
                        instance_type,
-                       sf_ratio,
+                       executors,
+                       sf_per_executor,
+                       partitions,
+                       sf_per_partition,
                        master_instance_type,
                        az,
                        emr_release,
@@ -97,10 +91,6 @@ def submit_datagen_job(name,
     else:
         copy_filter = f'.*{build_dir}/{copy_filter}'
 
-    exec_info = get_instance_info(instance_type)
-
-    cluster_config = calculate_cluster_config(sf, sf_ratio, exec_info['vcpu'])
-
     emr = boto3.client('emr')
 
     ts = datetime.utcnow()
@@ -115,8 +105,15 @@
         'maximizeResourceAllocation': 'true'
     }
 
+    if executors is None:
+        executors = max(min_num_workers, min(max_num_workers, ceil(sf / sf_per_executor)))
+
+    if partitions is None:
+        partitions = max(min_num_threads, ceil(sf / sf_per_partition))
+
     spark_defaults_config = {
         'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
+        'spark.default.parallelism': str(partitions),
         **(dict(conf) if conf else {})
     }
 
@@ -157,7 +154,7 @@
                     'Market': market,
                     'InstanceRole': 'CORE',
                     'InstanceType': instance_type,
-                    'InstanceCount': cluster_config['num_workers'],
+                    'InstanceCount': executors,
                 }
             ],
             **ec2_key_dict,
@@ -178,7 +175,7 @@
                 'Args': ['spark-submit', '--class', lib.main_class, jar_url,
                          '--output-dir', build_dir,
                          '--scale-factor', str(sf),
-                         '--num-threads', str(cluster_config['num_threads']),
+                         '--num-threads', str(partitions),
                          '--mode', mode,
                          '--format', format,
                          *passthrough_args
@@ -263,6 +260,26 @@ def submit_datagen_job(name,
                         nargs='+',
                         action=KeyValue,
                         help="SparkConf as key=value pairs")
+    executor_args=parser.add_mutually_exclusive_group()
+    executor_args.add_argument("--executors",
+                               type=int,
+                               help=f"Total number of Spark executors."
+                               )
+    executor_args.add_argument("--sf-per-executor",
+                               type=float,
+                               default=defaults['sf_per_executor'],
+                               help=f"Number of scale factors per Spark executor. Default: {defaults['sf_per_executor']}"
+                               )
+    partitioning_args = parser.add_mutually_exclusive_group()
+    partitioning_args.add_argument("--partitions",
+                                   type=int,
+                                   help=f"Total number of Spark partitions to use when generating the dataset."
+                                   )
+    partitioning_args.add_argument("--sf-per-partition",
+                                   type=float,
+                                   default=defaults['sf_per_partition'],
+                                   help=f"Number of scale factors per Spark partitions. Default: {defaults['sf_per_partition']}"
+                                   )
 
     parser.add_argument('--', nargs='*', help='Arguments passed to LDBC SNB Datagen', dest="arg")
 
@@ -271,6 +288,5 @@ def submit_datagen_job(name,
     args = parser.parse_args(self_args)
 
     submit_datagen_job(passthrough_args=passthrough_args,
-                       sf_ratio=defaults['sf_ratio'],
                        master_instance_type=defaults['master_instance_type'],
                        **args.__dict__)
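The two add_mutually_exclusive_group() calls in the argument parser mean an absolute count and its scale-factor ratio cannot be combined: --executors conflicts with --sf-per-executor, and --partitions conflicts with --sf-per-partition, and the computed fallbacks only apply when neither flag of a pair is given. A minimal standalone argparse sketch of that behaviour (argument values are illustrative only):

import argparse

# stand-alone reproduction of one of the new option groups
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group()
group.add_argument("--executors", type=int)           # absolute executor count
group.add_argument("--sf-per-executor", type=float)   # or a ratio, but not both

print(parser.parse_args(["--executors", "8"]))           # Namespace(executors=8, sf_per_executor=None)
print(parser.parse_args(["--sf-per-executor", "1000"]))  # Namespace(executors=None, sf_per_executor=1000.0)
# parser.parse_args(["--executors", "8", "--sf-per-executor", "1000"])
#   -> error: argument --sf-per-executor: not allowed with argument --executors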
