diff --git a/sdks/csharp/examples~/multiplicity/client/Model.cs b/sdks/csharp/examples~/multiplicity/client/Model.cs new file mode 100644 index 00000000000..29d1d2b6b10 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/Model.cs @@ -0,0 +1,164 @@ +using SpacetimeDB.Types; + +namespace client; + +class Model +{ + public HashSet ExpectedServerDogs = new HashSet(); + public HashSet ExpectedServerCats = new HashSet(); + + public HashSet ExpectedClientDogs = new HashSet(); + public HashSet ExpectedClientCats = new HashSet(); + + public void AddDog(Dog dog) + { + ExpectedServerDogs.Add(dog); + } + + public void UpdateDog(Dog dog) + { + if (!ContainsDog(dog.Name)) + { + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine($"No Dog with Name:{dog.Name} found in local model. Cannot update local model."); + Console.ForegroundColor = ConsoleColor.White; + return; + } + foreach (Dog existingDog in ExpectedServerDogs) + { + if (existingDog.Name == dog.Name) + { + existingDog.Name = dog.Name; + existingDog.Color = dog.Color; + existingDog.Age = dog.Age; + } + } + } + + public void RemoveDog(Dog dog) + { + ExpectedServerDogs.Remove(dog); + } + + public void RemoveDog(string name) + { + foreach (Dog dog in ExpectedServerDogs) + { + if (dog.Name == name) + { + ExpectedServerDogs.Remove(dog); + } + } + } + + public bool ContainsDog(string name) + { + bool contains = false; + foreach (Dog dog in ExpectedServerDogs) + { + if (dog.Name == name) + { + contains = true; + } + } + return contains; + } + + public bool ContainsDog(string name, string color, uint age, HashSet modelHashSet) + { + bool contains = false; + foreach (Dog dog in modelHashSet) + { + if (dog.Name == name && dog.Color == color && dog.Age == age) + { + contains = true; + } + } + return contains; + } + + public void AddCat(Cat cat) + { + ExpectedServerCats.Add(cat); + } + + public void UpdateCat(Cat cat) + { + if (!ContainsDog(cat.Name)) + { + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine($"No Dog with Name:{cat.Name} found in local model. Cannot update local model."); + Console.ForegroundColor = ConsoleColor.White; + return; + } + foreach (Dog existingCat in ExpectedServerDogs) + { + if (existingCat.Name == cat.Name) + { + existingCat.Name = cat.Name; + existingCat.Color = cat.Color; + existingCat.Age = cat.Age; + } + } + } + + public void RemoveCat(Cat cat) + { + ExpectedServerCats.Remove(cat); + } + + public void RemoveCat(string name) + { + foreach (Cat cat in ExpectedServerCats) + { + if (cat.Name == name) + { + ExpectedServerCats.Remove(cat); + } + } + } + + public bool ContainsCat(string name) + { + bool contains = false; + foreach (Cat cat in ExpectedServerCats) + { + if (cat.Name == name) + { + contains = true; + } + } + return contains; + } + + public bool ContainsCat(string name, string color, uint age, HashSet modelHashSet) + { + bool contains = false; + foreach (Cat cat in modelHashSet) + { + if (cat.Name == name && cat.Color == color && cat.Age == age) + { + contains = true; + } + } + return contains; + } + + public void OutputExpectedDogs(HashSet modelHashSet) + { + Console.WriteLine("Client dogs:"); + foreach (Dog dog in modelHashSet) + { + Console.WriteLine($" Dog (Name:{dog.Name}, Color:{dog.Color}, Age:{dog.Age})."); + } + } + + public void OutputExpectedCats(HashSet modelHashSet) + { + Console.WriteLine("Client dogs:"); + foreach (Cat cat in modelHashSet) + { + Console.WriteLine($" Cat (Name:{cat.Name}, Color:{cat.Color}, Age:{cat.Age})."); + } + } +} \ No newline at end of file diff --git a/sdks/csharp/examples~/multiplicity/client/Program.cs b/sdks/csharp/examples~/multiplicity/client/Program.cs new file mode 100644 index 00000000000..34daa812954 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/Program.cs @@ -0,0 +1,526 @@ +// Multiplicity Test Client +// This test adds several dogs to the Multiplicity Test Server, +// stores a local model of those dogs, +// creates several connections in different combinations, +// and compares the server-version to the local model. + +using client; +using SpacetimeDB; +using SpacetimeDB.Types; +using System.Collections.Concurrent; + +// Configure Output settings +bool show_on_insert_events_output = true; +bool show_on_update_events_output = true; +bool show_on_delete_events_output = true; +bool show_reducer_events_output = true; + +// Private variables +Model model = new Model(); +// declare a thread safe queue to store commands +var command_queue = new ConcurrentQueue<(string Command, string name, string color, uint age)>(); +bool ready_for_command = false; + +SubscriptionHandle? primaryTestSubscriptionHandle = null; +SubscriptionHandle? secondaryTestSubscriptionHandle = null; + +void Main() +{ + AuthToken.Init(".spacetime_csharp_multiplicity"); + + // Builds and connects to the database + DbConnection? conn = null; + conn = ConnectToDB(); + // Registers callbacks for reducers + RegisterCallbacks(conn); + // Declare a threadsafe cancel token to cancel the process loop + var cancellationTokenSource = new CancellationTokenSource(); + // Spawn a thread to call process updates and process commands + var thread = new Thread(() => ProcessThread(conn, cancellationTokenSource.Token)); + thread.Start(); + // Tests start here + Test1(); + Test2(); + // Handles CLI input + InputLoop(); + // This signals the ProcessThread to stop + cancellationTokenSource.Cancel(); + thread.Join(); +} + +const string HOST = "http://localhost:3000"; +const string DBNAME = "multiplicity"; + +DbConnection ConnectToDB() +{ + DbConnection? conn = null; + conn = DbConnection.Builder() + .WithUri(HOST) + .WithModuleName(DBNAME) + .WithToken(AuthToken.Token) + .OnConnect(OnConnected) + .OnConnectError(OnConnectError) + .OnDisconnect(OnDisconnect) + .Build(); + return conn; +} + +void RegisterCallbacks(DbConnection conn) +{ + conn.Db.Dog.OnInsert += Dog_OnInsert; + conn.Db.Dog.OnUpdate += Dog_OnUpdate; + conn.Db.Dog.OnDelete += Dog_OnDelete; + + conn.Db.Cat.OnInsert += Cat_OnInsert; + conn.Db.Cat.OnUpdate += Cat_OnUpdate; + conn.Db.Cat.OnDelete += Cat_OnDelete; + + conn.Reducers.OnAddDog += Reducer_OnAddDogEvent; + conn.Reducers.OnUpdateDog += Reducer_OnUpdateDogEvent; + conn.Reducers.OnRemoveDog += Reducer_OnRemoveDogEvent; + + conn.Reducers.OnAddCat += Reducer_OnAddCatEvent; + conn.Reducers.OnUpdateCat += Reducer_OnUpdateCatEvent; + conn.Reducers.OnRemoveCat += Reducer_OnRemoveCatEvent; +} + +# region Event Handlers +void Dog_OnInsert(EventContext ctx, Dog insertedValue) +{ + if (show_on_insert_events_output) Console.WriteLine($"EventContext: Dog (Name:{insertedValue.Name}, Color:{insertedValue.Color}, Age:{insertedValue.Age}) inserted."); +} + +void Dog_OnUpdate(EventContext ctx, Dog oldValue, Dog newValue) +{ + if (show_on_update_events_output) Console.WriteLine($"EventContext: Dog (Name:{oldValue.Name}, Color:{oldValue.Color}, Age:{oldValue.Age}) updated to (Name:{newValue.Name}, Color:{newValue.Color}, Age:{newValue.Age})."); +} + +void Dog_OnDelete(EventContext ctx, Dog deletedValue) +{ + if (show_on_delete_events_output) Console.WriteLine($"EventContext: Dog (Name:{deletedValue.Name}, Color:{deletedValue.Color}, Age:{deletedValue.Age}) deleted."); +} + +void Cat_OnInsert(EventContext ctx, Cat insertedValue) +{ + if (show_on_insert_events_output) Console.WriteLine($"EventContext: Cat (Name:{insertedValue.Name}, Color:{insertedValue.Color}, Age:{insertedValue.Age}) inserted."); +} + +void Cat_OnUpdate(EventContext ctx, Cat oldValue, Cat newValue) +{ + if (show_on_update_events_output) Console.WriteLine($"EventContext: Cat (Name:{oldValue.Name}, Color:{oldValue.Color}, Age:{oldValue.Age}) updated to (Name:{newValue.Name}, Color:{newValue.Color}, Age:{newValue.Age})."); +} + +void Cat_OnDelete(EventContext ctx, Cat deletedValue) +{ + if (show_on_delete_events_output) Console.WriteLine($"EventContext: Cat (Name:{deletedValue.Name}, Color:{deletedValue.Color}, Age:{deletedValue.Age}) deleted."); +} +# endregion + +# region Reducer Events + +void Reducer_OnAddDogEvent(ReducerEventContext ctx, string name, string color, uint age) +{ + if (show_reducer_events_output) Console.WriteLine($"ReducerEventContext: Add Event Dog (Name:{name}, Color:{color}, Age:{age}) called. Adding dog to local model."); + model.AddDog(new Dog(name, color, age)); + ready_for_command = true; +} + +void Reducer_OnUpdateDogEvent(ReducerEventContext ctx, string name, string color, uint age) +{ + if (show_reducer_events_output) Console.WriteLine($"ReducerEventContext: Update Event Dog (Name:{name}, Color:{color}, Age:{age}) called. Updating dog in local model."); + model.UpdateDog(new Dog(name, color, age)); + ready_for_command = true; +} + +void Reducer_OnRemoveDogEvent(ReducerEventContext ctx, string name) +{ + if (show_reducer_events_output) Console.WriteLine($"ReducerEventContext: Remove Event Dog (Name:{name}) called. Removing dog from local model."); + if (model.ContainsDog(name)) model.RemoveDog(name); + ready_for_command = true; +} + +void Reducer_OnAddCatEvent(ReducerEventContext ctx, string name, string color, uint age) +{ + if (show_reducer_events_output) Console.WriteLine($"ReducerEventContext: Add Event Cat (Name:{name}, Color:{color}, Age:{age}) called. Adding cat to local model."); + model.AddCat(new Cat(name, color, age)); + ready_for_command = true; +} + +void Reducer_OnUpdateCatEvent(ReducerEventContext ctx, string name, string color, uint age) +{ + if (show_reducer_events_output) Console.WriteLine($"ReducerEventContext: Update Event Cat (Name:{name}, Color:{color}, Age:{age}) called. Updating cat in local model."); + model.UpdateCat(new Cat(name, color, age)); + ready_for_command = true; +} + +void Reducer_OnRemoveCatEvent(ReducerEventContext ctx, string name) +{ + if (show_reducer_events_output) Console.WriteLine($"ReducerEventContext: Remove Event Cat (Name:{name}) called. Removing cat from local model."); + if (model.ContainsCat(name)) model.RemoveCat(name); + ready_for_command = true; +} + +# endregion + +void OnConnected(DbConnection conn, Identity identity, string authToken) +{ + AuthToken.SaveToken(authToken); + + ready_for_command = true; +} + +void OnConnectError(Exception e) +{ + Console.Write($"Error while connecting: {e}"); +} + +void OnDisconnect(DbConnection conn, Exception? e) +{ + if (e != null) + { + Console.Write($"Disconnected abnormally: {e}"); + } else { + Console.Write($"Disconnected normally."); + } +} + +void OnSubscriptionApplied(SubscriptionEventContext ctx) +{ + Console.WriteLine("Subscription Applied"); + ready_for_command = true; +} + +void OutputSubscribedServerDogs(DbConnection conn) +{ + Console.WriteLine("Subscribed Server dogs:"); + foreach (Dog dog in conn.Db.Dog.Iter()) + { + Console.WriteLine($" Dog (Name:{dog.Name}, Color:{dog.Color}, Age:{dog.Age})."); + } +} + +void OutputSubscribedServerCats(DbConnection conn) +{ + Console.WriteLine("Subscribed Server cats:"); + foreach (Cat cat in conn.Db.Cat.Iter()) + { + Console.WriteLine($" Cat (Name:{cat.Name}, Color:{cat.Color}, Age:{cat.Age})."); + } +} + +void CompareEventDogsToModel(DbConnection conn, HashSet modelHashSet) +{ + bool allMatched = true; + Console.WriteLine("Comparing Server dogs to Model:"); + foreach (Dog dog in conn.Db.Dog.Iter()) + { + if (model.ContainsDog(dog.Name, dog.Color, dog.Age, modelHashSet) == false) + { + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine($"Dog (Name:{dog.Name}, Color:{dog.Color}, Age:{dog.Age}) was missing from local model."); + Console.ForegroundColor = ConsoleColor.White; + allMatched = false; + } + } + + foreach (Dog expectedDog in modelHashSet) + { + bool found = false; + foreach (Dog dog in conn.Db.Dog.Iter()) + { + if (expectedDog.Name == dog.Name && expectedDog.Color == dog.Color && expectedDog.Age == dog.Age) + { + found = true; + break; + } + } + + if (!found) + { + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine($"Dog (Name:{expectedDog.Name}, Color:{expectedDog.Color}, Age:{expectedDog.Age}) was missing from server model."); + Console.ForegroundColor = ConsoleColor.White; + allMatched = false; + } + } + + if (allMatched) + { + Console.ForegroundColor = ConsoleColor.Green; + Console.WriteLine($"All dogs on server and model are equal."); + Console.ForegroundColor = ConsoleColor.White; + } +} + +void CompareEventCatsToModel(DbConnection conn, HashSet modelHashSet) +{ + bool allMatched = true; + Console.WriteLine("Comparing Server cats to Model:"); + foreach (Cat cat in conn.Db.Cat.Iter()) + { + if (model.ContainsCat(cat.Name, cat.Color, cat.Age, modelHashSet) == false) + { + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine($"Cat (Name:{cat.Name}, Color:{cat.Color}, Age:{cat.Age}) was missing from local model."); + Console.ForegroundColor = ConsoleColor.White; + allMatched = false; + } + } + + foreach (Cat expectedCat in modelHashSet) + { + bool found = false; + foreach (Cat cat in conn.Db.Cat.Iter()) + { + if (expectedCat.Name == cat.Name && expectedCat.Color == cat.Color && expectedCat.Age == cat.Age) + { + found = true; + break; + } + } + + if (!found) + { + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine($"Cat (Name:{expectedCat.Name}, Color:{expectedCat.Color}, Age:{expectedCat.Age}) was missing from server model."); + Console.ForegroundColor = ConsoleColor.White; + allMatched = false; + } + } + + if (allMatched) + { + Console.ForegroundColor = ConsoleColor.Green; + Console.WriteLine($"All cats on server and model are equal."); + Console.ForegroundColor = ConsoleColor.White; + } +} + +void ProcessThread(DbConnection conn, CancellationToken ct) +{ + try + { + // loop until cancellation token + while (!ct.IsCancellationRequested) + { + conn.FrameTick(); + + if (ready_for_command) ProcessCommands(conn); + + Thread.Sleep(100); + } + } + finally + { + conn.Disconnect(); + } +} + +void InputLoop() +{ + while (true) + { + var input = Console.ReadLine(); + if (input == null) + { + break; + } + } +} + +void AddStartingDogs() +{ + command_queue.Enqueue(("log", "","", 0)); + command_queue.Enqueue(("log", "Adding Default Dogs","", 0)); + command_queue.Enqueue(("add_dog", "Alpha","Black", 3)); + command_queue.Enqueue(("add_dog", "Beau","Brown", 4)); + command_queue.Enqueue(("add_dog", "Chance","White", 4)); + command_queue.Enqueue(("add_dog", "Dante","Grey", 3)); + command_queue.Enqueue(("add_dog", "Einstein","Brown", 3)); + command_queue.Enqueue(("add_dog", "Foo-Foo","Brown", 2)); + command_queue.Enqueue(("add_dog", "Georgette","White", 3)); + command_queue.Enqueue(("add_dog", "Hansel","Black", 2)); + command_queue.Enqueue(("add_dog", "Isaac","Black", 2)); + command_queue.Enqueue(("add_dog", "Shadow","Golden", 6)); +} + +void RemoveStartingDogs() +{ + command_queue.Enqueue(("log", "","", 0)); + command_queue.Enqueue(("log", "Removing Default Dogs","", 0)); + command_queue.Enqueue(("remove_dog", "Alpha","Black", 3)); + command_queue.Enqueue(("remove_dog", "Beau","Brown", 4)); + command_queue.Enqueue(("remove_dog", "Chance","White", 4)); + command_queue.Enqueue(("remove_dog", "Dante","Grey", 3)); + command_queue.Enqueue(("remove_dog", "Einstein","Brown", 3)); + command_queue.Enqueue(("remove_dog", "Foo-Foo","Brown", 2)); + command_queue.Enqueue(("remove_dog", "Georgette","White", 3)); + command_queue.Enqueue(("remove_dog", "Hansel","Black", 2)); + command_queue.Enqueue(("remove_dog", "Isaac","Black", 2)); + command_queue.Enqueue(("remove_dog", "Shadow","Golden", 6)); +} + +void Test1() +{ + AddStartingDogs(); + command_queue.Enqueue(("log", "","", 0)); + command_queue.Enqueue(("log", "=== Starting test 1: Adding/Removing records for a single Connection Handle with multiple Subscriptions ===","", 0)); + command_queue.Enqueue(("log", "--- Using a string array to subscribe to Only Brown Dogs and Dogs older than 3 ---","", 0)); + command_queue.Enqueue(("subscribe_to_test_1", "","", 0)); + command_queue.Enqueue(("set_client_dogs_test1", "","", 0)); + command_queue.Enqueue(("compare_client_dogs_to_server", "","", 0)); + command_queue.Enqueue(("print_server_dogs", "","", 0)); + command_queue.Enqueue(("print_client_dogs", "","", 0)); + + command_queue.Enqueue(("log", "--- Updating Dog \"Georgette\" to age 4, which should cause Georgette to be included in the model. ---","", 0)); + command_queue.Enqueue(("update_dog", "Georgette","White", 4)); + command_queue.Enqueue(("set_client_dogs_test1", "","", 0)); + command_queue.Enqueue(("compare_client_dogs_to_server", "","", 0)); + command_queue.Enqueue(("print_server_dogs", "","", 0)); + command_queue.Enqueue(("print_client_dogs", "","", 0)); + + command_queue.Enqueue(("log", "--- Updating Dog \"Foo-Foo\" to color \"Grey\", which should remove Foo-Foo from the model. ---","", 0)); + command_queue.Enqueue(("update_dog", "Foo-Foo","Grey", 2)); + command_queue.Enqueue(("set_client_dogs_test1", "","", 0)); + command_queue.Enqueue(("compare_client_dogs_to_server", "","", 0)); + command_queue.Enqueue(("print_server_dogs", "","", 0)); + command_queue.Enqueue(("print_client_dogs", "","", 0)); + + command_queue.Enqueue(("log", "--- Test 1 complete, unsubscribing ---","", 0)); + command_queue.Enqueue(("unsubscribe_to_test_1", "","", 0)); + RemoveStartingDogs(); +} + +void Test2() +{ + AddStartingDogs(); + command_queue.Enqueue(("log", "","", 0)); + command_queue.Enqueue(("log", "=== Starting test 2: Adding/Removing multiple overlapping Connection Handles ===","", 0)); + command_queue.Enqueue(("log", "--- Using one connection handle to subscribe to Only Brown Dogs and another connection handle to subscribe to Dogs older than 3 ---","", 0)); + command_queue.Enqueue(("subscribe_to_test_2", "","", 0)); + command_queue.Enqueue(("set_client_dogs_test2", "","", 0)); + command_queue.Enqueue(("compare_client_dogs_to_server", "","", 0)); + command_queue.Enqueue(("print_server_dogs", "","", 0)); + command_queue.Enqueue(("print_client_dogs", "","", 0)); + + command_queue.Enqueue(("log", "--- Unsubscribing handle of Only Brown Dogs ---","", 0)); + command_queue.Enqueue(("unsubscribe_to_primary_test_2", "","", 0)); + + command_queue.Enqueue(("log", "--- Updating Dog \"Georgette\" to age 4, which should cause Georgette to be included in the model. ---","", 0)); + command_queue.Enqueue(("update_dog", "Georgette","White", 4)); + command_queue.Enqueue(("set_client_dogs_test2", "","", 0)); + command_queue.Enqueue(("compare_client_dogs_to_server", "","", 0)); + command_queue.Enqueue(("print_server_dogs", "","", 0)); + command_queue.Enqueue(("print_client_dogs", "","", 0)); + + command_queue.Enqueue(("log", "--- Updating Dog \"Foo-Foo\" to color \"Grey\", which should remove Foo-Foo from the model. ---","", 0)); + command_queue.Enqueue(("update_dog", "Foo-Foo","Grey", 2)); + command_queue.Enqueue(("set_client_dogs_test2", "","", 0)); + command_queue.Enqueue(("compare_client_dogs_to_server", "","", 0)); + command_queue.Enqueue(("print_server_dogs", "","", 0)); + command_queue.Enqueue(("print_client_dogs", "","", 0)); + + command_queue.Enqueue(("log", "--- Test complete, unsubscribing handle of Dogs older than 3 ---","", 0)); + command_queue.Enqueue(("unsubscribe_to_secondary_test_2", "","", 0)); + RemoveStartingDogs(); +} + +void ProcessCommands(DbConnection conn) +{ + // process command queue + while (ready_for_command == true && command_queue.TryDequeue(out var command)) + { + switch (command.Command) + { + case "log": + Console.WriteLine(command.name); + break; + case "add_dog": + ready_for_command = false; + conn.Reducers.AddDog(command.name, command.color, command.age); + break; + case "add_cat": + ready_for_command = false; + conn.Reducers.AddCat(command.name, command.color, command.age); + break; + case "update_dog": + ready_for_command = false; + conn.Reducers.UpdateDog(command.name, command.color, command.age); + break; + case "update_cat": + ready_for_command = false; + conn.Reducers.UpdateCat(command.name, command.color, command.age); + break; + case "remove_dog": + ready_for_command = false; + conn.Reducers.RemoveDog(command.name); + break; + case "remove_cat": + ready_for_command = false; + conn.Reducers.RemoveCat(command.name); + break; + case "subscribe_to_test_1": + ready_for_command = false; + string[] subscriptionArray1 = new string[] { "SELECT * FROM dog WHERE dog.age > 3", "SELECT * FROM dog WHERE dog.color = 'Brown'" }; + primaryTestSubscriptionHandle = conn.SubscriptionBuilder() + .OnApplied(OnSubscriptionApplied) + .Subscribe(subscriptionArray1); + break; + case "set_client_dogs_test1": + model.ExpectedClientDogs = new HashSet(model.ExpectedServerDogs.Where(dog => dog.Age > 3 || dog.Color == "Brown")); + break; + case "unsubscribe_to_test_1": + primaryTestSubscriptionHandle?.Unsubscribe(); + break; + case "subscribe_to_test_2": + ready_for_command = false; + string[] primarySubscriptionArray = new string[] { "SELECT * FROM dog WHERE dog.age > 3" }; + string[] secondaySubscriptionArray = new string[] { "SELECT * FROM dog WHERE dog.color = 'Brown'" }; + primaryTestSubscriptionHandle = conn.SubscriptionBuilder() + .OnApplied(OnSubscriptionApplied) + .Subscribe(primarySubscriptionArray); + secondaryTestSubscriptionHandle = conn.SubscriptionBuilder() + .OnApplied(OnSubscriptionApplied) + .Subscribe(secondaySubscriptionArray); + break; + case "set_client_dogs_test2": + model.ExpectedClientDogs = new HashSet(model.ExpectedServerDogs.Where(dog => dog.Age > 3 || dog.Color == "Brown")); + break; + case "unsubscribe_to_primary_test_2": + primaryTestSubscriptionHandle?.Unsubscribe(); + break; + case "unsubscribe_to_secondary_test_2": + secondaryTestSubscriptionHandle?.Unsubscribe(); + break; + case "compare_client_dogs_to_server": + CompareEventDogsToModel(conn, model.ExpectedClientDogs); + break; + case "compare_client_cats_to_server": + CompareEventCatsToModel(conn, model.ExpectedClientCats); + break; + case "set_client_dogs_to_server": + model.ExpectedClientDogs = model.ExpectedServerDogs; + break; + case "set_client_cats_to_server": + model.ExpectedClientCats = model.ExpectedServerCats; + break; + case "set_client_cats_test1": + model.ExpectedClientCats = new HashSet(model.ExpectedServerCats.Where(cat => cat.Age > 2)); + break; + case "print_server_dogs": + OutputSubscribedServerDogs(conn); + break; + case "print_server_cats": + OutputSubscribedServerCats(conn); + break; + case "print_client_dogs": + model.OutputExpectedDogs(model.ExpectedClientDogs); + break; + case "print_client_cats": + model.OutputExpectedCats(model.ExpectedClientCats); + break; + } + } +} + +Main(); \ No newline at end of file diff --git a/sdks/csharp/examples~/multiplicity/client/client.csproj b/sdks/csharp/examples~/multiplicity/client/client.csproj new file mode 100644 index 00000000000..3167056980b --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/client.csproj @@ -0,0 +1,14 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/AddCat.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/AddCat.g.cs new file mode 100644 index 00000000000..9ebc1031344 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/AddCat.g.cs @@ -0,0 +1,75 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + public delegate void AddCatHandler(ReducerEventContext ctx, string name, string color, uint age); + public event AddCatHandler? OnAddCat; + + public void AddCat(string name, string color, uint age) + { + conn.InternalCallReducer(new Reducer.AddCat(name, color, age), this.SetCallReducerFlags.AddCatFlags); + } + + public bool InvokeAddCat(ReducerEventContext ctx, Reducer.AddCat args) + { + if (OnAddCat == null) return false; + OnAddCat( + ctx, + args.Name, + args.Color, + args.Age + ); + return true; + } + } + + public abstract partial class Reducer + { + [SpacetimeDB.Type] + [DataContract] + public sealed partial class AddCat : Reducer, IReducerArgs + { + [DataMember(Name = "name")] + public string Name; + [DataMember(Name = "color")] + public string Color; + [DataMember(Name = "age")] + public uint Age; + + public AddCat( + string Name, + string Color, + uint Age + ) + { + this.Name = Name; + this.Color = Color; + this.Age = Age; + } + + public AddCat() + { + this.Name = ""; + this.Color = ""; + } + + string IReducerArgs.ReducerName => "add_cat"; + } + } + + public sealed partial class SetReducerFlags + { + internal CallReducerFlags AddCatFlags; + public void AddCat(CallReducerFlags flags) => AddCatFlags = flags; + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/AddDog.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/AddDog.g.cs new file mode 100644 index 00000000000..31978a40f8f --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/AddDog.g.cs @@ -0,0 +1,75 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + public delegate void AddDogHandler(ReducerEventContext ctx, string name, string color, uint age); + public event AddDogHandler? OnAddDog; + + public void AddDog(string name, string color, uint age) + { + conn.InternalCallReducer(new Reducer.AddDog(name, color, age), this.SetCallReducerFlags.AddDogFlags); + } + + public bool InvokeAddDog(ReducerEventContext ctx, Reducer.AddDog args) + { + if (OnAddDog == null) return false; + OnAddDog( + ctx, + args.Name, + args.Color, + args.Age + ); + return true; + } + } + + public abstract partial class Reducer + { + [SpacetimeDB.Type] + [DataContract] + public sealed partial class AddDog : Reducer, IReducerArgs + { + [DataMember(Name = "name")] + public string Name; + [DataMember(Name = "color")] + public string Color; + [DataMember(Name = "age")] + public uint Age; + + public AddDog( + string Name, + string Color, + uint Age + ) + { + this.Name = Name; + this.Color = Color; + this.Age = Age; + } + + public AddDog() + { + this.Name = ""; + this.Color = ""; + } + + string IReducerArgs.ReducerName => "add_dog"; + } + } + + public sealed partial class SetReducerFlags + { + internal CallReducerFlags AddDogFlags; + public void AddDog(CallReducerFlags flags) => AddDogFlags = flags; + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/IdentityConnected.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/IdentityConnected.g.cs new file mode 100644 index 00000000000..688edfb2eb3 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/IdentityConnected.g.cs @@ -0,0 +1,37 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + public delegate void IdentityConnectedHandler(ReducerEventContext ctx); + public event IdentityConnectedHandler? OnIdentityConnected; + + public bool InvokeIdentityConnected(ReducerEventContext ctx, Reducer.IdentityConnected args) + { + if (OnIdentityConnected == null) return false; + OnIdentityConnected( + ctx + ); + return true; + } + } + + public abstract partial class Reducer + { + [SpacetimeDB.Type] + [DataContract] + public sealed partial class IdentityConnected : Reducer, IReducerArgs + { + string IReducerArgs.ReducerName => "identity_connected"; + } + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/IdentityDisconnected.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/IdentityDisconnected.g.cs new file mode 100644 index 00000000000..b44f2ed76a6 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/IdentityDisconnected.g.cs @@ -0,0 +1,37 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + public delegate void IdentityDisconnectedHandler(ReducerEventContext ctx); + public event IdentityDisconnectedHandler? OnIdentityDisconnected; + + public bool InvokeIdentityDisconnected(ReducerEventContext ctx, Reducer.IdentityDisconnected args) + { + if (OnIdentityDisconnected == null) return false; + OnIdentityDisconnected( + ctx + ); + return true; + } + } + + public abstract partial class Reducer + { + [SpacetimeDB.Type] + [DataContract] + public sealed partial class IdentityDisconnected : Reducer, IReducerArgs + { + string IReducerArgs.ReducerName => "identity_disconnected"; + } + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/RemoveCat.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/RemoveCat.g.cs new file mode 100644 index 00000000000..69be6683637 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/RemoveCat.g.cs @@ -0,0 +1,62 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + public delegate void RemoveCatHandler(ReducerEventContext ctx, string name); + public event RemoveCatHandler? OnRemoveCat; + + public void RemoveCat(string name) + { + conn.InternalCallReducer(new Reducer.RemoveCat(name), this.SetCallReducerFlags.RemoveCatFlags); + } + + public bool InvokeRemoveCat(ReducerEventContext ctx, Reducer.RemoveCat args) + { + if (OnRemoveCat == null) return false; + OnRemoveCat( + ctx, + args.Name + ); + return true; + } + } + + public abstract partial class Reducer + { + [SpacetimeDB.Type] + [DataContract] + public sealed partial class RemoveCat : Reducer, IReducerArgs + { + [DataMember(Name = "name")] + public string Name; + + public RemoveCat(string Name) + { + this.Name = Name; + } + + public RemoveCat() + { + this.Name = ""; + } + + string IReducerArgs.ReducerName => "remove_cat"; + } + } + + public sealed partial class SetReducerFlags + { + internal CallReducerFlags RemoveCatFlags; + public void RemoveCat(CallReducerFlags flags) => RemoveCatFlags = flags; + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/RemoveDog.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/RemoveDog.g.cs new file mode 100644 index 00000000000..2524cdc3d5e --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/RemoveDog.g.cs @@ -0,0 +1,62 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + public delegate void RemoveDogHandler(ReducerEventContext ctx, string name); + public event RemoveDogHandler? OnRemoveDog; + + public void RemoveDog(string name) + { + conn.InternalCallReducer(new Reducer.RemoveDog(name), this.SetCallReducerFlags.RemoveDogFlags); + } + + public bool InvokeRemoveDog(ReducerEventContext ctx, Reducer.RemoveDog args) + { + if (OnRemoveDog == null) return false; + OnRemoveDog( + ctx, + args.Name + ); + return true; + } + } + + public abstract partial class Reducer + { + [SpacetimeDB.Type] + [DataContract] + public sealed partial class RemoveDog : Reducer, IReducerArgs + { + [DataMember(Name = "name")] + public string Name; + + public RemoveDog(string Name) + { + this.Name = Name; + } + + public RemoveDog() + { + this.Name = ""; + } + + string IReducerArgs.ReducerName => "remove_dog"; + } + } + + public sealed partial class SetReducerFlags + { + internal CallReducerFlags RemoveDogFlags; + public void RemoveDog(CallReducerFlags flags) => RemoveDogFlags = flags; + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/UpdateCat.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/UpdateCat.g.cs new file mode 100644 index 00000000000..5ea398a7588 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/UpdateCat.g.cs @@ -0,0 +1,75 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + public delegate void UpdateCatHandler(ReducerEventContext ctx, string name, string color, uint age); + public event UpdateCatHandler? OnUpdateCat; + + public void UpdateCat(string name, string color, uint age) + { + conn.InternalCallReducer(new Reducer.UpdateCat(name, color, age), this.SetCallReducerFlags.UpdateCatFlags); + } + + public bool InvokeUpdateCat(ReducerEventContext ctx, Reducer.UpdateCat args) + { + if (OnUpdateCat == null) return false; + OnUpdateCat( + ctx, + args.Name, + args.Color, + args.Age + ); + return true; + } + } + + public abstract partial class Reducer + { + [SpacetimeDB.Type] + [DataContract] + public sealed partial class UpdateCat : Reducer, IReducerArgs + { + [DataMember(Name = "name")] + public string Name; + [DataMember(Name = "color")] + public string Color; + [DataMember(Name = "age")] + public uint Age; + + public UpdateCat( + string Name, + string Color, + uint Age + ) + { + this.Name = Name; + this.Color = Color; + this.Age = Age; + } + + public UpdateCat() + { + this.Name = ""; + this.Color = ""; + } + + string IReducerArgs.ReducerName => "update_cat"; + } + } + + public sealed partial class SetReducerFlags + { + internal CallReducerFlags UpdateCatFlags; + public void UpdateCat(CallReducerFlags flags) => UpdateCatFlags = flags; + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/UpdateDog.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/UpdateDog.g.cs new file mode 100644 index 00000000000..6320d8dced9 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Reducers/UpdateDog.g.cs @@ -0,0 +1,75 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + public delegate void UpdateDogHandler(ReducerEventContext ctx, string name, string color, uint age); + public event UpdateDogHandler? OnUpdateDog; + + public void UpdateDog(string name, string color, uint age) + { + conn.InternalCallReducer(new Reducer.UpdateDog(name, color, age), this.SetCallReducerFlags.UpdateDogFlags); + } + + public bool InvokeUpdateDog(ReducerEventContext ctx, Reducer.UpdateDog args) + { + if (OnUpdateDog == null) return false; + OnUpdateDog( + ctx, + args.Name, + args.Color, + args.Age + ); + return true; + } + } + + public abstract partial class Reducer + { + [SpacetimeDB.Type] + [DataContract] + public sealed partial class UpdateDog : Reducer, IReducerArgs + { + [DataMember(Name = "name")] + public string Name; + [DataMember(Name = "color")] + public string Color; + [DataMember(Name = "age")] + public uint Age; + + public UpdateDog( + string Name, + string Color, + uint Age + ) + { + this.Name = Name; + this.Color = Color; + this.Age = Age; + } + + public UpdateDog() + { + this.Name = ""; + this.Color = ""; + } + + string IReducerArgs.ReducerName => "update_dog"; + } + } + + public sealed partial class SetReducerFlags + { + internal CallReducerFlags UpdateDogFlags; + public void UpdateDog(CallReducerFlags flags) => UpdateDogFlags = flags; + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/SpacetimeDBClient.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/SpacetimeDBClient.g.cs new file mode 100644 index 00000000000..0eca6c97b51 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/SpacetimeDBClient.g.cs @@ -0,0 +1,161 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteReducers : RemoteBase + { + internal RemoteReducers(DbConnection conn, SetReducerFlags flags) : base(conn) => SetCallReducerFlags = flags; + internal readonly SetReducerFlags SetCallReducerFlags; + } + + public sealed partial class RemoteTables : RemoteTablesBase + { + public RemoteTables(DbConnection conn) + { + AddTable(Cat = new(conn)); + AddTable(Dog = new(conn)); + } + } + + public sealed partial class SetReducerFlags { } + + public sealed class EventContext : IEventContext + { + private readonly DbConnection conn; + public readonly Event Event; + + public RemoteTables Db => conn.Db; + public RemoteReducers Reducers => conn.Reducers; + public SetReducerFlags SetReducerFlags => conn.SetReducerFlags; + + internal EventContext(DbConnection conn, Event Event) + { + this.conn = conn; + this.Event = Event; + } + } + + public sealed class ReducerEventContext : IReducerEventContext + { + private readonly DbConnection conn; + public readonly ReducerEvent Event; + + public RemoteTables Db => conn.Db; + public RemoteReducers Reducers => conn.Reducers; + public SetReducerFlags SetReducerFlags => conn.SetReducerFlags; + + internal ReducerEventContext(DbConnection conn, ReducerEvent reducerEvent) + { + this.conn = conn; + Event = reducerEvent; + } + } + + public sealed class ErrorContext : IErrorContext + { + private readonly DbConnection conn; + public readonly Exception Event; + Exception IErrorContext.Event { + get { + return Event; + } + } + + public RemoteTables Db => conn.Db; + public RemoteReducers Reducers => conn.Reducers; + public SetReducerFlags SetReducerFlags => conn.SetReducerFlags; + public Exception Error => Event; + + internal ErrorContext(DbConnection conn, Exception error) + { + this.conn = conn; + Event = error; + } + } + + public sealed class SubscriptionEventContext : ISubscriptionEventContext + { + private readonly DbConnection conn; + + public RemoteTables Db => conn.Db; + public RemoteReducers Reducers => conn.Reducers; + public SetReducerFlags SetReducerFlags => conn.SetReducerFlags; + + internal SubscriptionEventContext(DbConnection conn) + { + this.conn = conn; + } + } + + public abstract partial class Reducer + { + private Reducer() { } + } + + public sealed class DbConnection : DbConnectionBase + { + public override RemoteTables Db { get; } + public readonly RemoteReducers Reducers; + public readonly SetReducerFlags SetReducerFlags = new(); + + public DbConnection() + { + Db = new(this); + Reducers = new(this, SetReducerFlags); + } + + protected override Reducer ToReducer(TransactionUpdate update) + { + var encodedArgs = update.ReducerCall.Args; + return update.ReducerCall.ReducerName switch { + "add_cat" => BSATNHelpers.Decode(encodedArgs), + "add_dog" => BSATNHelpers.Decode(encodedArgs), + "identity_connected" => BSATNHelpers.Decode(encodedArgs), + "identity_disconnected" => BSATNHelpers.Decode(encodedArgs), + "remove_cat" => BSATNHelpers.Decode(encodedArgs), + "remove_dog" => BSATNHelpers.Decode(encodedArgs), + "update_cat" => BSATNHelpers.Decode(encodedArgs), + "update_dog" => BSATNHelpers.Decode(encodedArgs), + var reducer => throw new ArgumentOutOfRangeException("Reducer", $"Unknown reducer {reducer}") + }; + } + + protected override IEventContext ToEventContext(Event Event) => + new EventContext(this, Event); + + protected override IReducerEventContext ToReducerEventContext(ReducerEvent reducerEvent) => + new ReducerEventContext(this, reducerEvent); + + protected override ISubscriptionEventContext MakeSubscriptionEventContext() => + new SubscriptionEventContext(this); + + protected override IErrorContext ToErrorContext(Exception exception) => + new ErrorContext(this, exception); + + protected override bool Dispatch(IReducerEventContext context, Reducer reducer) + { + var eventContext = (ReducerEventContext)context; + return reducer switch { + Reducer.AddCat args => Reducers.InvokeAddCat(eventContext, args), + Reducer.AddDog args => Reducers.InvokeAddDog(eventContext, args), + Reducer.IdentityConnected args => Reducers.InvokeIdentityConnected(eventContext, args), + Reducer.IdentityDisconnected args => Reducers.InvokeIdentityDisconnected(eventContext, args), + Reducer.RemoveCat args => Reducers.InvokeRemoveCat(eventContext, args), + Reducer.RemoveDog args => Reducers.InvokeRemoveDog(eventContext, args), + Reducer.UpdateCat args => Reducers.InvokeUpdateCat(eventContext, args), + Reducer.UpdateDog args => Reducers.InvokeUpdateDog(eventContext, args), + _ => throw new ArgumentOutOfRangeException("Reducer", $"Unknown reducer {reducer}") + }; + } + + public SubscriptionBuilder SubscriptionBuilder() => new(this); + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Tables/Cat.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Tables/Cat.g.cs new file mode 100644 index 00000000000..6827d0e6b88 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Tables/Cat.g.cs @@ -0,0 +1,39 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.BSATN; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteTables + { + public sealed class CatHandle : RemoteTableHandle + { + protected override string RemoteTableName => "cat"; + + public sealed class NameUniqueIndex : UniqueIndexBase + { + protected override string GetKey(Cat row) => row.Name; + + public NameUniqueIndex(CatHandle table) : base(table) { } + } + + public readonly NameUniqueIndex Name; + + internal CatHandle(DbConnection conn) : base(conn) + { + Name = new(this); + } + + protected override object GetPrimaryKey(Cat row) => row.Name; + } + + public readonly CatHandle Cat; + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Tables/Dog.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Tables/Dog.g.cs new file mode 100644 index 00000000000..6c62aa482cc --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Tables/Dog.g.cs @@ -0,0 +1,39 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using SpacetimeDB.BSATN; +using SpacetimeDB.ClientApi; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + public sealed partial class RemoteTables + { + public sealed class DogHandle : RemoteTableHandle + { + protected override string RemoteTableName => "dog"; + + public sealed class NameUniqueIndex : UniqueIndexBase + { + protected override string GetKey(Dog row) => row.Name; + + public NameUniqueIndex(DogHandle table) : base(table) { } + } + + public readonly NameUniqueIndex Name; + + internal DogHandle(DbConnection conn) : base(conn) + { + Name = new(this); + } + + protected override object GetPrimaryKey(Dog row) => row.Name; + } + + public readonly DogHandle Dog; + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Types/Cat.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Types/Cat.g.cs new file mode 100644 index 00000000000..a68bb572e17 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Types/Cat.g.cs @@ -0,0 +1,40 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + [SpacetimeDB.Type] + [DataContract] + public sealed partial class Cat + { + [DataMember(Name = "name")] + public string Name; + [DataMember(Name = "color")] + public string Color; + [DataMember(Name = "age")] + public uint Age; + + public Cat( + string Name, + string Color, + uint Age + ) + { + this.Name = Name; + this.Color = Color; + this.Age = Age; + } + + public Cat() + { + this.Name = ""; + this.Color = ""; + } + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/module_bindings/Types/Dog.g.cs b/sdks/csharp/examples~/multiplicity/client/module_bindings/Types/Dog.g.cs new file mode 100644 index 00000000000..320e2071b1f --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/module_bindings/Types/Dog.g.cs @@ -0,0 +1,40 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.Types +{ + [SpacetimeDB.Type] + [DataContract] + public sealed partial class Dog + { + [DataMember(Name = "name")] + public string Name; + [DataMember(Name = "color")] + public string Color; + [DataMember(Name = "age")] + public uint Age; + + public Dog( + string Name, + string Color, + uint Age + ) + { + this.Name = Name; + this.Color = Color; + this.Age = Age; + } + + public Dog() + { + this.Name = ""; + this.Color = ""; + } + } +} diff --git a/sdks/csharp/examples~/multiplicity/client/nuget.config b/sdks/csharp/examples~/multiplicity/client/nuget.config new file mode 100644 index 00000000000..8ff9d1e02da --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/client/nuget.config @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/sdks/csharp/examples~/multiplicity/server/.gitignore b/sdks/csharp/examples~/multiplicity/server/.gitignore new file mode 100644 index 00000000000..31b13f058aa --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/server/.gitignore @@ -0,0 +1,17 @@ +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# Spacetime ignore +/.spacetime \ No newline at end of file diff --git a/sdks/csharp/examples~/multiplicity/server/Cargo.toml b/sdks/csharp/examples~/multiplicity/server/Cargo.toml new file mode 100644 index 00000000000..877772e82d5 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/server/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "spacetime-module" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +crate-type = ["cdylib"] + +[dependencies] +spacetimedb = { git = "https://github.com/ClockworkLabs/SpacetimeDB" } +log = "0.4" diff --git a/sdks/csharp/examples~/multiplicity/server/src/lib.rs b/sdks/csharp/examples~/multiplicity/server/src/lib.rs new file mode 100644 index 00000000000..804d0852cf4 --- /dev/null +++ b/sdks/csharp/examples~/multiplicity/server/src/lib.rs @@ -0,0 +1,94 @@ +use spacetimedb::{table, reducer,ReducerContext, Table}; + +#[table(name = dog, public)] +pub struct Dog { + #[primary_key] + name: String, + color: String, + age: u32 +} + +#[table(name = cat, public)] +pub struct Cat { + #[primary_key] + name: String, + color: String, + age: u32 +} + +#[reducer] +pub fn add_dog(ctx: &ReducerContext, name: String, color: String, age: u32) { + ctx.db.dog().insert(Dog { name, color, age }); +} + +#[reducer] +pub fn add_cat(ctx: &ReducerContext, name: String, color: String, age: u32) { + ctx.db.cat().insert(Cat { name, color, age }); +} + +#[reducer] +pub fn update_dog(ctx: &ReducerContext, name: String, color: String, age: u32) -> Result<(), String> { + if let Some(dog) = ctx.db.dog().name().find(&name) { + ctx.db.dog().name().update(Dog { + name: name, + color: color, + age: age, + ..dog + }); + Ok(()) + } else { + Err("Cannot update unknown dog".to_string()) + } +} + +#[reducer] +pub fn update_cat(ctx: &ReducerContext, name: String, color: String, age: u32) -> Result<(), String> { + if let Some(cat) = ctx.db.cat().name().find(&name) { + ctx.db.cat().name().update(Cat { + name: name, + color: color, + age: age, + ..cat + }); + Ok(()) + } else { + Err("Cannot update unknown cat".to_string()) + } +} + +#[reducer] +pub fn remove_dog(ctx: &ReducerContext, name: String) -> Result<(), String> { + if let Some(dog) = ctx.db.dog().name().find(name.to_string()) { + ctx.db.dog().name().delete(&dog.name); + log::info!("Deleted dog named {:?}", name); + Ok(()) + } else { + Err("Cannot delete unknown dog".to_string()) + } +} + +#[reducer] +pub fn remove_cat(ctx: &ReducerContext, name: String) -> Result<(), String> { + if let Some(cat) = ctx.db.cat().name().find(name.to_string()) { + ctx.db.cat().name().delete(&cat.name); + log::info!("Deleted cat named {:?}", name); + Ok(()) + } else { + Err("Cannot delete unknown cat".to_string()) + } +} + +#[reducer(init)] +pub fn init(_ctx: &ReducerContext) { + // Called when the module is initially published +} + +#[reducer(client_connected)] +pub fn identity_connected(_ctx: &ReducerContext) { + // Called everytime a new client connects +} + +#[reducer(client_disconnected)] +pub fn identity_disconnected(_ctx: &ReducerContext) { + // Called everytime a client disconnects +} \ No newline at end of file diff --git a/sdks/csharp/examples~/quickstart/client/Program.cs b/sdks/csharp/examples~/quickstart/client/Program.cs index fc48da6012f..b6383d65c68 100644 --- a/sdks/csharp/examples~/quickstart/client/Program.cs +++ b/sdks/csharp/examples~/quickstart/client/Program.cs @@ -94,6 +94,7 @@ void PrintMessage(RemoteTables tables, Message message) void Message_OnInsert(EventContext ctx, Message insertedValue) { + if (ctx.Event is not Event.SubscribeApplied) { PrintMessage(ctx.Db, insertedValue); @@ -123,25 +124,15 @@ void OnConnect(DbConnection conn, Identity identity, string authToken) local_identity = identity; AuthToken.SaveToken(authToken); - var subscriptions = 0; - Action waitForSubscriptions = (SubscriptionEventContext ctx) => - { - // Note: callbacks are always invoked on the main thread, so you don't need to - // worry about thread synchronization or anything like that. - subscriptions += 1; - - if (subscriptions == 2) - { - OnSubscriptionApplied(ctx); - } - }; - var userSubscription = conn.SubscriptionBuilder() - .OnApplied(waitForSubscriptions) - .Subscribe("SELECT * FROM user"); - var messageSubscription = conn.SubscriptionBuilder() - .OnApplied(waitForSubscriptions) - .Subscribe("SELECT * FROM message"); + .OnApplied(OnSubscriptionApplied) + .Subscribe(new string[] { + "SELECT * FROM user", + "SELECT * FROM message", + // It is legal to have redundant subscriptions. + // However, keep in mind that data will be sent over the wire multiple times, + // once for each subscriptions. This can cause slowdowns if you aren't careful. + "SELECT * FROM message" }); // You can also use SubscribeToAllTables, but it should be avoided if you have any large tables: // conn.SubscriptionBuilder().OnApplied(OnSubscriptionApplied).SubscribeToAllTables(); diff --git a/sdks/csharp/src/Event.cs b/sdks/csharp/src/Event.cs index 6c7808d9ac3..aae39a9f3ca 100644 --- a/sdks/csharp/src/Event.cs +++ b/sdks/csharp/src/Event.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using SpacetimeDB.ClientApi; namespace SpacetimeDB @@ -89,8 +90,8 @@ Action callback } public SubscriptionHandle Subscribe( - string querySql - ) => new(conn, Applied, Error, querySql); + string[] querySqls + ) => new(conn, Applied, Error, querySqls); public void SubscribeToAllTables() { @@ -224,14 +225,14 @@ internal SubscriptionHandle( IDbConnection conn, Action? onApplied, Action? onError, - string querySql + string[] querySqls ) { state = new SubscriptionState.Pending(new()); this.onApplied = onApplied; this.onError = onError; this.conn = conn; - conn.Subscribe(this, querySql); + conn.Subscribe(this, querySqls); } /// @@ -254,11 +255,16 @@ public void UnsubscribeThen(Action? onEnded) { throw new Exception("Cannot unsubscribe from inactive subscription."); } - if (onEnded != null) + if (this.onEnded != null) { // TODO: should we just log here instead? Do we try not to throw exceptions on the main thread? throw new Exception("Unsubscribe already called."); } + if (onEnded == null) + { + // We need to put something in there to use this as a boolean. Shrug emoji + onEnded = (ctx) => { }; + } this.onEnded = onEnded; } } diff --git a/sdks/csharp/src/MultiDictionary.cs b/sdks/csharp/src/MultiDictionary.cs new file mode 100644 index 00000000000..40ba44c43d6 --- /dev/null +++ b/sdks/csharp/src/MultiDictionary.cs @@ -0,0 +1,434 @@ +using System; +using System.Linq; +using System.Text; +using System.Collections.Generic; +using System.Diagnostics; +using System.Data; + +namespace SpacetimeDB +{ + /// + /// A dictionary that may have multiple copies of a key-value pair. + /// Note that a particular key only maps to one value -- it is a logical error + /// to insert the same key with different values. + /// + /// + /// + internal struct MultiDictionary : IEquatable> + { + // The actual data. + readonly Dictionary RawDict; + readonly IEqualityComparer ValueComparer; + + /// + /// Construct a MultiDictionary. + /// + /// This is the only valid constructor for a Multidictionary - using the parameterless constructor + /// will result in null pointer errors. But we can't enforce this because of Unity. + /// + /// + public MultiDictionary(IEqualityComparer keyComparer, IEqualityComparer valueComparer) + { + RawDict = new(keyComparer); + ValueComparer = valueComparer; + } + + public static MultiDictionary FromEnumerable(IEnumerable> enumerable, IEqualityComparer keyComparer, IEqualityComparer valueComparer) + { + var result = new MultiDictionary(keyComparer, valueComparer); + foreach (var item in enumerable) + { + result.Add(item.Key, item.Value); + } + return result; + } + + /// + /// Return the count WITHOUT multiplicities. + /// This is mathematically unnatural, but cheap. + /// + public readonly uint CountDistinct => (uint)RawDict.Count; + + /// + /// Return the count WITH multiplicities. + /// + public readonly uint Count => RawDict.Select(item => item.Value.Multiplicity).Aggregate(0u, (a, b) => a + b); + + /// + /// Add a key-value-pair to the multidictionary. + /// If the key is already present, its associated value must satisfy + /// keyComparer.Equals(value, item.Value). + /// + /// + /// Whether the key is entirely new to the dictionary. If it was already present, we assert that the old value is equal to the new value. + public bool Add(TKey key, TValue value) + { + if (value == null) + { + throw new NullReferenceException("Null values are forbidden in multidictionary"); + } + Debug.Assert(RawDict != null); + Debug.Assert(key != null); + if (RawDict.TryGetValue(key, out var result)) + { + Debug.Assert(ValueComparer.Equals(value, result.Value), "Added key-value pair with mismatched value to existing data"); + RawDict[key] = (value, result.Multiplicity + 1); + return false; + } + else + { + RawDict[key] = (value, 1); + return true; + } + } + + /// + /// Completely clear the multidictionary. + /// + public void Clear() + { + RawDict.Clear(); + } + + /// + /// Whether the multidictionary contains any copies of an item. + /// + /// + /// + public bool Contains(KeyValuePair item) + { + if (RawDict.TryGetValue(item.Key, out var result)) + { + return ValueComparer.Equals(item.Value, result.Value); + } + return false; + } + + /// + /// Remove a key from the dictionary. + /// + /// + /// Whether the last copy of the key was removed. + public bool Remove(TKey key, out TValue row) + { + if (RawDict.TryGetValue(key, out var result)) + { + row = result.Value; + if (result.Multiplicity == 1) + { + RawDict.Remove(key); + return true; + } + else + { + RawDict[key] = (result.Value, result.Multiplicity - 1); + return false; + } + } + row = default!; // uhh, this might be null. Good thing it's an internal method? + return false; + } + + public bool Equals(MultiDictionary other) + { + foreach (var item in RawDict) + { + var (key, (value, multiplicity)) = item; + if (other.RawDict.TryGetValue(key, out var otherVM)) + { + var (otherValue, otherMultiplicity) = otherVM; + if (!(ValueComparer.Equals(value, otherValue) && multiplicity == otherMultiplicity)) + { + return false; + } + } + } + + return true; + } + + public readonly IEnumerable Values + { + get + { + + return RawDict.Select(item => item.Value.Value); + } + } + + public readonly IEnumerable> Entries + { + get + { + return RawDict.Select(item => new KeyValuePair(item.Key, item.Value.Value)); + } + } + + /// + /// Iterate the rows that will be removed when `delta` is applied. + /// + /// + /// + public readonly IEnumerable> WillRemove(MultiDictionaryDelta delta) + { + var self = this; + return delta.Entries.Where(entry => + { + var entryDelta = (int)entry.Value.Inserts - (int)entry.Value.Removes; + if (entryDelta >= 0) + { + return false; + } + if (self.RawDict.TryGetValue(entry.Key, out var mine)) + { + var resultMultiplicity = (int)mine.Multiplicity + entryDelta; + return resultMultiplicity <= 0; + } + else + { + Log.Warn($"Want to remove row with key {entry.Key}, but it doesn't exist!"); + return false; + } + }).Select(entry => new KeyValuePair(entry.Key, entry.Value.Value)); + } + + /// + /// Apply a collection of changes to a multidictionary. + /// + /// The changes to apply. + /// Called on rows that were inserted. + /// Called on rows that were updated (not including multiplicity changes). + /// Called on rows that were removed. + public void Apply(MultiDictionaryDelta delta, List> wasInserted, List<(TKey Key, TValue OldValue, TValue NewValue)> wasUpdated, List> wasRemoved) + { + foreach (var (key, their) in delta.Entries) + { + var entryDelta = (int)their.Inserts - (int)their.Removes; + + if (RawDict.TryGetValue(key, out var my)) + { + var newMultiplicity = (int)my.Multiplicity + entryDelta; + if (newMultiplicity > 0) + { + if (ValueComparer.Equals(my.Value, their.Value)) + { + // Update the count, NOT dispatching an update event. + + // It sort of matters if we use my.Value or their.Value here: + // we'd prefer to keep stricter equalities like pointer equality intact if possible. + // So even though my.Value and theirValue are "equal", prefer using my.Value for + // pointer stability reasons. + RawDict[key] = (my.Value, (uint)newMultiplicity); + } + else + { + // Update the count and value, dispatching an update event. + Debug.Assert(their.Removes >= my.Multiplicity, "Row was not removed enough times in update."); + + // Here, we actually have meaningful changes, so use their value. + RawDict[key] = (their.Value, (uint)newMultiplicity); + wasUpdated.Add((key, my.Value, their.Value)); // store both the old and new values. + } + } + else // if (newMultiplicity <= 0) + { + // This is a removal. + if (newMultiplicity < 0) + { + PseudoThrow($"Internal error: Removing row with key {key} {-entryDelta} times, but it is only present {my.Multiplicity} times."); + } + + RawDict.Remove(key); + wasRemoved.Add(new(key, their.Value)); + } + } + else + { + // Key is not present in map. + if (entryDelta < 0) + { + PseudoThrow($"Internal error: Removing row with key {key} {-entryDelta} times, but it not present."); + } + else if (entryDelta == 0) + { + // Hmm. + // This is not actually a problem. + // Do nothing. + } + else if (entryDelta > 0) + { + RawDict[key] = (their.Value, (uint)entryDelta); + wasInserted.Add(new(key, their.Value)); + } + } + } + + + } + + /// + /// Raise a debug assertion failure in debug mode, otherwise just warn and keep going. + /// + /// + private void PseudoThrow(string message) + { + Log.Warn(message); + Debug.Assert(false, message); + } + + public override string ToString() + { + StringBuilder result = new(); + result.Append("SpacetimeDB.MultiDictionary { "); + foreach (var item in RawDict) + { + result.Append($"({item.Key}: {item.Value.Value}) x {item.Value.Multiplicity}, "); + } + result.Append("}"); + return result.ToString(); + } + + } + + /// + /// A bulk change to a multidictionary. Allows both adding and removing rows. + /// + /// Can be applied to a multidictionary, and also inspected before application to see + /// what rows will be deleted. (This is used for OnBeforeDelete.) + /// + /// Curiously, the order of operations applied to a MultiDictionaryDelta does not matter. + /// No matter the order of Add and Remove calls on a delta, when the Delta is applied, + /// the result will be the same, as long as the Add and Remove *counts* for each KeyValuePair are + /// the same. + /// (This means that this is a "conflict-free replicated data type", unlike MultiDictionary.) + /// (MultiDictionary would also be "conflict-free" if it didn't support Remove.) + /// + /// The delta may include value updates. + /// A value can be updated multiple times, but each update must set the result to the same value. + /// When applying a delta, if the target multidictionary has multiple copies of (key, value) pair, + /// the row must be removed exactly the correct number of times. It can be inserted an arbitrary number of times. + /// + /// When removing a row for an update, it is legal for the passed value to be equal to EITHER the old value or the new value. + /// (This is because I'm not sure what SpacetimeDB core does.) + /// + /// + /// + internal struct MultiDictionaryDelta : IEquatable> + { + /// + /// For each key, track its NEW value (or old value, but only if we have never seen the new value). + /// Also track the number of times it has been removed and inserted. + /// We keep these separate so that we can debug-assert that a KVP has been removed enough times (in case + /// there are multiple copies of the KVP in the map we get applied to.) + /// + readonly Dictionary RawDict; + + readonly IEqualityComparer ValueComparer; + + /// + /// Construct a MultiDictionaryDelta. + /// + /// This is the only valid constructor for a MultiDictionaryDelta - using the parameterless constructor + /// will result in null pointer errors. But we can't enforce this because of Unity. + /// + /// + + public MultiDictionaryDelta(IEqualityComparer keyComparer, IEqualityComparer valueComparer) + { + RawDict = new(keyComparer); + ValueComparer = valueComparer; + } + + /// + /// Add a key-value-pair to the multidictionary. + /// If the key is already present, its associated value must satisfy + /// keyComparer.Equals(value, item.Value). + /// + /// + public void Add(TKey key, TValue value) + { + if (value == null) + { + throw new NullReferenceException("Null values are forbidden in multidictionary"); + } + Debug.Assert(RawDict != null); + Debug.Assert(key != null); + if (RawDict.TryGetValue(key, out var result)) + { + if (result.Inserts > 0) + { + Debug.Assert(ValueComparer.Equals(value, result.Value), "Added key-value pair with mismatched value to existing data"); + } + // Now, make sure we override the value, since it may have been added in a remove, which MAY have passed the + // out-of-date value. + RawDict[key] = (value, result.Removes, result.Inserts + 1); + } + else + { + RawDict[key] = (value, 0, 1); + } + } + + /// + /// Completely clear the multidictionary. + /// + public void Clear() + { + RawDict.Clear(); + } + + /// + /// Remove a key from the dictionary. + /// + /// + public void Remove(TKey key, TValue value) + { + if (RawDict.TryGetValue(key, out var result)) + { + // DON'T assert that result.Value == value: if an update is happening, that may not be the case. + RawDict[key] = (result.Value, result.Removes + 1, result.Inserts); + } + else + { + RawDict[key] = (value, 1, 0); + } + } + + public override string ToString() + { + StringBuilder result = new(); + result.Append("SpacetimeDB.MultiDictionaryDelta { "); + foreach (var item in RawDict) + { + result.Append($"({item.Key}: {item.Value.Value}) x (+{item.Value.Inserts} -{item.Value.Removes}), "); + } + result.Append("}"); + return result.ToString(); + } + + public bool Equals(MultiDictionaryDelta other) + { + foreach (var item in RawDict) + { + var (key, my) = item; + if (other.RawDict.TryGetValue(key, out var their)) + { + if (!(ValueComparer.Equals(my.Value, their.Value) && my.Inserts == their.Inserts && my.Removes == their.Removes)) + { + return false; + } + } + } + + return true; + } + + public readonly IEnumerable> Entries + { + get + { + return RawDict; + } + } + } +} \ No newline at end of file diff --git a/sdks/csharp/src/MultiDictionary.cs.meta b/sdks/csharp/src/MultiDictionary.cs.meta new file mode 100644 index 00000000000..92f5b2c96d5 --- /dev/null +++ b/sdks/csharp/src/MultiDictionary.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: e0a96cc75d55b1f4283f07bb48f8ca4f +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/ClientMessage.g.cs b/sdks/csharp/src/SpacetimeDB/ClientApi/ClientMessage.g.cs index c9646b85605..ad06d90ad12 100644 --- a/sdks/csharp/src/SpacetimeDB/ClientApi/ClientMessage.g.cs +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/ClientMessage.g.cs @@ -13,6 +13,8 @@ public partial record ClientMessage : SpacetimeDB.TaggedEnum<( Subscribe Subscribe, OneOffQuery OneOffQuery, SubscribeSingle SubscribeSingle, - Unsubscribe Unsubscribe + SubscribeMulti SubscribeMulti, + Unsubscribe Unsubscribe, + UnsubscribeMulti UnsubscribeMulti )>; } diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/ServerMessage.g.cs b/sdks/csharp/src/SpacetimeDB/ClientApi/ServerMessage.g.cs index 5e0c12740f7..53e85fb8700 100644 --- a/sdks/csharp/src/SpacetimeDB/ClientApi/ServerMessage.g.cs +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/ServerMessage.g.cs @@ -16,6 +16,8 @@ public partial record ServerMessage : SpacetimeDB.TaggedEnum<( OneOffQueryResponse OneOffQueryResponse, SubscribeApplied SubscribeApplied, UnsubscribeApplied UnsubscribeApplied, - SubscriptionError SubscriptionError + SubscriptionError SubscriptionError, + SubscribeMultiApplied SubscribeMultiApplied, + UnsubscribeMultiApplied UnsubscribeMultiApplied )>; } diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMulti.g.cs b/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMulti.g.cs new file mode 100644 index 00000000000..41bf0ec481f --- /dev/null +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMulti.g.cs @@ -0,0 +1,40 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.ClientApi +{ + [SpacetimeDB.Type] + [DataContract] + public sealed partial class SubscribeMulti + { + [DataMember(Name = "query_strings")] + public System.Collections.Generic.List QueryStrings; + [DataMember(Name = "request_id")] + public uint RequestId; + [DataMember(Name = "query_id")] + public QueryId QueryId; + + public SubscribeMulti( + System.Collections.Generic.List QueryStrings, + uint RequestId, + QueryId QueryId + ) + { + this.QueryStrings = QueryStrings; + this.RequestId = RequestId; + this.QueryId = QueryId; + } + + public SubscribeMulti() + { + this.QueryStrings = new(); + this.QueryId = new(); + } + } +} diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMulti.g.cs.meta b/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMulti.g.cs.meta new file mode 100644 index 00000000000..68429491874 --- /dev/null +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMulti.g.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: bca1b8b9e42c8c44da54059207cc331c +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMultiApplied.g.cs b/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMultiApplied.g.cs new file mode 100644 index 00000000000..2f182a03df7 --- /dev/null +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMultiApplied.g.cs @@ -0,0 +1,44 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.ClientApi +{ + [SpacetimeDB.Type] + [DataContract] + public sealed partial class SubscribeMultiApplied + { + [DataMember(Name = "request_id")] + public uint RequestId; + [DataMember(Name = "total_host_execution_duration_micros")] + public ulong TotalHostExecutionDurationMicros; + [DataMember(Name = "query_id")] + public QueryId QueryId; + [DataMember(Name = "update")] + public DatabaseUpdate Update; + + public SubscribeMultiApplied( + uint RequestId, + ulong TotalHostExecutionDurationMicros, + QueryId QueryId, + DatabaseUpdate Update + ) + { + this.RequestId = RequestId; + this.TotalHostExecutionDurationMicros = TotalHostExecutionDurationMicros; + this.QueryId = QueryId; + this.Update = Update; + } + + public SubscribeMultiApplied() + { + this.QueryId = new(); + this.Update = new(); + } + } +} diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMultiApplied.g.cs.meta b/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMultiApplied.g.cs.meta new file mode 100644 index 00000000000..bbfb0fc6526 --- /dev/null +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/SubscribeMultiApplied.g.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 617c823ceee44e449a26b57b0b085203 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMulti.g.cs b/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMulti.g.cs new file mode 100644 index 00000000000..e36d808b20c --- /dev/null +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMulti.g.cs @@ -0,0 +1,35 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.ClientApi +{ + [SpacetimeDB.Type] + [DataContract] + public sealed partial class UnsubscribeMulti + { + [DataMember(Name = "request_id")] + public uint RequestId; + [DataMember(Name = "query_id")] + public QueryId QueryId; + + public UnsubscribeMulti( + uint RequestId, + QueryId QueryId + ) + { + this.RequestId = RequestId; + this.QueryId = QueryId; + } + + public UnsubscribeMulti() + { + this.QueryId = new(); + } + } +} diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMulti.g.cs.meta b/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMulti.g.cs.meta new file mode 100644 index 00000000000..cf3f4444263 --- /dev/null +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMulti.g.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 170ba367e0a469043ac85da8f2d78edc +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMultiApplied.g.cs b/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMultiApplied.g.cs new file mode 100644 index 00000000000..5cd5b992057 --- /dev/null +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMultiApplied.g.cs @@ -0,0 +1,44 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; + +namespace SpacetimeDB.ClientApi +{ + [SpacetimeDB.Type] + [DataContract] + public sealed partial class UnsubscribeMultiApplied + { + [DataMember(Name = "request_id")] + public uint RequestId; + [DataMember(Name = "total_host_execution_duration_micros")] + public ulong TotalHostExecutionDurationMicros; + [DataMember(Name = "query_id")] + public QueryId QueryId; + [DataMember(Name = "update")] + public DatabaseUpdate Update; + + public UnsubscribeMultiApplied( + uint RequestId, + ulong TotalHostExecutionDurationMicros, + QueryId QueryId, + DatabaseUpdate Update + ) + { + this.RequestId = RequestId; + this.TotalHostExecutionDurationMicros = TotalHostExecutionDurationMicros; + this.QueryId = QueryId; + this.Update = Update; + } + + public UnsubscribeMultiApplied() + { + this.QueryId = new(); + this.Update = new(); + } + } +} diff --git a/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMultiApplied.g.cs.meta b/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMultiApplied.g.cs.meta new file mode 100644 index 00000000000..ea39c611c0c --- /dev/null +++ b/sdks/csharp/src/SpacetimeDB/ClientApi/UnsubscribeMultiApplied.g.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: ef0d207c56b038d47a8ea8d22d94b022 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/sdks/csharp/src/SpacetimeDBClient.cs b/sdks/csharp/src/SpacetimeDBClient.cs index a6f9e9871f7..b2ec315d540 100644 --- a/sdks/csharp/src/SpacetimeDBClient.cs +++ b/sdks/csharp/src/SpacetimeDBClient.cs @@ -10,7 +10,7 @@ using SpacetimeDB.Internal; using SpacetimeDB.ClientApi; using Thread = System.Threading.Thread; -using System.Diagnostics; + namespace SpacetimeDB { @@ -109,7 +109,7 @@ public interface IDbConnection internal void AddOnDisconnect(WebSocket.CloseEventHandler cb); internal void LegacySubscribe(ISubscriptionHandle handle, string[] querySqls); - internal void Subscribe(ISubscriptionHandle handle, string querySql); + internal void Subscribe(ISubscriptionHandle handle, string[] querySqls); internal void Unsubscribe(QueryId queryId); void FrameTick(); void Disconnect(); @@ -125,24 +125,6 @@ public abstract class DbConnectionBase : IDbConne { public static DbConnectionBuilder Builder() => new(); - readonly struct DbValue - { - public readonly IStructuralReadWrite value; - public readonly byte[] bytes; - - public DbValue(IStructuralReadWrite value, byte[] bytes) - { - this.value = value; - this.bytes = bytes; - } - } - - struct DbOp - { - public IRemoteTableHandle table; - public DbValue? delete; - public DbValue? insert; - } internal event Action? onConnect; @@ -218,25 +200,44 @@ struct UnprocessedMessage public DateTime timestamp; } + struct ProcessedDatabaseUpdate + { + // the following dictionaries are DISJOINT. + public Dictionary> Updates; + + // Can't override the default constructor. Make sure you use this one! + public static ProcessedDatabaseUpdate New() + { + ProcessedDatabaseUpdate result; + result.Updates = new(GenericEqualityComparer.Instance); + return result; + } + + public MultiDictionaryDelta DeltaForTable(IRemoteTableHandle table) + { + if (!Updates.TryGetValue(table, out var delta)) + { + delta = new MultiDictionaryDelta(GenericEqualityComparer.Instance, DbValueComparer.Instance); + Updates[table] = delta; + } + + return delta; + } + } + struct ProcessedMessage { public ServerMessage message; - public List dbOps; + public ProcessedDatabaseUpdate dbOps; public DateTime timestamp; public ReducerEvent? reducerEvent; } - struct PreProcessedMessage - { - public ProcessedMessage processed; - public Dictionary>? subscriptionInserts; - } - private readonly BlockingCollection _messageQueue = new(new ConcurrentQueue()); - private readonly BlockingCollection _preProcessedNetworkMessages = - new(new ConcurrentQueue()); + private readonly BlockingCollection _preProcessedNetworkMessages = + new(new ConcurrentQueue()); internal static bool IsTesting; internal bool HasPreProcessedMessage => _preProcessedNetworkMessages.Count > 0; @@ -244,10 +245,23 @@ struct PreProcessedMessage private readonly CancellationTokenSource _preProcessCancellationTokenSource = new(); private CancellationToken _preProcessCancellationToken => _preProcessCancellationTokenSource.Token; - static DbValue Decode(IRemoteTableHandle table, byte[] bin, out object? primaryKey) + /// + /// Decode a row for a table, producing a primary key. + /// If the table has a specific column marked `#[primary_key]`, use that. + /// If not, the BSATN for the entire row is used instead. + /// + /// + /// + /// + /// + static DbValue Decode(IRemoteTableHandle table, byte[] bin, out object primaryKey) { var obj = table.DecodeValue(bin); - primaryKey = table.GetPrimaryKey(obj); + // TODO(1.1): we should exhaustively check that GenericEqualityComparer works + // for all types that are allowed to be primary keys. + var primaryKey_ = table.GetPrimaryKey(obj); + primaryKey_ ??= bin; + primaryKey = primaryKey_; return new(obj, bin); } @@ -363,58 +377,37 @@ void PreProcessMessages() } } - (List, Dictionary>) PreProcessLegacySubscription(InitialSubscription initSub) + ProcessedDatabaseUpdate PreProcessLegacySubscription(InitialSubscription initSub) { - var dbOps = new List(); + var dbOps = ProcessedDatabaseUpdate.New(); // This is all of the inserts int cap = initSub.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows); - // FIXME: shouldn't this be `new(initSub.DatabaseUpdate.Tables.Length)` ? - Dictionary> subscriptionInserts = new(capacity: cap); - - HashSet GetInsertHashSet(IRemoteTableHandle table, int tableSize) - { - if (!subscriptionInserts.TryGetValue(table, out var hashSet)) - { - hashSet = new HashSet(capacity: tableSize, comparer: ByteArrayComparer.Instance); - subscriptionInserts[table] = hashSet; - } - return hashSet; - } // First apply all of the state foreach (var (table, update) in GetTables(initSub.DatabaseUpdate)) { - var hashSet = GetInsertHashSet(table, (int)update.NumRows); - - PreProcessInsertOnlyTable(table, update, dbOps, hashSet); + PreProcessInsertOnlyTable(table, update, dbOps); } - return (dbOps, subscriptionInserts); + return dbOps; } /// /// TODO: the dictionary is here for backwards compatibility and can be removed /// once we get rid of legacy subscriptions. /// - (List, Dictionary>) PreProcessSubscribeApplied(SubscribeApplied subscribeApplied) + ProcessedDatabaseUpdate PreProcessSubscribeMultiApplied(SubscribeMultiApplied subscribeMultiApplied) { - var table = Db.GetTable(subscribeApplied.Rows.TableName) ?? throw new Exception($"Unknown table name: {subscribeApplied.Rows.TableName}"); - var dbOps = new List(); - HashSet inserts = new(comparer: ByteArrayComparer.Instance); - - PreProcessInsertOnlyTable(table, subscribeApplied.Rows.TableRows, dbOps, inserts); - - var result = new Dictionary> + var dbOps = ProcessedDatabaseUpdate.New(); + foreach (var (table, update) in GetTables(subscribeMultiApplied.Update)) { - [table] = inserts - }; - - return (dbOps, result); + PreProcessInsertOnlyTable(table, update, dbOps); + } + return dbOps; } - void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, List dbOps, HashSet inserts) + void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps) { - // In debug mode, make sure we use a byte array comparer in HashSet and not a reference-equal `byte[]` by accident. - Debug.Assert(inserts.Comparer is ByteArrayComparer); + var delta = dbOps.DeltaForTable(table); foreach (var cqu in update.Updates) { @@ -425,133 +418,74 @@ void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, Lis } foreach (var bin in BsatnRowListIter(qu.Inserts)) { - if (!inserts.Add(bin)) - { - // Ignore duplicate inserts in the same subscription update. - continue; - } - var obj = table.DecodeValue(bin); - var op = new DbOp - { - table = table, - insert = new(obj, bin), - }; - dbOps.Add(op); + var obj = Decode(table, bin, out var pk); + delta.Add(pk, obj); } } } - - /// - /// TODO: the dictionary is here for backwards compatibility and can be removed - /// once we get rid of legacy subscriptions. - /// - List PreProcessUnsubscribeApplied(UnsubscribeApplied unsubApplied) + void PreProcessDeleteOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps) { - var table = Db.GetTable(unsubApplied.Rows.TableName) ?? throw new Exception($"Unknown table name: {unsubApplied.Rows.TableName}"); - var dbOps = new List(); - - // First apply all of the state - foreach (var cqu in unsubApplied.Rows.TableRows.Updates) + var delta = dbOps.DeltaForTable(table); + foreach (var cqu in update.Updates) { var qu = DecompressDecodeQueryUpdate(cqu); if (qu.Inserts.RowsData.Count > 0) { - Log.Warn("Non-insert during an UnsubscribeApplied!"); + Log.Warn("Non-delete during a delete-only operation!"); } foreach (var bin in BsatnRowListIter(qu.Deletes)) { - var obj = table.DecodeValue(bin); - var op = new DbOp - { - table = table, - delete = new(obj, bin), - }; - dbOps.Add(op); + var obj = Decode(table, bin, out var pk); + delta.Remove(pk, obj); } } - - return dbOps; } + void PreProcessTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps) + { + var delta = dbOps.DeltaForTable(table); + foreach (var cqu in update.Updates) + { + var qu = DecompressDecodeQueryUpdate(cqu); + // Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once + // to the table, it doesn't matter that we call Add before Remove here. - List PreProcessDatabaseUpdate(DatabaseUpdate updates) - { - var dbOps = new List(); + foreach (var bin in BsatnRowListIter(qu.Inserts)) + { + var obj = Decode(table, bin, out var pk); + delta.Add(pk, obj); + } + foreach (var bin in BsatnRowListIter(qu.Deletes)) + { + var obj = Decode(table, bin, out var pk); + delta.Remove(pk, obj); + } + } - // All row updates that have a primary key, this contains inserts, deletes and updates. - // TODO: is there any guarantee that transaction update contains each table only once, aka updates are already grouped by table? - // If so, we could simplify this and other methods by moving the dictionary inside the main loop and using only the primary key as key. - var primaryKeyChanges = new Dictionary<(IRemoteTableHandle table, object primaryKeyValue), DbOp>(); + } - // First apply all of the state - foreach (var (table, update) in GetTables(updates)) + ProcessedDatabaseUpdate PreProcessUnsubscribeMultiApplied(UnsubscribeMultiApplied unsubMultiApplied) + { + var dbOps = ProcessedDatabaseUpdate.New(); + + foreach (var (table, update) in GetTables(unsubMultiApplied.Update)) { - foreach (var cqu in update.Updates) - { - var qu = DecompressDecodeQueryUpdate(cqu); - foreach (var row in BsatnRowListIter(qu.Inserts)) - { - var op = new DbOp { table = table, insert = Decode(table, row, out var pk) }; - if (pk != null) - { - // Compound key that we use for lookup. - // Consists of the table handle (for faster comparison that string names) + actual primary key of the row. - var key = (table, pk); + PreProcessDeleteOnlyTable(table, update, dbOps); + } - if (primaryKeyChanges.TryGetValue(key, out var oldOp)) - { - if (oldOp.insert is not null) - { - Log.Warn($"Update with the same primary key was applied multiple times! tableName={update.TableName}"); - // TODO(jdetter): Is this a correctable error? This would be a major error on the - // SpacetimeDB side. - continue; - } - - op.delete = oldOp.delete; - } - primaryKeyChanges[key] = op; - } - else - { - dbOps.Add(op); - } - } + return dbOps; + } - foreach (var row in BsatnRowListIter(qu.Deletes)) - { - var op = new DbOp { table = table, delete = Decode(table, row, out var pk) }; - if (pk != null) - { - // Compound key that we use for lookup. - // Consists of the table handle (for faster comparison that string names) + actual primary key of the row. - var key = (table, pk); + ProcessedDatabaseUpdate PreProcessDatabaseUpdate(DatabaseUpdate updates) + { + var dbOps = ProcessedDatabaseUpdate.New(); - if (primaryKeyChanges.TryGetValue(key, out var oldOp)) - { - if (oldOp.delete is not null) - { - Log.Warn($"Update with the same primary key was applied multiple times! tableName={update.TableName}"); - // TODO(jdetter): Is this a correctable error? This would be a major error on the - // SpacetimeDB side. - continue; - } - - op.insert = oldOp.insert; - } - primaryKeyChanges[key] = op; - } - else - { - dbOps.Add(op); - } - } - } + foreach (var (table, update) in GetTables(updates)) + { + PreProcessTable(table, update, dbOps); } - // Combine primary key updates and non-primary key updates - dbOps.AddRange(primaryKeyChanges.Values); return dbOps; } @@ -569,29 +503,32 @@ void PreProcessOneOffQuery(OneOffQueryResponse resp) resultSource.SetResult(resp); } - PreProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed) + ProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed) { - var dbOps = new List(); + var dbOps = ProcessedDatabaseUpdate.New(); var message = DecompressDecodeMessage(unprocessed.bytes); ReducerEvent? reducerEvent = default; - // This is all of the inserts, used for updating the stale but un-cleared client cache. - Dictionary>? subscriptionInserts = null; - switch (message) { case ServerMessage.InitialSubscription(var initSub): - (dbOps, subscriptionInserts) = PreProcessLegacySubscription(initSub); + dbOps = PreProcessLegacySubscription(initSub); break; case ServerMessage.SubscribeApplied(var subscribeApplied): - (dbOps, subscriptionInserts) = PreProcessSubscribeApplied(subscribeApplied); + break; + case ServerMessage.SubscribeMultiApplied(var subscribeMultiApplied): + dbOps = PreProcessSubscribeMultiApplied(subscribeMultiApplied); break; case ServerMessage.SubscriptionError(var subscriptionError): + // do nothing; main thread will warn. break; case ServerMessage.UnsubscribeApplied(var unsubscribeApplied): - dbOps = PreProcessUnsubscribeApplied(unsubscribeApplied); + // do nothing; main thread will warn. + break; + case ServerMessage.UnsubscribeMultiApplied(var unsubscribeMultiApplied): + dbOps = PreProcessUnsubscribeMultiApplied(unsubscribeMultiApplied); break; case ServerMessage.TransactionUpdate(var transactionUpdate): // Convert the generic event arguments in to a domain specific event object @@ -636,40 +573,10 @@ PreProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed) throw new InvalidOperationException(); } - return new PreProcessedMessage - { - processed = new ProcessedMessage { message = message, dbOps = dbOps, timestamp = unprocessed.timestamp, reducerEvent = reducerEvent }, - subscriptionInserts = subscriptionInserts, - }; + return new ProcessedMessage { message = message, dbOps = dbOps, timestamp = unprocessed.timestamp, reducerEvent = reducerEvent }; } } - ProcessedMessage CalculateStateDiff(PreProcessedMessage preProcessedMessage) - { - var processed = preProcessedMessage.processed; - - // Perform the state diff, this has to be done on the main thread because we have to touch - // the client cache. - if (preProcessedMessage.subscriptionInserts is { } subscriptionInserts) - { - foreach (var (table, hashSet) in subscriptionInserts) - { - foreach (var (rowBytes, oldValue) in table.IterEntries().Where(kv => !hashSet.Contains(kv.Key))) - { - processed.dbOps.Add(new DbOp - { - table = table, - // This is a row that we had before, but we do not have it now. - // This must have been a delete. - delete = new(oldValue, rowBytes), - }); - } - } - } - - return processed; - } - public void Disconnect() { isClosing = true; @@ -720,81 +627,30 @@ void IDbConnection.Connect(string? token, string uri, string addressOrName, Comp } } - private void OnMessageProcessCompleteUpdate(IEventContext eventContext, List dbOps) + + private void OnMessageProcessCompleteUpdate(IEventContext eventContext, ProcessedDatabaseUpdate dbOps) { // First trigger OnBeforeDelete - foreach (var update in dbOps) + foreach (var (table, update) in dbOps.Updates) { - if (update is { delete: { value: var oldValue }, insert: null }) - { - try - { - update.table.InvokeBeforeDelete(eventContext, oldValue); - } - catch (Exception e) - { - Log.Exception(e); - } - } + table.PreApply(eventContext, update); } - // Apply all of the state - for (var i = 0; i < dbOps.Count; i++) + foreach (var (table, update) in dbOps.Updates) { - // TODO: Reimplement updates when we add support for primary keys - var update = dbOps[i]; - - if (update.delete is { } delete) - { - if (!update.table.DeleteEntry(delete.bytes)) - { - update.delete = null; - dbOps[i] = update; - } - } - - if (update.insert is { } insert) - { - if (!update.table.InsertEntry(insert.bytes, insert.value)) - { - update.insert = null; - dbOps[i] = update; - } - } + table.Apply(eventContext, update); } - // Send out events - foreach (var dbOp in dbOps) + foreach (var (table, _) in dbOps.Updates) { - try - { - switch (dbOp) - { - case { insert: { value: var newValue }, delete: { value: var oldValue } }: - dbOp.table.InvokeUpdate(eventContext, oldValue, newValue); - break; - - case { insert: { value: var newValue } }: - dbOp.table.InvokeInsert(eventContext, newValue); - break; - - case { delete: { value: var oldValue } }: - dbOp.table.InvokeDelete(eventContext, oldValue); - break; - } - } - catch (Exception e) - { - Log.Exception(e); - } + table.PostApply(eventContext); } } protected abstract bool Dispatch(IReducerEventContext context, Reducer reducer); - private void OnMessageProcessComplete(PreProcessedMessage preProcessed) + private void OnMessageProcessComplete(ProcessedMessage processed) { - var processed = CalculateStateDiff(preProcessed); var message = processed.message; var dbOps = processed.dbOps; var timestamp = processed.timestamp; @@ -824,17 +680,21 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed) } case ServerMessage.SubscribeApplied(var subscribeApplied): + Log.Warn($"Unexpected SubscribeApplied (we only expect to get SubscribeMultiApplied): {subscribeApplied}"); + break; + + case ServerMessage.SubscribeMultiApplied(var subscribeMultiApplied): { stats.ParseMessageTracker.InsertRequest(timestamp, $"type={nameof(ServerMessage.SubscribeApplied)}"); - stats.SubscriptionRequestTracker.FinishTrackingRequest(subscribeApplied.RequestId); + stats.SubscriptionRequestTracker.FinishTrackingRequest(subscribeMultiApplied.RequestId); var eventContext = MakeSubscriptionEventContext(); var legacyEventContext = ToEventContext(new Event.SubscribeApplied()); OnMessageProcessCompleteUpdate(legacyEventContext, dbOps); - if (subscriptions.TryGetValue(subscribeApplied.QueryId.Id, out var subscription)) + if (subscriptions.TryGetValue(subscribeMultiApplied.QueryId.Id, out var subscription)) { try { - subscription.OnApplied(eventContext, new SubscriptionAppliedType.Active(subscribeApplied.QueryId)); + subscription.OnApplied(eventContext, new SubscriptionAppliedType.Active(subscribeMultiApplied.QueryId)); } catch (Exception e) { @@ -882,13 +742,17 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed) } case ServerMessage.UnsubscribeApplied(var unsubscribeApplied): + Log.Warn($"Unexpected UnsubscribeApplied (we only expect to get UnsubscribeMultiApplied): {unsubscribeApplied}"); + break; + + case ServerMessage.UnsubscribeMultiApplied(var unsubscribeMultiApplied): { stats.ParseMessageTracker.InsertRequest(timestamp, $"type={nameof(ServerMessage.UnsubscribeApplied)}"); - stats.SubscriptionRequestTracker.FinishTrackingRequest(unsubscribeApplied.RequestId); + stats.SubscriptionRequestTracker.FinishTrackingRequest(unsubscribeMultiApplied.RequestId); var eventContext = MakeSubscriptionEventContext(); var legacyEventContext = ToEventContext(new Event.UnsubscribeApplied()); OnMessageProcessCompleteUpdate(legacyEventContext, dbOps); - if (subscriptions.TryGetValue(unsubscribeApplied.QueryId.Id, out var subscription)) + if (subscriptions.TryGetValue(unsubscribeMultiApplied.QueryId.Id, out var subscription)) { try { @@ -929,8 +793,6 @@ private void OnMessageProcessComplete(PreProcessedMessage preProcessed) } } - - if (processed.reducerEvent is { } reducerEvent) { var legacyEventContext = ToEventContext(new Event.Reducer(reducerEvent)); @@ -1007,7 +869,7 @@ void IDbConnection.LegacySubscribe(ISubscriptionHandle handle, string[] querySql )); } - void IDbConnection.Subscribe(ISubscriptionHandle handle, string querySql) + void IDbConnection.Subscribe(ISubscriptionHandle handle, string[] querySqls) { if (!webSocket.IsConnected) { @@ -1020,11 +882,11 @@ void IDbConnection.Subscribe(ISubscriptionHandle handle, string querySql) // casting request IDs to query IDs anywhere in the new code path. var queryId = queryIdAllocator.Next(); subscriptions[queryId] = handle; - webSocket.Send(new ClientMessage.SubscribeSingle( - new SubscribeSingle + webSocket.Send(new ClientMessage.SubscribeMulti( + new SubscribeMulti { RequestId = id, - Query = querySql, + QueryStrings = querySqls.ToList(), QueryId = new QueryId(queryId), } )); @@ -1110,7 +972,7 @@ void IDbConnection.Unsubscribe(QueryId queryId) var requestId = stats.SubscriptionRequestTracker.StartTrackingRequest(); - webSocket.Send(new ClientMessage.Unsubscribe(new() + webSocket.Send(new ClientMessage.UnsubscribeMulti(new() { RequestId = requestId, QueryId = queryId @@ -1139,4 +1001,32 @@ public uint Next() return lastAllocated; } } + internal readonly struct DbValue + { + public readonly IStructuralReadWrite value; + public readonly byte[] bytes; + + public DbValue(IStructuralReadWrite value, byte[] bytes) + { + this.value = value; + this.bytes = bytes; + } + + // TODO: having a nice ToString here would give better way better errors when applying table deltas, + // but it's tricky to do that generically. + } + + /// + /// DbValue comparer that uses BSATN-encoded records to compare DbValues for equality. + /// + internal readonly struct DbValueComparer : IEqualityComparer + { + public static DbValueComparer Instance = new(); + + public bool Equals(DbValue x, DbValue y) => + ByteArrayComparer.Instance.Equals(x.bytes, y.bytes); + + public int GetHashCode(DbValue obj) => + ByteArrayComparer.Instance.GetHashCode(obj.bytes); + } } diff --git a/sdks/csharp/src/Table.cs b/sdks/csharp/src/Table.cs index 35d76bb496d..926622521a5 100644 --- a/sdks/csharp/src/Table.cs +++ b/sdks/csharp/src/Table.cs @@ -1,8 +1,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading.Tasks; - using SpacetimeDB.BSATN; namespace SpacetimeDB @@ -24,16 +24,31 @@ public interface IRemoteTableHandle internal Type ClientTableType { get; } internal IEnumerable> IterEntries(); - internal bool InsertEntry(byte[] rowBytes, IStructuralReadWrite value); - internal bool DeleteEntry(byte[] rowBytes); internal IStructuralReadWrite DecodeValue(byte[] bytes); - internal void InvokeInsert(IEventContext context, IStructuralReadWrite row); - internal void InvokeDelete(IEventContext context, IStructuralReadWrite row); - internal void InvokeBeforeDelete(IEventContext context, IStructuralReadWrite row); - internal void InvokeUpdate(IEventContext context, IStructuralReadWrite oldRow, IStructuralReadWrite newRow); + /// + /// Start applying a delta to the table. + /// This is called for all tables before any updates are actually applied, allowing OnBeforeDelete to be invoked correctly. + /// + /// + internal void PreApply(IEventContext context, MultiDictionaryDelta multiDictionaryDelta); + + /// + /// Apply a delta to the table. + /// Should not invoke any user callbacks, since not all tables have been updated yet. + /// Should fix up indices, to be ready for PostApply. + /// + /// + internal void Apply(IEventContext context, MultiDictionaryDelta multiDictionaryDelta); + + /// + /// Finish applying a delta to a table. + /// This is when row callbacks (besides OnBeforeDelete) actually happen. + /// + internal void PostApply(IEventContext context); } + public abstract class RemoteTableHandle : RemoteBase, IRemoteTableHandle where EventContext : class, IEventContext where Row : class, IStructuralReadWrite, new() @@ -105,6 +120,7 @@ public RemoteTableHandle(IDbConnection conn) : base(conn) { } // TODO: figure out if they can be merged into regular OnInsert / OnDelete. // I didn't do that because that delays the index updates until after the row is processed. // In theory, that shouldn't be the issue, but I didn't want to break it right before leaving :) + // - Ingvar private event Action? OnInternalInsert; private event Action? OnInternalDelete; @@ -114,76 +130,206 @@ public RemoteTableHandle(IDbConnection conn) : base(conn) { } // These are provided by RemoteTableHandle. Type IRemoteTableHandle.ClientTableType => typeof(Row); - private readonly Dictionary Entries = new(Internal.ByteArrayComparer.Instance); + // THE DATA IN THE TABLE. + // The keys of this map are: + // - Primary keys, if we have them. + // - Byte arrays, if we don't. + // But really, the keys are whatever SpacetimeDBClient chooses to give us. + // + // We store the BSATN encodings of objects next to their runtime representation. + // This is memory-inefficient, but allows us to quickly compare objects when seeing if an update is a "real" + // update or just a multiplicity change. + private readonly MultiDictionary Entries = new(GenericEqualityComparer.Instance, DbValueComparer.Instance); IEnumerable> IRemoteTableHandle.IterEntries() => - Entries.Select(kv => new KeyValuePair(kv.Key, kv.Value)); + Entries.Entries.Select(kv => new KeyValuePair(kv.Value.bytes, kv.Value.value)); - /// - /// Inserts the value into the table. There can be no existing value with the provided BSATN bytes. - /// - /// The BSATN encoded bytes of the row to retrieve. - /// The parsed row encoded by the rowBytes. - /// True if the row was inserted, false if the row wasn't inserted because it was a duplicate. - bool IRemoteTableHandle.InsertEntry(byte[] rowBytes, IStructuralReadWrite value) + // The function to use for decoding a type value. + IStructuralReadWrite IRemoteTableHandle.DecodeValue(byte[] bytes) => BSATNHelpers.Decode(bytes); + + public delegate void RowEventHandler(EventContext context, Row row); + public event RowEventHandler? OnInsert; + public event RowEventHandler? OnDelete; + public event RowEventHandler? OnBeforeDelete; + + public delegate void UpdateEventHandler(EventContext context, Row oldRow, Row newRow); + public event UpdateEventHandler? OnUpdate; + + public int Count => (int)Entries.CountDistinct; + + public IEnumerable Iter() => Entries.Entries.Select(entry => (Row)entry.Value.value); + + public Task RemoteQuery(string query) => + conn.RemoteQuery($"SELECT {RemoteTableName}.* FROM {RemoteTableName} {query}"); + + void InvokeInsert(IEventContext context, IStructuralReadWrite row) { - var row = (Row)value; - if (Entries.TryAdd(rowBytes, row)) + try { - OnInternalInsert?.Invoke(row); - return true; + OnInsert?.Invoke((EventContext)context, (Row)row); } - else + catch (Exception e) { - return false; + Log.Exception(e); } } - /// - /// Deletes a value from the table. - /// - /// The BSATN encoded bytes of the row to remove. - /// True if and only if the value was previously resident and has been deleted. - bool IRemoteTableHandle.DeleteEntry(byte[] rowBytes) + void InvokeDelete(IEventContext context, IStructuralReadWrite row) { - if (Entries.Remove(rowBytes, out var row)) + try + { + OnDelete?.Invoke((EventContext)context, (Row)row); + } + catch (Exception e) { - OnInternalDelete?.Invoke(row); - return true; + Log.Exception(e); } + } - Log.Warn("Deleting value that we don't have (no cached value available)"); - return false; + void InvokeBeforeDelete(IEventContext context, IStructuralReadWrite row) + { + try + { + OnBeforeDelete?.Invoke((EventContext)context, (Row)row); + } + catch (Exception e) + { + Log.Exception(e); + } } - // The function to use for decoding a type value. - IStructuralReadWrite IRemoteTableHandle.DecodeValue(byte[] bytes) => BSATNHelpers.Decode(bytes); + void InvokeUpdate(IEventContext context, IStructuralReadWrite oldRow, IStructuralReadWrite newRow) + { + try + { + OnUpdate?.Invoke((EventContext)context, (Row)oldRow, (Row)newRow); + } + catch (Exception e) + { + Log.Exception(e); + } + } - public delegate void RowEventHandler(EventContext context, Row row); - public event RowEventHandler? OnInsert; - public event RowEventHandler? OnDelete; - public event RowEventHandler? OnBeforeDelete; + List> wasInserted = new(); + List<(object key, DbValue oldValue, DbValue newValue)> wasUpdated = new(); + List> wasRemoved = new(); - public delegate void UpdateEventHandler(EventContext context, Row oldRow, Row newRow); - public event UpdateEventHandler? OnUpdate; + void IRemoteTableHandle.PreApply(IEventContext context, MultiDictionaryDelta multiDictionaryDelta) + { + Debug.Assert(wasInserted.Count == 0 && wasUpdated.Count == 0 && wasRemoved.Count == 0, "Call Apply and PostApply before calling PreApply again"); - public int Count => Entries.Count; + foreach (var (_, value) in Entries.WillRemove(multiDictionaryDelta)) + { + InvokeBeforeDelete(context, value.value); + } + } - public IEnumerable Iter() => Entries.Values; + void IRemoteTableHandle.Apply(IEventContext context, MultiDictionaryDelta multiDictionaryDelta) + { + try + { + Entries.Apply(multiDictionaryDelta, wasInserted, wasUpdated, wasRemoved); + } + catch (Exception e) + { + var deltaString = multiDictionaryDelta.ToString(); + deltaString = deltaString[..Math.Min(deltaString.Length, 10_000)]; + var entriesString = Entries.ToString(); + entriesString = entriesString[..Math.Min(entriesString.Length, 10_000)]; + throw new Exception($"While table `{RemoteTableName}` was applying:\n{deltaString} \nto:\n{entriesString}", e); + } - public Task RemoteQuery(string query) => - conn.RemoteQuery($"SELECT {RemoteTableName}.* FROM {RemoteTableName} {query}"); + // Update indices. + // This is a local operation -- it only looks at our indices and doesn't invoke user code. + // So we don't need to wait for other tables to be updated to do it. + // (And we need to do it before any PostApply is called.) + foreach (var (_, value) in wasInserted) + { + if (value.value is Row newRow) + { + OnInternalInsert?.Invoke(newRow); + } + else + { + throw new Exception($"Invalid row type for table {RemoteTableName}: {value.value.GetType().Name}"); + } + } + foreach (var (_, oldValue, newValue) in wasUpdated) + { + if (oldValue.value is Row oldRow) + { + OnInternalDelete?.Invoke((Row)oldValue.value); + } + else + { + throw new Exception($"Invalid row type for table {RemoteTableName}: {oldValue.value.GetType().Name}"); + } + + + if (newValue.value is Row newRow) + { + OnInternalInsert?.Invoke(newRow); + } + else + { + throw new Exception($"Invalid row type for table {RemoteTableName}: {newValue.value.GetType().Name}"); + } + } - void IRemoteTableHandle.InvokeInsert(IEventContext context, IStructuralReadWrite row) => - OnInsert?.Invoke((EventContext)context, (Row)row); + foreach (var (_, value) in wasRemoved) + { + if (value.value is Row oldRow) + { + OnInternalDelete?.Invoke(oldRow); + } + } + } - void IRemoteTableHandle.InvokeDelete(IEventContext context, IStructuralReadWrite row) => - OnDelete?.Invoke((EventContext)context, (Row)row); + void IRemoteTableHandle.PostApply(IEventContext context) + { + foreach (var (_, value) in wasInserted) + { + InvokeInsert(context, value.value); + } + foreach (var (_, oldValue, newValue) in wasUpdated) + { + InvokeUpdate(context, oldValue.value, newValue.value); + } + foreach (var (_, value) in wasRemoved) + { + InvokeDelete(context, value.value); + } + wasInserted.Clear(); + wasUpdated.Clear(); + wasRemoved.Clear(); - void IRemoteTableHandle.InvokeBeforeDelete(IEventContext context, IStructuralReadWrite row) => - OnBeforeDelete?.Invoke((EventContext)context, (Row)row); + } + } + + /// + /// Compare objects for equality. If they are byte arrays, use Internal.ByteArrayComparer. + /// + internal readonly struct GenericEqualityComparer : IEqualityComparer + { + public static GenericEqualityComparer Instance = new(); + + public new bool Equals(object x, object y) + { + if (x is byte[] x_ && y is byte[] y_) + { + return Internal.ByteArrayComparer.Instance.Equals(x_, y_); + } + return x.Equals(y); // MAKE SURE to use .Equals and not ==... that was a bug. + } + + public int GetHashCode(object obj) + { + if (obj is byte[] obj_) + { + return Internal.ByteArrayComparer.Instance.GetHashCode(obj_); + } + return obj.GetHashCode(); + } - void IRemoteTableHandle.InvokeUpdate(IEventContext context, IStructuralReadWrite oldRow, IStructuralReadWrite newRow) => - OnUpdate?.Invoke((EventContext)context, (Row)oldRow, (Row)newRow); } } diff --git a/sdks/csharp/tests~/MultiDictionaryTests.cs b/sdks/csharp/tests~/MultiDictionaryTests.cs new file mode 100644 index 00000000000..8b5859a451d --- /dev/null +++ b/sdks/csharp/tests~/MultiDictionaryTests.cs @@ -0,0 +1,239 @@ +using System.Diagnostics; +using CsCheck; +using SpacetimeDB; +using Xunit; + +public class MultiDictionaryTests +{ + /// + /// Generate a list of KeyValuePairs. + /// If any two items of the list have duplicate Keys, they are guaranteed to have duplicate Values. + /// + /// + /// + /// + /// + /// + /// + Gen>> ListWithNormalizedDuplicates(Gen g1, Gen g2, IEqualityComparer equality, int ListMinLength = 0, int ListMaxLength = 32) + where TKey : notnull + { + return Gen.Select(g1, g2, (b1, b2) => new KeyValuePair(b1, b2)).List[ListMinLength, ListMaxLength].Select(list => + { + Dictionary seenKeys = new(equality); + for (var i = 0; i < list.Count; i++) + { + var (b1, b2) = list[i]; + if (seenKeys.ContainsKey(b1)) + { + list[i] = new(b1, seenKeys[b1]); + } + else + { + seenKeys[b1] = b2; + } + } + return list; + }); + } + + [Fact] + public void Equality() + { + ListWithNormalizedDuplicates(Gen.Byte[1, 10], Gen.Byte[1, 10], EqualityComparer.Default).Sample(list => + { + var m1 = MultiDictionary.FromEnumerable(list, EqualityComparer.Default, EqualityComparer.Default); + Gen.Shuffle(list); + var m2 = MultiDictionary.FromEnumerable(list, EqualityComparer.Default, EqualityComparer.Default); + + Assert.Equal(m1, m2); + }); + + ListWithNormalizedDuplicates(Gen.Byte[1, 10].Array[1, 10], Gen.Byte[1, 10], SpacetimeDB.Internal.ByteArrayComparer.Instance).Sample(list => + { + var m1 = MultiDictionary.FromEnumerable(list, SpacetimeDB.Internal.ByteArrayComparer.Instance, EqualityComparer.Default); + Gen.Shuffle(list); + var m2 = MultiDictionary.FromEnumerable(list, SpacetimeDB.Internal.ByteArrayComparer.Instance, EqualityComparer.Default); + + Assert.Equal(m1, m2); + }); + + } + + /// + /// Generate a list of KeyValuePairs, and a list of bools that say whether or not to remove that key-value pair. + /// + /// + /// + /// + /// + /// + /// + /// + Gen<(List>, List)> ListWithRemovals(Gen g1, Gen g2, IEqualityComparer equality, + int maxLength = 32) + where TKey : notnull + => Gen.SelectMany( + Gen.Int[0, maxLength], (listLength) => Gen.Select( + // the data itself + ListWithNormalizedDuplicates(g1, g2, equality, listLength, listLength), + // whether this element should be added or removed + Gen.Bool.List[listLength] + )); + + [Fact] + public void Removals() + { + ListWithRemovals(Gen.Byte[1, 10], Gen.Byte[1, 10], EqualityComparer.Default).Sample((list, removals) => + { + var m1 = MultiDictionary.FromEnumerable(list, EqualityComparer.Default, EqualityComparer.Default); + var m2 = new MultiDictionary(EqualityComparer.Default, EqualityComparer.Default); + foreach (var (kvp, remove) in list.Zip(removals)) + { + if (remove) + { + m1.Remove(kvp.Key, out var _); + } + else + { + m2.Add(kvp.Key, kvp.Value); + } + } + + Assert.Equal(m1, m2); + }); + } + + // Check that MultiDictionaryDelta is in fact a CRDT. + [Fact] + public void ShuffleDelta() + { + ListWithRemovals(Gen.Byte[1, 10], Gen.Byte[1, 10], EqualityComparer.Default).Sample((list, removals) => + { + var m1 = new MultiDictionaryDelta(EqualityComparer.Default, EqualityComparer.Default); + var m2 = new MultiDictionaryDelta(EqualityComparer.Default, EqualityComparer.Default); + var listRemovals = list.Zip(removals).ToList(); + foreach (var (kvp, remove) in listRemovals) + { + if (remove) + { + m1.Remove(kvp.Key, kvp.Value); + } + else + { + m1.Add(kvp.Key, kvp.Value); + } + } + Gen.Shuffle(listRemovals); + foreach (var (kvp, remove) in listRemovals) + { + if (remove) + { + m2.Remove(kvp.Key, kvp.Value); + } + else + { + m2.Add(kvp.Key, kvp.Value); + } + } + + Assert.Equal(m1, m2); + }); + } + + // Note: this does not check proper batch updates yet, since I wasn't sure how to randomly generate them properly. + [Fact] + public void ChunkedRemovals() + { + var maxLength = 32; + Gen.Select(ListWithRemovals(Gen.Byte[1, 10], Gen.Byte[1, 10], EqualityComparer.Default, maxLength), Gen.Int[0, 32].List[0, 5]).Sample((listRemovals, cuts) => + { + // When looking at test failures for this test, keep in mind we do some post-processing of the sample input data. + // Probably there's a better way to rewrite it... + var (list, removals) = listRemovals; + cuts.Add(0); + cuts.Add(maxLength); + cuts = cuts.Select(cut => int.Min(cut, list.Count)).ToList(); + cuts.Sort(); + + var viaAddRemove = new MultiDictionary(EqualityComparer.Default, EqualityComparer.Default); + var viaChunkDeltas = new MultiDictionary(EqualityComparer.Default, EqualityComparer.Default); + + if (list.Count == 0) + { + return; + } + + foreach (var (start, end) in cuts.Zip(cuts.Skip(1))) + { + var delta = new MultiDictionaryDelta(EqualityComparer.Default, EqualityComparer.Default); + + var (start_, end_) = (int.Min(start, list.Count), int.Min(end, list.Count)); + + foreach (var (item, remove) in list[start..end].Zip(removals[start..end])) + { + // it's an error to remove-too-many-times with the Delta api. + // so, don't remove anything we don't have. + var remove_ = remove && viaAddRemove.Contains(item); + if (remove_) + { + viaAddRemove.Remove(item.Key, out var _); + delta.Remove(item.Key, item.Value); + } + else + { + viaAddRemove.Add(item.Key, item.Value); + delta.Add(item.Key, item.Value); + } + } + foreach (var (key, value) in viaChunkDeltas.WillRemove(delta)) + { + Assert.True(viaChunkDeltas.Contains(new(key, value))); + } + var wasInserted = new List>(); + var wasMaybeUpdated = new List<(byte key, byte oldValue, byte newValue)>(); + var wasRemoved = new List>(); + + viaChunkDeltas.Apply(delta, wasInserted, wasMaybeUpdated, wasRemoved); + foreach (var (key, value) in wasInserted) + { + Assert.True(viaChunkDeltas.Contains(new(key, value))); + } + foreach (var (key, oldValue, newValue) in wasMaybeUpdated) + { + Assert.True(viaChunkDeltas.Contains(new(key, newValue)) && oldValue == newValue); + } + foreach (var (key, value) in wasRemoved) + { + Assert.False(viaChunkDeltas.Contains(new(key, value))); + } + Assert.Equal(viaAddRemove, viaChunkDeltas); + } + }, iter: 10_000); + } + + [Fact] + public void IdentitiesWorkAsPrimaryKeys() + { + // GenericEqualityComparer used to have a bug, this is a regression test for that. + var identity = Identity.From(Convert.FromBase64String("l0qzG1GPRtC1mwr+54q98tv0325gozLc6cNzq4vrzqY=")); + var hashSet = new HashSet(GenericEqualityComparer.Instance) + { + identity + }; + Debug.Assert(hashSet.Contains(identity)); + + var dict = new MultiDictionary(GenericEqualityComparer.Instance, EqualityComparer.Default); + + dict.Add(identity, 3); + dict.Add(identity, 3); + + var delta = new MultiDictionaryDelta(GenericEqualityComparer.Instance, EqualityComparer.Default); + delta.Remove(identity, 3); + delta.Remove(identity, 3); + var wasInserted = new List>(); + var wasMaybeUpdated = new List<(object key, byte oldValue, byte newValue)>(); + var wasRemoved = new List>(); + dict.Apply(delta, wasInserted, wasMaybeUpdated, wasRemoved); + } +} \ No newline at end of file diff --git a/sdks/csharp/tests~/SnapshotTests.VerifySampleDump_dumpName=SubscribeApplied.verified.txt b/sdks/csharp/tests~/SnapshotTests.VerifySampleDump_dumpName=SubscribeApplied.verified.txt index 353c6ad230c..5a07da7749a 100644 --- a/sdks/csharp/tests~/SnapshotTests.VerifySampleDump_dumpName=SubscribeApplied.verified.txt +++ b/sdks/csharp/tests~/SnapshotTests.VerifySampleDump_dumpName=SubscribeApplied.verified.txt @@ -591,10 +591,7 @@ requestsAwaitingResponse: 6 }, OneOffRequestTracker: {}, - SubscriptionRequestTracker: { - sampleCount: 1, - requestsAwaitingResponse: 2 - }, + SubscriptionRequestTracker: {}, AllReducersTracker: { sampleCount: 9 }, diff --git a/sdks/csharp/tests~/SnapshotTests.cs b/sdks/csharp/tests~/SnapshotTests.cs index 11c2fa71b43..eab22149c6f 100644 --- a/sdks/csharp/tests~/SnapshotTests.cs +++ b/sdks/csharp/tests~/SnapshotTests.cs @@ -118,7 +118,7 @@ List updates } }); - private static ServerMessage.SubscribeApplied SampleSubscribeApplied( + private static ServerMessage.SubscribeMultiApplied SampleSubscribeApplied( uint requestId, uint queryId, ulong hostExecutionDuration, @@ -126,19 +126,12 @@ TableUpdate tableUpdate ) => new(new() { RequestId = requestId, - TotalHostExecutionDurationMicros = hostExecutionDuration, QueryId = new(queryId), - Rows = new() - { - // This message contains redundant data, shrug. - // Copy out the redundant fields. - TableId = tableUpdate.TableId, - TableName = tableUpdate.TableName, - TableRows = tableUpdate - } + TotalHostExecutionDurationMicros = hostExecutionDuration, + Update = new(new List { tableUpdate }) }); - private static ServerMessage.UnsubscribeApplied SampleUnsubscribeApplied( + private static ServerMessage.UnsubscribeMultiApplied SampleUnsubscribeApplied( uint requestId, uint queryId, ulong hostExecutionDuration, @@ -148,14 +141,7 @@ TableUpdate tableUpdate RequestId = requestId, TotalHostExecutionDurationMicros = hostExecutionDuration, QueryId = new(queryId), - Rows = new() - { - // This message contains redundant data, shrug. - // Copy out the redundant fields. - TableId = tableUpdate.TableId, - TableName = tableUpdate.TableName, - TableRows = tableUpdate - } + Update = new(new List { tableUpdate }) }); private static ServerMessage.SubscriptionError SampleSubscriptionError( diff --git a/sdks/csharp/tests~/tests.csproj b/sdks/csharp/tests~/tests.csproj index 804fa736c4d..87b71a1fa10 100644 --- a/sdks/csharp/tests~/tests.csproj +++ b/sdks/csharp/tests~/tests.csproj @@ -17,6 +17,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all + diff --git a/sdks/csharp/unity-tests~ b/sdks/csharp/unity-tests~ index 3b4f71a2f68..25fed011bff 160000 --- a/sdks/csharp/unity-tests~ +++ b/sdks/csharp/unity-tests~ @@ -1 +1 @@ -Subproject commit 3b4f71a2f6854b91554dc38597c03f88f91b4966 +Subproject commit 25fed011bff683af70cca7a4ddd66fe84dfb6ed4