|
21 | 21 | Unpooled)
|
22 | 22 | (io.netty.channel
|
23 | 23 | Channel
|
| 24 | + ChannelFactory |
24 | 25 | ChannelFuture
|
25 | 26 | ChannelHandler
|
26 | 27 | ChannelHandlerContext
|
|
1667 | 1668 | (.addFirst pipeline "channel-tracker" ^ChannelHandler (channel-tracking-handler chan-group))
|
1668 | 1669 | (pipeline-builder pipeline)))
|
1669 | 1670 |
|
| 1671 | +(defn- validate-existing-channel |
| 1672 | + [existing-channel] |
| 1673 | + (when (some? existing-channel) |
| 1674 | + (when-not (instance? java.nio.channels.ServerSocketChannel existing-channel) |
| 1675 | + (throw (IllegalArgumentException. |
| 1676 | + (str "The existing-channel type is not supported: " (pr-str existing-channel))))) |
| 1677 | + (when (nil? (.getLocalAddress ^java.nio.channels.ServerSocketChannel existing-channel)) |
| 1678 | + (throw (IllegalArgumentException. |
| 1679 | + (str "The existing-channel is not bound: " (pr-str existing-channel))))))) |
| 1680 | + |
| 1681 | +(defn- wrapping-channel-factory |
| 1682 | + ^ChannelFactory [^java.nio.channels.ServerSocketChannel channel] |
| 1683 | + (proxy [ChannelFactory] [] |
| 1684 | + (newChannel [] |
| 1685 | + (NioServerSocketChannel. channel)))) |
| 1686 | + |
1670 | 1687 | (defn ^:no-doc start-server
|
1671 | 1688 | ([pipeline-builder
|
1672 | 1689 | ssl-context
|
|
1685 | 1702 | bootstrap-transform
|
1686 | 1703 | on-close
|
1687 | 1704 | ^SocketAddress socket-address
|
| 1705 | + existing-channel |
1688 | 1706 | transport
|
1689 | 1707 | shutdown-timeout]
|
1690 | 1708 | :or {shutdown-timeout default-shutdown-timeout}
|
1691 | 1709 | :as opts}]
|
1692 | 1710 | (ensure-transport-available! transport)
|
| 1711 | + (validate-existing-channel existing-channel) |
1693 | 1712 | (let [num-cores (.availableProcessors (Runtime/getRuntime))
|
1694 | 1713 | num-threads (* 2 num-cores)
|
1695 | 1714 | thread-factory (enumerating-thread-factory "aleph-server-pool" false)
|
|
1715 | 1734 | (.option ChannelOption/SO_REUSEADDR true)
|
1716 | 1735 | (.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE)
|
1717 | 1736 | (.group group)
|
1718 |
| - (.channel channel-class) |
| 1737 | + (cond-> (nil? existing-channel) (.channel channel-class)) |
| 1738 | + (cond-> (some? existing-channel) (.channelFactory (wrapping-channel-factory existing-channel))) |
1719 | 1739 | ;;TODO: add a server (.handler) call to the bootstrap, for logging or something
|
1720 | 1740 | (.childHandler (pipeline-initializer pipeline-builder))
|
1721 | 1741 | (.childOption ChannelOption/SO_REUSEADDR true)
|
1722 | 1742 | (.childOption ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE)
|
1723 | 1743 | bootstrap-transform)
|
1724 | 1744 |
|
1725 | 1745 | ^ServerSocketChannel
|
1726 |
| - ch (-> b (.bind socket-address) .sync .channel)] |
| 1746 | + ch (-> (if (nil? existing-channel) |
| 1747 | + (.bind b socket-address) |
| 1748 | + (.register b)) |
| 1749 | + .sync |
| 1750 | + .channel)] |
1727 | 1751 |
|
1728 | 1752 | (reify
|
1729 | 1753 | Closeable
|
1730 | 1754 | (close [_]
|
1731 | 1755 | (when (compare-and-set! closed? false true)
|
1732 | 1756 | ;; This is the three step closing sequence:
|
1733 | 1757 | ;; 1. Stop listening to incoming requests
|
1734 |
| - (-> ch .close .sync) |
| 1758 | + (if (nil? existing-channel) |
| 1759 | + (-> ch .close .sync) |
| 1760 | + (-> ch .deregister .sync)) |
1735 | 1761 | (-> (if (pos? shutdown-timeout)
|
1736 | 1762 | ;; 2. Wait for in-flight requests to stop processing within the supplied timeout
|
1737 | 1763 | ;; interval.
|
|
1759 | 1785 | (port [_]
|
1760 | 1786 | (-> ch .localAddress .getPort))
|
1761 | 1787 | (wait-for-close [_]
|
1762 |
| - (-> ch .closeFuture .await) |
| 1788 | + (when (nil? existing-channel) |
| 1789 | + (-> ch .closeFuture .await)) |
1763 | 1790 | (-> group .terminationFuture .await)
|
1764 | 1791 | nil)))
|
1765 | 1792 |
|
|
0 commit comments