@@ -522,11 +522,6 @@ public void testBatchesThreads() throws Exception {
522522 runWriteTest (2 , 20 , 20 , 200 , "batchesThreads" );
523523 }
524524
525- @ Test
526- public void testEverything () throws Exception {
527- runWriteTest (2 , 20 , 20 , 200 , "everything" );
528- }
529-
530525 public void runWriteTest ( int batchSize , int externalThreadCount , int batcherThreadCount ,
531526 int totalDocCount , String testName )
532527 {
@@ -555,38 +550,48 @@ public void runWriteTest( int batchSize, int externalThreadCount, int batcherThr
555550 .withThreadCount (batcherThreadCount )
556551 .onBatchSuccess (
557552 batch -> {
558- logger .debug ("[testWrites_{}] batch: {}, items: {}" , testName ,
559- batch .getJobBatchNumber (), batch .getItems ().length );
560- successfulBatchCount .incrementAndGet ();
561- for ( WriteEvent event : batch .getItems () ) {
562- successfulCount .incrementAndGet ();
563- //logger.debug("success event.getTargetUri()=[{}]", event.getTargetUri());
564- }
565- if ( expectedBatchSize != batch .getItems ().length ) {
566- // if this isn't the last batch
567- if ( batch .getJobBatchNumber () != expectedBatches ) {
568- failures .append ("ERROR: There should be " + expectedBatchSize +
569- " items in batch " + batch .getJobBatchNumber () + " but there are " + batch .getItems ().length );
553+ try {
554+ logger .debug ("[testWrites_{}] batch: {}, items: {}" , testName ,
555+ batch .getJobBatchNumber (), batch .getItems ().length );
556+ successfulBatchCount .incrementAndGet ();
557+ for ( WriteEvent event : batch .getItems () ) {
558+ successfulCount .incrementAndGet ();
559+ //logger.debug("success event.getTargetUri()=[{}]", event.getTargetUri());
560+ }
561+ if ( expectedBatchSize != batch .getItems ().length ) {
562+ // if this isn't the last batch
563+ if ( batch .getJobBatchNumber () != expectedBatches ) {
564+ failures .append ("ERROR: There should be " + expectedBatchSize +
565+ " items in batch " + batch .getJobBatchNumber () + " but there are " + batch .getItems ().length );
566+ }
570567 }
568+ batchTicket .set (batch .getJobTicket ());
569+ batchTimestamp .set (batch .getTimestamp ());
570+ } catch (Throwable e ) {
571+ System .out .println ("onBatchSuccess failed" );
572+ e .printStackTrace (System .out );
571573 }
572- batchTicket .set (batch .getJobTicket ());
573- batchTimestamp .set (batch .getTimestamp ());
574574 }
575575 )
576576 .onBatchFailure (
577577 (batch , throwable ) -> {
578- failureBatchCount .incrementAndGet ();
579- failureCount .addAndGet (batch .getItems ().length );
580- throwable .printStackTrace ();
581- for ( WriteEvent event : batch .getItems () ) {
582- logger .debug ("failure event.getTargetUri()=[{}]" , event .getTargetUri ());
583- }
584- if ( expectedBatchSize != batch .getItems ().length ) {
585- // if this isn't the last batch
586- if ( batch .getJobBatchNumber () != expectedBatches ) {
587- failures .append ("ERROR: There should be " + expectedBatchSize +
588- " items in batch " + batch .getJobBatchNumber () + " but there are " + batch .getItems ().length );
578+ try {
579+ failureBatchCount .incrementAndGet ();
580+ failureCount .addAndGet (batch .getItems ().length );
581+ throwable .printStackTrace ();
582+ for ( WriteEvent event : batch .getItems () ) {
583+ logger .debug ("failure event.getTargetUri()=[{}]" , event .getTargetUri ());
584+ }
585+ if ( expectedBatchSize != batch .getItems ().length ) {
586+ // if this isn't the last batch
587+ if ( batch .getJobBatchNumber () != expectedBatches ) {
588+ failures .append ("ERROR: There should be " + expectedBatchSize +
589+ " items in batch " + batch .getJobBatchNumber () + " but there are " + batch .getItems ().length );
590+ }
589591 }
592+ } catch (Throwable e ) {
593+ System .out .println ("onBatchFailure failed" );
594+ e .printStackTrace (System .out );
590595 }
591596 }
592597 )
@@ -600,59 +605,78 @@ public void runWriteTest( int batchSize, int externalThreadCount, int batcherThr
600605 assertEquals (writeBatcherJobId , batcher .getJobId ());
601606 assertEquals (batcherThreadCount , batcher .getThreadCount ());
602607
603- class WriteOperationRunnable implements Runnable {
608+ final AtomicBoolean noExternalFailure = new AtomicBoolean ( true );
604609
610+ class WriteOperationRunnable implements Runnable {
605611 @ Override
606612 public void run () {
607- String threadName = Thread .currentThread ().getName ();
608- DocumentWriteSet writeSet = client .newDocumentManager ().newWriteSet ();
609- for (int j = 1 ; j <= docsPerExternalThread ; j ++) {
610- String uri = "/" + collection + "/" + threadName + "/" + j + ".txt" ;
611- DocumentMetadataHandle meta = new DocumentMetadataHandle ().withCollections (whbTestCollection , collection );
612- writeSet .add (uri , meta , new StringHandle ("test" ).withFormat (Format .TEXT ));
613- }
614- for (DocumentWriteOperation doc : writeSet ) {
615- batcher .add (doc );
613+ try {
614+ String threadName = Thread .currentThread ().getName ();
615+ DocumentWriteSet writeSet = client .newDocumentManager ().newWriteSet ();
616+ for (int j = 1 ; j <= docsPerExternalThread ; j ++) {
617+ String uri = "/" + collection + "/" + threadName + "/" + j + ".txt" ;
618+ DocumentMetadataHandle meta = new DocumentMetadataHandle ().withCollections (whbTestCollection , collection );
619+ writeSet .add (uri , meta , new StringHandle ("test" ).withFormat (Format .TEXT ));
620+ }
621+ for (DocumentWriteOperation doc : writeSet ) {
622+ batcher .add (doc );
623+ }
624+ } catch (Throwable e ) {
625+ noExternalFailure .compareAndSet (true , false );
626+ System .out .println ("WriteOperationRunnable failed" );
627+ e .printStackTrace (System .out );
616628 }
617629 }
618630 }
619631
620632 class MyRunnable implements Runnable {
621-
622633 @ Override
623634 public void run () {
624- String threadName = Thread .currentThread ().getName ();
625- for (int j =1 ; j <= docsPerExternalThread ; j ++) {
626- String uri = "/" + collection + "/" + threadName + "/" + j + ".txt" ;
627- DocumentMetadataHandle meta = new DocumentMetadataHandle ()
628- .withCollections (whbTestCollection , collection );
629- batcher .add (uri , meta , new StringHandle ("test" ).withFormat (Format .TEXT ));
635+ try {
636+ String threadName = Thread .currentThread ().getName ();
637+ for (int j =1 ; j <= docsPerExternalThread ; j ++) {
638+ String uri = "/" + collection + "/" + threadName + "/" + j + ".txt" ;
639+ DocumentMetadataHandle meta = new DocumentMetadataHandle ()
640+ .withCollections (whbTestCollection , collection );
641+ batcher .add (uri , meta , new StringHandle ("test" ).withFormat (Format .TEXT ));
642+ }
643+ } catch (Throwable e ) {
644+ noExternalFailure .compareAndSet (true , false );
645+ System .out .println ("MyRunnable failed" );
646+ e .printStackTrace (System .out );
630647 }
631648 }
632649 }
633650
634651 Thread [] externalThreads = new Thread [externalThreadCount ];
652+ logger .debug ("starting runnables threadCount=[{}]" , externalThreadCount );
635653 for (int i = 0 ; i < externalThreadCount - 1 ; i ++) {
636654 externalThreads [i ] = new Thread (new MyRunnable (), testName + i );
637655 externalThreads [i ].start ();
638656 }
639657 externalThreads [externalThreadCount - 1 ] = new Thread (new WriteOperationRunnable (),
640658 testName + (externalThreadCount - 1 ));
641659 externalThreads [externalThreadCount - 1 ].start ();
660+ logger .debug ("started runnables threadCount=[{}]" , externalThreadCount );
642661
643662 for ( Thread thread : externalThreads ) {
644663 try { thread .join (); } catch (Exception e ) {}
645664 }
665+ logger .debug ("finished runnables threadCount=[{}]" , externalThreadCount );
646666 batcher .flushAndWait ();
667+
647668 int leftover = (totalDocCount % docsPerExternalThread );
669+ logger .debug ("adding leftover=[{}]" , leftover );
648670 // write any leftovers
649671 for (int j =0 ; j < leftover ; j ++) {
650672 String uri = "/" + collection + "/" + Thread .currentThread ().getName () + "/" + j + ".txt" ;
651673 DocumentMetadataHandle meta = new DocumentMetadataHandle ()
652674 .withCollections (whbTestCollection , collection );
653675 batcher .add (uri , meta , new StringHandle ("test" ).withFormat (Format .TEXT ));
654676 }
677+ logger .debug ("finished leftover=[{}]" , leftover );
655678 batcher .flushAndWait ();
679+
656680 JobReport report = moveMgr .getJobReport (ticket );
657681 //assertEquals("Job Report has incorrect completion information", false, report.isJobComplete());
658682
@@ -661,6 +685,7 @@ public void run() {
661685 //assertTrue("Job should be stopped now", batcher.isStopped());
662686
663687 if ( failures .length () > 0 ) fail (failures .toString ());
688+ assertTrue ("WriteOperationRunnable thread failed" , noExternalFailure .get ());
664689
665690 logger .debug ("expectedSuccess=[{}] successfulCount.get()=[{}]" , totalDocCount , successfulCount .get ());
666691 assertEquals ("The success listener ran wrong number of times" , totalDocCount , successfulCount .get ());
0 commit comments