Skip to content

Commit 3c44a2c

Browse files
Merge pull request #3 from utaal/thread-safe-flags
Replace the flags on QueuePair with a separate class: this makes the api fully thread-safe again
2 parents 13f9316 + e714ae6 commit 3c44a2c

File tree

2 files changed

+66
-75
lines changed

2 files changed

+66
-75
lines changed

src/infinity/queues/QueuePair.cpp

Lines changed: 41 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,20 @@
2121
namespace infinity {
2222
namespace queues {
2323

24+
int OperationFlags::ibvFlags() {
25+
int flags = 0;
26+
if (fenced) {
27+
flags |= IBV_SEND_FENCE;
28+
}
29+
if (signaled) {
30+
flags |= IBV_SEND_SIGNALED;
31+
}
32+
if (inlined) {
33+
flags |= IBV_SEND_INLINE;
34+
}
35+
return flags;
36+
}
37+
2438
QueuePair::QueuePair(infinity::core::Context* context) :
2539
context(context) {
2640

@@ -57,8 +71,6 @@ QueuePair::QueuePair(infinity::core::Context* context) :
5771

5872
this->userData = NULL;
5973
this->userDataSize = 0;
60-
61-
this->defaultFlags = 0;
6274
}
6375

6476
QueuePair::~QueuePair() {
@@ -130,14 +142,15 @@ uint32_t QueuePair::getSequenceNumber() {
130142
}
131143

132144
void QueuePair::send(infinity::memory::Buffer* buffer, infinity::requests::RequestToken *requestToken) {
133-
send(buffer, 0, buffer->getSizeInBytes(), requestToken);
145+
send(buffer, 0, buffer->getSizeInBytes(), OperationFlags(), requestToken);
134146
}
135147

136148
void QueuePair::send(infinity::memory::Buffer* buffer, uint32_t sizeInBytes, infinity::requests::RequestToken *requestToken) {
137-
send(buffer, 0, sizeInBytes, requestToken);
149+
send(buffer, 0, sizeInBytes, OperationFlags(), requestToken);
138150
}
139151

140-
void QueuePair::send(infinity::memory::Buffer* buffer, uint64_t localOffset, uint32_t sizeInBytes, infinity::requests::RequestToken *requestToken) {
152+
void QueuePair::send(infinity::memory::Buffer* buffer, uint64_t localOffset, uint32_t sizeInBytes, OperationFlags send_flags,
153+
infinity::requests::RequestToken *requestToken) {
141154

142155
if (requestToken != NULL) {
143156
requestToken->reset();
@@ -161,7 +174,7 @@ void QueuePair::send(infinity::memory::Buffer* buffer, uint64_t localOffset, uin
161174
workRequest.sg_list = &sgElement;
162175
workRequest.num_sge = 1;
163176
workRequest.opcode = IBV_WR_SEND;
164-
workRequest.send_flags = defaultFlags;
177+
workRequest.send_flags = send_flags.ibvFlags();
165178
if (requestToken != NULL) {
166179
workRequest.send_flags |= IBV_SEND_SIGNALED;
167180
}
@@ -175,7 +188,7 @@ void QueuePair::send(infinity::memory::Buffer* buffer, uint64_t localOffset, uin
175188
}
176189

177190
void QueuePair::sendWithImmediate(infinity::memory::Buffer* buffer, uint64_t localOffset, uint32_t sizeInBytes, uint32_t immediateValue,
178-
infinity::requests::RequestToken* requestToken) {
191+
OperationFlags send_flags, infinity::requests::RequestToken* requestToken) {
179192

180193
if (requestToken != NULL) {
181194
requestToken->reset();
@@ -201,7 +214,7 @@ void QueuePair::sendWithImmediate(infinity::memory::Buffer* buffer, uint64_t loc
201214
workRequest.num_sge = 1;
202215
workRequest.opcode = IBV_WR_SEND_WITH_IMM;
203216
workRequest.imm_data = htonl(immediateValue);
204-
workRequest.send_flags = defaultFlags;
217+
workRequest.send_flags = send_flags.ibvFlags();
205218
if (requestToken != NULL) {
206219
workRequest.send_flags |= IBV_SEND_SIGNALED;
207220
}
@@ -215,17 +228,17 @@ void QueuePair::sendWithImmediate(infinity::memory::Buffer* buffer, uint64_t loc
215228
}
216229

217230
void QueuePair::write(infinity::memory::Buffer* buffer, infinity::memory::RegionToken* destination, infinity::requests::RequestToken *requestToken) {
218-
write(buffer, 0, destination, 0, buffer->getSizeInBytes(), requestToken);
231+
write(buffer, 0, destination, 0, buffer->getSizeInBytes(), OperationFlags(), requestToken);
219232
INFINITY_ASSERT(buffer->getSizeInBytes() <= ((uint64_t) UINT32_MAX), "[INFINITY][QUEUES][QUEUEPAIR] Request must be smaller or equal to UINT_32_MAX bytes. This memory region is larger. Please explicitly indicate the size of the data to transfer.\n");
220233
}
221234

222235
void QueuePair::write(infinity::memory::Buffer* buffer, infinity::memory::RegionToken* destination, uint32_t sizeInBytes,
223236
infinity::requests::RequestToken *requestToken) {
224-
write(buffer, 0, destination, 0, sizeInBytes, requestToken);
237+
write(buffer, 0, destination, 0, sizeInBytes, OperationFlags(), requestToken);
225238
}
226239

227240
void QueuePair::write(infinity::memory::Buffer* buffer, uint64_t localOffset, infinity::memory::RegionToken* destination, uint64_t remoteOffset,
228-
uint32_t sizeInBytes, infinity::requests::RequestToken *requestToken) {
241+
uint32_t sizeInBytes, OperationFlags send_flags, infinity::requests::RequestToken *requestToken) {
229242

230243
if (requestToken != NULL) {
231244
requestToken->reset();
@@ -249,7 +262,7 @@ void QueuePair::write(infinity::memory::Buffer* buffer, uint64_t localOffset, in
249262
workRequest.sg_list = &sgElement;
250263
workRequest.num_sge = 1;
251264
workRequest.opcode = IBV_WR_RDMA_WRITE;
252-
workRequest.send_flags = defaultFlags;
265+
workRequest.send_flags = send_flags.ibvFlags();
253266
if (requestToken != NULL) {
254267
workRequest.send_flags |= IBV_SEND_SIGNALED;
255268
}
@@ -268,7 +281,7 @@ void QueuePair::write(infinity::memory::Buffer* buffer, uint64_t localOffset, in
268281
}
269282

270283
void QueuePair::writeWithImmediate(infinity::memory::Buffer* buffer, uint64_t localOffset, infinity::memory::RegionToken* destination, uint64_t remoteOffset,
271-
uint32_t sizeInBytes, uint32_t immediateValue, infinity::requests::RequestToken* requestToken) {
284+
uint32_t sizeInBytes, uint32_t immediateValue, OperationFlags send_flags, infinity::requests::RequestToken* requestToken) {
272285

273286
if (requestToken != NULL) {
274287
requestToken->reset();
@@ -294,7 +307,7 @@ void QueuePair::writeWithImmediate(infinity::memory::Buffer* buffer, uint64_t lo
294307
workRequest.num_sge = 1;
295308
workRequest.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
296309
workRequest.imm_data = htonl(immediateValue);
297-
workRequest.send_flags = defaultFlags;
310+
workRequest.send_flags = send_flags.ibvFlags();
298311
if (requestToken != NULL) {
299312
workRequest.send_flags |= IBV_SEND_SIGNALED;
300313
}
@@ -313,7 +326,7 @@ void QueuePair::writeWithImmediate(infinity::memory::Buffer* buffer, uint64_t lo
313326
}
314327

315328
void QueuePair::multiWrite(infinity::memory::Buffer** buffers, uint32_t* sizesInBytes, uint64_t* localOffsets, uint32_t numberOfElements,
316-
infinity::memory::RegionToken* destination, uint64_t remoteOffset, infinity::requests::RequestToken* requestToken) {
329+
infinity::memory::RegionToken* destination, uint64_t remoteOffset, OperationFlags send_flags, infinity::requests::RequestToken* requestToken) {
317330

318331
if (requestToken != NULL) {
319332
requestToken->reset();
@@ -347,7 +360,7 @@ void QueuePair::multiWrite(infinity::memory::Buffer** buffers, uint32_t* sizesIn
347360
workRequest.sg_list = sgElements;
348361
workRequest.num_sge = numberOfElements;
349362
workRequest.opcode = IBV_WR_RDMA_WRITE;
350-
workRequest.send_flags = defaultFlags;
363+
workRequest.send_flags = send_flags.ibvFlags();
351364
if (requestToken != NULL) {
352365
workRequest.send_flags |= IBV_SEND_SIGNALED;
353366
}
@@ -365,7 +378,7 @@ void QueuePair::multiWrite(infinity::memory::Buffer** buffers, uint32_t* sizesIn
365378
}
366379

367380
void QueuePair::multiWriteWithImmediate(infinity::memory::Buffer** buffers, uint32_t* sizesInBytes, uint64_t* localOffsets, uint32_t numberOfElements,
368-
infinity::memory::RegionToken* destination, uint64_t remoteOffset, uint32_t immediateValue, infinity::requests::RequestToken* requestToken) {
381+
infinity::memory::RegionToken* destination, uint64_t remoteOffset, uint32_t immediateValue, OperationFlags send_flags, infinity::requests::RequestToken* requestToken) {
369382

370383
if (requestToken != NULL) {
371384
requestToken->reset();
@@ -401,7 +414,7 @@ void QueuePair::multiWriteWithImmediate(infinity::memory::Buffer** buffers, uint
401414
workRequest.num_sge = numberOfElements;
402415
workRequest.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
403416
workRequest.imm_data = htonl(immediateValue);
404-
workRequest.send_flags = defaultFlags;
417+
workRequest.send_flags = send_flags.ibvFlags();
405418
if (requestToken != NULL) {
406419
workRequest.send_flags |= IBV_SEND_SIGNALED;
407420
}
@@ -420,17 +433,17 @@ void QueuePair::multiWriteWithImmediate(infinity::memory::Buffer** buffers, uint
420433
}
421434

422435
void QueuePair::read(infinity::memory::Buffer* buffer, infinity::memory::RegionToken* source, infinity::requests::RequestToken *requestToken) {
423-
read(buffer, 0, source, 0, buffer->getSizeInBytes(), requestToken);
436+
read(buffer, 0, source, 0, buffer->getSizeInBytes(), OperationFlags(), requestToken);
424437
INFINITY_ASSERT(buffer->getSizeInBytes() <= ((uint64_t) UINT32_MAX), "[INFINITY][QUEUES][QUEUEPAIR] Request must be smaller or equal to UINT_32_MAX bytes. This memory region is larger. Please explicitly indicate the size of the data to transfer.\n");
425438
}
426439

427440
void QueuePair::read(infinity::memory::Buffer* buffer, infinity::memory::RegionToken* source, uint32_t sizeInBytes,
428441
infinity::requests::RequestToken *requestToken) {
429-
read(buffer, 0, source, 0, sizeInBytes, requestToken);
442+
read(buffer, 0, source, 0, sizeInBytes, OperationFlags(), requestToken);
430443
}
431444

432445
void QueuePair::read(infinity::memory::Buffer* buffer, uint64_t localOffset, infinity::memory::RegionToken* source, uint64_t remoteOffset, uint32_t sizeInBytes,
433-
infinity::requests::RequestToken *requestToken) {
446+
OperationFlags send_flags, infinity::requests::RequestToken *requestToken) {
434447

435448
if (requestToken != NULL) {
436449
requestToken->reset();
@@ -454,7 +467,7 @@ void QueuePair::read(infinity::memory::Buffer* buffer, uint64_t localOffset, inf
454467
workRequest.sg_list = &sgElement;
455468
workRequest.num_sge = 1;
456469
workRequest.opcode = IBV_WR_RDMA_READ;
457-
workRequest.send_flags = defaultFlags;
470+
workRequest.send_flags = send_flags.ibvFlags();
458471
if (requestToken != NULL) {
459472
workRequest.send_flags |= IBV_SEND_SIGNALED;
460473
}
@@ -473,7 +486,7 @@ void QueuePair::read(infinity::memory::Buffer* buffer, uint64_t localOffset, inf
473486
}
474487

475488
void QueuePair::compareAndSwap(infinity::memory::RegionToken* destination, infinity::memory::Atomic* previousValue, uint64_t compare, uint64_t swap,
476-
infinity::requests::RequestToken *requestToken) {
489+
OperationFlags send_flags, infinity::requests::RequestToken *requestToken) {
477490

478491
if (requestToken != NULL) {
479492
requestToken->reset();
@@ -494,7 +507,7 @@ void QueuePair::compareAndSwap(infinity::memory::RegionToken* destination, infin
494507
workRequest.sg_list = &sgElement;
495508
workRequest.num_sge = 1;
496509
workRequest.opcode = IBV_WR_ATOMIC_CMP_AND_SWP;
497-
workRequest.send_flags = defaultFlags;
510+
workRequest.send_flags = send_flags.ibvFlags();
498511
if (requestToken != NULL) {
499512
workRequest.send_flags |= IBV_SEND_SIGNALED;
500513
}
@@ -512,15 +525,15 @@ void QueuePair::compareAndSwap(infinity::memory::RegionToken* destination, infin
512525
}
513526

514527
void QueuePair::compareAndSwap(infinity::memory::RegionToken* destination, uint64_t compare, uint64_t swap, infinity::requests::RequestToken *requestToken) {
515-
compareAndSwap(destination, context->defaultAtomic, compare, swap, requestToken);
528+
compareAndSwap(destination, context->defaultAtomic, compare, swap, OperationFlags(), requestToken);
516529
}
517530

518531
void QueuePair::fetchAndAdd(infinity::memory::RegionToken* destination, uint64_t add, infinity::requests::RequestToken *requestToken) {
519-
fetchAndAdd(destination, context->defaultAtomic, add, requestToken);
532+
fetchAndAdd(destination, context->defaultAtomic, add, OperationFlags(), requestToken);
520533
}
521534

522535
void QueuePair::fetchAndAdd(infinity::memory::RegionToken* destination, infinity::memory::Atomic* previousValue, uint64_t add,
523-
infinity::requests::RequestToken *requestToken) {
536+
OperationFlags send_flags, infinity::requests::RequestToken *requestToken) {
524537

525538
if (requestToken != NULL) {
526539
requestToken->reset();
@@ -541,7 +554,7 @@ void QueuePair::fetchAndAdd(infinity::memory::RegionToken* destination, infinity
541554
workRequest.sg_list = &sgElement;
542555
workRequest.num_sge = 1;
543556
workRequest.opcode = IBV_WR_ATOMIC_FETCH_AND_ADD;
544-
workRequest.send_flags = defaultFlags;
557+
workRequest.send_flags = send_flags.ibvFlags();
545558
if (requestToken != NULL) {
546559
workRequest.send_flags |= IBV_SEND_SIGNALED;
547560
}
@@ -557,29 +570,6 @@ void QueuePair::fetchAndAdd(infinity::memory::RegionToken* destination, infinity
557570

558571
}
559572

560-
void QueuePair::enableFencedOperations() {
561-
defaultFlags |= IBV_SEND_FENCE;
562-
}
563-
564-
void QueuePair::disableFencedOperations() {
565-
defaultFlags &= (~IBV_SEND_FENCE);
566-
}
567-
568-
void QueuePair::enableSignaledOperations() {
569-
defaultFlags |= IBV_SEND_SIGNALED;
570-
}
571-
572-
void QueuePair::disableSignaledOperations() {
573-
defaultFlags &= (~IBV_SEND_SIGNALED);
574-
}
575-
576-
void QueuePair::enableInlinedOperations() {
577-
defaultFlags |= IBV_SEND_INLINE;
578-
}
579-
580-
void QueuePair::disableInlinedOperations() {
581-
defaultFlags &= (~IBV_SEND_INLINE);
582-
}
583573

584574

585575
bool QueuePair::hasUserData() {

src/infinity/queues/QueuePair.h

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@ class QueuePairFactory;
2626
namespace infinity {
2727
namespace queues {
2828

29+
class OperationFlags {
30+
31+
public:
32+
bool fenced;
33+
bool signaled;
34+
bool inlined;
35+
36+
OperationFlags() : fenced(false), signaled(false), inlined(false) { };
37+
38+
/**
39+
* Turn the bools into a bit field.
40+
*/
41+
int ibvFlags();
42+
};
43+
2944
class QueuePair {
3045

3146
friend class infinity::queues::QueuePairFactory;
@@ -71,19 +86,6 @@ class QueuePair {
7186
uint32_t getQueuePairNumber();
7287
uint32_t getSequenceNumber();
7388

74-
public:
75-
76-
/**
77-
* Modification of flags
78-
*/
79-
80-
void enableFencedOperations();
81-
void disableFencedOperations();
82-
void enableSignaledOperations();
83-
void disableSignaledOperations();
84-
void enableInlinedOperations();
85-
void disableInlinedOperations();
86-
8789
public:
8890

8991
/**
@@ -92,19 +94,20 @@ class QueuePair {
9294

9395
void send(infinity::memory::Buffer *buffer, infinity::requests::RequestToken *requestToken = NULL);
9496
void send(infinity::memory::Buffer *buffer, uint32_t sizeInBytes, infinity::requests::RequestToken *requestToken = NULL);
95-
void send(infinity::memory::Buffer *buffer, uint64_t localOffset, uint32_t sizeInBytes, infinity::requests::RequestToken *requestToken = NULL);
97+
void send(infinity::memory::Buffer *buffer, uint64_t localOffset, uint32_t sizeInBytes, OperationFlags flags,
98+
infinity::requests::RequestToken *requestToken = NULL);
9699

97100
void write(infinity::memory::Buffer *buffer, infinity::memory::RegionToken *destination, infinity::requests::RequestToken *requestToken = NULL);
98101
void write(infinity::memory::Buffer *buffer, infinity::memory::RegionToken *destination, uint32_t sizeInBytes,
99102
infinity::requests::RequestToken *requestToken = NULL);
100103
void write(infinity::memory::Buffer *buffer, uint64_t localOffset, infinity::memory::RegionToken *destination, uint64_t remoteOffset, uint32_t sizeInBytes,
101-
infinity::requests::RequestToken *requestToken = NULL);
104+
OperationFlags flags, infinity::requests::RequestToken *requestToken = NULL);
102105

103106
void read(infinity::memory::Buffer *buffer, infinity::memory::RegionToken *source, infinity::requests::RequestToken *requestToken = NULL);
104107
void read(infinity::memory::Buffer *buffer, infinity::memory::RegionToken *source, uint32_t sizeInBytes, infinity::requests::RequestToken *requestToken =
105108
NULL);
106109
void read(infinity::memory::Buffer *buffer, uint64_t localOffset, infinity::memory::RegionToken *source, uint64_t remoteOffset, uint32_t sizeInBytes,
107-
infinity::requests::RequestToken *requestToken = NULL);
110+
OperationFlags flags, infinity::requests::RequestToken *requestToken = NULL);
108111

109112
public:
110113

@@ -113,16 +116,16 @@ class QueuePair {
113116
*/
114117

115118
void multiWrite(infinity::memory::Buffer **buffers, uint32_t *sizesInBytes, uint64_t *localOffsets, uint32_t numberOfElements,
116-
infinity::memory::RegionToken *destination, uint64_t remoteOffset, infinity::requests::RequestToken *requestToken = NULL);
119+
infinity::memory::RegionToken *destination, uint64_t remoteOffset, OperationFlags flags, infinity::requests::RequestToken *requestToken = NULL);
117120

118121
void sendWithImmediate(infinity::memory::Buffer *buffer, uint64_t localOffset, uint32_t sizeInBytes, uint32_t immediateValue,
119-
infinity::requests::RequestToken *requestToken = NULL);
122+
OperationFlags flags, infinity::requests::RequestToken *requestToken = NULL);
120123

121124
void writeWithImmediate(infinity::memory::Buffer *buffer, uint64_t localOffset, infinity::memory::RegionToken *destination, uint64_t remoteOffset,
122-
uint32_t sizeInBytes, uint32_t immediateValue, infinity::requests::RequestToken *requestToken = NULL);
125+
uint32_t sizeInBytes, uint32_t immediateValue, OperationFlags flags, infinity::requests::RequestToken *requestToken = NULL);
123126

124127
void multiWriteWithImmediate(infinity::memory::Buffer **buffers, uint32_t *sizesInBytes, uint64_t *localOffsets, uint32_t numberOfElements,
125-
infinity::memory::RegionToken *destination, uint64_t remoteOffset, uint32_t immediateValue, infinity::requests::RequestToken *requestToken = NULL);
128+
infinity::memory::RegionToken *destination, uint64_t remoteOffset, uint32_t immediateValue, OperationFlags flags, infinity::requests::RequestToken *requestToken = NULL);
126129

127130
public:
128131

@@ -132,10 +135,10 @@ class QueuePair {
132135

133136
void compareAndSwap(infinity::memory::RegionToken *destination, uint64_t compare, uint64_t swap, infinity::requests::RequestToken *requestToken = NULL);
134137
void compareAndSwap(infinity::memory::RegionToken *destination, infinity::memory::Atomic *previousValue, uint64_t compare, uint64_t swap,
135-
infinity::requests::RequestToken *requestToken = NULL);
138+
OperationFlags flags, infinity::requests::RequestToken *requestToken = NULL);
136139
void fetchAndAdd(infinity::memory::RegionToken *destination, uint64_t add, infinity::requests::RequestToken *requestToken = NULL);
137140
void fetchAndAdd(infinity::memory::RegionToken *destination, infinity::memory::Atomic *previousValue, uint64_t add,
138-
infinity::requests::RequestToken *requestToken = NULL);
141+
OperationFlags flags, infinity::requests::RequestToken *requestToken = NULL);
139142

140143
protected:
141144

@@ -147,8 +150,6 @@ class QueuePair {
147150
void *userData;
148151
uint32_t userDataSize;
149152

150-
int defaultFlags;
151-
152153
};
153154

154155
} /* namespace queues */

0 commit comments

Comments
 (0)