diff --git a/data-explorer/kusto/functions-library/detect-anomalous-access-cf-fl.md b/data-explorer/kusto/functions-library/detect-anomalous-access-cf-fl.md
new file mode 100644
index 0000000000..a72bae908f
--- /dev/null
+++ b/data-explorer/kusto/functions-library/detect-anomalous-access-cf-fl.md
@@ -0,0 +1,411 @@
+---
+title: detect_anomalous_access_cf_fl()
+description: Learn how to use the detect_anomalous_access_cf_fl() function to detect anomalous access using collaborative filtering.
+ms.reviewer: shaysakazi
+ms.topic: reference
+ms.date: 05/26/2025
+monikerRange: "microsoft-fabric || azure-data-explorer || azure-monitor || microsoft-sentinel"
+---
+# detect_anomalous_access_cf_fl()
+
+>[!INCLUDE [applies](../includes/applies-to-version/applies.md)] [!INCLUDE [fabric](../includes/applies-to-version/fabric.md)] [!INCLUDE [azure-data-explorer](../includes/applies-to-version/azure-data-explorer.md)] [!INCLUDE [monitor](../includes/applies-to-version/monitor.md)] [!INCLUDE [sentinel](../includes/applies-to-version/sentinel.md)]
+
+Detect anomalous access using a collaborative filtering (CF) model that identifies anomalous access patterns in timestamped data.
+
+The `detect_anomalous_access_cf_fl()` function is a [user-defined function (UDF)](../query/functions/user-defined-functions.md) that applies a collaborative filtering (CF) model to detect anomalous entity-resource interactions, such as a User Principal Name (UPN) accessing a Storage account, based on timestamped data like access logs. In a cybersecurity context, this function helps detect abnormal or unauthorized access patterns.
+
+The CF-based model predicts access scores using item similarity, leveraging historical access patterns and the cosine similarity between entities and resources. It estimates the probability of an entity accessing a resource during a defined detection period within a given scope, such as a subscription or an account. Several optional parameters, including an anomaly score threshold, allow customization of the model’s behavior.
+
+The model outputs an access anomaly score in the range [0, 1], where 0 indicates a high likelihood of legitimate access and 1 indicates a highly anomalous access. Alongside the access anomaly score, the function also returns a binary anomaly flag (based on the defined threshold) and additional explanatory fields.
+
+## Syntax
+
+`detect_anomalous_access_cf_fl(`*entityColumnName*, *resourceColumnName*, *scopeColumnName*, *timeColumnName*, *startTraining*, *startDetection*, *endDetection*, [*anomalyScoreThresh*]`)`
+
+[!INCLUDE [syntax-conventions-note](../includes/syntax-conventions-note.md)]
+
+## Parameters
+
+| Name | Type | Required | Description |
+|--|--|--|--|
+| *entityColumnName* | `string` | :heavy_check_mark: | The name of the input table column containing entity names or IDs for which the CF model is calculated. |
+| *resourceColumnName* | `string` | :heavy_check_mark: | The name of the input table column containing resource names or IDs for which the model is calculated. |
+| *scopeColumnName* | `string` | :heavy_check_mark: | The name of the input table column containing the partition or scope, so that a different anomaly model is built for each scope. |
+| *timeColumnName* | `string` | :heavy_check_mark: | The name of the input table column containing the timestamps that are used to define the training and detection periods. |
+| *startTraining* | `datetime` | :heavy_check_mark: | The beginning of the training period for the anomaly model.
Its end is defined by the beginning of detection period. | +| *startDetection* | `datetime` | :heavy_check_mark: | The beginning of the detection period for anomaly detection. | +| *endDetection* | `datetime` | :heavy_check_mark: | The end of the detection period for anomaly detection. | +| *anomalyScoreThresh* | `real` | | The maximum value of anomaly score for which an anomaly is detected, a number in range [0, 1]. Higher values mean that only more significant cases are considered anomalous, so fewer anomalies are detected (higher precision, lower recall). The default value is 0.9. | + +## Function definition + +You can define the function by either embedding its code as a query-defined function, or creating it as a stored function in your database, as follows: + +### [Query-defined](#tab/query-defined) + +Define the function using the following [let statement](../query/let-statement.md). No permissions are required. + +> [!IMPORTANT] +> A [let statement](../query/let-statement.md) can't run on its own. It must be followed by a [tabular expression statement](../query/tabular-expression-statements.md). To run a working example of `detect_anomalous_access_cf_fl()`, see [Example](#example). + +```kusto +let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string + , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime + , anomalyScoreThresh:real = 0.9) +{ +//pre-process the input data by adding standard column names and dividing to datasets +let processedData = ( + T + | extend entity = column_ifexists(entityColumnName, '') + | extend resource = column_ifexists(resourceColumnName, '') + | extend scope = column_ifexists(scopeColumnName, '') + | extend sliceTime = todatetime(column_ifexists(timeColumnName, '')) + | where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime) + | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet' + , sliceTime >= startDetection and sliceTime <= endDetection , 'detectSet' + , 'other') + | where dataSet in ('trainSet', 'detectSet') +); +// Create all possible pairs (entity, resource) with the same scope +let entities = ( + processedData + | where dataSet == 'trainSet' + | summarize by entity, scope + | extend temp = 1 +); +let resources = ( + processedData + | where dataSet == 'trainSet' + | summarize by resource, scope + | extend temp = 1 +); +let potentialAccessTrainData = ( + entities + | join kind=inner resources on temp + | distinct entity, resource, scope +); +let accessTrainData = ( + potentialAccessTrainData + | join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope + | extend usedOperation = iff(isempty(resource1), 0, 1) + | distinct entity, resource, scope, usedOperation +); +// Aggregate interaction scores per item into a list to prepare for similarity calculations +// Add a temporary key for self-joining later in the process +let ItemUserInteractions = ( + accessTrainData + | summarize interactList = make_list(usedOperation) by resource, scope + | extend tempKey=1 +); +// Compute item-to-item similarity using cosine similarity +let ItemSimilarities = ( + ItemUserInteractions + | join kind=inner (ItemUserInteractions) on tempKey + | where scope == scope1 + | extend similarity = series_cosine_similarity(interactList, interactList1) + | extend similarity = iff(isnan(similarity), 0.0, similarity) + | project resource, 
resource1, scope, similarity +); +// Predict user-item interactions based on item similarities +let Predictions = ( + accessTrainData + | join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1 + | project entity, resource=resource2, usedOperation, similarity + | summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource + | extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore) + | extend accessAnomalyScore = 1 - accessAnomalyScore + | extend accessAnomalyScore = round(accessAnomalyScore, 4) + | join kind=inner accessTrainData on entity, resource + | project entity, resource, scope, usedOperation, accessAnomalyScore + | extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation)) + | order by entity asc, resource +); +let resultsData = ( + processedData + | where dataSet == "detectSet" + | join kind=leftouter Predictions on entity, resource, scope + | extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0) + | project-away sliceTime, entity1, resource1, scope1, usedOperation +); +resultsData +}; +// Write your query to use the function here. +``` + +### [Stored](#tab/stored) + +Define the stored function once using the following [`.create function`](../management/create-function.md). [Database User permissions](../access-control/role-based-access-control.md) are required. + +> [!IMPORTANT] +> You must run this code to create the function before you can use the function as shown in the [Example](#example). + +```kusto +.create-or-alter function with (folder = "KCL", docstring = "Detect anomalous access using collaborative filtering model", skipvalidation = "true") +detect_anomalous_access_cf_fl(T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string + , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime + , anomalyScoreThresh:real=0.9) +{ +//pre-process the input data by adding standard column names and dividing to datasets +let processedData = ( + T + | extend entity = column_ifexists(entityColumnName, '') + | extend resource = column_ifexists(resourceColumnName, '') + | extend scope = column_ifexists(scopeColumnName, '') + | extend sliceTime = todatetime(column_ifexists(timeColumnName, '')) + | where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime) + | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet' + , sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet' + , 'other') + | where dataSet in ('trainSet', 'detectSet') +); +// Create all possible pairs (entity, resource) with the same scope +let entities = ( + processedData + | where dataSet == 'trainSet' + | summarize by entity, scope + | extend temp = 1 +); +let resources = ( + processedData + | where dataSet == 'trainSet' + | summarize by resource, scope + | extend temp = 1 +); +let potentialAccessTrainData = ( + entities + | join kind=inner resources on temp + | distinct entity, resource, scope +); +let accessTrainData = ( + potentialAccessTrainData + | join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope + | extend usedOperation = iff(isempty(resource1), 0, 1) + | distinct entity, resource, scope, usedOperation +); +// Aggregate interaction scores per item into a list to prepare for similarity calculations +// Add a temporary key for 
self-joining later in the process +let ItemUserInteractions = ( + accessTrainData + | summarize interactList = make_list(usedOperation) by resource, scope + | extend tempKey=1 +); +// Compute item-to-item similarity using cosine similarity +let ItemSimilarities = ( + ItemUserInteractions + | join kind=inner (ItemUserInteractions) on tempKey + | where scope == scope1 + | extend similarity = series_cosine_similarity(interactList, interactList1) + | extend similarity = iff(isnan(similarity), 0.0, similarity) + | project resource, resource1, scope, similarity +); +// Predict user-item interactions based on item similarities +let Predictions = ( + accessTrainData + | join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1 + | project entity, resource=resource2, usedOperation, similarity + | summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource + | extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore) + | extend accessAnomalyScore = 1 - accessAnomalyScore + | extend accessAnomalyScore = round(accessAnomalyScore, 4) + | join kind=inner accessTrainData on entity, resource + | project entity, resource, scope, usedOperation, accessAnomalyScore + | extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation)) + | order by entity asc, resource +); +let resultsData = ( + processedData + | where dataSet == "detectSet" + | join kind=leftouter Predictions on entity, resource, scope + | extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0) + | project-away sliceTime, entity1, resource1, scope1, usedOperation +); +resultsData +} +``` + +--- + +## Example + +The following example uses the [invoke operator](../query/invoke-operator.md) to run the function. + +### [Query-defined](#tab/query-defined) + +To use a query-defined function, invoke it after the embedded function definition. 
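The general invocation pattern is shown in the following sketch. The table name `MyAccessLogs`, its column names, and the date values are hypothetical placeholders; substitute your own table, columns, and time ranges. The full, self-contained example that follows generates synthetic data instead.

```kusto
// Sketch only: MyAccessLogs, its columns, and the dates are hypothetical placeholders.
// The let statement that defines detect_anomalous_access_cf_fl must precede this query.
MyAccessLogs
| invoke detect_anomalous_access_cf_fl(entityColumnName = 'UserPrincipalName'
    , resourceColumnName = 'StorageAccountName'
    , scopeColumnName = 'SubscriptionId'
    , timeColumnName = 'TimeGenerated'
    , startTraining = datetime(2025-03-01)
    , startDetection = datetime(2025-04-01)
    , endDetection = datetime(2025-04-08)
    , anomalyScoreThresh = 0.95)
```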
+ +:::moniker range="azure-data-explorer" +> [!div class="nextstepaction"] +> Run the query +::: moniker-end + +```kusto +let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string + , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime + , anomalyScoreThresh:real = 0.9) +{ +//pre-process the input data by adding standard column names and dividing to datasets +let processedData = ( + T + | extend entity = column_ifexists(entityColumnName, '') + | extend resource = column_ifexists(resourceColumnName, '') + | extend scope = column_ifexists(scopeColumnName, '') + | extend sliceTime = todatetime(column_ifexists(timeColumnName, '')) + | where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime) + | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet' + , sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet' + , 'other') + | where dataSet in ('trainSet', 'detectSet') +); +// Create all possible pairs (entity, resource) with the same scope +let entities = ( + processedData + | where dataSet == 'trainSet' + | summarize by entity, scope + | extend temp = 1 +); +let resources = ( + processedData + | where dataSet == 'trainSet' + | summarize by resource, scope + | extend temp = 1 +); +let potentialAccessTrainData = ( + entities + | join kind=inner resources on temp + | distinct entity, resource, scope +); +let accessTrainData = ( + potentialAccessTrainData + | join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope + | extend usedOperation = iff(isempty(resource1), 0, 1) + | distinct entity, resource, scope, usedOperation +); +// Aggregate interaction scores per item into a list to prepare for similarity calculations +// Add a temporary key for self-joining later in the process +let ItemUserInteractions = ( + accessTrainData + | summarize interactList = make_list(usedOperation) by resource, scope + | extend tempKey=1 +); +// Compute item-to-item similarity using cosine similarity +let ItemSimilarities = ( + ItemUserInteractions + | join kind=inner (ItemUserInteractions) on tempKey + | where scope == scope1 + | extend similarity = series_cosine_similarity(interactList, interactList1) + | extend similarity = iff(isnan(similarity), 0.0, similarity) + | project resource, resource1, scope, similarity +); +// Predict user-item interactions based on item similarities +let Predictions = ( + accessTrainData + | join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1 + | project entity, resource=resource2, usedOperation, similarity + | summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource + | extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore) + | extend accessAnomalyScore = 1 - accessAnomalyScore + | extend accessAnomalyScore = round(accessAnomalyScore, 4) + | join kind=inner accessTrainData on entity, resource + | project entity, resource, scope, usedOperation, accessAnomalyScore + | extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation)) + | order by entity asc, resource +); +let resultsData = ( + processedData + | where dataSet == "detectSet" + | join kind=leftouter Predictions on entity, resource, scope + | extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0) + | project-away 
sliceTime, entity1, resource1, scope1, usedOperation +); +resultsData +}; +// synthetic data generation +let detectPeriodStart = datetime(2022-04-30 05:00); +let trainPeriodStart = datetime(2022-03-01 05:00); +let names = pack_array("Admin", "Dev1", "Dev2", "IT-support"); +let countNames = array_length(names); +let devices = toscalar(range device_id from 1 to 51 step 1 | extend device = strcat("device", tostring(device_id)) | summarize devices_array = make_list(device)); +let countDevices = array_length(devices)-1; +let testData = range t from 0 to 24*60 step 1 + | extend timeSlice = trainPeriodStart + 1h * t + | extend userName = tostring(names[toint(rand(countNames))]) + | extend deviceId = tostring(devices[toint(rand(countDevices))]) + | extend accountName = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment') + | extend userName = iff(timeSlice == trainPeriodStart, 'H4ck3r', userName) + | extend deviceId = iff(timeSlice == trainPeriodStart, 'device1', deviceId) + | extend accountName = iff(timeSlice == trainPeriodStart, 'prodEnvironment', accountName) + | extend userName = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName) + | extend deviceId = iff(timeSlice == detectPeriodStart, 'device50', deviceId) + | extend accountName = iff(timeSlice == detectPeriodStart, 'prodEnvironment', accountName) + | sort by timeSlice desc +; +testData +| invoke detect_anomalous_access_cf_fl(entityColumnName = 'userName' + , resourceColumnName = 'deviceId' + , scopeColumnName = 'accountName' + , timeColumnName = 'timeSlice' + , startTraining = trainPeriodStart + , startDetection = detectPeriodStart + , endDetection = detectPeriodStart + ) +``` + +### [Stored](#tab/stored) + +> [!IMPORTANT] +> For this example to run successfully, you must first run the [Function definition](#function-definition) code to store the function. 
+ +```kusto +// synthetic data generation +let detectPeriodStart = datetime(2022-04-30 05:00); +let trainPeriodStart = datetime(2022-03-01 05:00); +let names = pack_array("Admin", "Dev1", "Dev2", "IT-support"); +let countNames = array_length(names); +let devices = toscalar(range device_id from 1 to 51 step 1 | extend device = strcat("device", tostring(device_id)) | summarize devices_array = make_list(device)); +let countDevices = array_length(devices)-1; +let testData = range t from 0 to 24*60 step 1 + | extend timeSlice = trainPeriodStart + 1h * t + | extend userName = tostring(names[toint(rand(countNames))]) + | extend deviceId = tostring(devices[toint(rand(countDevices))]) + | extend accountName = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment') + | extend userName = iff(timeSlice == trainPeriodStart, 'H4ck3r', userName) + | extend deviceId = iff(timeSlice == trainPeriodStart, 'device1', deviceId) + | extend accountName = iff(timeSlice == trainPeriodStart, 'prodEnvironment', accountName) + | extend userName = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName) + | extend deviceId = iff(timeSlice == detectPeriodStart, 'device50', deviceId) + | extend accountName = iff(timeSlice == detectPeriodStart, 'prodEnvironment', accountName) + | sort by timeSlice desc +; +testData +| invoke detect_anomalous_access_cf_fl(entityColumnName = 'userName' + , resourceColumnName = 'deviceId' + , scopeColumnName = 'accountName' + , timeColumnName = 'timeSlice' + , startTraining = trainPeriodStart + , startDetection = detectPeriodStart + , endDetection = detectPeriodStart + ) +``` + +--- + +**Output** + +| t | timeSlice | userName | deviceId | accountName | entity | resource | scope | dataSet | accessAnomalyScore | isAnomalousAccess | +|------|-----------------------------|----------|----------|-----------------|--------|----------|------------------|--------------|---------------------|--------------------| +| 1440 | 2022-04-30 05:00:00.0000000 | H4ck3r | device50 | prodEnvironment | H4ck3r | device50 | prodEnvironment | detectSet | 0.982 | 1 | + + +The output of running the function shows each anomalous entity-resource access event during the detection period, filtered for cases where the predicted access probability (based on collaborative filtering) was higher than the defined anomaly threshold (by default, 0.9). Additional fields are added for clarity: + +* `dataSet`: current dataset (is always `detectSet`). +* `accessAnomalyScore`: the predicted access anomaly score of this access, based on collaborative filtering modeling. The value is in range [0, 1], higher values signify a higher degree of anomaly. +* `isAnomalousAccess`: binary flag for anomalous accesses + +Running the function with default parameters flags the access attempt by the user 'H4ck3r' to device 'device50' within the 'prodEnvironment' account. The predicted access anomaly score is 0.982, which is very high, indicating that this access is unexpected according to the trained model based on historical patterns. + +In the training period, the collaborative filtering model learned access patterns between users and devices within scopes. Since 'H4ck3r' accessing 'device50' was not observed and considered unlikely in the historical data, it was flagged as anomalous. + +The output table presents these anomalous accesses together with the predicted access score. These fields are useful for further investigation, alerting, or integration with broader detection workflows. 
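For example, downstream alerting logic might keep only the flagged rows and aggregate them per entity and scope. The following minimal sketch reuses the synthetic `testData` and the time boundaries defined in the example above; the operators after the `invoke` call are the only addition.

```kusto
// Sketch: keep only flagged accesses and summarize them per entity and scope for triage.
// Assumes the let statements from the example above (testData, trainPeriodStart, detectPeriodStart)
// and, for the query-defined option, the let statement that defines the function itself.
testData
| invoke detect_anomalous_access_cf_fl(entityColumnName = 'userName'
    , resourceColumnName = 'deviceId'
    , scopeColumnName = 'accountName'
    , timeColumnName = 'timeSlice'
    , startTraining = trainPeriodStart
    , startDetection = detectPeriodStart
    , endDetection = detectPeriodStart)
| where isAnomalousAccess == 1
| summarize anomalousResources = make_set(resource), maxAnomalyScore = max(accessAnomalyScore), accessCount = count() by entity, scope
| order by maxAnomalyScore desc
```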
+ +The suggested usage in a cybersecurity context is to monitor important entities, such as usernames or IPs, accessing important resources like devices, databases, or applications within their corresponding scopes (e.g., account or subscription). diff --git a/data-explorer/kusto/functions-library/functions-library.md b/data-explorer/kusto/functions-library/functions-library.md index 84ad9ac8b5..6bd50d2126 100644 --- a/data-explorer/kusto/functions-library/functions-library.md +++ b/data-explorer/kusto/functions-library/functions-library.md @@ -18,6 +18,7 @@ The user-defined functions code is given in the articles. It can be used within | Function Name | Description | |--|--| +| [detect_anomalous_access_cf_fl()](detect-anomalous-access-cf-fl.md) | Detect anomalous access using collaborative filtering over timestamped data. | | [detect_anomalous_new_entity_fl()](detect-anomalous-new-entity-fl.md) | Detect the appearance of anomalous new entities in timestamped data. | | [detect_anomalous_spike_fl()](detect-anomalous-spike-fl.md) | Detect the appearance of anomalous spikes in numeric variables in timestamped data. | | [graph_blast_radius_fl()](graph-blast-radius-fl.md) | Calculate the Blast Radius (list and score) of source nodes over path or edge data. | diff --git a/data-explorer/kusto/functions-library/toc.yml b/data-explorer/kusto/functions-library/toc.yml index 25ca85ead0..4bbacb77ec 100644 --- a/data-explorer/kusto/functions-library/toc.yml +++ b/data-explorer/kusto/functions-library/toc.yml @@ -17,6 +17,9 @@ items: - name: dbscan_dynamic_fl() displayName: functions library, clustering, DBSCAN href: dbscan-dynamic-fl.md +- name: detect_anomalous_access_cf_fl() + displayName: functions library, security, cybersecurity, anomaly detection, access pattern, collaborative filtering, cyber, detect anomalous access + href: detect-anomalous-access-cf-fl.md - name: detect_anomalous_new_entity_fl() displayName: functions library, security, cybersecurity, anomaly detection, new entity, novelty, cyber, detect anomalous new entity href: detect-anomalous-new-entity-fl.md @@ -190,4 +193,4 @@ items: href: two-sample-t-test-fl.md - name: wilcoxon_test_fl() displayName: functions library, statistical test - href: wilcoxon-test-fl.md \ No newline at end of file + href: wilcoxon-test-fl.md diff --git a/data-explorer/kusto/management/query-acceleration-policy.md b/data-explorer/kusto/management/query-acceleration-policy.md index ebc4287376..a24c5ebb11 100644 --- a/data-explorer/kusto/management/query-acceleration-policy.md +++ b/data-explorer/kusto/management/query-acceleration-policy.md @@ -25,10 +25,9 @@ To enable query acceleration in the Fabric UI, see [Query acceleration over OneL * The number of columns in the external table can't exceed 900. * Delta tables with checkpoint V2 are not supported. -* Query performance over accelerated external delta tables which have partitions may not be optimal during preview. +* Query performance over accelerated external delta tables which have more than 2.5 million data files may not be optimal. * The feature assumes delta tables with static advanced features, for example column mapping doesn't change, partitions don't change, and so on. To change advanced features, first disable the policy, and once the change is made, re-enable the policy. * Schema changes on the delta table must also be followed with the respective `.alter` external delta table schema, which might result in acceleration starting from scratch if there was breaking schema change. 
-* Index-based pruning isn't supported for partitions. * Parquet files larger than 1 GB won't be cached. ::: moniker range="azure-data-explorer" * Query acceleration isn't supported for external tables with impersonation authentication.