Skip to content

Commit a75a552

Browse files
kazimuth and rekhoff authored
Significantly reduce small byte array allocations (#305)
This purges the DbValue type, instead using row instances themselves as primary key for rows without primary keys. In addition, it instantiates only a single BinaryReader when reading updates for a table, rather than instantiating a BinaryReader and performing an array copy per-row of the table. Addresses clockworklabs/SpacetimeDBPrivate#1633 ## API - [ ] This is an API breaking change to the SDK *If the API is breaking, please state below what will break* ## Requires SpacetimeDB PRs ## Testsuite SpacetimeDB branch name: master ## Testing *Write instructions for a test that you performed for this PR* - [ ] CI --------- Co-authored-by: rekhoff <[email protected]>
1 parent 98cae26 commit a75a552

File tree

7 files changed

+283
-160
lines changed

7 files changed

+283
-160
lines changed

src/BSATNHelpers.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,27 @@ namespace SpacetimeDB
55
{
66
public static class BSATNHelpers
77
{
8+
/// <summary>
9+
/// Decode an element of a BSATN-serializable type from a list of bytes.
10+
///
11+
/// This method performs several allocations. Prefer calling <c>IStructuralReadWrite.Read&lt;T&gt;(BinaryReader)</c> when
12+
/// deserializing many items from a buffer.
13+
/// </summary>
14+
/// <typeparam name="T"></typeparam>
15+
/// <param name="bsatn"></param>
16+
/// <returns></returns>
817
public static T Decode<T>(System.Collections.Generic.List<byte> bsatn) where T : IStructuralReadWrite, new() =>
918
Decode<T>(bsatn.ToArray());
1019

20+
/// <summary>
21+
/// Decode an element of a BSATN-serializable type from a byte array.
22+
///
23+
/// This method performs several allocations. Prefer calling <c>IStructuralReadWrite.Read&lt;T&gt;(BinaryReader)</c> when
24+
/// deserializing many items from a buffer.
25+
/// </summary>
26+
/// <typeparam name="T"></typeparam>
27+
/// <param name="bsatn"></param>
28+
/// <returns></returns>
1129
public static T Decode<T>(byte[] bsatn) where T : IStructuralReadWrite, new()
1230
{
1331
using var stream = new MemoryStream(bsatn);

src/ListStream.cs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using SpacetimeDB;
5+
6+
/// <summary>
/// A read-only, seekable stream that reads directly from an underlying list of bytes.
///
/// Uses one less allocation than converting the list to a byte array and building a MemoryStream.
/// The list is not copied: every read observes whatever the list contains at the time of the call.
/// </summary>
internal class ListStream : Stream
{
    private readonly List<byte> list;

    // Always kept within [0, int.MaxValue]; it may exceed list.Count after a seek,
    // in which case reads simply return 0 bytes.
    private int pos;

    /// <summary>
    /// Create a stream positioned at the start of <paramref name="data"/>.
    /// </summary>
    /// <param name="data">The list to read from. It is referenced, not copied.</param>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    public ListStream(List<byte> data)
    {
        this.list = data ?? throw new ArgumentNullException(nameof(data));
        this.pos = 0;
    }

    public override bool CanRead => true;

    public override bool CanSeek => true;

    public override bool CanWrite => false;

    public override long Length => list.Count;

    /// <summary>
    /// The index in the list of the next byte to read.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The value is negative or larger than <c>int.MaxValue</c>.</exception>
    public override long Position
    {
        get => pos;
        set
        {
            // Reject values that the previous unchecked (int) cast would have silently wrapped.
            if (value < 0 || value > int.MaxValue)
            {
                throw new ArgumentOutOfRangeException(nameof(value));
            }
            pos = (int)value;
        }
    }

    public override void Flush()
    {
        // Nothing to flush: the stream is read-only.
    }

    /// <summary>
    /// Copy up to <paramref name="count"/> bytes from the current position into
    /// <paramref name="buffer"/>, starting at <paramref name="offset"/>.
    /// </summary>
    /// <returns>The number of bytes copied; 0 when the position is at or past the end of the list.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException"><paramref name="offset"/> plus <paramref name="count"/> exceeds the buffer length.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("Offset and count exceed the length of the buffer.");
        }

        // Computed as a difference rather than `pos + count`, which can overflow for large counts.
        int available = list.Count - pos;
        if (available <= 0)
        {
            return 0;
        }

        int toCopy = Math.Min(count, available);
        // Bulk copy instead of a per-byte indexer loop.
        list.CopyTo(pos, buffer, offset, toCopy);
        pos += toCopy;
        return toCopy;
    }

    /// <summary>
    /// Copy bytes from the current position into <paramref name="buffer"/> until either the
    /// buffer is full or the end of the list is reached.
    /// </summary>
    /// <returns>The number of bytes copied; 0 when the position is at or past the end of the list.</returns>
    public override int Read(Span<byte> buffer)
    {
        int available = list.Count - pos;
        if (available <= 0)
        {
            return 0;
        }

        int toCopy = Math.Min(buffer.Length, available);
        for (int i = 0; i < toCopy; i++)
        {
            buffer[i] = list[pos + i];
        }
        pos += toCopy;
        return toCopy;
    }

    /// <summary>
    /// Read a single byte and advance the position.
    ///
    /// Overridden because the base <c>Stream.ReadByte</c> allocates a one-byte array on every call.
    /// </summary>
    /// <returns>The byte as a non-negative int, or -1 at the end of the list.</returns>
    public override int ReadByte()
    {
        if (pos >= list.Count)
        {
            return -1;
        }
        return list[pos++];
    }

    /// <summary>
    /// Move the position to <paramref name="offset"/> relative to <paramref name="origin"/>.
    /// Seeking past the end of the list is allowed; subsequent reads return 0 bytes.
    /// </summary>
    /// <returns>The new position.</returns>
    /// <exception cref="IOException">The resulting position would be negative.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The resulting position would exceed <c>int.MaxValue</c>.</exception>
    /// <exception cref="ArgumentException"><paramref name="origin"/> is not a valid <c>SeekOrigin</c>.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        long basis = origin switch
        {
            SeekOrigin.Begin => 0,
            SeekOrigin.Current => pos,
            SeekOrigin.End => list.Count,
            _ => throw new ArgumentException("Invalid seek origin.", nameof(origin)),
        };

        // Range checks are phrased so that `basis + offset` is never evaluated when it could overflow.
        if (offset < -basis)
        {
            throw new IOException("An attempt was made to move the position before the beginning of the stream.");
        }
        if (offset > int.MaxValue - basis)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        pos = (int)(basis + offset);
        return pos;
    }

    public override void SetLength(long value)
    {
        throw new System.NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new System.NotSupportedException();
    }
}

src/ListStream.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/SpacetimeDBClient.cs

Lines changed: 56 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using SpacetimeDB.Internal;
1212
using SpacetimeDB.ClientApi;
1313
using Thread = System.Threading.Thread;
14+
using System.Diagnostics;
1415

1516

1617
namespace SpacetimeDB
@@ -211,11 +212,10 @@ struct UnprocessedMessage
211212

212213
struct ProcessedDatabaseUpdate
213214
{
214-
// Map: table handles -> (primary key -> DbValue).
215-
// If a particular table has no primary key, the "primary key" is just a byte[]
216-
// storing the BSATN encoding of the row.
217-
// See Decode(...).
218-
public Dictionary<IRemoteTableHandle, MultiDictionaryDelta<object, DbValue>> Updates;
215+
// Map: table handles -> (primary key -> IStructuralReadWrite).
216+
// If a particular table has no primary key, the "primary key" is just the row itself.
217+
// This is valid because any [SpacetimeDB.Type] automatically has a correct Equals and GetHashCode implementation.
218+
public Dictionary<IRemoteTableHandle, MultiDictionaryDelta<object, IStructuralReadWrite>> Updates;
219219

220220
// Can't override the default constructor. Make sure you use this one!
221221
public static ProcessedDatabaseUpdate New()
@@ -225,13 +225,11 @@ public static ProcessedDatabaseUpdate New()
225225
return result;
226226
}
227227

228-
public MultiDictionaryDelta<object, DbValue> DeltaForTable(IRemoteTableHandle table)
228+
public MultiDictionaryDelta<object, IStructuralReadWrite> DeltaForTable(IRemoteTableHandle table)
229229
{
230230
if (!Updates.TryGetValue(table, out var delta))
231231
{
232-
// Make sure we use GenericEqualityComparer here, since it handles byte[]s and arbitrary primary key types
233-
// correctly.
234-
delta = new MultiDictionaryDelta<object, DbValue>(GenericEqualityComparer.Instance, DbValueComparer.Instance);
232+
delta = new MultiDictionaryDelta<object, IStructuralReadWrite>(EqualityComparer<object>.Default, EqualityComparer<object>.Default);
235233
Updates[table] = delta;
236234
}
237235

@@ -265,18 +263,20 @@ struct ProcessedMessage
265263
/// If not, the row itself is used instead.
266264
/// </summary>
267265
/// <param name="table"></param>
268-
/// <param name="bin"></param>
266+
/// <param name="reader"></param>
269267
/// <param name="primaryKey"></param>
270268
/// <returns></returns>
271-
static DbValue Decode(IRemoteTableHandle table, byte[] bin, out object primaryKey)
269+
static IStructuralReadWrite Decode(IRemoteTableHandle table, BinaryReader reader, out object primaryKey)
272270
{
273-
var obj = table.DecodeValue(bin);
271+
var obj = table.DecodeValue(reader);
272+
274273
// TODO(1.1): we should exhaustively check that GenericEqualityComparer works
275274
// for all types that are allowed to be primary keys.
276275
var primaryKey_ = table.GetPrimaryKey(obj);
277-
primaryKey_ ??= bin;
276+
primaryKey_ ??= obj;
278277
primaryKey = primaryKey_;
279-
return new(obj, bin);
278+
279+
return obj;
280280
}
281281

282282
private static readonly Status Committed = new Status.Committed(default);
@@ -353,24 +353,28 @@ private static QueryUpdate DecompressDecodeQueryUpdate(CompressableQueryUpdate u
353353
return new QueryUpdate.BSATN().Read(new BinaryReader(memoryStream));
354354
}
355355

356-
private static IEnumerable<byte[]> BsatnRowListIter(BsatnRowList list)
357-
{
358-
var rowsData = list.RowsData;
359356

360-
return list.SizeHint switch
361-
{
362-
RowSizeHint.FixedSize(var size) => Enumerable
363-
.Range(0, rowsData.Count / size)
364-
.Select(index => rowsData.Skip(index * size).Take(size).ToArray()),
365-
366-
RowSizeHint.RowOffsets(var offsets) => offsets.Zip(
367-
offsets.Skip(1).Append((ulong)rowsData.Count),
368-
(start, end) => rowsData.Take((int)end).Skip((int)start).ToArray()
369-
),
370-
371-
_ => throw new InvalidOperationException("Unknown RowSizeHint variant"),
372-
};
373-
}
357+
/// <summary>
/// Set up sequential decoding of a BsatnRowList.
///
/// Rather than returning an IEnumerable, this hands back the reader and the row count directly,
/// which avoids an allocation.
/// Callers may invoke <c>IStructuralReadWrite.Read&lt;T&gt;</c> <c>rowCount</c> times in a row on the
/// returned BinaryReader: our decoding infrastructure guarantees that reading a value consumes
/// the correct number of bytes from the BinaryReader. (This is easy because BSATN doesn't have padding.)
/// </summary>
/// <param name="list">The row list; its <c>RowsData</c> backs the reader and its <c>SizeHint</c> determines the row count.</param>
/// <returns>A reader for the rows of the list and a count of rows.</returns>
private static (BinaryReader reader, int rowCount) ParseRowList(BsatnRowList list)
{
    // The reader is constructed first, matching the evaluation order of the tuple expression form.
    var rowReader = new BinaryReader(new ListStream(list.RowsData));

    int rows = list.SizeHint switch
    {
        // Fixed-size rows: the count is the total byte length divided by the per-row size.
        RowSizeHint.FixedSize(var size) => list.RowsData.Count / size,
        // Variable-size rows: one offset entry per row.
        RowSizeHint.RowOffsets(var offsets) => offsets.Count,
        _ => throw new NotImplementedException()
    };

    return (rowReader, rows);
}
374378

375379
#if UNITY_WEBGL && !UNITY_EDITOR
376380
IEnumerator PreProcessMessages()
@@ -455,9 +459,10 @@ void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, Pro
455459
{
456460
Log.Warn("Non-insert during an insert-only server message!");
457461
}
458-
foreach (var bin in BsatnRowListIter(qu.Inserts))
462+
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
463+
for (var i = 0; i < insertRowCount; i++)
459464
{
460-
var obj = Decode(table, bin, out var pk);
465+
var obj = Decode(table, insertReader, out var pk);
461466
delta.Add(pk, obj);
462467
}
463468
}
@@ -473,9 +478,11 @@ void PreProcessDeleteOnlyTable(IRemoteTableHandle table, TableUpdate update, Pro
473478
{
474479
Log.Warn("Non-delete during a delete-only operation!");
475480
}
476-
foreach (var bin in BsatnRowListIter(qu.Deletes))
481+
482+
var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
483+
for (var i = 0; i < deleteRowCount; i++)
477484
{
478-
var obj = Decode(table, bin, out var pk);
485+
var obj = Decode(table, deleteReader, out var pk);
479486
delta.Remove(pk, obj);
480487
}
481488
}
@@ -491,14 +498,17 @@ void PreProcessTable(IRemoteTableHandle table, TableUpdate update, ProcessedData
491498
// Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once
492499
// to the table, it doesn't matter that we call Add before Remove here.
493500

494-
foreach (var bin in BsatnRowListIter(qu.Inserts))
501+
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
502+
for (var i = 0; i < insertRowCount; i++)
495503
{
496-
var obj = Decode(table, bin, out var pk);
504+
var obj = Decode(table, insertReader, out var pk);
497505
delta.Add(pk, obj);
498506
}
499-
foreach (var bin in BsatnRowListIter(qu.Deletes))
507+
508+
var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
509+
for (var i = 0; i < deleteRowCount; i++)
500510
{
501-
var obj = Decode(table, bin, out var pk);
511+
var obj = Decode(table, deleteReader, out var pk);
502512
delta.Remove(pk, obj);
503513
}
504514
}
@@ -1002,9 +1012,13 @@ T[] LogAndThrow(string error)
10021012
return LogAndThrow($"Mismatched result type, expected {typeof(T)} but got {resultTable.TableName}");
10031013
}
10041014

1005-
return BsatnRowListIter(resultTable.Rows)
1006-
.Select(BSATNHelpers.Decode<T>)
1007-
.ToArray();
1015+
var (resultReader, resultCount) = ParseRowList(resultTable.Rows);
1016+
var output = new T[resultCount];
1017+
for (int i = 0; i < resultCount; i++)
1018+
{
1019+
output[i] = IStructuralReadWrite.Read<T>(resultReader);
1020+
}
1021+
return output;
10081022
}
10091023

10101024
public bool IsActive => webSocket.IsConnected;
@@ -1056,32 +1070,4 @@ public uint Next()
10561070
return lastAllocated;
10571071
}
10581072
}
1059-
internal readonly struct DbValue
1060-
{
1061-
public readonly IStructuralReadWrite value;
1062-
public readonly byte[] bytes;
1063-
1064-
public DbValue(IStructuralReadWrite value, byte[] bytes)
1065-
{
1066-
this.value = value;
1067-
this.bytes = bytes;
1068-
}
1069-
1070-
// TODO: having a nice ToString here would give way better errors when applying table deltas,
1071-
// but it's tricky to do that generically.
1072-
}
1073-
1074-
/// <summary>
1075-
/// DbValue comparer that uses BSATN-encoded records to compare DbValues for equality.
1076-
/// </summary>
1077-
internal readonly struct DbValueComparer : IEqualityComparer<DbValue>
1078-
{
1079-
public static DbValueComparer Instance = new();
1080-
1081-
public bool Equals(DbValue x, DbValue y) =>
1082-
ByteArrayComparer.Instance.Equals(x.bytes, y.bytes);
1083-
1084-
public int GetHashCode(DbValue obj) =>
1085-
ByteArrayComparer.Instance.GetHashCode(obj.bytes);
1086-
}
10871073
}

0 commit comments

Comments
 (0)