/**
 * Copyright 2025 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This is a simple example demonstrating how to produce a message to
// a topic and then read it back again using a consumer.
// Authentication uses the OpenID Connect (OIDC) method of the
// OAUTHBEARER SASL mechanism, obtaining tokens via the Azure Instance
// Metadata Service (IMDS) metadata-based authentication.

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

const (
	bootstrapServers = "<bootstrap_servers>"
)

func main() {
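	// Confluent Cloud identifies the target logical cluster and identity
	// pool through SASL/OAUTHBEARER extensions.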
	extensions := fmt.Sprintf("logicalCluster=%s,identityPoolId=%s", "<logical_cluster>", "<identity_pool_id>")
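	// Query parameters appended to the IMDS token request; fill in the
	// values required by your Azure environment.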
	azureIMDSQueryParams := "api-version=<api_version>&resource=<resource>&client_id=<client_id>"
	config := fmt.Sprintf("query=%s", azureIMDSQueryParams)
	commonProperties := kafka.ConfigMap{
		"bootstrap.servers":           bootstrapServers,
		"security.protocol":           "SASL_SSL",
		"sasl.mechanism":              "OAUTHBEARER",
		"sasl.oauthbearer.method":     "OIDC",
		"sasl.oauthbearer.extensions": extensions,
		"sasl.oauthbearer.metadata.authentication.type": "azure_imds",
		"sasl.oauthbearer.config":                       config,
	}
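	// Note: as an alternative to IMDS metadata-based authentication, the
	// OIDC method can also fetch tokens directly from a token endpoint via
	// the client-credentials grant. A sketch, assuming the standard
	// librdkafka OIDC properties (values are placeholders):
	//
	//   "sasl.oauthbearer.token.endpoint.url": "<token_endpoint_url>",
	//   "sasl.oauthbearer.client.id":          "<client_id>",
	//   "sasl.oauthbearer.client.secret":      "<client_secret>",
	//   "sasl.oauthbearer.scope":              "<scope>",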
	producerProperties := kafka.ConfigMap{}
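	// enable.auto.offset.store=false: offsets are stored explicitly with
	// StoreMessage below, so only processed messages get committed.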
	consumerProperties := kafka.ConfigMap{
		"group.id":                 "oauthbearer_oidc_azure_imds_example",
		"auto.offset.reset":        "earliest",
		"enable.auto.offset.store": "false",
	}
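	// The producer and consumer share the connection and authentication
	// properties defined above.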
	for k, v := range commonProperties {
		producerProperties[k] = v
		consumerProperties[k] = v
	}

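	// Create the topic up front so both clients can use it right away.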
	topic := "go-test-topic"
	createTopic(&producerProperties, topic)

	producer, err := kafka.NewProducer(&producerProperties)
	if err != nil {
		panic(fmt.Sprintf("Failed to create producer: %s", err))
	}

	consumer, err := kafka.NewConsumer(&consumerProperties)
	if err != nil {
		panic(fmt.Sprintf("Failed to create consumer: %s", err))
	}

	topics := []string{topic}
	err = consumer.SubscribeTopics(topics, nil)
	if err != nil {
		panic(fmt.Sprintf("Failed to subscribe to topics: %s", err))
	}

	var wg sync.WaitGroup
	wg.Add(2)

	// The run flag is read and written from both goroutines below, so use
	// an atomic to avoid a data race.
	var run atomic.Bool
	run.Store(true)
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

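	// Producer goroutine: enqueue one record per iteration, then block on
	// the Events channel for its delivery report.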
	go func() {
		defer wg.Done()
		for run.Load() {
			select {
			case sig := <-signalChannel:
				fmt.Printf("Caught signal %v: terminating\n", sig)
				run.Store(false)
			default:
				// Produce a new record to the topic...
				value := "golang test value"
				err := producer.Produce(&kafka.Message{
					TopicPartition: kafka.TopicPartition{Topic: &topic,
						Partition: kafka.PartitionAny},
					Value: []byte(value)}, nil)
				if err != nil {
					fmt.Fprintf(os.Stderr, "Failed to enqueue message: %s\n", err)
					time.Sleep(1 * time.Second)
					continue
				}
			loop:
				// Wait for the delivery report
				for ev := range producer.Events() {
					switch e := ev.(type) {
					case *kafka.Message:
						message := e
						if message.TopicPartition.Error != nil {
							fmt.Printf("failed to deliver message: %v\n",
								message.TopicPartition)
						} else {
							fmt.Printf("delivered to topic %s [%d] at offset %v\n",
								*message.TopicPartition.Topic,
								message.TopicPartition.Partition,
								message.TopicPartition.Offset)
						}
						break loop
					default:
						// ignore other event types
					}
				}
				time.Sleep(1 * time.Second)
			}
		}
	}()

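	// Consumer goroutine: poll for messages and store the offset of each
	// message once it has been processed.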
	go func() {
		defer wg.Done()
		for run.Load() {
			// Receive one message, polling for at most 100ms.
			ev := consumer.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}
				_, err := consumer.StoreMessage(e)
				if err != nil {
					fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s: %s\n",
						e.TopicPartition, err.Error())
				}
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run.Store(false)
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}()

	wg.Wait()
	producer.Close()
	consumer.Close()
}

func createTopic(adminProperties *kafka.ConfigMap, topic string) {
	adminClient, err := kafka.NewAdminClient(adminProperties)
	if err != nil {
		fmt.Printf("Failed to create Admin client: %s\n", err)
		os.Exit(1)
	}
	defer adminClient.Close()

	// Contexts are used to abort or limit the amount of time
	// the Admin call blocks waiting for a result.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create the topic on the cluster.
	// Set Admin options to wait for the operation to finish (or at most 60s).
	maxDuration := 60 * time.Second

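	// A replication factor of 3 assumes a cluster with at least three
	// brokers, as in Confluent Cloud.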
	results, err := adminClient.CreateTopics(ctx,
		[]kafka.TopicSpecification{{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 3}},
		kafka.SetAdminOperationTimeout(maxDuration))

	if err != nil {
		fmt.Printf("Problem during the topic creation: %v\n", err)
		os.Exit(1)
	}

	// Check for specific topic errors.
	for _, result := range results {
		if result.Error.Code() != kafka.ErrNoError &&
			result.Error.Code() != kafka.ErrTopicAlreadyExists {
			fmt.Printf("Topic creation failed for %s: %s\n",
				result.Topic, result.Error.String())
			os.Exit(1)
		}
	}
}