@@ -47,7 +47,8 @@ public void testBulkSourceReleaseWhenIngestReplacesSource() throws Exception {
4747 String pipelineId = "pipeline_id" ;
4848 putPipeline (pipelineId );
4949
50- IncrementalBulkService incrementalBulkService = internalCluster ().getInstance (IncrementalBulkService .class );
50+ // Get the data node to ensure the ingest pipeline can be performed on this node
51+ IncrementalBulkService incrementalBulkService = internalCluster ().getDataNodeInstance (IncrementalBulkService .class );
5152
5253 ReleasableBytesReference originalBytes = new ReleasableBytesReference (new BytesArray ("{\" field\" : \" value\" }" ), () -> {});
5354
@@ -80,7 +81,8 @@ public void testBytesReferencedByTwoSourcesNotReleasedIfOnlyOneIngestPipeline()
8081 String pipelineId = "pipeline_id" ;
8182 putPipeline (pipelineId );
8283
83- IncrementalBulkService incrementalBulkService = internalCluster ().getInstance (IncrementalBulkService .class );
84+ // Get the data node to ensure the ingest pipeline can be performed on this node
85+ IncrementalBulkService incrementalBulkService = internalCluster ().getDataNodeInstance (IncrementalBulkService .class );
8486
8587 ReleasableBytesReference originalBytes = new ReleasableBytesReference (
8688 new BytesArray ("{\" field\" : \" value1\" }{\" field\" : \" value2\" }" ),
@@ -130,7 +132,8 @@ public void testSomeReferencesCanBeReleasedWhileOthersRetained() throws Exceptio
130132 String pipelineId = "pipeline_id" ;
131133 putPipeline (pipelineId );
132134
133- IncrementalBulkService incrementalBulkService = internalCluster ().getInstance (IncrementalBulkService .class );
135+ // Get the data node to ensure the ingest pipeline can be performed on this node
136+ IncrementalBulkService incrementalBulkService = internalCluster ().getDataNodeInstance (IncrementalBulkService .class );
134137
135138 ReleasableBytesReference releasedBytes = new ReleasableBytesReference (new BytesArray ("{\" field\" : \" value1\" }" ), () -> {});
136139 ReleasableBytesReference retainedBytes = new ReleasableBytesReference (
@@ -147,15 +150,14 @@ public void testSomeReferencesCanBeReleasedWhileOthersRetained() throws Exceptio
147150
148151 IndexRequest indexRequest2 = new IndexRequest ();
149152 indexRequest2 .index (index );
150- indexRequest2 .sourceContext ().source (retainedBytes .retainedSlice (0 , splitPoint ), XContentType .JSON );
153+ indexRequest2 .sourceContext ().source (retainedBytes .slice (0 , splitPoint ), XContentType .JSON );
151154 indexRequest2 .setPipeline (pipelineId );
152155
153156 IndexRequest indexRequestNoIngest = new IndexRequest ();
154157 indexRequestNoIngest .index (index );
155158 indexRequestNoIngest .sourceContext ()
156159 .source (retainedBytes .retainedSlice (splitPoint , retainedBytes .length () - splitPoint ), XContentType .JSON );
157160
158- retainedBytes .decRef ();
159161 assertTrue (retainedBytes .hasReferences ());
160162
161163 CountDownLatch blockLatch = new CountDownLatch (1 );
0 commit comments