forked from OrleansContrib/Orleankka
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTopic.cs
More file actions
135 lines (111 loc) · 3.52 KB
/
Topic.cs
File metadata and controls
135 lines (111 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleankka;
using Orleankka.Meta;
namespace Demo
{
/// <summary>
/// Command carrying the search query for a topic together with the
/// per-actor schedule that <c>Topic.Handle(CreateTopic)</c> turns into reminders.
/// </summary>
[Serializable]
public class CreateTopic : Command
{
    /// <summary>The query text that the topic stores and later passes to each search.</summary>
    public readonly string Query;

    /// <summary>
    /// Maps an actor reference to a time span. The topic uses the id of the
    /// reference's path as the reminder id and the time span as the last
    /// argument of <c>Reminders.Register</c>.
    /// </summary>
    public readonly IDictionary<ActorRef, TimeSpan> Schedule;

    /// <summary>Creates the command; both arguments are stored as given, without validation.</summary>
    /// <param name="query">The query text.</param>
    /// <param name="schedule">The actor-to-time-span schedule.</param>
    public CreateTopic(string query, IDictionary<ActorRef, TimeSpan> schedule)
    {
        this.Schedule = schedule;
        this.Query = query;
    }
}
/// <summary>
/// Actor that keeps a running <c>total</c> for a query by asking <c>Api</c>
/// actors for results, persisting the total through <c>ITopicStorage</c>.
/// When an api reports <c>ApiUnavailableException</c> the topic retries on a
/// timer and, after <c>MaxRetries</c> failed retries, unregisters the reminder
/// for that api.
/// </summary>
/// <remarks>
/// NOTE(review): only <c>total</c> is reloaded in <c>OnActivate</c>; <c>query</c> is set
/// solely by <c>Handle(CreateTopic)</c>. Confirm what is expected when a reminder
/// fires on an activation that never received the command.
/// </remarks>
[Actor(Placement = Placement.DistributeEvenly)]
public class Topic : Actor
{
    readonly ITopicStorage storage;

    // Number of failed timer-driven retries after which searching an api is disabled.
    const int MaxRetries = 3;

    // Passed as both timing arguments when the retry timer is registered.
    static readonly TimeSpan RetryPeriod = TimeSpan.FromSeconds(5);

    // api id -> number of failed retries recorded so far; a key is present only while retrying.
    readonly IDictionary<string, int> retrying = new Dictionary<string, int>();

    internal int total;
    string query;

    /// <summary>Creates the actor, taking its storage from <c>ServiceLocator.TopicStorage</c>.</summary>
    public Topic()
    {
        storage = ServiceLocator.TopicStorage;
    }

    /// <summary>Creates the actor with an explicit id, runtime and storage.</summary>
    /// <param name="id">Actor id forwarded to the base constructor.</param>
    /// <param name="runtime">Actor runtime forwarded to the base constructor.</param>
    /// <param name="storage">Storage used to read and write the total.</param>
    public Topic(string id, IActorRuntime runtime, ITopicStorage storage) : base(id, runtime)
    {
        this.storage = storage;
    }

    /// <summary>Loads the stored total for this actor's id.</summary>
    public override async Task OnActivate()
    {
        total = await storage.ReadTotalAsync(Id);
    }

    /// <summary>
    /// Remembers the query and registers one reminder per schedule entry,
    /// using the id of the entry key's path as the reminder id.
    /// </summary>
    /// <param name="cmd">The command carrying the query and schedule.</param>
    public async Task Handle(CreateTopic cmd)
    {
        query = cmd.Query;

        foreach (var entry in cmd.Schedule)
            await Reminders.Register(entry.Key.Path.Id, TimeSpan.Zero, entry.Value);
    }

    /// <summary>
    /// Runs a search against the given api unless that api is currently being
    /// retried; switches to timer-based retries when the api is unavailable.
    /// </summary>
    /// <param name="api">Reminder id, which is also the id of the api actor to query.</param>
    public override async Task OnReminder(string api)
    {
        try
        {
            if (!IsRetrying(api))
                await Search(api);
        }
        catch (ApiUnavailableException)
        {
            ScheduleRetries(api);
        }
    }

    bool IsRetrying(string api)
    {
        return retrying.ContainsKey(api);
    }

    /// <summary>
    /// Starts tracking retries for the api and registers a timer that invokes
    /// <see cref="RetrySearch"/> with the api id as its state.
    /// </summary>
    /// <param name="api">Id of the api to retry.</param>
    /// <exception cref="ArgumentException">The api is already being retried.</exception>
    public void ScheduleRetries(string api)
    {
        retrying.Add(api, 0);
        Timers.Register(api, RetryPeriod, RetryPeriod, api, RetrySearch);
    }

    /// <summary>
    /// Timer callback: repeats the search. On success retries are cancelled;
    /// on <c>ApiUnavailableException</c> the failure is counted and, once the
    /// maximum is reached, the reminder for the api is unregistered and
    /// retries are cancelled.
    /// </summary>
    /// <param name="state">The api id, passed as the timer state.</param>
    public async Task RetrySearch(object state)
    {
        var api = (string)state;
        var giveUp = false;

        try
        {
            await Search(api);
            CancelRetries(api);
        }
        catch (ApiUnavailableException)
        {
            RecordFailedRetry(api);
            giveUp = MaxRetriesReached(api);
        }

        if (!giveUp)
            return;

        // Awaited outside the catch block so the unregistration is observed
        // (failures surface instead of being dropped); the finally guarantees
        // that the retry timer and counter are cleaned up either way.
        try
        {
            await DisableSearch(api);
        }
        finally
        {
            CancelRetries(api);
        }
    }

    void RecordFailedRetry(string api)
    {
        Log.Message(ConsoleColor.DarkRed, "[{0}] failed to obtain results from {1} ...", Id, api);
        retrying[api] += 1;
    }

    bool MaxRetriesReached(string api)
    {
        // >= rather than == so an overshooting counter can never keep retries alive forever.
        return retrying[api] >= MaxRetries;
    }

    void CancelRetries(string api)
    {
        Timers.Unregister(api);
        retrying.Remove(api);
    }

    // Asks the api actor for results of the current query, adds them to the
    // total, logs the success and then writes the new total to storage.
    async Task Search(string api)
    {
        var provider = System.ActorOf<Api>(api);

        total += await provider.Ask(new Search(query));
        Log.Message(ConsoleColor.DarkGray, "[{0}] succesfully obtained results from {1} ...", Id, api);

        await storage.WriteTotalAsync(Id, total);
    }

    // Unregisters the reminder for the api; the task is returned so the caller can await it.
    Task DisableSearch(string api)
    {
        return Reminders.Unregister(api);
    }
}
}