Skip to content

Commit 747233c

Browse files
authored
Merge pull request #35 from qazq/service/memory-persistence
Lock each list of memory persistence before using
2 parents 27926fa + 1824d31 commit 747233c

File tree

1 file changed

+88
-43
lines changed

1 file changed

+88
-43
lines changed

src/WorkflowCore/Services/MemoryPersistenceProvider.cs

Lines changed: 88 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -22,66 +22,90 @@ public class MemoryPersistenceProvider : IPersistenceProvider
2222

2323
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
2424
{
25-
workflow.Id = Guid.NewGuid().ToString();
26-
_instances.Add(workflow);
27-
return workflow.Id;
25+
lock (_instances)
26+
{
27+
workflow.Id = Guid.NewGuid().ToString();
28+
_instances.Add(workflow);
29+
return workflow.Id;
30+
}
2831
}
2932

3033
public async Task PersistWorkflow(WorkflowInstance workflow)
3134
{
32-
var existing = _instances.First(x => x.Id == workflow.Id);
33-
_instances.Remove(existing);
34-
_instances.Add(workflow);
35+
lock (_instances)
36+
{
37+
var existing = _instances.First(x => x.Id == workflow.Id);
38+
_instances.Remove(existing);
39+
_instances.Add(workflow);
40+
}
3541
}
3642

3743
public async Task<IEnumerable<string>> GetRunnableInstances()
3844
{
39-
var now = DateTime.Now.ToUniversalTime().Ticks;
40-
return _instances.Where(x => x.NextExecution.HasValue && x.NextExecution <= now).Select(x => x.Id);
45+
lock (_instances)
46+
{
47+
var now = DateTime.Now.ToUniversalTime().Ticks;
48+
return _instances.Where(x => x.NextExecution.HasValue && x.NextExecution <= now).Select(x => x.Id);
49+
}
4150
}
4251

4352
public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
4453
{
45-
return _instances.First(x => x.Id == Id);
54+
lock (_instances)
55+
{
56+
return _instances.First(x => x.Id == Id);
57+
}
4658
}
4759

4860
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
4961
{
50-
var result = _instances.AsQueryable();
62+
lock (_instances)
63+
{
64+
var result = _instances.AsQueryable();
5165

52-
if (status.HasValue)
53-
result = result.Where(x => x.Status == status.Value);
66+
if (status.HasValue)
67+
result = result.Where(x => x.Status == status.Value);
5468

55-
if (!String.IsNullOrEmpty(type))
56-
result = result.Where(x => x.WorkflowDefinitionId == type);
69+
if (!String.IsNullOrEmpty(type))
70+
result = result.Where(x => x.WorkflowDefinitionId == type);
5771

58-
if (createdFrom.HasValue)
59-
result = result.Where(x => x.CreateTime >= createdFrom.Value);
72+
if (createdFrom.HasValue)
73+
result = result.Where(x => x.CreateTime >= createdFrom.Value);
6074

61-
if (createdTo.HasValue)
62-
result = result.Where(x => x.CreateTime <= createdTo.Value);
75+
if (createdTo.HasValue)
76+
result = result.Where(x => x.CreateTime <= createdTo.Value);
6377

64-
return result.Skip(skip).Take(take).ToList();
78+
return result.Skip(skip).Take(take).ToList();
79+
}
6580
}
6681

6782

6883
public async Task<string> CreateEventSubscription(EventSubscription subscription)
6984
{
70-
subscription.Id = Guid.NewGuid().ToString();
71-
_subscriptions.Add(subscription);
72-
return subscription.Id;
85+
lock (_subscriptions)
86+
{
87+
subscription.Id = Guid.NewGuid().ToString();
88+
_subscriptions.Add(subscription);
89+
return subscription.Id;
90+
}
7391
}
7492

7593
public async Task<IEnumerable<EventSubscription>> GetSubcriptions(string eventName, string eventKey, DateTime asOf)
7694
{
77-
return _subscriptions
78-
.Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf);
95+
lock (_subscriptions)
96+
{
97+
return _subscriptions
98+
.Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf);
99+
}
79100
}
80101

81102
public async Task TerminateSubscription(string eventSubscriptionId)
82103
{
83-
var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId);
84-
_subscriptions.Remove(sub);
104+
lock (_subscriptions)
105+
{
106+
var sub = _subscriptions.Single(x => x.Id == eventSubscriptionId);
107+
_subscriptions.Remove(sub);
108+
}
85109
}
86110

87111
public void EnsureStoreExists()
@@ -90,51 +114,72 @@ public void EnsureStoreExists()
90114

91115
public async Task<string> CreateEvent(Event newEvent)
92116
{
93-
newEvent.Id = Guid.NewGuid().ToString();
94-
_events.Add(newEvent);
95-
return newEvent.Id;
117+
lock (_events)
118+
{
119+
newEvent.Id = Guid.NewGuid().ToString();
120+
_events.Add(newEvent);
121+
return newEvent.Id;
122+
}
96123
}
97124

98125
public async Task MarkEventProcessed(string id)
99126
{
100-
var evt = _events.FirstOrDefault(x => x.Id == id);
101-
if (evt != null)
102-
evt.IsProcessed = true;
127+
lock (_events)
128+
{
129+
var evt = _events.FirstOrDefault(x => x.Id == id);
130+
if (evt != null)
131+
evt.IsProcessed = true;
132+
}
103133
}
104134

105135
public async Task<IEnumerable<string>> GetRunnableEvents()
106136
{
107-
return _events
108-
.Where(x => !x.IsProcessed)
109-
.Where(x => x.EventTime <= DateTime.Now.ToUniversalTime())
110-
.Select(x => x.Id)
111-
.ToList();
137+
lock (_events)
138+
{
139+
return _events
140+
.Where(x => !x.IsProcessed)
141+
.Where(x => x.EventTime <= DateTime.Now.ToUniversalTime())
142+
.Select(x => x.Id)
143+
.ToList();
144+
}
112145
}
113146

114147
public async Task<Event> GetEvent(string id)
115148
{
116-
return _events.FirstOrDefault(x => x.Id == id);
149+
lock (_events)
150+
{
151+
return _events.FirstOrDefault(x => x.Id == id);
152+
}
117153
}
118154

119155
public async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf)
120156
{
121-
return _events
157+
lock (_events)
158+
{
159+
return _events
122160
.Where(x => x.EventName == eventName && x.EventKey == eventKey)
123161
.Where(x => x.EventTime >= asOf)
124162
.Select(x => x.Id)
125163
.ToList();
164+
}
126165
}
127166

128167
public async Task MarkEventUnprocessed(string id)
129168
{
130-
var evt = _events.FirstOrDefault(x => x.Id == id);
131-
if (evt != null)
132-
evt.IsProcessed = false;
169+
lock (_events)
170+
{
171+
var evt = _events.FirstOrDefault(x => x.Id == id);
172+
if (evt != null)
173+
evt.IsProcessed = false;
174+
}
133175
}
134176

135177
public async Task PersistErrors(IEnumerable<ExecutionError> errors)
136178
{
137-
_errors.AddRange(errors);
179+
lock (errors)
180+
{
181+
_errors.AddRange(errors);
182+
}
138183
}
139184
}
140185
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

0 commit comments

Comments
 (0)