Running Spark jobs on Google Dataproc with ML_DSL
There are two ways to run Spark jobs on Dataproc: through the ml-dsl API or through Jupyter magic functions.
Here is an example of how to use the ml-dsl API for data processing.
from com.griddynamics.dsl.ml.executors.executors import DataprocExecutor
from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.jobs.builder import JobBuilder
from com.griddynamics.dsl.ml.sessions import SessionFactory
from com.griddynamics.dsl.ml.settings.description import Platform
# define a Profile instance for the PySpark job
profile = PySparkJobProfile(bucket='test_bucket', cluster='test_cluster',
                            region='global', job_prefix='test_job',
                            root_path='scripts', project='test_project',
                            ai_region='us-central1', job_async=False)
# name of the main Python script and an id for the job
script_name = 'test_job.py'
job_name = 'test_job'  # illustrative job id used below
# additional files for the Spark job
profile.py_files = ['py_file1.py', ..., 'py_filen.py']
profile.jars = ['jar1.jar', ..., 'jarn.jar']
# job properties
profile.properties = {"spark.executor.cores": "1",
                      "spark.executor.memory": "4G"}
# job arguments
profile.args = {'--data_path': 'gs://test_bucket/data'}
# create a JobBuilder instance
builder = JobBuilder(Platform.GCP)
# create a Session for the Dataproc JobController client and build the job description
session = SessionFactory(platform=Platform.GCP).build_session(job_bucket=profile.bucket,
                                                              job_region=profile.region,
                                                              cluster=profile.cluster,
                                                              job_project_id=profile.project,
                                                              ml_region=profile.ai_region)
job = (builder.files_root(profile.root_path)
              .job_file(script_name)
              .job_id(job_name)
              .build_job(profile, Platform.GCP))
# create an Executor instance and submit the job to the Dataproc cluster
executor = DataprocExecutor(job, session)
executor.submit_job(run_async=profile.job_async)
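If the notebook should not block while the job runs, the same call can presumably be made non-blocking through the profile's job_async flag (a minimal sketch, assuming run_async=True means "do not wait for completion"):
profile.job_async = True  # assumption: True requests asynchronous submission
executor.submit_job(run_async=profile.job_async)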
The same job can also be configured and submitted with Jupyter magic functions. First, define and register a profile:
from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.settings.description import Platform
# define a Profile for the PySpark job
profile = PySparkJobProfile(bucket='test_bucket', cluster='test_cluster',
                            region='global', job_prefix='test_job',
                            root_path='scripts', project='test_project',
                            ai_region='us-central1', job_async=False)
# additional files for the Spark job
profile.py_files = ['py_file1.py', ..., 'py_filen.py']
profile.jars = ['jar1.jar', ..., 'jarn.jar']
# job properties
profile.properties = {"spark.executor.cores": "1",
                      "spark.executor.memory": "4G"}
# job arguments
profile.args = {'--data_path': 'gs://test_bucket/data'}
# register the profile under a name that the magic functions can reference
PySparkJobProfile.set('DslTestJobProfile', profile)
Open or load the task script using the magic functions %py_script, %py_script_open, or %py_load:
%%py_script --name test_word_count.py --path scripts --data_path data/data.txt -o output
#!/usr/bin/python
from operator import add
from pyspark import SparkContext
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--path_to_book', help='GCS path to the input text')
    parser.add_argument('--path_to_save', help='GCS path to save results')
    parser.add_argument('--output_path', help='GCS path for the word counts')
    args, _ = parser.parse_known_args()
    sc = SparkContext(appName="word_count")
    # split lines into words and count occurrences of each word
    lines = sc.textFile(args.path_to_book)
    counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    words_path = '{}/'.format(args.output_path)
    counts.saveAsTextFile(words_path)
    print('Word counts saved to: "{}"'.format(words_path))
    sc.stop()
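The argparse options declared in the script are presumably passed to the job through the profile's args dictionary; for this word-count script the entries would look roughly like the sketch below (the gs:// paths are illustrative, and in the flow above they would replace the --data_path entry set earlier):
# illustrative mapping of profile.args entries to the script's argparse options
profile.args = {'--path_to_book': 'gs://test_bucket/data/data.txt',
                '--output_path': 'gs://test_bucket/output'}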
Start the job using the magic function %py_data:
%py_data -n test_word_count.py -p DslTestJobProfile -pm Platform.GCP
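For completeness, the same script could presumably also be submitted through the API shown in the first example, reusing the session and builder classes from above (a sketch; the job id is illustrative):
# sketch: submit the word-count script via the API from the first example
builder = JobBuilder(Platform.GCP)
job = (builder.files_root(profile.root_path)
              .job_file('test_word_count.py')
              .job_id('test_word_count')
              .build_job(profile, Platform.GCP))
executor = DataprocExecutor(job, session)
executor.submit_job(run_async=profile.job_async)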