Skip to content

Commit 8fb5eac

Browse files
authored
[feat] direct load support detach task (#322)
1 parent 4961057 commit 8fb5eac

File tree

9 files changed

+377
-33
lines changed

9 files changed

+377
-33
lines changed

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadStatement.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,16 @@ public void write(ObDirectLoadBucket bucket) throws ObDirectLoadException {
279279
executor.write(bucket);
280280
}
281281

282+
public void detach() throws ObDirectLoadException {
283+
try {
284+
checkStatus();
285+
executor.detach();
286+
} catch (ObDirectLoadException e) {
287+
logger.warn("statement detach failed", e);
288+
throw e;
289+
}
290+
}
291+
282292
public ObDirectLoadStatementExecutionId getExecutionId() throws ObDirectLoadException {
283293
checkStatus();
284294
return executor.getExecutionId();
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.direct_load.execution;
19+
20+
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection;
21+
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
22+
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
23+
import com.alipay.oceanbase.rpc.direct_load.future.ObDirectLoadStatementPromiseTask;
24+
import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObDirectLoadDetachRpc;
25+
import com.alipay.oceanbase.rpc.table.ObTable;
26+
import com.alipay.oceanbase.rpc.direct_load.protocol.ObDirectLoadProtocol;
27+
28+
public class ObDirectLoadStatementDetachTask extends ObDirectLoadStatementPromiseTask {
29+
30+
private final ObDirectLoadConnection connection;
31+
private final ObDirectLoadProtocol protocol;
32+
private final ObDirectLoadStatementExecutor executor;
33+
34+
public ObDirectLoadStatementDetachTask(ObDirectLoadStatement statement,
35+
ObDirectLoadStatementExecutor executor) {
36+
super(statement);
37+
this.connection = statement.getConnection();
38+
this.protocol = connection.getProtocol();
39+
this.executor = executor;
40+
}
41+
42+
@Override
43+
public void run() {
44+
try {
45+
doSendDetach();
46+
setSuccess();
47+
} catch (ObDirectLoadException e) {
48+
logger.warn("statement detach task run failed", e);
49+
setFailure(e);
50+
}
51+
}
52+
53+
private ObDirectLoadDetachRpc doSendDetach() throws ObDirectLoadException {
54+
final ObTable table = statement.getObTablePool().getControlObTable();
55+
final long timeoutMillis = statement.getTimeoutRemain();
56+
57+
ObDirectLoadDetachRpc rpc = protocol.getDetachRpc(executor.getTraceId());
58+
rpc.setSvrAddr(executor.getSvrAddr());
59+
rpc.setTableId(executor.getTableId());
60+
rpc.setTaskId(executor.getTaskId());
61+
62+
logger.debug("statement send detach rpc");
63+
connection.executeWithConnection(rpc, table, timeoutMillis);
64+
logger.debug("statement detach rpc response successful");
65+
66+
return rpc;
67+
}
68+
69+
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/execution/ObDirectLoadStatementExecutor.java

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class ObDirectLoadStatementExecutor {
4343
private static final int FAIL = 6;
4444
private static final int ABORT = 7;
4545
private AtomicInteger stateFlag = new AtomicInteger(NONE);
46+
private boolean isDetached = false;
4647

4748
private final ObDirectLoadStatement statement;
4849
private final ObDirectLoadTraceId traceId;
@@ -88,6 +89,10 @@ public ObAddr getSvrAddr() {
8889
return svrAddr;
8990
}
9091

92+
public boolean isDetached() {
93+
return isDetached;
94+
}
95+
9196
public String toString() {
9297
return String.format("{svrAddr:%s, tableId:%d, taskId:%d}", svrAddr, tableId, taskId);
9398
}
@@ -134,6 +139,28 @@ public synchronized ObDirectLoadStatementFuture commit() {
134139
return task;
135140
}
136141

142+
public synchronized void detach() throws ObDirectLoadException {
143+
logger.info("statement call detach");
144+
checkState(LOADING, COMMITTING, "detach");
145+
if (isDetached) {
146+
logger.debug("statement already is detached");
147+
} else {
148+
ObDirectLoadStatementPromiseTask task = new ObDirectLoadStatementDetachTask(statement,
149+
this);
150+
task.run();
151+
if (!task.isDone()) {
152+
logger.warn("statement detach task unexpected not done");
153+
throw new ObDirectLoadUnexpectedException(
154+
"statement detach task unexpected not done");
155+
}
156+
if (!task.isSuccess()) {
157+
throw task.cause();
158+
}
159+
isDetached = true;
160+
logger.debug("statement detach successful");
161+
}
162+
}
163+
137164
public ObDirectLoadStatementExecutionId getExecutionId() throws ObDirectLoadException {
138165
checkState(LOADING, "getExecutionId");
139166
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId(
@@ -155,59 +182,74 @@ public synchronized void resume(ObDirectLoadStatementExecutionId executionId)
155182
svrAddr = executionId.getSvrAddr();
156183
}
157184

158-
public synchronized void close() {
185+
public void close() {
159186
// 如果begin还在执行, 等待begin结束
160-
if (beginFuture != null && !beginFuture.isDone()) {
187+
ObDirectLoadStatementFuture beginFuture = null;
188+
synchronized (this) {
189+
if (this.beginFuture != null && !this.beginFuture.isDone()) {
190+
beginFuture = this.beginFuture;
191+
}
192+
}
193+
if (beginFuture != null) {
161194
logger.info("statement close wait begin");
162195
try {
163196
beginFuture.await();
164197
} catch (ObDirectLoadInterruptedException e) {
165198
logger.warn("statement wait begin failed");
166199
}
167200
}
168-
beginFuture = null;
169201
// 如果commit还在执行, 等待commit结束
170-
if (commitFuture != null && !commitFuture.isDone()) {
202+
ObDirectLoadStatementFuture commitFuture = null;
203+
synchronized (this) {
204+
if (this.commitFuture != null && !this.commitFuture.isDone()) {
205+
commitFuture = this.commitFuture;
206+
}
207+
}
208+
if (commitFuture != null) {
171209
logger.info("statement close wait commit");
172210
try {
173211
commitFuture.await();
174212
} catch (ObDirectLoadInterruptedException e) {
175213
logger.warn("statement wait commit failed");
176214
}
177215
}
178-
commitFuture = null;
179216
// 如果heart beat还在执行, 取消heart beat
180-
if (heartBeatTask != null && !heartBeatTask.isDone()) {
181-
logger.info("statement close wait heart beat");
182-
final boolean canceled = heartBeatTask.cancel();
183-
if (!canceled) {
184-
try {
185-
heartBeatTask.await();
186-
} catch (ObDirectLoadInterruptedException e) {
187-
logger.warn("statement wait heart beat failed");
217+
ObDirectLoadStatementHeartBeatTask heartBeatTask = null;
218+
synchronized (this) {
219+
if (this.heartBeatTask != null && !this.heartBeatTask.isDone()) {
220+
final boolean canceled = this.heartBeatTask.cancel();
221+
if (!canceled) {
222+
heartBeatTask = this.heartBeatTask;
188223
}
189224
}
190225
}
191-
heartBeatTask = null;
226+
if (heartBeatTask != null) {
227+
logger.info("statement close wait heart beat");
228+
try {
229+
heartBeatTask.await();
230+
} catch (ObDirectLoadInterruptedException e) {
231+
logger.warn("statement wait heart beat failed");
232+
}
233+
}
192234
// 退出任务
193235
abortIfNeed();
236+
ObDirectLoadStatementFuture abortFuture = null;
237+
synchronized (this) {
238+
if (this.abortFuture != null && !this.abortFuture.isDone()) {
239+
abortFuture = this.abortFuture;
240+
}
241+
}
194242
if (abortFuture != null) {
243+
logger.info("statement close wait abort");
195244
try {
196-
if (!abortFuture.isDone()) {
197-
logger.info("statement close wait abort");
198-
abortFuture.await();
199-
}
200-
if (!abortFuture.isSuccess()) {
201-
throw abortFuture.cause();
202-
}
203-
logger.info("statement abort successful");
245+
abortFuture.await();
204246
} catch (ObDirectLoadException e) {
205247
logger.warn("statement abort failed", e);
206248
}
207249
}
208250
}
209251

210-
private void abortIfNeed() {
252+
private synchronized void abortIfNeed() {
211253
logger.debug("statement abort if need");
212254
if (abortFuture != null) {
213255
logger.debug("statement in abort");
@@ -247,6 +289,8 @@ private void abortIfNeed() {
247289
logger.debug("statement no need abort because " + reason);
248290
setState(ABORT);
249291
}
292+
} else if (isDetached) {
293+
logger.debug("statement no need abort because is detached");
250294
} else {
251295
abort();
252296
}
@@ -471,9 +515,7 @@ void setFailure(ObDirectLoadException cause) {
471515
}
472516
executor.cause = cause;
473517
logger.warn("statement begin failed", cause);
474-
synchronized (executor) {
475-
executor.abortIfNeed();
476-
}
518+
executor.abortIfNeed();
477519
}
478520

479521
void clear() {
@@ -508,9 +550,7 @@ void setFailure(ObDirectLoadException cause) {
508550
}
509551
executor.cause = cause;
510552
logger.warn("statement commit failed", cause);
511-
synchronized (executor) {
512-
executor.abortIfNeed();
513-
}
553+
executor.abortIfNeed();
514554
}
515555

516556
};
@@ -559,9 +599,7 @@ void setFailure(ObDirectLoadException cause) {
559599
}
560600
executor.cause = cause;
561601
logger.warn("statement heart beat failed", cause);
562-
synchronized (executor) {
563-
executor.abortIfNeed();
564-
}
602+
executor.abortIfNeed();
565603
}
566604

567605
};

src/main/java/com/alipay/oceanbase/rpc/direct_load/protocol/ObDirectLoadProtocol.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@ public interface ObDirectLoadProtocol {
4343

4444
ObDirectLoadHeartBeatRpc getHeartBeatRpc(ObDirectLoadTraceId traceId);
4545

46+
ObDirectLoadDetachRpc getDetachRpc(ObDirectLoadTraceId traceId) throws ObDirectLoadException;
47+
4648
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.direct_load.protocol.payload;
19+
20+
import com.alipay.oceanbase.rpc.direct_load.protocol.ObDirectLoadRpc;
21+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObAddr;
22+
23+
public interface ObDirectLoadDetachRpc extends ObDirectLoadRpc {
24+
25+
// arg
26+
void setSvrAddr(ObAddr addr);
27+
28+
void setTableId(long tableId);
29+
30+
void setTaskId(long taskId);
31+
32+
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/protocol/v0/ObDirectLoadProtocolV0.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828

2929
public class ObDirectLoadProtocolV0 implements ObDirectLoadProtocol {
3030

31+
public static final long OB_VERSION_4_2_5_4 = ObGlobal.calcVersion(4, (short) 2,
32+
(byte) 5, (byte) 4);
33+
public static final long OB_VERSION_4_3_0_0 = ObGlobal.calcVersion(4, (short) 3,
34+
(byte) 0, (byte) 0);
35+
3136
public static final long OB_VERSION_4_3_2_0 = ObGlobal.calcVersion(4, (short) 3,
3237
(byte) 2, (byte) 0);
3338
public static final long OB_VERSION_4_3_5_0 = ObGlobal.calcVersion(4, (short) 3,
@@ -106,4 +111,27 @@ public ObDirectLoadHeartBeatRpc getHeartBeatRpc(ObDirectLoadTraceId traceId) {
106111
return new ObDirectLoadHeartBeatRpcV0(traceId);
107112
}
108113

114+
@Override
115+
public ObDirectLoadDetachRpc getDetachRpc(ObDirectLoadTraceId traceId)
116+
throws ObDirectLoadException {
117+
if (obVersion < OB_VERSION_4_3_0_0) {
118+
if (obVersion < OB_VERSION_4_2_5_4) {
119+
logger.warn("detach in ob version " + ObGlobal.getObVsnString(obVersion)
120+
+ "is not supported, minimum version required is "
121+
+ ObGlobal.getObVsnString(OB_VERSION_4_2_5_4));
122+
throw new ObDirectLoadNotSupportedException(
123+
"detach in ob version " + ObGlobal.getObVsnString(obVersion)
124+
+ " is not supported, minimum version required is "
125+
+ ObGlobal.getObVsnString(OB_VERSION_4_2_5_4));
126+
}
127+
} else {
128+
logger.warn("detach in ob version " + ObGlobal.getObVsnString(obVersion)
129+
+ "is not supported");
130+
throw new ObDirectLoadNotSupportedException("detach in ob version "
131+
+ ObGlobal.getObVsnString(obVersion)
132+
+ " is not supported");
133+
}
134+
return new ObDirectLoadDetachRpcV0(traceId);
135+
}
136+
109137
}

0 commit comments

Comments
 (0)