Skip to content

Commit 0e7f901

Browse files
authored
fix direct load multi node write (#331)
1 parent 774a4af commit 0e7f901

File tree

7 files changed

+209
-38
lines changed

7 files changed

+209
-38
lines changed

example/simple-table-demo/src/main/java/com/oceanbase/example/ObDirectLoadDemo.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,12 @@ private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection conne
111111
.setParallel(parallel).setQueryTimeout(timeout).build();
112112
}
113113

114+
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection, ObDirectLoadStatementExecutionId executionId)
115+
throws ObDirectLoadException {
116+
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
117+
.setParallel(parallel).setQueryTimeout(timeout).setExecutionId(executionId).build();
118+
}
119+
114120
private static class SimpleTest {
115121

116122
public static void run() {
@@ -240,9 +246,7 @@ public void run() {
240246
executionId.decode(executionIdBytes);
241247

242248
connection = buildConnection(1);
243-
statement = buildStatement(connection);
244-
245-
statement.resume(executionId);
249+
statement = buildStatement(connection, executionId);
246250

247251
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
248252
ObObj[] rowObjs = new ObObj[2];

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadConnection.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ public void executeWithConnection(final ObDirectLoadRpc rpc, ObTable table, long
231231
}
232232
}
233233

234-
public synchronized ObDirectLoadStatement createStatement() throws ObDirectLoadException {
234+
public synchronized ObDirectLoadStatement createStatement(ObDirectLoadTraceId traceId)
235+
throws ObDirectLoadException {
235236
if (!isInited) {
236237
logger.warn("connection not init");
237238
throw new ObDirectLoadIllegalStateException("connection not init");
@@ -240,7 +241,7 @@ public synchronized ObDirectLoadStatement createStatement() throws ObDirectLoadE
240241
logger.warn("connection is closed");
241242
throw new ObDirectLoadIllegalStateException("connection is closed");
242243
}
243-
ObDirectLoadStatement stmt = new ObDirectLoadStatement(this);
244+
ObDirectLoadStatement stmt = new ObDirectLoadStatement(this, traceId);
244245
this.statementList.addLast(stmt);
245246
return stmt;
246247
}
@@ -257,7 +258,9 @@ ObDirectLoadStatement buildStatement(ObDirectLoadStatement.Builder builder)
257258
throws ObDirectLoadException {
258259
ObDirectLoadStatement stmt = null;
259260
try {
260-
stmt = createStatement();
261+
final ObDirectLoadTraceId traceId = builder.getTraceId() != null ? builder.getTraceId()
262+
: ObDirectLoadTraceId.generateTraceId();
263+
stmt = createStatement(traceId);
261264
stmt.init(builder);
262265
} catch (Exception e) {
263266
logger.warn("build statement failed, args:" + builder, e);

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadStatement.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ public class ObDirectLoadStatement {
5555
private ObDirectLoadStatementExecutor executor = null;
5656
private long startQueryTimeMillis = 0;
5757

58-
ObDirectLoadStatement(ObDirectLoadConnection connection) {
58+
ObDirectLoadStatement(ObDirectLoadConnection connection, ObDirectLoadTraceId traceId) {
5959
this.connection = connection;
60-
this.traceId = ObDirectLoadTraceId.generateTraceId();
60+
this.traceId = traceId;
6161
this.logger = ObDirectLoadLogger.getLogger(this.traceId);
6262
}
6363

@@ -88,6 +88,9 @@ public synchronized void init(Builder builder) throws ObDirectLoadException {
8888
obTablePool = new ObDirectLoadConnection.ObTablePool(connection, logger, queryTimeout);
8989
obTablePool.init();
9090
executor = new ObDirectLoadStatementExecutor(this);
91+
if (builder.executionId != null) {
92+
executor.resume(builder.executionId);
93+
}
9194
startQueryTimeMillis = System.currentTimeMillis();
9295
isInited = true;
9396
logger.info("statement init successful, args:" + builder);
@@ -294,6 +297,7 @@ public ObDirectLoadStatementExecutionId getExecutionId() throws ObDirectLoadExce
294297
return executor.getExecutionId();
295298
}
296299

300+
@Deprecated
297301
public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirectLoadException {
298302
if (executionId == null || !executionId.isValid()) {
299303
logger.warn("Param 'executionId' must not be null or invalid, value:" + executionId);
@@ -306,20 +310,23 @@ public void resume(ObDirectLoadStatementExecutionId executionId) throws ObDirect
306310

307311
public static final class Builder {
308312

309-
private final ObDirectLoadConnection connection;
313+
private final ObDirectLoadConnection connection;
310314

311-
private String tableName = null;
312-
private String[] columnNames = null;
313-
private String[] partitionNames = null;
314-
private ObLoadDupActionType dupAction = ObLoadDupActionType.INVALID_MODE;
315+
private String tableName = null;
316+
private String[] columnNames = null;
317+
private String[] partitionNames = null;
318+
private ObLoadDupActionType dupAction = ObLoadDupActionType.INVALID_MODE;
315319

316-
private int parallel = 0;
317-
private long queryTimeout = 0;
320+
private int parallel = 0;
321+
private long queryTimeout = 0;
318322

319-
private long maxErrorRowCount = 0;
320-
private String loadMethod = "full";
323+
private long maxErrorRowCount = 0;
324+
private String loadMethod = "full";
321325

322-
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
326+
private ObDirectLoadTraceId traceId = null;
327+
private ObDirectLoadStatementExecutionId executionId = null;
328+
329+
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;
323330

324331
Builder(ObDirectLoadConnection connection) {
325332
this.connection = connection;
@@ -365,12 +372,22 @@ public Builder setLoadMethod(String loadMethod) {
365372
return this;
366373
}
367374

375+
public Builder setExecutionId(ObDirectLoadStatementExecutionId executionId) {
376+
this.traceId = executionId.getTraceId();
377+
this.executionId = executionId;
378+
return this;
379+
}
380+
381+
public ObDirectLoadTraceId getTraceId() {
382+
return traceId;
383+
}
384+
368385
public String toString() {
369386
return String
370387
.format(
371-
"{tableName:%s, columnNames:%s, partitionNames:%s, dupAction:%s, parallel:%d, queryTimeout:%d, maxErrorRowCount:%d, loadMethod:%s}",
388+
"{tableName:%s, columnNames:%s, partitionNames:%s, dupAction:%s, parallel:%d, queryTimeout:%d, maxErrorRowCount:%d, loadMethod:%s, executionId:%s}",
372389
tableName, Arrays.toString(columnNames), Arrays.toString(partitionNames),
373-
dupAction, parallel, queryTimeout, maxErrorRowCount, loadMethod);
390+
dupAction, parallel, queryTimeout, maxErrorRowCount, loadMethod, executionId);
374391
}
375392

376393
public ObDirectLoadStatement build() throws ObDirectLoadException {

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadTraceId.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@
2020
import java.net.InetAddress;
2121
import java.util.concurrent.atomic.AtomicLong;
2222

23+
import com.alipay.oceanbase.rpc.util.ObByteBuf;
24+
import com.alipay.oceanbase.rpc.util.Serialization;
25+
26+
import io.netty.buffer.ByteBuf;
27+
import io.netty.buffer.Unpooled;
28+
2329
public class ObDirectLoadTraceId {
2430

2531
private final long uniqueId;
@@ -30,10 +36,6 @@ public ObDirectLoadTraceId(long uniqueId, long sequence) {
3036
this.sequence = sequence;
3137
}
3238

33-
public String toString() {
34-
return String.format("Y%X-%016X", uniqueId, sequence);
35-
}
36-
3739
public long getUniqueId() {
3840
return uniqueId;
3941
}
@@ -42,6 +44,40 @@ public long getSequence() {
4244
return sequence;
4345
}
4446

47+
public String toString() {
48+
return String.format("Y%X-%016X", uniqueId, sequence);
49+
}
50+
51+
public byte[] encode() {
52+
int needBytes = (int) getEncodedSize();
53+
ObByteBuf buf = new ObByteBuf(needBytes);
54+
encode(buf);
55+
return buf.bytes;
56+
}
57+
58+
public void encode(ObByteBuf buf) {
59+
Serialization.encodeVi64(buf, uniqueId);
60+
Serialization.encodeVi64(buf, sequence);
61+
}
62+
63+
public static ObDirectLoadTraceId decode(ByteBuf buf) {
64+
long uniqueId = Serialization.decodeVi64(buf);
65+
long sequence = Serialization.decodeVi64(buf);
66+
return new ObDirectLoadTraceId(uniqueId, sequence);
67+
}
68+
69+
public static ObDirectLoadTraceId decode(byte[] bytes) {
70+
ByteBuf buf = Unpooled.wrappedBuffer(bytes);
71+
return decode(buf);
72+
}
73+
74+
public int getEncodedSize() {
75+
int len = 0;
76+
len += Serialization.getNeedBytes(uniqueId);
77+
len += Serialization.getNeedBytes(sequence);
78+
return len;
79+
}
80+
4581
public static final ObDirectLoadTraceId DEFAULT_TRACE_ID;
4682
public static TraceIdGenerator traceIdGenerator;
4783

src/main/java/com/alipay/oceanbase/rpc/direct_load/execution/ObDirectLoadStatementExecutionId.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alipay.oceanbase.rpc.direct_load.execution;
1919

20+
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadTraceId;
2021
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
2122
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadIllegalArgumentException;
2223
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObAddr;
@@ -28,9 +29,11 @@
2829

2930
public class ObDirectLoadStatementExecutionId {
3031

31-
private long tableId = 0;
32-
private long taskId = 0;
33-
private ObAddr svrAddr = new ObAddr();
32+
private long tableId = 0;
33+
private long taskId = 0;
34+
private ObAddr svrAddr = new ObAddr();
35+
36+
private ObDirectLoadTraceId traceId = null;
3437

3538
public ObDirectLoadStatementExecutionId() {
3639
}
@@ -46,6 +49,20 @@ public ObDirectLoadStatementExecutionId(long tableId, long taskId, ObAddr svrAdd
4649
this.svrAddr = svrAddr;
4750
}
4851

52+
public ObDirectLoadStatementExecutionId(long tableId, long taskId, ObAddr svrAddr,
53+
ObDirectLoadTraceId traceId)
54+
throws ObDirectLoadException {
55+
if (tableId < 0 || taskId <= 0 || svrAddr == null || traceId == null) {
56+
throw new ObDirectLoadIllegalArgumentException(String.format(
57+
"invalid args, tableId:%d, taskId:%d, svrAddr:%s, traceId:%s", tableId, taskId,
58+
svrAddr, traceId));
59+
}
60+
this.tableId = tableId;
61+
this.taskId = taskId;
62+
this.svrAddr = svrAddr;
63+
this.traceId = traceId;
64+
}
65+
4966
public long getTableId() {
5067
return tableId;
5168
}
@@ -58,12 +75,17 @@ public ObAddr getSvrAddr() {
5875
return svrAddr;
5976
}
6077

78+
public ObDirectLoadTraceId getTraceId() {
79+
return traceId;
80+
}
81+
6182
public boolean isValid() {
6283
return tableId >= 0 && taskId > 0 && svrAddr.isValid();
6384
}
6485

6586
public String toString() {
66-
return String.format("{tableId:%d, taskId:%d, svrAddr:%s}", tableId, taskId, svrAddr);
87+
return String.format("{tableId:%d, taskId:%d, svrAddr:%s, traceId:%s}", tableId, taskId,
88+
svrAddr, traceId);
6789
}
6890

6991
public byte[] encode() {
@@ -77,12 +99,18 @@ public void encode(ObByteBuf buf) {
7799
Serialization.encodeVi64(buf, tableId);
78100
Serialization.encodeVi64(buf, taskId);
79101
svrAddr.encode(buf);
102+
if (traceId != null) {
103+
traceId.encode(buf);
104+
}
80105
}
81106

82107
public ObDirectLoadStatementExecutionId decode(ByteBuf buf) {
83108
tableId = Serialization.decodeVi64(buf);
84109
taskId = Serialization.decodeVi64(buf);
85110
svrAddr.decode(buf);
111+
if (buf.readableBytes() > 0) {
112+
traceId = ObDirectLoadTraceId.decode(buf);
113+
}
86114
return this;
87115
}
88116

@@ -96,6 +124,9 @@ public int getEncodedSize() {
96124
len += Serialization.getNeedBytes(tableId);
97125
len += Serialization.getNeedBytes(taskId);
98126
len += svrAddr.getEncodedSize();
127+
if (traceId != null) {
128+
len += traceId.getEncodedSize();
129+
}
99130
return len;
100131
}
101132

src/main/java/com/alipay/oceanbase/rpc/direct_load/execution/ObDirectLoadStatementExecutor.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public class ObDirectLoadStatementExecutor {
5959
private ObAddr svrAddr = null;
6060
private ObDirectLoadException cause = null; // 失败原因
6161

62+
private AtomicInteger writingCount = new AtomicInteger(0);
63+
6264
public ObDirectLoadStatementExecutor(ObDirectLoadStatement statement) {
6365
this.statement = statement;
6466
this.traceId = statement.getTraceId();
@@ -164,7 +166,7 @@ public synchronized void detach() throws ObDirectLoadException {
164166
public ObDirectLoadStatementExecutionId getExecutionId() throws ObDirectLoadException {
165167
checkState(LOADING, "getExecutionId");
166168
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId(
167-
tableId, taskId, svrAddr);
169+
tableId, taskId, svrAddr, traceId);
168170
return executionId;
169171
}
170172

@@ -247,6 +249,25 @@ public void close() {
247249
logger.warn("statement abort failed", e);
248250
}
249251
}
252+
// 如果还有写没结束, 等待写结束
253+
if (writingCount.get() > 0) {
254+
logger.info("statement close wait write");
255+
try {
256+
final long startTimeMillis = System.currentTimeMillis();
257+
long loopCnt = 0;
258+
while (writingCount.get() > 0) {
259+
Thread.sleep(10);
260+
++loopCnt;
261+
if (loopCnt % 100 == 0) {
262+
final long curTimeMillis = System.currentTimeMillis();
263+
logger.warn("statement has been wait write for "
264+
+ (curTimeMillis - startTimeMillis) + " ms");
265+
}
266+
}
267+
} catch (Exception e) {
268+
logger.warn("statement wait write failed", e);
269+
}
270+
}
250271
}
251272

252273
private synchronized void abortIfNeed() {
@@ -343,15 +364,23 @@ void stopHeartBeat() {
343364

344365
public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException {
345366
checkState(LOADING, LOADING_ONLY, "write");
346-
ObDirectLoadStatementPromiseTask task = new ObDirectLoadStatementWriteTask(statement, this,
347-
bucket);
348-
task.run();
349-
if (!task.isDone()) {
350-
logger.warn("statement write task unexpected not done");
351-
throw new ObDirectLoadUnexpectedException("statement write task unexpected not done");
352-
}
353-
if (!task.isSuccess()) {
354-
throw task.cause();
367+
writingCount.incrementAndGet();
368+
try {
369+
ObDirectLoadStatementPromiseTask task = new ObDirectLoadStatementWriteTask(statement,
370+
this, bucket);
371+
task.run();
372+
if (!task.isDone()) {
373+
logger.warn("statement write task unexpected not done");
374+
throw new ObDirectLoadUnexpectedException(
375+
"statement write task unexpected not done");
376+
}
377+
if (!task.isSuccess()) {
378+
throw task.cause();
379+
}
380+
} catch (ObDirectLoadException e) {
381+
throw e;
382+
} finally {
383+
writingCount.decrementAndGet();
355384
}
356385
}
357386

0 commit comments

Comments
 (0)