@@ -679,73 +679,6 @@ void IBVerbs :: dereg( SlotID id )
679679}
680680
681681
682- void IBVerbs :: blockingCompareAndSwap(SlotID srcSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap)
683- {
684- const MemorySlot & src = m_memreg.lookup ( srcSlot );
685- const MemorySlot & dst = m_memreg.lookup ( dstSlot);
686-
687- char * localAddr
688- = static_cast <char *>(src.glob [m_pid]._addr ) + srcOffset;
689- const char * remoteAddr
690- = static_cast <const char *>(dst.glob [dstPid]._addr ) + dstOffset;
691-
692- struct ibv_sge sge;
693- memset (&sge, 0 , sizeof (sge));
694- sge.addr = reinterpret_cast <uintptr_t >( localAddr );
695- sge.length = std::min<size_t >(size, m_maxMsgSize );
696- sge.lkey = src.mr ->lkey ;
697-
698- struct ibv_send_wr wr;
699- memset (&wr, 0 , sizeof (wr));
700- wr.wr_id = srcSlot;
701- wr.sg_list = &sge;
702- wr.next = NULL ; // this needs to be set, otherwise EINVAL return error in ibv_post_send
703- wr.num_sge = 1 ;
704- wr.opcode = IBV_WR_ATOMIC_CMP_AND_SWP;
705- wr.send_flags = IBV_SEND_SIGNALED;
706- wr.wr .atomic .remote_addr = reinterpret_cast <uintptr_t >(remoteAddr);
707- wr.wr .atomic .compare_add = compare_add;
708- wr.wr .atomic .swap = swap;
709- wr.wr .atomic .rkey = dst.glob [dstPid]._rkey ;
710- struct ibv_send_wr *bad_wr;
711- int error;
712- std::vector<ibv_wc_opcode> opcodes;
713-
714- blockingCompareAndSwap:
715- if (int err = ibv_post_send (m_connectedQps[dstPid].get (), &wr, &bad_wr ))
716- {
717- LOG (1 , " Error while posting RDMA requests: " << std::strerror (err) );
718- throw Exception (" Error while posting RDMA requests" );
719- }
720-
721- /* *
722- * Keep waiting on a completion of events until you
723- * register a completed atomic compare-and-swap
724- */
725- do {
726- opcodes = wait_completion (error);
727- if (error) {
728- LOG (1 , " Error in wait_completion" );
729- std::abort ();
730- }
731- } while (std::find (opcodes.begin (), opcodes.end (), IBV_WC_COMP_SWAP) == opcodes.end ());
732-
733- uint64_t * remoteValueFound = reinterpret_cast <uint64_t *>(localAddr);
734- /*
735- * if we fetched the value we expected, then
736- * we are holding the lock now (that is, we swapped successfully!)
737- * else, re-post your request for the lock
738- */
739- if (remoteValueFound[0 ] != compare_add) {
740- LOG (4 , " Process " << m_pid << " couldn't get the lock. remoteValue = " << remoteValueFound[0 ] << " compare_add = " << compare_add << " go on, iterate\n " );
741- goto blockingCompareAndSwap;
742- }
743- else {
744- LOG (4 , " Process " << m_pid << " reads value " << remoteValueFound[0 ] << " and expected = " << compare_add <<" gets the lock, done\n " );
745- }
746- // else we hold the lock and swap value into the remote slot ...
747- }
748-
749682void IBVerbs :: put( SlotID srcSlot, size_t srcOffset,
750683 int dstPid, SlotID dstSlot, size_t dstOffset, size_t size)
751684{
0 commit comments