Skip to content

Commit c0b29e5

Browse files
feat(faye): add websocket client (#110)
Co-authored-by: ferhat elmas <[email protected]>
1 parent a4f2786 commit c0b29e5

34 files changed

+1484
-11
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: build
22

33
on: [pull_request]
44

5-
concurrency:
5+
concurrency:
66
group: ${{ github.workflow }}-${{ github.head_ref }}
77
cancel-in-progress: true
88

@@ -31,6 +31,7 @@ jobs:
3131
env:
3232
STREAM_KEY: ${{ secrets.STREAM_KEY }}
3333
STREAM_SECRET: ${{ secrets.STREAM_SECRET }}
34+
STREAM_APP_ID: ${{ secrets.STREAM_APP_ID }}
3435
run: |
3536
./gradlew spotlessCheck --no-daemon
3637
./gradlew test --no-daemon

.github/workflows/initiate_release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
VERSION: ${{ github.event.inputs.version }}
2222
run: |
2323
npx --yes [email protected] --release-as "$VERSION" --skip.tag --skip.commit --tag-prefix=v
24-
git config --global user.name 'github-actions'
24+
git config --global user.name 'github-actions'
2525
git config --global user.email '[email protected]'
2626
git checkout -q -b "release-$VERSION"
2727
git commit -am "chore(release): $VERSION"

.github/workflows/release.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
- uses: actions/checkout@v3
1616
with:
1717
fetch-depth: 0
18-
18+
1919
- uses: actions/github-script@v5
2020
with:
2121
script: |
@@ -34,6 +34,7 @@ jobs:
3434
env:
3535
STREAM_KEY: ${{ secrets.STREAM_KEY }}
3636
STREAM_SECRET: ${{ secrets.STREAM_SECRET }}
37+
STREAM_APP_ID: ${{ secrets.STREAM_APP_ID }}
3738
GPG_KEY_CONTENTS: ${{ secrets.GPG_KEY_CONTENTS }}
3839
OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }}
3940
OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}

src/main/java/io/getstream/cloud/CloudAggregatedFeed.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ public class CloudAggregatedFeed extends CloudFeed {
2020
super(client, id);
2121
}
2222

23+
CloudAggregatedFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) {
24+
super(client, id, subscriber);
25+
}
26+
2327
public CompletableFuture<? extends List<? extends Group<Activity>>> getActivities()
2428
throws StreamException {
2529
return getActivities(

src/main/java/io/getstream/cloud/CloudClient.java

Lines changed: 135 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
import io.getstream.core.Region;
77
import io.getstream.core.Stream;
88
import io.getstream.core.exceptions.StreamException;
9+
import io.getstream.core.faye.DefaultMessageTransformer;
10+
import io.getstream.core.faye.Message;
11+
import io.getstream.core.faye.client.FayeClient;
12+
import io.getstream.core.faye.subscription.ChannelSubscription;
913
import io.getstream.core.http.HTTPClient;
1014
import io.getstream.core.http.OKHTTPClientAdapter;
1115
import io.getstream.core.http.Response;
@@ -14,20 +18,38 @@
1418
import io.getstream.core.models.Data;
1519
import io.getstream.core.models.FeedID;
1620
import io.getstream.core.models.OGData;
21+
import io.getstream.core.models.RealtimeMessage;
1722
import io.getstream.core.options.RequestOption;
23+
import io.getstream.core.utils.Serialization;
1824
import java.net.MalformedURLException;
1925
import java.net.URL;
26+
import java.util.HashMap;
27+
import java.util.Map;
2028
import java8.util.concurrent.CompletableFuture;
2129

2230
public final class CloudClient {
31+
private final String apiKey;
2332
private final Token token;
33+
private final String appID;
2434
private final String userID;
2535
private final Stream stream;
26-
27-
private CloudClient(String key, Token token, String userID, URL baseURL, HTTPClient httpClient) {
36+
private final FayeClient faye;
37+
38+
private CloudClient(
39+
String key,
40+
Token token,
41+
String userID,
42+
String appID,
43+
URL baseURL,
44+
HTTPClient httpClient,
45+
URL fayeURL) {
46+
this.apiKey = key;
2847
this.token = token;
48+
this.appID = appID;
2949
this.userID = userID;
3050
this.stream = new Stream(key, baseURL, httpClient);
51+
this.faye = new FayeClient(fayeURL);
52+
this.faye.setMessageTransformer(new FayeMessageTransformer());
3153
}
3254

3355
public static Builder builder(String apiKey, String token, String userID) {
@@ -38,22 +60,25 @@ public static Builder builder(String apiKey, Token token, String userID) {
3860
return new Builder(apiKey, token, userID);
3961
}
4062

41-
public CompletableFuture<OGData> openGraph(URL url) throws StreamException {
42-
return stream.openGraph(token, url);
63+
public static Builder builder(String apiKey, Token token, String userID, String appID) {
64+
return new Builder(apiKey, token, userID, appID);
4365
}
4466

4567
public static final class Builder {
4668
private static final String DEFAULT_HOST = "stream-io-api.com";
69+
private static final String DEFAULT_FAYE_URL = "https://faye-us-east.stream-io-api.com/faye";
4770

4871
private final String apiKey;
4972
private final Token token;
5073
private final String userID;
74+
private final String appID;
5175
private HTTPClient httpClient;
5276

5377
private String scheme = "https";
5478
private String region = Region.US_EAST.toString();
5579
private String host = DEFAULT_HOST;
5680
private int port = 443;
81+
private String fayeURL = DEFAULT_FAYE_URL;
5782

5883
public Builder(String apiKey, Token token, String userID) {
5984
checkNotNull(apiKey, "API key can't be null");
@@ -64,6 +89,19 @@ public Builder(String apiKey, Token token, String userID) {
6489
this.apiKey = apiKey;
6590
this.token = token;
6691
this.userID = userID;
92+
this.appID = null;
93+
}
94+
95+
public Builder(String apiKey, Token token, String userID, String appID) {
96+
checkNotNull(apiKey, "API key can't be null");
97+
checkNotNull(token, "Token can't be null");
98+
checkNotNull(userID, "User ID can't be null");
99+
checkArgument(!apiKey.isEmpty(), "API key can't be empty");
100+
checkArgument(!userID.isEmpty(), "User ID can't be empty");
101+
this.apiKey = apiKey;
102+
this.token = token;
103+
this.userID = userID;
104+
this.appID = appID;
67105
}
68106

69107
public Builder httpClient(HTTPClient httpClient) {
@@ -105,6 +143,13 @@ public Builder region(String region) {
105143
return this;
106144
}
107145

146+
public Builder fayeURL(String fayeURL) {
147+
checkNotNull(fayeURL, "FayeUrl can't be null");
148+
checkArgument(!fayeURL.isEmpty(), "FayeUrl can't be empty");
149+
this.fayeURL = fayeURL;
150+
return this;
151+
}
152+
108153
private String buildHost() {
109154
final StringBuilder sb = new StringBuilder();
110155
if (host.equals(DEFAULT_HOST)) {
@@ -118,15 +163,97 @@ public CloudClient build() throws MalformedURLException {
118163
if (httpClient == null) {
119164
httpClient = new OKHTTPClientAdapter();
120165
}
166+
121167
return new CloudClient(
122-
apiKey, token, userID, new URL(scheme, buildHost(), port, ""), httpClient);
168+
apiKey,
169+
token,
170+
userID,
171+
appID,
172+
new URL(scheme, buildHost(), port, ""),
173+
httpClient,
174+
new URL(DEFAULT_FAYE_URL));
175+
}
176+
}
177+
178+
private static class FeedSubscription {
179+
private String token;
180+
private String userId;
181+
private ChannelSubscription channelSubscription;
182+
183+
private FeedSubscription(String token, String userId) {
184+
this.token = token;
185+
this.userId = userId;
186+
}
187+
188+
private FeedSubscription(String token, String userId, ChannelSubscription subscription) {
189+
this.token = token;
190+
this.userId = userId;
191+
this.channelSubscription = subscription;
192+
}
193+
}
194+
195+
private final Map<String, FeedSubscription> feedSubscriptions = new HashMap<>();
196+
197+
private class FayeMessageTransformer extends DefaultMessageTransformer {
198+
@Override
199+
public Message transformRequest(Message message) {
200+
final String subscription = message.getSubscription();
201+
if (feedSubscriptions.containsKey(subscription)) {
202+
final FeedSubscription feedSubscription = feedSubscriptions.get(subscription);
203+
final Map<String, Object> ext = new HashMap<>();
204+
ext.put("user_id", feedSubscription.userId);
205+
ext.put("api_key", apiKey);
206+
ext.put("signature", feedSubscription.token);
207+
message.setExt(ext);
208+
}
209+
return message;
123210
}
124211
}
125212

126213
public <T> T getHTTPClientImplementation() {
127214
return stream.getHTTPClientImplementation();
128215
}
129216

217+
public CompletableFuture<OGData> openGraph(URL url) throws StreamException {
218+
return stream.openGraph(token, url);
219+
}
220+
221+
private CompletableFuture<ChannelSubscription> feedSubscriber(
222+
FeedID feedId, RealtimeMessageCallback messageCallback) {
223+
final CompletableFuture<ChannelSubscription> subscriberCompletion = new CompletableFuture<>();
224+
try {
225+
checkNotNull(appID, "Missing app id, which is needed in order to subscribe feed");
226+
final String claim = feedId.getClaim();
227+
final String notificationChannel = "site" + "-" + appID + "-" + "feed" + "-" + claim;
228+
final FeedSubscription subscription =
229+
new FeedSubscription(token.toString(), notificationChannel);
230+
feedSubscriptions.put("/" + notificationChannel, subscription);
231+
232+
final ChannelSubscription channelSubscription =
233+
faye.subscribe(
234+
"/" + notificationChannel,
235+
data -> {
236+
try {
237+
final byte[] payload = Serialization.toJSON(data);
238+
final RealtimeMessage message =
239+
Serialization.fromJSON(new String(payload), RealtimeMessage.class);
240+
messageCallback.onMessage(message);
241+
} catch (Exception e) {
242+
e.printStackTrace();
243+
}
244+
},
245+
() -> feedSubscriptions.remove("/" + notificationChannel))
246+
.get();
247+
248+
subscription.channelSubscription = channelSubscription;
249+
feedSubscriptions.put("/" + notificationChannel, subscription);
250+
subscriberCompletion.complete(channelSubscription);
251+
} catch (Exception e) {
252+
subscriberCompletion.completeExceptionally(e);
253+
}
254+
return subscriberCompletion;
255+
}
256+
130257
// TODO: add personalized feed versions
131258
public CloudFlatFeed flatFeed(String slug) {
132259
return flatFeed(slug, userID);
@@ -141,7 +268,7 @@ public CloudFlatFeed flatFeed(String slug, String userID) {
141268
}
142269

143270
public CloudFlatFeed flatFeed(FeedID id) {
144-
return new CloudFlatFeed(this, id);
271+
return new CloudFlatFeed(this, id, this::feedSubscriber);
145272
}
146273

147274
public CloudAggregatedFeed aggregatedFeed(String slug) {
@@ -157,7 +284,7 @@ public CloudAggregatedFeed aggregatedFeed(String slug, String userID) {
157284
}
158285

159286
public CloudAggregatedFeed aggregatedFeed(FeedID id) {
160-
return new CloudAggregatedFeed(this, id);
287+
return new CloudAggregatedFeed(this, id, this::feedSubscriber);
161288
}
162289

163290
public CloudNotificationFeed notificationFeed(String slug) {
@@ -173,7 +300,7 @@ public CloudNotificationFeed notificationFeed(String slug, String userID) {
173300
}
174301

175302
public CloudNotificationFeed notificationFeed(FeedID id) {
176-
return new CloudNotificationFeed(this, id);
303+
return new CloudNotificationFeed(this, id, this::feedSubscriber);
177304
}
178305

179306
public CloudUser user(String userID) {

src/main/java/io/getstream/cloud/CloudFeed.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.google.common.collect.Iterables;
88
import io.getstream.core.exceptions.StreamException;
9+
import io.getstream.core.faye.subscription.ChannelSubscription;
910
import io.getstream.core.http.Response;
1011
import io.getstream.core.models.Activity;
1112
import io.getstream.core.models.FeedID;
@@ -26,19 +27,36 @@
2627
public class CloudFeed {
2728
private final CloudClient client;
2829
private final FeedID id;
30+
private final FeedSubscriber subscriber;
2931

3032
CloudFeed(CloudClient client, FeedID id) {
3133
checkNotNull(client, "Can't create feed w/o a client");
3234
checkNotNull(id, "Can't create feed w/o an ID");
3335

3436
this.client = client;
3537
this.id = id;
38+
this.subscriber = null;
39+
}
40+
41+
CloudFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) {
42+
checkNotNull(client, "Can't create feed w/o a client");
43+
checkNotNull(id, "Can't create feed w/o an ID");
44+
45+
this.client = client;
46+
this.id = id;
47+
this.subscriber = subscriber;
3648
}
3749

3850
protected final CloudClient getClient() {
3951
return client;
4052
}
4153

54+
public final CompletableFuture<ChannelSubscription> subscribe(
55+
RealtimeMessageCallback messageCallback) {
56+
checkNotNull(subscriber, "A subscriber must be provided in order to start listening to a feed");
57+
return subscriber.subscribe(id, messageCallback);
58+
}
59+
4260
public final FeedID getID() {
4361
return id;
4462
}

src/main/java/io/getstream/cloud/CloudFlatFeed.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ public final class CloudFlatFeed extends CloudFeed {
1919
super(client, id);
2020
}
2121

22+
CloudFlatFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) {
23+
super(client, id, subscriber);
24+
}
25+
2226
public CompletableFuture<List<Activity>> getActivities() throws StreamException {
2327
return getActivities(
2428
DefaultOptions.DEFAULT_LIMIT,

src/main/java/io/getstream/cloud/CloudNotificationFeed.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ public final class CloudNotificationFeed extends CloudFeed {
1717
super(client, id);
1818
}
1919

20+
CloudNotificationFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) {
21+
super(client, id, subscriber);
22+
}
23+
2024
public CompletableFuture<PaginatedNotificationGroup<Activity>> getActivities()
2125
throws StreamException {
2226
return getActivities(
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.getstream.cloud;
2+
3+
import io.getstream.core.faye.subscription.ChannelSubscription;
4+
import io.getstream.core.models.FeedID;
5+
import java8.util.concurrent.CompletableFuture;
6+
7+
public interface FeedSubscriber {
8+
CompletableFuture<ChannelSubscription> subscribe(
9+
FeedID feedID, RealtimeMessageCallback messageCallback);
10+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.getstream.cloud;
2+
3+
import io.getstream.core.models.RealtimeMessage;
4+
5+
public interface RealtimeMessageCallback {
6+
void onMessage(RealtimeMessage message);
7+
}

0 commit comments

Comments
 (0)