|
2 | 2 | * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
3 | 3 | * Copyright (c) 2014 Artem Y. Polyakov <[email protected]>.
|
4 | 4 | * All rights reserved.
|
5 |
| - * Copyright (c) 2015-2016 Research Organization for Information Science |
| 5 | + * Copyright (c) 2015-2017 Research Organization for Information Science |
6 | 6 | * and Technology (RIST). All rights reserved.
|
7 | 7 | * Copyright (c) 2016 Mellanox Technologies, Inc.
|
8 | 8 | * All rights reserved.
|
@@ -142,46 +142,69 @@ static void lost_connection(pmix_peer_t *peer, pmix_status_t err)
|
142 | 142 | PMIX_REPORT_EVENT(err, _notify_complete);
|
143 | 143 | }
|
144 | 144 |
|
145 |
| -static pmix_status_t send_bytes(int sd, char **buf, size_t *remain) |
| 145 | +static pmix_status_t send_msg(int sd, pmix_ptl_send_t *msg) |
146 | 146 | {
|
147 | 147 | pmix_status_t ret = PMIX_SUCCESS;
|
148 |
| - int rc; |
149 |
| - char *ptr = *buf; |
150 |
| - while (0 < *remain) { |
151 |
| - rc = write(sd, ptr, *remain); |
152 |
| - if (rc < 0) { |
153 |
| - if (pmix_socket_errno == EINTR) { |
154 |
| - continue; |
155 |
| - } else if (pmix_socket_errno == EAGAIN) { |
156 |
| - /* tell the caller to keep this message on active, |
157 |
| - * but let the event lib cycle so other messages |
158 |
| - * can progress while this socket is busy |
159 |
| - */ |
160 |
| - ret = PMIX_ERR_RESOURCE_BUSY; |
161 |
| - goto exit; |
162 |
| - } else if (pmix_socket_errno == EWOULDBLOCK) { |
163 |
| - /* tell the caller to keep this message on active, |
164 |
| - * but let the event lib cycle so other messages |
165 |
| - * can progress while this socket is busy |
166 |
| - */ |
167 |
| - ret = PMIX_ERR_WOULD_BLOCK; |
168 |
| - goto exit; |
169 |
| - } |
| 148 | + struct iovec iov[2]; |
| 149 | + int iov_count; |
| 150 | + ssize_t remain = msg->sdbytes, rc; |
| 151 | + iov[0].iov_base = msg->sdptr; |
| 152 | + iov[0].iov_len = msg->sdbytes; |
| 153 | + if (!msg->hdr_sent && NULL != msg->data) { |
| 154 | + iov[1].iov_base = msg->data->base_ptr; |
| 155 | + iov[1].iov_len = ntohl(msg->hdr.nbytes); |
| 156 | + remain += ntohl(msg->hdr.nbytes); |
| 157 | + iov_count = 2; |
| 158 | + } else { |
| 159 | + iov_count = 1; |
| 160 | + } |
| 161 | +retry: |
| 162 | + rc = writev(sd, iov, iov_count); |
| 163 | + if (PMIX_LIKELY(rc == remain)) { |
| 164 | + /* we successfully sent the header and the msg data if any */ |
| 165 | + msg->hdr_sent = true; |
| 166 | + msg->sdbytes = 0; |
| 167 | + msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len; |
| 168 | + return PMIX_SUCCESS; |
| 169 | + } else if (rc < 0) { |
| 170 | + if (pmix_socket_errno == EINTR) { |
| 171 | + goto retry; |
| 172 | + } else if (pmix_socket_errno == EAGAIN) { |
| 173 | + /* tell the caller to keep this message on active, |
| 174 | + * but let the event lib cycle so other messages |
| 175 | + * can progress while this socket is busy |
| 176 | + */ |
| 177 | + return PMIX_ERR_RESOURCE_BUSY; |
| 178 | + } else if (pmix_socket_errno == EWOULDBLOCK) { |
| 179 | + /* tell the caller to keep this message on active, |
| 180 | + * but let the event lib cycle so other messages |
| 181 | + * can progress while this socket is busy |
| 182 | + */ |
| 183 | + return PMIX_ERR_WOULD_BLOCK; |
| 184 | + } else { |
170 | 185 | /* we hit an error and cannot progress this message */
|
171 |
| - pmix_output(0, "pmix_ptl_base_msg_send_bytes: write failed: %s (%d) [sd = %d]", |
| 186 | + pmix_output(0, "pmix_ptl_base: send_msg: write failed: %s (%d) [sd = %d]", |
172 | 187 | strerror(pmix_socket_errno),
|
173 | 188 | pmix_socket_errno, sd);
|
174 |
| - ret = PMIX_ERR_UNREACH; |
175 |
| - goto exit; |
| 189 | + return PMIX_ERR_UNREACH; |
176 | 190 | }
|
177 |
| - /* update location */ |
178 |
| - (*remain) -= rc; |
179 |
| - ptr += rc; |
| 191 | + } else { |
| 192 | + /* short writev. This usually means the kernel buffer is full, |
| 193 | + * so there is no point for retrying at that time. |
| 194 | + * simply update the msg and return with PMIX_ERR_RESOURCE_BUSY */ |
| 195 | + if ((size_t)rc < msg->sdbytes) { |
| 196 | + /* partial write of the header or the msg data */ |
| 197 | + msg->sdptr = (char *)msg->sdptr + rc; |
| 198 | + msg->sdbytes -= rc; |
| 199 | + } else { |
| 200 | + /* header was fully written, but only a part of the msg data was written */ |
| 201 | + msg->hdr_sent = true; |
| 202 | + rc -= msg->sdbytes; |
| 203 | + msg->sdptr = (char *)msg->data->base_ptr + rc; |
| 204 | + msg->sdbytes = ntohl(msg->hdr.nbytes) - rc; |
| 205 | + } |
| 206 | + return PMIX_ERR_RESOURCE_BUSY; |
180 | 207 | }
|
181 |
| - /* we sent the full data block */ |
182 |
| -exit: |
183 |
| - *buf = ptr; |
184 |
| - return ret; |
185 | 208 | }
|
186 | 209 |
|
187 | 210 | static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
|
@@ -253,72 +276,30 @@ void pmix_ptl_base_send_handler(int sd, short flags, void *cbdata)
|
253 | 276 | (NULL == msg) ? "NULL" : "NON-NULL");
|
254 | 277 |
|
255 | 278 | if (NULL != msg) {
|
256 |
| - if (!msg->hdr_sent) { |
| 279 | + pmix_output_verbose(2, pmix_globals.debug_output, |
| 280 | + "ptl:base:send_handler SENDING MSG"); |
| 281 | + if (PMIX_SUCCESS == (rc = send_msg(peer->sd, msg))) { |
| 282 | + // message is complete |
257 | 283 | pmix_output_verbose(2, pmix_globals.debug_output,
|
258 |
| - "ptl:base:send_handler SENDING HEADER"); |
259 |
| - if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) { |
260 |
| - /* header is completely sent */ |
261 |
| - pmix_output_verbose(2, pmix_globals.debug_output, |
262 |
| - "ptl:base:send_handler HEADER SENT"); |
263 |
| - msg->hdr_sent = true; |
264 |
| - /* setup to send the data */ |
265 |
| - if (NULL == msg->data) { |
266 |
| - /* this was a zero-byte msg - nothing more to do */ |
267 |
| - PMIX_RELEASE(msg); |
268 |
| - peer->send_msg = NULL; |
269 |
| - goto next; |
270 |
| - } else { |
271 |
| - /* send the data as a single block */ |
272 |
| - msg->sdptr = msg->data->base_ptr; |
273 |
| - msg->sdbytes = ntohl(msg->hdr.nbytes); |
274 |
| - } |
275 |
| - /* fall thru and let the send progress */ |
276 |
| - } else if (PMIX_ERR_RESOURCE_BUSY == rc || |
277 |
| - PMIX_ERR_WOULD_BLOCK == rc) { |
278 |
| - /* exit this event and let the event lib progress */ |
279 |
| - pmix_output_verbose(2, pmix_globals.debug_output, |
280 |
| - "ptl:base:send_handler RES BUSY OR WOULD BLOCK"); |
281 |
| - return; |
282 |
| - } else { |
283 |
| - // report the error |
284 |
| - event_del(&peer->send_event); |
285 |
| - peer->send_ev_active = false; |
286 |
| - PMIX_RELEASE(msg); |
287 |
| - peer->send_msg = NULL; |
288 |
| - lost_connection(peer, rc); |
289 |
| - return; |
290 |
| - } |
291 |
| - } |
292 |
| - |
293 |
| - if (msg->hdr_sent) { |
| 284 | + "ptl:base:send_handler MSG SENT"); |
| 285 | + PMIX_RELEASE(msg); |
| 286 | + peer->send_msg = NULL; |
| 287 | + } else if (PMIX_ERR_RESOURCE_BUSY == rc || |
| 288 | + PMIX_ERR_WOULD_BLOCK == rc) { |
| 289 | + /* exit this event and let the event lib progress */ |
294 | 290 | pmix_output_verbose(2, pmix_globals.debug_output,
|
295 |
| - "ptl:base:send_handler SENDING BODY OF MSG"); |
296 |
| - if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) { |
297 |
| - // message is complete |
298 |
| - pmix_output_verbose(2, pmix_globals.debug_output, |
299 |
| - "ptl:base:send_handler BODY SENT"); |
300 |
| - PMIX_RELEASE(msg); |
301 |
| - peer->send_msg = NULL; |
302 |
| - } else if (PMIX_ERR_RESOURCE_BUSY == rc || |
303 |
| - PMIX_ERR_WOULD_BLOCK == rc) { |
304 |
| - /* exit this event and let the event lib progress */ |
305 |
| - pmix_output_verbose(2, pmix_globals.debug_output, |
306 |
| - "ptl:base:send_handler RES BUSY OR WOULD BLOCK"); |
307 |
| - return; |
308 |
| - } else { |
309 |
| - // report the error |
310 |
| - pmix_output(0, "ptl:base:peer_send_handler: unable to send message ON SOCKET %d", |
311 |
| - peer->sd); |
312 |
| - event_del(&peer->send_event); |
313 |
| - peer->send_ev_active = false; |
314 |
| - PMIX_RELEASE(msg); |
315 |
| - peer->send_msg = NULL; |
316 |
| - lost_connection(peer, rc); |
317 |
| - return; |
318 |
| - } |
| 291 | + "ptl:base:send_handler RES BUSY OR WOULD BLOCK"); |
| 292 | + return; |
| 293 | + } else { |
| 294 | + // report the error |
| 295 | + event_del(&peer->send_event); |
| 296 | + peer->send_ev_active = false; |
| 297 | + PMIX_RELEASE(msg); |
| 298 | + peer->send_msg = NULL; |
| 299 | + lost_connection(peer, rc); |
| 300 | + return; |
319 | 301 | }
|
320 | 302 |
|
321 |
| - next: |
322 | 303 | /* if current message completed - progress any pending sends by
|
323 | 304 | * moving the next in the queue into the "on-deck" position. Note
|
324 | 305 | * that this doesn't mean we send the message right now - we will
|
|
0 commit comments