Skip to content

Commit a4b7523

Browse files
committed
Add Count, Timestamp queries and bump model
1 parent 8e83c8e commit a4b7523

File tree

13 files changed

+479
-103
lines changed

13 files changed

+479
-103
lines changed

.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
SERVICE_NAME=agg-inventory-query
2+
KAFKA_LOG_PRODUCER_TOPIC=log.sink
3+
14
# ===> Kafka
25
KAFKA_BROKERS=kafka:9092
36

inventory/model.go

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@ const AggregateID int8 = 2
1818
type Inventory struct {
1919
ID objectid.ObjectID `bson:"_id,omitempty" json:"_id,omitempty"`
2020
ItemID uuuid.UUID `bson:"itemID,omitempty" json:"itemID,omitempty"`
21-
Barcode string `bson:"barcode,omitempty" json:"barcode,omitempty"`
2221
DateArrived int64 `bson:"dateArrived,omitempty" json:"dateArrived,omitempty"`
2322
DateSold int64 `bson:"dateSold,omitempty" json:"dateSold,omitempty"`
2423
DeviceID uuuid.UUID `bson:"deviceID,omitempty" json:"deviceID,omitempty"`
2524
DonateWeight float64 `bson:"donateWeight,omitempty" json:"donateWeight,omitempty"`
26-
ExpiryDate int64 `bson:"expiryDate,omitempty" json:"expiryDate,omitempty"`
2725
Lot string `bson:"lot,omitempty" json:"lot,omitempty"`
2826
Name string `bson:"name,omitempty" json:"name,omitempty"`
2927
Origin string `bson:"origin,omitempty" json:"origin,omitempty"`
@@ -34,20 +32,18 @@ type Inventory struct {
3432
SoldWeight float64 `bson:"soldWeight,omitempty" json:"soldWeight,omitempty"`
3533
Timestamp int64 `bson:"timestamp,omitempty" json:"timestamp,omitempty"`
3634
TotalWeight float64 `bson:"totalWeight,omitempty" json:"totalWeight,omitempty"`
37-
UPC int64 `bson:"upc,omitempty" json:"upc,omitempty"`
35+
UPC string `bson:"upc,omitempty" json:"upc,omitempty"`
3836
WasteWeight float64 `bson:"wasteWeight,omitempty" json:"wasteWeight,omitempty"`
3937
}
4038

4139
// MarshalBSON returns bytes of BSON-type.
4240
func (i Inventory) MarshalBSON() ([]byte, error) {
4341
in := map[string]interface{}{
4442
"itemID": i.ItemID.String(),
45-
"barcode": i.Barcode,
4643
"dateArrived": i.DateArrived,
4744
"dateSold": i.DateSold,
4845
"deviceID": i.DeviceID.String(),
4946
"donateWeight": i.DonateWeight,
50-
"expiryDate": i.ExpiryDate,
5147
"lot": i.Lot,
5248
"name": i.Name,
5349
"origin": i.Origin,
@@ -72,12 +68,10 @@ func (i Inventory) MarshalBSON() ([]byte, error) {
7268
func (i *Inventory) MarshalJSON() ([]byte, error) {
7369
in := map[string]interface{}{
7470
"itemID": i.ItemID.String(),
75-
"barcode": i.Barcode,
7671
"dateArrived": i.DateArrived,
7772
"dateSold": i.DateSold,
7873
"deviceID": i.DeviceID.String(),
7974
"donateWeight": i.DonateWeight,
80-
"expiryDate": i.ExpiryDate,
8175
"lot": i.Lot,
8276
"name": i.Name,
8377
"origin": i.Origin,
@@ -165,12 +159,6 @@ func (i *Inventory) unmarshalFromMap(m map[string]interface{}) error {
165159
}
166160
}
167161

168-
if m["barcode"] != nil {
169-
i.Barcode, assertOK = m["barcode"].(string)
170-
if !assertOK {
171-
return errors.New("Error while asserting Barcode")
172-
}
173-
}
174162
if m["dateArrived"] != nil {
175163
i.DateArrived, err = util.AssertInt64(m["dateArrived"])
176164
if err != nil {
@@ -192,13 +180,6 @@ func (i *Inventory) unmarshalFromMap(m map[string]interface{}) error {
192180
return err
193181
}
194182
}
195-
if m["expiryDate"] != nil {
196-
i.ExpiryDate, err = util.AssertInt64(m["expiryDate"])
197-
if err != nil {
198-
err = errors.Wrap(err, "Error while asserting ExpiryDate")
199-
return err
200-
}
201-
}
202183
if m["lot"] != nil {
203184
i.Lot, assertOK = m["lot"].(string)
204185
if !assertOK {
@@ -259,10 +240,9 @@ func (i *Inventory) unmarshalFromMap(m map[string]interface{}) error {
259240
}
260241
}
261242
if m["upc"] != nil {
262-
i.UPC, err = util.AssertInt64(m["upc"])
263-
if err != nil {
264-
err = errors.Wrap(err, "Error while asserting UPC")
265-
return err
243+
i.UPC, assertOK = m["upc"].(string)
244+
if !assertOK {
245+
return errors.New("Error while asserting UPC")
266246
}
267247
}
268248
if m["wasteWeight"] != nil {

inventory/query.go

Lines changed: 7 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,18 @@
11
package inventory
22

33
import (
4-
"encoding/json"
5-
"log"
6-
74
"github.com/TerrexTech/go-eventstore-models/model"
85
"github.com/TerrexTech/go-mongoutils/mongo"
9-
"github.com/pkg/errors"
106
)
117

128
// Query handles "query" events.
139
func Query(collection *mongo.Collection, event *model.Event) *model.KafkaResponse {
14-
filter := map[string]interface{}{}
15-
16-
err := json.Unmarshal(event.Data, &filter)
17-
if err != nil {
18-
err = errors.Wrap(err, "Query: Error while unmarshalling Event-data")
19-
log.Println(err)
20-
return &model.KafkaResponse{
21-
AggregateID: event.AggregateID,
22-
CorrelationID: event.CorrelationID,
23-
Error: err.Error(),
24-
ErrorCode: InternalError,
25-
EventAction: event.EventAction,
26-
ServiceAction: event.ServiceAction,
27-
UUID: event.UUID,
28-
}
29-
}
30-
31-
if len(filter) == 0 {
32-
err = errors.New("blank filter provided")
33-
err = errors.Wrap(err, "Query")
34-
log.Println(err)
35-
return &model.KafkaResponse{
36-
AggregateID: event.AggregateID,
37-
CorrelationID: event.CorrelationID,
38-
Error: err.Error(),
39-
ErrorCode: InternalError,
40-
EventAction: event.EventAction,
41-
ServiceAction: event.ServiceAction,
42-
UUID: event.UUID,
43-
}
44-
}
45-
46-
result, err := collection.Find(filter)
47-
if err != nil {
48-
err = errors.Wrap(err, "Query: Error in DeleteMany")
49-
log.Println(err)
50-
return &model.KafkaResponse{
51-
AggregateID: event.AggregateID,
52-
CorrelationID: event.CorrelationID,
53-
Error: err.Error(),
54-
ErrorCode: DatabaseError,
55-
EventAction: event.EventAction,
56-
ServiceAction: event.ServiceAction,
57-
UUID: event.UUID,
58-
}
59-
}
60-
61-
resultMarshal, err := json.Marshal(result)
62-
if err != nil {
63-
err = errors.Wrap(err, "Query: Error marshalling Inventory Delete-result")
64-
log.Println(err)
65-
return &model.KafkaResponse{
66-
AggregateID: event.AggregateID,
67-
CorrelationID: event.CorrelationID,
68-
Error: err.Error(),
69-
ErrorCode: InternalError,
70-
EventAction: event.EventAction,
71-
ServiceAction: event.ServiceAction,
72-
UUID: event.UUID,
73-
}
74-
}
75-
76-
return &model.KafkaResponse{
77-
AggregateID: event.AggregateID,
78-
CorrelationID: event.CorrelationID,
79-
EventAction: event.EventAction,
80-
Result: resultMarshal,
81-
ServiceAction: event.ServiceAction,
82-
UUID: event.UUID,
10+
switch event.ServiceAction {
11+
case "timestamp":
12+
return queryTimestamp(collection, event)
13+
case "count":
14+
return queryCount(collection, event)
15+
default:
16+
return queryInventory(collection, event)
8317
}
8418
}

inventory/query_count.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package inventory
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
"strconv"
7+
8+
"github.com/mongodb/mongo-go-driver/mongo/findopt"
9+
10+
"github.com/pkg/errors"
11+
12+
"github.com/TerrexTech/go-eventstore-models/model"
13+
"github.com/TerrexTech/go-mongoutils/mongo"
14+
)
15+
16+
func queryCount(collection *mongo.Collection, event *model.Event) *model.KafkaResponse {
17+
count, err := strconv.Atoi(string(event.Data))
18+
if err != nil {
19+
err = errors.Wrap(err, "QueryCount: Error converting parameter to number")
20+
log.Println(err)
21+
return &model.KafkaResponse{
22+
AggregateID: event.AggregateID,
23+
CorrelationID: event.CorrelationID,
24+
Error: err.Error(),
25+
ErrorCode: InternalError,
26+
EventAction: event.EventAction,
27+
ServiceAction: event.ServiceAction,
28+
UUID: event.UUID,
29+
}
30+
}
31+
32+
if count == 0 {
33+
err = errors.New("count must be greater than 0")
34+
log.Println(err)
35+
return &model.KafkaResponse{
36+
AggregateID: event.AggregateID,
37+
CorrelationID: event.CorrelationID,
38+
Error: err.Error(),
39+
ErrorCode: DatabaseError,
40+
EventAction: event.EventAction,
41+
ServiceAction: event.ServiceAction,
42+
UUID: event.UUID,
43+
}
44+
}
45+
if count > 100 {
46+
err = errors.New("count must be less than 100")
47+
log.Println(err)
48+
return &model.KafkaResponse{
49+
AggregateID: event.AggregateID,
50+
CorrelationID: event.CorrelationID,
51+
Error: err.Error(),
52+
ErrorCode: DatabaseError,
53+
EventAction: event.EventAction,
54+
ServiceAction: event.ServiceAction,
55+
UUID: event.UUID,
56+
}
57+
}
58+
59+
params := map[string]interface{}{
60+
"timestamp": map[string]interface{}{
61+
"$ne": 0,
62+
},
63+
}
64+
eventData, err := json.Marshal(params)
65+
if err != nil {
66+
err = errors.Wrap(err, "Error marshalling TimeConstraint-filter")
67+
log.Println(err)
68+
return &model.KafkaResponse{
69+
AggregateID: event.AggregateID,
70+
CorrelationID: event.CorrelationID,
71+
Error: err.Error(),
72+
ErrorCode: InternalError,
73+
EventAction: event.EventAction,
74+
ServiceAction: event.ServiceAction,
75+
UUID: event.UUID,
76+
}
77+
}
78+
event.Data = eventData
79+
80+
findopts := findopt.Limit(int64(count))
81+
return queryInventory(collection, event, findopts)
82+
}

inventory/query_inventory.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package inventory
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
7+
"github.com/mongodb/mongo-go-driver/mongo/findopt"
8+
9+
"github.com/TerrexTech/go-eventstore-models/model"
10+
"github.com/TerrexTech/go-mongoutils/mongo"
11+
"github.com/pkg/errors"
12+
)
13+
14+
func queryInventory(
15+
collection *mongo.Collection,
16+
event *model.Event,
17+
findopts ...findopt.Find,
18+
) *model.KafkaResponse {
19+
filter := map[string]interface{}{}
20+
21+
err := json.Unmarshal(event.Data, &filter)
22+
if err != nil {
23+
err = errors.Wrap(err, "Query: Error while unmarshalling Event-data")
24+
log.Println(err)
25+
return &model.KafkaResponse{
26+
AggregateID: event.AggregateID,
27+
CorrelationID: event.CorrelationID,
28+
Error: err.Error(),
29+
ErrorCode: InternalError,
30+
EventAction: event.EventAction,
31+
ServiceAction: event.ServiceAction,
32+
UUID: event.UUID,
33+
}
34+
}
35+
36+
if len(filter) == 0 {
37+
err = errors.New("blank filter provided")
38+
err = errors.Wrap(err, "Query")
39+
log.Println(err)
40+
return &model.KafkaResponse{
41+
AggregateID: event.AggregateID,
42+
CorrelationID: event.CorrelationID,
43+
Error: err.Error(),
44+
ErrorCode: InternalError,
45+
EventAction: event.EventAction,
46+
ServiceAction: event.ServiceAction,
47+
UUID: event.UUID,
48+
}
49+
}
50+
51+
result, err := collection.Find(filter, findopts...)
52+
if err != nil {
53+
err = errors.Wrap(err, "Query: Error in DeleteMany")
54+
log.Println(err)
55+
return &model.KafkaResponse{
56+
AggregateID: event.AggregateID,
57+
CorrelationID: event.CorrelationID,
58+
Error: err.Error(),
59+
ErrorCode: DatabaseError,
60+
EventAction: event.EventAction,
61+
ServiceAction: event.ServiceAction,
62+
UUID: event.UUID,
63+
}
64+
}
65+
66+
resultMarshal, err := json.Marshal(result)
67+
if err != nil {
68+
err = errors.Wrap(err, "Query: Error marshalling Inventory Delete-result")
69+
log.Println(err)
70+
return &model.KafkaResponse{
71+
AggregateID: event.AggregateID,
72+
CorrelationID: event.CorrelationID,
73+
Error: err.Error(),
74+
ErrorCode: InternalError,
75+
EventAction: event.EventAction,
76+
ServiceAction: event.ServiceAction,
77+
UUID: event.UUID,
78+
}
79+
}
80+
81+
return &model.KafkaResponse{
82+
AggregateID: event.AggregateID,
83+
CorrelationID: event.CorrelationID,
84+
EventAction: event.EventAction,
85+
Result: resultMarshal,
86+
ServiceAction: event.ServiceAction,
87+
UUID: event.UUID,
88+
}
89+
}

0 commit comments

Comments
 (0)