diff --git a/final_project/Starter Streaming Tweet Sentiment - Spring 2024 Final Project.ipynb b/final_project/Starter Streaming Tweet Sentiment - Spring 2024 Final Project.ipynb index 7218b56..c4dd768 100644 --- a/final_project/Starter Streaming Tweet Sentiment - Spring 2024 Final Project.ipynb +++ b/final_project/Starter Streaming Tweet Sentiment - Spring 2024 Final Project.ipynb @@ -4,10 +4,7 @@ "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "rowLimit": 10000 - }, + "cellMetadata": {}, "inputWidgets": {}, "nuid": "fc749083-2211-485c-943f-640e198b2c70", "showTitle": false, @@ -153,10 +150,7 @@ "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "rowLimit": 10000 - }, + "cellMetadata": {}, "inputWidgets": {}, "nuid": "e8ce7c13-7f8f-4e11-b50c-fdaebbcf3f99", "showTitle": false, @@ -179,73 +173,93 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "67bafbb7-d34e-4e86-b159-b2e062faa1d0", + "nuid": "edca168a-2f44-4c5f-a655-3a0805f5ba9c", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import time \n", + "import tqdm\n", + "from pyspark.sql.types import StructType, StructField, StringType,TimestampType\n", + "from pyspark.sql.functions import input_file_name, current_timestamp\n", + "from pyspark.sql.functions import col\n", + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import col, isnan, when, count\n", + "from pyspark.sql.functions import desc\n", + "import matplotlib.pyplot as plt\n", + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import col, udf, explode, lower, regexp_replace,to_timestamp,from_unixtime\n", + "from pyspark.sql.types import StringType, ArrayType\n", + "import string\n", + "import nltk\n", + "from nltk.corpus import stopwords\n", + "from pyspark.sql.functions import length\n" + ] + }, + { 
+ "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "2b7624b4-0aa5-46b7-9767-52f462b2c574", "showTitle": false, "title": "" } }, - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.databricks.v1+bamboolib_hint": "{\"pd.DataFrames\": [], \"version\": \"0.0.1\"}", - "text/plain": [] + "source": [ + "## 2.0 Use the utility functions to ...\n", + "- Read the source file directory listing\n", + "- Count the source files (how many are there?)\n", + "- print the contents of one of the files" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 }, - "metadata": {}, - "output_type": "display_data" - }, - { - "output_type": "display_data", - "data": { - "text/html": [ - "\n", - "

VERY IMPORTANT TO UNDERSTAND THE USE OF THESE VARIABLES!
Please ask if you are confused about their use.

\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "
Variable NameValueDescription
TWEET_BUCKET_NAMEvoc-75-databricks-dataAWS S3 Bucket Name where the tweets are coming into your system.
TWEET_BUCKET_URLhttps://voc-75-databricks-data.s3.amazonaws.com/AWS S3 Bucket URL where the tweets are coming into your system.
TWEET_SOURCE_PATHs3a://voc-75-databricks-data/voc_volume/AWS S3 Path where the tweets are coming into your system.
USER_DIR/tmp/labuser104917-2387441/Path to the local storage (dbfs) for your project.
BRONZE_CHECKPOINT/tmp/labuser104917-2387441/bronze.checkpointStore your Bronze Checkpoint data here.
BRONZE_DELTA/tmp/labuser104917-2387441/bronze.deltaStore your Bronze Delta Table here.
SILVER_CHECKPOINT/tmp/labuser104917-2387441/silver.checkpointStore your Silver Checkpoint data here.
SILVER_DELTA/tmp/labuser104917-2387441/silver.deltaStore your Silver Delta Table here.
GOLD_CHECKPOINT/tmp/labuser104917-2387441/gold.checkpointStore your Gold Checkpoint data here.
GOLD_DELTA/tmp/labuser104917-2387441/gold.deltaStore your Gold Delta Table here.
MODEL_NAMEHF_TWEET_SENTIMENTLoad this production model
HF_MODEL_NAMEfiniteautomata/bertweet-base-sentiment-analysisThe Hugging Face Model for Tweet sentiment classification: https://huggingface.co/finiteautomata/bertweet-base-sentiment-analysis
\n" - ] - }, - "metadata": { - "application/vnd.databricks.v1+output": { - "addedWidgets": {}, - "arguments": {}, - "data": "\n

VERY IMPORTANT TO UNDERSTAND THE USE OF THESE VARIABLES!
Please ask if you are confused about their use.

\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n
Variable NameValueDescription
TWEET_BUCKET_NAMEvoc-75-databricks-dataAWS S3 Bucket Name where the tweets are coming into your system.
TWEET_BUCKET_URLhttps://voc-75-databricks-data.s3.amazonaws.com/AWS S3 Bucket URL where the tweets are coming into your system.
TWEET_SOURCE_PATHs3a://voc-75-databricks-data/voc_volume/AWS S3 Path where the tweets are coming into your system.
USER_DIR/tmp/labuser104917-2387441/Path to the local storage (dbfs) for your project.
BRONZE_CHECKPOINT/tmp/labuser104917-2387441/bronze.checkpointStore your Bronze Checkpoint data here.
BRONZE_DELTA/tmp/labuser104917-2387441/bronze.deltaStore your Bronze Delta Table here.
SILVER_CHECKPOINT/tmp/labuser104917-2387441/silver.checkpointStore your Silver Checkpoint data here.
SILVER_DELTA/tmp/labuser104917-2387441/silver.deltaStore your Silver Delta Table here.
GOLD_CHECKPOINT/tmp/labuser104917-2387441/gold.checkpointStore your Gold Checkpoint data here.
GOLD_DELTA/tmp/labuser104917-2387441/gold.deltaStore your Gold Delta Table here.
MODEL_NAMEHF_TWEET_SENTIMENTLoad this production model
HF_MODEL_NAMEfiniteautomata/bertweet-base-sentiment-analysisThe Hugging Face Model for Tweet sentiment classification: https://huggingface.co/finiteautomata/bertweet-base-sentiment-analysis
\n", - "datasetInfos": [], - "metadata": {}, - "removedWidgets": [], - "textData": null, - "type": "htmlSandbox" - } + "inputWidgets": {}, + "nuid": "a44e6213-9e7c-4096-ae0d-e94b89a7639c", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "files = dbutils.fs.ls(TWEET_SOURCE_PATH) " + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 }, - "output_type": "display_data" - }, - { - "output_type": "stream", - "name": "stdout", - "output_type": "stream", - "text": [ - "the includes are included\n" - ] + "inputWidgets": {}, + "nuid": "93a9d87f-ec1c-4f48-b936-6fe8a6fad142", + "showTitle": false, + "title": "" } - ], + }, + "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "print(\"Number of Files:\",len(files))" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -253,16 +267,14 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "2b7624b4-0aa5-46b7-9767-52f462b2c574", + "nuid": "f0d72ef0-831c-4521-a963-8dc649f49871", "showTitle": false, "title": "" } }, + "outputs": [], "source": [ - "## 2.0 Use the utility functions to ...\n", - "- Read the source file directory listing\n", - "- Count the source files (how many are there?)\n", - "- print the contents of one of the files" + "display(files)" ] }, { @@ -275,18 +287,19 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "a59749d5-e45d-460f-9be1-33652cda2ea5", + "nuid": "a36ca49b-acb9-457e-82f0-ee8283f0d490", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "first_file_path = files[0].path" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -294,6 +307,22 @@ "rowLimit": 10000 }, "inputWidgets": {}, + "nuid": 
"3445b7cc-85e6-4d1d-a656-b01b50cfd114", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "dbutils.fs.head(first_file_path)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, "nuid": "1676aed7-527f-4ea7-9d3b-52544e8c1e37", "showTitle": false, "title": "" @@ -321,18 +350,32 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "c4e37574-9885-4908-bf97-609d140a8818", + "nuid": "0536e97d-47e6-40be-a1ee-3f28e5d28f93", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "schema = StructType([\n", + " StructField(\"date\", StringType(), True),\n", + " StructField(\"user\", StringType(), True),\n", + " StructField(\"text\", StringType(), True),\n", + " StructField(\"sentiment\", StringType(), True),\n", + " StructField(\"source\", StringType(), True),\n", + " StructField(\"processing_time\", TimestampType(), True)\n", + "])\n", + "\n", + "raw_data = spark.readStream \\\n", + " .format(\"cloudFiles\") \\\n", + " .option(\"cloudFiles.format\", \"json\") \\\n", + " .schema(schema) \\\n", + " .load(TWEET_SOURCE_PATH)" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -340,18 +383,14 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "9bc3e824-d218-43e5-a43b-4c6dbe31950d", + "nuid": "e3839562-9209-4621-8300-f69c55263d54", "showTitle": false, "title": "" } }, + "outputs": [], "source": [ - "## 4.0 Bronze Data Exploratory Data Analysis\n", - "- How many tweets are captured in your Bronze Table?\n", - "- Are there any columns that contain Nan or Null values? 
If so how many and what will you do in your silver transforms to address this?\n", - "- Count the number of tweets by each unique user handle and sort the data by descending count.\n", - "- How many tweets have at least one mention (@) how many tweet have no mentions (@)\n", - "- Plot a bar chart that shows the top 20 tweeters (users)\n" + "display(raw_data,streamName=\"rawData\")" ] }, { @@ -364,18 +403,19 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "091ec59c-d968-4acf-b56f-cc9178cd0693", + "nuid": "70e2983e-ffd6-49fb-b5fb-00340d2efabb", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "stop_named_stream(spark, \"rawData\")" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -383,18 +423,15 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "38c290b9-a42a-4551-857b-3f461b8a1be6", + "nuid": "74eeeb1f-b01c-4441-b5e8-22854a32c9aa", "showTitle": false, "title": "" } }, + "outputs": [], "source": [ - "## 5.0 Transform the Bronze Data to Silver Data using a stream\n", - "- setup a read stream on your bronze delta table\n", - "- setup a write stream to append to the silver delta table\n", - "- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes\n", - "- name your bronze to silver stream as silver_stream\n", - "- transform the bronze data to the silver data using the data definition at the top of the notebook" + "dbutils.fs.rm(BRONZE_DELTA, recurse=True)\n", + "dbutils.fs.rm(BRONZE_CHECKPOINT, recurse=True)" ] }, { @@ -407,18 +444,26 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "65b029cb-ba45-4625-a9c1-d0d410a4962b", + "nuid": "5059baa4-7e34-49a0-8bea-782e1f50d300", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "bronze_stream = raw_data.select(\n", + " \"date\",\n", + " \"user\",\n", + " \"text\",\n", + " \"sentiment\",\n", + " 
input_file_name().alias(\"datasource\"),\n", + " current_timestamp().alias(\"processing_time\"),\n", + ")" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -426,20 +471,29 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "9ee38aba-e340-4766-8339-71ef2ecfde3a", + "nuid": "f1511fce-94e1-4d7c-bbc8-eb022302c823", "showTitle": false, "title": "" } }, + "outputs": [], "source": [ - "## 6.0 Transform the Silver Data to Gold Data using a stream\n", - "- setup a read stream on your silver delta table\n", - "- setup a write stream to append to the gold delta table\n", - "- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defines in the includes\n", - "- name your silver to gold stream as gold_stream\n", - "- transform the silver data to the gold data using the data definition at the top of the notebook\n", - "- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry\n", - "- Use a spark UDF to parallelize the inference across your silver data" + "(\n", + " bronze_stream.select(\n", + " \"date\",\n", + " \"user\",\n", + " \"text\",\n", + " \"sentiment\",\n", + " \"datasource\",\n", + " \"processing_time\"\n", + " )\n", + " .writeStream.format(\"delta\")\n", + " .outputMode(\"append\")\n", + " .option(\"checkpointLocation\", BRONZE_CHECKPOINT)\n", + " .option(\"mergeSchema\", \"true\")\n", + " .queryName(\"bronze_stream\")\n", + " .start(BRONZE_DELTA)\n", + ")" ] }, { @@ -452,18 +506,19 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "31e8949d-c016-44a7-9f03-65f471dfd22d", + "nuid": "9fe75fcf-3cb0-4c72-a510-9e1506e41e37", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "untilStreamIsReady(\"bronze_stream\")" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { 
"cellMetadata": { @@ -471,18 +526,14 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "0494d8b4-5a55-47fc-af12-47325f34a303", + "nuid": "383e4956-e2b3-4c31-9f05-f610331fca04", "showTitle": false, "title": "" } }, + "outputs": [], "source": [ - "## 7.0 Capture the accuracy metrics from the gold table in MLflow\n", - "Store the following in an MLflow experiment run:\n", - "- Store the precision, recall, and F1-score as MLflow metrics\n", - "- Store an image of the confusion matrix as an MLflow artifact\n", - "- Store the mdoel name and the MLflow version that was used as an MLflow parameters\n", - "- Store the version of the Delta Table (input-silver) as an MLflow parameter" + "display(bronze_stream,\"bronze_stream_display\")" ] }, { @@ -495,18 +546,60 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "3bb174ad-67e4-4b0a-be36-ff20a760ee8b", + "nuid": "efe76f9f-2997-43b5-b28f-313789ecc32c", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "display(bronze_table.history())" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "a068133a-00b2-4b57-9766-a47a7598be07", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "for stream in spark.streams.active:\n", + " print(stream.name)" ] }, { "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "9bc3e824-d218-43e5-a43b-4c6dbe31950d", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## 4.0 Bronze Data Exploratory Data Analysis\n", + "- How many tweets are captured in your Bronze Table?\n", + "- Are there any columns that contain Nan or Null values? 
If so how many and what will you do in your silver transforms to address this?\n", + "- Count the number of tweets by each unique user handle and sort the data by descending count.\n", + "- How many tweets have at least one mention (@) how many tweet have no mentions (@)\n", + "- Plot a bar chart that shows the top 20 tweeters (users)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -514,23 +607,23 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "81e0e2de-b872-4698-81cc-1669646c2183", + "nuid": "091ec59c-d968-4acf-b56f-cc9178cd0693", "showTitle": false, "title": "" } }, + "outputs": [], "source": [ - "## 8.0 Application Data Processing and Visualization\n", - "- How many mentions are there in the gold data total?\n", - "- Count the number of neutral, positive and negative tweets for each mention in new columns\n", - "- Capture the total for each mention in a new column\n", - "- Sort the mention count totals in descending order\n", - "- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)\n", - "- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the vilians)\n", - "\n", - "You may want to use the \"Loop Application\" widget to control whether you repeateded display the latest plots while the data comes in from your streams before moving on to the next section and cleaning up your run.\n", + "# ENTER YOUR CODE HERE\n", + "# Create a Spark session with Delta support\n", + "spark = SparkSession.builder \\\n", + " .appName(\"DeltaTableExploration\") \\\n", + " .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\") \\\n", + " .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\") \\\n", + " .getOrCreate()\n", "\n", - "*note: A mention is a specific twitter user that has been \"mentioned\" in a tweet with an @user reference." 
+ "# Load the Delta table\n", + "bronze_df = spark.read.format(\"delta\").load(BRONZE_DELTA)\n" ] }, { @@ -543,18 +636,20 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "8250667b-eaa5-48a1-90c7-bfe11e666e32", + "nuid": "9723821d-bcf4-458d-aa96-91a3991e87ca", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "total_tweets = bronze_df.count()\n", + "print(f\"Total number of tweets captured: {total_tweets}\")" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -562,16 +657,17 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "5d5add3f-0c13-4d81-86d8-12cf3cfdd2c8", + "nuid": "cef82d36-12d7-4c1b-91e6-490b1a11867d", "showTitle": false, "title": "" } }, + "outputs": [], "source": [ - "## 9.0 Clean up and completion of your pipeline\n", - "- using the utilities what streams are running? If any.\n", - "- Stop all active streams\n", - "- print out the elapsed time of your notebook." 
+ "nan_null_stats = bronze_df.select([\n", + " count(when(col(c).isNull(), c)).alias(c) for c in bronze_df.columns\n", + "])\n", + "nan_null_stats.show()" ] }, { @@ -584,14 +680,15 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "0304e685-c26e-447d-b230-c37266f30003", + "nuid": "cf7b3011-51c7-4a8c-8c4f-b9066e84d917", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# ENTER YOUR CODE HERE" + "tweets_per_user = bronze_df.groupBy(\"user\").count().orderBy(desc(\"count\"))\n", + "tweets_per_user.show()" ] }, { @@ -604,19 +701,23 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "ea759fc8-1d78-4d6d-b571-e2e256448b76", + "nuid": "c981180f-9747-40cb-acd6-c6c57d1b9693", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "# Get the notebooks ending time note START_TIME was established in the include file when the notebook started.\n", - "END_TIME = time.time()" + "mentions = bronze_df.filter(bronze_df['text'].contains(\"@\")).count()\n", + "no_mentions = total_tweets - mentions\n", + "\n", + "print(f\"Number of tweets with at least one mention: {mentions}\")\n", + "print(f\"Number of tweets with no mentions: {no_mentions}\")" ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -624,32 +725,44 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "b88fc0e5-f50c-4ed1-9480-16a4c8a44b30", + "nuid": "65583e22-1671-45ee-a981-91b9885979e8", "showTitle": false, "title": "" } }, + "outputs": [], "source": [ - "## 10.0 How Optimized is your Spark Application (Grad Students Only)\n", - "Graduate students (registered for the DSCC-402 section of the course) are required to do this section. 
This is a written analysis using the Spark UI (link to screen shots) that support your analysis of your pipelines execution and what is driving its performance.\n", - "Recall that Spark Optimization has 5 significant dimensions of considertation:\n", - "- Spill: write to executor disk due to lack of memory\n", - "- Skew: imbalance in partition size\n", - "- Shuffle: network io moving data between executors (wide transforms)\n", - "- Storage: inefficiency due to disk storage format (small files, location)\n", - "- Serialization: distribution of code segments across the cluster\n", - "\n", - "Comment on each of the dimentions of performance and how your impelementation is or is not being affected. Use specific information in the Spark UI to support your description. \n", + "# Convert to Pandas DataFrame for plotting\n", + "top_tweeters = tweets_per_user.limit(20).toPandas()\n", "\n", - "Note: you can take sreenshots of the Spark UI from your project runs in databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (google, one drive, etc.)\n", - "\n", - "References:\n", - "- [Spark UI Reference Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)\n", - "- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)" + "# Plotting\n", + "plt.figure(figsize=(10, 8))\n", + "plt.bar(top_tweeters['user'], top_tweeters['count'], color='blue')\n", + "plt.xlabel('User Handles')\n", + "plt.ylabel('Number of Tweets')\n", + "plt.title('Top 20 Tweeters')\n", + "plt.xticks(rotation=90)\n", + "plt.show()\n" ] }, { "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "741e939a-5c84-4c48-ab39-645dc3b0acce", + "showTitle": false, + "title": "" + } + }, + "source": [ + "Cleaning the Bronze data to stream it into Silver Data" + ] + }, + { + "cell_type": "code", + "execution_count": 0, "metadata": { 
"application/vnd.databricks.v1+cell": { "cellMetadata": { @@ -657,6 +770,1196 @@ "rowLimit": 10000 }, "inputWidgets": {}, + "nuid": "a3011172-de1c-42a1-9336-2e73f86e1f03", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "from pyspark.sql.functions import col\n", + "\n", + "# Filter the DataFrame to include only rows where the 'text' column contains '@'\n", + "processed_bronze_df = bronze_df.filter(col(\"text\").contains(\"@\"))\n", + "\n", + "# Optionally, you can show some of the results to verify\n", + "processed_bronze_df.show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "implicitDf": true, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9131702d-1435-4468-b609-c7addf581d2c", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "%sql\n", + "OPTIMIZE BRONZE_DELTA" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "38c290b9-a42a-4551-857b-3f461b8a1be6", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## 5.0 Transform the Bronze Data to Silver Data using a stream\n", + "- setup a read stream on your bronze delta table\n", + "- setup a write stream to append to the silver delta table\n", + "- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes\n", + "- name your bronze to silver stream as silver_stream\n", + "- transform the bronze data to the silver data using the data definition at the top of the notebook" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "897c7d5a-13f6-4e6c-929f-32177b39386c", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + 
"nltk.download('stopwords')\n", + "stop_words = set(stopwords.words('english'))\n", + "\n", + "spark = SparkSession.builder.appName(\"Bronze to Silver Transformation\").getOrCreate()" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "fe917139-0ced-49ce-a613-62f2b840d647", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "bronze_df = spark.readStream \\\n", + " .format(\"delta\") \\\n", + " .option(\"ignoreChanges\", \"true\") \\\n", + " .load(BRONZE_DELTA)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1de2ec8a-4cea-4d92-ad0b-443164b5a24b", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "!pip uninstall spacy -y\n", + "!pip install spacy\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "5513d265-01d6-44a1-a03e-86613ad4663b", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "63744447-b737-48fb-a0f1-208221c867af", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "import nltk\n", + "from nltk.stem import WordNetLemmatizer\n", + "from nltk.corpus import stopwords\n", + "import string\n", + "\n", + "def clean_text(text):\n", + " # Initialize the lemmatizer\n", + " nltk.download('punkt')\n", + " nltk.download('wordnet')\n", + " 
nltk.download('stopwords')\n", + " lemmatizer = WordNetLemmatizer()\n", + " \n", + " # Get English stopwords\n", + " stop_words = set(stopwords.words('english'))\n", + " \n", + " # Remove punctuation\n", + " text = text.translate(str.maketrans('', '', string.punctuation))\n", + " \n", + " # Lemmatize words, remove stopwords and extra spaces\n", + " # Tokenize the text, lemmatize each word, and remove stopwords\n", + " words = nltk.word_tokenize(text)\n", + " lemmatized_text = ' '.join([lemmatizer.lemmatize(word).lower() for word in words if word.lower() not in stop_words])\n", + " \n", + " return lemmatized_text\n", + "\n", + "clean_text_udf = udf(clean_text, StringType())\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "39962189-d8f8-420a-898f-0bfef8ceca91", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "def extract_mentions(text):\n", + " # Find all @mentions\n", + " mentions = ' '.join(set(part[1:] for part in text.split() if part.startswith('@')))\n", + " return mentions\n", + "\n", + "extract_mentions_udf = udf(extract_mentions, StringType())\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "c7482b80-5e36-4aab-96a1-9c8ab5437970", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "spark.conf.set(\"spark.sql.legacy.timeParserPolicy\", \"LEGACY\")\n", + "silver_df = bronze_df.withColumn(\"mention\", extract_mentions_udf(col(\"text\"))) \\\n", + " .filter(length(col(\"mention\")) > 0) \\\n", + " .withColumn(\"cleaned_text\", clean_text_udf(col(\"text\"))) \\\n", + " .withColumn(\"timestamp\", to_timestamp(\"date\", \"EEE MMM dd HH:mm:ss zzz yyyy\")) \\\n", + " 
.select(\n", + " col(\"timestamp\"),\n", + " col(\"mention\"),\n", + " col(\"cleaned_text\"),\n", + " col(\"sentiment\")\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "94e1c368-90c0-4f14-8c6e-68fe9c38528f", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Sun Apr 19 07:34:33 PDT 2009" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "429cec03-78ab-4804-b67a-6b63a6ab20f9", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "dbutils.fs.rm(SILVER_DELTA, recurse=True)\n", + "dbutils.fs.rm(SILVER_CHECKPOINT, recurse=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "b7f3b24a-0eab-4d0d-abd4-ee5d59bc7c80", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "(silver_df.writeStream\n", + " .format(\"delta\")\n", + " .outputMode(\"append\")\n", + " .option(\"checkpointLocation\", SILVER_CHECKPOINT)\n", + " .start(SILVER_DELTA))" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ac3bc743-4215-4161-86a4-8efd8996b67e", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "display(silver_df,streamName='SilverStreamDisplay')" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, 
+ "implicitDf": true, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "960326b6-88f6-4729-9593-5da1c7e593d2", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "%sql\n", + "OPTIMIZE SILVER_DELTA" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "789ff4ab-e7e2-4f7c-ae7e-c2635099d6f7", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "silver_df_ini = spark.read.format(\"delta\").load(SILVER_DELTA)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "c5af16ec-2b05-49b9-9a16-93b52d64bf92", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "silver_df_ini = silver_df_ini.toPandas()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "9ee38aba-e340-4766-8339-71ef2ecfde3a", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## 6.0 Transform the Silver Data to Gold Data using a stream\n", + "- setup a read stream on your silver delta table\n", + "- setup a write stream to append to the gold delta table\n", + "- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defines in the includes\n", + "- name your silver to gold stream as gold_stream\n", + "- transform the silver data to the gold data using the data definition at the top of the notebook\n", + "- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry\n", + "- Use a spark UDF to parallelize the inference across your silver data" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + 
"application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "31e8949d-c016-44a7-9f03-65f471dfd22d", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# ENTER YOUR CODE HERE\n", + "import mlflow\n", + "from mlflow.pyfunc import PyFuncModel\n", + "from pyspark.sql.functions import udf\n", + "from pyspark.sql.types import StringType, DoubleType, StructType, StructField,IntegerType\n", + "import pandas" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "691e7954-9de5-4ffa-a0f5-9935f9b53823", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Assuming the model is registered under the name 'HF_TWEET_SENTIMENT'\n", + "model_name = \"HF_TWEET_SENTIMENT\"\n", + "\n", + "# Load the model from MLflow\n", + "model_uri = f\"models:/{model_name}/Production\"\n", + "sentiment_model = mlflow.pyfunc.load_model(model_uri)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9042291c-02e1-4a6d-a892-8ac44a4ae89b", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "df = pd.DataFrame(silver_df_ini['cleaned_text'])\n", + "df.columns = ['text']\n", + "results = sentiment_model.predict(df['text'])" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "42907dc9-9ecd-4e8f-ac7a-7f3f3f26ecb4", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "results" + ] + }, + { + "cell_type": "code", + 
"execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "fc304b91-6adb-49ef-8430-a4bf29d580d5", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "import numpy as np\n", + "silver_df_ini['predicted_score'] = results['score']\n", + "silver_df_ini['predicted_sentiment'] = results['label']\n", + "silver_df_ini['predicted_sentiment_id'] = np.where(silver_df_ini['predicted_sentiment'] == 'POS', 1, 0)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "b7fa0688-8c4f-4839-bfa0-e887a6081757", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "silver_df_ini.head(20)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "dd05ac11-7b6d-4969-b45c-110765eabe47", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "spark_df_silver = spark.createDataFrame(silver_df_ini)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d240854c-dba3-4da3-96bd-c3f749cd3d17", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "silver_delta_preprocessed_path = '/tmp/labuser104917-3110160/silver_delta_preprocessed.delta'\n", + "spark_df_silver.write.format(\"delta\").mode(\"overwrite\").save(silver_delta_preprocessed_path)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + 
"byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "2801e278-a5b2-4834-bd0d-961c2b228d21", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# dbutils.fs.rm(GOLD_DELTA, recurse=True)\n", + "# dbutils.fs.rm(GOLD_CHECKPOINT, recurse=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1c99c21e-6344-4a31-830e-2333a865619d", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "silver_df_processed_stream = spark.readStream \\\n", + ".format(\"delta\") \\\n", + ".option(\"ignoreChanges\", \"true\") \\\n", + ".load(silver_delta_preprocessed_path)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "2fece3b6-74dc-49c8-93f7-8aef78366f19", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "from pyspark.sql.functions import udf\n", + "from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "2d27b18f-a2d0-4195-8874-508b6ddf1938", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# spark_df_silver = spark.readStream \\\n", + "# .format(\"delta\") \\\n", + "# .option(\"ignoreChanges\", \"true\") \\\n", + "# .load(SILVER_DELTA)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "be84b6de-8cae-445f-a370-62e3d008aa5c", + "showTitle": 
false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# schema = StructType([\n", + "# StructField(\"predicted_score\", DoubleType(), False),\n", + "# StructField(\"predicted_sentiment\", StringType(), False),\n", + "# StructField(\"predicted_sentiment_id\", IntegerType(), False)\n", + "# ])" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "8c35a0a1-a4a4-468f-af86-faad35363d49", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "gold_df = silver_df_processed_stream.select(\n", + " col(\"timestamp\"),\n", + " col(\"mention\"),\n", + " col(\"cleaned_text\"),\n", + " col(\"sentiment\"), # Original sentiment column\n", + " col(\"predicted_score\").cast(\"double\"),\n", + " col(\"predicted_sentiment\"),\n", + " col(\"predicted_sentiment_id\").cast(\"integer\")\n", + " )\\\n", + " .withColumn(\"sentiment_id\", \n", + " when(col(\"sentiment\") == \"positive\", 1).otherwise(0))\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "b26aab94-08cd-467a-b170-c25ec5a68cd1", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "stream_query = gold_df.writeStream \\\n", + " .format(\"delta\") \\\n", + " .outputMode(\"append\") \\\n", + " .option(\"checkpointLocation\", GOLD_CHECKPOINT) \\\n", + " .option(\"path\", GOLD_DELTA) \\\n", + " .queryName(\"gold_stream\") \\\n", + " .start()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "13c00a00-6557-422a-87ec-ea3baef69877", + "showTitle": false, + "title": "" 
+ } + }, + "outputs": [], + "source": [ + "display(gold_df,streamName=\"Gold_stream\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "601566e4-baeb-4dd5-b4db-9569608976ad", + "showTitle": false, + "title": "" + } + }, + "source": [] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "0494d8b4-5a55-47fc-af12-47325f34a303", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## 7.0 Capture the accuracy metrics from the gold table in MLflow\n", + "Store the following in an MLflow experiment run:\n", + "- Store the precision, recall, and F1-score as MLflow metrics\n", + "- Store an image of the confusion matrix as an MLflow artifact\n", + "- Store the mdoel name and the MLflow version that was used as an MLflow parameters\n", + "- Store the version of the Delta Table (input-silver) as an MLflow parameter" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "caa22752-8d85-4e9b-ab0c-7b11671b5a7e", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "import mlflow\n", + "from mlflow import log_metric, log_param, log_artifact\n", + "from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix\n", + "import seaborn as sns\n", + "import matplotlib.pyplot as plt\n", + "import pandas as pd" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "8250667b-eaa5-48a1-90c7-bfe11e666e32", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# ENTER YOUR CODE HERE\n", + 
"gold_df_table = spark.read.format('delta').load(GOLD_DELTA)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "3bb174ad-67e4-4b0a-be36-ff20a760ee8b", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# ENTER YOUR CODE HERE\n", + "\n", + "# mlflow.set_experiment('/Shared/Exp3_09-05-2024')" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f515fbf3-ef50-46b9-b574-9343911477a5", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "mlflow.end_run()" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "517f8f7a-2717-41f9-9b5a-5ee87685ddd7", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "import mlflow\n", + "from delta.tables import DeltaTable\n", + "import matplotlib.pyplot as plt\n", + "import seaborn as sns\n", + "\n", + "# Assuming `gold_df` is available and ready\n", + "gold_df_plot = spark.read.format(\"delta\").load(GOLD_DELTA) # Replace with your table's name if different\n", + "\n", + "# Start an MLflow run\n", + "mlflow.set_experiment('/Shared/Exp3_09-05-2024')\n", + "with mlflow.start_run():\n", + " # Convert the necessary columns to pandas for metric calculation\n", + " pandas_df = gold_df_plot.select(\"sentiment_id\", \"predicted_sentiment_id\").toPandas()\n", + " y_true = pandas_df['sentiment_id']\n", + " y_pred = pandas_df['predicted_sentiment_id']\n", + "\n", + " # Calculate metrics\n", + " precision = precision_score(y_true, y_pred, average='macro')\n", + " recall = 
recall_score(y_true, y_pred, average='macro')\n", + " f1 = f1_score(y_true, y_pred, average='macro')\n", + "\n", + " \n", + " # Log metrics\n", + " mlflow.log_metric(\"precision\", precision)\n", + " mlflow.log_metric(\"recall\", recall)\n", + " mlflow.log_metric(\"f1_score\", f1)\n", + "\n", + " # Generate and save confusion matrix\n", + " from sklearn.metrics import confusion_matrix\n", + " cm = confusion_matrix(y_true, y_pred)\n", + " plt.figure(figsize=(10, 7))\n", + " sns.heatmap(cm, annot=True, fmt='g', cmap='Blues')\n", + " plt.xlabel('Predicted')\n", + " plt.ylabel('Actual')\n", + " plt.title('Confusion Matrix')\n", + " plt.savefig(\"confusion_matrix.png\")\n", + " plt.close()\n", + "\n", + " # Log the confusion matrix as an MLflow artifact\n", + " mlflow.log_artifact(\"confusion_matrix.png\")\n", + "\n", + " # Log model and version details\n", + " mlflow.log_param(\"model_name\", \"HF_TWEET_SENTIMENT\")\n", + " mlflow.log_param(\"mlflow_version\", mlflow.__version__)\n", + "\n", + " # Log the version of the Delta Table\n", + " delta_table = DeltaTable.forPath(spark, SILVER_DELTA)\n", + " version = delta_table.history(1).select(\"version\").collect()[0][\"version\"]\n", + " mlflow.log_param(\"delta_table_version\", version)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "81e0e2de-b872-4698-81cc-1669646c2183", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## 8.0 Application Data Processing and Visualization\n", + "- How many mentions are there in the gold data total?\n", + "- Count the number of neutral, positive and negative tweets for each mention in new columns\n", + "- Capture the total for each mention in a new column\n", + "- Sort the mention count totals in descending order\n", + "- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)\n", + "- Plot a bar chart of the top 20 mentions with 
negative sentiment (the people who are the villains)\n", + "\n", + "You may want to use the \"Loop Application\" widget to control whether you repeatedly display the latest plots while the data comes in from your streams before moving on to the next section and cleaning up your run.\n", + "\n", + "*note: A mention is a specific twitter user that has been \"mentioned\" in a tweet with an @user reference." + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "649040cf-282c-4b42-8528-b3342740f25d", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "from pyspark.sql import functions as F\n", + "\n", + "# Group by 'mention' and pivot on 'predicted_sentiment' to count occurrences of each sentiment\n", + "result_df = gold_df_table.groupBy(\"mention\").pivot(\"predicted_sentiment\", [\"POS\", \"NEU\", \"NEG\"]).count()\n", + "\n", + "# Calculate the total mentions for each user\n", + "result_df = result_df.withColumn(\"total_mentions\", F.col(\"POS\") + F.col(\"NEU\") + F.col(\"NEG\"))\n", + "\n", + "# Order by total_mentions in descending order\n", + "result_df = result_df.orderBy(F.col(\"total_mentions\").desc())\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "7f89ca6d-bf8b-4f21-af2c-9b5bf6dd83bb", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "result_pd = result_df.toPandas()" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9efcffa2-3f48-4d24-8f7b-cb1271748a6d", + "showTitle": false, + "title": "" + } + }, + 
"outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "\n", + "# Plot for positive sentiments\n", + "top_positive = result_pd.nlargest(20, 'POS')\n", + "plt.figure(figsize=(10, 8))\n", + "plt.barh(top_positive['mention'], top_positive['POS'], color='green')\n", + "plt.xlabel('Count of Positive Mentions')\n", + "plt.ylabel('Mentions')\n", + "plt.title('Top 20 Mentions with Positive Sentiment')\n", + "plt.gca().invert_yaxis() # Invert y-axis to have the highest value at the top\n", + "plt.show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f344661b-df26-4ad2-8742-81777dccba4b", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Plot for negative sentiments\n", + "top_negative = result_pd.nlargest(20, 'NEG')\n", + "plt.figure(figsize=(10, 8))\n", + "plt.barh(top_negative['mention'], top_negative['NEG'], color='red')\n", + "plt.xlabel('Count of Negative Mentions')\n", + "plt.ylabel('Mentions')\n", + "plt.title('Top 20 Mentions with Negative Sentiment')\n", + "plt.gca().invert_yaxis() # Invert y-axis to have the highest value at the top\n", + "plt.show()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "5d5add3f-0c13-4d81-86d8-12cf3cfdd2c8", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## 9.0 Clean up and completion of your pipeline\n", + "- using the utilities what streams are running? If any.\n", + "- Stop all active streams\n", + "- print out the elapsed time of your notebook." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0304e685-c26e-447d-b230-c37266f30003", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# ENTER YOUR CODE HERE" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ea759fc8-1d78-4d6d-b571-e2e256448b76", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Get the notebooks ending time note START_TIME was established in the include file when the notebook started.\n", + "END_TIME = time.time()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "b88fc0e5-f50c-4ed1-9480-16a4c8a44b30", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## 10.0 How Optimized is your Spark Application (Grad Students Only)\n", + "Graduate students (registered for the DSCC-402 section of the course) are required to do this section. This is a written analysis using the Spark UI (link to screen shots) that support your analysis of your pipelines execution and what is driving its performance.\n", + "Recall that Spark Optimization has 5 significant dimensions of considertation:\n", + "- Spill: write to executor disk due to lack of memory\n", + "- Skew: imbalance in partition size\n", + "- Shuffle: network io moving data between executors (wide transforms)\n", + "- Storage: inefficiency due to disk storage format (small files, location)\n", + "- Serialization: distribution of code segments across the cluster\n", + "\n", + "Comment on each of the dimentions of performance and how your impelementation is or is not being affected. 
Use specific information in the Spark UI to support your description. \n", + "\n", + "Note: you can take screenshots of the Spark UI from your project runs in databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (google, one drive, etc.)\n", + "\n", + "References:\n", + "- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)\n", + "- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, "nuid": "a0a5707d-a5ca-4e87-9e74-3b13d00337b0", "showTitle": false, "title": "" @@ -673,7 +1976,7 @@ "language": "python", "notebookMetadata": { "mostRecentlyExecutedCommandWithImplicitDF": { - "commandId": -1, + "commandId": 205537447457728, "dataframes": [ "_sqldf" ] @@ -697,6 +2000,7 @@ "clear_previous_run": { "currentValue": "No", "nuid": "11b43403-c70f-41ef-ab0e-41172583baaa", + "typedWidgetInfo": null, "widgetInfo": { "widgetType": "dropdown", "defaultValue": "No", @@ -714,7 +2018,8 @@ }, "optimize_tables": { "currentValue": "No", - "nuid": "d27f2950-3146-4010-8bc6-58c47a93e7a9", + "nuid": "76f3e017-31cb-4d59-9051-91851bfd376f", + "typedWidgetInfo": null, "widgetInfo": { "widgetType": "dropdown", "defaultValue": "No", @@ -732,7 +2037,8 @@ }, "stop_streams": { "currentValue": "No", - "nuid": "d70a2190-266e-473c-86a7-84f7bf56b386", + "nuid": "3bb3c3cb-e55d-437e-af0e-594116a56fbc", + "typedWidgetInfo": null, "widgetInfo": { "widgetType": "dropdown", "defaultValue": "No", diff --git a/final_project/confusion_matrix.png b/final_project/confusion_matrix.png new file mode 100644 index 0000000..fbd414e Binary files /dev/null and b/final_project/confusion_matrix.png differ diff --git a/final_project/includes/utilities.ipynb b/final_project/includes/utilities.ipynb index 96bbe60..25ab9af 100644 --- 
a/final_project/includes/utilities.ipynb +++ b/final_project/includes/utilities.ipynb @@ -85,7 +85,16 @@ " started = True\n", " count += 1\n", " time.sleep(10)\n", - " return started \n", + " return started \n", + "\n", + "\n", + "def untilStreamIsReady(namedStream: str, progressions: int = 3) -> bool:\n", + " queries = list(filter(lambda query: query.name == namedStream, spark.streams.active))\n", + " while len(queries) == 0 or len(queries[0].recentProgress) < progressions:\n", + " time.sleep(5)\n", + " queries = list(filter(lambda query: query.name == namedStream, spark.streams.active))\n", + " print(\"The stream {} is active and ready.\".format(namedStream))\n", + " return True \n", "\n" ] }