Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public static WebApplication Create(string[] args)
.AddCheck<HealthCheck>("authorization_admin_health_check");

builder.ConfigureLibsIntegrations();
builder.ConfigureAppsettings();

if (!builder.Environment.IsDevelopment())
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Diagnostics.CodeAnalysis;
using Altinn.AccessManagement;
using Altinn.AccessMgmt.Core;
using Altinn.AccessMgmt.Core.HostedServices;
using Altinn.AccessMgmt.Persistence.Extensions;
using Altinn.AccessMgmt.PersistenceEF.Audit;
using Altinn.AccessMgmt.PersistenceEF.Contexts;
using Altinn.Authorization.Host.Pipeline.Extensions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Microsoft.FeatureManagement;
Expand Down Expand Up @@ -52,32 +54,33 @@

async Task Init()
{
if (await featureManager.IsEnabledAsync(AccessManagementFeatureFlags.MigrationDbEf))
{
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>().Database;
await db.MigrateAsync();
}
else if (await featureManager.IsEnabledAsync(AccessManagementFeatureFlags.MigrationDb))
{
bool generateBasicData = await featureManager.IsEnabledAsync(AccessManagementFeatureFlags.MigrationDbWithBasicData);
await app.UseAccessMgmtDb(generateBasicData);
}

using var cts = new CancellationTokenSource();
AppDomain.CurrentDomain.ProcessExit += (s, e) =>
{
try
{
cts.Cancel();
}
catch (ObjectDisposedException)
catch (Exception)
{
// Terminated by itself.
}
};

var registerImport = scope.ServiceProvider.GetRequiredService<RegisterHostedService>();
await registerImport.EnsureDbIsIngestWithRegisterData(cts.Token);
if (await featureManager.IsEnabledAsync(AccessManagementFeatureFlags.MigrationDbEf))
{
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>().Database;
await db.MigrateAsync();
}
else if (await featureManager.IsEnabledAsync(AccessManagementFeatureFlags.MigrationDb))
{
bool generateBasicData = await featureManager.IsEnabledAsync(AccessManagementFeatureFlags.MigrationDbWithBasicData);
await app.UseAccessMgmtDb(generateBasicData);
}

if (await featureManager.IsEnabledAsync(AccessMgmtFeatureFlags.PipelineInit))
{
await app.Services.ExecuteInitPipelinesAsync(cts.Token);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ namespace Altinn.AccessMgmt.Core;

public static class AccessMgmtFeatureFlags
{
/// <summary>
/// Ensures that all pipelines has at executed before application starts.
/// </summary>
public const string PipelineInit = "AccessManagement.Core.Pipeline.Init";

/// <summary>
/// Specifies if the register data should streamed from register service to access management database
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
using Altinn.AccessMgmt.Core.HostedServices;
using Altinn.AccessMgmt.Core.HostedServices.Contracts;
using Altinn.AccessMgmt.Core.HostedServices.Services;
using Altinn.AccessMgmt.Core.Pipelines;
using Altinn.AccessMgmt.Core.Services;
using Altinn.AccessMgmt.Core.Services.Contracts;
using Altinn.AccessMgmt.PersistenceEF.Audit;
using Altinn.AccessMgmt.PersistenceEF.Constants;
using Altinn.AccessMgmt.PersistenceEF.Utils;
using Altinn.Authorization.Host.Pipeline.Extensions;
using Altinn.Register.Contracts;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using AMPartyService = Altinn.AccessMgmt.Core.Services.AMPartyService;
Expand All @@ -18,9 +23,9 @@ public static class ServiceCollectionExtensions
{
public static IServiceCollection AddAccessMgmtCore(this IServiceCollection services, IConfiguration configuration)
{
services.AddHostedService<RegisterHostedService>();
// services.AddHostedService<RegisterHostedService>();
services.AddHostedService<AltinnRoleHostedService>();
services.AddScoped<RegisterHostedService>();
// services.AddScoped<RegisterHostedService>();
services.AddScoped<IIngestService, IngestService>();
services.AddScoped<IConnectionService, ConnectionService>();
services.AddScoped<IPartyService, PartyService>();
Expand All @@ -43,9 +48,111 @@ public static IServiceCollection AddAccessMgmtCore(this IServiceCollection servi
}

AddJobs(services);

services.AddResourceRegistryPipeline();
return services;
}

private static void AddResourceRegistryPipeline(this IServiceCollection services)
{
services.AddPipelinesOtel();
services.AddPipelines(descriptor =>
{
descriptor
.WithFeatureFlag(AccessMgmtFeatureFlags.HostedServicesResourceRegistrySync)
.WithGroupName("Resource Registry Import")
.WithRecurring(TimeSpan.FromMinutes(2))
.AddPipeline("Sync Service Owners")
.WithLease("resource_registry_pipeline_service_owners")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.ResourceRegistryImportSystem))
.WithStages()
.AddSource(ResourceRegistryPipelines.ServiceOwnerTasks.Extract)
.AddSegment(ResourceRegistryPipelines.ServiceOwnerTasks.Transform)
.AddSink(ResourceRegistryPipelines.ServiceOwnerTasks.Load)
.Build()
.AddPipeline("Sync Resources")
.WithLease("resource_registry_pipeline_resources")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.ResourceRegistryImportSystem))
.WithStages()
.AddSource(ResourceRegistryPipelines.ResourceTasks.Extract)
.AddSegment(ResourceRegistryPipelines.ResourceTasks.Transform)
.AddSink(ResourceRegistryPipelines.ResourceTasks.Load)
.Build()
.AddPipeline("Sync Package Resources")
.WithLease("resource_registry_pipeline_package_resources")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.ResourceRegistryImportSystem))
.WithStages()
.AddSource(ResourceRegistryPipelines.UpdatedResourceTasks.Extract)
.AddSegment(ResourceRegistryPipelines.PackageResourceTasks.Transform)
.AddSink(ResourceRegistryPipelines.PackageResourceTasks.Load)
.Build()
.AddPipeline("Sync Role Resources")
.WithLease("resource_registry_pipeline_role_resources")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.ResourceRegistryImportSystem))
.WithStages()
.AddSource(ResourceRegistryPipelines.UpdatedResourceTasks.Extract)
.AddSegment(ResourceRegistryPipelines.RoleResourceTasks.Transform)
.AddSink(ResourceRegistryPipelines.RoleResourceTasks.Load)
.Build();
});

services.AddPipelines(descriptor =>
{
descriptor
.WithFeatureFlag(AccessMgmtFeatureFlags.HostedServicesRegisterSync)
.WithGroupName("Register Import")
.WithRecurring(TimeSpan.FromMinutes(2))
.AddPipeline("Sync Persons")
.WithLease("register_pipeline_entity_persons")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.RegisterImportSystem))
.WithStages()
.AddSource(RegisterPipelines.PartyTasks.Extract<Person>)
.AddSegment(RegisterPipelines.PartyTasks.Transform)
.AddSink(RegisterPipelines.PartyTasks.Load)
.Build()
.AddPipeline("Sync Organizations")
.WithLease("register_pipeline_entity_organizations")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.RegisterImportSystem))
.WithStages()
.AddSource(RegisterPipelines.PartyTasks.Extract<Organization>)
.AddSegment(RegisterPipelines.PartyTasks.Transform)
.AddSink(RegisterPipelines.PartyTasks.Load)
.Build()
.AddPipeline("Sync System users")
.WithLease("register_pipeline_entity_systemusers")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.RegisterImportSystem))
.WithStages()
.AddSource(RegisterPipelines.PartyTasks.Extract<SystemUser>)
.AddSegment(RegisterPipelines.PartyTasks.Transform)
.AddSink(RegisterPipelines.PartyTasks.Load)
.Build()
.AddPipeline("Sync Self Identified Users")
.WithLease("register_pipeline_entity_selfidentified")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.RegisterImportSystem))
.WithStages()
.AddSource(RegisterPipelines.PartyTasks.Extract<SelfIdentifiedUser>)
.AddSegment(RegisterPipelines.PartyTasks.Transform)
.AddSink(RegisterPipelines.PartyTasks.Load)
.Build()
.AddPipeline("Sync Enterprise Users")
.WithLease("register_pipeline_entity_enterpriseusers")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.RegisterImportSystem))
.WithStages()
.AddSource(RegisterPipelines.PartyTasks.Extract<EnterpriseUser>)
.AddSegment(RegisterPipelines.PartyTasks.Transform)
.AddSink(RegisterPipelines.PartyTasks.Load)
.Build()
.AddPipeline("Sync Roles")
.WithLease("register_pipeline_roles")
.WithServiceScope(sp => sp.CreateEFScope(SystemEntityConstants.RegisterImportSystem))
.WithStages()
.AddSource(RegisterPipelines.RoleTasks.Extract)
.AddSegment(RegisterPipelines.RoleTasks.Transform)
.AddSink(RegisterPipelines.RoleTasks.Load)
.Build();
});
}

private static void AddJobs(IServiceCollection services)
{
services.AddSingleton<IPartySyncService, PartySyncService>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Altinn.AccessMgmt.Core.HostedServices.Leases;
using Altinn.AccessMgmt.Core.HostedServices.Services;
using Altinn.Authorization.Host.Lease;
using Microsoft.AspNetCore.Components.Web;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.FeatureManagement;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Net.Http.Headers;
using Altinn.AccessMgmt.Core.HostedServices.Contracts;
using Altinn.AccessMgmt.Core.HostedServices.Contracts;
using Altinn.AccessMgmt.Core.HostedServices.Leases;
using Altinn.AccessMgmt.PersistenceEF.Audit;
using Altinn.AccessMgmt.PersistenceEF.Constants;
Expand All @@ -18,7 +17,7 @@ namespace Altinn.AccessMgmt.Core.HostedServices.Services;
/// <inheritdoc />
public class PartySyncService : BaseSyncService, IPartySyncService
{
private readonly ILogger<RegisterHostedService> _logger;
private readonly ILogger<PartySyncService> _logger;
private readonly IAltinnRegister _register;
private readonly IServiceProvider _serviceProvider;

Expand All @@ -27,7 +26,7 @@ public class PartySyncService : BaseSyncService, IPartySyncService
/// </summary>
public PartySyncService(
IAltinnRegister register,
ILogger<RegisterHostedService> logger,
ILogger<PartySyncService> logger,
IServiceProvider serviceProvider
)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Diagnostics;
using Altinn.AccessMgmt.Core.HostedServices.Contracts;
using Altinn.AccessMgmt.Core.HostedServices.Contracts;
using Altinn.AccessMgmt.Core.HostedServices.Leases;
using Altinn.AccessMgmt.PersistenceEF.Audit;
using Altinn.AccessMgmt.PersistenceEF.Constants;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System.Diagnostics;
using Altinn.AccessMgmt.PersistenceEF.Audit;
using Altinn.AccessMgmt.PersistenceEF.Utils;
using Altinn.Authorization.Integration.Platform;
using Altinn.Authorization.ProblemDetails;
using Microsoft.Extensions.DependencyInjection;

namespace Altinn.AccessMgmt.Core.Pipelines;

internal static class PipelineUtils
{
internal static async Task<int> Flush<T>(IServiceScope services, List<T> items, IEnumerable<string> matchColumns, CancellationToken cancellationToken = default)
{
if (items.Count == 0)
{
return 0;
}

var activity = Activity.Current;
var ingest = services.ServiceProvider.GetRequiredService<IIngestService>();
var audit = services.ServiceProvider.GetRequiredService<IAuditAccessor>().AuditValues;

var batchId = Guid.CreateVersion7();
activity?.AddTag("batch_id", batchId);
activity?.AddTag("batch_size", items.Count);

var ingestTemp = await ingest.IngestTempData(items, batchId, cancellationToken);
activity?.AddTag("ingested_temp", ingestTemp);

var flushed = await ingest.MergeTempData<T>(batchId, audit, matchColumns, cancellationToken);
activity?.AddTag("ingested_merge", flushed);

return flushed;
}

internal static void EnsureSuccess<T>(PlatformResponse<PageStream<T>> page)
{
var activity = Activity.Current;
if (page.IsProblem)
{
throw new InvalidOperationException(page.ProblemDetails.Detail);
}

if (page?.Content?.Data is { } data)
{
activity?.AddTag("page_size", data.Count());
}

if (page?.Content?.Links is { } link)
{
activity?.AddTag("page_next_link", link);
}

if (page?.Content?.Stats is { } stats)
{
activity?.AddTag("page_stats_end", stats.PageEnd);
activity?.AddTag("page_stats_start", stats.PageStart);
activity?.AddTag("page_stats_sequence_max", stats.SequenceMax);
}
}
}
Loading
Loading