diff --git a/examples/replication/fbSampleReplicator.cpp b/examples/replication/fbSampleReplicator.cpp index 8eac456b053..fa62653c48e 100644 --- a/examples/replication/fbSampleReplicator.cpp +++ b/examples/replication/fbSampleReplicator.cpp @@ -37,6 +37,7 @@ class ReplPlugin : public IReplicatedSessionImpl FB_BOOLEAN cleanupTransaction(ISC_INT64 number) override; FB_BOOLEAN deprecatedSetSequence(const char* name, ISC_INT64 value) override; FB_BOOLEAN setSequence2(const char* schemaName, const char* genName, ISC_INT64 value) override; + FB_BOOLEAN flushSequences() override; private: friend class ReplTransaction; @@ -264,6 +265,12 @@ FB_BOOLEAN ReplPlugin::setSequence2(const char* schemaName, const char* genName, return FB_TRUE; } +FB_BOOLEAN ReplPlugin::flushSequences() +{ + WriteLog(log, "%p\tflushSequences()\n"); + return FB_TRUE; +} + ReplTransaction::ReplTransaction(ReplPlugin* session, ITransaction* transaction, ISC_INT64 number): parent(session), trans(transaction) { diff --git a/src/include/firebird/FirebirdInterface.idl b/src/include/firebird/FirebirdInterface.idl index 4bce83e70a4..ff4a644ed8a 100644 --- a/src/include/firebird/FirebirdInterface.idl +++ b/src/include/firebird/FirebirdInterface.idl @@ -1830,6 +1830,7 @@ interface ReplicatedSession : PluginBase version: // 5.0 => 6.0 Alpha1 void setSequence2(Status status, const string schemaName, const string genName, int64 value); + void flushSequences(Status status); } // Profiler interfaces diff --git a/src/include/firebird/IdlFbInterfaces.h b/src/include/firebird/IdlFbInterfaces.h index ea71d25e6bc..dab643ff108 100644 --- a/src/include/firebird/IdlFbInterfaces.h +++ b/src/include/firebird/IdlFbInterfaces.h @@ -7328,6 +7328,7 @@ namespace Firebird void (CLOOP_CARG *cleanupTransaction)(IReplicatedSession* self, IStatus* status, ISC_INT64 number) CLOOP_NOEXCEPT; void (CLOOP_CARG *deprecatedSetSequence)(IReplicatedSession* self, IStatus* status, const char* name, ISC_INT64 value) CLOOP_NOEXCEPT; void (CLOOP_CARG *setSequence2)(IReplicatedSession* self, IStatus* status, const char* schemaName, const char* genName, ISC_INT64 value) CLOOP_NOEXCEPT; + void (CLOOP_CARG *flushSequences)(IReplicatedSession* self, IStatus* status) CLOOP_NOEXCEPT; }; protected: @@ -7385,6 +7386,19 @@ namespace Firebird static_cast(this->cloopVTable)->setSequence2(this, status, schemaName, genName, value); StatusType::checkException(status); } + + template void flushSequences(StatusType* status) + { + if (cloopVTable->version < 5) + { + StatusType::setVersionError(status, "IReplicatedSession", cloopVTable->version, 5); + StatusType::checkException(status); + return; + } + StatusType::clearException(status); + static_cast(this->cloopVTable)->flushSequences(this, status); + StatusType::checkException(status); + } }; #define FIREBIRD_IPROFILER_PLUGIN_VERSION 4u @@ -21387,6 +21401,7 @@ namespace Firebird this->cleanupTransaction = &Name::cloopcleanupTransactionDispatcher; this->deprecatedSetSequence = &Name::cloopdeprecatedSetSequenceDispatcher; this->setSequence2 = &Name::cloopsetSequence2Dispatcher; + this->flushSequences = &Name::cloopflushSequencesDispatcher; } } vTable; @@ -21465,6 +21480,20 @@ namespace Firebird } } + static void CLOOP_CARG cloopflushSequencesDispatcher(IReplicatedSession* self, IStatus* status) CLOOP_NOEXCEPT + { + StatusType status2(status); + + try + { + static_cast(self)->Name::flushSequences(&status2); + } + catch (...) + { + StatusType::catchException(&status2); + } + } + static void CLOOP_CARG cloopsetOwnerDispatcher(IPluginBase* self, IReferenceCounted* r) CLOOP_NOEXCEPT { try @@ -21534,6 +21563,7 @@ namespace Firebird virtual void cleanupTransaction(StatusType* status, ISC_INT64 number) = 0; virtual void deprecatedSetSequence(StatusType* status, const char* name, ISC_INT64 value) = 0; virtual void setSequence2(StatusType* status, const char* schemaName, const char* genName, ISC_INT64 value) = 0; + virtual void flushSequences(StatusType* status) = 0; }; template diff --git a/src/include/gen/Firebird.pas b/src/include/gen/Firebird.pas index eb0cfeae5a0..01d12c886ed 100644 --- a/src/include/gen/Firebird.pas +++ b/src/include/gen/Firebird.pas @@ -758,6 +758,7 @@ ISC_TIMESTAMP_TZ_EX = record IReplicatedSession_cleanupTransactionPtr = procedure(this: IReplicatedSession; status: IStatus; number: Int64); cdecl; IReplicatedSession_deprecatedSetSequencePtr = procedure(this: IReplicatedSession; status: IStatus; name: PAnsiChar; value: Int64); cdecl; IReplicatedSession_setSequence2Ptr = procedure(this: IReplicatedSession; status: IStatus; schemaName: PAnsiChar; genName: PAnsiChar; value: Int64); cdecl; + IReplicatedSession_flushSequencesPtr = procedure(this: IReplicatedSession; status: IStatus); cdecl; IProfilerPlugin_initPtr = procedure(this: IProfilerPlugin; status: IStatus; attachment: IAttachment; ticksFrequency: QWord); cdecl; IProfilerPlugin_startSessionPtr = function(this: IProfilerPlugin; status: IStatus; description: PAnsiChar; options: PAnsiChar; timestamp: ISC_TIMESTAMP_TZ): IProfilerSession; cdecl; IProfilerPlugin_flushPtr = procedure(this: IProfilerPlugin; status: IStatus); cdecl; @@ -3929,6 +3930,7 @@ ReplicatedSessionVTable = class(PluginBaseVTable) cleanupTransaction: IReplicatedSession_cleanupTransactionPtr; deprecatedSetSequence: IReplicatedSession_deprecatedSetSequencePtr; setSequence2: IReplicatedSession_setSequence2Ptr; + flushSequences: IReplicatedSession_flushSequencesPtr; end; IReplicatedSession = class(IPluginBase) @@ -3939,6 +3941,7 @@ IReplicatedSession = class(IPluginBase) procedure cleanupTransaction(status: IStatus; number: Int64); procedure deprecatedSetSequence(status: IStatus; name: PAnsiChar; value: Int64); procedure setSequence2(status: IStatus; schemaName: PAnsiChar; genName: PAnsiChar; value: Int64); + procedure flushSequences(status: IStatus); end; IReplicatedSessionImpl = class(IReplicatedSession) @@ -3953,6 +3956,7 @@ IReplicatedSessionImpl = class(IReplicatedSession) procedure cleanupTransaction(status: IStatus; number: Int64); virtual; abstract; procedure deprecatedSetSequence(status: IStatus; name: PAnsiChar; value: Int64); virtual; abstract; procedure setSequence2(status: IStatus; schemaName: PAnsiChar; genName: PAnsiChar; value: Int64); virtual; abstract; + procedure flushSequences(status: IStatus); virtual; abstract; end; ProfilerPluginVTable = class(PluginBaseVTable) @@ -10102,6 +10106,17 @@ procedure IReplicatedSession.setSequence2(status: IStatus; schemaName: PAnsiChar FbException.checkException(status); end; +procedure IReplicatedSession.flushSequences(status: IStatus); +begin + if (vTable.version < 5) then begin + FbException.setVersionError(status, 'IReplicatedSession', vTable.version, 5); + end + else begin + ReplicatedSessionVTable(vTable).flushSequences(Self, status); + end; + FbException.checkException(status); +end; + procedure IProfilerPlugin.init(status: IStatus; attachment: IAttachment; ticksFrequency: QWord); begin ProfilerPluginVTable(vTable).init(Self, status, attachment, ticksFrequency); @@ -17567,6 +17582,15 @@ procedure IReplicatedSessionImpl_setSequence2Dispatcher(this: IReplicatedSession end end; +procedure IReplicatedSessionImpl_flushSequencesDispatcher(this: IReplicatedSession; status: IStatus); cdecl; +begin + try + IReplicatedSessionImpl(this).flushSequences(status); + except + on e: Exception do FbException.catchException(status, e); + end +end; + var IReplicatedSessionImpl_vTable: ReplicatedSessionVTable; @@ -18956,6 +18980,7 @@ initialization IReplicatedSessionImpl_vTable.cleanupTransaction := @IReplicatedSessionImpl_cleanupTransactionDispatcher; IReplicatedSessionImpl_vTable.deprecatedSetSequence := @IReplicatedSessionImpl_deprecatedSetSequenceDispatcher; IReplicatedSessionImpl_vTable.setSequence2 := @IReplicatedSessionImpl_setSequence2Dispatcher; + IReplicatedSessionImpl_vTable.flushSequences := @IReplicatedSessionImpl_flushSequencesDispatcher; IProfilerPluginImpl_vTable := ProfilerPluginVTable.create; IProfilerPluginImpl_vTable.version := 4; diff --git a/src/jrd/replication/Publisher.cpp b/src/jrd/replication/Publisher.cpp index 3f5aa2b09be..18bee4ebd12 100644 --- a/src/jrd/replication/Publisher.cpp +++ b/src/jrd/replication/Publisher.cpp @@ -433,7 +433,15 @@ void REPL_trans_commit(thread_db* tdbb, jrd_tra* transaction) { const auto replicator = transaction->tra_replicator; if (!replicator) + { + const auto att = tdbb->getAttachment(); + if (att->att_replicator) + { + FbLocalStatus status; + att->att_replicator->flushSequences(&status); + } return; + } FbLocalStatus status; replicator->commit(&status); @@ -452,7 +460,15 @@ void REPL_trans_rollback(thread_db* tdbb, jrd_tra* transaction) { const auto replicator = transaction->tra_replicator; if (!replicator) + { + const auto att = tdbb->getAttachment(); + if (att->att_replicator) + { + FbLocalStatus status; + att->att_replicator->flushSequences(&status); + } return; + } FbLocalStatus status; replicator->rollback(&status); diff --git a/src/jrd/replication/Replicator.cpp b/src/jrd/replication/Replicator.cpp index a2e9f4b68db..bedf663f51a 100644 --- a/src/jrd/replication/Replicator.cpp +++ b/src/jrd/replication/Replicator.cpp @@ -207,18 +207,7 @@ void Replicator::commitTransaction(CheckStatusWrapper* status, Transaction* tran const auto dataLength = txnData.buffer->getCount() - sizeof(Block); fb_assert(txnData.flushes || dataLength > sizeof(UCHAR)); - for (const auto& generator : m_generators) - { - fb_assert(generator.name.object.hasData() && generator.name.schema.hasData()); - - const auto [schemaAtom, objectAtom] = txnData.defineQualifiedAtom(generator.name); - - txnData.putTag(opSetSequence); - txnData.putInt32(schemaAtom); - txnData.putInt32(objectAtom); - txnData.putInt64(generator.value); - } - + txnData.putGenerators(&m_generators); m_generators.clear(); txnData.putTag(opCommitTransaction); @@ -236,9 +225,13 @@ void Replicator::rollbackTransaction(CheckStatusWrapper* status, Transaction* tr { auto& txnData = transaction->getData(); - if (txnData.flushes) + if (txnData.flushes || m_generators.hasData()) { - txnData.putTag(opRollbackTransaction); + txnData.putGenerators(&m_generators); + m_generators.clear(); + + if (txnData.flushes) + txnData.putTag(opRollbackTransaction); flush(txnData, FLUSH_SYNC, BLOCK_END_TRANS); } } @@ -501,3 +494,26 @@ void Replicator::setSequence2(CheckStatusWrapper* status, ex.stuffException(status); } } + +void Replicator::flushSequences(CheckStatusWrapper* status) +{ + if (m_generators.isEmpty()) + return; + + try + { + BatchBlock block(getPool()); + block.header.traNumber = 0; + block.buffer = m_manager->getBuffer(); + block.header.length = (ULONG)block.buffer->getCount(); + + block.putGenerators(&m_generators); + m_generators.clear(); + + flush(block, FLUSH_SYNC); + } + catch (const Exception& ex) + { + ex.stuffException(status); + } +} diff --git a/src/jrd/replication/Replicator.h b/src/jrd/replication/Replicator.h index 38e0f0d01c0..7bbd9c12daa 100644 --- a/src/jrd/replication/Replicator.h +++ b/src/jrd/replication/Replicator.h @@ -42,6 +42,14 @@ namespace Replication typedef Firebird::ObjectsArray NameCache; typedef Firebird::HalfStaticArray SavepointStack; + struct GeneratorValue + { + Jrd::QualifiedName name; + SINT64 value = 0; + }; + + typedef Firebird::Array GeneratorCache; + struct BatchBlock { Block header{}; @@ -135,6 +143,21 @@ namespace Replication { buffer->add(data, length); } + + void putGenerators(const GeneratorCache* generators) + { + for (const auto& generator : *generators) + { + fb_assert(generator.name.object.hasData() && generator.name.schema.hasData()); + + const auto [schemaAtom, objectAtom] = defineQualifiedAtom(generator.name); + + putTag(opSetSequence); + putInt32(schemaAtom); + putInt32(objectAtom); + putInt64(generator.value); + } + } }; class Transaction final : @@ -255,14 +278,6 @@ namespace Replication BatchBlock m_data; }; - struct GeneratorValue - { - Jrd::QualifiedName name; - SINT64 value = 0; - }; - - typedef Firebird::Array GeneratorCache; - enum FlushReason { FLUSH_OVERFLOW, @@ -295,6 +310,8 @@ namespace Replication void setSequence2(Firebird::CheckStatusWrapper* status, const char* schemaName, const char* genName, SINT64 value) override; + void flushSequences(Firebird::CheckStatusWrapper* status) override; + private: Manager* const m_manager; const Config* const m_config;