Skip to content

Commit b36e7db

Browse files
committed
Refactor logstream
1 parent f91491f commit b36e7db

File tree

1 file changed

+35
-15
lines changed

1 file changed

+35
-15
lines changed

lib/logstream/logstream.go

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -441,15 +441,34 @@ func (m *LogStream) SaveLogTargetsLocations(writer LogWriter) {
441441

442442
// Stop stops the consumer and closes listening channel (it won't be accepting any logs from now on)
443443
func (g *ContainerLogConsumer) stop() error {
444-
if g.isDone {
445-
return nil
446-
}
444+
var stopErr error
445+
446+
// Use sync.Once to ensure the stop logic is executed only once
447+
g.stopOnce.Do(func() {
448+
// Send a signal to the listening channel if possible
449+
select {
450+
case g.logListeningDone <- struct{}{}:
451+
default:
452+
// No action needed if the channel is already full
453+
}
447454

448-
g.isDone = true
449-
g.logListeningDone <- struct{}{}
450-
defer close(g.logListeningDone)
455+
// Close the channel
456+
close(g.logListeningDone)
457+
})
451458

452-
return nil
459+
return stopErr
460+
}
461+
462+
// IsStopped checks if the consumer has already been stopped
463+
func (g *ContainerLogConsumer) IsStopped() bool {
464+
select {
465+
case <-g.logListeningDone:
466+
// Channel has been closed, meaning stop() was called
467+
return true
468+
default:
469+
// Channel is still open
470+
return false
471+
}
453472
}
454473

455474
// DisconnectContainer disconnects particular container
@@ -558,7 +577,7 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu
558577
// set the cursor to the end of the file, when done to resume writing, unless it was closed
559578
//revive:disable
560579
defer func() {
561-
if !consumer.isDone {
580+
if !consumer.IsStopped() {
562581
_, deferErr := consumer.tempFile.Seek(0, 2)
563582
attachError(deferErr)
564583
}
@@ -623,8 +642,10 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu
623642
// FlushLogsToTargets flushes all logs for all consumers (containers) to their targets
624643
func (m *LogStream) FlushLogsToTargets() error {
625644
var preExecuteFn = func(consumer *ContainerLogConsumer) error {
626-
// do not accept any new logs
627-
consumer.isDone = true
645+
// Stop the consumer to ensure it doesn't accept new logs
646+
if err := consumer.stop(); err != nil {
647+
return fmt.Errorf("failed to stop consumer %s: %w", consumer.name, err)
648+
}
628649

629650
for _, handler := range m.logTargetHandlers {
630651
consumer.ls.log.Debug().
@@ -699,7 +720,7 @@ type ContainerLogConsumer struct {
699720
ls *LogStream
700721
tempFile *os.File
701722
encoder *gob.Encoder
702-
isDone bool
723+
stopOnce sync.Once
703724
hasErrored bool
704725
logListeningDone chan struct{}
705726
container LogProducingContainer
@@ -720,7 +741,6 @@ func newContainerLogConsumer(ctx context.Context, lw *LogStream, container LogPr
720741
prefix: prefix,
721742
logTargets: logTargets,
722743
ls: lw,
723-
isDone: false,
724744
hasErrored: false,
725745
logListeningDone: make(chan struct{}, 1),
726746
container: container,
@@ -769,8 +789,7 @@ func (g *ContainerLogConsumer) ResetTempFile() error {
769789
// MarkAsErrored marks the consumer as errored (which makes it stop accepting logs)
770790
func (g *ContainerLogConsumer) MarkAsErrored() {
771791
g.hasErrored = true
772-
g.isDone = true
773-
close(g.logListeningDone)
792+
_ = g.stop()
774793
}
775794

776795
// GetContainer returns the container that this consumer is connected to
@@ -787,7 +806,8 @@ func (g *ContainerLogConsumer) Accept(l tc.Log) {
787806
return
788807
}
789808

790-
if g.isDone {
809+
if g.IsStopped() {
810+
// if the consumer is stopped, we don't want to accept any more logs
791811
return
792812
}
793813

0 commit comments

Comments
 (0)