@@ -10,8 +10,9 @@ namespace WorkflowCore.Services
10
10
{
11
11
public class WorkflowRegistry : IWorkflowRegistry
12
12
{
13
- private readonly IServiceProvider _serviceProvider ;
14
- private readonly BlockingCollection < ( string workflowId , int version , WorkflowDefinition definition ) > _registry = new BlockingCollection < ( string , int , WorkflowDefinition ) > ( ) ;
13
+ private readonly IServiceProvider _serviceProvider ;
14
+ private readonly ConcurrentDictionary < string , WorkflowDefinition > _registry = new ConcurrentDictionary < string , WorkflowDefinition > ( ) ;
15
+ private readonly ConcurrentDictionary < string , WorkflowDefinition > _lastestVersion = new ConcurrentDictionary < string , WorkflowDefinition > ( ) ;
15
16
16
17
public WorkflowRegistry ( IServiceProvider serviceProvider )
17
18
{
@@ -20,75 +21,85 @@ public WorkflowRegistry(IServiceProvider serviceProvider)
20
21
21
22
public WorkflowDefinition GetDefinition ( string workflowId , int ? version = null )
22
23
{
23
- ( string workflowId , int version , WorkflowDefinition definition ) workflowEntry ;
24
24
if ( version . HasValue )
25
25
{
26
- workflowEntry = _registry . FirstOrDefault ( x => x . workflowId == workflowId && x . version == version . Value ) ;
26
+ if ( ! _registry . ContainsKey ( $ "{ workflowId } -{ version } ") )
27
+ return default ;
28
+ return _registry [ $ "{ workflowId } -{ version } "] ;
27
29
}
28
30
else
29
31
{
30
- workflowEntry = _registry . Where ( x => x . workflowId == workflowId ) . OrderByDescending ( x => x . version )
31
- . FirstOrDefault ( ) ;
32
+ if ( ! _lastestVersion . ContainsKey ( workflowId ) )
33
+ return default ;
34
+ return _lastestVersion [ workflowId ] ;
32
35
}
33
-
34
- return workflowEntry != default ? workflowEntry . definition : default ;
35
36
}
36
37
37
38
public void DeregisterWorkflow ( string workflowId , int version )
38
39
{
39
- var definition = _registry . FirstOrDefault ( x => x . workflowId == workflowId && x . version == version ) ;
40
- if ( definition != default )
40
+ if ( ! _registry . ContainsKey ( $ "{ workflowId } -{ version } ") )
41
+ return ;
42
+
43
+ lock ( _registry )
41
44
{
42
- _registry . TryTake ( out definition ) ;
45
+ _registry . TryRemove ( $ "{ workflowId } -{ version } ", out var _ ) ;
46
+ if ( _lastestVersion [ workflowId ] . Version == version )
47
+ {
48
+ _lastestVersion . TryRemove ( workflowId , out var _ ) ;
49
+
50
+ var latest = _registry . Values . Where ( x => x . Id == workflowId ) . OrderByDescending ( x => x . Version ) . FirstOrDefault ( ) ;
51
+ if ( latest != default )
52
+ _lastestVersion [ workflowId ] = latest ;
53
+ }
43
54
}
44
55
}
45
56
46
57
public void RegisterWorkflow ( IWorkflow workflow )
47
58
{
48
- if ( _registry . Any ( x => x . workflowId == workflow . Id && x . version == workflow . Version ) )
49
- {
50
- throw new InvalidOperationException ( $ "Workflow { workflow . Id } version { workflow . Version } is already registered") ;
51
- }
52
-
53
59
var builder = _serviceProvider . GetService < IWorkflowBuilder > ( ) . UseData < object > ( ) ;
54
60
workflow . Build ( builder ) ;
55
61
var def = builder . Build ( workflow . Id , workflow . Version ) ;
56
- _registry . Add ( ( workflow . Id , workflow . Version , def ) ) ;
62
+ RegisterWorkflow ( def ) ;
57
63
}
58
64
59
65
public void RegisterWorkflow ( WorkflowDefinition definition )
60
66
{
61
- if ( _registry . Any ( x => x . workflowId == definition . Id && x . version == definition . Version ) )
67
+ if ( _registry . ContainsKey ( $ " { definition . Id } - { definition . Version } " ) )
62
68
{
63
69
throw new InvalidOperationException ( $ "Workflow { definition . Id } version { definition . Version } is already registered") ;
64
70
}
65
71
66
- _registry . Add ( ( definition . Id , definition . Version , definition ) ) ;
72
+ lock ( _registry )
73
+ {
74
+ _registry [ $ "{ definition . Id } -{ definition . Version } "] = definition ;
75
+ if ( ! _lastestVersion . ContainsKey ( definition . Id ) )
76
+ {
77
+ _lastestVersion [ definition . Id ] = definition ;
78
+ return ;
79
+ }
80
+
81
+ if ( _lastestVersion [ definition . Id ] . Version <= definition . Version )
82
+ _lastestVersion [ definition . Id ] = definition ;
83
+ }
67
84
}
68
85
69
86
public void RegisterWorkflow < TData > ( IWorkflow < TData > workflow )
70
87
where TData : new ( )
71
88
{
72
- if ( _registry . Any ( x => x . workflowId == workflow . Id && x . version == workflow . Version ) )
73
- {
74
- throw new InvalidOperationException ( $ "Workflow { workflow . Id } version { workflow . Version } is already registered") ;
75
- }
76
-
77
89
var builder = _serviceProvider . GetService < IWorkflowBuilder > ( ) . UseData < TData > ( ) ;
78
90
workflow . Build ( builder ) ;
79
91
var def = builder . Build ( workflow . Id , workflow . Version ) ;
80
- _registry . Add ( ( workflow . Id , workflow . Version , def ) ) ;
92
+ RegisterWorkflow ( def ) ;
81
93
}
82
94
83
95
public bool IsRegistered ( string workflowId , int version )
84
96
{
85
- var definition = _registry . FirstOrDefault ( x => x . workflowId == workflowId && x . version == version ) ;
86
- return definition != default ;
97
+ return _registry . ContainsKey ( $ "{ workflowId } -{ version } ") ;
87
98
}
88
99
89
100
public IEnumerable < WorkflowDefinition > GetAllDefinitions ( )
90
101
{
91
- return _registry . Select ( i => i . definition ) ;
102
+ return _registry . Values ;
92
103
}
93
104
}
94
105
}
0 commit comments