
@novegit (Contributor) commented Jul 28, 2020

Rules in the rewrite_tag filter are combined with OR semantics. In some use cases
an AND combination is helpful, for instance when log messages from Kubernetes
customer namespaces that have not set a special annotation field should be
dropped. Without an AND combination, two filter sections are needed to get this
done.

Configuration example:
A fifth field, true|false, can be added to each rule after the existing KEEP
field. "true" means that this rule is AND-combined with the next rule. "false"
means the default OR behaviour and can be omitted, so the change is fully
compatible with existing filter configurations.

```
[FILTER]
    Name          rewrite_tag
    Match         tail
    Rule          $log ^(1|3)$    newtag_or    false false
    Rule          $log ^(.*end)$  newtag_and_1 false true
    Rule          $log ^(1.*)$    newtag_and_2 false false
    Rule          $log ^(2.*)$    newtag_or    false
```
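The rule semantics described above can be modelled outside Fluent Bit. The Python sketch below is not the C implementation in this PR; it assumes, based on the example output further down in this thread, that a rule whose fifth field is `true` is chained with the following rule, that a chain matches only if every rule in it matches, that the first matching chain wins, and that the chain's last rule supplies the new tag. Only top-level keys such as `$log` are handled, and `Rule`, `parse_rule`, `group_rules`, `rule_matches` and `match_tag` are hypothetical names.

```python
import re
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Rule:
    key: str        # record key without "$"; top-level keys only in this sketch
    regex: str      # pattern tested against the key's value
    new_tag: str    # tag used when the rule (or its chain) matches
    keep: bool      # KEEP field; parsed but not used for tag selection here
    and_next: bool  # proposed fifth field: AND-combine with the next rule


def parse_rule(line: str) -> Rule:
    """Parse "$KEY REGEX NEW_TAG KEEP [AND]"; a missing fifth field means OR."""
    parts = line.split()
    key, regex, new_tag, keep = parts[:4]
    and_next = len(parts) > 4 and parts[4] == "true"
    return Rule(key.lstrip("$"), regex, new_tag, keep == "true", and_next)


def group_rules(rules: List[Rule]) -> List[List[Rule]]:
    """Chain every rule flagged and_next=True with the rule that follows it."""
    groups: List[List[Rule]] = []
    current: List[Rule] = []
    for rule in rules:
        current.append(rule)
        if not rule.and_next:
            groups.append(current)
            current = []
    if current:  # trailing "true" on the last rule: close the open chain
        groups.append(current)
    return groups


def rule_matches(rule: Rule, record: Dict[str, str]) -> bool:
    """A missing key never matches."""
    value = record.get(rule.key)
    return value is not None and re.search(rule.regex, str(value)) is not None


def match_tag(record: Dict[str, str], groups: List[List[Rule]]) -> Optional[str]:
    """Return the new tag of the first chain whose rules all match, else None."""
    for group in groups:
        if all(rule_matches(rule, record) for rule in group):
            return group[-1].new_tag
    return None


# Rules copied from the [FILTER] example above.
SAMPLE_RULES = [
    "$log ^(1|3)$    newtag_or    false false",
    "$log ^(.*end)$  newtag_and_1 false true",
    "$log ^(1.*)$    newtag_and_2 false false",
    "$log ^(2.*)$    newtag_or    false",
]
GROUPS = group_rules([parse_rule(line) for line in SAMPLE_RULES])

for value in ["1", "2", "10end", "12"]:
    # prints one line each: newtag_or, newtag_or, newtag_and_2, None
    print(value, "->", match_tag({"log": value}, GROUPS))
```

In this model `12` is not retagged even though it matches `^(1.*)$`, because that rule only counts together with `^(.*end)$`; this agrees with the record `12` staying on the `tail` tag in the debug log below.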

Signed-off-by: Michael Voelker <[email protected]>


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Documentation

  • Documentation required for this feature

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@novegit (Contributor, Author) commented Jul 28, 2020

Example configuration

fluentbit.conf

```
[SERVICE]
    # Flush
    # =====
    # Set an interval of seconds before to flush records to a destination
    Flush        5

    # Daemon
    # ======
    # Instruct Fluent Bit to run in foreground or background mode.
    Daemon       Off
    Log_Level debug

    # Parsers_File
    # ============
    # Specify an optional 'Parsers' configuration file
    Parsers_File parsers.conf
    Plugins_File plugins.conf

    # HTTP Server
    # ===========
    # Enable/Disable the built-in HTTP Server for metrics
    HTTP_Server  On
    HTTP_Listen  0.0.0.0
    HTTP_Port    2020

[INPUT]
    Name tail
    Tag tail
    Path /var/tmp/loginput.txt
    Docker_Mode On

[FILTER]
    Name          rewrite_tag
    Match         tail
    Rule          $log ^(1|3)$    newtag_or    false false
    Rule          $log ^(.*end)$  newtag_and_1 false true
    Rule          $log ^(1.*)$    newtag_and_2 false false
    Rule          $log ^(2.*)$    newtag_or    false

[OUTPUT]
    Name  stdout
    Match *
```

/var/tmp/loginput.txt

```
1
2
3
4
5
6
7
8
9
10start
10test
10end
11no
12
```

rewrite_tag result:

```
[0] newtag_or: [1595957976.822152000, {"log"=>"1"}]
[1] newtag_or: [1595957976.825958100, {"log"=>"2"}]
[2] newtag_or: [1595957976.826111700, {"log"=>"3"}]
[0] newtag_and_2: [1595957976.826542900, {"log"=>"10end"}]
```

@novegit (Contributor, Author) commented Jul 28, 2020

Debug log

/root@79f207404693:/tmp/src/build# usr/bin/valgrind.bin /fluent-bit/bin/fluent-bit -c /fluent-bit/etc/fluent-bit.conf
==17== Memcheck, a memory error detector
==17== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==17== Using Valgrind-3.14.0 and LibVEX; rerun with -h for copyright info
==17== Command: /fluent-bit/bin/fluent-bit -c /fluent-bit/etc/fluent-bit.conf
==17==
Fluent Bit v1.6.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2020/07/28 17:39:36] [ info] Configuration:
[2020/07/28 17:39:36] [ info]  flush time     | 5.000000 seconds
[2020/07/28 17:39:36] [ info]  grace          | 5 seconds
[2020/07/28 17:39:36] [ info]  daemon         | 0
[2020/07/28 17:39:36] [ info] ___________
[2020/07/28 17:39:36] [ info]  inputs:
[2020/07/28 17:39:36] [ info]      tail
[2020/07/28 17:39:36] [ info] ___________
[2020/07/28 17:39:36] [ info]  filters:
[2020/07/28 17:39:36] [ info]      rewrite_tag.0
[2020/07/28 17:39:36] [ info] ___________
[2020/07/28 17:39:36] [ info]  outputs:
[2020/07/28 17:39:36] [ info]      stdout.0
[2020/07/28 17:39:36] [ info] ___________
[2020/07/28 17:39:36] [ info]  collectors:
[2020/07/28 17:39:36] [ info] [engine] started (pid=17)
[2020/07/28 17:39:36] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2020/07/28 17:39:36] [debug] [storage] [cio stream] new stream registered: tail.0
[2020/07/28 17:39:36] [ info] [storage] version=1.0.4, initializing...
[2020/07/28 17:39:36] [ info] [storage] in-memory
[2020/07/28 17:39:36] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] inotify watch fd=19
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] scanning path /var/tmp/loginput.txt
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] inode=15564311 appended as /var/tmp/loginput.txt
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] scan_glob add(): /var/tmp/loginput.txt, inode 15564311
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] 1 new files found on path '/var/tmp/loginput.txt'
[2020/07/28 17:39:36] [debug] [storage] [cio stream] new stream registered: emitter_for_rewrite_tag.0
[2020/07/28 17:39:36] [debug] [router] match rule tail.0:stdout.0
[2020/07/28 17:39:36] [debug] [router] match rule emitter.1:stdout.0
[2020/07/28 17:39:36] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2020/07/28 17:39:36] [ info] [sp] stream processor started
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] inode=15564311 file=/var/tmp/loginput.txt promote to TAIL_EVENT
[2020/07/28 17:39:36] [ info] inotify_fs_add(): inode=15564311 watch_fd=1 name=/var/tmp/loginput.txt
[0] tail: [1595957976.826163800, {"log"=>"4"}]
[1] tail: [1595957976.826225200, {"log"=>"5"}]
[2] tail: [1595957976.826260300, {"log"=>"6"}]
[3] tail: [1595957976.826295600, {"log"=>"7"}]
[4] tail: [1595957976.826329600, {"log"=>"8"}]
[5] tail: [1595957976.826414200, {"log"=>"9"}]
[6] tail: [1595957976.826452800, {"log"=>"10start"}]
[7] tail: [1595957976.826488900, {"log"=>"10test"}]
[8] tail: [1595957976.826579100, {"log"=>"11no"}]
[9] tail: [1595957976.826613300, {"log"=>"12"}]
[0] newtag_or: [1595957976.822152000, {"log"=>"1"}]
[1] newtag_or: [1595957976.825958100, {"log"=>"2"}]
[2] newtag_or: [1595957976.826111700, {"log"=>"3"}]
[2020/07/28 17:39:41] [debug] [task] created task=0x5fb1a60 id=0 OK
[0] newtag_and_2: [1595957976.826542900, {"log"=>"10end"}]
[2020/07/28 17:39:41] [debug] [task] created task=0x5fccc20 id=1 OK
[2020/07/28 17:39:41] [debug] [task] created task=0x5fccdb0 id=2 OK
[2020/07/28 17:39:41] [debug] [task] destroy task=0x5fb1a60 (task_id=0)
[2020/07/28 17:39:41] [debug] [task] destroy task=0x5fccc20 (task_id=1)
[2020/07/28 17:39:41] [debug] [task] destroy task=0x5fccdb0 (task_id=2)
[2020/07/28 17:40:36] [debug] [input:tail:tail.0] scanning path /var/tmp/loginput.txt
[2020/07/28 17:40:36] [debug] [input:tail:tail.0] scan_blog add(): dismissed: /var/tmp/loginput.txt, inode 15564311
[2020/07/28 17:40:36] [debug] [input:tail:tail.0] 0 new files found on path '/var/tmp/loginput.txt'
^C[engine] caught signal (SIGINT)
[2020/07/28 17:41:02] [ info] [input] pausing tail.0
[2020/07/28 17:41:02] [ info] [input] pausing emitter_for_rewrite_tag.0
[2020/07/28 17:41:02] [debug] [input:tail:tail.0] inode=15564311 removing file name /var/tmp/loginput.txt
[2020/07/28 17:41:02] [ info] inotify_fs_remove(): inode=15564311 watch_fd=1
==17== Invalid free() / delete / delete[] / realloc()
==17==    at 0x48369AB: free (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==17==    by 0x509CAB9: free_key_mem (dlerror.c:223)
==17==    by 0x509CAB9: __dlerror_main_freeres (dlerror.c:239)
==17==    by 0x5224B71: __libc_freeres (in /lib/x86_64-linux-gnu/libc-2.28.so)
==17==    by 0x482B19E: _vgnU_freeres (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_core-amd64-linux.so)
==17==    by 0x1832AA: flb_signal_handler (fluent-bit.c:396)
==17==    by 0x50F383F: ??? (in /lib/x86_64-linux-gnu/libc-2.28.so)
==17==    by 0x51B57EE: epoll_wait (epoll_wait.c:30)
==17==    by 0x64B256: _mk_event_wait (mk_event_epoll.c:354)
==17==    by 0x64B546: mk_event_wait (mk_event.c:163)
==17==    by 0x211301: flb_engine_start (flb_engine.c:555)
==17==    by 0x1844F5: flb_main (fluent-bit.c:1035)
==17==    by 0x184543: main (fluent-bit.c:1048)
==17==  Address 0x5d9b1e0 is in a rw- anonymous segment
==17==
==17==
==17== HEAP SUMMARY:
==17==     in use at exit: 89,571 bytes in 663 blocks
==17==   total heap usage: 5,358 allocs, 4,696 frees, 6,115,075 bytes allocated
==17==
==17== LEAK SUMMARY:
==17==    definitely lost: 48 bytes in 2 blocks
==17==    indirectly lost: 882 bytes in 5 blocks
==17==      possibly lost: 0 bytes in 0 blocks
==17==    still reachable: 88,641 bytes in 656 blocks
==17==         suppressed: 0 bytes in 0 blocks
==17== Rerun with --leak-check=full to see details of leaked memory
==17==
==17== For counts of detected and suppressed errors, rerun with: -v
==17== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)

novegit added a commit to novegit/fluent-bit-docs that referenced this pull request Jul 29, 2020
description for and-combination for rewrite_tag filter rules
PR: fluent/fluent-bit#2399

Signed-off-by: Michael Voelker <[email protected]>
@edsiper (Member) commented Jul 29, 2020

Thanks for opening this PR.

I think the main purpose of the rewrite_tag filter is not to discard records but only to rewrite tags, optionally discarding the original matched records.

If you need to let through only the records that match a criterion, there are other ways to do it. For example, consider the following test file with two records:

```
{"log": {"kubernetes": {"msg": "test1", "logme": "yes"}}}
{"log": {"kubernetes": {"msg": "test2"}}}
```

Using tail + grep filter you can accomplish the same thing you need:

```
fluent-bit -R parsers.conf \
  -i tail -p path=test.log -p parser=json \
  -F grep -p "regex=\$log['kubernetes']['logme'] yes" -m '*' \
  -o stdout -p format=json_lines -f 1
```

output:

```
{"date":1596059959.811035,"log":{"kubernetes":{"msg":"test1","logme":"yes"}}}
```
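For readers unfamiliar with the nested-key syntax, here is a rough Python model of what the grep command above does: resolve `$log['kubernetes']['logme']` into a nested lookup and keep only the records whose value matches `yes`. It is a simplified sketch, not Fluent Bit's record accessor; `parse_accessor`, `lookup` and `grep_keep` are hypothetical helpers that handle only the `$key['a']['b']` form, and a missing key is treated as no match, which agrees with `test2` being dropped in the output above.

```python
import json
import re
from typing import Any, List, Optional


def parse_accessor(accessor: str) -> List[str]:
    """Split "$log['kubernetes']['logme']" into ["log", "kubernetes", "logme"]."""
    head = re.match(r"\$(\w+)", accessor)
    if head is None:
        raise ValueError(f"not a record accessor: {accessor!r}")
    return [head.group(1)] + re.findall(r"\['([^']*)'\]", accessor)


def lookup(record: Any, path: List[str]) -> Optional[Any]:
    """Walk nested maps; return None as soon as a level is missing."""
    value = record
    for key in path:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def grep_keep(records: List[dict], accessor: str, pattern: str) -> List[dict]:
    """Keep records whose resolved value matches pattern (missing key = drop)."""
    path = parse_accessor(accessor)
    kept = []
    for record in records:
        value = lookup(record, path)
        if value is not None and re.search(pattern, str(value)):
            kept.append(record)
    return kept


# The two lines of test.log from the example above, parsed as JSON.
LINES = [
    '{"log": {"kubernetes": {"msg": "test1", "logme": "yes"}}}',
    '{"log": {"kubernetes": {"msg": "test2"}}}',
]
KEPT = grep_keep([json.loads(line) for line in LINES],
                 "$log['kubernetes']['logme']", "yes")
print(json.dumps(KEPT))
```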

Another way to accomplish the same thing is to use the stream processor: make the input plugin non-routable and create a new stream that matches an SQL criterion.

@edsiper edsiper self-assigned this Jul 29, 2020
@edsiper edsiper added the waiting-for-user Waiting for more information, tests or requested changes label Jul 29, 2020
@novegit (Contributor, Author) commented Jul 30, 2020

Thanks for the reply. Until now I didn't know that grep can handle nested fields with this syntax (the grep documentation only has an example that uses filter/nest for this case).

But I still don't see how grep can solve my use case. The task is to drop messages that fulfill both of these conditions:
  • kubernetes['namespace_name'] matches ^.*-(acc|dev|prd|test)$, and
  • kubernetes['annotations']['loggingkafkatopic'] does not exist
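Put as a predicate, the requirement is an AND of a regex match and a missing key. The Python sketch below only illustrates that logic and is not a Fluent Bit feature; it assumes the `kubernetes` map has already been parsed into a dict, and `should_drop` is a hypothetical name.

```python
import re
from typing import Any, Dict

# Namespace condition from the list above.
CUSTOMER_NAMESPACE = re.compile(r"^.*-(acc|dev|prd|test)$")


def should_drop(kubernetes: Dict[str, Any]) -> bool:
    """True only if BOTH conditions hold: the namespace matches the customer
    pattern AND annotations.loggingkafkatopic is absent."""
    namespace = kubernetes.get("namespace_name", "")
    annotations = kubernetes.get("annotations") or {}
    namespace_matches = CUSTOMER_NAMESPACE.match(namespace) is not None
    return namespace_matches and "loggingkafkatopic" not in annotations


print(should_drop({"namespace_name": "namespace-customer1-dev",
                   "annotations": {"sample": "data"}}))          # True
print(should_drop({"namespace_name": "openshift-monitoring",
                   "annotations": {"loggingkafkatopic": "x"}}))  # False
```

A record that fails either condition, such as one from `openshift-monitoring`, is kept.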

input:

```
{"log": {"kubernetes": {"namespace_name: "namespace-customer1-tst", "labels": {"app": "prometheus"}, "annotations": {"loggingkafkatopic": "customer_kafka_topic" }}}}
{"log": {"kubernetes": {"namespace_name: "namespace-customer1-dev", "labels": {"app": "prometheus"}, "annotations": {"sample: "data" }}}}
{"log": {"kubernetes": {"namespace_name: "openshift-monitoring", "labels": {"app": "prometheus"}, "annotations": {"loggingkafkatopic": "customer_kafka_topic" }}}}
```

```
fluent-bit -R parsers.conf \
  -i tail -p path=in -p parser=json \
  -F grep -p "regex=\$log['kubernetes']['name_space'] .*-(acc|dev|prd|tst)" -m '*' \
  -p "exclude=\$log['kubernetes']['annotations']['loggingkafkatopic'] .*" \
  -o stdout
```

result:

```
[0] tail.0: [1596138829.479714300, {"log"=>"{"log": {"kubernetes": {"name_space: "namespace-customer1-tst", "labels": {"app": "prometheus"}, "annotations": {"loggingkafkatopic": "customer_kafka_topic" }}}}"}]
[1] tail.0: [1596138829.479724300, {"log"=>"{"log": {"kubernetes": {"name_space: "namespace-customer1-dev", "labels": {"app": "prometheus"}, "annotations": {"sample: "data" }}}}"}]
```

I assume that the exclude part isn't evaluated for the first result line because, once the first 'regex' matches, the grep filter is finished with the message.

A future requirement could be not to drop these messages but to route them to a separate output with a customer-pool Kafka topic, collecting all 'customer logs' without a namespace-specific Kafka topic (set in the annotation loggingkafkatopic) to separate them from the cluster's default Kafka topic.

@eschabell

@novegit, can you look at resolving the conflicts in this PR? I'll also ping you about the same for the docs PR: fluent/fluent-bit-docs#349

@edsiper, once he has resolved the conflicts, can you reassign a reviewer?

patrick-stephens pushed a commit to fluent/fluent-bit-docs that referenced this pull request Oct 21, 2025
* filter_rewrite_tag: add and-combination for rules

description for and-combination for rewrite_tag filter rules
PR: fluent/fluent-bit#2399

Signed-off-by: Michael Voelker <[email protected]>

* Apply suggestions from code review

Signed-off-by: Lynette  Miles <[email protected]>

* Apply suggestions from code review

Signed-off-by: Lynette  Miles <[email protected]>

---------

Signed-off-by: Michael Voelker <[email protected]>
Signed-off-by: Lynette  Miles <[email protected]>
Co-authored-by: Lynette  Miles <[email protected]>
@eschabell

@patrick-stephens, the docs PR fluent/fluent-bit-docs#349 for this code PR was merged today, so this PR needs to be merged for the docs to match.

TomlinfreeGit pushed a commit to TomlinfreeGit/fluent-bit-docs that referenced this pull request Oct 28, 2025
* filter_rewrite_tag: add and-combination for rules

description for and-combination for rewrite_tag filter rules
PR: fluent/fluent-bit#2399

Signed-off-by: Michael Voelker <[email protected]>

* Apply suggestions from code review

Signed-off-by: Lynette  Miles <[email protected]>

* Apply suggestions from code review

Signed-off-by: Lynette  Miles <[email protected]>

---------

Signed-off-by: Michael Voelker <[email protected]>
Signed-off-by: Lynette  Miles <[email protected]>
Co-authored-by: Lynette  Miles <[email protected]>
Signed-off-by: Tom <[email protected]>