Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/replication/fbSampleReplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ReplPlugin : public IReplicatedSessionImpl<ReplPlugin, CheckStatusWrapper>
FB_BOOLEAN cleanupTransaction(ISC_INT64 number) override;
FB_BOOLEAN deprecatedSetSequence(const char* name, ISC_INT64 value) override;
FB_BOOLEAN setSequence2(const char* schemaName, const char* genName, ISC_INT64 value) override;
FB_BOOLEAN flushSequences() override;

private:
friend class ReplTransaction;
Expand Down Expand Up @@ -264,6 +265,12 @@ FB_BOOLEAN ReplPlugin::setSequence2(const char* schemaName, const char* genName,
return FB_TRUE;
}

FB_BOOLEAN ReplPlugin::flushSequences()
{
WriteLog(log, "%p\tflushSequences()\n");
return FB_TRUE;
}

ReplTransaction::ReplTransaction(ReplPlugin* session, ITransaction* transaction, ISC_INT64 number):
parent(session), trans(transaction)
{
Expand Down
1 change: 1 addition & 0 deletions src/include/firebird/FirebirdInterface.idl
Original file line number Diff line number Diff line change
Expand Up @@ -1830,6 +1830,7 @@ interface ReplicatedSession : PluginBase

version: // 5.0 => 6.0 Alpha1
void setSequence2(Status status, const string schemaName, const string genName, int64 value);
void flushSequences(Status status);
}

// Profiler interfaces
Expand Down
30 changes: 30 additions & 0 deletions src/include/firebird/IdlFbInterfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -7328,6 +7328,7 @@ namespace Firebird
void (CLOOP_CARG *cleanupTransaction)(IReplicatedSession* self, IStatus* status, ISC_INT64 number) CLOOP_NOEXCEPT;
void (CLOOP_CARG *deprecatedSetSequence)(IReplicatedSession* self, IStatus* status, const char* name, ISC_INT64 value) CLOOP_NOEXCEPT;
void (CLOOP_CARG *setSequence2)(IReplicatedSession* self, IStatus* status, const char* schemaName, const char* genName, ISC_INT64 value) CLOOP_NOEXCEPT;
void (CLOOP_CARG *flushSequences)(IReplicatedSession* self, IStatus* status) CLOOP_NOEXCEPT;
};

protected:
Expand Down Expand Up @@ -7385,6 +7386,19 @@ namespace Firebird
static_cast<VTable*>(this->cloopVTable)->setSequence2(this, status, schemaName, genName, value);
StatusType::checkException(status);
}

template <typename StatusType> void flushSequences(StatusType* status)
{
if (cloopVTable->version < 5)
{
StatusType::setVersionError(status, "IReplicatedSession", cloopVTable->version, 5);
StatusType::checkException(status);
return;
}
StatusType::clearException(status);
static_cast<VTable*>(this->cloopVTable)->flushSequences(this, status);
StatusType::checkException(status);
}
};

#define FIREBIRD_IPROFILER_PLUGIN_VERSION 4u
Expand Down Expand Up @@ -21387,6 +21401,7 @@ namespace Firebird
this->cleanupTransaction = &Name::cloopcleanupTransactionDispatcher;
this->deprecatedSetSequence = &Name::cloopdeprecatedSetSequenceDispatcher;
this->setSequence2 = &Name::cloopsetSequence2Dispatcher;
this->flushSequences = &Name::cloopflushSequencesDispatcher;
}
} vTable;

Expand Down Expand Up @@ -21465,6 +21480,20 @@ namespace Firebird
}
}

static void CLOOP_CARG cloopflushSequencesDispatcher(IReplicatedSession* self, IStatus* status) CLOOP_NOEXCEPT
{
StatusType status2(status);

try
{
static_cast<Name*>(self)->Name::flushSequences(&status2);
}
catch (...)
{
StatusType::catchException(&status2);
}
}

static void CLOOP_CARG cloopsetOwnerDispatcher(IPluginBase* self, IReferenceCounted* r) CLOOP_NOEXCEPT
{
try
Expand Down Expand Up @@ -21534,6 +21563,7 @@ namespace Firebird
virtual void cleanupTransaction(StatusType* status, ISC_INT64 number) = 0;
virtual void deprecatedSetSequence(StatusType* status, const char* name, ISC_INT64 value) = 0;
virtual void setSequence2(StatusType* status, const char* schemaName, const char* genName, ISC_INT64 value) = 0;
virtual void flushSequences(StatusType* status) = 0;
};

template <typename Name, typename StatusType, typename Base>
Expand Down
25 changes: 25 additions & 0 deletions src/include/gen/Firebird.pas
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ ISC_TIMESTAMP_TZ_EX = record
IReplicatedSession_cleanupTransactionPtr = procedure(this: IReplicatedSession; status: IStatus; number: Int64); cdecl;
IReplicatedSession_deprecatedSetSequencePtr = procedure(this: IReplicatedSession; status: IStatus; name: PAnsiChar; value: Int64); cdecl;
IReplicatedSession_setSequence2Ptr = procedure(this: IReplicatedSession; status: IStatus; schemaName: PAnsiChar; genName: PAnsiChar; value: Int64); cdecl;
IReplicatedSession_flushSequencesPtr = procedure(this: IReplicatedSession; status: IStatus); cdecl;
IProfilerPlugin_initPtr = procedure(this: IProfilerPlugin; status: IStatus; attachment: IAttachment; ticksFrequency: QWord); cdecl;
IProfilerPlugin_startSessionPtr = function(this: IProfilerPlugin; status: IStatus; description: PAnsiChar; options: PAnsiChar; timestamp: ISC_TIMESTAMP_TZ): IProfilerSession; cdecl;
IProfilerPlugin_flushPtr = procedure(this: IProfilerPlugin; status: IStatus); cdecl;
Expand Down Expand Up @@ -3929,6 +3930,7 @@ ReplicatedSessionVTable = class(PluginBaseVTable)
cleanupTransaction: IReplicatedSession_cleanupTransactionPtr;
deprecatedSetSequence: IReplicatedSession_deprecatedSetSequencePtr;
setSequence2: IReplicatedSession_setSequence2Ptr;
flushSequences: IReplicatedSession_flushSequencesPtr;
end;

IReplicatedSession = class(IPluginBase)
Expand All @@ -3939,6 +3941,7 @@ IReplicatedSession = class(IPluginBase)
procedure cleanupTransaction(status: IStatus; number: Int64);
procedure deprecatedSetSequence(status: IStatus; name: PAnsiChar; value: Int64);
procedure setSequence2(status: IStatus; schemaName: PAnsiChar; genName: PAnsiChar; value: Int64);
procedure flushSequences(status: IStatus);
end;

IReplicatedSessionImpl = class(IReplicatedSession)
Expand All @@ -3953,6 +3956,7 @@ IReplicatedSessionImpl = class(IReplicatedSession)
procedure cleanupTransaction(status: IStatus; number: Int64); virtual; abstract;
procedure deprecatedSetSequence(status: IStatus; name: PAnsiChar; value: Int64); virtual; abstract;
procedure setSequence2(status: IStatus; schemaName: PAnsiChar; genName: PAnsiChar; value: Int64); virtual; abstract;
procedure flushSequences(status: IStatus); virtual; abstract;
end;

ProfilerPluginVTable = class(PluginBaseVTable)
Expand Down Expand Up @@ -10102,6 +10106,17 @@ procedure IReplicatedSession.setSequence2(status: IStatus; schemaName: PAnsiChar
FbException.checkException(status);
end;

procedure IReplicatedSession.flushSequences(status: IStatus);
begin
if (vTable.version < 5) then begin
FbException.setVersionError(status, 'IReplicatedSession', vTable.version, 5);
end
else begin
ReplicatedSessionVTable(vTable).flushSequences(Self, status);
end;
FbException.checkException(status);
end;

procedure IProfilerPlugin.init(status: IStatus; attachment: IAttachment; ticksFrequency: QWord);
begin
ProfilerPluginVTable(vTable).init(Self, status, attachment, ticksFrequency);
Expand Down Expand Up @@ -17567,6 +17582,15 @@ procedure IReplicatedSessionImpl_setSequence2Dispatcher(this: IReplicatedSession
end
end;

procedure IReplicatedSessionImpl_flushSequencesDispatcher(this: IReplicatedSession; status: IStatus); cdecl;
begin
try
IReplicatedSessionImpl(this).flushSequences(status);
except
on e: Exception do FbException.catchException(status, e);
end
end;

var
IReplicatedSessionImpl_vTable: ReplicatedSessionVTable;

Expand Down Expand Up @@ -18956,6 +18980,7 @@ initialization
IReplicatedSessionImpl_vTable.cleanupTransaction := @IReplicatedSessionImpl_cleanupTransactionDispatcher;
IReplicatedSessionImpl_vTable.deprecatedSetSequence := @IReplicatedSessionImpl_deprecatedSetSequenceDispatcher;
IReplicatedSessionImpl_vTable.setSequence2 := @IReplicatedSessionImpl_setSequence2Dispatcher;
IReplicatedSessionImpl_vTable.flushSequences := @IReplicatedSessionImpl_flushSequencesDispatcher;

IProfilerPluginImpl_vTable := ProfilerPluginVTable.create;
IProfilerPluginImpl_vTable.version := 4;
Expand Down
16 changes: 16 additions & 0 deletions src/jrd/replication/Publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,15 @@ void REPL_trans_commit(thread_db* tdbb, jrd_tra* transaction)
{
const auto replicator = transaction->tra_replicator;
if (!replicator)
{
const auto att = tdbb->getAttachment();
if (att->att_replicator)
{
FbLocalStatus status;
att->att_replicator->flushSequences(&status);
}
return;
}

FbLocalStatus status;
replicator->commit(&status);
Expand All @@ -452,7 +460,15 @@ void REPL_trans_rollback(thread_db* tdbb, jrd_tra* transaction)
{
const auto replicator = transaction->tra_replicator;
if (!replicator)
{
const auto att = tdbb->getAttachment();
if (att->att_replicator)
{
FbLocalStatus status;
att->att_replicator->flushSequences(&status);
}
return;
}

FbLocalStatus status;
replicator->rollback(&status);
Expand Down
44 changes: 30 additions & 14 deletions src/jrd/replication/Replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,7 @@ void Replicator::commitTransaction(CheckStatusWrapper* status, Transaction* tran
const auto dataLength = txnData.buffer->getCount() - sizeof(Block);
fb_assert(txnData.flushes || dataLength > sizeof(UCHAR));

for (const auto& generator : m_generators)
{
fb_assert(generator.name.object.hasData() && generator.name.schema.hasData());

const auto [schemaAtom, objectAtom] = txnData.defineQualifiedAtom(generator.name);

txnData.putTag(opSetSequence);
txnData.putInt32(schemaAtom);
txnData.putInt32(objectAtom);
txnData.putInt64(generator.value);
}

txnData.putGenerators(&m_generators);
m_generators.clear();

txnData.putTag(opCommitTransaction);
Expand All @@ -236,9 +225,13 @@ void Replicator::rollbackTransaction(CheckStatusWrapper* status, Transaction* tr
{
auto& txnData = transaction->getData();

if (txnData.flushes)
if (txnData.flushes || m_generators.hasData())
{
txnData.putTag(opRollbackTransaction);
txnData.putGenerators(&m_generators);
m_generators.clear();

if (txnData.flushes)
txnData.putTag(opRollbackTransaction);
flush(txnData, FLUSH_SYNC, BLOCK_END_TRANS);
}
}
Expand Down Expand Up @@ -501,3 +494,26 @@ void Replicator::setSequence2(CheckStatusWrapper* status,
ex.stuffException(status);
}
}

void Replicator::flushSequences(CheckStatusWrapper* status)
{
if (m_generators.isEmpty())
return;

try
{
BatchBlock block(getPool());
block.header.traNumber = 0;
block.buffer = m_manager->getBuffer();
block.header.length = (ULONG)block.buffer->getCount();

block.putGenerators(&m_generators);
m_generators.clear();

flush(block, FLUSH_SYNC);
}
catch (const Exception& ex)
{
ex.stuffException(status);
}
}
33 changes: 25 additions & 8 deletions src/jrd/replication/Replicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ namespace Replication
typedef Firebird::ObjectsArray<Firebird::string> NameCache;
typedef Firebird::HalfStaticArray<SavNumber, 16> SavepointStack;

struct GeneratorValue
{
Jrd::QualifiedName name;
SINT64 value = 0;
};

typedef Firebird::Array<GeneratorValue> GeneratorCache;

struct BatchBlock
{
Block header{};
Expand Down Expand Up @@ -135,6 +143,21 @@ namespace Replication
{
buffer->add(data, length);
}

void putGenerators(const GeneratorCache* generators)
{
for (const auto& generator : *generators)
{
fb_assert(generator.name.object.hasData() && generator.name.schema.hasData());

const auto [schemaAtom, objectAtom] = defineQualifiedAtom(generator.name);

putTag(opSetSequence);
putInt32(schemaAtom);
putInt32(objectAtom);
putInt64(generator.value);
}
}
};

class Transaction final :
Expand Down Expand Up @@ -255,14 +278,6 @@ namespace Replication
BatchBlock m_data;
};

struct GeneratorValue
{
Jrd::QualifiedName name;
SINT64 value = 0;
};

typedef Firebird::Array<GeneratorValue> GeneratorCache;

enum FlushReason
{
FLUSH_OVERFLOW,
Expand Down Expand Up @@ -295,6 +310,8 @@ namespace Replication
void setSequence2(Firebird::CheckStatusWrapper* status, const char* schemaName, const char* genName,
SINT64 value) override;

void flushSequences(Firebird::CheckStatusWrapper* status) override;

private:
Manager* const m_manager;
const Config* const m_config;
Expand Down
Loading