diff --git a/constants/constants.go b/constants/constants.go index 1bff31f5b..c90429970 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -33,6 +33,15 @@ const ( EffectiveParquetSize = int64(256) * 1024 * 1024 * int64(8) DB2StateTimestampFormat = "2006-01-02 15:04:05.000000" DefaultStateTimestampFormat = "2006-01-02T15:04:05.000000000Z" + // DistributionLower and DistributionUpper define the acceptable range + // of the distribution factor for validating evenly distributed numeric PKs. + DistributionLower = 0.05 + DistributionUpper = 100.0 + // UnicodeSize is the total number of valid Unicode code points (0 to 0x10FFFF) + UnicodeSize = 1114112 + // MysqlChunkAcceptanceRatio defines the minimum ratio of expected chunks that must be generated + // for the split to be considered valid. + MysqlChunkAcceptanceRatio = float64(0.8) ) type DriverType string diff --git a/constants/state_version.go b/constants/state_version.go index 2fbf0d6b3..68b707663 100644 --- a/constants/state_version.go +++ b/constants/state_version.go @@ -26,11 +26,15 @@ package constants // * Earlier if the session timezone or global was set in offset format, it was not parsed correctly and used to fallback to UTC. // * Now it parses the offset correctly and uses the timezone offset to set the timezone for the connection. // -// - Version 4: (Current Version) Unsigned int/integer/bigint map to Int64. +// - Version 4: Unsigned int/integer/bigint map to Int64. // * Earlier unsigned int/integer/bigint were mapped to Int32 which caused integer overflows. +// +// - Version 5: (Current Version) Added []uint8 (byte slice) support in ReformatInt64 +// * Previously, numeric values returned as byte slices (common in some SQL drivers) caused errors +// * Now these byte slices are parsed and converted into int64 const ( - LatestStateVersion = 4 + LatestStateVersion = 5 ) // Used as the current version of the state when the program is running diff --git a/drivers/mysql/internal/backfill.go b/drivers/mysql/internal/backfill.go index 9ae829797..38032f64e 100644 --- a/drivers/mysql/internal/backfill.go +++ b/drivers/mysql/internal/backfill.go @@ -5,8 +5,11 @@ import ( "database/sql" "fmt" "math" + "math/big" + "slices" "sort" "strings" + "unicode/utf8" "github.com/datazip-inc/olake/constants" "github.com/datazip-inc/olake/destination" @@ -43,13 +46,15 @@ func (m *MySQL) ChunkIterator(ctx context.Context, stream types.StreamInterface, logger.Debugf("Starting backfill from %v to %v with filter: %s, args: %v", chunk.Min, chunk.Max, filter, args) // Get chunks from state or calculate new ones stmt := "" + var chunkArgs []any if chunkColumn != "" { - stmt = jdbc.MysqlChunkScanQuery(stream, []string{chunkColumn}, chunk, filter) + stmt, chunkArgs = jdbc.MysqlChunkScanQuery(stream, []string{chunkColumn}, chunk, filter) } else if len(pkColumns) > 0 { - stmt = jdbc.MysqlChunkScanQuery(stream, pkColumns, chunk, filter) + stmt, chunkArgs = jdbc.MysqlChunkScanQuery(stream, pkColumns, chunk, filter) } else { stmt = jdbc.MysqlLimitOffsetScanQuery(stream, chunk, filter) } + args = append(chunkArgs, args...) logger.Debugf("Executing chunk query: %s", stmt) setter := jdbc.NewReader(ctx, stmt, func(ctx context.Context, query string, queryArgs ...any) (*sql.Rows, error) { return tx.QueryContext(ctx, query, args...) @@ -58,13 +63,20 @@ func (m *MySQL) ChunkIterator(ctx context.Context, stream types.StreamInterface, }) } +// TODO: Separate chunking-related logic from this function so the individual components can be unit tested independently. func (m *MySQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPool, stream types.StreamInterface) (*types.Set[types.Chunk], error) { - var approxRowCount int64 - var avgRowSize any - approxRowCountQuery := jdbc.MySQLTableRowStatsQuery() - err := m.client.QueryRowContext(ctx, approxRowCountQuery, stream.Name()).Scan(&approxRowCount, &avgRowSize) + var ( + approxRowCount int64 + avgRowSize any + approxTableSize int64 + columnCollationType string + dataMaxLength sql.NullInt64 + ) + + tableStatsQuery := jdbc.MySQLTableStatsQuery() + err := m.client.QueryRowContext(ctx, tableStatsQuery, stream.Name()).Scan(&approxRowCount, &avgRowSize, &approxTableSize) if err != nil { - return nil, fmt.Errorf("failed to get approx row count and avg row size: %s", err) + return nil, fmt.Errorf("failed to fetch TableStats query for table=%s: %s", stream.Name(), err) } if approxRowCount == 0 { @@ -93,21 +105,59 @@ func (m *MySQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo chunkSize := int64(math.Ceil(float64(constants.EffectiveParquetSize) / avgRowSizeFloat)) chunks := types.NewSet[types.Chunk]() chunkColumn := stream.Self().StreamMetadata.ChunkColumn + + var ( + chunkStepSize int64 + minVal any // to define lower range of the chunk + maxVal any // to define upper range of the chunk + minBoundary int64 + maxBoundary int64 + ) + + pkColumns := stream.GetStream().SourceDefinedPrimaryKey.Array() + if chunkColumn != "" { + pkColumns = []string{chunkColumn} + } + sort.Strings(pkColumns) + + if len(pkColumns) > 0 { + minVal, maxVal, err = m.getTableExtremes(ctx, stream, pkColumns) + if err != nil { + return nil, fmt.Errorf("Stream %s: Failed to get table extremes: %s", stream.ID(), err) + } + } + + stringSupportedPk := false + + if len(pkColumns) == 1 { + var dataType string + query := jdbc.MySQLColumnStatsQuery() + err = m.client.QueryRowContext(ctx, query, stream.Name(), pkColumns[0]).Scan(&dataType, &dataMaxLength, &columnCollationType) + if err != nil { + return nil, fmt.Errorf("failed to fetch Column DataType and max length %s", err) + } + // 1. Try Numeric Strategy + chunkStepSize, minBoundary, maxBoundary = IsNumericAndEvenDistributed(minVal, maxVal, approxRowCount, chunkSize, dataType) + + // 2. If not numeric, check for supported String strategy + if chunkStepSize == 0 { + switch strings.ToLower(dataType) { + case "char", "varchar": + stringSupportedPk = true + logger.Infof("%s is a string type PK", pkColumns[0]) + if dataMaxLength.Valid { + logger.Infof("Data Max Length: %d", dataMaxLength.Int64) + } + default: + logger.Infof("%s is not a string type PK", pkColumns[0]) + } + } + } + // Takes the user defined batch size as chunkSize // TODO: common-out the chunking logic for db2, mssql, mysql splitViaPrimaryKey := func(stream types.StreamInterface, chunks *types.Set[types.Chunk]) error { return jdbc.WithIsolation(ctx, m.client, true, func(tx *sql.Tx) error { - // Get primary key column using the provided function - pkColumns := stream.GetStream().SourceDefinedPrimaryKey.Array() - if chunkColumn != "" { - pkColumns = []string{chunkColumn} - } - sort.Strings(pkColumns) - // Get table extremes - minVal, maxVal, err := m.getTableExtremes(ctx, stream, pkColumns, tx) - if err != nil { - return fmt.Errorf("failed to get table extremes: %s", err) - } if minVal == nil { return nil } @@ -155,6 +205,7 @@ func (m *MySQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo }) } + logger.Infof("Chunking completed using SplitViaPrimaryKey Method for stream %s", stream.ID()) return nil }) } @@ -176,20 +227,324 @@ func (m *MySQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo Min: utils.ConvertToString(lastChunk), Max: nil, }) + logger.Infof("Chunking completed using limit offset method for stream %s", stream.ID()) + return nil + }) + } + + /* + splitEvenlyForInt generates chunk boundaries for numeric values by dividing the range [minBoundary, maxBoundary] using an arithmetic progression (AP). + + Each boundary follows: + next = prev + chunkStepSize + + Example: + minBoundary = 0, maxBoundary = 100, chunkStepSize = 25 + + AP sequence: + 0 → 25 → 50 → 75 → 100 + + Chunks formed: + (-∞, 0), [0,25), [25,50), [50,75), [75,100), [100, +∞) + */ + splitEvenlyForInt := func(chunks *types.Set[types.Chunk], chunkStepSize int64) error { + chunks.Insert(types.Chunk{ + Min: nil, + Max: utils.ConvertToString(minBoundary), + }) + prev := minBoundary + for next := minBoundary + chunkStepSize; next <= maxBoundary; next += chunkStepSize { + // condition to protect from infinite loop + if next <= prev { + logger.Warnf("int precision collapse detected, falling back to SplitViaPrimaryKey for stream %s", stream.ID()) + chunks.Clear() + return splitViaPrimaryKey(stream, chunks) + } + chunks.Insert(types.Chunk{ + Min: utils.ConvertToString(prev), + Max: utils.ConvertToString(next), + }) + prev = next + } + chunks.Insert(types.Chunk{ + Min: utils.ConvertToString(prev), + Max: nil, + }) + logger.Infof("Chunking completed using splitEvenlyForInt Method for stream %s", stream.ID()) + return nil + } + + /* + splitEvenlyForString generates chunk boundaries for string-based primary keys + by converting string values into a numeric (big.Int) space and iteratively + splitting that range. + + Workflow: + 1. Convert min and max string values into padded form and map them into big.Int using unicode-based encoding. + 2. Estimate the expected number of chunks based on table size and target file size. + 3. Compute an initial chunk interval using ceil division on the numeric range. + 4. Iteratively (up to 5 attempts): + - Adjust the interval using an AP-based variation. + - Generate candidate boundaries in numeric space and map them back to strings. + - Query distinct values using collation-aware SQL (ordering handled in query). + - Validate the number of effective chunks using a count query. + - If at least the required threshold (~80%) of chunks is achieved, accept and stop. + 5. If all attempts fail, fallback to primary key–based chunking. + + Final Step: + - Use the validated boundary values to construct non-overlapping chunks + covering the full range [min, max], including open-ended boundaries. + + Example: + minVal = "aa", maxVal = "az", expectedChunks = 3 + + Generated boundaries after refining boundaries using collation-aware DB queries: + ["aa", "ai", "ar", "az"] + + Chunks: + (-∞, "aa"), ["aa","ai"), ["ai","ar"), ["ar","az"), ["az", +∞) + */ + splitEvenlyForString := func(chunks *types.Set[types.Chunk]) error { + var validChunksCount int + + maxValPadded := utils.ConvertToString(maxVal) + minValPadded := utils.ConvertToString(minVal) + + if dataMaxLength.Valid { + maxValPadded = padRightWithNulls(maxValPadded, int(dataMaxLength.Int64)) + minValPadded = padRightWithNulls(minValPadded, int(dataMaxLength.Int64)) + } + + maxEncodedBigIntValue := encodeUnicodeStringToBigInt(maxValPadded) + minEncodedBigIntValue := encodeUnicodeStringToBigInt(minValPadded) + + expectedChunks := int64(math.Ceil(float64(approxTableSize) / float64(constants.EffectiveParquetSize))) + expectedChunks = utils.Ternary(expectedChunks <= 0, int64(1), expectedChunks).(int64) + + stringChunkStepSize := new(big.Int).Sub(&maxEncodedBigIntValue, &minEncodedBigIntValue) + stringChunkStepSize.Add(stringChunkStepSize, new(big.Int).Sub(big.NewInt(expectedChunks), big.NewInt(1))) + stringChunkStepSize.Div(stringChunkStepSize, big.NewInt(expectedChunks)) //ceil division set up + + rangeSlice := []string{} + // Try up to 5 times to generate balanced chunks by slightly adjusting the chunk size each iteration. + for retryAttempt := int64(0); retryAttempt < int64(5); retryAttempt++ { + adjustedStepSize := new(big.Int).Set(stringChunkStepSize) + adjustedStepSize.Add(adjustedStepSize, big.NewInt(retryAttempt)) + adjustedStepSize.Div(adjustedStepSize, big.NewInt(retryAttempt+1)) + currentBoundary := new(big.Int).Set(&minEncodedBigIntValue) + + for chunkIdx := int64(0); chunkIdx < expectedChunks*(retryAttempt+1) && currentBoundary.Cmp(&maxEncodedBigIntValue) < 0; chunkIdx++ { + rangeSlice = append(rangeSlice, decodeBigIntToUnicodeString(currentBoundary)) + currentBoundary.Add(currentBoundary, adjustedStepSize) + } + + // Align boundaries with actual DB values using MySQL collation ordering + rangeSlice = append(rangeSlice, decodeBigIntToUnicodeString(&maxEncodedBigIntValue)) + query, args := jdbc.MySQLDistinctValuesWithCollationQuery(rangeSlice, columnCollationType) + rows, err := m.client.QueryContext(ctx, query, args...) + if err != nil { + return fmt.Errorf("failed to run distinct query: %s", err) + } + rangeSlice = rangeSlice[:0] + // Some chunks generated might be completely empty when boundaries greater + // than the max value and smaller than the min value exists + for rows.Next() { + var val string + if err := rows.Scan(&val); err != nil { + rows.Close() + return fmt.Errorf("failed to scan row: %s", err) + } + rangeSlice = append(rangeSlice, val) + } + + if err := rows.Err(); err != nil { + return fmt.Errorf("row iteration error during distinct boundaries iteration: %s", err) + } + + // Counting the number of valid chunks generated i.e., between min and max + query, args = jdbc.MySQLCountGeneratedInRange(rangeSlice, columnCollationType, minValPadded, maxValPadded) + err = m.client.QueryRowContext(ctx, query, args...).Scan(&validChunksCount) + if err != nil { + return fmt.Errorf("failed to run count query: %s", err) + } + + // Accept boundaries if enough valid chunks are produced + if float64(validChunksCount) >= float64(expectedChunks)*constants.MysqlChunkAcceptanceRatio { + logger.Infof("Successfully Generated Chunks using splitEvenlyForString Method for stream %s", stream.ID()) + break + } + + // If the number of valid chunks generated is less than the expected chunks * a constant factor even after 5 iterations, we fallback to splitViaPrimaryKey + if float64(validChunksCount) < float64(expectedChunks)*constants.MysqlChunkAcceptanceRatio && retryAttempt == 4 { + logger.Warnf("failed to generate chunks for stream %s, falling back to splitviaprimarykey method", stream.ID()) + err = splitViaPrimaryKey(stream, chunks) + if err != nil { + return fmt.Errorf("failed to generate chunks for stream %s: %s", stream.ID(), err) + } + return nil + } + rangeSlice = rangeSlice[:0] + } + + if len(rangeSlice) == 0 { return nil + } + + prev := rangeSlice[0] + chunks.Insert(types.Chunk{ + Min: nil, + Max: prev, }) + + for idx := range rangeSlice { + if idx == 0 { + continue + } + currVal := rangeSlice[idx] + chunks.Insert(types.Chunk{ + Min: prev, + Max: currVal, + }) + prev = currVal + } + + chunks.Insert(types.Chunk{ + Min: prev, + Max: nil, + }) + + logger.Infof("Chunking completed using splitEvenlyForString Method for stream %s", stream.ID()) + return nil } - if stream.GetStream().SourceDefinedPrimaryKey.Len() > 0 || chunkColumn != "" { + switch { + case len(pkColumns) == 1 && chunkStepSize > 0: + logger.Infof("Using splitEvenlyForInt Method for stream %s", stream.ID()) + err = splitEvenlyForInt(chunks, chunkStepSize) + case len(pkColumns) == 1 && stringSupportedPk: + logger.Infof("Using splitEvenlyForString Method for stream %s", stream.ID()) + err = splitEvenlyForString(chunks) + case len(pkColumns) > 0: + logger.Infof("Using SplitViaPrimaryKey Method for stream %s", stream.ID()) err = splitViaPrimaryKey(stream, chunks) - } else { + default: + logger.Infof("Falling back to limit offset method for stream %s", stream.ID()) err = limitOffsetChunking(chunks) } + return chunks, err } -func (m *MySQL) getTableExtremes(ctx context.Context, stream types.StreamInterface, pkColumns []string, tx *sql.Tx) (min, max any, err error) { +func (m *MySQL) getTableExtremes(ctx context.Context, stream types.StreamInterface, pkColumns []string) (min, max any, err error) { query := jdbc.MinMaxQueryMySQL(stream, pkColumns) - err = tx.QueryRowContext(ctx, query).Scan(&min, &max) + err = m.client.QueryRowContext(ctx, query).Scan(&min, &max) return min, max, err } + +// checks if the pk column is numeric and evenly distributed +func IsNumericAndEvenDistributed(minVal any, maxVal any, approxRowCount int64, chunkSize int64, dataType string) (int64, int64, int64) { + icebergDataType := mysqlTypeToDataTypes[strings.ToLower(dataType)] + if icebergDataType != types.Int32 && icebergDataType != types.Int64 { + logger.Debugf("Current pk is not a supported numeric column") + return 0, 0, 0 + } + + minBoundary, err := typeutils.ReformatInt64(minVal) + if err != nil { + logger.Debugf("failed to parse minVal: %s", err) + return 0, 0, 0 + } + + maxBoundary, err := typeutils.ReformatInt64(maxVal) + if err != nil { + logger.Debugf("failed to parse maxVal: %s", err) + return 0, 0, 0 + } + + distributionFactor := (float64(maxBoundary) - float64(minBoundary) + 1) / float64(approxRowCount) + + if distributionFactor < constants.DistributionLower || distributionFactor > constants.DistributionUpper { + logger.Debugf("distribution factor is not in the range of %f to %f", constants.DistributionLower, constants.DistributionUpper) + return 0, 0, 0 + } + + chunkStepSize := int64(math.Ceil(math.Max(distributionFactor*float64(chunkSize), 1))) + return chunkStepSize, minBoundary, maxBoundary +} + +/* + encodeUnicodeStringToBigInt maps a string to a big.Int using base = 1114112(UnicodeSize), treating each rune as a digit in a positional system. + + Value = r₀*base^(n-1) + r₁*base^(n-2) + ... + rₙ + + Example: + s = "aa" + r₀ = 'a' = 97, r₁ = 'a' = 97, base = 1114112 + + Value = r₀*base^(n-1) + r₁*base^(n-2) + + = 97*1114112 + 97 + = 108068961 +*/ +func encodeUnicodeStringToBigInt(s string) big.Int { + base := big.NewInt(constants.UnicodeSize) + val := big.NewInt(0) + + for _, ch := range []rune(s) { + val.Mul(val, base) + val.Add(val, big.NewInt(int64(ch))) + } + return *val +} + +/* + decodeBigIntToUnicodeString reconstructs the original string from its big.Int representation by extracting digits in base = 1114112 (UnicodeSize). + + It repeatedly takes modulus and division by base to recover each rune: + rᵢ = n % base, then n = n / base + + Example: + n = 108068961, base = 1114112 + + Step 1: + r₁ = n % base = 97 → 'a' + n = n / base = 97 + + Step 2: + r₀ = n % base = 97 → 'a' + n = 0 + + Reconstructed (after reversing): + "aa" +*/ +func decodeBigIntToUnicodeString(n *big.Int) string { + if n.Cmp(big.NewInt(0)) == 0 { + return "" + } + base := big.NewInt(constants.UnicodeSize) + x := new(big.Int).Set(n) + var runes []rune + + for x.Cmp(big.NewInt(0)) > 0 { + rem := new(big.Int).Mod(x, base) + runes = append(runes, rune(rem.Int64())) + x.Div(x, base) + } + + slices.Reverse(runes) + return string(runes) +} + +/* + Padding a string with null characters to a specified length. + + Example: + padRightWithNulls("aa", 4) = "aa\x00\x00" +*/ +func padRightWithNulls(s string, maxLength int) string { + length := utf8.RuneCountInString(s) + if length >= maxLength { + return s + } + return s + strings.Repeat("\x00", maxLength-length) +} diff --git a/pkg/jdbc/jdbc.go b/pkg/jdbc/jdbc.go index fec9d98a8..359716dda 100644 --- a/pkg/jdbc/jdbc.go +++ b/pkg/jdbc/jdbc.go @@ -230,6 +230,94 @@ func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, c return fmt.Sprintf(`SELECT * FROM %s WHERE %s`, quotedTable, chunkCond) } +// TODO: Common out buildChunkConditionMySQL for MSSQL, DB2, and other drivers where needed. +// MySQL-Specific Queries buildChunkConditionMySQL builds the condition for a chunk in MySQL +func buildChunkConditionMySQL(filterColumns []string, chunk types.Chunk, extraFilter string) (string, []any) { + quotedCols := QuoteColumns(filterColumns, constants.MySQL) + + splitBoundaryValues := func(boundary any) []string { + if boundary == nil { + return nil + } + str := utils.ConvertToString(boundary) + parts := strings.Split(str, ",") + for i, part := range parts { + parts[i] = strings.TrimSpace(part) + } + return parts + } + + // buildBound creates the expanded logic for: + // (c1, c2, c3) >= (v1, v2, v3) + // as: + // (c1 > v1) OR (c1 = v1 AND c2 > v2) OR (c1 = v1 AND c2 = v2 AND c3 >= v3) + // + // For upper bounds, it creates: + // (c1 < v1) OR (c1 = v1 AND c2 < v2) OR (c1 = v1 AND c2 = v2 AND c3 < v3) + buildBound := func(values []string, isLower bool) (string, []any) { + var args []any + orGroups := make([]string, 0, len(quotedCols)) + + for colIdx := 0; colIdx < len(quotedCols); colIdx++ { + andConds := make([]string, 0, colIdx+1) + + // Prefix columns must match exactly: c1 = v1 AND c2 = v2 ... + for prefixIdx := 0; prefixIdx < colIdx; prefixIdx++ { + if prefixIdx < len(values) { + andConds = append(andConds, fmt.Sprintf("%s = ?", quotedCols[prefixIdx])) + args = append(args, values[prefixIdx]) + } + } + + var op string + if isLower { + op = ">" + if colIdx == len(quotedCols)-1 { + op = ">=" + } + } else { + op = "<" + } + + if colIdx < len(values) { + andConds = append(andConds, fmt.Sprintf("%s %s ?", quotedCols[colIdx], op)) + args = append(args, values[colIdx]) + } + if len(andConds) > 0 { + orGroups = append(orGroups, "("+strings.Join(andConds, " AND ")+")") + } + } + + return "(" + strings.Join(orGroups, " OR ") + ")", args + } + + lowerValues := splitBoundaryValues(chunk.Min) + upperValues := splitBoundaryValues(chunk.Max) + + chunkCond := "" + var args []any + switch { + case chunk.Min != nil && chunk.Max != nil: + lowerCond, lowerArgs := buildBound(lowerValues, true) + upperCond, upperArgs := buildBound(upperValues, false) + if lowerCond != "" && upperCond != "" { + chunkCond = fmt.Sprintf("(%s) AND (%s)", lowerCond, upperCond) + args = append(args, lowerArgs...) + args = append(args, upperArgs...) + } + case chunk.Min != nil: + chunkCond, args = buildBound(lowerValues, true) + case chunk.Max != nil: + chunkCond, args = buildBound(upperValues, false) + } + + // Combine with any additional filter if present. + if extraFilter != "" && chunkCond != "" { + chunkCond = fmt.Sprintf("(%s) AND (%s)", chunkCond, extraFilter) + } + return chunkCond, args +} + // buildLexicographicChunkCondition builds a WHERE condition for a chunk scan using // lexicographic OR-groups over multiple ordering columns. // @@ -326,13 +414,6 @@ func buildLexicographicChunkCondition(quotedColumns []string, chunk types.Chunk, return chunkCond } -// MySQL-Specific Queries -// buildChunkConditionMySQL builds the condition for a chunk in MySQL. -func buildChunkConditionMySQL(filterColumns []string, chunk types.Chunk, extraFilter string) string { - quotedCols := QuoteColumns(filterColumns, constants.MySQL) - return buildLexicographicChunkCondition(quotedCols, chunk, extraFilter) -} - // MysqlLimitOffsetScanQuery is used to get the rows func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string { quotedTable := QuoteTable(stream.Namespace(), stream.Name(), constants.MySQL) @@ -354,10 +435,10 @@ func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, } // MySQLWithoutState builds a chunk scan query for MySql -func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string { - condition := buildChunkConditionMySQL(filterColumns, chunk, extraFilter) +func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) (string, []any) { + condition, args := buildChunkConditionMySQL(filterColumns, chunk, extraFilter) quotedTable := QuoteTable(stream.Namespace(), stream.Name(), constants.MySQL) - return fmt.Sprintf("SELECT * FROM %s WHERE %s", quotedTable, condition) + return fmt.Sprintf("SELECT * FROM %s WHERE %s", quotedTable, condition), args } // MinMaxQueryMySQL returns the query to fetch MIN and MAX values of a column in a MySQL table @@ -426,17 +507,79 @@ func MySQLPrimaryKeyQuery() string { ` } -// MySQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MySQL -func MySQLTableRowStatsQuery() string { +// MySQLTableStatsQuery returns the query to fetch the estimated row count and average row size of a table in MySQL +func MySQLTableStatsQuery() string { return ` SELECT TABLE_ROWS, - CEIL(data_length / NULLIF(table_rows, 0)) AS avg_row_bytes + CEIL(data_length / NULLIF(table_rows, 0)) AS avg_row_bytes, + DATA_LENGTH FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ? ` } +// MySQLColumnStatsQuery returns a query that fetches the DATA_TYPE, CHARACTER_MAXIMUM_LENGTH and Collation type of a column in MySQL. +func MySQLColumnStatsQuery() string { + return ` + SELECT DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, COALESCE(COLLATION_NAME, '') + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = ? + AND COLUMN_NAME = ? + LIMIT 1; + ` +} + +// MySQLDistinctValuesWithCollationQuery builds a DISTINCT query over a slice of strings +// using the column's collation type. +func MySQLDistinctValuesWithCollationQuery(values []string, columnCollationType string) (string, []any) { + unionParts := make([]string, 0, len(values)) + args := make([]any, 0, len(values)) + for _, v := range values { + unionParts = append(unionParts, "SELECT ? AS val") + args = append(args, v) + } + query := fmt.Sprintf(` + SELECT DISTINCT val COLLATE %s AS val + FROM ( + %s + ) AS t + ORDER BY val COLLATE %s; + `, columnCollationType, strings.Join(unionParts, " UNION ALL "), columnCollationType) + return query, args +} + +// MySQLCountGeneratedInRange builds a query that counts how many values from the provided slice +// fall within [minVal, maxVal] using the column's collation ordering. +func MySQLCountGeneratedInRange(values []string, columnCollationType string, minVal, maxVal string) (string, []any) { + unionParts := make([]string, 0, len(values)) + args := make([]any, 0, len(values)+2) + + args = append(args, minVal, maxVal, maxVal, minVal) + + for _, v := range values { + unionParts = append(unionParts, "SELECT ? AS val") + args = append(args, v) + } + + query := fmt.Sprintf(` + SELECT GREATEST( + SUM(CASE + WHEN val COLLATE %s >= ? AND val COLLATE %s <= ? + THEN 1 ELSE 0 END), + SUM(CASE + WHEN val COLLATE %s >= ? AND val COLLATE %s <= ? + THEN 1 ELSE 0 END) + ) AS max_count + FROM ( + %s + ) AS t; + `, columnCollationType, columnCollationType, columnCollationType, columnCollationType, strings.Join(unionParts, " UNION ALL ")) + + return query, args +} + // MySQLTableExistsQuery returns the query to check if a table has any rows using EXISTS func MySQLTableExistsQuery(stream types.StreamInterface) string { quotedTable := QuoteTable(stream.Namespace(), stream.Name(), constants.MySQL) diff --git a/types/set.go b/types/set.go index 9ae2eec13..0d0cb8688 100644 --- a/types/set.go +++ b/types/set.go @@ -208,3 +208,9 @@ func (st *Set[T]) UnmarshalJSON(data []byte) error { func (st *Set[T]) MarshalJSON() ([]byte, error) { return json.Marshal(st.Array()) } + +// Clear removes all elements from the set +func (st *Set[T]) Clear() { + st.hash = make(map[string]nothing) + st.storage = make(map[string]T) +} diff --git a/utils/typeutils/reformat.go b/utils/typeutils/reformat.go index b5bfdb474..f0547d44e 100644 --- a/utils/typeutils/reformat.go +++ b/utils/typeutils/reformat.go @@ -280,7 +280,7 @@ func parseStringTimestamp(value string, isTimestampInDB bool) (time.Time, error) return time.Unix(0, 0).UTC(), nil } -// TODO: Add bytes array handling of int64 and other datatypes. Also add unit test cases for it. +// TODO: Add unit test cases for ReformatInt64 and byte array handling for other datatypes as well. func ReformatInt64(v any) (int64, error) { switch v := v.(type) { case json.Number: @@ -324,6 +324,19 @@ func ReformatInt64(v any) (int64, error) { return intValue, nil case *any: return ReformatInt64(*v) + case []uint8: + if constants.LoadedStateVersion > 4 { + strVal := string(v) + intValue, err := strconv.ParseInt(strVal, 10, 64) + if err == nil { + return intValue, nil + } + uintValue, err := strconv.ParseUint(strVal, 10, 64) + if err == nil { + //nolint:gosec // G115: converting []uint8 to int64 is safe and required for backward compatibility + return int64(uintValue), nil + } + } } return int64(0), fmt.Errorf("failed to change %v (type:%T) to int64", v, v)