From 74dc57fd52dd75830e3521444bc67fe13650afc1 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 9 Oct 2025 13:55:35 -0300 Subject: [PATCH] feat(realtime): add explicit REST API call method for broadcast messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added new `httpSend` method to RealtimeChannel class that explicitly uses the REST API endpoint for sending broadcast messages, regardless of WebSocket connection state. Changes: - Added `httpSend` method with proper error handling and timeout support - Added deprecation warning to `send` method when falling back to REST - Comprehensive test coverage for the new method This addresses the issue where users may unknowingly use REST API fallback when WebSocket is not connected. The new method provides explicit control over message delivery mechanism. Ported from: https://github.com/supabase/supabase-js/pull/1751 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../lib/src/realtime_channel.dart | 86 ++++++++++++++ .../realtime_client/test/channel_test.dart | 112 ++++++++++++++++++ 2 files changed, 198 insertions(+) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 8710a77fb..a79b82c50 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -504,6 +504,85 @@ class RealtimeChannel { return pushEvent; } + /// Sends a broadcast message explicitly via REST API. + /// + /// This method always uses the REST API endpoint regardless of WebSocket connection state. + /// Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback. + /// + /// [event] is the name of the broadcast event. + /// [payload] is the payload to be sent (required). + /// [timeout] is an optional timeout duration. + /// + /// Returns a [Future] that resolves when the message is sent successfully, + /// or throws an error if the message fails to send. + /// + /// ```dart + /// try { + /// await channel.httpSend( + /// event: 'cursor-pos', + /// payload: {'x': 123, 'y': 456}, + /// ); + /// } catch (e) { + /// print('Failed to send message: $e'); + /// } + /// ``` + Future httpSend({ + required String event, + required Map payload, + Duration? timeout, + }) async { + final headers = { + 'Content-Type': 'application/json', + if (socket.params['apikey'] != null) 'apikey': socket.params['apikey']!, + ...socket.headers, + if (socket.accessToken != null) + 'Authorization': 'Bearer ${socket.accessToken}', + }; + + final body = { + 'messages': [ + { + 'topic': subTopic, + 'event': event, + 'payload': payload, + 'private': _private, + } + ] + }; + + try { + final res = await (socket.httpClient?.post ?? post)( + Uri.parse(broadcastEndpointURL), + headers: headers, + body: json.encode(body), + ).timeout( + timeout ?? _timeout, + onTimeout: () => throw TimeoutException('Request timeout'), + ); + + if (res.statusCode == 202) { + return; + } + + String errorMessage = res.reasonPhrase ?? 'Unknown error'; + try { + final errorBody = json.decode(res.body) as Map; + errorMessage = (errorBody['error'] ?? + errorBody['message'] ?? + errorMessage) as String; + } catch (_) { + // If JSON parsing fails, use the default error message + } + + throw Exception(errorMessage); + } catch (e) { + if (e is TimeoutException) { + rethrow; + } + throw Exception(e.toString()); + } + } + /// Sends a realtime broadcast message. Future sendBroadcastMessage({ required String event, @@ -531,6 +610,13 @@ class RealtimeChannel { } if (!canPush && type == RealtimeListenTypes.broadcast) { + socket.log( + 'channel', + 'send() is automatically falling back to REST API. ' + 'This behavior will be deprecated in the future. ' + 'Please use httpSend() explicitly for REST delivery.', + ); + final headers = { 'Content-Type': 'application/json', if (socket.params['apikey'] != null) 'apikey': socket.params['apikey']!, diff --git a/packages/realtime_client/test/channel_test.dart b/packages/realtime_client/test/channel_test.dart index 8d8733d86..ab1ea7cef 100644 --- a/packages/realtime_client/test/channel_test.dart +++ b/packages/realtime_client/test/channel_test.dart @@ -616,4 +616,116 @@ void main() { expect(channel.params['config']['presence']['enabled'], isTrue); }); }); + + group('httpSend', () { + late HttpServer mockServer; + + setUp(() async { + mockServer = await HttpServer.bind('localhost', 0); + }); + + tearDown(() async { + await mockServer.close(); + }); + + test('sends message via http endpoint with correct headers and payload', + () async { + socket = RealtimeClient( + 'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1', + headers: {'apikey': 'supabaseKey'}, + params: {'apikey': 'supabaseKey'}, + ); + channel = + socket.channel('myTopic', const RealtimeChannelConfig(private: true)); + + final requestFuture = mockServer.first; + final sendFuture = + channel.httpSend(event: 'test', payload: {'myKey': 'myValue'}); + + final req = await requestFuture; + expect(req.uri.toString(), '/realtime/v1/api/broadcast'); + expect(req.headers.value('apikey'), 'supabaseKey'); + + final body = json.decode(await utf8.decodeStream(req)); + final message = body['messages'][0]; + expect(message['topic'], 'myTopic'); + expect(message['event'], 'test'); + expect(message['payload'], {'myKey': 'myValue'}); + expect(message['private'], true); + + req.response.statusCode = 202; + await req.response.close(); + + await sendFuture; + }); + + test('sends with Authorization header when access token is set', () async { + socket = RealtimeClient( + 'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1', + params: {'apikey': 'abc123'}, + customAccessToken: () async => 'token123', + ); + await socket.setAuth('token123'); + channel = socket.channel('topic'); + + final requestFuture = mockServer.first; + final sendFuture = + channel.httpSend(event: 'test', payload: {'data': 'test'}); + + final req = await requestFuture; + expect(req.headers.value('Authorization'), 'Bearer token123'); + expect(req.headers.value('apikey'), 'abc123'); + + req.response.statusCode = 202; + await req.response.close(); + + await sendFuture; + }); + + test('throws error on non-202 status', () async { + socket = RealtimeClient( + 'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1', + params: {'apikey': 'abc123'}, + ); + channel = socket.channel('topic'); + + final requestFuture = mockServer.first; + final sendFuture = + channel.httpSend(event: 'test', payload: {'data': 'test'}); + + final req = await requestFuture; + req.response.statusCode = 500; + req.response.write(json.encode({'error': 'Server error'})); + await req.response.close(); + + await expectLater( + sendFuture, + throwsA(predicate((e) => e.toString().contains('Server error'))), + ); + }); + + test('handles timeout', () async { + socket = RealtimeClient( + 'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1', + params: {'apikey': 'abc123'}, + ); + channel = socket.channel('topic'); + + // Don't await the server - let it hang to trigger timeout + mockServer.first.then((req) async { + await Future.delayed(const Duration(seconds: 1)); + req.response.statusCode = 202; + await req.response.close(); + }); + + await expectLater( + channel.httpSend( + event: 'test', + payload: {'data': 'test'}, + timeout: const Duration(milliseconds: 100), + ), + throwsA(isA()), + ); + }); + }); }