@@ -49,47 +49,58 @@ Here's the code to send events to an event hub. The main steps in the code are:
49
49
package main
50
50
51
51
import (
52
- " context"
52
+ " context"
53
53
54
- " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
54
+ " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
55
55
)
56
56
57
57
func main () {
58
+ // create an Event Hubs producer client using a connection string to the namespace and the event hub
59
+ producerClient , err := azeventhubs.NewProducerClientFromConnectionString (" NAMESPACE CONNECTION STRING" , " EVENT HUB NAME" , nil )
58
60
59
- // create an Event Hubs producer client using a connection string to the namespace and the event hub
60
- producerClient , err := azeventhubs.NewProducerClientFromConnectionString (" NAMESPACE CONNECTION STRING" , " EVENT HUB NAME" , nil )
61
+ if err != nil {
62
+ panic (err)
63
+ }
61
64
62
- if err != nil {
63
- panic (err)
64
- }
65
+ defer producerClient.Close (context.TODO ())
65
66
66
- defer producerClient.Close (context.TODO ())
67
+ // create sample events
68
+ events := createEventsForSample ()
67
69
68
- // create sample events
69
- events := createEventsForSample ()
70
+ // create a batch object and add sample events to the batch
71
+ newBatchOptions := &azeventhubs. EventDataBatchOptions {}
70
72
71
- // create a batch object and add sample events to the batch
72
- newBatchOptions := &azeventhubs.EventDataBatchOptions {}
73
+ batch , err := producerClient.NewEventDataBatch (context.TODO (), newBatchOptions)
73
74
74
- batch , err := producerClient.NewEventDataBatch (context.TODO (), newBatchOptions)
75
+ if err != nil {
76
+ panic (err)
77
+ }
75
78
76
- for i := 0 ; i < len (events); i++ {
77
- err = batch.AddEventData (events[i], nil )
78
- }
79
+ for i := 0 ; i < len (events); i++ {
80
+ err = batch.AddEventData (events[i], nil )
79
81
80
- // send the batch of events to the event hub
81
- producerClient.SendEventDataBatch (context.TODO (), batch, nil )
82
+ if err != nil {
83
+ panic (err)
84
+ }
85
+ }
86
+
87
+ // send the batch of events to the event hub
88
+ err = producerClient.SendEventDataBatch (context.TODO (), batch, nil )
89
+
90
+ if err != nil {
91
+ panic (err)
92
+ }
82
93
}
83
94
84
95
func createEventsForSample () []*azeventhubs .EventData {
85
- return []*azeventhubs.EventData {
86
- {
87
- Body: []byte (" hello" ),
88
- },
89
- {
90
- Body: []byte (" world" ),
91
- },
92
- }
96
+ return []*azeventhubs.EventData {
97
+ {
98
+ Body: []byte (" hello" ),
99
+ },
100
+ {
101
+ Body: []byte (" world" ),
102
+ },
103
+ }
93
104
}
94
105
```
95
106
@@ -134,101 +145,104 @@ Here's the code to receive events from an event hub. The main steps in the code
134
145
package main
135
146
136
147
import (
137
- " context"
138
- " errors"
139
- " fmt"
140
- " time"
141
-
142
- " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
143
- " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
144
- " github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
148
+ " context"
149
+ " errors"
150
+ " fmt"
151
+ " time"
152
+
153
+ " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
154
+ " github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
155
+ " github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
145
156
)
146
157
147
158
func main () {
148
159
149
- // create a container client using a connection string and container name
150
- checkClient , err := container.NewClientFromConnectionString (" AZURE STORAGE CONNECTION STRING" , " CONTAINER NAME" , nil )
151
-
152
- // create a checkpoint store that will be used by the event hub
153
- checkpointStore , err := checkpoints.NewBlobStore (checkClient, nil )
160
+ // create a container client using a connection string and container name
161
+ checkClient , err := container.NewClientFromConnectionString (" AZURE STORAGE CONNECTION STRING" , " CONTAINER NAME" , nil )
162
+
163
+ if err != nil {
164
+ panic (err)
165
+ }
154
166
155
- if err != nil {
156
- panic (err)
157
- }
167
+ // create a checkpoint store that will be used by the event hub
168
+ checkpointStore , err := checkpoints.NewBlobStore (checkClient, nil )
158
169
159
- // create a consumer client using a connection string to the namespace and the event hub
160
- consumerClient , err := azeventhubs.NewConsumerClientFromConnectionString (" NAMESPACE CONNECTION STRING" , " EVENT HUB NAME" , azeventhubs.DefaultConsumerGroup , nil )
170
+ if err != nil {
171
+ panic (err)
172
+ }
161
173
162
- if err != nil {
163
- panic (err)
164
- }
174
+ // create a consumer client using a connection string to the namespace and the event hub
175
+ consumerClient , err := azeventhubs.NewConsumerClientFromConnectionString (" NAMESPACE CONNECTION STRING" , " EVENT HUB NAME" , azeventhubs.DefaultConsumerGroup , nil )
165
176
166
- defer consumerClient.Close (context.TODO ())
177
+ if err != nil {
178
+ panic (err)
179
+ }
167
180
168
- // create a processor to receive and process events
169
- processor , err := azeventhubs.NewProcessor (consumerClient, checkpointStore, nil )
181
+ defer consumerClient.Close (context.TODO ())
170
182
171
- if err != nil {
172
- panic (err)
173
- }
183
+ // create a processor to receive and process events
184
+ processor , err := azeventhubs.NewProcessor (consumerClient, checkpointStore, nil )
174
185
175
- // for each partition in the event hub, create a partition client with processEvents as the function to process events
176
- dispatchPartitionClients := func () {
177
- for {
178
- partitionClient := processor.NextPartitionClient (context.TODO ())
186
+ if err != nil {
187
+ panic (err)
188
+ }
179
189
180
- if partitionClient == nil {
181
- break
182
- }
190
+ // for each partition in the event hub, create a partition client with processEvents as the function to process events
191
+ dispatchPartitionClients := func () {
192
+ for {
193
+ partitionClient := processor.NextPartitionClient (context.TODO ())
183
194
184
- go func () {
185
- if err := processEvents (partitionClient); err != nil {
186
- panic (err)
187
- }
188
- }()
189
- }
190
- }
195
+ if partitionClient == nil {
196
+ break
197
+ }
191
198
192
- // run all partition clients
193
- go dispatchPartitionClients ()
199
+ go func () {
200
+ if err := processEvents (partitionClient); err != nil {
201
+ panic (err)
202
+ }
203
+ }()
204
+ }
205
+ }
194
206
195
- processorCtx , processorCancel := context. WithCancel (context. TODO ())
196
- defer processorCancel ()
207
+ // run all partition clients
208
+ go dispatchPartitionClients ()
197
209
198
- if err := processor.Run (processorCtx); err != nil {
199
- panic (err)
200
- }
210
+ processorCtx , processorCancel := context.WithCancel (context.TODO ())
211
+ defer processorCancel ()
212
+
213
+ if err := processor.Run (processorCtx); err != nil {
214
+ panic (err)
215
+ }
201
216
}
202
217
203
218
func processEvents (partitionClient *azeventhubs .ProcessorPartitionClient ) error {
204
- defer closePartitionResources (partitionClient)
205
- for {
206
- receiveCtx , receiveCtxCancel := context.WithTimeout (context.TODO (), time.Minute )
207
- events , err := partitionClient.ReceiveEvents (receiveCtx, 100 , nil )
208
- receiveCtxCancel ()
209
-
210
- if err != nil && !errors.Is (err, context.DeadlineExceeded ) {
211
- return err
212
- }
213
-
214
- fmt.Printf (" Processing %d event(s)\n " , len (events))
215
-
216
- for _ , event := range events {
217
- fmt.Printf (" Event received with body %v \n " , string (event.Body ))
218
- }
219
-
220
- if len (events) != 0 {
221
- if err := partitionClient.UpdateCheckpoint (context.TODO (), events[len (events)-1 ]); err != nil {
222
- return err
223
- }
224
- }
225
- }
219
+ defer closePartitionResources (partitionClient)
220
+ for {
221
+ receiveCtx , receiveCtxCancel := context.WithTimeout (context.TODO (), time.Minute )
222
+ events , err := partitionClient.ReceiveEvents (receiveCtx, 100 , nil )
223
+ receiveCtxCancel ()
224
+
225
+ if err != nil && !errors.Is (err, context.DeadlineExceeded ) {
226
+ return err
227
+ }
228
+
229
+ fmt.Printf (" Processing %d event(s)\n " , len (events))
230
+
231
+ for _ , event := range events {
232
+ fmt.Printf (" Event received with body %v \n " , string (event.Body ))
233
+ }
234
+
235
+ if len (events) != 0 {
236
+ if err := partitionClient.UpdateCheckpoint (context.TODO (), events[len (events)-1 ], nil ); err != nil {
237
+ return err
238
+ }
239
+ }
240
+ }
226
241
}
227
242
228
243
func closePartitionResources (partitionClient *azeventhubs .ProcessorPartitionClient ) {
229
- defer partitionClient.Close (context.TODO ())
244
+ defer partitionClient.Close (context.TODO ())
230
245
}
231
-
232
246
```
233
247
234
248
## Run receiver and sender apps
0 commit comments