Skip to content
37 changes: 16 additions & 21 deletions src/dsql/DsqlBatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,6 @@ DsqlBatch* DsqlBatch::open(thread_db* tdbb, DsqlDmlRequest* req, IMessageMetadat

const auto statement = req->getDsqlStatement();

if (statement->getFlags() & DsqlStatement::FLAG_ORPHAN)
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-901) <<
Arg::Gds(isc_bad_req_handle));
}

switch (statement->getType())
{
case DsqlStatement::TYPE_INSERT:
Expand All @@ -229,7 +223,7 @@ DsqlBatch* DsqlBatch::open(thread_db* tdbb, DsqlDmlRequest* req, IMessageMetadat
}

const dsql_msg* message = statement->getSendMsg();
if (! (inMetadata && message && req->parseMetadata(inMetadata, message->msg_parameters)))
if (! (inMetadata && message && message->msg_parameter > 0))
{
ERRD_post(Arg::Gds(isc_sqlerr) << Arg::Num(-901) <<
Arg::Gds(isc_batch_param));
Expand Down Expand Up @@ -659,18 +653,23 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)
// execute request
m_dsqlRequest->req_transaction = transaction;
Request* req = m_dsqlRequest->getRequest();
DsqlStatement* dStmt = m_dsqlRequest->getDsqlStatement();
fb_assert(req);

// prepare completion interface
AutoPtr<BatchCompletionState, SimpleDispose> completionState
(FB_NEW BatchCompletionState(m_flags & (1 << IBatch::TAG_RECORD_COUNTS), m_detailed));
AutoSetRestore<bool> batchFlag(&req->req_batch_mode, true);
const dsql_msg* message = m_dsqlRequest->getDsqlStatement()->getSendMsg();
const dsql_msg* sendMessage = dStmt->getSendMsg();
// map message to internal engine format
// Do it one time only to avoid parsing its metadata for every message
m_dsqlRequest->metadataToFormat(m_meta, sendMessage);
// Using of positional DML in batch is strange but not forbidden
m_dsqlRequest->mapCursorKey(tdbb);
bool startRequest = true;

bool isExecBlock = m_dsqlRequest->getDsqlStatement()->getType() == DsqlStatement::TYPE_EXEC_BLOCK;
const auto receiveMessage = isExecBlock ? m_dsqlRequest->getDsqlStatement()->getReceiveMsg() : nullptr;
auto receiveMsgBuffer = isExecBlock ? m_dsqlRequest->req_msg_buffers[receiveMessage->msg_buffer_number] : nullptr;
bool isExecBlock = dStmt->getType() == DsqlStatement::TYPE_EXEC_BLOCK;
const dsql_msg* receiveMessage = isExecBlock ? dStmt->getReceiveMsg() : nullptr;

// process messages
ULONG remains;
Expand Down Expand Up @@ -726,25 +725,18 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)
*id = newId;
}

// map message to internal engine format
// pass m_meta one time only to avoid parsing its metadata for every message
m_dsqlRequest->mapInOut(tdbb, false, message, start ? m_meta : nullptr, nullptr, data);
data += m_messageSize;
remains -= m_messageSize;

UCHAR* msgBuffer = m_dsqlRequest->req_msg_buffers[message->msg_buffer_number];
try
{
// runsend data to request and collect stats
ULONG before = req->req_records_inserted + req->req_records_updated +
req->req_records_deleted;
EXE_send(tdbb, req, message->msg_number, message->msg_length, msgBuffer);
EXE_send(tdbb, req, sendMessage->msg_number, m_messageSize, data);
ULONG after = req->req_records_inserted + req->req_records_updated +
req->req_records_deleted;
completionState->regUpdate(after - before);

if (isExecBlock)
EXE_receive(tdbb, req, receiveMessage->msg_number, receiveMessage->msg_length, receiveMsgBuffer);
if (receiveMessage)
EXE_receive(tdbb, req, receiveMessage->msg_number, receiveMessage->msg_length, nullptr); // We don't care about returned record
}
catch (const Exception& ex)
{
Expand All @@ -764,6 +756,9 @@ Firebird::IBatchCompletionState* DsqlBatch::execute(thread_db* tdbb)

startRequest = true;
}

data += m_messageSize;
remains -= m_messageSize;
}

UCHAR* alignedData = FB_ALIGN(data, m_alignment);
Expand Down
11 changes: 1 addition & 10 deletions src/dsql/DsqlCompilerScratch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,7 @@ dsql_var* DsqlCompilerScratch::resolveVariable(const MetaName& varName)
// Generate BLR for a return.
void DsqlCompilerScratch::genReturn(bool eosFlag)
{
const bool hasEos = !(flags & (FLAG_TRIGGER | FLAG_FUNCTION));

if (hasEos && !eosFlag)
appendUChar(blr_begin);
const bool hasEos = !(flags & (FLAG_TRIGGER | FLAG_FUNCTION | FLAG_EXEC_BLOCK));

appendUChar(blr_send);
appendUChar(1);
Expand Down Expand Up @@ -455,12 +452,6 @@ void DsqlCompilerScratch::genReturn(bool eosFlag)
}

appendUChar(blr_end);

if (hasEos && !eosFlag)
{
appendUChar(blr_stall);
appendUChar(blr_end);
}
}

void DsqlCompilerScratch::genParameters(Array<NestConst<ParameterClause> >& parameters,
Expand Down
9 changes: 7 additions & 2 deletions src/dsql/DsqlCompilerScratch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ typedef Firebird::Pair<
Firebird::NonPooled<NestConst<ValueListNode>, NestConst<ValueListNode>>> ReturningClause;


// DSQL Compiler scratch block - may be discarded after compilation in the future.
// DSQL Compiler scratch block.
// Contains any kind of objects used during DsqlStatement compilation
// Is deleted with its pool as soon as DsqlStatement is fully formed in prepareStatement()
// or with the statement itself (if the statement reqested it returning true from shouldPreserveScratch())
class DsqlCompilerScratch : public BlrDebugWriter
{
public:
Expand All @@ -70,6 +73,7 @@ class DsqlCompilerScratch : public BlrDebugWriter
static const unsigned FLAG_DDL = 0x2000;
static const unsigned FLAG_FETCH = 0x4000;
static const unsigned FLAG_VIEW_WITH_CHECK = 0x8000;
static const unsigned FLAG_EXEC_BLOCK = 0x010000;

static const unsigned MAX_NESTING = 512;

Expand Down Expand Up @@ -105,7 +109,7 @@ class DsqlCompilerScratch : public BlrDebugWriter

protected:
// DsqlCompilerScratch should never be destroyed using delete.
// It dies together with it's pool in release_request().
// It dies together with it's pool.
~DsqlCompilerScratch()
{
}
Expand Down Expand Up @@ -317,6 +321,7 @@ class DsqlCompilerScratch : public BlrDebugWriter
DsqlCompilerScratch* mainScratch = nullptr;
Firebird::NonPooledMap<USHORT, USHORT> outerMessagesMap; // <outer, inner>
Firebird::NonPooledMap<USHORT, USHORT> outerVarsMap; // <outer, inner>
dsql_msg* recordKeyMessage = nullptr; // Side message for positioned DML

private:
Firebird::HalfStaticArray<SelectExprNode*, 4> ctes; // common table expressions
Expand Down
114 changes: 94 additions & 20 deletions src/dsql/DsqlCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "../dsql/dsql_proto.h"
#include "../dsql/DsqlCursor.h"
#include "../dsql/StmtNodes.h"

using namespace Firebird;
using namespace Jrd;
Expand All @@ -36,10 +37,10 @@ static const char* const SCRATCH = "fb_cursor_";
static const ULONG PREFETCH_SIZE = 65536; // 64 KB

DsqlCursor::DsqlCursor(DsqlDmlRequest* req, ULONG flags)
: m_dsqlRequest(req), m_message(req->getDsqlStatement()->getReceiveMsg()),
m_resultSet(NULL), m_flags(flags),
m_space(req->getPool(), SCRATCH),
m_state(BOS), m_eof(false), m_position(0), m_cachedCount(0)
: m_dsqlRequest(req),
m_message(req->getDsqlStatement()->getReceiveMsg()->msg_number),
m_flags(flags),
m_space(req->getPool(), SCRATCH)
{
TRA_link_cursor(m_dsqlRequest->req_transaction, this);
}
Expand All @@ -48,6 +49,8 @@ DsqlCursor::~DsqlCursor()
{
if (m_resultSet)
m_resultSet->resetHandle();

delete[] m_keyBuffer;
}

jrd_tra* DsqlCursor::getTransaction() const
Expand All @@ -66,6 +69,31 @@ void DsqlCursor::setInterfacePtr(JResultSet* interfacePtr) noexcept
m_resultSet = interfacePtr;
}

bool DsqlCursor::getCurrentRecordKey(USHORT context, RecordKey& key) const
{
if (m_keyBuffer == nullptr)
{
// A possible situation for a cursor not based on any record source such as
// a = 1;
// suspend;
return false;
}

if (context * sizeof(RecordKey) >= m_keyBufferLength)
{
fb_assert(false);
return false;
}

if (m_state != POSITIONED)
{
return false;
}

key = m_keyBuffer[context];
return key.recordNumber.bid_relation_id != 0;
}

void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)
{
if (!cursor)
Expand All @@ -88,7 +116,7 @@ void DsqlCursor::close(thread_db* tdbb, DsqlCursor* cursor)

if (dsqlRequest->req_traced && TraceManager::need_dsql_free(attachment))
{
TraceSQLStatementImpl stmt(dsqlRequest, NULL);
TraceSQLStatementImpl stmt(dsqlRequest, nullptr, nullptr);
TraceManager::event_dsql_free(attachment, &stmt, DSQL_close);
}

Expand All @@ -115,6 +143,17 @@ int DsqlCursor::fetchNext(thread_db* tdbb, UCHAR* buffer)
return 1;
}

if (m_keyBufferLength == 0)
{
Request* req = m_dsqlRequest->getRequest();
m_keyBufferLength = req->req_rpb.getCount() * sizeof(RecordKey);
if (m_keyBufferLength > 0)
m_keyBuffer = FB_NEW_POOL(m_dsqlRequest->getPool()) RecordKey[req->req_rpb.getCount()];
}

if (m_keyBufferLength > 0)
m_dsqlRequest->gatherRecordKey(m_keyBuffer);

m_state = POSITIONED;
return 0;
}
Expand Down Expand Up @@ -163,7 +202,7 @@ int DsqlCursor::fetchAbsolute(thread_db* tdbb, UCHAR* buffer, SLONG position)
{
if (!m_eof)
{
cacheInput(tdbb);
cacheInput(tdbb, buffer);
fb_assert(m_eof);
}

Expand Down Expand Up @@ -248,7 +287,7 @@ void DsqlCursor::getInfo(thread_db* tdbb,
case IResultSet::INF_RECORD_COUNT:
if (isScrollable && !m_eof)
{
cacheInput(tdbb);
cacheInput(tdbb, nullptr);
fb_assert(m_eof);
}
response.insertInt(tag, isScrollable ? m_cachedCount : -1);
Expand Down Expand Up @@ -291,48 +330,83 @@ int DsqlCursor::fetchFromCache(thread_db* tdbb, UCHAR* buffer, FB_UINT64 positio
{
if (position >= m_cachedCount)
{
if (m_eof || !cacheInput(tdbb, position))
if (m_eof || !cacheInput(tdbb, buffer, position))
{
m_state = EOS;
return 1;
}
}

fb_assert(position < m_cachedCount);
fb_assert(m_messageLength > 0); // At this point m_messageLength must be set by cacheInput

UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers[m_message->msg_buffer_number];
FB_UINT64 offset = position * (m_messageLength + m_keyBufferLength);
FB_UINT64 readBytes = m_space.read(offset, buffer, m_messageLength);

const FB_UINT64 offset = position * m_message->msg_length;
const FB_UINT64 readBytes = m_space.read(offset, msgBuffer, m_message->msg_length);
fb_assert(readBytes == m_message->msg_length);
if (m_keyBufferLength > 0)
{
offset += m_messageLength;
readBytes += m_space.read(offset, m_keyBuffer, m_keyBufferLength);
}

m_dsqlRequest->mapInOut(tdbb, true, m_message, NULL, buffer);
fb_assert(readBytes == m_messageLength + m_keyBufferLength);

m_position = position;
m_state = POSITIONED;
return 0;
}

bool DsqlCursor::cacheInput(thread_db* tdbb, FB_UINT64 position)
bool DsqlCursor::cacheInput(thread_db* tdbb, UCHAR* buffer, FB_UINT64 position)
{
fb_assert(!m_eof);

const ULONG prefetchCount = MAX(PREFETCH_SIZE / m_message->msg_length, 1);
const UCHAR* const msgBuffer = m_dsqlRequest->req_msg_buffers[m_message->msg_buffer_number];
// It could not be done before: user buffer length may be unknown until call setDelayedOutputMetadata()
if (m_messageLength == 0)
{
Request* req = m_dsqlRequest->getRequest();
const MessageNode* msg = req->getStatement()->getMessage(m_message);
m_messageLength = msg->getFormat(req)->fmt_length;
m_keyBufferLength = req->req_rpb.getCount() * sizeof(RecordKey);
if (m_keyBufferLength > 0)
{
// Save record key unconditionally because setCursorName() can be called after openCursor()
m_keyBuffer = FB_NEW_POOL(m_dsqlRequest->getPool()) RecordKey[req->req_rpb.getCount()];
}
}

std::unique_ptr<UCHAR[]> ownBuffer;
if (buffer == nullptr)
{
// We are called from getInfo() and there is no user-provided buffer for data.
// Create a temporary one.
// This code cannot be moved into getInfo() itself because it is most likely called before fetch()
// so m_messageLength is still unknown there.
ownBuffer.reset(buffer = FB_NEW UCHAR[m_messageLength]);
}

const ULONG prefetchCount = MAX(PREFETCH_SIZE / (m_messageLength + m_keyBufferLength), 1);

while (position >= m_cachedCount)
{
for (ULONG count = 0; count < prefetchCount; count++)
{
if (!m_dsqlRequest->fetch(tdbb, NULL))
if (!m_dsqlRequest->fetch(tdbb, buffer))
{
m_eof = true;
break;
}

const FB_UINT64 offset = m_cachedCount * m_message->msg_length;
const FB_UINT64 writtenBytes = m_space.write(offset, msgBuffer, m_message->msg_length);
fb_assert(writtenBytes == m_message->msg_length);
FB_UINT64 offset = m_cachedCount * (m_messageLength + m_keyBufferLength);
FB_UINT64 writtenBytes = m_space.write(offset, buffer, m_messageLength);

if (m_keyBufferLength > 0)
{
offset += m_messageLength;
m_dsqlRequest->gatherRecordKey(m_keyBuffer);
writtenBytes += m_space.write(offset, m_keyBuffer, m_keyBufferLength);
}

fb_assert(writtenBytes == m_messageLength + m_keyBufferLength);
m_cachedCount++;
}

Expand Down
Loading
Loading