Skip to content

Commit ff03161

Browse files
committed
Add support for forwarding directly to a TCPSocket
1 parent aa32a26 commit ff03161

File tree

6 files changed

+160
-81
lines changed

6 files changed

+160
-81
lines changed

docs/src/changelog.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ CurrentModule = LibSSH
77
This documents notable changes in LibSSH.jl. The format is based on [Keep a
88
Changelog](https://keepachangelog.com).
99

10+
## Unreleased
11+
12+
### Added
13+
14+
- A new [`Forwarder(::Session, ::String, ::Int)`](@ref) constructor to allow for
15+
forwarding a port to an internal socket instead of to a port ([#10]).
16+
1017
## [v0.4.0] - 2024-03-12
1118

1219
### Added

docs/src/sessions_and_channels.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ Base.success(::Cmd, ::Session)
122122
```@docs
123123
Forwarder
124124
Forwarder(::Session, ::Int, ::String, ::Int)
125+
Forwarder(::Session, ::String, ::Int)
125126
Forwarder(::Function)
126127
Base.close(::Forwarder)
127128
```

src/LibSSH.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ function _safe_poll_fd(args...; kwargs...)
176176
return result
177177
end
178178

179+
include("utils.jl")
179180
include("gssapi.jl")
180181
include("pki.jl")
181182
include("callbacks.jl")

src/channel.jl

Lines changed: 121 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,46 @@ mutable struct _ForwardingClient
703703
sshchan::SshChannel
704704
callbacks::Callbacks.ChannelCallbacks
705705
client_task::Union{Task, Nothing}
706+
707+
function _ForwardingClient(forwarder, socket::TCPSocket)
708+
remotehost = forwarder.remotehost
709+
remoteport = forwarder.remoteport
710+
711+
# Open a forwarding channel
712+
local_ip = string(getaddrinfo(gethostname()))
713+
sshchan = SshChannel(forwarder._session)
714+
ret = _session_trywait(forwarder._session) do
715+
GC.@preserve remotehost local_ip begin
716+
lib.ssh_channel_open_forward(sshchan.ptr,
717+
Base.unsafe_convert(Ptr{Cchar}, remotehost), remoteport,
718+
Base.unsafe_convert(Ptr{Cchar}, local_ip), forwarder.localport)
719+
end
720+
end
721+
if ret != SSH_OK
722+
throw(LibSSHException("Could not open a forwarding channel: $(get_error(forwarder._session))"))
723+
end
724+
725+
# Set callbacks for the channel
726+
callbacks = Callbacks.ChannelCallbacks(nothing;
727+
on_data=_on_client_channel_data,
728+
on_eof=_on_client_channel_eof,
729+
on_close=_on_client_channel_close)
730+
set_channel_callbacks(sshchan, callbacks)
731+
732+
# Create a client and set the callbacks userdata to the new client object
733+
self = new(forwarder._next_client_id, forwarder.verbose, socket,
734+
sshchan, callbacks, nothing)
735+
callbacks.userdata = self
736+
737+
# Start a listener on the new socket to forward data to the server
738+
self.client_task = Threads.@spawn try
739+
_handle_forwarding_client(self)
740+
catch ex
741+
@error "Error when handling SSH port forward client $(self.id)!" exception=(ex, catch_backtrace())
742+
end
743+
744+
return self
745+
end
706746
end
707747

708748
# Helper function to log messages from a forwarding client
@@ -742,69 +782,98 @@ end
742782
$(TYPEDEF)
743783
$(TYPEDFIELDS)
744784
745-
This object manages a direct forwarding channel between `localport` and `remotehost:remoteport`.
785+
This object manages a direct forwarding channel between `localport` and
786+
`remotehost:remoteport`. Fields beginning with an underscore `_` are private and
787+
should not be used.
746788
"""
747-
mutable struct Forwarder
789+
@kwdef mutable struct Forwarder
748790
remotehost::String
749791
remoteport::Int
750-
localinterface::Sockets.IPAddr
751-
localport::Int
792+
localinterface::Sockets.IPAddr = Sockets.localhost
793+
localport::Int = -1
794+
795+
out::Union{TCPSocket, Nothing} = nothing
752796

753-
_listen_server::TCPServer
754-
_listener_task::Union{Task, Nothing}
755-
_clients::Vector{_ForwardingClient}
797+
_listen_server::TCPServer = TCPServer()
798+
_listener_task::Union{Task, Nothing} = nothing
799+
_clients::Vector{_ForwardingClient} = _ForwardingClient[]
800+
_next_client_id::Int = 1
756801

757802
_session::Session
758803
verbose::Bool
804+
end
759805

760-
@doc """
761-
$(TYPEDSIGNATURES)
806+
"""
807+
$(TYPEDSIGNATURES)
762808
763-
Create a `Forwarder` object to forward data from `localport` to
764-
`remotehost:remoteport`. This will handle an internal [`SshChannel`](@ref)
765-
for forwarding.
766-
767-
# Arguments
768-
- `session`: The session to create a forwarding channel over.
769-
- `localport`: The local port to bind to.
770-
- `remotehost`: The remote host.
771-
- `remoteport`: The remote port to bind to.
772-
- `verbose`: Print logging messages on callbacks etc (not equivalent to
773-
setting `log_verbosity` on a [`Session`](@ref)).
774-
- `localinterface=IPv4(0)`: The interface to bind `localport` on.
775-
"""
776-
function Forwarder(session::Session, localport::Int, remotehost::String, remoteport::Int;
777-
verbose=false, localinterface::Sockets.IPAddr=IPv4(0))
778-
listen_server = Sockets.listen(localinterface, localport)
809+
Create a `Forwarder` object that will forward its data to a single
810+
`TCPSocket`. This is useful if there is only one client and binding to a port
811+
available to other processes is not desirable. The socket will be stored in the
812+
`Forwarder.out` property, and it will be closed when the `Forwarder` is closed.
779813
780-
self = new(remotehost, remoteport, localinterface, localport,
781-
listen_server, nothing, _ForwardingClient[],
782-
session, verbose)
814+
All arguments mean the same as in [`Forwarder(::Session, ::Int, ::String,
815+
::Int)`](@ref).
816+
"""
817+
function Forwarder(session::Session, remotehost::String, remoteport::Int;
818+
verbose=false)
819+
sock1, sock2 = _socketpair()
820+
self = Forwarder(; remotehost, remoteport, out=sock2, _session=session, verbose)
821+
push!(self._clients, _ForwardingClient(self, sock1))
783822

784-
# Start the listener
785-
self._listener_task = Threads.@spawn try
786-
_fwd_listen(self)
787-
catch ex
788-
@error "Error in listen loop for Forwarder!" exception=(ex, catch_backtrace())
789-
end
823+
return self
824+
end
790825

791-
finalizer(close, self)
826+
"""
827+
$(TYPEDSIGNATURES)
828+
829+
Create a `Forwarder` object to forward data from `localport` to
830+
`remotehost:remoteport`. This will handle an internal [`SshChannel`](@ref)
831+
for forwarding.
832+
833+
# Arguments
834+
- `session`: The session to create a forwarding channel over.
835+
- `localport`: The local port to bind to.
836+
- `remotehost`: The remote host.
837+
- `remoteport`: The remote port to bind to.
838+
- `verbose`: Print logging messages on callbacks etc (not equivalent to
839+
setting `log_verbosity` on a [`Session`](@ref)).
840+
- `localinterface=IPv4(0)`: The interface to bind `localport` on.
841+
"""
842+
function Forwarder(session::Session, localport::Int, remotehost::String, remoteport::Int;
843+
verbose=false, localinterface::Sockets.IPAddr=IPv4(0))
844+
_listen_server = Sockets.listen(localinterface, localport)
845+
846+
self = Forwarder(; remotehost, remoteport, localinterface, localport,
847+
_listen_server, _session=session, verbose)
848+
849+
# Start the listener
850+
self._listener_task = Threads.@spawn try
851+
_fwd_listen(self)
852+
catch ex
853+
@error "Error in listen loop for Forwarder!" exception=(ex, catch_backtrace())
792854
end
855+
856+
finalizer(close, self)
793857
end
794858

859+
795860
function Base.show(io::IO, f::Forwarder)
796861
if !isopen(f)
797862
print(io, Forwarder, "()")
798863
else
799-
print(io, Forwarder, "($(f.localinterface):$(f.localport)$(f.remotehost):$(f.remoteport))")
864+
if isnothing(forwarder.out)
865+
print(io, Forwarder, "($(f.localinterface):$(f.localport)$(f.remotehost):$(f.remoteport))")
866+
else
867+
print(io, Forwarder, "($(f.out)$(f.remotehost):$(f.remoteport))")
868+
end
800869
end
801870
end
802871

803872
"""
804873
$(TYPEDSIGNATURES)
805874
806-
Do-constructor for a `Forwarder`. All arguments are forwarded to the
807-
[`Forwarder(::Session, ::Int, ::String, ::Int)`](@ref) constructor.
875+
Do-constructor for a `Forwarder`. All arguments are forwarded to the other
876+
constructors.
808877
"""
809878
function Forwarder(f::Function, args...; kwargs...)
810879
forwarder = Forwarder(args...; kwargs...)
@@ -825,23 +894,29 @@ socket.
825894
function Base.close(forwarder::Forwarder)
826895
# Stop accepting new clients
827896
close(forwarder._listen_server)
828-
wait(forwarder._listener_task)
897+
if !isnothing(forwarder._listener_task)
898+
wait(forwarder._listener_task)
899+
end
829900

830901
# Close existing clients
831902
for client in forwarder._clients
832903
close(client)
833904
end
834905
end
835906

836-
Base.isopen(forwarder::Forwarder) = isopen(forwarder._listen_server)
907+
function Base.isopen(forwarder::Forwarder)
908+
# If we're forwarding to a bound port then check if the TCPServer is
909+
# running, otherwise check if the single client socket is still open.
910+
if isnothing(forwarder.out)
911+
isopen(forwarder._listen_server)
912+
else
913+
isopen(forwarder.out)
914+
end
915+
end
837916

838917
# This function accepts connections on the local port and sets up
839918
# _ForwardingClient's for them.
840919
function _fwd_listen(forwarder::Forwarder)
841-
next_client_id = 1
842-
remotehost = forwarder.remotehost
843-
remoteport = forwarder.remoteport
844-
845920
while isopen(forwarder._listen_server)
846921
local sock
847922
try
@@ -854,40 +929,7 @@ function _fwd_listen(forwarder::Forwarder)
854929
end
855930
end
856931

857-
# Open a forwarding channel
858-
local_ip = string(getaddrinfo(gethostname()))
859-
sshchan = SshChannel(forwarder._session)
860-
ret = _session_trywait(forwarder._session) do
861-
GC.@preserve remotehost local_ip begin
862-
lib.ssh_channel_open_forward(sshchan.ptr,
863-
Base.unsafe_convert(Ptr{Cchar}, remotehost), remoteport,
864-
Base.unsafe_convert(Ptr{Cchar}, local_ip), forwarder.localport)
865-
end
866-
end
867-
if ret != SSH_OK
868-
throw(LibSSHException("Could not open a forwarding channel: $(get_error(forwarder._session))"))
869-
end
870-
871-
# Set callbacks for the channel
872-
callbacks = Callbacks.ChannelCallbacks(nothing;
873-
on_data=_on_client_channel_data,
874-
on_eof=_on_client_channel_eof,
875-
on_close=_on_client_channel_close)
876-
set_channel_callbacks(sshchan, callbacks)
877-
878-
# Create a client and set the callbacks userdata to the new client object
879-
client = _ForwardingClient(next_client_id, forwarder.verbose, sock,
880-
sshchan, callbacks, nothing)
881-
callbacks.userdata = client
882-
883-
# Start a listener on the new socket to forward data to the server
884-
client.client_task = Threads.@spawn try
885-
_handle_forwarding_client(client)
886-
catch ex
887-
@error "Error when handling SSH port forward client $(client.id)!" exception=(ex, catch_backtrace())
888-
end
889-
890-
push!(forwarder._clients, client)
891-
next_client_id += 1
932+
push!(forwarder._clients, _ForwardingClient(forwarder, sock))
933+
forwarder._next_client_id += 1
892934
end
893935
end

src/utils.jl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import Sockets
2+
3+
# High-level, portable implementation of socketpair(2)
4+
function _socketpair()
5+
port, server = Sockets.listenany(Sockets.localhost, 2048)
6+
acceptor = Threads.@spawn Sockets.accept(server)
7+
8+
sock1 = Sockets.connect(Sockets.localhost, port)
9+
sock2 = fetch(acceptor)
10+
11+
close(server)
12+
13+
return sock1, sock2
14+
end

test/LibSSHTests.jl

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import LibSSH.Demo: DemoServer
1818

1919
username() = Sys.iswindows() ? ENV["USERNAME"] : ENV["USER"]
2020

21+
const HTTP_200 = "HTTP/1.1 200 OK\r\n\r\n"
22+
2123
# Dummy HTTP server that only responds 200 to requests
2224
function http_server(f::Function, port)
2325
start_event = Base.Event()
@@ -38,7 +40,7 @@ function http_server(f::Function, port)
3840
# Wait for any request, doesn't matter what
3941
data = readavailable(sock)
4042
if !isempty(data)
41-
write(sock, "HTTP/1.1 200 OK\r\n\r\n")
43+
write(sock, HTTP_200)
4244
end
4345

4446
closewrite(sock)
@@ -428,12 +430,13 @@ end
428430
end
429431

430432
@testset "Direct port forwarding" begin
431-
# Test port forwarding
433+
# Smoke test
432434
demo_server_with_session(2222) do session
433435
forwarder = ssh.Forwarder(session, 8080, "localhost", 9090)
434436
close(forwarder)
435437
end
436438

439+
# Test forwarding to a port
437440
demo_server_with_session(2222) do session
438441
ssh.Forwarder(session, 8080, "localhost", 9090) do forwarder
439442
http_server(9090) do
@@ -448,6 +451,17 @@ end
448451
end
449452
end
450453
end
454+
455+
# Test forwarding to a socket
456+
demo_server_with_session(2222) do session
457+
ssh.Forwarder(session, "localhost", 9090) do forwarder
458+
http_server(9090) do
459+
socket = forwarder.out
460+
write(socket, "foo")
461+
@test read(socket, String) == HTTP_200
462+
end
463+
end
464+
end
451465
end
452466
end
453467

0 commit comments

Comments
 (0)