diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 8710a77fb..a79b82c50 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -504,6 +504,85 @@ class RealtimeChannel { return pushEvent; } + /// Sends a broadcast message explicitly via REST API. + /// + /// This method always uses the REST API endpoint regardless of WebSocket connection state. + /// Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback. + /// + /// [event] is the name of the broadcast event. + /// [payload] is the payload to be sent (required). + /// [timeout] is an optional timeout duration. + /// + /// Returns a [Future] that resolves when the message is sent successfully, + /// or throws an error if the message fails to send. + /// + /// ```dart + /// try { + /// await channel.httpSend( + /// event: 'cursor-pos', + /// payload: {'x': 123, 'y': 456}, + /// ); + /// } catch (e) { + /// print('Failed to send message: $e'); + /// } + /// ``` + Future httpSend({ + required String event, + required Map payload, + Duration? timeout, + }) async { + final headers = { + 'Content-Type': 'application/json', + if (socket.params['apikey'] != null) 'apikey': socket.params['apikey']!, + ...socket.headers, + if (socket.accessToken != null) + 'Authorization': 'Bearer ${socket.accessToken}', + }; + + final body = { + 'messages': [ + { + 'topic': subTopic, + 'event': event, + 'payload': payload, + 'private': _private, + } + ] + }; + + try { + final res = await (socket.httpClient?.post ?? post)( + Uri.parse(broadcastEndpointURL), + headers: headers, + body: json.encode(body), + ).timeout( + timeout ?? _timeout, + onTimeout: () => throw TimeoutException('Request timeout'), + ); + + if (res.statusCode == 202) { + return; + } + + String errorMessage = res.reasonPhrase ?? 'Unknown error'; + try { + final errorBody = json.decode(res.body) as Map; + errorMessage = (errorBody['error'] ?? + errorBody['message'] ?? + errorMessage) as String; + } catch (_) { + // If JSON parsing fails, use the default error message + } + + throw Exception(errorMessage); + } catch (e) { + if (e is TimeoutException) { + rethrow; + } + throw Exception(e.toString()); + } + } + /// Sends a realtime broadcast message. Future sendBroadcastMessage({ required String event, @@ -531,6 +610,13 @@ class RealtimeChannel { } if (!canPush && type == RealtimeListenTypes.broadcast) { + socket.log( + 'channel', + 'send() is automatically falling back to REST API. ' + 'This behavior will be deprecated in the future. ' + 'Please use httpSend() explicitly for REST delivery.', + ); + final headers = { 'Content-Type': 'application/json', if (socket.params['apikey'] != null) 'apikey': socket.params['apikey']!, diff --git a/packages/realtime_client/test/channel_test.dart b/packages/realtime_client/test/channel_test.dart index 8d8733d86..ab1ea7cef 100644 --- a/packages/realtime_client/test/channel_test.dart +++ b/packages/realtime_client/test/channel_test.dart @@ -616,4 +616,116 @@ void main() { expect(channel.params['config']['presence']['enabled'], isTrue); }); }); + + group('httpSend', () { + late HttpServer mockServer; + + setUp(() async { + mockServer = await HttpServer.bind('localhost', 0); + }); + + tearDown(() async { + await mockServer.close(); + }); + + test('sends message via http endpoint with correct headers and payload', + () async { + socket = RealtimeClient( + 'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1', + headers: {'apikey': 'supabaseKey'}, + params: {'apikey': 'supabaseKey'}, + ); + channel = + socket.channel('myTopic', const RealtimeChannelConfig(private: true)); + + final requestFuture = mockServer.first; + final sendFuture = + channel.httpSend(event: 'test', payload: {'myKey': 'myValue'}); + + final req = await requestFuture; + expect(req.uri.toString(), '/realtime/v1/api/broadcast'); + expect(req.headers.value('apikey'), 'supabaseKey'); + + final body = json.decode(await utf8.decodeStream(req)); + final message = body['messages'][0]; + expect(message['topic'], 'myTopic'); + expect(message['event'], 'test'); + expect(message['payload'], {'myKey': 'myValue'}); + expect(message['private'], true); + + req.response.statusCode = 202; + await req.response.close(); + + await sendFuture; + }); + + test('sends with Authorization header when access token is set', () async { + socket = RealtimeClient( + 'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1', + params: {'apikey': 'abc123'}, + customAccessToken: () async => 'token123', + ); + await socket.setAuth('token123'); + channel = socket.channel('topic'); + + final requestFuture = mockServer.first; + final sendFuture = + channel.httpSend(event: 'test', payload: {'data': 'test'}); + + final req = await requestFuture; + expect(req.headers.value('Authorization'), 'Bearer token123'); + expect(req.headers.value('apikey'), 'abc123'); + + req.response.statusCode = 202; + await req.response.close(); + + await sendFuture; + }); + + test('throws error on non-202 status', () async { + socket = RealtimeClient( + 'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1', + params: {'apikey': 'abc123'}, + ); + channel = socket.channel('topic'); + + final requestFuture = mockServer.first; + final sendFuture = + channel.httpSend(event: 'test', payload: {'data': 'test'}); + + final req = await requestFuture; + req.response.statusCode = 500; + req.response.write(json.encode({'error': 'Server error'})); + await req.response.close(); + + await expectLater( + sendFuture, + throwsA(predicate((e) => e.toString().contains('Server error'))), + ); + }); + + test('handles timeout', () async { + socket = RealtimeClient( + 'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1', + params: {'apikey': 'abc123'}, + ); + channel = socket.channel('topic'); + + // Don't await the server - let it hang to trigger timeout + mockServer.first.then((req) async { + await Future.delayed(const Duration(seconds: 1)); + req.response.statusCode = 202; + await req.response.close(); + }); + + await expectLater( + channel.httpSend( + event: 'test', + payload: {'data': 'test'}, + timeout: const Duration(milliseconds: 100), + ), + throwsA(isA()), + ); + }); + }); }