Skip to content

Commit ef51031

Browse files
committed
Merge branch 'ironMann-shm__multipart_send'
2 parents 0262c1e + 774bd9a commit ef51031

File tree

3 files changed

+54
-30
lines changed

3 files changed

+54
-30
lines changed

src/ConsumerFMQchannel.cxx

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class ConsumerFMQchannel: public Consumer {
5555
int memoryPoolNumberOfPages;
5656

5757
public:
58+
std::vector<FairMQMessagePtr> messagesToSend; // collect HBF messages of each update
59+
5860

5961

6062
ConsumerFMQchannel(ConfigFile &cfg, std::string cfgEntryPoint) : Consumer(cfg,cfgEntryPoint) {
@@ -225,8 +227,8 @@ class ConsumerFMQchannel: public Consumer {
225227

226228
// create a header message
227229
//std::unique_ptr<FairMQMessage> msgHeader(transportFactory->CreateMessage((void *)stfHeader, sizeof(SubTimeframe), cleanupCallback, (void *)(blockRef)));
228-
std::unique_ptr<FairMQMessage> msgHeader(transportFactory->CreateMessage(memoryBuffer,(void *)stfHeader, sizeof(SubTimeframe), (void *)(blockRef)));
229-
sendingChannel->Send(msgHeader);
230+
assert(messagesToSend.empty());
231+
messagesToSend.emplace_back(std::move(sendingChannel->NewMessage(memoryBuffer,(void *)stfHeader, sizeof(SubTimeframe), (void *)(blockRef))));
230232
//printf("sent header %d bytes\n",(int)sizeof(SubTimeframe));
231233

232234

@@ -252,8 +254,8 @@ class ConsumerFMQchannel: public Consumer {
252254
//printf("allocating blockRef %p for %p\n",pf.blockRef,br);
253255
pendingFrames.push_back(pf);
254256
};
255-
256-
auto pendingFramesSend = [&]() {
257+
258+
auto pendingFramesCollect = [&]() {
257259
int nFrames=pendingFrames.size();
258260

259261
if (nFrames==0) {
@@ -271,8 +273,10 @@ class ConsumerFMQchannel: public Consumer {
271273
void *hint=(void *)pendingFrames[0].blockRef;
272274
//printf("block %p ix = %d : %d hint=%p\n",(void *)(&(b->data[ix])),ix,l,hint);
273275
//std::cout << typeid(pendingFrames[0].blockRef).name() << std::endl;
274-
std::unique_ptr<FairMQMessage> msgBody(transportFactory->CreateMessage(memoryBuffer,(void *)(&(b->data[ix])),(size_t)(l),hint));
275-
sendingChannel->Send(msgBody);
276+
277+
// create and queue a fmq message
278+
messagesToSend.emplace_back(std::move(sendingChannel->NewMessage(memoryBuffer,(void *)(&(b->data[ix])),(size_t)(l),hint)));
279+
276280
//printf("sent single HB %d = %d bytes\n",pendingFrames[0].HBid,l);
277281
//printf("left to FMQ: %p\n",pendingFrames[0].blockRef);
278282

@@ -319,10 +323,9 @@ class ConsumerFMQchannel: public Consumer {
319323

320324
//std::unique_ptr<FairMQMessage> msgBody(transportFactory->CreateMessage((void *)newBlock, totalSize, cleanupCallbackForMalloc, (void *)(newBlock)));
321325
//sendingChannel->Send(msgBody);
322-
323-
// create a fmq message
324-
std::unique_ptr<FairMQMessage> msgCopy(transportFactory->CreateMessage(memoryBuffer,(void *)newBlock, totalSize, (void *)(blockRef)));
325-
sendingChannel->Send(msgCopy);
326+
327+
// create and queue a fmq message
328+
messagesToSend.emplace_back(std::move(sendingChannel->NewMessage(memoryBuffer,(void *)newBlock, totalSize, (void *)(blockRef))));
326329

327330
//printf("sent reallocated HB %d (originally %d blocks) = %d bytes\n",pendingFrames[0].HBid,nFrames,totalSize);
328331
}
@@ -346,7 +349,7 @@ class ConsumerFMQchannel: public Consumer {
346349
pendingFramesAppend(HBstart,HBlength,lastHBid,br);
347350
}
348351
// send pending frames, if any
349-
pendingFramesSend();
352+
pendingFramesCollect();
350353

351354
// update new HB frame
352355
HBstart=offset;
@@ -361,8 +364,14 @@ class ConsumerFMQchannel: public Consumer {
361364
}
362365

363366
// purge pendingFrames
364-
pendingFramesSend();
365-
367+
pendingFramesCollect();
368+
369+
// send all the messages
370+
if (sendingChannel->Send(messagesToSend) >= 0)
371+
messagesToSend.clear();
372+
else
373+
LOG(ERROR) << "Sending failed!";
374+
366375
/*
367376
368377
std::unique_ptr<FairMQMessage> msgBody(transportFactory->CreateMessage((void *)(&b->data[HBstart]), (size_t)(HBlength), cleanupCallback, (void *)(ptr)));

src/testRxFMQ.cxx

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <fairmq/FairMQMessage.h>
33
#include <fairmq/FairMQTransportFactory.h>
44
#include <memory>
5+
#include <vector>
56

67
int main() {
78

@@ -13,18 +14,22 @@ int main() {
1314
auto factory = FairMQTransportFactory::CreateTransportFactory(cfgTransportType);
1415
auto pull = FairMQChannel{cfgChannelName, cfgChannelType, factory};
1516
pull.Connect(cfgChannelAddress);
17+
int64_t ret;
1618

1719
for (;;) {
18-
auto msg = pull.NewMessage();
19-
if (pull.ReceiveAsync(msg)>0) {
20-
if (msg->GetSize()==0) {continue;}
21-
int sz=(int)msg->GetSize();
22-
void *data=msg->GetData();
23-
printf("Received message %p size %d\n",data,sz);
24-
sleep(1);
25-
printf("Releasing message %p size %d\n",data,sz);
20+
std::vector<FairMQMessagePtr> msgs;
21+
22+
ret = pull.Receive(msgs);
23+
if (ret > 0) {
24+
for (auto &msg : msgs) {
25+
int sz=(int)msg->GetSize();
26+
void *data=msg->GetData();
27+
printf("Received message %p size %d\n",data,sz);
28+
printf("Releasing message %p size %d\n",data,sz);
29+
}
2630
} else {
27-
usleep(1000);
31+
printf("Error while receiving messages %d\n", (int)ret);
32+
return -1;
2833
}
2934
}
3035

src/testTxFMQ.cxx

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ int main() {
1616
channel.Bind(cfgChannelAddress);
1717
if (!channel.ValidateChannel()) { return -1; }
1818

19-
int bufferSize=100*1024*1024;
19+
const size_t bufferSize=100*1024*1024;
2020
auto memoryBuffer=channel.Transport()->CreateUnmanagedRegion(
2121
bufferSize,
2222
[](void* data, size_t size, void* hint) {
@@ -26,13 +26,23 @@ int main() {
2626
);
2727
printf("Created buffer %p size %ld\n",memoryBuffer->GetData(),memoryBuffer->GetSize());
2828
size_t msgSize=100;
29-
for (int ix=0;ix<bufferSize;ix+=msgSize) {
30-
void *dataPtr=(void *)(&((char *)memoryBuffer->GetData())[ix]);
31-
void *hint=(void *)ix;
32-
std::unique_ptr<FairMQMessage> msg(transportFactory->CreateMessage(memoryBuffer,dataPtr,msgSize,hint));
33-
printf("send %p : %ld bytes hint=%p\n",dataPtr,msgSize,hint);
34-
channel.Send(msg);
35-
usleep(3000000);
29+
30+
size_t ix=0;
31+
while (ix<(bufferSize-msgSize)) {
32+
// send random number of messages in one multipart [1-50]
33+
int nmsgs = (rand()%50)+1;
34+
nmsgs = std::min(nmsgs, (int)((bufferSize-ix)/msgSize));
35+
36+
std::vector<FairMQMessagePtr> msgs;
37+
for (int im=0; im < nmsgs; im++, ix+=msgSize) {
38+
void *dataPtr=(void *)(&((char *)memoryBuffer->GetData())[ix]);
39+
void *hint=(void *)ix;
40+
msgs.emplace_back(transportFactory->CreateMessage(memoryBuffer,dataPtr,msgSize,hint));
41+
printf("send %p : %ld bytes hint=%p\n",dataPtr,msgSize,hint);
42+
}
43+
printf("* sending %lu messages\n",msgs.size());
44+
channel.Send(msgs);
45+
sleep(2);
3646
}
3747

3848
return 0;

0 commit comments

Comments
 (0)