Skip to content

Commit 4e0a698

Browse files
committed
Work-in-progress
1 parent 2edcdf0 commit 4e0a698

File tree

3 files changed

+164
-0
lines changed

3 files changed

+164
-0
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
cabal-version: 3.0
2+
Name: network-transport-quic
3+
Version: 0.1.0
4+
build-Type: Simple
5+
License: BSD-3-Clause
6+
License-file: LICENSE
7+
Copyright: Well-Typed LLP, Tweag I/O Limited
8+
Author: Duncan Coutts, Nicolas Wu, Edsko de Vries
9+
maintainer: The Distributed Haskell team
10+
Stability: experimental
11+
Homepage: http://haskell-distributed.github.com
12+
Bug-Reports: https://github.com/haskell-distributed/distributed-process/issues
13+
Synopsis: Networking layer for Cloud Haskell based on QUIC
14+
Description: Networking layer for Cloud Haskell based on QUIC
15+
tested-with: GHC==8.10.7 GHC==9.0.2 GHC==9.2.8 GHC==9.4.5 GHC==9.6.4 GHC==9.8.2 GHC==9.10.1 GHC==9.12.1
16+
Category: Network
17+
extra-doc-files: ChangeLog
18+
19+
source-repository head
20+
Type: git
21+
Location: https://github.com/haskell-distributed/distributed-process
22+
SubDir: packages/network-transport-quic
23+
24+
common warnings
25+
ghc-options: -Wall
26+
-Wcompat
27+
-Widentities
28+
-Wincomplete-uni-patterns
29+
-Wincomplete-record-updates
30+
-Wredundant-constraints
31+
-fhide-source-paths
32+
-Wpartial-fields
33+
-Wunused-packages
34+
35+
library
36+
import: warnings
37+
build-depends: async >= 2.2 && < 2.3,
38+
base >= 4.14 && < 5,
39+
binary,
40+
bytestring >= 0.10 && < 0.13,
41+
containers,
42+
network >= 3.1 && < 3.3,
43+
network-transport >= 0.5 && < 0.6,
44+
quic ^>=0.2,
45+
stm >=2.4 && <2.6
46+
exposed-modules: Network.Transport.QUIC
47+
other-modules: Network.Transport.QUIC.EndpointState
48+
default-language: Haskell2010
49+
default-extensions: ImportQualifiedPost
50+
-- The -threaded option is /required/ to use the quic library
51+
ghc-options: -threaded
52+
hs-source-dirs: src
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
{-# LANGUAGE OverloadedRecordDot #-}
2+
3+
module Network.Transport.QUIC (
4+
createTransport,
5+
QUICAddr (..),
6+
) where
7+
8+
import Control.Concurrent.Async qualified as Async
9+
import Control.Concurrent.STM (atomically)
10+
import Control.Concurrent.STM.TQueue
11+
import Data.ByteString qualified as BS
12+
import Data.Map.Strict (Map)
13+
import GHC.IORef (atomicModifyIORef')
14+
import Network.QUIC (StreamId)
15+
import Network.QUIC qualified as QUIC
16+
import Network.QUIC.Client qualified as QUIC
17+
import Network.QUIC.Server (defaultServerConfig)
18+
import Network.QUIC.Server qualified as QUIC.Server
19+
import Network.Transport (ConnectErrorCode (..), ConnectionId, EndPoint (..), Event (..), NewEndPointErrorCode, Transport (..), TransportError (..))
20+
import Network.Transport.Internal (mapIOException)
21+
22+
import Network.Socket (HostName, ServiceName, getAddrInfo)
23+
24+
{- | Create a new Transport.
25+
26+
Only a single transport should be created per Haskell process
27+
(threads can, and should, create their own endpoints though).
28+
-}
29+
createTransport :: QUICAddr -> IO Transport
30+
createTransport quicAddr = do
31+
pure $ Transport (newEndpoint quicAddr) closeTransport
32+
33+
data QUICAddr = QUICAddr
34+
{ quicBindHost :: !HostName
35+
, quicBindPort :: !ServiceName
36+
}
37+
38+
newEndpoint :: QUICAddr -> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
39+
newEndpoint quicAddr = do
40+
eventQueue <- newTQueueIO
41+
42+
(addrInfo : _) <-
43+
mapIOException (TransportError ConnectNotFound . show) $
44+
getAddrInfo
45+
Nothing
46+
(Just quicAddr.quicBindHost)
47+
(Just quicAddr.quicBindPort)
48+
49+
QUIC.Server.run
50+
defaultServerConfig
51+
( \conn -> do
52+
-- TODO: create a bidirectional stream
53+
-- which can be re-used for sending
54+
quicStream <- QUIC.acceptStream conn
55+
-- TODO: how to ensure positivity of ConnectionId? QUIC StreamID should be a 62 bit integer,
56+
-- so there's room to make it a positive 64 bit integer (ConnectionId ~ Word64)
57+
let connId = fromIntegral (QUIC.streamId quicStream)
58+
receiveLoop connId quicStream eventQueue
59+
)
60+
61+
pure . Right $
62+
EndPoint
63+
(atomically (readTQueue eventQueue))
64+
_
65+
_
66+
_
67+
_
68+
_
69+
where
70+
receiveLoop ::
71+
ConnectionId ->
72+
QUIC.Stream ->
73+
TQueue Event ->
74+
IO ()
75+
receiveLoop connId stream eventQueue = do
76+
incoming <- QUIC.recvStream stream 1024 -- TODO: variable length?
77+
-- TODO: check some state whether we should stop all connections
78+
if BS.null incoming
79+
then do
80+
atomically (writeTQueue eventQueue (ConnectionClosed connId))
81+
else do
82+
atomically (writeTQueue eventQueue (Received connId [incoming]))
83+
receiveLoop connId stream eventQueue
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
module Network.Transport.QUIC.EndpointState (
2+
EndpointState,
3+
newConnection,
4+
) where
5+
6+
import Data.Map.Strict (Map)
7+
import Network.QUIC (Stream, StreamId)
8+
import Network.QUIC qualified as QUIC
9+
import Network.Transport (ConnectionId)
10+
11+
import Data.Map.Strict qualified as Map
12+
13+
newtype EndpointState = EndpointState
14+
{ streamIds :: Map StreamId ConnectionId
15+
, streams :: Map
16+
}
17+
18+
newConnection :: Stream -> EndpointState -> EndpointState
19+
newConnection stream (EndpointState sids) =
20+
EndpointState
21+
( Map.insert
22+
(QUIC.streamId stream)
23+
-- TODO: how to ensure positivity? QUIC StreamID should be a 62 bit integer,
24+
-- so there's room to make it a positive 64 bit integer (ConnectionId ~ Word64)
25+
(fromIntegral $ QUIC.streamId stream)
26+
sids
27+
)
28+
29+
lookupConnectionId :: EndpointState -> ConnectionId

0 commit comments

Comments
 (0)