1
+ using Microsoft . EntityFrameworkCore ;
2
+ using System ;
3
+ using System . Collections . Generic ;
4
+ using System . Data ;
5
+ using System . Linq ;
6
+ using System . Threading . Tasks ;
7
+ using WorkflowCore . Persistence . EntityFramework . Models ;
8
+ using WorkflowCore . Models ;
9
+ using WorkflowCore . Persistence . EntityFramework . Interfaces ;
10
+ using System . Threading ;
11
+ using WorkflowCore . Interface ;
12
+
13
+ namespace WorkflowCore . Persistence . EntityFramework . Services
14
+ {
15
+ public sealed class LargeDataOptimizedEntityFrameworkPersistenceProvider : EntityFrameworkPersistenceProvider , IPersistenceProvider
16
+ {
17
+ private readonly IWorkflowDbContextFactory _contextFactory ;
18
+
19
+ public LargeDataOptimizedEntityFrameworkPersistenceProvider ( IWorkflowDbContextFactory contextFactory , bool canCreateDb , bool canMigrateDb )
20
+ : base ( contextFactory , canCreateDb , canMigrateDb )
21
+ {
22
+ _contextFactory = contextFactory ;
23
+ }
24
+
25
+ /// <inheritdoc/>
26
+ public new async Task < IEnumerable < WorkflowInstance > > GetWorkflowInstances ( WorkflowStatus ? status , string type , DateTime ? createdFrom , DateTime ? createdTo , int skip , int take )
27
+ {
28
+ using ( var db = _contextFactory . Build ( ) )
29
+ {
30
+ IQueryable < PersistedWorkflow > query = db . Set < PersistedWorkflow > ( )
31
+ . Include ( wf => wf . ExecutionPointers )
32
+ . ThenInclude ( ep => ep . ExtensionAttributes )
33
+ . Include ( wf => wf . ExecutionPointers )
34
+ . AsSplitQuery ( )
35
+ . AsQueryable ( ) ;
36
+
37
+ if ( status . HasValue )
38
+ {
39
+ query = query . Where ( x => x . Status == status . Value ) ;
40
+ }
41
+
42
+ if ( ! string . IsNullOrEmpty ( type ) )
43
+ {
44
+ query = query . Where ( x => x . WorkflowDefinitionId == type ) ;
45
+ }
46
+
47
+ if ( createdFrom . HasValue )
48
+ {
49
+ query = query . Where ( x => x . CreateTime >= createdFrom . Value ) ;
50
+ }
51
+
52
+ if ( createdTo . HasValue )
53
+ {
54
+ query = query . Where ( x => x . CreateTime <= createdTo . Value ) ;
55
+ }
56
+
57
+ var rawResult = await query . OrderBy ( x => x . PersistenceId ) . Skip ( skip ) . Take ( take ) . ToListAsync ( ) ;
58
+
59
+ var result = new List < WorkflowInstance > ( rawResult . Count ) ;
60
+
61
+ foreach ( var item in rawResult )
62
+ {
63
+ result . Add ( item . ToWorkflowInstance ( ) ) ;
64
+ }
65
+
66
+ return result ;
67
+ }
68
+ }
69
+
70
+ /// <inheritdoc/>
71
+ public new async Task < WorkflowInstance > GetWorkflowInstance ( string id , CancellationToken cancellationToken = default )
72
+ {
73
+ using ( var db = _contextFactory . Build ( ) )
74
+ {
75
+ var uid = new Guid ( id ) ;
76
+ var raw = await db . Set < PersistedWorkflow > ( )
77
+ . Include ( wf => wf . ExecutionPointers )
78
+ . ThenInclude ( ep => ep . ExtensionAttributes )
79
+ . Include ( wf => wf . ExecutionPointers )
80
+ . AsSplitQuery ( )
81
+ . FirstAsync ( x => x . InstanceId == uid , cancellationToken ) ;
82
+
83
+ return raw ? . ToWorkflowInstance ( ) ;
84
+ }
85
+ }
86
+
87
+ /// <inheritdoc/>
88
+ public new async Task < IEnumerable < WorkflowInstance > > GetWorkflowInstances ( IEnumerable < string > ids , CancellationToken cancellationToken = default )
89
+ {
90
+ if ( ids == null )
91
+ {
92
+ return Array . Empty < WorkflowInstance > ( ) ;
93
+ }
94
+
95
+ using ( var db = _contextFactory . Build ( ) )
96
+ {
97
+ var uids = ids . Select ( i => new Guid ( i ) ) ;
98
+ var raw = db . Set < PersistedWorkflow > ( )
99
+ . Include ( wf => wf . ExecutionPointers )
100
+ . ThenInclude ( ep => ep . ExtensionAttributes )
101
+ . Include ( wf => wf . ExecutionPointers )
102
+ . AsSplitQuery ( )
103
+ . Where ( x => uids . Contains ( x . InstanceId ) ) ;
104
+
105
+ var persistedWorkflows = await raw . ToListAsync ( cancellationToken ) ;
106
+
107
+ return persistedWorkflows . Select ( i => i . ToWorkflowInstance ( ) ) ;
108
+ }
109
+ }
110
+
111
+ /// <inheritdoc/>
112
+ public new async Task PersistWorkflow ( WorkflowInstance workflow , CancellationToken cancellationToken = default )
113
+ {
114
+ using ( var db = _contextFactory . Build ( ) )
115
+ using ( var transaction = await db . Database . BeginTransactionAsync ( IsolationLevel . RepeatableRead , cancellationToken ) )
116
+ {
117
+ var uid = new Guid ( workflow . Id ) ;
118
+ var existingEntity = await db . Set < PersistedWorkflow > ( )
119
+ . Where ( x => x . InstanceId == uid )
120
+ . Include ( wf => wf . ExecutionPointers )
121
+ . ThenInclude ( ep => ep . ExtensionAttributes )
122
+ . Include ( wf => wf . ExecutionPointers )
123
+ . AsSplitQuery ( )
124
+ . AsTracking ( )
125
+ . FirstAsync ( cancellationToken ) ;
126
+
127
+ _ = workflow . ToPersistable ( existingEntity ) ;
128
+
129
+ await db . SaveChangesAsync ( cancellationToken ) ;
130
+
131
+ await transaction . CommitAsync ( cancellationToken ) ;
132
+ }
133
+ }
134
+ }
135
+ }
0 commit comments