@@ -5,6 +5,10 @@ import (
55 "sync"
66)
77
8+ // Segments manages multiple segments.
9+ // It is responsible for appending data to the active segment,
10+ // reading data from segments, and rolling over to a new segment
11+ // when the current segment is full.
812type Segments struct {
913 config * Config
1014 active * Segment
@@ -14,6 +18,7 @@ type Segments struct {
1418 mu * sync.Mutex
1519}
1620
21+ // NewSegments creates a new Segments instance with the given configuration and index.
1722func NewSegments (config * Config , index * Index ) (* Segments , error ) {
1823 segment , err := NewSegment (0 , config )
1924 if err != nil {
@@ -28,6 +33,8 @@ func NewSegments(config *Config, index *Index) (*Segments, error) {
2833 }, nil
2934}
3035
36+ // Append appends data to the active segment.
37+ // If the active segment is full, it rolls over to a new segment.
3138func (s * Segments ) Append (data []byte ) (int , error ) {
3239 if s .active .isFull (s .config .maxSegmentSizeInBytes ) {
3340 if err := s .rollOverSegment (); err != nil {
@@ -42,6 +49,12 @@ func (s *Segments) Append(data []byte) (int, error) {
4249 return messageId , nil
4350}
4451
52+ // Read reads data from the segment with the given message ID.
53+ // It retrieves the offset from the index and reads the data from the corresponding segment.
54+ // If the message ID is unknown, it returns an error.
55+ // If the segment is not found, it returns an error.
56+ // It returns the data read from the segment or an error if the operation fails.
57+ // It locks the segment for reading to ensure thread safety.
4558func (s * Segments ) Read (messageId int ) ([]byte , error ) {
4659 entry , ok := s .index .GetOffset (messageId )
4760 if ! ok {
@@ -54,10 +67,12 @@ func (s *Segments) Read(messageId int) ([]byte, error) {
5467 return segment .Read (entry .offset )
5568}
5669
70+ // Flush flushes any pending writes to the active segment.
5771func (s * Segments ) Flush () {
5872 s .active .store .Flush ()
5973}
6074
75+ // findSegment finds a segment by its ID.
6176func (s * Segments ) findSegment (segmentId int ) (* Segment , error ) {
6277 if s .active .id == segmentId {
6378 return s .active , nil
@@ -70,6 +85,9 @@ func (s *Segments) findSegment(segmentId int) (*Segment, error) {
7085 return nil , fmt .Errorf ("unknown segment %d" , segmentId )
7186}
7287
88+ // rollOverSegment rolls over to a new segment.
89+ // It closes the current active segment writer,
90+ // appends it to the closed segments list, and creates a new active segment.
7391func (s * Segments ) rollOverSegment () error {
7492 s .mu .Lock ()
7593 defer s .mu .Unlock ()
@@ -85,6 +103,8 @@ func (s *Segments) rollOverSegment() error {
85103 return nil
86104}
87105
106+ // Close closes the active segment and all closed segments.
107+ // It flushes any pending writes to the store and releases resources.
88108func (s * Segments ) Close () error {
89109 if err := s .active .Close (); err != nil {
90110 return err
0 commit comments