Skip to content

Commit 0ddba0d

Browse files
authored
Merge pull request #38 from mikekap/deterministic-ports
Add the ability to have slightly more deterministic ports
2 parents 1072f20 + f89ba13 commit 0ddba0d

File tree

4 files changed

+27
-19
lines changed

4 files changed

+27
-19
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ See:
8585
--default-listener-ip string Default listener IP (default "127.0.0.1")
8686
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
8787
--dynamic-listeners-disable Disable dynamic listeners.
88+
--dynamic-sequential-min-port int If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.
8889
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started
8990
--forbidden-api-keys intSlice Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics
9091
--forward-proxy string URL of the forward proxy. Supported schemas are socks5 and http
@@ -568,4 +569,4 @@ Use localhost:19092 as bootstrap servers
568569
### Embedded third-party source code
569570

570571
* [Cloud SQL Proxy](https://github.com/GoogleCloudPlatform/cloudsql-proxy)
571-
* [Sarama](https://github.com/Shopify/sarama)
572+
* [Sarama](https://github.com/Shopify/sarama)

cmd/kafka-proxy/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func initFlags() {
8787
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
8888
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
8989
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
90+
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
9091

9192
Server.Flags().IntVar(&c.Proxy.RequestBufferSize, "proxy-request-buffer-size", 4096, "Request buffer size pro tcp connection")
9293
Server.Flags().IntVar(&c.Proxy.ResponseBufferSize, "proxy-response-buffer-size", 4096, "Response buffer size pro tcp connection")

config/config.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,17 @@ type Config struct {
4747
LevelFieldName string
4848
}
4949
Proxy struct {
50-
DefaultListenerIP string
51-
BootstrapServers []ListenerConfig
52-
ExternalServers []ListenerConfig
53-
DialAddressMappings []DialAddressMapping
54-
DisableDynamicListeners bool
55-
RequestBufferSize int
56-
ResponseBufferSize int
57-
ListenerReadBufferSize int // SO_RCVBUF
58-
ListenerWriteBufferSize int // SO_SNDBUF
59-
ListenerKeepAlive time.Duration
50+
DefaultListenerIP string
51+
BootstrapServers []ListenerConfig
52+
ExternalServers []ListenerConfig
53+
DialAddressMappings []DialAddressMapping
54+
DisableDynamicListeners bool
55+
DynamicSequentialMinPort int
56+
RequestBufferSize int
57+
ResponseBufferSize int
58+
ListenerReadBufferSize int // SO_RCVBUF
59+
ListenerWriteBufferSize int // SO_SNDBUF
60+
ListenerKeepAlive time.Duration
6061

6162
TLS struct {
6263
Enable bool

proxy/proxy.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ type Listeners struct {
2222

2323
listenFunc ListenFunc
2424

25-
disableDynamicListeners bool
25+
disableDynamicListeners bool
26+
dynamicSequentialMinPort int
2627

2728
brokerToListenerConfig map[string]config.ListenerConfig
2829
lock sync.RWMutex
@@ -60,12 +61,13 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6061
}
6162

6263
return &Listeners{
63-
defaultListenerIP: defaultListenerIP,
64-
connSrc: make(chan Conn, 1),
65-
brokerToListenerConfig: brokerToListenerConfig,
66-
tcpConnOptions: tcpConnOptions,
67-
listenFunc: listenFunc,
68-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
64+
defaultListenerIP: defaultListenerIP,
65+
connSrc: make(chan Conn, 1),
66+
brokerToListenerConfig: brokerToListenerConfig,
67+
tcpConnOptions: tcpConnOptions,
68+
listenFunc: listenFunc,
69+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
70+
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
6971
}, nil
7072
}
7173

@@ -140,7 +142,10 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32,
140142
return v.AdvertisedAddress, 0, nil
141143
}
142144

143-
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(0))
145+
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
146+
if p.dynamicSequentialMinPort != 0 {
147+
p.dynamicSequentialMinPort += 1
148+
}
144149

145150
cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
146151
l, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc)

0 commit comments

Comments
 (0)