
Commit 6622781

Merge branch 'master' of https://github.com/Microsoft/azure-docs-pr into githubissue29875
2 parents: 7c8bfb2 + d543641

File tree: 1 file changed (+42, -49 lines)


articles/cognitive-services/Anomaly-Detector/tutorials/anomaly-detection-streaming-databricks.md

Lines changed: 42 additions & 49 deletions
@@ -14,7 +14,7 @@ ms.author: aahi
 # Tutorial: Anomaly detection on streaming data using Azure Databricks
 
-Microsoft Power BI Desktop is a free application that lets you connect to, transform, and visualize your data. The Anomaly Detector API, part of Azure Cognitive Services, provides a way of monitoring your time series data. Use this tutorial to run anomaly detection on a stream of data in near real-time using Azure Databricks. You'll ingest twitter data using Azure Event Hubs, and import them into Azure Databricks using the Spark Event Hubs connector. Afterwards, you'll use the API to detect anomalies on the streamed data.
+[Azure Databricks](https://azure.microsoft.com/en-us/services/databricks/) is a fast, easy, and collaborative Apache Spark–based analytics service. The Anomaly Detector API, part of Azure Cognitive Services, provides a way of monitoring your time series data. Use this tutorial to run anomaly detection on a stream of data in near real time using Azure Databricks. You'll ingest Twitter data using Azure Event Hubs and import it into Azure Databricks using the Spark Event Hubs connector. Afterwards, you'll use the API to detect anomalies in the streamed data.
 
 The following illustration shows the application flow:

@@ -75,7 +75,7 @@ In this section, you create an Azure Databricks workspace using the [Azure porta
    Select **Create**.
 
-4. The account creation takes a few minutes.
+4. The workspace creation takes a few minutes.
 
 ## Create a Spark cluster in Databricks

@@ -95,7 +95,8 @@ In this section, you create an Azure Databricks workspace using the [Azure porta
    * For this article, create a cluster with the **5.2** runtime. Do NOT select the **5.3** runtime.
    * Make sure the **Terminate after \_\_ minutes of inactivity** checkbox is selected. Provide a duration (in minutes) after which the cluster terminates if it isn't being used.
 
-   Select **Create cluster**. Once the cluster is running, you can attach notebooks to the cluster and run Spark jobs.
+   Select **Create cluster**.
+4. The cluster creation takes several minutes. Once the cluster is running, you can attach notebooks to the cluster and run Spark jobs.
 
 ## Create a Twitter application

@@ -123,7 +124,7 @@ In this tutorial, you use the Twitter APIs to send tweets to Event Hubs. You als
    ![Add library dialog box](../media/tutorials/databricks-add-library-option.png "Add library dialog box")
 
-2. In the New Library page, for **Source** select **Maven Coordinate**. For **Coordinate**, enter the coordinate for the package you want to add. Here is the Maven coordinates for the libraries used in this tutorial:
+2. In the New Library page, for **Source** select **Maven**. For **Coordinates**, enter the coordinate for the package you want to add. Here are the Maven coordinates for the libraries used in this tutorial:
 
    * Spark Event Hubs connector - `com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10`
    * Twitter API - `org.twitter4j:twitter4j-core:4.0.7`
@@ -168,26 +169,22 @@ In this tutorial, you use the [Azure Cognitive Services Anomaly Detector APIs](.
    Select **Create**.
 
-5. After the resource is created, from the **Overview** tab, select **Show access keys**.
+5. After the resource is created, from the **Overview** tab, copy and save the **Endpoint** URL, as shown in the screenshot. Then select **Show access keys**.
 
    ![Show access keys](../media/tutorials/cognitive-services-get-access-keys.png "Show access keys")
 
-   Also, copy a part of the endpoint URL, as shown in the screenshot. You need this URL in the tutorial.
-
-6. Under **Keys**, select the copy icon against the key you want to use.
+6. Under **Keys**, select the copy icon next to the key you want to use. Save the access key.
 
    ![Copy access keys](../media/tutorials/cognitive-services-copy-access-keys.png "Copy access keys")
 
-7. Save the values for the endpoint URL and the access key, you retrieved in this step. You need it later in this tutorial.
-
 ## Create notebooks in Databricks
 
 In this section, you create two notebooks in the Databricks workspace with the following names:
 
 - **SendTweetsToEventHub** - A producer notebook you use to get tweets from Twitter and stream them to Event Hubs.
 - **AnalyzeTweetsFromEventHub** - A consumer notebook you use to read the tweets from Event Hubs and run anomaly detection.
 
-1. In the left pane, select **Workspace**. From the **Workspace** drop-down, select **Create**, and then select **Notebook**.
+1. In the Azure Databricks workspace, select **Workspace** from the left pane. From the **Workspace** drop-down, select **Create**, and then select **Notebook**.
 
    ![Create notebook in Databricks](../media/tutorials/databricks-create-notebook.png "Create notebook in Databricks")

@@ -201,7 +198,7 @@ In this section, you create two notebooks in Databricks workspace with the follo
 ## Send tweets to Event Hubs
 
-In the **SendTweetsToEventHub** notebook, paste the following code, and replace the placeholder with values for your Event Hubs namespace and Twitter application that you created earlier. This notebook streams tweets with the keyword "Azure" into Event Hubs in real time.
+In the **SendTweetsToEventHub** notebook, paste the following code, and replace the placeholders with values for the Event Hubs namespace and Twitter application that you created earlier. This notebook extracts the creation time and number of "Like"s from tweets with the keyword "Azure" and streams them as events into Event Hubs in real time.
 
 ```scala
 //
@@ -298,7 +295,7 @@ eventHubClient.get().close()
 pool.shutdown()
 ```
 
-To run the notebook, press **SHIFT + ENTER**. You see an output as shown in the following snippet. Each event in the output is a tweet that is ingested into the Event Hubs.
+To run the notebook, press **SHIFT + ENTER**. You see output like the following snippet. Each event in the output is a combination of a timestamp and the number of "Like"s that is ingested into Event Hubs.
 
     Sent event: {"timestamp":"2019-04-24T09:39:40.000Z","favorite":0}
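For orientation, the shape of each sent event can be reproduced outside the notebook. This is a hand-built sketch, not the tutorial's producer code: the `TweetStats` case class and `toEventJson` helper are hypothetical names, and the sample values are invented (the real timestamps carry milliseconds, e.g. `.000Z`).

```scala
import java.time.Instant

// Hypothetical helper mirroring the payload format shown above:
// ISO-8601 tweet creation time plus its "Like" (favorite) count.
case class TweetStats(createdAt: Instant, favoriteCount: Int)

def toEventJson(t: TweetStats): String =
  s"""{"timestamp":"${t.createdAt}","favorite":${t.favoriteCount}}"""

val sample = TweetStats(Instant.parse("2019-04-24T09:39:40Z"), 0)
println("Sent event: " + toEventJson(sample))
```

Each such JSON string becomes the body of one Event Hubs event.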

@@ -321,7 +318,7 @@ To run the notebook, press **SHIFT + ENTER**. You see an output as shown in the
 ## Read tweets from Event Hubs
 
-In the **AnalyzeTweetsFromEventHub** notebook, paste the following code, and replace the placeholder with values for your Azure Event Hubs that you created earlier. This notebook reads the tweets that you earlier streamed into Event Hubs using the **SendTweetsToEventHub** notebook.
+In the **AnalyzeTweetsFromEventHub** notebook, paste the following code, and replace the placeholders with values for the Anomaly Detector resource that you created earlier. This notebook reads the tweets that you streamed into Event Hubs earlier using the **SendTweetsToEventHub** notebook.
 
 First, write a client to call the Anomaly Detector API.
 ```scala
@@ -383,7 +380,7 @@ object AnomalyDetector extends Serializable {
     return response.toString()
   }
 
-  // Calls the Latest Point Detection API for timeserie.
+  // Calls the Latest Point Detection API.
   def detectLatestPoint(series: Series): Option[AnomalySingleResponse] = {
     try {
       println("Process Timestamp: " + series.series.apply(series.series.length-1).timestamp.toString + ", size: " + series.series.length)
@@ -402,7 +399,7 @@ object AnomalyDetector extends Serializable {
     }
   }
 
-  // Calls the Batch Detection API for timeserie.
+  // Calls the Batch Detection API.
   def detectBatch(series: Series): Option[AnomalyBatchResponse] = {
     try {
       val response = processUsingApi(gson.toJson(series), batchDetectionUrl)
@@ -421,7 +418,7 @@ object AnomalyDetector extends Serializable {
 }
 ```
 
-To run the notebook, press **SHIFT + ENTER**. You see an output as shown in the following snippet. :
+To run the notebook, press **SHIFT + ENTER**. You see output like the following snippet.
 
     import java.io.{BufferedReader, DataOutputStream, InputStreamReader}
     import java.net.URL
@@ -443,10 +440,9 @@ Then prepare an aggregation function for future usage.
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types.{StructType, TimestampType, FloatType, MapType, BooleanType, DataType}
-//import org.apache.spark.sql.functions._
 import scala.collection.immutable.ListMap
 
-class AnomalyDetectorAggregationFunction_Hourly extends UserDefinedAggregateFunction {
+class AnomalyDetectorAggregationFunction extends UserDefinedAggregateFunction {
   override def inputSchema: StructType = new StructType().add("timestamp", TimestampType).add("value", FloatType)
 
   override def bufferSchema: StructType = new StructType().add("point", MapType(TimestampType, FloatType))
@@ -478,8 +474,8 @@ class AnomalyDetectorAggregationFunction_Hourly extends UserDefinedAggregateFunc
 
     // 0.25 is maxAnomalyRatio. It represents 25%, the max anomaly ratio in a time series.
-    // 95 is the sensitivity of the algorithms.
-    // Check Anomaly detector API reference (https://westus2.dev.cognitive.microsoft.com/docs/services/AnomalyDetector/operations/post-timeseries-last-detect)
+    // 95 is the sensitivity of the algorithms.
+    // Check the Anomaly Detector API reference (https://aka.ms/anomaly-detector-rest-api-ref)
 
     val series: Series = new Series(detect_points.toArray, 0.25, 95, "hourly")
     val response: Option[AnomalySingleResponse] = AnomalyDetector.detectLatestPoint(series)
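As a reference for what the `Series` above turns into on the wire, here is a hand-built sketch of the request body sent to the Anomaly Detector REST API. The field names (`series`, `granularity`, `maxAnomalyRatio`, `sensitivity`) follow the public API; the point values are invented for illustration.

```scala
// Build the JSON body for a "latest point" detection request by hand.
// Point values are invented; a real series holds the hourly "Like" averages.
val points = Seq(
  ("2019-04-15T22:00:00Z", 3.0),
  ("2019-04-15T23:00:00Z", 5.0),
  ("2019-04-16T00:00:00Z", 120.0) // a suspicious spike
)

val seriesJson = points
  .map { case (ts, v) => s"""{"timestamp":"$ts","value":$v}""" }
  .mkString("[", ",", "]")

val requestBody =
  s"""{"series":$seriesJson,"granularity":"hourly","maxAnomalyRatio":0.25,"sensitivity":95}"""

println(requestBody)
```

In the tutorial the same serialization is done by Gson from the `Series` case class rather than by string concatenation.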
@@ -494,15 +490,15 @@ class AnomalyDetectorAggregationFunction_Hourly extends UserDefinedAggregateFunc
 
 ```
 
-To run the notebook, press **SHIFT + ENTER**. You see an output as shown in the following snippet.
+To run the notebook, press **SHIFT + ENTER**. You see output like the following snippet.
 
     import org.apache.spark.sql.Row
     import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
     import org.apache.spark.sql.types.{StructType, TimestampType, FloatType, MapType, BooleanType, DataType}
     import scala.collection.immutable.ListMap
     defined class AnomalyDetectorAggregationFunction
 
-Then load data from event hub for anomaly detection.
+Then load data from the event hub for anomaly detection. Replace the placeholders with values for the Azure Event Hubs namespace that you created earlier.
 
 ```scala
 //
@@ -540,7 +536,7 @@ display(msgStream)
 
 ```
 
-The output now resembles the following image. Pay attention to that your date in the table might be different from the date in this tutorial as the data is real time.
+The output now resembles the following image. Note that the dates in your table might differ from the dates in this tutorial, because the data is real time.
 ![Load Data From Event hub](../media/tutorials/load-data-from-eventhub.png "Load Data From Event Hub")
 
 You have now streamed data from Azure Event Hubs into Azure Databricks at near real time using the Event Hubs connector for Apache Spark. For more information on how to use the Event Hubs connector for Spark, see the [connector documentation](https://github.com/Azure/azure-event-hubs-spark/tree/master/docs).
@@ -580,6 +576,8 @@ groupTime average
 ```
 
 Then get the aggregated output result to Delta. Because anomaly detection requires a longer history window, we're using Delta to keep the history data for the point you want to detect.
+Replace "[Placeholder: table name]" with a qualified Delta table name to be created (for example, "tweets"). Replace "[Placeholder: folder name for checkpoints]" with a string value that's unique each time you run this code (for example, "etl-from-eventhub-20190605").
+To learn more about Delta Lake on Azure Databricks, see the [Delta Lake Guide](https://docs.azuredatabricks.net/delta/index.html).
 
 
 ```scala
@@ -595,6 +593,7 @@ groupStream.writeStream
 
 ```
 
+Replace "[Placeholder: table name]" with the same Delta table name you selected above.
 ```scala
 //
 // Show Aggregate Result
@@ -621,26 +620,35 @@ groupTime average
 
 ```
 
-Now the aggregated time series data is continuously ingested into the Delta. Then you can schedule a job every hour to detect the anomaly of latest point.
+Now the aggregated time series data is continuously ingested into Delta. You can then schedule an hourly job to detect anomalies in the latest point.
+Replace "[Placeholder: table name]" with the same Delta table name you selected above.
 
 ```scala
 //
-// Anomaly Detection with Batch query
+// Anomaly Detection
 //
 
 import java.time.Instant
+import java.time.format.DateTimeFormatter
+import java.time.ZoneOffset
 import java.time.temporal.ChronoUnit
 
 val detectData = spark.read.format("delta").table("[Placeholder: table name]")
 
-// How long history you want to use in anomaly detection. It is hourly time series in this tutorial, so 72 means 72 hours.
-val batchSize = 72
+// You could use Databricks to schedule an hourly job and always monitor the latest data point.
+// Or you could specify a constant value here for testing purposes.
+// For example, val endTime = Instant.parse("2019-04-16T00:00:00Z")
+val endTime = Instant.now()
 
-// Change the endTime to where you want to detect. You could use Databricks to schedule a job and change it to the latest hour.
-val endTime = Instant.parse("2019-04-16T00:00:00Z")
+// This is when the input of anomaly detection starts. It is an hourly time series in this tutorial, so 72 means 72 hours before endTime.
+val batchSize = 72
 val startTime = endTime.minus(batchSize, ChronoUnit.HOURS)
 
-val series = detectData.filter($"groupTime" < endTime.toString && $"groupTime" >= startTime.toString).sort($"groupTime")
+val DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC)
+
+val series = detectData.filter($"groupTime" <= DATE_TIME_FORMATTER.format(endTime))
+  .filter($"groupTime" > DATE_TIME_FORMATTER.format(startTime))
+  .sort($"groupTime")
 
 series.createOrReplaceTempView("series")
@@ -649,7 +657,7 @@ series.createOrReplaceTempView("series")
 // Register the function to access it
 spark.udf.register("anomalydetect", new AnomalyDetectorAggregationFunction)
 
-val adResult = spark.sql("SELECT '" + endTime.toString + "' as timestamp, anomalydetect(groupTime, average) as anomaly FROM series")
+val adResult = spark.sql("SELECT '" + endTime.toString + "' as datetime, anomalydetect(groupTime, average) as anomaly FROM series")
 adResult.show()
 ```
 The result looks like the following:
@@ -661,28 +669,13 @@ Result as below:
 |2019-04-16T00:00:00Z| false|
 +--------------------+-------+
 
-```
-Output the anomaly detection result back to the Delta.
-```scala
-//
-// Output Batch AD Result to delta
-//
-
-adResult.writeStream
-  .format("delta")
-  .outputMode("complete")
-  .option("checkpointLocation", "/delta/[Placeholder: table name]/_checkpoints/[Placeholder: folder name for checkpoints]")
-  .table("[Placeholder: table name]")
-
-```
-
 
-That's it! Using Azure Databricks, you have successfully streamed data into Azure Event Hubs, consumed the stream data using the Event Hubs connector, and then ran anomaly detection on streaming data in near real time.
+That's it! Using Azure Databricks, you have successfully streamed data into Azure Event Hubs, consumed the stream data using the Event Hubs connector, and then run anomaly detection on streaming data in near real time.
 Although in this tutorial the granularity is hourly, you can always change the granularity to meet your needs.
 
 ## Clean up resources
 
-After you have finished running the tutorial, you can terminate the cluster. To do so, from the Azure Databricks workspace, from the left pane, select **Clusters**. For the cluster you want to terminate, move the cursor over the ellipsis under **Actions** column, and select the **Terminate** icon.
+After you have finished running the tutorial, you can terminate the cluster. To do so, in the Azure Databricks workspace, select **Clusters** from the left pane. For the cluster you want to terminate, move the cursor over the ellipsis in the **Actions** column, select the **Terminate** icon, and then select **Confirm**.
 
 ![Stop a Databricks cluster](../media/tutorials/terminate-databricks-cluster.png "Stop a Databricks cluster")
