Skip to content

Commit 8cba7d7

Browse files
authored
Implement storage events (Azure#46648)
1 parent 17ef743 commit 8cba7d7

File tree

9 files changed

+251
-30
lines changed

9 files changed

+251
-30
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@
771771
# ServiceOwners: @sunilagarwal @lfittl-msft @sr-msft @niklarin
772772

773773
# PRLabel: %Provisioning
774-
/sdk/provisioning/ @JoshLove-msft @tg-msft
774+
/sdk/provisioning/ @JoshLove-msft @tg-msft @christothes @KrzysztofCwalina
775775

776776
# ServiceLabel: %Provisioning
777777
# AzureSdkOwners: @JoshLove-msft

sdk/provisioning/Azure.Provisioning.CloudMachine/api/Azure.Provisioning.CloudMachine.netstandard2.0.cs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,31 @@ public readonly partial struct MessagingServices
2828
public void SendMessage(object serializable) { }
2929
public void WhenMessageReceived(System.Action<string> received) { }
3030
}
31+
public partial class StorageFile
32+
{
33+
internal StorageFile() { }
34+
public System.Threading.CancellationToken CancellationToken { get { throw null; } }
35+
public string Path { get { throw null; } }
36+
public string RequestId { get { throw null; } }
37+
public void Delete() { }
38+
public System.BinaryData Download() { throw null; }
39+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
40+
public override bool Equals(object obj) { throw null; }
41+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
42+
public override int GetHashCode() { throw null; }
43+
public static implicit operator Azure.Response (Azure.CloudMachine.StorageFile result) { throw null; }
44+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
45+
public override string ToString() { throw null; }
46+
}
3147
[System.Runtime.InteropServices.StructLayoutAttribute(System.Runtime.InteropServices.LayoutKind.Sequential)]
3248
public readonly partial struct StorageServices
3349
{
3450
private readonly object _dummy;
3551
private readonly int _dummyPrimitive;
36-
public System.BinaryData DownloadBlob(string name) { throw null; }
52+
public void DeleteBlob(string path) { }
53+
public System.BinaryData DownloadBlob(string path) { throw null; }
3754
public string UploadBlob(object json, string? name = null) { throw null; }
38-
public void WhenBlobCreated(System.Func<string, System.Threading.Tasks.Task> function) { }
39-
public void WhenBlobUploaded(System.Action<string> function) { }
55+
public void WhenBlobUploaded(System.Action<Azure.CloudMachine.StorageFile> function) { }
4056
}
4157
}
4258
namespace Azure.Core

sdk/provisioning/Azure.Provisioning.CloudMachine/src/Azure.Provisioning.CloudMachine.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<PackageReference Include="Azure.AI.OpenAI" />
1515
<PackageReference Include="Azure.Identity" />
1616
<PackageReference Include="Azure.Messaging.ServiceBus" />
17+
<PackageReference Include="Azure.Messaging.EventGrid" />
1718
<PackageReference Include="Azure.Security.KeyVault.Secrets" />
1819
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" VersionOverride="8.0.0" />
1920
</ItemGroup>

sdk/provisioning/Azure.Provisioning.CloudMachine/src/CDKLevel3/CloudMachineInfrastructure.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ namespace Azure.Provisioning.CloudMachine;
1919

2020
public class CloudMachineInfrastructure
2121
{
22+
internal const string SB_PRIVATE_TOPIC = "cm_servicebus_topic_private";
23+
internal const string SB_PRIVATE_SUB = "cm_servicebus_subscription_private";
2224
private readonly string _cmid;
2325

2426
private Infrastructure _infrastructure = new Infrastructure("cm");
@@ -114,9 +116,9 @@ public CloudMachineInfrastructure(string cmId)
114116
SupportOrdering = true,
115117
Status = ServiceBusMessagingEntityStatus.Active
116118
};
117-
_serviceBusSubscription_private = new("cm_servicebus_subscription_private", "2021-11-01")
119+
_serviceBusSubscription_private = new(SB_PRIVATE_SUB, "2021-11-01")
118120
{
119-
Name = "cm_servicebus_subscription_private",
121+
Name = SB_PRIVATE_SUB,
120122
Parent = _serviceBusTopic_private,
121123
IsClientAffine = false,
122124
LockDuration = new StringLiteral("PT30S"),

sdk/provisioning/Azure.Provisioning.CloudMachine/src/OFX/CloudMachineWorkspace.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Diagnostics.CodeAnalysis;
77
using Azure.Core;
88
using Azure.Identity;
9+
using Azure.Provisioning.CloudMachine;
910
using Microsoft.Extensions.Configuration;
1011

1112
namespace Azure.CloudMachine;
@@ -52,9 +53,9 @@ public override ClientConnectionOptions GetConnectionOptions(Type clientType, st
5253
case "Azure.Security.KeyVault.Secrets.SecretClient":
5354
return new ClientConnectionOptions(new($"https://{this.Id}.vault.azure.net/"), Credential);
5455
case "Azure.Messaging.ServiceBus.ServiceBusClient":
55-
return new ClientConnectionOptions(new($"{this.Id}.servicebus.windows.net"), Credential);
56+
return new ClientConnectionOptions(new($"https://{this.Id}.servicebus.windows.net"), Credential);
5657
case "Azure.Messaging.ServiceBus.ServiceBusSender":
57-
if (instanceId == default) instanceId = "cm_default_topic_sender";
58+
if (instanceId == default) instanceId = CloudMachineInfrastructure.SB_PRIVATE_TOPIC;
5859
return new ClientConnectionOptions(instanceId);
5960
case "Azure.Storage.Blobs.BlobContainerClient":
6061
if (instanceId == default) instanceId = "default";

sdk/provisioning/Azure.Provisioning.CloudMachine/src/OFX/MessagingServices.cs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
// Licensed under the MIT License.
33

44
using System;
5+
using System.Threading.Tasks;
56
using Azure.Core;
67
using Azure.Messaging.ServiceBus;
8+
using Azure.Provisioning.CloudMachine;
79

810
namespace Azure.CloudMachine;
911

@@ -25,7 +27,20 @@ public void SendMessage(object serializable)
2527

2628
public void WhenMessageReceived(Action<string> received)
2729
{
28-
throw new NotImplementedException();
30+
var processor = _cm.Messaging.GetServiceBusProcessor();
31+
var cm = _cm;
32+
33+
// TODO: How to unsubscribe?
34+
// TODO: Use a subscription filter to ignore Event Grid system events
35+
processor.ProcessMessageAsync += async (args) =>
36+
{
37+
received(args.Message.Body.ToString());
38+
await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
39+
await Task.CompletedTask.ConfigureAwait(false);
40+
};
41+
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult().
42+
processor.StartProcessingAsync().GetAwaiter().GetResult();
43+
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
2944
}
3045

3146
private ServiceBusClient GetServiceBusClient()
@@ -42,6 +57,13 @@ private ServiceBusSender GetServiceBusSender()
4257
return sender;
4358
}
4459

60+
internal ServiceBusProcessor GetServiceBusProcessor()
61+
{
62+
MessagingServices messagingServices = this;
63+
ServiceBusProcessor sender = _cm.Subclients.Get(() => messagingServices.CreateProcessor());
64+
return sender;
65+
}
66+
4567
private ServiceBusSender CreateSender()
4668
{
4769
ServiceBusClient client = GetServiceBusClient();
@@ -53,7 +75,19 @@ private ServiceBusSender CreateSender()
5375
private ServiceBusClient CreateClient()
5476
{
5577
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient));
56-
ServiceBusClient client = new(connection.Id, connection.TokenCredential);
78+
ServiceBusClient client = new(connection.Endpoint!.AbsoluteUri, connection.TokenCredential);
5779
return client;
5880
}
81+
private ServiceBusProcessor CreateProcessor()
82+
{
83+
ServiceBusClient client = GetServiceBusClient();
84+
85+
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusSender));
86+
ServiceBusProcessor processor = client.CreateProcessor(
87+
connection.Id,
88+
CloudMachineInfrastructure.SB_PRIVATE_SUB,
89+
new() { ReceiveMode = ServiceBusReceiveMode.PeekLock, MaxConcurrentCalls = 5 });
90+
processor.ProcessErrorAsync += (args) => throw new Exception("error processing event", args.Exception);
91+
return processor;
92+
}
5993
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.ComponentModel;
6+
using System.Threading;
7+
8+
namespace Azure.CloudMachine;
9+
10+
public class StorageFile
11+
{
12+
private readonly Response? _response;
13+
14+
private StorageServices _storage;
15+
public string Path { get; internal set; }
16+
17+
/// <summary>
18+
/// The requestId for the storage operation that triggered this event
19+
/// </summary>
20+
public string RequestId { get; internal set; }
21+
22+
/// <summary>
23+
///
24+
/// </summary>
25+
/// <param name="result"></param>
26+
/// <remarks>returns null if the file is not created as a return value of a service method call.</remarks>
27+
public static implicit operator Response?(StorageFile result) => result._response;
28+
29+
public CancellationToken CancellationToken { get; internal set; }
30+
31+
public BinaryData Download()
32+
=> _storage.DownloadBlob(Path);
33+
34+
// public async Task<BinaryData> DownloadAsync()
35+
// => await _storage.DownloadBlobAsync(Path).ConfigureAwait(false);
36+
37+
public void Delete()
38+
=> _storage.DeleteBlob(Path);
39+
40+
// public async Task DeleteAsync()
41+
// => await _storage.DeleteBlobAsync(Path).ConfigureAwait(false);
42+
43+
// public Uri ShareFolder(AccessPermissions permissions, TimeSpan expiresAfter)
44+
// => _storage.ShareFolder(Path, permissions, expiresAfter);
45+
46+
// public Uri ShareFile(AccessPermissions permissions, TimeSpan expiresAfter)
47+
// => _storage.ShareFile(Path, permissions, expiresAfter);
48+
49+
internal StorageFile(StorageServices storage, string path, string requestId, Response? response = default)
50+
{
51+
_storage = storage;
52+
Path = path;
53+
RequestId = requestId;
54+
_response = response;
55+
}
56+
57+
[EditorBrowsable(EditorBrowsableState.Never)]
58+
public override bool Equals(object obj) => base.Equals(obj);
59+
60+
[EditorBrowsable(EditorBrowsableState.Never)]
61+
public override int GetHashCode() => base.GetHashCode();
62+
63+
[EditorBrowsable(EditorBrowsableState.Never)]
64+
public override string ToString() => $"{Path}";
65+
}

sdk/provisioning/Azure.Provisioning.CloudMachine/src/OFX/StorageServices.cs

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
// Licensed under the MIT License.
33

44
using System;
5+
using System.IO;
56
using System.Threading.Tasks;
7+
using Azure.Messaging.EventGrid;
8+
using Azure.Messaging.EventGrid.SystemEvents;
69
using Azure.Core;
710
using Azure.Storage.Blobs;
811
using Azure.Storage.Blobs.Models;
@@ -26,31 +29,105 @@ private BlobContainerClient GetDefaultContainer()
2629
return container;
2730
}
2831

32+
private BlobContainerClient GetContainer(string containerName)
33+
{
34+
string blobContainerClientId = typeof(BlobContainerClient).FullName;
35+
CloudMachineClient cm = _cm;
36+
BlobContainerClient container = cm.Subclients.Get(() =>
37+
{
38+
ClientConnectionOptions connection = cm.GetConnectionOptions(typeof(BlobContainerClient), containerName);
39+
BlobContainerClient container = new(connection.Endpoint, connection.TokenCredential);
40+
return container;
41+
});
42+
return container;
43+
}
44+
2945
public string UploadBlob(object json, string? name = default)
3046
{
3147
BlobContainerClient container = GetDefaultContainer();
3248

33-
if (name == default) name = $"b{Guid.NewGuid()}";
49+
if (name == default)
50+
name = $"b{Guid.NewGuid()}";
3451

3552
container.UploadBlob(name, BinaryData.FromObjectAsJson(json));
3653

3754
return name;
3855
}
3956

40-
public BinaryData DownloadBlob(string name)
57+
public BinaryData DownloadBlob(string path)
4158
{
42-
BlobContainerClient container = GetDefaultContainer();
43-
BlobClient blob = container.GetBlobClient(name);
59+
BlobClient blob = GetBlobClientFromPath(path, null);
4460
BlobDownloadResult result = blob.DownloadContent();
4561
return result.Content;
4662
}
4763

48-
public void WhenBlobUploaded(Action<string> function)
64+
public void DeleteBlob(string path)
65+
{
66+
BlobClient blob = GetBlobClientFromPath(path, null);
67+
blob.DeleteIfExists();
68+
}
69+
70+
private BlobClient GetBlobClientFromPath(string path, string? containerName)
71+
{
72+
var _blobContainer = GetDefaultContainer();
73+
var blobPath = ConvertPathToBlobPath(path, _blobContainer);
74+
if (containerName is null)
75+
{
76+
return _blobContainer.GetBlobClient(blobPath);
77+
}
78+
else
79+
{
80+
var container = GetContainer(containerName);
81+
container.CreateIfNotExists();
82+
return container.GetBlobClient(blobPath);
83+
}
84+
}
85+
86+
private static string ConvertPathToBlobPath(string path, BlobContainerClient container)
4987
{
50-
throw new NotImplementedException();
88+
if (Uri.TryCreate(path, UriKind.Absolute, out Uri blobUri))
89+
{
90+
if (blobUri.Host == container.Uri.Host)
91+
return blobUri.AbsoluteUri.Substring(container.Uri.AbsoluteUri.Length);
92+
if (!string.IsNullOrEmpty(blobUri.LocalPath))
93+
{
94+
return blobUri.LocalPath.Substring(Path.GetPathRoot(path).Length).Replace('\\', '/');
95+
}
96+
}
97+
return path.Substring(Path.GetPathRoot(path).Length).Replace('\\', '/');
5198
}
52-
public void WhenBlobCreated(Func<string, Task> function)
99+
100+
public void WhenBlobUploaded(Action<StorageFile> function)
53101
{
54-
throw new NotImplementedException();
102+
var processor = _cm.Messaging.GetServiceBusProcessor();
103+
var cm = _cm;
104+
105+
// TODO: How to unsubscribe?
106+
processor.ProcessMessageAsync += async (args) =>
107+
{
108+
EventGridEvent e = EventGridEvent.Parse(args.Message.Body);
109+
if (e.TryGetSystemEventData(out object systemEvent))
110+
{
111+
switch (systemEvent)
112+
{
113+
case StorageBlobCreatedEventData bc:
114+
var blobUri = bc.Url;
115+
var requestId = bc.ClientRequestId;
116+
// _logger.Log.EventReceived(nameof(OnProcessMessage), $"StorageBlobCreatedEventData: blobUri='{blobUri}' requestId='{requestId}'");
117+
118+
var eventArgs = new StorageFile(cm.Storage, blobUri, requestId, default);
119+
function(eventArgs);
120+
await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
121+
break;
122+
default:
123+
break;
124+
}
125+
}
126+
await Task.CompletedTask.ConfigureAwait(false);
127+
};
128+
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult().
129+
processor.StartProcessingAsync().GetAwaiter().GetResult();
130+
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
131+
55132
}
56133
}

0 commit comments

Comments
 (0)