diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 904c6c6cd..646e92cdc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -11,6 +11,7 @@ import org.opensearch.OpenSearchSecurityException import org.opensearch.action.DocWriteRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.bulk.BulkItemResponse import org.opensearch.action.bulk.BulkRequest @@ -26,6 +27,7 @@ import org.opensearch.indexmanagement.transform.settings.TransformSettings import org.opensearch.indexmanagement.transform.util.TransformContext import org.opensearch.core.rest.RestStatus import org.opensearch.transport.RemoteTransportException +import org.opensearch.action.support.master.AcknowledgedResponse @Suppress("ComplexMethod") class TransformIndexer( @@ -63,6 +65,20 @@ class TransformIndexer( throw TransformIndexException("Failed to create the target index") } } + if (clusterService.state().metadata.hasAlias(targetIndex)) { + // return error if no write index with the alias + val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex + if (writeIndexMetadata == null) { + throw TransformIndexException("target_index [$targetIndex] is an alias but doesn't have write index") + } + val putMappingReq = PutMappingRequest(writeIndexMetadata.index?.name).source(targetFieldMappings) + val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { + putMapping(putMappingReq) + } + if (!mapResp.isAcknowledged) { + logger.error("Target index mapping request failed") + } + } } @Suppress("ThrowsCount", "RethrowCaughtException") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index a948899ff..4c5c789ee 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -22,6 +22,8 @@ import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestRequest import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS +import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS import org.opensearch.script.Script import org.opensearch.script.ScriptType import org.opensearch.search.aggregations.AggregationBuilders @@ -1430,6 +1432,99 @@ class TransformRunnerIT : TransformRestTestCase() { disableTransform(transform.id) } + @Suppress("UNCHECKED_CAST") + fun `test transform with wildcard, aliased target index`() { + val sourceIndex = "source-index" + validateSourceIndex(sourceIndex) + + // create alias + val indexAlias = "wildcard_index_alias" + val resolvedTargetIndex = "resolved_target_index" + val builtSettings = Settings.builder().let { + it.put(INDEX_NUMBER_OF_REPLICAS, 1) + it.put(INDEX_NUMBER_OF_SHARDS, 1) + it + }.build() + val aliases = "\"$indexAlias\": { \"is_write_index\": true }" + createIndex(resolvedTargetIndex, builtSettings, null, aliases) + + refreshAllIndices() + val pickupDateTime = "tpep_pickup_datetime" + val fareAmount = "fare_amount" + + val transform = Transform( + id = "id_18", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform", + metadataId = null, + sourceIndex = sourceIndex, + targetIndex = indexAlias, + roles = emptyList(), + pageSize = 100, + groups = listOf( + Terms(sourceField = pickupDateTime, targetField = pickupDateTime) + ), + aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount)) + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + waitFor { assertTrue("Target transform index was not created", indexExists(resolvedTargetIndex)) } + + waitFor { + val job = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", job.metadataId) + val transformMetadata = getTransformMetadata(job.metadataId!!) + assertEquals("Transform had not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + // TODO - make sure we've written to the correct index! + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIndex/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$indexAlias/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + // how to check if the results are correctly written to the write index of the alias? + val sourcePickupDate = (((sourceIndexParserMap[sourceIndex]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[indexAlias]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + + assertEquals(sourcePickupDate, targetPickupDate) + + val pickupDateTimeTerm = "pickupDateTerm14" + + val request = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTime", "order": { "_key": "asc" } + }, + "aggs": { + "avgFareAmount": { "avg": { "field": "$fareAmount" } } } + } + } + } + """ + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIndex/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$indexAlias/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + // Verify the values of keys and metrics in all buckets + for (i in rawAggBuckets.indices) { + assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) + } + } private fun getStrictMappings(): String { return """ "dynamic": "strict",