COM6012 Scalable Machine Learning 2025 by Shuo Zhou at The University of Sheffield
- Task 1: To finish in the lab session on 18th Feb. Essential
- Task 2: To finish in the lab session on 18th Feb. Essential
- Task 3: To finish in the lab session on 18th Feb. Essential
- Task 4: To finish by the following Monday 23rd Feb. Exercise
- Task 5: To explore further. Optional
- Chapters 5 and 6, and especially Section 9.1 (of Chapter 9) of this PySpark tutorial Learning Apache Spark with Python
- RDD Programming Guide: Most are useful to know in this module.
- Spark SQL, DataFrames and Datasets Guide: Overview and Getting Started recommended (skipping those without Python examples).
- Machine Learning Library (MLlib) Guide
- ML Pipelines
- Apache Spark Examples
- Basic Statistics - DataFrame API
- Basic Statistics - RDD API: much richer
- Cheat sheet PySpark - RDD Basics
- Cheat sheet PySpark - SQL Basics
- Cheat sheet for PySpark (2 page version)
Tip: Try to use as many DataFrame APIs as possible by referring to the PySpark API documentation. When you want to program something, first search whether there is already a function in the API that does it.
First, we follow the standard steps as in Task 2 of Lab 1, with some variations in settings, i.e. requesting 2 cores for an interactive shell. We also install numpy in our environment for later use.
First log into the Stanage cluster
ssh $USER@stanage.shef.ac.uk
You need to replace $USER with your username (using lowercase and without $).
Once logged in, we can request 2 cores from reserved resources by
srun --account=rse-com6012 --reservation=rse-com6012-2 --cpus-per-task=2 --time=01:00:00 --pty /bin/bash
You will use an interactive session for all essential tasks in this lab. You may change the --time parameter to a longer time if you think you need more time for the lab session. Please do not request more than two cores during this lab session, to ensure all students can get access to the reserved resources. You can explore using more cores outside the lab session when the HPC is less busy.
If the reserved resources are not available, request cores from the general queue by
srun --pty --cpus-per-task=2 bash -i
If you have created a myspark.sh script in Lab 1 (Task 2), now set up our conda environment using
source myspark.sh # assuming you copied HPC/myspark.sh to your root directory (see Lab 1 Task 2)
If not, use
module load Java/17.0.4
module load Anaconda3/2024.02-1
source activate myspark
Now we can start the PySpark interactive shell by
conda install -y numpy # install numpy, to be used in Task 3. This ONLY needs to be done ONCE. NOT every time.
cd com6012/ScalableML # our main working directory
pyspark --master local[2] # start pyspark with the 2 cores requested above by specifying the master parameter 'local[k]'
If you are experiencing a segmentation fault when entering the pyspark interactive shell, run export LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 to fix it. It is recommended to add this line to your myspark.sh file.
As stated in the RDD Programming Guide, Spark allows for parallel operations in a program to be executed on a cluster with the following abstractions:
- Main abstraction: resilient distributed dataset (RDD) is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
- Second abstraction: shared variables can be shared across tasks, or between tasks and the driver program. Two types:
- Broadcast variables, which can be used to cache a value in memory on all nodes
- Accumulators, which are variables that are only “added” to, such as counters and sums.
Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
data = [1, 2, 3, 4, 5]
rddData = sc.parallelize(data)
rddData.collect()
# [1, 2, 3, 4, 5]
Note: From Lab 2, I will show output as comments # OUTPUT and show code without >>> for easy copy and paste in the shell.
The number of partitions can also be set manually, by passing it as a second argument to SparkContext's parallelize method:
sc.parallelize(data, 16)
# ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:289
Spark tries to set the number of partitions automatically based on the cluster, the rule of thumb being 2-4 partitions for every CPU in the cluster.
Spark can also be used for compute-intensive tasks. The code below estimates π by Monte Carlo sampling; random.random() returns the next random floating point number in the range [0.0, 1.0).
from random import random
def inside(p):
    x, y = random(), random()
    return x*x + y*y < 1
NUM_SAMPLES = 10000000
count = sc.parallelize(range(0, NUM_SAMPLES), 8).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
# Pi is roughly 3.142717
Note that we did not control the seed above, so you are not likely to get exactly the same number 3.142717. You can change NUM_SAMPLES to see the difference in precision and time cost.
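Spark distributes the sampling across tasks, so fixing a single seed is not straightforward. For comparison, here is a plain-Python (non-Spark) version of the same estimator in which the seed can be controlled, making the estimate reproducible:

```python
import random

def estimate_pi(num_samples, seed=6012):
    # Monte Carlo: the fraction of uniform points in the unit square
    # that fall inside the quarter circle approximates pi/4.
    rng = random.Random(seed)  # fixed seed -> the same estimate on every run
    count = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            count += 1
    return 4.0 * count / num_samples

print(estimate_pi(100000))  # close to 3.14, identical across runs with the same seed
```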
When a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
To avoid creating a copy of a large variable for each task, an accessible (read-only!) variable can be kept on each machine - this is useful for particularly large datasets which may be needed for multiple tasks. The data broadcasted this way is cached in serialized form and deserialized before running each task. See Data Serialization for more details about serialization.
Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method.
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar
# <pyspark.broadcast.Broadcast object at 0x2ac15e088860>
broadcastVar.value
# [1, 2, 3]
Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums.
An accumulator is created from an initial value v by calling SparkContext.accumulator(v).
Cluster tasks can then add to it using the add method, but they cannot read its value. Only the driver program can read the accumulator's value, using its value method.
accum = sc.accumulator(0)
accum
# Accumulator<id=0, value=0>
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
# 10
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value
# 20
Along with the introduction of SparkSession, the resilient distributed dataset (RDD) was superseded by the Dataset in Spark version 2.0. Again, these are objects that can be worked on in parallel. The available operations are:
- transformations: produce new datasets
- actions: computations which return results
In PySpark, there is no separate Dataset API as there is in Spark for Scala and Java. PySpark primarily provides the DataFrame API to perform structured data processing.
We will begin by creating DataFrames, and then demonstrate how to print their contents. In the cell below, we create a DataFrame and display some information (with the option to modify the output before printing):
From RDD to DataFrame
rdd = sc.parallelize([(1,2,3),(4,5,6),(7,8,9)])
df = rdd.toDF(["a","b","c"])
rdd
# ParallelCollectionRDD[10] at readRDDFromFile at PythonRDD.scala:289
The number in [10] is the index for RDDs in the shell, so it may vary.
Let us examine the DataFrame
df
# DataFrame[a: bigint, b: bigint, c: bigint]
df.show()
# +---+---+---+
# | a| b| c|
# +---+---+---+
# | 1| 2| 3|
# | 4| 5| 6|
# | 7| 8| 9|
# +---+---+---+
df.printSchema()
# root
# |-- a: long (nullable = true)
# |-- b: long (nullable = true)
# |-- c: long (nullable = true)
Now let us get an RDD from the DataFrame
rdd2 = df.rdd
rdd2
# MapPartitionsRDD[26] at javaToPython at NativeMethodAccessorImpl.java:0
rdd2.collect() # view the content
# [Row(a=1, b=2, c=3), Row(a=4, b=5, c=6), Row(a=7, b=8, c=9)]
The Advertising data used below was downloaded from a classic book on statistical learning.
df = spark.read.load("Data/Advertising.csv", format="csv", inferSchema="true", header="true")
df.show(5) # show the top 5 rows
# 21/02/13 12:10:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
# Header: , TV, radio, newspaper, sales
# Schema: _c0, TV, radio, newspaper, sales
# Expected: _c0 but found:
# CSV file: file:///home/ac1hlu/com6012/ScalableML/Data/Advertising.csv
# +---+-----+-----+---------+-----+
# |_c0| TV|radio|newspaper|sales|
# +---+-----+-----+---------+-----+
# | 1|230.1| 37.8| 69.2| 22.1|
# | 2| 44.5| 39.3| 45.1| 10.4|
# | 3| 17.2| 45.9| 69.3| 9.3|
# | 4|151.5| 41.3| 58.5| 18.5|
# | 5|180.8| 10.8| 58.4| 12.9|
# +---+-----+-----+---------+-----+
# only showing top 5 rows
Note that a warning is given because the first column has an empty header. If we manually specify it, e.g. as index, the warning will disappear.
Recall that CSV files are semi-structured data, so here Spark inferred the schema automatically. Let us take a look.
df.printSchema()
# root
# |-- _c0: integer (nullable = true)
# |-- TV: double (nullable = true)
# |-- radio: double (nullable = true)
# |-- newspaper: double (nullable = true)
# |-- sales: double (nullable = true)
Let us remove the first column
df2 = df.drop('_c0')
df2.printSchema()
# root
# |-- TV: double (nullable = true)
# |-- radio: double (nullable = true)
# |-- newspaper: double (nullable = true)
# |-- sales: double (nullable = true)
We can get summary statistics for numerical columns using .describe().show(), which is very handy for inspecting your (big) data, for understanding and debugging.
df2.describe().show()
# +-------+-----------------+------------------+------------------+------------------+
# |summary| TV| radio| newspaper| sales|
# +-------+-----------------+------------------+------------------+------------------+
# | count| 200| 200| 200| 200|
# | mean| 147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
# | stddev|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
# | min| 0.7| 0.0| 0.3| 1.6|
# | max| 296.4| 49.6| 114.0| 27.0|
# +-------+-----------------+------------------+------------------+------------------+
MLlib is Spark’s machine learning (ML) library. It provides:
- ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
- Featurization: feature extraction, transformation, dimensionality reduction, and selection
- Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
- Persistence: saving and loading algorithms, models, and Pipelines
- Utilities: linear algebra, statistics, data handling, etc.
MLlib allows easy combination of numerous algorithms into a single pipeline using standardized APIs for machine learning algorithms. The key concepts are:
- DataFrame. DataFrames can hold a variety of data types.
- Transformer. An algorithm which transforms one DataFrame into another.
- Estimator. An algorithm which can be fit on a DataFrame to produce a Transformer.
- Pipeline. A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
- Parameter. Transformers and Estimators share a common API for specifying parameters.
A list of some of the available ML features is available at: Extracting, transforming and selecting features.
Clarification on whether an Estimator is a Transformer: see Estimators.
An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data. Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer. For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
The example below is based on Section 9.1 of the PySpark tutorial.
Let us convert the above data in CSV format to the typical (feature, label) pairs for supervised learning. Here we use the Vectors API. You may also review lambda expressions in Python.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]), r[-1]]).toDF(['features', 'label'])

transformed = transData(df2)
transformed.show(5)
# +-----------------+-----+
# | features|label|
# +-----------------+-----+
# |[230.1,37.8,69.2]| 22.1|
# | [44.5,39.3,45.1]| 10.4|
# | [17.2,45.9,69.3]| 9.3|
# |[151.5,41.3,58.5]| 18.5|
# |[180.8,10.8,58.4]| 12.9|
# +-----------------+-----+
# only showing top 5 rows
The labels here are real numbers, so this is a regression problem. For a classification problem, you may need to transform labels (e.g., disease/healthy) to indices with a featureIndexer, as in Step 5, Section 9.1 of the PySpark tutorial.
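To see what the lambda in transData does, here is a plain-Python illustration on a single row (mirroring the first row of df2): r[:-1] takes every column except the last as the features, and r[-1] takes the last column as the label.

```python
# One row of df2 in plain Python: (TV, radio, newspaper, sales)
r = (230.1, 37.8, 69.2, 22.1)

features = list(r[:-1])  # what Vectors.dense(r[:-1]) wraps: [230.1, 37.8, 69.2]
label = r[-1]            # the last column: 22.1

print(features, label)
```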
(trainingData, testData) = transformed.randomSplit([0.6, 0.4], 6012)
We set the seed to 6012 in the above (see the randomSplit API). Check your train and test data as follows. It is good practice to keep track of your data during the prototyping phase.
trainingData.show(5)
# +---------------+-----+
# | features|label|
# +---------------+-----+
# | [4.1,11.6,5.7]| 3.2|
# | [5.4,29.9,9.4]| 5.3|
# |[7.8,38.9,50.6]| 6.6|
# |[8.7,48.9,75.0]| 7.2|
# |[13.1,0.4,25.6]| 5.3|
# +---------------+-----+
# only showing top 5 rows
testData.show(5)
# +----------------+-----+
# | features|label|
# +----------------+-----+
# | [0.7,39.6,8.7]| 1.6|
# | [7.3,28.1,41.4]| 5.5|
# | [8.4,27.2,2.1]| 5.7|
# | [8.6,2.1,1.0]| 4.8|
# |[11.7,36.9,45.2]| 7.3|
# +----------------+-----+
# only showing top 5 rows
More details on parameters can be found in the Python API documentation.
from pyspark.ml.regression import LinearRegression
lr = LinearRegression()
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.show(5)
# +----------------+-----+------------------+
# | features|label| prediction|
# +----------------+-----+------------------+
# | [0.7,39.6,8.7]| 1.6|10.497359087823323|
# | [7.3,28.1,41.4]| 5.5| 8.615626828376815|
# | [8.4,27.2,2.1]| 5.7| 8.59859112486577|
# | [8.6,2.1,1.0]| 4.8| 4.027845382391438|
# |[11.7,36.9,45.2]| 7.3| 10.41211129446484|
# +----------------+-----+------------------+
# only showing top 5 rows
You may see some warnings, which are normal.
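The evaluation metric used next, root mean squared error (RMSE), can be checked by hand: it is the square root of the mean squared difference between label and prediction. A plain-Python illustration on just the five (label, prediction) pairs shown above (an illustrative subset, so the value differs from the full test-set RMSE):

```python
import math

# The five (label, prediction) pairs from predictions.show(5) above.
labels = [1.6, 5.5, 5.7, 4.8, 7.3]
preds = [10.497359087823323, 8.615626828376815, 8.59859112486577,
         4.027845382391438, 10.41211129446484]

# RMSE: square root of the mean of the squared errors.
rmse = math.sqrt(sum((l - p) ** 2 for l, p in zip(labels, preds)) / len(labels))
print(rmse)
```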
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
# Root Mean Squared Error (RMSE) on test data = 1.87125
This example is adapted from the ML Pipeline API, with minor changes and additional explanations.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
Directly create a DataFrame (for illustration)
training = spark.createDataFrame([
(0, "a b c d e spark 6012", 1.0),
(1, "b d", 0.0),
(2, "spark f g h 6012", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
training.printSchema()
# root
# |-- id: long (nullable = true)
# |-- text: string (nullable = true)
# |-- label: double (nullable = true)
training.show()
# +---+--------------------+-----+
# | id| text|label|
# +---+--------------------+-----+
# | 0|a b c d e spark 6012| 1.0|
# | 1| b d| 0.0|
# | 2| spark f g h 6012| 1.0|
# | 3| hadoop mapreduce| 0.0|
# +---+--------------------+-----+
Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
Model fitting
model = pipeline.fit(training)
Construct test documents (data), which are unlabeled (id, text) tuples
test = spark.createDataFrame([
(4, "spark i j 6012"),
(5, "l m n"),
(6, "spark 6012 spark"),
(7, "apache hadoop")
], ["id", "text"])
test.show()
# +---+------------------+
# | id| text|
# +---+------------------+
# | 4| spark i j 6012|
# | 5| l m n|
# | 6|spark 6012 spark|
# | 7| apache hadoop|
# +---+------------------+
Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
prediction.show()
# +---+----------------+--------------------+--------------------+--------------------+--------------------+----------+
# | id| text| words| features| rawPrediction| probability|prediction|
# +---+----------------+--------------------+--------------------+--------------------+--------------------+----------+
# | 4| spark i j 6012| [spark, i, j, 6012]|(262144,[19036,11...|[-1.0173918675250...|[0.26553574436761...| 1.0|
# | 5| l m n| [l, m, n]|(262144,[1303,526...|[4.76763852580441...|[0.99157121824630...| 0.0|
# | 6|spark 6012 spark|[spark, 6012, spark]|(262144,[111139,1...|[-3.9099070641898...|[0.01964856004327...| 1.0|
# | 7| apache hadoop| [apache, hadoop]|(262144,[68303,19...|[5.80789088699039...|[0.99700523688305...| 0.0|
# +---+----------------+--------------------+--------------------+--------------------+--------------------+----------+
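The features column above is a 262144-dimensional sparse vector produced by the hashing trick: each word is hashed to an index, and term frequencies are counted at those indices. Below is a toy plain-Python sketch of the idea only; Spark's HashingTF actually uses MurmurHash3 with 2^18 = 262144 features by default, so the indices differ (and Python's built-in hash for strings is salted per process).

```python
def toy_hashing_tf(words, num_features=16):
    # Hash each word to a bucket index and count term frequencies there.
    counts = {}
    for w in words:
        idx = hash(w) % num_features  # NOT Spark's hash; illustration only
        counts[idx] = counts.get(idx, 0) + 1
    return counts

tf = toy_hashing_tf(["spark", "6012", "spark"])
print(tf)  # e.g. two buckets: "spark" counted twice, "6012" once (indices vary)
```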
selected = prediction.select("id", "text", "probability", "prediction")
selected.show()
# +---+----------------+--------------------+----------+
# | id| text| probability|prediction|
# +---+----------------+--------------------+----------+
# | 4| spark i j 6012|[0.26553574436761...| 1.0|
# | 5| l m n|[0.99157121824630...| 0.0|
# | 6|spark 6012 spark|[0.01964856004327...| 1.0|
# | 7| apache hadoop|[0.99700523688305...| 0.0|
# +---+----------------+--------------------+----------+
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
# (4, spark i j 6012) --> prob=[0.2655357443676159,0.7344642556323842], prediction=1.000000
# (5, l m n) --> prob=[0.9915712182463081,0.008428781753691883], prediction=0.000000
# (6, spark 6012 spark) --> prob=[0.019648560043272496,0.9803514399567275], prediction=1.000000
# (7, apache hadoop) --> prob=[0.9970052368830581,0.002994763116941912], prediction=0.000000
Note: A reference solution will be provided on Blackboard for this part by the following Tuesday.
Starting from this lab, you need to use as many DataFrame functions as possible.
- On HPC, download the description of the NASA access log data to the Data directory via wget ftp://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html. Load the Aug95 NASA access log data in Lab 1 and create a DataFrame with FIVE columns by specifying the schema according to the description in the downloaded html file. Use this DataFrame for the following questions.
- Find out the total number of unique hosts (i.e. in August 1995).
- Find out the most frequent visitor, i.e. the host with the largest number of visits.
- Add regularization to the linear regression for advertising example and evaluate the prediction performance against the performance without any regularization. Study at least three different regularization settings.
- Construct another test dataset for the machine learning pipeline in the document classification example, with three test document samples: "pyspark hadoop"; "spark a b c"; "mapreduce spark". Report the prediction probabilities and the predicted labels for these three samples.
Note: NO solutions will be provided for this part.
- Change the number of partitions to a range of values (e.g. 2, 4, 8, 16, ...) and study the time cost for each value (e.g. by plotting the time cost against the number of partitions).
- Change the number of samples to study the variation in precision and time cost.
- Find out the mean and standard deviation of the reply byte size.
- Other questions in Lab 1 Task 6.
- Explore more CSV data of your interest via Google or at Sample CSV data, including insurance, real estate, and sales transactions.
- Explore the UCI Machine Learning Repository to build machine learning pipelines in PySpark for some datasets of your interest.