Skip to content
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions internal/irzstd/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,6 @@ func (w *diskWriter) Close() error {
return nil
}

// Reports whether disk buffering is on for this writer. For a diskWriter the answer is fixed.
//
// Returns:
// - useDiskBuffer: Always true
func (w *diskWriter) GetUseDiskBuffer() bool {
	const useDiskBuffer = true
	return useDiskBuffer
}

// Getter for Zstd Output.
//
// Returns:
Expand All @@ -284,6 +276,21 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) {
return w.getZstdFileSize()
}

// Reports whether the writer is empty. The writer is empty only when the Zstd file has a size of
// zero and the tracked IR byte count is also zero.
//
// Returns:
// - empty: True if neither the Zstd file nor the IR byte count holds any data
// - err: Error calling stat on the Zstd file
func (w *diskWriter) CheckEmpty() (bool, error) {
	info, statErr := w.zstdFile.Stat()
	if statErr != nil {
		return false, statErr
	}

	// Any bytes already in the Zstd file mean the writer is not empty.
	if info.Size() != 0 {
		return false, nil
	}

	return w.irTotalBytes == 0, nil
}

// Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then
// truncated.
//
Expand Down
29 changes: 17 additions & 12 deletions internal/irzstd/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
"github.com/y-scope/clp-ffi-go/ir"
)

// Converts log events into Zstd compressed IR. Log events provided to writer are immediately
// converted to Zstd compressed IR and stored in [memoryWriter.ZstdBuffer]. After the Zstd buffer
// receives logs, they are immediately sent to s3.
// Converts log events into Zstd compressed IR. Log events are immediately converted to Zstd
// compressed IR and stored in [memoryWriter.zstdBuffer].
type memoryWriter struct {
zstdBuffer *bytes.Buffer
irWriter *ir.Writer
Expand Down Expand Up @@ -96,14 +95,6 @@ func (w *memoryWriter) Reset() error {
return nil
}

// Reports whether disk buffering is on for this writer. For a memoryWriter the answer is fixed.
//
// Returns:
// - useDiskBuffer: Always false
func (w *memoryWriter) GetUseDiskBuffer() bool {
	const useDiskBuffer = false
	return useDiskBuffer
}

// Getter for Zstd Output.
//
// Returns:
Expand All @@ -113,7 +104,8 @@ func (w *memoryWriter) GetZstdOutput() io.Reader {
}

// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write.
// Instead, calling Len() on buffer.
// Instead, Len() is called on the buffer. The size may slightly lag the real size since some data
// in the current block will still be in the [zstd] encoder's internal buffer.
//
// Returns:
// - size: Bytes written
Expand All @@ -139,3 +131,16 @@ func (w *memoryWriter) Close() error {
}
return nil
}

// Checks if writer is empty. True if no events are buffered. Try to avoid calling this, as it
// flushes the Zstd writer, potentially creating unnecessary frames.
//
// Returns:
// - empty: Boolean value that is true if buffer is empty
// - err: nil error to comply with interface
func (w *memoryWriter) CheckEmpty() (bool, error) {
w.zstdWriter.Flush()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this method doesn't match what it does and to some degree doesn't really match its use case either.

the plugin now attempts to flush buffered data to S3 on shutdown. This is not needed for disk buffer mode since those files are recovered on restart (however it may make sense to make this a configurable option in the future).

I feel it is confusing that during a graceful exit we don't upload to S3 when using disk buffering. Instead we wait until restart for recovery to handle this. If I stop fluentbit, I'd expect all the logs available to be uploaded.

I feel adding a flush method would fit the use case more.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify — are you suggesting introducing a separate Flush() method on the writer, in addition to CheckEmpty()?

Copy link
Contributor Author

@davemarco davemarco Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the expectation, but I was hesitant to upload on graceful shutdown in disk buffer mode because it can introduce duplication or corruption issues.

If we upload but the process is forcefully exited before the disk files are properly truncated, recovery could upload them again and create duplicates. Forcing additional flushes from the IR file to the Zstd file during shutdown could also be slow and, if interrupted mid-write, leave files in an inconsistent state.

One of the reasons to select disk buffering mode is fault tolerance; exiting quickly and relying on upload on restart is the safe path.

If we want “upload everything on stop” behavior for disk buffering, we could consider making it a configurable option if that’s something users would expect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify — are you suggesting introducing a separate Flush() method on the writer, in addition to CheckEmpty()?

Yup

For the behaviour, I think it is fair as long as we document everything.


empty := w.zstdBuffer.Len() == 0
return empty, nil
}
13 changes: 7 additions & 6 deletions internal/irzstd/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ type Writer interface {
// - err
Reset() error

// Getter for useDiskBuffer.
//
// Returns:
// - useDiskBuffer: On/off for disk buffering
GetUseDiskBuffer() bool

// Getter for Zstd Output.
//
// Returns:
Expand All @@ -61,6 +55,13 @@ type Writer interface {
// - size: Bytes written
// - err
GetZstdOutputSize() (int, error)

// Checks if writer is empty. True if no events are buffered.
//
// Returns:
// - empty: Boolean value that is true if buffer is empty
// - err
CheckEmpty() (bool, error)
}

// Writes log events to a IR Writer.
Expand Down
59 changes: 59 additions & 0 deletions plugins/out_clp_s3/internal/exit/exit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Package exit provides functions for gracefully shutting down the plugin. Exit functions are only
// called when Fluent Bit receives a kill signal, not during an abrupt crash. The plugin is given
// limited time to clean up resources before Fluent Bit terminates it.
package exit

import (
"github.com/y-scope/fluent-bit-clp/internal/outctx"
)

// Fs gracefully exits the plugin by closing files. Every writer is closed even when an earlier
// close fails, so a single failing writer does not leave the remaining ones open.
//
// Parameters:
// - ctx: Plugin context
//
// Returns:
// - err: First error encountered closing a writer, nil if every writer closed
func Fs(ctx *outctx.S3Context) error {
	var firstErr error
	for _, eventManager := range ctx.EventManagers {
		if err := eventManager.Writer.Close(); err != nil {
			// Remember the first failure, but keep closing the remaining writers. The writer
			// that failed to close is left in place rather than cleared.
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		eventManager.Writer = nil
	}

	return firstErr
}

// S3 gracefully exits the plugin by flushing buffered data to S3. Makes a best-effort attempt,
// however Fluent Bit may kill the plugin before the upload completes. A failure for one event
// manager does not stop the remaining event managers from being processed. Writers that report
// empty are skipped and are not closed.
//
// Parameters:
// - ctx: Plugin context
//
// Returns:
// - err: First error encountered checking, uploading or closing a writer, nil if none failed
func S3(ctx *outctx.S3Context) error {
	var firstErr error
	// Keeps only the first failure so the caller still learns that the exit was not clean.
	record := func(err error) {
		if firstErr == nil {
			firstErr = err
		}
	}

	for _, eventManager := range ctx.EventManagers {
		empty, err := eventManager.Writer.CheckEmpty()
		if err != nil {
			record(err)
			continue
		}
		if empty {
			continue
		}

		if err = eventManager.ToS3(ctx.Config, ctx.Uploader); err != nil {
			record(err)
			continue
		}

		if err = eventManager.Writer.Close(); err != nil {
			record(err)
			continue
		}
		eventManager.Writer = nil
	}

	return firstErr
}
8 changes: 1 addition & 7 deletions plugins/out_clp_s3/internal/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent,
}
}

// Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always
// uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater
// than upload size.
// Checks whether Zstd buffer size is greater than or equal to upload size.
//
// Parameters:
// - eventManager: Manager for Fluent Bit events with the same tag
Expand All @@ -123,10 +121,6 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent,
// - readyToUpload: Boolean if upload criteria met or not
// - err: Error getting Zstd buffer size
func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) {
if !eventManager.Writer.GetUseDiskBuffer() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one change alters the buffer behaviour so that it no longer uploads immediately.

return true, nil
}

bufferSize, err := eventManager.Writer.GetZstdOutputSize()
if err != nil {
return false, fmt.Errorf("error could not get size of buffer: %w", err)
Expand Down
22 changes: 0 additions & 22 deletions plugins/out_clp_s3/internal/recovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,6 @@ import (
"github.com/y-scope/fluent-bit-clp/internal/outctx"
)

// If useDiskBuffer is set, close all files prior to exit. Graceful exit will only be called
// if Fluent Bit receives a kill signal and not during an abrupt crash. Plugin is only
// given a limited time to clean up resources, so output is not sent to s3. Instead
// they are sent during startup. Every writer is closed even when an earlier close fails.
//
// Parameters:
// - ctx: Plugin context
//
// Returns:
// - err: First error encountered closing a writer, nil if every writer closed
func GracefulExit(ctx *outctx.S3Context) error {
	var firstErr error
	for _, eventManager := range ctx.EventManagers {
		if err := eventManager.Writer.Close(); err != nil {
			// Remember the first failure, but keep closing the remaining writers. The writer
			// that failed to close is left in place rather than cleared.
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		eventManager.Writer = nil
	}

	return firstErr
}

// Sends existing disk buffers to S3.
//
// Parameters:
Expand Down
8 changes: 7 additions & 1 deletion plugins/out_clp_s3/out_clp_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/fluent/fluent-bit-go/output"

"github.com/y-scope/fluent-bit-clp/internal/outctx"
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/exit"
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/flush"
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/recovery"
)
Expand Down Expand Up @@ -132,7 +133,12 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int {

log.Printf("[%s] Exit called for id: %s", s3PluginName, outCtx.Config.Id)

err := recovery.GracefulExit(outCtx)
var err error
if outCtx.Config.UseDiskBuffer {
err = exit.Fs(outCtx)
} else {
err = exit.S3(outCtx)
}
if err != nil {
log.Printf("Failed to exit gracefully")
}
Expand Down