Skip to content

Commit 3b392c4

Browse files
authored
Adds support for leveraging user custom attributes in Alerting monitors (#1917)
* update code to access custom attributes, not custom attribute names, when managing monitors Signed-off-by: Mark Boyd <[email protected]> * fix reference to custom attributes property Signed-off-by: Mark Boyd <[email protected]> * try to fix type errors Signed-off-by: Mark Boyd <[email protected]> * make sure to pass along user to InjectorContextElement when running query level and bucket level monitors Signed-off-by: Mark Boyd <[email protected]> * fix kotlin compilation errors Signed-off-by: Mark Boyd <[email protected]> * update randomAnomalyResult() to properly stringify custom user attributes when inserting them into test documents Signed-off-by: Mark Boyd <[email protected]> * add AlertingRestTestCase.createUserWithAttributes helper for creating test users with attributes Signed-off-by: Mark Boyd <[email protected]> * add new test to validate query-level monitors work correctly with users having custom attributes and roles using DLS with attribute substitution Signed-off-by: Mark Boyd <[email protected]> * add security plugin setting to enable user attribute serialization for integration tests Signed-off-by: Mark Boyd <[email protected]> * update test user setup for integration test of query level monitor with dynamic DLS Signed-off-by: Mark Boyd <[email protected]> * fix custom_attributes -> attributes when creating user using API Signed-off-by: Mark Boyd <[email protected]> * update alert mappping definition to support custom_attributes for a user Signed-off-by: Mark Boyd <[email protected]> * update more mappings to include custom_attributes Signed-off-by: Mark Boyd <[email protected]> * remove newlines from the end of the JSON mapping files Signed-off-by: Mark Boyd <[email protected]> * fix expected schema versions in test Signed-off-by: Mark Boyd <[email protected]> * update expected schema version of scheduled jobs mapping for another integration test Signed-off-by: Mark Boyd <[email protected]> * remove custom_attributes property from mappings Signed-off-by: Mark Boyd <[email protected]> * revert updated schema versions for mappings Signed-off-by: Mark Boyd <[email protected]> * update randomAnomalyResult() to use custom_attribute_names property in JSON Signed-off-by: Mark Boyd <[email protected]> * revert test assertions for updated schema versions Signed-off-by: Mark Boyd <[email protected]> * remove custom_attributes property from scheduled jobs mapping Signed-off-by: Mark Boyd <[email protected]> --------- Signed-off-by: Mark Boyd <[email protected]>
1 parent 7df6d6f commit 3b392c4

File tree

12 files changed

+111
-30
lines changed

12 files changed

+111
-30
lines changed

alerting/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ configurations.all {
107107
force "commons-codec:commons-codec:1.13"
108108

109109
force "org.slf4j:slf4j-api:${versions.slf4j}" //Needed for http5
110-
110+
111111
// This is required because kotlin-coroutines-core 1.1.1 still requires kotlin stdlib 1.3.20 and we're using a higher kotlin version
112112
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
113113
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
@@ -286,6 +286,7 @@ testClusters.integTest.nodes.each { node ->
286286
node.setting("plugins.security.check_snapshot_restore_write_privileges", "true")
287287
node.setting("plugins.security.restapi.roles_enabled", "[\"all_access\", \"security_rest_api_access\"]")
288288
node.setting("plugins.security.system_indices.enabled", "true")
289+
node.setting("plugins.security.user_attribute_serialization.enabled", "true")
289290
}
290291
}
291292

alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,15 @@ object BucketLevelMonitorRunner : MonitorRunner() {
124124
// If a setting is imposed that limits buckets that can be processed for Bucket-Level Monitors, we'd need to iterate over
125125
// the buckets until we hit that threshold. In that case, we'd want to exit the execution without creating any alerts since the
126126
// buckets we iterate over before hitting the limit is not deterministic. Is there a better way to fail faster in this case?
127-
withClosableContext(InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles)) {
127+
withClosableContext(
128+
InjectorContextElement(
129+
monitor.id,
130+
monitorCtx.settings!!,
131+
monitorCtx.threadPool!!.threadContext,
132+
roles,
133+
monitor.user
134+
)
135+
) {
128136
// Storing the first page of results in the case of pagination input results to prevent empty results
129137
// in the final output of monitorResult which occurs when all pages have been exhausted.
130138
// If it's favorable to return the last page, will need to check how to accomplish that with multiple aggregation paths

alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,15 @@ object QueryLevelMonitorRunner : MonitorRunner() {
5555
return monitorResult.copy(error = e)
5656
}
5757
if (!isADMonitor(monitor)) {
58-
withClosableContext(InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles)) {
58+
withClosableContext(
59+
InjectorContextElement(
60+
monitor.id,
61+
monitorCtx.settings!!,
62+
monitorCtx.threadPool!!.threadContext,
63+
roles,
64+
monitor.user
65+
)
66+
) {
5967
monitorResult = monitorResult.copy(
6068
inputResults = monitorCtx.inputService!!.collectInputResults(monitor, periodStart, periodEnd, null, workflowRunContext)
6169
)

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,11 @@ class TransportIndexMonitorAction @Inject constructor(
267267
if (user == null) {
268268
// Security is disabled, add empty user to Monitor. user is null for older versions.
269269
request.monitor = request.monitor
270-
.copy(user = User("", listOf(), listOf(), listOf()))
270+
.copy(user = User("", listOf(), listOf(), mapOf()))
271271
start()
272272
} else {
273273
request.monitor = request.monitor
274-
.copy(user = User(user.name, user.backendRoles, user.roles, user.customAttNames))
274+
.copy(user = User(user.name, user.backendRoles, user.roles, user.customAttributes))
275275
start()
276276
}
277277
}
@@ -280,12 +280,12 @@ class TransportIndexMonitorAction @Inject constructor(
280280
if (user == null) {
281281
// Security is disabled, add empty user to Monitor. user is null for older versions.
282282
request.monitor = request.monitor
283-
.copy(user = User("", listOf(), listOf(), listOf()))
283+
.copy(user = User("", listOf(), listOf(), mapOf()))
284284
start()
285285
} else {
286286
try {
287287
request.monitor = request.monitor
288-
.copy(user = User(user.name, user.backendRoles, user.roles, user.customAttNames))
288+
.copy(user = User(user.name, user.backendRoles, user.roles, user.customAttributes))
289289
val searchSourceBuilder = SearchSourceBuilder().size(0)
290290
if (getRoleFilterEnabled(clusterService, settings, "plugins.anomaly_detection.filter_by_backend_roles")) {
291291
addUserBackendRolesFilter(user, searchSourceBuilder)
@@ -491,7 +491,7 @@ class TransportIndexMonitorAction @Inject constructor(
491491
else request.rbacRoles
492492

493493
request.monitor = request.monitor.copy(
494-
user = User(user.name, rbacRoles.orEmpty().toList(), user.roles, user.customAttNames)
494+
user = User(user.name, rbacRoles.orEmpty().toList(), user.roles, user.customAttributes)
495495
)
496496
log.debug("Created monitor's backend roles: $rbacRoles")
497497
}
@@ -649,20 +649,20 @@ class TransportIndexMonitorAction @Inject constructor(
649649
if (request.rbacRoles != null) {
650650
if (isAdmin(user)) {
651651
request.monitor = request.monitor.copy(
652-
user = User(user.name, request.rbacRoles, user.roles, user.customAttNames)
652+
user = User(user.name, request.rbacRoles, user.roles, user.customAttributes)
653653
)
654654
} else {
655655
// rolesToRemove: these are the backend roles to remove from the monitor
656656
val rolesToRemove = user.backendRoles - request.rbacRoles.orEmpty()
657657
// remove the monitor's roles with rolesToRemove and add any roles passed into the request.rbacRoles
658658
val updatedRbac = currentMonitor.user?.backendRoles.orEmpty() - rolesToRemove + request.rbacRoles.orEmpty()
659659
request.monitor = request.monitor.copy(
660-
user = User(user.name, updatedRbac, user.roles, user.customAttNames)
660+
user = User(user.name, updatedRbac, user.roles, user.customAttributes)
661661
)
662662
}
663663
} else {
664664
request.monitor = request.monitor
665-
.copy(user = User(user.name, currentMonitor.user!!.backendRoles, user.roles, user.customAttNames))
665+
.copy(user = User(user.name, currentMonitor.user!!.backendRoles, user.roles, user.customAttributes))
666666
}
667667
log.debug("Update monitor backend roles to: ${request.monitor.user?.backendRoles}")
668668
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,11 @@ class TransportIndexWorkflowAction @Inject constructor(
223223
if (user == null) {
224224
// Security is disabled, add empty user to Workflow. user is null for older versions.
225225
request.workflow = request.workflow
226-
.copy(user = User("", listOf(), listOf(), listOf()))
226+
.copy(user = User("", listOf(), listOf(), mapOf()))
227227
start()
228228
} else {
229229
request.workflow = request.workflow
230-
.copy(user = User(user.name, user.backendRoles, user.roles, user.customAttNames))
230+
.copy(user = User(user.name, user.backendRoles, user.roles, user.customAttributes))
231231
start()
232232
}
233233
}
@@ -346,7 +346,7 @@ class TransportIndexWorkflowAction @Inject constructor(
346346
else request.rbacRoles
347347

348348
request.workflow = request.workflow.copy(
349-
user = User(user.name, rbacRoles.orEmpty().toList(), user.roles, user.customAttNames)
349+
user = User(user.name, rbacRoles.orEmpty().toList(), user.roles, user.customAttributes)
350350
)
351351
log.debug("Created workflow's backend roles: $rbacRoles")
352352
}
@@ -484,7 +484,7 @@ class TransportIndexWorkflowAction @Inject constructor(
484484
if (request.rbacRoles != null) {
485485
if (isAdmin(user)) {
486486
request.workflow = request.workflow.copy(
487-
user = User(user.name, request.rbacRoles, user.roles, user.customAttNames)
487+
user = User(user.name, request.rbacRoles, user.roles, user.customAttributes)
488488
)
489489
} else {
490490
// rolesToRemove: these are the backend roles to remove from the monitor
@@ -493,7 +493,7 @@ class TransportIndexWorkflowAction @Inject constructor(
493493
val updatedRbac =
494494
currentWorkflow.user?.backendRoles.orEmpty() - rolesToRemove + request.rbacRoles.orEmpty()
495495
request.workflow = request.workflow.copy(
496-
user = User(user.name, updatedRbac, user.roles, user.customAttNames)
496+
user = User(user.name, updatedRbac, user.roles, user.customAttributes)
497497
)
498498
}
499499
} else {
@@ -503,7 +503,7 @@ class TransportIndexWorkflowAction @Inject constructor(
503503
user.name,
504504
currentWorkflow.user!!.backendRoles,
505505
user.roles,
506-
user.customAttNames
506+
user.customAttributes
507507
)
508508
)
509509
}

alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ fun maxAnomalyGradeSearchInput(
472472

473473
fun adMonitorTrigger(): QueryLevelTrigger {
474474
val triggerScript = """
475-
return ctx.results[0].aggregations.max_anomaly_grade.value != null &&
475+
return ctx.results[0].aggregations.max_anomaly_grade.value != null &&
476476
ctx.results[0].aggregations.max_anomaly_grade.value > 0.7
477477
""".trimIndent()
478478
return randomQueryLevelTrigger(condition = Script(triggerScript))
@@ -503,6 +503,6 @@ fun randomADMonitor(
503503
fun randomADUser(backendRole: String = OpenSearchRestTestCase.randomAlphaOfLength(10)): User {
504504
return User(
505505
OpenSearchRestTestCase.randomAlphaOfLength(10), listOf(backendRole),
506-
listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), ALL_ACCESS_ROLE), listOf("test_attr=test")
506+
listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), ALL_ACCESS_ROLE), mapOf("test_attr" to "test")
507507
)
508508
}

alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,13 +1504,20 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
15041504
}
15051505

15061506
fun createUser(name: String, backendRoles: Array<String>) {
1507+
this.createUserWithAttributes(name, backendRoles, mapOf())
1508+
}
1509+
1510+
fun createUserWithAttributes(name: String, backendRoles: Array<String>, customAttributes: Map<String, String>) {
15071511
val request = Request("PUT", "/_plugins/_security/api/internalusers/$name")
15081512
val broles = backendRoles.joinToString { it -> "\"$it\"" }
1513+
val customAttributesString = customAttributes.entries.joinToString(prefix = "{", separator = ", ", postfix = "}") {
1514+
"\"${it.key}\": \"${it.value}\""
1515+
}
15091516
var entity = " {\n" +
15101517
"\"password\": \"$password\",\n" +
15111518
"\"backend_roles\": [$broles],\n" +
1512-
"\"attributes\": {\n" +
1513-
"}} "
1519+
"\"attributes\": $customAttributesString\n" +
1520+
"} "
15141521
request.setJsonEntity(entity)
15151522
client().performRequest(request)
15161523
}

alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,7 +1072,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
10721072
// for old monitor before enable FGAC, the user field is empty
10731073
val monitor = randomADMonitor(
10741074
inputs = listOf(adSearchInput(detectorId)), triggers = listOf(adMonitorTrigger()),
1075-
user = User(user.name, listOf(), user.roles, user.customAttNames)
1075+
user = User(user.name, listOf(), user.roles, user.customAttributes)
10761076
)
10771077
val response = executeMonitor(monitor, params = DRYRUN_MONITOR)
10781078
val output = entityAsMap(response)
@@ -2252,7 +2252,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
22522252
indexDoc(adResultIndex, "3", testResult3)
22532253
val testResult4 = randomAnomalyResult(
22542254
detectorId = detectorId, executionEndTime = testTime,
2255-
user = User(user.name, listOf(), user.roles, user.customAttNames),
2255+
user = User(user.name, listOf(), user.roles, user.customAttributes),
22562256
anomalyGrade = 0.9
22572257
)
22582258
indexDoc(adResultIndex, "4", testResult4)

alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -664,12 +664,12 @@ fun randomUser(): User {
664664
OpenSearchRestTestCase.randomAlphaOfLength(10)
665665
),
666666
listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), ALL_ACCESS_ROLE),
667-
listOf("test_attr=test")
667+
mapOf("test_attr" to "test"),
668668
)
669669
}
670670

671671
fun randomUserEmpty(): User {
672-
return User("", listOf(), listOf(), listOf())
672+
return User("", listOf(), listOf(), mapOf())
673673
}
674674

675675
fun EmailAccount.toJsonString(): String {

alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,7 +1401,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() {
14011401
/*
14021402
TODO: https://github.com/opensearch-project/alerting/issues/300
14031403
*/
1404-
fun `test execute query-level monitor with user having partial index permissions`() {
1404+
fun `test execute query-level monitor with user role using static DLS`() {
14051405
createUser(user, arrayOf(TEST_HR_BACKEND_ROLE))
14061406
createTestIndex(TEST_HR_INDEX)
14071407
createIndexRoleWithDocLevelSecurity(
@@ -1418,7 +1418,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() {
14181418
"""
14191419
{
14201420
"test_field": "a",
1421-
"accessible": true
1421+
"accessible": true
14221422
}
14231423
""".trimIndent()
14241424
)
@@ -1437,7 +1437,64 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() {
14371437
val input = SearchInput(indices = listOf(TEST_HR_INDEX), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))
14381438
val triggerScript = """
14391439
// make sure there is exactly one hit
1440-
return ctx.results[0].hits.hits.size() == 1
1440+
return ctx.results[0].hits.hits.size() == 1
1441+
""".trimIndent()
1442+
1443+
val trigger = randomQueryLevelTrigger(condition = Script(triggerScript)).copy(actions = listOf())
1444+
val monitor = createMonitorWithClient(
1445+
userClient!!,
1446+
randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger))
1447+
)
1448+
1449+
try {
1450+
executeMonitor(monitor.id)
1451+
val alerts = searchAlerts(monitor)
1452+
assertEquals("Incorrect number of alerts", 1, alerts.size)
1453+
} finally {
1454+
deleteRoleAndRoleMapping(TEST_HR_ROLE)
1455+
}
1456+
}
1457+
1458+
fun `test execute query-level monitor with user role using dynamic DLS with parameter substitution`() {
1459+
createUserWithAttributes(user, arrayOf(TEST_HR_BACKEND_ROLE), mapOf("team" to "red"))
1460+
createTestIndex(TEST_HR_INDEX)
1461+
// The ${'$'} is an attempt to bypass the fact that Kotlin treats ${} as string interpolation in a
1462+
// triple quoted string, but we need to preserve the reference as ${attr.internal.team}
1463+
val dlsQuery = """{\"term\": { \"team\": \"${'$'}{attr.internal.team}\"}}"""
1464+
createIndexRoleWithDocLevelSecurity(
1465+
TEST_HR_ROLE,
1466+
TEST_HR_INDEX,
1467+
dlsQuery,
1468+
getClusterPermissionsFromCustomRole(ALERTING_INDEX_MONITOR_ACCESS)
1469+
)
1470+
createUserRolesMapping(TEST_HR_ROLE, arrayOf(user))
1471+
1472+
// Add a doc that is accessible to the user
1473+
indexDoc(
1474+
TEST_HR_INDEX, "1",
1475+
"""
1476+
{
1477+
"test_field": "a",
1478+
"team": "red"
1479+
}
1480+
""".trimIndent()
1481+
)
1482+
1483+
// Add a second doc that is not accessible to the user
1484+
indexDoc(
1485+
TEST_HR_INDEX, "2",
1486+
"""
1487+
{
1488+
"test_field": "b",
1489+
"team": "blue"
1490+
}
1491+
""".trimIndent()
1492+
)
1493+
1494+
val input = SearchInput(indices = listOf(TEST_HR_INDEX), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))
1495+
val triggerScript = """
1496+
// make sure there is exactly one hit
1497+
return ctx.results[0].hits.hits.size() == 1
14411498
""".trimIndent()
14421499

14431500
val trigger = randomQueryLevelTrigger(condition = Script(triggerScript)).copy(actions = listOf())

0 commit comments

Comments
 (0)