Skip to content

Commit 8176552

Browse files
szuecsmadumalt
andauthored
Feature: teeResponse filter (#3843)
Feature: teeResponse() filter Ref: #3836 --------- Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de> Co-authored-by: Thilina Madumal <madumalt@users.noreply.github.com>
1 parent c8fb663 commit 8176552

File tree

7 files changed

+312
-1
lines changed

7 files changed

+312
-1
lines changed

docs/reference/filters.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -933,7 +933,7 @@ Example:
933933

934934
### loopbackIfStatus
935935

936-
Fallbacks to the given path if the response has the specified code. The filter
936+
Fallbacks to the given path if the response has the specified code. The filter
937937
replaces the response coming from the backend or the previous filters.
938938

939939
Parameters:
@@ -1022,6 +1022,15 @@ See also:
10221022
* [Tee predicate](predicates.md#tee)
10231023
* [Shadow Traffic Tutorial](../tutorials/shadow-traffic.md)
10241024

1025+
### teeResponse
1026+
1027+
The teeResponse filter provides the possibility to send the HTTP body
1028+
of the response to some other HTTP endpoint as request body.
1029+
1030+
```
1031+
r: * -> teeResponse("https://another-api.example.org") -> "http://api.example.org";
1032+
```
1033+
10251034
## HTTP Body
10261035
### compress
10271036

filters/builtin/builtin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ func Filters() []filters.Spec {
202202
tee.NewTeeDeprecated(),
203203
tee.NewTeeNoFollow(),
204204
tee.NewTeeLoopback(),
205+
tee.NewTeeResponse(tee.Options{}),
205206
sed.New(),
206207
sed.NewDelimited(),
207208
sed.NewRequest(),

filters/filters.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ const (
282282
LogBodyName = "logBody"
283283
LogHeaderName = "logHeader"
284284
TeeName = "tee"
285+
TeeResponseName = "teeResponse"
285286
TeenfName = "teenf"
286287
TeeLoopbackName = "teeLoopback"
287288
SedName = "sed"

filters/tee/main_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,10 @@ func cleanupClients() {
2121
c.Close()
2222
}
2323
teeClients.mu.Unlock()
24+
25+
teeResponseClients.mu.Lock()
26+
for _, c := range teeResponseClients.store {
27+
c.Close()
28+
}
29+
teeResponseClients.mu.Unlock()
2430
}

filters/tee/tee_response.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package tee
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
"net/url"
8+
9+
"github.com/sirupsen/logrus"
10+
"github.com/zalando/skipper/filters"
11+
"github.com/zalando/skipper/net"
12+
)
13+
14+
var teeResponseClients *teeClient = &teeClient{
15+
store: make(map[string]*net.Client),
16+
}
17+
18+
type (
19+
teeResponseSpec struct {
20+
options Options
21+
}
22+
23+
teeResponse struct {
24+
client *net.Client
25+
host string
26+
scheme string
27+
}
28+
)
29+
30+
// NewTeeResponse returns a new teeResponse filter Spec, whose instances create a Request against a shadow backend with the response body streamed to the client.
31+
// parameters: shadow backend url
32+
//
33+
// Name: "teeResponse".
34+
func NewTeeResponse(opt Options) filters.Spec {
35+
spec := &teeResponseSpec{options: Options{
36+
Timeout: defaultTeeTimeout,
37+
MaxIdleConns: defaultMaxIdleConns,
38+
MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
39+
IdleConnTimeout: defaultIdleConnTimeout,
40+
}}
41+
if opt.Timeout != 0 {
42+
spec.options.Timeout = opt.Timeout
43+
}
44+
if opt.IdleConnTimeout != 0 {
45+
spec.options.IdleConnTimeout = opt.IdleConnTimeout
46+
}
47+
if opt.MaxIdleConns != 0 {
48+
spec.options.MaxIdleConns = opt.MaxIdleConns
49+
}
50+
if opt.MaxIdleConnsPerHost != 0 {
51+
spec.options.MaxIdleConnsPerHost = opt.MaxIdleConnsPerHost
52+
}
53+
54+
return spec
55+
}
56+
57+
func (spec *teeResponseSpec) Name() string {
58+
return filters.TeeResponseName
59+
}
60+
61+
// CreateFilter creates a teeResponse Filter
62+
// Accepts only one parameter: shadow backend url
63+
func (spec *teeResponseSpec) CreateFilter(config []interface{}) (filters.Filter, error) {
64+
if len(config) != 1 {
65+
return nil, filters.ErrInvalidFilterParameters
66+
}
67+
backend, ok := config[0].(string)
68+
if !ok {
69+
return nil, filters.ErrInvalidFilterParameters
70+
}
71+
72+
u, err := url.Parse(backend)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
var client *net.Client
78+
teeResponseClients.mu.Lock()
79+
if cc, ok := teeResponseClients.store[u.Host]; !ok {
80+
client = net.NewClient(net.Options{
81+
Timeout: spec.options.Timeout,
82+
TLSHandshakeTimeout: spec.options.Timeout,
83+
ResponseHeaderTimeout: spec.options.Timeout,
84+
MaxIdleConns: spec.options.MaxIdleConns,
85+
MaxIdleConnsPerHost: spec.options.MaxIdleConnsPerHost,
86+
IdleConnTimeout: spec.options.IdleConnTimeout,
87+
Tracer: spec.options.Tracer,
88+
OpentracingComponentTag: "skipper",
89+
OpentracingSpanName: spec.Name(),
90+
})
91+
teeResponseClients.store[u.Host] = client
92+
} else {
93+
client = cc
94+
}
95+
teeResponseClients.mu.Unlock()
96+
97+
teeResponse := teeResponse{
98+
client: client,
99+
host: u.Host,
100+
scheme: u.Scheme,
101+
}
102+
103+
return &teeResponse, nil
104+
}
105+
106+
func (f *teeResponse) Response(fc filters.FilterContext) {
107+
if fc.Response().ContentLength == 0 {
108+
return
109+
}
110+
111+
pr, pw := io.Pipe()
112+
fc.Response().Body = &teeTie{fc.Response().Body, pw}
113+
114+
go func() {
115+
req, err := http.NewRequest("POST", fmt.Sprintf("%s://%s", f.scheme, f.host), pr)
116+
if err != nil {
117+
logrus.Errorf("Failed to create request: %v", err)
118+
return
119+
}
120+
121+
rsp, err := f.client.Do(req)
122+
if err != nil {
123+
fc.Logger().Warnf("tee: error while teeResponse request %v", err)
124+
return
125+
}
126+
127+
if rsp.Body != nil {
128+
io.Copy(io.Discard, rsp.Body)
129+
rsp.Body.Close()
130+
}
131+
}()
132+
}
133+
134+
func (*teeResponse) Request(filters.FilterContext) {}

filters/tee/tee_response_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package tee
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
10+
"github.com/zalando/skipper/eskip"
11+
"github.com/zalando/skipper/filters"
12+
"github.com/zalando/skipper/proxy/proxytest"
13+
)
14+
15+
func TestTeeResponseEndToEndBody(t *testing.T) {
16+
s := "hello"
17+
18+
shadowBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
19+
b, err := io.ReadAll(r.Body)
20+
if err != nil {
21+
t.Fatalf("Failed to read shadow request: %v", err)
22+
}
23+
24+
if s != string(b) {
25+
t.Fatalf("Failed to get the shadow request %q != %q", s, string(b))
26+
}
27+
28+
r.Body.Close()
29+
}))
30+
defer shadowBackend.Close()
31+
32+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
33+
io.Copy(io.Discard, r.Body)
34+
r.Body.Close()
35+
w.Write([]byte(s))
36+
}))
37+
defer backend.Close()
38+
39+
routeStr := fmt.Sprintf(`route1: * -> teeResponse("%v") -> "%v";`, shadowBackend.URL, backend.URL)
40+
41+
route := eskip.MustParse(routeStr)
42+
registry := make(filters.Registry)
43+
registry.Register(NewTeeResponse(Options{}))
44+
p := proxytest.New(registry, route...)
45+
defer p.Close()
46+
47+
req, err := http.NewRequest("GET", p.URL, nil)
48+
if err != nil {
49+
t.Error(err)
50+
}
51+
52+
rsp, err := (&http.Client{}).Do(req)
53+
if err != nil {
54+
t.Fatal(err)
55+
}
56+
57+
b, err := io.ReadAll(rsp.Body)
58+
if err != nil {
59+
t.Fatalf("Failed to read request body: %v", err)
60+
}
61+
res := string(b)
62+
if res != s {
63+
t.Fatalf("Failed to get client result: %q != %q", res, s)
64+
}
65+
66+
rsp.Body.Close()
67+
}
68+
69+
func TestTeeResponseNoResponseBody(t *testing.T) {
70+
shadowBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
71+
b, _ := io.ReadAll(r.Body)
72+
if n := len(b); n != 0 {
73+
t.Fatalf("Failed to get no body, got: %d", n)
74+
}
75+
76+
r.Body.Close()
77+
}))
78+
defer shadowBackend.Close()
79+
80+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
81+
io.Copy(io.Discard, r.Body)
82+
r.Body.Close()
83+
w.WriteHeader(200)
84+
w.Write([]byte(""))
85+
}))
86+
defer backend.Close()
87+
88+
routeStr := fmt.Sprintf(`route1: * -> teeResponse("%v") -> "%v";`, shadowBackend.URL, backend.URL)
89+
90+
route := eskip.MustParse(routeStr)
91+
registry := make(filters.Registry)
92+
registry.Register(NewTeeResponse(Options{}))
93+
p := proxytest.New(registry, route...)
94+
defer p.Close()
95+
96+
req, err := http.NewRequest("GET", p.URL, nil)
97+
if err != nil {
98+
t.Error(err)
99+
}
100+
101+
rsp, err := (&http.Client{}).Do(req)
102+
if err != nil {
103+
t.Fatal(err)
104+
}
105+
106+
b, err := io.ReadAll(rsp.Body)
107+
if err != nil {
108+
t.Fatalf("Failed to read request body: %v", err)
109+
}
110+
res := string(b)
111+
if res != "" {
112+
t.Fatalf("Failed to get client result: %q != %q", res, "")
113+
}
114+
115+
rsp.Body.Close()
116+
}
117+
118+
func TestTeeResponseFailingShadow(t *testing.T) {
119+
s := "hello"
120+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
121+
io.Copy(io.Discard, r.Body)
122+
r.Body.Close()
123+
w.WriteHeader(200)
124+
w.Write([]byte(s))
125+
}))
126+
defer backend.Close()
127+
128+
routeStr := fmt.Sprintf(`route1: * -> teeResponse("%v") -> "%v";`, "http://localhost:34125", backend.URL)
129+
130+
route := eskip.MustParse(routeStr)
131+
registry := make(filters.Registry)
132+
registry.Register(NewTeeResponse(Options{}))
133+
p := proxytest.New(registry, route...)
134+
defer p.Close()
135+
136+
req, err := http.NewRequest("GET", p.URL, nil)
137+
if err != nil {
138+
t.Error(err)
139+
}
140+
141+
rsp, err := (&http.Client{}).Do(req)
142+
if err != nil {
143+
t.Fatal(err)
144+
}
145+
146+
b, err := io.ReadAll(rsp.Body)
147+
if err != nil {
148+
t.Fatalf("Failed to read request body: %v", err)
149+
}
150+
res := string(b)
151+
if res != s {
152+
t.Fatalf("Failed to get client result: %q != %q", res, s)
153+
}
154+
155+
rsp.Body.Close()
156+
}

skipper.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1737,6 +1737,10 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
17371737
NoFollow: true,
17381738
Tracer: tracer,
17391739
}),
1740+
// teeResponse()
1741+
teefilters.NewTeeResponse(teefilters.Options{
1742+
Tracer: tracer,
1743+
}),
17401744
)
17411745

17421746
if o.OAuthTokeninfoURL != "" {

0 commit comments

Comments
 (0)