Skip to content

Commit 694def9

Browse files
committed
Introduce blob-based storage using ITableStorage<T> API
We modify the binary-based serializers to work on top of Stream instead of byte arrays. And the new blob-based implementations use the table name as the container name and the partition key as the folder path for the blobs. This allows using document-storage like API without hitting the limitation of column sizes in table storage.
1 parent ef773da commit 694def9

File tree

15 files changed

+753
-28
lines changed

15 files changed

+753
-28
lines changed

.netconfig

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,3 +431,9 @@
431431
sha = 77e83f238196d2723640abef0c7b6f43994f9747
432432
etag = fcb9759a96966df40dcd24906fd328ddec05953b7e747a6bb8d0d1e4c3865274
433433
weak
434+
[file "src/TableStorage/System/AsyncLazy.cs"]
435+
url = https://github.com/devlooped/catbag/blob/main/System/Threading/Tasks/AsyncLazy.cs
436+
sha = 9f3330f09713aa5f746047e3a50ee839147a5797
437+
438+
etag = 73320600b7a18e0eb25cadc3d687c69dc79181b0458facf526666e150c634782
439+
weak

src/TableStorage.Bson/BsonDocumentSerializer.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,26 +32,24 @@ partial class BsonDocumentSerializer : IBinaryDocumentSerializer
3232
public static IDocumentSerializer Default { get; } = new BsonDocumentSerializer();
3333

3434
/// <inheritdoc />
35-
public T? Deserialize<T>(byte[] data)
35+
public T? Deserialize<T>(Stream data)
3636
{
3737
if (data.Length == 0)
3838
return default;
3939

40-
using var mem = new MemoryStream(data);
41-
using var reader = new Newtonsoft.Json.Bson.BsonDataReader(mem);
40+
var reader = new Newtonsoft.Json.Bson.BsonDataReader(data);
4241
return (T?)serializer.Deserialize<T>(reader);
4342
}
4443

4544
/// <inheritdoc />
46-
public byte[] Serialize<T>(T value)
45+
public void Serialize<T>(T value, Stream stream)
4746
{
4847
if (value == null)
49-
return new byte[0];
48+
return;
5049

51-
using var mem = new MemoryStream();
52-
using var writer = new Newtonsoft.Json.Bson.BsonDataWriter(mem);
50+
var writer = new Newtonsoft.Json.Bson.BsonDataWriter(stream);
5351
serializer.Serialize(writer, value);
54-
return mem.ToArray();
52+
writer.Flush();
5553
}
5654

5755
#if NET6_0_OR_GREATER

src/TableStorage.MessagePack/MessagePackDocumentSerializer.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#nullable enable
33
using System;
44
using System.Collections.Generic;
5+
using System.IO;
56
using MessagePack;
67
using MessagePack.Formatters;
78
using MessagePack.Resolvers;
@@ -36,10 +37,18 @@ public MessagePackDocumentSerializer(MessagePackSerializerOptions? options = def
3637
}
3738

3839
/// <inheritdoc />
39-
public T? Deserialize<T>(byte[] data) => data.Length == 0 ? default : MessagePackSerializer.Deserialize<T>(data, options);
40+
public T? Deserialize<T>(Stream data) => data.Length == 0 ? default : MessagePackSerializer.Deserialize<T>(data, options);
4041

4142
/// <inheritdoc />
42-
public byte[] Serialize<T>(T value) => value == null ? new byte[0] : MessagePackSerializer.Serialize(value.GetType(), value, options);
43+
public void Serialize<T>(T value, Stream stream)
44+
{
45+
if (value == null)
46+
return;
47+
if (stream == null)
48+
throw new ArgumentNullException(nameof(stream));
49+
50+
MessagePackSerializer.Serialize(stream, value, options);
51+
}
4352

4453
#if NET6_0_OR_GREATER
4554
internal class DateOnlyFormatterResolver : IFormatterResolver

src/TableStorage.Protobuf/ProtobufDocumentSerializer.cs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,21 @@ static ProtobufDocumentSerializer()
2626
}
2727

2828
/// <inheritdoc />
29-
public T? Deserialize<T>(byte[] data)
29+
public T? Deserialize<T>(Stream stream)
3030
{
31-
if (data.Length == 0)
31+
if (stream.CanSeek && stream.Length == 0)
3232
return default;
3333

34-
using var mem = new MemoryStream(data);
35-
return (T?)Serializer.Deserialize(typeof(T), mem);
34+
return (T?)Serializer.Deserialize(typeof(T), stream);
3635
}
3736

3837
/// <inheritdoc />
39-
public byte[] Serialize<T>(T value)
38+
public void Serialize<T>(T value, Stream stream)
4039
{
4140
if (value == null)
42-
return new byte[0];
41+
return;
4342

44-
using var mem = new MemoryStream();
45-
Serializer.Serialize(mem, value);
46-
return mem.ToArray();
43+
Serializer.Serialize(stream, value);
4744
}
4845
}
4946
}

src/TableStorage/BlobPartition.cs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
//<auto-generated/>
2+
#nullable enable
3+
#if NET8_0_OR_GREATER
4+
using System;
5+
using System.Collections.Concurrent;
6+
using System.Reflection;
7+
8+
namespace Devlooped
9+
{
10+
/// <summary>
11+
/// Factory methods to create <see cref="ITableStorage{T}"/> instances
12+
/// that store entities as a serialized document in blob storage.
13+
/// </summary>
14+
static partial class BlobPartition
15+
{
16+
/// <summary>
17+
/// Creates an <see cref="ITableStorage{T}"/> for the given entity type
18+
/// <typeparamref name="T"/>, using <see cref="DocumentPartition.DefaultTableName"/> as the table name and the
19+
/// <typeparamref name="T"/> <c>Name</c> as the partition key.
20+
/// </summary>
21+
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
22+
/// <param name="storageAccount">The storage account to use.</param>
23+
/// <param name="rowKey">Function to retrieve the row key for a given entity.</param>
24+
/// <param name="serializer">Optional serializer to use instead of the default <see cref="DocumentSerializer.Default"/>.</param>
25+
/// <param name="includeProperties">Whether to serialize properties as columns too, like table repositories, for easier querying.</param>
26+
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
27+
public static ITableStoragePartition<T> Create<T>(
28+
CloudStorageAccount storageAccount,
29+
Func<T, string> rowKey,
30+
IDocumentSerializer? serializer = default) where T : class
31+
=> Create(storageAccount, DocumentPartition.DefaultTableName, typeof(T).Name, rowKey, serializer);
32+
33+
/// <summary>
34+
/// Creates an <see cref="ITableStorage{T}"/> for the given entity type
35+
/// <typeparamref name="T"/>.
36+
/// </summary>
37+
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
38+
/// <param name="storageAccount">The storage account to use.</param>
39+
/// <param name="tableName">Optional table name to use. If not provided, the <typeparamref name="T"/>
40+
/// <c>Name</c> will be used, unless a <see cref="TableAttribute"/> on the type overrides it.</param>
41+
/// <param name="partitionKey">Optional function to retrieve the partition key for a given entity.
42+
/// If not provided, the class will need a property annotated with <see cref="PartitionKeyAttribute"/>.</param>
43+
/// <param name="rowKey">Optional function to retrieve the row key for a given entity.
44+
/// If not provided, the class will need a property annotated with <see cref="RowKeyAttribute"/>.</param>
45+
/// <param name="serializer">Optional serializer to use instead of the default <see cref="DocumentSerializer.Default"/>.</param>
46+
/// <returns>The new <see cref="ITableStorage{T}"/>.</returns>
47+
public static ITableStoragePartition<T> Create<T>(
48+
CloudStorageAccount storageAccount,
49+
string? tableName = default,
50+
string? partitionKey = default,
51+
Func<T, string>? rowKey = default,
52+
IDocumentSerializer? serializer = default) where T : class
53+
{
54+
tableName ??= DocumentPartition.GetDefaultTableName<T>();
55+
partitionKey ??= TablePartition.GetDefaultPartitionKey<T>();
56+
rowKey ??= RowKeyAttribute.CreateCompiledAccessor<T>();
57+
serializer ??= DocumentSerializer.Default;
58+
59+
return new BlobPartition<T>(storageAccount, tableName, partitionKey, rowKey, serializer);
60+
}
61+
}
62+
}
63+
#endif
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//<auto-generated/>
2+
#nullable enable
3+
#if NET8_0_OR_GREATER
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq.Expressions;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Mono.Linq.Expressions;
10+
11+
namespace Devlooped
12+
{
13+
/// <inheritdoc />
14+
partial class BlobPartition<T> : ITableStoragePartition<T> where T : class
15+
{
16+
readonly BlobStorage<T> storage;
17+
18+
/// <summary>
19+
/// Initializes the repository with the given storage account and optional table name.
20+
/// </summary>
21+
/// <param name="storageAccount">The <see cref="CloudStorageAccount"/> to use to connect to the table.</param>
22+
/// <param name="tableName">The table that backs this table partition.</param>
23+
/// <param name="partitionKey">The fixed partition key that backs this table partition.</param>
24+
/// <param name="rowKey">A function to determine the row key for an entity of type <typeparamref name="T"/> within the partition.</param>
25+
/// <param name="serializer">The serializer to use.</param>
26+
/// <param name="includeProperties">Whether to serialize properties as columns too, like table repositories, for easier querying.</param>
27+
protected internal BlobPartition(CloudStorageAccount storageAccount, string tableName, string partitionKey, Func<T, string> rowKey, IDocumentSerializer serializer, bool includeProperties = false)
28+
{
29+
PartitionKey = partitionKey ?? TablePartition.GetDefaultPartitionKey<T>();
30+
storage = new BlobStorage<T>(storageAccount, tableName, _ => PartitionKey, rowKey, serializer);
31+
}
32+
33+
/// <inheritdoc />
34+
public string TableName => storage.TableName;
35+
36+
/// <inheritdoc />
37+
public string PartitionKey { get; }
38+
39+
/// <inheritdoc />
40+
public Task<bool> DeleteAsync(T entity, CancellationToken cancellation = default)
41+
=> storage.DeleteAsync(entity, cancellation);
42+
43+
/// <inheritdoc />
44+
public Task<bool> DeleteAsync(string rowKey, CancellationToken cancellation = default)
45+
=> storage.DeleteAsync(PartitionKey, rowKey, cancellation);
46+
47+
/// <inheritdoc />
48+
public IAsyncEnumerable<T> EnumerateAsync(CancellationToken cancellation = default)
49+
=> storage.EnumerateAsync(PartitionKey, cancellation);
50+
51+
/// <inheritdoc />
52+
public Task<T?> GetAsync(string rowKey, CancellationToken cancellation = default)
53+
=> storage.GetAsync(PartitionKey, rowKey, cancellation);
54+
55+
/// <inheritdoc />
56+
public Task<T> PutAsync(T entity, CancellationToken cancellation = default)
57+
=> storage.PutAsync(entity, cancellation);
58+
}
59+
}
60+
#endif

src/TableStorage/BlobStorage.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//<auto-generated/>
2+
#nullable enable
3+
#if NET8_0_OR_GREATER
4+
using System;
5+
6+
namespace Devlooped
7+
{
8+
/// <summary>
9+
/// Factory methods to create <see cref="ITableStorage{T}"/> instances
10+
/// that store entities as a serialized document in blob storage.
11+
/// </summary>
12+
static partial class BlobStorage
13+
{
14+
/// <summary>
15+
/// Creates an <see cref="ITableStorage{T}"/> for the given entity type
16+
/// <typeparamref name="T"/>.
17+
/// </summary>
18+
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
19+
/// <param name="storageAccount">The storage account to use.</param>
20+
/// <param name="tableName">Optional table name to use. If not provided, the <typeparamref name="T"/>
21+
/// <c>Name</c> will be used, unless a <see cref="TableAttribute"/> on the type overrides it.</param>
22+
/// <param name="partitionKey">Optional function to retrieve the partition key for a given entity.
23+
/// If not provided, the class will need a property annotated with <see cref="PartitionKeyAttribute"/>.</param>
24+
/// <param name="rowKey">Optional function to retrieve the row key for a given entity.
25+
/// If not provided, the class will need a property annotated with <see cref="RowKeyAttribute"/>.</param>
26+
/// <param name="serializer">Optional serializer to use instead of the default <see cref="DocumentSerializer.Default"/>.</param>
27+
/// <returns>The new <see cref="ITableStorage{T}"/>.</returns>
28+
public static ITableStorage<T> Create<T>(
29+
CloudStorageAccount storageAccount,
30+
string? tableName = default,
31+
Func<T, string>? partitionKey = default,
32+
Func<T, string>? rowKey = default,
33+
IDocumentSerializer? serializer = default) where T : class
34+
{
35+
tableName ??= DocumentPartition.GetDefaultTableName<T>();
36+
partitionKey ??= PartitionKeyAttribute.CreateCompiledAccessor<T>();
37+
rowKey ??= RowKeyAttribute.CreateCompiledAccessor<T>();
38+
serializer ??= DocumentSerializer.Default;
39+
40+
return new BlobStorage<T>(storageAccount, tableName, partitionKey, rowKey, serializer);
41+
}
42+
}
43+
}
44+
#endif

0 commit comments

Comments
 (0)