Skip to content

Commit 601e8af

Browse files
authored
Merge pull request #39 from zetroot/revert-35-feature/parallel-observations
Revert "Feature/parallel observations"
2 parents 0ca9161 + 4fac191 commit 601e8af

File tree

2 files changed

+38
-62
lines changed

2 files changed

+38
-62
lines changed

src/NTorSpectator.Observer/Program.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
var jobDetail = JobBuilder.Create<SpectatorJob>()
5151
.WithDescription("Tor spectator job")
5252
.WithIdentity("tor-spectator")
53-
.DisallowConcurrentExecution()
5453
.Build();
5554
cfg.AddJob<SpectatorJob>(jobKey: jobDetail.Key, configure: j => {});
5655
cfg.AddTrigger(t => t.WithCronSchedule("0 0 * * * ?").ForJob(jobDetail));

src/NTorSpectator.Observer/Services/SpectatorJob.cs

Lines changed: 38 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Collections.Concurrent;
21
using System.Diagnostics;
32
using NTorSpectator.Observer.TorIntegration;
43
using NTorSpectator.Services;
@@ -23,13 +22,15 @@ public class SpectatorJob : IJob
2322

2423
private readonly ILogger<SpectatorJob> _logger;
2524
private readonly ISitesCatalogue _sitesCatalogue;
26-
private readonly IServiceProvider _services;
27-
28-
public SpectatorJob(ILogger<SpectatorJob> logger, ISitesCatalogue sitesCatalogue, IServiceProvider services)
25+
private readonly TorControlManager _torControl;
26+
private readonly ISiteObserver _siteObserver;
27+
28+
public SpectatorJob(ILogger<SpectatorJob> logger, ISitesCatalogue sitesCatalogue, TorControlManager torControl, ISiteObserver siteObserver)
2929
{
3030
_logger = logger;
3131
_sitesCatalogue = sitesCatalogue;
32-
_services = services;
32+
_torControl = torControl;
33+
_siteObserver = siteObserver;
3334
}
3435

3536
public async Task Execute(IJobExecutionContext context)
@@ -39,74 +40,50 @@ public async Task Execute(IJobExecutionContext context)
3940
_logger.LogDebug("Starting sites observations");
4041
var sites = await _sitesCatalogue.GetAllSites();
4142
_logger.LogDebug("Got {Count} sites to observe", sites.Count);
42-
var queuedSites = sites.Select(x => new QueuedSite(x, 0));
43-
var siteQueue = new ConcurrentQueue<QueuedSite>(queuedSites);
44-
while(true)
43+
44+
var siteQueue = new Queue<QueuedSite>(sites.Select(x => new QueuedSite(x, 0)));
45+
while(siteQueue.TryDequeue(out var queuedSite))
4546
{
4647
QueueLength.Set(siteQueue.Count);
47-
if (!siteQueue.Any())
48-
break;
49-
var chunk = TryDequeueMax(siteQueue, 4);
50-
var tasks = chunk.Select(x => TryCheckSite(x, s => siteQueue.Enqueue(s))).ToArray();
51-
await Task.WhenAll(tasks);
52-
}
53-
_logger.LogDebug("The queue is finally empty, observations finished");
54-
sw.Stop();
55-
TotalSessionDuration.Set(sw.ElapsedMilliseconds);
56-
}
57-
58-
59-
private QueuedSite[] TryDequeueMax(ConcurrentQueue<QueuedSite> queue, int max)
60-
{
61-
var result = new List<QueuedSite>(max);
62-
for (int i = 0; i < max; ++i)
63-
{
64-
if (queue.TryDequeue(out var item))
65-
result.Add(item);
66-
}
67-
return result.ToArray();
68-
}
69-
70-
private async Task TryCheckSite(QueuedSite queuedSite, Action<QueuedSite> putBackIntoQueue)
71-
{
72-
using var _ = _logger.BeginScope(new Dictionary<string, object> { { "HiddenService", queuedSite.Site.SiteUri } });
73-
using var scope = _services.CreateScope();
74-
var torControl = scope.ServiceProvider.GetRequiredService<TorControlManager>();
75-
var siteObserver = scope.ServiceProvider.GetRequiredService<ISiteObserver>();
76-
_logger.LogDebug("Starting observations on the next site");
77-
try
78-
{
79-
var observations = await ObserveSite(queuedSite.Site.SiteUri, torControl);
80-
ObservationsCount.Inc();
81-
if (!observations.IsOk)
48+
using var _ = _logger.BeginScope(new Dictionary<string, object> { { "HiddenService", queuedSite.Site.SiteUri } });
49+
_logger.LogDebug("Starting observations on the next site");
50+
try
8251
{
83-
_logger.LogDebug("Site observed as not available");
84-
var siteObservationsCount = queuedSite.ObservationsCount;
85-
if (siteObservationsCount < 3)
52+
var observations = await ObserveSite(queuedSite.Site.SiteUri);
53+
ObservationsCount.Inc();
54+
if (!observations.IsOk)
8655
{
87-
_logger.LogDebug("Site has been observed {Count} times, returning it to queue", siteObservationsCount);
88-
putBackIntoQueue(queuedSite with { ObservationsCount = siteObservationsCount + 1 });
89-
RetriesCount.Inc();
90-
return;
56+
_logger.LogDebug("Site observed as not available");
57+
var siteObservationsCount = queuedSite.ObservationsCount;
58+
if (siteObservationsCount < 3)
59+
{
60+
_logger.LogDebug("Site has been observed {Count} times, returning it to queue", siteObservationsCount);
61+
siteQueue.Enqueue(queuedSite with{ObservationsCount = siteObservationsCount + 1});
62+
RetriesCount.Inc();
63+
continue;
64+
}
9165
}
66+
_logger.LogDebug("Site seems to be up");
67+
await _siteObserver.AddNewObservation(queuedSite.Site.SiteUri, observations.IsOk);
68+
SiteStatus.WithLabels(queuedSite.Site.SiteUri).Set(observations.IsOk ? 1 : 0);
69+
_logger.LogInformation("Site observed");
70+
}
71+
catch (Exception e)
72+
{
73+
_logger.LogError(e, "Observation for site failed");
9274
}
93-
_logger.LogDebug("Site seems to be up");
94-
await siteObserver.AddNewObservation(queuedSite.Site.SiteUri, observations.IsOk);
95-
SiteStatus.WithLabels(queuedSite.Site.SiteUri).Set(observations.IsOk ? 1 : 0);
96-
_logger.LogInformation("Site observed");
97-
}
98-
catch (Exception e)
99-
{
100-
_logger.LogError(e, "Observation for site failed");
10175
}
76+
_logger.LogDebug("The queue is finally empty, observations finished");
77+
sw.Stop();
78+
TotalSessionDuration.Set(sw.ElapsedMilliseconds);
10279
}
103-
80+
10481
private record QueuedSite(Site Site, int ObservationsCount);
10582

106-
private async Task<TorWatchResults> ObserveSite(string site, TorControlManager torControl)
83+
private async Task<TorWatchResults> ObserveSite(string site)
10784
{
10885
using var _ = RequestDuration.NewTimer();
109-
var torReply = await torControl.HsFetch(site);
86+
var torReply = await _torControl.HsFetch(site);
11087
var positive = torReply.Count(x => x.Action == HsDescAction.Received);
11188
var negative = torReply.Count(x => x.Action == HsDescAction.Failed);
11289
return new(site, positive, negative);

0 commit comments

Comments
 (0)