-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathProducer.cpp
More file actions
105 lines (83 loc) · 3.42 KB
/
Producer.cpp
File metadata and controls
105 lines (83 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
* File: Producer.cpp
* Author: debian
*
* Created on 15. Juni 2017, 17:07
*/
#include "Producer.h"
Producer::Producer(const char* id, const char* const addr, const char** products, unsigned int n) {
this->client = new MQTTClient(id, addr, this);
this->products = products;
this->productCount = n;
this->loopCount = 0;
this->stopProducer = false;
}
Producer::~Producer() {
delete client;
}
void Producer::start() {
for(int i = 0; i < productCount; i++) {
if(char* channel = (char*)malloc(strlen(products[i])+ strlen(DEMAND_PREFIX)+1)) {
strcpy(channel, DEMAND_PREFIX);
strcat(channel, products[i]);
client->subscribe(channel);
free(channel);
}
}
client->subscribe(ORDER_CHANNEL);
while(!stopProducer) {
client->loop();
if(!(loopCount % OFFER_DELAY) && rand() < RAND_MAX/2) {
const char* item = products[rand()%productCount];
if(char* channel = (char*)malloc(strlen(OFFER_PREFIX)+strlen(item)+1)) {
strcpy(channel, OFFER_PREFIX);
strcat(channel, item);
MQTTClient::Offer offer;
strncpy(offer.item, item, MAX_ITEM_LEN);
offer.price = 60 + rand()%91;
offer.amount = 50 + rand()%151;
client->publish(channel, &offer, sizeof(offer));
saveOffer(&offer);
free(channel);
//printf("Generated offer for %d %s at %d each\n", offer.amount, offer.item, offer.price);
}
}
loopCount++;
//printf("Producer alive\n");
}
client->disconnect();
}
void Producer::stop() {
stopProducer = true;
}
void Producer::on_mosqEvent(const char* channel, const void* msg) {
MQTTClient::Offer* offer = (MQTTClient::Offer*)msg;
//printf("%d times %s for %d each\n", offer->amount, offer->item, offer->price);
if(strstr(channel, DEMAND_PREFIX)) {
const char* item = channel + strlen(DEMAND_PREFIX);
if(!strcmp(item, offer->item) && 10+rand()%21+offer->price/5 > 50) {
if(char* responseChannel = (char*)malloc(strlen(item)+strlen(OFFER_PREFIX)+1)) {
strcpy(responseChannel, OFFER_PREFIX);
strcat(responseChannel, item);
client->publish(responseChannel, offer, sizeof(*offer));
//printf("Created offer for %d %s responding to demand\n", offer->amount, offer->item);
saveOffer(offer);
free(responseChannel);
}
}
} else if(strstr(channel, ORDER_CHANNEL)) {
//printf("Order for %s received\n", offer->item);
if(!strncmp(offer->item, lastOffer.item, MAX_ITEM_LEN) && offer->price >= lastOffer.price && offer->amount <= lastOffer.amount) {
client->publish(CONFIRM_CHANNEL, offer, sizeof(*offer));
*lastOffer.item = 0;
//printf("Order for %d %s accepted\n", offer->amount, offer->item);
} else {
//printf("Order for %d %s ignored\n", offer->amount, offer->item);
}
}
}
void Producer::saveOffer(MQTTClient::Offer* offer) {
strncpy(lastOffer.item, offer->item, MAX_ITEM_LEN);
lastOffer.price = offer->price;
lastOffer.amount = offer->amount;
}