Skip to content

Commit 8d8c724

Browse files
Merge branch 'main' into remove-reflect-methods
2 parents b67d27e + e85ce0b commit 8d8c724

File tree

5 files changed

+34
-7
lines changed

5 files changed

+34
-7
lines changed

.release-please-manifest.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
".": "4.80.1"
2+
".": "4.80.3"
33
}

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [4.80.3](https://github.com/cloudquery/plugin-sdk/compare/v4.80.2...v4.80.3) (2025-05-19)
9+
10+
11+
### Bug Fixes
12+
13+
* Pass correct value for plugin version ([#2156](https://github.com/cloudquery/plugin-sdk/issues/2156)) ([37b4157](https://github.com/cloudquery/plugin-sdk/commit/37b41572165f96c8d0c67390bded2b20815a506d))
14+
15+
## [4.80.2](https://github.com/cloudquery/plugin-sdk/compare/v4.80.1...v4.80.2) (2025-05-19)
16+
17+
18+
### Bug Fixes
19+
20+
* Change logic for batch writing to write when batch size is reached, not exceeded ([#2153](https://github.com/cloudquery/plugin-sdk/issues/2153)) ([58c8a1e](https://github.com/cloudquery/plugin-sdk/commit/58c8a1e35d8d77f7cb1ae1c73e70e4a21b23e0a7))
21+
* Flush DeleteRecord messages when batch writer is flushed ([#2154](https://github.com/cloudquery/plugin-sdk/issues/2154)) ([791c865](https://github.com/cloudquery/plugin-sdk/commit/791c8658a0b0224f080dce8ea0cf734dbc9ce911))
22+
823
## [4.80.1](https://github.com/cloudquery/plugin-sdk/compare/v4.80.0...v4.80.1) (2025-05-12)
924

1025

examples/simple_plugin/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ toolchain go1.24.1
66

77
require (
88
github.com/apache/arrow-go/v18 v18.2.0
9-
github.com/cloudquery/plugin-sdk/v4 v4.80.1
9+
github.com/cloudquery/plugin-sdk/v4 v4.80.2
1010
github.com/rs/zerolog v1.34.0
1111
)
1212

serve/package.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func (s *PluginServe) build(pluginDirectory string, target plugin.BuildTarget, d
129129
if target.IncludeSymbols {
130130
stripSymbols = ""
131131
}
132-
ldFlags := fmt.Sprintf("%[1]s -w -X %[2]s/plugin.Version=%[3]s -X %[2]s/resources/plugin.Version=%[2]s", stripSymbols, importPath, pluginVersion)
132+
ldFlags := fmt.Sprintf("%[1]s -w -X %[2]s/plugin.Version=%[3]s -X %[2]s/resources/plugin.Version=%[3]s", stripSymbols, importPath, pluginVersion)
133133
args := []string{"build", "-o", pluginPath}
134134
args = append(args, "-buildmode=exe")
135135
args = append(args, "-ldflags", ldFlags)

writers/batchwriter/batchwriter.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,10 @@ func (w *BatchWriter) Flush(ctx context.Context) error {
107107
if err := w.flushMigrateTables(ctx); err != nil {
108108
return err
109109
}
110-
return w.flushDeleteStaleTables(ctx)
110+
if err := w.flushDeleteStaleTables(ctx); err != nil {
111+
return err
112+
}
113+
return w.flushDeleteRecordTables(ctx)
111114
}
112115

113116
func (w *BatchWriter) Close(context.Context) error {
@@ -278,7 +281,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
278281
w.deleteStaleMessages = append(w.deleteStaleMessages, m)
279282
l := int64(len(w.deleteStaleMessages))
280283
w.deleteStaleLock.Unlock()
281-
if w.batchSize > 0 && l > w.batchSize {
284+
if w.isLimitReached(l) {
282285
if err := w.flushDeleteStaleTables(ctx); err != nil {
283286
return err
284287
}
@@ -298,7 +301,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
298301
w.deleteRecordMessages = append(w.deleteRecordMessages, m)
299302
l := int64(len(w.deleteRecordMessages))
300303
w.deleteRecordLock.Unlock()
301-
if w.batchSize > 0 && l > w.batchSize {
304+
if w.isLimitReached(l) {
302305
if err := w.flushDeleteRecordTables(ctx); err != nil {
303306
return err
304307
}
@@ -310,6 +313,9 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
310313
if err := w.flushDeleteStaleTables(ctx); err != nil {
311314
return err
312315
}
316+
if err := w.flushDeleteRecordTables(ctx); err != nil {
317+
return err
318+
}
313319
if err := w.startWorker(ctx, m); err != nil {
314320
return err
315321
}
@@ -322,7 +328,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
322328
w.migrateTableMessages = append(w.migrateTableMessages, m)
323329
l := int64(len(w.migrateTableMessages))
324330
w.migrateTableLock.Unlock()
325-
if w.batchSize > 0 && l > w.batchSize {
331+
if w.isLimitReached(l) {
326332
if err := w.flushMigrateTables(ctx); err != nil {
327333
return err
328334
}
@@ -332,6 +338,12 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
332338
return nil
333339
}
334340

341+
func (w *BatchWriter) isLimitReached(rowCount int64) bool {
342+
limit := batch.CappedAt(0, w.batchSize)
343+
limit.AddRows(rowCount)
344+
return limit.ReachedLimit()
345+
}
346+
335347
func (w *BatchWriter) startWorker(_ context.Context, msg *message.WriteInsert) error {
336348
w.workersLock.RLock()
337349
md := msg.Record.Schema().Metadata()

0 commit comments

Comments
 (0)