Commit 55e7f1f

complete notebook example w/ spark.ml pipeline
1 parent d4578a9 commit 55e7f1f

File tree

1 file changed: +212 -24 lines changed

spark-cluster.ipynb

Lines changed: 212 additions & 24 deletions
@@ -5,7 +5,7 @@
 "id": "707cc39c-31f3-408e-a61f-8fd612627910",
 "metadata": {},
 "source": [
-"## Spark cluster (standalone) - Prediction notebook"
+"## Spark cluster (standalone) - Prediction with a Pipeline notebook"
 ]
 },
 {
@@ -15,13 +15,20 @@
 "tags": []
 },
 "source": [
-"> Dockerized env : [JupyterLab server => Spark (master <-> 1 worker) ] \n",
-"`docker-compose.yml` was (slightly) adapted from this [article](https://towardsdatascience.com/first-steps-in-machine-learning-with-apache-spark-672fe31799a3) \n",
-"\n",
-"> Original notebook is heavily modified : \n",
+"> Dockerized env : [JupyterLab server => Spark (master <-> 1 worker) ] "
+]
+},
+{
+"cell_type": "markdown",
+"id": "ead98478-3591-4f6f-8d2d-5b65bbe18d30",
+"metadata": {
+"tags": []
+},
+"source": [
+"`docker-compose.yml` was (slightly) adapted from this [article](https://towardsdatascience.com/first-steps-in-machine-learning-with-apache-spark-672fe31799a3), whereas the original notebook was heavily modified: \n",
 "-random forest regressor instead of the article's linreg \n",
 "-use of a Pipeline (pyspark.ml.pipeline) to streamline the whole prediction process \n",
-"-no more sql-type queries"
+"-no more sql-type queries (personal preference)"
 ]
 },
 {
@@ -45,15 +52,15 @@
 },
 {
 "cell_type": "code",
-"execution_count": 26,
+"execution_count": 7,
 "id": "ac3e8958-5d9c-4e80-9a6f-fd343a3d4dd5",
 "metadata": {
 "tags": []
 },
 "outputs": [],
 "source": [
 "from pyspark.sql import SparkSession\n",
-"import pyspark.sql.functions as F\n",
+"\n",
 "\n",
 "# SparkSession\n",
 "URL_SPARK = \"spark://spark:7077\"\n",
@@ -74,7 +81,7 @@
 "tags": []
 },
 "source": [
-"### Run example - pyspark.sql / pyspark.ml"
+"### Run example - pyspark.sql / pyspark.ml, build an ML Pipeline"
 ]
 },
 {
@@ -110,7 +117,7 @@
 },
 {
 "cell_type": "code",
-"execution_count": 51,
+"execution_count": 8,
 "id": "888a85f7-5e40-4e90-8a35-3cb1435d1460",
 "metadata": {
 "tags": []
@@ -152,44 +159,225 @@
 "source": [
 "# Cache table/dataframe for re-usable table with .cache()\n",
 "# caching operation takes place only when a Spark action (count, show, take or write) is also performed on the same dataframe\n",
-"df_avocado = spark.read.csv(\n",
+"df = spark.read.csv(\n",
 " \"data/avocado.csv\", \n",
 " header=True, \n",
 " inferSchema=True\n",
 ").cache() # cache transformation\n",
 "\n",
-"df_avocado.printSchema()\n",
-"df_avocado.show(4) # call show() from the cached df_avocado. df_avocado cached in memory right after we call the action (show)"
+"df.printSchema()\n",
+"df.show(4) # call show() from the cached df; df is cached in memory right after we call the action (show)"
 ]
 },
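Aside: since .cache() is lazy, a minimal sketch of when the cache is actually materialized and how to release it (the df_cached name is just for the sketch, independent of the df used above):

    # same read as in the cell above, marked for caching
    df_cached = spark.read.csv("data/avocado.csv", header=True, inferSchema=True).cache()

    print(df_cached.is_cached)  # True: the plan is marked for caching, but nothing is stored yet
    df_cached.count()           # first action: the CSV is read and the rows are materialized in memory
    df_cached.show(4)           # served from the cached rows, no re-read of the file
    df_cached.unpersist()       # release the cached copy when it is no longer needed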
 {
 "cell_type": "markdown",
 "id": "75c92350-8990-4534-81cc-4c9da00a40b1",
-"metadata": {},
+"metadata": {
+"tags": []
+},
 "source": [
 "#### Steps overview"
 ]
 },
+{
+"cell_type": "code",
+"execution_count": 9,
+"id": "e0068bc2-270c-4e43-beeb-082b404ce297",
+"metadata": {
+"tags": []
+},
+"outputs": [],
+"source": [
+"import pyspark.sql.functions as F\n",
+"from pyspark.sql.functions import col\n",
+"\n",
+"from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler\n",
+"from pyspark.ml.regression import RandomForestRegressor\n",
+"from pyspark.ml.pipeline import Pipeline"
+]
+},
 {
 "cell_type": "markdown",
-"id": "a4734d8a-4399-4980-82c3-05940fbf888e",
+"id": "b840a5b1-8bd7-4c73-a8c9-133e4983e8dd",
 "metadata": {},
 "source": [
-"- No EDA, has been done a million times, we jump to the implementation directly\n",
-"- feature creation from 'Date' : --> yy and mm\n",
-"- one hot encoding categorical 'region'\n",
-"- scale numerical features ()\n",
-"- Drop columns : _c0 (index), Total Bags, Total Volume (strong corr with respective subcategories)\n",
-"- Drop transformed columns (if not aready done): Date, region"
+"- Steps differ a bit from sklearn's. Look up 'transformers' and 'estimators' (see the short transformer/estimator sketch below)\n",
+"- No EDA; it has been done a million times on this dataset. \n",
+"- Format data \n",
+"    - Feature creation from 'Date' : yy and mm \n",
+"    - Drop columns : Total Bags, Total Volume (strong corr with their respective subcategories) ; could also be done in the pipeline (see the SQLTransformer sketch further below)\n",
+"- Pipeline (encode etc...) \n",
+"    - One-hot encode the categorical 'region' (preceded by StringIndexer) \n",
+"    - Drop transformed columns: Date, region. Note : unlike scikit-learn column transformers, pyspark adds new columns when transforming \n",
+"- Consolidate all remaining features into a single vector using VectorAssembler\n",
+"- Scale numerical features using StandardScaler <- would come earlier in a sklearn pipeline\n",
+"- Predict"
+]
+},
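Aside on the 'transformers' vs 'estimators' point above: a Transformer only exposes .transform(), while an Estimator exposes .fit() and returns a fitted Model, which is itself a Transformer. A minimal sketch reusing the avocado df (variable names are just for the sketch):

    from pyspark.ml.feature import StringIndexer

    # Estimator: must see the data to learn the category -> index mapping
    indexer = StringIndexer(inputCol="region", outputCol="region_str")
    indexer_model = indexer.fit(df)            # returns a StringIndexerModel (a Transformer)

    # Transformer: applies the learned mapping, adding a new column
    indexed = indexer_model.transform(df)
    indexed.select("region", "region_str").show(3)

A Pipeline is itself an Estimator: pipeline.fit(train) returns a PipelineModel, which is the Transformer used for prediction later in the notebook.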
+{
+"cell_type": "markdown",
+"id": "ee3ac793-eff8-4d36-ae00-96dd66fd2d19",
+"metadata": {
+"tags": []
+},
+"source": [
+"#### Format Data"
 ]
 },
 {
 "cell_type": "code",
-"execution_count": null,
-"id": "95442a63-b7d7-409a-95f2-6650cbe6bc5b",
+"execution_count": 10,
+"id": "ea5b4865-062b-491a-bf10-1242d46d358c",
+"metadata": {},
+"outputs": [
+{
+"name": "stdout",
+"output_type": "stream",
+"text": [
+"+------------+-----------+----------+-----------+----------+----------+-----------+------------+----+------+----------+-----+\n",
+"|AveragePrice|Medium Size|Large Size|XLarge Size|Small Bags|Large Bags|XLarge Bags| type|year|region|Year Index|Month|\n",
+"+------------+-----------+----------+-----------+----------+----------+-----------+------------+----+------+----------+-----+\n",
+"| 1.33| 1036.74| 54454.85| 48.16| 8603.62| 93.25| 0.0|conventional|2015|Albany| 15| 12|\n",
+"| 1.35| 674.28| 44638.81| 58.33| 9408.07| 97.49| 0.0|conventional|2015|Albany| 15| 12|\n",
+"| 0.93| 794.7| 109149.67| 130.5| 8042.21| 103.14| 0.0|conventional|2015|Albany| 15| 12|\n",
+"| 1.08| 1132.0| 71976.41| 72.58| 5677.4| 133.76| 0.0|conventional|2015|Albany| 15| 12|\n",
+"+------------+-----------+----------+-----------+----------+----------+-----------+------------+----+------+----------+-----+\n",
+"only showing top 4 rows\n",
+"\n"
+]
+}
+],
+"source": [
+"# convert 'year' yyyy to yy (yyyy - 2000, since we have 2015-2018 values)\n",
+"df = df.withColumn('Year Index', col('Year') - 2000)\n",
+"\n",
+"# extract month from the 'Date' timestamp column\n",
+"df = df.withColumn('Month', F.month('Date'))\n",
+"\n",
+"# drop \"useless\" columns (per multiple notebooks on this particular dataset) : \"Total Bags\", \"Total Volume\" & the index (_c0)\n",
+"# /!\\ Optional : not strictly necessary, as we will assemble a vector of only the features we're interested in later in the pipeline\n",
+"drop_cols = (\"Total Bags\", \"Total Volume\", \"_c0\", \"Date\") # we keep \"year\" just to show we do not need to drop it.\n",
+"df = df.drop(*drop_cols)\n",
+"\n",
+"# rename the avocado size columns for clarity \n",
+"df = df.withColumnRenamed(\"4046\", \"Medium Size\").withColumnRenamed(\"4225\", \"Large Size\").withColumnRenamed(\"4770\", \"XLarge Size\")\n",
+"df.show(4)"
+]
+},
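Aside on the 'could also be done in the pipeline' question from the overview: one option (a sketch, not what this commit does; the select_cols name is illustrative) is pyspark.ml.feature.SQLTransformer, which turns the column selection and renaming into a pipeline stage fed the raw dataframe:

    from pyspark.ml.feature import SQLTransformer

    # __THIS__ is replaced by the stage's input DataFrame at transform time
    select_cols = SQLTransformer(statement="""
        SELECT AveragePrice,
               `4046` AS `Medium Size`, `4225` AS `Large Size`, `4770` AS `XLarge Size`,
               `Small Bags`, `Large Bags`, `XLarge Bags`,
               type, region,
               year - 2000 AS `Year Index`, month(Date) AS Month
        FROM __THIS__
    """)

    # could then be prepended to the stages, e.g. Pipeline(stages=[select_cols, str_indexer, ...])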
+{
+"cell_type": "markdown",
+"id": "4dfcd69e-fbdb-4eff-97fb-9b6e3a58a06f",
+"metadata": {
+"tags": []
+},
+"source": [
+"#### Build Pipeline (encode, vectorize, scale)"
+]
+},
+{
+"cell_type": "code",
+"execution_count": 14,
+"id": "382272ea-07aa-43a4-af0f-681b332af34d",
 "metadata": {},
 "outputs": [],
-"source": []
+"source": [
+"# 1. Must use StringIndexer before encoding categorical features (OHE input is category indices)\n",
+"str_indexer = StringIndexer(inputCols=['region','type'], outputCols=['region_str', 'type_str'])\n",
+"\n",
+"# 2. Encode categorical features. \n",
+"# Unlike sklearn, transformations add new columns and keep the inputs, so we track the changes via output column names.\n",
+"# Spark's OHE differs from sklearn's OneHotEncoder, which keeps all categories (Spark drops the last one by default); the output vectors are sparse (see the dropLast sketch below)\n",
+"ohe = OneHotEncoder(\n",
+" inputCols=['region_str','type_str'], \n",
+" outputCols=['region_str_ohe','type_str_ohe']\n",
+")\n",
+"\n",
+"# 3. Assemble the (used) features into one vector\n",
+"assembler = VectorAssembler(\n",
+" inputCols=[\n",
+" 'Medium Size',\n",
+" 'Large Size',\n",
+" 'XLarge Size',\n",
+" 'Small Bags',\n",
+" 'Large Bags',\n",
+" 'XLarge Bags',\n",
+" 'region_str_ohe',\n",
+" 'type_str_ohe'\n",
+" ], outputCol='features')\n",
+"\n",
+"# 4. Standardize numerical features\n",
+"scaler = StandardScaler(inputCol='features',outputCol='scaled_features')\n",
+"\n",
+"# 5. Define the regressor\n",
+"rf_regressor = RandomForestRegressor(featuresCol='scaled_features',labelCol='AveragePrice', numTrees=50, maxDepth=15)\n",
+"\n",
+"# 6. Build the Pipeline\n",
+"pipeline = Pipeline(stages=[str_indexer, ohe, assembler, scaler, rf_regressor])\n"
+]
+},
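Aside on the OHE comment above: Spark's OneHotEncoder drops the last category by default (dropLast=True), which is why its vectors have one slot fewer than sklearn's. A small sketch (ohe_full is illustrative) of keeping every category if a sklearn-like encoding is preferred:

    from pyspark.ml.feature import OneHotEncoder

    # dropLast=False keeps all categories, like sklearn's default OneHotEncoder
    ohe_full = OneHotEncoder(
        inputCols=['region_str', 'type_str'],
        outputCols=['region_str_ohe', 'type_str_ohe'],
        dropLast=False,
    )
    # the outputs are SparseVectors either way; dropLast only changes their length

For a tree-based model like the random forest used here the choice matters little; for linear models the default dropLast=True avoids the redundant, collinear column.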
+{
+"cell_type": "markdown",
+"id": "b81532fd-1edf-4a7b-a665-676ee9cb21cd",
+"metadata": {
+"tags": []
+},
+"source": [
+"#### Simple random forest modeling : train-test split, apply the Pipeline to train & test, evaluate"
+]
+},
+{
+"cell_type": "markdown",
+"id": "c3332499-66a1-4f79-be00-bcefcbda212a",
+"metadata": {},
+"source": [
+"Crude attempt: no CV, mostly default RF parameters. \n",
+"For parameter tuning, look up pyspark.ml.tuning (CrossValidator, ParamGridBuilder); not used here, but see the sketch below"
+]
+},
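A hedged sketch of what that tuning could look like with pyspark.ml.tuning, reusing the pipeline and the train split defined in the next cell (the grid values are illustrative, not from the notebook):

    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.evaluation import RegressionEvaluator

    param_grid = (
        ParamGridBuilder()
        .addGrid(rf_regressor.numTrees, [20, 50, 100])   # illustrative values
        .addGrid(rf_regressor.maxDepth, [5, 10, 15])
        .build()
    )

    cv = CrossValidator(
        estimator=pipeline,                # the whole Pipeline is tuned end to end
        estimatorParamMaps=param_grid,
        evaluator=RegressionEvaluator(labelCol='AveragePrice', metricName='rmse'),
        numFolds=3,
    )

    cv_model = cv.fit(train)               # trains one model per fold and per grid point
    best_model = cv_model.bestModel        # a fitted PipelineModel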
+{
+"cell_type": "code",
+"execution_count": 18,
+"id": "ae2ebec7-8379-45bd-b375-faac5c64824c",
+"metadata": {
+"tags": []
+},
+"outputs": [
+{
+"data": {
+"text/plain": [
+"0.1975694758480664"
+]
+},
+"execution_count": 18,
+"metadata": {},
+"output_type": "execute_result"
+}
+],
+"source": [
+"from pyspark.ml.evaluation import RegressionEvaluator\n",
+"\n",
+"# split\n",
+"train, test = df.randomSplit([.8,.2])\n",
+"\n",
+"# fit the model\n",
+"model = pipeline.fit(train)\n",
+"\n",
+"\n",
+"# apply the model to the test set\n",
+"prediction = model.transform(test)\n",
+"evaluator = RegressionEvaluator(predictionCol='prediction',\n",
+" labelCol='AveragePrice', metricName='rmse')\n",
+"\n",
+"evaluator.evaluate(prediction)"
+]
+},
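The same RegressionEvaluator can report other metrics by overriding metricName at evaluate time; a small sketch using the evaluator and prediction DataFrame from the cell above:

    rmse = evaluator.evaluate(prediction)                               # 'rmse', as configured
    r2 = evaluator.evaluate(prediction, {evaluator.metricName: "r2"})
    mae = evaluator.evaluate(prediction, {evaluator.metricName: "mae"})
    print(f"rmse={rmse:.3f}  r2={r2:.3f}  mae={mae:.3f}")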
+{
+"cell_type": "markdown",
+"id": "5a769698-04bc-4eda-9edc-63a4bfd11d25",
+"metadata": {},
+"source": [
+"For reference, the original article, using linear regression + CV, reports an RMSE of 0.28"
+]
 }
 ],
 "metadata": {
