Skip to content

Null reference exception on Upsert #942

@lance-cognitiv

Description

@lance-cognitiv

Stack trace:

 ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at FASTER.core.FasterKV`2.HandleRetryStatus[Input,Output,Context,FasterSession](OperationStatus internalStatus, FasterSession fasterSession, PendingContext`3& pendingContext)
   at Cognitiv.Leo.DataApi.Server.MemoryCache`1.AddOrUpdate(TKey key, Int32 payloadSize, Action`1 serializer) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/MessageCache.cs:line 156
   --- End of inner exception stack trace ---
   at Cognitiv.Leo.DataApi.Server.MemoryCache`1.AddOrUpdate(TKey key, Int32 payloadSize, Action`1 serializer) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/MessageCache.cs:line 237
   at Cognitiv.Leo.DataApi.Server.MessageCache.AddOrUpdate(Int64 partitionKey, Int32 messageId, MessageContainer messageContainer) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/MessageCache.cs:line 293
   at Cognitiv.Leo.DataApi.Server.ProjectionEngine.ExecuteQueryAsync(Dictionary`2 featureSetBases, Dictionary`2 features, Object featureLock, Dictionary`2 messages, Object messageLock, QueryMessagesOperation operation, Func`2 valueAllocator, Func`2 arrayAllocator) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/ProjectionEngine.cs:line 267
   at Cognitiv.Leo.DataApi.Server.ProjectionEngine.ProjectAsync(ProjectionRequest request, FlatBufferBuilder responseBuilder) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/ProjectionEngine.cs:line 677
   at Cognitiv.Leo.DataApi.Server.ProjectRequestHandler.HandleAsync(HttpContext httpContext) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/ProjectRequestHandler.cs:line 85

I added extra diagnostic state to the wrapping exception message in an attempt to capture the underlying issue. This seems to happen only under high load, and I have not been able to reproduce it on demand. I am currently using FASTER version 2.6.5.

The error pops up once, and then the server becomes unresponsive and CPU gets pinned.

Perf metrics:

  49.24%  [kernel]                            [k] arch_local_irq_enable
   7.59%  memfd:doublemapper (deleted)        [.] 0x00000000028492ec
   6.80%  [kernel]                            [k] do_sched_yield
   6.29%  memfd:doublemapper (deleted)        [.] 0x00000000028493c4
   4.31%  memfd:doublemapper (deleted)        [.] 0x00000000023e0290
   3.29%  [kernel]                            [k] el0_svc
   1.70%  libc.so.6 (deleted)                 [.] 0x00000000000ceccc
   1.27%  [kernel]                            [k] get_random_u16
   1.11%  memfd:doublemapper (deleted)        [.] 0x00000000023e02c0
   0.78%  memfd:doublemapper (deleted)        [.] 0x00000000023e0284
   0.75%  memfd:doublemapper (deleted)        [.] 0x0000000002849e8c
   0.74%  memfd:doublemapper (deleted)        [.] 0x00000000023e02e8
   0.65%  [kernel]                            [k] schedule_debug.constprop.0
   0.56%  [kernel]                            [k] __schedule
   0.53%  [kernel]                            [k] do_notify_resume
   0.52%  [kernel]                            [k] invoke_syscall
   0.48%  memfd:doublemapper (deleted)        [.] 0x0000000002848d9c

Nominal perf metrics:

   3.93%  libcoreclr.so                       [.] 0x0000000000316988
   3.79%  [kernel]                            [k] arch_local_irq_enable
   3.06%  libcoreclr.so                       [.] 0x00000000001f139c
   2.11%  libcoreclr.so                       [.] 0x00000000001f144c
   1.85%  memfd:doublemapper (deleted)        [.] 0x000000000240314c
   1.41%  libcoreclr.so                       [.] 0x00000000001f1398
   1.29%  libcoreclr.so                       [.] 0x000000000035ccf8
   1.22%  agent                               [.] _start
   0.90%  memfd:doublemapper (deleted)        [.] 0x0000000000000078
   0.89%  [kernel]                            [k] el0_svc
   0.87%  memfd:doublemapper (deleted)        [.] 0x00000000010edeb0
   0.84%  memfd:doublemapper (deleted)        [.] 0x0000000000000038
   0.80%  [kernel]                            [k] arch_local_irq_restore
   0.79%  memfd:doublemapper (deleted)        [.] 0x00000000022887f8
   0.78%  memfd:doublemapper (deleted)        [.] 0x00000000023ec6b8
   0.76%  [vdso]                              [.] 0x0000000000000548
   0.71%  memfd:doublemapper (deleted)        [.] 0x00000000019f4278
   0.66%  libcoreclr.so                       [.] 0x000000000035ccf4
   0.63%  memfd:doublemapper (deleted)        [.] 0x00000000023eee68
   0.59%  memfd:doublemapper (deleted)        [.] 0x0000000002403390
   0.58%  [kernel]                            [k] default_idle_call
   0.53%  libcoreclr.so                       [.] 0x0000000000316990
   0.53%  [kernel]                            [k] do_sched_yield
   0.51%  memfd:doublemapper (deleted)        [.] 0x00000000019edf68

Memory:

ubuntu@ip-10-38-64-176:~$ free -h
               total        used        free      shared  buff/cache   available
Mem:            61Gi        14Gi        42Gi        50Mi       4.6Gi        46Gi
Swap:             0B          0B          0B

Code:

using System.Runtime.InteropServices;
using Cognitiv.Leo.Shared;
using FASTER.core;
using StatsdClient;

namespace Cognitiv.Leo.DataApi.Server;

/// <summary>
/// Fixed-size header with a single 32-bit counter, serialized as a little-endian integer.
/// </summary>
[StructLayout(LayoutKind.Sequential)]
public struct EntryInfo
{
	/// <summary>Number of bytes occupied by the serialized form (one 32-bit integer).</summary>
	public static int Size => sizeof(int);

	/// <summary>Counter value carried by the header.</summary>
	public int HitCount;

	/// <summary>
	/// Writes <see cref="HitCount"/> to the start of <paramref name="buffer"/> in little-endian order.
	/// </summary>
	/// <param name="buffer">Destination; must hold at least <see cref="Size"/> bytes.</param>
	/// <exception cref="ArgumentOutOfRangeException"><paramref name="buffer"/> is shorter than four bytes.</exception>
	public void Serialize(Span<byte> buffer) =>
		BinaryPrimitives.WriteInt32LittleEndian(buffer, HitCount);

	/// <summary>
	/// Rebuilds a header from the first four bytes of <paramref name="buffer"/>.
	/// </summary>
	/// <param name="buffer">Source bytes, little-endian; must hold at least <see cref="Size"/> bytes.</param>
	/// <returns>The decoded header.</returns>
	/// <exception cref="ArgumentOutOfRangeException"><paramref name="buffer"/> is shorter than four bytes.</exception>
	public static EntryInfo Deserialize(ReadOnlySpan<byte> buffer)
	{
		EntryInfo decoded = default;
		decoded.HitCount = BinaryPrimitives.ReadInt32LittleEndian(buffer);
		return decoded;
	}
}

/// <summary>
/// FASTER callback functions that copy a stored <see cref="SpanByte"/> value into a freshly
/// allocated <see cref="ArraySegment{T}"/> on every read.
/// </summary>
public class SpanByteFunctions : SpanByteFunctions<SpanByte, ArraySegment<byte>, Empty>
{
	/// <summary>
	/// Optional allocator forwarded to <c>ArrayAllocator.Allocate</c> when the read output is created.
	/// </summary>
	public Func<int, ArraySegment<byte>>? Allocator { get; set; }

	/// <summary>Copies <paramref name="value"/> into a new segment assigned to <paramref name="dst"/>.</summary>
	/// <returns>Always <c>true</c>.</returns>
	public override bool SingleReader(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref ArraySegment<byte> dst, ref ReadInfo readInfo)
		=> CopyValue(ref value, ref dst);

	/// <summary>Copies <paramref name="value"/> into a new segment assigned to <paramref name="dst"/>.</summary>
	/// <returns>Always <c>true</c>.</returns>
	public override bool ConcurrentReader(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref ArraySegment<byte> dst, ref ReadInfo readInfo)
		=> CopyValue(ref value, ref dst);

	// Shared body of both reader callbacks: the destination is assigned first, then filled.
	private bool CopyValue(ref SpanByte source, ref ArraySegment<byte> destination)
	{
		destination = ArrayAllocator.Allocate(source.LengthWithoutMetadata, Allocator);
		source.AsSpan().CopyTo(destination);
		return true;
	}
}

/// <summary>
/// Byte-oriented cache backed by a FASTER <see cref="FasterKV{Key, Value}"/> store with
/// <see cref="SpanByte"/> keys and values. Each stored value is an <see cref="EntryInfo"/> header
/// followed by the caller's payload. Every thread works through its own lazily created session.
/// </summary>
/// <typeparam name="TKey">Key type; derived classes define its binary form.</typeparam>
public abstract class MemoryCache<TKey> : IDisposable where TKey : struct
{
	// Entries (header + payload) up to this many bytes are staged on the stack; larger ones use a pooled buffer.
	private const int MaxStackallocSize = 1024;
	private readonly FasterKVSettings<SpanByte, SpanByte> settings;
	private readonly FasterKV<SpanByte, SpanByte> store;
	private readonly ThreadLocal<(SpanByteFunctions functions, ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions> session)> sessions;
	private readonly SimpleObjectPool<UnmanagedBuffer> payloadBufferPool;
	private bool disposed;

	/// <summary>Creates the store and the per-thread session holder.</summary>
	/// <param name="memorySizeBytes">Value assigned to the FASTER <c>MemorySize</c> setting.</param>
	public MemoryCache(long memorySizeBytes)
	{
		// Kept in a field so it can be disposed together with the store.
		settings = new FasterKVSettings<SpanByte, SpanByte>(null)
		{
			MemorySize = memorySizeBytes,
			ReadCacheEnabled = false
		};
		store = new FasterKV<SpanByte, SpanByte>(settings);
		// trackAllValues lets Dispose enumerate and release every thread's session.
		sessions = new ThreadLocal<(SpanByteFunctions functions, ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions> session)>(
			CreateSession,
			trackAllValues: true);
		// NOTE(review): 1 << 14 is presumably a byte capacity; confirm UnmanagedBuffer.AsSpan(size) copes with larger sizes.
		payloadBufferPool = new SimpleObjectPool<UnmanagedBuffer>(() => new UnmanagedBuffer(1 << 14));
	}

	/// <summary>Number of bytes <see cref="SerializeKey"/> writes for a key.</summary>
	protected abstract int GetKeySize();

	/// <summary>Writes the binary form of <paramref name="key"/> into <paramref name="buffer"/>.</summary>
	protected abstract void SerializeKey(TKey key, Span<byte> buffer);

	/// <summary>
	/// Looks up <paramref name="key"/> using the calling thread's session.
	/// </summary>
	/// <param name="key">Key to look up.</param>
	/// <param name="valueAllocator">Optional allocator used for the buffer that receives the stored bytes.</param>
	/// <param name="entryInfo">Header decoded from the start of the stored value; <c>default</c> on a miss.</param>
	/// <param name="value">Stored bytes after the header; <c>default</c> on a miss.</param>
	/// <returns><c>true</c> when the store reports the key as found; otherwise <c>false</c>.</returns>
	/// <exception cref="InvalidOperationException">
	/// Any failure during the lookup, wrapped with progress flags; the original exception is the inner exception.
	/// </exception>
	public bool Read(
		TKey key, 
		Func<int, ArraySegment<byte>>? valueAllocator,
		out EntryInfo entryInfo,
		out ArraySegment<byte> value)
	{
		ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions>? session = null;
		// Progress flags reported in the wrapping exception to show how far the call got.
		var gotSession = false;
		var keySerialized = false;
		var spanByteSerialized = false;
		var read = false;
		var entryDeserialized = false;

		try
		{
			entryInfo = default;
			value = default;

			var (functions, s) = sessions.Value;
			gotSession = true;

			functions.Allocator = valueAllocator;
			session = s;

			// The key lives on the stack for the duration of this call.
			// NOTE(review): a pending status is not completed here and is reported as a miss —
			// confirm pending reads cannot occur with this store configuration.
			Span<byte> keyBuffer = stackalloc byte[GetKeySize()];
			SerializeKey(key, keyBuffer);
			keySerialized = true;

			var keySpanByte = SpanByte.FromFixedSpan(keyBuffer);
			spanByteSerialized = true;

			var status = session.Read(keySpanByte, out var output);
			read = true;
			if (!status.Found)
				return false;

			entryInfo = EntryInfo.Deserialize(output[..EntryInfo.Size]);
			entryDeserialized = true;

			value = output[EntryInfo.Size..];
			return true;
		}
		catch (Exception e)
		{
			var keySerializedInException = TryFormatKey(key, out var keyHex);
			var createdNewSession = TryResetSession();

			var sessionIsNull = session is null;
			throw new InvalidOperationException($"Exception on Read. Key={keyHex}, GotSession={gotSession}, SessionIsNull={sessionIsNull}, KeySerialized={keySerialized}, SpanByteSerialized={spanByteSerialized}, Read={read}, EntryDeserialized={entryDeserialized}, KeySerializedInException={keySerializedInException}, CreatedNewSession={createdNewSession}", e);
		}
	}

	/// <summary>
	/// Upserts an entry made of a zeroed <see cref="EntryInfo"/> header followed by the payload
	/// produced by <paramref name="serializer"/>, using the calling thread's session.
	/// </summary>
	/// <param name="key">Key to write.</param>
	/// <param name="payloadSize">Number of payload bytes <paramref name="serializer"/> will write.</param>
	/// <param name="serializer">Callback that fills the payload span handed to it.</param>
	/// <exception cref="InvalidOperationException">
	/// Any failure during the write, wrapped with progress flags; the original exception is the inner exception.
	/// </exception>
	public void AddOrUpdate(
		TKey key, 
		int payloadSize, 
		Action<Span<byte>> serializer)
	{
		ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions>? session = null;
		// Progress flags reported in the wrapping exception to show how far the call got.
		var gotSession = false;
		var keySerialized = false;
		var spanByteSerialized = false;
		var isSmallPayloadSerialized = false;
		var isLargePayloadSerialized = false;
		var isUpserted = false;
		try
		{
			var (_, s) = sessions.Value;
			gotSession = true;
			session = s;

			Span<byte> keyBuffer = stackalloc byte[GetKeySize()];
			SerializeKey(key, keyBuffer);
			keySerialized = true;

			var keySpanByte = SpanByte.FromFixedSpan(keyBuffer);
			spanByteSerialized = true;

			var entryInfo = new EntryInfo { HitCount = 0 };

			var size = EntryInfo.Size + payloadSize;
			if (size <= MaxStackallocSize)
			{
				// Small entry: stage header + payload on the stack.
				Span<byte> buffer = stackalloc byte[size];

				entryInfo.Serialize(buffer[..EntryInfo.Size]);
				serializer(buffer[EntryInfo.Size..]);
				isSmallPayloadSerialized = true;

				session.Upsert(keySpanByte, SpanByte.FromFixedSpan(buffer));
				isUpserted = true;
			}
			else
			{
				// Large entry: stage in a pooled buffer that is always returned to the pool.
				var buffer = payloadBufferPool.Get();
				try
				{
					var span = buffer.AsSpan(size);

					entryInfo.Serialize(span[..EntryInfo.Size]);
					serializer(span[EntryInfo.Size..]);
					isLargePayloadSerialized = true;

					session.Upsert(keySpanByte, SpanByte.FromFixedSpan(span));
					isUpserted = true;
				}
				finally
				{
					payloadBufferPool.Return(buffer);
				}
			}
		}
		catch (Exception e)
		{
			var keySerializedInException = TryFormatKey(key, out var keyHex);
			var createdNewSession = TryResetSession();

			var sessionIsNull = session is null;
			throw new InvalidOperationException($"Exception on AddOrUpdate. Key={keyHex}, GotSession={gotSession}, SessionIsNull={sessionIsNull}, KeySerialized={keySerialized}, SpanByteSerialized={spanByteSerialized}, IsSmallPayloadSerialized={isSmallPayloadSerialized}, IsLargePayloadSerialized={isLargePayloadSerialized}, IsUpserted={isUpserted}, KeySerializedInException={keySerializedInException}, CreatedNewSession={createdNewSession}", e);
		}
	}

	/// <summary>
	/// Disposes every tracked per-thread session, then the session holder, the store and its settings.
	/// Callers must make sure no other thread is still using the cache.
	/// </summary>
	public void Dispose()
	{
		Dispose(true);
		GC.SuppressFinalize(this);
	}

	/// <summary>Releases the resources owned by this instance; safe to call more than once.</summary>
	/// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
	protected virtual void Dispose(bool disposing)
	{
		if (disposed)
			return;
		disposed = true;

		if (!disposing)
			return;

		try
		{
			foreach (var (_, session) in sessions.Values)
				session.Dispose();
		}
		finally
		{
			// Sessions go first; the store and settings are released even if a session fails to dispose.
			sessions.Dispose();
			store.Dispose();
			settings.Dispose();
		}
	}

	// Builds a functions/session pair bound to the store; used for first use on a thread and after a failure.
	private (SpanByteFunctions functions, ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions> session) CreateSession()
	{
		var functions = new SpanByteFunctions();
		var session = store.For(functions).NewSession<SpanByteFunctions>();
		return (functions, session);
	}

	// Best-effort hex rendering of the key for diagnostics; returns false (and an empty string) if serialization throws.
	private bool TryFormatKey(TKey key, out string keyHex)
	{
		try
		{
			var keyBuffer = new byte[GetKeySize()];
			SerializeKey(key, keyBuffer);
			keyHex = Convert.ToHexString(keyBuffer);
			return true;
		}
		catch (Exception)
		{
			keyHex = string.Empty;
			return false;
		}
	}

	// Best-effort replacement of the calling thread's session after a failure; returns false if that throws.
	// NOTE(review): the replaced session is not disposed and is no longer tracked by the ThreadLocal —
	// confirm whether a session can be safely disposed after a fault.
	private bool TryResetSession()
	{
		try
		{
			sessions.Value = CreateSession();
			return true;
		}
		catch (Exception)
		{
			return false;
		}
	}
}

/// <summary>
/// Cache of serialized message containers keyed by (partition key, message id), with a
/// per-message-id time-to-live applied when entries are read back.
/// </summary>
public class MessageCache : MemoryCache<(long, int)>
{
	// Memory size handed to the base cache: 10 GiB.
	private const long CacheSizeBytes = 10L * 1024L * 1024L * 1024L;

	// Message id -> TTL, compared against an age in milliseconds; replaced wholesale by SetTtls.
	private Dictionary<int, long> messageTtls = new Dictionary<int, long>();

	/// <summary>Creates the cache with a fixed 10 GiB memory size.</summary>
	public MessageCache() : base(CacheSizeBytes)
	{
	}

	/// <summary>Replaces the TTL table with <paramref name="ttls"/> (stored by reference, not copied).</summary>
	public void SetTtls(Dictionary<int, long> ttls)
	{
		messageTtls = ttls;
	}

	/// <summary>Key size: an 8-byte partition key followed by a 4-byte message id.</summary>
	protected override int GetKeySize()
	{
		return sizeof(int) + sizeof(long);
	}

	/// <summary>
	/// Writes the partition key into the first eight bytes of <paramref name="buffer"/> and the
	/// message id into the next four, using <see cref="BitConverter"/>.
	/// </summary>
	protected override void SerializeKey((long, int) key, Span<byte> buffer)
	{
		var (partitionKey, messageId) = key;
		BitConverter.TryWriteBytes(buffer.Slice(0, sizeof(long)), partitionKey);
		BitConverter.TryWriteBytes(buffer[sizeof(long)..(sizeof(long) + sizeof(int))], messageId);
	}

	/// <summary>
	/// Fills <paramref name="messageContainer"/> from the cache when a fresh entry exists.
	/// </summary>
	/// <param name="partitionKey">Partition part of the cache key.</param>
	/// <param name="messageId">Message part of the cache key; also selects the TTL.</param>
	/// <param name="messageContainer">Container that receives the deserialized entry.</param>
	/// <param name="valueAllocator">Optional allocator for the buffer holding the cached bytes.</param>
	/// <param name="arrayAllocator">Optional allocator forwarded to the container's deserializer.</param>
	/// <returns>
	/// <c>false</c> when no TTL is registered for <paramref name="messageId"/>, when the key is not
	/// cached, or when the entry is older than its TTL; <c>true</c> after the container was populated.
	/// </returns>
	public bool TryPopulate(
		long partitionKey, 
		int messageId, 
		MessageContainer messageContainer,
		Func<int, ArraySegment<byte>>? valueAllocator=null,
		Func<int, ArraySegment<ArraySegment<byte>>>? arrayAllocator=null)
	{
		if (!messageTtls.TryGetValue(messageId, out var ttl))
		{
			return false;
		}

		if (!Read((partitionKey, messageId), valueAllocator, out _, out var cached))
		{
			return false;
		}

		// Age of the entry in milliseconds, relative to the read epoch stored in the payload.
		var writtenAtMs = MessageContainer.DeserializeReadEpochMs(cached);
		var ageMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - writtenAtMs;
		if (ageMs <= ttl)
		{
			messageContainer.Deserialize(cached, 0, arrayAllocator);
			return true;
		}

		return false;
	}

	/// <summary>
	/// Serializes <paramref name="messageContainer"/> and stores it under
	/// (<paramref name="partitionKey"/>, <paramref name="messageId"/>).
	/// </summary>
	public void AddOrUpdate(
		long partitionKey, 
		int messageId, 
		MessageContainer messageContainer)
	{
		AddOrUpdate(
			(partitionKey, messageId),
			messageContainer.GetSerializedSize(),
			messageContainer.Serialize);
	}
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions