Skip to content

Commit d6d6c19

Browse files
authored
Upgrade Quack and Surge libraries (#5500)
1 parent b24e870 commit d6d6c19

File tree

13 files changed

+70
-84
lines changed

13 files changed

+70
-84
lines changed

src/Directory.Packages.props

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@
5555
<PackageVersion Include="Kurrent.Connectors.Pulsar" Version="1.1.1-alpha.1.57" />
5656
<PackageVersion Include="Kurrent.Connectors.Sql" Version="1.1.1-alpha.1.57" />
5757
<PackageVersion Include="Kurrent.Surge" Version="1.1.1-alpha.1.57" />
58-
<PackageVersion Include="Kurrent.Surge.Core" Version="1.1.1-alpha.1.57" />
58+
<PackageVersion Include="Kurrent.Surge.Core" Version="1.1.1-alpha.1.60" />
5959
<PackageVersion Include="Kurrent.Surge.DataProtection" Version="1.1.1-alpha.1.57" />
6060
<PackageVersion Include="Kurrent.Surge.DuckDB" Version="1.1.1-alpha.1.57" />
61-
<PackageVersion Include="Kurrent.Quack" Version="0.0.0-alpha.101" />
61+
<PackageVersion Include="Kurrent.Quack" Version="0.0.0-alpha.127" />
6262
<PackageVersion Include="KurrentDB.Client" Version="1.0.0" />
6363
<PackageVersion Include="librdkafka.redist" Version="2.5.0" />
6464
<PackageVersion Include="LruCacheNet" Version="1.2.0" />
@@ -170,4 +170,4 @@
170170
<PackageVersion Include="xunit.v3.extensibility.core" Version="3.1.0" />
171171
<PackageVersion Include="YamlDotnet" Version="16.3.0" />
172172
</ItemGroup>
173-
</Project>
173+
</Project>

src/KurrentDB.SecondaryIndexing.LoadTesting/Environments/DuckDB/RawQuackMessageBatchAppender.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ public ValueTask AppendToDefaultIndex(TestMessageBatch batch) {
4646
LastSequence++;
4747

4848
using (var row = _defaultIndexAppender.CreateRow()) {
49-
row.Append(logPosition);
50-
row.AppendDefault();
51-
row.Append(eventNumber);
52-
row.Append(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
53-
row.AppendDefault();
54-
row.Append(1); //stream.Id
55-
row.Append(1); //eventType.Id
56-
row.Append(1); //category.Id
57-
row.AppendDefault();
49+
row.Add(logPosition);
50+
row.AddDefault();
51+
row.Add(eventNumber);
52+
row.Add(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
53+
row.AddDefault();
54+
row.Add(1); //stream.Id
55+
row.Add(1); //eventType.Id
56+
row.Add(1); //category.Id
57+
row.AddDefault();
5858
}
5959

6060
if (LastSequence < LastCommittedSequence + _commitSize)

src/KurrentDB.SecondaryIndexing.Tests/Indexes/DefaultIndexProcessorTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

44
using Dapper;
5+
using DotNext;
56
using Kurrent.Quack.ConnectionPool;
67
using KurrentDB.Core.Data;
78
using KurrentDB.Core.Index.Hashes;
@@ -124,7 +125,7 @@ private void AssertDefaultIndexQueryReturns(List<long> expected) {
124125
}
125126

126127
private void AssertLastLogPositionQueryReturns(long? expectedLogPosition) {
127-
var actual = DuckDb.QueryFirstOrDefault<LastPositionResult, GetLastLogPositionQuery>();
128+
var actual = DuckDb.QueryFirstOrDefault<LastPositionResult, GetLastLogPositionQuery>().OrNull();
128129

129130
Assert.Equal(expectedLogPosition, actual?.PreparePosition);
130131
}

src/KurrentDB.SecondaryIndexing/Indexes/Category/CategorySql.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ internal static class CategorySql {
1111
/// Get index records for a given category where log position is greater the start position
1212
/// </summary>
1313
public struct CategoryIndexQueryExcl : IQuery<CategoryIndexQueryArgs, IndexQueryRecord> {
14-
public static BindingContext Bind(in CategoryIndexQueryArgs args, PreparedStatement statement)
14+
public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, PreparedStatement statement)
1515
=> new(statement) {
1616
args.Category,
1717
args.StartPosition,
@@ -29,7 +29,7 @@ public static BindingContext Bind(in CategoryIndexQueryArgs args, PreparedStatem
2929
/// Get index records for a given category where the log position is greater or equal the start position
3030
/// </summary>
3131
public struct CategoryIndexQueryIncl : IQuery<CategoryIndexQueryArgs, IndexQueryRecord> {
32-
public static BindingContext Bind(in CategoryIndexQueryArgs args, PreparedStatement statement)
32+
public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, PreparedStatement statement)
3333
=> new(statement) {
3434
args.Category,
3535
args.StartPosition,
@@ -47,7 +47,7 @@ public static BindingContext Bind(in CategoryIndexQueryArgs args, PreparedStatem
4747
/// Get index records for a given category where log position is less the start position
4848
/// </summary>
4949
public struct CategoryIndexBackQueryExcl : IQuery<CategoryIndexQueryArgs, IndexQueryRecord> {
50-
public static BindingContext Bind(in CategoryIndexQueryArgs args, PreparedStatement statement)
50+
public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, PreparedStatement statement)
5151
=> new(statement) {
5252
args.Category,
5353
args.StartPosition,
@@ -64,7 +64,7 @@ public static BindingContext Bind(in CategoryIndexQueryArgs args, PreparedStatem
6464
/// Get index records for a given category where the log position is less or equal the start position
6565
/// </summary>
6666
public struct CategoryIndexBackQueryIncl : IQuery<CategoryIndexQueryArgs, IndexQueryRecord> {
67-
public static BindingContext Bind(in CategoryIndexQueryArgs args, PreparedStatement statement)
67+
public static StatementBindingResult Bind(in CategoryIndexQueryArgs args, PreparedStatement statement)
6868
=> new(statement) {
6969
args.Category,
7070
args.StartPosition,

src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultIndexProcessor.cs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,26 +88,26 @@ public bool TryIndex(ResolvedEvent resolvedEvent) {
8888
var category = GetStreamCategory(resolvedEvent.Event.EventStreamId);
8989
var created = new DateTimeOffset(resolvedEvent.Event.TimeStamp).ToUnixTimeMilliseconds();
9090
using (var row = _appender.CreateRow()) {
91-
row.Append(logPosition);
91+
row.Add(logPosition);
9292
if (commitPosition.HasValue && logPosition != commitPosition)
93-
row.Append(commitPosition.Value);
93+
row.Add(commitPosition.Value);
9494
else
95-
row.Append(DBNull.Value);
96-
row.Append(eventNumber);
97-
row.Append(created);
98-
row.Append(DBNull.Value); // expires
99-
row.Append(stream);
100-
row.Append(streamHash);
101-
row.Append(schemaName);
102-
row.Append(category);
103-
row.Append(false); // is_deleted TODO: What happens if the event is deleted before we commit?
95+
row.Add(DBNull.Value);
96+
row.Add(eventNumber);
97+
row.Add(created);
98+
row.Add(DBNull.Value); // expires
99+
row.Add(stream);
100+
row.Add(streamHash);
101+
row.Add(schemaName);
102+
row.Add(category);
103+
row.Add(false); // is_deleted TODO: What happens if the event is deleted before we commit?
104104
if (schemaId != null) {
105-
row.Append(schemaId);
105+
row.Add(schemaId);
106106
} else {
107-
row.Append(DBNull.Value);
107+
row.Add(DBNull.Value);
108108
}
109109

110-
row.Append(schemaFormat);
110+
row.Add(schemaFormat);
111111
}
112112

113113
_inFlightRecords.Append(logPosition, commitPosition ?? logPosition, category, schemaName, resolvedEvent.Event.EventStreamId,
@@ -130,10 +130,9 @@ static string GetStreamCategory(string streamName) {
130130
public TFPos GetLastPosition() => LastIndexedPosition;
131131

132132
private (TFPos, DateTimeOffset) ReadLastIndexedRecord() {
133-
var result = _connection.QueryFirstOrDefault<LastPositionResult, GetLastLogPositionQuery>();
134-
return result != null
135-
? (new(result.Value.CommitPosition ?? result.Value.PreparePosition, result.Value.PreparePosition),
136-
DateTimeOffset.FromUnixTimeMilliseconds(result.Value.Timestamp))
133+
return _connection.QueryFirstOrDefault<LastPositionResult, GetLastLogPositionQuery>().TryGet(out var result)
134+
? (new TFPos(result.CommitPosition ?? result.PreparePosition, result.PreparePosition),
135+
DateTimeOffset.FromUnixTimeMilliseconds(result.Timestamp))
137136
: (TFPos.Invalid, DateTimeOffset.MinValue);
138137
}
139138

src/KurrentDB.SecondaryIndexing/Indexes/Default/DefaultSql.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public record struct ReadDefaultIndexQueryArgs(long StartPosition, long EndPosit
1313
/// Get index records for the default index with a log position greater than the start position
1414
/// </summary>
1515
public struct ReadDefaultIndexQueryExcl : IQuery<ReadDefaultIndexQueryArgs, IndexQueryRecord> {
16-
public static BindingContext Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
16+
public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
1717
=> new(statement) { args.StartPosition, args.EndPosition, args.Count };
1818

1919
public static ReadOnlySpan<byte> CommandText =>
@@ -26,7 +26,7 @@ public static BindingContext Bind(in ReadDefaultIndexQueryArgs args, PreparedSta
2626
/// Get index records for the default index with log position greater or equal than the start position
2727
/// </summary>
2828
public struct ReadDefaultIndexQueryIncl : IQuery<ReadDefaultIndexQueryArgs, IndexQueryRecord> {
29-
public static BindingContext Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
29+
public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
3030
=> new(statement) { args.StartPosition, args.EndPosition, args.Count };
3131

3232
public static ReadOnlySpan<byte> CommandText =>
@@ -39,7 +39,7 @@ public static BindingContext Bind(in ReadDefaultIndexQueryArgs args, PreparedSta
3939
/// Get index records for the default index with the log position less than the start position
4040
/// </summary>
4141
public struct ReadDefaultIndexBackQueryExcl : IQuery<ReadDefaultIndexQueryArgs, IndexQueryRecord> {
42-
public static BindingContext Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
42+
public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
4343
=> new(statement) { args.StartPosition, args.Count };
4444

4545
public static ReadOnlySpan<byte> CommandText =>
@@ -52,7 +52,7 @@ public static BindingContext Bind(in ReadDefaultIndexQueryArgs args, PreparedSta
5252
/// Get index records for the default index with log position less or equal than the start position
5353
/// </summary>
5454
public struct ReadDefaultIndexBackQueryIncl : IQuery<ReadDefaultIndexQueryArgs, IndexQueryRecord> {
55-
public static BindingContext Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
55+
public static StatementBindingResult Bind(in ReadDefaultIndexQueryArgs args, PreparedStatement statement)
5656
=> new(statement) { args.StartPosition, args.Count };
5757

5858
public static ReadOnlySpan<byte> CommandText =>

src/KurrentDB.SecondaryIndexing/Indexes/EventType/EventTypeSql.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public record struct ReadEventTypeIndexQueryArgs(string EventType, long StartPos
1313
/// Get index records for a given event type where the log position is greater than the start position
1414
/// </summary>
1515
public struct ReadEventTypeIndexQueryExcl : IQuery<ReadEventTypeIndexQueryArgs, IndexQueryRecord> {
16-
public static BindingContext Bind(in ReadEventTypeIndexQueryArgs args, PreparedStatement statement)
16+
public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, PreparedStatement statement)
1717
=> new(statement) {
1818
args.EventType,
1919
args.StartPosition,
@@ -31,7 +31,7 @@ public static ReadOnlySpan<byte> CommandText
3131
/// Get index records for a given event type where the log position is greater or equal the start position
3232
/// </summary>
3333
public struct ReadEventTypeIndexQueryIncl : IQuery<ReadEventTypeIndexQueryArgs, IndexQueryRecord> {
34-
public static BindingContext Bind(in ReadEventTypeIndexQueryArgs args, PreparedStatement statement)
34+
public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, PreparedStatement statement)
3535
=> new(statement) {
3636
args.EventType,
3737
args.StartPosition,
@@ -49,7 +49,7 @@ public static ReadOnlySpan<byte> CommandText
4949
/// Get index records for a given event type where the log position is less than the start position
5050
/// </summary>
5151
public struct ReadEventTypeIndexBackQueryExcl : IQuery<ReadEventTypeIndexQueryArgs, IndexQueryRecord> {
52-
public static BindingContext Bind(in ReadEventTypeIndexQueryArgs args, PreparedStatement statement)
52+
public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, PreparedStatement statement)
5353
=> new(statement) {
5454
args.EventType,
5555
args.StartPosition,
@@ -66,7 +66,7 @@ public static ReadOnlySpan<byte> CommandText
6666
/// Get index records for a given event type where the log position is less or equal the start position
6767
/// </summary>
6868
public struct ReadEventTypeIndexBackQueryIncl : IQuery<ReadEventTypeIndexQueryArgs, IndexQueryRecord> {
69-
public static BindingContext Bind(in ReadEventTypeIndexQueryArgs args, PreparedStatement statement)
69+
public static StatementBindingResult Bind(in ReadEventTypeIndexQueryArgs args, PreparedStatement statement)
7070
=> new(statement) {
7171
args.EventType,
7272
args.StartPosition,

src/KurrentDB.SecondaryIndexing/Indexes/User/IField.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ internal readonly record struct Int16Field(short Key) : IField {
2727
public static string GetCreateStatement(string field) => $", \"{field}\" SMALLINT not null";
2828
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
2929
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
30-
public void AppendTo(Appender.Row row) => row.Append(Key);
30+
public void AppendTo(Appender.Row row) => row.Add(Key);
3131
public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
3232
public override string ToString() => Key.ToString();
3333
}
@@ -39,7 +39,7 @@ internal readonly record struct Int32Field(int Key) : IField {
3939
public static string GetCreateStatement(string field) => $", \"{field}\" INTEGER not null";
4040
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
4141
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
42-
public void AppendTo(Appender.Row row) => row.Append(Key);
42+
public void AppendTo(Appender.Row row) => row.Add(Key);
4343
public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
4444
public override string ToString() => Key.ToString();
4545
}
@@ -51,7 +51,7 @@ internal readonly record struct Int64Field(long Key) : IField {
5151
public static string GetCreateStatement(string field) => $", \"{field}\" BIGINT not null";
5252
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
5353
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
54-
public void AppendTo(Appender.Row row) => row.Append(Key);
54+
public void AppendTo(Appender.Row row) => row.Add(Key);
5555
public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
5656
public override string ToString() => Key.ToString();
5757
}
@@ -63,7 +63,7 @@ internal readonly record struct UInt32Field(uint Key) : IField {
6363
public static string GetCreateStatement(string field) => $", \"{field}\" UINTEGER not null";
6464
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
6565
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
66-
public void AppendTo(Appender.Row row) => row.Append(Key);
66+
public void AppendTo(Appender.Row row) => row.Add(Key);
6767
public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
6868
public override string ToString() => Key.ToString();
6969
}
@@ -75,7 +75,7 @@ internal readonly record struct UInt64Field(ulong Key) : IField {
7575
public static string GetCreateStatement(string field) => $", \"{field}\" UBIGINT not null";
7676
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
7777
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
78-
public void AppendTo(Appender.Row row) => row.Append(Key);
78+
public void AppendTo(Appender.Row row) => row.Add(Key);
7979
public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
8080
public override string ToString() => Key.ToString();
8181
}
@@ -87,7 +87,7 @@ internal readonly record struct DoubleField(double Key) : IField {
8787
public static string GetCreateStatement(string field) => $", \"{field}\" DOUBLE not null";
8888
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
8989
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
90-
public void AppendTo(Appender.Row row) => row.Append(Key);
90+
public void AppendTo(Appender.Row row) => row.Add(Key);
9191
public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
9292
public override string ToString() => Key.ToString(CultureInfo.InvariantCulture);
9393
}
@@ -99,7 +99,7 @@ internal readonly record struct StringField(string Key) : IField {
9999
public static string GetCreateStatement(string field) => $", \"{field}\" VARCHAR not null";
100100
public string GetQueryStatement(string field) => $"and \"{field}\" = ?";
101101
public void BindTo(PreparedStatement statement, ref int index) => statement.Bind(index++, Key);
102-
public void AppendTo(Appender.Row row) => row.Append(Key);
102+
public void AppendTo(Appender.Row row) => row.Add(Key);
103103
public void WriteTo(IDuckDBDataWriter writer, ulong rowIndex) => writer.WriteValue(Key, rowIndex);
104104
public override string ToString() => Key;
105105
}

src/KurrentDB.SecondaryIndexing/Indexes/User/UserIndexProcessor.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ public override bool TryIndex(ResolvedEvent resolvedEvent) {
127127
_log.LogUserIndexIsAppendingEvent(IndexName, eventNumber, streamId, resolvedEvent.OriginalPosition, fieldStr);
128128

129129
using (var row = _appender.CreateRow()) {
130-
row.Append(preparePosition);
130+
row.Add(preparePosition);
131131

132132
if (commitPosition.HasValue && preparePosition != commitPosition)
133-
row.Append(commitPosition.Value);
133+
row.Add(commitPosition.Value);
134134
else
135-
row.Append(DBNull.Value);
135+
row.Add(DBNull.Value);
136136

137-
row.Append(eventNumber);
138-
row.Append(created);
137+
row.Add(eventNumber);
138+
row.Add(created);
139139
field?.AppendTo(row);
140140
}
141141

0 commit comments

Comments
 (0)