Skip to content

Commit e26e566

Browse files
add P2P mode and abort interface
1 parent fb6f1ab commit e26e566

File tree

3 files changed

+319
-11
lines changed

3 files changed

+319
-11
lines changed

example/simple-table-demo/src/main/java/com/oceanbase/example/ObDirectLoadDemo.java

Lines changed: 235 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public static void main(String[] args) {
5353
SimpleTest.run();
5454
ParallelWriteTest.run();
5555
MultiNodeWriteTest.run();
56+
P2PModeWriteTest.run();
57+
SimpleAbortTest.run();
58+
P2PModeAbortTest.run();
5659
}
5760

5861
private static void prepareTestTable() throws Exception {
@@ -105,16 +108,16 @@ private static ObDirectLoadConnection buildConnection(int writeThreadNum)
105108
.enableParallelWrite(writeThreadNum).build();
106109
}
107110

108-
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection)
111+
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, boolean isP2PMode)
109112
throws ObDirectLoadException {
110113
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
111-
.setParallel(parallel).setQueryTimeout(timeout).build();
114+
.setParallel(parallel).setQueryTimeout(timeout).setIsP2PMode(isP2PMode).build();
112115
}
113116

114-
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, ObDirectLoadStatementExecutionId executionId)
117+
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, ObDirectLoadStatementExecutionId executionId, boolean isP2PMode)
115118
throws ObDirectLoadException {
116119
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
117-
.setParallel(parallel).setQueryTimeout(timeout).setExecutionId(executionId).build();
120+
.setParallel(parallel).setQueryTimeout(timeout).setExecutionId(executionId).setIsP2PMode(isP2PMode).build();
118121
}
119122

120123
private static class SimpleTest {
@@ -127,7 +130,7 @@ public static void run() {
127130
prepareTestTable();
128131

129132
connection = buildConnection(1);
130-
statement = buildStatement(connection);
133+
statement = buildStatement(connection, false);
131134

132135
statement.begin();
133136

@@ -192,7 +195,7 @@ public static void run() {
192195
prepareTestTable();
193196

194197
connection = buildConnection(parallel);
195-
statement = buildStatement(connection);
198+
statement = buildStatement(connection, false);
196199

197200
statement.begin();
198201

@@ -246,7 +249,7 @@ public void run() {
246249
executionId.decode(executionIdBytes);
247250

248251
connection = buildConnection(1);
249-
statement = buildStatement(connection, executionId);
252+
statement = buildStatement(connection, executionId, false);
250253

251254
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
252255
ObObj[] rowObjs = new ObObj[2];
@@ -277,7 +280,7 @@ public static void run() {
277280
prepareTestTable();
278281

279282
connection = buildConnection(1);
280-
statement = buildStatement(connection);
283+
statement = buildStatement(connection, false);
281284

282285
statement.begin();
283286

@@ -313,4 +316,228 @@ public static void run() {
313316

314317
};
315318

319+
private static class P2PModeWriteTest {
320+
321+
private static class P2PNodeWriter implements Runnable {
322+
323+
private final byte[] executionIdBytes;
324+
private final int id;
325+
private final AtomicInteger ref_cnt;
326+
327+
P2PNodeWriter(byte[] executionIdBytes, int id, AtomicInteger ref_cnt) {
328+
this.executionIdBytes = executionIdBytes;
329+
this.id = id;
330+
this.ref_cnt = ref_cnt;
331+
}
332+
333+
@Override
334+
public void run() {
335+
ObDirectLoadConnection connection = null;
336+
ObDirectLoadStatement statement = null;
337+
try {
338+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
339+
executionId.decode(executionIdBytes);
340+
341+
connection = buildConnection(1);
342+
statement = buildStatement(connection, executionId, true);
343+
344+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
345+
ObObj[] rowObjs = new ObObj[2];
346+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
347+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
348+
bucket.addRow(rowObjs);
349+
statement.write(bucket);
350+
351+
if (0 == ref_cnt.decrementAndGet()) {
352+
statement.commit();
353+
}
354+
355+
} catch (Exception e) {
356+
throw new RuntimeException(e);
357+
} finally {
358+
if (null != statement) {
359+
statement.close();
360+
}
361+
if (null != connection) {
362+
connection.close();
363+
}
364+
}
365+
}
366+
367+
};
368+
369+
public static void run() {
370+
System.out.println("P2PModeWriteTest start");
371+
final int writeThreadNum = 10;
372+
ObDirectLoadConnection connection = null;
373+
ObDirectLoadStatement statement = null;
374+
final AtomicInteger ref_cnt = new AtomicInteger(writeThreadNum);
375+
try {
376+
prepareTestTable();
377+
378+
connection = buildConnection(1);
379+
statement = buildStatement(connection, true);
380+
381+
statement.begin();
382+
383+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
384+
byte[] executionIdBytes = executionId.encode();
385+
386+
Thread[] threads = new Thread[writeThreadNum];
387+
for (int i = 0; i < threads.length; ++i) {
388+
P2PNodeWriter NodeWriter = new P2PNodeWriter(executionIdBytes, i, ref_cnt);
389+
Thread thread = new Thread(NodeWriter);
390+
thread.start();
391+
threads[i] = thread;
392+
}
393+
for (int i = 0; i < threads.length; ++i) {
394+
threads[i].join();
395+
}
396+
queryTestTable(writeThreadNum);
397+
} catch (Exception e) {
398+
throw new RuntimeException(e);
399+
} finally {
400+
if (null != statement) {
401+
statement.close();
402+
}
403+
if (null != connection) {
404+
connection.close();
405+
}
406+
}
407+
System.out.println("P2PModeWriteTest successful");
408+
}
409+
410+
};
411+
412+
private static class SimpleAbortTest {
413+
414+
public static void run() {
415+
System.out.println("SimpleAbortTest start");
416+
ObDirectLoadConnection connection = null;
417+
ObDirectLoadStatement statement = null;
418+
try {
419+
prepareTestTable();
420+
System.out.println("prepareTestTable");
421+
422+
connection = buildConnection(1);
423+
statement = buildStatement(connection, false);
424+
425+
statement.begin();
426+
427+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
428+
ObObj[] rowObjs = new ObObj[2];
429+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
430+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
431+
bucket.addRow(rowObjs);
432+
statement.write(bucket);
433+
434+
statement.abort();
435+
436+
queryTestTable(0);
437+
} catch (Exception e) {
438+
throw new RuntimeException(e);
439+
} finally {
440+
if (null != statement) {
441+
statement.close();
442+
}
443+
if (null != connection) {
444+
connection.close();
445+
}
446+
}
447+
System.out.println("SimpleAbortTest successful");
448+
}
449+
450+
};
451+
452+
private static class P2PModeAbortTest {
453+
454+
455+
private static class AbortP2PNode implements Runnable {
456+
457+
private final byte[] executionIdBytes;
458+
private final int id;
459+
460+
AbortP2PNode(byte[] executionIdBytes, int id) {
461+
this.executionIdBytes = executionIdBytes;
462+
this.id = id;
463+
}
464+
465+
@Override
466+
public void run() {
467+
ObDirectLoadConnection connection = null;
468+
ObDirectLoadStatement statement = null;
469+
try {
470+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
471+
executionId.decode(executionIdBytes);
472+
473+
connection = buildConnection(1);
474+
statement = buildStatement(connection, executionId, true);
475+
476+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
477+
ObObj[] rowObjs = new ObObj[2];
478+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
479+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
480+
bucket.addRow(rowObjs);
481+
statement.write(bucket);
482+
483+
statement.abort();
484+
485+
} catch (Exception e) {
486+
throw new RuntimeException(e);
487+
} finally {
488+
if (null != statement) {
489+
statement.close();
490+
}
491+
if (null != connection) {
492+
connection.close();
493+
}
494+
}
495+
}
496+
497+
};
498+
499+
public static void run() {
500+
System.out.println("P2PModeAbortTest start");
501+
ObDirectLoadConnection connection = null;
502+
ObDirectLoadStatement statement = null;
503+
try {
504+
prepareTestTable();
505+
506+
connection = buildConnection(1);
507+
statement = buildStatement(connection, true);
508+
509+
statement.begin();
510+
511+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
512+
ObObj[] rowObjs = new ObObj[2];
513+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
514+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
515+
bucket.addRow(rowObjs);
516+
statement.write(bucket);
517+
518+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
519+
byte[] executionIdBytes = executionId.encode();
520+
521+
AbortP2PNode abortP2PNode = new AbortP2PNode(executionIdBytes, 3);
522+
Thread abortNodeThread = new Thread(abortP2PNode);
523+
abortNodeThread.start();
524+
abortNodeThread.join();
525+
526+
queryTestTable(0);
527+
528+
} catch (Exception e) {
529+
throw new RuntimeException(e);
530+
} finally {
531+
if (null != statement) {
532+
statement.close();
533+
}
534+
if (null != connection) {
535+
connection.close();
536+
}
537+
}
538+
System.out.println("P2PModeAbortTest successful");
539+
}
540+
541+
};
542+
316543
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadStatement.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public synchronized void init(Builder builder) throws ObDirectLoadException {
8787
connection.getProtocol().checkIsSupported(this);
8888
obTablePool = new ObDirectLoadConnection.ObTablePool(connection, logger, queryTimeout);
8989
obTablePool.init();
90-
executor = new ObDirectLoadStatementExecutor(this);
90+
executor = new ObDirectLoadStatementExecutor(this, builder.isP2PMode);
9191
if (builder.executionId != null) {
9292
executor.resume(builder.executionId);
9393
}
@@ -308,6 +308,24 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect
308308
executor.resume(executionId);
309309
}
310310

311+
public ObDirectLoadStatementFuture abortAsync() {
312+
try {
313+
checkStatus();
314+
return executor.requestAbort();
315+
} catch (ObDirectLoadException e) {
316+
logger.warn("statement abort failed", e);
317+
return new ObDirectLoadStatementFailedFuture(this, e);
318+
}
319+
}
320+
321+
public void abort() throws ObDirectLoadException {
322+
ObDirectLoadStatementFuture future = abortAsync();
323+
future.await();
324+
if (!future.isSuccess()) {
325+
throw future.cause();
326+
}
327+
}
328+
311329
public static final class Builder {
312330

313331
private final ObDirectLoadConnection connection;
@@ -325,6 +343,7 @@ public static final class Builder {
325343

326344
private ObDirectLoadTraceId traceId = null;
327345
private ObDirectLoadStatementExecutionId executionId = null;
346+
private boolean isP2PMode = false;
328347

329348
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
330349

@@ -382,6 +401,11 @@ public ObDirectLoadTraceId getTraceId() {
382401
return traceId;
383402
}
384403

404+
public Builder setIsP2PMode(boolean isP2PMode) {
405+
this.isP2PMode = isP2PMode;
406+
return this;
407+
}
408+
385409
public String toString() {
386410
return String
387411
.format(

0 commit comments

Comments
 (0)