Skip to content

Commit 2fde31b

Browse files
committed
Fix unsubscribe bug found by testing against Bitcraft
Warning message fix (thanks Jeff) Update for new Subscribe/UnsubscribeMulti thanks zeke Add MultiDictionary; update dispatching logic still wrong Add even more wacky data structures WHOOOOOO Fix MultiDictionary bug More asserts and comments Tests passing Better errors, purge dead package Test Update for blackholio Update blackholio Fix comment Remove dead One more dead COMMENT More comments Added a multiplicity test example Creates a small sample program with a Rust server and C# client that tests various subscriptions and outputs the results to the CLI. Make sure we insert before we delete Fix more comments Add CRDT test for MultiDictionaryDelta Revert "Added a multiplicity test example" This reverts commit ff10925. Undo DLL changes
1 parent bc8d3b3 commit 2fde31b

21 files changed

+1291
-379
lines changed

examples~/quickstart/client/Program.cs

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ void PrintMessage(RemoteTables tables, Message message)
9494

9595
void Message_OnInsert(EventContext ctx, Message insertedValue)
9696
{
97+
9798
if (ctx.Event is not Event<Reducer>.SubscribeApplied)
9899
{
99100
PrintMessage(ctx.Db, insertedValue);
@@ -123,25 +124,15 @@ void OnConnect(DbConnection conn, Identity identity, string authToken)
123124
local_identity = identity;
124125
AuthToken.SaveToken(authToken);
125126

126-
var subscriptions = 0;
127-
Action<SubscriptionEventContext> waitForSubscriptions = (SubscriptionEventContext ctx) =>
128-
{
129-
// Note: callbacks are always invoked on the main thread, so you don't need to
130-
// worry about thread synchronization or anything like that.
131-
subscriptions += 1;
132-
133-
if (subscriptions == 2)
134-
{
135-
OnSubscriptionApplied(ctx);
136-
}
137-
};
138-
139127
var userSubscription = conn.SubscriptionBuilder()
140-
.OnApplied(waitForSubscriptions)
141-
.Subscribe("SELECT * FROM user");
142-
var messageSubscription = conn.SubscriptionBuilder()
143-
.OnApplied(waitForSubscriptions)
144-
.Subscribe("SELECT * FROM message");
128+
.OnApplied(OnSubscriptionApplied)
129+
.Subscribe(new string[] {
130+
"SELECT * FROM user",
131+
"SELECT * FROM message",
132+
// It is legal to have redundant subscriptions.
133+
// However, keep in mind that data will be sent over the wire multiple times,
134+
// once for each subscriptions. This can cause slowdowns if you aren't careful.
135+
"SELECT * FROM message" });
145136

146137
// You can also use SubscribeToAllTables, but it should be avoided if you have any large tables:
147138
// conn.SubscriptionBuilder().OnApplied(OnSubscriptionApplied).SubscribeToAllTables();

src/Event.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using SpacetimeDB.ClientApi;
34

45
namespace SpacetimeDB
@@ -89,8 +90,8 @@ Action<ErrorContext, Exception> callback
8990
}
9091

9192
public SubscriptionHandle<SubscriptionEventContext, ErrorContext> Subscribe(
92-
string querySql
93-
) => new(conn, Applied, Error, querySql);
93+
string[] querySqls
94+
) => new(conn, Applied, Error, querySqls);
9495

9596
public void SubscribeToAllTables()
9697
{
@@ -224,14 +225,14 @@ internal SubscriptionHandle(
224225
IDbConnection conn,
225226
Action<SubscriptionEventContext>? onApplied,
226227
Action<ErrorContext, Exception>? onError,
227-
string querySql
228+
string[] querySqls
228229
)
229230
{
230231
state = new SubscriptionState.Pending(new());
231232
this.onApplied = onApplied;
232233
this.onError = onError;
233234
this.conn = conn;
234-
conn.Subscribe(this, querySql);
235+
conn.Subscribe(this, querySqls);
235236
}
236237

237238
/// <summary>
@@ -254,11 +255,16 @@ public void UnsubscribeThen(Action<SubscriptionEventContext>? onEnded)
254255
{
255256
throw new Exception("Cannot unsubscribe from inactive subscription.");
256257
}
257-
if (onEnded != null)
258+
if (this.onEnded != null)
258259
{
259260
// TODO: should we just log here instead? Do we try not to throw exceptions on the main thread?
260261
throw new Exception("Unsubscribe already called.");
261262
}
263+
if (onEnded == null)
264+
{
265+
// We need to put something in there to use this as a boolean. Shrug emoji
266+
onEnded = (ctx) => { };
267+
}
262268
this.onEnded = onEnded;
263269
}
264270
}

0 commit comments

Comments
 (0)