Skip to content

Commit 1f5e2bc

Browse files
committed
input_log: improve context extraction and routing
This patch improves context extraction from log records and enhances the routing logic with better context handling. Signed-off-by: Eduardo Silva <[email protected]>
1 parent 03d3c2e commit 1f5e2bc

File tree

1 file changed

+90
-44
lines changed

1 file changed

+90
-44
lines changed

src/flb_input_log.c

Lines changed: 90 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -391,40 +391,63 @@ static int build_payload_for_route(struct flb_input_instance *ins,
391391
return 0;
392392
}
393393

394-
/* Second pass: find GROUP_START record */
394+
/* Second pass: encode records in order, preserving group structure */
395395
for (i = 0; i < record_count; i++) {
396-
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) == 0 &&
397-
record_type == FLB_LOG_EVENT_GROUP_START) {
398-
group_start_record = records[i];
399-
break;
396+
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) != 0) {
397+
continue;
400398
}
401-
}
402399

403-
if (group_start_record != NULL) {
404-
ret = encode_chunk_record(encoder, group_start_record);
405-
if (ret != 0) {
406-
flb_free(matched_by_route);
407-
flb_log_event_encoder_destroy(encoder);
408-
return -1;
400+
if (record_type == FLB_LOG_EVENT_GROUP_START) {
401+
group_start_record = records[i];
402+
group_end = NULL;
403+
continue;
409404
}
410-
}
411-
412-
/* Encode matching records */
413-
for (i = 0; i < record_count; i++) {
414-
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) == 0 &&
415-
record_type == FLB_LOG_EVENT_NORMAL) {
416-
if (matched_by_route[i]) {
417-
ret = encode_chunk_record(encoder, records[i]);
405+
else if (record_type == FLB_LOG_EVENT_GROUP_END) {
406+
if (group_end != NULL &&
407+
group_start_record != NULL &&
408+
records[i]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
409+
ret = encode_chunk_record(encoder, group_end);
418410
if (ret != 0) {
419411
flb_free(matched_by_route);
420412
flb_log_event_encoder_destroy(encoder);
421413
return -1;
422414
}
415+
group_end = NULL;
416+
}
417+
group_start_record = NULL;
418+
continue;
419+
}
420+
else if (record_type == FLB_LOG_EVENT_NORMAL && matched_by_route[i]) {
421+
if (group_start_record != NULL &&
422+
records[i]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
423+
if (group_end == NULL) {
424+
ret = encode_chunk_record(encoder, group_start_record);
425+
if (ret != 0) {
426+
flb_free(matched_by_route);
427+
flb_log_event_encoder_destroy(encoder);
428+
return -1;
429+
}
430+
for (size_t j = i + 1; j < record_count; j++) {
431+
if (flb_log_event_decoder_get_record_type(&records[j]->event, &record_type) == 0 &&
432+
record_type == FLB_LOG_EVENT_GROUP_END &&
433+
records[j]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
434+
group_end = records[j];
435+
break;
436+
}
437+
}
438+
}
439+
}
440+
441+
ret = encode_chunk_record(encoder, records[i]);
442+
if (ret != 0) {
443+
flb_free(matched_by_route);
444+
flb_log_event_encoder_destroy(encoder);
445+
return -1;
423446
}
424447
}
425448
}
426449

427-
if (group_end != NULL && group_start_record != NULL) {
450+
if (group_end != NULL) {
428451
ret = encode_chunk_record(encoder, group_end);
429452
if (ret != 0) {
430453
flb_free(matched_by_route);
@@ -515,15 +538,6 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
515538
return 0;
516539
}
517540

518-
/* Second pass: find GROUP_START record */
519-
for (i = 0; i < record_count; i++) {
520-
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) == 0 &&
521-
record_type == FLB_LOG_EVENT_GROUP_START) {
522-
group_start_record = records[i];
523-
break;
524-
}
525-
}
526-
527541
matched_by_default = flb_calloc(record_count, sizeof(int));
528542
if (!matched_by_default) {
529543
flb_errno();
@@ -549,31 +563,63 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
549563
}
550564
}
551565

552-
if (group_start_record != NULL) {
553-
ret = encode_chunk_record(encoder, group_start_record);
554-
if (ret != 0) {
555-
flb_free(matched_by_default);
556-
flb_log_event_encoder_destroy(encoder);
557-
return -1;
566+
/* Second pass: encode records in order, preserving group structure */
567+
for (i = 0; i < record_count; i++) {
568+
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) != 0) {
569+
continue;
558570
}
559-
}
560571

561-
/* Encode matching records */
562-
for (i = 0; i < record_count; i++) {
563-
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) == 0 &&
564-
record_type == FLB_LOG_EVENT_NORMAL) {
565-
if (matched_by_default[i]) {
566-
ret = encode_chunk_record(encoder, records[i]);
572+
if (record_type == FLB_LOG_EVENT_GROUP_START) {
573+
group_start_record = records[i];
574+
group_end = NULL;
575+
continue;
576+
}
577+
else if (record_type == FLB_LOG_EVENT_GROUP_END) {
578+
if (group_end != NULL &&
579+
group_start_record != NULL &&
580+
records[i]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
581+
ret = encode_chunk_record(encoder, group_end);
567582
if (ret != 0) {
568583
flb_free(matched_by_default);
569584
flb_log_event_encoder_destroy(encoder);
570585
return -1;
571586
}
587+
group_end = NULL;
588+
}
589+
group_start_record = NULL;
590+
continue;
591+
}
592+
else if (record_type == FLB_LOG_EVENT_NORMAL && matched_by_default[i]) {
593+
if (group_start_record != NULL &&
594+
records[i]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
595+
if (group_end == NULL) {
596+
ret = encode_chunk_record(encoder, group_start_record);
597+
if (ret != 0) {
598+
flb_free(matched_by_default);
599+
flb_log_event_encoder_destroy(encoder);
600+
return -1;
601+
}
602+
for (size_t j = i + 1; j < record_count; j++) {
603+
if (flb_log_event_decoder_get_record_type(&records[j]->event, &record_type) == 0 &&
604+
record_type == FLB_LOG_EVENT_GROUP_END &&
605+
records[j]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
606+
group_end = records[j];
607+
break;
608+
}
609+
}
610+
}
611+
}
612+
613+
ret = encode_chunk_record(encoder, records[i]);
614+
if (ret != 0) {
615+
flb_free(matched_by_default);
616+
flb_log_event_encoder_destroy(encoder);
617+
return -1;
572618
}
573619
}
574620
}
575621

576-
if (group_end != NULL && group_start_record != NULL) {
622+
if (group_end != NULL) {
577623
ret = encode_chunk_record(encoder, group_end);
578624
if (ret != 0) {
579625
flb_free(matched_by_default);

0 commit comments

Comments
 (0)