Skip to content

Running spark jobs on Google Dataproc with ML_DSL

Anna Safonova edited this page Jun 29, 2020 · 2 revisions

There are two ways to run spark jobs on a Dataproc: using API or jupyter magic functions.

Using API

Here is an example of how to use ml-dsl API for data processing.

from com.griddynamics.dsl.ml.executors.executors import DataProcExecutor
from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.jobs.builder import JobBuilder
from com.griddynamics.dsl.ml.sessions import SessionFactory
from com.griddynamics.dsl.ml.settings.description import Platform

define Profile instance for pyspark job

profile = PySparkJobProfile(bucket='test_bucket',cluster='test_cluster',      
                            region='global', job_prefix='test_job',  
                            root_path='scripts', project='test_project', 
                            ai_region='us-central1', job_async=False)

name of main python script

script_name = 'test_job.py'

additional files for spark job

profile.py_files = ['py_file1.py', ..., 'py_filen.py']
profile.jars = ['jar1.py', ..., 'jarn.py']

job properties

profile.properties={"spark.executor.cores":"1",       
                    "spark.executor.memory":"4G"}

job arguments

profile.args = {'--data_path': 'gs://test_bucket/data'}

define job builder class instance

builder = JobBuilder(Platform.GCP)

instance of Session class for dataproc JobController client and build job description

session = sessions.SessionFactory(platform=Platform.GCP)
                       .build_session(job_bucket=profile.bucket,  
                                      job_region=profile.region, 
                                      cluster=profile.cluster,  
                                      job_project_id=profile.project, 
                                      ml_region=profile.ai_region)
job = builder.files_root(root_path)
             .job_file(script_name)
             .job_id(job_name)
             .build_job(profile, Platform.GCP)

Executor instance for submit job to Dataproc cluster

executor = executors.executors.DataprocExecutor(job, session)
executor.submit_job(run_async=prf.job_async)

Using Magic Functions

from com.griddynamics.dsl.ml.settings.profiles import PySparkJobProfile
from com.griddynamics.dsl.ml.settings.description import Platform

define Profile for pyspark job

profile = PySparkJobProfile(bucket='test_bucket',cluster='test_cluster',      
                            region='global', job_prefix='test_job',  
                            root_path='scripts', project='test_project', 
                            ai_region='us-central1', job_async=False)

additional files for spark job

profile.py_files = ['py_file1.py', ..., 'py_filen.py']
profile.jars = ['jar1.py', ..., 'jarn.py']

job properties

profile.properties={"spark.executor.cores":"1",       
                "spark.executor.memory":"4G"}

job arguments

profile.args = {'--data_path': 'gs://test_bucket/data'}

set profile and platform

PySparkJobProfile.set('DslTestJobProfile', profile)
platform = Platform.GCP

Open or load task script using magic functions %py_script, %py_script_open or %py_load:

%%py_script --name test_word_count.py --path scripts --data_path data/data.txt -o output
#!/usr/bin/python
from operator import add
from pyspark import SparkContext
from pyspark.sql import SparkSession
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--path_to_book', help='s3 path to book')
    parser.add_argument('--path_to_save', help='s3 path to save')
    parser.add_argument('--output_path', help='s3 path to save')
    args = parser.parse_known_args()
    sc = SparkContext(appName="word_count").getOrCreate()
    lines = sc.textFile(args[0].path_to_book)
    counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    words_path = '{}/'.format(args[0].output_path)
    counts.saveAsTextFile(words_path)
    print('Word counts to: "{}"'.format(words_path))
    sc.stop()

Start job using magic function %py_data:

%py_data -n test_word_count.py -p DslTestJobProfile -pm $platform
Clone this wiki locally