Skip to content

Commit c8cabb4

Browse files
fix changes handle (#23250)
fix changes handle Approved by: @XuPeng-SH
1 parent 139c1b7 commit c8cabb4

File tree

3 files changed

+117
-4
lines changed

3 files changed

+117
-4
lines changed

pkg/vm/engine/disttae/logtailreplay/change_handle.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1471,7 +1471,7 @@ func updateDataBatch(bat *batch.Batch, start, end types.TS, mp *mpool.MPool) {
14711471
func updateCNTombstoneBatch(bat *batch.Batch, committs types.TS, mp *mpool.MPool) {
14721472
var pk *vector.Vector
14731473
for _, vec := range bat.Vecs {
1474-
if vec.GetType().Oid != types.T_Rowid {
1474+
if vec.GetType().Oid != types.T_Rowid || vec.GetType().Oid != types.T_TS {
14751475
pk = vec
14761476
} else {
14771477
vec.Free(mp)
@@ -1485,6 +1485,12 @@ func updateCNTombstoneBatch(bat *batch.Batch, committs types.TS, mp *mpool.MPool
14851485
bat.Attrs = []string{objectio.TombstoneAttr_PK_Attr, objectio.DefaultCommitTS_Attr}
14861486
}
14871487
func updateCNDataBatch(bat *batch.Batch, commitTS types.TS, mp *mpool.MPool) {
1488+
for i, vec := range bat.Vecs {
1489+
if vec.GetType().Oid == types.T_TS {
1490+
vec.Free(mp)
1491+
bat.Vecs = append(bat.Vecs[:i], bat.Vecs[i+1:]...)
1492+
}
1493+
}
14881494
commitTSVec, err := vector.NewConstFixed(types.T_TS.ToType(), commitTS, bat.Vecs[0].Length(), mp)
14891495
if err != nil {
14901496
return
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2024 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package logtailreplay
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/require"
21+
22+
"github.com/matrixorigin/matrixone/pkg/common/mpool"
23+
"github.com/matrixorigin/matrixone/pkg/container/batch"
24+
"github.com/matrixorigin/matrixone/pkg/container/types"
25+
"github.com/matrixorigin/matrixone/pkg/container/vector"
26+
)
27+
28+
func TestUpdateCNDataBatch_RemoveTSVector(t *testing.T) {
29+
mp := mpool.MustNewZero()
30+
31+
// Create a batch with multiple vectors including T_TS type
32+
bat := batch.NewWithSize(3)
33+
34+
// Create an int64 vector (non-TS)
35+
vec1 := vector.NewVec(types.T_int64.ToType())
36+
vector.AppendFixed(vec1, int64(1), false, mp)
37+
vector.AppendFixed(vec1, int64(2), false, mp)
38+
bat.Vecs[0] = vec1
39+
40+
// Create a T_TS vector (should be removed)
41+
tsVec1 := vector.NewVec(types.T_TS.ToType())
42+
vector.AppendFixed(tsVec1, types.BuildTS(1, 0), false, mp)
43+
vector.AppendFixed(tsVec1, types.BuildTS(2, 0), false, mp)
44+
bat.Vecs[1] = tsVec1
45+
46+
// Create another int64 vector (non-TS)
47+
vec2 := vector.NewVec(types.T_int64.ToType())
48+
vector.AppendFixed(vec2, int64(3), false, mp)
49+
vector.AppendFixed(vec2, int64(4), false, mp)
50+
bat.Vecs[2] = vec2
51+
52+
bat.SetRowCount(2)
53+
54+
// Verify initial state: should have 3 vectors
55+
require.Equal(t, 3, len(bat.Vecs))
56+
require.Equal(t, types.T_int64, bat.Vecs[0].GetType().Oid)
57+
require.Equal(t, types.T_TS, bat.Vecs[1].GetType().Oid)
58+
require.Equal(t, types.T_int64, bat.Vecs[2].GetType().Oid)
59+
60+
// Call updateCNDataBatch
61+
newCommitTS := types.BuildTS(100, 0)
62+
updateCNDataBatch(bat, newCommitTS, mp)
63+
64+
// Verify T_TS vector is removed and new commitTS vector is added at the end
65+
require.Equal(t, 3, len(bat.Vecs))
66+
require.Equal(t, types.T_int64, bat.Vecs[0].GetType().Oid)
67+
require.Equal(t, types.T_int64, bat.Vecs[1].GetType().Oid)
68+
require.Equal(t, types.T_TS, bat.Vecs[2].GetType().Oid)
69+
70+
// Verify the new commitTS vector has the correct value
71+
require.True(t, bat.Vecs[2].IsConst())
72+
tsVal := vector.MustFixedColWithTypeCheck[types.TS](bat.Vecs[2])[0]
73+
require.Equal(t, newCommitTS, tsVal)
74+
75+
bat.Clean(mp)
76+
}
77+
78+
func TestUpdateCNDataBatch_NoTSVector(t *testing.T) {
79+
mp := mpool.MustNewZero()
80+
81+
// Create a batch without T_TS vectors
82+
bat := batch.NewWithSize(2)
83+
84+
vec1 := vector.NewVec(types.T_int64.ToType())
85+
vector.AppendFixed(vec1, int64(1), false, mp)
86+
vector.AppendFixed(vec1, int64(2), false, mp)
87+
bat.Vecs[0] = vec1
88+
89+
vec2 := vector.NewVec(types.T_int64.ToType())
90+
vector.AppendFixed(vec2, int64(3), false, mp)
91+
vector.AppendFixed(vec2, int64(4), false, mp)
92+
bat.Vecs[1] = vec2
93+
94+
bat.SetRowCount(2)
95+
96+
// Verify initial state: should have 2 vectors
97+
require.Equal(t, 2, len(bat.Vecs))
98+
99+
// Call updateCNDataBatch
100+
newCommitTS := types.BuildTS(100, 0)
101+
updateCNDataBatch(bat, newCommitTS, mp)
102+
103+
// Verify commitTS vector is added at the end
104+
require.Equal(t, 3, len(bat.Vecs))
105+
require.Equal(t, types.T_int64, bat.Vecs[0].GetType().Oid)
106+
require.Equal(t, types.T_int64, bat.Vecs[1].GetType().Oid)
107+
require.Equal(t, types.T_TS, bat.Vecs[2].GetType().Oid)
108+
109+
bat.Clean(mp)
110+
}

pkg/vm/engine/tae/txn/txnbase/mvccnode.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2222
"github.com/matrixorigin/matrixone/pkg/container/types"
2323
"github.com/matrixorigin/matrixone/pkg/container/vector"
24-
"github.com/matrixorigin/matrixone/pkg/logutil"
2524
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
2625
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
2726
)
@@ -79,7 +78,6 @@ func (un *TxnMVCCNode) CheckConflict(txn txnif.TxnReader) error {
7978
if un.IsSameTxn(txn) {
8079
return nil
8180
}
82-
logutil.Infof("Dedup-WW, prev txn is %x, new txn is %x", un.Txn.GetID(), txn.GetID())
8381
return txnif.ErrTxnWWConflict
8482
}
8583

@@ -88,7 +86,6 @@ func (un *TxnMVCCNode) CheckConflict(txn txnif.TxnReader) error {
8886
// ts CommitTs time
8987
startTS := txn.GetStartTS()
9088
if un.End.GT(&startTS) {
91-
logutil.Infof("Dedup-WW, prev ts is %x, new txn is %x", un.End.ToString(), startTS.ToString())
9289
return txnif.ErrTxnWWConflict
9390
}
9491
return nil

0 commit comments

Comments
 (0)