Skip to content

Commit d12a8cc

Browse files
committed
update docs and remove unsued function
1 parent 72d36df commit d12a8cc

File tree

4 files changed

+135
-40
lines changed

4 files changed

+135
-40
lines changed

chotki.go

Lines changed: 70 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ type syncPoint struct {
176176
start time.Time
177177
}
178178

179-
// TLV all the way down
179+
// Main Chotki struct
180180
type Chotki struct {
181181
last rdx.ID
182182
src uint64
@@ -200,6 +200,7 @@ type Chotki struct {
200200
types *xsync.MapOf[rdx.ID, classes.Fields]
201201
}
202202

203+
// Checks if the directory exists and is a directory.
203204
func Exists(dirname string) (bool, error) {
204205
stats, err := os.Stat(dirname)
205206
if err != nil {
@@ -222,6 +223,8 @@ func Exists(dirname string) (bool, error) {
222223
return desc.Exists, nil
223224
}
224225

226+
// Worket that cleans diff syncs that took too long.
227+
// Otherwise they stay in the memmory forever if can't be finished
225228
func (cho *Chotki) cleanSyncs(ctx context.Context) {
226229
for ctx.Err() == nil {
227230
cho.syncs.Range(func(id rdx.ID, s *syncPoint) bool {
@@ -240,6 +243,7 @@ func (cho *Chotki) cleanSyncs(ctx context.Context) {
240243
}
241244
}
242245

246+
// Opens a new Chotki instance.
243247
func Open(dirname string, opts Options) (*Chotki, error) {
244248
exists, err := Exists(dirname)
245249
if err != nil {
@@ -261,11 +265,12 @@ func Open(dirname string, opts Options) (*Chotki, error) {
261265
ctx, cancel := context.WithCancel(context.Background())
262266
var wg sync.WaitGroup
263267
cho := Chotki{
264-
db: db,
265-
src: opts.Src,
266-
dir: absdir,
267-
log: opts.Logger,
268-
opts: opts,
268+
db: db,
269+
src: opts.Src,
270+
dir: absdir,
271+
log: opts.Logger,
272+
opts: opts,
273+
// TODO: delete as it is unused actually
269274
clock: &rdx.LocalLogicalClock{Source: opts.Src},
270275

271276
outq: xsync.NewMapOf[string, protocol.DrainCloser](),
@@ -318,22 +323,26 @@ func Open(dirname string, opts Options) (*Chotki, error) {
318323
)
319324
cho.IndexManager = indexes.NewIndexManager(&cho)
320325
wg.Add(1)
326+
// reindex tasks are checked in a separate worker
321327
go func() {
322328
defer wg.Done()
323329
cho.IndexManager.CheckReindexTasks(ctx)
324330
}()
325331

326332
wg.Add(1)
333+
// diff syncs are cleaned up in a separate worker
327334
go func() {
328335
defer wg.Done()
329336
cho.cleanSyncs(ctx)
330337
}()
331338

332339
if !exists {
333340
id0 := rdx.IDFromSrcSeqOff(opts.Src, 0, 0)
334-
341+
// apply log0, some default objects for all replicas
335342
init := append(protocol.Records(nil), Log0...)
343+
// log1 is parameter that allows to define some default objects for the replica
336344
init = append(init, opts.Log1...)
345+
// creates a replica object, however it is now unused
337346
init = append(init, protocol.Record('Y',
338347
protocol.Record('I', id0.ZipBytes()),
339348
protocol.Record('R', rdx.ID0.ZipBytes()),
@@ -357,6 +366,7 @@ func Open(dirname string, opts Options) (*Chotki, error) {
357366
return &cho, nil
358367
}
359368

369+
// Gracefully closes the Chotki instance.
360370
func (cho *Chotki) Close() error {
361371
cho.lock.Lock()
362372
defer cho.lock.Unlock()
@@ -393,85 +403,83 @@ func (cho *Chotki) Close() error {
393403
return nil
394404
}
395405

406+
// Returns an atomic counter object that can be used to increment/decrement a counter.
396407
func (cho *Chotki) Counter(rid rdx.ID, offset uint64, updatePeriod time.Duration) *counters.AtomicCounter {
397408
counter, _ := cho.counterCache.LoadOrStore(rid.ToOff(offset), counters.NewAtomicCounter(cho, rid, offset, updatePeriod))
398409
return counter.(*counters.AtomicCounter)
399410
}
400411

401-
// ToyKV convention key, lit O, then O00000-00000000-000 id
412+
// Returns the source id of the Chotki instance.
402413
func (cho *Chotki) Source() uint64 {
403414
return cho.src
404415
}
405416

417+
// Returns the clock of the Chotki instance.
406418
func (cho *Chotki) Clock() rdx.Clock {
407419
return cho.clock
408420
}
409421

422+
// Returns the latest used rdx.ID of current replica
410423
func (cho *Chotki) Last() rdx.ID {
411424
return cho.last
412425
}
413426

427+
// Returns the write options of the Chotki instance.
414428
func (cho *Chotki) WriteOptions() *pebble.WriteOptions {
415429
return cho.opts.PebbleWriteOptions
416430
}
417431

432+
// Returns the logger of the Chotki instance.
418433
func (cho *Chotki) Logger() utils.Logger {
419434
return cho.log
420435
}
421436

437+
// Returns a new snapshot of the Chotki instance.
422438
func (cho *Chotki) Snapshot() pebble.Reader {
423439
return cho.db.NewSnapshot()
424440
}
425441

442+
// Returns the database of the Chotki instance.
426443
func (cho *Chotki) Database() *pebble.DB {
427444
return cho.db
428445
}
429446

447+
// Returns the directory of the Chotki instance.
430448
func (cho *Chotki) Directory() string {
431449
return cho.dir
432450
}
433451

452+
// Returns the new instance of the ORM object
434453
func (cho *Chotki) ObjectMapper() *ORM {
435454
return NewORM(cho, cho.db.NewSnapshot())
436455
}
437456

438-
func (cho *Chotki) RestoreNet() error {
439-
i := cho.db.NewIter(&pebble.IterOptions{})
440-
defer i.Close()
441-
442-
for i.SeekGE([]byte{'l'}); i.Valid() && i.Key()[0] == 'L'; i.Next() {
443-
address := string(i.Key()[1:])
444-
_ = cho.net.Listen(address)
445-
}
446-
447-
for i.SeekGE([]byte{'c'}); i.Valid() && i.Key()[0] == 'C'; i.Next() {
448-
address := string(i.Key()[1:])
449-
_ = cho.net.Connect(address)
450-
}
451-
452-
return nil
453-
}
454-
457+
// Starts listening on the given address.
455458
func (cho *Chotki) Listen(addr string) error {
456459
return cho.net.Listen(addr)
457460
}
458461

462+
// Stops listening on the given address.
459463
func (cho *Chotki) Unlisten(addr string) error {
460464
return cho.net.Unlisten(addr)
461465
}
462466

467+
// Connects to the given address.
463468
func (cho *Chotki) Connect(addr string) error {
464469
return cho.net.Connect(addr)
465470
}
466471

472+
// Connects to the given address pool.
467473
func (cho *Chotki) ConnectPool(name string, addrs []string) error {
468474
return cho.net.ConnectPool(name, addrs)
469475
}
470476

477+
// Disconnects from the given address.
471478
func (cho *Chotki) Disconnect(addr string) error {
472479
return cho.net.Disconnect(addr)
473480
}
474481

482+
// Returns the version vector of the Chotki instance as rdx.VV structure.
475483
func (cho *Chotki) VersionVector() (vv rdx.VV, err error) {
476484
val, clo, err := cho.db.Get(host.VKey0)
477485
if err == nil {
@@ -519,6 +527,7 @@ func (cho *Chotki) RemoveAllHooks(fid rdx.ID) {
519527
cho.hooks.Delete(fid)
520528
}
521529

530+
// Broadcasts some records to all active replication sessions that this replica has, except one.
522531
func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, except string) {
523532
cho.outq.Range(func(name string, hose protocol.DrainCloser) bool {
524533
if name != except {
@@ -533,7 +542,11 @@ func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, exce
533542
})
534543
}
535544

536-
// Here new packets are timestamped and queued for save
545+
// Commits records to actual storage (pebble) and broadcasts the update to all active replication sessions.
546+
// Increments replica last rdx.ID and stamps this update with it. This id will be used as an ID of a new object (if it is an object), also
547+
// this will be the latest seen rdx.ID in the version vector.
548+
// Uses an exclusive lock, so all commits are serialized, but remember that Drain/drain calls are not.
549+
// All replication sessions will call Drain in parallel.
537550
func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error) {
538551
// prevent cancellation as it can make this function non atomic
539552
ctx = context.WithoutCancel(ctx)
@@ -549,8 +562,10 @@ func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body
549562
if cho.db == nil {
550563
return rdx.BadId, chotki_errors.ErrClosed
551564
}
565+
// create new last id
552566
id = cho.last.IncPro(1).ZeroOff()
553567
i := protocol.Record('I', id.ZipBytes())
568+
// this is typically some higher order structure (like for object it will be class id, for field update it will be object id)
554569
r := protocol.Record('R', ref.ZipBytes())
555570
packet := protocol.Record(lit, i, r, protocol.Join(body...))
556571
recs := protocol.Records{packet}
@@ -631,6 +646,8 @@ func (n *ChotkiCollector) Collect(m chan<- prometheus.Metric) {
631646
}
632647
n.collected_prev = nw_collected
633648
}
649+
650+
// Returns a list of prometheus collectors for the Chotki instance.
634651
func (cho *Chotki) Metrics() []prometheus.Collector {
635652
cho.db.Metrics()
636653
return []prometheus.Collector{
@@ -652,6 +669,9 @@ func (cho *Chotki) Metrics() []prometheus.Collector {
652669
}
653670
}
654671

672+
// Handles all updates and actually writes the to storage.
673+
// the allowed types are 'C', 'O', 'E', 'H', 'D', 'V', 'B', 'P', 'Y'
674+
// do not confuse it with RDX types
655675
func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error) {
656676
EventsMetric.Add(float64(len(recs)))
657677
var calls []CallHook
@@ -660,77 +680,89 @@ func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error)
660680
break
661681
}
662682

683+
// parse the packet to understand what kind of packet is it
663684
lit, id, ref, body, parseErr := replication.ParsePacket(packet)
664685
if parseErr != nil {
665686
cho.log.WarnCtx(ctx, "bad packet", "err", parseErr)
666687
return parseErr
667688
}
668689

690+
// if this is a packet commited by our replica we need to update our last id
691+
// as current commit holds the mutex, it is safe to update this id
669692
if id.Src() == cho.src && cho.last.Less(id) {
670693
if id.Off() != 0 {
671694
return rdx.ErrBadPacket
672695
}
673696
cho.last = id
674697
}
675698

699+
// noApply can be set to true if we don't want to apply the batch
676700
pb, noApply := pebble.Batch{}, false
677701

678702
cho.log.DebugCtx(ctx, "new packet", "type", string(lit), "packet", id.String())
679703

680704
switch lit {
681-
case 'Y': // create replica log
705+
case 'Y': // creates a replica log
682706
if ref != rdx.ID0 {
683707
return ErrBadYPacket
684708
}
685709
err = cho.ApplyOY('Y', id, ref, body, &pb)
686710

687-
case 'C': // create class
711+
case 'C': // creates a class
688712
err = cho.ApplyC(id, ref, body, &pb, &calls)
689713
if err == nil {
690714
// clear cache for classes if class changed
691715
cho.types.Clear()
692716
}
693717

694-
case 'O': // create object
718+
case 'O': // creates an object
695719
if ref == rdx.ID0 {
696720
return ErrBadOPacket
697721
}
698722
err = cho.ApplyOY('O', id, ref, body, &pb)
699723

700-
case 'E': // edit object
724+
case 'E': // edits an object
701725
if ref == rdx.ID0 {
702726
return ErrBadEPacket
703727
}
704728
err = cho.ApplyE(id, ref, body, &pb, &calls)
705729

706730
case 'H': // handshake
707731
d := cho.db.NewBatch()
732+
// we also create a new sync point, pebble batch that we will write diffs to
708733
cho.syncs.Store(id, &syncPoint{batch: d, start: time.Now()})
709734
err = cho.ApplyH(id, ref, body, d)
710735

711-
case 'D': // diff
736+
case 'D': // diff packet
737+
// load sync point if exists
712738
s, ok := cho.syncs.Load(id)
713739
if !ok {
714740
return ErrSyncUnknown
715741
}
716742
err = cho.ApplyD(id, ref, body, s.batch)
743+
// we use separate batch, so noApply is true
717744
noApply = true
718745

719-
case 'V':
746+
case 'V': // version vector sent in the end of diff sync
747+
// load sync point if exists
720748
s, ok := cho.syncs.Load(id)
721749
if !ok {
722750
return ErrSyncUnknown
723751
}
752+
// update blocks version vectors
724753
err = cho.ApplyV(id, ref, body, s.batch)
725754
if err == nil {
755+
// apply batch anddelete sync point as diff sync is finished
726756
err = cho.db.Apply(s.batch, cho.opts.PebbleWriteOptions)
727757
cho.syncs.Delete(id)
728758
cho.log.InfoCtx(ctx, "applied diff batch and deleted it", "id", id)
729759
}
760+
// we don't want to apply the batch as we already applied it
730761
noApply = true
731762

732-
case 'B': // bye dear
763+
case 'B': // session end
733764
cho.log.InfoCtx(ctx, "received session end", "id", id.String(), "data", string(body))
765+
// delete sync point if not already
734766
cho.syncs.Delete(id)
735767
case 'P': // ping noop
736768
default:
@@ -744,7 +776,7 @@ func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error)
744776
}
745777
}
746778

747-
if err != nil { // fixme separate packets
779+
if err != nil {
748780
return
749781
}
750782

@@ -757,6 +789,7 @@ func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error)
757789
return
758790
}
759791

792+
// Public for drain method with some additional metrics
760793
func (cho *Chotki) Drain(ctx context.Context, recs protocol.Records) (err error) {
761794
now := time.Now()
762795
defer func() {
@@ -779,6 +812,7 @@ func dumpKVString(key, value []byte) (str string) {
779812
return
780813
}
781814

815+
// Dumps all objects to the writer.
782816
func (cho *Chotki) DumpObjects(writer io.Writer) {
783817
io := pebble.IterOptions{
784818
LowerBound: []byte{'O'},
@@ -791,6 +825,7 @@ func (cho *Chotki) DumpObjects(writer io.Writer) {
791825
}
792826
}
793827

828+
// Dumps all version vectors to the writer.
794829
func (cho *Chotki) DumpVV(writer io.Writer) {
795830
io := pebble.IterOptions{
796831
LowerBound: []byte{'V'},
@@ -806,6 +841,7 @@ func (cho *Chotki) DumpVV(writer io.Writer) {
806841
}
807842
}
808843

844+
// Dumps all objects and version vectors to the writer.
809845
func (cho *Chotki) DumpAll(writer io.Writer) {
810846
cho.DumpObjects(writer)
811847
fmt.Fprintln(writer, "")

0 commit comments

Comments
 (0)