77
88package org .elasticsearch .xpack .transform .integration ;
99
10- import org .apache .lucene .tests .util .LuceneTestCase .AwaitsFix ;
1110import org .elasticsearch .client .Request ;
1211import org .elasticsearch .client .RequestOptions ;
1312import org .elasticsearch .common .Strings ;
1716import org .junit .After ;
1817import org .junit .Before ;
1918
19+ import java .io .IOException ;
20+ import java .time .Instant ;
2021import java .util .ArrayList ;
2122import java .util .Collections ;
2223import java .util .List ;
24+ import java .util .Locale ;
2325import java .util .Map ;
2426import java .util .concurrent .TimeUnit ;
2527
2628import static org .hamcrest .Matchers .equalTo ;
2729import static org .hamcrest .Matchers .is ;
2830
29- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/104238" )
3031public class TransformChainIT extends TransformRestTestCase {
3132
32- private static final String DEST_INDEX_TEMPLATE = """
33- {
34- "index_patterns": [ "my-transform-*-dest" ],
35- "mappings": {
36- "properties": {
37- "timestamp": {
38- "type": "date"
39- },
40- "user_id": {
41- "type": "keyword"
42- },
43- "stars": {
44- "type": "integer"
45- }
46- }
47- }
48- }""" ;
49-
33+ private static final String SET_INGEST_TIME_PIPELINE = "set_ingest_time" ;
5034 private static final String TRANSFORM_CONFIG_TEMPLATE = """
5135 {
5236 "source": {
5337 "index": "%s"
5438 },
5539 "dest": {
56- "index": "%s"
40+ "index": "%s",
41+ "pipeline": "%s"
5742 },
5843 "sync": {
5944 "time": {
60- "field": "timestamp"
45+ "field": "event.ingested",
46+ "delay": "10s"
6147 }
6248 },
6349 "frequency": "%s",
@@ -85,15 +71,67 @@ public class TransformChainIT extends TransformRestTestCase {
8571 },
8672 "settings": {
8773 "unattended": true,
88- "deduce_mappings": %s
74+ "deduce_mappings": %s,
75+ "use_point_in_time": %s
8976 }
9077 }""" ;
9178
9279 private TestThreadPool threadPool ;
9380
9481 @ Before
95- public void createThreadPool () {
82+ public void setupTransformTests () throws IOException {
9683 threadPool = new TestThreadPool (getTestName ());
84+
85+ // Create destination index template. It will be used by all the transforms in this test.
86+ Request createIndexTemplateRequest = new Request ("PUT" , "_template/test_dest_index_template" );
87+ createIndexTemplateRequest .setJsonEntity ("""
88+ {
89+ "index_patterns": [ "my-transform-*-dest" ],
90+ "mappings": {
91+ "properties": {
92+ "timestamp": {
93+ "type": "date"
94+ },
95+ "user_id": {
96+ "type": "keyword"
97+ },
98+ "stars": {
99+ "type": "integer"
100+ }
101+ }
102+ }
103+ }""" );
104+ createIndexTemplateRequest .setOptions (expectWarnings (RestPutIndexTemplateAction .DEPRECATION_WARNING ));
105+ assertAcknowledged (client ().performRequest (createIndexTemplateRequest ));
106+
107+ // Create ingest pipeline which sets event.ingested field. This is needed for transform's synchronisation to work correctly.
108+ Request putIngestPipelineRequest = new Request ("PUT" , "_ingest/pipeline/" + SET_INGEST_TIME_PIPELINE );
109+ putIngestPipelineRequest .setJsonEntity ("""
110+ {
111+ "description": "Set ingest timestamp.",
112+ "processors": [
113+ {
114+ "set": {
115+ "field": "event.ingested",
116+ "value": "{{{_ingest.timestamp}}}"
117+ }
118+ }
119+ ]
120+ }""" );
121+ assertOK (client ().performRequest (putIngestPipelineRequest ));
122+
123+ // Set logging levels for debugging.
124+ Request settingsRequest = new Request ("PUT" , "/_cluster/settings" );
125+ settingsRequest .setJsonEntity ("""
126+ {
127+ "persistent": {
128+ "logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer": "debug",
129+ "logger.org.elasticsearch.xpack.transform": "debug",
130+ "logger.org.elasticsearch.xpack.transform.notifications": "debug",
131+ "logger.org.elasticsearch.xpack.transform.transforms": "debug"
132+ }
133+ }""" );
134+ assertOK (client ().performRequest (settingsRequest ));
97135 }
98136
99137 @ After
@@ -103,26 +141,36 @@ public void shutdownThreadPool() {
103141 }
104142 }
105143
106- public void testChainedTransforms () throws Exception {
107- String reviewsIndexName = "reviews" ;
108- final int numDocs = 100 ;
109- createReviewsIndex (reviewsIndexName , numDocs , 100 , TransformIT ::getUserIdForRow , TransformIT ::getDateStringForRow );
144+ public void testTwoChainedTransforms () throws Exception {
145+ testChainedTransforms (2 );
146+ }
110147
111- // Create destination index template. It will be used by all the transforms in this test.
112- Request createIndexTemplateRequest = new Request ("PUT" , "_template/test_dest_index_template" );
113- createIndexTemplateRequest .setJsonEntity (DEST_INDEX_TEMPLATE );
114- createIndexTemplateRequest .setOptions (expectWarnings (RestPutIndexTemplateAction .DEPRECATION_WARNING ));
115- assertAcknowledged (client ().performRequest (createIndexTemplateRequest ));
148+ public void testThreeChainedTransforms () throws Exception {
149+ testChainedTransforms (3 );
150+ }
116151
117- final int numberOfTransforms = 3 ;
118- List <String > transformIds = new ArrayList <>(numberOfTransforms );
152+ private void testChainedTransforms (final int numTransforms ) throws Exception {
153+ final String reviewsIndexName = "reviews" ;
154+ final int numDocs = 100 ;
155+ final Instant now = Instant .now ();
156+ createReviewsIndex (
157+ reviewsIndexName ,
158+ numDocs ,
159+ 100 ,
160+ TransformIT ::getUserIdForRow ,
161+ row -> Instant .ofEpochMilli (now .toEpochMilli () - 1000 * numDocs + 1000 * row ).toString (),
162+ SET_INGEST_TIME_PIPELINE
163+ );
164+
165+ List <String > transformIds = new ArrayList <>(numTransforms );
119166 // Create the chain of transforms. Previous transform's destination index becomes next transform's source index.
120- for (int i = 0 ; i < numberOfTransforms ; ++i ) {
121- String transformId = "my-transform-" + i ;
167+ String transformIdPrefix = "my-transform-" + randomAlphaOfLength (4 ).toLowerCase (Locale .ROOT ) + "-" + numTransforms + "-" ;
168+ for (int i = 0 ; i < numTransforms ; ++i ) {
169+ String transformId = transformIdPrefix + i ;
122170 transformIds .add (transformId );
123171 // Set up the transform so that its source index is the destination index of the previous transform in the chain.
124172 // The number of documents is expected to be the same in all the indices.
125- String sourceIndex = i == 0 ? reviewsIndexName : "my-transform-" + (i - 1 ) + "-dest" ;
173+ String sourceIndex = i == 0 ? reviewsIndexName : transformIds . get (i - 1 ) + "-dest" ;
126174 String destIndex = transformId + "-dest" ;
127175 assertFalse (indexExists (destIndex ));
128176
@@ -137,12 +185,11 @@ public void testChainedTransforms() throws Exception {
137185 startTransform (transformId , RequestOptions .DEFAULT );
138186 }
139187
140- // Wait for the transforms to finish processing. Since the transforms are continuous, we cannot wait for them to be STOPPED.
141- // Instead, we wait for the expected number of processed documents.
188+ // Give the transforms some time to finish processing. Since the transforms are continuous, we cannot wait for them to be STOPPED.
142189 assertBusy (() -> {
190+ // Verify that each transform processed an expected number of documents.
143191 for (String transformId : transformIds ) {
144192 Map <?, ?> stats = getTransformStats (transformId );
145- // Verify that all the documents got processed.
146193 assertThat (
147194 "Stats were: " + stats ,
148195 XContentMapValues .extractValue (stats , "stats" , "documents_processed" ),
@@ -162,6 +209,15 @@ public void testChainedTransforms() throws Exception {
162209 }
163210
164211 private static String createTransformConfig (String sourceIndex , String destIndex ) {
165- return Strings .format (TRANSFORM_CONFIG_TEMPLATE , sourceIndex , destIndex , "1s" , "1s" , randomBoolean ());
212+ return Strings .format (
213+ TRANSFORM_CONFIG_TEMPLATE ,
214+ sourceIndex ,
215+ destIndex ,
216+ SET_INGEST_TIME_PIPELINE ,
217+ "1s" ,
218+ "1s" ,
219+ randomBoolean (),
220+ randomBoolean ()
221+ );
166222 }
167223}
0 commit comments