Skip to content

Commit 3cdcdf3

Browse files
committed
Merge branch batch_duplicate_rowkey into master
Title: fix:batch operation duplicate rowkey fix batch operation with duplicate rowkey in mutate column value and rowkey. now, the client will remove those duplicate rowkey automatically. Link: https://code.alibaba-inc.com/oceanbase/obkv-table-client-java/codereview/11443697
2 parents e7abd52 + 297d7a6 commit 3cdcdf3

File tree

9 files changed

+80
-4
lines changed

9 files changed

+80
-4
lines changed

src/main/java/com/alipay/oceanbase/rpc/mutation/Append.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,14 @@ public Append addMutateColVal(ColumnValue... columnValues) {
112112
return this;
113113
}
114114

115+
/*
116+
* Remove rowkey from mutateColval
117+
*/
118+
public Append removeRowkeyFromMutateColval() {
119+
removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames);
120+
return this;
121+
}
122+
115123
/*
116124
* execute
117125
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,31 +108,37 @@ public BatchOperationResult execute() throws Exception {
108108
throw new IllegalArgumentException("Invalid type in batch operation, "
109109
+ type);
110110
case INSERT:
111+
((Insert) mutation).removeRowkeyFromMutateColval();
111112
batchOps.insert(mutation.getRowKey(), ((Insert) mutation).getColumns(),
112113
((Insert) mutation).getValues());
113114
break;
114115
case DEL:
115116
batchOps.delete(mutation.getRowKey());
116117
break;
117118
case UPDATE:
119+
((Update) mutation).removeRowkeyFromMutateColval();
118120
batchOps.update(mutation.getRowKey(), ((Update) mutation).getColumns(),
119121
((Update) mutation).getValues());
120122
break;
121123
case INSERT_OR_UPDATE:
124+
((InsertOrUpdate) mutation).removeRowkeyFromMutateColval();
122125
batchOps.insertOrUpdate(mutation.getRowKey(),
123126
((InsertOrUpdate) mutation).getColumns(),
124127
((InsertOrUpdate) mutation).getValues());
125128
break;
126129
case REPLACE:
130+
((Replace) mutation).removeRowkeyFromMutateColval();
127131
batchOps.replace(mutation.getRowKey(), ((Replace) mutation).getColumns(),
128132
((Replace) mutation).getValues());
129133
break;
130134
case INCREMENT:
135+
((Increment) mutation).removeRowkeyFromMutateColval();
131136
batchOps.increment(mutation.getRowKey(),
132137
((Increment) mutation).getColumns(),
133138
((Increment) mutation).getValues(), withResult);
134139
break;
135140
case APPEND:
141+
((Append) mutation).removeRowkeyFromMutateColval();
136142
batchOps.append(mutation.getRowKey(), ((Append) mutation).getColumns(),
137143
((Append) mutation).getValues(), withResult);
138144
break;

src/main/java/com/alipay/oceanbase/rpc/mutation/Increment.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,14 @@ public Increment addMutateColVal(ColumnValue... columnValues) {
112112
return this;
113113
}
114114

115+
/*
116+
* Remove rowkey from mutateColval
117+
*/
118+
public Increment removeRowkeyFromMutateColval() {
119+
removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames);
120+
return this;
121+
}
122+
115123
/*
116124
* execute
117125
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/Insert.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ public Insert addMutateColVal(ColumnValue... columnValues) throws Exception {
109109
return this;
110110
}
111111

112+
/*
113+
* Remove rowkey from mutateColval
114+
*/
115+
public Insert removeRowkeyFromMutateColval() {
116+
removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames);
117+
return this;
118+
}
119+
112120
/*
113121
* execute
114122
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/InsertOrUpdate.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ public InsertOrUpdate addMutateColVal(ColumnValue... columnValues) {
109109
return this;
110110
}
111111

112+
/*
113+
* Remove rowkey from mutateColval
114+
*/
115+
public InsertOrUpdate removeRowkeyFromMutateColval() {
116+
removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames);
117+
return this;
118+
}
119+
112120
/*
113121
* execute
114122
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ public T addScanRange(Object[] start, boolean startEquals, Object[] end, boolean
330330

331331
static void removeRowkeyFromMutateColval(List<String> columns, List<Object> values,
332332
List<String> rowKeyNames) {
333-
if (null == columns || null == rowKeyNames) {
333+
if (null == columns || null == rowKeyNames || columns.size() != values.size()) {
334334
return;
335335
}
336336
for (int i = values.size() - 1; i >= 0; --i) {

src/main/java/com/alipay/oceanbase/rpc/mutation/Replace.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ public Replace addMutateColVal(ColumnValue... columnValues) {
109109
return this;
110110
}
111111

112+
/*
113+
* Remove rowkey from mutateColval
114+
*/
115+
public Replace removeRowkeyFromMutateColval() {
116+
removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames);
117+
return this;
118+
}
119+
112120
/*
113121
* execute
114122
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/Update.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ public Update addMutateColVal(ColumnValue... columnValues) {
109109
return this;
110110
}
111111

112+
/*
113+
* Remove rowkey from mutateColval
114+
*/
115+
public Update removeRowkeyFromMutateColval() {
116+
removeRowkeyFromMutateColval(this.columns, this.values, this.rowKeyNames);
117+
return this;
118+
}
119+
112120
/*
113121
* execute
114122
*/

src/test/java/com/alipay/oceanbase/rpc/ObTableClientTest.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1934,10 +1934,10 @@ public void testBatchMutation() throws Exception {
19341934
Assert.assertNull(opResult.getOperationRow().get("c4"));
19351935
Assert.assertNull(batchResult.get(2).getOperationRow().get("c4"));
19361936

1937-
long c1Vals[] = { 0L, 1L, 2L };
1938-
String c2Vals[] = { "row_0", "row_1", "row_2" };
1937+
long[] c1Vals = { 0L, 1L, 2L };
1938+
String[] c2Vals = { "row_0", "row_1", "row_2" };
19391939
byte[] c3Val = new byte[] { 1 };
1940-
long c4Vals[] = { 100L, 101L, 102L };
1940+
long[] c4Vals = { 100L, 101L, 102L };
19411941
BatchOperation batchOperation = client.batchOperation("test_mutation");
19421942
for (int i = 0; i < c1Vals.length; i++) {
19431943
Row rowKey1 = row(colVal("c1", c1Vals[i]), colVal("c2", c2Vals[i]));
@@ -1953,6 +1953,26 @@ public void testBatchMutation() throws Exception {
19531953
Assert.assertTrue(Arrays.equals(c3Val, (byte[]) row.get("c3")));
19541954
Assert.assertEquals(c4Vals[i], row.get("c4"));
19551955
}
1956+
1957+
// test duplicate rowkey in mutatecolval and rowkey
1958+
Insert insert_2 = insert().setRowKey(row(colVal("c1", 5L), colVal("c2", "row_5")))
1959+
.addMutateColVal(colVal("c1", 5L), colVal("c2", "row_5"))
1960+
.addMutateColVal(colVal("c3", new byte[] { 1 })).addMutateColVal(colVal("c4", 5L));
1961+
Update update_1 = update().setRowKey(row(colVal("c1", 0L), colVal("c2", "row_0")))
1962+
.addMutateRow(row(colVal("c1", 0L), colVal("c2", "row_0")))
1963+
.addMutateColVal(colVal("c3", new byte[] { 1 })).addMutateColVal(colVal("c4", 0L));
1964+
InsertOrUpdate iou_0 = insertOrUpdate()
1965+
.setRowKey(row(colVal("c1", 1L), colVal("c2", "row_1")))
1966+
.addMutateColVal(colVal("c2", "row_1"))
1967+
.addMutateColVal(colVal("c3", new byte[] { 1 })).addMutateColVal(colVal("c4", 0L));
1968+
1969+
batchResult = client.batchOperation("test_mutation")
1970+
.addOperation(insert_2, update_1, iou_0).execute();
1971+
Assert.assertEquals(0, batchResult.getWrongCount());
1972+
Assert.assertEquals(3, batchResult.getCorrectCount());
1973+
Assert.assertEquals(0, batchResult.getCorrectIdx()[0]);
1974+
Assert.assertEquals(1, batchResult.get(1).getAffectedRows());
1975+
Assert.assertEquals(1, batchResult.get(2).getAffectedRows());
19561976
} catch (Exception e) {
19571977
e.printStackTrace();
19581978
Assert.assertTrue(false);
@@ -1967,6 +1987,8 @@ public void testBatchMutation() throws Exception {
19671987
.execute();
19681988
client.delete("test_mutation").setRowKey(colVal("c1", 4L), colVal("c2", "row_4"))
19691989
.execute();
1990+
client.delete("test_mutation").setRowKey(colVal("c1", 5L), colVal("c2", "row_5"))
1991+
.execute();
19701992
}
19711993
}
19721994

0 commit comments

Comments
 (0)