From e1662a1f2cc91bc9c26f3b47c0897ce91da0567d Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 15 May 2025 11:55:44 -0700 Subject: [PATCH 1/6] Add AspNetCoreLocalGrpcListener --- .../DurableTaskExtension.cs | 9 +- .../Grpc/AspNetCoreLocalGrpcListener.cs | 120 ++++ .../Grpc/ILocalGrpcListener.cs | 38 ++ .../Grpc/LegacyLocalGrpcListener.cs | 123 ++++ .../Grpc/{ => Protos}/README.md | 0 .../{ => Protos}/orchestrator_service.proto | 0 .../Grpc/{ => Protos}/refresh-protos.ps1 | 0 .../Grpc/{ => Protos}/versions.txt | 0 .../LocalGrpcListener.cs | 594 ------------------ .../Options/DurableTaskOptions.cs | 8 +- .../TaskHubGrpcServer.cs | 479 ++++++++++++++ .../WebJobs.Extensions.DurableTask.csproj | 3 +- 12 files changed, 773 insertions(+), 601 deletions(-) create mode 100644 src/WebJobs.Extensions.DurableTask/Grpc/AspNetCoreLocalGrpcListener.cs create mode 100644 src/WebJobs.Extensions.DurableTask/Grpc/ILocalGrpcListener.cs create mode 100644 src/WebJobs.Extensions.DurableTask/Grpc/LegacyLocalGrpcListener.cs rename src/WebJobs.Extensions.DurableTask/Grpc/{ => Protos}/README.md (100%) rename src/WebJobs.Extensions.DurableTask/Grpc/{ => Protos}/orchestrator_service.proto (100%) rename src/WebJobs.Extensions.DurableTask/Grpc/{ => Protos}/refresh-protos.ps1 (100%) rename src/WebJobs.Extensions.DurableTask/Grpc/{ => Protos}/versions.txt (100%) delete mode 100644 src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs create mode 100644 src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index 6ca3fe2be..e83a0e6f6 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -19,6 +19,7 @@ using DurableTask.Core.Middleware; using Microsoft.Azure.WebJobs.Description; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Grpc; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Listener; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Storage; using Microsoft.Azure.WebJobs.Host; @@ -66,7 +67,7 @@ public class DurableTaskExtension : #pragma warning disable CS0169 private readonly ITelemetryActivator telemetryActivator; #pragma warning restore CS0169 - private readonly LocalGrpcListener localGrpcListener; + private readonly ILocalGrpcListener localGrpcListener; private readonly bool isOptionsConfigured; private readonly Guid extensionGuid; @@ -174,7 +175,7 @@ public DurableTaskExtension( runtimeType == WorkerRuntimeType.Custom) { this.OutOfProcProtocol = OutOfProcOrchestrationProtocol.MiddlewarePassthrough; - this.localGrpcListener = new LocalGrpcListener(this); + this.localGrpcListener = LocalGrpcListener.Create(this, this.Options.GrpcListenerMode); this.HostLifetimeService.OnStopped.Register(this.StopLocalGrpcServer); } else @@ -479,12 +480,12 @@ private void StopLocalHttpServer() private void StartLocalGrpcServer() { - this.localGrpcListener.StartAsync().GetAwaiter().GetResult(); + this.localGrpcListener.StartAsync(default).GetAwaiter().GetResult(); } private void StopLocalGrpcServer() { - this.localGrpcListener.StopAsync().GetAwaiter().GetResult(); + this.localGrpcListener.StopAsync(default).GetAwaiter().GetResult(); } private void InitializeForFunctionsV1(ExtensionConfigContext context) diff --git a/src/WebJobs.Extensions.DurableTask/Grpc/AspNetCoreLocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/Grpc/AspNetCoreLocalGrpcListener.cs new file mode 100644 index 000000000..7979cf7b9 --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Grpc/AspNetCoreLocalGrpcListener.cs @@ -0,0 +1,120 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +#nullable enable +using System; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Hosting.Server; +using Microsoft.AspNetCore.Hosting.Server.Features; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Grpc +{ + internal class AspNetCoreLocalGrpcListener : ILocalGrpcListener + { + private const string HostName = "127.0.0.1"; + private readonly DurableTaskExtension extension; + private IHost? host; + + public AspNetCoreLocalGrpcListener(DurableTaskExtension extension) + { + this.extension = extension ?? throw new ArgumentNullException(nameof(extension)); + } + + public string? ListenAddress { get; private set; } + + public async Task StartAsync(CancellationToken cancellationToken) + { + int port = GetFreeTcpPort(); + this.host = new HostBuilder().ConfigureWebHost( + builder => + { + builder.UseKestrel(o => o.Listen( + IPAddress.Parse(HostName), + port, + listenOptions => listenOptions.Protocols = HttpProtocols.Http2)); + + builder.ConfigureServices(services => + { + services.AddSingleton(this.extension); + services.AddGrpc(options => + { + options.MaxReceiveMessageSize = int.MaxValue; + options.MaxSendMessageSize = int.MaxValue; + }); + }); + + builder.Configure((context, app) => + { + if (context.HostingEnvironment.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + } + + app.UseRouting(); + app.UseEndpoints(endpoints => + { + endpoints.MapGrpcService(); + }); + }); + }) + .Build(); + + await this.host.StartAsync(cancellationToken); + + // Get the actual address we've started on. + IServer? server = this.host.Services.GetService(); + IServerAddressesFeature? addressFeature = server?.Features.Get(); + this.ListenAddress = addressFeature?.Addresses.SingleOrDefault(); + + var expected = new Uri($"http://{HostName}:{port}"); + if (!Uri.TryCreate(this.ListenAddress, UriKind.Absolute, out Uri? uri) || expected != uri) + { + this.extension.TraceHelper.ExtensionWarningEvent( + this.extension.Options.HubName, + instanceId: string.Empty, + functionName: string.Empty, + message: $"Configured Uri ({expected}) does not match actual Uri ({uri})."); + } + + this.extension.TraceHelper.ExtensionInformationalEvent( + this.extension.Options.HubName, + instanceId: string.Empty, + functionName: string.Empty, + message: $"Opened local gRPC endpoint: {this.ListenAddress} (Mode=AspNetCore)", + writeToUserLogs: true); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + if (this.host is { } host) + { + return host.StopAsync(cancellationToken); + } + + return Task.CompletedTask; + } + + private static int GetFreeTcpPort() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + try + { + listener.Start(); + return ((IPEndPoint)listener.LocalEndpoint).Port; + } + finally + { + listener.Stop(); + } + } + } +} diff --git a/src/WebJobs.Extensions.DurableTask/Grpc/ILocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/Grpc/ILocalGrpcListener.cs new file mode 100644 index 000000000..13447be8e --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Grpc/ILocalGrpcListener.cs @@ -0,0 +1,38 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +#nullable enable +using Microsoft.Extensions.Hosting; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Grpc +{ + internal enum LocalGrpcListenerMode + { + Default = 0, + Legacy = 1, + AspNetCore = 2, + } + + /// + /// A local listener for gRPC communication between host and worker. + /// + internal interface ILocalGrpcListener : IHostedService + { + /// + /// Gets the address this listener is listening to. + /// + string? ListenAddress { get; } + } + + internal static class LocalGrpcListener + { + public static ILocalGrpcListener Create(DurableTaskExtension extension, LocalGrpcListenerMode mode) + { + return mode switch + { + LocalGrpcListenerMode.Default or LocalGrpcListenerMode.AspNetCore => new AspNetCoreLocalGrpcListener(extension), + _ => new LegacyLocalGrpcListener(extension), + }; + } + } +} diff --git a/src/WebJobs.Extensions.DurableTask/Grpc/LegacyLocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/Grpc/LegacyLocalGrpcListener.cs new file mode 100644 index 000000000..731a2a56f --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/Grpc/LegacyLocalGrpcListener.cs @@ -0,0 +1,123 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +#nullable enable +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Grpc +{ + internal class LegacyLocalGrpcListener : ILocalGrpcListener + { + private const int DefaultPort = 4001; + + // Pick a large, fixed range of ports that are going to be valid in all environment. + // Avoiding ports below 1024 as those are blocked by app service sandbox. + // Ephemeral ports for most OS start well above 32768. See https://www.ncftp.com/ncftpd/doc/misc/ephemeral_ports.html + private const int MinPort = 30000; + private const int MaxPort = 31000; + + private readonly DurableTaskExtension extension; + + private readonly Random portGenerator; + private readonly HashSet attemptedPorts; + + private Server? grpcServer; + + public LegacyLocalGrpcListener(DurableTaskExtension extension) + { + this.extension = extension ?? throw new ArgumentNullException(nameof(extension)); + + this.portGenerator = new Random(); + this.attemptedPorts = new HashSet(); + } + + public string? ListenAddress { get; private set; } + + public async Task StartAsync(CancellationToken cancelToken) + { + const int maxAttempts = 10; + int numAttempts = 1; + while (numAttempts <= maxAttempts) + { + ChannelOption[] options = new[] + { + new ChannelOption(ChannelOptions.MaxReceiveMessageLength, int.MaxValue), + new ChannelOption(ChannelOptions.MaxSendMessageLength, int.MaxValue), + }; + + if (this.grpcServer != null) + { + await this.grpcServer.ShutdownAsync(); + } + + this.grpcServer = new Server(options); + this.grpcServer.Services.Add(P.TaskHubSidecarService.BindService(new TaskHubGrpcServer(this.extension))); + + int listeningPort = numAttempts == 1 ? DefaultPort : this.GetRandomPort(); + int portBindingResult = this.grpcServer.Ports.Add("localhost", listeningPort, ServerCredentials.Insecure); + if (portBindingResult != 0) + { + try + { + this.grpcServer.Start(); + this.ListenAddress = $"http://localhost:{listeningPort}"; + + this.extension.TraceHelper.ExtensionInformationalEvent( + this.extension.Options.HubName, + instanceId: string.Empty, + functionName: string.Empty, + message: $"Opened local gRPC endpoint: {this.ListenAddress}", + writeToUserLogs: true); + + return; + } + catch (IOException) + { + portBindingResult = 0; + } + } + + if (portBindingResult == 0) + { + this.extension.TraceHelper.ExtensionWarningEvent( + this.extension.Options.HubName, + functionName: string.Empty, + instanceId: string.Empty, + message: $"Failed to open local port {listeningPort}. This was attempt #{numAttempts} to open a local port."); + this.attemptedPorts.Add(listeningPort); + numAttempts++; + } + } + + throw new IOException($"Unable to find a port to open an RPC endpoint on after {maxAttempts} attempts"); + } + + public async Task StopAsync(CancellationToken cancelToken) + { + if (this.grpcServer != null) + { + await this.grpcServer.ShutdownAsync(); + } + } + + private int GetRandomPort() + { + // Get a random port that has not already been attempted so we don't waste time trying + // to listen to a port we know is not free. + int randomPort; + do + { + randomPort = this.portGenerator.Next(MinPort, MaxPort); + } + while (this.attemptedPorts.Contains(randomPort)); + + return randomPort; + } + } +} diff --git a/src/WebJobs.Extensions.DurableTask/Grpc/README.md b/src/WebJobs.Extensions.DurableTask/Grpc/Protos/README.md similarity index 100% rename from src/WebJobs.Extensions.DurableTask/Grpc/README.md rename to src/WebJobs.Extensions.DurableTask/Grpc/Protos/README.md diff --git a/src/WebJobs.Extensions.DurableTask/Grpc/orchestrator_service.proto b/src/WebJobs.Extensions.DurableTask/Grpc/Protos/orchestrator_service.proto similarity index 100% rename from src/WebJobs.Extensions.DurableTask/Grpc/orchestrator_service.proto rename to src/WebJobs.Extensions.DurableTask/Grpc/Protos/orchestrator_service.proto diff --git a/src/WebJobs.Extensions.DurableTask/Grpc/refresh-protos.ps1 b/src/WebJobs.Extensions.DurableTask/Grpc/Protos/refresh-protos.ps1 similarity index 100% rename from src/WebJobs.Extensions.DurableTask/Grpc/refresh-protos.ps1 rename to src/WebJobs.Extensions.DurableTask/Grpc/Protos/refresh-protos.ps1 diff --git a/src/WebJobs.Extensions.DurableTask/Grpc/versions.txt b/src/WebJobs.Extensions.DurableTask/Grpc/Protos/versions.txt similarity index 100% rename from src/WebJobs.Extensions.DurableTask/Grpc/versions.txt rename to src/WebJobs.Extensions.DurableTask/Grpc/Protos/versions.txt diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs deleted file mode 100644 index 6f91b16d1..000000000 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ /dev/null @@ -1,594 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See LICENSE in the project root for license information. -#nullable enable -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Reflection; -using System.Reflection.Metadata.Ecma335; -using System.Threading; -using System.Threading.Tasks; -using DurableTask.Core; -using DurableTask.Core.Entities; -using DurableTask.Core.Exceptions; -using DurableTask.Core.History; -using DurableTask.Core.Query; -using DurableTask.Core.Serializing.Internal; -using Google.Protobuf.WellKnownTypes; -using Grpc.Core; -using Microsoft.ApplicationInsights.Extensibility.Implementation; -using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using DTCore = DurableTask.Core; -using P = Microsoft.DurableTask.Protobuf; - -namespace Microsoft.Azure.WebJobs.Extensions.DurableTask -{ - internal class LocalGrpcListener : IHostedService - { - private const int DefaultPort = 4001; - - // Pick a large, fixed range of ports that are going to be valid in all environment. - // Avoiding ports below 1024 as those are blocked by app service sandbox. - // Ephemeral ports for most OS start well above 32768. See https://www.ncftp.com/ncftpd/doc/misc/ephemeral_ports.html - private const int MinPort = 30000; - private const int MaxPort = 31000; - - private readonly DurableTaskExtension extension; - - private readonly Random portGenerator; - private readonly HashSet attemptedPorts; - - private Server? grpcServer; - - public LocalGrpcListener(DurableTaskExtension extension) - { - this.extension = extension ?? throw new ArgumentNullException(nameof(extension)); - - this.portGenerator = new Random(); - this.attemptedPorts = new HashSet(); - } - - public string? ListenAddress { get; private set; } - - public async Task StartAsync(CancellationToken cancelToken = default) - { - const int maxAttempts = 10; - int numAttempts = 1; - while (numAttempts <= maxAttempts) - { - ChannelOption[] options = new[] - { - new ChannelOption(ChannelOptions.MaxReceiveMessageLength, int.MaxValue), - new ChannelOption(ChannelOptions.MaxSendMessageLength, int.MaxValue), - }; - - if (this.grpcServer != null) - { - await this.grpcServer.ShutdownAsync(); - } - - this.grpcServer = new Server(options); - this.grpcServer.Services.Add(P.TaskHubSidecarService.BindService(new TaskHubGrpcServer(this))); - - int listeningPort = numAttempts == 1 ? DefaultPort : this.GetRandomPort(); - int portBindingResult = this.grpcServer.Ports.Add("localhost", listeningPort, ServerCredentials.Insecure); - if (portBindingResult != 0) - { - try - { - this.grpcServer.Start(); - this.ListenAddress = $"http://localhost:{listeningPort}"; - - this.extension.TraceHelper.ExtensionInformationalEvent( - this.extension.Options.HubName, - instanceId: string.Empty, - functionName: string.Empty, - message: $"Opened local gRPC endpoint: {this.ListenAddress}", - writeToUserLogs: true); - - return; - } - catch (IOException) - { - portBindingResult = 0; - } - } - - if (portBindingResult == 0) - { - this.extension.TraceHelper.ExtensionWarningEvent( - this.extension.Options.HubName, - functionName: string.Empty, - instanceId: string.Empty, - message: $"Failed to open local port {listeningPort}. This was attempt #{numAttempts} to open a local port."); - this.attemptedPorts.Add(listeningPort); - numAttempts++; - } - } - - throw new IOException($"Unable to find a port to open an RPC endpoint on after {maxAttempts} attempts"); - } - - public async Task StopAsync(CancellationToken cancelToken = default) - { - if (this.grpcServer != null) - { - await this.grpcServer.ShutdownAsync(); - } - } - - private int GetRandomPort() - { - // Get a random port that has not already been attempted so we don't waste time trying - // to listen to a port we know is not free. - int randomPort; - do - { - randomPort = this.portGenerator.Next(MinPort, MaxPort); - } - while (this.attemptedPorts.Contains(randomPort)); - - return randomPort; - } - - private class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBase - { - private readonly DurableTaskExtension extension; - - public TaskHubGrpcServer(LocalGrpcListener listener) - { - this.extension = listener.extension; - } - - public override Task Hello(Empty request, ServerCallContext context) - { - return Task.FromResult(new Empty()); - } - - public override Task CreateTaskHub(P.CreateTaskHubRequest request, ServerCallContext context) - { - this.GetDurabilityProvider(context).CreateAsync(request.RecreateIfExists); - return Task.FromResult(new P.CreateTaskHubResponse()); - } - - public override Task DeleteTaskHub(P.DeleteTaskHubRequest request, ServerCallContext context) - { - this.GetDurabilityProvider(context).DeleteAsync(); - return Task.FromResult(new P.DeleteTaskHubResponse()); - } - - public async override Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) - { - try - { - // Create the orchestration instance - var instance = new OrchestrationInstance - { - InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"), - ExecutionId = Guid.NewGuid().ToString(), - }; - - // Create the ExecutionStartedEvent - ExecutionStartedEvent executionStartedEvent = new ExecutionStartedEvent(-1, request.Input) - { - Name = request.Name, - Version = request.Version, - OrchestrationInstance = instance, - ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), - }; - - // Get the parent trace context from CreateInstanceRequest - string? traceParent = request.ParentTraceContext?.TraceParent; - string? traceState = request.ParentTraceContext?.TraceState; - - // Create a new activity with the parent context - ActivityContext.TryParse(traceParent, traceState, out ActivityContext parentActivityContext); - using Activity? scheduleOrchestrationActivity = StartActivityForNewOrchestration(executionStartedEvent, parentActivityContext); - - // Schedule the orchestration - await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( - new TaskMessage - { - Event = executionStartedEvent, - OrchestrationInstance = instance, - }, - this.GetStatusesNotToOverride()); - - return new P.CreateInstanceResponse - { - InstanceId = instance.InstanceId, - }; - } - catch (OrchestrationAlreadyExistsException) - { - throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists.")); - } - catch (InvalidOperationException ex) when (ex.Message.EndsWith("already exists.")) // for older versions of DTF.AS and DTFx.Netherite - { - throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists.")); - } - catch (Exception ex) - { - this.extension.TraceHelper.ExtensionWarningEvent( - this.extension.Options.HubName, - functionName: request.Name, - instanceId: request.InstanceId, - message: $"Failed to start instanceId {request.InstanceId} due to internal exception.\n Exception trace: {ex}."); - throw new RpcException(new Status(StatusCode.Internal, $"Failed to start instance with ID {request.InstanceId}.\nInner Exception message: {ex.Message}.")); - } - } - - private OrchestrationStatus[] GetStatusesNotToOverride() - { - OverridableStates overridableStates = this.extension.Options.OverridableExistingInstanceStates; - return overridableStates.ToDedupeStatuses(); - } - - internal static Activity? StartActivityForNewOrchestration(ExecutionStartedEvent startEvent, ActivityContext parentTraceContext) - { - // Create the Activity Source for the WebJobs extension - ActivitySource activitySource = new ActivitySource("WebJobs.Extensions.DurableTask"); - - // Start the new activity to represent scheduling the orchestration - Activity? newActivity = activitySource.CreateActivity( - name: Schema.SpanNames.CreateOrchestration(startEvent.Name, startEvent.Version), - kind: ActivityKind.Producer, - parentContext: parentTraceContext); - - newActivity?.Start(); - - if (newActivity != null && !string.IsNullOrEmpty(newActivity.Id)) - { - newActivity.SetTag(Schema.Task.Type, TraceActivityConstants.Orchestration); - newActivity.SetTag(Schema.Task.Name, startEvent.Name); - newActivity.SetTag(Schema.Task.InstanceId, startEvent.OrchestrationInstance.InstanceId); - newActivity.SetTag(Schema.Task.ExecutionId, startEvent.OrchestrationInstance.ExecutionId); - - if (!string.IsNullOrEmpty(startEvent.Version)) - { - newActivity.SetTag(Schema.Task.Version, startEvent.Version); - } - - // Set the parent trace context for the ExecutionStartedEvent - startEvent.ParentTraceContext = new DTCore.Tracing.DistributedTraceContext(newActivity?.Id!, newActivity?.TraceStateString); - } - - return newActivity; - } - - public async override Task RaiseEvent(P.RaiseEventRequest request, ServerCallContext context) - { - bool throwStatusExceptionsOnRaiseEvent = this.extension.Options.ThrowStatusExceptionsOnRaiseEvent ?? this.extension.DefaultDurabilityProvider.CheckStatusBeforeRaiseEvent; - - try - { - await this.GetClient(context).RaiseEventAsync(request.InstanceId, request.Name, Raw(request.Input)); - } - catch (ArgumentNullException ex) - { - // Indicates a required argument (e.g., instanceId, eventName, or input) is null or empty. - throw new RpcException(new Status(StatusCode.InvalidArgument, ex.Message)); - } - catch (ArgumentException ex) - { - // Indicates the provided instanceId has no orchestration status. - if (throwStatusExceptionsOnRaiseEvent) - { - throw new RpcException(new Status(StatusCode.NotFound, ex.Message)); - } - } - catch (InvalidOperationException) - { - // Indicates the orchestration instance exists but is already in a completed state. - if (throwStatusExceptionsOnRaiseEvent) - { - throw new RpcException(new Status(StatusCode.FailedPrecondition, "The orchestration instance with the provided instance id is not running.")); - } - } - catch (Exception ex) - { - // Any other unexpected exceptions. - throw new RpcException(new Status(StatusCode.Unknown, ex.Message)); - } - - return new P.RaiseEventResponse(); - } - - public async override Task SignalEntity(P.SignalEntityRequest request, ServerCallContext context) - { - this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); - - EntityMessageEvent eventToSend = ClientEntityHelpers.EmitOperationSignal( - new OrchestrationInstance() { InstanceId = request.InstanceId }, - Guid.Parse(request.RequestId), - request.Name, - request.Input, - EntityMessageEvent.GetCappedScheduledTime( - DateTime.UtcNow, - entityOrchestrationService.EntityBackendProperties!.MaximumSignalDelayTime, - request.ScheduledTime?.ToDateTime())); - - await durabilityProvider.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); - - // No fields in the response - return new P.SignalEntityResponse(); - } - - public async override Task GetEntity(P.GetEntityRequest request, ServerCallContext context) - { - this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); - - EntityBackendQueries.EntityMetadata? metaData = await entityOrchestrationService.EntityBackendQueries!.GetEntityAsync( - DTCore.Entities.EntityId.FromString(request.InstanceId), - request.IncludeState, - includeStateless: false, - context.CancellationToken); - - return new P.GetEntityResponse() - { - Exists = metaData.HasValue, - Entity = metaData.HasValue ? this.ConvertEntityMetadata(metaData.Value) : default, - }; - } - - public async override Task QueryEntities(P.QueryEntitiesRequest request, ServerCallContext context) - { - this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); - - P.EntityQuery query = request.Query; - EntityBackendQueries.EntityQueryResult result = await entityOrchestrationService.EntityBackendQueries!.QueryEntitiesAsync( - new EntityBackendQueries.EntityQuery() - { - InstanceIdStartsWith = query.InstanceIdStartsWith, - LastModifiedFrom = query.LastModifiedFrom?.ToDateTime(), - LastModifiedTo = query.LastModifiedTo?.ToDateTime(), - IncludeTransient = query.IncludeTransient, - IncludeState = query.IncludeState, - ContinuationToken = query.ContinuationToken, - PageSize = query.PageSize, - }, - context.CancellationToken); - - var response = new P.QueryEntitiesResponse() - { - ContinuationToken = result.ContinuationToken, - }; - - foreach (EntityBackendQueries.EntityMetadata entityMetadata in result.Results) - { - response.Entities.Add(this.ConvertEntityMetadata(entityMetadata)); - } - - return response; - } - - public async override Task CleanEntityStorage(P.CleanEntityStorageRequest request, ServerCallContext context) - { - this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); - - EntityBackendQueries.CleanEntityStorageResult result = await entityOrchestrationService.EntityBackendQueries!.CleanEntityStorageAsync( - new EntityBackendQueries.CleanEntityStorageRequest() - { - RemoveEmptyEntities = request.RemoveEmptyEntities, - ReleaseOrphanedLocks = request.ReleaseOrphanedLocks, - ContinuationToken = request.ContinuationToken, - }, - context.CancellationToken); - - return new P.CleanEntityStorageResponse() - { - EmptyEntitiesRemoved = result.EmptyEntitiesRemoved, - OrphanedLocksReleased = result.OrphanedLocksReleased, - ContinuationToken = result.ContinuationToken, - }; - } - - public async override Task TerminateInstance(P.TerminateRequest request, ServerCallContext context) - { - await this.GetClient(context).TerminateAsync(request.InstanceId, request.Output); - return new P.TerminateResponse(); - } - - public async override Task SuspendInstance(P.SuspendRequest request, ServerCallContext context) - { - await this.GetClient(context).SuspendAsync(request.InstanceId, request.Reason); - return new P.SuspendResponse(); - } - - public async override Task ResumeInstance(P.ResumeRequest request, ServerCallContext context) - { - await this.GetClient(context).ResumeAsync(request.InstanceId, request.Reason); - return new P.ResumeResponse(); - } - - public async override Task RewindInstance(P.RewindInstanceRequest request, ServerCallContext context) - { -#pragma warning disable CS0618 // Type or member is obsolete - await this.GetClient(context).RewindAsync(request.InstanceId, request.Reason); -#pragma warning restore CS0618 // Type or member is obsolete - return new P.RewindInstanceResponse(); - } - - public async override Task GetInstance(P.GetInstanceRequest request, ServerCallContext context) - { - OrchestrationState state = await this.GetDurabilityProvider(context) - .GetOrchestrationStateAsync(request.InstanceId, executionId: null); - if (state == null) - { - return new P.GetInstanceResponse() { Exists = false }; - } - - return CreateGetInstanceResponse(state, request); - } - - public async override Task QueryInstances(P.QueryInstancesRequest request, ServerCallContext context) - { - var query = ProtobufUtils.ToOrchestrationQuery(request); - var queryClient = (IOrchestrationServiceQueryClient)this.GetDurabilityProvider(context); - OrchestrationQueryResult result = await queryClient.GetOrchestrationWithQueryAsync(query, context.CancellationToken); - return ProtobufUtils.CreateQueryInstancesResponse(result, request); - } - - public async override Task PurgeInstances(P.PurgeInstancesRequest request, ServerCallContext context) - { - var purgeClient = (IOrchestrationServicePurgeClient)this.GetDurabilityProvider(context); - - PurgeResult result; - try - { - switch (request.RequestCase) - { - case P.PurgeInstancesRequest.RequestOneofCase.InstanceId: - result = await purgeClient.PurgeInstanceStateAsync(request.InstanceId); - break; - - case P.PurgeInstancesRequest.RequestOneofCase.PurgeInstanceFilter: - var purgeInstanceFilter = ProtobufUtils.ToPurgeInstanceFilter(request); - result = await purgeClient.PurgeInstanceStateAsync(purgeInstanceFilter); - break; - - default: - throw new RpcException(new Status(StatusCode.InvalidArgument, $"Unknown purge request type '{request.RequestCase}'.")); - } - - return ProtobufUtils.CreatePurgeInstancesResponse(result); - } - catch (RpcException) - { - // Rethrow RPC-related exceptions as-is. - throw; - } - catch (Exception ex) - { - // Wrap all other exceptions in an RpcException. - throw new RpcException(new Status(StatusCode.Internal, $"Failed during purging instances: {ex.Message}")); - } - } - - public async override Task WaitForInstanceStart(P.GetInstanceRequest request, ServerCallContext context) - { - int retryCount = 0; - while (true) - { - // Keep fetching the status until we get one of the states we care about - OrchestrationState state = await this.GetDurabilityProvider(context) - .GetOrchestrationStateAsync(request.InstanceId, executionId: null); - if (state != null && state.OrchestrationStatus != OrchestrationStatus.Pending) - { - return CreateGetInstanceResponse(state, request); - } - - // Increase the delay time by 1 second every 10 seconds up to 10 seconds. - // The cancellation token is what will break us out of this loop if the orchestration - // never leaves the "Pending" state. - var delay = TimeSpan.FromSeconds(Math.Min(10, (retryCount / 10) + 1)); - await Task.Delay(delay, context.CancellationToken); - retryCount++; - } - } - - public async override Task WaitForInstanceCompletion(P.GetInstanceRequest request, ServerCallContext context) - { - OrchestrationState state = await this.GetDurabilityProvider(context).WaitForOrchestrationAsync( - request.InstanceId, - executionId: null, - timeout: Timeout.InfiniteTimeSpan, - context.CancellationToken); - - if (state == null) - { - return new P.GetInstanceResponse() { Exists = false }; - } - - return CreateGetInstanceResponse(state, request); - } - -#pragma warning disable CS0618 // Type or member is obsolete -- 'internal' usage. - private static RawInput Raw(string input) - { - return new RawInput(input); - } -#pragma warning restore CS0618 // Type or member is obsolete - - private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, P.GetInstanceRequest request) - { - return new P.GetInstanceResponse - { - Exists = true, - OrchestrationState = new P.OrchestrationState - { - InstanceId = state.OrchestrationInstance.InstanceId, - Name = state.Name, - OrchestrationStatus = (P.OrchestrationStatus)state.OrchestrationStatus, - CreatedTimestamp = Timestamp.FromDateTime(state.CreatedTime), - LastUpdatedTimestamp = Timestamp.FromDateTime(state.LastUpdatedTime), - Input = request.GetInputsAndOutputs ? state.Input : null, - Output = request.GetInputsAndOutputs ? state.Output : null, - CustomStatus = request.GetInputsAndOutputs ? state.Status : null, - FailureDetails = request.GetInputsAndOutputs ? GetFailureDetails(state.FailureDetails) : null, - }, - }; - } - - private static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) - { - if (failureDetails == null) - { - return null; - } - - return new P.TaskFailureDetails - { - ErrorType = failureDetails.ErrorType, - ErrorMessage = failureDetails.ErrorMessage, - StackTrace = failureDetails.StackTrace, - }; - } - - private DurableClientAttribute GetAttribute(ServerCallContext context) - { - string? taskHub = context.RequestHeaders.GetValue("Durable-TaskHub"); - string? connectionName = context.RequestHeaders.GetValue("Durable-ConnectionName"); - return new DurableClientAttribute() { TaskHub = taskHub, ConnectionName = connectionName }; - } - - private DurabilityProvider GetDurabilityProvider(ServerCallContext context) - { - return this.extension.GetDurabilityProvider(this.GetAttribute(context)); - } - - private IDurableClient GetClient(ServerCallContext context) - { - return this.extension.GetClient(this.GetAttribute(context)); - } - - private void CheckEntitySupport(ServerCallContext context, out DurabilityProvider durabilityProvider, out IEntityOrchestrationService entityOrchestrationService) - { - durabilityProvider = this.GetDurabilityProvider(context); - entityOrchestrationService = durabilityProvider; - if (entityOrchestrationService?.EntityBackendProperties == null) - { - throw new RpcException(new Grpc.Core.Status( - Grpc.Core.StatusCode.Unimplemented, - $"Missing entity support for storage backend '{durabilityProvider.GetBackendInfo()}'. Entity support" + - $" may have not been implemented yet, or the selected package version is too old.")); - } - } - - private P.EntityMetadata ConvertEntityMetadata(EntityBackendQueries.EntityMetadata metaData) - { - return new P.EntityMetadata() - { - InstanceId = metaData.EntityId.ToString(), - LastModifiedTime = metaData.LastModifiedTime.ToTimestamp(), - BacklogQueueSize = metaData.BacklogQueueSize, - LockedBy = metaData.LockedBy, - SerializedState = metaData.SerializedState, - }; - } - } - } -} \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs index a9eb0f4a7..94e01745b 100644 --- a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs +++ b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Net.Http; using DurableTask.AzureStorage.Partitioning; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Grpc; using Microsoft.Azure.WebJobs.Host; using Newtonsoft.Json; using Newtonsoft.Json.Converters; @@ -238,7 +239,7 @@ public string HubName public bool StoreInputsInOrchestrationHistory { get; set; } = false; /// - /// If UseAppLease is true, gets or sets the AppLeaaseOptions used for acquiring the lease to start the application. + /// If UseAppLease is true, gets or sets the AppLeaseOptions used for acquiring the lease to start the application. /// public AppLeaseOptions AppLeaseOptions { get; set; } = AppLeaseOptions.DefaultOptions; @@ -258,6 +259,11 @@ public string HubName /// public TimeSpan? GrpcHttpClientTimeout { get; set; } = TimeSpan.FromSeconds(100); + /// + /// Gets or sets the local gRPC listener mode, controlling what version of gRPC listener is created. + /// + internal LocalGrpcListenerMode GrpcListenerMode { get; set; } + // Used for mocking the lifecycle notification helper. internal HttpMessageHandler NotificationHandler { get; set; } diff --git a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs new file mode 100644 index 000000000..66ddb85ed --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs @@ -0,0 +1,479 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +#nullable enable +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using DurableTask.Core; +using DurableTask.Core.Entities; +using DurableTask.Core.Exceptions; +using DurableTask.Core.History; +using DurableTask.Core.Query; +using DurableTask.Core.Serializing.Internal; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; +using DTCore = DurableTask.Core; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + internal class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBase + { + private readonly DurableTaskExtension extension; + + public TaskHubGrpcServer(DurableTaskExtension extension) + { + this.extension = extension; + } + + public override Task Hello(Empty request, ServerCallContext context) + { + return Task.FromResult(new Empty()); + } + + public override Task CreateTaskHub(P.CreateTaskHubRequest request, ServerCallContext context) + { + this.GetDurabilityProvider(context).CreateAsync(request.RecreateIfExists); + return Task.FromResult(new P.CreateTaskHubResponse()); + } + + public override Task DeleteTaskHub(P.DeleteTaskHubRequest request, ServerCallContext context) + { + this.GetDurabilityProvider(context).DeleteAsync(); + return Task.FromResult(new P.DeleteTaskHubResponse()); + } + + public async override Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) + { + try + { + // Create the orchestration instance + var instance = new OrchestrationInstance + { + InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"), + ExecutionId = Guid.NewGuid().ToString(), + }; + + // Create the ExecutionStartedEvent + ExecutionStartedEvent executionStartedEvent = new ExecutionStartedEvent(-1, request.Input) + { + Name = request.Name, + Version = request.Version, + OrchestrationInstance = instance, + ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), + }; + + // Get the parent trace context from CreateInstanceRequest + string? traceParent = request.ParentTraceContext?.TraceParent; + string? traceState = request.ParentTraceContext?.TraceState; + + // Create a new activity with the parent context + ActivityContext.TryParse(traceParent, traceState, out ActivityContext parentActivityContext); + using Activity? scheduleOrchestrationActivity = StartActivityForNewOrchestration(executionStartedEvent, parentActivityContext); + + // Schedule the orchestration + await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( + new TaskMessage + { + Event = executionStartedEvent, + OrchestrationInstance = instance, + }, + this.GetStatusesNotToOverride()); + + return new P.CreateInstanceResponse + { + InstanceId = instance.InstanceId, + }; + } + catch (OrchestrationAlreadyExistsException) + { + throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists.")); + } + catch (InvalidOperationException ex) when (ex.Message.EndsWith("already exists.")) // for older versions of DTF.AS and DTFx.Netherite + { + throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists.")); + } + catch (Exception ex) + { + this.extension.TraceHelper.ExtensionWarningEvent( + this.extension.Options.HubName, + functionName: request.Name, + instanceId: request.InstanceId, + message: $"Failed to start instanceId {request.InstanceId} due to internal exception.\n Exception trace: {ex}."); + throw new RpcException(new Status(StatusCode.Internal, $"Failed to start instance with ID {request.InstanceId}.\nInner Exception message: {ex.Message}.")); + } + } + + private OrchestrationStatus[] GetStatusesNotToOverride() + { + OverridableStates overridableStates = this.extension.Options.OverridableExistingInstanceStates; + return overridableStates.ToDedupeStatuses(); + } + + internal static Activity? StartActivityForNewOrchestration(ExecutionStartedEvent startEvent, ActivityContext parentTraceContext) + { + // Create the Activity Source for the WebJobs extension + ActivitySource activitySource = new ActivitySource("WebJobs.Extensions.DurableTask"); + + // Start the new activity to represent scheduling the orchestration + Activity? newActivity = activitySource.CreateActivity( + name: Schema.SpanNames.CreateOrchestration(startEvent.Name, startEvent.Version), + kind: ActivityKind.Producer, + parentContext: parentTraceContext); + + newActivity?.Start(); + + if (newActivity != null && !string.IsNullOrEmpty(newActivity.Id)) + { + newActivity.SetTag(Schema.Task.Type, TraceActivityConstants.Orchestration); + newActivity.SetTag(Schema.Task.Name, startEvent.Name); + newActivity.SetTag(Schema.Task.InstanceId, startEvent.OrchestrationInstance.InstanceId); + newActivity.SetTag(Schema.Task.ExecutionId, startEvent.OrchestrationInstance.ExecutionId); + + if (!string.IsNullOrEmpty(startEvent.Version)) + { + newActivity.SetTag(Schema.Task.Version, startEvent.Version); + } + + // Set the parent trace context for the ExecutionStartedEvent + startEvent.ParentTraceContext = new DTCore.Tracing.DistributedTraceContext(newActivity?.Id!, newActivity?.TraceStateString); + } + + return newActivity; + } + + public async override Task RaiseEvent(P.RaiseEventRequest request, ServerCallContext context) + { + bool throwStatusExceptionsOnRaiseEvent = this.extension.Options.ThrowStatusExceptionsOnRaiseEvent ?? this.extension.DefaultDurabilityProvider.CheckStatusBeforeRaiseEvent; + + try + { + await this.GetClient(context).RaiseEventAsync(request.InstanceId, request.Name, Raw(request.Input)); + } + catch (ArgumentNullException ex) + { + // Indicates a required argument (e.g., instanceId, eventName, or input) is null or empty. + throw new RpcException(new Status(StatusCode.InvalidArgument, ex.Message)); + } + catch (ArgumentException ex) + { + // Indicates the provided instanceId has no orchestration status. + if (throwStatusExceptionsOnRaiseEvent) + { + throw new RpcException(new Status(StatusCode.NotFound, ex.Message)); + } + } + catch (InvalidOperationException) + { + // Indicates the orchestration instance exists but is already in a completed state. + if (throwStatusExceptionsOnRaiseEvent) + { + throw new RpcException(new Status(StatusCode.FailedPrecondition, "The orchestration instance with the provided instance id is not running.")); + } + } + catch (Exception ex) + { + // Any other unexpected exceptions. + throw new RpcException(new Status(StatusCode.Unknown, ex.Message)); + } + + return new P.RaiseEventResponse(); + } + + public async override Task SignalEntity(P.SignalEntityRequest request, ServerCallContext context) + { + this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); + + EntityMessageEvent eventToSend = ClientEntityHelpers.EmitOperationSignal( + new OrchestrationInstance() { InstanceId = request.InstanceId }, + Guid.Parse(request.RequestId), + request.Name, + request.Input, + EntityMessageEvent.GetCappedScheduledTime( + DateTime.UtcNow, + entityOrchestrationService.EntityBackendProperties!.MaximumSignalDelayTime, + request.ScheduledTime?.ToDateTime())); + + await durabilityProvider.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); + + // No fields in the response + return new P.SignalEntityResponse(); + } + + public async override Task GetEntity(P.GetEntityRequest request, ServerCallContext context) + { + this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); + + EntityBackendQueries.EntityMetadata? metaData = await entityOrchestrationService.EntityBackendQueries!.GetEntityAsync( + DTCore.Entities.EntityId.FromString(request.InstanceId), + request.IncludeState, + includeStateless: false, + context.CancellationToken); + + return new P.GetEntityResponse() + { + Exists = metaData.HasValue, + Entity = metaData.HasValue ? this.ConvertEntityMetadata(metaData.Value) : default, + }; + } + + public async override Task QueryEntities(P.QueryEntitiesRequest request, ServerCallContext context) + { + this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); + + P.EntityQuery query = request.Query; + EntityBackendQueries.EntityQueryResult result = await entityOrchestrationService.EntityBackendQueries!.QueryEntitiesAsync( + new EntityBackendQueries.EntityQuery() + { + InstanceIdStartsWith = query.InstanceIdStartsWith, + LastModifiedFrom = query.LastModifiedFrom?.ToDateTime(), + LastModifiedTo = query.LastModifiedTo?.ToDateTime(), + IncludeTransient = query.IncludeTransient, + IncludeState = query.IncludeState, + ContinuationToken = query.ContinuationToken, + PageSize = query.PageSize, + }, + context.CancellationToken); + + var response = new P.QueryEntitiesResponse() + { + ContinuationToken = result.ContinuationToken, + }; + + foreach (EntityBackendQueries.EntityMetadata entityMetadata in result.Results) + { + response.Entities.Add(this.ConvertEntityMetadata(entityMetadata)); + } + + return response; + } + + public async override Task CleanEntityStorage(P.CleanEntityStorageRequest request, ServerCallContext context) + { + this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); + + EntityBackendQueries.CleanEntityStorageResult result = await entityOrchestrationService.EntityBackendQueries!.CleanEntityStorageAsync( + new EntityBackendQueries.CleanEntityStorageRequest() + { + RemoveEmptyEntities = request.RemoveEmptyEntities, + ReleaseOrphanedLocks = request.ReleaseOrphanedLocks, + ContinuationToken = request.ContinuationToken, + }, + context.CancellationToken); + + return new P.CleanEntityStorageResponse() + { + EmptyEntitiesRemoved = result.EmptyEntitiesRemoved, + OrphanedLocksReleased = result.OrphanedLocksReleased, + ContinuationToken = result.ContinuationToken, + }; + } + + public async override Task TerminateInstance(P.TerminateRequest request, ServerCallContext context) + { + await this.GetClient(context).TerminateAsync(request.InstanceId, request.Output); + return new P.TerminateResponse(); + } + + public async override Task SuspendInstance(P.SuspendRequest request, ServerCallContext context) + { + await this.GetClient(context).SuspendAsync(request.InstanceId, request.Reason); + return new P.SuspendResponse(); + } + + public async override Task ResumeInstance(P.ResumeRequest request, ServerCallContext context) + { + await this.GetClient(context).ResumeAsync(request.InstanceId, request.Reason); + return new P.ResumeResponse(); + } + + public async override Task RewindInstance(P.RewindInstanceRequest request, ServerCallContext context) + { +#pragma warning disable CS0618 // Type or member is obsolete + await this.GetClient(context).RewindAsync(request.InstanceId, request.Reason); +#pragma warning restore CS0618 // Type or member is obsolete + return new P.RewindInstanceResponse(); + } + + public async override Task GetInstance(P.GetInstanceRequest request, ServerCallContext context) + { + OrchestrationState state = await this.GetDurabilityProvider(context) + .GetOrchestrationStateAsync(request.InstanceId, executionId: null); + if (state == null) + { + return new P.GetInstanceResponse() { Exists = false }; + } + + return CreateGetInstanceResponse(state, request); + } + + public async override Task QueryInstances(P.QueryInstancesRequest request, ServerCallContext context) + { + var query = ProtobufUtils.ToOrchestrationQuery(request); + var queryClient = (IOrchestrationServiceQueryClient)this.GetDurabilityProvider(context); + OrchestrationQueryResult result = await queryClient.GetOrchestrationWithQueryAsync(query, context.CancellationToken); + return ProtobufUtils.CreateQueryInstancesResponse(result, request); + } + + public async override Task PurgeInstances(P.PurgeInstancesRequest request, ServerCallContext context) + { + var purgeClient = (IOrchestrationServicePurgeClient)this.GetDurabilityProvider(context); + + PurgeResult result; + try + { + switch (request.RequestCase) + { + case P.PurgeInstancesRequest.RequestOneofCase.InstanceId: + result = await purgeClient.PurgeInstanceStateAsync(request.InstanceId); + break; + + case P.PurgeInstancesRequest.RequestOneofCase.PurgeInstanceFilter: + var purgeInstanceFilter = ProtobufUtils.ToPurgeInstanceFilter(request); + result = await purgeClient.PurgeInstanceStateAsync(purgeInstanceFilter); + break; + + default: + throw new RpcException(new Status(StatusCode.InvalidArgument, $"Unknown purge request type '{request.RequestCase}'.")); + } + + return ProtobufUtils.CreatePurgeInstancesResponse(result); + } + catch (RpcException) + { + // Rethrow RPC-related exceptions as-is. + throw; + } + catch (Exception ex) + { + // Wrap all other exceptions in an RpcException. + throw new RpcException(new Status(StatusCode.Internal, $"Failed during purging instances: {ex.Message}")); + } + } + + public async override Task WaitForInstanceStart(P.GetInstanceRequest request, ServerCallContext context) + { + int retryCount = 0; + while (true) + { + // Keep fetching the status until we get one of the states we care about + OrchestrationState state = await this.GetDurabilityProvider(context) + .GetOrchestrationStateAsync(request.InstanceId, executionId: null); + if (state != null && state.OrchestrationStatus != OrchestrationStatus.Pending) + { + return CreateGetInstanceResponse(state, request); + } + + // Increase the delay time by 1 second every 10 seconds up to 10 seconds. + // The cancellation token is what will break us out of this loop if the orchestration + // never leaves the "Pending" state. + var delay = TimeSpan.FromSeconds(Math.Min(10, (retryCount / 10) + 1)); + await Task.Delay(delay, context.CancellationToken); + retryCount++; + } + } + + public async override Task WaitForInstanceCompletion(P.GetInstanceRequest request, ServerCallContext context) + { + OrchestrationState state = await this.GetDurabilityProvider(context).WaitForOrchestrationAsync( + request.InstanceId, + executionId: null, + timeout: Timeout.InfiniteTimeSpan, + context.CancellationToken); + + if (state == null) + { + return new P.GetInstanceResponse() { Exists = false }; + } + + return CreateGetInstanceResponse(state, request); + } + +#pragma warning disable CS0618 // Type or member is obsolete -- 'internal' usage. + private static RawInput Raw(string input) + { + return new RawInput(input); + } +#pragma warning restore CS0618 // Type or member is obsolete + + private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, P.GetInstanceRequest request) + { + return new P.GetInstanceResponse + { + Exists = true, + OrchestrationState = new P.OrchestrationState + { + InstanceId = state.OrchestrationInstance.InstanceId, + Name = state.Name, + OrchestrationStatus = (P.OrchestrationStatus)state.OrchestrationStatus, + CreatedTimestamp = Timestamp.FromDateTime(state.CreatedTime), + LastUpdatedTimestamp = Timestamp.FromDateTime(state.LastUpdatedTime), + Input = request.GetInputsAndOutputs ? state.Input : null, + Output = request.GetInputsAndOutputs ? state.Output : null, + CustomStatus = request.GetInputsAndOutputs ? state.Status : null, + FailureDetails = request.GetInputsAndOutputs ? GetFailureDetails(state.FailureDetails) : null, + }, + }; + } + + private static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) + { + if (failureDetails == null) + { + return null; + } + + return new P.TaskFailureDetails + { + ErrorType = failureDetails.ErrorType, + ErrorMessage = failureDetails.ErrorMessage, + StackTrace = failureDetails.StackTrace, + }; + } + + private DurableClientAttribute GetAttribute(ServerCallContext context) + { + string? taskHub = context.RequestHeaders.GetValue("Durable-TaskHub"); + string? connectionName = context.RequestHeaders.GetValue("Durable-ConnectionName"); + return new DurableClientAttribute() { TaskHub = taskHub, ConnectionName = connectionName }; + } + + private DurabilityProvider GetDurabilityProvider(ServerCallContext context) + { + return this.extension.GetDurabilityProvider(this.GetAttribute(context)); + } + + private IDurableClient GetClient(ServerCallContext context) + { + return this.extension.GetClient(this.GetAttribute(context)); + } + + private void CheckEntitySupport(ServerCallContext context, out DurabilityProvider durabilityProvider, out IEntityOrchestrationService entityOrchestrationService) + { + durabilityProvider = this.GetDurabilityProvider(context); + entityOrchestrationService = durabilityProvider; + if (entityOrchestrationService?.EntityBackendProperties == null) + { + throw new RpcException(new Status( + StatusCode.Unimplemented, + $"Missing entity support for storage backend '{durabilityProvider.GetBackendInfo()}'. Entity support" + + $" may have not been implemented yet, or the selected package version is too old.")); + } + } + + private P.EntityMetadata ConvertEntityMetadata(EntityBackendQueries.EntityMetadata metaData) + { + return new P.EntityMetadata() + { + InstanceId = metaData.EntityId.ToString(), + LastModifiedTime = metaData.LastModifiedTime.ToTimestamp(), + BacklogQueueSize = metaData.BacklogQueueSize, + LockedBy = metaData.LockedBy, + SerializedState = metaData.SerializedState, + }; + } + } +} diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj index a33c1b82b..5576e071a 100644 --- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj +++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj @@ -28,7 +28,6 @@ $(MajorVersion).$(MinorVersion).$(PatchVersion)-$(VersionSuffix) - Azure Functions Durable Task Extension @@ -50,7 +49,7 @@ - + From ca022f76c0e0f8d981c1518ac0598de90fd3e5bf Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 2 Jun 2025 11:10:43 -0700 Subject: [PATCH 2/6] run tests on macos --- .github/workflows/E2ETest.yml | 10 +++++----- .github/workflows/smoketest-dotnet-isolated-v4.yml | 10 +++++----- .github/workflows/smoketest-java8-v4.yml | 4 ++-- .github/workflows/smoketest-mssql-inproc-v4.yml | 2 +- .github/workflows/smoketest-netherite-inproc-v4.yml | 2 +- .github/workflows/smoketest-node20-v4.yml | 2 +- .github/workflows/smoketest-python37-v4.yml | 2 +- .github/workflows/validate-build-analyzer.yml | 2 +- .github/workflows/validate-build-e2e.yml | 2 +- .github/workflows/validate-build.yml | 2 +- 10 files changed, 19 insertions(+), 19 deletions(-) diff --git a/.github/workflows/E2ETest.yml b/.github/workflows/E2ETest.yml index 2d163ba02..19d24172c 100644 --- a/.github/workflows/E2ETest.yml +++ b/.github/workflows/E2ETest.yml @@ -49,8 +49,8 @@ jobs: working-directory: test/e2e/Tests run: dotnet test --filter AzureStorage!=Skip - e2e-azurestorage-windows: - runs-on: windows-latest + e2e-azurestorage-macos: + runs-on: macos-latest env: E2E_TEST_DURABLE_BACKEND: 'AzureStorage' steps: @@ -74,7 +74,7 @@ jobs: - name: Setup E2E tests shell: pwsh run: | - .\test\e2e\Tests\build-e2e-test.ps1 + ./test/e2e/Tests/build-e2e-test.ps1 - name: Build working-directory: test/e2e/Tests @@ -108,7 +108,7 @@ jobs: - name: Setup E2E tests shell: pwsh run: | - .\test\e2e\Tests\build-e2e-test.ps1 -StartMSSqlContainer + ./test/e2e/Tests/build-e2e-test.ps1 -StartMSSqlContainer - name: Build working-directory: test/e2e/Tests @@ -138,7 +138,7 @@ jobs: - name: Setup E2E tests shell: pwsh run: | - .\test\e2e\Tests\build-e2e-test.ps1 -StartDTSContainer + ./test/e2e/Tests/build-e2e-test.ps1 -StartDTSContainer - name: Build working-directory: test/e2e/Tests diff --git a/.github/workflows/smoketest-dotnet-isolated-v4.yml b/.github/workflows/smoketest-dotnet-isolated-v4.yml index f1fa0d6f2..de46f9455 100644 --- a/.github/workflows/smoketest-dotnet-isolated-v4.yml +++ b/.github/workflows/smoketest-dotnet-isolated-v4.yml @@ -15,7 +15,7 @@ on: jobs: build: - runs-on: ubuntu-22.04 + runs-on: macos-latest steps: - uses: actions/checkout@v2 @@ -82,24 +82,24 @@ jobs: # ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/StartHelloCitiesTyped - name: Run smoke tests (Hello Cities) - shell: pwsh + shell: bash run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 & cd ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated && func host start --port 7071 & ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/StartHelloCitiesUntyped - name: Run smoke tests (Process Exit) - shell: pwsh + shell: bash run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 & ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/durable_HttpStartProcessExitOrchestrator - name: Run smoke tests (Timeout) - shell: pwsh + shell: bash run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 & cd ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated && func host start --port 7071 & ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/durable_HttpStartTimeoutOrchestrator - name: Run smoke tests (OOM) - shell: pwsh + shell: bash run: azurite --silent --blobPort 10000 --queuePort 10001 --tablePort 10002 & cd ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated && func host start --port 7071 & ./test/SmokeTests/OOProcSmokeTests/DotNetIsolated/run-smoke-tests.ps1 -HttpStartPath api/durable_HttpStartOOMOrchestrator \ No newline at end of file diff --git a/.github/workflows/smoketest-java8-v4.yml b/.github/workflows/smoketest-java8-v4.yml index e118632bb..127979099 100644 --- a/.github/workflows/smoketest-java8-v4.yml +++ b/.github/workflows/smoketest-java8-v4.yml @@ -15,7 +15,7 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: macos-latest steps: - uses: actions/checkout@v2 - name: Set up JDK 8 @@ -33,5 +33,5 @@ jobs: run: | wget -P test/SmokeTests/OOProcSmokeTests/durableJava/build/azure-functions/durableJava/lib/ "https://repo.maven.apache.org/maven2/com/microsoft/azure/functions/azure-functions-java-library/2.0.1/azure-functions-java-library-2.0.1.jar" --show-progress - name: Run V4 Java 8 Smoke Test + shell: bash run: test/SmokeTests/e2e-test.ps1 -DockerfilePath test/SmokeTests/OOProcSmokeTests/durableJava/Dockerfile -HttpStartPath api/StartOrchestration - shell: pwsh diff --git a/.github/workflows/smoketest-mssql-inproc-v4.yml b/.github/workflows/smoketest-mssql-inproc-v4.yml index e2b25adab..66eb77965 100644 --- a/.github/workflows/smoketest-mssql-inproc-v4.yml +++ b/.github/workflows/smoketest-mssql-inproc-v4.yml @@ -18,7 +18,7 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: macos-latest env: SA_PASSWORD: ${{ secrets.SA_PASSWORD }} diff --git a/.github/workflows/smoketest-netherite-inproc-v4.yml b/.github/workflows/smoketest-netherite-inproc-v4.yml index c8d62c1d7..f7d51d7ab 100644 --- a/.github/workflows/smoketest-netherite-inproc-v4.yml +++ b/.github/workflows/smoketest-netherite-inproc-v4.yml @@ -15,7 +15,7 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: macos-latest steps: - uses: actions/checkout@v2 - name: Run V4 .NET in-proc w/ Netherite Smoke Test diff --git a/.github/workflows/smoketest-node20-v4.yml b/.github/workflows/smoketest-node20-v4.yml index ec088fab6..f5fd91824 100644 --- a/.github/workflows/smoketest-node20-v4.yml +++ b/.github/workflows/smoketest-node20-v4.yml @@ -15,7 +15,7 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: macos-latest steps: - uses: actions/checkout@v2 - name: Run V4 Node 20 Smoke Test diff --git a/.github/workflows/smoketest-python37-v4.yml b/.github/workflows/smoketest-python37-v4.yml index 70b7b5378..1983c4ba5 100644 --- a/.github/workflows/smoketest-python37-v4.yml +++ b/.github/workflows/smoketest-python37-v4.yml @@ -15,7 +15,7 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: macos-latest steps: - uses: actions/checkout@v2 - name: Run V4 Python 3.7 Smoke Test diff --git a/.github/workflows/validate-build-analyzer.yml b/.github/workflows/validate-build-analyzer.yml index 22afaf09d..3f410cfb6 100644 --- a/.github/workflows/validate-build-analyzer.yml +++ b/.github/workflows/validate-build-analyzer.yml @@ -18,7 +18,7 @@ env: jobs: build: - runs-on: windows-latest + runs-on: macos-latest steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/validate-build-e2e.yml b/.github/workflows/validate-build-e2e.yml index 935a33072..d7e684a09 100644 --- a/.github/workflows/validate-build-e2e.yml +++ b/.github/workflows/validate-build-e2e.yml @@ -18,7 +18,7 @@ env: jobs: build: - runs-on: windows-latest + runs-on: macos-latest steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/validate-build.yml b/.github/workflows/validate-build.yml index de6cf0360..f60d8500f 100644 --- a/.github/workflows/validate-build.yml +++ b/.github/workflows/validate-build.yml @@ -18,7 +18,7 @@ env: jobs: build: - runs-on: windows-latest + runs-on: macos-latest steps: - uses: actions/checkout@v3 From 1d627ad1926493d618c94cd0c3b45a0af0566e97 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 2 Jun 2025 12:05:42 -0700 Subject: [PATCH 3/6] delete old grpclistneer --- .../LocalGrpcListener.cs | 644 ------------------ 1 file changed, 644 deletions(-) delete mode 100644 src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs deleted file mode 100644 index ee4e14578..000000000 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ /dev/null @@ -1,644 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See LICENSE in the project root for license information. -#nullable enable -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Net; -using System.Net.Sockets; -using System.Threading; -using System.Threading.Tasks; -using DurableTask.Core; -using DurableTask.Core.Entities; -using DurableTask.Core.Exceptions; -using DurableTask.Core.History; -using DurableTask.Core.Query; -using DurableTask.Core.Serializing.Internal; -using Google.Protobuf.WellKnownTypes; -using Grpc.Core; -using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; -using Microsoft.Extensions.Hosting; -using DTCore = DurableTask.Core; -using P = Microsoft.DurableTask.Protobuf; - -namespace Microsoft.Azure.WebJobs.Extensions.DurableTask -{ - internal class LocalGrpcListener : IHostedService - { - private const int DefaultPort = 4001; - - // Pick a large, fixed range of ports that are going to be valid in all environment. - // Avoiding ports below 1024 as those are blocked by app service sandbox. - // Ephemeral ports for most OS start well above 32768. See https://www.ncftp.com/ncftpd/doc/misc/ephemeral_ports.html - private const int MinPort = 30000; - private const int MaxPort = 31000; - - private readonly DurableTaskExtension extension; - - private readonly Random portGenerator; - private readonly HashSet attemptedPorts; - - private Server? grpcServer; - - public LocalGrpcListener(DurableTaskExtension extension) - { - this.extension = extension ?? throw new ArgumentNullException(nameof(extension)); - - this.portGenerator = new Random(); - this.attemptedPorts = new HashSet(); - } - - public string? ListenAddress { get; private set; } - - public async Task StartAsync(CancellationToken cancelToken = default) - { - const int maxAttempts = 10; - int numAttempts = 1; - while (numAttempts <= maxAttempts) - { - ChannelOption[] options = new[] - { - new ChannelOption(ChannelOptions.MaxReceiveMessageLength, int.MaxValue), - new ChannelOption(ChannelOptions.MaxSendMessageLength, int.MaxValue), - }; - - if (this.grpcServer is not null) - { - try - { - await this.grpcServer.ShutdownAsync(); - } - catch (IOException) - { - // Do nothing, IOException is a known exception type when trying to shutdown a server - // when its port was already in use - } - catch (Exception ex) - { - this.extension.TraceHelper.ExtensionWarningEvent( - this.extension.Options.HubName, - functionName: string.Empty, - instanceId: string.Empty, - message: $"Unexpected error when closing gRPC server. Exception details: {ex}"); - } - } - - this.grpcServer = new Server(options); - this.grpcServer.Services.Add(P.TaskHubSidecarService.BindService(new TaskHubGrpcServer(this))); - - // Attempt to get an unused port. Note that while unlikely, it is possible that the port returned by this method - // may be utilized by another process between this call and the gRPC server create below, hence we still need to - // guard against port conflicts. - int listeningPort = this.GetAvailablePort(); - int portBindingResult = this.grpcServer.Ports.Add("localhost", listeningPort, ServerCredentials.Insecure); - if (portBindingResult != 0) - { - try - { - this.grpcServer.Start(); - this.ListenAddress = $"http://localhost:{listeningPort}"; - - this.extension.TraceHelper.ExtensionInformationalEvent( - this.extension.Options.HubName, - instanceId: string.Empty, - functionName: string.Empty, - message: $"Opened local gRPC endpoint: {this.ListenAddress}", - writeToUserLogs: true); - - return; - } - catch (IOException) - { - portBindingResult = 0; - } - } - - if (portBindingResult == 0) - { - this.extension.TraceHelper.ExtensionWarningEvent( - this.extension.Options.HubName, - functionName: string.Empty, - instanceId: string.Empty, - message: $"Failed to open local port {listeningPort}. This was attempt #{numAttempts} to open a local port."); - this.attemptedPorts.Add(listeningPort); - numAttempts++; - } - } - - throw new IOException($"Unable to find a port to open an RPC endpoint on after {maxAttempts} attempts"); - } - - public async Task StopAsync(CancellationToken cancelToken = default) - { - if (this.grpcServer != null) - { - await this.grpcServer.ShutdownAsync(); - } - } - - private int GetAvailablePort() - { - // Get an available port for use in the gRPC server. Try 4001 first, then select a random open port - // in the 30000-31000 range. - if (this.IsTcpPortFree(DefaultPort)) - { - return DefaultPort; - } - - int numAttempts = 50; - - int randomPort; - for (int i = 0; i < numAttempts; i++) - { - randomPort = this.portGenerator.Next(MinPort, MaxPort); - if (this.IsTcpPortFree(randomPort)) - { - return randomPort; - } - } - - throw new InvalidOperationException($"Failed to get free port for local gRPC server after {numAttempts} attempts"); - } - - private bool IsTcpPortFree(int port) - { - var listener = new TcpListener(IPAddress.Loopback, port); - try - { - listener.Start(); - return true; - } - catch (SocketException) - { - this.extension.TraceHelper.ExtensionWarningEvent( - this.extension.Options.HubName, - functionName: string.Empty, - instanceId: string.Empty, - message: $"Starting Durable gRPC server - Port {port} is already in use."); - return false; - } - finally - { - listener.Stop(); - } - } - - private class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBase - { - private readonly DurableTaskExtension extension; - - public TaskHubGrpcServer(LocalGrpcListener listener) - { - this.extension = listener.extension; - } - - public override Task Hello(Empty request, ServerCallContext context) - { - return Task.FromResult(new Empty()); - } - - public override Task CreateTaskHub(P.CreateTaskHubRequest request, ServerCallContext context) - { - this.GetDurabilityProvider(context).CreateAsync(request.RecreateIfExists); - return Task.FromResult(new P.CreateTaskHubResponse()); - } - - public override Task DeleteTaskHub(P.DeleteTaskHubRequest request, ServerCallContext context) - { - this.GetDurabilityProvider(context).DeleteAsync(); - return Task.FromResult(new P.DeleteTaskHubResponse()); - } - - public async override Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) - { - try - { - // Create the orchestration instance - var instance = new OrchestrationInstance - { - InstanceId = request.InstanceId ?? Guid.NewGuid().ToString("N"), - ExecutionId = Guid.NewGuid().ToString(), - }; - - // Create the ExecutionStartedEvent - ExecutionStartedEvent executionStartedEvent = new ExecutionStartedEvent(-1, request.Input) - { - Name = request.Name, - Version = request.Version, - OrchestrationInstance = instance, - ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), - }; - - // Get the parent trace context from CreateInstanceRequest - string? traceParent = request.ParentTraceContext?.TraceParent; - string? traceState = request.ParentTraceContext?.TraceState; - - // Create a new activity with the parent context - ActivityContext.TryParse(traceParent, traceState, out ActivityContext parentActivityContext); - using Activity? scheduleOrchestrationActivity = StartActivityForNewOrchestration(executionStartedEvent, parentActivityContext); - - // Schedule the orchestration - await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( - new TaskMessage - { - Event = executionStartedEvent, - OrchestrationInstance = instance, - }, - this.GetStatusesNotToOverride()); - - return new P.CreateInstanceResponse - { - InstanceId = instance.InstanceId, - }; - } - catch (OrchestrationAlreadyExistsException) - { - throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists.")); - } - catch (InvalidOperationException ex) when (ex.Message.EndsWith("already exists.")) // for older versions of DTF.AS and DTFx.Netherite - { - throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists.")); - } - catch (Exception ex) - { - this.extension.TraceHelper.ExtensionWarningEvent( - this.extension.Options.HubName, - functionName: request.Name, - instanceId: request.InstanceId, - message: $"Failed to start instanceId {request.InstanceId} due to internal exception.\n Exception trace: {ex}."); - throw new RpcException(new Status(StatusCode.Internal, $"Failed to start instance with ID {request.InstanceId}.\nInner Exception message: {ex.Message}.")); - } - } - - private OrchestrationStatus[] GetStatusesNotToOverride() - { - OverridableStates overridableStates = this.extension.Options.OverridableExistingInstanceStates; - return overridableStates.ToDedupeStatuses(); - } - - internal static Activity? StartActivityForNewOrchestration(ExecutionStartedEvent startEvent, ActivityContext parentTraceContext) - { - // Create the Activity Source for the WebJobs extension - ActivitySource activitySource = new ActivitySource("WebJobs.Extensions.DurableTask"); - - // Start the new activity to represent scheduling the orchestration - Activity? newActivity = activitySource.CreateActivity( - name: Schema.SpanNames.CreateOrchestration(startEvent.Name, startEvent.Version), - kind: ActivityKind.Producer, - parentContext: parentTraceContext); - - newActivity?.Start(); - - if (newActivity != null && !string.IsNullOrEmpty(newActivity.Id)) - { - newActivity.SetTag(Schema.Task.Type, TraceActivityConstants.Orchestration); - newActivity.SetTag(Schema.Task.Name, startEvent.Name); - newActivity.SetTag(Schema.Task.InstanceId, startEvent.OrchestrationInstance.InstanceId); - newActivity.SetTag(Schema.Task.ExecutionId, startEvent.OrchestrationInstance.ExecutionId); - - if (!string.IsNullOrEmpty(startEvent.Version)) - { - newActivity.SetTag(Schema.Task.Version, startEvent.Version); - } - - // Set the parent trace context for the ExecutionStartedEvent - startEvent.ParentTraceContext = new DTCore.Tracing.DistributedTraceContext(newActivity?.Id!, newActivity?.TraceStateString); - } - - return newActivity; - } - - public async override Task RaiseEvent(P.RaiseEventRequest request, ServerCallContext context) - { - bool throwStatusExceptionsOnRaiseEvent = this.extension.Options.ThrowStatusExceptionsOnRaiseEvent ?? this.extension.DefaultDurabilityProvider.CheckStatusBeforeRaiseEvent; - - try - { - await this.GetClient(context).RaiseEventAsync(request.InstanceId, request.Name, Raw(request.Input)); - } - catch (ArgumentNullException ex) - { - // Indicates a required argument (e.g., instanceId, eventName, or input) is null or empty. - throw new RpcException(new Status(StatusCode.InvalidArgument, ex.Message)); - } - catch (ArgumentException ex) - { - // Indicates the provided instanceId has no orchestration status. - if (throwStatusExceptionsOnRaiseEvent) - { - throw new RpcException(new Status(StatusCode.NotFound, ex.Message)); - } - } - catch (InvalidOperationException) - { - // Indicates the orchestration instance exists but is already in a completed state. - if (throwStatusExceptionsOnRaiseEvent) - { - throw new RpcException(new Status(StatusCode.FailedPrecondition, "The orchestration instance with the provided instance id is not running.")); - } - } - catch (Exception ex) - { - // Any other unexpected exceptions. - throw new RpcException(new Status(StatusCode.Unknown, ex.Message)); - } - - return new P.RaiseEventResponse(); - } - - public async override Task SignalEntity(P.SignalEntityRequest request, ServerCallContext context) - { - this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); - - EntityMessageEvent eventToSend = ClientEntityHelpers.EmitOperationSignal( - new OrchestrationInstance() { InstanceId = request.InstanceId }, - Guid.Parse(request.RequestId), - request.Name, - request.Input, - EntityMessageEvent.GetCappedScheduledTime( - DateTime.UtcNow, - entityOrchestrationService.EntityBackendProperties!.MaximumSignalDelayTime, - request.ScheduledTime?.ToDateTime())); - - await durabilityProvider.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); - - // No fields in the response - return new P.SignalEntityResponse(); - } - - public async override Task GetEntity(P.GetEntityRequest request, ServerCallContext context) - { - this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); - - EntityBackendQueries.EntityMetadata? metaData = await entityOrchestrationService.EntityBackendQueries!.GetEntityAsync( - DTCore.Entities.EntityId.FromString(request.InstanceId), - request.IncludeState, - includeStateless: false, - context.CancellationToken); - - return new P.GetEntityResponse() - { - Exists = metaData.HasValue, - Entity = metaData.HasValue ? this.ConvertEntityMetadata(metaData.Value) : default, - }; - } - - public async override Task QueryEntities(P.QueryEntitiesRequest request, ServerCallContext context) - { - this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); - - P.EntityQuery query = request.Query; - EntityBackendQueries.EntityQueryResult result = await entityOrchestrationService.EntityBackendQueries!.QueryEntitiesAsync( - new EntityBackendQueries.EntityQuery() - { - InstanceIdStartsWith = query.InstanceIdStartsWith, - LastModifiedFrom = query.LastModifiedFrom?.ToDateTime(), - LastModifiedTo = query.LastModifiedTo?.ToDateTime(), - IncludeTransient = query.IncludeTransient, - IncludeState = query.IncludeState, - ContinuationToken = query.ContinuationToken, - PageSize = query.PageSize, - }, - context.CancellationToken); - - var response = new P.QueryEntitiesResponse() - { - ContinuationToken = result.ContinuationToken, - }; - - foreach (EntityBackendQueries.EntityMetadata entityMetadata in result.Results) - { - response.Entities.Add(this.ConvertEntityMetadata(entityMetadata)); - } - - return response; - } - - public async override Task CleanEntityStorage(P.CleanEntityStorageRequest request, ServerCallContext context) - { - this.CheckEntitySupport(context, out var durabilityProvider, out var entityOrchestrationService); - - EntityBackendQueries.CleanEntityStorageResult result = await entityOrchestrationService.EntityBackendQueries!.CleanEntityStorageAsync( - new EntityBackendQueries.CleanEntityStorageRequest() - { - RemoveEmptyEntities = request.RemoveEmptyEntities, - ReleaseOrphanedLocks = request.ReleaseOrphanedLocks, - ContinuationToken = request.ContinuationToken, - }, - context.CancellationToken); - - return new P.CleanEntityStorageResponse() - { - EmptyEntitiesRemoved = result.EmptyEntitiesRemoved, - OrphanedLocksReleased = result.OrphanedLocksReleased, - ContinuationToken = result.ContinuationToken, - }; - } - - public async override Task TerminateInstance(P.TerminateRequest request, ServerCallContext context) - { - await this.GetClient(context).TerminateAsync(request.InstanceId, request.Output); - return new P.TerminateResponse(); - } - - public async override Task SuspendInstance(P.SuspendRequest request, ServerCallContext context) - { - await this.GetClient(context).SuspendAsync(request.InstanceId, request.Reason); - return new P.SuspendResponse(); - } - - public async override Task ResumeInstance(P.ResumeRequest request, ServerCallContext context) - { - await this.GetClient(context).ResumeAsync(request.InstanceId, request.Reason); - return new P.ResumeResponse(); - } - - public async override Task RewindInstance(P.RewindInstanceRequest request, ServerCallContext context) - { -#pragma warning disable CS0618 // Type or member is obsolete - await this.GetClient(context).RewindAsync(request.InstanceId, request.Reason); -#pragma warning restore CS0618 // Type or member is obsolete - return new P.RewindInstanceResponse(); - } - - public async override Task GetInstance(P.GetInstanceRequest request, ServerCallContext context) - { - OrchestrationState state = await this.GetDurabilityProvider(context) - .GetOrchestrationStateAsync(request.InstanceId, executionId: null); - if (state == null) - { - return new P.GetInstanceResponse() { Exists = false }; - } - - return CreateGetInstanceResponse(state, request); - } - - public async override Task QueryInstances(P.QueryInstancesRequest request, ServerCallContext context) - { - var query = ProtobufUtils.ToOrchestrationQuery(request); - var queryClient = (IOrchestrationServiceQueryClient)this.GetDurabilityProvider(context); - OrchestrationQueryResult result = await queryClient.GetOrchestrationWithQueryAsync(query, context.CancellationToken); - return ProtobufUtils.CreateQueryInstancesResponse(result, request); - } - - public async override Task PurgeInstances(P.PurgeInstancesRequest request, ServerCallContext context) - { - var purgeClient = (IOrchestrationServicePurgeClient)this.GetDurabilityProvider(context); - - PurgeResult result; - try - { - switch (request.RequestCase) - { - case P.PurgeInstancesRequest.RequestOneofCase.InstanceId: - result = await purgeClient.PurgeInstanceStateAsync(request.InstanceId); - break; - - case P.PurgeInstancesRequest.RequestOneofCase.PurgeInstanceFilter: - var purgeInstanceFilter = ProtobufUtils.ToPurgeInstanceFilter(request); - result = await purgeClient.PurgeInstanceStateAsync(purgeInstanceFilter); - break; - - default: - throw new RpcException(new Status(StatusCode.InvalidArgument, $"Unknown purge request type '{request.RequestCase}'.")); - } - - return ProtobufUtils.CreatePurgeInstancesResponse(result); - } - catch (RpcException) - { - // Rethrow RPC-related exceptions as-is. - throw; - } - catch (Exception ex) - { - // Wrap all other exceptions in an RpcException. - throw new RpcException(new Status(StatusCode.Internal, $"Failed during purging instances: {ex.Message}")); - } - } - - public async override Task WaitForInstanceStart(P.GetInstanceRequest request, ServerCallContext context) - { - int retryCount = 0; - while (true) - { - // Keep fetching the status until we get one of the states we care about - OrchestrationState state = await this.GetDurabilityProvider(context) - .GetOrchestrationStateAsync(request.InstanceId, executionId: null); - if (state != null && state.OrchestrationStatus != OrchestrationStatus.Pending) - { - return CreateGetInstanceResponse(state, request); - } - - // Increase the delay time by 1 second every 10 seconds up to 10 seconds. - // The cancellation token is what will break us out of this loop if the orchestration - // never leaves the "Pending" state. - var delay = TimeSpan.FromSeconds(Math.Min(10, (retryCount / 10) + 1)); - await Task.Delay(delay, context.CancellationToken); - retryCount++; - } - } - - public async override Task WaitForInstanceCompletion(P.GetInstanceRequest request, ServerCallContext context) - { - OrchestrationState state = await this.GetDurabilityProvider(context).WaitForOrchestrationAsync( - request.InstanceId, - executionId: null, - timeout: Timeout.InfiniteTimeSpan, - context.CancellationToken); - - if (state == null) - { - return new P.GetInstanceResponse() { Exists = false }; - } - - return CreateGetInstanceResponse(state, request); - } - -#pragma warning disable CS0618 // Type or member is obsolete -- 'internal' usage. - private static RawInput Raw(string input) - { - return new RawInput(input); - } -#pragma warning restore CS0618 // Type or member is obsolete - - private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, P.GetInstanceRequest request) - { - return new P.GetInstanceResponse - { - Exists = true, - OrchestrationState = new P.OrchestrationState - { - InstanceId = state.OrchestrationInstance.InstanceId, - Name = state.Name, - OrchestrationStatus = (P.OrchestrationStatus)state.OrchestrationStatus, - CreatedTimestamp = Timestamp.FromDateTime(state.CreatedTime), - LastUpdatedTimestamp = Timestamp.FromDateTime(state.LastUpdatedTime), - Input = request.GetInputsAndOutputs ? state.Input : null, - Output = request.GetInputsAndOutputs ? state.Output : null, - CustomStatus = request.GetInputsAndOutputs ? state.Status : null, - FailureDetails = request.GetInputsAndOutputs ? GetFailureDetails(state.FailureDetails) : null, - }, - }; - } - - private static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) - { - if (failureDetails == null) - { - return null; - } - - return new P.TaskFailureDetails - { - ErrorType = failureDetails.ErrorType, - ErrorMessage = failureDetails.ErrorMessage, - StackTrace = failureDetails.StackTrace, - }; - } - - private DurableClientAttribute GetAttribute(ServerCallContext context) - { - string? taskHub = context.RequestHeaders.GetValue("Durable-TaskHub"); - string? connectionName = context.RequestHeaders.GetValue("Durable-ConnectionName"); - return new DurableClientAttribute() { TaskHub = taskHub, ConnectionName = connectionName }; - } - - private DurabilityProvider GetDurabilityProvider(ServerCallContext context) - { - return this.extension.GetDurabilityProvider(this.GetAttribute(context)); - } - - private IDurableClient GetClient(ServerCallContext context) - { - return this.extension.GetClient(this.GetAttribute(context)); - } - - private void CheckEntitySupport(ServerCallContext context, out DurabilityProvider durabilityProvider, out IEntityOrchestrationService entityOrchestrationService) - { - durabilityProvider = this.GetDurabilityProvider(context); - entityOrchestrationService = durabilityProvider; - if (entityOrchestrationService?.EntityBackendProperties == null) - { - throw new RpcException(new Grpc.Core.Status( - Grpc.Core.StatusCode.Unimplemented, - $"Missing entity support for storage backend '{durabilityProvider.GetBackendInfo()}'. Entity support" + - $" may have not been implemented yet, or the selected package version is too old.")); - } - } - - private P.EntityMetadata ConvertEntityMetadata(EntityBackendQueries.EntityMetadata metaData) - { - return new P.EntityMetadata() - { - InstanceId = metaData.EntityId.ToString(), - LastModifiedTime = metaData.LastModifiedTime.ToTimestamp(), - BacklogQueueSize = metaData.BacklogQueueSize, - LockedBy = metaData.LockedBy, - SerializedState = metaData.SerializedState, - }; - } - } - } -} \ No newline at end of file From c210beaf6812e245fe031f44f21ea1521a5aa1ff Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 2 Jun 2025 14:30:08 -0700 Subject: [PATCH 4/6] update yml files to install docker --- .github/workflows/smoketest-dotnet-isolated-v4.yml | 10 ---------- .github/workflows/smoketest-java8-v4.yml | 12 ++++++++++++ .github/workflows/smoketest-mssql-inproc-v4.yml | 11 +++++++++++ .github/workflows/smoketest-netherite-inproc-v4.yml | 12 ++++++++++++ .github/workflows/smoketest-node20-v4.yml | 12 ++++++++++++ .github/workflows/smoketest-python37-v4.yml | 12 ++++++++++++ .github/workflows/validate-build-analyzer.yml | 9 ++------- .github/workflows/validate-build-e2e.yml | 9 ++------- .github/workflows/validate-build.yml | 9 ++------- 9 files changed, 65 insertions(+), 31 deletions(-) diff --git a/.github/workflows/smoketest-dotnet-isolated-v4.yml b/.github/workflows/smoketest-dotnet-isolated-v4.yml index de46f9455..6ccbba237 100644 --- a/.github/workflows/smoketest-dotnet-isolated-v4.yml +++ b/.github/workflows/smoketest-dotnet-isolated-v4.yml @@ -20,16 +20,6 @@ jobs: - uses: actions/checkout@v2 # Install .NET versions - - name: Set up .NET Core 3.1 - uses: actions/setup-dotnet@v3 - with: - dotnet-version: '3.1.x' - - - name: Set up .NET Core 2.1 - uses: actions/setup-dotnet@v3 - with: - dotnet-version: '2.1.x' - - name: Set up .NET Core 6.x uses: actions/setup-dotnet@v3 with: diff --git a/.github/workflows/smoketest-java8-v4.yml b/.github/workflows/smoketest-java8-v4.yml index 127979099..8232e62c0 100644 --- a/.github/workflows/smoketest-java8-v4.yml +++ b/.github/workflows/smoketest-java8-v4.yml @@ -18,6 +18,18 @@ jobs: runs-on: macos-latest steps: - uses: actions/checkout@v2 + + - name: Install Docker + run: | + brew install --cask docker + sudo xattr -d com.apple.quarantine /Applications/Docker.app + open -a Docker + # Wait for Docker to start + while ! docker info > /dev/null 2>&1; do + echo "Waiting for Docker to start..." + sleep 5 + done + - name: Set up JDK 8 uses: actions/setup-java@v2 with: diff --git a/.github/workflows/smoketest-mssql-inproc-v4.yml b/.github/workflows/smoketest-mssql-inproc-v4.yml index 66eb77965..884ad107e 100644 --- a/.github/workflows/smoketest-mssql-inproc-v4.yml +++ b/.github/workflows/smoketest-mssql-inproc-v4.yml @@ -25,6 +25,17 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Install Docker + run: | + brew install --cask docker + sudo xattr -d com.apple.quarantine /Applications/Docker.app + open -a Docker + # Wait for Docker to start + while ! docker info > /dev/null 2>&1; do + echo "Waiting for Docker to start..." + sleep 5 + done + - name: Run V4 .NET in-proc w/ MSSQL Smoke Test run: test/SmokeTests/e2e-test.ps1 -DockerfilePath test/SmokeTests/BackendSmokeTests/MSSQL/Dockerfile -HttpStartPath api/DurableFunctionsHttpStart -ContainerName MSSQLApp -SetupSQLServer shell: pwsh diff --git a/.github/workflows/smoketest-netherite-inproc-v4.yml b/.github/workflows/smoketest-netherite-inproc-v4.yml index f7d51d7ab..7a30204db 100644 --- a/.github/workflows/smoketest-netherite-inproc-v4.yml +++ b/.github/workflows/smoketest-netherite-inproc-v4.yml @@ -18,6 +18,18 @@ jobs: runs-on: macos-latest steps: - uses: actions/checkout@v2 + + - name: Install Docker + run: | + brew install --cask docker + sudo xattr -d com.apple.quarantine /Applications/Docker.app + open -a Docker + # Wait for Docker to start + while ! docker info > /dev/null 2>&1; do + echo "Waiting for Docker to start..." + sleep 5 + done + - name: Run V4 .NET in-proc w/ Netherite Smoke Test run: test/SmokeTests/e2e-test.ps1 -DockerfilePath test/SmokeTests/BackendSmokeTests/Netherite/Dockerfile -HttpStartPath api/DurableFunctionsHttpStart -ContainerName NetheriteApp shell: pwsh diff --git a/.github/workflows/smoketest-node20-v4.yml b/.github/workflows/smoketest-node20-v4.yml index f5fd91824..d2baa56e1 100644 --- a/.github/workflows/smoketest-node20-v4.yml +++ b/.github/workflows/smoketest-node20-v4.yml @@ -18,6 +18,18 @@ jobs: runs-on: macos-latest steps: - uses: actions/checkout@v2 + + - name: Install Docker + run: | + brew install --cask docker + sudo xattr -d com.apple.quarantine /Applications/Docker.app + open -a Docker + # Wait for Docker to start + while ! docker info > /dev/null 2>&1; do + echo "Waiting for Docker to start..." + sleep 5 + done + - name: Run V4 Node 20 Smoke Test run: test/SmokeTests/e2e-test.ps1 -DockerfilePath test/SmokeTests/OOProcSmokeTests/durableJS/Dockerfile -HttpStartPath api/DurableFunctionsHttpStart shell: pwsh diff --git a/.github/workflows/smoketest-python37-v4.yml b/.github/workflows/smoketest-python37-v4.yml index 1983c4ba5..6a303d68c 100644 --- a/.github/workflows/smoketest-python37-v4.yml +++ b/.github/workflows/smoketest-python37-v4.yml @@ -18,6 +18,18 @@ jobs: runs-on: macos-latest steps: - uses: actions/checkout@v2 + + - name: Install Docker + run: | + brew install --cask docker + sudo xattr -d com.apple.quarantine /Applications/Docker.app + open -a Docker + # Wait for Docker to start + while ! docker info > /dev/null 2>&1; do + echo "Waiting for Docker to start..." + sleep 5 + done + - name: Run V4 Python 3.7 Smoke Test run: test/SmokeTests/e2e-test.ps1 -DockerfilePath test/SmokeTests/OOProcSmokeTests/durablePy/Dockerfile -HttpStartPath api/DurableFunctionsHttpStart -ContainerName pyApp shell: pwsh diff --git a/.github/workflows/validate-build-analyzer.yml b/.github/workflows/validate-build-analyzer.yml index 3f410cfb6..0996f6d7b 100644 --- a/.github/workflows/validate-build-analyzer.yml +++ b/.github/workflows/validate-build-analyzer.yml @@ -28,15 +28,10 @@ jobs: - name: Setup .NET uses: actions/setup-dotnet@v3 - - name: Set up .NET Core 3.1 + - name: Set up .NET 6 uses: actions/setup-dotnet@v3 with: - dotnet-version: '3.1.x' - - - name: Set up .NET Core 2.1 - uses: actions/setup-dotnet@v3 - with: - dotnet-version: '2.1.x' + dotnet-version: '6.0.x' - name: Restore dependencies run: dotnet restore $solution diff --git a/.github/workflows/validate-build-e2e.yml b/.github/workflows/validate-build-e2e.yml index d7e684a09..e222c24aa 100644 --- a/.github/workflows/validate-build-e2e.yml +++ b/.github/workflows/validate-build-e2e.yml @@ -28,15 +28,10 @@ jobs: - name: Setup .NET uses: actions/setup-dotnet@v3 - - name: Set up .NET Core 3.1 + - name: Set up .NET 6 uses: actions/setup-dotnet@v3 with: - dotnet-version: '3.1.x' - - - name: Set up .NET Core 2.1 - uses: actions/setup-dotnet@v3 - with: - dotnet-version: '2.1.x' + dotnet-version: '6.0.x' - name: Restore dependencies run: dotnet restore $solution diff --git a/.github/workflows/validate-build.yml b/.github/workflows/validate-build.yml index f60d8500f..bbb1216fd 100644 --- a/.github/workflows/validate-build.yml +++ b/.github/workflows/validate-build.yml @@ -28,15 +28,10 @@ jobs: - name: Setup .NET uses: actions/setup-dotnet@v3 - - name: Set up .NET Core 3.1 + - name: Set up .NET 6 uses: actions/setup-dotnet@v3 with: - dotnet-version: '3.1.x' - - - name: Set up .NET Core 2.1 - uses: actions/setup-dotnet@v3 - with: - dotnet-version: '2.1.x' + dotnet-version: '6.0.x' - name: Restore dependencies run: dotnet restore $solution From d10da6e5051111db9b745ea5829fef78389ccbb8 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 2 Jun 2025 15:10:07 -0700 Subject: [PATCH 5/6] update yml --- .github/workflows/E2ETest.yml | 4 ++-- .github/workflows/smoketest-java8-v4.yml | 4 +++- .github/workflows/smoketest-mssql-inproc-v4.yml | 4 +++- .github/workflows/smoketest-netherite-inproc-v4.yml | 4 +++- .github/workflows/smoketest-node20-v4.yml | 4 +++- .github/workflows/smoketest-python37-v4.yml | 4 +++- 6 files changed, 17 insertions(+), 7 deletions(-) diff --git a/.github/workflows/E2ETest.yml b/.github/workflows/E2ETest.yml index 19d24172c..63f787b33 100644 --- a/.github/workflows/E2ETest.yml +++ b/.github/workflows/E2ETest.yml @@ -85,7 +85,7 @@ jobs: run: dotnet test --filter AzureStorage!=Skip e2e-mssql: - runs-on: ubuntu-latest + runs-on: macos-latest env: E2E_TEST_DURABLE_BACKEND: "MSSQL" steps: @@ -119,7 +119,7 @@ jobs: run: dotnet test --filter MSSQL!=Skip e2e-dts: - runs-on: ubuntu-latest + runs-on: macos-latest env: E2E_TEST_DURABLE_BACKEND: "azureManaged" steps: diff --git a/.github/workflows/smoketest-java8-v4.yml b/.github/workflows/smoketest-java8-v4.yml index 8232e62c0..d2e676580 100644 --- a/.github/workflows/smoketest-java8-v4.yml +++ b/.github/workflows/smoketest-java8-v4.yml @@ -22,7 +22,9 @@ jobs: - name: Install Docker run: | brew install --cask docker - sudo xattr -d com.apple.quarantine /Applications/Docker.app + if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then + sudo xattr -d com.apple.quarantine /Applications/Docker.app + fi open -a Docker # Wait for Docker to start while ! docker info > /dev/null 2>&1; do diff --git a/.github/workflows/smoketest-mssql-inproc-v4.yml b/.github/workflows/smoketest-mssql-inproc-v4.yml index 884ad107e..7ca35e6c5 100644 --- a/.github/workflows/smoketest-mssql-inproc-v4.yml +++ b/.github/workflows/smoketest-mssql-inproc-v4.yml @@ -28,7 +28,9 @@ jobs: - name: Install Docker run: | brew install --cask docker - sudo xattr -d com.apple.quarantine /Applications/Docker.app + if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then + sudo xattr -d com.apple.quarantine /Applications/Docker.app + fi open -a Docker # Wait for Docker to start while ! docker info > /dev/null 2>&1; do diff --git a/.github/workflows/smoketest-netherite-inproc-v4.yml b/.github/workflows/smoketest-netherite-inproc-v4.yml index 7a30204db..32c49d502 100644 --- a/.github/workflows/smoketest-netherite-inproc-v4.yml +++ b/.github/workflows/smoketest-netherite-inproc-v4.yml @@ -22,7 +22,9 @@ jobs: - name: Install Docker run: | brew install --cask docker - sudo xattr -d com.apple.quarantine /Applications/Docker.app + if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then + sudo xattr -d com.apple.quarantine /Applications/Docker.app + fi open -a Docker # Wait for Docker to start while ! docker info > /dev/null 2>&1; do diff --git a/.github/workflows/smoketest-node20-v4.yml b/.github/workflows/smoketest-node20-v4.yml index d2baa56e1..403f04e5e 100644 --- a/.github/workflows/smoketest-node20-v4.yml +++ b/.github/workflows/smoketest-node20-v4.yml @@ -22,7 +22,9 @@ jobs: - name: Install Docker run: | brew install --cask docker - sudo xattr -d com.apple.quarantine /Applications/Docker.app + if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then + sudo xattr -d com.apple.quarantine /Applications/Docker.app + fi open -a Docker # Wait for Docker to start while ! docker info > /dev/null 2>&1; do diff --git a/.github/workflows/smoketest-python37-v4.yml b/.github/workflows/smoketest-python37-v4.yml index 6a303d68c..b7d638165 100644 --- a/.github/workflows/smoketest-python37-v4.yml +++ b/.github/workflows/smoketest-python37-v4.yml @@ -22,7 +22,9 @@ jobs: - name: Install Docker run: | brew install --cask docker - sudo xattr -d com.apple.quarantine /Applications/Docker.app + if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then + sudo xattr -d com.apple.quarantine /Applications/Docker.app + fi open -a Docker # Wait for Docker to start while ! docker info > /dev/null 2>&1; do From fe954606e2e6f722853e492c40ef4ac48210bc68 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 2 Jun 2025 15:38:10 -0700 Subject: [PATCH 6/6] update yml --- .github/workflows/E2ETest.yml | 36 +++++++++++++++++++ .github/workflows/smoketest-java8-v4.yml | 12 +++++-- .../workflows/smoketest-mssql-inproc-v4.yml | 11 ++++-- .../smoketest-netherite-inproc-v4.yml | 11 ++++-- .github/workflows/smoketest-node20-v4.yml | 11 ++++-- .github/workflows/smoketest-python37-v4.yml | 11 ++++-- 6 files changed, 77 insertions(+), 15 deletions(-) diff --git a/.github/workflows/E2ETest.yml b/.github/workflows/E2ETest.yml index 63f787b33..869eaad88 100644 --- a/.github/workflows/E2ETest.yml +++ b/.github/workflows/E2ETest.yml @@ -91,6 +91,24 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Install Docker + run: | + brew install --cask docker + if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then + sudo xattr -d com.apple.quarantine /Applications/Docker.app + fi + until [ -d "/Applications/Docker.app" ]; do + echo "Waiting for Docker.app to be installed..." + sleep 3 + done + + open -a Docker || echo "Warning: Could not open Docker app immediately, continuing..." + + while ! docker info > /dev/null 2>&1; do + echo "Waiting for Docker daemon to start..." + sleep 5 + done + - name: Setup .NET Core uses: actions/setup-dotnet@v3 with: @@ -125,6 +143,24 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Install Docker + run: | + brew install --cask docker + if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then + sudo xattr -d com.apple.quarantine /Applications/Docker.app + fi + until [ -d "/Applications/Docker.app" ]; do + echo "Waiting for Docker.app to be installed..." + sleep 3 + done + + open -a Docker || echo "Warning: Could not open Docker app immediately, continuing..." + + while ! docker info > /dev/null 2>&1; do + echo "Waiting for Docker daemon to start..." + sleep 5 + done + - name: Setup .NET Core uses: actions/setup-dotnet@v3 with: diff --git a/.github/workflows/smoketest-java8-v4.yml b/.github/workflows/smoketest-java8-v4.yml index d2e676580..7b0ccb86d 100644 --- a/.github/workflows/smoketest-java8-v4.yml +++ b/.github/workflows/smoketest-java8-v4.yml @@ -25,10 +25,16 @@ jobs: if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then sudo xattr -d com.apple.quarantine /Applications/Docker.app fi - open -a Docker - # Wait for Docker to start + + until [ -d "/Applications/Docker.app" ]; do + echo "Waiting for Docker.app to be installed..." + sleep 3 + done + + open -a Docker || echo "Warning: Could not open Docker app immediately, continuing..." + while ! docker info > /dev/null 2>&1; do - echo "Waiting for Docker to start..." + echo "Waiting for Docker daemon to start..." sleep 5 done diff --git a/.github/workflows/smoketest-mssql-inproc-v4.yml b/.github/workflows/smoketest-mssql-inproc-v4.yml index 7ca35e6c5..5d67e57e2 100644 --- a/.github/workflows/smoketest-mssql-inproc-v4.yml +++ b/.github/workflows/smoketest-mssql-inproc-v4.yml @@ -31,10 +31,15 @@ jobs: if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then sudo xattr -d com.apple.quarantine /Applications/Docker.app fi - open -a Docker - # Wait for Docker to start + until [ -d "/Applications/Docker.app" ]; do + echo "Waiting for Docker.app to be installed..." + sleep 3 + done + + open -a Docker || echo "Warning: Could not open Docker app immediately, continuing..." + while ! docker info > /dev/null 2>&1; do - echo "Waiting for Docker to start..." + echo "Waiting for Docker daemon to start..." sleep 5 done diff --git a/.github/workflows/smoketest-netherite-inproc-v4.yml b/.github/workflows/smoketest-netherite-inproc-v4.yml index 32c49d502..841f07cc6 100644 --- a/.github/workflows/smoketest-netherite-inproc-v4.yml +++ b/.github/workflows/smoketest-netherite-inproc-v4.yml @@ -25,10 +25,15 @@ jobs: if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then sudo xattr -d com.apple.quarantine /Applications/Docker.app fi - open -a Docker - # Wait for Docker to start + until [ -d "/Applications/Docker.app" ]; do + echo "Waiting for Docker.app to be installed..." + sleep 3 + done + + open -a Docker || echo "Warning: Could not open Docker app immediately, continuing..." + while ! docker info > /dev/null 2>&1; do - echo "Waiting for Docker to start..." + echo "Waiting for Docker daemon to start..." sleep 5 done diff --git a/.github/workflows/smoketest-node20-v4.yml b/.github/workflows/smoketest-node20-v4.yml index 403f04e5e..4ed88132e 100644 --- a/.github/workflows/smoketest-node20-v4.yml +++ b/.github/workflows/smoketest-node20-v4.yml @@ -25,10 +25,15 @@ jobs: if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then sudo xattr -d com.apple.quarantine /Applications/Docker.app fi - open -a Docker - # Wait for Docker to start + until [ -d "/Applications/Docker.app" ]; do + echo "Waiting for Docker.app to be installed..." + sleep 3 + done + + open -a Docker || echo "Warning: Could not open Docker app immediately, continuing..." + while ! docker info > /dev/null 2>&1; do - echo "Waiting for Docker to start..." + echo "Waiting for Docker daemon to start..." sleep 5 done diff --git a/.github/workflows/smoketest-python37-v4.yml b/.github/workflows/smoketest-python37-v4.yml index b7d638165..8d56ff6f3 100644 --- a/.github/workflows/smoketest-python37-v4.yml +++ b/.github/workflows/smoketest-python37-v4.yml @@ -25,10 +25,15 @@ jobs: if xattr /Applications/Docker.app | grep -q com.apple.quarantine; then sudo xattr -d com.apple.quarantine /Applications/Docker.app fi - open -a Docker - # Wait for Docker to start + until [ -d "/Applications/Docker.app" ]; do + echo "Waiting for Docker.app to be installed..." + sleep 3 + done + + open -a Docker || echo "Warning: Could not open Docker app immediately, continuing..." + while ! docker info > /dev/null 2>&1; do - echo "Waiting for Docker to start..." + echo "Waiting for Docker daemon to start..." sleep 5 done