Skip to content

Commit 30798a2

Browse files
Merge #2
2: Update model as per agg-inventory-cmd r=Jaskaranbir a=Jaskaranbir Co-authored-by: Jaskaranbir <[email protected]>
2 parents dfbea4f + 09c5885 commit 30798a2

File tree

7 files changed

+38
-68
lines changed

7 files changed

+38
-68
lines changed

.env

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ KAFKA_BROKERS=kafka:9092
44
KAFKA_CONSUMER_EVENT_GROUP=agg.inventory.query.event.1
55
KAFKA_CONSUMER_EVENT_QUERY_GROUP=agg.inventory.query.eq.1
66

7-
KAFKA_CONSUMER_EVENT_TOPIC=event.persistence.response.2
8-
KAFKA_CONSUMER_EVENT_QUERY_TOPIC=esquery.response.2
7+
KAFKA_CONSUMER_EVENT_TOPIC=event.persistence.response
8+
KAFKA_CONSUMER_EVENT_QUERY_TOPIC=esquery.response
99
KAFKA_PRODUCER_EVENT_TOPIC=event.rns_eventstore.events
1010
KAFKA_PRODUCER_EVENT_QUERY_TOPIC=esquery.request
1111
KAFKA_PRODUCER_RESPONSE_TOPIC=agg.inventory.response

inventory/model.go

Lines changed: 26 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import (
1111
"github.com/pkg/errors"
1212
)
1313

14+
// AggregateID is the global AggregateID for Inventory Aggregate.
15+
const AggregateID int8 = 2
16+
1417
// Inventory defines the Inventory Aggregate.
1518
type Inventory struct {
1619
ID objectid.ObjectID `bson:"_id,omitempty" json:"_id,omitempty"`
@@ -25,7 +28,6 @@ type Inventory struct {
2528
Name string `bson:"name,omitempty" json:"name,omitempty"`
2629
Origin string `bson:"origin,omitempty" json:"origin,omitempty"`
2730
Price float64 `bson:"price,omitempty" json:"price,omitempty"`
28-
Quantity int64 `bson:"quantity,omitempty" json:"quantity,omitempty"`
2931
RSCustomerID uuuid.UUID `bson:"rsCustomerID,omitempty" json:"rsCustomerID,omitempty"`
3032
SalePrice float64 `bson:"salePrice,omitempty" json:"salePrice,omitempty"`
3133
SKU string `bson:"sku,omitempty" json:"sku,omitempty"`
@@ -36,58 +38,33 @@ type Inventory struct {
3638
WasteWeight float64 `bson:"wasteWeight,omitempty" json:"wasteWeight,omitempty"`
3739
}
3840

39-
// marshalInventory is simplified version of Inventory, for convenience
40-
// in Marshalling and Unmarshalling operations.
41-
type marshalInventory struct {
42-
ID objectid.ObjectID `bson:"_id,omitempty" json:"_id,omitempty"`
43-
ItemID string `bson:"itemID,omitempty" json:"itemID,omitempty"`
44-
Barcode string `bson:"barcode,omitempty" json:"barcode,omitempty"`
45-
DateArrived int64 `bson:"dateArrived,omitempty" json:"dateArrived,omitempty"`
46-
DateSold int64 `bson:"dateSold,omitempty" json:"dateSold,omitempty"`
47-
DeviceID string `bson:"deviceID,omitempty" json:"deviceID,omitempty"`
48-
DonateWeight float64 `bson:"donateWeight,omitempty" json:"donateWeight,omitempty"`
49-
ExpiryDate int64 `bson:"expiryDate,omitempty" json:"expiryDate,omitempty"`
50-
Lot string `bson:"lot,omitempty" json:"lot,omitempty"`
51-
Name string `bson:"name,omitempty" json:"name,omitempty"`
52-
Origin string `bson:"origin,omitempty" json:"origin,omitempty"`
53-
Price float64 `bson:"price,omitempty" json:"price,omitempty"`
54-
Quantity int64 `bson:"quantity,omitempty" json:"quantity,omitempty"`
55-
RSCustomerID string `bson:"rsCustomerID,omitempty" json:"rsCustomerID,omitempty"`
56-
SalePrice float64 `bson:"salePrice,omitempty" json:"salePrice,omitempty"`
57-
SKU string `bson:"sku,omitempty" json:"sku,omitempty"`
58-
SoldWeight float64 `bson:"soldWeight,omitempty" json:"soldWeight,omitempty"`
59-
Timestamp int64 `bson:"timestamp,omitempty" json:"timestamp,omitempty"`
60-
TotalWeight float64 `bson:"totalWeight,omitempty" json:"totalWeight,omitempty"`
61-
UPC int64 `bson:"upc,omitempty" json:"upc,omitempty"`
62-
WasteWeight float64 `bson:"wasteWeight,omitempty" json:"wasteWeight,omitempty"`
63-
}
64-
6541
// MarshalBSON returns bytes of BSON-type.
6642
func (i Inventory) MarshalBSON() ([]byte, error) {
67-
in := &marshalInventory{
68-
ID: i.ID,
69-
ItemID: i.ItemID.String(),
70-
Barcode: i.Barcode,
71-
DateArrived: i.DateArrived,
72-
DateSold: i.DateSold,
73-
DeviceID: i.DeviceID.String(),
74-
DonateWeight: i.DonateWeight,
75-
ExpiryDate: i.ExpiryDate,
76-
Lot: i.Lot,
77-
Name: i.Name,
78-
Origin: i.Origin,
79-
Price: i.Price,
80-
Quantity: i.Quantity,
81-
RSCustomerID: i.RSCustomerID.String(),
82-
SalePrice: i.SalePrice,
83-
SKU: i.SKU,
84-
SoldWeight: i.SoldWeight,
85-
Timestamp: i.Timestamp,
86-
TotalWeight: i.TotalWeight,
87-
UPC: i.UPC,
88-
WasteWeight: i.WasteWeight,
43+
in := map[string]interface{}{
44+
"itemID": i.ItemID.String(),
45+
"barcode": i.Barcode,
46+
"dateArrived": i.DateArrived,
47+
"dateSold": i.DateSold,
48+
"deviceID": i.DeviceID.String(),
49+
"donateWeight": i.DonateWeight,
50+
"expiryDate": i.ExpiryDate,
51+
"lot": i.Lot,
52+
"name": i.Name,
53+
"origin": i.Origin,
54+
"price": i.Price,
55+
"rsCustomerID": i.RSCustomerID.String(),
56+
"salePrice": i.SalePrice,
57+
"sku": i.SKU,
58+
"soldWeight": i.SoldWeight,
59+
"timestamp": i.Timestamp,
60+
"totalWeight": i.TotalWeight,
61+
"upc": i.UPC,
62+
"wasteWeight": i.WasteWeight,
8963
}
9064

65+
if i.ID != objectid.NilObjectID {
66+
in["_id"] = i.ID
67+
}
9168
return bson.Marshal(in)
9269
}
9370

@@ -105,7 +82,6 @@ func (i *Inventory) MarshalJSON() ([]byte, error) {
10582
"name": i.Name,
10683
"origin": i.Origin,
10784
"price": i.Price,
108-
"quantity": i.Quantity,
10985
"rsCustomerID": i.RSCustomerID.String(),
11086
"salePrice": i.SalePrice,
11187
"sku": i.SKU,
@@ -248,13 +224,6 @@ func (i *Inventory) unmarshalFromMap(m map[string]interface{}) error {
248224
return err
249225
}
250226
}
251-
if m["quantity"] != nil {
252-
i.Quantity, err = util.AssertInt64(m["quantity"])
253-
if err != nil {
254-
err = errors.Wrap(err, "Error while asserting Quantity")
255-
return err
256-
}
257-
}
258227
if m["salePrice"] != nil {
259228
i.SalePrice, err = util.AssertFloat64(m["salePrice"])
260229
if err != nil {

main/config_kafka.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package main
22

33
import (
4+
"fmt"
45
"os"
56

7+
"github.com/TerrexTech/agg-inventory-query/inventory"
68
"github.com/TerrexTech/go-commonutils/commonutil"
79
"github.com/TerrexTech/go-eventspoll/poll"
810
"github.com/TerrexTech/go-kafkautils/kafka"
@@ -20,6 +22,9 @@ func loadKafkaConfig() (*poll.KafkaConfig, error) {
2022
pEventQueryTopic := os.Getenv("KAFKA_PRODUCER_EVENT_QUERY_TOPIC")
2123
pResponseTopic := os.Getenv("KAFKA_PRODUCER_RESPONSE_TOPIC")
2224

25+
cEventTopic = fmt.Sprintf("%s.%d", cEventTopic, inventory.AggregateID)
26+
cEventQueryTopic = fmt.Sprintf("%s.%d", cEventQueryTopic, inventory.AggregateID)
27+
2328
kc := &poll.KafkaConfig{
2429
EventCons: &kafka.ConsumerConfig{
2530
KafkaBrokers: kafkaBrokers,

main/config_mongo.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func loadMongoConfig() (*poll.MongoConfig, error) {
6767
}
6868

6969
return &poll.MongoConfig{
70-
AggregateID: aggregateID,
70+
AggregateID: inventory.AggregateID,
7171
AggCollection: aggMongoCollection,
7272
Connection: conn,
7373
MetaDatabaseName: database,

main/main.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
"github.com/pkg/errors"
1111
)
1212

13-
var aggregateID int8 = 2
14-
1513
func validateEnv() error {
1614
missingVar, err := commonutil.ValidateEnv(
1715
"KAFKA_BROKERS",
@@ -84,8 +82,8 @@ func main() {
8482

8583
for {
8684
select {
87-
case err := <-eventPoll.Wait():
88-
err = errors.Wrap(err, "A critical error occurred")
85+
case <-eventPoll.RoutinesCtx().Done():
86+
err = errors.New("service-context closed")
8987
log.Fatalln(err)
9088

9189
case eventResp := <-eventPoll.Query():

test/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ services:
4949
- ./.env.esquery
5050

5151
agg-inventory-cmd:
52-
image: terrextech/agg-inventory-cmd:v1.0.0
52+
image: terrextech/agg-inventory-cmd:v1.1.0
5353
env_file:
5454
- ./.env.agg-inventory-cmd
5555

test/inventory_suite_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ func TestInventory(t *testing.T) {
6161
}
6262

6363
var _ = Describe("InventoryAggregate", func() {
64-
var aggregateID int8 = 2
6564
var (
6665
kafkaBrokers []string
6766
eventsTopic string
@@ -93,7 +92,6 @@ var _ = Describe("InventoryAggregate", func() {
9392
Name: "test-name",
9493
Origin: "test-origin",
9594
Price: 13.4,
96-
Quantity: 45,
9795
RSCustomerID: rsCustomerID,
9896
SalePrice: 12.23,
9997
SKU: "test-sku",
@@ -114,7 +112,7 @@ var _ = Describe("InventoryAggregate", func() {
114112
mockEvent = &model.Event{
115113
Action: "insert",
116114
CorrelationID: cid,
117-
AggregateID: aggregateID,
115+
AggregateID: inventory.AggregateID,
118116
Data: marshalInv,
119117
Timestamp: time.Now(),
120118
UserUUID: uid,

0 commit comments

Comments
 (0)