@@ -292,6 +292,18 @@ ClientConnection *ClientConnection_init(PlasmaManagerState *state,
292292 */
293293void ClientConnection_free (ClientConnection *client_conn);
294294
295+ void ClientConnection_start_request (ClientConnection *client_conn) {
296+ client_conn->cursor = 0 ;
297+ }
298+
299+ void ClientConnection_finish_request (ClientConnection *client_conn) {
300+ client_conn->cursor = -1 ;
301+ }
302+
303+ bool ClientConnection_request_finished (ClientConnection *client_conn) {
304+ return client_conn->cursor == -1 ;
305+ }
306+
295307std::unordered_map<ObjectID, std::vector<WaitRequest *>, UniqueIDHasher> &
296308object_wait_requests_from_type (PlasmaManagerState *manager_state, int type) {
297309 /* We use different types of hash tables for different requests. */
@@ -540,27 +552,21 @@ int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
540552 s = RayConfig::instance ().buf_size ();
541553 r = write (conn->fd , buf->data + conn->cursor , s);
542554
543- if (r != s) {
544- LOG_ERROR (" write failed, errno was %d" , errno);
545- if (r > 0 ) {
546- LOG_ERROR (" partial write on fd %d" , conn->fd );
547- } else {
548- return errno;
549- }
555+ int err;
556+ if (r <= 0 ) {
557+ LOG_ERROR (" Write error" );
558+ err = errno;
550559 } else {
551560 conn->cursor += r;
561+ CHECK (conn->cursor <= buf->data_size + buf->metadata_size );
562+ /* If we've finished writing this buffer, reset the cursor. */
563+ if (conn->cursor == buf->data_size + buf->metadata_size ) {
564+ LOG_DEBUG (" writing on channel %d finished" , conn->fd );
565+ ClientConnection_finish_request (conn);
566+ }
567+ err = 0 ;
552568 }
553- if (r == 0 ) {
554- /* If we've finished writing this buffer, reset the cursor to zero. */
555- LOG_DEBUG (" writing on channel %d finished" , conn->fd );
556- conn->cursor = 0 ;
557- /* We are done sending the object, so release it. The corresponding call to
558- * plasma_get occurred in process_transfer_request. */
559- ARROW_CHECK_OK (conn->manager_state ->plasma_conn ->Release (
560- buf->object_id .to_plasma_id ()));
561- }
562-
563- return 0 ;
569+ return err;
564570}
565571
566572void send_queued_request (event_loop *loop,
@@ -589,13 +595,14 @@ void send_queued_request(event_loop *loop,
589595 break ;
590596 case MessageType_PlasmaDataReply:
591597 LOG_DEBUG (" Transferring object to manager" );
592- if (conn-> cursor == 0 ) {
593- /* If the cursor is zero , we haven't sent any requests for this object
598+ if (ClientConnection_request_finished ( conn) ) {
599+ /* If the cursor is not set , we haven't sent any requests for this object
594600 * yet, so send the initial data request. */
595601 err = handle_sigpipe (
596602 plasma::SendDataReply (conn->fd , buf->object_id .to_plasma_id (),
597603 buf->data_size , buf->metadata_size ),
598604 conn->fd );
605+ ClientConnection_start_request (conn);
599606 }
600607 if (err == 0 ) {
601608 err = write_object_chunk (conn, buf);
@@ -605,24 +612,25 @@ void send_queued_request(event_loop *loop,
605612 LOG_FATAL (" Buffered request has unknown type." );
606613 }
607614
608- /* If there was a SIGPIPE , stop sending to this manager. */
615+ /* If the other side hung up , stop sending to this manager. */
609616 if (err != 0 ) {
610- /* If there was an ECONNRESET, this means that we haven't finished
611- * connecting to this manager yet. Resend the request when the socket is
612- * ready for a write again. */
613- if (err == ECONNRESET) {
614- return ;
617+ if (buf->type == MessageType_PlasmaDataReply) {
618+ /* We errored while sending the object, so release it before removing the
619+ * connection. The corresponding call to plasma_get occurred in
620+ * process_transfer_request. */
621+ ARROW_CHECK_OK (conn->manager_state ->plasma_conn ->Release (
622+ buf->object_id .to_plasma_id ()));
615623 }
616624 event_loop_remove_file (loop, conn->fd );
617625 ClientConnection_free (conn);
618- return ;
619- }
620-
621- /* If we are done sending this request, remove it from the transfer queue. */
622- if (conn->cursor == 0 ) {
626+ } else if (ClientConnection_request_finished (conn)) {
627+ /* If we are done with this request, remove it from the transfer queue. */
623628 if (buf->type == MessageType_PlasmaDataReply) {
624- /* If we just finished sending an object to a remote manager, then remove
625- * the object from the hash table of pending transfer requests. */
629+ /* We are done sending the object, so release it. The corresponding call
630+ * to plasma_get occurred in process_transfer_request. */
631+ ARROW_CHECK_OK (conn->manager_state ->plasma_conn ->Release (
632+ buf->object_id .to_plasma_id ()));
633+ /* Remove the object from the hash table of pending transfer requests. */
626634 conn->pending_object_transfers .erase (buf->object_id );
627635 }
628636 conn->transfer_queue .pop_front ();
@@ -642,21 +650,21 @@ int read_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) {
642650 }
643651 r = read (conn->fd , buf->data + conn->cursor , s);
644652
645- if (r == - 1 ) {
646- LOG_ERROR ( " read error " );
647- } else if (r == 0 ) {
648- LOG_DEBUG ( " end of file " ) ;
653+ int err;
654+ if (r <= 0 ) {
655+ LOG_ERROR ( " Read error " );
656+ err = errno ;
649657 } else {
650658 conn->cursor += r;
659+ CHECK (conn->cursor <= buf->data_size + buf->metadata_size );
660+ /* If the cursor is equal to the full object size, reset the cursor and
661+ * we're done. */
662+ if (conn->cursor == buf->data_size + buf->metadata_size ) {
663+ ClientConnection_finish_request (conn);
664+ }
665+ err = 0 ;
651666 }
652- /* If the cursor is equal to the full object size, reset the cursor and we're
653- * done. */
654- if (conn->cursor == buf->data_size + buf->metadata_size ) {
655- conn->cursor = 0 ;
656- return 1 ;
657- } else {
658- return 0 ;
659- }
667+ return err;
660668}
661669
662670void process_data_chunk (event_loop *loop,
@@ -666,27 +674,37 @@ void process_data_chunk(event_loop *loop,
666674 /* Read the object chunk. */
667675 ClientConnection *conn = (ClientConnection *) context;
668676 PlasmaRequestBuffer *buf = conn->transfer_queue .front ();
669- int done = read_object_chunk (conn, buf);
670- if (!done) {
671- return ;
677+ int err = read_object_chunk (conn, buf);
678+ auto plasma_conn = conn->manager_state ->plasma_conn ;
679+ if (err != 0 ) {
680+ /* Abort the object that we were trying to read from the remote plasma
681+ * manager. */
682+ ARROW_CHECK_OK (plasma_conn->Release (buf->object_id .to_plasma_id ()));
683+ ARROW_CHECK_OK (plasma_conn->Abort (buf->object_id .to_plasma_id ()));
684+ /* Remove the bad connection. */
685+ event_loop_remove_file (loop, data_sock);
686+ ClientConnection_free (conn);
687+ } else if (ClientConnection_request_finished (conn)) {
688+ /* If we're done receiving the object, seal the object and release it. The
689+ * release corresponds to the call to plasma_create that occurred in
690+ * process_data_request. */
691+ LOG_DEBUG (" reading on channel %d finished" , data_sock);
692+ /* The following seal also triggers notification of clients for fetch or
693+ * wait requests, see process_object_notification. */
694+ ARROW_CHECK_OK (plasma_conn->Seal (buf->object_id .to_plasma_id ()));
695+ ARROW_CHECK_OK (plasma_conn->Release (buf->object_id .to_plasma_id ()));
696+ /* Remove the request buffer used for reading this object's data. */
697+ conn->transfer_queue .pop_front ();
698+ delete buf;
699+ /* Switch to listening for requests from this socket, instead of reading
700+ * object data. */
701+ event_loop_remove_file (loop, data_sock);
702+ bool success = event_loop_add_file (loop, data_sock, EVENT_LOOP_READ,
703+ process_message, conn);
704+ if (!success) {
705+ ClientConnection_free (conn);
706+ }
672707 }
673-
674- /* Seal the object and release it. The release corresponds to the call to
675- * plasma_create that occurred in process_data_request. */
676- LOG_DEBUG (" reading on channel %d finished" , data_sock);
677- /* The following seal also triggers notification of clients for fetch or
678- * wait requests, see process_object_notification. */
679- ARROW_CHECK_OK (
680- conn->manager_state ->plasma_conn ->Seal (buf->object_id .to_plasma_id ()));
681- ARROW_CHECK_OK (
682- conn->manager_state ->plasma_conn ->Release (buf->object_id .to_plasma_id ()));
683- /* Remove the request buffer used for reading this object's data. */
684- conn->transfer_queue .pop_front ();
685- delete buf;
686- /* Switch to listening for requests from this socket, instead of reading
687- * object data. */
688- event_loop_remove_file (loop, data_sock);
689- event_loop_add_file (loop, data_sock, EVENT_LOOP_READ, process_message, conn);
690708}
691709
692710void ignore_data_chunk (event_loop *loop,
@@ -698,17 +716,22 @@ void ignore_data_chunk(event_loop *loop,
698716 PlasmaRequestBuffer *buf = conn->ignore_buffer ;
699717
700718 /* Just read the transferred data into ignore_buf and then drop (free) it. */
701- int done = read_object_chunk (conn, buf);
702- if (!done) {
703- return ;
719+ int err = read_object_chunk (conn, buf);
720+ if (err != 0 ) {
721+ event_loop_remove_file (loop, data_sock);
722+ ClientConnection_free (conn);
723+ } else if (ClientConnection_request_finished (conn)) {
724+ free (buf->data );
725+ delete buf;
726+ /* Switch to listening for requests from this socket, instead of reading
727+ * object data. */
728+ event_loop_remove_file (loop, data_sock);
729+ bool success = event_loop_add_file (loop, data_sock, EVENT_LOOP_READ,
730+ process_message, conn);
731+ if (!success) {
732+ ClientConnection_free (conn);
733+ }
704734 }
705-
706- free (buf->data );
707- delete buf;
708- /* Switch to listening for requests from this socket, instead of reading
709- * object data. */
710- event_loop_remove_file (loop, data_sock);
711- event_loop_add_file (loop, data_sock, EVENT_LOOP_READ, process_message, conn);
712735}
713736
714737ClientConnection *get_manager_connection (PlasmaManagerState *state,
@@ -829,29 +852,29 @@ void process_data_request(event_loop *loop,
829852 * conn->transfer_queue. */
830853 conn->transfer_queue .push_back (buf);
831854 }
832- CHECK (conn->cursor == 0 );
855+ CHECK (ClientConnection_request_finished (conn));
856+ ClientConnection_start_request (conn);
833857
834858 /* Switch to reading the data from this socket, instead of listening for
835859 * other requests. */
836860 event_loop_remove_file (loop, client_sock);
861+ event_loop_file_handler data_chunk_handler;
837862 if (s.ok ()) {
838- bool success = event_loop_add_file (loop, client_sock, EVENT_LOOP_READ,
839- process_data_chunk, conn);
840- if (!success) {
841- ClientConnection_free (conn);
842- }
863+ data_chunk_handler = process_data_chunk;
843864 } else {
844865 /* Since plasma_create() has failed, we ignore the data transfer. We will
845866 * receive this transfer in g_ignore_buf and then drop it. Allocate memory
846867 * for data and metadata, if needed. All memory associated with
847868 * buf/g_ignore_buf will be freed in ignore_data_chunkc(). */
848869 conn->ignore_buffer = buf;
849870 buf->data = (uint8_t *) malloc (buf->data_size + buf->metadata_size );
850- bool success = event_loop_add_file (loop, client_sock, EVENT_LOOP_READ,
851- ignore_data_chunk, conn);
852- if (!success) {
853- ClientConnection_free (conn);
854- }
871+ data_chunk_handler = ignore_data_chunk;
872+ }
873+
874+ bool success = event_loop_add_file (loop, client_sock, EVENT_LOOP_READ,
875+ data_chunk_handler, conn);
876+ if (!success) {
877+ ClientConnection_free (conn);
855878 }
856879}
857880
@@ -1328,7 +1351,7 @@ ClientConnection *ClientConnection_init(PlasmaManagerState *state,
13281351 /* Create a new data connection context per client. */
13291352 ClientConnection *conn = new ClientConnection ();
13301353 conn->manager_state = state;
1331- conn-> cursor = 0 ;
1354+ ClientConnection_finish_request ( conn) ;
13321355 conn->fd = client_sock;
13331356 conn->num_return_objects = 0 ;
13341357
0 commit comments