Skip to content

Commit 92dbdc8

Browse files
authored
feat(egress): add blocked hostname webhook fanout (alibaba#406)
* feat(egress): add blocked hostname webhook fanout * fix(egress): harden webhook broadcaster shutdown * fix(egress): normalize webhook hostname and harden broadcaster
1 parent f7bcd35 commit 92dbdc8

File tree

7 files changed

+427
-0
lines changed

7 files changed

+427
-0
lines changed

components/egress/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ The egress control is implemented as a **Sidecar** that shares the network names
4949
- **127.0.0.1** — so packets redirected by iptables to the proxy (127.0.0.1:15353) are accepted by nft.
5050
- **Nameserver IPs** from `/etc/resolv.conf` — so client DNS and proxy upstream work (e.g. private DNS).
5151
Nameserver IPs are validated (unspecified and loopback are skipped) and capped. Use `OPENSANDBOX_EGRESS_MAX_NS` (default `3`; `0` = no cap, `1``10` = cap). See [SECURITY-RISKS.md](SECURITY-RISKS.md) for trust and scope of this whitelist.
52+
- **Blocked hostname webhook**
53+
- `OPENSANDBOX_EGRESS_DENY_WEBHOOK`: HTTP endpoint URL. When set, egress asynchronously POSTs JSON **only when a hostname is denied**: `{"hostname": "<original query>", "timestamp": "<RFC3339>", "source": "opensandbox-egress"}`. Default timeout 5s, up to 3 retries with exponential backoff starting at 1s; 4xx is not retried, 5xx/network errors are retried.
54+
- **Allow requirement**: you must allow the webhook host (or its IP/CIDR) in the policy; with default deny, if you don’t explicitly allow it, the webhook traffic will be blocked by egress itself. Example: `{"defaultAction":"deny","egress":[{"action":"allow","target":"webhook.example.com"}]}`. If a broader deny CIDR covers the resolved IP, it will still be blocked—adjust your policy accordingly.
5255
- DoH/DoT blocking:
5356
- DoT (tcp/udp 853) blocked by default.
5457
- Optional DoH over 443: `OPENSANDBOX_EGRESS_BLOCK_DOH_443=true`. If enabled without blocklist, all 443 is dropped.

components/egress/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/alibaba/opensandbox/egress/pkg/constants"
2626
"github.com/alibaba/opensandbox/egress/pkg/dnsproxy"
27+
"github.com/alibaba/opensandbox/egress/pkg/events"
2728
"github.com/alibaba/opensandbox/egress/pkg/iptables"
2829
"github.com/alibaba/opensandbox/egress/pkg/log"
2930
slogger "github.com/alibaba/opensandbox/internal/logger"
@@ -64,6 +65,14 @@ func main() {
6465
}
6566
log.Infof("dns proxy started on 127.0.0.1:15353")
6667

68+
if blockWebhookURL := strings.TrimSpace(os.Getenv(constants.EnvBlockedWebhook)); blockWebhookURL != "" {
69+
blockedBroadcaster := events.NewBroadcaster(ctx, events.BroadcasterConfig{QueueSize: 256})
70+
blockedBroadcaster.AddSubscriber(events.NewWebhookSubscriber(blockWebhookURL))
71+
proxy.SetBlockedBroadcaster(blockedBroadcaster)
72+
defer blockedBroadcaster.Close()
73+
log.Infof("denied hostname webhook enabled")
74+
}
75+
6776
exemptDst := dnsproxy.ParseNameserverExemptList()
6877
if len(exemptDst) > 0 {
6978
log.Infof("nameserver exempt list: %v (proxy upstream in this list will not set SO_MARK)", exemptDst)

components/egress/pkg/constants/configuration.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const (
2323
EnvEgressRules = "OPENSANDBOX_EGRESS_RULES"
2424
EnvEgressLogLevel = "OPENSANDBOX_EGRESS_LOG_LEVEL"
2525
EnvMaxNameservers = "OPENSANDBOX_EGRESS_MAX_NS"
26+
EnvBlockedWebhook = "OPENSANDBOX_EGRESS_DENY_WEBHOOK"
2627

2728
// EnvNameserverExempt comma-separated IPs; proxy upstream to these is not marked and is allowed in nft allow set
2829
EnvNameserverExempt = "OPENSANDBOX_EGRESS_NAMESERVER_EXEMPT"

components/egress/pkg/dnsproxy/proxy.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"net"
2121
"net/netip"
2222
"os"
23+
"strings"
2324
"sync"
2425
"time"
2526

2627
"github.com/miekg/dns"
2728

29+
"github.com/alibaba/opensandbox/egress/pkg/events"
2830
"github.com/alibaba/opensandbox/egress/pkg/log"
2931
"github.com/alibaba/opensandbox/egress/pkg/nftables"
3032
"github.com/alibaba/opensandbox/egress/pkg/policy"
@@ -41,6 +43,9 @@ type Proxy struct {
4143

4244
// optional; called in goroutine when A/AAAA are present
4345
onResolved func(domain string, ips []nftables.ResolvedIP)
46+
47+
// optional broadcaster to notify blocked hostnames
48+
blockedBroadcaster *events.Broadcaster
4449
}
4550

4651
// New builds a proxy with resolved upstream; listenAddr can be empty for default.
@@ -109,6 +114,7 @@ func (p *Proxy) serveDNS(w dns.ResponseWriter, r *dns.Msg) {
109114
currentPolicy := p.policy
110115
p.policyMu.RUnlock()
111116
if currentPolicy != nil && currentPolicy.Evaluate(domain) == policy.ActionDeny {
117+
p.publishBlocked(domain)
112118
resp := new(dns.Msg)
113119
resp.SetRcode(r, dns.RcodeNameError)
114120
_ = w.WriteMsg(resp)
@@ -179,6 +185,26 @@ func (p *Proxy) SetOnResolved(fn func(domain string, ips []nftables.ResolvedIP))
179185
p.onResolved = fn
180186
}
181187

188+
// SetBlockedBroadcaster wires a broadcaster used to notify blocked hostnames.
189+
func (p *Proxy) SetBlockedBroadcaster(b *events.Broadcaster) {
190+
p.blockedBroadcaster = b
191+
}
192+
193+
func (p *Proxy) publishBlocked(domain string) {
194+
if p.blockedBroadcaster == nil {
195+
return
196+
}
197+
normalized := strings.ToLower(strings.TrimSuffix(domain, "."))
198+
if normalized == "" {
199+
return
200+
}
201+
202+
p.blockedBroadcaster.Publish(events.BlockedEvent{
203+
Hostname: normalized,
204+
Timestamp: time.Now().UTC(),
205+
})
206+
}
207+
182208
// extractResolvedIPs parses A and AAAA records from resp.Answer into ResolvedIP slice.
183209
//
184210
// Uses netip.ParseAddr(v.A.String()) which allocates a temporary string per record; typically
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Copyright 2026 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package events
16+
17+
import (
18+
"context"
19+
"sync"
20+
"sync/atomic"
21+
"time"
22+
23+
"github.com/alibaba/opensandbox/egress/pkg/log"
24+
)
25+
26+
const defaultQueueSize = 128
27+
28+
// BlockedEvent describes a blocked hostname notification.
29+
type BlockedEvent struct {
30+
Hostname string `json:"hostname"`
31+
Timestamp time.Time `json:"timestamp"`
32+
}
33+
34+
// Subscriber consumes blocked events.
35+
type Subscriber interface {
36+
HandleBlocked(ctx context.Context, ev BlockedEvent)
37+
}
38+
39+
// BroadcasterConfig defines queue sizing for the broadcaster.
40+
type BroadcasterConfig struct {
41+
QueueSize int
42+
}
43+
44+
// Broadcaster fans out blocked events to one or more subscribers via channels.
45+
type Broadcaster struct {
46+
ctx context.Context
47+
cancel context.CancelFunc
48+
49+
mu sync.RWMutex
50+
subscribers []chan BlockedEvent
51+
queueSize int
52+
closed atomic.Bool
53+
}
54+
55+
// NewBroadcaster builds a broadcaster with the given queue size (defaults to 128).
56+
func NewBroadcaster(ctx context.Context, cfg BroadcasterConfig) *Broadcaster {
57+
if cfg.QueueSize <= 0 {
58+
cfg.QueueSize = defaultQueueSize
59+
}
60+
cctx, cancel := context.WithCancel(ctx)
61+
return &Broadcaster{
62+
ctx: cctx,
63+
cancel: cancel,
64+
queueSize: cfg.QueueSize,
65+
}
66+
}
67+
68+
// AddSubscriber registers a new subscriber with its own buffered queue and worker.
69+
func (b *Broadcaster) AddSubscriber(sub Subscriber) {
70+
if sub == nil {
71+
return
72+
}
73+
ch := make(chan BlockedEvent, b.queueSize)
74+
75+
b.mu.Lock()
76+
b.subscribers = append(b.subscribers, ch)
77+
b.mu.Unlock()
78+
79+
go func() {
80+
for {
81+
select {
82+
case <-b.ctx.Done():
83+
return
84+
case ev, ok := <-ch:
85+
if !ok {
86+
return
87+
}
88+
sub.HandleBlocked(b.ctx, ev)
89+
}
90+
}
91+
}()
92+
}
93+
94+
// Publish sends an event to all subscribers; drops and logs when a subscriber queue is full.
95+
func (b *Broadcaster) Publish(event BlockedEvent) {
96+
if b.closed.Load() {
97+
return
98+
}
99+
100+
b.mu.RLock()
101+
defer b.mu.RUnlock()
102+
103+
for _, ch := range b.subscribers {
104+
select {
105+
case ch <- event:
106+
default:
107+
log.Warnf("[events] blocked-event queue full; dropping hostname %s", event.Hostname)
108+
}
109+
}
110+
}
111+
112+
// Close stops all workers and closes subscriber queues.
113+
func (b *Broadcaster) Close() {
114+
if b.closed.Load() {
115+
return
116+
}
117+
118+
b.cancel()
119+
120+
b.mu.Lock()
121+
defer b.mu.Unlock()
122+
subs := b.subscribers
123+
b.subscribers = nil
124+
125+
for _, ch := range subs {
126+
close(ch)
127+
}
128+
b.closed.Store(true)
129+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright 2026 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package events
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"io"
21+
"net/http"
22+
"net/http/httptest"
23+
"testing"
24+
"time"
25+
)
26+
27+
type captureSubscriber struct {
28+
recv chan BlockedEvent
29+
}
30+
31+
func (c *captureSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) {
32+
c.recv <- ev
33+
}
34+
35+
type blockingSubscriber struct {
36+
block chan struct{}
37+
}
38+
39+
func (b *blockingSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) {
40+
// Block until the channel is closed to simulate a slow consumer and trigger backpressure.
41+
<-b.block
42+
_ = ev
43+
}
44+
45+
func TestBroadcasterFanout(t *testing.T) {
46+
ctx, cancel := context.WithCancel(context.Background())
47+
defer cancel()
48+
49+
b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 2})
50+
51+
sub1 := &captureSubscriber{recv: make(chan BlockedEvent, 1)}
52+
sub2 := &captureSubscriber{recv: make(chan BlockedEvent, 1)}
53+
b.AddSubscriber(sub1)
54+
b.AddSubscriber(sub2)
55+
56+
ev := BlockedEvent{Hostname: "example.com.", Timestamp: time.Now()}
57+
b.Publish(ev)
58+
59+
select {
60+
case got := <-sub1.recv:
61+
if got.Hostname != ev.Hostname {
62+
t.Fatalf("sub1 expected hostname %s, got %s", ev.Hostname, got.Hostname)
63+
}
64+
case <-time.After(2 * time.Second):
65+
t.Fatal("sub1 did not receive event")
66+
}
67+
68+
select {
69+
case got := <-sub2.recv:
70+
if got.Hostname != ev.Hostname {
71+
t.Fatalf("sub2 expected hostname %s, got %s", ev.Hostname, got.Hostname)
72+
}
73+
case <-time.After(2 * time.Second):
74+
t.Fatal("sub2 did not receive event")
75+
}
76+
77+
b.Close()
78+
}
79+
80+
func TestBroadcasterDropsWhenSubscriberBackedUp(t *testing.T) {
81+
ctx, cancel := context.WithCancel(context.Background())
82+
defer cancel()
83+
84+
// Small queue; blocking subscriber will hold the first event.
85+
b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 1})
86+
block := make(chan struct{})
87+
sub := &blockingSubscriber{block: block}
88+
b.AddSubscriber(sub)
89+
90+
ev1 := BlockedEvent{Hostname: "first.example", Timestamp: time.Now()}
91+
ev2 := BlockedEvent{Hostname: "second.example", Timestamp: time.Now()}
92+
93+
b.Publish(ev1)
94+
// This publish should drop because subscriber is blocked and queue size is 1.
95+
b.Publish(ev2)
96+
97+
// Allow subscriber to drain and exit.
98+
close(block)
99+
100+
b.Close()
101+
}
102+
103+
func TestWebhookSubscriberSendsPayload(t *testing.T) {
104+
var (
105+
gotMethod string
106+
gotPayload webhookPayload
107+
)
108+
109+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
110+
gotMethod = r.Method
111+
body, _ := io.ReadAll(r.Body)
112+
_ = r.Body.Close()
113+
_ = json.Unmarshal(body, &gotPayload)
114+
w.WriteHeader(http.StatusOK)
115+
}))
116+
defer server.Close()
117+
118+
sub := NewWebhookSubscriber(server.URL)
119+
if sub == nil {
120+
t.Fatal("webhook subscriber should not be nil")
121+
}
122+
123+
ts := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)
124+
ev := BlockedEvent{Hostname: "Example.com.", Timestamp: ts}
125+
sub.HandleBlocked(context.Background(), ev)
126+
127+
if gotMethod != http.MethodPost {
128+
t.Fatalf("expected POST, got %s", gotMethod)
129+
}
130+
if gotPayload.Hostname != ev.Hostname {
131+
t.Fatalf("expected hostname %s, got %s", ev.Hostname, gotPayload.Hostname)
132+
}
133+
if gotPayload.Source != webhookSource {
134+
t.Fatalf("expected source %s, got %s", webhookSource, gotPayload.Source)
135+
}
136+
if gotPayload.Timestamp == "" {
137+
t.Fatalf("expected timestamp to be set")
138+
}
139+
}

0 commit comments

Comments
 (0)