Skip to content

Commit 0262c1e

Browse files
authored
Merge pull request #13 from Barthelemy/fixes-for-fairmq
Fixes for latest FairMQ
2 parents a4a5a18 + 258f7ea commit 0262c1e

File tree

5 files changed

+105
-115
lines changed

5 files changed

+105
-115
lines changed

src/ConsumerFMQ.cxx

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#include <fairmq/FairMQDevice.h>
77
#include <fairmq/FairMQMessage.h>
88
#include <fairmq/FairMQTransportFactory.h>
9-
#include <fairmq/zeromq/FairMQTransportFactoryZMQ.h>
109

1110

1211
class FMQSender : public FairMQDevice
@@ -16,8 +15,8 @@ class FMQSender : public FairMQDevice
1615
FMQSender() { }
1716
~FMQSender() { }
1817

19-
protected:
20-
18+
protected:
19+
2120
void Run() override {
2221
while (CheckCurrentState(RUNNING)) {
2322
//printf("loop Run()\n");
@@ -45,39 +44,39 @@ class ConsumerFMQ: public Consumer {
4544
FMQSender sender;
4645

4746

48-
// todo: check why this type is not public in FMQ interface?
49-
typedef std::unordered_map<std::string, std::vector<FairMQChannel>> FairMQMap;
47+
// todo: check why this type is not public in FMQ interface?
48+
typedef std::unordered_map<std::string, std::vector<FairMQChannel>> FairMQMap;
5049
FairMQMap m;
51-
52-
FairMQTransportFactory *transportFactory;
53-
54-
public:
50+
51+
std::shared_ptr<FairMQTransportFactory> transportFactory;
52+
53+
public:
5554

5655

5756
ConsumerFMQ(ConfigFile &cfg, std::string cfgEntryPoint) : Consumer(cfg,cfgEntryPoint), channels(1) {
58-
57+
5958
channels[0].UpdateType("pair"); // pub or push?
6059
channels[0].UpdateMethod("bind");
6160
channels[0].UpdateAddress("ipc:///tmp/readout-pipe-0");
62-
channels[0].UpdateRateLogging(0);
63-
channels[0].UpdateSndBufSize(10);
61+
channels[0].UpdateRateLogging(0);
62+
channels[0].UpdateSndBufSize(10);
6463
if (!channels[0].ValidateChannel()) {
6564
throw "ConsumerFMQ: channel validation failed";
6665
}
6766

6867

6968
// todo: def "data-out" as const string to name output channel to which we will push
7069
m.emplace(std::string("data-out"),channels);
71-
70+
7271
for (auto it : m) {
7372
std::cout << it.first << " = " << it.second.size() << " channels " << std::endl;
7473
for (auto ch : it.second) {
7574
std::cout << ch.GetAddress() <<std::endl;
7675
}
7776
}
7877

79-
transportFactory = new FairMQTransportFactoryZMQ();
80-
78+
transportFactory = FairMQTransportFactory::CreateTransportFactory("zeromq");
79+
8180
sender.fChannels = m;
8281
sender.SetTransport("zeromq");
8382
sender.ChangeState(FairMQStateMachine::Event::INIT_DEVICE);
@@ -88,18 +87,16 @@ class ConsumerFMQ: public Consumer {
8887

8988
// sender.InteractiveStateLoop();
9089
}
91-
90+
9291
~ConsumerFMQ() {
9392
sender.ChangeState(FairMQStateMachine::Event::STOP);
9493
sender.ChangeState(FairMQStateMachine::Event::RESET_TASK);
9594
sender.WaitForEndOfState(FairMQStateMachine::Event::RESET_TASK);
9695
sender.ChangeState(FairMQStateMachine::Event::RESET_DEVICE);
9796
sender.WaitForEndOfState(FairMQStateMachine::Event::RESET_DEVICE);
9897
sender.ChangeState(FairMQStateMachine::Event::END);
99-
100-
delete transportFactory;
10198
}
102-
99+
103100
int pushData(DataBlockContainerReference &b) {
104101

105102
// we create a copy of the reference, in a newly allocated object, so that reference is kept alive until this new object is destroyed in the cleanupCallback
@@ -109,7 +106,7 @@ class ConsumerFMQ: public Consumer {
109106

110107
sender.fChannels.at("data-out").at(0).Send(msgHeader);
111108
sender.fChannels.at("data-out").at(0).Send(msgBody);
112-
109+
113110

114111

115112
// how to know if it was a success?
@@ -120,7 +117,7 @@ class ConsumerFMQ: public Consumer {
120117

121118
// use multipart?
122119
// channels.at("data-out").at(0).SendPart(msgBody);
123-
120+
124121
return 0;
125122
}
126123
private:

0 commit comments

Comments
 (0)