
Commit 015ddb2

Merge pull request #8 from ruixinxu/nbsample2
add more scala samples
2 parents 77f57fe + b3aface

5 files changed: +15356 −1 lines changed
Lines changed: 384 additions & 0 deletions
@@ -0,0 +1,384 @@
{
  "cells": [
    {
      "cell_type": "markdown",
      "source": [
        "# Access data on Azure Data Lake Storage Gen2 (ADLS Gen2) with Synapse Spark\n",
        "\n",
        "Azure Data Lake Storage Gen2 (ADLS Gen2) is used as the storage account associated with a Synapse workspace. A Synapse workspace can have a default ADLS Gen2 storage account and additional linked storage accounts. \n",
        "\n",
        "You can access data on ADLS Gen2 with Synapse Spark via a URL of the following form:\n",
        " \n",
        "    abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>\n",
        "\n",
        "This notebook provides examples of how to read data from an ADLS Gen2 account into a Spark context and how to write the output of Spark jobs directly to an ADLS Gen2 location.\n",
        "\n",
        "## Pre-requisites\n",
        "Synapse leverages AAD pass-through to access any ADLS Gen2 account (or folder) on which you have the **Blob Storage Contributor** permission. No credentials or access tokens are required. "
      ],
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Load sample data\n",
        "\n",
        "Let's first load the [public holidays](https://azure.microsoft.com/en-us/services/open-datasets/catalog/public-holidays/) dataset from Azure Open Datasets as a sample."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "// set blob storage account connection for the open dataset\n",
        "\n",
        "val hol_blob_account_name = \"azureopendatastorage\"\n",
        "val hol_blob_container_name = \"holidaydatacontainer\"\n",
        "val hol_blob_relative_path = \"Processed\"\n",
        "val hol_blob_sas_token = \"\"\n",
        "\n",
        "val hol_wasbs_path = f\"wasbs://$hol_blob_container_name@$hol_blob_account_name.blob.core.windows.net/$hol_blob_relative_path\"\n",
        "spark.conf.set(f\"fs.azure.sas.$hol_blob_container_name.$hol_blob_account_name.blob.core.windows.net\", hol_blob_sas_token)"
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 5,
          "data": {
            "text/plain": [
              "hol_blob_account_name: String = azureopendatastorage\n",
              "hol_blob_container_name: String = holidaydatacontainer\n",
              "hol_blob_relative_path: String = Processed\n",
              "hol_blob_sas_token: String = \"\"\n",
              "hol_wasbs_path: String = wasbs://holidaydatacontainer@azureopendatastorage.blob.core.windows.net/Processed"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 5,
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "// load the sample data as a Spark DataFrame\n",
        "val hol_df = spark.read.parquet(hol_wasbs_path)\n",
        "hol_df.show(5, truncate = false)"
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 6,
          "data": {
            "text/plain": [
              "hol_df: org.apache.spark.sql.DataFrame = [countryOrRegion: string, holidayName: string ... 4 more fields]\n",
              "+---------------+--------------------------+--------------------------+-------------+-----------------+-------------------+\n",
              "|countryOrRegion|holidayName               |normalizeHolidayName      |isPaidTimeOff|countryRegionCode|date               |\n",
              "+---------------+--------------------------+--------------------------+-------------+-----------------+-------------------+\n",
              "|Argentina      |Año Nuevo [New Year's Day]|Año Nuevo [New Year's Day]|null         |AR               |1970-01-01 00:00:00|\n",
              "|Australia      |New Year's Day            |New Year's Day            |null         |AU               |1970-01-01 00:00:00|\n",
              "|Austria        |Neujahr                   |Neujahr                   |null         |AT               |1970-01-01 00:00:00|\n",
              "|Belgium        |Nieuwjaarsdag             |Nieuwjaarsdag             |null         |BE               |1970-01-01 00:00:00|\n",
              "|Brazil         |Ano novo                  |Ano novo                  |null         |BR               |1970-01-01 00:00:00|\n",
              "+---------------+--------------------------+--------------------------+-------------+-----------------+-------------------+\n",
              "only showing top 5 rows"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 6,
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Write data to the default ADLS Gen2 storage\n",
        "\n",
        "We are going to write the Spark dataframe to your default ADLS Gen2 storage account.\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "// set your storage account connection\n",
        "\n",
        "val account_name = \"\" // replace with your storage account name\n",
        "val container_name = \"\" // replace with your container name\n",
        "val relative_path = \"\" // replace with your relative folder path\n",
        "\n",
        "val adls_path = f\"abfss://$container_name@$account_name.dfs.core.windows.net/$relative_path\""
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 7,
          "data": {
            "text/plain": [
              "account_name: String = ltianwestus2gen2\n",
              "container_name: String = mydefault\n",
              "relative_path: String = samplenb/\n",
              "adls_path: String = abfss://mydefault@ltianwestus2gen2.dfs.core.windows.net/samplenb/"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 7,
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Save a dataframe as Parquet, JSON or CSV\n",
        "If you have a dataframe, you can save it to Parquet, JSON or CSV with the .write.parquet(), .write.json() and .write.csv() methods respectively.\n",
        "\n",
        "Dataframes can be saved in any format, regardless of the input format.\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "// set the paths for the output files\n",
        "\n",
        "val parquet_path = adls_path + \"holiday.parquet\"\n",
        "val json_path = adls_path + \"holiday.json\"\n",
        "val csv_path = adls_path + \"holiday.csv\""
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 9,
          "data": {
            "text/plain": [
              "parquet_path: String = abfss://mydefault@ltianwestus2gen2.dfs.core.windows.net/samplenb/holiday.parquet\n",
              "json_path: String = abfss://mydefault@ltianwestus2gen2.dfs.core.windows.net/samplenb/holiday.json\n",
              "csv_path: String = abfss://mydefault@ltianwestus2gen2.dfs.core.windows.net/samplenb/holiday.csv"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 9,
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "import org.apache.spark.sql.SaveMode\n",
        "\n",
        "hol_df.write.mode(SaveMode.Overwrite).parquet(parquet_path)\n",
        "hol_df.write.mode(SaveMode.Overwrite).json(json_path)\n",
        "hol_df.write.mode(SaveMode.Overwrite).option(\"header\", \"true\").csv(csv_path)"
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 10,
          "data": {
            "text/plain": [
              "import org.apache.spark.sql.SaveMode"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 10,
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Save a dataframe as text files\n",
        "If you have a dataframe that you want to save as a text file, you must first convert it to an RDD and then save that RDD as a text file.\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "// Define the text file path and convert the Spark dataframe into an RDD\n",
        "val text_path = adls_path + \"holiday.txt\"\n",
        "val hol_RDD = hol_df.rdd"
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 12,
          "data": {
            "text/plain": [
              "text_path: String = abfss://mydefault@ltianwestus2gen2.dfs.core.windows.net/samplenb/holiday.txt\n",
              "hol_RDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[24] at rdd at <console>:30"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 12,
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "// Save the RDD as a text file\n",
        "hol_RDD.saveAsTextFile(text_path)"
      ],
      "outputs": [],
      "execution_count": 14,
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Read data from the default ADLS Gen2 storage\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Create a dataframe from Parquet files\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "val df_parquet = spark.read.parquet(parquet_path)"
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 15,
          "data": {
            "text/plain": [
              "df_parquet: org.apache.spark.sql.DataFrame = [countryOrRegion: string, holidayName: string ... 4 more fields]"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 15,
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Create a dataframe from JSON files\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "val df_json = spark.read.json(json_path)"
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 16,
          "data": {
            "text/plain": [
              "df_json: org.apache.spark.sql.DataFrame = [countryOrRegion: string, countryRegionCode: string ... 4 more fields]"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 16,
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Create a dataframe from CSV files\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "// read the CSV back, treating the first line of each file as a header\n",
        "val df_csv = spark.read.option(\"header\", \"true\").csv(csv_path)"
      ],
      "outputs": [],
      "execution_count": 17,
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Create an RDD from a text file\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "val text = sc.textFile(text_path)"
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 18,
          "data": {
            "text/plain": [
              "text: org.apache.spark.rdd.RDD[String] = abfss://mydefault@ltianwestus2gen2.dfs.core.windows.net/samplenb/holiday.txt MapPartitionsRDD[33] at textFile at <console>:32"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 18,
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "text.take(5).foreach(println)"
      ],
      "outputs": [
        {
          "output_type": "execute_result",
          "execution_count": 19,
          "data": {
            "text/plain": [
              "[Argentina,Año Nuevo [New Year's Day],Año Nuevo [New Year's Day],null,AR,1970-01-01 00:00:00.0]\n",
              "[Australia,New Year's Day,New Year's Day,null,AU,1970-01-01 00:00:00.0]\n",
              "[Austria,Neujahr,Neujahr,null,AT,1970-01-01 00:00:00.0]\n",
              "[Belgium,Nieuwjaarsdag,Nieuwjaarsdag,null,BE,1970-01-01 00:00:00.0]\n",
              "[Brazil,Ano novo,Ano novo,null,BR,1970-01-01 00:00:00.0]"
            ]
          },
          "metadata": {}
        }
      ],
      "execution_count": 19,
      "metadata": {}
    }
  ],
  "metadata": {
    "saveOutput": true,
    "language_info": {
      "name": "scala"
    },
    "nteract": {
      "version": "0.22.4"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}
