diff --git a/Dockerfile b/Dockerfile index dee6803..d3a9405 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,6 +12,11 @@ RUN apk --no-cache add build-base git RUN go install github.com/google/gops@latest WORKDIR /root/ + +# Cache dependencies download +ADD go.mod go.sum ./ +RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get + ADD . /root RUN cd /root/src && go build -o pulsar-beam diff --git a/README.md b/README.md index 10a1d2a..c67caa6 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,13 @@ Pulsar Beam requires the same public and private keys to generate and verify JWT To disable JWT authentication, set the paramater `HTTPAuthImpl` in the config file or env variable to `noauth`. +Notice: Pulsar Beam creates one client connection per Pulsar URL per token, so putting another authorization layer on top of Pulsar Beam may cause a memory leak by creating a large number of Pulsar clients. To use another authorization layer such as a reverse proxy (e.g. nginx) on top of Pulsar Beam, please disable Pulsar authorization by setting `PulsarTokenHeaderName` to an empty string (the default is "Authorization"). If you would like to keep both reverse-proxy and Pulsar authorization, please change `PulsarTokenHeaderName` to another header name that is different from "Authorization" and not used by the reverse proxy. + +How do you know whether you are experiencing this memory leak? + +- Option 1: Use `gops` to check the running goroutines and see whether a large number of them are doing ping/pong with Pulsar brokers. +- Option 2: We use Pulsar Beam as an HTTP webhook receiver at https://doopage.com and are able to handle a few million requests every day with CPU stable at <3% (8vCPU AWS) and memory <40MB (`WorkerPoolSize` = 16). If you are using more resources than that, please try setting `PulsarTokenHeaderName` to an empty string to check whether the problem is resolved. 
+ ### Sink source If a webhook's response contains a body and three headers including `Authorization` for Pulsar JWT, `TopicFn` for a topic fully qualified name, and `PulsarUrl`, the beam server will send the body as a new message to the Pulsar's topic specified as in TopicFn and PulsarUrl. diff --git a/src/broker/webhook.go b/src/broker/webhook.go index 0b06c1e..d78595a 100644 --- a/src/broker/webhook.go +++ b/src/broker/webhook.go @@ -139,7 +139,7 @@ func toPulsar(r *http.Response) { return } - err3 := pulsardriver.SendToPulsar(pulsarURL, token, topicFN, b, true) + err3 := pulsardriver.SendToPulsar(pulsarURL, token, topicFN, b, true, false, 0) if err3 != nil { return } diff --git a/src/pulsardriver/pulsar-client.go b/src/pulsardriver/pulsar-client.go index cb85632..3b6bf32 100644 --- a/src/pulsardriver/pulsar-client.go +++ b/src/pulsardriver/pulsar-client.go @@ -29,6 +29,7 @@ func GetPulsarClient(pulsarURL, pulsarToken string, reset bool) (pulsar.Client, clientSync.Lock() driver, ok := ClientCache[key] if !ok { + log.Debugf("Creating new pulsar client cache %s\n token %s", pulsarURL, pulsarToken) driver = &PulsarClient{} driver.createdAt = time.Now() driver.pulsarURL = pulsarURL @@ -67,9 +68,6 @@ func (c *PulsarClient) GetClient(url, tokenStr string) (pulsar.Client, error) { log.Errorf("failed instantiate pulsar client %v", err) return nil, fmt.Errorf("Could not instantiate Pulsar client: %v", err) } - if log.GetLevel() == log.DebugLevel { - log.Debugf("pulsar client url %s\n token %s", url, tokenStr) - } c.client = driver return driver, nil diff --git a/src/pulsardriver/pulsar-producer.go b/src/pulsardriver/pulsar-producer.go index cfddc7c..25da09c 100644 --- a/src/pulsardriver/pulsar-producer.go +++ b/src/pulsardriver/pulsar-producer.go @@ -13,6 +13,7 @@ import ( ) var producerCacheTTL = util.GetEnvInt("ProducerCacheTTL", 900) +var producerSendRetryLimit = util.GetEnvInt("ProducerSendRetryLimit", 1) // ProducerCache is the cache for Producer objects var 
ProducerCache = util.NewCache(util.CacheOption{ @@ -28,11 +29,14 @@ var ProducerCache = util.NewCache(util.CacheOption{ }) // GetPulsarProducer gets a Pulsar producer object -func GetPulsarProducer(pulsarURL, pulsarToken, topic string) (pulsar.Producer, error) { +func GetPulsarProducer(pulsarURL, pulsarToken, topic string, reconnect bool) (pulsar.Producer, error) { key := pulsarURL + pulsarToken + topic obj, exists := ProducerCache.Get(key) if exists { if driver, ok := obj.(*PulsarProducer); ok { + if reconnect { + return driver.Reconnect() + } return driver.GetProducer() } } @@ -68,8 +72,8 @@ type PulsarProducer struct { } // SendToPulsar sends data to a Pulsar producer. -func SendToPulsar(url, token, topic string, data []byte, async bool) error { - p, err := GetPulsarProducer(url, token, topic) +func SendToPulsar(url, token, topic string, data []byte, async bool, reconnect bool, retried int) error { + p, err := GetPulsarProducer(url, token, topic, reconnect) if err != nil { log.Errorf("Failed to create Pulsar produce err: %v", err) return errors.New("Failed to create Pulsar producer") @@ -96,12 +100,36 @@ func SendToPulsar(url, token, topic string, data []byte, async bool) error { p.SendAsync(ctx, &message, func(messageId pulsar.MessageID, msg *pulsar.ProducerMessage, err error) { if err != nil { log.Warnf("send to Pulsar err %v", err) + var pulsarErr *pulsar.Error + if errors.As(err, &pulsarErr) { + // Do reconnect and re-send if producer was closed + if pulsarErr.Result() == pulsar.ProducerClosed { + if retried < producerSendRetryLimit { + log.Warnf("retry sending to Pulsar due to %v", err) + SendToPulsar(url, token, topic, data, async, true, retried+1) + } + } + } // TODO: push to a queue for retry } }) return nil } _, err = p.Send(ctx, &message) + if err != nil { + log.Warnf("send to Pulsar err %v", err) + var pulsarErr *pulsar.Error + if errors.As(err, &pulsarErr) { + if pulsarErr.Result() == pulsar.ProducerClosed { + // Do reconnect and re-send if producer 
was closed + if retried < producerSendRetryLimit { + log.Warnf("retry sending to Pulsar due to %v", err) + return SendToPulsar(url, token, topic, data, async, true, retried+1) + } + } + } + } + return err } diff --git a/src/route/handlers.go b/src/route/handlers.go index 6c863bf..7f8a08b 100644 --- a/src/route/handlers.go +++ b/src/route/handlers.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "net/url" "strings" @@ -26,9 +25,27 @@ var singleDb db.Db const subDelimiter = "-" +// 5MB + 1 byte buffer (default Pulsar message size limit is 5MB https://pulsar.apache.org/docs/concepts-messaging/) +const workerBufferSize = 5242881 + +var workerPool chan func(buffer []byte) + // Init initializes database func Init() { singleDb = db.NewDbWithPanic(util.GetConfig().PbDbType) + + log.Infof("Start worker pool with size = %d", util.GetConfig().WorkerPoolSize) + workerPool = make(chan func(buffer []byte), util.GetConfig().WorkerPoolSize) + + // Start a number of goroutine as worker pool + for i := 0; i < util.GetConfig().WorkerPoolSize; i++ { + go func() { + var buffer [workerBufferSize]byte + for f := range workerPool { + f(buffer[:]) + } + }() + } } // TokenServerResponse is the json object for token server response @@ -75,45 +92,106 @@ func StatusPage(w http.ResponseWriter, r *http.Request) { // ReceiveHandler - the message receiver handler func ReceiveHandler(w http.ResponseWriter, r *http.Request) { - var b []byte - var err error - if r.Header.Get("Content-Encoding") == "gzip" { - g, gerr := gzip.NewReader(r.Body) - if gerr != nil { - util.ResponseErrorJSON(gerr, w, http.StatusInternalServerError) + done := make(chan bool) + workerPool <- func(buffer []byte) { + var b []byte = buffer[:0] + var err error + var bufferSize int = 0 + + defer r.Body.Close() + defer func() { done <- true }() + + // Include request line (GET /uri HTTP/1.1) into the message payload if url has includeRequestLine=true + includeRequestLine, isIncludeRequestLine := 
r.URL.Query()["includeRequestLine"] + + if isIncludeRequestLine && includeRequestLine[0] != "false" { + b = append(append(append(append(append(append(b, r.Method...), " "...), r.RequestURI...), " "...), r.Proto...), "\r\n"...) + } + + // Include headers information into the message payload if url has includeHeaders=true + includeHeaders, isIncludeHeaders := r.URL.Query()["includeHeaders"] + + if isIncludeHeaders && includeHeaders[0] != "false" { + for name, values := range r.Header { + b = append(append(append(append(b, name...), ": "...), values[0]...), "\r\n"...) + } + } + + // Append header delimiter (\r\n\r\n) and adjust the buffer size + if isIncludeRequestLine || isIncludeHeaders { + b = append(b, "\r\n\r\n"...) + bufferSize = len(b) + } + + if r.Header.Get("Content-Encoding") == "gzip" { + g, gerr := gzip.NewReader(r.Body) + + if gerr != nil { + util.ResponseErrorJSON(gerr, w, http.StatusInternalServerError) + return + } + + defer g.Close() + + var n int + for { + n, err = g.Read(buffer[bufferSize:]) + bufferSize += n + if err == io.EOF { + break + } else if err != nil { + util.ResponseErrorJSON(err, w, http.StatusInternalServerError) + return + } else if bufferSize >= workerBufferSize { + util.ResponseErrorJSON(errors.New("Buffer overflow"), w, http.StatusInternalServerError) + return + } + } + } else { + var n int + for { + n, err = r.Body.Read(buffer[bufferSize:]) + bufferSize += n + if err == io.EOF { + break + } else if err != nil { + util.ResponseErrorJSON(err, w, http.StatusInternalServerError) + return + } else if bufferSize >= workerBufferSize { + util.ResponseErrorJSON(errors.New("Buffer overflow"), w, http.StatusInternalServerError) + return + } + } + } + + b = buffer[:bufferSize] + log.Debugf("Message buffer (size = %d): %s", bufferSize, b); + + token, topic, pulsarURL, err := util.ReceiverHeader(util.AllowedPulsarURLs, &r.Header) + if err != nil { + util.ResponseErrorJSON(err, w, http.StatusUnauthorized) return } - b, err = ioutil.ReadAll(g) - } 
else { - b, err = ioutil.ReadAll(r.Body) - } - defer r.Body.Close() - if err != nil { - util.ResponseErrorJSON(err, w, http.StatusInternalServerError) - return - } - token, topic, pulsarURL, err := util.ReceiverHeader(util.AllowedPulsarURLs, &r.Header) - if err != nil { - util.ResponseErrorJSON(err, w, http.StatusUnauthorized) - return - } - - topicFN, err2 := GetTopicFnFromRoute(mux.Vars(r)) - if topic == "" && err2 != nil { - // only read topic from routes - util.ResponseErrorJSON(err2, w, http.StatusUnprocessableEntity) - return - } - topicFN = util.AssignString(topic, topicFN) // header topicFn overwrites topic specified in the routes - log.Infof("topicFN %s pulsarURL %s", topicFN, pulsarURL) + + topicFN, err2 := GetTopicFnFromRoute(mux.Vars(r)) + if topic == "" && err2 != nil { + // only read topic from routes + util.ResponseErrorJSON(err2, w, http.StatusUnprocessableEntity) + return + } + topicFN = util.AssignString(topic, topicFN) // header topicFn overwrites topic specified in the routes + log.Infof("topicFN %s pulsarURL %s", topicFN, pulsarURL) - pulsarAsync := r.URL.Query().Get("mode") == "async" - err = pulsardriver.SendToPulsar(pulsarURL, token, topicFN, b, pulsarAsync) - if err != nil { - util.ResponseErrorJSON(err, w, http.StatusServiceUnavailable) + pulsarAsync := r.URL.Query().Get("mode") == "async" + err = pulsardriver.SendToPulsar(pulsarURL, token, topicFN, b, pulsarAsync, false, 0) + if err != nil { + util.ResponseErrorJSON(err, w, http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) return } - w.WriteHeader(http.StatusOK) + <-done return } diff --git a/src/route/router.go b/src/route/router.go index 8ae106b..847b664 100644 --- a/src/route/router.go +++ b/src/route/router.go @@ -2,6 +2,7 @@ package route import ( "net/http" + "net/http/pprof" "github.com/gorilla/mux" @@ -25,8 +26,17 @@ func NewRouter(mode *string) *mux.Router { Path(route.Pattern). Name(route.Name). 
Handler(route.AuthFunc(handler)) - } + + router.Handle("/debug/pprof", http.HandlerFunc(pprof.Index)) + router.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + router.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + router.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + router.Handle("/debug/pprof/heap", pprof.Handler("heap")) + router.Handle("/debug/pprof/block", pprof.Handler("block")) + router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) + router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + // TODO rate limit can be added per route basis router.Use(middleware.LimitRate) diff --git a/src/util/config.go b/src/util/config.go index 2eddf6e..8f99796 100644 --- a/src/util/config.go +++ b/src/util/config.go @@ -8,6 +8,7 @@ import ( "os" "reflect" "strings" + "strconv" "unicode" @@ -81,6 +82,12 @@ type Configuration struct { // HTTPAuthImpl specifies the jwt authen and authorization algorithm, `noauth` to skip JWT authentication HTTPAuthImpl string `json:"HTTPAuthImpl"` + + // Limit concurrency of receiver. 
Every worker will need to allocate its own memory buffer (default is 5MB) + WorkerPoolSize int `json:"WorkerPoolSize"` + + // Name of the HTTP header to use for Pulsar token to authorize pulsar client, set to empty to disable pulsar token authorization + PulsarTokenHeaderName string `json:"PulsarTokenHeaderName"` } var ( @@ -103,6 +110,11 @@ var ( // Init initializes configuration func Init() { configFile := AssignString(os.Getenv("PULSAR_BEAM_CONFIG"), DefaultConfigFile) + + // Default config + Config.WorkerPoolSize = 4 + Config.PulsarTokenHeaderName = "Authorization" + ReadConfigFile(configFile) log.SetLevel(logLevel(Config.LogLevel)) @@ -140,14 +152,35 @@ func ReadConfigFile(configFile string) { for i := 0; i < fields.NumField(); i++ { field := fields.Field(i).Name f := st.FieldByName(field) - - if f.Kind() == reflect.String { - envV := os.Getenv(field) - if len(envV) > 0 && f.IsValid() && f.CanSet() { - f.SetString(strings.TrimSuffix(envV, "\n")) // ensure no \n at the end of line that was introduced by loading k8s secrete file - } - os.Setenv(field, f.String()) - } + envV, envPresent := os.LookupEnv(field) + + switch f.Kind() { + case reflect.String: + if envPresent && f.IsValid() && f.CanSet() { + f.SetString(strings.TrimSuffix(envV, "\n")) // ensure no \n at the end of line that was introduced by loading k8s secrete file + } + os.Setenv(field, f.String()) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + if len(envV) > 0 && f.IsValid() && f.CanSet() { + intVal, err := strconv.ParseInt(envV, 10, 64) + if err != nil { + panic(err) + } else { + f.SetInt(intVal) + } + } + os.Setenv(field, strconv.FormatInt(f.Int(), 10)) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if len(envV) > 0 && f.IsValid() && f.CanSet() { + uintVal, err := strconv.ParseUint(envV, 10, 64) + if err != nil { + panic(err) + } else { + f.SetUint(uintVal) + } + } + os.Setenv(field, strconv.FormatUint(f.Uint(), 10)) + } } 
clusterStr := AssignString(Config.PulsarClusters, "") @@ -163,8 +196,8 @@ func ReadConfigFile(configFile string) { Config.PORT, Config.PbDbType, Config.PbDbInterval, Config.TrustStore, Config.DbName, Config.DbConnectionStr) fmt.Printf("PublicKey %s, PrivateKey %s\n", Config.PulsarPublicKey, Config.PulsarPrivateKey) - fmt.Printf("PulsarBrokerURL %s, AllowedPulsarURLs %v,PulsarTLSAllowInsecureConnection %s,PulsarTLSValidateHostname %s\n", - Config.PulsarBrokerURL, AllowedPulsarURLs, Config.PulsarTLSAllowInsecureConnection, Config.PulsarTLSValidateHostname) + fmt.Printf("PulsarBrokerURL %s, AllowedPulsarURLs %v,PulsarTLSAllowInsecureConnection %s,PulsarTLSValidateHostname %s,PulsarTokenHeaderName %s\n", + Config.PulsarBrokerURL, AllowedPulsarURLs, Config.PulsarTLSAllowInsecureConnection, Config.PulsarTLSValidateHostname, Config.PulsarTokenHeaderName) } //GetConfig returns a reference to the Configuration diff --git a/src/util/util.go b/src/util/util.go index 1093f19..8de3968 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -59,7 +59,10 @@ func ResponseErrorJSON(e error, w http.ResponseWriter, statusCode int) { // ReceiverHeader parses headers for Pulsar required configuration func ReceiverHeader(allowedClusters []string, h *http.Header) (token, topicFN, pulsarURL string, err error) { - token = strings.TrimSpace(strings.Replace(h.Get("Authorization"), "Bearer", "", 1)) + token = "" + if GetConfig().PulsarTokenHeaderName != "" { + token = strings.TrimSpace(strings.Replace(h.Get(GetConfig().PulsarTokenHeaderName), "Bearer", "", 1)) + } topicFN = h.Get("TopicFn") pulsarURL = h.Get("PulsarUrl") if len(allowedClusters) > 1 || (len(allowedClusters) == 1 && allowedClusters[0] != "") {