Skip to content

Commit 705b0da

Browse files
committed
Remove unnecessary part of loki writer
1 parent 405f696 commit 705b0da

File tree

1 file changed

+3
-19
lines changed

1 file changed

+3
-19
lines changed

pkg/pipeline/write/write_loki.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ type Loki struct {
5757
saneLabels map[string]model.LabelName
5858
client emitter
5959
timeNow func() time.Time
60-
in chan config.GenericMap
6160
exitChan <-chan struct{}
6261
metrics *metrics
6362
}
@@ -208,21 +207,9 @@ func getFloat64(timestamp interface{}) (ft float64, ok bool) {
208207
// Write writes a flow before being stored
209208
func (l *Loki) Write(entry config.GenericMap) {
210209
log.Debugf("entering Loki Write")
211-
l.in <- entry
212-
}
213-
214-
func (l *Loki) processRecords() {
215-
for {
216-
select {
217-
case <-l.exitChan:
218-
log.Debugf("exiting writeLoki because of signal")
219-
return
220-
case record := <-l.in:
221-
err := l.ProcessRecord(record)
222-
if err != nil {
223-
log.Errorf("Write (Loki) error %v", err)
224-
}
225-
}
210+
err := l.ProcessRecord(entry)
211+
if err != nil {
212+
log.Errorf("Write (Loki) error %v", err)
226213
}
227214
}
228215

@@ -278,11 +265,8 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
278265
client: client,
279266
timeNow: time.Now,
280267
exitChan: pUtils.ExitChannel(),
281-
in: in,
282268
metrics: newMetrics(opMetrics, params.Name),
283269
}
284270

285-
go l.processRecords()
286-
287271
return l, nil
288272
}

0 commit comments

Comments
 (0)