Skip to content

Commit 418a96f

Browse files
authored
feat: make configurable hyper table check for timescale db (#40)
* feat: make configurable hyper table check for timescale db * feat: make configurable hyper table check for timescale db --------- Co-authored-by: Serhat Karabulut <serhat.karabulut@trendyolgo.com>
1 parent 5dd6ef7 commit 418a96f

File tree

3 files changed

+30
-19
lines changed

3 files changed

+30
-19
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ This setup ensures continuous data synchronization and minimal downtime in captu
154154
| `slot.createIfNotExists` | bool | no | - | Create replication slot if not exists. Otherwise, return `replication slot is not exists` error. | |
155155
| `slot.name` | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. |
156156
| `slot.slotActivityCheckerInterval` | int | no | 1000 | Set the slot activity check interval time in milliseconds | Specify as an integer value in milliseconds (e.g., `1000` for 1 second). |
157+
| `extensionSupport.enableTimescaleDB` | bool | no | false | Enable support for TimescaleDB hypertables. Ensures proper handling of compressed chunks during replication. | |
157158

158159
### API
159160

config/config.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@ import (
1414
)
1515

1616
type Config struct {
17-
Logger LoggerConfig `json:"logger" yaml:"logger"`
18-
Host string `json:"host" yaml:"host"`
19-
Username string `json:"username" yaml:"username"`
20-
Password string `json:"password" yaml:"password"`
21-
Database string `json:"database" yaml:"database"`
22-
Publication publication.Config `json:"publication" yaml:"publication"`
23-
Slot slot.Config `json:"slot" yaml:"slot"`
24-
Port int `json:"port" yaml:"port"`
25-
Metric MetricConfig `json:"metric" yaml:"metric"`
26-
DebugMode bool `json:"debugMode" yaml:"debugMode"`
17+
Logger LoggerConfig `json:"logger" yaml:"logger"`
18+
Host string `json:"host" yaml:"host"`
19+
Username string `json:"username" yaml:"username"`
20+
Password string `json:"password" yaml:"password"`
21+
Database string `json:"database" yaml:"database"`
22+
Publication publication.Config `json:"publication" yaml:"publication"`
23+
Slot slot.Config `json:"slot" yaml:"slot"`
24+
Port int `json:"port" yaml:"port"`
25+
Metric MetricConfig `json:"metric" yaml:"metric"`
26+
DebugMode bool `json:"debugMode" yaml:"debugMode"`
27+
ExtensionSupport ExtensionSupport `json:"extensionSupport" yaml:"extensionSupport"`
2728
}
2829

2930
type MetricConfig struct {
@@ -35,6 +36,10 @@ type LoggerConfig struct {
3536
LogLevel slog.Level `json:"level" yaml:"level"` // if custom logger is nil, set the slog log level
3637
}
3738

39+
type ExtensionSupport struct {
40+
EnableTimeScaleDB bool `json:"enableTimeScaleDB" yaml:"EnableTimeScaleDB"`
41+
}
42+
3843
func (c *Config) DSN() string {
3944
// URL-encode username and password to handle special characters
4045
encodedUsername := url.QueryEscape(c.Username)

connector.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,16 @@ func NewConnector(ctx context.Context, cfg config.Config, listenerFunc replicati
112112

113113
prometheusRegistry := metric.NewRegistry(m)
114114

115-
tdb, err := timescaledb.NewTimescaleDB(ctx, cfg.DSN())
116-
if err != nil {
117-
return nil, err
118-
}
119-
120-
_, err = tdb.FindHyperTables(ctx)
121-
if err != nil {
122-
return nil, err
115+
var tdb *timescaledb.TimescaleDB
116+
if cfg.ExtensionSupport.EnableTimeScaleDB {
117+
tdb, err = timescaledb.NewTimescaleDB(ctx, cfg.DSN())
118+
if err != nil {
119+
return nil, err
120+
}
121+
_, err = tdb.FindHyperTables(ctx)
122+
if err != nil {
123+
return nil, err
124+
}
123125
}
124126

125127
return &connector{
@@ -156,7 +158,10 @@ func (c *connector) Start(ctx context.Context) {
156158

157159
logger.Info("slot captured")
158160
go c.slot.Metrics(ctx)
159-
go c.timescaleDB.SyncHyperTables(ctx)
161+
162+
if c.timescaleDB != nil {
163+
go c.timescaleDB.SyncHyperTables(ctx)
164+
}
160165

161166
signal.Notify(c.cancelCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT, syscall.SIGQUIT)
162167

0 commit comments

Comments
 (0)