Skip to content

Commit c853af2

Browse files
committed
Merge branch gaode_master into master
Title: Prepare release v1.0.2 - Fixed a persistent login failure when using a connection concurrently - Fixed an issue where logs could not be printed using the logback framework - Added interface for atomic batch mutation - Fixed some use case failures caused by ODP error code adjustments - Fixed the problem that the client connection cannot be appropriately released when it is closed in ODP mode - Added traceID printing to print logs Link: https://code.alibaba-inc.com/oceanbase/obkv-table-client-java/codereview/11924895
2 parents dc9cc5a + f645e2f commit c853af2

File tree

13 files changed

+256
-58
lines changed

13 files changed

+256
-58
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,13 @@
140140
<version>2.0.9</version>
141141
<scope>test</scope>
142142
</dependency>
143+
144+
<dependency>
145+
<groupId>ch.qos.logback</groupId>
146+
<artifactId>logback-classic</artifactId>
147+
<version>1.1.7</version>
148+
<scope>test</scope>
149+
</dependency>
143150
</dependencies>
144151

145152
<build>

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static com.alipay.oceanbase.rpc.property.Property.*;
6868
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*;
6969
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*;
70+
import static com.alipay.oceanbase.rpc.util.TraceUtil.formatTraceMessage;
7071

7172
public class ObTableClient extends AbstractObTableClient implements Lifecycle {
7273
private static final Logger logger = TableClientLoggerFactory
@@ -218,6 +219,9 @@ public void close() throws Exception {
218219
throw new ObTableCloseException(sb.toString(), throwException);
219220
}
220221
}
222+
if (odpTable != null) {
223+
odpTable.close();
224+
}
221225
} finally {
222226
BOOT.info("ObTableClient is closed");
223227
statusLock.unlock();
@@ -1503,7 +1507,7 @@ public Map<String, Object> execute(ObPair<Long, ObTable> obPair) throws Exceptio
15031507
checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result);
15041508

15051509
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1506-
MONITOR.info(logMessage(tableName, "GET", endpoint, rowKey,
1510+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "GET", endpoint, rowKey,
15071511
(ObTableOperationResult) result, getTableTime - startTime,
15081512
System.currentTimeMillis() - getTableTime));
15091513
return ((ObTableOperationResult) result).getEntity().getSimpleProperties();
@@ -1539,7 +1543,7 @@ public Long execute(ObPair<Long, ObTable> obPair) throws Exception {
15391543
request.setPartitionId(partId);
15401544
ObPayload result = obPair.getRight().execute(request);
15411545
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1542-
MONITOR.info(logMessage(tableName, "UPDATE", endpoint, rowKey,
1546+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "UPDATE", endpoint, rowKey,
15431547
(ObTableOperationResult) result, getTableTime - start,
15441548
System.currentTimeMillis() - getTableTime));
15451549
checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1578,7 +1582,7 @@ public ObPayload execute(ObPair<Long, ObTable> obPair) throws Exception {
15781582
request.setPartitionId(partId);
15791583
ObPayload result = obTable.execute(request);
15801584
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1581-
MONITOR.info(logMessage(tableName, "UPDATE", endpoint, rowKey,
1585+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "UPDATE", endpoint, rowKey,
15821586
(ObTableOperationResult) result, TableTime - start,
15831587
System.currentTimeMillis() - TableTime));
15841588
checkResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1608,7 +1612,7 @@ public static String buildParamsString(List<Object> rowKeys) {
16081612
return stringBuilder.toString();
16091613
}
16101614

1611-
private String logMessage(String tableName, String methodName, String endpoint,
1615+
private String logMessage(String traceId, String tableName, String methodName, String endpoint,
16121616
Object[] rowKeys, ObTableQueryAndMutateResult result,
16131617
long routeTableTime, long executeTime) {
16141618
if (org.apache.commons.lang.StringUtils.isNotBlank(endpoint)) {
@@ -1621,15 +1625,15 @@ private String logMessage(String tableName, String methodName, String endpoint,
16211625
String res = String.valueOf(result.getAffectedRows());
16221626

16231627
StringBuilder stringBuilder = new StringBuilder();
1624-
stringBuilder.append(",").append(database).append(",").append(tableName).append(",")
1628+
stringBuilder.append(traceId).append(",").append(database).append(",").append(tableName).append(",")
16251629
.append(methodName).append(",").append(endpoint).append(",").append(argsValue)
16261630
.append(",").append(result.toString()).append(",").append(res).append(",")
16271631
.append(routeTableTime).append(",").append(executeTime).append(",")
16281632
.append(executeTime + routeTableTime);
16291633
return stringBuilder.toString();
16301634
}
16311635

1632-
private String logMessage(String tableName, String methodName, String endpoint,
1636+
private String logMessage(String traceId, String tableName, String methodName, String endpoint,
16331637
Object[] rowKeys, ObTableOperationResult result, long routeTableTime,
16341638
long executeTime) {
16351639
if (org.apache.commons.lang.StringUtils.isNotBlank(endpoint)) {
@@ -1654,7 +1658,7 @@ private String logMessage(String tableName, String methodName, String endpoint,
16541658
String errorCodeStringValue = resultCode.toString();
16551659

16561660
StringBuilder stringBuilder = new StringBuilder();
1657-
stringBuilder.append(",").append(database).append(",").append(tableName).append(",")
1661+
stringBuilder.append(traceId).append(",").append(database).append(",").append(tableName).append(",")
16581662
.append(methodName).append(",").append(endpoint).append(",").append(argsValue)
16591663
.append(",").append(errorCodeStringValue).append(",").append(res).append(",")
16601664
.append(routeTableTime).append(",").append(executeTime).append(",")
@@ -1690,7 +1694,7 @@ public Long execute(ObPair<Long, ObTable> obPair) throws Exception {
16901694
request.setPartitionId(partId);
16911695
ObPayload result = obPair.getRight().execute(request);
16921696
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1693-
MONITOR.info(logMessage(tableName, "DELETE", endpoint, rowKey,
1697+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "DELETE", endpoint, rowKey,
16941698
(ObTableOperationResult) result, getTableTime - start,
16951699
System.currentTimeMillis() - getTableTime));
16961700
checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1726,7 +1730,7 @@ public ObPayload execute(ObPair<Long, ObTable> obPair) throws Exception {
17261730
request.setPartitionId(partId);
17271731
ObPayload result = obTable.execute(request);
17281732
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1729-
MONITOR.info(logMessage(tableName, "DELETE", endpoint, rowKey,
1733+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "DELETE", endpoint, rowKey,
17301734
(ObTableOperationResult) result, TableTime - start,
17311735
System.currentTimeMillis() - TableTime));
17321736
checkResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1763,7 +1767,7 @@ public Long execute(ObPair<Long, ObTable> obPair) throws Exception {
17631767
request.setPartitionId(partId);
17641768
ObPayload result = obPair.getRight().execute(request);
17651769
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1766-
MONITOR.info(logMessage(tableName, "INSERT", endpoint, rowKey,
1770+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "INSERT", endpoint, rowKey,
17671771
(ObTableOperationResult) result, getTableTime - start,
17681772
System.currentTimeMillis() - getTableTime));
17691773
checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1802,7 +1806,7 @@ public ObPayload execute(ObPair<Long, ObTable> obPair) throws Exception {
18021806
request.setPartitionId(partId);
18031807
ObPayload result = obTable.execute(request);
18041808
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1805-
MONITOR.info(logMessage(tableName, "INSERT", endpoint, rowKey,
1809+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "INSERT", endpoint, rowKey,
18061810
(ObTableOperationResult) result, TableTime - start,
18071811
System.currentTimeMillis() - TableTime));
18081812
checkResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1839,7 +1843,7 @@ public Long execute(ObPair<Long, ObTable> obPair) throws Exception {
18391843
request.setPartitionId(partId);
18401844
ObPayload result = obPair.getRight().execute(request);
18411845
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1842-
MONITOR.info(logMessage(tableName, "REPLACE", endpoint, rowKey,
1846+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "REPLACE", endpoint, rowKey,
18431847
(ObTableOperationResult) result, getTableTime - start,
18441848
System.currentTimeMillis() - getTableTime));
18451849
checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1878,7 +1882,7 @@ public ObPayload execute(ObPair<Long, ObTable> obPair) throws Exception {
18781882
request.setPartitionId(partId);
18791883
ObPayload result = obTable.execute(request);
18801884
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1881-
MONITOR.info(logMessage(tableName, "REPLACE", endpoint, rowKey,
1885+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "REPLACE", endpoint, rowKey,
18821886
(ObTableOperationResult) result, TableTime - start,
18831887
System.currentTimeMillis() - TableTime));
18841888
checkResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1915,7 +1919,7 @@ public Long execute(ObPair<Long, ObTable> obPair) throws Exception {
19151919
request.setPartitionId(partId);
19161920
ObPayload result = obPair.getRight().execute(request);
19171921
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1918-
MONITOR.info(logMessage(tableName, "INERT_OR_UPDATE", endpoint, rowKey,
1922+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "INERT_OR_UPDATE", endpoint, rowKey,
19191923
(ObTableOperationResult) result, getTableTime - start,
19201924
System.currentTimeMillis() - getTableTime));
19211925
checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -1955,7 +1959,7 @@ public ObPayload execute(ObPair<Long, ObTable> obPair) throws Exception {
19551959
request.setPartitionId(partId);
19561960
ObPayload result = obTable.execute(request);
19571961
String endpoint = obTable.getIp() + ":" + obTable.getPort();
1958-
MONITOR.info(logMessage(tableName, "INERT_OR_UPDATE", endpoint, rowKey,
1962+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "INERT_OR_UPDATE", endpoint, rowKey,
19591963
(ObTableOperationResult) result, TableTime - start,
19601964
System.currentTimeMillis() - TableTime));
19611965
checkResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -2004,7 +2008,7 @@ public Map<String, Object> execute(ObPair<Long, ObTable> obPair) throws Exceptio
20042008
request.setPartitionId(partId);
20052009
ObPayload result = obPair.getRight().execute(request);
20062010
String endpoint = obTable.getIp() + ":" + obTable.getPort();
2007-
MONITOR.info(logMessage(tableName, "INCREMENT", endpoint, rowKey,
2011+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "INCREMENT", endpoint, rowKey,
20082012
(ObTableOperationResult) result, getTableTime - start,
20092013
System.currentTimeMillis() - getTableTime));
20102014
checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -2049,7 +2053,7 @@ public ObPayload execute(ObPair<Long, ObTable> obPair) throws Exception {
20492053
request.setPartitionId(partId);
20502054
ObPayload result = obTable.execute(request);
20512055
String endpoint = obTable.getIp() + ":" + obTable.getPort();
2052-
MONITOR.info(logMessage(tableName, "INCREMENT", endpoint, rowKey,
2056+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "INCREMENT", endpoint, rowKey,
20532057
(ObTableOperationResult) result, TableTime - start,
20542058
System.currentTimeMillis() - TableTime));
20552059
checkResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -2083,7 +2087,7 @@ public Map<String, Object> execute(ObPair<Long, ObTable> obPair) throws Exceptio
20832087
request.setPartitionId(partId);
20842088
ObPayload result = obPair.getRight().execute(request);
20852089
String endpoint = obTable.getIp() + ":" + obTable.getPort();
2086-
MONITOR.info(logMessage(tableName, "APPEND", endpoint, rowKey,
2090+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "APPEND", endpoint, rowKey,
20872091
(ObTableOperationResult) result, getTableTime - start,
20882092
System.currentTimeMillis() - getTableTime));
20892093
checkObTableOperationResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -2123,7 +2127,7 @@ public ObPayload execute(ObPair<Long, ObTable> obPair) throws Exception {
21232127
request.setPartitionId(partId);
21242128
ObPayload result = obTable.execute(request);
21252129
String endpoint = obTable.getIp() + ":" + obTable.getPort();
2126-
MONITOR.info(logMessage(tableName, "APPEND", endpoint, rowKey,
2130+
MONITOR.info(logMessage(tableName, formatTraceMessage(request), "APPEND", endpoint, rowKey,
21272131
(ObTableOperationResult) result, TableTime - start,
21282132
System.currentTimeMillis() - TableTime));
21292133
checkResult(obTable.getIp(), obTable.getPort(), request, result);
@@ -2184,7 +2188,7 @@ public ObPayload execute(ObPair<Long, ObTable> obPair) throws Exception {
21842188
} else {
21852189
curRowKey = rowKey;
21862190
}
2187-
MONITOR.info(logMessage(tableQuery.toString(), type.toString(), endpoint,
2191+
MONITOR.info(logMessage(formatTraceMessage(request), tableQuery.toString(), type.toString(), endpoint,
21882192
curRowKey, (ObTableQueryAndMutateResult) result, TableTime - start,
21892193
System.currentTimeMillis() - TableTime));
21902194
checkResult(obTable.getIp(), obTable.getPort(), request, result);

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import com.alipay.remoting.Connection;
2929
import org.slf4j.Logger;
3030

31-
import java.util.UUID;
3231
import java.util.concurrent.atomic.AtomicLong;
3332

33+
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.MONITOR;
34+
import static com.alipay.oceanbase.rpc.util.TraceUtil.formatTraceMessage;
35+
3436
public class ObTableConnection {
3537

3638
private static final Logger LOGGER = TableClientLoggerFactory
@@ -81,6 +83,7 @@ private synchronized boolean connect() throws Exception {
8183
if (checkAvailable()) { // double check status available
8284
return false;
8385
}
86+
final long start = System.currentTimeMillis();
8487
Exception cause = null;
8588
int tries = 0;
8689
int maxTryTimes = obTable.getObTableConnectTryTimes();
@@ -95,6 +98,8 @@ private synchronized boolean connect() throws Exception {
9598
"connect failed at " + tries + " try " + TraceUtil.formatIpPort(obTable), e);
9699
}
97100
}
101+
String endpoint = obTable.getIp() + ":" + obTable.getPort();
102+
MONITOR.info(logMessage("", "CONNECT", endpoint, System.currentTimeMillis() - start));
98103

99104
if (tries >= maxTryTimes) {
100105
throw new ObTableServerConnectException("connect failed after max " + maxTryTimes
@@ -113,6 +118,7 @@ private synchronized boolean connect() throws Exception {
113118
}
114119

115120
private synchronized void login() throws Exception {
121+
final long start = System.currentTimeMillis();
116122
ObTableLoginRequest request = new ObTableLoginRequest();
117123
request.setTenantName(obTable.getTenantName());
118124
request.setUserName(obTable.getUserName());
@@ -146,6 +152,8 @@ private synchronized void login() throws Exception {
146152
}
147153
}
148154

155+
String endpoint = obTable.getIp() + ":" + obTable.getPort();
156+
MONITOR.info(logMessage(formatTraceMessage(request), "LOGIN", endpoint, System.currentTimeMillis() - start));
149157
if (tries >= maxTryTimes) {
150158
throw new ObTableServerConnectException("login failed after max " + maxTryTimes
151159
+ " tries " + TraceUtil.formatIpPort(obTable),
@@ -187,7 +195,7 @@ public void checkStatus() throws Exception {
187195
}
188196
}
189197

190-
private void reconnect(String msg) throws Exception {
198+
private synchronized void reconnect(String msg) throws Exception {
191199
if (connect()) {
192200
LOGGER.warn("reconnect success. reconnect reason: [{}]", msg);
193201
} else {
@@ -241,9 +249,14 @@ private boolean checkAvailable() {
241249
if (connection == null) {
242250
return false;
243251
}
244-
if (connection.getChannel() == null || !connection.getChannel().isActive()) {
252+
if (connection.getChannel() == null) {
253+
return false;
254+
}
255+
256+
if (!connection.getChannel().isActive()) {
245257
return false;
246258
}
259+
247260
if (credential == null) {
248261
return false;
249262
}
@@ -271,4 +284,14 @@ public long getNextSequence() {
271284
return sequence.incrementAndGet();
272285
}
273286

287+
private String logMessage(String traceId, String methodName, String endpoint, long executeTime) {
288+
if (org.apache.commons.lang.StringUtils.isNotBlank(endpoint)) {
289+
endpoint = endpoint.replaceAll(",", "#");
290+
}
291+
292+
StringBuilder stringBuilder = new StringBuilder();
293+
stringBuilder.append(traceId).append(",").append(methodName).append(",").append(endpoint).append(",").append(executeTime);
294+
return stringBuilder.toString();
295+
}
296+
274297
}

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class BatchOperation {
3434
private Table client;
3535
boolean withResult;
3636
private List<Object> operations;
37+
boolean isAtomic = true;
3738

3839
/*
3940
* default constructor
@@ -95,6 +96,11 @@ public BatchOperation addOperation(List<Mutation> mutations) {
9596
return this;
9697
}
9798

99+
public BatchOperation setIsAtomic(boolean isAtomic) {
100+
this.isAtomic = isAtomic;
101+
return this;
102+
}
103+
98104
public BatchOperationResult execute() throws Exception {
99105
// add rowkeyElement
100106
boolean hasSetRowkeyElement = false;
@@ -159,7 +165,7 @@ public BatchOperationResult execute() throws Exception {
159165
throw new ObTableException("unknown operation " + operation);
160166
}
161167
}
162-
168+
batchOps.setAtomicOperation(isAtomic);
163169
return new BatchOperationResult(batchOps.executeWithResult());
164170
}
165171
}

0 commit comments

Comments
 (0)