Skip to content

Batch index updates #336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 103 additions & 50 deletions src/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,22 @@ public abstract class UniqueIndexBase<Column> : IndexBase<Column>

public UniqueIndexBase(RemoteTableHandle<EventContext, Row> table)
{
// Guaranteed to be a valid cast by contract of OnInternalInsert.
table.OnInternalInsert += row => cache.Add(GetKey((Row)row.Row), row);
// Guaranteed to be a valid cast by contract of OnInternalDelete.
table.OnInternalDelete += row => cache.Remove(GetKey((Row)row.Row));
table.OnInternalInsert += rows =>
{
foreach (var preHashed in rows)
{
// Guaranteed to be a valid cast by contract of OnInternalInsert.
cache.Add(GetKey((Row)preHashed.Row), preHashed);
}
};
table.OnInternalDelete += rows =>
{
foreach (var preHashed in rows)
{
// Guaranteed to be a valid cast by contract of OnInternalDelete.
cache.Remove(GetKey((Row)preHashed.Row));
}
};
}

public Row? Find(Column value) => cache.TryGetValue(value, out var row) ? (Row)row.Row : null;
Expand All @@ -99,44 +111,50 @@ public abstract class BTreeIndexBase<Column> : IndexBase<Column>

public BTreeIndexBase(RemoteTableHandle<EventContext, Row> table)
{
table.OnInternalInsert += preHashed =>
table.OnInternalInsert += preHashedRows =>
{
// Guaranteed to be a valid cast by contract of OnInternalInsert.
var row = (Row)preHashed.Row;
var key = GetKey(row);
if (cache.TryGetValue(key, out var rows))
foreach (var preHashed in preHashedRows)
{
rows.Add(preHashed);
// Need to update the parent dictionary: rows is a mutable struct.
// Just updating the local `rows` variable won't update the parent dict.
cache[key] = rows;
}
else
{
rows = new()
// Guaranteed to be a valid cast by contract of OnInternalInsert.
var row = (Row)preHashed.Row;
var key = GetKey(row);
if (cache.TryGetValue(key, out var set))
{
preHashed
};
cache.Add(key, rows);
set.Add(preHashed);
// Need to update the parent dictionary: `set` is a mutable struct.
// Just updating the local `set` variable won't update the parent dict.
cache[key] = set;
}
else
{
set = new()
{
preHashed
};
cache.Add(key, set);
}
}
};

table.OnInternalDelete += preHashed =>
table.OnInternalDelete += preHashedRows =>
{
// Guaranteed to be a valid cast by contract of OnInternalDelete.
var row = (Row)preHashed.Row;
var key = GetKey(row);
var keyCache = cache[key];
keyCache.Remove(preHashed);
if (keyCache.Count == 0)
foreach (var preHashed in preHashedRows)
{
cache.Remove(key);
}
else
{
// Need to update the parent dictionary: keyCache is a mutable struct.
// Just updating the local `keyCache` variable won't update the parent dict.
cache[key] = keyCache;
// Guaranteed to be a valid cast by contract of OnInternalDelete.
var row = (Row)preHashed.Row;
var key = GetKey(row);
var set = cache[key];
set.Remove(preHashed);
if (set.Count == 0)
{
cache.Remove(key);
}
else
{
// Need to update the parent dictionary: `set` is a mutable struct.
// Just updating the local `set` variable won't update the parent dict.
cache[key] = set;
}
}
};
}
Expand All @@ -155,20 +173,21 @@ public RemoteTableHandle(IDbConnection conn) : base(conn) { }
protected virtual object? GetPrimaryKey(Row row) => null;

// These events are used by indices to add/remove rows to their dictionaries.
// They can assume the Row stored in the PreHashedRow passed is of the correct type;
//
// They are passed all the modified rows for an update at once:
// this avoids the overhead of invoking handlers per-row.
// (Unfortunately, it's too late to make this sort of change for user callbacks...)
//
// These callbacks can assume the Row stored in the PreHashedRow passed is of the correct type;
// the check is done before performing these callbacks.
// TODO: figure out if they can be merged into regular OnInsert / OnDelete.
// I didn't do that because that delays the index updates until after the row is processed.
// In theory, that shouldn't be an issue, but I didn't want to break it right before leaving :)
// - Ingvar
private AbstractEventHandler<PreHashedRow> OnInternalInsertHandler { get; } = new();
private event Action<PreHashedRow> OnInternalInsert
private AbstractEventHandler<List<PreHashedRow>> OnInternalInsertHandler { get; } = new();
private event Action<List<PreHashedRow>> OnInternalInsert
{
add => OnInternalInsertHandler.AddListener(value);
remove => OnInternalInsertHandler.RemoveListener(value);
}
private AbstractEventHandler<PreHashedRow> OnInternalDeleteHandler { get; } = new();
private event Action<PreHashedRow> OnInternalDelete
private AbstractEventHandler<List<PreHashedRow>> OnInternalDeleteHandler { get; } = new();
private event Action<List<PreHashedRow>> OnInternalDelete
{
add => OnInternalDeleteHandler.AddListener(value);
remove => OnInternalDeleteHandler.RemoveListener(value);
Expand Down Expand Up @@ -310,6 +329,9 @@ void IRemoteTableHandle.PreApply(IEventContext context, MultiDictionaryDelta<obj
}
}

List<PreHashedRow> scratchInsertBuffer = new();
List<PreHashedRow> scratchDeleteBuffer = new();

void IRemoteTableHandle.Apply(IEventContext context, MultiDictionaryDelta<object, PreHashedRow> multiDictionaryDelta)
{
try
Expand All @@ -325,50 +347,76 @@ void IRemoteTableHandle.Apply(IEventContext context, MultiDictionaryDelta<object
throw new Exception($"While table `{RemoteTableName}` was applying:\n{deltaString} \nto:\n{entriesString}", e);
}

// I would like to pre-allocate the needed capacity in scratchInsertBuffer here, but there is no way to :(
scratchInsertBuffer.Clear();

// Update indices.
// This is a local operation -- it only looks at our indices and doesn't invoke user code.
// So we don't need to wait for other tables to be updated to do it.
// (And we need to do it before any PostApply is called.)
// (And we need to do it before ANY client callback is invoked.)

// First, we gather the updated rows into buffers.
// This is also when we verify that the PreHashedRows store rows of the correct types.
foreach (var (_, value) in wasInserted)
{
if (value.Row is Row newRow)
{
OnInternalInsertHandler.Invoke(value);
scratchInsertBuffer.Add(value);
}
else
{
throw new Exception($"Invalid row type for table {RemoteTableName}: {value.GetType().Name}");
}
}
// Then, we dispatch the index update handlers on the buffers.
// Doing things a buffer-at-a-time lets us avoid indirection in the inner loop.
OnInternalInsertHandler.Invoke(scratchInsertBuffer);

scratchInsertBuffer.Clear();
scratchDeleteBuffer.Clear();

foreach (var (_, oldValue, newValue) in wasUpdated)
{
if (oldValue.Row is Row oldRow)
{
OnInternalDeleteHandler.Invoke(oldValue);
scratchDeleteBuffer.Add(oldValue);
}
else
{
throw new Exception($"Invalid row type for table {RemoteTableName}: {oldValue.GetType().Name}");
}


if (newValue.Row is Row newRow)
{
OnInternalInsertHandler.Invoke(newValue);
scratchInsertBuffer.Add(newValue);
}
else
{
throw new Exception($"Invalid row type for table {RemoteTableName}: {newValue.GetType().Name}");
}
}

// Make sure to invoke OnInternalDeleteHandler first, otherwise
// we might accidentally delete a row after we update it!
OnInternalDeleteHandler.Invoke(scratchDeleteBuffer);
OnInternalInsertHandler.Invoke(scratchInsertBuffer);

scratchInsertBuffer.Clear();
scratchDeleteBuffer.Clear();

foreach (var (_, value) in wasRemoved)
{
if (value.Row is Row oldRow)
{
OnInternalDeleteHandler.Invoke(value);
scratchDeleteBuffer.Add(value);
}
else
{
throw new Exception($"Invalid row type for table {RemoteTableName}: {value.GetType().Name}");
}
}
OnInternalDeleteHandler.Invoke(scratchDeleteBuffer);

scratchDeleteBuffer.Clear();
}

void IRemoteTableHandle.PostApply(IEventContext context)
Expand Down Expand Up @@ -434,6 +482,11 @@ public void Invoke(EventContext ctx, Row oldRow, Row newRow)
///
/// You MUST create objects of this type with the single-argument constructor.
/// Default-initializing an object of this type breaks its invariant, which is that Hash is the hash of Row.
///
/// Note: if struct types for table rows are ever added to the SDK, this should be updated to take a generic argument
/// of type Row. This *may* be able to avoid boxing the row structs -- you currently have to box them, since they're stored type-erased
/// as IStructuralReadWrite. But it doesn't matter right now, since currently all row types are generated as classes.
/// This may be challenging to massage SpacetimeDBClient into supporting, though.
/// </summary>
internal struct PreHashedRow
{
Expand Down
Loading