@@ -78,6 +78,8 @@ public class BulkIOCallersFnTest extends BasicJavaClientREST {
7878
7979 // Egress error endpoint ConfigName
8080 private static String JsonEgressErrorConfigName = "DynamicEgressServicesForJsonError" ;
81+ // Ingest error endpoint ConfigName
82+ private static String JsonIngestErrorConfigName = "DynamicIngestServicesForTextError" ;
8183
8284 // Ingest and Egress endpoint ConfigName
8385 private static String IngestEgressSessionFieldsConfigName = "DynamicIngestEgressSessionFields" ;
@@ -94,6 +96,7 @@ public class BulkIOCallersFnTest extends BasicJavaClientREST {
9496 private static String IngestServicesXmlURI = "/dynamic/fntest/DynamicIngestServices/xml/" ;
9597 private static String IngestServicesTextURI = "/dynamic/fntest/DynamicIngestServices/text/" ;
9698 private static String IngestServicesBinURI = "/dynamic/fntest/DynamicIngestServices/bin/" ;
99+ private static String IngestServicesJsonErrorURI = "/dynamic/fntest/DynamicIngestServicesError/json/" ;
97100 //Output URI
98101 private static String EgressServicesJsonURI = "/dynamic/fntest/DynamicEgressServices/json/" ;
99102 private static String EgressServicesJsonErrorURI = "/dynamic/fntest/DynamicEgressServicesError/json/" ;
@@ -241,6 +244,18 @@ public static void setUp() throws Exception {
241244 file = null ;
242245 handle = null ;
243246
247+ file = new File (ApiConfigDirPath + JsonIngestErrorConfigName + ".sjs" );
248+ handle = new FileHandle (file );
249+ docMgr .write (IngestServicesJsonErrorURI + JsonIngestErrorConfigName +".sjs" , metadataHandle , handle );
250+ file = null ;
251+ handle = null ;
252+
253+ file = new File (ApiConfigDirPath + JsonIngestErrorConfigName + ".api" );
254+ handle = new FileHandle (file );
255+ docMgr .write (IngestServicesJsonErrorURI + JsonIngestErrorConfigName +".api" , metadataHandle , handle );
256+ file = null ;
257+ handle = null ;
258+
244259 file = new File (ApiConfigDirPath + IngestEgressSessionFieldsConfigName + ".sjs" );
245260 handle = new FileHandle (file );
246261 docMgr .write (IngestEgressSessionFieldsURI + IngestEgressSessionFieldsConfigName +".sjs" , metadataHandle , handle );
@@ -652,6 +667,61 @@ public void TestIngestEgressOnTextDocs() throws Exception {
652667 }
653668 }
654669
670+ // Use /dynamic/fntest/DynamicIngestServices/text/DynamicIngestServicesForTextError.sjs endpoint to test Text Documents ingest
671+ // Verify that STOP_ALL_CALLS in client does not throw NPE. Git # 1265
672+ @ Test
673+ public void TestIngestOnTextDocsError () throws Exception {
674+ System .out .println ("Running TestIngestOnTextDocsError" );
675+ StringBuilder errorBuf = new StringBuilder ();
676+ try {
677+ int startBatchIdx = 0 ;
678+ int maxDocSize = 100 ;
679+
680+ ObjectMapper om = new ObjectMapper ();
681+ File apiFile = new File (ApiConfigDirPath + JsonIngestErrorConfigName + ".api" );
682+
683+ JsonNode api = om .readTree (new FileReader (apiFile ));
684+ JacksonHandle jhAPI = new JacksonHandle (api );
685+
686+ String state = "{\" next\" :" +startBatchIdx +"}" ;
687+ String work = "{\" max\" :" +maxDocSize +"}" ;
688+
689+ InputCaller <String > ingressEndpt = InputCaller .on (dbclient , jhAPI , new StringHandle ());
690+ InputCaller .BulkInputCaller <String > inputbulkCaller = ingressEndpt .bulkCaller (ingressEndpt .newCallContext ()
691+ .withEndpointConstantsAs (work .getBytes ())
692+ .withEndpointStateAs (state ));
693+ String [] strContent = { "This is first test document" ,
694+ "This is second test document" ,
695+ "This is third test document" ,
696+ "This is fourth test document" ,
697+ "This is fifth test document"
698+ };
699+ InputCaller .BulkInputCaller .ErrorListener InerrorListener =
700+ (retryCount , throwable , callContext , inputHandles )
701+ -> {
702+ for (BufferableHandle h :inputHandles ) {
703+ errorBuf .append (h .toString ());
704+ }
705+ return IOEndpoint .BulkIOEndpointCaller .ErrorDisposition .STOP_ALL_CALLS ;
706+ };
707+
708+ Stream <String > input = Stream .of (strContent );
709+ input .forEach (inputbulkCaller ::accept );
710+ inputbulkCaller .setErrorListener (InerrorListener );
711+ inputbulkCaller .awaitCompletion ();
712+ String errStr = errorBuf .toString ();
713+ //System.out.println("Error buffer when STOP_ALL_CALLS " + errorBuf.toString());
714+ Assert .assertTrue (!errStr .contains ("Exception" ));
715+ Assert .assertTrue (errStr .isEmpty ());
716+
717+ } catch (Exception e ) {
718+ e .printStackTrace ();
719+ }
720+ finally {
721+ System .out .println ("End of TestIngestEgressOnTextDocs" );
722+ }
723+ }
724+
655725 // Use /dynamic/fntest/DynamicIngestServices/bin/DynamicIngestServicesForBin.sjs endpoint to test Binary Documents ingest
656726 @ Test
657727 public void TestIngestEgressOnBinDocs () throws Exception {
0 commit comments