diff --git a/libs/cluster/Session/SlotVerification/ClusterSlotVerify.cs b/libs/cluster/Session/SlotVerification/ClusterSlotVerify.cs index d8b236fc08c..d87933c0001 100644 --- a/libs/cluster/Session/SlotVerification/ClusterSlotVerify.cs +++ b/libs/cluster/Session/SlotVerification/ClusterSlotVerify.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -using System; using System.Runtime.CompilerServices; using System.Threading; using Garnet.common; @@ -118,39 +117,17 @@ bool CanOperateOnKey(ref PinnedSpanByte key, int slot, bool readOnly) } } - ClusterSlotVerificationResult MultiKeySlotVerify(ClusterConfig config, ref Span keys, bool readOnly, byte sessionAsking, int count) - { - var _end = count < 0 ? keys.Length : count; - var slot = HashSlotUtils.HashSlot(keys[0]); - var verifyResult = SingleKeySlotVerify(ref config, ref keys[0], readOnly, sessionAsking, slot); - - for (var i = 1; i < _end; i++) - { - var _slot = HashSlotUtils.HashSlot(keys[i]); - var _verifyResult = SingleKeySlotVerify(ref config, ref keys[i], readOnly, sessionAsking, _slot); - - // Check if slot changes between keys - if (_slot != slot) - return new(SlotVerifiedState.CROSSSLOT, slot); - - // Check if state of key changes - if (_verifyResult.state != verifyResult.state) - return new(SlotVerifiedState.TRYAGAIN, slot); - } - - return verifyResult; - } - - ClusterSlotVerificationResult MultiKeySlotVerify(ClusterConfig config, ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi) + ClusterSlotVerificationResult MultiKeySlotVerify(ClusterConfig config, ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, bool isTxn) { // Find the first valid key and initialize slot/result var specIndex = 0; - (int firstIdx, int lastIdx, int step) searchArgs = default; - while (specIndex < csvi.keySpecs.Length && + // If slot verification is called from transaction manager, parse state contains consecutive keys so we can skip key search + (int firstIdx, int lastIdx, int step) searchArgs = isTxn ? (0, parseState.Count - 1, 1) : default; + while (specIndex < csvi.keySpecs?.Length && !parseState.TryGetKeySearchArgsFromSimpleKeySpec(csvi.keySpecs[specIndex], csvi.isSubCommand, out searchArgs)) specIndex++; - if (specIndex == csvi.keySpecs.Length) + if (specIndex == csvi.keySpecs?.Length && !isTxn) return default; ref var firstKey = ref parseState.GetArgSliceByRef(searchArgs.firstIdx); @@ -164,7 +141,7 @@ ClusterSlotVerificationResult MultiKeySlotVerify(ClusterConfig config, ref Sessi return verifyResult; // Verify keys from remaining specs - for (specIndex++; specIndex < csvi.keySpecs.Length; specIndex++) + for (specIndex++; specIndex < csvi.keySpecs?.Length; specIndex++) { if (!parseState.TryGetKeySearchArgsFromSimpleKeySpec(csvi.keySpecs[specIndex], csvi.isSubCommand, out searchArgs)) continue; diff --git a/libs/cluster/Session/SlotVerification/RespClusterSlotVerify.cs b/libs/cluster/Session/SlotVerification/RespClusterSlotVerify.cs index 307bb39c95d..b1574a73458 100644 --- a/libs/cluster/Session/SlotVerification/RespClusterSlotVerify.cs +++ b/libs/cluster/Session/SlotVerification/RespClusterSlotVerify.cs @@ -7,7 +7,6 @@ using Garnet.common; using Garnet.server; using Microsoft.Extensions.Logging; -using Tsavorite.core; namespace Garnet.cluster { @@ -87,32 +86,6 @@ private void WriteClusterSlotVerificationMessage(ClusterConfig config, ClusterSl SendAndReset(ref dcurr, ref dend); } - /// - /// Check if read/write is permitted on an array of keys and generate appropriate resp response. - /// - /// - /// - /// - /// - /// - /// - /// - public bool NetworkKeyArraySlotVerify(Span keys, bool readOnly, byte sessionAsking, ref byte* dcurr, ref byte* dend, int count = -1) - { - // If cluster is not enabled or a transaction is running skip slot check - if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) - return false; - - var config = clusterProvider.clusterManager.CurrentConfig; - var vres = MultiKeySlotVerify(config, ref keys, readOnly, sessionAsking, count); - - if (vres.state == SlotVerifiedState.OK) - return false; - else - WriteClusterSlotVerificationMessage(config, vres, ref dcurr, ref dend); - return true; - } - /// /// Verify multi-key slot ownership /// @@ -121,13 +94,13 @@ public bool NetworkKeyArraySlotVerify(Span keys, bool readOnly, /// /// /// - public unsafe bool NetworkMultiKeySlotVerify(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend) + public unsafe bool NetworkMultiKeySlotVerify(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend, bool isTxn = false) { // If cluster is not enabled or a transaction is running skip slot check if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false; var config = clusterProvider.clusterManager.CurrentConfig; - var vres = MultiKeySlotVerify(config, ref parseState, ref csvi); + var vres = MultiKeySlotVerify(config, ref parseState, ref csvi, isTxn); if (vres.state == SlotVerifiedState.OK) return false; @@ -143,14 +116,15 @@ public unsafe bool NetworkMultiKeySlotVerify(ref SessionParseState parseState, r /// /// /// + /// /// - public unsafe bool NetworkMultiKeySlotVerifyNoResponse(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend) + public unsafe bool NetworkMultiKeySlotVerifyNoResponse(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend, bool isTxn = false) { // If cluster is not enabled or a transaction is running skip slot check if (!clusterProvider.serverOptions.EnableCluster || txnManager.state == TxnState.Running) return false; var config = clusterProvider.clusterManager.CurrentConfig; - var vres = MultiKeySlotVerify(config, ref parseState, ref csvi); + var vres = MultiKeySlotVerify(config, ref parseState, ref csvi, isTxn); return vres.state != SlotVerifiedState.OK; } diff --git a/libs/server/Cluster/IClusterSession.cs b/libs/server/Cluster/IClusterSession.cs index 9e18621f51f..255e46957c3 100644 --- a/libs/server/Cluster/IClusterSession.cs +++ b/libs/server/Cluster/IClusterSession.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -using System; using Garnet.common; using Garnet.server.ACL; using Tsavorite.core; @@ -86,11 +85,6 @@ public interface IClusterSession /// public void WriteCachedSlotVerificationMessage(ref MemoryResult output); - /// - /// Key array slot verify (write result to network) - /// - unsafe bool NetworkKeyArraySlotVerify(Span keys, bool readOnly, byte SessionAsking, ref byte* dcurr, ref byte* dend, int count = -1); - /// /// Array slot verify (write result to network) /// @@ -98,8 +92,9 @@ public interface IClusterSession /// /// /// + /// /// - unsafe bool NetworkMultiKeySlotVerify(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend); + unsafe bool NetworkMultiKeySlotVerify(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend, bool isTxn = false); /// /// Array slot verify with no response @@ -108,8 +103,9 @@ public interface IClusterSession /// /// /// + /// /// - unsafe bool NetworkMultiKeySlotVerifyNoResponse(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend); + unsafe bool NetworkMultiKeySlotVerifyNoResponse(ref SessionParseState parseState, ref ClusterSlotVerificationInput csvi, ref byte* dcurr, ref byte* dend, bool isTxn = false); /// /// Sets the currently authenticated in this session (used for permission checks) diff --git a/libs/server/Resp/Parser/SessionParseState.cs b/libs/server/Resp/Parser/SessionParseState.cs index 24676b33f58..f7b067aece1 100644 --- a/libs/server/Resp/Parser/SessionParseState.cs +++ b/libs/server/Resp/Parser/SessionParseState.cs @@ -26,6 +26,16 @@ public unsafe struct SessionParseState /// public int Count; + /// + /// Get the allocated capacity of the argument buffer + /// + public int Capacity { get; } + + /// + /// Get a Span of the parsed parameters in the form an PinnedSpanByte + /// + public ReadOnlySpan Parameters => new(bufferPtr, Count); + /// /// Pointer to the slice of (which is always pinned) that is accessible within the range of this instance's arguments. /// @@ -41,17 +51,13 @@ public unsafe struct SessionParseState /// PinnedSpanByte[] rootBuffer; - /// - /// Get a Span of the parsed parameters in the form an PinnedSpanByte - /// - public ReadOnlySpan Parameters => new(bufferPtr, Count); - private SessionParseState(ref PinnedSpanByte[] rootBuffer, int rootCount, ref PinnedSpanByte* bufferPtr, int count) : this() { this.rootBuffer = rootBuffer; this.rootCount = rootCount; this.bufferPtr = bufferPtr; this.Count = count; + this.Capacity = rootBuffer.Length; } /// diff --git a/libs/server/Resp/RespServerSessionSlotVerify.cs b/libs/server/Resp/RespServerSessionSlotVerify.cs index 364e7e120fd..2e55671781c 100644 --- a/libs/server/Resp/RespServerSessionSlotVerify.cs +++ b/libs/server/Resp/RespServerSessionSlotVerify.cs @@ -1,9 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -using System; using System.Diagnostics; -using Tsavorite.core; namespace Garnet.server { @@ -12,16 +10,6 @@ namespace Garnet.server /// internal sealed unsafe partial class RespServerSession : ServerSessionBase { - /// - /// This method is used to verify slot ownership for provided array of key argslices. - /// - /// Array of key ArgSlice - /// Whether caller is going to perform a readonly or read/write operation - /// Key count if different than keys array length - /// True when ownership is verified, false otherwise - bool NetworkKeyArraySlotVerify(Span keys, bool readOnly, int count = -1) - => clusterSession != null && clusterSession.NetworkKeyArraySlotVerify(keys, readOnly, SessionAsking, ref dcurr, ref dend, count); - /// /// Validate if this command can be served based on the current slot assignment /// diff --git a/libs/server/Transaction/TransactionManager.cs b/libs/server/Transaction/TransactionManager.cs index a159369b2e3..f53e21d7f33 100644 --- a/libs/server/Transaction/TransactionManager.cs +++ b/libs/server/Transaction/TransactionManager.cs @@ -68,6 +68,8 @@ public sealed unsafe partial class TransactionManager private readonly RespServerSession respSession; readonly FunctionsState functionsState; internal readonly ScratchBufferAllocator scratchBufferAllocator; + internal readonly ScratchBufferAllocator txnScratchBufferAllocator; + internal SessionParseState txnKeysParseState; private readonly TsavoriteLog appendOnlyFile; internal readonly WatchedKeysContainer watchContainer; private readonly StateMachineDriver stateMachineDriver; @@ -135,7 +137,8 @@ internal TransactionManager( this.respSession = respSession; - watchContainer = new WatchedKeysContainer(initialSliceBufferSize, functionsState.watchVersionMap); + txnScratchBufferAllocator = new ScratchBufferAllocator(); + watchContainer = new WatchedKeysContainer(initialSliceBufferSize, functionsState.watchVersionMap, txnScratchBufferAllocator); keyEntries = new TxnKeyEntries(initialSliceBufferSize, unifiedTransactionalContext); this.scratchBufferAllocator = scratchBufferAllocator; @@ -149,7 +152,10 @@ internal TransactionManager( this.clusterEnabled = clusterEnabled; if (clusterEnabled) - keys = new PinnedSpanByte[initialKeyBufferSize]; + { + txnKeysParseState.Initialize(initialKeyBufferSize); + txnKeysParseState.Count = 0; + } Reset(false); } @@ -184,9 +190,13 @@ internal void Reset(bool isRunning) functionsState.StoredProcMode = false; this.PerformWrites = false; - // Reset cluster variables used for slot verification - this.saveKeyRecvBufferPtr = null; - this.keyCount = 0; + // Reset cluster key parse state + if (clusterEnabled) + { + txnKeysParseState.Count = 0; + saveKeyRecvBufferPtr = null; + txnScratchBufferAllocator.Reset(); + } } internal bool RunTransactionProc(byte id, ref CustomProcedureInput procInput, CustomTransactionProcedure proc, ref MemoryResult output, bool isReplaying = false) @@ -257,7 +267,6 @@ internal bool RunTransactionProc(byte id, ref CustomProcedureInput procInput, Cu scratchBufferAllocator.Reset(); } - return true; } @@ -328,13 +337,22 @@ internal void AddTransactionStoreType(StoreType storeType) internal string GetLockset() => keyEntries.GetLockset(); - internal void GetKeysForValidation(byte* recvBufferPtr, out PinnedSpanByte[] keys, out int keyCount, out bool readOnly) + internal void GetSlotVerificationInput(byte* recvBufferPtr, byte sessionAsking, out ClusterSlotVerificationInput clusterSlotVerificationInput) { - UpdateRecvBufferPtr(recvBufferPtr); + // Copy keys if buffer changed since last queued command + if (recvBufferPtr != saveKeyRecvBufferPtr) + { + CopyExistingKeysToScratchBuffer(); + saveKeyRecvBufferPtr = recvBufferPtr; + } + watchContainer.SaveKeysToKeyList(this); - keys = this.keys; - keyCount = this.keyCount; - readOnly = keyEntries.IsReadOnly; + clusterSlotVerificationInput = new ClusterSlotVerificationInput + { + readOnly = keyEntries.IsReadOnly, + sessionAsking = sessionAsking, + // We don't specify key specs here as slot verification will know to iterate over all keys in this context + }; } void BeginTransaction() diff --git a/libs/server/Transaction/TxnClusterSlotCheck.cs b/libs/server/Transaction/TxnClusterSlotCheck.cs index 1d17a1dee38..1a698f558d9 100644 --- a/libs/server/Transaction/TxnClusterSlotCheck.cs +++ b/libs/server/Transaction/TxnClusterSlotCheck.cs @@ -1,51 +1,51 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -using System; +using System.Diagnostics; using Tsavorite.core; namespace Garnet.server { sealed unsafe partial class TransactionManager { - // Keys involved in the current transaction - PinnedSpanByte[] keys; - int keyCount; - - internal byte* saveKeyRecvBufferPtr; readonly bool clusterEnabled; + internal byte* saveKeyRecvBufferPtr; /// /// Keep track of actual key accessed by command /// - /// - public void SaveKeyArgSlice(PinnedSpanByte argSlice) + /// + public void SaveKeyArgSlice(PinnedSpanByte keySlice) { // Execute method only if clusterEnabled if (!clusterEnabled) return; - // Grow the buffer if needed - if (keyCount >= keys.Length) + + var count = txnKeysParseState.Count; + + // Double the parse state buffer capacity if needed, and copy existing parameters to the extended buffer + if (count >= txnKeysParseState.Capacity) { - var oldKeys = keys; - keys = new PinnedSpanByte[keys.Length * 2]; - Array.Copy(oldKeys, keys, oldKeys.Length); + var oldParams = txnKeysParseState.Parameters; + txnKeysParseState.Initialize(count * 2); + txnKeysParseState.SetArguments(0, oldParams); } - keys[keyCount++] = argSlice; + txnKeysParseState.Count = count + 1; + txnKeysParseState.SetArgument(count, keySlice); } /// - /// Update argslice ptr if input buffer has been resized + /// Copy all existing keys into so they are independent of the old receive buffer. + /// Called when the receive buffer has been reallocated since keys were last stored. /// - /// - public unsafe void UpdateRecvBufferPtr(byte* recvBufferPtr) + public void CopyExistingKeysToScratchBuffer() { - // Execute method only if clusterEnabled - if (!clusterEnabled) return; - if (recvBufferPtr != saveKeyRecvBufferPtr) + Debug.Assert(clusterEnabled); + + for (var i = 0; i < txnKeysParseState.Count; i++) { - for (int i = 0; i < keyCount; i++) - keys[i].ptr = recvBufferPtr + (keys[i].ptr - saveKeyRecvBufferPtr); + ref var key = ref txnKeysParseState.GetArgSliceByRef(i); + key = txnScratchBufferAllocator.CreateArgSlice(key.ReadOnlySpan); } } } diff --git a/libs/server/Transaction/TxnRespCommands.cs b/libs/server/Transaction/TxnRespCommands.cs index 10c0f2b25d4..1483b4ec303 100644 --- a/libs/server/Transaction/TxnRespCommands.cs +++ b/libs/server/Transaction/TxnRespCommands.cs @@ -29,7 +29,7 @@ private bool NetworkMULTI() txnManager.txnStartHead = readHead; txnManager.state = TxnState.Started; txnManager.operationCntTxn = 0; - //Keep track of ptr for key verification when cluster mode is enabled + // Track receive buffer ptr for key pointer adjustment at EXEC time txnManager.saveKeyRecvBufferPtr = recvBufferPtr; while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) @@ -39,35 +39,42 @@ private bool NetworkMULTI() private bool NetworkEXEC() { - // pass over the EXEC in buffer during execution + // Pass over the EXEC in buffer during execution if (txnManager.state == TxnState.Running) { txnManager.Commit(); return true; - } + // Abort and reset the transaction - else if (txnManager.state == TxnState.Aborted) + if (txnManager.state == TxnState.Aborted) { while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_EXEC_ABORT, ref dcurr, dend)) SendAndReset(); txnManager.Reset(false); + txnManager.watchContainer.Reset(); return true; } - // start running transaction and setting readHead to first operation - else if (txnManager.state == TxnState.Started) + + // Start running transaction and setting readHead to first operation + if (txnManager.state == TxnState.Started) { - var _origReadHead = endReadHead; + var origReadHead = endReadHead; endReadHead = txnManager.txnStartHead; - txnManager.GetKeysForValidation(recvBufferPtr, out var keys, out int keyCount, out bool readOnly); - if (NetworkKeyArraySlotVerify(keys, readOnly, keyCount)) + if (clusterSession != null) { - logger?.LogWarning("Failed CheckClusterTxnKeys"); - txnManager.Reset(false); - txnManager.watchContainer.Reset(); - endReadHead = _origReadHead; - return true; + txnManager.GetSlotVerificationInput(recvBufferPtr, SessionAsking, out var clusterSlotVerificationInput); + + if (txnManager.txnKeysParseState.Count > 0 && + clusterSession.NetworkMultiKeySlotVerify(ref txnManager.txnKeysParseState, ref clusterSlotVerificationInput, ref dcurr, ref dend, isTxn: true)) + { + logger?.LogWarning("Failed CheckClusterTxnKeys"); + txnManager.Reset(false); + txnManager.watchContainer.Reset(); + endReadHead = origReadHead; + return true; + } } var startTxn = txnManager.Run(); @@ -79,17 +86,17 @@ private bool NetworkEXEC() } else { - endReadHead = _origReadHead; + endReadHead = origReadHead; WriteNullArray(); } return true; } + // EXEC without MULTI command while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_GENERIC_EXEC_WO_MULTI, ref dcurr, dend)) SendAndReset(); return true; - } /// @@ -177,6 +184,12 @@ private bool NetworkSKIP(RespCommand cmd) return true; } + if (clusterSession != null && recvBufferPtr != txnManager.saveKeyRecvBufferPtr) + { + txnManager.CopyExistingKeysToScratchBuffer(); + txnManager.saveKeyRecvBufferPtr = recvBufferPtr; + } + txnManager.LockKeys(commandInfo); while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_QUEUED, ref dcurr, dend)) @@ -198,6 +211,7 @@ private bool NetworkDISCARD() while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend)) SendAndReset(); txnManager.Reset(false); + txnManager.watchContainer.Reset(); return true; } diff --git a/libs/server/Transaction/TxnWatchedKeysContainer.cs b/libs/server/Transaction/TxnWatchedKeysContainer.cs index 4d588c0e46f..e32b4871efd 100644 --- a/libs/server/Transaction/TxnWatchedKeysContainer.cs +++ b/libs/server/Transaction/TxnWatchedKeysContainer.cs @@ -2,7 +2,6 @@ // Licensed under the MIT license. using System; -using System.Runtime.CompilerServices; using Tsavorite.core; namespace Garnet.server @@ -13,30 +12,26 @@ namespace Garnet.server internal sealed unsafe class WatchedKeysContainer { /// - /// Array to keep slice of keys inside keyBuffer + /// Array to keep watched keys data /// WatchedKeySlice[] keySlices; /// - /// Array to keep slice of keys inside keyBuffer + /// Version map for watch validation /// readonly WatchVersionMap versionMap; - readonly int initialWatchBufferSize = 1 << 16; readonly int initialSliceBufferSize; + readonly ScratchBufferAllocator txnScratchBufferAllocator; int sliceBufferSize; - int watchBufferSize; - byte[] watchBuffer; - byte* watchBufferPtr; - int watchBufferHeadAddress; int sliceCount; - public WatchedKeysContainer(int size, WatchVersionMap versionMap) + public WatchedKeysContainer(int size, WatchVersionMap versionMap, ScratchBufferAllocator txnScratchBufferAllocator) { this.versionMap = versionMap; - watchBufferHeadAddress = 0; sliceCount = 0; initialSliceBufferSize = size; + this.txnScratchBufferAllocator = txnScratchBufferAllocator; } /// @@ -45,13 +40,12 @@ public WatchedKeysContainer(int size, WatchVersionMap versionMap) public void Reset() { sliceCount = 0; - watchBufferPtr -= watchBufferHeadAddress; - watchBufferHeadAddress = 0; + txnScratchBufferAllocator.Reset(); } public bool RemoveWatch(PinnedSpanByte key) { - for (int i = 0; i < sliceCount; i++) + for (var i = 0; i < sliceCount; i++) { if (key.ReadOnlySpan.SequenceEqual(keySlices[i].slice.ReadOnlySpan)) { @@ -68,40 +62,19 @@ public void AddWatch(PinnedSpanByte key) { // Double the struct buffer sliceBufferSize = sliceBufferSize == 0 ? initialSliceBufferSize : sliceBufferSize * 2; - var _oldBuffer = keySlices; + var oldBuffer = keySlices; keySlices = GC.AllocateUninitializedArray(sliceBufferSize, true); - if (_oldBuffer != null) Array.Copy(_oldBuffer, keySlices, _oldBuffer.Length); - } - if (watchBufferHeadAddress + key.Length > watchBufferSize) - { - // Double the watch buffer - watchBufferSize = watchBufferSize == 0 ? initialWatchBufferSize : watchBufferSize * 2; - var _oldBuffer = watchBuffer; - watchBuffer = GC.AllocateUninitializedArray(watchBufferSize, true); - var watchBufferPtrBase = (byte*)Unsafe.AsPointer(ref watchBuffer[0]); - watchBufferPtr = watchBufferPtrBase + watchBufferHeadAddress; - - if (_oldBuffer != null) - { - Array.Copy(_oldBuffer, watchBuffer, _oldBuffer.Length); - var oldWatchBufferPtrBase = (byte*)Unsafe.AsPointer(ref _oldBuffer[0]); - - // Update pointer for existing watches - for (int i = 0; i < sliceCount; i++) - keySlices[i].slice.ptr = watchBufferPtrBase + (keySlices[i].slice.ptr - oldWatchBufferPtrBase); - } + if (oldBuffer != null) Array.Copy(oldBuffer, keySlices, oldBuffer.Length); } - var slice = PinnedSpanByte.FromPinnedPointer(watchBufferPtr, key.Length); - key.ReadOnlySpan.CopyTo(slice.Span); + // Copy key bytes into scratch buffer (independent of receive buffer lifetime) + var keySlice = txnScratchBufferAllocator.CreateArgSlice(key.ReadOnlySpan); - keySlices[sliceCount].slice = slice; + keySlices[sliceCount].slice = keySlice; keySlices[sliceCount].isWatched = true; - keySlices[sliceCount].hash = Utility.HashBytes(slice.ReadOnlySpan); + keySlices[sliceCount].hash = Utility.HashBytes(keySlice.ReadOnlySpan); keySlices[sliceCount].version = versionMap.ReadVersion(keySlices[sliceCount].hash); - watchBufferPtr += key.Length; - watchBufferHeadAddress += key.Length; sliceCount++; } @@ -111,9 +84,9 @@ public void AddWatch(PinnedSpanByte key) /// public bool ValidateWatchVersion() { - for (int i = 0; i < sliceCount; i++) + for (var i = 0; i < sliceCount; i++) { - WatchedKeySlice key = keySlices[i]; + var key = keySlices[i]; if (!key.isWatched) continue; if (versionMap.ReadVersion(key.hash) != key.version) return false; @@ -123,9 +96,9 @@ public bool ValidateWatchVersion() public bool SaveKeysToLock(TransactionManager txnManager) { - for (int i = 0; i < sliceCount; i++) + for (var i = 0; i < sliceCount; i++) { - WatchedKeySlice watchedKeySlice = keySlices[i]; + var watchedKeySlice = keySlices[i]; if (!watchedKeySlice.isWatched) continue; var slice = keySlices[i].slice; @@ -136,7 +109,7 @@ public bool SaveKeysToLock(TransactionManager txnManager) public bool SaveKeysToKeyList(TransactionManager txnManager) { - for (int i = 0; i < sliceCount; i++) + for (var i = 0; i < sliceCount; i++) { txnManager.SaveKeyArgSlice(keySlices[i].slice); }