@@ -49,47 +49,58 @@ Here's the code to send events to an event hub. The main steps in the code are:
49
49
package main
50
50
51
51
import (
52
- " context"
52
+ " context"
53
53
54
- " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
54
+ " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
55
55
)
56
56
57
57
func main () {
58
+ // create an Event Hubs producer client using a connection string to the namespace and the event hub
59
+ producerClient , err := azeventhubs.NewProducerClientFromConnectionString (" NAMESPACE CONNECTION STRING" , " EVENT HUB NAME" , nil )
58
60
59
- // create an Event Hubs producer client using a connection string to the namespace and the event hub
60
- producerClient , err := azeventhubs.NewProducerClientFromConnectionString (" NAMESPACE CONNECTION STRING" , " EVENT HUB NAME" , nil )
61
+ if err != nil {
62
+ panic (err)
63
+ }
61
64
62
- if err != nil {
63
- panic (err)
64
- }
65
+ defer producerClient.Close (context.TODO ())
65
66
66
- defer producerClient.Close (context.TODO ())
67
+ // create sample events
68
+ events := createEventsForSample ()
67
69
68
- // create sample events
69
- events := createEventsForSample ()
70
+ // create a batch object and add sample events to the batch
71
+ newBatchOptions := &azeventhubs. EventDataBatchOptions {}
70
72
71
- // create a batch object and add sample events to the batch
72
- newBatchOptions := &azeventhubs.EventDataBatchOptions {}
73
+ batch , err := producerClient.NewEventDataBatch (context.TODO (), newBatchOptions)
73
74
74
- batch , err := producerClient.NewEventDataBatch (context.TODO (), newBatchOptions)
75
+ if err != nil {
76
+ panic (err)
77
+ }
75
78
76
- for i := 0 ; i < len (events); i++ {
77
- err = batch.AddEventData (events[i], nil )
78
- }
79
+ for i := 0 ; i < len (events); i++ {
80
+ err = batch.AddEventData (events[i], nil )
79
81
80
- // send the batch of events to the event hub
81
- producerClient.SendEventDataBatch (context.TODO (), batch, nil )
82
+ if err != nil {
83
+ panic (err)
84
+ }
85
+ }
86
+
87
+ // send the batch of events to the event hub
88
+ err = producerClient.SendEventDataBatch (context.TODO (), batch, nil )
89
+
90
+ if err != nil {
91
+ panic (err)
92
+ }
82
93
}
83
94
84
95
func createEventsForSample () []*azeventhubs .EventData {
85
- return []*azeventhubs.EventData {
86
- {
87
- Body: []byte (" hello" ),
88
- },
89
- {
90
- Body: []byte (" world" ),
91
- },
92
- }
96
+ return []*azeventhubs.EventData {
97
+ {
98
+ Body: []byte (" hello" ),
99
+ },
100
+ {
101
+ Body: []byte (" world" ),
102
+ },
103
+ }
93
104
}
94
105
```
95
106
@@ -133,100 +144,104 @@ Here's the code to receive events from an event hub. The main steps in the code
133
144
package main
134
145
135
146
import (
136
- " context"
137
- " errors"
138
- " fmt"
139
- " time"
140
-
141
- " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
142
- " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
147
+ " context"
148
+ " errors"
149
+ " fmt"
150
+ " time"
151
+
152
+ " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
153
+ " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
154
+ " github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
143
155
)
144
156
145
157
func main () {
146
158
147
- // create a container client using a connection string and container name
148
- checkClient , err := container.NewClientFromConnectionString (" AZURE STORAGE CONNECTION STRING" , " CONTAINER NAME" , nil )
149
-
150
- // create a checkpoint store that will be used by the event hub
151
- checkpointStore , err := checkpoints.NewBlobStore (checkClient, nil )
159
+ // create a container client using a connection string and container name
160
+ checkClient , err := container.NewClientFromConnectionString (" AZURE STORAGE CONNECTION STRING" , " CONTAINER NAME" , nil )
152
161
153
- if err != nil {
154
- panic (err)
155
- }
162
+ if err != nil {
163
+ panic (err)
164
+ }
156
165
157
- // create a consumer client using a connection string to the namespace and the event hub
158
- consumerClient , err := azeventhubs. NewConsumerClientFromConnectionString ( " NAMESPACE CONNECTION STRING " , " EVENT HUB NAME " , azeventhubs. DefaultConsumerGroup , nil )
166
+ // create a checkpoint store that will be used by the event hub
167
+ checkpointStore , err := checkpoints. NewBlobStore (checkClient , nil )
159
168
160
- if err != nil {
161
- panic (err)
162
- }
169
+ if err != nil {
170
+ panic (err)
171
+ }
163
172
164
- defer consumerClient.Close (context.TODO ())
173
+ // create a consumer client using a connection string to the namespace and the event hub
174
+ consumerClient , err := azeventhubs.NewConsumerClientFromConnectionString (" NAMESPACE CONNECTION STRING" , " EVENT HUB NAME" , azeventhubs.DefaultConsumerGroup , nil )
165
175
166
- // create a processor to receive and process events
167
- processor , err := azeventhubs.NewProcessor (consumerClient, checkpointStore, nil )
176
+ if err != nil {
177
+ panic (err)
178
+ }
168
179
169
- if err != nil {
170
- panic (err)
171
- }
180
+ defer consumerClient.Close (context.TODO ())
172
181
173
- // for each partition in the event hub, create a partition client with processEvents as the function to process events
174
- dispatchPartitionClients := func () {
175
- for {
176
- partitionClient := processor.NextPartitionClient (context.TODO ())
182
+ // create a processor to receive and process events
183
+ processor , err := azeventhubs.NewProcessor (consumerClient, checkpointStore, nil )
177
184
178
- if partitionClient = = nil {
179
- break
180
- }
185
+ if err ! = nil {
186
+ panic (err)
187
+ }
181
188
182
- go func () {
183
- if err := processEvents (partitionClient); err != nil {
184
- panic (err)
185
- }
186
- }()
187
- }
188
- }
189
+ // for each partition in the event hub, create a partition client with processEvents as the function to process events
190
+ dispatchPartitionClients := func () {
191
+ for {
192
+ partitionClient := processor.NextPartitionClient (context.TODO ())
189
193
190
- // run all partition clients
191
- go dispatchPartitionClients ()
194
+ if partitionClient == nil {
195
+ break
196
+ }
192
197
193
- processorCtx , processorCancel := context.WithCancel (context.TODO ())
194
- defer processorCancel ()
198
+ go func () {
199
+ if err := processEvents (partitionClient); err != nil {
200
+ panic (err)
201
+ }
202
+ }()
203
+ }
204
+ }
195
205
196
- if err := processor.Run (processorCtx); err != nil {
197
- panic (err)
198
- }
206
+ // run all partition clients
207
+ go dispatchPartitionClients ()
208
+
209
+ processorCtx , processorCancel := context.WithCancel (context.TODO ())
210
+ defer processorCancel ()
211
+
212
+ if err := processor.Run (processorCtx); err != nil {
213
+ panic (err)
214
+ }
199
215
}
200
216
201
217
func processEvents (partitionClient *azeventhubs .ProcessorPartitionClient ) error {
202
- defer closePartitionResources (partitionClient)
203
- for {
204
- receiveCtx , receiveCtxCancel := context.WithTimeout (context.TODO (), time.Minute )
205
- events , err := partitionClient.ReceiveEvents (receiveCtx, 100 , nil )
206
- receiveCtxCancel ()
207
-
208
- if err != nil && !errors.Is (err, context.DeadlineExceeded ) {
209
- return err
210
- }
211
-
212
- fmt.Printf (" Processing %d event(s)\n " , len (events))
213
-
214
- for _ , event := range events {
215
- fmt.Printf (" Event received with body %v \n " , string (event.Body ))
216
- }
217
-
218
- if len (events) != 0 {
219
- if err := partitionClient.UpdateCheckpoint (context.TODO (), events[len (events)-1 ]); err != nil {
220
- return err
221
- }
222
- }
223
- }
218
+ defer closePartitionResources (partitionClient)
219
+ for {
220
+ receiveCtx , receiveCtxCancel := context.WithTimeout (context.TODO (), time.Minute )
221
+ events , err := partitionClient.ReceiveEvents (receiveCtx, 100 , nil )
222
+ receiveCtxCancel ()
223
+
224
+ if err != nil && !errors.Is (err, context.DeadlineExceeded ) {
225
+ return err
226
+ }
227
+
228
+ fmt.Printf (" Processing %d event(s)\n " , len (events))
229
+
230
+ for _ , event := range events {
231
+ fmt.Printf (" Event received with body %v \n " , string (event.Body ))
232
+ }
233
+
234
+ if len (events) != 0 {
235
+ if err := partitionClient.UpdateCheckpoint (context.TODO (), events[len (events)-1 ], nil ); err != nil {
236
+ return err
237
+ }
238
+ }
239
+ }
224
240
}
225
241
226
242
func closePartitionResources (partitionClient *azeventhubs .ProcessorPartitionClient ) {
227
- defer partitionClient.Close (context.TODO ())
243
+ defer partitionClient.Close (context.TODO ())
228
244
}
229
-
230
245
```
231
246
232
247
## Run receiver and sender apps
0 commit comments