Skip to content

Commit 2973066

Browse files
feat(PubSubApi): publish a binary message
1 parent 1b377be commit 2973066

File tree

3 files changed

+59
-9
lines changed

3 files changed

+59
-9
lines changed

src/CoreApi/PubSubApi.cs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,23 @@ internal PubSubApi(IpfsClient ipfs)
4242
return strings.Select(s => new Peer { Id = (string)s } );
4343
}
4444

45+
public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken))
46+
{
47+
var url = new StringBuilder();
48+
url.Append("/api/v0/pubsub/pub");
49+
url.Append("?arg=");
50+
url.Append(System.Net.WebUtility.UrlEncode(topic));
51+
url.Append("&arg=");
52+
var data = Encoding.ASCII.GetString(System.Net.WebUtility.UrlEncodeToBytes(message, 0, message.Length));
53+
url.Append(data);
54+
return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), cancel);
55+
}
56+
57+
public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken))
58+
{
59+
throw new NotImplementedException();
60+
}
61+
4562
public async Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken))
4663
{
4764
var _ = await ipfs.DoCommandAsync("pubsub/pub", cancel, topic, "arg=" + message);
@@ -102,15 +119,6 @@ void ProcessMessages(string topic, Action<PublishedMessage> handler, StreamReade
102119
log.DebugFormat("Stop listening for '{0}' messages", topic);
103120
}
104121

105-
public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken))
106-
{
107-
throw new NotImplementedException();
108-
}
109-
110-
public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken))
111-
{
112-
throw new NotImplementedException();
113-
}
114122
}
115123

116124
}

src/IpfsClient.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,21 @@ public async Task<string> DoCommandAsync(string command, CancellationToken cance
272272
}
273273
}
274274

275+
internal async Task DoCommandAsync(Uri url, CancellationToken cancel)
276+
{
277+
if (log.IsDebugEnabled)
278+
log.Debug("POST " + url.ToString());
279+
using (var response = await Api().PostAsync(url, null, cancel))
280+
{
281+
await ThrowOnErrorAsync(response);
282+
var body = await response.Content.ReadAsStringAsync();
283+
if (log.IsDebugEnabled)
284+
log.Debug("RSP " + body);
285+
return;
286+
}
287+
}
288+
289+
275290
/// <summary>
276291
/// Perform an <see href="https://ipfs.io/docs/api/">IPFS API command</see> returning
277292
/// a specific <see cref="Type"/>.

test/CoreApi/PubSubApiTest.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Microsoft.VisualStudio.TestTools.UnitTesting;
33
using Newtonsoft.Json.Linq;
44
using System;
5+
using System.Collections.Generic;
56
using System.Linq;
67
using System.Text;
78
using System.Threading;
@@ -173,5 +174,31 @@ await ipfs.PubSub.SubscribeAsync(topic, msg =>
173174
await Task.Delay(1000);
174175
Assert.AreEqual(1, messageCount1);
175176
}
177+
178+
[TestMethod]
179+
public async Task Subscribe_BinaryMessage()
180+
{
181+
var messages = new List<IPublishedMessage>();
182+
var expected = new byte[] { 0, 1, 2, 4, (byte)'a', (byte)'b', 0xfe, 0xff };
183+
var ipfs = TestFixture.Ipfs;
184+
var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString();
185+
var cs = new CancellationTokenSource();
186+
try
187+
{
188+
await ipfs.PubSub.SubscribeAsync(topic, msg =>
189+
{
190+
messages.Add(msg);
191+
}, cs.Token);
192+
await ipfs.PubSub.PublishAsync(topic, expected);
193+
194+
await Task.Delay(1000);
195+
Assert.AreEqual(1, messages.Count);
196+
CollectionAssert.AreEqual(expected, messages[0].DataBytes);
197+
}
198+
finally
199+
{
200+
cs.Cancel();
201+
}
202+
}
176203
}
177204
}

0 commit comments

Comments
 (0)