Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/pulsarutil/start_pos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pulsarutil

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

func GetStartOption(startFrom string) (pulsar.SubscriptionInitialPosition, *pulsar.MessageID, *time.Time, error) {
switch {
case startFrom == "earliest":
return pulsar.SubscriptionPositionEarliest, nil, nil, nil
case strings.HasPrefix(startFrom, "messageId:"):
parts := strings.Split(strings.TrimPrefix(startFrom, "messageId:"), ":")
if len(parts) != 2 {
return 0, nil, nil, fmt.Errorf("invalid messageId format, expected ledgerId:entryId")
}
ledgerId, err1 := strconv.ParseInt(parts[0], 10, 64)
entryId, err2 := strconv.ParseInt(parts[1], 10, 64)
if err1 != nil || err2 != nil {
return 0, nil, nil, fmt.Errorf("invalid messageId numbers")
}
msgID := pulsar.NewMessageID(ledgerId, entryId, -1)
return 0, &msgID, nil, nil
case strings.HasPrefix(startFrom, "timestamp:"):
millisStr := strings.TrimPrefix(startFrom, "timestamp:")
millis, err := strconv.ParseInt(millisStr, 10, 64)
if err != nil {
return 0, nil, nil, fmt.Errorf("invalid timestamp")
}
t := time.UnixMilli(millis)
return 0, nil, &t, nil
default:
return 0, nil, nil, fmt.Errorf("unsupported startFrom value")
}
}
20 changes: 20 additions & 0 deletions src/route/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/kafkaesque-io/pulsar-beam/src/model"
"github.com/kafkaesque-io/pulsar-beam/src/pulsardriver"
"github.com/kafkaesque-io/pulsar-beam/src/util"
"github.com/kafkaesque-io/pulsar-beam/src/pulsarutil"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -196,6 +197,9 @@ func SSEHandler(w http.ResponseWriter, r *http.Request) {
return
}

// Parse the startFrom query parameter
startFrom := r.URL.Query().Get("startFrom")

// Make sure that the writer supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
Expand All @@ -219,6 +223,22 @@ func SSEHandler(w http.ResponseWriter, r *http.Request) {
defer consumer.Unsubscribe()
}

// Apply seek after subscribing based on the startFrom query parameter
if startFrom != "" {
pos, msgID, ts, err := pulsarutil.GetStartOption(startFrom)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if msgID != nil {
consumer.Seek(*msgID)
} else if ts != nil {
consumer.SeekByTime(*ts)
} else {
consumer.SeekByTime(time.Now())
}
}

consumChan := consumer.Chan()
for {
select {
Expand Down