Commit c95a2a8

Authored by serprex and alexstoick

PEERDB_SNOWFLAKE_AUTO_COMPRESS (#3137)
This allows us to skip the auto-compression enabled by default for Snowflake. Snowflake seems to struggle with certain files that get auto-compressed and then fail to decompress:

```
Invalid data encountered during decompression for file: 'not_file', compression type used: 'ZSTD', cause: 'Data corruption detected'
```

Closes #3123

Co-authored-by: Stoica Alexandru <[email protected]>

Parent: 3491eb7

2 files changed: +32 -13

flow/connectors/snowflake/qrep_avro_sync.go (10 additions, 3 deletions)

```diff
@@ -14,6 +14,7 @@ import (
 
 	"github.com/PeerDB-io/peerdb/flow/connectors/utils"
 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
+	"github.com/PeerDB-io/peerdb/flow/internal"
 	"github.com/PeerDB-io/peerdb/flow/model"
 	"github.com/PeerDB-io/peerdb/flow/shared"
 	"github.com/PeerDB-io/peerdb/flow/shared/types"
@@ -199,12 +200,18 @@ func (s *SnowflakeAvroSyncHandler) putFileToStage(ctx context.Context, avroFile
 		return nil
 	}
 
-	putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)
+	autoCompressionStr := ""
+	autoCompression, err := internal.PeerDBSnowflakeAutoCompress(ctx, s.config.Env)
+	if err != nil {
+		s.logger.Warn("Failed to load PEERDB_SNOWFLAKE_AUTO_COMPRESS, proceeding without", slog.Any("error", err))
+	} else if !autoCompression {
+		autoCompressionStr = " AUTO_COMPRESS=FALSE"
+	}
 
-	if _, err := s.ExecContext(ctx, putCmd); err != nil {
+	if _, err := s.ExecContext(ctx, fmt.Sprintf("PUT file://%s @%s%s", avroFile.FilePath, stage, autoCompressionStr)); err != nil {
 		return fmt.Errorf("failed to put file to stage: %w", err)
 	}
 
-	s.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage))
+	s.logger.Info("put file to stage", slog.String("file", avroFile.FilePath), slog.String("stage", stage))
 	return nil
 }
```
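The change is easiest to see with the command spelled out. Below is a minimal standalone sketch of the same construction; `buildPutCommand` and the file/stage values are hypothetical, and only the command shape and the `AUTO_COMPRESS=FALSE` suffix come from the diff above:

```go
package main

import "fmt"

// buildPutCommand mirrors the construction in putFileToStage. The function
// name and the example values in main are hypothetical; the command format
// matches the diff above.
func buildPutCommand(filePath, stage string, autoCompression bool) string {
	suffix := ""
	if !autoCompression {
		// Appended when PEERDB_SNOWFLAKE_AUTO_COMPRESS resolves to false,
		// so Snowflake stages the Avro file as-is instead of compressing it.
		suffix = " AUTO_COMPRESS=FALSE"
	}
	return fmt.Sprintf("PUT file://%s @%s%s", filePath, stage, suffix)
}

func main() {
	fmt.Println(buildPutCommand("/tmp/part-0001.avro", "PEERDB_STAGE", true))
	// Output: PUT file:///tmp/part-0001.avro @PEERDB_STAGE
	fmt.Println(buildPutCommand("/tmp/part-0001.avro", "PEERDB_STAGE", false))
	// Output: PUT file:///tmp/part-0001.avro @PEERDB_STAGE AUTO_COMPRESS=FALSE
}
```

Note the fallback in the real code: if the setting fails to load, the connector logs a warning and proceeds with Snowflake's default (auto-compression on) rather than failing the sync.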

flow/internal/dynamicconf.go (22 additions, 10 deletions)

```diff
@@ -34,8 +34,8 @@ var DynamicSettings = [...]*protos.DynamicSetting{
 	},
 	{
 		Name: "PEERDB_NORMALIZE_CHANNEL_BUFFER_SIZE",
-		Description: "Advanced setting: changes buffer size of channel PeerDB uses for queueing normalizing, " +
-			"use with PEERDB_PARALLEL_SYNC_NORMALIZE",
+		Description: "Advanced setting: changes buffer size of channel PeerDB uses for queueing normalization, " +
+			"use with PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE",
 		DefaultValue: "128",
 		ValueType: protos.DynconfValueType_INT,
 		ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
@@ -120,14 +120,6 @@ var DynamicSettings = [...]*protos.DynamicSetting{
 		ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
 		TargetForSetting: protos.DynconfTarget_ALL,
 	},
-	{
-		Name: "PEERDB_CLICKHOUSE_BINARY_FORMAT",
-		Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64",
-		DefaultValue: "raw",
-		ValueType: protos.DynconfValueType_STRING,
-		ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
-		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
-	},
 	{
 		Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
 		Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
@@ -136,6 +128,22 @@ var DynamicSettings = [...]*protos.DynamicSetting{
 		ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
 		TargetForSetting: protos.DynconfTarget_SNOWFLAKE,
 	},
+	{
+		Name: "PEERDB_SNOWFLAKE_AUTO_COMPRESS",
+		Description: "AUTO_COMPRESS option when uploading to Snowflake",
+		DefaultValue: "true",
+		ValueType: protos.DynconfValueType_BOOL,
+		ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
+		TargetForSetting: protos.DynconfTarget_SNOWFLAKE,
+	},
+	{
+		Name: "PEERDB_CLICKHOUSE_BINARY_FORMAT",
+		Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64",
+		DefaultValue: "raw",
+		ValueType: protos.DynconfValueType_STRING,
+		ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
+		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
+	},
 	{
 		Name: "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME",
 		Description: "S3 buckets to store Avro files for mirrors with ClickHouse target",
@@ -554,6 +562,10 @@ func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string)
 	return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM")
 }
 
+func PeerDBSnowflakeAutoCompress(ctx context.Context, env map[string]string) (bool, error) {
+	return dynamicConfBool(ctx, env, "PEERDB_SNOWFLAKE_AUTO_COMPRESS")
+}
+
 func PeerDBClickHouseAWSS3BucketName(ctx context.Context, env map[string]string) (string, error) {
 	return dynLookup(ctx, env, "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
 }
```
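For the new setting, the registration above gives a DefaultValue of "true" that a per-mirror env entry can override. A minimal sketch of that precedence, assuming the env map takes priority; `resolveAutoCompress` is a hypothetical stand-in for `dynamicConfBool`, whose catalog lookup is elided:

```go
package main

import (
	"fmt"
	"strconv"
)

// resolveAutoCompress is a hypothetical stand-in for dynamicConfBool: it
// checks a per-mirror env map for PEERDB_SNOWFLAKE_AUTO_COMPRESS and falls
// back to the DefaultValue of "true" from DynamicSettings. The real helper
// may also consult the catalog's dynamic settings, omitted here.
func resolveAutoCompress(env map[string]string) (bool, error) {
	val, ok := env["PEERDB_SNOWFLAKE_AUTO_COMPRESS"]
	if !ok {
		val = "true"
	}
	return strconv.ParseBool(val)
}

func main() {
	on, _ := resolveAutoCompress(nil) // no override: default applies
	fmt.Println(on)                   // true

	off, _ := resolveAutoCompress(map[string]string{
		"PEERDB_SNOWFLAKE_AUTO_COMPRESS": "false",
	})
	fmt.Println(off) // false
}
```

Given the APPLY_MODE_AFTER_RESUME apply mode, flipping the setting should take effect once a mirror is paused and resumed.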
