Skip to content

Commit 337cbba

Browse files
committed
Split VectorSet code from ResultProcessor (also: *Lease*)
1 parent a148d99 commit 337cbba

File tree

3 files changed

+331
-313
lines changed

3 files changed

+331
-313
lines changed
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
using System.Diagnostics;
2+
using Pipelines.Sockets.Unofficial.Arenas;
3+
4+
// ReSharper disable once CheckNamespace
5+
namespace StackExchange.Redis;
6+
7+
internal abstract partial class ResultProcessor
8+
{
9+
// Lease result processors
10+
public static readonly ResultProcessor<Lease<float>?> LeaseFloat32 = new LeaseFloat32Processor();
11+
12+
public static readonly ResultProcessor<Lease<byte>>
13+
Lease = new LeaseProcessor();
14+
15+
public static readonly ResultProcessor<Lease<byte>>
16+
LeaseFromArray = new LeaseFromArrayProcessor();
17+
18+
private abstract class LeaseProcessor<T> : ResultProcessor<Lease<T>?>
19+
{
20+
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
21+
{
22+
if (result.Resp2TypeArray != ResultType.Array)
23+
{
24+
return false; // not an array
25+
}
26+
27+
// deal with null
28+
if (result.IsNull)
29+
{
30+
SetResult(message, Lease<T>.Empty);
31+
return true;
32+
}
33+
34+
// lease and fill
35+
var items = result.GetItems();
36+
var length = checked((int)items.Length);
37+
var lease = Lease<T>.Create(length, clear: false); // note this handles zero nicely
38+
var target = lease.Span;
39+
int index = 0;
40+
foreach (ref RawResult item in items)
41+
{
42+
if (!TryParse(item, out target[index++]))
43+
{
44+
// something went wrong; recycle and quit
45+
lease.Dispose();
46+
return false;
47+
}
48+
}
49+
Debug.Assert(index == length, "length mismatch");
50+
SetResult(message, lease);
51+
return true;
52+
}
53+
54+
protected abstract bool TryParse(in RawResult raw, out T parsed);
55+
}
56+
57+
private abstract class InterleavedLeaseProcessor<T> : ResultProcessor<Lease<T>?>
58+
{
59+
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
60+
{
61+
if (result.Resp2TypeArray != ResultType.Array)
62+
{
63+
return false; // not an array
64+
}
65+
66+
// deal with null
67+
if (result.IsNull)
68+
{
69+
SetResult(message, Lease<T>.Empty);
70+
return true;
71+
}
72+
73+
// lease and fill
74+
var items = result.GetItems();
75+
var length = checked((int)items.Length) / 2;
76+
var lease = Lease<T>.Create(length, clear: false); // note this handles zero nicely
77+
var target = lease.Span;
78+
79+
var iter = items.GetEnumerator();
80+
for (int i = 0; i < target.Length; i++)
81+
{
82+
bool ok = iter.MoveNext();
83+
if (ok)
84+
{
85+
ref readonly RawResult first = ref iter.Current;
86+
ok = iter.MoveNext() && TryParse(in first, in iter.Current, out target[i]);
87+
}
88+
if (!ok)
89+
{
90+
lease.Dispose();
91+
return false;
92+
}
93+
}
94+
SetResult(message, lease);
95+
return true;
96+
}
97+
98+
protected abstract bool TryParse(in RawResult first, in RawResult second, out T parsed);
99+
}
100+
101+
// takes a nested vector of the form [[A],[B,C],[D]] and exposes it as [A,B,C,D]; this is
102+
// especially useful for VLINKS
103+
private abstract class FlattenedLeaseProcessor<T> : ResultProcessor<Lease<T>?>
104+
{
105+
protected virtual long GetArrayLength(in RawResult array) => array.GetItems().Length;
106+
107+
protected virtual bool TryReadOne(ref Sequence<RawResult>.Enumerator reader, out T value)
108+
{
109+
if (reader.MoveNext())
110+
{
111+
return TryReadOne(in reader.Current, out value);
112+
}
113+
value = default!;
114+
return false;
115+
}
116+
117+
protected virtual bool TryReadOne(in RawResult result, out T value)
118+
{
119+
value = default!;
120+
return false;
121+
}
122+
123+
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
124+
{
125+
if (result.Resp2TypeArray != ResultType.Array)
126+
{
127+
return false; // not an array
128+
}
129+
if (result.IsNull)
130+
{
131+
SetResult(message, Lease<T>.Empty);
132+
return true;
133+
}
134+
var items = result.GetItems();
135+
long length = 0;
136+
foreach (ref RawResult item in items)
137+
{
138+
if (item.Resp2TypeArray == ResultType.Array && !item.IsNull)
139+
{
140+
length += GetArrayLength(in item);
141+
}
142+
}
143+
144+
if (length == 0)
145+
{
146+
SetResult(message, Lease<T>.Empty);
147+
return true;
148+
}
149+
var lease = Lease<T>.Create(checked((int)length), clear: false);
150+
int index = 0;
151+
var target = lease.Span;
152+
foreach (ref RawResult item in items)
153+
{
154+
if (item.Resp2TypeArray == ResultType.Array && !item.IsNull)
155+
{
156+
var iter = item.GetItems().GetEnumerator();
157+
while (index < target.Length && TryReadOne(ref iter, out target[index]))
158+
{
159+
index++;
160+
}
161+
}
162+
}
163+
164+
if (index == length)
165+
{
166+
SetResult(message, lease);
167+
return true;
168+
}
169+
lease.Dispose(); // failed to fill?
170+
return false;
171+
}
172+
}
173+
174+
private sealed class LeaseFloat32Processor : LeaseProcessor<float>
175+
{
176+
protected override bool TryParse(in RawResult raw, out float parsed)
177+
{
178+
var result = raw.TryGetDouble(out double val);
179+
parsed = (float)val;
180+
return result;
181+
}
182+
}
183+
184+
private sealed class LeaseProcessor : ResultProcessor<Lease<byte>>
185+
{
186+
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
187+
{
188+
switch (result.Resp2TypeBulkString)
189+
{
190+
case ResultType.Integer:
191+
case ResultType.SimpleString:
192+
case ResultType.BulkString:
193+
SetResult(message, result.AsLease()!);
194+
return true;
195+
}
196+
return false;
197+
}
198+
}
199+
200+
private sealed class LeaseFromArrayProcessor : ResultProcessor<Lease<byte>>
201+
{
202+
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
203+
{
204+
switch (result.Resp2TypeBulkString)
205+
{
206+
case ResultType.Array:
207+
var items = result.GetItems();
208+
if (items.Length == 1)
209+
{ // treat an array of 1 like a single reply
210+
SetResult(message, items[0].AsLease()!);
211+
return true;
212+
}
213+
break;
214+
}
215+
return false;
216+
}
217+
}
218+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
using Pipelines.Sockets.Unofficial.Arenas;
2+
3+
// ReSharper disable once CheckNamespace
4+
namespace StackExchange.Redis;
5+
6+
internal abstract partial class ResultProcessor
7+
{
8+
// VectorSet result processors
9+
public static readonly ResultProcessor<Lease<VectorSetLink>?> VectorSetLinksWithScores = new VectorSetLinksWithScoresProcessor();
10+
public static readonly ResultProcessor<Lease<RedisValue>?> VectorSetLinks = new VectorSetLinksProcessor();
11+
12+
public static ResultProcessor<VectorSetInfo?> VectorSetInfo = new VectorSetInfoProcessor();
13+
14+
private sealed class VectorSetLinksWithScoresProcessor : FlattenedLeaseProcessor<VectorSetLink>
15+
{
16+
protected override long GetArrayLength(in RawResult array) => array.GetItems().Length / 2;
17+
18+
protected override bool TryReadOne(ref Sequence<RawResult>.Enumerator reader, out VectorSetLink value)
19+
{
20+
if (reader.MoveNext())
21+
{
22+
ref readonly RawResult first = ref reader.Current;
23+
if (reader.MoveNext() && reader.Current.TryGetDouble(out var score))
24+
{
25+
value = new VectorSetLink(first.AsRedisValue(), score);
26+
return true;
27+
}
28+
}
29+
value = default;
30+
return false;
31+
}
32+
}
33+
34+
private sealed class VectorSetLinksProcessor : FlattenedLeaseProcessor<RedisValue>
35+
{
36+
protected override bool TryReadOne(in RawResult result, out RedisValue value)
37+
{
38+
value = result.AsRedisValue();
39+
return true;
40+
}
41+
}
42+
43+
private sealed class VectorSetInfoProcessor : ResultProcessor<VectorSetInfo?>
44+
{
45+
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
46+
{
47+
if (result.Resp2TypeArray == ResultType.Array)
48+
{
49+
if (result.IsNull)
50+
{
51+
SetResult(message, null);
52+
return true;
53+
}
54+
var quantType = VectorSetQuantization.Unknown;
55+
string? quantTypeRaw = null;
56+
int vectorDim = 0, maxLevel = 0;
57+
long size = 0, vsetUid = 0, hnswMaxNodeUid = 0;
58+
var iter = result.GetItems().GetEnumerator();
59+
while (iter.MoveNext())
60+
{
61+
var key = iter.Current;
62+
if (!iter.MoveNext()) break;
63+
var value = iter.Current;
64+
65+
var len = key.Payload.Length;
66+
var keyHash = key.Payload.Hash64();
67+
switch (key.Payload.Length)
68+
{
69+
case 4 when keyHash == FastHash._4.size && key.IsEqual(FastHash._4.size_u8) && value.TryGetInt64(out var i64):
70+
size = i64;
71+
break;
72+
case 8 when keyHash == FastHash._8.vset_uid && key.IsEqual(FastHash._8.vset_uid_u8) && value.TryGetInt64(out var i64):
73+
vsetUid = i64;
74+
break;
75+
case 9 when keyHash == FastHash._9.max_level && key.IsEqual(FastHash._9.max_level_u8) && value.TryGetInt64(out var i64):
76+
maxLevel = checked((int)i64);
77+
break;
78+
case 10 when keyHash == FastHash._10.vector_dim && key.IsEqual(FastHash._10.vector_dim_u8) && value.TryGetInt64(out var i64):
79+
vectorDim = checked((int)i64);
80+
break;
81+
case 10 when keyHash == FastHash._10.quant_type && key.IsEqual(FastHash._10.quant_type_u8):
82+
var qHash = value.Payload.Hash64();
83+
switch (value.Payload.Length)
84+
{
85+
case 3 when qHash == FastHash._3.bin && value.IsEqual(FastHash._3.bin_u8):
86+
quantType = VectorSetQuantization.Binary;
87+
break;
88+
case 3 when qHash == FastHash._3.f32 && value.IsEqual(FastHash._3.f32_u8):
89+
quantType = VectorSetQuantization.None;
90+
break;
91+
case 4 when qHash == FastHash._4.int8 && value.IsEqual(FastHash._4.int8_u8):
92+
quantType = VectorSetQuantization.Int8;
93+
break;
94+
default:
95+
quantTypeRaw = value.GetString();
96+
quantType = VectorSetQuantization.Unknown;
97+
break;
98+
}
99+
break;
100+
case 17 when keyHash == FastHash._17.hnsw_max_node_uid && key.IsEqual(FastHash._17.hnsw_max_node_uid_u8) && value.TryGetInt64(out var i64):
101+
hnswMaxNodeUid = i64;
102+
break;
103+
}
104+
}
105+
106+
SetResult(message, new VectorSetInfo(quantType, quantTypeRaw, vectorDim, size, maxLevel, vsetUid, hnswMaxNodeUid));
107+
return true;
108+
}
109+
return false;
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)