Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/rsocket-core/src/RSocketMachine.js
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
};

_handleConnectionError(error: Error): void {
this._handleError(error);
this._connection.close();
this._connection.close(error);
const errorHandler = this._errorHandler;
if (errorHandler) {
errorHandler(error);
Expand Down
21 changes: 13 additions & 8 deletions packages/rsocket-core/src/RSocketResumableTransport.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ export default class RSocketResumableTransport implements DuplexConnection {
this._statusSubscribers = new Set();
}

close(): void {
this._close();
close(error?: Error): void {
this._close(error);
}

connect(): void {
Expand Down Expand Up @@ -275,13 +275,18 @@ export default class RSocketResumableTransport implements DuplexConnection {
if (this._isTerminated()) {
return;
}
if (error) {
this._setConnectionStatus({error, kind: 'ERROR'});
} else {
this._setConnectionStatus(CONNECTION_STATUS.CLOSED);
}

const status = error ? {error, kind: 'ERROR'} : CONNECTION_STATUS.CLOSED;
this._setConnectionStatus(status);

const receivers = this._receivers;
receivers.forEach(r => r.onComplete());
receivers.forEach(subscriber => {
if (error) {
subscriber.onError(error);
} else {
subscriber.onComplete();
}
});
receivers.clear();

const senders = this._senders;
Expand Down
4 changes: 2 additions & 2 deletions packages/rsocket-core/src/ReassemblyDuplexConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ export class ReassemblyDuplexConnection implements DuplexConnection {
.lift(actual => new ReassemblySubscriber(actual));
}

close(): void {
this._source.close();
close(error?: Error): void {
this._source.close(error);
}

connect(): void {
Expand Down
8 changes: 6 additions & 2 deletions packages/rsocket-core/src/__mocks__/MockDuplexConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ export function genMockConnection() {
let closed = false;

const connection = {
close: jest.fn(() => {
connection.mock.close();
close: jest.fn(error => {
if (error) {
connection.mock.closeWithError(error);
} else {
connection.mock.close();
}
}),
connect: jest.fn(),
connectionStatus: jest.fn(() => status),
Expand Down
2 changes: 1 addition & 1 deletion packages/rsocket-core/src/__tests__/RSocketClient-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ describe('RSocketClient', () => {
expect(errors.values().next().value).toEqual(
`No keep-alive acks for ${keepAliveTimeout} millis`,
);
expect(status.kind).toEqual('CLOSED');
expect(status.kind).toEqual('ERROR');

jest.advanceTimersByTime(keepAliveTimeout);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,4 +687,59 @@ describe('RSocketResumableTransport', () => {
expect(currentTransport.sendOne.mock.calls.length).toBe(0);
});
});

describe('post-connect() APIs', () => {
beforeEach(() => {
resumableTransport.connect();
currentTransport.mock.connect();
});

describe('close()', () => {
describe('given an error', () => {
it('closes the transport', () => {
resumableTransport.close(new Error());
expect(currentTransport.close.mock.calls.length).toBe(1);
});

it('sets the status to ERROR with the given error', () => {
const error = new Error();
resumableTransport.close(error);
expect(resumableStatus.kind).toBe('ERROR');
expect(resumableStatus.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
resumableTransport.receive().subscribe({onError, onSubscribe});
const error = new Error();
resumableTransport.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
});

describe('not given an error', () => {
it('closes the transport', () => {
resumableTransport.close();
expect(currentTransport.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
resumableTransport.close();
expect(resumableStatus.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
resumableTransport.receive().subscribe({onComplete, onSubscribe});
resumableTransport.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});
});
});
4 changes: 2 additions & 2 deletions packages/rsocket-tcp-client/src/RSocketTcpClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ export class RSocketTcpConnection implements DuplexConnection {
}
}

close(): void {
this._close();
close(error?: Error): void {
this._close(error);
}

connect(): void {
Expand Down
71 changes: 52 additions & 19 deletions packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,62 @@ describe('RSocketTcpClient', () => {
});

describe('close()', () => {
it('closes the socket', () => {
client.close();
expect(socket.end.mock.calls.length).toBe(1);
});
describe('given an error', () => {
it('closes the socket', () => {
client.close(new Error());
expect(socket.end.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
it('sets the status to ERROR with the given error', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
const error = new Error();
client.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onError, onSubscribe});
const error = new Error();
client.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
describe('not given an error', () => {
it('closes the socket', () => {
client.close();
expect(socket.end.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});

Expand Down
7 changes: 4 additions & 3 deletions packages/rsocket-types/src/ReactiveSocketTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ export interface DuplexConnection {
receive(): Flowable<Frame>,

/**
* Close the underlying connection, emitting `onComplete` on the receive()
* Publisher.
* Close the underlying connection, optionally providing an error as reason.
* If an error is passed, emits `onError` on the receive() Publisher.
* If no error is passed, emits `onComplete` on the receive() Publisher.
*/
close(): void,
close(error?: Error): void,

/**
* Open the underlying connection. Throws if the connection is already in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ export default class RSocketWebSocketClient implements DuplexConnection {
this._statusSubscribers = new Set();
}

close(): void {
this._close();
close(error?: Error): void {
this._close(error);
}

connect(): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,62 @@ describe('RSocketWebSocketClient', () => {
});

describe('close()', () => {
it('closes the socket', () => {
client.close();
expect(socket.close.mock.calls.length).toBe(1);
});
describe('given an error', () => {
it('closes the socket', () => {
client.close(new Error());
expect(socket.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
it('sets the status to ERROR with the given error', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
const error = new Error();
client.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});

it('calls receive.onError with the given error', () => {
const onError = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onError, onSubscribe});
const error = new Error();
client.close(error);
expect(onError.mock.calls.length).toBe(1);
expect(onError.mock.calls[0][0]).toBe(error);
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
describe('not given an error', () => {
it('closes the socket', () => {
client.close();
expect(socket.close.mock.calls.length).toBe(1);
});

it('sets the status to CLOSED', () => {
let status;
client.connectionStatus().subscribe({
onNext: _status => (status = _status),
onSubscribe: subscription =>
subscription.request(Number.MAX_SAFE_INTEGER),
});
client.close();
expect(status.kind).toBe('CLOSED');
});

it('calls receive.onComplete', () => {
const onComplete = jest.fn();
const onSubscribe = subscription =>
subscription.request(Number.MAX_SAFE_INTEGER);
client.receive().subscribe({onComplete, onSubscribe});
client.close();
expect(onComplete.mock.calls.length).toBe(1);
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,13 @@ class WSDuplexConnection implements DuplexConnection {
});
}

close(): void {
this._socket.emit('close');
close(error?: Error): void {
if (error) {
this._socket.emit('error', error);
} else {
this._socket.emit('close');
}

this._socket.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,18 @@ describe('RSocketWebSocketServer', () => {
expect(status.error).toBe(error);
});

it('returns CLOSED if explicitly closed', () => {
it('returns CLOSED if explicitly closed with no error', () => {
connection.receive().subscribe(() => {});
connection.close();
expect(status.kind).toBe('CLOSED');
});

it('returns ERROR if explicitly closed with an error', () => {
connection.receive().subscribe(() => {});
const error = new Error();
connection.close(error);
expect(status.kind).toBe('ERROR');
expect(status.error).toBe(error);
});
});
});