Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
import io.grpc.Detachable;
import io.grpc.Drainable;
import io.grpc.HasByteBuffer;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.protobuf.ProtoUtils;
import io.netty.buffer.ByteBuf;
Expand All @@ -41,11 +43,12 @@
import java.util.Collections;
import java.util.List;
import org.apache.arrow.flight.grpc.AddWritableBuffer;
import org.apache.arrow.flight.grpc.GetReadableBuffer;
import org.apache.arrow.flight.impl.Flight.FlightData;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ForeignAllocation;
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
Expand All @@ -55,10 +58,14 @@
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.MetadataVersion;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The in-memory representation of FlightData used to manage a stream of Arrow messages. */
class ArrowMessage implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(ArrowMessage.class);

// If true, deserialize Arrow data by giving Arrow a reference to the underlying gRPC buffer
// instead of copying the data. Defaults to true.
public static final boolean ENABLE_ZERO_COPY_READ;
Expand Down Expand Up @@ -312,8 +319,7 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
case APP_METADATA_TAG:
{
int size = readRawVarint32(stream);
appMetadata = allocator.buffer(size);
GetReadableBuffer.readIntoBuffer(stream, appMetadata, size, ENABLE_ZERO_COPY_READ);
appMetadata = readBuffer(allocator, stream, size);
break;
}
case BODY_TAG:
Expand All @@ -323,8 +329,7 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
body = null;
}
int size = readRawVarint32(stream);
body = allocator.buffer(size);
GetReadableBuffer.readIntoBuffer(stream, body, size, ENABLE_ZERO_COPY_READ);
body = readBuffer(allocator, stream, size);
break;

default:
Expand Down Expand Up @@ -377,6 +382,121 @@ private static int readRawVarint32(int firstByte, InputStream is) throws IOExcep
return CodedInputStream.readRawVarint32(firstByte, is);
}

/**
 * Reads {@code size} bytes from the stream into an ArrowBuf, without copying data when possible.
 *
 * <p>When {@code ENABLE_ZERO_COPY_READ} is set, first asks {@link #wrapGrpcBuffer} to hand the
 * gRPC buffer over to Arrow. If that returns a buffer, it is returned as-is and no copy is made.
 *
 * <p>Otherwise (zero-copy disabled, or {@link #wrapGrpcBuffer} returned {@code null}), a new
 * buffer is allocated from {@code allocator} and the bytes are copied into it through a temporary
 * heap array. If the read or the copy fails, the freshly allocated buffer is released before the
 * failure is propagated, so a truncated or broken stream does not leak Arrow memory.
 *
 * @param allocator The allocator to use for buffer allocation
 * @param stream The input stream to read from
 * @param size The number of bytes to read
 * @return An ArrowBuf containing the data, with its writer index set to {@code size}; the caller
 *     owns it and is responsible for closing it
 * @throws IOException if there is an error reading from the stream, including reaching the end of
 *     the stream before {@code size} bytes were read
 */
private static ArrowBuf readBuffer(BufferAllocator allocator, InputStream stream, int size)
    throws IOException {
  if (ENABLE_ZERO_COPY_READ) {
    ArrowBuf zeroCopyBuf = wrapGrpcBuffer(stream, allocator, size);
    if (zeroCopyBuf != null) {
      return zeroCopyBuf;
    }
  }

  // Fall back to allocating and copying
  final ArrowBuf buf = allocator.buffer(size);
  try {
    final byte[] heapBytes = new byte[size];
    ByteStreams.readFully(stream, heapBytes);
    buf.writeBytes(heapBytes);
    buf.writerIndex(size);
    return buf;
  } catch (Throwable t) {
    // Nobody else holds a reference to buf yet; release it so a failed read does not leak.
    buf.close();
    throw t;
  }
}

/**
* Attempts to wrap gRPC's buffer as an ArrowBuf without copying.
*
* <p>This method takes ownership of gRPC's underlying buffer via {@link Detachable#detach()} and
* wraps it as an ArrowBuf using {@link BufferAllocator#wrapForeignAllocation}. The gRPC buffer
* will be released when the ArrowBuf is closed.
*
* @param stream The gRPC-provided InputStream
* @param allocator The allocator to use for wrapping the foreign allocation
* @param size The number of bytes to wrap
* @return An ArrowBuf wrapping gRPC's buffer, or {@code null} if zero-copy is not possible
*/
static ArrowBuf wrapGrpcBuffer(
final InputStream stream, final BufferAllocator allocator, final int size) {

if (!(stream instanceof Detachable) || !(stream instanceof HasByteBuffer)) {
return null;
}

HasByteBuffer hasByteBuffer = (HasByteBuffer) stream;
if (!hasByteBuffer.byteBufferSupported()) {
return null;
}

ByteBuffer peekBuffer = hasByteBuffer.getByteBuffer();
if (peekBuffer == null) {
return null;
}
if (!peekBuffer.isDirect()) {
return null;
}
if (peekBuffer.remaining() < size) {
// Data is fragmented across multiple buffers; zero-copy not possible
return null;
}

// Take ownership
Detachable detachable = (Detachable) stream;
InputStream detachedStream = detachable.detach();

// Get buffer from detached stream
HasByteBuffer detachedHasByteBuffer = (HasByteBuffer) detachedStream;
ByteBuffer detachedByteBuffer = detachedHasByteBuffer.getByteBuffer();

if (detachedByteBuffer == null || !detachedByteBuffer.isDirect()) {
closeQuietly(detachedStream);
return null;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we detached the stream, if we give up here, aren't we erroneously going to discard the actual data? My understanding of Detachable is that the original stream is now invalid, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. Actually this is redundant, the isDirect is already checked at the beginning of the method so I just removed it


// Calculate memory address accounting for buffer position
long baseAddress = MemoryUtil.getByteBufferAddress(detachedByteBuffer);
long dataAddress = baseAddress + detachedByteBuffer.position();

// Create ForeignAllocation with proper cleanup
ForeignAllocation foreignAllocation =
new ForeignAllocation(size, dataAddress) {
@Override
protected void release0() {
closeQuietly(detachedStream);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we risk a race condition here where we close the stream (detached) before a write ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. The buffer can't be written after detaching

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's my point: the buffer can't be written, but nothing prevent to use the detached buffer to be written after the release. Let me recheck here.

}
};

try {
return allocator.wrapForeignAllocation(foreignAllocation);
} catch (Throwable t) {
// If it fails, clean up the detached stream and propagate
closeQuietly(detachedStream);
throw t;
}
}

/**
 * Closes the given stream, if any, without propagating an {@link IOException}.
 *
 * <p>A failure to close is only reported at debug level; a {@code null} stream is ignored.
 *
 * @param stream The stream to close, may be {@code null}
 */
private static void closeQuietly(InputStream stream) {
  if (stream == null) {
    return;
  }
  try {
    stream.close();
  } catch (IOException e) {
    LOG.debug("Error closing detached gRPC stream", e);
  }
}

/**
* Convert the ArrowMessage to an InputStream.
*
Expand Down

This file was deleted.

Loading
Loading