Skip to content

Commit 6146073

Browse files
committed
Impl tests
1 parent de52924 commit 6146073

File tree

4 files changed

+213
-0
lines changed

4 files changed

+213
-0
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.kafbat.ui.serdes.builtin.mm2;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
6+
import com.fasterxml.jackson.core.JsonProcessingException;
7+
import io.kafbat.ui.serde.api.DeserializeResult;
8+
import io.kafbat.ui.serde.api.Serde;
9+
import io.kafbat.ui.serdes.PropertyResolverImpl;
10+
import java.util.Map;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
14+
public class CheckpointSerdeTest extends MirrorMakerSerdesAbstractTest {
15+
16+
private static final CheckpointSerde SERDE = new CheckpointSerde();
17+
18+
@BeforeEach
19+
void init() {
20+
SERDE.configure(
21+
PropertyResolverImpl.empty(),
22+
PropertyResolverImpl.empty(),
23+
PropertyResolverImpl.empty()
24+
);
25+
}
26+
27+
@Test
28+
void testCanDeserialize() {
29+
assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.KEY));
30+
assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.VALUE));
31+
}
32+
33+
@Test
34+
void testDeserializeKey() throws JsonProcessingException {
35+
var key = decodeBase64("AAVncm91cAAFdG9waWMAAAAD");
36+
var expected = Map.of(
37+
"partition", 3,
38+
"topic", "topic",
39+
"group", "group"
40+
);
41+
42+
var result = SERDE.deserializer(TOPIC, Serde.Target.KEY).deserialize(HEADERS, key);
43+
var resultMap = jsonToMap(result.getResult());
44+
45+
assertEquals(DeserializeResult.Type.JSON, result.getType());
46+
assertEquals(expected, resultMap);
47+
}
48+
49+
@Test
50+
void testDeserializeValue() throws JsonProcessingException {
51+
var value = decodeBase64("AAAAAAAAA/bkPgAAAAHz1jf7AAA=");
52+
var expected = Map.of(
53+
"offset", 8385869819L,
54+
"upstreamOffset", 66511934,
55+
"metadata", ""
56+
);
57+
58+
var result = SERDE.deserializer(TOPIC, Serde.Target.VALUE).deserialize(HEADERS, value);
59+
var resultMap = jsonToMap(result.getResult());
60+
61+
assertEquals(DeserializeResult.Type.JSON, result.getType());
62+
assertEquals(expected, resultMap);
63+
}
64+
65+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.kafbat.ui.serdes.builtin.mm2;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
6+
import com.fasterxml.jackson.core.JsonProcessingException;
7+
import io.kafbat.ui.serde.api.DeserializeResult;
8+
import io.kafbat.ui.serde.api.Serde;
9+
import io.kafbat.ui.serdes.PropertyResolverImpl;
10+
import java.util.Map;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
14+
public class HeartbeatSerdeTest extends MirrorMakerSerdesAbstractTest {
15+
16+
private static final HeartbeatSerde SERDE = new HeartbeatSerde();
17+
18+
@BeforeEach
19+
void init() {
20+
SERDE.configure(
21+
PropertyResolverImpl.empty(),
22+
PropertyResolverImpl.empty(),
23+
PropertyResolverImpl.empty()
24+
);
25+
}
26+
27+
@Test
28+
void testCanDeserialize() {
29+
assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.KEY));
30+
assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.VALUE));
31+
}
32+
33+
@Test
34+
void testDeserializeKey() throws JsonProcessingException {
35+
var key = decodeBase64("AAZzb3VyY2UABnRhcmdldA==");
36+
var expected = Map.of(
37+
"sourceClusterAlias", "source",
38+
"targetClusterAlias", "target"
39+
);
40+
41+
var result = SERDE.deserializer(TOPIC, Serde.Target.KEY).deserialize(HEADERS, key);
42+
var resultMap = jsonToMap(result.getResult());
43+
44+
assertEquals(DeserializeResult.Type.JSON, result.getType());
45+
assertEquals(expected, resultMap);
46+
}
47+
48+
@Test
49+
void testDeserializeValue() {
50+
var value = decodeBase64("AAAAAAGZgCEMZA==");
51+
var expected = "1758791273572";
52+
53+
var result = SERDE.deserializer(TOPIC, Serde.Target.VALUE).deserialize(HEADERS, value);
54+
55+
assertEquals(DeserializeResult.Type.STRING, result.getType());
56+
assertEquals(expected, result.getResult());
57+
}
58+
59+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.kafbat.ui.serdes.builtin.mm2;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.core.type.TypeReference;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import io.kafbat.ui.serdes.RecordHeadersImpl;
7+
import java.util.Base64;
8+
import java.util.Map;
9+
10+
public abstract class MirrorMakerSerdesAbstractTest {
11+
12+
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
13+
protected static final String TOPIC = "test-topic";
14+
protected static final RecordHeadersImpl HEADERS = new RecordHeadersImpl();
15+
16+
protected Map<String, Object> jsonToMap(String json) throws JsonProcessingException {
17+
//@formatter:off
18+
return OBJECT_MAPPER.readValue(json, new TypeReference<>() {});
19+
//@formatter:on
20+
}
21+
22+
protected static byte[] decodeBase64(String base64) {
23+
return Base64.getDecoder().decode(base64.trim());
24+
}
25+
26+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.kafbat.ui.serdes.builtin.mm2;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
6+
import com.fasterxml.jackson.core.JsonProcessingException;
7+
import io.kafbat.ui.serde.api.DeserializeResult;
8+
import io.kafbat.ui.serde.api.Serde;
9+
import io.kafbat.ui.serdes.PropertyResolverImpl;
10+
import java.util.Map;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
14+
public class OffsetSyncSerdeTest extends MirrorMakerSerdesAbstractTest {
15+
16+
private static final OffsetSyncSerde SERDE = new OffsetSyncSerde();
17+
18+
@BeforeEach
19+
void init() {
20+
SERDE.configure(
21+
PropertyResolverImpl.empty(),
22+
PropertyResolverImpl.empty(),
23+
PropertyResolverImpl.empty()
24+
);
25+
}
26+
27+
@Test
28+
void testCanDeserialize() {
29+
assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.KEY));
30+
assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.VALUE));
31+
}
32+
33+
@Test
34+
void testDeserializeKey() throws JsonProcessingException {
35+
var key = decodeBase64("AAl0b3BpY25hbWUAAAAA");
36+
var expected = Map.of(
37+
"partition", 0,
38+
"topic", "topicname"
39+
);
40+
41+
var result = SERDE.deserializer(TOPIC, Serde.Target.KEY).deserialize(HEADERS, key);
42+
var resultMap = jsonToMap(result.getResult());
43+
44+
assertEquals(DeserializeResult.Type.JSON, result.getType());
45+
assertEquals(expected, resultMap);
46+
}
47+
48+
@Test
49+
void testDeserializeValue() throws JsonProcessingException {
50+
var value = decodeBase64("AAAAAAACXsoAAAAAAAHMfw==");
51+
var expected = Map.of(
52+
"offset", 117887,
53+
"upstreamOffset", 155338
54+
);
55+
56+
var result = SERDE.deserializer(TOPIC, Serde.Target.VALUE).deserialize(HEADERS, value);
57+
var resultMap = jsonToMap(result.getResult());
58+
59+
assertEquals(DeserializeResult.Type.JSON, result.getType());
60+
assertEquals(expected, resultMap);
61+
}
62+
63+
}

0 commit comments

Comments
 (0)