Skip to content

Commit 4aa7714

Browse files
committed
guard improvments
1 parent 0f939bb commit 4aa7714

File tree

2 files changed

+34
-22
lines changed

2 files changed

+34
-22
lines changed

src/Dijkstra.NET/Dijkstra.NET/ShortestPath/BfsParallel.cs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace Dijkstra.NET.ShortestPath
22
{
33
using System;
4+
using System.Diagnostics;
45
using System.Threading;
56
using Contract;
67
using Model;
@@ -22,7 +23,7 @@ public BfsParallel(IConcurrentGraph<T, TEdgeCustom> graph) : base(graph)
2223
_table = new ProducerConsumer<T, TEdgeCustom>();
2324
}
2425

25-
public BfsParallel(IConcurrentGraph<T, TEdgeCustom> graph, double guardInterval): base(graph)
26+
public BfsParallel(IConcurrentGraph<T, TEdgeCustom> graph, double guardInterval) : base(graph)
2627
{
2728
_graph = graph;
2829
_table = new ProducerConsumer<T, TEdgeCustom>(guardInterval);
@@ -34,7 +35,8 @@ public override IShortestPathResult Process(uint @from, uint to)
3435

3536
IConcurrentNode<T, TEdgeCustom> nodeFrom = _graph.GetConccurentNode(from);
3637
nodeFrom.Distance = 0;
37-
_table.Produce(nodeFrom);
38+
39+
_table.Initialise = () => _table.Produce(nodeFrom);
3840

3941
_table.Producing = node =>
4042
{
@@ -50,9 +52,14 @@ public override IShortestPathResult Process(uint @from, uint to)
5052

5153
_table.Consuming = job =>
5254
{
53-
if (Reduce(_graph.GetConccurentNode(job.To), job.Distance))
55+
if (Reduce(job.From,_graph.GetConccurentNode(job.To), job.Distance))
5456
{
55-
_result.P.AddOrUpdate(job.To, job.From, (u, u1) => job.From);
57+
Debug.WriteLineIf(job.To == 3, $"Update ({job.From})->({job.To}) [{job.Distance}]");
58+
//_result.P.AddOrUpdate(job.To, job.From, (u, u1) =>
59+
//{
60+
// return job.From;
61+
//});
62+
5663
_table.Produce(_graph.GetConccurentNode(job.To));
5764
}
5865
};
@@ -63,29 +70,26 @@ public override IShortestPathResult Process(uint @from, uint to)
6370

6471
return _result;
6572
}
66-
73+
private object _locker = new object();
74+
private object _locker2 = new object();
6775
public void SetInsurance(int insurance)
6876
{
6977
_table.Insurance = insurance;
7078
}
7179

72-
private bool Reduce(IConcurrentNode<T, TEdgeCustom> to, int distance)
80+
private bool Reduce(uint from, IConcurrentNode<T, TEdgeCustom> to, int distance)
7381
{
74-
var spin = new SpinWait();
75-
76-
while (true)
82+
lock (to)
7783
{
78-
int initialDistance = to.Distance;
79-
80-
if (initialDistance > distance)
84+
if (to.Distance > distance)
8185
{
82-
if (to.TrySetDistance(distance, initialDistance))
83-
return true;
86+
to.Distance = distance;
87+
_result.Path[to.Key] = from;
8488

85-
spin.SpinOnce();
89+
return true;
8690
}
87-
else
88-
return false;
91+
92+
return false;
8993
}
9094
}
9195

src/Dijkstra.NET/Dijkstra.NET/ShortestPath/Utility/ProducerConsumer.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ internal sealed class ProducerConsumer<T, TEdgeCustom> : IDisposable where TEdge
2020

2121
private int _counter;
2222

23-
public ProducerConsumer(double guardInterval = 20)
23+
public ProducerConsumer(double guardInterval = 5)
2424
{
2525
_guardTimer = new Timer(guardInterval);
2626

@@ -42,6 +42,7 @@ public ProducerConsumer(double guardInterval = 20)
4242

4343
public Action<IConcurrentNode<T, TEdgeCustom>> Producing { get; set; }
4444
public Action<MapReduceJob> Consuming { get; set; }
45+
public Action Initialise { get; set; }
4546

4647
public void Produce(IConcurrentNode<T, TEdgeCustom> product)
4748
{
@@ -61,6 +62,8 @@ private void Emit(Emitter emitter)
6162

6263
public void Work()
6364
{
65+
Initialise();
66+
6467
foreach (var emitter in _table.GetConsumingEnumerable())
6568
{
6669
Task.Factory.StartNew(() =>
@@ -81,10 +84,15 @@ public void Work()
8184

8285
private void NotifyGuard()
8386
{
84-
if (Interlocked.Exchange(ref _guardIsWorking, 1) == 0)
85-
_guardTimer.Start();
86-
else
87-
Interlocked.Exchange(ref _counter, 0);
87+
//if (Interlocked.Exchange(ref _guardIsWorking, 1) == 0)
88+
// _guardTimer.Start();
89+
//else
90+
// Interlocked.Exchange(ref _counter, 0);
91+
92+
if (IsNotWorking)
93+
{
94+
Complete();
95+
}
8896
}
8997

9098
private bool IsNotWorking => _table.Count == 0 && Interlocked.CompareExchange(ref _currentJobs, 0, 0) == 0;

0 commit comments

Comments
 (0)