Skip to content

Commit 6e75e1d

Browse files
committed
fix after review
1 parent 2788235 commit 6e75e1d

File tree

5 files changed

+19
-58
lines changed

5 files changed

+19
-58
lines changed

plugin/input/file/offset.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,18 +220,7 @@ func (o *offsetDB) parseOptionalLine(content string, prefix string) (string, str
220220
return o.parseLine(content, prefix)
221221
}
222222

223-
if strings.HasPrefix(content, " streams:") || content[0] == '-' {
224-
return "", content, nil
225-
}
226-
227-
linePos := strings.IndexByte(content, '\n')
228-
var nextLine string
229-
if linePos >= 0 {
230-
nextLine = content[:linePos]
231-
} else {
232-
nextLine = content
233-
}
234-
return "", "", fmt.Errorf("unexpected line format, expected %q or streams section, got %q", prefix, safeSubstring(nextLine, len(prefix)))
223+
return "", content, nil
235224
}
236225

237226
func safeSubstring(s string, length int) string {

plugin/input/file/offset_test.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -387,32 +387,6 @@ func TestParseOptionalLineLastReadTimestamp(t *testing.T) {
387387
expectedRemain: "",
388388
desc: "empty content",
389389
},
390-
391-
// Error cases
392-
{
393-
name: "unexpected line before streams",
394-
content: " unknown_field: value\n streams:",
395-
expectedErr: true,
396-
desc: "unexpected line causes error",
397-
},
398-
{
399-
name: "wrong field name",
400-
content: " lastreadtimestamp: 1763651665000000000\n streams:",
401-
expectedErr: true,
402-
desc: "wrong field name (no underscores)",
403-
},
404-
{
405-
name: "case mismatch",
406-
content: " Last_Read_Timestamp: 1763651665000000000\n streams:",
407-
expectedErr: true,
408-
desc: "wrong case in field name",
409-
},
410-
{
411-
name: "missing colon",
412-
content: " last_read_timestamp 1763651665000000000\n streams:",
413-
expectedErr: true,
414-
desc: "missing colon after field name",
415-
},
416390
}
417391

418392
for _, tt := range tests {

plugin/input/file/provider.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"syscall"
1010
"time"
1111

12-
sync_atomic "sync/atomic"
13-
1412
"github.com/ozontech/file.d/logger"
1513
"github.com/ozontech/file.d/pipeline"
1614
"github.com/ozontech/file.d/xtime"
@@ -104,37 +102,38 @@ func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
104102
return n
105103
}
106104

105+
// eofInfo tracks end-of-file state for read operations.
106+
// Used to determine when files can be safely removed after processing.
107107
type eofInfo struct {
108-
timestamp int64
109-
offset int64
108+
timestamp atomic.Int64
109+
offset atomic.Int64
110110
}
111111

112112
func (e *eofInfo) setTimestamp(t time.Time) {
113-
sync_atomic.StoreInt64(&e.timestamp, t.UnixNano())
113+
e.timestamp.Store(t.UnixNano())
114114
}
115115

116116
func (e *eofInfo) getTimestamp() time.Time {
117-
nanos := sync_atomic.LoadInt64(&e.timestamp)
117+
nanos := e.timestamp.Load()
118118
return time.Unix(0, nanos)
119119
}
120120

121121
// setUnixNanoTimestamp sets timestamp in seconds
122122
func (e *eofInfo) setUnixNanoTimestamp(nanos int64) {
123-
sync_atomic.StoreInt64(&e.timestamp, nanos)
123+
e.timestamp.Store(nanos)
124124
}
125125

126126
// getUnixNanoTimestamp returns timestamp in seconds
127127
func (e *eofInfo) getUnixNanoTimestamp() int64 {
128-
nanos := sync_atomic.LoadInt64(&e.timestamp)
129-
return nanos
128+
return e.timestamp.Load()
130129
}
131130

132131
func (e *eofInfo) setOffset(offset int64) {
133-
sync_atomic.StoreInt64(&e.offset, offset)
132+
e.offset.Store(offset)
134133
}
135134

136135
func (e *eofInfo) getOffset() int64 {
137-
return sync_atomic.LoadInt64(&e.offset)
136+
return e.offset.Load()
138137
}
139138

140139
type inodeID uint64

plugin/input/file/provider_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,5 +286,4 @@ func TestEOFInfo(t *testing.T) {
286286
gotTime := e.getTimestamp()
287287
assert.Equal(t, newUnixTs, gotTime.UnixNano())
288288
})
289-
290289
}

plugin/input/file/worker.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
100100
if job.mimeType == "application/x-lz4" {
101101
if isNotFileBeingWritten(file.Name()) {
102102
// lz4 does not support appending, so we check that no one is writing to the file
103-
logger.Error("cannot lock file", zap.String("filename", file.Name()))
103+
logger.Error("cannot process incomplete file. write in progress", zap.String("filename", file.Name()))
104104
break
105105
}
106106
lz4Reader := lz4.NewReader(file)
@@ -132,14 +132,14 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
132132
for {
133133
n, err := reader.Read(readBuf)
134134
controller.IncReadOps()
135-
// if we read to end of file it's time to check truncation etc and process next job
136-
if (!job.isCompressed && err == io.EOF) || n == 0 {
137-
// cause lz4reader can return EOF and n > 0
138-
isEOFReached = true
139-
break
140-
}
141135
if err != nil {
142-
if !job.isCompressed && err != io.EOF {
136+
if (!job.isCompressed && err == io.EOF) || n == 0 {
137+
// cause lz4reader can return EOF and n > 0
138+
isEOFReached = true
139+
break
140+
}
141+
if !(job.isCompressed && (err == io.EOF || err == io.ErrUnexpectedEOF)) {
142+
// except EOF-errors on compressed file
143143
logger.Fatalf("file %d:%s read error, %s read=%d", sourceID, sourceName, err.Error(), n)
144144
}
145145
}

0 commit comments

Comments
 (0)