-
Notifications
You must be signed in to change notification settings - Fork 5.1k
WebPubSub input and output binding identity-based connection #54208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using Azure.Messaging.WebPubSub; | ||
|
|
||
| namespace Microsoft.Azure.WebJobs.Extensions.WebPubSub | ||
| { | ||
| internal interface IWebPubSubServiceClientFactory | ||
| { | ||
| /// <summary> | ||
| /// Creates a WebPubSubServiceClient with fallback connection and hub resolution. | ||
| /// Priority for connection: | ||
| /// 1. attributeConnection (can be connection string or config section name) | ||
| /// 2. options (identity-based connection prioritized over connection string for security) | ||
| /// Priority for hub: attributeHub > options.Hub | ||
| /// </summary> | ||
| /// <param name="attributeConnection">Connection from the attribute (can be connection string or config section name).</param> | ||
| /// <param name="attributeHub">Hub from the attribute (highest priority).</param> | ||
| /// <returns>A configured WebPubSubServiceClient instance.</returns> | ||
| WebPubSubServiceClient Create(string attributeConnection, string attributeHub); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -27,18 +27,24 @@ internal class WebPubSubConfigProvider : IExtensionConfigProvider, IAsyncConvert | |||||
| private readonly ILogger _logger; | ||||||
| private readonly WebPubSubFunctionsOptions _options; | ||||||
| private readonly IWebPubSubTriggerDispatcher _dispatcher; | ||||||
| private readonly IWebPubSubServiceClientFactory _clientFactory; | ||||||
| private readonly IOptionsMonitor<WebPubSubServiceAccessOptions> _accessOptions; | ||||||
|
|
||||||
| public WebPubSubConfigProvider( | ||||||
| IOptions<WebPubSubFunctionsOptions> options, | ||||||
| INameResolver nameResolver, | ||||||
| ILoggerFactory loggerFactory, | ||||||
| IConfiguration configuration) | ||||||
| IConfiguration configuration, | ||||||
| IOptionsMonitor<WebPubSubServiceAccessOptions> accessOptions, | ||||||
| IWebPubSubServiceClientFactory clientFactory) | ||||||
| { | ||||||
| _options = options.Value; | ||||||
| _logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("WebPubSub")); | ||||||
| _nameResolver = nameResolver; | ||||||
| _configuration = configuration; | ||||||
| _dispatcher = new WebPubSubTriggerDispatcher(_logger, _options); | ||||||
| _accessOptions = accessOptions; | ||||||
| _clientFactory = clientFactory; | ||||||
| } | ||||||
|
|
||||||
| public void Initialize(ExtensionConfigContext context) | ||||||
|
|
@@ -48,16 +54,6 @@ public void Initialize(ExtensionConfigContext context) | |||||
| throw new ArgumentNullException(nameof(context)); | ||||||
| } | ||||||
|
|
||||||
| if (string.IsNullOrEmpty(_options.ConnectionString)) | ||||||
| { | ||||||
| _options.ConnectionString = _nameResolver.Resolve(Constants.WebPubSubConnectionStringName); | ||||||
| } | ||||||
|
|
||||||
| if (string.IsNullOrEmpty(_options.Hub)) | ||||||
| { | ||||||
| _options.Hub = _nameResolver.Resolve(Constants.HubNameStringName); | ||||||
| } | ||||||
|
|
||||||
| Exception webhookException = null; | ||||||
| try | ||||||
| { | ||||||
|
|
@@ -107,25 +103,53 @@ public Task<HttpResponseMessage> ConvertAsync(HttpRequestMessage input, Cancella | |||||
| return _dispatcher.ExecuteAsync(input, cancellationToken); | ||||||
| } | ||||||
|
|
||||||
| private void ValidateWebPubSubConnectionAttributeBinding(WebPubSubConnectionAttribute attribute, Type type) | ||||||
| internal WebPubSubService GetService(WebPubSubAttribute attribute) | ||||||
| { | ||||||
| ValidateConnectionString( | ||||||
| var client = _clientFactory.Create( | ||||||
| attribute.Connection, | ||||||
| $"{nameof(WebPubSubConnectionAttribute)}.{nameof(WebPubSubConnectionAttribute.Connection)}"); | ||||||
| attribute.Hub); | ||||||
| return new WebPubSubService(client); | ||||||
| } | ||||||
|
|
||||||
| private void ValidateWebPubSubAttributeBinding(WebPubSubAttribute attribute, Type type) | ||||||
| { | ||||||
| ValidateConnectionString( | ||||||
| attribute.Connection, | ||||||
| $"{nameof(WebPubSubAttribute)}.{nameof(WebPubSubAttribute.Connection)}"); | ||||||
| ValidateWebPubSubConnectionCore(attribute.Connection, attribute.Hub, "WebPubSub"); | ||||||
| } | ||||||
|
|
||||||
| internal WebPubSubService GetService(WebPubSubAttribute attribute) | ||||||
| private void ValidateWebPubSubConnectionAttributeBinding(WebPubSubConnectionAttribute attribute, Type type) | ||||||
| { | ||||||
| ValidateWebPubSubConnectionCore(attribute.Connection, attribute.Hub, "WebPubSubConnection"); | ||||||
| } | ||||||
|
|
||||||
| private void ValidateWebPubSubConnectionCore(string attributeConnection, string attributeHub, string extensionType) | ||||||
| { | ||||||
| var connectionString = Utilities.FirstOrDefault(attribute.Connection, _options.ConnectionString); | ||||||
| var hubName = Utilities.FirstOrDefault(attribute.Hub, _options.Hub); | ||||||
| return new WebPubSubService(connectionString, hubName); | ||||||
| var webPubSubAccessExists = true; | ||||||
| if (attributeConnection == null) | ||||||
| { | ||||||
| if (_accessOptions.CurrentValue.WebPubSubAccess == null) | ||||||
| { | ||||||
| webPubSubAccessExists = false; | ||||||
| } | ||||||
| } | ||||||
| else | ||||||
| { | ||||||
| if (!WebPubSubServiceAccessUtil.CanCreateFromIConfiguration(_configuration.GetSection(attributeConnection))) | ||||||
| { | ||||||
| webPubSubAccessExists = false; | ||||||
| } | ||||||
| } | ||||||
| if (!webPubSubAccessExists) | ||||||
| { | ||||||
| throw new InvalidOperationException( | ||||||
| $"Connection must be specified through one of the following:" + Environment.NewLine + | ||||||
| $" * Set '{extensionType}.Connection' property to the name of a config section that contains a Web PubSub connection." + Environment.NewLine + | ||||||
| $" * Set a Web PubSub connection under '{Constants.WebPubSubConnectionStringName}'."); | ||||||
| } | ||||||
|
|
||||||
| if ((attributeHub ?? _accessOptions.CurrentValue.Hub) is null) | ||||||
| { | ||||||
| throw new InvalidOperationException($"Resolved 'Hub' value is null for extension '{extensionType}''"); | ||||||
|
||||||
| throw new InvalidOperationException($"Resolved 'Hub' value is null for extension '{extensionType}''"); | |
| throw new InvalidOperationException($"Resolved 'Hub' value is null for extension '{extensionType}'"); |
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -4,7 +4,11 @@ | |||
| using System; | ||||
| using Microsoft.Azure.WebJobs; | ||||
| using Microsoft.Azure.WebJobs.Extensions.WebPubSub; | ||||
| using Microsoft.Extensions.Azure; | ||||
| using Microsoft.Extensions.Configuration; | ||||
| using Microsoft.Extensions.DependencyInjection; | ||||
| using Microsoft.Extensions.DependencyInjection.Extensions; | ||||
| using Microsoft.Extensions.Options; | ||||
|
|
||||
| namespace Microsoft.Extensions.Hosting | ||||
| { | ||||
|
|
@@ -20,13 +24,21 @@ public static class WebPubSubJobsBuilderExtensions | |||
| /// <returns><see cref="IWebJobsBuilder"/>.</returns> | ||||
| public static IWebJobsBuilder AddWebPubSub(this IWebJobsBuilder builder) | ||||
| { | ||||
| ; | ||||
|
||||
| ; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using System; | ||
|
|
||
| namespace Microsoft.Azure.WebJobs.Extensions.WebPubSub; | ||
|
|
||
| #nullable enable | ||
|
|
||
| /// <summary> | ||
| /// Access information to Web PubSub service. | ||
| /// </summary> | ||
| internal class WebPubSubServiceAccess(Uri serviceEndpoint, WebPubSubServiceCredential credential) | ||
| { | ||
| public Uri ServiceEndpoint { get; } = serviceEndpoint; | ||
| public WebPubSubServiceCredential Credential { get; } = credential; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| namespace Microsoft.Azure.WebJobs.Extensions.WebPubSub; | ||
|
|
||
| internal class WebPubSubServiceAccessOptions | ||
| { | ||
| public WebPubSubServiceAccess? WebPubSubAccess { get; set; } | ||
| public string? Hub { get; set; } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using Microsoft.Extensions.Azure; | ||
| using Microsoft.Extensions.Configuration; | ||
| using Microsoft.Extensions.Options; | ||
|
|
||
| namespace Microsoft.Azure.WebJobs.Extensions.WebPubSub | ||
| { | ||
| /// <summary> | ||
| /// Configures <see cref="WebPubSubServiceAccessOptions"/> by reading from the default configuration section. | ||
| /// </summary> | ||
| internal class WebPubSubServiceAccessOptionsSetup : IConfigureOptions<WebPubSubServiceAccessOptions> | ||
| { | ||
| private readonly IConfiguration _configuration; | ||
| private readonly AzureComponentFactory _azureComponentFactory; | ||
| private readonly INameResolver _nameResolver; | ||
| private readonly IOptionsMonitor<WebPubSubFunctionsOptions> _publicOptions; | ||
|
|
||
| public WebPubSubServiceAccessOptionsSetup( | ||
| IConfiguration configuration, | ||
| AzureComponentFactory azureComponentFactory, | ||
| INameResolver nameResolver, | ||
| IOptionsMonitor<WebPubSubFunctionsOptions> publicOptions) | ||
| { | ||
| _configuration = configuration; | ||
| _azureComponentFactory = azureComponentFactory; | ||
| _nameResolver = nameResolver; | ||
| _publicOptions = publicOptions; | ||
| } | ||
|
|
||
| public void Configure(WebPubSubServiceAccessOptions options) | ||
| { | ||
| var publicOptions = _publicOptions.CurrentValue; | ||
|
|
||
| // WebPubSubFunctionsOptions.ConnectionString can be set via code only. Takes the highest priority. | ||
| if (!string.IsNullOrEmpty(publicOptions.ConnectionString)) | ||
| { | ||
| options.WebPubSubAccess = WebPubSubServiceAccessUtil.CreateFromConnectionString(publicOptions.ConnectionString); | ||
| } | ||
| else | ||
| { | ||
| var defaultSection = _configuration.GetSection(Constants.WebPubSubConnectionStringName); | ||
| if (WebPubSubServiceAccessUtil.CreateFromIConfiguration(defaultSection, _azureComponentFactory, out var access)) | ||
| { | ||
| options.WebPubSubAccess = access!; | ||
| } | ||
| } | ||
|
|
||
| // Only configure Hub from the default config section if not already set | ||
| options.Hub = publicOptions.Hub ?? _nameResolver.Resolve(Constants.HubNameStringName); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,117 @@ | ||||||||||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||||||||||
| // Licensed under the MIT License. | ||||||||||
|
|
||||||||||
| using System; | ||||||||||
| using System.Collections.Generic; | ||||||||||
| using Microsoft.Extensions.Azure; | ||||||||||
| using Microsoft.Extensions.Configuration; | ||||||||||
|
|
||||||||||
| namespace Microsoft.Azure.WebJobs.Extensions.WebPubSub; | ||||||||||
|
|
||||||||||
| internal static class WebPubSubServiceAccessUtil | ||||||||||
| { | ||||||||||
| private const string EndpointPropertyName = "Endpoint"; | ||||||||||
| private const string AccessKeyPropertyName = "AccessKey"; | ||||||||||
| private const string PortPropertyName = "Port"; | ||||||||||
| private static readonly char[] KeyValueSeparator = { '=' }; | ||||||||||
| private static readonly char[] PropertySeparator = { ';' }; | ||||||||||
|
|
||||||||||
| internal static WebPubSubServiceAccess CreateFromConnectionString(string connectionString) | ||||||||||
| { | ||||||||||
| if (string.IsNullOrEmpty(connectionString)) | ||||||||||
| { | ||||||||||
| throw new ArgumentNullException(nameof(connectionString)); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| var properties = connectionString.Split(PropertySeparator, StringSplitOptions.RemoveEmptyEntries); | ||||||||||
|
|
||||||||||
| var dict = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase); | ||||||||||
| foreach (var property in properties) | ||||||||||
| { | ||||||||||
| var kvp = property.Split(KeyValueSeparator, 2); | ||||||||||
| if (kvp.Length != 2) | ||||||||||
| continue; | ||||||||||
|
|
||||||||||
| var key = kvp[0].Trim(); | ||||||||||
| if (dict.ContainsKey(key)) | ||||||||||
| { | ||||||||||
| throw new ArgumentException($"Duplicate properties found in connection string: {key}."); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| dict.Add(key, kvp[1].Trim()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| if (!dict.TryGetValue(EndpointPropertyName, out var endpoint)) | ||||||||||
| { | ||||||||||
| throw new ArgumentException($"Required property not found in connection string: {EndpointPropertyName}."); | ||||||||||
| } | ||||||||||
| endpoint = endpoint.TrimEnd('/'); | ||||||||||
|
|
||||||||||
| // AccessKey is optional when connection string is disabled. | ||||||||||
| dict.TryGetValue(AccessKeyPropertyName, out var accessKey); | ||||||||||
|
|
||||||||||
| int? port = null; | ||||||||||
| if (dict.TryGetValue(PortPropertyName, out var rawPort)) | ||||||||||
| { | ||||||||||
| if (int.TryParse(rawPort, out var portValue) && portValue > 0 && portValue <= 0xFFFF) | ||||||||||
| { | ||||||||||
| port = portValue; | ||||||||||
| } | ||||||||||
| else | ||||||||||
| { | ||||||||||
| throw new ArgumentException($"Invalid Port value: {rawPort}"); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| var uriBuilder = new UriBuilder(endpoint); | ||||||||||
| if (port.HasValue) | ||||||||||
| { | ||||||||||
| uriBuilder.Port = port.Value; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| return new WebPubSubServiceAccess(uriBuilder.Uri, new KeyCredential(accessKey)); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| internal static bool CreateFromIConfiguration(IConfigurationSection section, AzureComponentFactory azureComponentFactory, out WebPubSubServiceAccess? result) | ||||||||||
| { | ||||||||||
| if (!string.IsNullOrEmpty(section.Value)) | ||||||||||
| { | ||||||||||
| result = CreateFromConnectionString(section.Value); | ||||||||||
| return true; | ||||||||||
| } | ||||||||||
| else | ||||||||||
| { | ||||||||||
| // Check if this is an identity-based connection (has serviceUri) | ||||||||||
| var serviceUri = section[Constants.ServiceUriKey]; | ||||||||||
| if (!string.IsNullOrEmpty(serviceUri)) | ||||||||||
| { | ||||||||||
| var endpoint = new Uri(serviceUri); | ||||||||||
| var tokenCrential = azureComponentFactory.CreateTokenCredential(section); | ||||||||||
| result = new WebPubSubServiceAccess(endpoint, new IdentityCredential(tokenCrential)); | ||||||||||
|
Comment on lines
+89
to
+90
|
||||||||||
| var tokenCrential = azureComponentFactory.CreateTokenCredential(section); | |
| result = new WebPubSubServiceAccess(endpoint, new IdentityCredential(tokenCrential)); | |
| var tokenCredential = azureComponentFactory.CreateTokenCredential(section); | |
| result = new WebPubSubServiceAccess(endpoint, new IdentityCredential(tokenCredential)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The validation logic has inconsistent handling: when
attributeConnectionis null, it checks if_accessOptions.CurrentValue.WebPubSubAccess == null, but whenattributeConnectionis not null, it only checks if the configuration section can create an access object. This means if a non-null but empty connection name is provided, and the global connection is configured, the validation will fail even though it could fall back to the global connection. Consider aligning the validation logic with the actual client creation logic inWebPubSubServiceClientFactory.Create().