Skip to content

Commit 2103f33

Browse files
committed
Send small blobs inline, initial version.
1 parent 65b8050 commit 2103f33

File tree

7 files changed

+482
-44
lines changed

7 files changed

+482
-44
lines changed

src/remote/client/interface.cpp

Lines changed: 85 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,6 +1312,8 @@ void Blob::getInfo(CheckStatusWrapper* status,
13121312
if (blob->rbl_info.getLocalInfo(itemsLength, items, bufferLength, buffer))
13131313
return;
13141314

1315+
fb_assert(!blob->isCached());
1316+
13151317
rem_port* port = rdb->rdb_port;
13161318
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);
13171319

@@ -1353,7 +1355,8 @@ void Blob::freeClientData(CheckStatusWrapper* status, bool force)
13531355

13541356
try
13551357
{
1356-
release_object(status, rdb, op_cancel_blob, blob->rbl_id);
1358+
if (!blob->isCached())
1359+
release_object(status, rdb, op_cancel_blob, blob->rbl_id);
13571360
}
13581361
catch (const Exception&)
13591362
{
@@ -1426,10 +1429,13 @@ void Blob::internalClose(CheckStatusWrapper* status)
14261429

14271430
if ((blob->rbl_flags & Rbl::CREATE) && blob->rbl_ptr != blob->rbl_buffer)
14281431
{
1432+
fb_assert(!blob->isCached());
1433+
14291434
send_blob(status, blob, 0, NULL);
14301435
}
14311436

1432-
release_object(status, rdb, op_close_blob, blob->rbl_id);
1437+
if (!blob->isCached())
1438+
release_object(status, rdb, op_close_blob, blob->rbl_id);
14331439
release_blob(blob);
14341440
blob = NULL;
14351441
}
@@ -1847,14 +1853,13 @@ IBlob* Attachment::createBlob(CheckStatusWrapper* status, ITransaction* apiTra,
18471853
p_blob->p_blob_bpb.cstr_address = NULL;
18481854

18491855
Rbl* blob = FB_NEW Rbl();
1850-
*blob_id = packet->p_resp.p_resp_blob_id;
1856+
blob->rbl_blob_id = *blob_id = packet->p_resp.p_resp_blob_id;
18511857
blob->rbl_rdb = rdb;
18521858
blob->rbl_rtr = transaction;
18531859
blob->rbl_id = packet->p_resp.p_resp_object;
18541860
blob->rbl_flags |= Rbl::CREATE;
18551861
SET_OBJECT(rdb, blob, blob->rbl_id);
1856-
blob->rbl_next = transaction->rtr_blobs;
1857-
transaction->rtr_blobs = blob;
1862+
transaction->rtr_blobs.add(blob);
18581863

18591864
IBlob* b = FB_NEW Blob(blob);
18601865
b->addRef();
@@ -3578,8 +3583,21 @@ ITransaction* Statement::execute(CheckStatusWrapper* status, ITransaction* apiTr
35783583
if (out_msg_length)
35793584
port->port_statement->rsr_message->msg_address = out_msg;
35803585

3586+
// Prepare to receive inline blobs
3587+
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
3588+
UCHAR blobInfo[64];
3589+
3590+
UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);
3591+
35813592
receive_packet(port, packet);
35823593

3594+
while (packet->p_operation == op_inline_blob)
3595+
{
3596+
fb_assert(transaction);
3597+
transaction->setupInlineBlob(p_blob);
3598+
receive_packet(port, packet);
3599+
}
3600+
35833601
if (packet->p_operation != op_sql_response)
35843602
REMOTE_check_response(status, rdb, packet);
35853603
else
@@ -3940,8 +3958,21 @@ ITransaction* Attachment::execute(CheckStatusWrapper* status, ITransaction* apiT
39403958
if (in_msg_length || out_msg_length)
39413959
port->port_statement->rsr_message->msg_address = out_msg;
39423960

3961+
// Prepare to receive inline blobs
3962+
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
3963+
UCHAR blobInfo[64];
3964+
3965+
UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);
3966+
39433967
receive_packet(rdb->rdb_port, packet);
39443968

3969+
while (packet->p_operation == op_inline_blob)
3970+
{
3971+
fb_assert(transaction);
3972+
transaction->setupInlineBlob(p_blob);
3973+
receive_packet(port, packet);
3974+
}
3975+
39453976
if (packet->p_operation != op_sql_response)
39463977
REMOTE_check_response(status, rdb, packet);
39473978
else
@@ -5507,6 +5538,8 @@ int Blob::getSegment(CheckStatusWrapper* status, unsigned int bufferLength, void
55075538
break;
55085539
}
55095540

5541+
fb_assert(!blob->isCached());
5542+
55105543
// Preparatory to asking for more data, use input buffer length
55115544
// to cue more efficient blob buffering.
55125545

@@ -5668,6 +5701,20 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS
56685701
Rtr* transaction = remoteTransaction(apiTra);
56695702
CHECK_HANDLE(transaction, isc_bad_trans_handle);
56705703

5704+
if (transaction->rtr_blobs.locate(*id))
5705+
{
5706+
Rbl* blob = transaction->rtr_blobs.current();
5707+
5708+
if (!bpb_length)
5709+
{
5710+
Blob* iBlob = FB_NEW Blob(blob);
5711+
iBlob->addRef();
5712+
return iBlob;
5713+
}
5714+
5715+
release_blob(blob);
5716+
}
5717+
56715718
// Validate data length
56725719

56735720
CHECK_LENGTH(port, bpb_length);
@@ -5769,10 +5816,10 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS
57695816
Rbl* blob = FB_NEW Rbl;
57705817
blob->rbl_rdb = rdb;
57715818
blob->rbl_rtr = transaction;
5819+
blob->rbl_blob_id = *id;
57725820
blob->rbl_id = packet->p_resp.p_resp_object;
57735821
SET_OBJECT(rdb, blob, blob->rbl_id);
5774-
blob->rbl_next = transaction->rtr_blobs;
5775-
transaction->rtr_blobs = blob;
5822+
transaction->rtr_blobs.add(blob);
57765823

57775824
Blob* iBlob = FB_NEW Blob(blob);
57785825
iBlob->addRef();
@@ -6619,6 +6666,11 @@ int Blob::seek(CheckStatusWrapper* status, int mode, int offset)
66196666

66206667
CHECK_HANDLE(blob, isc_bad_segstr_handle);
66216668

6669+
if (blob->isCached())
6670+
{
6671+
Arg::Gds(isc_wish_list).raise();
6672+
}
6673+
66226674
Rdb* rdb = blob->rbl_rdb;
66236675
CHECK_HANDLE(rdb, isc_bad_db_handle);
66246676
rem_port* port = rdb->rdb_port;
@@ -7843,6 +7895,12 @@ static void batch_dsql_fetch(rem_port* port,
78437895
// Avoid damaging preallocated buffer for response data
78447896
UseStandardBuffer guard(packet->p_resp.p_resp_data);
78457897

7898+
// Prepare to receive inline blobs
7899+
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
7900+
UCHAR blobInfo[64];
7901+
7902+
UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);
7903+
78467904
statement->rsr_flags.set(Rsr::FETCHED);
78477905
while (true)
78487906
{
@@ -7878,6 +7936,17 @@ static void batch_dsql_fetch(rem_port* port,
78787936
throw;
78797937
}
78807938

7939+
if (packet->p_operation == op_inline_blob)
7940+
{
7941+
fb_assert(!statement->rsr_rtr || statement->rsr_rtr->rtr_id == p_blob->p_tran_id);
7942+
7943+
Rtr* transaction = statement->rsr_rtr ?
7944+
statement->rsr_rtr : port->port_objects[p_blob->p_tran_id];
7945+
7946+
transaction->setupInlineBlob(p_blob);
7947+
continue;
7948+
}
7949+
78817950
if (packet->p_operation != op_fetch_response)
78827951
{
78837952
statement->rsr_flags.set(Rsr::STREAM_ERR);
@@ -9217,16 +9286,12 @@ static void release_blob( Rbl* blob)
92179286
**************************************/
92189287
Rtr* transaction = blob->rbl_rtr;
92199288
Rdb* rdb = blob->rbl_rdb;
9220-
rdb->rdb_port->releaseObject(blob->rbl_id);
92219289

9222-
for (Rbl** p = &transaction->rtr_blobs; *p; p = &(*p)->rbl_next)
9223-
{
9224-
if (*p == blob)
9225-
{
9226-
*p = blob->rbl_next;
9227-
break;
9228-
}
9229-
}
9290+
if (!blob->isCached())
9291+
rdb->rdb_port->releaseObject(blob->rbl_id);
9292+
9293+
if (transaction->rtr_blobs.locate(blob->rbl_blob_id))
9294+
transaction->rtr_blobs.fastRemove();
92309295

92319296
delete blob;
92329297
}
@@ -9384,8 +9449,8 @@ static void release_transaction( Rtr* transaction)
93849449
Rdb* rdb = transaction->rtr_rdb;
93859450
rdb->rdb_port->releaseObject(transaction->rtr_id);
93869451

9387-
while (transaction->rtr_blobs)
9388-
release_blob(transaction->rtr_blobs);
9452+
while (transaction->rtr_blobs.getFirst())
9453+
release_blob(transaction->rtr_blobs.current());
93899454

93909455
for (Rtr** p = &rdb->rdb_transactions; *p; p = &(*p)->rtr_next)
93919456
{
@@ -9433,6 +9498,8 @@ static void send_blob(CheckStatusWrapper* status,
94339498
* Actually send blob data (which might be buffered)
94349499
*
94359500
**************************************/
9501+
fb_assert(!blob->isCached());
9502+
94369503
Rdb* rdb = blob->rbl_rdb;
94379504
PACKET* packet = &rdb->rdb_packet;
94389505
packet->p_operation = op_put_segment;

src/remote/parser.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,9 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)
161161

162162
ULONG net_length = 0;
163163
ULONG offset = 0;
164+
dsc* const begin = format->fmt_desc.begin();
164165

165-
for (dsc* desc = format->fmt_desc.begin(); count; --count, ++desc)
166+
for (dsc* desc = begin; count; --count, ++desc)
166167
{
167168
if (blr_length-- == 0)
168169
return NULL;
@@ -262,6 +263,8 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)
262263
desc->dsc_dtype = dtype_quad;
263264
desc->dsc_length = sizeof(SLONG) * 2;
264265
desc->dsc_scale = *blr++;
266+
267+
format->fmt_blob_idx.add(desc - begin);
265268
break;
266269

267270
case blr_float:
@@ -312,6 +315,8 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)
312315
USHORT textType = *blr++;
313316
textType += (*blr++) << 8;
314317
desc->setTextType(textType);
318+
319+
format->fmt_blob_idx.add(desc - begin);
315320
}
316321
break;
317322

src/remote/protocol.cpp

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ static bool_t xdr_trrq_blr(RemoteXdr*, CSTRING*);
120120
static bool_t xdr_trrq_message(RemoteXdr*, USHORT);
121121
static bool_t xdr_bytes(RemoteXdr*, void*, ULONG);
122122
static bool_t xdr_blob_stream(RemoteXdr*, SSHORT, CSTRING*);
123+
static bool_t xdr_blobBuffer(RemoteXdr* xdrs, RemBlobBuffer* buff);
123124
static Rsr* getStatement(RemoteXdr*, USHORT);
125+
static Rtr* getTransaction(RemoteXdr*, USHORT);
124126

125127

126128
inline void fixupLength(const RemoteXdr* xdrs, ULONG& length)
@@ -1138,6 +1140,42 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
11381140
return P_TRUE(xdrs, p);
11391141
}
11401142

1143+
case op_inline_blob:
1144+
{
1145+
P_INLINE_BLOB* p_blob = &p->p_inline_blob;
1146+
MAP(xdr_short, reinterpret_cast<SSHORT&>(p_blob->p_tran_id));
1147+
MAP(xdr_quad, p_blob->p_blob_id);
1148+
1149+
if (xdrs->x_op == XDR_ENCODE)
1150+
{
1151+
MAP(xdr_response, p_blob->p_blob_info);
1152+
if (!xdr_blobBuffer(xdrs, p_blob->p_blob_data))
1153+
return P_FALSE(xdrs, p);
1154+
}
1155+
else if (xdrs->x_op == XDR_DECODE)
1156+
{
1157+
Rtr* tran = getTransaction(xdrs, p_blob->p_tran_id);
1158+
1159+
if (!tran)
1160+
return P_FALSE(xdrs, p);
1161+
1162+
MAP(xdr_response, p_blob->p_blob_info);
1163+
1164+
Rbl* blb = tran->createInlineBlob();
1165+
p_blob->p_blob_data = &blb->rbl_data;
1166+
1167+
if (!xdr_blobBuffer(xdrs, p_blob->p_blob_data))
1168+
{
1169+
tran->rtr_inline_blob = nullptr;
1170+
delete blb;
1171+
1172+
return P_FALSE(xdrs, p);
1173+
}
1174+
}
1175+
1176+
return P_TRUE(xdrs, p);
1177+
}
1178+
11411179
///case op_insert:
11421180
default:
11431181
#ifdef DEV_BUILD
@@ -2274,6 +2312,23 @@ static Rsr* getStatement(RemoteXdr* xdrs, USHORT statement_id)
22742312
return port->port_statement;
22752313
}
22762314

2315+
static Rtr* getTransaction(RemoteXdr* xdrs, USHORT tran_id)
2316+
{
2317+
rem_port* port = xdrs->x_public;
2318+
2319+
if (tran_id >= port->port_objects.getCount())
2320+
return nullptr;
2321+
2322+
try
2323+
{
2324+
return port->port_objects[tran_id];
2325+
}
2326+
catch (const status_exception&)
2327+
{
2328+
return nullptr;
2329+
}
2330+
}
2331+
22772332
static bool_t xdr_blob_stream(RemoteXdr* xdrs, SSHORT statement_id, CSTRING* strmPortion)
22782333
{
22792334
if (xdrs->x_op == XDR_FREE)
@@ -2492,3 +2547,53 @@ static bool_t xdr_blob_stream(RemoteXdr* xdrs, SSHORT statement_id, CSTRING* str
24922547

24932548
return TRUE;
24942549
}
2550+
2551+
static bool_t xdr_blobBuffer(RemoteXdr* xdrs, RemBlobBuffer* buff)
2552+
{
2553+
SLONG len;
2554+
UCHAR* data;
2555+
static const SCHAR filler[4] = { 0, 0, 0, 0 };
2556+
2557+
switch (xdrs->x_op)
2558+
{
2559+
case XDR_ENCODE:
2560+
len = buff->getCount();
2561+
if (!xdr_long(xdrs, &len))
2562+
return FALSE;
2563+
2564+
data = buff->begin();
2565+
if (len && !xdrs->x_putbytes(reinterpret_cast<const SCHAR*>(data), len))
2566+
return FALSE;
2567+
2568+
len = (4 - len) & 3;
2569+
if (len)
2570+
return xdrs->x_putbytes(filler, len);
2571+
2572+
return TRUE;
2573+
2574+
case XDR_DECODE:
2575+
if (!xdr_long(xdrs, &len))
2576+
return FALSE;
2577+
2578+
if (len)
2579+
{
2580+
data = buff->getBuffer(len);
2581+
if (!xdrs->x_getbytes(reinterpret_cast<SCHAR*>(data), len))
2582+
return FALSE;
2583+
}
2584+
2585+
len = (4 - len) & 3;
2586+
if (len)
2587+
{
2588+
SCHAR trash[4];
2589+
return xdrs->x_getbytes(trash, len);
2590+
}
2591+
2592+
return TRUE;
2593+
2594+
case XDR_FREE:
2595+
return TRUE;
2596+
}
2597+
2598+
return FALSE;
2599+
}

0 commit comments

Comments
 (0)