Skip to content

Commit e3fb303

Browse files
Merge pull request #11 from richardschneider/pubsub
Pubsub API
2 parents e06c069 + d85c963 commit e3fb303

File tree

8 files changed

+359
-5
lines changed

8 files changed

+359
-5
lines changed

appveyor.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ environment:
2727
secure: NeX5NCOUXsCLc1UjTJjqB9F02FZ8Wq0VsxqTXC8kBdyK6zjxjebrf/9Da2sY1Kql
2828
snk_secret:
2929
secure: 5QzEIgiDqTIrZruPaIQIvTlNMl5BZ7TGEps7ALyBfHE=
30+
DOTNET_CLI_TELEMETRY_OPTOUT: 1
3031

3132
# tools we need for bulding/testing/deploying
3233
install:
@@ -38,7 +39,7 @@ install:
3839
# - if defined snk_secret secure-file\tools\secure-file -decrypt src\ipfs.ci.snk.enc -secret %snk_secret% -out src\ipfs.dev.snk
3940
- choco install ipfs
4041
- ipfs init
41-
- ps: Start-Process -FilePath "ipfs.exe" -ArgumentList "daemon"
42+
- ps: Start-Process -FilePath "ipfs.exe" -ArgumentList "daemon --enable-pubsub-experiment"
4243

4344
# gitversion will change the assembly info
4445
pull_requests:

src/CoreApi/PubSubApi.cs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
using Common.Logging;
2+
using Newtonsoft.Json;
3+
using Newtonsoft.Json.Linq;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.IO;
7+
using System.Linq;
8+
using System.Text;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
12+
namespace Ipfs.Api
13+
{
14+
15+
/// <summary>
16+
/// Allows you to publish messages to a given topic, and also to
17+
/// subscribe to new messages on a given topic.
18+
/// </summary>
19+
/// <remarks>
20+
/// This API is accessed via the <see cref="IpfsClient.PubSub"/> property.
21+
/// <para>
22+
/// This is an experimental feature. It is not intended in its current state
23+
/// to be used in a production environment.
24+
/// </para>
25+
/// <para>
26+
/// To use, the daemon must be run with '--enable-pubsub-experiment'.
27+
/// </para>
28+
/// </remarks>
29+
/// <seealso href="https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/PUBSUB.md">PUBSUB API</seealso>
30+
public class PubSubApi
31+
{
32+
static ILog log = LogManager.GetLogger<PubSubApi>();
33+
34+
IpfsClient ipfs;
35+
36+
internal PubSubApi(IpfsClient ipfs)
37+
{
38+
this.ipfs = ipfs;
39+
}
40+
41+
/// <summary>
42+
/// Get the subscribed topics.
43+
/// </summary>
44+
/// <returns>
45+
/// A sequence of <see cref="string"/> for each topic.
46+
/// </returns>
47+
public async Task<IEnumerable<string>> SubscribedTopicsAsync()
48+
{
49+
var json = await ipfs.DoCommandAsync("pubsub/ls");
50+
var result = JObject.Parse(json);
51+
var strings = result["Strings"] as JArray;
52+
if (strings == null) return new string[0];
53+
return strings.Select(s => (string)s);
54+
}
55+
56+
/// <summary>
57+
/// Get the peers that are pubsubing with us.
58+
/// </summary>
59+
/// <returns>
60+
/// A sequence of <see cref="string"/> for each peer ID.
61+
/// </returns>
62+
public async Task<IEnumerable<string>> PeersAsync()
63+
{
64+
var json = await ipfs.DoCommandAsync("pubsub/peers");
65+
var result = JObject.Parse(json);
66+
var strings = result["Strings"] as JArray;
67+
if (strings == null) return new string[0];
68+
return strings.Select(s => (string)s);
69+
}
70+
71+
/// <summary>
72+
/// Publish a message to a given topic.
73+
/// </summary>
74+
/// <param name="topic">
75+
/// The topic name.
76+
/// </param>
77+
/// <param name="message">
78+
/// The message to publish.
79+
/// </param>
80+
public async Task Publish(string topic, string message)
81+
{
82+
var _ = await ipfs.PostCommandAsync("pubsub/pub", topic, "arg=" + message);
83+
return;
84+
}
85+
86+
/// <summary>
87+
/// Subscribe to messages on a given topic.
88+
/// </summary>
89+
/// <param name="topic">
90+
/// The topic name.
91+
/// </param>
92+
/// <param name="handler">
93+
/// The action to perform when a <see cref="PublishedMessage"/> is received.
94+
/// </param>
95+
/// <param name="cancellationToken">
96+
/// Is used to stop the topic listener. When cancelled, the <see cref="OperationCanceledException"/>
97+
/// is <b>NOT</b> raised.
98+
/// </param>
99+
/// <returns>
100+
/// After the topic listener is register with the IPFS server.
101+
/// </returns>
102+
/// <remarks>
103+
/// The <paramref name="handler"/> is invoked on the topic listener thread.
104+
/// </remarks>
105+
public async Task Subscribe(string topic, Action<PublishedMessage> handler, CancellationToken cancellationToken = default(CancellationToken))
106+
{
107+
var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", topic);
108+
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
109+
Task.Run(() => ProcessMessages(topic, handler, messageStream, cancellationToken));
110+
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
111+
return;
112+
}
113+
114+
void ProcessMessages(string topic, Action<PublishedMessage> handler, Stream stream, CancellationToken ct)
115+
{
116+
log.DebugFormat("Start listening for '{0}' messages", topic);
117+
using (var sr = new StreamReader(stream))
118+
{
119+
while (!sr.EndOfStream && !ct.IsCancellationRequested)
120+
{
121+
var json = sr.ReadLine();
122+
if (log.IsDebugEnabled)
123+
log.DebugFormat("PubSub message {0}", json);
124+
if (json != "{}" && !ct.IsCancellationRequested)
125+
{
126+
handler(new PublishedMessage(json));
127+
}
128+
}
129+
}
130+
log.DebugFormat("Stop listening for '{0}' messages", topic);
131+
}
132+
133+
}
134+
135+
}

src/IpfsClient.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public IpfsClient()
5959
Dag = new DagApi(this);
6060
Object = new ObjectApi(this);
6161
FileSystem = new FileSystemApi(this);
62+
PubSub = new PubSubApi(this);
6263
}
6364

6465
/// <summary>
@@ -145,6 +146,11 @@ public IpfsClient(string host)
145146
/// </summary>
146147
public FileSystemApi FileSystem { get; private set; }
147148

149+
/// <summary>
150+
/// Provides access to the <see cref="PubSubApi">PubSub API</see>.
151+
/// </summary>
152+
public PubSubApi PubSub { get; private set; }
153+
148154
Uri BuildCommand(string command, string arg = null, params string[] options)
149155
{
150156
var url = "/api/v0/" + command;
@@ -288,7 +294,9 @@ public async Task<string> PostCommandAsync(string command, string arg = null, pa
288294
var url = BuildCommand(command, arg, options);
289295
if (log.IsDebugEnabled)
290296
log.Debug("POST " + url.ToString());
291-
using (var response = await Api().PostAsync(url, null))
297+
var request = new HttpRequestMessage(HttpMethod.Post, url);
298+
299+
using (var response = await Api().SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
292300
{
293301
await ThrowOnErrorAsync(response);
294302
var body = await response.Content.ReadAsStringAsync();
@@ -329,7 +337,7 @@ public async Task<T> PostCommandAsync<T>(string command, string arg = null, para
329337
}
330338

331339
/// <summary>
332-
/// Post an <see href="https://ipfs.io/docs/api/">IPFS API command</see> returning a string.
340+
/// Post an <see href="https://ipfs.io/docs/api/">IPFS API command</see> returning a stream.
333341
/// </summary>
334342
/// <param name="command">
335343
/// The <see href="https://ipfs.io/docs/api/">IPFS API command</see>, such as
@@ -349,7 +357,9 @@ public async Task<Stream> PostDownloadAsync(string command, string arg = null, p
349357
var url = BuildCommand(command, arg, options);
350358
if (log.IsDebugEnabled)
351359
log.Debug("POST " + url.ToString());
352-
var response = await Api().PostAsync(url, null);
360+
var request = new HttpRequestMessage(HttpMethod.Post, url);
361+
362+
var response = await Api().SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
353363
await ThrowOnErrorAsync(response);
354364
return await response.Content.ReadAsStreamAsync();
355365
}

src/PublishedMessage.cs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
using Newtonsoft.Json.Linq;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.IO;
6+
using System.Text;
7+
8+
namespace Ipfs.Api
9+
{
10+
/// <summary>
11+
/// A published message.
12+
/// </summary>
13+
/// <remarks>
14+
/// The <see cref="PubSubApi"/> is used to publish and subsribe to a message.
15+
/// </remarks>
16+
public class PublishedMessage
17+
{
18+
/// <summary>
19+
/// Creates a new instance of <see cref="PublishedMessage"/> from the
20+
/// specified JSON string.
21+
/// </summary>
22+
/// <param name="json">
23+
/// The JSON representation of a published message.
24+
/// </param>
25+
public PublishedMessage(string json)
26+
{
27+
var o = JObject.Parse(json);
28+
this.Sender = Convert.FromBase64String((string)o["from"]).ToBase58();
29+
this.SequenceNumber = Convert.FromBase64String((string)o["seqno"]);
30+
this.DataBytes = Convert.FromBase64String((string)o["data"]);
31+
this.Topics = ((JArray)o["topicIDs"]).Select(t => (string)t);
32+
}
33+
34+
/// <summary>
35+
/// The sender of the message.
36+
/// </summary>
37+
/// <remarks>
38+
/// This is the peer ID of the node that sent the message.
39+
/// </remarks>
40+
public string Sender { get; private set; }
41+
42+
/// <summary>
43+
/// The topics of the message.
44+
/// </summary>
45+
public IEnumerable<string> Topics { get; private set; }
46+
47+
/// <summary>
48+
/// The sequence number of the message.
49+
/// </summary>
50+
public byte[] SequenceNumber { get; private set; }
51+
52+
/// <summary>
53+
/// Contents as a byte array.
54+
/// </summary>
55+
/// <value>
56+
/// The contents as a sequence of bytes.
57+
/// </value>
58+
public byte[] DataBytes { get; private set; }
59+
60+
/// <summary>
61+
/// Contents as a stream of bytes.
62+
/// </summary>
63+
/// <value>
64+
/// The contents as a stream of bytes.
65+
/// </value>
66+
public Stream DataStream
67+
{
68+
get
69+
{
70+
return new MemoryStream(DataBytes, false);
71+
}
72+
}
73+
74+
/// <summary>
75+
/// Contents as a string.
76+
/// </summary>
77+
/// <value>
78+
/// The contents interpreted as a UTF-8 string.
79+
/// </value>
80+
public string DataString
81+
{
82+
get
83+
{
84+
return Encoding.UTF8.GetString(DataBytes);
85+
}
86+
}
87+
}
88+
}

test/CoreApi/PubSubApiTest.cs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
using Ipfs.Api;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
using Newtonsoft.Json.Linq;
4+
using System;
5+
using System.Linq;
6+
using System.Text;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
10+
namespace Ipfs.Api
11+
{
12+
13+
[TestClass]
14+
public class PubSubApiTest
15+
{
16+
17+
[TestMethod]
18+
public void Api_Exists()
19+
{
20+
IpfsClient ipfs = TestFixture.Ipfs;
21+
Assert.IsNotNull(ipfs.PubSub);
22+
}
23+
24+
[TestMethod]
25+
public void Peers()
26+
{
27+
var ipfs = TestFixture.Ipfs;
28+
var peers = ipfs.PubSub.PeersAsync().Result.ToArray();
29+
Assert.IsTrue(peers.Length > 0);
30+
}
31+
32+
[TestMethod]
33+
public void Subscribed_Topics()
34+
{
35+
var ipfs = TestFixture.Ipfs;
36+
var topics = ipfs.PubSub.SubscribedTopicsAsync().Result.ToArray();
37+
// TODO: Assert.IsTrue(peers.Length > 0);
38+
}
39+
40+
volatile int messageCount = 0;
41+
42+
[TestMethod]
43+
public async Task Subscribe()
44+
{
45+
messageCount = 0;
46+
var ipfs = TestFixture.Ipfs;
47+
var topic = "net-ipfs-api-test-" + Guid.NewGuid().ToString();
48+
await ipfs.PubSub.Subscribe(topic, msg =>
49+
{
50+
Interlocked.Increment(ref messageCount);
51+
});
52+
await ipfs.PubSub.Publish(topic, "hello world!");
53+
54+
await Task.Delay(1000);
55+
Assert.AreEqual(1, messageCount);
56+
}
57+
58+
volatile int messageCount1 = 0;
59+
60+
[TestMethod]
61+
public async Task Unsubscribe()
62+
{
63+
messageCount1 = 0;
64+
var ipfs = TestFixture.Ipfs;
65+
var topic = "net-ipfs-api-test-" + Guid.NewGuid().ToString();
66+
var cs = new CancellationTokenSource();
67+
await ipfs.PubSub.Subscribe(topic, msg =>
68+
{
69+
Interlocked.Increment(ref messageCount1);
70+
}, cs.Token);
71+
await ipfs.PubSub.Publish(topic, "hello world!");
72+
await Task.Delay(1000);
73+
Assert.AreEqual(1, messageCount1);
74+
75+
cs.Cancel();
76+
await ipfs.PubSub.Publish(topic, "hello world!!!");
77+
await Task.Delay(1000);
78+
Assert.AreEqual(1, messageCount1);
79+
}
80+
}
81+
}

test/CoreApi/SwarmApiTest.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public async Task Peers_Info()
4545
var ipfs = TestFixture.Ipfs;
4646
var peers = await ipfs.Swarm.PeersAsync();
4747
await Task.WhenAll(peers
48+
.Where(p => p.Latency != TimeSpan.Zero)
49+
.OrderBy(p => p.Latency)
4850
.Take(10)
4951
.Select(async p =>
5052
{

test/IpfsApiTests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFrameworks>netcoreapp1.1;netcoreapp2.0;net45</TargetFrameworks>
4+
<TargetFrameworks>net45;netcoreapp1.1;netcoreapp2.0</TargetFrameworks>
55

66
<IsPackable>false</IsPackable>
77
<DebugType>full</DebugType>

0 commit comments

Comments
 (0)