5353import org .elasticsearch .common .util .concurrent .EsExecutors ;
5454import org .elasticsearch .common .xcontent .XContentHelper ;
5555import org .elasticsearch .core .FixForMultiProject ;
56+ import org .elasticsearch .core .Predicates ;
5657import org .elasticsearch .core .Strings ;
5758import org .elasticsearch .core .TimeValue ;
5859import org .elasticsearch .core .Tuple ;
99100import java .util .function .Function ;
100101import java .util .function .IntConsumer ;
101102import java .util .function .LongSupplier ;
103+ import java .util .function .Predicate ;
102104import java .util .stream .Collectors ;
103105
104106import static org .elasticsearch .cluster .metadata .Metadata .DEFAULT_PROJECT_ID ;
@@ -1773,6 +1775,62 @@ public void testExecuteFailureRedirection() throws Exception {
17731775 verify (completionHandler , times (1 )).accept (Thread .currentThread (), null );
17741776 }
17751777
1778+ public void testFailureRedirectionWithoutNodeFeatureEnabled () throws Exception {
1779+ final CompoundProcessor processor = mockCompoundProcessor ();
1780+ IngestService ingestService = createWithProcessors (
1781+ Map .of (
1782+ "mock" ,
1783+ (factories , tag , description , config , projectId ) -> processor ,
1784+ "set" ,
1785+ (factories , tag , description , config , projectId ) -> new FakeProcessor ("set" , "" , "" , (ingestDocument ) -> fail ())
1786+ ),
1787+ Predicates .never ()
1788+ );
1789+ PutPipelineRequest putRequest1 = putJsonPipelineRequest ("_id1" , "{\" processors\" : [{\" mock\" : {}}]}" );
1790+ // given that set -> fail() above, it's a failure if a document executes against this pipeline
1791+ PutPipelineRequest putRequest2 = putJsonPipelineRequest ("_id2" , "{\" processors\" : [{\" set\" : {}}]}" );
1792+ var projectId = randomProjectIdOrDefault ();
1793+ ClusterState clusterState = ClusterState .builder (ClusterName .DEFAULT )
1794+ .putProjectMetadata (ProjectMetadata .builder (projectId ).build ())
1795+ .build ();
1796+ ClusterState previousClusterState = clusterState ;
1797+ clusterState = executePut (projectId , putRequest1 , clusterState );
1798+ clusterState = executePut (projectId , putRequest2 , clusterState );
1799+ ingestService .applyClusterState (new ClusterChangedEvent ("" , clusterState , previousClusterState ));
1800+ final IndexRequest indexRequest = new IndexRequest ("_index" ).id ("_id" )
1801+ .source (Map .of ())
1802+ .setPipeline ("_id1" )
1803+ .setFinalPipeline ("_id2" );
1804+ doThrow (new RuntimeException ()).when (processor )
1805+ .execute (eqIndexTypeId (indexRequest .version (), indexRequest .versionType (), Map .of ()), any ());
1806+ final Function <String , Boolean > redirectCheck = (idx ) -> indexRequest .index ().equals (idx );
1807+ @ SuppressWarnings ("unchecked" )
1808+ final TriConsumer <Integer , String , Exception > redirectHandler = mock (TriConsumer .class );
1809+ @ SuppressWarnings ("unchecked" )
1810+ final TriConsumer <Integer , Exception , IndexDocFailureStoreStatus > failureHandler = mock (TriConsumer .class );
1811+ @ SuppressWarnings ("unchecked" )
1812+ final BiConsumer <Thread , Exception > completionHandler = mock (BiConsumer .class );
1813+ ingestService .executeBulkRequest (
1814+ projectId ,
1815+ 1 ,
1816+ List .of (indexRequest ),
1817+ indexReq -> {},
1818+ redirectCheck ,
1819+ redirectHandler ,
1820+ failureHandler ,
1821+ completionHandler ,
1822+ EsExecutors .DIRECT_EXECUTOR_SERVICE
1823+ );
1824+ verify (processor ).execute (eqIndexTypeId (indexRequest .version (), indexRequest .versionType (), Map .of ()), any ());
1825+ verifyNoInteractions (redirectHandler );
1826+ verify (failureHandler , times (1 )).apply (
1827+ eq (0 ),
1828+ any (RuntimeException .class ),
1829+ eq (IndexDocFailureStoreStatus .NOT_APPLICABLE_OR_UNKNOWN )
1830+ );
1831+ verify (completionHandler , times (1 )).accept (Thread .currentThread (), null );
1832+ }
1833+
17761834 public void testExecuteFailureStatusOnFailureWithoutRedirection () throws Exception {
17771835 final CompoundProcessor processor = mockCompoundProcessor ();
17781836 IngestService ingestService = createWithProcessors (
@@ -3276,6 +3334,10 @@ private static IngestService createWithProcessors() {
32763334 }
32773335
32783336 private static IngestService createWithProcessors (Map <String , Processor .Factory > processors ) {
3337+ return createWithProcessors (processors , DataStream .DATA_STREAM_FAILURE_STORE_FEATURE ::equals );
3338+ }
3339+
3340+ private static IngestService createWithProcessors (Map <String , Processor .Factory > processors , Predicate <NodeFeature > featureTest ) {
32793341 Client client = mock (Client .class );
32803342 ThreadPool threadPool = mock (ThreadPool .class );
32813343 when (threadPool .generic ()).thenReturn (EsExecutors .DIRECT_EXECUTOR_SERVICE );
@@ -3299,7 +3361,7 @@ public Map<String, Processor.Factory> getProcessors(final Processor.Parameters p
32993361 new FeatureService (List .of ()) {
33003362 @ Override
33013363 public boolean clusterHasFeature (ClusterState state , NodeFeature feature ) {
3302- return DataStream . DATA_STREAM_FAILURE_STORE_FEATURE . equals (feature );
3364+ return featureTest . test (feature );
33033365 }
33043366 }
33053367 );
0 commit comments