@@ -27,8 +27,9 @@ public class KafkaPostgresOrderEventConsumer implements Runnable {
2727 final static String inventoryTopicName = "inventory.topic" ;
2828 KafkaPostgressInventoryResource inventoryResource ;
2929
30- public KafkaPostgresOrderEventConsumer (KafkaPostgressInventoryResource inventoryResource ) {
30+ public KafkaPostgresOrderEventConsumer (KafkaPostgressInventoryResource inventoryResource ) throws SQLException {
3131 this .inventoryResource = inventoryResource ;
32+ setupDB (inventoryResource .postgresDataSource .getConnection ());
3233 }
3334
3435 @ Override
@@ -69,9 +70,7 @@ public void listenForOrderEvents() {
6970 Order order = JsonUtils .read (txt , Order .class );
7071 System .out .print (" orderid:" + order .getOrderid ());
7172 System .out .print (" itemid:" + order .getItemid ());
72- if (inventoryResource .crashAfterOrderMessageReceived ) System .exit (-1 );
7373 updateDataAndSendEventOnInventory (order .getOrderid (), order .getItemid ());
74- if (inventoryResource .crashAfterOrderMessageProcessed ) System .exit (-1 );
7574 } catch (Exception ex ) {
7675 System .out .printf ("message did not contain order" );
7776 ex .printStackTrace ();
@@ -81,11 +80,12 @@ public void listenForOrderEvents() {
8180 }
8281
8382 private void updateDataAndSendEventOnInventory ( String orderid , String itemid ) throws Exception {
83+ if (inventoryResource .crashAfterOrderMessageReceived ) System .exit (-1 );
8484 String inventorylocation = evaluateInventory (itemid );
8585 Inventory inventory = new Inventory (orderid , itemid , inventorylocation , "beer" ); //static suggestiveSale - represents an additional service/event
8686 String jsonString = JsonUtils .writeValueAsString (inventory );
8787 System .out .println ("send inventory status message... jsonString:" + jsonString );
88- System . out . println ( "sendInsertAndSendOrderMessage........." );
88+ if ( inventoryResource . crashAfterOrderMessageProcessed ) System . exit (- 1 );
8989 String topicName = inventoryTopicName ;
9090 Properties props = new Properties ();
9191 props .put ("bootstrap.servers" , "kafka-service:9092" );
@@ -109,21 +109,23 @@ private void updateDataAndSendEventOnInventory( String orderid, String itemid) t
109109 private String evaluateInventory (String id ) {
110110 System .out .println ("KafkaPostgresOrderEventConsumer postgresDataSource:" + inventoryResource .postgresDataSource );
111111 System .out .println ("KafkaPostgresOrderEventConsumer evaluateInventory for inventoryid:" + id );
112- String DECREMENT_BY_ID =
113- "update inventory set inventorycount = inventorycount - 1 where inventoryid = ? and inventorycount > 0 returning inventorylocation into ?" ;
114- // try (CallableStatement st = inventoryResource.postgresDataSource.getConnection().prepareCall(DECREMENT_BY_ID)) {
115112 try (PreparedStatement st = inventoryResource .postgresDataSource .getConnection ().prepareStatement (
116113 "select inventorycount, inventorylocation from inventory where inventoryid = ?"
117114 )) {
118115 st .setString (1 , id );
119- // st.re.registerOutParameter(2, Types.VARCHAR);
120116 ResultSet rs = st .executeQuery ();
121117 rs .next ();
122118 int inventoryCount = rs .getInt (1 );
123119 String inventorylocation = rs .getString (2 );
124120 rs .close ();
125121 System .out .println ("InventoryServiceOrderEventConsumer.updateDataAndSendEventOnInventory id {" + id + "} location {" + inventorylocation + "} inventoryCount:" + inventoryCount );
126122 if (inventoryCount > 0 ) {
123+ PreparedStatement decrementPS = inventoryResource .postgresDataSource .getConnection ().prepareStatement (
124+ "update inventory set inventorycount = ? where inventoryid = ?" );
125+ decrementPS .setInt (1 , inventoryCount - 1 );
126+ decrementPS .setString (2 , id );
127+ decrementPS .execute ();
128+ System .out .println ("InventoryServiceOrderEventConsumer.updateDataAndSendEventOnInventory reduced inventory count to:" + (inventoryCount - 1 ));
127129 return inventorylocation ;
128130 } else {
129131 return "inventorydoesnotexist" ;
@@ -134,17 +136,22 @@ private String evaluateInventory(String id) {
134136 return "unable to find inventory status" ;
135137 }
136138
139+ private void setupDB (Connection connection ) throws SQLException {
140+ createInventoryTable (connection );
141+ populateInventoryTable (connection );
142+ }
143+
137144 private void createInventoryTable (Connection connection ) throws SQLException {
138- System .out .println ("KafkaPostgresOrderEventConsumer createInventoryTable" );
145+ System .out .println ("KafkaPostgresOrderEventConsumer createInventoryTable IF NOT EXISTS " );
139146 connection .prepareStatement (
140- "create table inventory ( inventoryid varchar(16) PRIMARY KEY NOT NULL, inventorylocation varchar(32), inventorycount integer CONSTRAINT positive_inventory CHECK (inventorycount >= 0) )" ).execute ();
147+ "CREATE TABLE IF NOT EXISTS inventory ( inventoryid varchar(16) PRIMARY KEY NOT NULL, inventorylocation varchar(32), inventorycount integer CONSTRAINT positive_inventory CHECK (inventorycount >= 0) )" ).execute ();
141148 }
142149
143150 private void populateInventoryTable (Connection connection ) throws SQLException {
144- System .out .println ("KafkaPostgresOrderEventConsumer populateInventoryTable" );
145- connection .prepareStatement ("insert into inventory values ('sushi', '1468 WEBSTER ST,San Francisco,CA', 0)" ).execute ();
146- connection .prepareStatement ("insert into inventory values ('pizza', '1469 WEBSTER ST,San Francisco,CA', 0)" ).execute ();
147- connection .prepareStatement ("insert into inventory values ('burger', '1470 WEBSTER ST,San Francisco,CA', 0)" ).execute ();
151+ System .out .println ("KafkaPostgresOrderEventConsumer populateInventoryTable if not populated " );
152+ connection .prepareStatement ("insert into inventory values ('sushi', '1468 WEBSTER ST,San Francisco,CA', 0) ON CONFLICT DO NOTHING " ).execute ();
153+ connection .prepareStatement ("insert into inventory values ('pizza', '1469 WEBSTER ST,San Francisco,CA', 0) ON CONFLICT DO NOTHING " ).execute ();
154+ connection .prepareStatement ("insert into inventory values ('burger', '1470 WEBSTER ST,San Francisco,CA', 0) ON CONFLICT DO NOTHING " ).execute ();
148155 }
149156
150157
0 commit comments