Skip to content

Commit ebfa709

Browse files
authored
Merge pull request #35 from zetroot/feature/parallel-observations
+semver: feature
2 parents c9f9828 + 1ea9043 commit ebfa709

File tree

2 files changed

+62
-38
lines changed

2 files changed

+62
-38
lines changed

src/NTorSpectator.Observer/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
var jobDetail = JobBuilder.Create<SpectatorJob>()
5151
.WithDescription("Tor spectator job")
5252
.WithIdentity("tor-spectator")
53+
.DisallowConcurrentExecution()
5354
.Build();
5455
cfg.AddJob<SpectatorJob>(jobKey: jobDetail.Key, configure: j => {});
5556
cfg.AddTrigger(t => t.WithCronSchedule("0 0 * * * ?").ForJob(jobDetail));

src/NTorSpectator.Observer/Services/SpectatorJob.cs

Lines changed: 61 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Collections.Concurrent;
12
using System.Diagnostics;
23
using NTorSpectator.Observer.TorIntegration;
34
using NTorSpectator.Services;
@@ -22,15 +23,13 @@ public class SpectatorJob : IJob
2223

2324
private readonly ILogger<SpectatorJob> _logger;
2425
private readonly ISitesCatalogue _sitesCatalogue;
25-
private readonly TorControlManager _torControl;
26-
private readonly ISiteObserver _siteObserver;
27-
28-
public SpectatorJob(ILogger<SpectatorJob> logger, ISitesCatalogue sitesCatalogue, TorControlManager torControl, ISiteObserver siteObserver)
26+
private readonly IServiceProvider _services;
27+
28+
public SpectatorJob(ILogger<SpectatorJob> logger, ISitesCatalogue sitesCatalogue, IServiceProvider services)
2929
{
3030
_logger = logger;
3131
_sitesCatalogue = sitesCatalogue;
32-
_torControl = torControl;
33-
_siteObserver = siteObserver;
32+
_services = services;
3433
}
3534

3635
public async Task Execute(IJobExecutionContext context)
@@ -40,50 +39,74 @@ public async Task Execute(IJobExecutionContext context)
4039
_logger.LogDebug("Starting sites observations");
4140
var sites = await _sitesCatalogue.GetAllSites();
4241
_logger.LogDebug("Got {Count} sites to observe", sites.Count);
43-
44-
var siteQueue = new Queue<QueuedSite>(sites.Select(x => new QueuedSite(x, 0)));
45-
while(siteQueue.TryDequeue(out var queuedSite))
42+
var queuedSites = sites.Select(x => new QueuedSite(x, 0));
43+
var siteQueue = new ConcurrentQueue<QueuedSite>(queuedSites);
44+
while(true)
4645
{
4746
QueueLength.Set(siteQueue.Count);
48-
using var _ = _logger.BeginScope(new Dictionary<string, object> { { "HiddenService", queuedSite.Site.SiteUri } });
49-
_logger.LogDebug("Starting observations on the next site");
50-
try
51-
{
52-
var observations = await ObserveSite(queuedSite.Site.SiteUri);
53-
ObservationsCount.Inc();
54-
if (!observations.IsOk)
55-
{
56-
_logger.LogDebug("Site observed as not available");
57-
var siteObservationsCount = queuedSite.ObservationsCount;
58-
if (siteObservationsCount < 3)
59-
{
60-
_logger.LogDebug("Site has been observed {Count} times, returning it to queue", siteObservationsCount);
61-
siteQueue.Enqueue(queuedSite with{ObservationsCount = siteObservationsCount + 1});
62-
RetriesCount.Inc();
63-
continue;
64-
}
65-
}
66-
_logger.LogDebug("Site seems to be up");
67-
await _siteObserver.AddNewObservation(queuedSite.Site.SiteUri, observations.IsOk);
68-
SiteStatus.WithLabels(queuedSite.Site.SiteUri).Set(observations.IsOk ? 1 : 0);
69-
_logger.LogInformation("Site observed");
70-
}
71-
catch (Exception e)
72-
{
73-
_logger.LogError(e, "Observation for site failed");
74-
}
47+
if (!siteQueue.Any())
48+
break;
49+
var chunk = TryDequeueMax(siteQueue, 4);
50+
var tasks = chunk.Select(x => TryCheckSite(x, s => siteQueue.Enqueue(s))).ToArray();
51+
await Task.WhenAll(tasks);
7552
}
7653
_logger.LogDebug("The queue is finally empty, observations finished");
7754
sw.Stop();
7855
TotalSessionDuration.Set(sw.ElapsedMilliseconds);
7956
}
57+
58+
59+
private QueuedSite[] TryDequeueMax(ConcurrentQueue<QueuedSite> queue, int max)
60+
{
61+
var result = new List<QueuedSite>(max);
62+
for (int i = 0; i < max; ++i)
63+
{
64+
if (queue.TryDequeue(out var item))
65+
result.Add(item);
66+
}
67+
return result.ToArray();
68+
}
8069

70+
private async Task TryCheckSite(QueuedSite queuedSite, Action<QueuedSite> putBackIntoQueue)
71+
{
72+
using var _ = _logger.BeginScope(new Dictionary<string, object> { { "HiddenService", queuedSite.Site.SiteUri } });
73+
using var scope = _services.CreateScope();
74+
var torControl = scope.ServiceProvider.GetRequiredService<TorControlManager>();
75+
var siteObserver = scope.ServiceProvider.GetRequiredService<ISiteObserver>();
76+
_logger.LogDebug("Starting observations on the next site");
77+
try
78+
{
79+
var observations = await ObserveSite(queuedSite.Site.SiteUri, torControl);
80+
ObservationsCount.Inc();
81+
if (!observations.IsOk)
82+
{
83+
_logger.LogDebug("Site observed as not available");
84+
var siteObservationsCount = queuedSite.ObservationsCount;
85+
if (siteObservationsCount < 3)
86+
{
87+
_logger.LogDebug("Site has been observed {Count} times, returning it to queue", siteObservationsCount);
88+
putBackIntoQueue(queuedSite with { ObservationsCount = siteObservationsCount + 1 });
89+
RetriesCount.Inc();
90+
return;
91+
}
92+
}
93+
_logger.LogDebug("Site seems to be up");
94+
await siteObserver.AddNewObservation(queuedSite.Site.SiteUri, observations.IsOk);
95+
SiteStatus.WithLabels(queuedSite.Site.SiteUri).Set(observations.IsOk ? 1 : 0);
96+
_logger.LogInformation("Site observed");
97+
}
98+
catch (Exception e)
99+
{
100+
_logger.LogError(e, "Observation for site failed");
101+
}
102+
}
103+
81104
private record QueuedSite(Site Site, int ObservationsCount);
82105

83-
private async Task<TorWatchResults> ObserveSite(string site)
106+
private async Task<TorWatchResults> ObserveSite(string site, TorControlManager torControl)
84107
{
85108
using var _ = RequestDuration.NewTimer();
86-
var torReply = await _torControl.HsFetch(site);
109+
var torReply = await torControl.HsFetch(site);
87110
var positive = torReply.Count(x => x.Action == HsDescAction.Received);
88111
var negative = torReply.Count(x => x.Action == HsDescAction.Failed);
89112
return new(site, positive, negative);

0 commit comments

Comments
 (0)