Skip to content
This repository was archived by the owner on Jun 20, 2024. It is now read-only.

Commit 044551d

Browse files
committed
heavily WIP: add a backend we can test against
1 parent ee4b023 commit 044551d

File tree

8 files changed

+627
-11
lines changed

8 files changed

+627
-11
lines changed

backend/handlers.go

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
unixfile "github.com/ipfs/go-unixfs/file"
8+
"github.com/ipld/go-car"
9+
"github.com/ipld/go-car/util"
10+
"io"
11+
"net/http"
12+
"os"
13+
"runtime/debug"
14+
"strconv"
15+
"strings"
16+
"time"
17+
18+
"github.com/prometheus/client_golang/prometheus"
19+
"github.com/prometheus/client_golang/prometheus/promhttp"
20+
21+
"github.com/ipfs/go-blockservice"
22+
"github.com/ipfs/go-cid"
23+
ipath "github.com/ipfs/interface-go-ipfs-core/path"
24+
)
25+
26+
func makeMetricsHandler(port int) (*http.Server, error) {
27+
mux := http.NewServeMux()
28+
29+
gatherers := prometheus.Gatherers{
30+
prometheus.DefaultGatherer,
31+
}
32+
options := promhttp.HandlerOpts{}
33+
mux.Handle("/debug/metrics/prometheus", promhttp.HandlerFor(gatherers, options))
34+
35+
return &http.Server{
36+
Handler: mux,
37+
Addr: ":" + strconv.Itoa(port),
38+
}, nil
39+
}
40+
41+
func withRequestLogger(next http.Handler) http.Handler {
42+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
43+
goLog.Infow(r.Method, "url", r.URL, "host", r.Host)
44+
// TODO: if debug is enabled, show more? goLog.Infow("request received", "url", r.URL, "host", r.Host, "method", r.Method, "ua", r.UserAgent(), "referer", r.Referer())
45+
next.ServeHTTP(w, r)
46+
})
47+
}
48+
49+
var noModtime = time.Unix(0, 0)
50+
51+
func makeGatewayCARHandler(bsrv blockservice.BlockService, port int) (*http.Server, error) {
52+
mux := http.NewServeMux()
53+
mux.HandleFunc("/ipfs/", func(w http.ResponseWriter, r *http.Request) {
54+
// the hour is a hard fallback, we don't expect it to happen, but just in case
55+
ctx, cancel := context.WithTimeout(r.Context(), time.Hour)
56+
defer cancel()
57+
r = r.WithContext(ctx)
58+
59+
defer func() {
60+
if r := recover(); r != nil {
61+
goLog.Error("A panic occurred in the gateway handler!")
62+
goLog.Error(r)
63+
debug.PrintStack()
64+
}
65+
}()
66+
67+
if r.Method != http.MethodGet {
68+
w.Header().Add("Allow", http.MethodGet)
69+
70+
errmsg := "Method " + r.Method + " not allowed"
71+
http.Error(w, errmsg, http.StatusMethodNotAllowed)
72+
return
73+
}
74+
75+
isCar := false
76+
if formatParam := r.URL.Query().Get("format"); formatParam != "" {
77+
isCar = formatParam == "car"
78+
if !isCar {
79+
http.Error(w, "only raw format supported", http.StatusBadRequest)
80+
return
81+
}
82+
} else {
83+
for _, header := range r.Header.Values("Accept") {
84+
for _, value := range strings.Split(header, ",") {
85+
accept := strings.TrimSpace(value)
86+
if accept == "application/vnd.ipld.car" {
87+
isCar = true
88+
break
89+
}
90+
}
91+
}
92+
}
93+
if !isCar {
94+
http.Error(w, "only car format supported", http.StatusBadRequest)
95+
return
96+
}
97+
98+
contentPath := ipath.New(r.URL.Path)
99+
if contentPath.Mutable() {
100+
http.Error(w, "only immutable block requests supported", http.StatusBadRequest)
101+
return
102+
} else if contentPath.Namespace() != "ipfs" {
103+
http.Error(w, "only the ipfs names is supported", http.StatusBadRequest)
104+
return
105+
}
106+
107+
carStream, err := simpleSelectorToCar(contentPath)
108+
if err != nil {
109+
http.Error(w, "only the ipfs names is supported", http.StatusBadRequest)
110+
return
111+
}
112+
113+
const immutableCacheControl = "public, max-age=29030400, immutable"
114+
// immutable! CACHE ALL THE THINGS, FOREVER! wolololol
115+
w.Header().Set("Cache-Control", immutableCacheControl)
116+
117+
// Set modtime to 'zero time' to disable Last-Modified header (superseded by Cache-Control)
118+
119+
io.Copy(w, carStream)
120+
return
121+
})
122+
123+
// Creates metrics handler for total response size. Matches the same metrics
124+
// from Kubo:
125+
// https://github.com/ipfs/kubo/blob/e550d9e4761ea394357c413c02ade142c0dea88c/core/corehttp/metrics.go#L79-L152
126+
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
127+
Namespace: "ipfs",
128+
Subsystem: "http",
129+
Name: "response_size_bytes",
130+
Help: "The HTTP response sizes in bytes.",
131+
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
132+
}, nil)
133+
err := prometheus.Register(sum)
134+
if err != nil {
135+
return nil, err
136+
}
137+
138+
// Construct the HTTP handler for the gateway.
139+
handler := promhttp.InstrumentHandlerResponseSize(sum, mux)
140+
141+
// Add logging
142+
handler = withRequestLogger(handler)
143+
144+
return &http.Server{
145+
Handler: handler,
146+
Addr: ":" + strconv.Itoa(port),
147+
}, nil
148+
}
149+
150+
func simpleSelectorToCar(ipfsPath ipath.Path) (io.ReadCloser, error) {
151+
pathSegs := strings.Split(ipfsPath.String(), "/")
152+
if len(pathSegs) < 3 || !(pathSegs[0] == "" && pathSegs[1] == "ipfs") {
153+
return nil, fmt.Errorf("invalid path")
154+
}
155+
pathSegs = pathSegs[2:]
156+
rootCidStr := pathSegs[0]
157+
rootCid, err := cid.Decode(rootCidStr)
158+
if err != nil {
159+
return nil, err
160+
}
161+
162+
r, w := io.Pipe()
163+
// Setup header for the output car
164+
err = car.WriteHeader(&car.CarHeader{
165+
Roots: []cid.Cid{rootCid},
166+
Version: 1,
167+
}, w)
168+
if err != nil {
169+
return nil, fmt.Errorf("writing car header: %w", err)
170+
}
171+
172+
go func() {
173+
defer w.Close()
174+
remainingPath := pathSegs[1:]
175+
unixfile.NewUnixfsFile()
176+
177+
err = util.LdWrite(os.Stdout, block.Cid().Bytes(), block.RawData()) // write to the output car
178+
if err != nil {
179+
return fmt.Errorf("writing to output car: %w", err)
180+
}
181+
}()
182+
_ = rootCid
183+
return r, nil
184+
}
185+
186+
func makeGatewayBlockHandler(bsrv blockservice.BlockService, port int) (*http.Server, error) {
187+
mux := http.NewServeMux()
188+
mux.HandleFunc("/ipfs/", func(w http.ResponseWriter, r *http.Request) {
189+
// the hour is a hard fallback, we don't expect it to happen, but just in case
190+
ctx, cancel := context.WithTimeout(r.Context(), time.Hour)
191+
defer cancel()
192+
r = r.WithContext(ctx)
193+
194+
defer func() {
195+
if r := recover(); r != nil {
196+
goLog.Error("A panic occurred in the gateway handler!")
197+
goLog.Error(r)
198+
debug.PrintStack()
199+
}
200+
}()
201+
202+
if r.Method != http.MethodGet {
203+
w.Header().Add("Allow", http.MethodGet)
204+
205+
errmsg := "Method " + r.Method + " not allowed"
206+
http.Error(w, errmsg, http.StatusMethodNotAllowed)
207+
return
208+
}
209+
210+
isBlock := false
211+
if formatParam := r.URL.Query().Get("format"); formatParam != "" {
212+
isBlock = formatParam == "raw"
213+
if !isBlock {
214+
http.Error(w, "only raw format supported", http.StatusBadRequest)
215+
return
216+
}
217+
} else {
218+
for _, header := range r.Header.Values("Accept") {
219+
for _, value := range strings.Split(header, ",") {
220+
accept := strings.TrimSpace(value)
221+
if accept == "application/vnd.ipld.raw" {
222+
isBlock = true
223+
break
224+
}
225+
}
226+
}
227+
}
228+
if !isBlock {
229+
http.Error(w, "only raw format supported", http.StatusBadRequest)
230+
return
231+
}
232+
233+
contentPath := ipath.New(r.URL.Path)
234+
if contentPath.Mutable() {
235+
http.Error(w, "only immutable block requests supported", http.StatusBadRequest)
236+
return
237+
} else if contentPath.Namespace() != "ipfs" {
238+
http.Error(w, "only the ipfs names is supported", http.StatusBadRequest)
239+
return
240+
}
241+
242+
strComps := strings.Split(strings.TrimRight(contentPath.String(), "/"), "/")
243+
if len(strComps) != 3 {
244+
http.Error(w, "requests must be for single raw blocks", http.StatusBadRequest)
245+
return
246+
}
247+
c, err := cid.Decode(strComps[2])
248+
if err != nil {
249+
http.Error(w, fmt.Sprintf("not a valid cid %s", strComps[2]), http.StatusBadRequest)
250+
return
251+
}
252+
253+
blk, err := bsrv.GetBlock(r.Context(), c)
254+
if err != nil {
255+
http.Error(w, fmt.Sprintf("could not get cid %s", c), http.StatusInternalServerError)
256+
return
257+
}
258+
259+
const immutableCacheControl = "public, max-age=29030400, immutable"
260+
// immutable! CACHE ALL THE THINGS, FOREVER! wolololol
261+
w.Header().Set("Cache-Control", immutableCacheControl)
262+
263+
// Set modtime to 'zero time' to disable Last-Modified header (superseded by Cache-Control)
264+
265+
http.ServeContent(w, r, c.String()+".bin", noModtime, bytes.NewReader(blk.RawData()))
266+
return
267+
})
268+
269+
// Creates metrics handler for total response size. Matches the same metrics
270+
// from Kubo:
271+
// https://github.com/ipfs/kubo/blob/e550d9e4761ea394357c413c02ade142c0dea88c/core/corehttp/metrics.go#L79-L152
272+
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
273+
Namespace: "ipfs",
274+
Subsystem: "http",
275+
Name: "response_size_bytes",
276+
Help: "The HTTP response sizes in bytes.",
277+
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
278+
}, nil)
279+
err := prometheus.Register(sum)
280+
if err != nil {
281+
return nil, err
282+
}
283+
284+
// Construct the HTTP handler for the gateway.
285+
handler := promhttp.InstrumentHandlerResponseSize(sum, mux)
286+
287+
// Add logging
288+
handler = withRequestLogger(handler)
289+
290+
return &http.Server{
291+
Handler: handler,
292+
Addr: ":" + strconv.Itoa(port),
293+
}, nil
294+
}

0 commit comments

Comments
 (0)