Skip to content

Commit 6c81ec3

Browse files
committed
Switch to FairMQ SHM multi-part to implement the O2 interface
Performance optimization change only, the interface is unchanged
1 parent eb656e7 commit 6c81ec3

File tree

3 files changed

+54
-31
lines changed

3 files changed

+54
-31
lines changed

src/ConsumerFMQchannel.cxx

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class ConsumerFMQchannel: public Consumer {
5555
int memoryPoolPageSize;
5656
int memoryPoolNumberOfPages;
5757

58+
std::vector<FairMQMessagePtr> messagesToSend; // collect HBF messages of each update
59+
5860
public:
5961

6062

@@ -226,8 +228,8 @@ class ConsumerFMQchannel: public Consumer {
226228

227229
// create a header message
228230
//std::unique_ptr<FairMQMessage> msgHeader(transportFactory->CreateMessage((void *)stfHeader, sizeof(SubTimeframe), cleanupCallback, (void *)(blockRef)));
229-
std::unique_ptr<FairMQMessage> msgHeader(transportFactory->CreateMessage(memoryBuffer,(void *)stfHeader, sizeof(SubTimeframe), (void *)(blockRef)));
230-
sendingChannel->Send(msgHeader);
231+
assert(messagesToSend.empty());
232+
messagesToSend.emplace_back(std::move(sendingChannel->NewMessage(memoryBuffer,(void *)stfHeader, sizeof(SubTimeframe), (void *)(blockRef))));
231233
//printf("sent header %d bytes\n",(int)sizeof(SubTimeframe));
232234

233235

@@ -254,7 +256,7 @@ class ConsumerFMQchannel: public Consumer {
254256
pendingFrames.push_back(pf);
255257
};
256258

257-
auto pendingFramesSend = [&]() {
259+
auto pendingFramesCollect = [&]() {
258260
int nFrames=pendingFrames.size();
259261

260262
if (nFrames==0) {
@@ -272,8 +274,9 @@ class ConsumerFMQchannel: public Consumer {
272274
void *hint=(void *)pendingFrames[0].blockRef;
273275
//printf("block %p ix = %d : %d hint=%p\n",(void *)(&(b->data[ix])),ix,l,hint);
274276
//std::cout << typeid(pendingFrames[0].blockRef).name() << std::endl;
275-
std::unique_ptr<FairMQMessage> msgBody(transportFactory->CreateMessage(memoryBuffer,(void *)(&(b->data[ix])),(size_t)(l),hint));
276-
sendingChannel->Send(msgBody);
277+
278+
// create and queue a fmq message
279+
messagesToSend.emplace_back(std::move(sendingChannel->NewMessage(memoryBuffer,(void *)(&(b->data[ix])),(size_t)(l),hint)));
277280
//printf("sent single HB %d = %d bytes\n",pendingFrames[0].HBid,l);
278281
//printf("left to FMQ: %p\n",pendingFrames[0].blockRef);
279282

@@ -321,9 +324,8 @@ class ConsumerFMQchannel: public Consumer {
321324
//std::unique_ptr<FairMQMessage> msgBody(transportFactory->CreateMessage((void *)newBlock, totalSize, cleanupCallbackForMalloc, (void *)(newBlock)));
322325
//sendingChannel->Send(msgBody);
323326

324-
// create a fmq message
325-
std::unique_ptr<FairMQMessage> msgCopy(transportFactory->CreateMessage(memoryBuffer,(void *)newBlock, totalSize, (void *)(blockRef)));
326-
sendingChannel->Send(msgCopy);
327+
// create and queue a fmq message
328+
messagesToSend.emplace_back(std::move(sendingChannel->NewMessage(memoryBuffer,(void *)newBlock, totalSize, (void *)(blockRef))));
327329

328330
//printf("sent reallocated HB %d (originally %d blocks) = %d bytes\n",pendingFrames[0].HBid,nFrames,totalSize);
329331
}
@@ -347,7 +349,7 @@ class ConsumerFMQchannel: public Consumer {
347349
pendingFramesAppend(HBstart,HBlength,lastHBid,br);
348350
}
349351
// send pending frames, if any
350-
pendingFramesSend();
352+
pendingFramesCollect();
351353

352354
// update new HB frame
353355
HBstart=offset;
@@ -362,7 +364,13 @@ class ConsumerFMQchannel: public Consumer {
362364
}
363365

364366
// purge pendingFrames
365-
pendingFramesSend();
367+
pendingFramesCollect();
368+
369+
// send all the messages
370+
if (sendingChannel->Send(messagesToSend) >= 0)
371+
messagesToSend.clear();
372+
else
373+
LOG(ERROR) << "Sending failed!";
366374

367375
/*
368376

src/testRxFMQ.cxx

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <fairmq/FairMQTransportFactory.h>
44
#include <fairmq/zeromq/FairMQTransportFactoryZMQ.h>
55
#include <memory>
6+
#include <vector>
67

78
int main() {
89

@@ -13,19 +14,23 @@ int main() {
1314

1415
auto factory = FairMQTransportFactory::CreateTransportFactory(cfgTransportType);
1516
auto pull = FairMQChannel{cfgChannelName, cfgChannelType, factory};
16-
pull.Connect(cfgChannelAddress);
17+
pull.Connect(cfgChannelAddress);
18+
int64_t ret;
1719

1820
for (;;) {
19-
auto msg = pull.NewMessage();
20-
if (pull.ReceiveAsync(msg)>0) {
21-
if (msg->GetSize()==0) {continue;}
22-
int sz=(int)msg->GetSize();
23-
void *data=msg->GetData();
24-
printf("Received message %p size %d\n",data,sz);
25-
sleep(1);
26-
printf("Releasing message %p size %d\n",data,sz);
21+
std::vector<FairMQMessagePtr> msgs;
22+
23+
ret = pull.Receive(msgs);
24+
if (ret > 0) {
25+
for (auto &msg : msgs) {
26+
int sz=(int)msg->GetSize();
27+
void *data=msg->GetData();
28+
printf("Received message %p size %d\n",data,sz);
29+
printf("Releasing message %p size %d\n",data,sz);
30+
}
2731
} else {
28-
usleep(1000);
32+
printf("Error while receiving messages %d\n", (int)ret);
33+
return -1;
2934
}
3035
}
3136

src/testTxFMQ.cxx

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,29 +11,39 @@ int main() {
1111
std::string cfgChannelType="pair";
1212
std::string cfgChannelAddress="ipc:///tmp/test-pipe";
1313

14-
14+
1515
auto transportFactory=FairMQTransportFactory::CreateTransportFactory(cfgTransportType);
1616
auto channel=FairMQChannel{cfgChannelName, cfgChannelType, transportFactory};
1717
channel.Bind(cfgChannelAddress);
1818
if (!channel.ValidateChannel()) { return -1; }
19-
20-
int bufferSize=100*1024*1024;
19+
20+
const size_t bufferSize=100*1024*1024;
2121
auto memoryBuffer=channel.Transport()->CreateUnmanagedRegion(
2222
bufferSize,
2323
[](void* data, size_t size, void* hint) {
2424
// cleanup callback
25-
printf("ack %p (size %d) hint=%p\n",data,(int)size,hint);
25+
printf("ack %p (size %d) hint=%p\n",data,(int)size,hint);
2626
}
2727
);
2828
printf("Created buffer %p size %ld\n",memoryBuffer->GetData(),memoryBuffer->GetSize());
2929
size_t msgSize=100;
30-
for (int ix=0;ix<bufferSize;ix+=msgSize) {
31-
void *dataPtr=(void *)(&((char *)memoryBuffer->GetData())[ix]);
32-
void *hint=(void *)ix;
33-
std::unique_ptr<FairMQMessage> msg(transportFactory->CreateMessage(memoryBuffer,dataPtr,msgSize,hint));
34-
printf("send %p : %ld bytes hint=%p\n",dataPtr,msgSize,hint);
35-
channel.Send(msg);
36-
usleep(3000000);
30+
31+
size_t ix=0;
32+
while (ix<(bufferSize-msgSize)) {
33+
// send random number of messages in one multipart [1-50]
34+
int nmsgs = (rand()%50)+1;
35+
nmsgs = std::min(nmsgs, (int)((bufferSize-ix)/msgSize));
36+
37+
std::vector<FairMQMessagePtr> msgs;
38+
for (int im=0; im < nmsgs; im++, ix+=msgSize) {
39+
void *dataPtr=(void *)(&((char *)memoryBuffer->GetData())[ix]);
40+
void *hint=(void *)ix;
41+
msgs.emplace_back(transportFactory->CreateMessage(memoryBuffer,dataPtr,msgSize,hint));
42+
printf("send %p : %ld bytes hint=%p\n",dataPtr,msgSize,hint);
43+
}
44+
printf("* sending %lu messages\n",msgs.size());
45+
channel.Send(msgs);
46+
sleep(2);
3747
}
3848

3949
return 0;

0 commit comments

Comments
 (0)