-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
77 lines (65 loc) · 1.93 KB
/
main.go
File metadata and controls
77 lines (65 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
"github.com/segmentio/kafka-go"
)
// main is the entry point of the program demonstrating real-time data integration
// from Kafka to a PostgreSQL database.
func main() {
// Kafka configuration
brokerAddress := "localhost:9092"
topic := "example-topic"
dbConnStr := "postgres://username:password@localhost/dbname?sslmode=disable"
// Kafka reader configuration for consuming messages
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{brokerAddress},
Topic: topic,
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
defer reader.Close()
// Database connection
db, err := sql.Open("postgres", dbConnStr)
if err != nil {
log.Fatalf("failed to connect to the database: %v", err)
}
defer db.Close()
// Ensure database connection is alive
err = db.Ping()
if err != nil {
log.Fatalf("failed to ping the database: %v", err)
}
fmt.Println("Connected to Kafka and database, starting to read messages...")
// Infinite loop to continuously read from Kafka handle" functions
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
msg, err := reader.ReadMessage(ctx)
if err != nil {
log.Printf("failed to read message: %v", err)
continue
}
fmt.Printf("Message received: Key=%s, Value=%s", string(msg.Key), string(msg.Value))
// Example of simple data storage
query := "INSERT INTO messages (key, value) VALUES ($1, $2)"
_, err = db.Exec(query, string(msg.Key), string(msg.Value))
if err != nil {
log.Printf("failed to execute query: %v", err)
} else {
fmt.Println("Message stored in database")
}
}
}
// handleError handles error logging in a consistent manner.
// It demonstrates structured error handling as a best practice in Go.
func handleError(err error, message string) {
if err != nil {
log.Fatalf("%s: %v", message, err)
}
}