Skip to content

Commit 28bb9c0

Browse files
wt0530githubgxll
authored andcommitted
[fix][dingo-exec] Fix number of records returned by optimistic transaction duplicate key
1 parent eceec07 commit 28bb9c0

File tree

1 file changed

+68
-13
lines changed

1 file changed

+68
-13
lines changed

dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartInsertOperator.java

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import io.dingodb.common.exception.DingoTypeRangeException;
2323
import io.dingodb.common.log.LogUtils;
2424
import io.dingodb.common.meta.SchemaState;
25+
import io.dingodb.common.partition.RangeDistribution;
2526
import io.dingodb.common.profile.InsertProfile;
2627
import io.dingodb.common.store.KeyValue;
2728
import io.dingodb.common.type.DingoType;
2829
import io.dingodb.common.type.TupleMapping;
2930
import io.dingodb.common.type.converter.DataConverter;
31+
import io.dingodb.common.util.ByteArrayUtils;
3032
import io.dingodb.common.util.Optional;
3133
import io.dingodb.common.util.Pair;
3234
import io.dingodb.common.util.Utils;
@@ -67,6 +69,7 @@
6769
import java.util.ArrayList;
6870
import java.util.Arrays;
6971
import java.util.List;
72+
import java.util.NavigableMap;
7073
import java.util.stream.Collectors;
7174

7275
import static io.dingodb.common.util.NoBreakFunctions.wrap;
@@ -135,8 +138,8 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
135138
profile.typeCheck(start);
136139
start = System.currentTimeMillis();
137140
IndexTable index = null;
138-
boolean unique = false;
139141
byte[] indexOldDeleteKey = null;
142+
boolean mainTableInsert = true;
140143
if (context.getIndexId() != null) {
141144
boolean duplicate = param.getUpdateMapping() != null && param.getUpdates() != null;
142145
indexTable = (Table) TransactionManager.getIndex(txnId, context.getIndexId());
@@ -262,7 +265,7 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
262265
byte[] oldKey = value.getKey();
263266
if (oldKey[oldKey.length - 2] == Op.PUTIFABSENT.getCode()
264267
|| oldKey[oldKey.length - 2] == Op.PUT.getCode()) {
265-
unique = index.isUnique();
268+
boolean unique = index.isUnique();
266269
if (unique) {
267270
throw new DuplicateEntryException("Duplicate entry "
268271
+ TransactionUtil.duplicateEntryKey(tableId, key, txnId) + " for key 'PRIMARY'");
@@ -329,7 +332,6 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
329332
KeyValue oldKv = store.txnGet(txnId.seq, key, param.getLockTimeOut());
330333
long count = param.getTable().getColumnIndices2(indexTable.keyColumns()).stream().filter(i -> param.getUpdateMapping().findIdx(i) >= 0).count();
331334
Object[] tempTuple = param.getCodec().decode(oldKv);
332-
unique = true;
333335
if (duplicate && count == 0) {
334336
// primary table
335337
schema = param.getSchema();
@@ -384,8 +386,8 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
384386
isDocument,
385387
index,
386388
indexOldDeleteKey,
387-
unique,
388-
indexOldTuple);
389+
indexOldTuple,
390+
mainTableInsert);
389391

390392
// primary table
391393
schema = param.getSchema();
@@ -416,8 +418,8 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
416418
isDocument,
417419
index,
418420
indexOldDeleteKey,
419-
unique,
420-
indexOldTuple);
421+
indexOldTuple,
422+
mainTableInsert);
421423
profile.step5(start);
422424
return true;
423425
}
@@ -428,6 +430,59 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
428430
}
429431
}
430432
}
433+
} else {
434+
IndexTable uniqueIndex = param.getTable().getIndexes().stream()
435+
.filter(IndexTable::isUnique).findFirst().orElse(null);
436+
if (param.getUpdates() != null && param.getUpdateMapping() != null && uniqueIndex != null) {
437+
// primary table & unique index
438+
KeyValueCodec indexCodec = CodecService.getDefault().createKeyValueCodec(
439+
uniqueIndex.getCodecVersion(), uniqueIndex.version,
440+
uniqueIndex.tupleType(), uniqueIndex.keyMapping());
441+
List<Integer> columnIndices2 = param.getTable().getColumnIndices2(uniqueIndex.columns);
442+
Object[] tuple2 = columnIndices2.stream().map(i -> {
443+
if (i == -1) {
444+
return null;
445+
}
446+
return primaryOldTuple[i];
447+
}).toArray();
448+
byte[] keyEncode = indexCodec.encodeKey(tuple2);
449+
CommonId indexId = uniqueIndex.getTableId();
450+
NavigableMap<ByteArrayUtils.ComparableByteArray, RangeDistribution> ranges =
451+
MetaService.root().getRangeDistribution(indexId);
452+
CommonId regionId = PartitionService.getService(
453+
Optional.ofNullable(uniqueIndex.getPartitionStrategy())
454+
.orElse(DingoPartitionServiceProvider.RANGE_FUNC_NAME))
455+
.calcPartId(keyEncode, ranges);
456+
StoreInstance indexLocalStore =
457+
Services.LOCAL_STORE.getInstance(indexId, regionId);
458+
CodecService.getDefault().setId(keyEncode, regionId.domain);
459+
byte[] indexIdByte = indexId.encode();
460+
byte[] regionIdByte = regionId.encode();
461+
byte[] txnIdByte = txnId.encode();
462+
byte[] insertKey = ByteUtils.encode(
463+
CommonId.CommonType.TXN_CACHE_DATA,
464+
keyEncode,
465+
Op.PUTIFABSENT.getCode(),
466+
(txnIdByte.length + indexIdByte.length + regionIdByte.length),
467+
txnIdByte,
468+
indexIdByte,
469+
regionIdByte);
470+
byte[] updateKey = Arrays.copyOf(insertKey, insertKey.length);
471+
updateKey[updateKey.length - 2] = (byte) Op.PUT.getCode();
472+
List<byte[]> bytes = new ArrayList<>(3);
473+
bytes.add(insertKey);
474+
bytes.add(updateKey);
475+
List<KeyValue> keyValues = indexLocalStore.get(bytes);
476+
if (keyValues != null && !keyValues.isEmpty()) {
477+
mainTableInsert = false;
478+
} else {
479+
StoreInstance indexStore = Services.KV_STORE.getInstance(indexId, regionId);
480+
KeyValue kv = indexStore.txnGet(txnId.seq, keyEncode, param.getLockTimeOut());
481+
if (kv != null && kv.getValue() != null) {
482+
mainTableInsert = false;
483+
}
484+
}
485+
}
431486
}
432487
start = insert(
433488
context,
@@ -448,8 +503,8 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
448503
isDocument,
449504
index,
450505
indexOldDeleteKey,
451-
unique,
452-
indexOldTuple);
506+
indexOldTuple,
507+
mainTableInsert);
453508
profile.step5(start);
454509
return true;
455510
}
@@ -472,8 +527,8 @@ private static long insert(Context context,
472527
boolean isDocument,
473528
IndexTable index,
474529
byte[] indexOldDeleteKey,
475-
boolean unique,
476-
Object[] indexOldTuple) {
530+
Object[] indexOldTuple,
531+
boolean mainTableInsert) {
477532
if (context.isWithoutPrimary()) {
478533
schema.setCheckFieldCount(false);
479534
DingoType dingoType = codec.getDingoType();
@@ -851,14 +906,14 @@ private static long insert(Context context,
851906
return start;
852907
}
853908
if (localStore.put(keyValue) && context.getIndexId() == null) {
854-
if (!context.isDuplicateKey()) {
909+
if (!context.isDuplicateKey() && mainTableInsert) {
855910
param.inc(num);
856911
}
857912
context.addKeyState(true);
858913
if (context.isDuplicateKey() && oldTuple != null) {
859914
Long updateNum = Optional.mapOrGet(pair, Pair::getValue, () -> 0L);
860915
if (updateNum > 0) {
861-
param.inc(unique ? 1 : 2);
916+
param.inc(2);
862917
}
863918
}
864919
}

0 commit comments

Comments
 (0)