Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,85 @@ class RealtimeChannel {
return pushEvent;
}

/// Sends a broadcast message explicitly via REST API.
///
/// This method always uses the REST API endpoint regardless of WebSocket connection state.
/// Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
///
/// [event] is the name of the broadcast event.
/// [payload] is the payload to be sent (required).
/// [timeout] is an optional timeout duration.
///
/// Returns a [Future] that resolves when the message is sent successfully,
/// or throws an error if the message fails to send.
///
/// ```dart
/// try {
/// await channel.httpSend(
/// event: 'cursor-pos',
/// payload: {'x': 123, 'y': 456},
/// );
/// } catch (e) {
/// print('Failed to send message: $e');
/// }
/// ```
Future<void> httpSend({
required String event,
required Map<String, dynamic> payload,
Duration? timeout,
}) async {
final headers = <String, String>{
'Content-Type': 'application/json',
if (socket.params['apikey'] != null) 'apikey': socket.params['apikey']!,
...socket.headers,
if (socket.accessToken != null)
'Authorization': 'Bearer ${socket.accessToken}',
};

final body = {
'messages': [
{
'topic': subTopic,
'event': event,
'payload': payload,
'private': _private,
}
]
};

try {
final res = await (socket.httpClient?.post ?? post)(
Uri.parse(broadcastEndpointURL),
headers: headers,
body: json.encode(body),
).timeout(
timeout ?? _timeout,
onTimeout: () => throw TimeoutException('Request timeout'),
);

if (res.statusCode == 202) {
return;
}

String errorMessage = res.reasonPhrase ?? 'Unknown error';
try {
final errorBody = json.decode(res.body) as Map<String, dynamic>;
errorMessage = (errorBody['error'] ??
errorBody['message'] ??
errorMessage) as String;
} catch (_) {
// If JSON parsing fails, use the default error message
}

throw Exception(errorMessage);
} catch (e) {
if (e is TimeoutException) {
rethrow;
}
throw Exception(e.toString());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im unsure what the reason for this catch block is. I would just remove it as it either just rethrows the timeout or stringifies the exception.

}
}

/// Sends a realtime broadcast message.
Future<ChannelResponse> sendBroadcastMessage({
required String event,
Expand Down Expand Up @@ -531,6 +610,13 @@ class RealtimeChannel {
}

if (!canPush && type == RealtimeListenTypes.broadcast) {
socket.log(
'channel',
'send() is automatically falling back to REST API. '
'This behavior will be deprecated in the future. '
'Please use httpSend() explicitly for REST delivery.',
);

final headers = <String, String>{
'Content-Type': 'application/json',
if (socket.params['apikey'] != null) 'apikey': socket.params['apikey']!,
Expand Down
112 changes: 112 additions & 0 deletions packages/realtime_client/test/channel_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -616,4 +616,116 @@ void main() {
expect(channel.params['config']['presence']['enabled'], isTrue);
});
});

group('httpSend', () {
late HttpServer mockServer;

setUp(() async {
mockServer = await HttpServer.bind('localhost', 0);
});

tearDown(() async {
await mockServer.close();
});

test('sends message via http endpoint with correct headers and payload',
() async {
socket = RealtimeClient(
'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1',
headers: {'apikey': 'supabaseKey'},
params: {'apikey': 'supabaseKey'},
);
channel =
socket.channel('myTopic', const RealtimeChannelConfig(private: true));

final requestFuture = mockServer.first;
final sendFuture =
channel.httpSend(event: 'test', payload: {'myKey': 'myValue'});

final req = await requestFuture;
expect(req.uri.toString(), '/realtime/v1/api/broadcast');
expect(req.headers.value('apikey'), 'supabaseKey');

final body = json.decode(await utf8.decodeStream(req));
final message = body['messages'][0];
expect(message['topic'], 'myTopic');
expect(message['event'], 'test');
expect(message['payload'], {'myKey': 'myValue'});
expect(message['private'], true);

req.response.statusCode = 202;
await req.response.close();

await sendFuture;
});

test('sends with Authorization header when access token is set', () async {
socket = RealtimeClient(
'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1',
params: {'apikey': 'abc123'},
customAccessToken: () async => 'token123',
);
await socket.setAuth('token123');
channel = socket.channel('topic');

final requestFuture = mockServer.first;
final sendFuture =
channel.httpSend(event: 'test', payload: {'data': 'test'});

final req = await requestFuture;
expect(req.headers.value('Authorization'), 'Bearer token123');
expect(req.headers.value('apikey'), 'abc123');

req.response.statusCode = 202;
await req.response.close();

await sendFuture;
});

test('throws error on non-202 status', () async {
socket = RealtimeClient(
'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1',
params: {'apikey': 'abc123'},
);
channel = socket.channel('topic');

final requestFuture = mockServer.first;
final sendFuture =
channel.httpSend(event: 'test', payload: {'data': 'test'});

final req = await requestFuture;
req.response.statusCode = 500;
req.response.write(json.encode({'error': 'Server error'}));
await req.response.close();

await expectLater(
sendFuture,
throwsA(predicate((e) => e.toString().contains('Server error'))),
);
});

test('handles timeout', () async {
socket = RealtimeClient(
'ws://${mockServer.address.host}:${mockServer.port}/realtime/v1',
params: {'apikey': 'abc123'},
);
channel = socket.channel('topic');

// Don't await the server - let it hang to trigger timeout
mockServer.first.then((req) async {
await Future.delayed(const Duration(seconds: 1));
req.response.statusCode = 202;
await req.response.close();
});

await expectLater(
channel.httpSend(
event: 'test',
payload: {'data': 'test'},
timeout: const Duration(milliseconds: 100),
),
throwsA(isA<TimeoutException>()),
);
});
});
}