func (api *Service) ListMessages(
	ctx context.Context,
	req *connect.Request[v1alpha.ListMessagesRequest],
	stream *connect.ServerStream[v1alpha.ListMessagesResponse],
) error {
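	// Copy the request fields into the shared httptypes.ListMessagesRequest so
	// that its helpers (such as DecodeInterpreterCode below) can be reused.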
	lmq := httptypes.ListMessagesRequest{
		TopicName:             req.Msg.GetTopic(),
		StartOffset:           req.Msg.GetStartOffset(),
		StartTimestamp:        req.Msg.GetStartTimestamp(),
		PartitionID:           req.Msg.GetPartitionId(),
		MaxResults:            int(req.Msg.GetMaxResults()),
		FilterInterpreterCode: req.Msg.GetFilterInterpreterCode(),
		Enterprise:            req.Msg.GetEnterprise(),
		PageToken:             req.Msg.GetPageToken(),
	}

	interpreterCode, err := lmq.DecodeInterpreterCode()
	if err != nil {
		return apierrors.NewConnectError(
			connect.CodeInvalidArgument,
			fmt.Errorf("failed decoding provided interpreter code: %w", err),
			apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
		)
	}

	// Test-compile the provided filter code so that syntax errors are rejected
	// before the fetch starts.
	code := fmt.Sprintf(`var isMessageOk = function() {%s}`, interpreterCode)
	_, err = goja.Compile("", code, true)
	if err != nil {
		return apierrors.NewConnectError(
			connect.CodeInvalidArgument,
			fmt.Errorf("failed to compile provided interpreter code: %w", err),
			apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
		)
	}

	// Request messages from Kafka and return them once all messages have been
	// received or the context is done.
	listReq := console.ListMessageRequest{
		TopicName:             lmq.TopicName,
		PartitionID:           lmq.PartitionID,
		StartOffset:           lmq.StartOffset,
		StartTimestamp:        lmq.StartTimestamp,
		MessageCount:          lmq.MaxResults,
		FilterInterpreterCode: interpreterCode,
		Troubleshoot:          req.Msg.GetTroubleshoot(),
		IncludeRawPayload:     req.Msg.GetIncludeOriginalRawPayload(),
		IgnoreMaxSizeLimit:    req.Msg.GetIgnoreMaxSizeLimit(),
		KeyDeserializer:       fromProtoEncoding(req.Msg.GetKeyDeserializer()),
		ValueDeserializer:     fromProtoEncoding(req.Msg.GetValueDeserializer()),
		PageToken:             lmq.PageToken,
		PageSize:              int(req.Msg.GetPageSize()),
	}

	timeout := 35 * time.Second
	if req.Msg.GetFilterInterpreterCode() != "" || req.Msg.GetStartOffset() == console.StartOffsetNewest {
		// Push-down filters and StartOffset = Newest may be long-running streams.
		// There is already a client-provided timeout that we usually trust, but
		// we additionally want to ensure the stream never runs much longer than that.
		timeout = 31 * time.Minute
	}

	ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("list fetch timeout"))
	defer cancel()
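
	// Report consumption progress (messages and bytes consumed so far) back to
	// the client over the stream while the fetch is running.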
	progress := &streamProgressReporter{
		logger:           api.logger,
		request:          &listReq,
		stream:           stream,
		messagesConsumed: atomic.Int64{},
		bytesConsumed:    atomic.Int64{},
	}
	progress.Start(ctx)

	return api.consoleSvc.ListMessages(ctx, listReq, progress)
}
Description
When Apache Kafka read/write throughput is slow, consuming messages starting from a specific timestamp can exceed the default timeout of 35 seconds, causing the request to fail.
Expected Behavior
Make the timeout configurable instead of hardcoding it.
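One possible shape for this, as a minimal sketch: pull the base timeout out of the hardcoded constant and into a small helper fed by a config value. The listMessagesTimeout helper and the console.listMessagesTimeout config field below are hypothetical names for illustration, not the actual Console config surface:

// listMessagesTimeout picks the base timeout for a ListMessages call.
// "configured" would come from a new, hypothetical config field (e.g.
// console.listMessagesTimeout in the YAML config); zero means "not set",
// so the current 35s default is kept and existing deployments are unaffected.
func listMessagesTimeout(configured time.Duration, hasFilter, fromNewest bool) time.Duration {
	timeout := 35 * time.Second
	if configured > 0 {
		timeout = configured
	}
	if hasFilter || fromNewest {
		// Mirror the current behavior: push-down filters and
		// StartOffset = Newest keep the larger long-running bound.
		timeout = 31 * time.Minute
	}
	return timeout
}

ListMessages would then replace the hardcoded block above with something like timeout := listMessagesTimeout(api.cfg.Console.ListMessagesTimeout, req.Msg.GetFilterInterpreterCode() != "", req.Msg.GetStartOffset() == console.StartOffsetNewest), where api.cfg and the field name are again only illustrative.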
(The snippet above is console/backend/pkg/api/connect/service/console/service.go, lines 51 to 125 at commit 6ee88a1.)
Environment
Console version: v3.5.2 / v3.6.0 / v3.7.2