11package io .odpf .depot .bigtable .parser ;
22
3+ import com .google .protobuf .Timestamp ;
34import io .odpf .depot .TestBookingLogKey ;
45import io .odpf .depot .TestBookingLogMessage ;
6+ import io .odpf .depot .TestLocation ;
57import io .odpf .depot .TestServiceType ;
68import io .odpf .depot .bigtable .model .BigTableRecord ;
79import io .odpf .depot .bigtable .model .BigTableSchema ;
@@ -60,14 +62,22 @@ public void setUp() throws IOException, InvalidTemplateException {
6062 MockitoAnnotations .openMocks (this );
6163 System .setProperty ("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS" , "io.odpf.depot.TestBookingLogMessage" );
6264 System .setProperty ("SINK_CONNECTOR_SCHEMA_MESSAGE_MODE" , String .valueOf (SinkConnectorSchemaMessageMode .LOG_MESSAGE ));
63- System .setProperty ("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING" , "{}" );
65+ System .setProperty ("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING" , "{ \" cf1 \" : { \" q1 \" : \" order_number \" , \" q2 \" : \" service_type \" } }" );
6466 System .setProperty ("SINK_BIGTABLE_ROW_KEY_TEMPLATE" , "row-key-constant-string" );
6567
6668
6769 TestBookingLogKey bookingLogKey1 = TestBookingLogKey .newBuilder ().setOrderNumber ("order#1" ).setOrderUrl ("order-url#1" ).build ();
68- TestBookingLogMessage bookingLogMessage1 = TestBookingLogMessage .newBuilder ().setOrderNumber ("order#1" ).setOrderUrl ("order-url#1" ).setServiceType (TestServiceType .Enum .GO_SEND ).build ();
70+ TestBookingLogMessage bookingLogMessage1 = TestBookingLogMessage .newBuilder ().setOrderNumber ("order#1" ).setOrderUrl ("order-url#1" )
71+ .setEventTimestamp (Timestamp .newBuilder ().setSeconds (100L ).setNanos (200 ).build ())
72+ .setServiceType (TestServiceType .Enum .GO_SEND )
73+ .setDriverPickupLocation (TestLocation .newBuilder ().setLatitude (100D ).setLongitude (200D ).build ())
74+ .build ();
6975 TestBookingLogKey bookingLogKey2 = TestBookingLogKey .newBuilder ().setOrderNumber ("order#2" ).setOrderUrl ("order-url#2" ).build ();
70- TestBookingLogMessage bookingLogMessage2 = TestBookingLogMessage .newBuilder ().setOrderNumber ("order#2" ).setOrderUrl ("order-url#2" ).setServiceType (TestServiceType .Enum .GO_SHOP ).build ();
76+ TestBookingLogMessage bookingLogMessage2 = TestBookingLogMessage .newBuilder ().setOrderNumber ("order#2" ).setOrderUrl ("order-url#2" )
77+ .setEventTimestamp (Timestamp .newBuilder ().setSeconds (101L ).setNanos (202 ).build ())
78+ .setServiceType (TestServiceType .Enum .GO_SHOP )
79+ .setDriverPickupLocation (TestLocation .newBuilder ().setLatitude (300D ).setLongitude (400D ).build ())
80+ .build ();
7181
7282 OdpfMessage message1 = new OdpfMessage (bookingLogKey1 .toByteArray (), bookingLogMessage1 .toByteArray ());
7383 OdpfMessage message2 = new OdpfMessage (bookingLogKey2 .toByteArray (), bookingLogMessage2 .toByteArray ());
@@ -92,6 +102,57 @@ public void shouldReturnValidRecordsForListOfValidOdpfMessages() {
92102 assertNull (records .get (1 ).getErrorInfo ());
93103 }
94104
105+ @ Test
106+ public void shouldReturnValidRecordsForListOfValidOdpfMessagesForComplexFieldsInColumnsMapping () throws InvalidTemplateException {
107+ System .setProperty ("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING" , "{ \" cf1\" : { \" q1\" : \" order_number\" , \" q2\" : \" service_type\" , \" q3\" : \" driver_pickup_location\" } }" );
108+ ProtoOdpfMessageParser protoOdpfMessageParser = new ProtoOdpfMessageParser (stencilClient );
109+ sinkConfig = ConfigFactory .create (BigTableSinkConfig .class , System .getProperties ());
110+ Tuple <SinkConnectorSchemaMessageMode , String > modeAndSchema = MessageConfigUtils .getModeAndSchema (sinkConfig );
111+ BigTableRowKeyParser bigTableRowKeyParser = new BigTableRowKeyParser (new Template (sinkConfig .getRowKeyTemplate ()), schema );
112+ BigTableSchema bigtableSchema = new BigTableSchema (sinkConfig .getColumnFamilyMapping ());
113+ bigTableRecordParser = new BigTableRecordParser (protoOdpfMessageParser , bigTableRowKeyParser , modeAndSchema , schema , bigtableSchema );
114+
115+ List <BigTableRecord > records = bigTableRecordParser .convert (messages );
116+ assertTrue (records .get (0 ).isValid ());
117+ assertTrue (records .get (1 ).isValid ());
118+ assertNull (records .get (0 ).getErrorInfo ());
119+ assertNull (records .get (1 ).getErrorInfo ());
120+ }
121+
122+ @ Test
123+ public void shouldReturnValidRecordsForListOfValidOdpfMessagesForNestedTimestampFieldsInColumnsMapping () throws InvalidTemplateException {
124+ System .setProperty ("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING" , "{ \" cf1\" : { \" q1\" : \" order_number\" , \" q2\" : \" service_type\" , \" q3\" : \" event_timestamp.nanos\" } }" );
125+ ProtoOdpfMessageParser protoOdpfMessageParser = new ProtoOdpfMessageParser (stencilClient );
126+ sinkConfig = ConfigFactory .create (BigTableSinkConfig .class , System .getProperties ());
127+ Tuple <SinkConnectorSchemaMessageMode , String > modeAndSchema = MessageConfigUtils .getModeAndSchema (sinkConfig );
128+ BigTableRowKeyParser bigTableRowKeyParser = new BigTableRowKeyParser (new Template (sinkConfig .getRowKeyTemplate ()), schema );
129+ BigTableSchema bigtableSchema = new BigTableSchema (sinkConfig .getColumnFamilyMapping ());
130+ bigTableRecordParser = new BigTableRecordParser (protoOdpfMessageParser , bigTableRowKeyParser , modeAndSchema , schema , bigtableSchema );
131+
132+ List <BigTableRecord > records = bigTableRecordParser .convert (messages );
133+ assertTrue (records .get (0 ).isValid ());
134+ assertTrue (records .get (1 ).isValid ());
135+ assertNull (records .get (0 ).getErrorInfo ());
136+ assertNull (records .get (1 ).getErrorInfo ());
137+ }
138+
139+ @ Test
140+ public void shouldReturnValidRecordsForListOfValidOdpfMessagesForNestedFieldsInColumnsMapping () throws InvalidTemplateException {
141+ System .setProperty ("SINK_BIGTABLE_COLUMN_FAMILY_MAPPING" , "{ \" cf1\" : { \" q1\" : \" order_number\" , \" q2\" : \" service_type\" , \" q3\" : \" driver_pickup_location.latitude\" } }" );
142+ ProtoOdpfMessageParser protoOdpfMessageParser = new ProtoOdpfMessageParser (stencilClient );
143+ sinkConfig = ConfigFactory .create (BigTableSinkConfig .class , System .getProperties ());
144+ Tuple <SinkConnectorSchemaMessageMode , String > modeAndSchema = MessageConfigUtils .getModeAndSchema (sinkConfig );
145+ BigTableRowKeyParser bigTableRowKeyParser = new BigTableRowKeyParser (new Template (sinkConfig .getRowKeyTemplate ()), schema );
146+ BigTableSchema bigtableSchema = new BigTableSchema (sinkConfig .getColumnFamilyMapping ());
147+ bigTableRecordParser = new BigTableRecordParser (protoOdpfMessageParser , bigTableRowKeyParser , modeAndSchema , schema , bigtableSchema );
148+
149+ List <BigTableRecord > records = bigTableRecordParser .convert (messages );
150+ assertTrue (records .get (0 ).isValid ());
151+ assertTrue (records .get (1 ).isValid ());
152+ assertNull (records .get (0 ).getErrorInfo ());
153+ assertNull (records .get (1 ).getErrorInfo ());
154+ }
155+
95156 @ Test
96157 public void shouldReturnInvalidRecordForAnyNullOdpfMessage () {
97158 List <BigTableRecord > records = bigTableRecordParser .convert (Collections .list (new OdpfMessage (null , null )));
0 commit comments