Skip to content

Commit e990e98

Browse files
committed
Work-in-progress
1 parent 2edcdf0 commit e990e98

File tree

15 files changed

+740
-0
lines changed

15 files changed

+740
-0
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
Copyright (c) Laurent P. René de Cotret
2+
3+
Permission is hereby granted, free of charge, to any person obtaining
4+
a copy of this software and associated documentation files (the
5+
"Software"), to deal in the Software without restriction, including
6+
without limitation the rights to use, copy, modify, merge, publish,
7+
distribute, sublicense, and/or sell copies of the Software, and to
8+
permit persons to whom the Software is furnished to do so, subject to
9+
the following conditions:
10+
11+
The above copyright notice and this permission notice shall be included
12+
in all copies or substantial portions of the Software.
13+
14+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15+
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16+
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
17+
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
18+
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
19+
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
20+
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
cabal-version: 3.0
2+
Name: network-transport-quic
3+
Version: 0.1.0
4+
build-Type: Simple
5+
License: BSD-3-Clause
6+
License-file: LICENSE
7+
Copyright: Laurent P. René de Cotret
8+
Author: Laurent P. René de Cotret
9+
maintainer: The Distributed Haskell team
10+
Stability: experimental
11+
Homepage: http://haskell-distributed.github.com
12+
Bug-Reports: https://github.com/haskell-distributed/distributed-process/issues
13+
Synopsis: Networking layer for Cloud Haskell based on QUIC
14+
Description: Networking layer for Cloud Haskell based on QUIC
15+
tested-with: GHC==8.10.7 GHC==9.0.2 GHC==9.2.8 GHC==9.4.5 GHC==9.6.4 GHC==9.8.2 GHC==9.10.1 GHC==9.12.1
16+
Category: Network
17+
extra-doc-files: ChangeLog
18+
extra-source-files: test/credentials/*
19+
20+
source-repository head
21+
Type: git
22+
Location: https://github.com/haskell-distributed/distributed-process
23+
SubDir: packages/network-transport-quic
24+
25+
common common
26+
ghc-options:
27+
-- warnings
28+
-Wall
29+
-Wcompat
30+
-Widentities
31+
-Wincomplete-uni-patterns
32+
-Wincomplete-record-updates
33+
-Wredundant-constraints
34+
-fhide-source-paths
35+
-Wpartial-fields
36+
-Wunused-packages
37+
-- The -threaded option is /required/ to use the quic library
38+
-threaded
39+
40+
library
41+
import: common
42+
build-depends: attoparsec
43+
, base >= 4.14 && < 5
44+
, bytestring >= 0.10 && < 0.13
45+
, containers
46+
, ip
47+
, network >= 3.1 && < 3.3
48+
, network-transport >= 0.5 && < 0.6
49+
, quic ^>=0.2
50+
, stm >=2.4 && <2.6
51+
, text >= 2.0 && <2.2
52+
, tls
53+
, tls-session-manager
54+
exposed-modules: Network.Transport.QUIC
55+
Network.Transport.QUIC.Internal
56+
other-modules: Network.Transport.QUIC.Internal.Configuration
57+
Network.Transport.QUIC.Internal.EndpointState
58+
Network.Transport.QUIC.Internal.QUICAddr
59+
Network.Transport.QUIC.Internal.Server
60+
Network.Transport.QUIC.Internal.TLS
61+
Network.Transport.QUIC.Internal.TransportState
62+
default-language: Haskell2010
63+
default-extensions: ImportQualifiedPost
64+
-- The -threaded option is /required/ to use the quic library
65+
hs-source-dirs: src
66+
67+
test-suite network-transport-quic-tests
68+
import: common
69+
default-language: Haskell2010
70+
default-extensions: ImportQualifiedPost
71+
main-is: Main.hs
72+
other-modules: Test.Network.Transport.QUIC
73+
Test.Network.Transport.QUIC.Internal.QUICAddr
74+
type: exitcode-stdio-1.0
75+
hs-source-dirs: test
76+
build-depends: base
77+
, filepath
78+
, hedgehog
79+
, ip
80+
, network
81+
, network-transport
82+
, network-transport-quic
83+
, network-transport-tests
84+
, tasty ^>=1.5
85+
, tasty-hedgehog
86+
, tasty-hunit
87+
, text
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
module Network.Transport.QUIC (
2+
createTransport,
3+
QUICAddr (..),
4+
5+
-- * Re-export to generate credentials
6+
Credential,
7+
credentialLoadX509,
8+
) where
9+
10+
import Network.Transport.QUIC.Internal (
11+
-- \* Re-export to generate credentials
12+
Credential,
13+
QUICAddr (..),
14+
createTransport,
15+
credentialLoadX509,
16+
)
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
{-# LANGUAGE LambdaCase #-}
2+
{-# LANGUAGE OverloadedStrings #-}
3+
{-# LANGUAGE ScopedTypeVariables #-}
4+
5+
module Network.Transport.QUIC.Internal (
6+
createTransport,
7+
QUICAddr (..),
8+
encodeQUICAddr,
9+
decodeQUICAddr,
10+
11+
-- * Re-export to generate credentials
12+
Credential,
13+
credentialLoadX509,
14+
) where
15+
16+
import Control.Concurrent (forkIO, killThread)
17+
import Control.Concurrent.STM (atomically)
18+
import Control.Concurrent.STM.TQueue (
19+
TQueue,
20+
newTQueueIO,
21+
readTQueue,
22+
writeTQueue,
23+
)
24+
import Control.Exception (throwIO, try)
25+
import Control.Monad (forever, void)
26+
import Data.Bifunctor (first)
27+
import Data.ByteString qualified as BS
28+
import Data.Functor ((<&>))
29+
import Data.List.NonEmpty (NonEmpty)
30+
import Network.QUIC qualified as QUIC
31+
import Network.QUIC.Client qualified as QUIC.Client
32+
import Network.TLS (Credential)
33+
import Network.Transport (
34+
ConnectErrorCode (ConnectNotFound),
35+
ConnectHints,
36+
Connection (..),
37+
ConnectionId,
38+
EndPoint (..),
39+
EndPointAddress,
40+
Event (..),
41+
EventErrorCode (EventEndPointFailed),
42+
NewEndPointErrorCode,
43+
NewMulticastGroupErrorCode (NewMulticastGroupUnsupported),
44+
Reliability,
45+
ResolveMulticastGroupErrorCode (ResolveMulticastGroupUnsupported),
46+
SendErrorCode (..),
47+
Transport (..),
48+
TransportError (..),
49+
)
50+
import Network.Transport.QUIC.Internal.Configuration (credentialLoadX509, mkClientConfig)
51+
import Network.Transport.QUIC.Internal.QUICAddr (QUICAddr (..), decodeQUICAddr, encodeQUICAddr)
52+
import Network.Transport.QUIC.Internal.Server (forkServer)
53+
import Network.Transport.QUIC.Internal.TransportState (TransportState, newTransportState, registerEndpoint, traverseTransportState)
54+
55+
{- | Create a new Transport based on the QUIC protocol.
56+
57+
Only a single transport should be created per Haskell process
58+
(threads can, and should, create their own endpoints though).
59+
-}
60+
createTransport ::
61+
QUICAddr ->
62+
NonEmpty Credential ->
63+
IO Transport
64+
createTransport quicAddr creds = do
65+
transportState <- newTransportState
66+
pure $
67+
Transport
68+
(newEndpoint transportState quicAddr creds)
69+
(closeQUICTransport transportState)
70+
71+
newEndpoint ::
72+
TransportState ->
73+
QUICAddr ->
74+
NonEmpty Credential ->
75+
IO (Either (TransportError NewEndPointErrorCode) EndPoint)
76+
newEndpoint transportState quicAddr@(QUICAddr host port) creds = do
77+
eventQueue <- newTQueueIO
78+
79+
serverThread <-
80+
forkServer
81+
host
82+
port
83+
creds
84+
throwIO
85+
( atomically
86+
. writeTQueue eventQueue
87+
. ErrorEvent
88+
. TransportError EventEndPointFailed
89+
. show
90+
)
91+
( \stream -> do
92+
let connId = fromIntegral (QUIC.streamId stream)
93+
receiveLoop connId stream eventQueue
94+
)
95+
96+
let endpoint =
97+
EndPoint
98+
(atomically (readTQueue eventQueue))
99+
(encodeQUICAddr quicAddr)
100+
(connectQUIC creds)
101+
(pure . Left $ TransportError NewMulticastGroupUnsupported "Multicast not supported")
102+
(pure . Left . const (TransportError ResolveMulticastGroupUnsupported "Multicast not supported"))
103+
(killThread serverThread >> atomically (writeTQueue eventQueue EndPointClosed))
104+
void $ transportState `registerEndpoint` endpoint
105+
pure $ Right endpoint
106+
where
107+
receiveLoop ::
108+
ConnectionId ->
109+
QUIC.Stream ->
110+
TQueue Event ->
111+
IO ()
112+
receiveLoop connId stream eventQueue = do
113+
incoming <- QUIC.recvStream stream 1024 -- TODO: variable length?
114+
-- TODO: check some state whether we should stop all connections
115+
if BS.null incoming
116+
then do
117+
atomically (writeTQueue eventQueue (ConnectionClosed connId))
118+
else do
119+
atomically (writeTQueue eventQueue (Received connId [incoming]))
120+
receiveLoop connId stream eventQueue
121+
122+
connectQUIC ::
123+
NonEmpty Credential ->
124+
EndPointAddress ->
125+
Reliability ->
126+
ConnectHints ->
127+
IO (Either (TransportError ConnectErrorCode) Connection)
128+
connectQUIC creds endpointAddress _reliability _connectHints =
129+
case decodeQUICAddr endpointAddress of
130+
Left errmsg -> pure $ Left $ TransportError ConnectNotFound ("Could not decode QUIC address: " <> errmsg)
131+
Right (QUICAddr hostname port) ->
132+
try $ do
133+
clientConfig <- mkClientConfig hostname port creds
134+
135+
outgoingQueue <- newTQueueIO
136+
tid <- forkIO $ QUIC.Client.run clientConfig $ \conn -> do
137+
QUIC.waitEstablished conn
138+
stream <- QUIC.stream conn
139+
forever $ do
140+
atomically (readTQueue outgoingQueue)
141+
>>= try . QUIC.sendStreamMany stream
142+
<&> first
143+
( \case
144+
QUIC.StreamIsClosed -> TransportError SendClosed "QUIC stream is closed"
145+
QUIC.ConnectionIsClosed reason -> TransportError SendClosed (show reason)
146+
other -> TransportError SendFailed (show other)
147+
)
148+
149+
pure $
150+
Connection
151+
(fmap Right . atomically . writeTQueue outgoingQueue)
152+
(killThread tid)
153+
154+
closeQUICTransport :: TransportState -> IO ()
155+
closeQUICTransport = flip traverseTransportState (\_ endpoint -> closeEndPoint endpoint)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
{-# LANGUAGE OverloadedStrings #-}
2+
3+
module Network.Transport.QUIC.Internal.Configuration (
4+
mkClientConfig,
5+
mkServerConfig,
6+
7+
-- * Re-export to generate credentials
8+
Credential,
9+
TLS.credentialLoadX509,
10+
) where
11+
12+
import Data.List.NonEmpty (NonEmpty)
13+
import Data.List.NonEmpty qualified as NonEmpty
14+
import Network.QUIC.Client (ClientConfig (ccALPN, ccValidate, ccWatchDog), ccPortName, ccServerName, defaultClientConfig)
15+
import Network.QUIC.Internal (ClientConfig (ccDebugLog), Milliseconds (Milliseconds), ServerConfig (scALPN), ccCredentials, ccKeyLog, maxIdleTimeout, scParameters)
16+
import Network.QUIC.Server (ServerConfig (scAddresses, scCredentials, scSessionManager, scUse0RTT), defaultServerConfig)
17+
import Network.Socket (HostName, ServiceName)
18+
import Network.TLS (Credential, Credentials (Credentials))
19+
import Network.Transport.QUIC.Internal.TLS qualified as TLS
20+
21+
mkClientConfig ::
22+
HostName ->
23+
ServiceName ->
24+
NonEmpty Credential ->
25+
IO ClientConfig
26+
mkClientConfig host port creds = do
27+
pure $
28+
defaultClientConfig
29+
{ ccServerName = host
30+
, ccPortName = port
31+
, ccALPN = \_version -> pure (Just ["perf"])
32+
, ccValidate = False
33+
, ccCredentials = Credentials (NonEmpty.toList creds)
34+
, ccWatchDog = True
35+
, -- The following two parameters are for debugging. TODO: turn off by default
36+
ccDebugLog = True
37+
, ccKeyLog = putStrLn
38+
}
39+
40+
mkServerConfig ::
41+
HostName ->
42+
ServiceName ->
43+
NonEmpty Credential ->
44+
IO ServerConfig
45+
mkServerConfig host port creds = do
46+
tlsSessionManager <- TLS.sessionManager
47+
48+
pure $
49+
defaultServerConfig
50+
{ scAddresses = [(read host, read port)]
51+
, scSessionManager = tlsSessionManager
52+
, scCredentials = Credentials (NonEmpty.toList creds)
53+
, scALPN = Just $ \_version _protocols -> pure "perf"
54+
, scUse0RTT = True
55+
, scParameters =
56+
(scParameters defaultServerConfig)
57+
{ maxIdleTimeout = Milliseconds 1000
58+
}
59+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
module Network.Transport.QUIC.Internal.EndpointState (
2+
EndpointState,
3+
newEndpointState,
4+
registerStream,
5+
deregisterStream,
6+
) where
7+
8+
import Control.Monad (void)
9+
import Data.Function ((&))
10+
import Data.IORef (IORef, newIORef)
11+
import Data.Map.Strict (Map)
12+
import Data.Map.Strict qualified as Map
13+
import GHC.IORef (atomicModifyIORef'_)
14+
import Network.QUIC (Stream, streamId)
15+
import Network.Transport (EndPointAddress)
16+
17+
newtype EndpointState = EndpointState (IORef (Map EndPointAddress Stream))
18+
19+
-- \^ Mapping from destination address to stream.
20+
-- A 'Stream' will have two keys since a stream is bidirectional.
21+
22+
newEndpointState :: IO EndpointState
23+
newEndpointState = EndpointState <$> newIORef mempty
24+
25+
registerStream ::
26+
EndpointState ->
27+
EndPointAddress ->
28+
EndPointAddress ->
29+
Stream ->
30+
IO ()
31+
registerStream (EndpointState strs) source dest stream =
32+
void $
33+
atomicModifyIORef'_
34+
strs
35+
( \st ->
36+
st
37+
& Map.insert source stream
38+
& Map.insert dest stream
39+
)
40+
41+
deregisterStream :: EndpointState -> Stream -> IO ()
42+
deregisterStream (EndpointState strs) stream =
43+
void $
44+
atomicModifyIORef'_
45+
strs
46+
( \st ->
47+
let thisStreamId = streamId stream
48+
in Map.filter ((/=) thisStreamId . streamId) st
49+
)

0 commit comments

Comments
 (0)