Skip to content

Commit 4151875

Browse files
committed
Connect the websocket
1 parent e769e88 commit 4151875

File tree

5 files changed

+173
-11
lines changed

5 files changed

+173
-11
lines changed

packages/stream_feeds/lib/src/feeds_client.dart

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
import 'package:stream_core/src/user/user.dart';
1+
import 'dart:async';
2+
3+
import 'package:stream_core/stream_core.dart';
24
import 'package:uuid/uuid.dart';
35

46
import '../stream_feeds.dart';
57
import 'generated/api/api.g.dart' as api;
68
import 'repositories.dart';
9+
import 'utils/endpoint_config.dart';
10+
import 'ws/feeds_ws_event.dart';
711

812
class FeedsClient {
913
FeedsClient({
@@ -12,17 +16,33 @@ class FeedsClient {
1216
required this.userToken,
1317
this.config = const FeedsConfig(),
1418
this.userTokenProvider,
19+
this.networkMonitor,
1520
}) {
1621
apiClient = api.DefaultApi(
1722
api.ApiClient(
23+
basePath: endpointConfig.baseFeedsUrl,
1824
authentication: _Authentication(
1925
apiKey: apiKey,
2026
user: user,
2127
getToken: () async => userToken,
22-
getConnectionId: () => null,
28+
getConnectionId: () => webSocketClient?.connectionId,
2329
),
2430
),
2531
);
32+
final websocketUri = Uri.parse(endpointConfig.wsEndpoint).replace(
33+
queryParameters: <String, String>{
34+
'api_key': apiKey,
35+
'stream-auth-type': 'jwt',
36+
'X-Stream-Client': 'stream-feeds-dart',
37+
},
38+
);
39+
40+
webSocketClient = WebSocketClient(
41+
url: websocketUri.toString(),
42+
eventDecoder: FeedsWsEvent.fromEventObject,
43+
onConnectionEstablished: _authenticate,
44+
);
45+
2646
feedsRepository = FeedsRepository(apiClient: apiClient);
2747
}
2848

@@ -31,10 +51,74 @@ class FeedsClient {
3151
final String userToken;
3252
final FeedsConfig config;
3353
final UserTokenProvider? userTokenProvider;
54+
final NetworkMonitor? networkMonitor;
3455

3556
late final api.DefaultApi apiClient;
3657
late final FeedsRepository feedsRepository;
3758

59+
static final endpointConfig = EndpointConfig.production;
60+
late final WebSocketClient webSocketClient;
61+
ConnectionRecoveryHandler? connectionRecoveryHandler;
62+
63+
Completer<void>? _connectionCompleter;
64+
StreamSubscription<WebSocketConnectionState>? _connectionSubscription;
65+
66+
/// Connects to the feeds websocket.
67+
/// Future will complete when the connection is established and the user is authenticated.
68+
/// If the authentication fails, the future will complete with an error.
69+
Future<void> connect() async {
70+
webSocketClient.connect();
71+
72+
_connectionSubscription = webSocketClient!.connectionStateStream
73+
.listen(_onConnectionStateChanged);
74+
75+
connectionRecoveryHandler = DefaultConnectionRecoveryHandler(
76+
client: webSocketClient,
77+
networkMonitor: networkMonitor,
78+
);
79+
80+
_connectionCompleter = Completer<void>();
81+
return _connectionCompleter!.future;
82+
}
83+
84+
/// Disconnects from the feeds websocket.
85+
/// The FeedsClient should no longer be used after calling this method.
86+
void disconnect() {
87+
connectionRecoveryHandler?.dispose();
88+
webSocketClient.disconnect();
89+
_connectionSubscription?.cancel();
90+
_connectionCompleter?.complete();
91+
_connectionCompleter = null;
92+
}
93+
94+
void _onConnectionStateChanged(WebSocketConnectionState state) {
95+
if (_connectionCompleter != null) {
96+
if (state is Connected) {
97+
_connectionCompleter!.complete();
98+
_connectionCompleter = null;
99+
}
100+
if (state is Disconnected) {
101+
_connectionCompleter!.completeError(Exception('Connection failed'));
102+
_connectionCompleter = null;
103+
}
104+
}
105+
}
106+
107+
void _authenticate() {
108+
final connectUserRequest = WsAuthMessageRequest(
109+
products: ['feeds'],
110+
token: userToken,
111+
userDetails: ConnectUserDetailsRequest(
112+
id: user.id,
113+
name: user.originalName,
114+
image: user.imageUrl,
115+
customData: user.customData,
116+
),
117+
);
118+
119+
webSocketClient.send(connectUserRequest);
120+
}
121+
38122
/// Creates a feed instance based on the provided query.
39123
///
40124
/// This method creates a [Feed] object using a [FeedQuery] that can include additional

packages/stream_feeds/lib/src/state_layer/feed_query.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ class FeedQuery {
99
FeedQuery({
1010
required String group,
1111
required String id,
12-
this.watch = false,
12+
this.watch = true,
1313
}) : fid = FeedId(group: group, id: id);
1414

1515
/// The unique identifier for the feed.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
enum EndpointConfig {
2+
localhost(
3+
hostname: 'http://localhost:3030',
4+
wsEndpoint: 'ws://localhost:8800/api/v2/connect',
5+
),
6+
staging(
7+
hostname: 'https://chat-edge-frankfurt-ce1.stream-io-api.com',
8+
wsEndpoint:
9+
'wss://chat-edge-frankfurt-ce1.stream-io-api.com/api/v2/connect',
10+
),
11+
production(
12+
hostname: 'https://feeds.stream-io-api.com',
13+
wsEndpoint: 'wss://feeds.stream-io-api.com/api/v2/connect',
14+
);
15+
16+
final String hostname;
17+
final String wsEndpoint;
18+
final String baseFeedsUrl;
19+
20+
const EndpointConfig({
21+
required this.hostname,
22+
required this.wsEndpoint,
23+
String? baseFeedsUrl,
24+
}) : baseFeedsUrl = baseFeedsUrl ?? hostname;
25+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// ignore_for_file: prefer_single_quotes
2+
3+
import 'dart:convert';
4+
5+
import 'package:stream_core/stream_core.dart' as core;
6+
7+
import '../generated/api/api.g.dart' as api;
8+
9+
class FeedsWsEvent extends core.WsEvent {
10+
const FeedsWsEvent(this.event);
11+
12+
final api.WSClientEvent? event;
13+
14+
static core.WsEvent fromEventObject(Object message) {
15+
try {
16+
final json = jsonDecode(message.toString()) as Map<String, dynamic>;
17+
final type = json['type'];
18+
switch (type) {
19+
case "connection.ok":
20+
return core.HealthCheckPongEvent(
21+
healthCheckInfo: core.HealthCheckInfo(
22+
connectionId: json['connection_id'],
23+
),
24+
);
25+
case 'connection.error':
26+
return core.WsErrorEvent(error: json['error'], message: message);
27+
default:
28+
final event = api.WSClientEvent.fromJson(json);
29+
30+
if (event is api.WSClientEventHealthCheckEvent) {
31+
return core.HealthCheckPongEvent(
32+
healthCheckInfo: core.HealthCheckInfo(
33+
connectionId: event.healthCheckEvent.connectionId,
34+
),
35+
);
36+
}
37+
print(type);
38+
return FeedsWsEvent(event);
39+
}
40+
} catch (e) {
41+
return core.WsErrorEvent(error: e, message: message);
42+
}
43+
}
44+
}

sample_app/lib/navigation/app_state.dart

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,32 @@ class AppStateProvider extends ValueNotifier<AppState> {
2424
final userId = _prefs.getString('user_id');
2525
if (userId != null) {
2626
final credentials = UserCredentials.credentialsFor(userId);
27-
setUser(credentials);
27+
await setUser(credentials);
2828
} else {
2929
value = LoggedOutState();
3030
}
3131
}
3232

33-
void setUser(UserCredentials userCredentials) {
34-
_prefs.setString('user_id', userCredentials.user.id);
33+
Future<void> setUser(UserCredentials userCredentials) async {
34+
await _prefs.setString('user_id', userCredentials.user.id);
35+
value = LoadingState();
36+
final client = FeedsClient(
37+
apiKey: DemoAppConfig.current.apiKey,
38+
user: userCredentials.user,
39+
userToken: userCredentials.token,
40+
);
41+
await client.connect();
42+
3543
value = LoggedInState(
36-
feedsClient: FeedsClient(
37-
apiKey: DemoAppConfig.current.apiKey,
38-
user: userCredentials.user,
39-
userToken: userCredentials.token,
40-
),
44+
feedsClient: client,
4145
);
4246
}
4347

4448
void clearUserId() {
4549
_prefs.remove('user_id');
50+
if (value is LoggedInState) {
51+
(value as LoggedInState).feedsClient.disconnect();
52+
}
4653
value = LoggedOutState();
4754
}
4855
}
@@ -53,6 +60,8 @@ class InitialState extends AppState {}
5360

5461
class LoggedOutState extends AppState {}
5562

63+
class LoadingState extends AppState {}
64+
5665
class LoggedInState extends AppState {
5766
LoggedInState({required this.feedsClient});
5867

0 commit comments

Comments
 (0)