Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions samples/Foundatio.HostingSample/Jobs/Sample1Job.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,42 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Jobs;
using Foundatio.Resilience;
using Microsoft.Extensions.Logging;

namespace Foundatio.HostingSample;

[Job(Description = "Sample 1 job", Interval = "5s", IterationLimit = 5)]
public class Sample1Job : IJob
{
private readonly IResiliencePolicy _policy;
private readonly ILogger _logger;
private int _iterationCount = 0;

public Sample1Job(ILoggerFactory loggerFactory)
public Sample1Job(IResiliencePolicyProvider provider, ILoggerFactory loggerFactory)
{
// get policy for Sample1Job and if not found, try to get policy for IJob, then fallback to default policy
_policy = provider.GetPolicy<Sample1Job, IJob>();
_logger = loggerFactory.CreateLogger<Sample1Job>();
}

public Task<JobResult> RunAsync(CancellationToken cancellationToken = default)
public async Task<JobResult> RunAsync(CancellationToken cancellationToken = default)
{
Interlocked.Increment(ref _iterationCount);
_logger.LogTrace("Sample1Job Run #{IterationCount} Thread={ManagedThreadId}", _iterationCount, Thread.CurrentThread.ManagedThreadId);
return await _policy.ExecuteAsync(async () =>
{
int count = Interlocked.Increment(ref _iterationCount);
_logger.LogTrace("Sample1Job Run #{IterationCount} Thread={ManagedThreadId}", _iterationCount, Thread.CurrentThread.ManagedThreadId);

return Task.FromResult(JobResult.Success);
if (count < 3)
{
_logger.LogInformation("Sample1Job Run #{IterationCount} Thread={ManagedThreadId} - Simulating failure", _iterationCount, Thread.CurrentThread.ManagedThreadId);
throw new InvalidOperationException("Simulated failure");
}

await Task.Delay(5000, cancellationToken);

return JobResult.Success;
}, cancellationToken: cancellationToken);
}
}
49 changes: 24 additions & 25 deletions samples/Foundatio.HostingSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Caching;
using Foundatio;
using Foundatio.Extensions.Hosting.Jobs;
using Foundatio.Extensions.Hosting.Startup;
using Foundatio.HostingSample;
using Foundatio.Resilience;
using Foundatio.Serializer;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
#if REDIS
using System.Text.Json;
using Microsoft.AspNetCore.Http.Json;
using Microsoft.Extensions.Configuration;
using Foundatio.Redis;
using StackExchange.Redis;
#endif

Expand All @@ -25,6 +29,15 @@

var builder = WebApplication.CreateBuilder(args);

// configure Foundatio services
builder.Services.AddFoundatio()
.Storage.UseFolder()
.Caching.UseInMemory()
.Locking.UseCache()
.Messaging.UseInMemory()
.AddSerializer(sp => new SystemTextJsonSerializer(sp.GetRequiredService<IOptions<JsonOptions>>().Value.SerializerOptions))
.AddResilience(b => b.WithPolicy<Sample1Job>(p => p.WithMaxAttempts(5).WithLinearDelay().WithJitter()));

ConfigureServices();

// shutdown the host if no jobs are running, cron jobs are not considered running jobs
Expand Down Expand Up @@ -63,7 +76,7 @@
});

if (sample1)
builder.Services.AddJob("Sample1", sp => new Sample1Job(sp.GetRequiredService<ILoggerFactory>()), o => o.ApplyDefaults<Sample1Job>().WaitForStartupActions().InitialDelay(TimeSpan.FromSeconds(4)));
builder.Services.AddJob("Sample1", sp => new Sample1Job(sp.GetService<IResiliencePolicyProvider>(), sp.GetService<ILoggerFactory>()), o => o.ApplyDefaults<Sample1Job>().WaitForStartupActions().InitialDelay(TimeSpan.FromSeconds(4)));

builder.Services.AddJob<SampleLockJob>(o => o.WaitForStartupActions());

Expand Down Expand Up @@ -196,28 +209,14 @@ void ConfigureServices()
{
var connectionString = builder.Configuration.GetConnectionString("Redis")!;
connectionString += ",abortConnect=false";
return ConnectionMultiplexer.Connect(connectionString, o => o.LoggerFactory = sp.GetRequiredService<ILoggerFactory>());
return ConnectionMultiplexer.Connect(connectionString);
// enable redis logging
//return ConnectionMultiplexer.Connect(connectionString, o => o.LoggerFactory = sp.GetRequiredService<ILoggerFactory>());
});

// distributed cache
builder.Services.AddSingleton<ITextSerializer>(sp => new SystemTextJsonSerializer(sp.GetRequiredService<IOptions<JsonOptions>>().Value.SerializerOptions, sp.GetRequiredService<IOptions<JsonOptions>>().Value.SerializerOptions));
builder.Services.AddSingleton<ISerializer>(sp => sp.GetRequiredService<ITextSerializer>());
builder.Services.AddSingleton<ICacheClient>(sp => new RedisCacheClient(c => c.ConnectionMultiplexer(sp.GetRequiredService<IConnectionMultiplexer>())));

// distributed lock provider
builder.Services.AddSingleton(s => new CacheLockProvider(s.GetRequiredService<ICacheClient>(), s.GetRequiredService<IMessageBus>(), s.GetRequiredService<ILoggerFactory>()));
builder.Services.AddSingleton<ILockProvider>(s => s.GetRequiredService<CacheLockProvider>());

// distributed message bus
builder.Services.AddSingleton<IMessageBus>(s => new RedisMessageBus(new RedisMessageBusOptions
{
Subscriber = s.GetRequiredService<IConnectionMultiplexer>().GetSubscriber(),
Serializer = s.GetRequiredService<ISerializer>(),
LoggerFactory = s.GetRequiredService<ILoggerFactory>()
}));
builder.Services.AddSingleton<IMessagePublisher>(s => s.GetRequiredService<IMessageBus>());
builder.Services.AddSingleton<IMessageSubscriber>(s => s.GetRequiredService<IMessageBus>());
#else
builder.Services.AddSingleton<ICacheClient>(sp => new InMemoryCacheClient(o => o.LoggerFactory(sp.GetService<ILoggerFactory>())));
// distributed cache and messaging using redis (replaces in memory cache)
builder.Services.AddFoundatio()
.Caching.UseRedis()
.Messaging.UseRedis();
#endif
}
55 changes: 32 additions & 23 deletions src/Foundatio.Extensions.Hosting/Startup/StartupExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,35 +94,39 @@ await Task.WhenAll(startupActionGroup.Select(async a =>
return new RunStartupActionsResult { Success = true };
}

public static void AddStartupAction<T>(this IServiceCollection services, int? priority = null) where T : IStartupAction
public static IServiceCollection AddStartupAction<T>(this IServiceCollection services, int? priority = null) where T : IStartupAction
{
services.TryAddSingleton<StartupActionsContext>();
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(RunStartupActionsService)))
services.AddSingleton<IHostedService, RunStartupActionsService>();
services.TryAddTransient(typeof(T));
services.AddTransient(s => new StartupActionRegistration(typeof(T).Name, typeof(T), priority));

return services;
}

public static void AddStartupAction<T>(this IServiceCollection services, string name, int? priority = null) where T : IStartupAction
public static IServiceCollection AddStartupAction<T>(this IServiceCollection services, string name, int? priority = null) where T : IStartupAction
{
services.TryAddSingleton<StartupActionsContext>();
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(RunStartupActionsService)))
services.AddSingleton<IHostedService, RunStartupActionsService>();
services.TryAddTransient(typeof(T));
services.AddTransient(s => new StartupActionRegistration(name, typeof(T), priority));

return services;
}

public static void AddStartupAction(this IServiceCollection services, string name, Action action, int? priority = null)
public static IServiceCollection AddStartupAction(this IServiceCollection services, string name, Action action, int? priority = null)
{
services.AddStartupAction(name, ct => action(), priority);
return services.AddStartupAction(name, ct => action(), priority);
}

public static void AddStartupAction(this IServiceCollection services, string name, Action<IServiceProvider> action, int? priority = null)
public static IServiceCollection AddStartupAction(this IServiceCollection services, string name, Action<IServiceProvider> action, int? priority = null)
{
services.AddStartupAction(name, (sp, ct) => action(sp), priority);
return services.AddStartupAction(name, (sp, ct) => action(sp), priority);
}

public static void AddStartupAction(this IServiceCollection services, string name, Action<IServiceProvider, CancellationToken> action, int? priority = null)
public static IServiceCollection AddStartupAction(this IServiceCollection services, string name, Action<IServiceProvider, CancellationToken> action, int? priority = null)
{
services.TryAddSingleton<StartupActionsContext>();
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(RunStartupActionsService)))
Expand All @@ -132,27 +136,31 @@ public static void AddStartupAction(this IServiceCollection services, string nam
action(sp, ct);
return Task.CompletedTask;
}, priority));

return services;
}

public static void AddStartupAction(this IServiceCollection services, string name, Func<Task> action, int? priority = null)
public static IServiceCollection AddStartupAction(this IServiceCollection services, string name, Func<Task> action, int? priority = null)
{
services.AddStartupAction(name, (sp, ct) => action(), priority);
return services.AddStartupAction(name, (sp, ct) => action(), priority);
}

public static void AddStartupAction(this IServiceCollection services, string name, Func<IServiceProvider, Task> action, int? priority = null)
public static IServiceCollection AddStartupAction(this IServiceCollection services, string name, Func<IServiceProvider, Task> action, int? priority = null)
{
services.AddStartupAction(name, (sp, ct) => action(sp), priority);
return services.AddStartupAction(name, (sp, ct) => action(sp), priority);
}

public static void AddStartupAction(this IServiceCollection services, string name, Func<IServiceProvider, CancellationToken, Task> action, int? priority = null)
public static IServiceCollection AddStartupAction(this IServiceCollection services, string name, Func<IServiceProvider, CancellationToken, Task> action, int? priority = null)
{
services.TryAddSingleton<StartupActionsContext>();
if (!services.Any(s => s.ServiceType == typeof(IHostedService) && s.ImplementationType == typeof(RunStartupActionsService)))
services.AddSingleton<IHostedService, RunStartupActionsService>();
services.AddTransient(s => new StartupActionRegistration(name, action, priority));

return services;
}

public const string CheckForStartupActionsName = "CheckForStartupActions";
private const string CheckForStartupActionsName = "CheckForStartupActions";
public static IHealthChecksBuilder AddCheckForStartupActions(this IHealthChecksBuilder builder, params string[] tags)
{
return builder.AddCheck<StartupActionsHealthCheck>(CheckForStartupActionsName, null, tags);
Expand All @@ -165,36 +173,35 @@ public static IApplicationBuilder UseWaitForStartupActionsBeforeServingRequests(

public static IApplicationBuilder UseHealthChecks(this IApplicationBuilder builder, string path, params string[] tags)
{
if (tags == null)
tags = Array.Empty<string>();
tags ??= [];

return builder.UseHealthChecks(path, new HealthCheckOptions { Predicate = c => c.Tags.Any(t => tags.Contains(t, StringComparer.OrdinalIgnoreCase)) });
}

public static IApplicationBuilder UseReadyHealthChecks(this IApplicationBuilder builder, params string[] tags)
{
if (tags == null)
tags = Array.Empty<string>();
tags ??= [];

var options = new HealthCheckOptions
{
Predicate = c => c.Tags.Any(t => tags.Contains(t, StringComparer.OrdinalIgnoreCase))
};

return builder.UseHealthChecks("/ready", options);
}

public static void AddStartupActionToWaitForHealthChecks(this IServiceCollection services, params string[] tags)
public static IServiceCollection AddStartupActionToWaitForHealthChecks(this IServiceCollection services, params string[] tags)
{
if (tags == null)
tags = Array.Empty<string>();
tags ??= [];

services.AddStartupActionToWaitForHealthChecks(c => c.Tags.Any(t => tags.Contains(t, StringComparer.OrdinalIgnoreCase)));

return services;
}

public static void AddStartupActionToWaitForHealthChecks(this IServiceCollection services, Func<HealthCheckRegistration, bool> shouldWaitForHealthCheck = null)
public static IServiceCollection AddStartupActionToWaitForHealthChecks(this IServiceCollection services, Func<HealthCheckRegistration, bool> shouldWaitForHealthCheck = null)
{
if (shouldWaitForHealthCheck == null)
shouldWaitForHealthCheck = c => c.Tags.Contains("Critical", StringComparer.OrdinalIgnoreCase);
shouldWaitForHealthCheck ??= c => c.Tags.Contains("Critical", StringComparer.OrdinalIgnoreCase);

services.AddStartupAction("WaitForHealthChecks", async (sp, t) =>
{
Expand All @@ -211,5 +218,7 @@ public static void AddStartupActionToWaitForHealthChecks(this IServiceCollection
result = await healthCheckService.CheckHealthAsync(c => c.Name != CheckForStartupActionsName && shouldWaitForHealthCheck(c), t).AnyContext();
}
}, -100);

return services;
}
}
2 changes: 1 addition & 1 deletion src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ await queue.EnqueueAsync(new SampleQueueWorkItem
});
});

var lockProvider = new ThrottlingLockProvider(new InMemoryCacheClient(o => o.LoggerFactory(Log)), allowedLockCount, TimeSpan.FromDays(1), null, null, Log);
var lockProvider = new ThrottlingLockProvider(new InMemoryCacheClient(o => o.LoggerFactory(Log)), allowedLockCount, TimeSpan.FromDays(1), null, Log);
var job = new SampleQueueJobWithLocking(queue, lockProvider, null, Log);
await Task.Delay(10);
_logger.LogInformation("Starting RunUntilEmptyAsync");
Expand Down
2 changes: 1 addition & 1 deletion src/Foundatio.TestHarness/Jobs/ThrottledJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ThrottledJob : JobWithLockBase
{
public ThrottledJob(ICacheClient client, ILoggerFactory loggerFactory = null) : base(loggerFactory)
{
_locker = new ThrottlingLockProvider(client, 1, TimeSpan.FromMilliseconds(100), null, null, loggerFactory);
_locker = new ThrottlingLockProvider(client, 1, TimeSpan.FromMilliseconds(100), null, loggerFactory);
}

private readonly ILockProvider _locker;
Expand Down
55 changes: 55 additions & 0 deletions src/Foundatio/Extensions/ServicesExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;

namespace Foundatio.Extensions;

public static class ServicesExtensions
{
/// <summary>
/// Replaces an existing service descriptor with a new singleton service of the specified type.
/// </summary>
/// <param name="services"></param>
/// <typeparam name="TService"></typeparam>
/// <typeparam name="TImplementation"></typeparam>
/// <returns></returns>
public static IServiceCollection ReplaceSingleton<TService, TImplementation>(this IServiceCollection services)
where TService : class
where TImplementation : class, TService
{
// Remove the existing service descriptor
var descriptor = services.FirstOrDefault(d => d.ServiceType == typeof(TService));
if (descriptor != null)
{
services.Remove(descriptor);
}

// Add the new singleton service
services.AddSingleton<TService, TImplementation>();

return services;
}

/// <summary>
/// Replaces an existing service descriptor with a new singleton service of the specified type.
/// </summary>
/// <param name="services"></param>
/// <param name="implementationFactory"></param>
/// <typeparam name="TService"></typeparam>
/// <returns></returns>
public static IServiceCollection ReplaceSingleton<TService>(this IServiceCollection services, Func<IServiceProvider, TService> implementationFactory)
where TService : class
{
// Remove the existing service descriptor
var descriptor = services.FirstOrDefault(d => d.ServiceType == typeof(TService));
if (descriptor != null)
{
services.Remove(descriptor);
}

// Add the new singleton service
services.AddSingleton(implementationFactory);

return services;
}
}
Loading