Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,33 @@ jobs:
with:
name: oracle-test-results
path: test-results/
Azure-Tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup .NET
uses: actions/setup-dotnet@v1
with:
dotnet-version: |
8.0.x
9.0.x
- name: Restore dependencies
run: dotnet restore
- name: Build
run: dotnet build --no-restore
- name: Azure Tests
run: dotnet test test/WorkflowCore.Tests.Azure --no-build --verbosity detailed --logger "trx;LogFileName=AzureTests.trx" --logger "console;verbosity=detailed" --results-directory ./test-results -p:ParallelizeTestCollections=false
- name: Publish Test Results
uses: dorny/test-reporter@v1
if: success() || failure()
with:
name: Azure Test Results
path: test-results/*.trx
reporter: dotnet-trx
fail-on-error: false
- name: Upload Test Results
uses: actions/upload-artifact@v4
if: always()
with:
name: azure-test-results
path: test-results/
53 changes: 52 additions & 1 deletion docs/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,56 @@ There are several persistence providers available as separate Nuget packages.
* [Sqlite](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Persistence.Sqlite)
* [Amazon DynamoDB](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.AWS)
* [Cosmos DB](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.Azure)
* [Azure Table Storage](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.Azure)
* [Redis](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.Redis)
* [Oracle](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Persistence.Oracle)
* [Oracle](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Persistence.Oracle)

## Implementing a custom persistence provider

To implement a custom persistence provider, create a class that implements `IPersistenceProvider` interface:

```csharp
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
{
Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
void EnsureStoreExists();
}
```

The `IPersistenceProvider` interface combines four repository interfaces:

### IWorkflowRepository
Handles workflow instance storage and retrieval:
- `CreateNewWorkflow` - Create and store a new workflow instance
- `PersistWorkflow` - Update an existing workflow instance
- `GetWorkflowInstance` - Retrieve a specific workflow instance
- `GetRunnableInstances` - Get workflow instances ready for execution

### IEventRepository
Manages workflow events:
- `CreateEvent` - Store a new event
- `GetEvent` - Retrieve a specific event
- `GetRunnableEvents` - Get events ready for processing
- `MarkEventProcessed/Unprocessed` - Update event status

### ISubscriptionRepository
Handles event subscriptions:
- `CreateEventSubscription` - Create new event subscription
- `GetSubscriptions` - Query subscriptions for events
- `TerminateSubscription` - Remove a subscription
- `SetSubscriptionToken/ClearSubscriptionToken` - Manage subscription locking

### IScheduledCommandRepository
For future command scheduling (optional):
- `ScheduleCommand` - Schedule a command for future execution
- `ProcessCommands` - Execute scheduled commands
- `SupportsScheduledCommands` - Indicates if provider supports this feature

Once implemented, register your provider:

```csharp
services.AddWorkflow(options =>
{
options.UsePersistence(sp => new MyCustomPersistenceProvider());
});
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Azure.Models
{
public class EventTableEntity : ITableEntity
{
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string EventName { get; set; }
public string EventKey { get; set; }
public string EventData { get; set; }
public DateTime EventTime { get; set; }
public bool IsProcessed { get; set; }

private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

public static EventTableEntity FromInstance(Event instance)
{
return new EventTableEntity
{
PartitionKey = "event",
RowKey = instance.Id,
EventName = instance.EventName,
EventKey = instance.EventKey,
EventTime = instance.EventTime,
IsProcessed = instance.IsProcessed,
EventData = JsonConvert.SerializeObject(instance.EventData, SerializerSettings),
};
}

public static Event ToInstance(EventTableEntity entity)
{
return new Event
{
Id = entity.RowKey,
EventName = entity.EventName,
EventKey = entity.EventKey,
EventTime = entity.EventTime,
IsProcessed = entity.IsProcessed,
EventData = JsonConvert.DeserializeObject(entity.EventData, SerializerSettings),
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Azure.Models
{
public class ScheduledCommandTableEntity : ITableEntity
{
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string CommandName { get; set; }
public string Data { get; set; }
public long ExecuteTime { get; set; }

private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

public static ScheduledCommandTableEntity FromInstance(ScheduledCommand instance)
{
return new ScheduledCommandTableEntity
{
PartitionKey = "command",
RowKey = Guid.NewGuid().ToString(),
CommandName = instance.CommandName,
Data = instance.Data,
ExecuteTime = instance.ExecuteTime,
};
}

public static ScheduledCommand ToInstance(ScheduledCommandTableEntity entity)
{
return new ScheduledCommand
{
CommandName = entity.CommandName,
Data = entity.Data,
ExecuteTime = entity.ExecuteTime,
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Azure.Models
{
public class SubscriptionTableEntity : ITableEntity
{
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string WorkflowId { get; set; }
public int StepId { get; set; }
public string ExecutionPointerId { get; set; }
public string EventName { get; set; }
public string EventKey { get; set; }
public DateTime SubscribeAsOf { get; set; }
public string SubscriptionData { get; set; }
public string ExternalToken { get; set; }
public string ExternalWorkerId { get; set; }
public DateTime? ExternalTokenExpiry { get; set; }

private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

public static SubscriptionTableEntity FromInstance(EventSubscription instance)
{
return new SubscriptionTableEntity
{
PartitionKey = "subscription",
RowKey = instance.Id,
WorkflowId = instance.WorkflowId,
StepId = instance.StepId,
ExecutionPointerId = instance.ExecutionPointerId,
EventName = instance.EventName,
EventKey = instance.EventKey,
SubscribeAsOf = instance.SubscribeAsOf,
ExternalToken = instance.ExternalToken,
ExternalWorkerId = instance.ExternalWorkerId,
ExternalTokenExpiry = instance.ExternalTokenExpiry,
SubscriptionData = JsonConvert.SerializeObject(instance.SubscriptionData, SerializerSettings),
};
}

public static EventSubscription ToInstance(SubscriptionTableEntity entity)
{
return new EventSubscription
{
Id = entity.RowKey,
WorkflowId = entity.WorkflowId,
StepId = entity.StepId,
ExecutionPointerId = entity.ExecutionPointerId,
EventName = entity.EventName,
EventKey = entity.EventKey,
SubscribeAsOf = entity.SubscribeAsOf,
ExternalToken = entity.ExternalToken,
ExternalWorkerId = entity.ExternalWorkerId,
ExternalTokenExpiry = entity.ExternalTokenExpiry,
SubscriptionData = JsonConvert.DeserializeObject(entity.SubscriptionData, SerializerSettings),
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Azure.Models
{
public class WorkflowTableEntity : ITableEntity
{
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string WorkflowDefinitionId { get; set; }
public int Version { get; set; }
public string Description { get; set; }
public string Reference { get; set; }
public string ExecutionPointers { get; set; }
public long? NextExecution { get; set; }
public int Status { get; set; }
public string Data { get; set; }
public DateTime CreateTime { get; set; }
public DateTime? CompleteTime { get; set; }

private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

public static WorkflowTableEntity FromInstance(WorkflowInstance instance)
{
return new WorkflowTableEntity
{
PartitionKey = "workflow",
RowKey = instance.Id,
WorkflowDefinitionId = instance.WorkflowDefinitionId,
Version = instance.Version,
Description = instance.Description,
Reference = instance.Reference,
NextExecution = instance.NextExecution,
Status = (int)instance.Status,
CreateTime = instance.CreateTime,
CompleteTime = instance.CompleteTime,
Data = JsonConvert.SerializeObject(instance.Data, SerializerSettings),
ExecutionPointers = JsonConvert.SerializeObject(instance.ExecutionPointers, SerializerSettings),
};
}

public static WorkflowInstance ToInstance(WorkflowTableEntity entity)
{
return new WorkflowInstance
{
Id = entity.RowKey,
WorkflowDefinitionId = entity.WorkflowDefinitionId,
Version = entity.Version,
Description = entity.Description,
Reference = entity.Reference,
NextExecution = entity.NextExecution,
Status = (WorkflowStatus)entity.Status,
CreateTime = entity.CreateTime,
CompleteTime = entity.CompleteTime,
Data = JsonConvert.DeserializeObject(entity.Data, SerializerSettings),
ExecutionPointers = JsonConvert.DeserializeObject<ExecutionPointerCollection>(entity.ExecutionPointers, SerializerSettings),
};
}
}
}
32 changes: 32 additions & 0 deletions src/providers/WorkflowCore.Providers.Azure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Provides Queueing support on [Workflow Core](../../README.md) using Azure Storage queues.
* Provides event hub support on [Workflow Core](../../README.md) backed by Azure Service Bus.
* Provides persistence on [Workflow Core](../../README.md) backed by Azure Cosmos DB.
* Provides persistence on [Workflow Core](../../README.md) backed by Azure Table Storage.

This makes it possible to have a cluster of nodes processing your workflows.

Expand Down Expand Up @@ -33,4 +34,35 @@ services.AddWorkflow(options =>
options.UseAzureServiceBusEventHub("service bus connection string", "topic name", "subscription name");
options.UseCosmosDbPersistence("connection string");
});
```

### Azure Table Storage Persistence

For cost-effective workflow persistence using Azure Table Storage:

```C#
services.AddWorkflow(options =>
{
options.UseAzureTableStoragePersistence("azure storage connection string");
});
```

You can also specify a custom table name prefix:

```C#
services.AddWorkflow(options =>
{
options.UseAzureTableStoragePersistence("azure storage connection string", "MyWorkflows");
});
```

Or use with managed identity:

```C#
services.AddWorkflow(options =>
{
var tableServiceUri = new Uri("https://mystorageaccount.table.core.windows.net");
var credential = new DefaultAzureCredential();
options.UseAzureTableStoragePersistence(tableServiceUri, credential);
});
```
Loading
Loading