|
1 | | -This is forked from https://github.com/ClickHouse/clickhouse-go. Necessary changes are made to support https://github.com/timeplus-io/proton with date types and namespaces changes. |
| 1 | +# Proton Go Driver |
| 2 | + |
| 3 | +## Introduction |
| 4 | +[Proton](https://github.com/timeplus-io/proton) is a unified streaming and historical data processing engine in a single binary. The historical store is built based on [ClickHouse](https://github.com/ClickHouse/ClickHouse). |
| 5 | + |
| 6 | +This project provides go driver to interact with Proton, the code is based on https://github.com/ClickHouse/clickhouse-go. |
| 7 | + |
| 8 | +## Installation |
| 9 | + |
| 10 | +To get started, you need to have Go installed. Then, import the Proton Database Go Driver using Go Modules: |
| 11 | + |
| 12 | +```shell |
| 13 | +go get github.com/timeplus-io/proton-go-driver/v2 |
| 14 | +``` |
| 15 | + |
| 16 | +## Quick Start |
| 17 | + |
| 18 | +1. Run proton with docker, `docker run -d -p 8463:8463 --pull always --name proton ghcr.io/timeplus-io/proton:develop` |
| 19 | +2. Run following Golang code |
| 20 | + |
| 21 | +```go |
| 22 | +package main |
| 23 | + |
| 24 | +import ( |
| 25 | + "fmt" |
| 26 | + "github.com/timeplus-io/proton-go-driver/v2" |
| 27 | +) |
| 28 | + |
| 29 | +func main() { |
| 30 | + conn := proton.OpenDB(&proton.Options{ |
| 31 | + Addr: []string{"127.0.0.1:8463"}, |
| 32 | + Auth: proton.Auth{ |
| 33 | + Username: "default", |
| 34 | + Password: "", |
| 35 | + }, |
| 36 | + }) |
| 37 | + var value int |
| 38 | + conn.QueryRow("SELECT 300").Scan(&value) |
| 39 | + fmt.Println(value) |
| 40 | +} |
| 41 | +``` |
| 42 | + |
| 43 | +above code should return 1 , which shows that everything is working fine now. |
| 44 | + |
| 45 | +## Connecting to Proton Database |
| 46 | + |
| 47 | +To connect to the Proton database, create a connection using the following code: |
| 48 | + |
| 49 | +```go |
| 50 | +conn := proton.OpenDB(&proton.Options{ |
| 51 | + Addr: []string{"127.0.0.1:8463"}, |
| 52 | + Auth: proton.Auth{ |
| 53 | + Database: "default", |
| 54 | + Username: "default", |
| 55 | + Password: "", |
| 56 | + }, |
| 57 | + DialTimeout: 5 * time.Second, |
| 58 | + Compression: &proton.Compression{ |
| 59 | + proton.CompressionLZ4, |
| 60 | + }, |
| 61 | +}) |
| 62 | +conn.SetMaxIdleConns(5) |
| 63 | +conn.SetMaxOpenConns(10) |
| 64 | +conn.SetConnMaxLifetime(time.Hour) |
| 65 | +ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) { |
| 66 | + if rand.Float32() < 0.3 { |
| 67 | + log.Println("progress:", p) |
| 68 | + } |
| 69 | +})) |
| 70 | +``` |
| 71 | + |
| 72 | +## Create Stream |
| 73 | + |
| 74 | +Before working with streaming data, you need to initialize it. Here's an example for creating a stream: |
| 75 | + |
| 76 | +```go |
| 77 | +if _, err := conn.ExecContext(ctx, "DROP STREAM IF EXISTS car"); err != nil { |
| 78 | + return err |
| 79 | +} |
| 80 | +if _, err := conn.ExecContext(ctx, "CREATE STREAM IF NOT EXISTS car(id int64, speed float64)"); err != nil { |
| 81 | + return err |
| 82 | +} |
| 83 | +``` |
| 84 | +## Batch Insertion |
| 85 | + |
| 86 | +```go |
| 87 | +scope, err := conn.Begin() |
| 88 | +if err != nil { |
| 89 | + log.Fatal(err) |
| 90 | +} |
| 91 | +batch, err := scope.PrepareContext(ctx, "INSERT INTO car (id, speed, _tp_time) values") |
| 92 | +for i := 0; i < 20; i++ { |
| 93 | + speed := rand.Float64()*20 + 50 |
| 94 | + _, err := batch.Exec(id, speed, time.Now()) |
| 95 | + if err != nil { |
| 96 | + log.Fatal(err) |
| 97 | + } |
| 98 | + time.Sleep(time.Duration(100) * time.Millisecond) |
| 99 | +} |
| 100 | +err = scope.Commit() |
| 101 | +if err != nil { |
| 102 | + log.Fatal(err) |
| 103 | +} |
| 104 | +``` |
| 105 | + |
| 106 | +## Streaming Query |
| 107 | + |
| 108 | +```go |
| 109 | +const QueryDDL = `SELECT id, avg(speed), window_start, window_end |
| 110 | + FROM session(car, 1h, [speed >= 60, speed < 60)) |
| 111 | + GROUP BY id, window_start, window_end` |
| 112 | +conn, ctx := getConnection(context.Background()) |
| 113 | +ctx, cancel := context.WithCancel(ctx) |
| 114 | +rows, err := conn.QueryContext(ctx, QueryDDL) |
| 115 | +if err != nil { |
| 116 | + log.Fatal(err) |
| 117 | +} |
| 118 | +defer rows.Close() |
| 119 | +go func() { |
| 120 | + time.Sleep(time.Duration(20) * time.Second) |
| 121 | + cancel() |
| 122 | +}() |
| 123 | +for rows.Next() { |
| 124 | + var car SpeedingCarRcd |
| 125 | + if err := rows.Scan(&car.Id, &car.Speed, &car.Start, &car.End); err != nil { |
| 126 | + log.Fatal(err) |
| 127 | + } |
| 128 | + log.Printf("%+v", car) |
| 129 | +} |
| 130 | +err = rows.Err() |
| 131 | +if err != nil { |
| 132 | + log.Fatal(err) |
| 133 | +} |
| 134 | +``` |
| 135 | + |
| 136 | +> [!NOTE] |
| 137 | +> To cancel a streaming query, you need to use the cancel function returned by `context.WithCancel`. |
0 commit comments