Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 171 additions & 70 deletions sdks/csharp/src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,80 @@ private static (BinaryReader reader, int rowCount) ParseRowList(BsatnRowList lis
}
);

/// <summary>
/// A collection of updates to the same table that need to be pre-processed.
/// This is the unit of work that is spread across worker threads.
///
/// It is fairly coarse-grained -- which means latency depends on the table with the most updates.
///
/// We actually could reduce latency further by doing more fine-grained parallelism --
/// splitting up row parsing within a TableUpdate using the RowSizeHint data in a BsatnRowList.
/// Probably we would want to split large messages into chunks, but only large messages.
/// However, if we do this, we'd need to have data from multiple threads aggregated into a single
/// MultiDictionaryDelta. To do this, we'd need to:
/// - make MultiDictionaryDelta a synchronized data structure
/// - or, add methods to MultiDictionaryDelta that allow them to be combined after being
/// produced independently.
///
/// I'm not comfortable doing either of these while MultiDictionaryDelta is as complicated as it is.
/// Commit ba9f3be made MultiDictionary so complicated because we needed to handle a weird edge case.
/// https://github.com/clockworklabs/SpacetimeDB/pull/2654 made the edge-case impossible server-side,
/// so once it has been released for a while, we could consider reverting ba9f3be and doing fancier parallelism here.
/// </summary>
internal struct UpdatesToPreProcess
{
/// <summary>
/// The table handle to use to parse rows.
/// You should only use this in a thread-safe way.
/// </summary>
public IRemoteTableHandle Table;

/// <summary>
/// The updates to parse.
/// There may be multiple TableUpdates corresponding to a single table.
/// (Each TableUpdate then contains multiple CompressableQueryUpdates.
/// Unfortunately, these are compressed independently due to serverside limitations.)
/// </summary>
public List<TableUpdate> Updates;

/// <summary>
/// The delta to fill with data. Starts out empty.
/// This delta is also stored in a ProcessedDatabaseUpdate, so modifying it
/// directly modifies a ProcessedDatabaseUpdate that needs to be filled in with data.
/// </summary>
public MultiDictionaryDelta<object, PreHashedRow> Delta;
}

/// <summary>
/// An optimized ForEach, depending on how many bytes we need to process.
/// For small messages, avoid the overhead from parallelizing.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="values">The enumerator to consume.</param>
/// <param name="action">The action to perform for each element of the enumerator.</param>
/// <param name="bytes">The number of bytes in the compressed message.</param>
/// <param name="enoughBytesToParallelize">The threshold for whether to parse updates on multiple threads.</param>
static void FastParallelForEach<T>(
IEnumerable<T> values,
Action<T> action,
int bytes,
int enoughBytesToParallelize = 32_000
)
{
if (bytes >= enoughBytesToParallelize)
{
Parallel.ForEach(values, action);
}
else
{
foreach (var value in values)
{
action(value);
}
}

}

#if UNITY_WEBGL && !UNITY_EDITOR
IEnumerator PreProcessMessages()
#else
Expand Down Expand Up @@ -416,8 +490,11 @@ void PreProcessMessages()
}
}

IEnumerable<(IRemoteTableHandle, TableUpdate)> GetTables(DatabaseUpdate updates)
IEnumerable<UpdatesToPreProcess> GetUpdatesToPreProcess(DatabaseUpdate updates, ProcessedDatabaseUpdate dbOps)
{
// There may be multiple TableUpdates corresponding to a single table in a DatabaseUpdate.
// Preemptively group them.
Dictionary<IRemoteTableHandle, UpdatesToPreProcess> tableToUpdates = new(updates.Tables.Count);
foreach (var update in updates.Tables)
{
var tableName = update.TableName;
Expand All @@ -427,125 +504,142 @@ void PreProcessMessages()
Log.Error($"Unknown table name: {tableName}");
continue;
}
yield return (table, update);
if (tableToUpdates.ContainsKey(table))
{
tableToUpdates[table].Updates.Add(update);
}
else
{
var delta = dbOps.DeltaForTable(table);
tableToUpdates[table] = new()
{
Table = table,
Updates = new() { update },
Delta = delta
};
}
}

return tableToUpdates.Values;
}

ProcessedDatabaseUpdate PreProcessLegacySubscription(InitialSubscription initSub)

ProcessedDatabaseUpdate PreProcessLegacySubscription(InitialSubscription initSub, int messageBytes)
{
var dbOps = ProcessedDatabaseUpdate.New();
// This is all of the inserts
int cap = initSub.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows);

// First apply all of the state
foreach (var (table, update) in GetTables(initSub.DatabaseUpdate))
FastParallelForEach(GetUpdatesToPreProcess(initSub.DatabaseUpdate, dbOps), (todo) =>
{
PreProcessInsertOnlyTable(table, update, dbOps);
}
PreProcessInsertOnlyTable(todo);
}, bytes: messageBytes);
return dbOps;
}

/// <summary>
/// TODO: the dictionary is here for backwards compatibility and can be removed
/// once we get rid of legacy subscriptions.
/// </summary>
ProcessedDatabaseUpdate PreProcessSubscribeMultiApplied(SubscribeMultiApplied subscribeMultiApplied)
ProcessedDatabaseUpdate PreProcessSubscribeMultiApplied(SubscribeMultiApplied subscribeMultiApplied, int messageBytes)
{
var dbOps = ProcessedDatabaseUpdate.New();
foreach (var (table, update) in GetTables(subscribeMultiApplied.Update))
FastParallelForEach(GetUpdatesToPreProcess(subscribeMultiApplied.Update, dbOps), (todo) =>
{
PreProcessInsertOnlyTable(table, update, dbOps);
}
PreProcessInsertOnlyTable(todo);
}, bytes: messageBytes);
return dbOps;
}

void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
void PreProcessInsertOnlyTable(UpdatesToPreProcess todo)
{
var delta = dbOps.DeltaForTable(table);

foreach (var cqu in update.Updates)
foreach (var update in todo.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);
if (qu.Deletes.RowsData.Count > 0)
{
Log.Warn("Non-insert during an insert-only server message!");
}
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
for (var i = 0; i < insertRowCount; i++)
foreach (var cqu in update.Updates)
{
var obj = Decode(table, insertReader, out var pk);
delta.Add(pk, obj);
var qu = DecompressDecodeQueryUpdate(cqu);
if (qu.Deletes.RowsData.Count > 0)
{
Log.Warn("Non-insert during an insert-only server message!");
}
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
for (var i = 0; i < insertRowCount; i++)
{
var obj = Decode(todo.Table, insertReader, out var pk);
todo.Delta.Add(pk, obj);
}
}
}
}

void PreProcessDeleteOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
void PreProcessDeleteOnlyTable(UpdatesToPreProcess todo)
{
var delta = dbOps.DeltaForTable(table);
foreach (var cqu in update.Updates)
foreach (var update in todo.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);
if (qu.Inserts.RowsData.Count > 0)
foreach (var cqu in update.Updates)
{
Log.Warn("Non-delete during a delete-only operation!");
}
var qu = DecompressDecodeQueryUpdate(cqu);
if (qu.Inserts.RowsData.Count > 0)
{
Log.Warn("Non-delete during a delete-only operation!");
}

var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
for (var i = 0; i < deleteRowCount; i++)
{
var obj = Decode(table, deleteReader, out var pk);
delta.Remove(pk, obj);
var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
for (var i = 0; i < deleteRowCount; i++)
{
var obj = Decode(todo.Table, deleteReader, out var pk);
todo.Delta.Remove(pk, obj);
}
}
}
}

void PreProcessTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
void PreProcessTable(UpdatesToPreProcess todo)
{
var delta = dbOps.DeltaForTable(table);
foreach (var cqu in update.Updates)
foreach (var update in todo.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);
foreach (var compressableQueryUpdate in update.Updates)
{
var qu = DecompressDecodeQueryUpdate(compressableQueryUpdate);

// Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once
// to the table, it doesn't matter that we call Add before Remove here.
// Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once
// to the table, it doesn't matter that we call Add before Remove here.

var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
for (var i = 0; i < insertRowCount; i++)
{
var obj = Decode(table, insertReader, out var pk);
delta.Add(pk, obj);
}
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
for (var i = 0; i < insertRowCount; i++)
{
var obj = Decode(todo.Table, insertReader, out var pk);
todo.Delta.Add(pk, obj);
}

var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
for (var i = 0; i < deleteRowCount; i++)
{
var obj = Decode(table, deleteReader, out var pk);
delta.Remove(pk, obj);
var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
for (var i = 0; i < deleteRowCount; i++)
{
var obj = Decode(todo.Table, deleteReader, out var pk);
todo.Delta.Remove(pk, obj);
}
}
}

}

ProcessedDatabaseUpdate PreProcessUnsubscribeMultiApplied(UnsubscribeMultiApplied unsubMultiApplied)
ProcessedDatabaseUpdate PreProcessUnsubscribeMultiApplied(UnsubscribeMultiApplied unsubMultiApplied, int messageBytes)
{
var dbOps = ProcessedDatabaseUpdate.New();

foreach (var (table, update) in GetTables(unsubMultiApplied.Update))
FastParallelForEach(GetUpdatesToPreProcess(unsubMultiApplied.Update, dbOps), (todo) =>
{
PreProcessDeleteOnlyTable(table, update, dbOps);
}
PreProcessDeleteOnlyTable(todo);
}, bytes: messageBytes);

return dbOps;
}

ProcessedDatabaseUpdate PreProcessDatabaseUpdate(DatabaseUpdate updates)
ProcessedDatabaseUpdate PreProcessDatabaseUpdate(DatabaseUpdate updates, int messageBytes)
{
var dbOps = ProcessedDatabaseUpdate.New();

foreach (var (table, update) in GetTables(updates))
FastParallelForEach(GetUpdatesToPreProcess(updates, dbOps), (todo) =>
{
PreProcessTable(table, update, dbOps);
}
PreProcessTable(todo);
}, bytes: messageBytes);
return dbOps;
}

Expand Down Expand Up @@ -574,12 +668,12 @@ ProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed)
switch (message)
{
case ServerMessage.InitialSubscription(var initSub):
dbOps = PreProcessLegacySubscription(initSub);
dbOps = PreProcessLegacySubscription(initSub, unprocessed.bytes.Length);
break;
case ServerMessage.SubscribeApplied(var subscribeApplied):
break;
case ServerMessage.SubscribeMultiApplied(var subscribeMultiApplied):
dbOps = PreProcessSubscribeMultiApplied(subscribeMultiApplied);
dbOps = PreProcessSubscribeMultiApplied(subscribeMultiApplied, unprocessed.bytes.Length);
break;
case ServerMessage.SubscriptionError(var subscriptionError):
// do nothing; main thread will warn.
Expand All @@ -588,7 +682,7 @@ ProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed)
// do nothing; main thread will warn.
break;
case ServerMessage.UnsubscribeMultiApplied(var unsubscribeMultiApplied):
dbOps = PreProcessUnsubscribeMultiApplied(unsubscribeMultiApplied);
dbOps = PreProcessUnsubscribeMultiApplied(unsubscribeMultiApplied, unprocessed.bytes.Length);
break;
case ServerMessage.TransactionUpdate(var transactionUpdate):
// Convert the generic event arguments in to a domain specific event object
Expand Down Expand Up @@ -617,11 +711,11 @@ ProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed)

if (transactionUpdate.Status is UpdateStatus.Committed(var committed))
{
dbOps = PreProcessDatabaseUpdate(committed);
dbOps = PreProcessDatabaseUpdate(committed, unprocessed.bytes.Length);
}
break;
case ServerMessage.TransactionUpdateLight(var update):
dbOps = PreProcessDatabaseUpdate(update.Update);
dbOps = PreProcessDatabaseUpdate(update.Update, unprocessed.bytes.Length);
break;
case ServerMessage.IdentityToken(var identityToken):
break;
Expand Down Expand Up @@ -904,7 +998,14 @@ private void OnMessageProcessComplete(ProcessedMessage processed)
}
}

// Note: this method is called from unit tests.
/// <summary>
/// Callback for receiving a message from the websocket.
/// Note: this method is invoked on the websocket thread, not on the main thread.
/// That's fine, since all it does is push a message to a queue.
/// Note: this method is called from unit tests.
/// </summary>
/// <param name="bytes"></param>
/// <param name="timestamp"></param>
internal void OnMessageReceived(byte[] bytes, DateTime timestamp) =>
_messageQueue.Add(new UnprocessedMessage { bytes = bytes, timestamp = timestamp });

Expand Down