Skip to content
103 changes: 85 additions & 18 deletions src/remote/client/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,8 @@ void Blob::getInfo(CheckStatusWrapper* status,
if (blob->rbl_info.getLocalInfo(itemsLength, items, bufferLength, buffer))
return;

fb_assert(!blob->isCached());

rem_port* port = rdb->rdb_port;
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);

Expand Down Expand Up @@ -1353,7 +1355,8 @@ void Blob::freeClientData(CheckStatusWrapper* status, bool force)

try
{
release_object(status, rdb, op_cancel_blob, blob->rbl_id);
if (!blob->isCached())
release_object(status, rdb, op_cancel_blob, blob->rbl_id);
}
catch (const Exception&)
{
Expand Down Expand Up @@ -1426,10 +1429,13 @@ void Blob::internalClose(CheckStatusWrapper* status)

if ((blob->rbl_flags & Rbl::CREATE) && blob->rbl_ptr != blob->rbl_buffer)
{
fb_assert(!blob->isCached());

send_blob(status, blob, 0, NULL);
}

release_object(status, rdb, op_close_blob, blob->rbl_id);
if (!blob->isCached())
release_object(status, rdb, op_close_blob, blob->rbl_id);
release_blob(blob);
blob = NULL;
}
Expand Down Expand Up @@ -1847,14 +1853,13 @@ IBlob* Attachment::createBlob(CheckStatusWrapper* status, ITransaction* apiTra,
p_blob->p_blob_bpb.cstr_address = NULL;

Rbl* blob = FB_NEW Rbl();
*blob_id = packet->p_resp.p_resp_blob_id;
blob->rbl_blob_id = *blob_id = packet->p_resp.p_resp_blob_id;
blob->rbl_rdb = rdb;
blob->rbl_rtr = transaction;
blob->rbl_id = packet->p_resp.p_resp_object;
blob->rbl_flags |= Rbl::CREATE;
SET_OBJECT(rdb, blob, blob->rbl_id);
blob->rbl_next = transaction->rtr_blobs;
transaction->rtr_blobs = blob;
transaction->rtr_blobs.add(blob);

IBlob* b = FB_NEW Blob(blob);
b->addRef();
Expand Down Expand Up @@ -3578,8 +3583,21 @@ ITransaction* Statement::execute(CheckStatusWrapper* status, ITransaction* apiTr
if (out_msg_length)
port->port_statement->rsr_message->msg_address = out_msg;

// Prepare to receive inline blobs
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
UCHAR blobInfo[64];

UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);

receive_packet(port, packet);

while (packet->p_operation == op_inline_blob)
{
fb_assert(transaction);
transaction->setupInlineBlob(p_blob);
receive_packet(port, packet);
}

if (packet->p_operation != op_sql_response)
REMOTE_check_response(status, rdb, packet);
else
Expand Down Expand Up @@ -3940,8 +3958,21 @@ ITransaction* Attachment::execute(CheckStatusWrapper* status, ITransaction* apiT
if (in_msg_length || out_msg_length)
port->port_statement->rsr_message->msg_address = out_msg;

// Prepare to receive inline blobs
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
UCHAR blobInfo[64];

UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);

receive_packet(rdb->rdb_port, packet);

while (packet->p_operation == op_inline_blob)
{
fb_assert(transaction);
transaction->setupInlineBlob(p_blob);
receive_packet(port, packet);
}

if (packet->p_operation != op_sql_response)
REMOTE_check_response(status, rdb, packet);
else
Expand Down Expand Up @@ -5507,6 +5538,8 @@ int Blob::getSegment(CheckStatusWrapper* status, unsigned int bufferLength, void
break;
}

fb_assert(!blob->isCached());

// Preparatory to asking for more data, use input buffer length
// to cue more efficient blob buffering.

Expand Down Expand Up @@ -5668,6 +5701,20 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS
Rtr* transaction = remoteTransaction(apiTra);
CHECK_HANDLE(transaction, isc_bad_trans_handle);

if (transaction->rtr_blobs.locate(*id))
{
Rbl* blob = transaction->rtr_blobs.current();

if (!bpb_length)
{
Blob* iBlob = FB_NEW Blob(blob);
iBlob->addRef();
return iBlob;
}

release_blob(blob);
}

// Validate data length

CHECK_LENGTH(port, bpb_length);
Expand Down Expand Up @@ -5769,10 +5816,10 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS
Rbl* blob = FB_NEW Rbl;
blob->rbl_rdb = rdb;
blob->rbl_rtr = transaction;
blob->rbl_blob_id = *id;
blob->rbl_id = packet->p_resp.p_resp_object;
SET_OBJECT(rdb, blob, blob->rbl_id);
blob->rbl_next = transaction->rtr_blobs;
transaction->rtr_blobs = blob;
transaction->rtr_blobs.add(blob);

Blob* iBlob = FB_NEW Blob(blob);
iBlob->addRef();
Expand Down Expand Up @@ -6619,6 +6666,11 @@ int Blob::seek(CheckStatusWrapper* status, int mode, int offset)

CHECK_HANDLE(blob, isc_bad_segstr_handle);

if (blob->isCached())
{
Arg::Gds(isc_wish_list).raise();
}

Rdb* rdb = blob->rbl_rdb;
CHECK_HANDLE(rdb, isc_bad_db_handle);
rem_port* port = rdb->rdb_port;
Expand Down Expand Up @@ -7843,6 +7895,12 @@ static void batch_dsql_fetch(rem_port* port,
// Avoid damaging preallocated buffer for response data
UseStandardBuffer guard(packet->p_resp.p_resp_data);

// Prepare to receive inline blobs
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
UCHAR blobInfo[64];

UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);

statement->rsr_flags.set(Rsr::FETCHED);
while (true)
{
Expand Down Expand Up @@ -7878,6 +7936,17 @@ static void batch_dsql_fetch(rem_port* port,
throw;
}

if (packet->p_operation == op_inline_blob)
{
fb_assert(!statement->rsr_rtr || statement->rsr_rtr->rtr_id == p_blob->p_tran_id);

Rtr* transaction = statement->rsr_rtr ?
statement->rsr_rtr : port->port_objects[p_blob->p_tran_id];

transaction->setupInlineBlob(p_blob);
continue;
}

if (packet->p_operation != op_fetch_response)
{
statement->rsr_flags.set(Rsr::STREAM_ERR);
Expand Down Expand Up @@ -9217,16 +9286,12 @@ static void release_blob( Rbl* blob)
**************************************/
Rtr* transaction = blob->rbl_rtr;
Rdb* rdb = blob->rbl_rdb;
rdb->rdb_port->releaseObject(blob->rbl_id);

for (Rbl** p = &transaction->rtr_blobs; *p; p = &(*p)->rbl_next)
{
if (*p == blob)
{
*p = blob->rbl_next;
break;
}
}
if (!blob->isCached())
rdb->rdb_port->releaseObject(blob->rbl_id);

if (transaction->rtr_blobs.locate(blob->rbl_blob_id))
transaction->rtr_blobs.fastRemove();

delete blob;
}
Expand Down Expand Up @@ -9384,8 +9449,8 @@ static void release_transaction( Rtr* transaction)
Rdb* rdb = transaction->rtr_rdb;
rdb->rdb_port->releaseObject(transaction->rtr_id);

while (transaction->rtr_blobs)
release_blob(transaction->rtr_blobs);
while (transaction->rtr_blobs.getFirst())
release_blob(transaction->rtr_blobs.current());

for (Rtr** p = &rdb->rdb_transactions; *p; p = &(*p)->rtr_next)
{
Expand Down Expand Up @@ -9433,6 +9498,8 @@ static void send_blob(CheckStatusWrapper* status,
* Actually send blob data (which might be buffered)
*
**************************************/
fb_assert(!blob->isCached());

Rdb* rdb = blob->rbl_rdb;
PACKET* packet = &rdb->rdb_packet;
packet->p_operation = op_put_segment;
Expand Down
7 changes: 6 additions & 1 deletion src/remote/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)

ULONG net_length = 0;
ULONG offset = 0;
dsc* const begin = format->fmt_desc.begin();

for (dsc* desc = format->fmt_desc.begin(); count; --count, ++desc)
for (dsc* desc = begin; count; --count, ++desc)
{
if (blr_length-- == 0)
return NULL;
Expand Down Expand Up @@ -262,6 +263,8 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)
desc->dsc_dtype = dtype_quad;
desc->dsc_length = sizeof(SLONG) * 2;
desc->dsc_scale = *blr++;

format->fmt_blob_idx.add(desc - begin);
break;

case blr_float:
Expand Down Expand Up @@ -312,6 +315,8 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)
USHORT textType = *blr++;
textType += (*blr++) << 8;
desc->setTextType(textType);

format->fmt_blob_idx.add(desc - begin);
}
break;

Expand Down
105 changes: 105 additions & 0 deletions src/remote/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ static bool_t xdr_trrq_blr(RemoteXdr*, CSTRING*);
static bool_t xdr_trrq_message(RemoteXdr*, USHORT);
static bool_t xdr_bytes(RemoteXdr*, void*, ULONG);
static bool_t xdr_blob_stream(RemoteXdr*, SSHORT, CSTRING*);
static bool_t xdr_blobBuffer(RemoteXdr* xdrs, RemBlobBuffer* buff);
static Rsr* getStatement(RemoteXdr*, USHORT);
static Rtr* getTransaction(RemoteXdr*, USHORT);


inline void fixupLength(const RemoteXdr* xdrs, ULONG& length)
Expand Down Expand Up @@ -1138,6 +1140,42 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
return P_TRUE(xdrs, p);
}

case op_inline_blob:
{
P_INLINE_BLOB* p_blob = &p->p_inline_blob;
MAP(xdr_short, reinterpret_cast<SSHORT&>(p_blob->p_tran_id));
MAP(xdr_quad, p_blob->p_blob_id);

if (xdrs->x_op == XDR_ENCODE)
{
MAP(xdr_response, p_blob->p_blob_info);
if (!xdr_blobBuffer(xdrs, p_blob->p_blob_data))
return P_FALSE(xdrs, p);
}
else if (xdrs->x_op == XDR_DECODE)
{
Rtr* tran = getTransaction(xdrs, p_blob->p_tran_id);

if (!tran)
return P_FALSE(xdrs, p);

MAP(xdr_response, p_blob->p_blob_info);

Rbl* blb = tran->createInlineBlob();
p_blob->p_blob_data = &blb->rbl_data;

if (!xdr_blobBuffer(xdrs, p_blob->p_blob_data))
{
tran->rtr_inline_blob = nullptr;
delete blb;

return P_FALSE(xdrs, p);
}
}

return P_TRUE(xdrs, p);
}

///case op_insert:
default:
#ifdef DEV_BUILD
Expand Down Expand Up @@ -2274,6 +2312,23 @@ static Rsr* getStatement(RemoteXdr* xdrs, USHORT statement_id)
return port->port_statement;
}

static Rtr* getTransaction(RemoteXdr* xdrs, USHORT tran_id)
{
rem_port* port = xdrs->x_public;

if (tran_id >= port->port_objects.getCount())
return nullptr;

try
{
return port->port_objects[tran_id];
}
catch (const status_exception&)
{
return nullptr;
}
}

static bool_t xdr_blob_stream(RemoteXdr* xdrs, SSHORT statement_id, CSTRING* strmPortion)
{
if (xdrs->x_op == XDR_FREE)
Expand Down Expand Up @@ -2492,3 +2547,53 @@ static bool_t xdr_blob_stream(RemoteXdr* xdrs, SSHORT statement_id, CSTRING* str

return TRUE;
}

static bool_t xdr_blobBuffer(RemoteXdr* xdrs, RemBlobBuffer* buff)
{
SLONG len;
UCHAR* data;
static const SCHAR filler[4] = { 0, 0, 0, 0 };

switch (xdrs->x_op)
{
case XDR_ENCODE:
len = buff->getCount();
if (!xdr_long(xdrs, &len))
return FALSE;

data = buff->begin();
if (len && !xdrs->x_putbytes(reinterpret_cast<const SCHAR*>(data), len))
return FALSE;

len = (4 - len) & 3;
if (len)
return xdrs->x_putbytes(filler, len);

return TRUE;

case XDR_DECODE:
if (!xdr_long(xdrs, &len))
return FALSE;

if (len)
{
data = buff->getBuffer(len);
if (!xdrs->x_getbytes(reinterpret_cast<SCHAR*>(data), len))
return FALSE;
}

len = (4 - len) & 3;
if (len)
{
SCHAR trash[4];
return xdrs->x_getbytes(trash, len);
}

return TRUE;

case XDR_FREE:
return TRUE;
}

return FALSE;
}
Loading
Loading