Skip to content

Commit d4f0bef

Browse files
committed
fix class edits diff syncs
1 parent 034182f commit d4f0bef

File tree

3 files changed

+132
-1
lines changed

3 files changed

+132
-1
lines changed

chotki_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,3 +468,130 @@ func (k *Test) Store(off uint64, rdt byte, old []byte, clock rdx.Clock) (bare []
468468
}
469469
return
470470
}
471+
472+
func TestChotki_ClassEdit(t *testing.T) {
473+
dirs, clear := testdirs(0xa, 0xb)
474+
defer clear()
475+
476+
a, err := Open(dirs[0], Options{Src: 0xa, Name: "test replica A"})
477+
assert.Nil(t, err)
478+
479+
b, err := Open(dirs[1], Options{Src: 0xb, Name: "test replica B"})
480+
assert.Nil(t, err)
481+
482+
cid, err := a.NewClass(context.Background(), rdx.ID0, Schema...)
483+
assert.NoError(t, err)
484+
485+
sc, err := a.ClassFields(cid)
486+
assert.NoError(t, err)
487+
assert.Equal(t, Fields(Schema), sc[1:])
488+
489+
_, err = b.ClassFields(cid)
490+
assert.Error(t, err)
491+
492+
syncData(a, b)
493+
494+
sc, err = b.ClassFields(cid)
495+
assert.NoError(t, err)
496+
assert.Equal(t, Fields(Schema), sc[1:])
497+
498+
Schema2 := []Field{
499+
{Name: "test", RdxType: rdx.String, Offset: 0},
500+
{Name: "test2", RdxType: rdx.Integer, Offset: 1},
501+
}
502+
503+
_, err = a.NewClass(context.Background(), cid, Schema2...)
504+
assert.NoError(t, err)
505+
506+
sc, err = a.ClassFields(cid)
507+
assert.NoError(t, err)
508+
assert.Equal(t, Fields(Schema2), sc[1:])
509+
510+
syncData(a, b)
511+
512+
sc, err = b.ClassFields(cid)
513+
assert.NoError(t, err)
514+
assert.Equal(t, Fields(Schema2), sc[1:])
515+
516+
_ = a.Close()
517+
_ = b.Close()
518+
519+
}
520+
521+
func TestChotki_ClassEdit_Live(t *testing.T) {
522+
dirs, clear := testdirs(0xa, 0xb)
523+
defer clear()
524+
525+
a, err := Open(dirs[0], Options{Src: 0xa, Name: "test replica A", Logger: utils.NewDefaultLogger(slog.LevelInfo)})
526+
assert.Nil(t, err)
527+
defer a.Close()
528+
529+
b, err := Open(dirs[1], Options{Src: 0xb, Name: "test replica B", Logger: utils.NewDefaultLogger(slog.LevelInfo)})
530+
assert.Nil(t, err)
531+
defer b.Close()
532+
533+
cid, err := a.NewClass(context.Background(), rdx.ID0, Schema...)
534+
assert.NoError(t, err)
535+
536+
sc, err := a.ClassFields(cid)
537+
assert.NoError(t, err)
538+
assert.Equal(t, Fields(Schema), sc[1:])
539+
540+
_, err = b.ClassFields(cid)
541+
assert.Error(t, err)
542+
543+
synca := Syncer{
544+
Host: a,
545+
PingPeriod: 100 * time.Second,
546+
PingWait: 3 * time.Second,
547+
Mode: SyncRWLive,
548+
Name: "a",
549+
Src: a.src,
550+
log: utils.NewDefaultLogger(slog.LevelDebug),
551+
oqueue: &FeedCloserTest{},
552+
}
553+
syncb := Syncer{
554+
Host: b,
555+
PingPeriod: 100 * time.Second,
556+
Mode: SyncRWLive,
557+
PingWait: 3 * time.Second,
558+
Name: "b",
559+
Src: b.src,
560+
log: utils.NewDefaultLogger(slog.LevelDebug),
561+
oqueue: &FeedCloserTest{},
562+
}
563+
564+
ctx, cancel := context.WithCancel(context.Background())
565+
defer cancel()
566+
567+
a.outq.Store("b", &syncb)
568+
569+
go protocol.PumpCtx(ctx, &synca, &syncb)
570+
go protocol.PumpCtx(ctx, &syncb, &synca)
571+
time.Sleep(time.Millisecond * 10)
572+
573+
sc, err = b.ClassFields(cid)
574+
assert.NoError(t, err)
575+
assert.Equal(t, Fields(Schema), sc[1:])
576+
577+
Schema2 := []Field{
578+
{Name: "test", RdxType: rdx.String, Offset: 0},
579+
{Name: "test2", RdxType: rdx.Integer, Offset: 1},
580+
}
581+
582+
_, err = a.NewClass(context.Background(), cid, Schema2...)
583+
assert.NoError(t, err)
584+
585+
sc, err = a.ClassFields(cid)
586+
assert.NoError(t, err)
587+
assert.Equal(t, Fields(Schema2), sc[1:])
588+
589+
time.Sleep(10 * time.Millisecond)
590+
591+
sc, err = b.ClassFields(cid)
592+
assert.NoError(t, err)
593+
assert.Equal(t, Fields(Schema2), sc[1:])
594+
595+
syncb.Close()
596+
synca.Close()
597+
}

packets.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ func (cho *Chotki) ApplyD(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err
2626
d := rdx.UnzipUint64(dzip)
2727
at := ref.ProPlus(d)
2828
rdt, bare, rest = protocol.TakeAny(rest)
29+
// clean cache after class update
30+
if rdt == 'C' {
31+
cho.types.Clear()
32+
}
2933
err = batch.Merge(OKey(at, rdt), bare, cho.opts.PebbleWriteOptions)
3034
if err == nil && rdt == 'O' {
3135
cid := rdx.IDFromZipBytes(bare)

rdx/X.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func X2string(rdt byte, tlv []byte, new_val string, src uint64) (delta []byte) {
170170

171171
func Xdiff(rdt byte, tlv []byte, sendvv VV) (diff []byte) {
172172
switch rdt {
173-
case 'C', 'O', 'Y':
173+
case 'O', 'Y':
174174
diff = nil
175175
case 'F':
176176
diff = Fdiff(tlv, sendvv)

0 commit comments

Comments
 (0)