diff --git a/sdks/csharp/src/SpacetimeDBClient.cs b/sdks/csharp/src/SpacetimeDBClient.cs
index b0303b2460a..58c6d90b82c 100644
--- a/sdks/csharp/src/SpacetimeDBClient.cs
+++ b/sdks/csharp/src/SpacetimeDBClient.cs
@@ -387,6 +387,80 @@ private static (BinaryReader reader, int rowCount) ParseRowList(BsatnRowList lis
             }
         );
 
+        /// <summary>
+        /// A collection of updates to the same table that need to be pre-processed.
+        /// This is the unit of work that is spread across worker threads.
+        ///
+        /// It is fairly coarse-grained -- which means latency depends on the table with the most updates.
+        ///
+        /// We could actually reduce latency further by doing more fine-grained parallelism --
+        /// splitting up row parsing within a TableUpdate using the RowSizeHint data in a BsatnRowList.
+        /// Probably we would want to split large messages into chunks, but only large messages.
+        /// However, if we did this, we'd need to aggregate data from multiple threads into a single
+        /// MultiDictionaryDelta. To do that, we'd need to either:
+        /// - make MultiDictionaryDelta a synchronized data structure, or
+        /// - add methods to MultiDictionaryDelta that allow deltas produced independently
+        ///   to be combined afterwards.
+        ///
+        /// I'm not comfortable doing either of these while MultiDictionaryDelta is as complicated as it is.
+        /// Commit ba9f3be made MultiDictionary so complicated because we needed to handle a weird edge case.
+        /// https://github.com/clockworklabs/SpacetimeDB/pull/2654 made the edge case impossible server-side,
+        /// so once that change has been released for a while, we could consider reverting ba9f3be and doing fancier parallelism here.
+        /// </summary>
+        internal struct UpdatesToPreProcess
+        {
+            /// <summary>
+            /// The table handle to use to parse rows.
+            /// Only use this in a thread-safe way.
+            /// </summary>
+            public IRemoteTableHandle Table;
+
+            /// <summary>
+            /// The updates to parse.
+            /// There may be multiple TableUpdates corresponding to a single table.
+            /// (Each TableUpdate then contains multiple CompressableQueryUpdates.
+            /// Unfortunately, these are compressed independently due to server-side limitations.)
+            /// </summary>
+            public List<TableUpdate> Updates;
+
+            /// <summary>
+            /// The delta to fill with data. Starts out empty.
+            /// This delta is also stored in a ProcessedDatabaseUpdate, so filling it in here
+            /// directly fills in that ProcessedDatabaseUpdate.
+            /// </summary>
+            public MultiDictionaryDelta Delta;
+        }
+
+        /// <summary>
+        /// An optimized ForEach that decides, based on how many bytes we need to process,
+        /// whether to run in parallel. For small messages, this avoids the overhead of parallelizing.
+        /// </summary>
+        /// <typeparam name="T"></typeparam>
+        /// <param name="values">The enumerable to consume.</param>
+        /// <param name="action">The action to perform for each element of the enumerable.</param>
+        /// <param name="bytes">The number of bytes in the compressed message.</param>
+        /// <param name="enoughBytesToParallelize">The threshold above which updates are parsed on multiple threads.</param>
+        static void FastParallelForEach<T>(
+            IEnumerable<T> values,
+            Action<T> action,
+            int bytes,
+            int enoughBytesToParallelize = 32_000
+        )
+        {
+            if (bytes >= enoughBytesToParallelize)
+            {
+                Parallel.ForEach(values, action);
+            }
+            else
+            {
+                foreach (var value in values)
+                {
+                    action(value);
+                }
+            }
+        }
+
 #if UNITY_WEBGL && !UNITY_EDITOR
         IEnumerator PreProcessMessages()
 #else
@@ -416,8 +490,11 @@ void PreProcessMessages()
             }
         }
 
-        IEnumerable<(IRemoteTableHandle, TableUpdate)> GetTables(DatabaseUpdate updates)
+        IEnumerable<UpdatesToPreProcess> GetUpdatesToPreProcess(DatabaseUpdate updates, ProcessedDatabaseUpdate dbOps)
         {
+            // There may be multiple TableUpdates corresponding to a single table in a DatabaseUpdate.
+            // Preemptively group them.
+            Dictionary<IRemoteTableHandle, UpdatesToPreProcess> tableToUpdates = new(updates.Tables.Count);
             foreach (var update in updates.Tables)
             {
                 var tableName = update.TableName;
@@ -427,21 +504,34 @@ void PreProcessMessages()
                     Log.Error($"Unknown table name: {tableName}");
                     continue;
                 }
-                yield return (table, update);
+                if (tableToUpdates.ContainsKey(table))
+                {
+                    tableToUpdates[table].Updates.Add(update);
+                }
+                else
+                {
+                    var delta = dbOps.DeltaForTable(table);
+                    tableToUpdates[table] = new()
+                    {
+                        Table = table,
+                        Updates = new() { update },
+                        Delta = delta
+                    };
+                }
             }
+
+            return tableToUpdates.Values;
         }
 
-        ProcessedDatabaseUpdate PreProcessLegacySubscription(InitialSubscription initSub)
+        ProcessedDatabaseUpdate PreProcessLegacySubscription(InitialSubscription initSub, int messageBytes)
         {
             var dbOps = ProcessedDatabaseUpdate.New();
-            // This is all of the inserts
-            int cap = initSub.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows);
-            // First apply all of the state
-            foreach (var (table, update) in GetTables(initSub.DatabaseUpdate))
+            FastParallelForEach(GetUpdatesToPreProcess(initSub.DatabaseUpdate, dbOps), (todo) =>
             {
-                PreProcessInsertOnlyTable(table, update, dbOps);
-            }
+                PreProcessInsertOnlyTable(todo);
+            }, bytes: messageBytes);
             return dbOps;
         }
@@ -449,103 +539,107 @@ ProcessedDatabaseUpdate PreProcessLegacySubscription(InitialSubscription initSub
         /// TODO: the dictionary is here for backwards compatibility and can be removed
         /// once we get rid of legacy subscriptions.
         /// </summary>
-        ProcessedDatabaseUpdate PreProcessSubscribeMultiApplied(SubscribeMultiApplied subscribeMultiApplied)
+        ProcessedDatabaseUpdate PreProcessSubscribeMultiApplied(SubscribeMultiApplied subscribeMultiApplied, int messageBytes)
         {
             var dbOps = ProcessedDatabaseUpdate.New();
-            foreach (var (table, update) in GetTables(subscribeMultiApplied.Update))
+            FastParallelForEach(GetUpdatesToPreProcess(subscribeMultiApplied.Update, dbOps), (todo) =>
             {
-                PreProcessInsertOnlyTable(table, update, dbOps);
-            }
+                PreProcessInsertOnlyTable(todo);
+            }, bytes: messageBytes);
             return dbOps;
         }
 
-        void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
+        void PreProcessInsertOnlyTable(UpdatesToPreProcess todo)
         {
-            var delta = dbOps.DeltaForTable(table);
-
-            foreach (var cqu in update.Updates)
+            foreach (var update in todo.Updates)
             {
-                var qu = DecompressDecodeQueryUpdate(cqu);
-                if (qu.Deletes.RowsData.Count > 0)
-                {
-                    Log.Warn("Non-insert during an insert-only server message!");
-                }
-                var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
-                for (var i = 0; i < insertRowCount; i++)
+                foreach (var cqu in update.Updates)
                 {
-                    var obj = Decode(table, insertReader, out var pk);
-                    delta.Add(pk, obj);
+                    var qu = DecompressDecodeQueryUpdate(cqu);
+                    if (qu.Deletes.RowsData.Count > 0)
+                    {
+                        Log.Warn("Non-insert during an insert-only server message!");
+                    }
+                    var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
+                    for (var i = 0; i < insertRowCount; i++)
+                    {
+                        var obj = Decode(todo.Table, insertReader, out var pk);
+                        todo.Delta.Add(pk, obj);
+                    }
                 }
             }
         }
 
-        void PreProcessDeleteOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
+        void PreProcessDeleteOnlyTable(UpdatesToPreProcess todo)
        {
-            var delta = dbOps.DeltaForTable(table);
-            foreach (var cqu in update.Updates)
+            foreach (var update in todo.Updates)
             {
-                var qu = DecompressDecodeQueryUpdate(cqu);
-                if (qu.Inserts.RowsData.Count > 0)
+                foreach (var cqu in update.Updates)
                 {
-                    Log.Warn("Non-delete during a delete-only operation!");
-                }
+                    var qu = DecompressDecodeQueryUpdate(cqu);
+                    if (qu.Inserts.RowsData.Count > 0)
+                    {
+                        Log.Warn("Non-delete during a delete-only operation!");
+                    }
 
-                var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
-                for (var i = 0; i < deleteRowCount; i++)
-                {
-                    var obj = Decode(table, deleteReader, out var pk);
-                    delta.Remove(pk, obj);
+                    var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
+                    for (var i = 0; i < deleteRowCount; i++)
+                    {
+                        var obj = Decode(todo.Table, deleteReader, out var pk);
+                        todo.Delta.Remove(pk, obj);
+                    }
                 }
             }
         }
 
-        void PreProcessTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
+        void PreProcessTable(UpdatesToPreProcess todo)
         {
-            var delta = dbOps.DeltaForTable(table);
-            foreach (var cqu in update.Updates)
+            foreach (var update in todo.Updates)
             {
-                var qu = DecompressDecodeQueryUpdate(cqu);
+                foreach (var compressableQueryUpdate in update.Updates)
+                {
+                    var qu = DecompressDecodeQueryUpdate(compressableQueryUpdate);
 
-                // Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once
-                // to the table, it doesn't matter that we call Add before Remove here.
+                    // Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once
+                    // to the table, it doesn't matter that we call Add before Remove here.
 
-                var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
-                for (var i = 0; i < insertRowCount; i++)
-                {
-                    var obj = Decode(table, insertReader, out var pk);
-                    delta.Add(pk, obj);
-                }
+                    var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
+                    for (var i = 0; i < insertRowCount; i++)
+                    {
+                        var obj = Decode(todo.Table, insertReader, out var pk);
+                        todo.Delta.Add(pk, obj);
+                    }
 
-                var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
-                for (var i = 0; i < deleteRowCount; i++)
-                {
-                    var obj = Decode(table, deleteReader, out var pk);
-                    delta.Remove(pk, obj);
+                    var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
+                    for (var i = 0; i < deleteRowCount; i++)
+                    {
+                        var obj = Decode(todo.Table, deleteReader, out var pk);
+                        todo.Delta.Remove(pk, obj);
+                    }
                 }
             }
         }
 
-        ProcessedDatabaseUpdate PreProcessUnsubscribeMultiApplied(UnsubscribeMultiApplied unsubMultiApplied)
+        ProcessedDatabaseUpdate PreProcessUnsubscribeMultiApplied(UnsubscribeMultiApplied unsubMultiApplied, int messageBytes)
         {
             var dbOps = ProcessedDatabaseUpdate.New();
-            foreach (var (table, update) in GetTables(unsubMultiApplied.Update))
+            FastParallelForEach(GetUpdatesToPreProcess(unsubMultiApplied.Update, dbOps), (todo) =>
             {
-                PreProcessDeleteOnlyTable(table, update, dbOps);
-            }
+                PreProcessDeleteOnlyTable(todo);
+            }, bytes: messageBytes);
             return dbOps;
         }
 
-        ProcessedDatabaseUpdate PreProcessDatabaseUpdate(DatabaseUpdate updates)
+        ProcessedDatabaseUpdate PreProcessDatabaseUpdate(DatabaseUpdate updates, int messageBytes)
         {
             var dbOps = ProcessedDatabaseUpdate.New();
-            foreach (var (table, update) in GetTables(updates))
+            FastParallelForEach(GetUpdatesToPreProcess(updates, dbOps), (todo) =>
             {
-                PreProcessTable(table, update, dbOps);
-            }
+                PreProcessTable(todo);
+            }, bytes: messageBytes);
             return dbOps;
         }
@@ -574,12 +668,12 @@ ProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed)
             switch (message)
             {
                 case ServerMessage.InitialSubscription(var initSub):
-                    dbOps = PreProcessLegacySubscription(initSub);
+                    dbOps = PreProcessLegacySubscription(initSub, unprocessed.bytes.Length);
                     break;
                 case ServerMessage.SubscribeApplied(var subscribeApplied):
                     break;
                 case ServerMessage.SubscribeMultiApplied(var subscribeMultiApplied):
-                    dbOps = PreProcessSubscribeMultiApplied(subscribeMultiApplied);
+                    dbOps = PreProcessSubscribeMultiApplied(subscribeMultiApplied, unprocessed.bytes.Length);
                     break;
                 case ServerMessage.SubscriptionError(var subscriptionError):
                     // do nothing; main thread will warn.
@@ -588,7 +682,7 @@ ProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed)
                     // do nothing; main thread will warn.
                     break;
                 case ServerMessage.UnsubscribeMultiApplied(var unsubscribeMultiApplied):
-                    dbOps = PreProcessUnsubscribeMultiApplied(unsubscribeMultiApplied);
+                    dbOps = PreProcessUnsubscribeMultiApplied(unsubscribeMultiApplied, unprocessed.bytes.Length);
                     break;
                 case ServerMessage.TransactionUpdate(var transactionUpdate):
                     // Convert the generic event arguments in to a domain specific event object
@@ -617,11 +711,11 @@ ProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed)
 
                     if (transactionUpdate.Status is UpdateStatus.Committed(var committed))
                     {
-                        dbOps = PreProcessDatabaseUpdate(committed);
+                        dbOps = PreProcessDatabaseUpdate(committed, unprocessed.bytes.Length);
                     }
                     break;
                 case ServerMessage.TransactionUpdateLight(var update):
-                    dbOps = PreProcessDatabaseUpdate(update.Update);
+                    dbOps = PreProcessDatabaseUpdate(update.Update, unprocessed.bytes.Length);
                     break;
                 case ServerMessage.IdentityToken(var identityToken):
                     break;
@@ -904,7 +998,14 @@ private void OnMessageProcessComplete(ProcessedMessage processed)
             }
         }
 
-        // Note: this method is called from unit tests.
+        /// <summary>
+        /// Callback for receiving a message from the websocket.
+        /// Note: this method is invoked on the websocket thread, not on the main thread.
+        /// That's fine, since all it does is push a message to a queue.
+        /// Note: this method is called from unit tests.
+        /// </summary>
+        /// <param name="bytes"></param>
+        /// <param name="timestamp"></param>
         internal void OnMessageReceived(byte[] bytes, DateTime timestamp) =>
             _messageQueue.Add(new UnprocessedMessage { bytes = bytes, timestamp = timestamp });
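
Not part of the patch above -- the following is a self-contained sketch of the size-gated dispatch that FastParallelForEach implements: below a byte threshold the per-table work stays on the calling thread, and at or above it the work is handed to Parallel.ForEach. The names ForEachBySize and SizeGatedDispatchSketch, the sample table names, and the byte counts are illustrative assumptions, not SDK code.

// Sketch only: mirrors the shape of FastParallelForEach<T> from the patch,
// under the assumption that a simple byte-count threshold gates parallelism.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class SizeGatedDispatchSketch
{
    static void ForEachBySize<T>(IEnumerable<T> values, Action<T> action, int bytes, int threshold = 32_000)
    {
        if (bytes >= threshold)
        {
            // Large message: spread the per-element work across worker threads.
            Parallel.ForEach(values, action);
        }
        else
        {
            // Small message: iterate serially to avoid Parallel.ForEach overhead.
            foreach (var value in values)
            {
                action(value);
            }
        }
    }

    static void Main()
    {
        var tableBatches = Enumerable.Range(0, 8).Select(i => $"table-{i}").ToList();

        // A 1 KB message stays serial; a 64 KB message fans out in parallel.
        ForEachBySize(tableBatches, name => Console.WriteLine($"serial   {name}"), bytes: 1_000);
        ForEachBySize(tableBatches, name => Console.WriteLine($"parallel {name}"), bytes: 64_000);
    }
}

Output order for the parallel call is nondeterministic, which is fine for this pattern because each table's delta is filled in independently, as in the patch.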