@@ -53,6 +53,9 @@ public static void main(String[] args) {
5353 SimpleTest .run ();
5454 ParallelWriteTest .run ();
5555 MultiNodeWriteTest .run ();
56+ P2PModeWriteTest .run ();
57+ SimpleAbortTest .run ();
58+ P2PModeAbortTest .run ();
5659 }
5760
5861 private static void prepareTestTable () throws Exception {
@@ -105,16 +108,16 @@ private static ObDirectLoadConnection buildConnection(int writeThreadNum)
105108 .enableParallelWrite (writeThreadNum ).build ();
106109 }
107110
108- private static ObDirectLoadStatement buildStatement (ObDirectLoadConnection connection )
111+ private static ObDirectLoadStatement buildStatement (ObDirectLoadConnection connection , boolean isP2PMode )
109112 throws ObDirectLoadException {
110113 return connection .getStatementBuilder ().setTableName (tableName ).setDupAction (dupAction )
111- .setParallel (parallel ).setQueryTimeout (timeout ).build ();
114+ .setParallel (parallel ).setQueryTimeout (timeout ).setIsP2PMode ( isP2PMode ). build ();
112115 }
113116
114- private static ObDirectLoadStatement buildStatement (ObDirectLoadConnection connection , ObDirectLoadStatementExecutionId executionId )
117+ private static ObDirectLoadStatement buildStatement (ObDirectLoadConnection connection , ObDirectLoadStatementExecutionId executionId , boolean isP2PMode )
115118 throws ObDirectLoadException {
116119 return connection .getStatementBuilder ().setTableName (tableName ).setDupAction (dupAction )
117- .setParallel (parallel ).setQueryTimeout (timeout ).setExecutionId (executionId ).build ();
120+ .setParallel (parallel ).setQueryTimeout (timeout ).setExecutionId (executionId ).setIsP2PMode ( isP2PMode ). build ();
118121 }
119122
120123 private static class SimpleTest {
@@ -127,7 +130,7 @@ public static void run() {
127130 prepareTestTable ();
128131
129132 connection = buildConnection (1 );
130- statement = buildStatement (connection );
133+ statement = buildStatement (connection , false );
131134
132135 statement .begin ();
133136
@@ -192,7 +195,7 @@ public static void run() {
192195 prepareTestTable ();
193196
194197 connection = buildConnection (parallel );
195- statement = buildStatement (connection );
198+ statement = buildStatement (connection , false );
196199
197200 statement .begin ();
198201
@@ -246,7 +249,7 @@ public void run() {
246249 executionId .decode (executionIdBytes );
247250
248251 connection = buildConnection (1 );
249- statement = buildStatement (connection , executionId );
252+ statement = buildStatement (connection , executionId , false );
250253
251254 ObDirectLoadBucket bucket = new ObDirectLoadBucket ();
252255 ObObj [] rowObjs = new ObObj [2 ];
@@ -277,7 +280,7 @@ public static void run() {
277280 prepareTestTable ();
278281
279282 connection = buildConnection (1 );
280- statement = buildStatement (connection );
283+ statement = buildStatement (connection , false );
281284
282285 statement .begin ();
283286
@@ -313,4 +316,228 @@ public static void run() {
313316
314317 };
315318
319+ private static class P2PModeWriteTest {
320+
321+ private static class P2PNodeWriter implements Runnable {
322+
323+ private final byte [] executionIdBytes ;
324+ private final int id ;
325+ private final AtomicInteger ref_cnt ;
326+
327+ P2PNodeWriter (byte [] executionIdBytes , int id , AtomicInteger ref_cnt ) {
328+ this .executionIdBytes = executionIdBytes ;
329+ this .id = id ;
330+ this .ref_cnt = ref_cnt ;
331+ }
332+
333+ @ Override
334+ public void run () {
335+ ObDirectLoadConnection connection = null ;
336+ ObDirectLoadStatement statement = null ;
337+ try {
338+ ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId ();
339+ executionId .decode (executionIdBytes );
340+
341+ connection = buildConnection (1 );
342+ statement = buildStatement (connection , executionId , true );
343+
344+ ObDirectLoadBucket bucket = new ObDirectLoadBucket ();
345+ ObObj [] rowObjs = new ObObj [2 ];
346+ rowObjs [0 ] = new ObObj (ObObjType .ObInt32Type .getDefaultObjMeta (), id );
347+ rowObjs [1 ] = new ObObj (ObObjType .ObInt32Type .getDefaultObjMeta (), id );
348+ bucket .addRow (rowObjs );
349+ statement .write (bucket );
350+
351+ if (0 == ref_cnt .decrementAndGet ()) {
352+ statement .commit ();
353+ }
354+
355+ } catch (Exception e ) {
356+ throw new RuntimeException (e );
357+ } finally {
358+ if (null != statement ) {
359+ statement .close ();
360+ }
361+ if (null != connection ) {
362+ connection .close ();
363+ }
364+ }
365+ }
366+
367+ };
368+
369+ public static void run () {
370+ System .out .println ("P2PModeWriteTest start" );
371+ final int writeThreadNum = 10 ;
372+ ObDirectLoadConnection connection = null ;
373+ ObDirectLoadStatement statement = null ;
374+ final AtomicInteger ref_cnt = new AtomicInteger (writeThreadNum );
375+ try {
376+ prepareTestTable ();
377+
378+ connection = buildConnection (1 );
379+ statement = buildStatement (connection , true );
380+
381+ statement .begin ();
382+
383+ ObDirectLoadStatementExecutionId executionId = statement .getExecutionId ();
384+ byte [] executionIdBytes = executionId .encode ();
385+
386+ Thread [] threads = new Thread [writeThreadNum ];
387+ for (int i = 0 ; i < threads .length ; ++i ) {
388+ P2PNodeWriter NodeWriter = new P2PNodeWriter (executionIdBytes , i , ref_cnt );
389+ Thread thread = new Thread (NodeWriter );
390+ thread .start ();
391+ threads [i ] = thread ;
392+ }
393+ for (int i = 0 ; i < threads .length ; ++i ) {
394+ threads [i ].join ();
395+ }
396+ queryTestTable (writeThreadNum );
397+ } catch (Exception e ) {
398+ throw new RuntimeException (e );
399+ } finally {
400+ if (null != statement ) {
401+ statement .close ();
402+ }
403+ if (null != connection ) {
404+ connection .close ();
405+ }
406+ }
407+ System .out .println ("P2PModeWriteTest successful" );
408+ }
409+
410+ };
411+
412+ private static class SimpleAbortTest {
413+
414+ public static void run () {
415+ System .out .println ("SimpleAbortTest start" );
416+ ObDirectLoadConnection connection = null ;
417+ ObDirectLoadStatement statement = null ;
418+ try {
419+ prepareTestTable ();
420+ System .out .println ("prepareTestTable" );
421+
422+ connection = buildConnection (1 );
423+ statement = buildStatement (connection , false );
424+
425+ statement .begin ();
426+
427+ ObDirectLoadBucket bucket = new ObDirectLoadBucket ();
428+ ObObj [] rowObjs = new ObObj [2 ];
429+ rowObjs [0 ] = new ObObj (ObObjType .ObInt32Type .getDefaultObjMeta (), 1 );
430+ rowObjs [1 ] = new ObObj (ObObjType .ObInt32Type .getDefaultObjMeta (), 2 );
431+ bucket .addRow (rowObjs );
432+ statement .write (bucket );
433+
434+ statement .abort ();
435+
436+ queryTestTable (0 );
437+ } catch (Exception e ) {
438+ throw new RuntimeException (e );
439+ } finally {
440+ if (null != statement ) {
441+ statement .close ();
442+ }
443+ if (null != connection ) {
444+ connection .close ();
445+ }
446+ }
447+ System .out .println ("SimpleAbortTest successful" );
448+ }
449+
450+ };
451+
452+ private static class P2PModeAbortTest {
453+
454+
455+ private static class AbortP2PNode implements Runnable {
456+
457+ private final byte [] executionIdBytes ;
458+ private final int id ;
459+
460+ AbortP2PNode (byte [] executionIdBytes , int id ) {
461+ this .executionIdBytes = executionIdBytes ;
462+ this .id = id ;
463+ }
464+
465+ @ Override
466+ public void run () {
467+ ObDirectLoadConnection connection = null ;
468+ ObDirectLoadStatement statement = null ;
469+ try {
470+ ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId ();
471+ executionId .decode (executionIdBytes );
472+
473+ connection = buildConnection (1 );
474+ statement = buildStatement (connection , executionId , true );
475+
476+ ObDirectLoadBucket bucket = new ObDirectLoadBucket ();
477+ ObObj [] rowObjs = new ObObj [2 ];
478+ rowObjs [0 ] = new ObObj (ObObjType .ObInt32Type .getDefaultObjMeta (), id );
479+ rowObjs [1 ] = new ObObj (ObObjType .ObInt32Type .getDefaultObjMeta (), id );
480+ bucket .addRow (rowObjs );
481+ statement .write (bucket );
482+
483+ statement .abort ();
484+
485+ } catch (Exception e ) {
486+ throw new RuntimeException (e );
487+ } finally {
488+ if (null != statement ) {
489+ statement .close ();
490+ }
491+ if (null != connection ) {
492+ connection .close ();
493+ }
494+ }
495+ }
496+
497+ };
498+
499+ public static void run () {
500+ System .out .println ("P2PModeAbortTest start" );
501+ ObDirectLoadConnection connection = null ;
502+ ObDirectLoadStatement statement = null ;
503+ try {
504+ prepareTestTable ();
505+
506+ connection = buildConnection (1 );
507+ statement = buildStatement (connection , true );
508+
509+ statement .begin ();
510+
511+ ObDirectLoadBucket bucket = new ObDirectLoadBucket ();
512+ ObObj [] rowObjs = new ObObj [2 ];
513+ rowObjs [0 ] = new ObObj (ObObjType .ObInt32Type .getDefaultObjMeta (), 1 );
514+ rowObjs [1 ] = new ObObj (ObObjType .ObInt32Type .getDefaultObjMeta (), 2 );
515+ bucket .addRow (rowObjs );
516+ statement .write (bucket );
517+
518+ ObDirectLoadStatementExecutionId executionId = statement .getExecutionId ();
519+ byte [] executionIdBytes = executionId .encode ();
520+
521+ AbortP2PNode abortP2PNode = new AbortP2PNode (executionIdBytes , 3 );
522+ Thread abortNodeThread = new Thread (abortP2PNode );
523+ abortNodeThread .start ();
524+ abortNodeThread .join ();
525+
526+ queryTestTable (0 );
527+
528+ } catch (Exception e ) {
529+ throw new RuntimeException (e );
530+ } finally {
531+ if (null != statement ) {
532+ statement .close ();
533+ }
534+ if (null != connection ) {
535+ connection .close ();
536+ }
537+ }
538+ System .out .println ("P2PModeAbortTest successful" );
539+ }
540+
541+ };
542+
316543}
0 commit comments