
Commit 54f5cc7

Merge pull request #329 from ldbc/s3distcp-filter
Add filtering to s3distcp
2 parents 21b4070 + d75e518 commit 54f5cc7

8 files changed, +79 -78 lines changed

README.md

Lines changed: 3 additions & 2 deletions
@@ -47,8 +47,9 @@ E.g. with [pyenv](https://github.com/pyenv/pyenv) and [pyenv-virtualenv](https:/
 ```bash
 pyenv install 3.7.7
 pyenv virtualenv 3.7.7 ldbc_datagen_tools
-echo "3.7.7/envs/ldbc_datagen_tools" > .python-version
-pip install --user -U pip -r tools/requirements.txt
+pyenv local ldbc_datagen_tools
+pip install -U pip
+pip install ./tools
 ```

 ### Running locally

tools/datagen/util.py

Lines changed: 2 additions & 2 deletions
@@ -8,9 +8,9 @@ def __call__(self, parser, namespace, values, option_string=None):

         for value in values:
             # split it into key and value
-            key, value = value.split('=')
+            key, value = value.split('=', maxsplit=1)
             # assign into dictionary
-            getattr(namespace, self.dest)[key] = value
+            getattr(namespace, self.dest)[key.strip()] = value


 def ask_continue(message):
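The two changed lines harden how `KEY=VALUE` strings are parsed by the `KeyValue` helper in `datagen/util.py` (imported for `--conf` in `submit_datagen_job.py` below). A minimal standalone illustration of why `maxsplit=1` and `key.strip()` matter; the Spark property used here is only an example:

```python
# A Spark property value can itself contain '=' characters.
raw = 'spark.driver.extraJavaOptions=-Dlog.level=DEBUG'

# Old behaviour: split('=') yields three pieces, so unpacking into
# (key, value) raises "too many values to unpack".
assert raw.split('=') == ['spark.driver.extraJavaOptions', '-Dlog.level', 'DEBUG']

# New behaviour: maxsplit=1 keeps everything after the first '=' intact,
# and key.strip() tolerates stray blanks around the key.
key, value = raw.split('=', maxsplit=1)
assert (key.strip(), value) == ('spark.driver.extraJavaOptions', '-Dlog.level=DEBUG')
```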

tools/emr/README.md

Lines changed: 7 additions & 5 deletions
@@ -24,10 +24,12 @@ In AWS IAM, add the following roles with **Create Role** | **AWS service** | **E

 ## Install the required libraries

-1. From the repository root, run:
+Make sure you use pip 21.1 or newer.
+
+1. From `tools`, run:

    ```
-   pip install -r tools/requirements.txt
+   pip install -e .
    ```

 1. Package the JAR. Make sure you use Java 8:
@@ -50,7 +52,7 @@ aws s3 cp target/ldbc_snb_datagen_${PLATFORM_VERSION}-${VERSION}-jar-with-depend
 ```bash
 JOB_NAME=MyTest
 SCALE_FACTOR=10
-./tools/emr/submit_datagen_job.py --bucket ${BUCKET_NAME} ${JOB_NAME} ${SCALE_FACTOR} -- --format csv --mode raw
+./tools/emr/submit_datagen_job.py --bucket ${BUCKET_NAME} ${JOB_NAME} ${SCALE_FACTOR} csv raw
 ```

 Note: scale factors below 1 are not supported.
@@ -60,7 +62,7 @@ Note: scale factors below 1 are not supported.
 To use spot instances, add the `--use-spot` argument:

 ```bash
-./tools/emr/submit_datagen_job.py --use-spot --bucket ${BUCKET_NAME} ${JOB_NAME} ${SCALE_FACTOR} -- --format csv --mode raw
+./tools/emr/submit_datagen_job.py --use-spot --bucket ${BUCKET_NAME} ${JOB_NAME} ${SCALE_FACTOR} csv raw
 ```

 ### Using a different EMR version
@@ -70,7 +72,7 @@ Make sure you uploaded the right JAR first!

 ```bash
 PLATFORM_VERSION=2.12_spark3.1
-./tools/emr/submit_datagen_job.py --bucket ${BUCKET_NAME} --platform-version ${PLATFORM_VERSION} --emr-release emr-6.3.0 ${JOB_NAME} ${SCALE_FACTOR} -- --format csv --mode raw
+./tools/emr/submit_datagen_job.py --bucket ${BUCKET_NAME} --platform-version ${PLATFORM_VERSION} --emr-release emr-6.3.0 ${JOB_NAME} ${SCALE_FACTOR} csv raw
 ```

 ### Using a parameter file

tools/emr/submit_datagen_job.py

Lines changed: 54 additions & 57 deletions
@@ -13,7 +13,7 @@

 import argparse

-from datagen.util import split_passthrough_args
+from datagen.util import KeyValue, split_passthrough_args

 min_num_workers = 1
 max_num_workers = 1000
@@ -28,7 +28,7 @@
     'platform_version': lib.platform_version,
     'version': lib.version,
     'az': 'us-west-2c',
-    'is_interactive': False,
+    'yes': False,
     'ec2_key': None,
     'emr_release': 'emr-5.31.0'
 }
@@ -70,21 +70,35 @@ def parse_mem(col):
     return {'vcpu': vcpu, 'mem': mem}


-def submit_datagen_job(name, sf,
-                       bucket=defaults['bucket'],
-                       use_spot=defaults['use_spot'],
-                       instance_type=defaults['instance_type'],
-                       sf_ratio=defaults['sf_ratio'],
-                       master_instance_type=defaults['master_instance_type'],
-                       az=defaults['az'],
-                       emr_release=defaults['emr_release'],
-                       platform_version=defaults['platform_version'],
-                       version=defaults['version'],
-                       is_interactive=defaults['is_interactive'],
-                       ec2_key=defaults['ec2_key'],
-                       passthrough_args=None,
-                       conf=None
+def submit_datagen_job(name,
+                       sf,
+                       format,
+                       mode,
+                       bucket,
+                       use_spot,
+                       instance_type,
+                       sf_ratio,
+                       master_instance_type,
+                       az,
+                       emr_release,
+                       platform_version,
+                       version,
+                       yes,
+                       ec2_key,
+                       conf,
+                       copy_filter,
+                       copy_all,
+                       passthrough_args, **kwargs
                        ):
+
+    is_interactive = (not yes) and hasattr(__main__, '__file__')
+
+    build_dir = '/ldbc_snb_datagen/build'
+
+    if not copy_filter:
+        copy_filter = f'.*{build_dir}/graphs/{format}/{mode}/.*'
+    else:
+        copy_filter = f'.*{build_dir}/{copy_filter}'

     exec_info = get_instance_info(instance_type)

@@ -103,13 +117,9 @@ def submit_datagen_job(name, sf,
     spark_config = {
         'maximizeResourceAllocation': 'true',
         'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
-        **(conf if conf else {})
+        **(dict(conf) if conf else {})
     }

-    hdfs_prefix = '/ldbc_snb_datagen'
-
-    build_dir = f'{hdfs_prefix}/build'
-
     market = 'SPOT' if use_spot else 'ON_DEMAND'

     ec2_key_dict = {'Ec2KeyName': ec2_key} if ec2_key is not None else {}
@@ -165,6 +175,8 @@ def submit_datagen_job(name, sf,
                 '--output-dir', build_dir,
                 '--scale-factor', str(sf),
                 '--num-threads', str(cluster_config['num_threads']),
+                '--mode', mode,
+                '--format', format,
                 *passthrough_args
             ]
         }
@@ -178,7 +190,8 @@ def submit_datagen_job(name, sf,
             'Jar': 'command-runner.jar',
             'Args': ['s3-dist-cp',
                      '--src', f'hdfs://{build_dir}',
-                     '--dest', f'{run_url}/social_network'
+                     '--dest', f'{run_url}/social_network',
+                     *(['--srcPattern', copy_filter] if not copy_all else [])
                      ]
         }
     }]
@@ -191,23 +204,6 @@ def submit_datagen_job(name, sf,

     emr.run_job_flow(**job_flow_args)

-def parse_var(s):
-    items = s.split('=')
-    key = items[0].strip()  # we remove blanks around keys, as is logical
-    if len(items) > 1:
-        # rejoin the rest:
-        value = '='.join(items[1:])
-    return (key, value)
-
-
-def parse_vars(items):
-    d = {}
-    if items:
-        for item in items:
-            key, value = parse_var(item)
-            d[key] = value
-    return d
-

 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description='Submit a Datagen job to EMR')
@@ -216,7 +212,10 @@ def parse_vars(items):
                         help='name')
     parser.add_argument('sf', type=int,
                         help='scale factor (used to calculate cluster size)')
+    parser.add_argument('format', type=str, help='the required output format')
+    parser.add_argument('mode', type=str, help='output mode')
     parser.add_argument('--use-spot',
+                        default=defaults['use_spot'],
                         action='store_true',
                         help='Use SPOT workers')
     parser.add_argument('--az',
@@ -240,33 +239,31 @@ def parse_vars(items):
     parser.add_argument('--emr-release',
                         default=defaults['emr_release'],
                         help='The EMR release to use. E.g emr-5.31.0, emr-6.1.0')
-    parser.add_argument('-y',
+    parser.add_argument('-y', '--yes',
+                        default=defaults['yes'],
                         action='store_true',
                         help='Assume \'yes\' for prompts')
+    copy_args = parser.add_mutually_exclusive_group()
+    copy_args.add_argument('--copy-filter',
+                           type=str,
+                           help='A regular expression specifying filtering paths to copy from the build dir to S3. '
+                                'By default it is \'graphs/{format}/{mode}/.*\'')
+    copy_args.add_argument('--copy-all',
+                           action='store_true',
+                           help='Copy the complete build dir to S3')
     parser.add_argument("--conf",
                         metavar="KEY=VALUE",
                         nargs='+',
+                        type=KeyValue,
                         help="SparkConf as key=value pairs")

     parser.add_argument('--', nargs='*', help='Arguments passed to LDBC SNB Datagen', dest="arg")

-
-    self_args, child_args = split_passthrough_args()
+    self_args, passthrough_args = split_passthrough_args()

     args = parser.parse_args(self_args)

-    conf = parse_vars(args.conf)
-
-    is_interactive = hasattr(__main__, '__file__')
-
-    submit_datagen_job(args.name, args.sf,
-                       bucket=args.bucket, use_spot=args.use_spot, az=args.az,
-                       is_interactive=is_interactive and not args.y,
-                       instance_type=args.instance_type,
-                       emr_release=args.emr_release,
-                       ec2_key=args.ec2_key,
-                       platform_version=args.platform_version,
-                       version=args.version,
-                       passthrough_args=child_args,
-                       conf=conf
-                       )
+    submit_datagen_job(passthrough_args=passthrough_args,
+                       sf_ratio=defaults['sf_ratio'],
+                       master_instance_type=defaults['master_instance_type'],
+                       **args.__dict__)
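The copy step now ships only the requested `graphs/{format}/{mode}` subtree of the HDFS build directory to S3, via `s3-dist-cp --srcPattern`; `--copy-filter` overrides the pattern (interpreted relative to the build dir) and `--copy-all` disables filtering. A minimal standalone sketch of that logic, mirroring the hunks above; the function name `build_copy_args` and the example bucket URL are made up for illustration:

```python
# Sketch of the new filtering logic (standalone, illustrative only).
def build_copy_args(run_url, format, mode, copy_filter=None, copy_all=False,
                    build_dir='/ldbc_snb_datagen/build'):
    # default: copy only the generated graphs for the requested format/mode
    if not copy_filter:
        copy_filter = f'.*{build_dir}/graphs/{format}/{mode}/.*'
    else:
        # user-supplied filters are interpreted relative to the build dir
        copy_filter = f'.*{build_dir}/{copy_filter}'
    return ['s3-dist-cp',
            '--src', f'hdfs://{build_dir}',
            '--dest', f'{run_url}/social_network',
            # --copy-all drops the filter and ships the whole build dir
            *(['--srcPattern', copy_filter] if not copy_all else [])]

# e.g. the defaults for a csv/raw run yield:
#   --srcPattern '.*/ldbc_snb_datagen/build/graphs/csv/raw/.*'
print(build_copy_args('s3://my-bucket/my-run', 'csv', 'raw'))
```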

tools/pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+[build-system]
+requires = ["setuptools", "wheel"]

tools/requirements.txt

Lines changed: 0 additions & 5 deletions
This file was deleted.

tools/setup.cfg

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+[metadata]
+name = ldbc-datagen-tools-common
+version = 1.0.0
+
+[options]
+packages = find:
+install_requires =
+    boto3
+    urllib3
+    chardet
+    requests

tools/setup.py

Lines changed: 0 additions & 7 deletions
This file was deleted.
