Skip to content

Commit af81e8a

Browse files
authored
TT-1856 Fix err consumer has finished but you are still trying to accept logs (#1365)
1 parent f1f8047 commit af81e8a

File tree

3 files changed

+48
-26
lines changed

3 files changed

+48
-26
lines changed

lib/.changeset/v1.50.16.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- TT-1856 Fix err consumer has finished but you are still trying to accept logs

lib/logging/log.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (ct *CustomT) Write(p []byte) (n int, err error) {
4040
}
4141
if ct.ended {
4242
l := GetTestLogger(nil)
43-
l.Error().Msgf("%s %s: %s", afterTestEndedMsg, ct.Name(), string(p))
43+
l.Warn().Msgf("%s %s: %s", afterTestEndedMsg, ct.Name(), string(p))
4444
return len(p), nil
4545
}
4646
ct.T.Log(strings.TrimSuffix(str, "\n"))
@@ -53,7 +53,7 @@ func (ct CustomT) Printf(format string, v ...interface{}) {
5353
s := "%s: "
5454
formatted := fmt.Sprintf("%s %s%s", afterTestEndedMsg, s, format)
5555
l := GetTestLogger(nil)
56-
l.Error().Msgf(formatted, ct.Name(), v)
56+
l.Warn().Msgf(formatted, ct.Name(), v)
5757
} else {
5858
ct.L.Info().Msgf(format, v...)
5959
}

lib/logstream/logstream.go

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ func (m *LogStream) ConnectContainer(ctx context.Context, container LogProducing
229229
select {
230230
case logErr := <-container.GetLogProductionErrorChannel():
231231
if logErr != nil {
232+
// Check if the container is not stopped or terminated
233+
if !container.IsRunning() {
234+
m.log.Info().
235+
Str("Container name", name).
236+
Msg("Skipping log producer retrying, because the container is not running anymore")
237+
break
238+
}
239+
232240
m.log.Error().
233241
Err(err).
234242
Str("Container name", name).
@@ -441,31 +449,48 @@ func (m *LogStream) SaveLogTargetsLocations(writer LogWriter) {
441449

442450
// Stop stops the consumer and closes listening channel (it won't be accepting any logs from now on)
443451
func (g *ContainerLogConsumer) stop() error {
444-
if g.isDone {
445-
return nil
446-
}
452+
var stopErr error
453+
454+
// Use sync.Once to ensure the stop logic is executed only once
455+
g.stopOnce.Do(func() {
456+
// Send a signal to the listening channel if possible
457+
select {
458+
case g.logListeningDone <- struct{}{}:
459+
default:
460+
// No action needed if the channel is already full
461+
}
462+
463+
// Close the channel
464+
close(g.logListeningDone)
465+
})
447466

448-
g.isDone = true
449-
g.logListeningDone <- struct{}{}
450-
defer close(g.logListeningDone)
467+
return stopErr
468+
}
451469

452-
return nil
470+
// IsStopped checks if the consumer has already been stopped
471+
func (g *ContainerLogConsumer) IsStopped() bool {
472+
select {
473+
case <-g.logListeningDone:
474+
// Channel has been closed, meaning stop() was called
475+
return true
476+
default:
477+
// Channel is still open
478+
return false
479+
}
453480
}
454481

455482
// DisconnectContainer disconnects particular container
456483
func (m *LogStream) DisconnectContainer(container LogProducingContainer) error {
457484
var err error
458485

459-
if container.IsRunning() {
460-
m.log.Trace().Str("container", container.GetContainerID()).Msg("Disconnecting container")
461-
err = container.StopLogProducer()
462-
}
486+
m.log.Info().Str("Container", container.GetContainerID()).Msg("Disconnecting container")
463487

464488
consumerFound := false
465489
m.consumerMutex.RLock()
466490
for _, consumer := range m.consumers {
467491
if consumer.container.GetContainerID() == container.GetContainerID() {
468492
consumerFound = true
493+
m.log.Info().Str("Container", consumer.name).Msg("Stopping consumer")
469494
if stopErr := consumer.stop(); err != nil {
470495
m.log.Error().
471496
Err(stopErr).
@@ -555,7 +580,7 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu
555580
// set the cursor to the end of the file, when done to resume writing, unless it was closed
556581
//revive:disable
557582
defer func() {
558-
if !consumer.isDone {
583+
if !consumer.IsStopped() {
559584
_, deferErr := consumer.tempFile.Seek(0, 2)
560585
attachError(deferErr)
561586
}
@@ -620,8 +645,10 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu
620645
// FlushLogsToTargets flushes all logs for all consumers (containers) to their targets
621646
func (m *LogStream) FlushLogsToTargets() error {
622647
var preExecuteFn = func(consumer *ContainerLogConsumer) error {
623-
// do not accept any new logs
624-
consumer.isDone = true
648+
// Stop the consumer to ensure it doesn't accept new logs
649+
if err := consumer.stop(); err != nil {
650+
return fmt.Errorf("failed to stop consumer %s: %w", consumer.name, err)
651+
}
625652

626653
for _, handler := range m.logTargetHandlers {
627654
consumer.ls.log.Debug().
@@ -696,7 +723,7 @@ type ContainerLogConsumer struct {
696723
ls *LogStream
697724
tempFile *os.File
698725
encoder *gob.Encoder
699-
isDone bool
726+
stopOnce sync.Once
700727
hasErrored bool
701728
logListeningDone chan struct{}
702729
container LogProducingContainer
@@ -717,7 +744,6 @@ func newContainerLogConsumer(ctx context.Context, lw *LogStream, container LogPr
717744
prefix: prefix,
718745
logTargets: logTargets,
719746
ls: lw,
720-
isDone: false,
721747
hasErrored: false,
722748
logListeningDone: make(chan struct{}, 1),
723749
container: container,
@@ -766,8 +792,7 @@ func (g *ContainerLogConsumer) ResetTempFile() error {
766792
// MarkAsErrored marks the consumer as errored (which makes it stop accepting logs)
767793
func (g *ContainerLogConsumer) MarkAsErrored() {
768794
g.hasErrored = true
769-
g.isDone = true
770-
close(g.logListeningDone)
795+
_ = g.stop()
771796
}
772797

773798
// GetContainer returns the container that this consumer is connected to
@@ -784,12 +809,8 @@ func (g *ContainerLogConsumer) Accept(l tc.Log) {
784809
return
785810
}
786811

787-
if g.isDone {
788-
g.ls.log.Error().
789-
Str("Test", g.ls.testName).
790-
Str("Container", g.name).
791-
Str("Log", string(l.Content)).
792-
Msg("Consumer has finished, but you are still trying to accept logs. This should never happen")
812+
if g.IsStopped() {
813+
// if the consumer is stopped, we don't want to accept any more logs
793814
return
794815
}
795816

0 commit comments

Comments
 (0)