Skip to content

Commit faeeb8e

Browse files
authored
Fix for large file cryptography support (#1528)
* Porting the changes from Dapr.Cryptography 1.16 back to 1.15 Signed-off-by: Whit Waldo <[email protected]>
1 parent 0873c5e commit faeeb8e

File tree

7 files changed

+379
-209
lines changed

7 files changed

+379
-209
lines changed

examples/Client/Cryptography/Examples/EncryptDecryptFileStreamExample.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ public override async Task RunAsync(CancellationToken cancellationToken)
3737
await using var encryptFs = new FileStream(fileName, FileMode.Open);
3838

3939
var bufferedEncryptedBytes = new ArrayBufferWriter<byte>();
40-
await foreach (var bytes in (await client.EncryptAsync(componentName, encryptFs, keyName,
41-
new EncryptionOptions(KeyWrapAlgorithm.Rsa), cancellationToken))
42-
.WithCancellation(cancellationToken))
40+
await foreach (var bytes in (client.EncryptAsync(componentName, encryptFs, keyName,
41+
new EncryptionOptions(KeyWrapAlgorithm.Rsa), cancellationToken)))
4342
{
4443
bufferedEncryptedBytes.Write(bytes.Span);
4544
}
@@ -53,8 +52,8 @@ public override async Task RunAsync(CancellationToken cancellationToken)
5352

5453
//We'll stream the decrypted bytes from a MemoryStream into the above temporary file
5554
await using var encryptedMs = new MemoryStream(bufferedEncryptedBytes.WrittenMemory.ToArray());
56-
await foreach (var result in (await client.DecryptAsync(componentName, encryptedMs, keyName,
57-
cancellationToken)).WithCancellation(cancellationToken))
55+
await foreach (var result in (client.DecryptAsync(componentName, encryptedMs, keyName,
56+
cancellationToken)))
5857
{
5958
decryptFs.Write(result.Span);
6059
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2025 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
#nullable enable
15+
using System;
16+
using System.Collections.Generic;
17+
using System.IO;
18+
using System.Runtime.CompilerServices;
19+
using System.Threading;
20+
using System.Threading.Channels;
21+
using System.Threading.Tasks;
22+
using Google.Protobuf;
23+
using Grpc.Core;
24+
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
25+
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
26+
27+
namespace Dapr.Client.Crypto;
28+
29+
/// <summary>
30+
/// Provides the implementation to decrypt a stream of plaintext data with the Dapr runtime.
31+
/// </summary>
32+
internal sealed class DecryptionStreamProcessor : IDisposable
33+
{
34+
private bool disposed;
35+
private readonly Channel<ReadOnlyMemory<byte>> outputChannel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>();
36+
37+
/// <summary>
38+
/// Surfaces any exceptions encountered while asynchronously processing the inbound and outbound streams.
39+
/// </summary>
40+
internal event EventHandler<Exception>? OnException;
41+
42+
/// <summary>
43+
/// Sends the provided bytes in chunks to the sidecar for the encryption operation.
44+
/// </summary>
45+
/// <param name="inputStream">The stream containing the bytes to decrypt.</param>
46+
/// <param name="call">The call to make to the sidecar to process the encryption operation.</param>
47+
/// <param name="streamingBlockSizeInBytes">The size, in bytes, of the streaming blocks.</param>
48+
/// <param name="options">The decryption options.</param>
49+
/// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
50+
public async Task ProcessStreamAsync(
51+
Stream inputStream,
52+
AsyncDuplexStreamingCall<Autogenerated.DecryptRequest, Autogenerated.DecryptResponse> call,
53+
int streamingBlockSizeInBytes,
54+
Autogenerated.DecryptRequestOptions options,
55+
CancellationToken cancellationToken)
56+
{
57+
//Read from the input stream and write to the gRPC call
58+
_ = Task.Run(async () =>
59+
{
60+
try
61+
{
62+
await using var bufferedStream = new BufferedStream(inputStream, streamingBlockSizeInBytes);
63+
var buffer = new byte[streamingBlockSizeInBytes];
64+
int bytesRead;
65+
ulong sequenceNumber = 0;
66+
67+
while ((bytesRead = await bufferedStream.ReadAsync(buffer, cancellationToken)) > 0)
68+
{
69+
var request = new Autogenerated.DecryptRequest
70+
{
71+
Payload = new Autogenerated.StreamPayload
72+
{
73+
Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber
74+
}
75+
};
76+
77+
//Only include the options in the first message
78+
if (sequenceNumber == 0)
79+
{
80+
request.Options = options;
81+
}
82+
83+
await call.RequestStream.WriteAsync(request, cancellationToken);
84+
85+
//Increment the sequence number
86+
sequenceNumber++;
87+
}
88+
}
89+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
90+
{
91+
// Expected cancellation exception
92+
}
93+
catch (Exception ex)
94+
{
95+
OnException?.Invoke(this, ex);
96+
}
97+
finally
98+
{
99+
await call.RequestStream.CompleteAsync();
100+
}
101+
}, cancellationToken);
102+
103+
//Start reading from the gRPC call and writing to the output channel
104+
_ = Task.Run(async () =>
105+
{
106+
try
107+
{
108+
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
109+
{
110+
await outputChannel.Writer.WriteAsync(response.Payload.Data.Memory, cancellationToken);
111+
}
112+
}
113+
catch (Exception ex)
114+
{
115+
OnException?.Invoke(this, ex);
116+
}
117+
finally
118+
{
119+
outputChannel.Writer.Complete();
120+
}
121+
}, cancellationToken);
122+
}
123+
124+
/// <summary>
125+
/// Retrieves the processed bytes from the operation from the sidecar and
126+
/// returns as an enumerable stream.
127+
/// </summary>
128+
public async IAsyncEnumerable<ReadOnlyMemory<byte>> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
129+
{
130+
await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
131+
{
132+
yield return data;
133+
}
134+
}
135+
136+
public void Dispose()
137+
{
138+
Dispose(true);
139+
}
140+
141+
private void Dispose(bool disposing)
142+
{
143+
if (!disposed)
144+
{
145+
if (disposing)
146+
{
147+
outputChannel.Writer.TryComplete();
148+
}
149+
150+
disposed = true;
151+
}
152+
}
153+
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2025 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
#nullable enable
15+
using System;
16+
using System.Collections.Generic;
17+
using System.IO;
18+
using System.Runtime.CompilerServices;
19+
using System.Threading;
20+
using System.Threading.Channels;
21+
using System.Threading.Tasks;
22+
using Google.Protobuf;
23+
using Grpc.Core;
24+
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
25+
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
26+
27+
namespace Dapr.Client.Crypto;
28+
29+
/// <summary>
30+
/// Provides the implementation to encrypt a stream of plaintext data with the Dapr runtime.
31+
/// </summary>
32+
internal sealed class EncryptionStreamProcessor : IDisposable
33+
{
34+
private bool disposed;
35+
private readonly Channel<ReadOnlyMemory<byte>> outputChannel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>();
36+
37+
/// <summary>
38+
/// Surfaces any exceptions encountered while asynchronously processing the inbound and outbound streams.
39+
/// </summary>
40+
internal event EventHandler<Exception>? OnException;
41+
42+
/// <summary>
43+
/// Sends the provided bytes in chunks to the sidecar for the encryption operation.
44+
/// </summary>
45+
/// <param name="inputStream">The stream containing the bytes to encrypt.</param>
46+
/// <param name="call">The call to make to the sidecar to process the encryption operation.</param>
47+
/// <param name="options">The encryption options.</param>
48+
/// <param name="streamingBlockSizeInBytes">The size, in bytes, of the streaming blocks.</param>
49+
/// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
50+
public async Task ProcessStreamAsync(
51+
Stream inputStream,
52+
AsyncDuplexStreamingCall<Autogenerated.EncryptRequest, Autogenerated.EncryptResponse> call,
53+
Autogenerated.EncryptRequestOptions options,
54+
int streamingBlockSizeInBytes,
55+
CancellationToken cancellationToken)
56+
{
57+
//Read from the input stream and write to the gRPC call
58+
_ = Task.Run(async () =>
59+
{
60+
try
61+
{
62+
await using var bufferedStream = new BufferedStream(inputStream, streamingBlockSizeInBytes);
63+
var buffer = new byte[streamingBlockSizeInBytes];
64+
int bytesRead;
65+
ulong sequenceNumber = 0;
66+
67+
while ((bytesRead = await bufferedStream.ReadAsync(buffer, cancellationToken)) > 0)
68+
{
69+
var request = new Autogenerated.EncryptRequest
70+
{
71+
Payload = new Autogenerated.StreamPayload
72+
{
73+
Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber
74+
}
75+
};
76+
77+
//Only include the options in the first message
78+
if (sequenceNumber == 0)
79+
{
80+
request.Options = options;
81+
}
82+
83+
await call.RequestStream.WriteAsync(request, cancellationToken);
84+
85+
//Increment the sequence number
86+
sequenceNumber++;
87+
}
88+
}
89+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
90+
{
91+
// Expected cancellation exception
92+
}
93+
catch (Exception ex)
94+
{
95+
OnException?.Invoke(this, ex);
96+
}
97+
finally
98+
{
99+
await call.RequestStream.CompleteAsync();
100+
}
101+
}, cancellationToken);
102+
103+
//Start reading from the gRPC call and writing to the output channel
104+
_ = Task.Run(async () =>
105+
{
106+
try
107+
{
108+
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
109+
{
110+
await outputChannel.Writer.WriteAsync(response.Payload.Data.Memory, cancellationToken);
111+
}
112+
}
113+
catch (Exception ex)
114+
{
115+
OnException?.Invoke(this, ex);
116+
}
117+
finally
118+
{
119+
outputChannel.Writer.Complete();
120+
}
121+
}, cancellationToken);
122+
}
123+
124+
/// <summary>
125+
/// Retrieves the processed bytes from the operation from the sidecar and
126+
/// returns as an enumerable stream.
127+
/// </summary>
128+
public async IAsyncEnumerable<ReadOnlyMemory<byte>> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
129+
{
130+
await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
131+
{
132+
yield return data;
133+
}
134+
}
135+
136+
public void Dispose()
137+
{
138+
Dispose(true);
139+
GC.SuppressFinalize(this);
140+
}
141+
142+
private void Dispose(bool disposing)
143+
{
144+
if (!disposed)
145+
{
146+
if (disposing)
147+
{
148+
outputChannel.Writer.TryComplete();
149+
}
150+
151+
disposed = true;
152+
}
153+
}
154+
}

src/Dapr.Client/DaprClient.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,7 @@ public abstract Task<ReadOnlyMemory<byte>> EncryptAsync(string vaultResourceName
11051105
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that can be used to cancel the operation.</param>
11061106
/// <returns>An array of encrypted bytes.</returns>
11071107
[Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
1108-
public abstract Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> EncryptAsync(string vaultResourceName, Stream plaintextStream, string keyName,
1108+
public abstract IAsyncEnumerable<ReadOnlyMemory<byte>> EncryptAsync(string vaultResourceName, Stream plaintextStream, string keyName,
11091109
EncryptionOptions encryptionOptions, CancellationToken cancellationToken = default);
11101110

11111111
/// <summary>
@@ -1144,7 +1144,7 @@ public abstract Task<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName
11441144
/// <returns>An asynchronously enumerable array of decrypted bytes.</returns>
11451145
[Obsolete(
11461146
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
1147-
public abstract Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> DecryptAsync(string vaultResourceName, Stream ciphertextStream,
1147+
public abstract IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName, Stream ciphertextStream,
11481148
string keyName, DecryptionOptions options, CancellationToken cancellationToken = default);
11491149

11501150
/// <summary>
@@ -1157,7 +1157,7 @@ public abstract Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> DecryptAsync(string
11571157
/// <returns>An asynchronously enumerable array of decrypted bytes.</returns>
11581158
[Obsolete(
11591159
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
1160-
public abstract Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> DecryptAsync(string vaultResourceName, Stream ciphertextStream,
1160+
public abstract IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName, Stream ciphertextStream,
11611161
string keyName, CancellationToken cancellationToken = default);
11621162

11631163
#endregion

0 commit comments

Comments
 (0)