Skip to content

Commit d2c3d2d

Browse files
lavkeshMayurGubrelemayur.gubrele
authored
chore: Minor refactor (#66)
* feat: Bigtable Sink (#33) * feat: add bigtable sink with stringified odpf messages as values * feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud * feat: add wrapper class on bigtable client * refactor: fix checkstyle * feat: add bigtable parser tests * feat: add bigtable sink tests * feat: add bigtable client tests * chore: revert version bump * chore: revert version change in build.gradle Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * chore: qa and review fixes * chore: change default SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH to true * feat: Bigtable record parser (#39) * feat: Bigtable Sink (#33) * feat: add bigtable sink with stringified odpf messages as values * feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud * feat: add wrapper class on bigtable client * refactor: fix checkstyle * feat: add bigtable parser tests * feat: add bigtable sink tests * feat: add bigtable client tests * chore: revert version bump * chore: revert version change in build.gradle Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * feat: create bigtable records using InputOutputFieldMapping provided as configuration * refactor: fix checkstyle and add unit tests * review: minor refactor * refactor: add BigTableSchemaTest and fix BigTableRecordParserTest * refactor: fix checkstyle * tests: add few more tests Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * chore: schema refactoring * chore: naming conventions * feat: add functionality to create rowkey from configured template (#44) * chore: refactor Template validation * chore: change exception message in Template * feat: add bigtable sink metrics and logging (#51) * feat: Bigtable Sink (#33) * feat: add bigtable sink with stringified odpf messages as values * feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud * feat: add wrapper class on bigtable client * refactor: fix checkstyle * feat: add bigtable parser tests * feat: add bigtable sink tests * feat: add bigtable client tests * chore: revert version bump * chore: revert version change in build.gradle Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * chore: qa and review fixes * chore: change default SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH to true * feat: Bigtable record parser (#39) * feat: Bigtable Sink (#33) * feat: add bigtable sink with stringified odpf messages as values * feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud * feat: add wrapper class on bigtable client * refactor: fix checkstyle * feat: add bigtable parser tests * feat: add bigtable sink tests * feat: add bigtable client tests * chore: revert version bump * chore: revert version change in build.gradle Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * feat: create bigtable records using InputOutputFieldMapping provided as configuration * refactor: fix checkstyle and add unit tests * review: minor refactor * refactor: add BigTableSchemaTest and fix BigTableRecordParserTest * refactor: fix checkstyle * tests: add few more tests Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * chore: schema refactoring * chore: naming conventions * feat: add functionality to create rowkey from configured template (#44) * chore: refactor Template validation * chore: change exception message in Template * feat: add bigtable sink metrics and logging (#51) * feat: parse bigtable errors and create odpf response * feat: capture error metrics with predefined tags * chore: add tests for bigtable response parser * chore: remove deprecated jcenter repo from build.gradle * chore: comment out classpath dependencies * chore: remove unused classpath dependencies * chore: minor refactor * chore: remove unused classpath dependencies * refactor: change BigtableRecord,Response contracts, improved logging, added tests * feat: Bigtable error parsing (#55) * feat: Bigtable Sink (#33) * feat: add bigtable sink with stringified odpf messages as values * feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud * feat: add wrapper class on bigtable client * refactor: fix checkstyle * feat: add bigtable parser tests * feat: add bigtable sink tests * feat: add bigtable client tests * chore: revert version bump * chore: revert version change in build.gradle Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * chore: qa and review fixes * chore: change default SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH to true * feat: Bigtable record parser (#39) * feat: Bigtable Sink (#33) * feat: add bigtable sink with stringified odpf messages as values * feat: add BIGTABLE_CREDENTIAL_PATH to access google cloud * feat: add wrapper class on bigtable client * refactor: fix checkstyle * feat: add bigtable parser tests * feat: add bigtable sink tests * feat: add bigtable client tests * chore: revert version bump * chore: revert version change in build.gradle Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * feat: create bigtable records using InputOutputFieldMapping provided as configuration * refactor: fix checkstyle and add unit tests * review: minor refactor * refactor: add BigTableSchemaTest and fix BigTableRecordParserTest * refactor: fix checkstyle * tests: add few more tests Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * chore: schema refactoring * chore: naming conventions * feat: add functionality to create rowkey from configured template (#44) * chore: refactor Template validation * chore: change exception message in Template * feat: add bigtable sink metrics and logging (#51) * feat: parse bigtable errors and create odpf response * feat: capture error metrics with predefined tags * chore: add tests for bigtable response parser * chore: remove deprecated jcenter repo from build.gradle * chore: comment out classpath dependencies * chore: remove unused classpath dependencies * chore: minor refactor * chore: remove unused classpath dependencies * refactor: change BigtableRecord,Response contracts, improved logging, added tests Co-authored-by: mayur.gubrele <[email protected]> Co-authored-by: lavkesh <[email protected]> * chore: small fixes Co-authored-by: Mayur Gubrele <[email protected]> Co-authored-by: mayur.gubrele <[email protected]>
1 parent 85435fd commit d2c3d2d

File tree

4 files changed

+32
-0
lines changed

4 files changed

+32
-0
lines changed

src/main/java/io/odpf/depot/message/MessageUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import com.jayway.jsonpath.PathNotFoundException;
66
import org.json.JSONObject;
77

8+
import java.io.IOException;
9+
810
public class MessageUtils {
911

1012
public static Object getFieldFromJsonObject(String name, JSONObject jsonObject, Configuration jsonPathConfig) {
@@ -16,4 +18,15 @@ public static Object getFieldFromJsonObject(String name, JSONObject jsonObject,
1618
throw new IllegalArgumentException("Invalid field config : " + name, e);
1719
}
1820
}
21+
22+
public static void validate(OdpfMessage message, Class validClass) throws IOException {
23+
if ((message.getLogKey() != null && !(validClass.isInstance(message.getLogKey())))
24+
|| (message.getLogMessage() != null && !(validClass.isInstance(message.getLogMessage())))) {
25+
throw new IOException(
26+
String.format("Expected class %s, but found: LogKey class: %s, LogMessage class: %s",
27+
validClass,
28+
message.getLogKey() != null ? message.getLogKey().getClass() : "n/a",
29+
message.getLogMessage() != null ? message.getLogMessage().getClass() : "n/a"));
30+
}
31+
}
1932
}

src/main/java/io/odpf/depot/message/json/JsonOdpfMessageParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.odpf.depot.config.OdpfSinkConfig;
66
import io.odpf.depot.exception.ConfigurationException;
77
import io.odpf.depot.exception.EmptyMessageException;
8+
import io.odpf.depot.message.MessageUtils;
89
import io.odpf.depot.message.OdpfMessage;
910
import io.odpf.depot.message.OdpfMessageParser;
1011
import io.odpf.depot.message.OdpfMessageSchema;
@@ -42,6 +43,7 @@ public ParsedOdpfMessage parse(OdpfMessage message, SinkConnectorSchemaMessageMo
4243
if (type == null) {
4344
throw new IOException("message mode not defined");
4445
}
46+
MessageUtils.validate(message, byte[].class);
4547
byte[] payload;
4648
switch (type) {
4749
case LOG_KEY:

src/main/java/io/odpf/depot/message/proto/ProtoOdpfMessageParser.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.odpf.depot.config.OdpfSinkConfig;
99
import io.odpf.depot.exception.ConfigurationException;
1010
import io.odpf.depot.exception.EmptyMessageException;
11+
import io.odpf.depot.message.MessageUtils;
1112
import io.odpf.depot.message.OdpfMessage;
1213
import io.odpf.depot.message.OdpfMessageParser;
1314
import io.odpf.depot.message.OdpfMessageSchema;
@@ -60,6 +61,7 @@ public ParsedOdpfMessage parse(OdpfMessage message, SinkConnectorSchemaMessageMo
6061
if (type == null) {
6162
throw new IOException("parser mode not defined");
6263
}
64+
MessageUtils.validate(message, byte[].class);
6365
byte[] payload;
6466
switch (type) {
6567
case LOG_MESSAGE:

src/test/java/io/odpf/depot/message/MessageUtilsTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import org.json.JSONObject;
66
import org.junit.Assert;
77
import org.junit.Test;
8+
import org.junit.jupiter.api.Assertions;
9+
10+
import java.io.IOException;
811

912
public class MessageUtilsTest {
1013
private final Configuration configuration = Configuration.builder()
@@ -70,4 +73,16 @@ public void shouldThrowExceptionIfInvalidPath() {
7073
exception = Assert.assertThrows(IllegalArgumentException.class, () -> MessageUtils.getFieldFromJsonObject("test[0].testing", object, configuration));
7174
Assert.assertEquals("Invalid field config : test[0].testing", exception.getMessage());
7275
}
76+
@Test
77+
public void shouldNotThrowExceptionIfValid() throws IOException {
78+
OdpfMessage message = new OdpfMessage("test", "test");
79+
MessageUtils.validate(message, String.class);
80+
81+
}
82+
@Test
83+
public void shouldThrowExceptionIfNotValid() {
84+
OdpfMessage message = new OdpfMessage("test", "test");
85+
IOException ioException = Assertions.assertThrows(IOException.class, () -> MessageUtils.validate(message, Integer.class));
86+
Assert.assertEquals("Expected class class java.lang.Integer, but found: LogKey class: class java.lang.String, LogMessage class: class java.lang.String", ioException.getMessage());
87+
}
7388
}

0 commit comments

Comments
 (0)