Skip to content

Commit 7ae0501

Browse files
committed
refactor(v2): improve code quality with comments and file reorganization
- Add comprehensive package documentation explaining the plugin architecture - Reorganize internal package into logical files: - s3.go: S3 client creation, validation, and upload functions - ingestion.go: Ingestion context creation and management - context.go: Plugin context, configuration, and type definitions - flush_manager.go: Dual-timer flush strategy implementation - Add detailed block comment explaining the dual-timer flush strategy - Add godoc comments to all exported types and functions - Document Fluent Bit plugin callbacks and data flow - Fix potential bug: use m.hardTimeout.IsZero() instead of nextHardTimeout.IsZero() - Extract magic values into named constants - Improve error messages and logging consistency
1 parent ffb7505 commit 7ae0501

File tree

6 files changed

+690
-279
lines changed

6 files changed

+690
-279
lines changed

plugins/out_clp_s3_v2/internal/context.go

Lines changed: 125 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,52 +13,128 @@ import (
1313
"github.com/y-scope/clp-ffi-go/ir"
1414
)
1515

16-
// FlushConfigContext stores the flush control configurations
16+
// Default configuration values.
17+
const (
18+
// defaultLogLevelKey is the JSON key used to extract log severity from records.
19+
defaultLogLevelKey = "level"
20+
// defaultFlushDelta is the default time between flushes for all log levels.
21+
defaultFlushDelta = 3 * time.Second
22+
)
23+
24+
// FlushConfigContext stores configuration for the dual-timer flush strategy.
25+
//
26+
// The flush strategy uses two timers per log stream:
27+
// - Hard timer: Guarantees logs are uploaded within a maximum time window
28+
// - Soft timer: Uploads after a period of inactivity (no new logs)
29+
//
30+
// Each log level can have different timer values, allowing critical logs
31+
// (ERROR, FATAL) to trigger faster uploads than debug logs.
1732
type FlushConfigContext struct {
18-
LogLevelKey string
33+
// LogLevelKey is the JSON key used to extract log severity from records.
34+
// Common values: "level", "severity", "log_level"
35+
LogLevelKey string
36+
37+
// defaultLogLevel is the log level index to use when level cannot be determined.
38+
// Maps to the index in hardDeltas/softDeltas arrays.
1939
defaultLogLevel int
20-
hardDeltas []time.Duration
21-
softDeltas []time.Duration
40+
41+
// hardDeltas contains the hard flush deadline for each log level.
42+
// Index corresponds to log level (0=debug, 1=info, 2=warn, 3=error, 4=fatal).
43+
hardDeltas []time.Duration
44+
45+
// softDeltas contains the soft flush delay for each log level.
46+
// Soft timer resets on each log event, triggering upload after inactivity.
47+
softDeltas []time.Duration
2248
}
2349

24-
// flushContext manages timing and callback logic for log flushing.
50+
// flushContext manages the dual-timer flush strategy for a single log stream.
51+
//
52+
// Thread-safety: All public methods acquire the Mutex before modifying state.
53+
// The timers run in separate goroutines but callbacks also acquire the Mutex.
2554
type flushContext struct {
26-
HardTimer *time.Timer
27-
hardTimeout time.Time
28-
softDelta time.Duration
29-
SoftTimer *time.Timer
55+
// HardTimer fires at the hard deadline, guaranteeing upload within a time window.
56+
HardTimer *time.Timer
57+
// hardTimeout tracks when the hard timer will fire (for comparison with new events).
58+
hardTimeout time.Time
59+
60+
// SoftTimer fires after a period of inactivity (no new log events).
61+
SoftTimer *time.Timer
62+
// softDelta tracks the current soft timer duration (minimum seen for this batch).
63+
softDelta time.Duration
64+
65+
// userCallback is invoked when either timer fires, triggering S3 upload.
3066
userCallback func()
31-
Mutex sync.Mutex
67+
68+
// Mutex protects all fields from concurrent access.
69+
Mutex sync.Mutex
3270
}
3371

34-
// compressionContext encapsulates file and compression writers.
72+
// compressionContext encapsulates the compression pipeline resources.
73+
//
74+
// The pipeline processes log events as:
75+
//
76+
// Log Event → IR Writer → Zstd Writer → File
3577
type compressionContext struct {
36-
File *os.File
78+
// File is the temporary file storing compressed logs before S3 upload.
79+
File *os.File
80+
// ZstdWriter compresses data using Zstandard algorithm.
3781
ZstdWriter *zstd.Encoder
38-
IRWriter *ir.Writer
82+
// IRWriter encodes log events into CLP's Intermediate Representation format.
83+
IRWriter *ir.Writer
3984
}
4085

41-
// IngestionContext contains compression and flush contexts for a particular log path.
86+
// IngestionContext contains all resources for processing a single log stream.
87+
//
88+
// Each unique log path (derived from Fluent Bit tag) has its own IngestionContext,
89+
// allowing independent compression and flush timing per stream.
4290
type IngestionContext struct {
91+
// Compression holds the file and encoder resources for this stream.
4392
Compression *compressionContext
44-
Flush *flushContext
93+
// Flush manages the upload timing for this stream.
94+
Flush *flushContext
4595
}
4696

47-
// s3Context holds AWS S3 configuration and client.
97+
// s3Context holds the S3 client and bucket configuration.
4898
type s3Context struct {
99+
// Client is the configured AWS S3 client.
49100
Client *s3.Client
101+
// Bucket is the target S3 bucket for log uploads.
50102
Bucket string
51103
}
52104

53-
// PluginContext is the top-level context for the plugin.
105+
// PluginContext is the top-level context for the out_clp_s3_v2 plugin.
106+
//
107+
// A single PluginContext is created during FLBPluginInit and shared across
108+
// all flush callbacks. It contains:
109+
// - S3 configuration and client
110+
// - Map of ingestion contexts (one per log stream/tag)
111+
// - Flush timing configuration
54112
type PluginContext struct {
55-
S3 *s3Context
56-
Ingestion map[string]*IngestionContext
113+
// S3 holds the S3 client and bucket configuration.
114+
S3 *s3Context
115+
// Ingestion maps log paths to their ingestion contexts.
116+
// Key is typically the Fluent Bit tag or file_path from log records.
117+
Ingestion map[string]*IngestionContext
118+
// FlushConfig contains the dual-timer flush strategy configuration.
57119
FlushConfig *FlushConfigContext
58120
}
59121

60-
// NewPluginContext initializes a new PluginContext
122+
// NewPluginContext creates and initializes a new PluginContext from Fluent Bit configuration.
123+
//
124+
// This function:
125+
// 1. Creates an S3 client using AWS credentials from the environment
126+
// 2. Validates the target S3 bucket exists and is accessible
127+
// 3. Loads flush timing configuration from plugin settings
128+
//
129+
// Configuration keys read from Fluent Bit:
130+
// - log_bucket: Target S3 bucket name (required)
131+
// - log_level_key: JSON key for log severity (default: "level")
132+
// - flush_hard_delta_*: Hard timer durations per log level
133+
// - flush_soft_delta_*: Soft timer durations per log level
134+
//
135+
// Returns an error if S3 client creation or bucket validation fails.
61136
func NewPluginContext(plugin unsafe.Pointer) (*PluginContext, error) {
137+
// Create and validate S3 client
62138
client, err := S3CreateClient()
63139
if err != nil {
64140
log.Printf("[error] Failed to create S3 client: %v", err)
@@ -72,60 +148,60 @@ func NewPluginContext(plugin unsafe.Pointer) (*PluginContext, error) {
72148
}
73149
log.Printf("[info] Logs are configured to be uploaded to s3://%s", bucket)
74150

75-
logLevelKey := getConfigWithDefaultString(plugin, "log_level_key", "level")
151+
// Load log level key configuration
152+
logLevelKey := getConfigWithDefault(plugin, "log_level_key", defaultLogLevelKey)
76153
log.Printf("[info] Log level key is configured to: %q", logLevelKey)
77154

78-
// Flush behavior control - use very aggressive defaults for now
155+
// Load flush timing configuration for each log level
156+
// Index order: 0=debug, 1=info, 2=warn, 3=error, 4=fatal
79157
hardDeltas := []time.Duration{
80-
getConfigWithDefaultTimeDuration(plugin, "flush_hard_delta_debug", 3*time.Second),
81-
getConfigWithDefaultTimeDuration(plugin, "flush_hard_delta_info", 3*time.Second),
82-
getConfigWithDefaultTimeDuration(plugin, "flush_hard_delta_warn", 3*time.Second),
83-
getConfigWithDefaultTimeDuration(plugin, "flush_hard_delta_error", 3*time.Second),
84-
getConfigWithDefaultTimeDuration(plugin, "flush_hard_delta_fatal", 3*time.Second),
158+
getConfigDuration(plugin, "flush_hard_delta_debug", defaultFlushDelta),
159+
getConfigDuration(plugin, "flush_hard_delta_info", defaultFlushDelta),
160+
getConfigDuration(plugin, "flush_hard_delta_warn", defaultFlushDelta),
161+
getConfigDuration(plugin, "flush_hard_delta_error", defaultFlushDelta),
162+
getConfigDuration(plugin, "flush_hard_delta_fatal", defaultFlushDelta),
85163
}
86164
softDeltas := []time.Duration{
87-
getConfigWithDefaultTimeDuration(plugin, "flush_soft_delta_debug", 3*time.Second),
88-
getConfigWithDefaultTimeDuration(plugin, "flush_soft_delta_info", 3*time.Second),
89-
getConfigWithDefaultTimeDuration(plugin, "flush_soft_delta_warn", 3*time.Second),
90-
getConfigWithDefaultTimeDuration(plugin, "flush_soft_delta_error", 3*time.Second),
91-
getConfigWithDefaultTimeDuration(plugin, "flush_soft_delta_fatal", 3*time.Second),
165+
getConfigDuration(plugin, "flush_soft_delta_debug", defaultFlushDelta),
166+
getConfigDuration(plugin, "flush_soft_delta_info", defaultFlushDelta),
167+
getConfigDuration(plugin, "flush_soft_delta_warn", defaultFlushDelta),
168+
getConfigDuration(plugin, "flush_soft_delta_error", defaultFlushDelta),
169+
getConfigDuration(plugin, "flush_soft_delta_fatal", defaultFlushDelta),
92170
}
93171

94-
pluginCtx := &PluginContext{
172+
return &PluginContext{
95173
S3: &s3Context{
96174
Client: client,
97175
Bucket: bucket,
98176
},
99177
Ingestion: make(map[string]*IngestionContext),
100178
FlushConfig: &FlushConfigContext{
101179
LogLevelKey: logLevelKey,
102-
defaultLogLevel: 0,
180+
defaultLogLevel: 0, // Default to debug level
103181
hardDeltas: hardDeltas,
104182
softDeltas: softDeltas,
105183
},
106-
}
107-
108-
return pluginCtx, nil
184+
}, nil
109185
}
110186

111-
func getConfigWithDefaultTimeDuration(
112-
plugin unsafe.Pointer,
113-
key string,
114-
defaultVal time.Duration,
115-
) time.Duration {
116-
duration, err := time.ParseDuration(output.FLBPluginConfigKey(plugin, key))
187+
// getConfigDuration reads a duration configuration value with a default fallback.
188+
func getConfigDuration(plugin unsafe.Pointer, key string, defaultVal time.Duration) time.Duration {
189+
rawValue := output.FLBPluginConfigKey(plugin, key)
190+
if rawValue == "" {
191+
return defaultVal
192+
}
193+
194+
duration, err := time.ParseDuration(rawValue)
117195
if err != nil {
118-
log.Printf("[error] Failed to parse duration %q: %v", key, err)
196+
log.Printf("[warn] Failed to parse duration for %q (%q): %v; using default %v",
197+
key, rawValue, err, defaultVal)
119198
return defaultVal
120199
}
121200
return duration
122201
}
123202

124-
func getConfigWithDefaultString(
125-
plugin unsafe.Pointer,
126-
key,
127-
defaultVal string,
128-
) string {
203+
// getConfigWithDefault reads a string configuration value with a default fallback.
204+
func getConfigWithDefault(plugin unsafe.Pointer, key, defaultVal string) string {
129205
val := output.FLBPluginConfigKey(plugin, key)
130206
if val == "" {
131207
return defaultVal

0 commit comments

Comments
 (0)