Skip to content
53 changes: 52 additions & 1 deletion docs/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,56 @@ There are several persistence providers available as separate Nuget packages.
* [Sqlite](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Persistence.Sqlite)
* [Amazon DynamoDB](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.AWS)
* [Cosmos DB](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.Azure)
* [Azure Table Storage](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.Azure)
* [Redis](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.Redis)
* [Oracle](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Persistence.Oracle)
* [Oracle](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Persistence.Oracle)

## Implementing a custom persistence provider

To implement a custom persistence provider, create a class that implements `IPersistenceProvider` interface:

```csharp
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
{
Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
void EnsureStoreExists();
}
```

The `IPersistenceProvider` interface combines four repository interfaces:

### IWorkflowRepository
Handles workflow instance storage and retrieval:
- `CreateNewWorkflow` - Create and store a new workflow instance
- `PersistWorkflow` - Update an existing workflow instance
- `GetWorkflowInstance` - Retrieve a specific workflow instance
- `GetRunnableInstances` - Get workflow instances ready for execution

### IEventRepository
Manages workflow events:
- `CreateEvent` - Store a new event
- `GetEvent` - Retrieve a specific event
- `GetRunnableEvents` - Get events ready for processing
- `MarkEventProcessed/Unprocessed` - Update event status

### ISubscriptionRepository
Handles event subscriptions:
- `CreateEventSubscription` - Create new event subscription
- `GetSubscriptions` - Query subscriptions for events
- `TerminateSubscription` - Remove a subscription
- `SetSubscriptionToken/ClearSubscriptionToken` - Manage subscription locking

### IScheduledCommandRepository
For future command scheduling (optional):
- `ScheduleCommand` - Schedule a command for future execution
- `ProcessCommands` - Execute scheduled commands
- `SupportsScheduledCommands` - Indicates if provider supports this feature

Once implemented, register your provider:

```csharp
services.AddWorkflow(options =>
{
options.UsePersistence(sp => new MyCustomPersistenceProvider());
});
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Azure.Models
{
public class EventTableEntity : ITableEntity
{
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string EventName { get; set; }
public string EventKey { get; set; }
public string EventData { get; set; }
public DateTime EventTime { get; set; }
public bool IsProcessed { get; set; }

private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

public static EventTableEntity FromInstance(Event instance)
{
return new EventTableEntity
{
PartitionKey = "event",
RowKey = instance.Id,
EventName = instance.EventName,
EventKey = instance.EventKey,
EventTime = instance.EventTime,
IsProcessed = instance.IsProcessed,
EventData = JsonConvert.SerializeObject(instance.EventData, SerializerSettings),
};
}

public static Event ToInstance(EventTableEntity entity)
{
return new Event
{
Id = entity.RowKey,
EventName = entity.EventName,
EventKey = entity.EventKey,
EventTime = entity.EventTime,
IsProcessed = entity.IsProcessed,
EventData = JsonConvert.DeserializeObject(entity.EventData, SerializerSettings),
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Azure.Models
{
public class ScheduledCommandTableEntity : ITableEntity
{
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string CommandName { get; set; }
public string Data { get; set; }
public long ExecuteTime { get; set; }

private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

public static ScheduledCommandTableEntity FromInstance(ScheduledCommand instance)
{
return new ScheduledCommandTableEntity
{
PartitionKey = "command",
RowKey = Guid.NewGuid().ToString(),
CommandName = instance.CommandName,
Data = instance.Data,
ExecuteTime = instance.ExecuteTime,
};
}

public static ScheduledCommand ToInstance(ScheduledCommandTableEntity entity)
{
return new ScheduledCommand
{
CommandName = entity.CommandName,
Data = entity.Data,
ExecuteTime = entity.ExecuteTime,
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Azure.Models
{
public class SubscriptionTableEntity : ITableEntity
{
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string WorkflowId { get; set; }
public int StepId { get; set; }
public string ExecutionPointerId { get; set; }
public string EventName { get; set; }
public string EventKey { get; set; }
public DateTime SubscribeAsOf { get; set; }
public string SubscriptionData { get; set; }
public string ExternalToken { get; set; }
public string ExternalWorkerId { get; set; }
public DateTime? ExternalTokenExpiry { get; set; }

private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

public static SubscriptionTableEntity FromInstance(EventSubscription instance)
{
return new SubscriptionTableEntity
{
PartitionKey = "subscription",
RowKey = instance.Id,
WorkflowId = instance.WorkflowId,
StepId = instance.StepId,
ExecutionPointerId = instance.ExecutionPointerId,
EventName = instance.EventName,
EventKey = instance.EventKey,
SubscribeAsOf = instance.SubscribeAsOf,
ExternalToken = instance.ExternalToken,
ExternalWorkerId = instance.ExternalWorkerId,
ExternalTokenExpiry = instance.ExternalTokenExpiry,
SubscriptionData = JsonConvert.SerializeObject(instance.SubscriptionData, SerializerSettings),
};
}

public static EventSubscription ToInstance(SubscriptionTableEntity entity)
{
return new EventSubscription
{
Id = entity.RowKey,
WorkflowId = entity.WorkflowId,
StepId = entity.StepId,
ExecutionPointerId = entity.ExecutionPointerId,
EventName = entity.EventName,
EventKey = entity.EventKey,
SubscribeAsOf = entity.SubscribeAsOf,
ExternalToken = entity.ExternalToken,
ExternalWorkerId = entity.ExternalWorkerId,
ExternalTokenExpiry = entity.ExternalTokenExpiry,
SubscriptionData = JsonConvert.DeserializeObject(entity.SubscriptionData, SerializerSettings),
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using WorkflowCore.Models;

namespace WorkflowCore.Providers.Azure.Models
{
public class WorkflowTableEntity : ITableEntity
{
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string WorkflowDefinitionId { get; set; }
public int Version { get; set; }
public string Description { get; set; }
public string Reference { get; set; }
public string ExecutionPointers { get; set; }
public long? NextExecution { get; set; }
public int Status { get; set; }
public string Data { get; set; }
public DateTime CreateTime { get; set; }
public DateTime? CompleteTime { get; set; }

private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

public static WorkflowTableEntity FromInstance(WorkflowInstance instance)
{
return new WorkflowTableEntity
{
PartitionKey = "workflow",
RowKey = instance.Id,
WorkflowDefinitionId = instance.WorkflowDefinitionId,
Version = instance.Version,
Description = instance.Description,
Reference = instance.Reference,
NextExecution = instance.NextExecution,
Status = (int)instance.Status,
CreateTime = instance.CreateTime,
CompleteTime = instance.CompleteTime,
Data = JsonConvert.SerializeObject(instance.Data, SerializerSettings),
ExecutionPointers = JsonConvert.SerializeObject(instance.ExecutionPointers, SerializerSettings),
};
}

public static WorkflowInstance ToInstance(WorkflowTableEntity entity)
{
return new WorkflowInstance
{
Id = entity.RowKey,
WorkflowDefinitionId = entity.WorkflowDefinitionId,
Version = entity.Version,
Description = entity.Description,
Reference = entity.Reference,
NextExecution = entity.NextExecution,
Status = (WorkflowStatus)entity.Status,
CreateTime = entity.CreateTime,
CompleteTime = entity.CompleteTime,
Data = JsonConvert.DeserializeObject(entity.Data, SerializerSettings),
ExecutionPointers = JsonConvert.DeserializeObject<ExecutionPointerCollection>(entity.ExecutionPointers, SerializerSettings),
};
}
}
}
32 changes: 32 additions & 0 deletions src/providers/WorkflowCore.Providers.Azure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Provides Queueing support on [Workflow Core](../../README.md) using Azure Storage queues.
* Provides event hub support on [Workflow Core](../../README.md) backed by Azure Service Bus.
* Provides persistence on [Workflow Core](../../README.md) backed by Azure Cosmos DB.
* Provides persistence on [Workflow Core](../../README.md) backed by Azure Table Storage.

This makes it possible to have a cluster of nodes processing your workflows.

Expand Down Expand Up @@ -33,4 +34,35 @@ services.AddWorkflow(options =>
options.UseAzureServiceBusEventHub("service bus connection string", "topic name", "subscription name");
options.UseCosmosDbPersistence("connection string");
});
```

### Azure Table Storage Persistence

For cost-effective workflow persistence using Azure Table Storage:

```C#
services.AddWorkflow(options =>
{
options.UseAzureTableStoragePersistence("azure storage connection string");
});
```

You can also specify a custom table name prefix:

```C#
services.AddWorkflow(options =>
{
options.UseAzureTableStoragePersistence("azure storage connection string", "MyWorkflows");
});
```

Or use with managed identity:

```C#
services.AddWorkflow(options =>
{
var tableServiceUri = new Uri("https://mystorageaccount.table.core.windows.net");
var credential = new DefaultAzureCredential();
options.UseAzureTableStoragePersistence(tableServiceUri, credential);
});
```
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using Azure.Core;
using Azure.Data.Tables;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
Expand Down Expand Up @@ -106,5 +107,36 @@ public static WorkflowOptions UseCosmosDbPersistence(
options.UsePersistence(sp => new CosmosDbPersistenceProvider(sp.GetService<ICosmosClientFactory>(), databaseId, sp.GetService<ICosmosDbProvisioner>(), cosmosDbStorageOptions));
return options;
}

public static WorkflowOptions UseAzureTableStoragePersistence(
this WorkflowOptions options,
string connectionString,
string tableNamePrefix = "WorkflowCore")
{
options.Services.AddSingleton<TableServiceClient>(sp => new TableServiceClient(connectionString));
options.UsePersistence(sp => new AzureTableStoragePersistenceProvider(sp.GetService<TableServiceClient>(), tableNamePrefix));
return options;
}

public static WorkflowOptions UseAzureTableStoragePersistence(
this WorkflowOptions options,
TableServiceClient tableServiceClient,
string tableNamePrefix = "WorkflowCore")
{
options.Services.AddSingleton(tableServiceClient);
options.UsePersistence(sp => new AzureTableStoragePersistenceProvider(sp.GetService<TableServiceClient>(), tableNamePrefix));
return options;
}

public static WorkflowOptions UseAzureTableStoragePersistence(
this WorkflowOptions options,
Uri serviceUri,
TokenCredential tokenCredential,
string tableNamePrefix = "WorkflowCore")
{
options.Services.AddSingleton<TableServiceClient>(sp => new TableServiceClient(serviceUri, tokenCredential));
options.UsePersistence(sp => new AzureTableStoragePersistenceProvider(sp.GetService<TableServiceClient>(), tableNamePrefix));
return options;
}
}
}
Loading