-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathmemory.go
More file actions
177 lines (152 loc) · 4.61 KB
/
memory.go
File metadata and controls
177 lines (152 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package irzstd
import (
"bytes"
"fmt"
"io"
"github.com/klauspost/compress/zstd"
"github.com/y-scope/clp-ffi-go/ffi"
"github.com/y-scope/clp-ffi-go/ir"
)
// Converts log events into Zstd compressed IR. Log events provided to writer are immediately
// converted to Zstd compressed IR and stored in [memoryWriter.ZstdBuffer]. After the Zstd buffer
// receives logs, they are immediately sent to s3.
type memoryWriter struct {
zstdBuffer *bytes.Buffer
irWriter *ir.Writer
size int
timezone string
zstdWriter *zstd.Encoder
closed bool
}
// Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is
// off.
//
// Parameters:
// - timezone: Time zone of the log source
// - size: Byte length
//
// Returns:
// - memoryWriter: Memory writer for Zstd compressed IR
// - err: Error opening Zstd/IR writers
func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) {
var zstdBuffer bytes.Buffer
irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size)
if err != nil {
return nil, err
}
memoryWriter := memoryWriter{
size: size,
timezone: timezone,
irWriter: irWriter,
zstdWriter: zstdWriter,
zstdBuffer: &zstdBuffer,
}
return &memoryWriter, nil
}
// Converts log events to Zstd compressed IR and outputs to the Zstd buffer.
//
// Parameters:
// - logEvents: A slice of log events to be encoded
//
// Returns:
// - numEvents: Number of log events successfully written to IR writer buffer
// - err: Error writing IR/Zstd
func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
numEvents, err := writeIr(w.irWriter, logEvents)
if err != nil {
return numEvents, err
}
_, err = w.irWriter.WriteTo(w.zstdWriter)
return numEvents, err
}
// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. After
// calling close, [memoryWriter] must be reset prior to calling write.
//
// Returns:
// - err: Error closing buffers
func (w *memoryWriter) CloseStreams() error {
_, err := w.irWriter.CloseTo(w.zstdWriter)
if err != nil {
return err
}
w.irWriter = nil
err = w.zstdWriter.Close()
w.closed = true
return err
}
// Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers
// and associated buffers.
//
// Returns:
// - err: Error opening IR writer
func (w *memoryWriter) Reset() error {
var err error
w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone)
if err != nil {
return err
}
w.zstdBuffer.Reset()
w.zstdWriter.Reset(w.zstdBuffer)
w.closed = false
return nil
}
// Getter for useDiskBuffer.
//
// Returns:
// - useDiskBuffer: On/off for disk buffering
func (w *memoryWriter) GetUseDiskBuffer() bool {
return false
}
// Getter for closed.
//
// Returns:
// - closed: Boolean that is true if IR and Zstd streams are closed.
func (w *memoryWriter) GetClosed() bool {
return w.closed
}
// Getter for Zstd Output.
//
// Returns:
// - zstdOutput: Reader for Zstd output
func (w *memoryWriter) GetZstdOutput() io.Reader {
return w.zstdBuffer
}
// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write.
// Instead, calling Len() on buffer. Try to avoid calling this as will flush Zstd Writer
// potentially creating unnecessary frames.
//
// Returns:
// - size: Bytes written
// - err: nil error to comply with interface
func (w *memoryWriter) GetZstdOutputSize() (int, error) {
w.zstdWriter.Flush()
return w.zstdBuffer.Len(), nil
}
// Checks if writer is empty. True if no events are buffered. Try to avoid calling this as will
// flush Zstd Writer potentially creating unnecessary frames.
//
// Returns:
// - empty: Boolean value that is true if buffer is empty
// - err: nil error to comply with interface
func (w *memoryWriter) CheckEmpty() (bool, error) {
w.zstdWriter.Flush()
empty := w.zstdBuffer.Len() == 0
return empty, nil
}
// Closes [memoryWriter]. Currently used during recovery only, and advise caution using elsewhere.
// Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not
// added. It is preferable to add postamble on recovery so that IR is in the same state
// (i.e. not terminated) for an abrupt crash and a graceful exit. Function does not call
// [zstd.Encoder.Close] as it does not explicitly free memory and may add undesirable null frame.
//
// Returns:
// - err: Error closing irWriter, error closing files
func (w *memoryWriter) Close() error {
if w.irWriter != nil {
err := w.irWriter.Serializer.Close()
if err != nil {
return fmt.Errorf("error could not close irWriter: %w", err)
}
}
return nil
}