Skip to content

Commit a61ff96

Browse files
[azeventhubs] Fixing broken behavior in Processor (Azure#22833)
Explicitly disallow some sharp edges with Run(): * If Run() is currently active for this instance * If Run() has stopped for this instance. * Now we close the channel so NextPartitionClient() also returns nil if the Processor has been stopped. * Fixing the credential test - looks like a change in validation in the credential itself exposed some silliness on my end. Fixes Azure#22785
1 parent c23d009 commit a61ff96

File tree

7 files changed

+302
-29
lines changed

7 files changed

+302
-29
lines changed

sdk/messaging/azeventhubs/CHANGELOG.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
# Release History
22

3-
## 1.1.1 (Unreleased)
4-
5-
### Features Added
6-
7-
### Breaking Changes
3+
## 1.2.0 (2024-05-07)
84

95
### Bugs Fixed
106

11-
### Other Changes
7+
Processor.Run had unclear behavior for some cases:
8+
- Run() now returns an explicit error when called more than once on a single
9+
Processor instance or if multiple Run calls are made concurrently. (PR#22833)
10+
- NextProcessorClient now properly terminates (and returns nil) if called on a
11+
stopped Processor. (PR#22833)
1212

1313
## 1.1.0 (2024-04-02)
1414

sdk/messaging/azeventhubs/example_enabling_logging_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,6 @@ func Example_enableLogging() {
2323
)
2424

2525
fmt.Printf("Logging enabled\n")
26-
27-
// Output:
28-
// Logging enabled
29-
//
3026
}
3127

3228
func printLoggedEvent(event azlog.Event, s string) {

sdk/messaging/azeventhubs/internal/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
package internal
55

66
// Version is the semantic version number
7-
const Version = "v1.1.1"
7+
const Version = "v1.2.0"

sdk/messaging/azeventhubs/internal/test/test_helpers.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,6 @@ func GetConnectionParamsForTest(t *testing.T) ConnectionParamsForTest {
136136
ResourceGroup: envVars["RESOURCE_GROUP"],
137137
StorageConnectionString: envVars["CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"],
138138
SubscriptionID: envVars["AZURE_SUBSCRIPTION_ID"],
139-
TenantID: envVars["AZURE_TENANT_ID"],
140-
ClientID: envVars["AZURE_CLIENT_ID"],
141139
}
142140
}
143141

sdk/messaging/azeventhubs/processor.go

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package azeventhubs
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910
"math/rand"
1011
"sync"
@@ -78,6 +79,14 @@ type StartPositions struct {
7879
Default StartPosition
7980
}
8081

82+
type state int32
83+
84+
const (
85+
stateNone state = 0
86+
stateStopped state = 1
87+
stateRunning state = 2
88+
)
89+
8190
// Processor uses a [ConsumerClient] and [CheckpointStore] to provide automatic
8291
// load balancing between multiple Processor instances, even in separate
8392
// processes or on separate machines.
@@ -87,6 +96,9 @@ type StartPositions struct {
8796
//
8897
// [example_consuming_with_checkpoints_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go
8998
type Processor struct {
99+
stateMu sync.Mutex
100+
state state
101+
90102
ownershipUpdateInterval time.Duration
91103
defaultStartPositions StartPositions
92104
checkpointStore CheckpointStore
@@ -97,10 +109,10 @@ type Processor struct {
97109
consumerClient consumerClientForProcessor
98110

99111
nextClients chan *ProcessorPartitionClient
112+
nextClientsReady chan struct{}
100113
consumerClientDetails consumerClientDetails
101114

102-
runCalled chan struct{}
103-
lb *processorLoadBalancer
115+
lb *processorLoadBalancer
104116

105117
// claimedOwnerships is set to whatever our current ownerships are. The underlying
106118
// value is a []Ownership.
@@ -173,12 +185,13 @@ func newProcessorImpl(consumerClient consumerClientForProcessor, checkpointStore
173185
},
174186
prefetch: options.Prefetch,
175187
consumerClientDetails: consumerClient.getDetails(),
176-
runCalled: make(chan struct{}),
188+
nextClientsReady: make(chan struct{}),
177189
lb: newProcessorLoadBalancer(checkpointStore, consumerClient.getDetails(), strategy, partitionDurationExpiration),
178190
currentOwnerships: currentOwnerships,
179191

180-
// `nextClients` will be initialized when the user calls Run() since it needs to query the #
181-
// of partitions on the Event Hub.
192+
// `nextClients` will be properly initialized when the user calls
193+
// Run() since it needs to query the # of partitions on the Event Hub.
194+
nextClients: make(chan *ProcessorPartitionClient),
182195
}, nil
183196
}
184197

@@ -196,7 +209,7 @@ func (p *Processor) NextPartitionClient(ctx context.Context) *ProcessorPartition
196209
select {
197210
case <-ctx.Done():
198211
return nil
199-
case <-p.runCalled:
212+
case <-p.nextClientsReady:
200213
}
201214

202215
select {
@@ -207,12 +220,30 @@ func (p *Processor) NextPartitionClient(ctx context.Context) *ProcessorPartition
207220
}
208221
}
209222

223+
func (p *Processor) checkState() error {
224+
switch p.state {
225+
case stateNone:
226+
// not running so we can start. And lock out any other users.
227+
p.state = stateRunning
228+
return nil
229+
case stateRunning:
230+
return errors.New("the Processor is currently running. Concurrent calls to Run() are not allowed.")
231+
case stateStopped:
232+
return errors.New("the Processor has been stopped. Create a new instance to start processing again")
233+
default:
234+
return fmt.Errorf("unhandled state value %v", p.state)
235+
}
236+
}
237+
210238
// Run handles the load balancing loop, blocking until the passed in context is cancelled
211239
// or it encounters an unrecoverable error. On cancellation, it will return a nil error.
212240
//
213241
// This function should run for the lifetime of your application, or for as long as you want
214242
// to continue to claim and process partitions.
215243
//
244+
// Once a Processor has been stopped it cannot be restarted and a new instance must
245+
// be created.
246+
//
216247
// As partitions are claimed new [ProcessorPartitionClient] instances will be returned from
217248
// [Processor.NextPartitionClient]. This can happen at any time, based on new Processor instances
218249
// coming online, as well as other Processors exiting.
@@ -225,7 +256,15 @@ func (p *Processor) NextPartitionClient(ctx context.Context) *ProcessorPartition
225256
//
226257
// [example_consuming_with_checkpoints_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go
227258
func (p *Processor) Run(ctx context.Context) error {
228-
err := p.runImpl(ctx)
259+
p.stateMu.Lock()
260+
err := p.checkState()
261+
p.stateMu.Unlock()
262+
263+
if err != nil {
264+
return err
265+
}
266+
267+
err = p.runImpl(ctx)
229268

230269
// the context is the proper way to close down the Run() loop, so it's not
231270
// an error and doesn't need to be returned.
@@ -241,7 +280,7 @@ func (p *Processor) runImpl(ctx context.Context) error {
241280
defer func() {
242281
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
243282
defer cancel()
244-
p.closeConsumers(ctx, consumers)
283+
p.close(ctx, consumers)
245284
}()
246285

247286
// size the channel to the # of partitions. We can never exceed this size since
@@ -287,7 +326,7 @@ func (p *Processor) initNextClientsCh(ctx context.Context) (EventHubProperties,
287326
}
288327

289328
p.nextClients = make(chan *ProcessorPartitionClient, len(eventHubProperties.PartitionIDs))
290-
close(p.runCalled)
329+
close(p.nextClientsReady)
291330

292331
return eventHubProperties, nil
293332
}
@@ -433,7 +472,7 @@ func (p *Processor) getCheckpointsMap(ctx context.Context) (map[string]Checkpoin
433472
return m, nil
434473
}
435474

436-
func (p *Processor) closeConsumers(ctx context.Context, consumersMap *sync.Map) {
475+
func (p *Processor) close(ctx context.Context, consumersMap *sync.Map) {
437476
consumersMap.Range(func(key, value any) bool {
438477
client := value.(*ProcessorPartitionClient)
439478

@@ -455,6 +494,20 @@ func (p *Processor) closeConsumers(ctx context.Context, consumersMap *sync.Map)
455494
if err != nil {
456495
azlog.Writef(EventConsumer, "Failed to relinquish ownerships. New processors will have to wait for ownerships to expire: %s", err.Error())
457496
}
497+
498+
p.stateMu.Lock()
499+
p.state = stateStopped
500+
p.stateMu.Unlock()
501+
502+
// NextPartitionClient() will quit out now that p.nextClients is closed.
503+
close(p.nextClients)
504+
505+
select {
506+
case <-p.nextClientsReady:
507+
// already closed
508+
default:
509+
close(p.nextClientsReady)
510+
}
458511
}
459512

460513
// relinquishedOwnershipID indicates that a partition is immediately available, similar to

0 commit comments

Comments
 (0)