|
13 | 13 |
|
14 | 14 | # MAGIC %md |
15 | 15 | # MAGIC ## Overview |
16 | | -# MAGIC |
| 16 | +# MAGIC |
17 | 17 | # MAGIC ### In this notebook you: |
18 | 18 | # MAGIC * Use `Databricks Auto Loader` to import the ad impression and conversion data generated in the notebook `01_intro`. |
19 | 19 | # MAGIC * Write the data out in `Delta` format. |
|
23 | 23 |
|
24 | 24 | # MAGIC %md |
25 | 25 | # MAGIC ## Step 1: Configure the Environment |
26 | | -# MAGIC |
| 26 | +# MAGIC |
27 | 27 | # MAGIC In this step, we will: |
28 | 28 | # MAGIC 1. Import libraries |
29 | 29 | # MAGIC 2. Run the `utils` notebook to gain access to the `get_params` function |
|
63 | 63 | # COMMAND ---------- |
64 | 64 |
|
65 | 65 | params = get_params() |
| 66 | +catalog_name = params['catalog_name'] |
66 | 67 | database_name = params['database_name'] |
67 | 68 | raw_data_path = params['raw_data_path'] |
68 | 69 | bronze_tbl_path = params['bronze_tbl_path'] |
|
71 | 72 |
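For readers who do not have the companion `utils` notebook at hand, here is a hypothetical sketch of what `get_params` might return, inferred only from the keys used above (every value below is a placeholder, not the accelerator's real configuration):

```python
def get_params() -> dict:
    # Hypothetical stand-in for the helper defined in the `utils` notebook.
    # The real function presumably derives these values from the workspace setup;
    # the literals here are placeholders for illustration only.
    return {
        "catalog_name": "main",                          # Unity Catalog catalog (assumed)
        "database_name": "ad_attribution",               # target database/schema (assumed)
        "raw_data_path": "/tmp/ad_attribution/raw",      # landing zone for the synthetic files (assumed)
        "bronze_tbl_path": "/tmp/ad_attribution/bronze", # storage path for the bronze Delta table (assumed)
    }
```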
|
72 | 73 | # MAGIC %md |
73 | 74 | # MAGIC ## Step 2: Load Data using Databricks Auto Loader |
74 | | -# MAGIC |
| 75 | +# MAGIC |
75 | 76 | # MAGIC In this step, we will: |
76 | 77 | # MAGIC 1. Define the schema of the synthetic data generated in `01_load_data` |
77 | 78 | # MAGIC 2. Read the synthetic data into a dataframe using Auto Loader |
|
82 | 83 | # MAGIC %md |
83 | 84 | # MAGIC But what is Auto Loader? |
84 | 85 | # MAGIC * Auto Loader incrementally and efficiently loads new data files as they arrive in [S3](https://docs.databricks.com/spark/latest/structured-streaming/auto-loader.html) or [Azure Blob Storage](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader). This is enabled by providing a Structured Streaming source called `cloudFiles`. A minimal read sketch follows this list. |
85 | | -# MAGIC |
| 86 | +# MAGIC |
86 | 87 | # MAGIC * Auto Loader internally keeps track of which files have been processed to provide exactly-once semantics, so you do not need to manage any state information yourself. |
87 | | -# MAGIC |
| 88 | +# MAGIC |
88 | 89 | # MAGIC * Auto Loader supports two modes for detecting when new files arrive: |
89 | 90 | # MAGIC |
90 | 91 | # MAGIC * `Directory listing:` Identifies new files by parallel listing of the input directory. Quick to get started since no permission configurations are required. Suitable for scenarios where only a few files need to be streamed in on a regular basis. |
|
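As a minimal, self-contained sketch of the read pattern described above (the file format, schema, and path are placeholder assumptions, not the notebook's actual values):

```python
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Placeholder schema for illustration only; the notebook defines its own schema for the synthetic data.
example_schema = StructType([
    StructField("uid", StringType(), True),
    StructField("interaction", StringType(), True),
    StructField("impression_time", TimestampType(), True),
])

example_df = (
    spark.readStream.format("cloudFiles")                # Auto Loader source
    .option("cloudFiles.format", "json")                 # incoming file format (assumed)
    .option("cloudFiles.useNotifications", "false")      # directory-listing mode; "true" switches to file notification
    .option("cloudFiles.includeExistingFiles", "true")   # also pick up files already present in the path
    .schema(example_schema)
    .load("/tmp/example/raw_landing_zone")               # placeholder input path
)
```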
118 | 119 | .option("cloudFiles.region", "us-west-2") \ |
119 | 120 | .option("cloudFiles.includeExistingFiles", "true") \ |
120 | 121 | .schema(schema) \ |
121 | | - .load(raw_data_path) |
| 122 | + .load(raw_data_path) |
122 | 123 |
|
123 | 124 | # COMMAND ---------- |
124 | 125 |
|
|
134 | 135 |
|
135 | 136 | # MAGIC %md |
136 | 137 | # MAGIC ## Step 3: Write Data to Delta Lake |
137 | | -# MAGIC |
| 138 | +# MAGIC |
138 | 139 | # MAGIC In this section of the solution accelerator, we write our data out to [Delta Lake](https://delta.io/) and then create a table (and database) for easy access and queryability. |
139 | | -# MAGIC |
| 140 | +# MAGIC |
140 | 141 | # MAGIC * Delta Lake is an open-source project that enables building a **Lakehouse architecture** on top of existing storage systems such as S3, ADLS, GCS, and HDFS. |
141 | 142 | # MAGIC   * Information on the **Lakehouse Architecture** can be found in this [paper](http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf) that was presented at [CIDR 2021](http://cidrdb.org/cidr2021/index.html) and in this [video](https://www.youtube.com/watch?v=RU2dXoVU8hY). |
142 | | -# MAGIC |
| 143 | +# MAGIC |
143 | 144 | # MAGIC * Key features of Delta Lake include: |
144 | 145 | # MAGIC * **ACID Transactions**: Ensures data integrity and read consistency with complex, concurrent data pipelines. |
145 | 146 | # MAGIC   * **Unified Batch and Streaming Source and Sink**: A table in Delta Lake is both a batch table and a streaming source and sink (see the sketch after this list). Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box. |
|
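To make the unified batch/streaming point concrete, a brief sketch (the three-level table name is a placeholder):

```python
# The same Delta table can be read as a static batch DataFrame...
batch_df = spark.read.table("my_catalog.my_db.bronze")

# ...or as a streaming source that picks up new rows as they are committed.
stream_df = spark.readStream.table("my_catalog.my_db.bronze")
```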
157 | 158 |
|
158 | 159 | # COMMAND ---------- |
159 | 160 |
|
160 | | -raw_data_df.writeStream.format("delta") \ |
161 | | - .trigger(once=True) \ |
162 | | - .option("checkpointLocation", bronze_tbl_path+"/checkpoint") \ |
163 | | - .start(bronze_tbl_path) \ |
164 | | - .awaitTermination() |
165 | | - |
166 | | -# COMMAND ---------- |
167 | | - |
168 | | -# MAGIC %md |
169 | | -# MAGIC ## Step 4: Create Database |
170 | | - |
171 | | -# COMMAND ---------- |
172 | | - |
173 | | -# Delete the old database and tables if needed |
174 | | -_ = spark.sql('DROP DATABASE IF EXISTS {} CASCADE'.format(database_name)) |
175 | | - |
176 | | -# Create database to house tables |
177 | | -_ = spark.sql('CREATE DATABASE {}'.format(database_name)) |
178 | | - |
179 | | -# COMMAND ---------- |
180 | | - |
181 | | -# MAGIC %md |
182 | | -# MAGIC ## Step 5: Create bronze-level table in Delta format |
183 | | -# MAGIC |
184 | | -# MAGIC * **Note:** this step will produce an exception if it is run before writeStream in step 3 is initialized. |
185 | | -# MAGIC |
186 | | -# MAGIC * The nomenclature of bronze, silver, and gold tables correspond with a commonly used data modeling approach known as multi-hop architecture. |
187 | | -# MAGIC * Additional information about this pattern can be found [here](https://databricks.com/blog/2019/08/14/productionizing-machine-learning-with-delta-lake.html). |
188 | | - |
189 | | -# COMMAND ---------- |
190 | | - |
191 | | -# Create bronze table |
192 | | -_ = spark.sql(''' |
193 | | - CREATE TABLE `{}`.bronze |
194 | | - USING DELTA |
195 | | - LOCATION '{}' |
196 | | - '''.format(database_name,bronze_tbl_path)) |
| 161 | +dbutils.fs.rm(bronze_tbl_path+"/checkpoint", recurse=True) |
197 | 162 |
|
198 | 163 | # COMMAND ---------- |
199 | 164 |
|
200 | | -# MAGIC %md |
201 | | -# MAGIC ## Step 6: View the bronze table |
202 | | -# MAGIC |
203 | | -# MAGIC Using `spark.table` here enables use of Python. An alternative approach is to query the data directly using SQL. This will be shown in the `03_data_prep` notebook. |
204 | | - |
205 | | -# COMMAND ---------- |
206 | | - |
207 | | -bronze_tbl = spark.table("{}.bronze".format(database_name)) |
208 | | - |
209 | | -# COMMAND ---------- |
210 | | - |
211 | | -display(bronze_tbl) |
| 165 | +raw_data_df.writeStream.format("delta") \ |
| 166 | + .trigger(availableNow=True) \ |
| 167 | + .option("checkpointLocation", bronze_tbl_path+"/checkpoint") \ |
| 168 | + .toTable(f"{catalog_name}.{database_name}.bronze") |
212 | 169 |
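Once the `availableNow` trigger has processed the backlog and the streaming query finishes (for example, after calling `awaitTermination()` on the query returned by `toTable`), the resulting table can be read back directly; a short usage sketch using the names defined earlier in this notebook:

```python
# Read the bronze table produced by the streaming write above (assumes the stream has completed).
bronze_tbl = spark.table(f"{catalog_name}.{database_name}.bronze")
display(bronze_tbl)
```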
|
213 | 170 | # COMMAND ---------- |
214 | 171 |
|
|
220 | 177 |
|
221 | 178 | # MAGIC %md |
222 | 179 | # MAGIC Copyright Databricks, Inc. [2021]. The source in this notebook is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below. |
223 | | -# MAGIC |
| 180 | +# MAGIC |
224 | 181 | # MAGIC |Library Name|Library license | Library License URL | Library Source URL | |
225 | 182 | # MAGIC |---|---|---|---| |
226 | 183 | # MAGIC |Matplotlib|Python Software Foundation (PSF) License |https://matplotlib.org/stable/users/license.html|https://github.com/matplotlib/matplotlib| |
|