
Commit 04e6e69

Merge pull request #103 from scality/improvement/LOGC-16-batch-ops
LOGC-16: Add comprehensive E2E tests for batch processing
2 parents: d5e39ef + 341aa74

6 files changed (+401, -37 lines changed)


deployment/conf/log-courier.yml

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ consumer:
   max-discovery-interval-seconds: 1
   discovery-interval-jitter-factor: 0
   num-workers: 5
-  max-buckets-per-discovery: 100
+  max-buckets-per-discovery: 5
   max-logs-per-bucket: 100000

 # Retry logic

pkg/logcourier/logobject.go

Lines changed: 0 additions & 1 deletion
@@ -108,7 +108,6 @@ func (b *LogObjectBuilder) formatLogRecords(records []LogRecord) []byte {
 // writeLogRecord writes a single log record according to AWS format
 // Takes a pointer to avoid copying the ~1KB struct on each call (called for each record in the batch)
 // Field order must match AWS S3 Server Access Log format exactly
-//
 func (b *LogObjectBuilder) writeLogRecord(w *bytes.Buffer, rec *LogRecord) {
     b.writeStringPtr(w, rec.BucketOwner) // 1. Bucket Owner
     w.WriteByte(' ')

pkg/logcourier/metrics.go

Lines changed: 11 additions & 11 deletions
@@ -17,37 +17,37 @@ type Metrics struct {
 
 // GeneralMetrics tracks general system state and errors
 type GeneralMetrics struct {
-    BatchesProcessed *prometheus.CounterVec // labels: status (success/failed_permanent/failed_transient)
-    RecordsPermanentErrors prometheus.Counter
+    BatchesProcessed        *prometheus.CounterVec // labels: status (success/failed_permanent/failed_transient)
+    RecordsPermanentErrors  prometheus.Counter
     BatchProcessingDuration prometheus.Histogram
-    CyclesTotal prometheus.Counter
-    CycleDuration prometheus.Histogram
-    Lag prometheus.Summary // Time from log generation to S3 upload
+    CyclesTotal             prometheus.Counter
+    CycleDuration           prometheus.Histogram
+    Lag                     prometheus.Summary // Time from log generation to S3 upload
 }
 
 // DiscoveryMetrics tracks batch discovery operations
 type DiscoveryMetrics struct {
     BatchesFound prometheus.Counter
-    Duration prometheus.Histogram
+    Duration     prometheus.Histogram
 }
 
 // FetchMetrics tracks log fetching from ClickHouse
 type FetchMetrics struct {
-    RecordsTotal prometheus.Counter
+    RecordsTotal     prometheus.Counter
     RecordsPerBucket prometheus.Summary
-    Duration prometheus.Histogram
+    Duration         prometheus.Histogram
 }
 
 // BuildMetrics tracks log object building
 type BuildMetrics struct {
-    ObjectsTotal prometheus.Counter
+    ObjectsTotal    prometheus.Counter
     ObjectSizeBytes prometheus.Summary
-    Duration prometheus.Histogram
+    Duration        prometheus.Histogram
 }
 
 // UploadMetrics tracks S3 upload operations
 type UploadMetrics struct {
-    ObjectsTotal *prometheus.CounterVec // labels: status (success/failed)
+    ObjectsTotal *prometheus.CounterVec   // labels: status (success/failed)
     Duration     *prometheus.HistogramVec // labels: status (success/failed)
 }
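Note: these structs only declare the metric handles; the constructors and registration live elsewhere in the package and are not part of this diff. As a minimal, hypothetical sketch of the label convention mentioned in the comments (the metric name, help text, and registration below are illustrative assumptions, not code from this repository), a labelled counter such as BatchesProcessed would typically be created and incremented with prometheus/client_golang like this:

package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
    // Hypothetical metric name and help text, for illustration only.
    batchesProcessed := prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "logcourier_batches_processed_total",
            Help: "Number of processed batches, partitioned by outcome.",
        },
        []string{"status"}, // success / failed_permanent / failed_transient
    )
    prometheus.MustRegister(batchesProcessed)

    // Record one successfully processed batch.
    batchesProcessed.WithLabelValues("success").Inc()
}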

test/e2e/helpers_test.go

Lines changed: 96 additions & 15 deletions
@@ -183,6 +183,32 @@ func configureBucketLogging(client *s3.Client, sourceBucket, targetBucket, prefi
     return err
 }
 
+// configureBucketPolicyForCrossAccountAccess sets up a bucket policy granting
+// PutObject permission to the service-access-logging-user. Required for cross-account
+// access in Integration environments.
+func configureBucketPolicyForCrossAccountAccess(client *s3.Client, bucket string) error {
+    policy := fmt.Sprintf(`{
+        "Version": "2012-10-17",
+        "Statement": [
+            {
+                "Sid": "AllowCrossAccountPutObject",
+                "Effect": "Allow",
+                "Principal": {
+                    "AWS": "arn:aws:iam::000000000000:user/scality-internal/service-access-logging-user"
+                },
+                "Action": "s3:PutObject",
+                "Resource": "arn:aws:s3:::%s/*"
+            }
+        ]
+    }`, bucket)
+
+    _, err := client.PutBucketPolicy(context.Background(), &s3.PutBucketPolicyInput{
+        Bucket: aws.String(bucket),
+        Policy: aws.String(policy),
+    })
+    return err
+}
+
 // findLogObjectsSince finds all log objects in a bucket created after a given time
 // returns the list of object keys that were created after the given time
 func findLogObjectsSince(client *s3.Client, bucket, prefix string, since time.Time) ([]string, error) {
@@ -348,7 +374,9 @@ func fetchAllLogsInBucketSince(ctx *E2ETestContext, since time.Time) ([]*ParsedL
     return fetchLogsFromPrefix(ctx.S3Client, ctx.DestinationBucket, "", since)
 }
 
-// fetchLogsFromPrefix fetches and parses all log records from a specific prefix since a given time
+// fetchLogsFromPrefix fetches and parses all log records from a specific prefix since a given time.
+// Verifies that records within each log object are in chronological order.
+// Returns all records combined across objects (no ordering guarantee across objects).
 func fetchLogsFromPrefix(client *s3.Client, bucket, prefix string, since time.Time) ([]*ParsedLogRecord, error) {
     objectKeys, err := findLogObjectsSince(client, bucket, prefix, since)
     if err != nil {
@@ -363,30 +391,47 @@ func fetchLogsFromPrefix(client *s3.Client, bucket, prefix string, since time.Ti
         }
 
         records := parseLogContent(content)
+
+        // Verify chronological order within this object
+        for i := 1; i < len(records); i++ {
+            if records[i].Time.Before(records[i-1].Time) {
+                return nil, fmt.Errorf("object %s: logs not in chronological order: record %d (%v) is before record %d (%v)",
+                    key, i, records[i].Time, i-1, records[i-1].Time)
+            }
+        }
+
         allRecords = append(allRecords, records...)
     }
 
     return allRecords, nil
 }
 
-// waitForLogCount waits for at least expectedCount logs to appear
-func waitForLogCount(ctx *E2ETestContext, expectedCount int) []*ParsedLogRecord {
+// waitForLogCountWithPrefix waits for at least expectedCount logs to appear under a specific prefix.
+func waitForLogCountWithPrefix(ctx *E2ETestContext, prefix string, expectedCount int) []*ParsedLogRecord {
+    GinkgoHelper()
+
     var allLogs []*ParsedLogRecord
 
     Eventually(func() int {
-        logs, err := fetchAllLogsSince(ctx, ctx.TestStartTime)
+        logs, err := fetchLogsFromPrefix(ctx.S3Client, ctx.DestinationBucket, prefix, ctx.TestStartTime)
         if err != nil {
             return 0
         }
         allLogs = logs
         return len(logs)
     }, logWaitTimeout, logPollInterval).Should(BeNumerically(">=", expectedCount),
-        "Expected at least %d logs within %v", expectedCount, logWaitTimeout)
+        "Expected at least %d logs with prefix %s within %v", expectedCount, prefix, logWaitTimeout)
 
     return allLogs
 }
 
-// VerifyLogs waits for logs, verifies they match expected values, and checks chronological order.
+// waitForLogCount waits for at least expectedCount logs to appear.
+func waitForLogCount(ctx *E2ETestContext, expectedCount int) []*ParsedLogRecord {
+    GinkgoHelper()
+    return waitForLogCountWithPrefix(ctx, ctx.LogPrefix, expectedCount)
+}
+
+// VerifyLogs waits for logs and verifies they match expected values.
 // Returns the logs for additional assertions if needed.
 func (ctx *E2ETestContext) VerifyLogs(expected ...ExpectedLogBuilder) []*ParsedLogRecord {
     GinkgoHelper()
@@ -397,8 +442,6 @@ func (ctx *E2ETestContext) VerifyLogs(expected ...ExpectedLogBuilder) []*ParsedL
         verifyLogRecord(logs[i], exp)
     }
 
-    verifyChronologicalOrder(logs)
-
     return logs
 }
 
@@ -479,16 +522,31 @@ func verifyLogRecord(actual *ParsedLogRecord, expected ExpectedLogBuilder) {
     }
 }
 
-// verifyChronologicalOrder verifies logs are in chronological order
-func verifyChronologicalOrder(records []*ParsedLogRecord) {
+// verifyLogKeys verifies that logs contain exactly the expected keys with no duplicates.
+// It filters logs by bucket and keyPrefix, then checks:
+// - Each matching log has the expected operation
+// - Each key is in the expectedKeys set
+// - No duplicate keys exist
+// - The total count matches expected
+func verifyLogKeys(logs []*ParsedLogRecord, bucket, keyPrefix string, expectedKeys map[string]bool, expectedOp string) {
     GinkgoHelper()
 
-    for i := 1; i < len(records); i++ {
-        Expect(records[i].Time.After(records[i-1].Time) ||
-            records[i].Time.Equal(records[i-1].Time)).To(BeTrue(),
-            "Logs should be in chronological order: record %d (%v) should be after or equal to record %d (%v)",
-            i, records[i].Time, i-1, records[i-1].Time)
+    seenKeys := make(map[string]bool)
+    for _, log := range logs {
+        if log.Bucket != bucket || !strings.HasPrefix(log.Key, keyPrefix) {
+            continue
+        }
+        Expect(log.Operation).To(Equal(expectedOp),
+            "Expected %s operation for key %s", expectedOp, log.Key)
+        Expect(expectedKeys[log.Key]).To(BeTrue(),
+            "Unexpected key: %s", log.Key)
+        Expect(seenKeys[log.Key]).To(BeFalse(),
+            "Duplicate key: %s", log.Key)
+        seenKeys[log.Key] = true
     }
+
+    Expect(seenKeys).To(HaveLen(len(expectedKeys)),
+        "Expected %d unique keys, got %d", len(expectedKeys), len(seenKeys))
 }
 
 // retryWithBackoff executes a function with exponential backoff retry logic.
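Note: the test specs that call verifyLogKeys are in files not included in this excerpt. As a hypothetical fragment (the key format, count, and the AWS-style operation name below are assumptions for illustration), a caller would build the expectedKeys set from the keys it wrote and pass the operation it expects to see in the access logs:

// Hypothetical caller; assumes an *E2ETestContext named ctx and the helpers in this file.
const objectCount = 10

expectedKeys := make(map[string]bool, objectCount)
for i := 0; i < objectCount; i++ {
    expectedKeys[fmt.Sprintf("batch-object-%d.txt", i)] = true
}

logs := waitForLogCount(ctx, objectCount)
verifyLogKeys(logs, ctx.SourceBucket, "batch-object-", expectedKeys, "REST.PUT.OBJECT")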
@@ -538,6 +596,25 @@ func createBucketWithRetry(client *s3.Client, bucket string) error {
     return nil
 }
 
+// putObjects creates multiple objects with keys based on keyFormat.
+// keyFormat should contain a single %d verb (e.g., "prefix-%d.txt").
+// If content is nil, generates unique content per object.
+func putObjects(ctx *E2ETestContext, keyFormat string, count int, content []byte) {
+    for i := 0; i < count; i++ {
+        key := fmt.Sprintf(keyFormat, i)
+        body := content
+        if body == nil {
+            body = []byte(fmt.Sprintf("data %d", i))
+        }
+        _, err := ctx.S3Client.PutObject(context.Background(), &s3.PutObjectInput{
+            Bucket: aws.String(ctx.SourceBucket),
+            Key:    aws.String(key),
+            Body:   bytes.NewReader(body),
+        })
+        Expect(err).NotTo(HaveOccurred(), "PUT operation %d should succeed", i)
+    }
+}
+
 // setupE2ETest creates and initializes an E2E test context
 func setupE2ETest() *E2ETestContext {
     GinkgoHelper()
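Note: a short hypothetical usage sketch of the two content modes this helper supports (the key formats and counts below are made up for illustration):

// Unique content per object: bodies become "data 0", "data 1", ...
putObjects(ctx, "batch-unique-%d.txt", 20, nil)

// Shared content for every object.
putObjects(ctx, "batch-shared-%d.txt", 5, []byte("shared payload"))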
@@ -560,6 +637,10 @@ func setupE2ETest() *E2ETestContext {
     err = configureBucketLogging(sharedS3Client, sourceBucket, destBucket, logPrefix)
     Expect(err).NotTo(HaveOccurred(), "Failed to configure bucket logging")
 
+    // Configure bucket policy for cross-account access
+    err = configureBucketPolicyForCrossAccountAccess(sharedS3Client, destBucket)
+    Expect(err).NotTo(HaveOccurred(), "Failed to configure bucket policy")
+
     return &E2ETestContext{
         TestName: testName,
         S3Client: sharedS3Client,