Skip to content

Commit 5d3fd36

Browse files
author
bhupeshbhatia
committed
Updated inventory query for capstone
1 parent c0c910d commit 5d3fd36

File tree

6 files changed

+168
-66
lines changed

6 files changed

+168
-66
lines changed

inventory/model.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type Inventory struct {
3636
WasteWeight float64 `bson:"wasteWeight,omitempty" json:"wasteWeight,omitempty"`
3737
OnFlashSale bool `bson:"onFlashSale,omitempty" json:"onFlashSale,omitempty"`
3838
FlashSaleTimestamp int64 `bson:"flashSaleTimestamp,omitempty" json:"flashSaleTimestamp,omitempty"`
39+
ProjectedDate int64 `bson:"projectedDate,omitempty" json:"projectedDate,omitempty"`
3940
}
4041

4142
// MarshalBSON returns bytes of BSON-type.
@@ -60,6 +61,7 @@ func (i Inventory) MarshalBSON() ([]byte, error) {
6061
"upc": i.UPC,
6162
"wasteWeight": i.WasteWeight,
6263
"flashSaleTimestamp": i.FlashSaleTimestamp,
64+
"projectedDate": i.ProjectedDate,
6365
}
6466

6567
if i.ID != objectid.NilObjectID {
@@ -90,6 +92,7 @@ func (i *Inventory) MarshalJSON() ([]byte, error) {
9092
"upc": i.UPC,
9193
"wasteWeight": i.WasteWeight,
9294
"flashSaleTimestamp": i.FlashSaleTimestamp,
95+
"projectedDate": i.ProjectedDate,
9396
}
9497

9598
if i.ID != objectid.NilObjectID {
@@ -271,6 +274,13 @@ func (i *Inventory) unmarshalFromMap(m map[string]interface{}) error {
271274
return err
272275
}
273276
}
277+
if m["projectedDate"] != nil {
278+
i.ProjectedDate, err = util.AssertInt64(m["projectedDate"])
279+
if err != nil {
280+
err = errors.Wrap(err, "Error while asserting ProjectedDate")
281+
return err
282+
}
283+
}
274284

275285
return nil
276286
}

inventory/query.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
package inventory
22

33
import (
4+
"log"
5+
46
"github.com/TerrexTech/go-eventstore-models/model"
57
"github.com/TerrexTech/go-mongoutils/mongo"
68
)
79

810
// Query handles "query" events.
911
func Query(collection *mongo.Collection, event *model.Event) *model.KafkaResponse {
12+
log.Println("^^^^^^^^^^^^^^^^^^^^^^^^^")
1013
switch event.ServiceAction {
1114
case "timestamp":
1215
return queryTimestamp(collection, event)
1316
case "count":
1417
return queryCount(collection, event)
18+
case "flashOrSale":
19+
return queryFlashOrSale(collection, event)
1520
default:
1621
return queryInventory(collection, event)
1722
}

inventory/query_flashOrSale.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package inventory
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
7+
"github.com/mongodb/mongo-go-driver/mongo/findopt"
8+
9+
"github.com/TerrexTech/go-eventstore-models/model"
10+
"github.com/TerrexTech/go-mongoutils/mongo"
11+
"github.com/pkg/errors"
12+
)
13+
14+
// TimeConstraints is the parameter for querying inventory by stat-time and end-time.
15+
type FlashOrSale struct {
16+
Field string `json:"field,omitempty"`
17+
Weight float64 `json:"weight,omitempty"`
18+
Count int `json:"count,omitempty"`
19+
}
20+
21+
func queryFlashOrSale(collection *mongo.Collection, event *model.Event) *model.KafkaResponse {
22+
flashOrSale := &FlashOrSale{}
23+
err := json.Unmarshal(event.Data, &flashOrSale)
24+
if err != nil {
25+
err = errors.Wrap(err, "Query: Error while unmarshalling Event-data")
26+
log.Println(err)
27+
return &model.KafkaResponse{
28+
AggregateID: event.AggregateID,
29+
CorrelationID: event.CorrelationID,
30+
Error: err.Error(),
31+
ErrorCode: InternalError,
32+
EventAction: event.EventAction,
33+
ServiceAction: event.ServiceAction,
34+
UUID: event.UUID,
35+
}
36+
}
37+
38+
if flashOrSale.Weight < 1 {
39+
err = errors.New("blank weight provided")
40+
err = errors.Wrap(err, "QueryFlashOrSale")
41+
log.Println(err)
42+
return &model.KafkaResponse{
43+
AggregateID: event.AggregateID,
44+
CorrelationID: event.CorrelationID,
45+
Error: err.Error(),
46+
ErrorCode: InternalError,
47+
EventAction: event.EventAction,
48+
ServiceAction: event.ServiceAction,
49+
UUID: event.UUID,
50+
}
51+
}
52+
53+
filter := map[string]interface{}{
54+
flashOrSale.Field: map[string]interface{}{
55+
"$gt": flashOrSale.Weight,
56+
},
57+
}
58+
eventData, err := json.Marshal(filter)
59+
if err != nil {
60+
err = errors.Wrap(err, "Error marshalling TimeConstraint-filter")
61+
log.Println(err)
62+
return &model.KafkaResponse{
63+
AggregateID: event.AggregateID,
64+
CorrelationID: event.CorrelationID,
65+
Error: err.Error(),
66+
ErrorCode: InternalError,
67+
EventAction: event.EventAction,
68+
ServiceAction: event.ServiceAction,
69+
UUID: event.UUID,
70+
}
71+
}
72+
event.Data = eventData
73+
74+
count := flashOrSale.Count
75+
// if count < 1 {
76+
// count = 50
77+
// } else if count == 50 {
78+
// count = 50
79+
// } else if count == 100 {
80+
// count = 100
81+
// }
82+
findopts := findopt.Limit(int64(count))
83+
return queryInventory(collection, event, findopts)
84+
}

inventory/query_inventory.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func queryInventory(
1818
) *model.KafkaResponse {
1919
filter := map[string]interface{}{}
2020

21+
log.Println(event.Data)
2122
err := json.Unmarshal(event.Data, &filter)
2223
if err != nil {
2324
err = errors.Wrap(err, "Query: Error while unmarshalling Event-data")
@@ -33,6 +34,7 @@ func queryInventory(
3334
}
3435
}
3536

37+
log.Println(len(filter))
3638
if len(filter) == 0 {
3739
err = errors.New("blank filter provided")
3840
err = errors.Wrap(err, "Query")
@@ -48,7 +50,9 @@ func queryInventory(
4850
}
4951
}
5052

53+
log.Println(filter)
5154
result, err := collection.Find(filter, findopts...)
55+
log.Println(result)
5256
if err != nil {
5357
err = errors.Wrap(err, "Query: Error in DeleteMany")
5458
log.Println(err)

inventory/query_timestamp.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@ type TimeConstraints struct {
1818
Count int `json:"count,omitempty"`
1919
}
2020

21-
func queryTimestamp(
22-
collection *mongo.Collection,
23-
event *model.Event,
24-
) *model.KafkaResponse {
21+
func queryTimestamp(collection *mongo.Collection, event *model.Event) *model.KafkaResponse {
2522
timeCtnt := &TimeConstraints{}
2623
err := json.Unmarshal(event.Data, &timeCtnt)
2724
if err != nil {
@@ -91,11 +88,13 @@ func queryTimestamp(
9188
event.Data = eventData
9289

9390
count := timeCtnt.Count
94-
if count < 1 {
95-
count = 50
96-
} else if count > 100 {
97-
count = 100
98-
}
91+
// if count < 1 {
92+
// count = 50
93+
// } else if count == 50 {
94+
// count = 50
95+
// } else if count == 100 {
96+
// count = 100
97+
// }
9998
findopts := findopt.Limit(int64(count))
10099
return queryInventory(collection, event, findopts)
101100
}

run_test.sh

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,72 @@
1-
#!/usr/bin/env bash
1+
# #!/usr/bin/env bash
22

3-
cd test
4-
echo "===> Changing directory to \"./test\""
3+
# cd test
4+
# echo "===> Changing directory to \"./test\""
55

6-
docker-compose up -d --build --force-recreate cassandra kafka mongo
7-
rc=$?
8-
if [[ $rc != 0 ]]
9-
then exit $rc
10-
fi
6+
# docker-compose up -d --build --force-recreate cassandra kafka mongo
7+
# rc=$?
8+
# if [[ $rc != 0 ]]
9+
# then exit $rc
10+
# fi
1111

12-
function ping_cassandra() {
13-
docker exec -it cassandra /usr/bin/nodetool status | grep UN
14-
res=$?
15-
}
12+
# function ping_cassandra() {
13+
# docker exec -it cassandra /usr/bin/nodetool status | grep UN
14+
# res=$?
15+
# }
1616

17-
echo "Waiting for Cassandra to be ready."
17+
# echo "Waiting for Cassandra to be ready."
1818

19-
# For some reason, GoCql still can't connect to Cassandra even if the nodetool
20-
# shows positive results. There has to be a better way than this.
21-
max_attempts=40
22-
cur_attempts=0
23-
ping_cassandra
24-
while (( res != 0 && ++cur_attempts != max_attempts ))
25-
do
26-
ping_cassandra
27-
echo Attempt: $cur_attempts of $max_attempts
28-
sleep 1
29-
done
19+
# # For some reason, GoCql still can't connect to Cassandra even if the nodetool
20+
# # shows positive results. There has to be a better way than this.
21+
# max_attempts=40
22+
# cur_attempts=0
23+
# ping_cassandra
24+
# while (( res != 0 && ++cur_attempts != max_attempts ))
25+
# do
26+
# ping_cassandra
27+
# echo Attempt: $cur_attempts of $max_attempts
28+
# sleep 1
29+
# done
3030

31-
if (( cur_attempts == max_attempts )); then
32-
echo "Cassandra Timed Out."
33-
exit 1
34-
else
35-
echo "Cassandra response received."
36-
fi
31+
# if (( cur_attempts == max_attempts )); then
32+
# echo "Cassandra Timed Out."
33+
# exit 1
34+
# else
35+
# echo "Cassandra response received."
36+
# fi
3737

38-
echo "Waiting additional time for Cassandra to be ready."
39-
add_wait=30
40-
cur_add_wait=0
41-
while (( ++cur_add_wait != add_wait ))
42-
do
43-
echo Additional Wait: $cur_add_wait of $add_wait seconds
44-
sleep 1
45-
done
38+
# echo "Waiting additional time for Cassandra to be ready."
39+
# add_wait=30
40+
# cur_add_wait=0
41+
# while (( ++cur_add_wait != add_wait ))
42+
# do
43+
# echo Additional Wait: $cur_add_wait of $add_wait seconds
44+
# sleep 1
45+
# done
4646

47-
docker-compose up -d --build --force-recreate etcd
47+
# docker-compose up -d --build --force-recreate etcd
4848

49-
docker-compose up -d --build --force-recreate go-eventpersistence
50-
echo "Waiting for go-eventpersistence to initialize"
51-
sleep 5
49+
# docker-compose up -d --build --force-recreate go-eventpersistence
50+
# echo "Waiting for go-eventpersistence to initialize"
51+
# sleep 5
5252

53-
docker-compose up -d --build --force-recreate go-eventstore-query
54-
echo "Waiting for go-eventstore-query to initialize"
55-
sleep 5
53+
# docker-compose up -d --build --force-recreate go-eventstore-query
54+
# echo "Waiting for go-eventstore-query to initialize"
55+
# sleep 5
5656

57-
docker-compose up -d --build --force-recreate agg-inventory-cmd
58-
echo "Waiting for agg-inventory-cmd to initialize"
59-
sleep 5
57+
# docker-compose up -d --build --force-recreate agg-inventory-cmd
58+
# echo "Waiting for agg-inventory-cmd to initialize"
59+
# sleep 5
6060

61-
docker-compose up -d --build --force-recreate agg-inventory-query
62-
sleep 5
61+
# docker-compose up -d --build --force-recreate agg-inventory-query
62+
# sleep 5
6363

64-
docker ps -a
64+
# docker ps -a
6565

66-
docker-compose up --exit-code-from agg-inventory-query-test
67-
rc=$?
68-
if [[ $rc != 0 ]]
69-
docker ps -a
70-
then exit $rc
71-
fi
66+
# docker-compose up --exit-code-from agg-inventory-query-test
67+
# rc=$?
68+
# if [[ $rc != 0 ]]
69+
# docker ps -a
70+
# then exit $rc
71+
# fi
7272

0 commit comments

Comments
 (0)