Skip to content

Commit d828bc6

Browse files
beelisSimon Beelinodece
authored andcommitted
[improve][fn] Introduce NewOutputMessageWithError to enable error handling (#24122)
Co-authored-by: Simon Beeli <[email protected]> Co-authored-by: Zixuan Liu <[email protected]> (cherry picked from commit d8e2743)
1 parent a630282 commit d828bc6

File tree

4 files changed

+70
-8
lines changed

4 files changed

+70
-8
lines changed

pulsar-function-go/pf/context.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ import (
3434
// message, what are our operating constraints, etc can be accessed by the
3535
// executing function
3636
type FunctionContext struct {
37-
instanceConf *instanceConf
38-
userConfigs map[string]interface{}
39-
logAppender *LogAppender
40-
outputMessage func(topic string) pulsar.Producer
41-
userMetrics sync.Map
42-
record pulsar.Message
37+
instanceConf *instanceConf
38+
userConfigs map[string]interface{}
39+
logAppender *LogAppender
40+
outputMessage func(topic string) pulsar.Producer
41+
outputMessageWithError func(topic string) (pulsar.Producer, error)
42+
userMetrics sync.Map
43+
record pulsar.Message
4344
}
4445

4546
// NewFuncContext returns a new Function context
@@ -161,6 +162,13 @@ func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer {
161162
return c.outputMessage(topicName)
162163
}
163164

165+
// NewOutputMessageWithError send message to the topic and returns a potential error
166+
// @param topicName: The name of the topic for output message
167+
// @return A Pulsar producer for the given topic and an error, if any.
168+
func (c *FunctionContext) NewOutputMessageWithError(topicName string) (pulsar.Producer, error) {
169+
return c.outputMessageWithError(topicName)
170+
}
171+
164172
// SetCurrentRecord sets the current message into the function context called
165173
// for each message before executing a handler function
166174
func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {

pulsar-function-go/pf/context_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ package pf
2121

2222
import (
2323
"context"
24+
"errors"
25+
"fmt"
2426
"testing"
2527
"time"
2628

@@ -82,3 +84,46 @@ func TestFunctionContext_NewOutputMessage(t *testing.T) {
8284
actualProducer := fc.NewOutputMessage(publishTopic)
8385
assert.IsType(t, &MockPulsarProducer{}, actualProducer)
8486
}
87+
88+
func TestFunctionContext_NewOutputMessageWithError(t *testing.T) {
89+
testErr := errors.New("test error")
90+
91+
testCases := []struct {
92+
name string
93+
outputFunc func(topic string) (pulsar.Producer, error)
94+
expectedError error
95+
expectedProducerType *MockPulsarProducer
96+
}{
97+
98+
{
99+
name: "Test producer",
100+
outputFunc: func(topic string) (pulsar.Producer, error) { return &MockPulsarProducer{}, nil },
101+
expectedError: nil,
102+
expectedProducerType: &MockPulsarProducer{},
103+
},
104+
{
105+
name: "Test error",
106+
outputFunc: func(topic string) (pulsar.Producer, error) { return nil, errors.New("test error") },
107+
expectedError: testErr,
108+
expectedProducerType: nil,
109+
},
110+
}
111+
112+
for i, testCase := range testCases {
113+
t.Run(fmt.Sprintf("testCase[%d] %s", i, testCase.name), func(t *testing.T) {
114+
115+
fc := NewFuncContext()
116+
publishTopic := "publish-topic"
117+
118+
fc.outputMessageWithError = testCase.outputFunc
119+
120+
actualProducer, err := fc.NewOutputMessageWithError(publishTopic)
121+
if testCase.expectedProducerType == nil {
122+
assert.Nil(t, actualProducer)
123+
} else {
124+
assert.IsType(t, testCase.expectedProducerType, actualProducer)
125+
}
126+
assert.Equal(t, testCase.expectedError, err)
127+
})
128+
}
129+
}

pulsar-function-go/pf/instance.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,15 @@ func newGoInstance() *goInstance {
7777
return producer
7878
}
7979

80+
goInstance.context.outputMessageWithError = func(topic string) (pulsar.Producer, error) {
81+
producer, err := goInstance.getProducer(topic)
82+
if err != nil {
83+
log.Errorf("getting producer failed, error is:%v", err)
84+
return nil, err
85+
}
86+
return producer, nil
87+
}
88+
8089
goInstance.lastHealthCheckTS = now.UnixNano()
8190
goInstance.properties = make(map[string]string)
8291
goInstance.stats = NewStatWithLabelValues(goInstance.getMetricsLabels()...)

pulsar-function-go/pf/util_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func TestUtils(t *testing.T) {
4444
expectedFQFN := getDefaultSubscriptionName(tenant, namespace, name)
4545
assert.Equal(t, expectedFQFN, fqfn)
4646

47-
actualtMap := getProperties(fqfn, 100)
48-
assert.Equal(t, propertiesMap, actualtMap)
47+
actualMap := getProperties(fqfn, 100)
48+
assert.Equal(t, propertiesMap, actualMap)
4949

5050
expectedRes := getFullyQualifiedInstanceID(tenant, namespace, name, instanceID)
5151
assert.Equal(t, expectedRes, "pulsar/function/go:100")

0 commit comments

Comments
 (0)