Skip to content

Commit ccb29ae

Browse files
authored
add committs in objects (#22529)
- Add commit timestamp as the last column in all objects to facilitate data change backtracing. - When reading a legacy object without a commit timestamp, a ConstNull vector will be returned. - The usage example [pkg/vm/engine/tae/blockio/read.go:594](https://github.com/matrixorigin/matrixone/blob/8533657686ca40e92ac5d8dd2cd334f399f6516f/pkg/vm/engine/tae/blockio/read.go#L594) Approved by: @aunjgr, @XuPeng-SH
1 parent 2a6c802 commit ccb29ae

File tree

12 files changed

+92
-57
lines changed

12 files changed

+92
-57
lines changed

pkg/container/types/txnts.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ func (ts TS) ToString() string {
145145
return fmt.Sprintf("%d-%d", ts.Physical(), ts.Logical())
146146
}
147147

148+
// format interface
149+
func (ts TS) String() string {
150+
return ts.ToString()
151+
}
152+
148153
func (ts TS) Valid() bool {
149154
return ts.Physical() >= 0
150155
}

pkg/container/vector/vector_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,7 +1401,7 @@ func TestShuffle(t *testing.T) {
14011401
require.NoError(t, err)
14021402
v.Shuffle([]int64{1, 2}, mp)
14031403
require.Equal(t, vs[1:3], MustFixedColWithTypeCheck[types.TS](v))
1404-
require.Equal(t, "[[0 0 0 0 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0 0 0]]", v.String())
1404+
require.Equal(t, "[0-0 0-0]", v.String())
14051405
v.Free(mp)
14061406
require.Equal(t, int64(0), mp.CurrNB())
14071407
}
@@ -2989,7 +2989,7 @@ func TestRowToString(t *testing.T) {
29892989
v := NewVec(types.T_TS.ToType())
29902990
err := AppendFixedList(v, vs, nil, mp)
29912991
require.NoError(t, err)
2992-
require.Equal(t, "[0 0 0 0 0 0 0 0 0 0 0 0]", v.RowToString(1))
2992+
require.Equal(t, "0-0", v.RowToString(1))
29932993
v.Free(mp)
29942994
require.Equal(t, int64(0), mp.CurrNB())
29952995
}

pkg/objectio/funcs.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -148,38 +148,49 @@ func ReadOneBlockWithMeta(
148148
}
149149

150150
var filledEntries []fileservice.IOEntry
151+
putFillHolder := func(i int, seqnum uint16) {
152+
if filledEntries == nil {
153+
filledEntries = make([]fileservice.IOEntry, len(seqnums))
154+
}
155+
filledEntries[i] = fileservice.IOEntry{
156+
Size: int64(seqnum), // a marker, it can not be zero
157+
}
158+
}
159+
151160
blkmeta := meta.GetBlockMeta(uint32(blk))
152161
maxSeqnum := blkmeta.GetMaxSeqnum()
153162
for i, seqnum := range seqnums {
154163
// special columns
155164
if seqnum >= SEQNUM_UPPER {
156165
metaColCnt := blkmeta.GetMetaColumnCount()
157-
// read appendable block file, the last columns is commits and abort
158-
if seqnum == SEQNUM_COMMITTS {
166+
switch seqnum {
167+
case SEQNUM_COMMITTS:
159168
seqnum = metaColCnt - 1
160-
} else if seqnum == SEQNUM_ABORT {
169+
case SEQNUM_ABORT:
161170
panic("not support")
162-
} else {
171+
default:
163172
panic(fmt.Sprintf("bad path to read special column %d", seqnum))
164173
}
174+
// if the last column is not commits, do not read it
175+
// 1. created by cn
176+
// 2. old version tn nonappendable block
165177
col := blkmeta.ColumnMeta(seqnum)
166-
ext := col.Location()
167-
ioVec.Entries = append(ioVec.Entries, fileservice.IOEntry{
168-
Offset: int64(ext.Offset()),
169-
Size: int64(ext.Length()),
170-
ToCacheData: factory(int64(ext.OriginSize()), ext.Alg()),
171-
})
178+
if col.DataType() != uint8(types.T_TS) {
179+
putFillHolder(i, seqnum)
180+
} else {
181+
ext := col.Location()
182+
ioVec.Entries = append(ioVec.Entries, fileservice.IOEntry{
183+
Offset: int64(ext.Offset()),
184+
Size: int64(ext.Length()),
185+
ToCacheData: factory(int64(ext.OriginSize()), ext.Alg()),
186+
})
187+
}
172188
continue
173189
}
174190

175191
// need fill vector
176192
if seqnum > maxSeqnum || blkmeta.ColumnMeta(seqnum).DataType() == 0 {
177-
if filledEntries == nil {
178-
filledEntries = make([]fileservice.IOEntry, len(seqnums))
179-
}
180-
filledEntries[i] = fileservice.IOEntry{
181-
Size: int64(seqnum), // a marker, it can not be zero
182-
}
193+
putFillHolder(i, seqnum)
183194
continue
184195
}
185196

pkg/objectio/ioutil/loadfuncs.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@ package ioutil
1616

1717
import (
1818
"context"
19-
"github.com/matrixorigin/matrixone/pkg/logutil"
2019
"math"
2120

22-
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
23-
2421
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2522
"github.com/matrixorigin/matrixone/pkg/container/batch"
2623
"github.com/matrixorigin/matrixone/pkg/container/types"
2724
"github.com/matrixorigin/matrixone/pkg/container/vector"
2825
"github.com/matrixorigin/matrixone/pkg/fileservice"
26+
"github.com/matrixorigin/matrixone/pkg/logutil"
2927
"github.com/matrixorigin/matrixone/pkg/objectio"
28+
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
3029
)
3130

3231
func LoadColumnsData(

pkg/sql/colexec/table_clone/table_clone.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ func clone(
353353
col, area := vector.MustVarlenaRawData(bat.Vecs[idx])
354354
for i := range col {
355355
stats := objectio.ObjectStats(col[i].GetByteSlice(area))
356-
if stats.GetAppendable() || !stats.GetCNCreated() {
356+
if stats.GetAppendable() /*|| !stats.GetCNCreated()*/ {
357357
return moerr.NewInternalErrorNoCtxf("object fmt wrong: %s", stats.FlagString())
358358
}
359359

pkg/vm/engine/disttae/table_meta_reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func (r *TableMetaReader) collect(
227227
rowCnt += int(obj.ObjectStats.Rows())
228228

229229
if err = colexec.ExpandObjectStatsToBatch(
230-
mp, isTombstone, outBatch, true, obj.ObjectStats); err != nil {
230+
mp, isTombstone, outBatch, false, obj.ObjectStats); err != nil {
231231
return nil, err
232232
}
233233

pkg/vm/engine/tae/rpc/handle.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -790,9 +790,10 @@ func (h *Handle) HandleWrite(
790790
statsVec := req.Batch.Vecs[0]
791791
for i := 0; i < statsVec.Length(); i++ {
792792
s := objectio.ObjectStats(statsVec.GetBytesAt(i))
793-
if !s.GetCNCreated() {
794-
logutil.Fatalf("the `CNCreated` mask not set: %s", s.String())
795-
}
793+
// do not check because clone will send reusable objects to tn
794+
// if !s.GetCNCreated() {
795+
// logutil.Infof("the `CNCreated` mask not set: %s", s.String())
796+
// }
796797
persistedMemoryInsertRows += int(s.Rows())
797798
}
798799
err = tb.AddDataFiles(

pkg/vm/engine/tae/rpc/tool.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -762,9 +762,12 @@ func (c *objGetArg) GetData(ctx context.Context) (res string, err error) {
762762
return
763763
}
764764
col := blk.ColumnMeta(idx)
765-
col.ZoneMap()
766-
idxs = append(idxs, idx)
767765
tp := types.T(col.DataType()).ToType()
766+
if col.DataType() == uint8(types.T_TS) && i == len(c.cols)-1 {
767+
idxs = append(idxs, objectio.SEQNUM_COMMITTS)
768+
} else {
769+
idxs = append(idxs, idx)
770+
}
768771
typs = append(typs, tp)
769772
}
770773

pkg/vm/engine/tae/tables/jobs/flushTableTail.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -586,10 +586,8 @@ func (task *flushTableTailTask) mergeAObjs(ctx context.Context, isTombstone bool
586586
}
587587
seqnums = append(seqnums, def.SeqNum)
588588
}
589-
if isTombstone {
590-
readColIdxs = append(readColIdxs, objectio.SEQNUM_COMMITTS)
591-
seqnums = append(seqnums, objectio.SEQNUM_COMMITTS)
592-
}
589+
readColIdxs = append(readColIdxs, objectio.SEQNUM_COMMITTS)
590+
seqnums = append(seqnums, objectio.SEQNUM_COMMITTS)
593591

594592
// read from aobjects
595593
readedBats := make([]*containers.Batch, 0, len(objHandles))

pkg/vm/engine/tae/tables/jobs/mergeobjects.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,8 @@ func NewMergeObjectsTask(
152152
task.idxs = append(task.idxs, def.Idx)
153153
task.attrs = append(task.attrs, def.Name)
154154
}
155-
if isTombstone {
156-
task.idxs = append(task.idxs, objectio.SEQNUM_COMMITTS)
157-
task.attrs = append(task.attrs, objectio.TombstoneAttr_CommitTs_Attr)
158-
}
155+
task.idxs = append(task.idxs, objectio.SEQNUM_COMMITTS)
156+
task.attrs = append(task.attrs, objectio.TombstoneAttr_CommitTs_Attr)
159157
task.BaseTask = tasks.NewBaseTask(task, tasks.DataCompactionTask, ctx)
160158

161159
if task.GetTotalSize() > 300*common.Const1MBytes {
@@ -369,9 +367,7 @@ func (task *mergeObjectsTask) PrepareNewWriter() *ioutil.BlockWriter {
369367
}
370368
seqnums = append(seqnums, def.SeqNum)
371369
}
372-
if task.isTombstone {
373-
seqnums = append(seqnums, objectio.SEQNUM_COMMITTS)
374-
}
370+
seqnums = append(seqnums, objectio.SEQNUM_COMMITTS)
375371
sortkeyIsPK := false
376372
sortkeyPos := -1
377373

0 commit comments

Comments
 (0)