1010import io .kafbat .ui .exception .CelException ;
1111import io .kafbat .ui .model .TopicMessageDTO ;
1212import java .time .OffsetDateTime ;
13- import java .time .temporal .ChronoUnit ;
1413import java .util .ArrayList ;
14+ import java .util .Base64 ;
1515import java .util .List ;
1616import java .util .Map ;
17+ import java .util .UUID ;
1718import java .util .function .Predicate ;
1819import org .apache .commons .lang3 .RandomStringUtils ;
1920import org .junit .jupiter .api .Nested ;
@@ -100,7 +101,7 @@ void canCheckTimestampMs() {
100101 var ts = OffsetDateTime .now ();
101102 var f = celScriptFilter ("record.timestampMs == " + ts .toInstant ().toEpochMilli ());
102103 assertTrue (f .test (msg ().timestamp (ts )));
103- assertFalse (f .test (msg ().timestamp (ts .plus (1L , ChronoUnit . SECONDS ))));
104+ assertFalse (f .test (msg ().timestamp (ts .plusSeconds (1L ))));
104105 }
105106
106107 @ Test
@@ -177,6 +178,7 @@ void filterSpeedIsAtLeast5kPerSec() {
177178 toFilter .add (msg ().content (jsonContent ).key (randString ));
178179 }
179180 // first iteration for warmup
181+ // noinspection ResultOfMethodCallIgnored
180182 toFilter .stream ().filter (f ).count ();
181183
182184 long before = System .currentTimeMillis ();
@@ -188,10 +190,15 @@ void filterSpeedIsAtLeast5kPerSec() {
188190 }
189191 }
190192
193+ @ Test
194+ void testBase64DecodingWorks () {
195+ var uuid = UUID .randomUUID ().toString ();
196+ var msg = "test." + Base64 .getEncoder ().encodeToString (uuid .getBytes ());
197+ var f = celScriptFilter ("string(base64.decode(record.value.split('.')[1])).contains('" + uuid + "')" );
198+ assertTrue (f .test (msg ().content (msg )));
199+ }
200+
191201 private TopicMessageDTO msg () {
192- return new TopicMessageDTO ()
193- .timestamp (OffsetDateTime .now ())
194- .offset (-1L )
195- .partition (1 );
202+ return new TopicMessageDTO (1 , -1L , OffsetDateTime .now ());
196203 }
197204}
0 commit comments