|
| 1 | +/*--------------------------------------------------------------------------------------------- |
| 2 | + * Copyright (c) Gitpod. All rights reserved. |
| 3 | + * Licensed under the MIT License. See License.txt in the project root for license information. |
| 4 | + *--------------------------------------------------------------------------------------------*/ |
| 5 | + |
| 6 | +import { CancellationToken, ChannelMessage, ChannelOpenConfirmationMessage, ChannelOpenFailureMessage, ChannelRequestMessage, PromiseCompletionSource, SessionRequestFailureMessage, SessionRequestMessage, SessionRequestSuccessMessage, SshChannel, SshChannelClosedEventArgs, SshChannelError, SshChannelOpenFailureReason, SshChannelOpeningEventArgs, SshMessage, SshRequestEventArgs, SshSession, SshSessionClosedEventArgs } from '@microsoft/dev-tunnels-ssh'; |
| 7 | +import { ChannelFailureMessage, ChannelSuccessMessage } from '@microsoft/dev-tunnels-ssh/messages/connectionMessages'; |
| 8 | + |
| 9 | +// Patch of https://github.com/microsoft/dev-tunnels-ssh/blob/main/src/ts/ssh/pipeExtensions.ts |
| 10 | + |
| 11 | +export class PipeExtensions { |
| 12 | + public static async pipeSession(session: SshSession, toSession: SshSession): Promise<void> { |
| 13 | + if (!session) { throw new TypeError('Session is required.'); } |
| 14 | + if (!toSession) { throw new TypeError('Target session is required'); } |
| 15 | + |
| 16 | + const endCompletion = new PromiseCompletionSource<Promise<void>>(); |
| 17 | + |
| 18 | + session.onRequest((e) => { |
| 19 | + e.responsePromise = PipeExtensions.forwardSessionRequest(e, toSession, e.cancellation); |
| 20 | + }); |
| 21 | + toSession.onRequest((e) => { |
| 22 | + e.responsePromise = PipeExtensions.forwardSessionRequest(e, session, e.cancellation); |
| 23 | + }); |
| 24 | + |
| 25 | + session.onChannelOpening((e) => { |
| 26 | + if (e.isRemoteRequest) { |
| 27 | + e.openingPromise = PipeExtensions.forwardChannel(e, toSession, e.cancellation); |
| 28 | + } |
| 29 | + }); |
| 30 | + toSession.onChannelOpening((e) => { |
| 31 | + if (e.isRemoteRequest) { |
| 32 | + e.openingPromise = PipeExtensions.forwardChannel(e, session, e.cancellation); |
| 33 | + } |
| 34 | + }); |
| 35 | + |
| 36 | + session.onClosed((e) => { |
| 37 | + endCompletion.resolve(PipeExtensions.forwardSessionClose(toSession, e)); |
| 38 | + }); |
| 39 | + toSession.onClosed((e) => { |
| 40 | + endCompletion.resolve(PipeExtensions.forwardSessionClose(session, e)); |
| 41 | + }); |
| 42 | + |
| 43 | + const endPromise = await endCompletion.promise; |
| 44 | + await endPromise; |
| 45 | + } |
| 46 | + |
| 47 | + public static async pipeChannel(channel: SshChannel, toChannel: SshChannel): Promise<void> { |
| 48 | + if (!channel) { throw new TypeError('Channel is required.'); } |
| 49 | + if (!toChannel) { throw new TypeError('Target channel is required'); } |
| 50 | + |
| 51 | + const endCompletion = new PromiseCompletionSource<Promise<void>>(); |
| 52 | + let closed = false; |
| 53 | + |
| 54 | + channel.onRequest((e) => { |
| 55 | + e.responsePromise = PipeExtensions.forwardChannelRequest(e, toChannel, e.cancellation); |
| 56 | + }); |
| 57 | + toChannel.onRequest((e) => { |
| 58 | + e.responsePromise = PipeExtensions.forwardChannelRequest(e, channel, e.cancellation); |
| 59 | + }); |
| 60 | + |
| 61 | + channel.onDataReceived((data) => { |
| 62 | + void PipeExtensions.forwardData(channel, toChannel, data).catch(); |
| 63 | + }); |
| 64 | + toChannel.onDataReceived((data) => { |
| 65 | + void PipeExtensions.forwardData(toChannel, channel, data).catch(); |
| 66 | + }); |
| 67 | + |
| 68 | + channel.onClosed((e) => { |
| 69 | + if (!closed) { |
| 70 | + closed = true; |
| 71 | + endCompletion.resolve(PipeExtensions.forwardChannelClose(toChannel, e)); |
| 72 | + } |
| 73 | + }); |
| 74 | + toChannel.onClosed((e) => { |
| 75 | + if (!closed) { |
| 76 | + closed = true; |
| 77 | + endCompletion.resolve(PipeExtensions.forwardChannelClose(channel, e)); |
| 78 | + } |
| 79 | + }); |
| 80 | + |
| 81 | + const endTask = await endCompletion.promise; |
| 82 | + await endTask; |
| 83 | + } |
| 84 | + |
| 85 | + private static async forwardSessionRequest( |
| 86 | + e: SshRequestEventArgs<SessionRequestMessage>, |
| 87 | + toSession: SshSession, |
| 88 | + cancellation?: CancellationToken, |
| 89 | + ): Promise<SshMessage> { |
| 90 | + // `toSession.requestResponse` always set `wantReply` to `true` internally and awaits for response |
| 91 | + // but `SessionRequestMessage` has an internal cache when piped so it will send original message with `false`, |
| 92 | + // use `toSession.request` instead so it returns immeadiately |
| 93 | + if (!e.request.wantReply) { |
| 94 | + return toSession.request( |
| 95 | + e.request, |
| 96 | + cancellation, |
| 97 | + ).then(() => new SessionRequestSuccessMessage()); |
| 98 | + } |
| 99 | + return toSession.requestResponse( |
| 100 | + e.request, |
| 101 | + SessionRequestSuccessMessage, |
| 102 | + SessionRequestFailureMessage, |
| 103 | + cancellation, |
| 104 | + ); |
| 105 | + } |
| 106 | + |
| 107 | + private static async forwardChannel( |
| 108 | + e: SshChannelOpeningEventArgs, |
| 109 | + toSession: SshSession, |
| 110 | + cancellation?: CancellationToken, |
| 111 | + ): Promise<ChannelMessage> { |
| 112 | + try { |
| 113 | + const toChannel = await toSession.openChannel(e.request, null, cancellation); |
| 114 | + void PipeExtensions.pipeChannel(e.channel, toChannel).catch(); |
| 115 | + return new ChannelOpenConfirmationMessage(); |
| 116 | + } catch (err) { |
| 117 | + if (!(err instanceof Error)) { throw err; } |
| 118 | + |
| 119 | + const failureMessage = new ChannelOpenFailureMessage(); |
| 120 | + if (err instanceof SshChannelError) { |
| 121 | + failureMessage.reasonCode = err.reason ?? SshChannelOpenFailureReason.connectFailed; |
| 122 | + } else { |
| 123 | + failureMessage.reasonCode = SshChannelOpenFailureReason.connectFailed; |
| 124 | + } |
| 125 | + |
| 126 | + failureMessage.description = err.message; |
| 127 | + return failureMessage; |
| 128 | + } |
| 129 | + } |
| 130 | + |
| 131 | + private static async forwardChannelRequest( |
| 132 | + e: SshRequestEventArgs<ChannelRequestMessage>, |
| 133 | + toChannel: SshChannel, |
| 134 | + cancellation?: CancellationToken, |
| 135 | + ): Promise<SshMessage> { |
| 136 | + e.request.recipientChannel = toChannel.remoteChannelId; |
| 137 | + const result = await toChannel.request(e.request, cancellation); |
| 138 | + return result ? new ChannelSuccessMessage() : new ChannelFailureMessage(); |
| 139 | + } |
| 140 | + |
| 141 | + private static async forwardSessionClose( |
| 142 | + session: SshSession, |
| 143 | + e: SshSessionClosedEventArgs, |
| 144 | + ): Promise<void> { |
| 145 | + return session.close(e.reason, e.message, e.error ?? undefined); |
| 146 | + } |
| 147 | + |
| 148 | + private static async forwardData( |
| 149 | + channel: SshChannel, |
| 150 | + toChannel: SshChannel, |
| 151 | + data: Buffer, |
| 152 | + ): Promise<void> { |
| 153 | + // Seems that somehow data gets corrupted/disposed? so do a copy first thing |
| 154 | + const buffer = Buffer.alloc(data.length); |
| 155 | + data.copy(buffer); |
| 156 | + const promise = toChannel.send(buffer, CancellationToken.None); |
| 157 | + channel.adjustWindow(buffer.length); |
| 158 | + return promise; |
| 159 | + } |
| 160 | + |
| 161 | + private static async forwardChannelClose( |
| 162 | + channel: SshChannel, |
| 163 | + e: SshChannelClosedEventArgs, |
| 164 | + ): Promise<void> { |
| 165 | + if (e.error) { |
| 166 | + // @ts-ignore |
| 167 | + channel.close(e.error); |
| 168 | + return Promise.resolve(); |
| 169 | + } else if (e.exitSignal) { |
| 170 | + return channel.close(e.exitSignal, e.errorMessage); |
| 171 | + } else if (typeof e.exitStatus === 'number') { |
| 172 | + return channel.close(e.exitStatus); |
| 173 | + } else { |
| 174 | + return channel.close(); |
| 175 | + } |
| 176 | + } |
| 177 | +} |
0 commit comments