feat: mysql chunking optimisation #797

Open
saksham-datazip wants to merge 34 commits into staging from feat/mysql-chunking-optimization

Conversation

Collaborator

@saksham-datazip saksham-datazip commented Jan 27, 2026

Description

This PR improves the MySQL chunking strategy with the primary goal of significantly reducing chunk generation time for large tables during incremental reads.

To achieve this, two mathematical chunking strategies were introduced based on the primary key type, replacing repeated database-based chunk discovery.

Numeric Primary Keys

The numeric range [min, max] is divided using an arithmetic progression to generate evenly spaced chunk boundaries. This allows chunk boundaries to be computed mathematically instead of relying on repeated database lookups, significantly reducing chunking time.
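As an illustration only, the arithmetic-progression idea can be sketched as below. The `chunk` type, the `evenChunks` helper and the choice of open-ended first and last chunks are assumptions made for this sketch, not the PR's actual code; it also assumes `maxVal-minVal` fits in an int64.

```go
package main

import "fmt"

// chunk is a hypothetical range; a nil bound means "unbounded" on that side.
type chunk struct {
	Min, Max *int64
}

// evenChunks places boundaries at minVal+step, minVal+2*step, ... without
// querying the database. It assumes maxVal-minVal does not overflow int64.
func evenChunks(minVal, maxVal, step int64) []chunk {
	if step <= 0 || minVal > maxVal {
		return nil
	}
	count := (maxVal - minVal) / step // number of interior boundaries
	chunks := make([]chunk, 0, count+1)
	var prev *int64 // nil: the first chunk is open on the left
	for i := int64(1); i <= count; i++ {
		boundary := minVal + i*step
		chunks = append(chunks, chunk{Min: prev, Max: &boundary})
		prev = &boundary
	}
	// The last chunk is open on the right so rows at or above the final boundary are kept.
	return append(chunks, chunk{Min: prev, Max: nil})
}

// show renders a bound for printing.
func show(p *int64) string {
	if p == nil {
		return "nil"
	}
	return fmt.Sprint(*p)
}

func main() {
	for _, c := range evenChunks(1, 10, 4) {
		fmt.Printf("[%s, %s)\n", show(c.Min), show(c.Max))
	}
}
```

With min=1, max=10 and step=4 this yields three ranges with boundaries at 5 and 9; no per-chunk database lookup is involved.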

String Primary Keys

String values are mapped into a numeric space using Unicode encoding (big.Int) and then split into balanced ranges. These candidate boundaries are then aligned with actual database values using collation-aware queries to maintain correct ordering.
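A rough sketch of the string-to-number mapping, under stated assumptions: each rune is treated as one digit in base 0x110000 (the size of the Unicode code space), and strings are padded to a fixed width. The helper names `stringToInt`, `intToString` and `midpoint` are hypothetical and the PR's real encoding may differ.

```go
package main

import (
	"fmt"
	"math/big"
	"strings"
)

// unicodeBase is the size of the Unicode code space; using it as the radix is an assumption.
var unicodeBase = big.NewInt(0x110000)

// stringToInt reads the first width runes of s as digits of a base-0x110000
// number, padding short strings on the right with zero digits.
func stringToInt(s string, width int) *big.Int {
	runes := []rune(s)
	n := new(big.Int)
	for i := 0; i < width; i++ {
		n.Mul(n, unicodeBase)
		if i < len(runes) {
			n.Add(n, big.NewInt(int64(runes[i])))
		}
	}
	return n
}

// intToString inverts stringToInt and drops the zero padding.
func intToString(n *big.Int, width int) string {
	rest := new(big.Int).Set(n)
	digit := new(big.Int)
	runes := make([]rune, width)
	for i := width - 1; i >= 0; i-- {
		rest.DivMod(rest, unicodeBase, digit)
		runes[i] = rune(digit.Int64())
	}
	return strings.TrimRight(string(runes), "\x00")
}

// midpoint returns a candidate boundary halfway between lo and hi.
func midpoint(lo, hi string, width int) string {
	sum := new(big.Int).Add(stringToInt(lo, width), stringToInt(hi, width))
	return intToString(sum.Rsh(sum, 1), width)
}

func main() {
	fmt.Println(midpoint("a", "c", 1))   // b
	fmt.Println(midpoint("aa", "ac", 2)) // ab
}
```

The numeric order here is plain code-point order, which is not necessarily the order MySQL uses under a given collation, and a computed candidate may not exist in the table. That is consistent with the description's point that candidates still have to be aligned with real values through collation-aware queries.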

These strategies substantially reduce the number of database round trips required for chunk discovery, resulting in faster chunk generation and improved performance for large datasets.

As part of this work, several edge cases in chunk boundary calculation were also addressed, particularly around MySQL collation-aware ordering for string primary keys. The implementation aligns generated boundaries with actual database values using collation-aware queries, ensuring correct range generation and preventing missing or overlapping chunks.

Additionally, a small compatibility fix was introduced in refractor.go. Previously, some queries used hardcoded SQL strings, which caused MySQL to return numeric values as uint64. After switching to parameterized queries, the Go MySQL driver began returning these values as []uint8 (byte slices).

To handle this change correctly, an additional []uint8 case was added in ReformatInt64 so that these values are properly parsed and converted to int64. This ensures consistent behavior regardless of how the query result is returned by the driver.

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

  • Tested MySQL chunking with INT32 primary keys

  • Tested MySQL chunking with INT64 primary keys

  • Tested MySQL chunking with FLOAT / DOUBLE primary keys

  • Verified no data loss or overlap across chunk boundaries

  • Tested with different kinds of string PKs for full refresh and CDC

  • Confirmed performance improvement on large datasets

Performance Stats (Different PK Types)

The following stats.json outputs were collected from runs on different MySQL tables, each containing 10M records, using different primary key types.

🔢 Table with INT32 Primary Key

  • Seconds Elapsed: 184.00
  • Speed: 54,347.30 rps
  • Memory: 96 MB

🔣 Table with FLOAT64 Primary Key

  • Seconds Elapsed: 54.00
  • Speed: 185,179.58 rps
  • Memory: 36 MB

Screenshots or Recordings

https://datazip.atlassian.net/wiki/x/AYCVDg

Documentation

  • Documentation Link: [link to README, olake.io/docs, or olake-docs]
  • N/A (bug fix, refactor, or test changes only)

Related PR's (If Any):

N/A

Comment on lines +48 to +53
if chunk.Min == nil && chunk.Max == nil && filter == "" {
stmt = fmt.Sprintf(
"SELECT * FROM `%s`.`%s`",
stream.Namespace(),
stream.Name(),
)
Collaborator

Why are we falling back to a single chunk? Shouldn't this fall back to the older way?

Collaborator Author

This was added because when all the conditions after the WHERE clause are empty, the statement

select * from stream.Namespace().stream.Name() where

will throw an error.
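For reference, one way to avoid the dangling WHERE is to append the keyword only when there is a condition. This is a minimal sketch with a hypothetical `buildSelect` helper, not the query builder in jdbc.go:

```go
package main

import (
	"fmt"
	"strings"
)

// buildSelect appends a WHERE clause only when a condition is present,
// so an empty condition never produces "... WHERE" with nothing after it.
func buildSelect(schema, table, condition string) string {
	query := fmt.Sprintf("SELECT * FROM `%s`.`%s`", schema, table)
	if strings.TrimSpace(condition) == "" {
		return query
	}
	return query + " WHERE " + condition
}

func main() {
	fmt.Println(buildSelect("db", "orders", ""))
	fmt.Println(buildSelect("db", "orders", "`id` >= ?"))
}
```

This keeps the generated SQL free of placeholder predicates such as `1 = 1`, at the cost of one extra branch in the builder.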

Comment on lines +156 to +166
err = splitViaPrimaryKey(
ctx,
m,
stream,
chunks,
chunkColumn,
chunkSize,
minVal,
maxVal,
pkColumns,
)
Collaborator

Don't split this across multiple lines; make the call on a single line.

Collaborator Author

solved

Comment on lines +261 to +266
func shouldUseEvenDistribution(
minVal any,
maxVal any,
approxRowCount int64,
chunkSize int64,
) (bool, int64) {
Collaborator

Similarly here; find the other places where you have done this and correct them as well.

Collaborator Author

solved

func splitEvenlyForInt(minVal, maxVal any, chunks *types.Set[types.Chunk], step int64) {
start, _ := typeutils.ReformatFloat64(minVal)
end, _ := typeutils.ReformatFloat64(maxVal)
logger.Infof("splitEvenlyForInt start=%v end=%v step=%d", start, end, step)
Collaborator

Remove this log

Collaborator Author

solved

Comment on lines +48 to +52
if chunk.Min == nil && chunk.Max == nil && filter == "" {
stmt = fmt.Sprintf(
"SELECT * FROM `%s`.`%s`",
stream.Namespace(),
stream.Name(),
Collaborator

Query-building logic is handled in the jdbc.go file.

Collaborator Author

solved

Comment on lines +299 to +320
if start+float64(step) > end {
chunks.Insert(types.Chunk{
Min: nil,
Max: nil,
})
logger.Infof("Generated single chunk: %+v", chunks)
return
}

prev := start

for next := start + float64(step); next <= end; next += float64(step) {
chunks.Insert(types.Chunk{
Min: utils.ConvertToString(prev),
Max: utils.ConvertToString(next),
})
prev = next
}

chunks.Insert(types.Chunk{
Min: utils.ConvertToString(prev),
Max: nil,
Collaborator

This function's logic is too cluttered and can be made simpler. Also, most of this code looks similar to the older code.

Collaborator Author

@saksham-datazip saksham-datazip left a comment

self review

maxValBaseN, err1 := convertStringToIntBaseN(utils.ConvertToString(maxVal))
minValBaseN, err2 := convertStringToIntBaseN(utils.ConvertToString(minVal))
if err1 != nil || err2 != nil {
return
Collaborator

You should at least return the error; otherwise, how will someone debug what happened?

Collaborator Author

solved

pkg/jdbc/jdbc.go Outdated
func MySQLTableSizeQuery() string {
return `
SELECT
DATA_LENGTH + INDEX_LENGTH AS table_size
Collaborator

Why are we including the index length here as well? Why won't the older query, MySQLTableRowStatsQuery, do the job?

Collaborator Author

That query is for the average row count and average row size, while this function returns the schema size.

Collaborator Author

solved

pkg/jdbc/jdbc.go Outdated
Comment on lines +432 to +434
if condition == "" {
condition = utils.Ternary(extraFilter != "", extraFilter, "1 = 1").(string)
}
Collaborator

This will produce a not-so-good query. Instead, you could add the WHERE here as well and append it to the final string.

Also, can you give an example of a case where the condition string will be empty? AFAIK the condition is built from the chunk, and there will always be a chunk like

from start to null

Collaborator Author

This is used in other drivers as well, so I kept using it for the sake of consistency.

Collaborator Author

solved

}
sort.Strings(pkColumns)

if stream.GetStream().SourceDefinedPrimaryKey.Len() > 0 || chunkColumn != "" {
Collaborator

You can check the pkColumns array directly here; it has all the info.

Collaborator Author

solved

Comment on lines +245 to +246
Min: nil,
Max: nil,
Collaborator

What does a nil-to-nil chunk mean? Elsewhere we create a chunk from nil to start and another from start to nil when there is no further value. The same can be done here, and if the table has no values we don't need to reach this function at all.

Collaborator Author

solved

return min, max, err
}

func shouldUseEvenDistribution(minVal any, maxVal any, approxRowCount int64, chunkSize int64) (bool, int64, float64, float64) {
Collaborator

Add a comment describing what this function does and what it returns.

Collaborator Author

solved

Comment on lines +289 to +292
minFloat, err1 := typeutils.ReformatFloat64(minVal)
maxFloat, err2 := typeutils.ReformatFloat64(maxVal)
if err1 != nil || err2 != nil {
return false, 0, 0, 0
Collaborator

Again, why are we not returning the error?

Collaborator Author

solved

Min: nil,
Max: *convertIntBaseNtoString(prev),
})
for next := new(big.Int).Add(prev, chunkdiff); next.Cmp(&maxValBaseN) < 0; next.Add(next, chunkdiff) {
Collaborator

You have already subtracted from maxValBaseN earlier. Check this again; the logic might be flawed.

Collaborator Author

solved

})
}
if len(pkColumns) == 1 && isEvenDistribution {
splitEvenlyForInt(minFloat, maxFloat, chunks, float64(step))
Collaborator

It would be better to use an int as the step so that the boundaries are well defined.

Collaborator Author

The float is there because of float PKs; we will need it for those.

Comment on lines +130 to +156
// Supported MySQL string-like PK datatypes
var stringTypes = map[string]struct{}{
"char": {},
"varchar": {},
}
//defining boolean to check if string is supported or not
stringSupportedPk := false

if len(pkColumns) == 1 {
isNumericAndEvenDistributed, chunkStepSize, minFloat, maxFloat, err = IsNumericAndEvenDistributed(minVal, maxVal, approxRowCount, chunkSize)
if err != nil {
isNumericAndEvenDistributed = false
logger.Infof("Stream %s: PK is not numeric or conversion failed, falling back to string splitting: %v", stream.ID(), err)
}
if !isNumericAndEvenDistributed {
var dataType string
query := jdbc.MySQLColumnTypeQuery()
err = m.client.QueryRowContext(ctx, query, stream.Name(), pkColumns[0]).Scan(&dataType, &dataMaxLength)
if err != nil {
return nil, fmt.Errorf("failed to fetch Column DataType and max length %s", err)
} else if _, ok := stringTypes[dataType]; ok {
stringSupportedPk = true
logger.Infof("%s is a string type PK",pkColumns[0])
if dataMaxLength.Valid {
logger.Infof("Data Max Length: %d", dataMaxLength.Int64)
}
}
Collaborator

This can be refactored to something like this:

Suggested change
// defining boolean to check if string is supported or not
stringSupportedPk := false
if len(pkColumns) == 1 {
// 1. Try Numeric Strategy
isNumericAndEvenDistributed, chunkStepSize, minFloat, maxFloat, err = IsNumericAndEvenDistributed(minVal, maxVal, approxRowCount, chunkSize)
if err != nil {
isNumericAndEvenDistributed = false
logger.Infof("Stream %s: PK is not numeric or conversion failed: %v", stream.ID(), err)
}
// 2. If not numeric, check for supported String strategy
if !isNumericAndEvenDistributed {
var dataType string
// Fetch column type
query := jdbc.MySQLColumnTypeQuery()
if err := m.client.QueryRowContext(ctx, query, stream.Name(), pkColumns[0]).Scan(&dataType, &dataMaxLength); err != nil {
return nil, fmt.Errorf("failed to fetch Column DataType and max length: %w", err)
}
// Check if type is supported
switch dataType {
case "char", "varchar":
stringSupportedPk = true
logger.Infof("%s is a string type PK (type: %s)", pkColumns[0], dataType)
if dataMaxLength.Valid {
logger.Infof("Data Max Length: %d", dataMaxLength.Int64)
}
default:
logger.Infof("Unsupported string PK type: %s", dataType)
}

Collaborator Author

solved

Comment on lines +330 to +346
for i := int64(0); i < int64(5); i++ {
temporarychunkdiff := new(big.Int).Set(chunkdiff)
temporarychunkdiff.Add(temporarychunkdiff, big.NewInt(i))
temporarychunkdiff.Div(temporarychunkdiff, big.NewInt(i+1))
curr := new(big.Int).Set(&minValBaseN)

for j := int64(0); j < expectedChunks && curr.Cmp(&maxValBaseN) < 0; j++ {
rangeSlice = append(rangeSlice, convertIntUnicodeToString(curr))
curr.Add(curr, temporarychunkdiff)
}

rangeSlice = append(rangeSlice, convertIntUnicodeToString(&maxValBaseN))
query, args := jdbc.MySQLDistinctValuesWithCollationQuery(rangeSlice, tableCollationType)
rows, err := m.client.QueryContext(ctx, query, args...)
if err != nil {
return fmt.Errorf("failed to run distinct query: %v", err)
}
Collaborator

Add a comment to explain this loop. Also, use more descriptive variable names instead of i and j.

Collaborator Author

solved

//
// For upper bounds, it creates:
// (c1 < v1) OR (c1 = v1 AND c2 < v2) OR (c1 = v1 AND c2 = v2 AND c3 < v3)
buildBound := func(values []string, isLower bool) (string, []any) {
Collaborator

Rename the variable to isLowerBound.

Collaborator Author

This follows the same pattern used in MSSQL.
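For readers unfamiliar with the pattern in the code comment above, here is a minimal sketch of how the upper-bound predicate `(c1 < v1) OR (c1 = v1 AND c2 < v2) OR ...` can be generated for a composite key. The `upperBound` helper and its signature are hypothetical and are not the `buildBound` closure from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// upperBound builds a lexicographic "row < (v1, v2, ...)" predicate:
// (c1 < ?) OR (c1 = ? AND c2 < ?) OR (c1 = ? AND c2 = ? AND c3 < ?) ...
// It returns an empty condition if fewer values than columns are supplied.
func upperBound(columns []string, values []any) (string, []any) {
	if len(values) < len(columns) {
		return "", nil
	}
	terms := make([]string, 0, len(columns))
	var args []any
	for i := range columns {
		parts := make([]string, 0, i+1)
		// All earlier columns must match exactly...
		for j := 0; j < i; j++ {
			parts = append(parts, fmt.Sprintf("`%s` = ?", columns[j]))
			args = append(args, values[j])
		}
		// ...and the current column must be strictly below its bound.
		parts = append(parts, fmt.Sprintf("`%s` < ?", columns[i]))
		args = append(args, values[i])
		terms = append(terms, "("+strings.Join(parts, " AND ")+")")
	}
	return strings.Join(terms, " OR "), args
}

func main() {
	cond, args := upperBound([]string{"a", "b"}, []any{1, 2})
	fmt.Println(cond) // (`a` < ?) OR (`a` = ? AND `b` < ?)
	fmt.Println(args) // [1 1 2]
}
```

A lower bound would follow the same shape with `>` on the leading terms and `>=` on the last one, which is presumably what the boolean flag in the reviewed code selects.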

pkg/jdbc/jdbc.go Outdated
// Combine with any additional filter if present.
if extraFilter != "" && chunkCond != "" {
chunkCond = fmt.Sprintf("(%s) AND (%s)", chunkCond, extraFilter)
return fmt.Sprintf("(%s) AND (%s)", chunkCond, extraFilter), args
Collaborator

args isn't modified here, so this change might not be required.

Collaborator Author

solved

pkg/jdbc/jdbc.go Outdated
return query, args
}

func MySQLCountGeneratedInRange(values []string, tableCollationType string, minVal, maxVal string) (string, []any) {
Collaborator

Add comments explaining what this function does, and do the same for the other functions that are missing them.

Collaborator Author

solved

if len(pkColumns) > 0 {
minVal, maxVal, err = m.getTableExtremes(ctx, stream, pkColumns)
if err != nil {
return nil, fmt.Errorf("Stream %s: Failed to get table extremes: %v", stream.ID(), err)
Collaborator

use %s for error everywhere

Collaborator Author

solved

if float64(validChunksCount) >= float64(expectedChunks)*constants.MysqlChunkSizeReductionFactor {
logger.Infof("Successfully Generated Chunks using splitEvenlyForString Method for stream %s", stream.ID())
for i, val := range rangeSlice {
logger.Debugf("Boundary[%d] = %q", i, val)
Collaborator

Why are we printing all the boundaries?

Collaborator Author

solved

return nil, fmt.Errorf("Stream %s: Failed to get table extremes: %s", stream.ID(), err)
}
}
//defining boolean to check if string is supported or not
Collaborator

Remove the unnecessary comments where the code is self-explanatory.

Also, what does "string is supported" mean? We are checking whether the PK is a string, right? Name it accordingly.

Collaborator Author

solved

Comment on lines +356 to +361
var val string
if err := rows.Scan(&val); err != nil {
return fmt.Errorf("failed to scan row: %s", err)
}
rangeSlice = append(rangeSlice, val)
}
Collaborator

after this we should also check for rows.Err()

Collaborator Author

solved
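As a reference for the `rows.Err()` point, the usual iteration shape looks roughly like the sketch below. To keep it runnable without a database, it is written against a small hypothetical `rowScanner` interface (which `*sql.Rows` from `database/sql` satisfies) and exercised with a fake; `collectStrings`, `fakeRows` and `errConnLost` are illustrative names only.

```go
package main

import (
	"errors"
	"fmt"
)

// rowScanner is the subset of *sql.Rows used here (assumed for the sketch).
type rowScanner interface {
	Next() bool
	Scan(dest ...any) error
	Err() error
}

// collectStrings reads one string column per row and checks Err() after the
// loop, because Next() also returns false when iteration stops on an error.
func collectStrings(rows rowScanner) ([]string, error) {
	var out []string
	for rows.Next() {
		var val string
		if err := rows.Scan(&val); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}
		out = append(out, val)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("row iteration failed: %w", err)
	}
	return out, nil
}

// errConnLost simulates a mid-iteration failure.
var errConnLost = errors.New("connection lost")

// fakeRows is a stand-in for *sql.Rows so the sketch runs without a database.
type fakeRows struct {
	vals []string
	pos  int
	err  error // reported by Err() once iteration stops
}

func (f *fakeRows) Next() bool {
	if f.pos >= len(f.vals) {
		return false
	}
	f.pos++
	return true
}

func (f *fakeRows) Scan(dest ...any) error {
	ptr, ok := dest[0].(*string)
	if !ok {
		return errors.New("unsupported destination type")
	}
	*ptr = f.vals[f.pos-1]
	return nil
}

func (f *fakeRows) Err() error { return f.err }

func main() {
	vals, err := collectStrings(&fakeRows{vals: []string{"a", "b"}})
	fmt.Println(vals, err)

	_, err = collectStrings(&fakeRows{vals: []string{"a"}, err: errConnLost})
	fmt.Println(err)
}
```

Without the final check, a result set that was cut short would be indistinguishable from a complete one, which in this PR would mean silently missing chunk boundaries.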

Comment on lines +580 to +582
SUM(CASE
WHEN val COLLATE %s >= ? AND val COLLATE %s <= ?
THEN 1 ELSE 0 END)
Collaborator

When will this second condition give a value != 0?

Collaborator Author

@saksham-datazip saksham-datazip Mar 5, 2026

This will be hit when chunkdiff is negative, i.e., when max is less than min due to MySQL chunking.

pkg/jdbc/jdbc.go Outdated
@@ -427,16 +509,86 @@ func MySQLPrimaryKeyQuery() string {
}

// MySQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MySQL
Collaborator

change the comment as well

Collaborator Author

solved

pkg/jdbc/jdbc.go Outdated
CEIL(data_length / NULLIF(table_rows, 0)) AS avg_row_bytes
CEIL(data_length / NULLIF(table_rows, 0)) AS avg_row_bytes,
DATA_LENGTH,
TABLE_COLLATION
Collaborator

What if the table collation is NULL?

Collaborator Author

@saksham-datazip saksham-datazip Mar 6, 2026

Solved; we now fetch it from the column instead.

var (
approxRowCount int64
avgRowSize any
avgSchemaSize int64
Collaborator

rename it to tableDataLength as that will make more sense

Collaborator Author

solved

rangeSlice := []string{}
// Try up to 5 times to generate balanced chunks by slightly adjusting the chunk size each iteration.
for retryAttempt := int64(0); retryAttempt < int64(5); retryAttempt++ {
temporarychunkdiff := new(big.Int).Set(chunkdiff)
Collaborator

rename it to temporaryChunkDiff

Collaborator Author

solved

expectedChunks = 1
}

chunkdiff := new(big.Int).Sub(&maxValBaseN, &minValBaseN)
Collaborator

@vaibhav-datazip vaibhav-datazip Mar 5, 2026

This is the number of rows per chunk; name it accordingly, e.g. chunkLength.

Collaborator Author

solved

logger.Infof("Using splitEvenlyForString Method for stream %s", stream.ID())
err = splitEvenlyForString(chunks)

case len(pkColumns) > 1:
Collaborator

This condition should run when primary key is present

Collaborator Author

solved

Comment on lines +449 to +451
if approxRowCount == 0 {
return false, 0, 0, 0, nil
}
Collaborator

Can this even get triggered? We already return early when the row count is 0.

Collaborator Author

resolved

return false, 0, 0, 0, nil
}

minFloat, err1 := typeutils.ReformatFloat64(minVal)
Collaborator

Why is a new err defined every time? Why not use a single error variable?

Collaborator Author

solved

Comment on lines +38 to +39
DistributionLower = 0.05
DistributionUpper = 100.0
Collaborator

How did we choose these distribution factors?

Collaborator Author

It is an assumption, and Airbyte uses it as well.
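To make the role of these two constants concrete, here is a hedged sketch of how such bounds are typically applied. The thread does not show the formula, so the definition of the distribution factor as (max - min + 1) / approxRowCount, the `evenStep` helper, and the convention of returning 0 for "not evenly distributed" are all assumptions.

```go
package main

import (
	"fmt"
	"math"
)

// Values taken from the diff under review.
const (
	distributionLower = 0.05
	distributionUpper = 100.0
)

// evenStep returns the key-space width of one chunk, or 0 when the key range
// looks too dense or too sparse for arithmetic splitting (assumed convention).
func evenStep(minVal, maxVal, approxRowCount, chunkSize int64) int64 {
	if approxRowCount <= 0 || chunkSize <= 0 || maxVal < minVal {
		return 0
	}
	// Assumed definition: how many key values exist per row on average.
	factor := (float64(maxVal) - float64(minVal) + 1) / float64(approxRowCount)
	if factor < distributionLower || factor > distributionUpper {
		return 0
	}
	step := int64(math.Ceil(factor * float64(chunkSize)))
	if step < 1 {
		step = 1
	}
	return step
}

func main() {
	fmt.Println(evenStep(1, 1000, 1000, 100))          // dense keys: 100
	fmt.Println(evenStep(1, 1_000_000_000, 1000, 100)) // very sparse keys: 0
}
```

Under this reading, a factor near 1 means keys are roughly contiguous, while a factor above the upper bound means most arithmetic ranges would be empty and a fallback strategy is the safer choice.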

Comment on lines +123 to +125
if err != nil {
logger.Debugf("Stream %s: PK is not numeric or conversion failed, falling back to string splitting: %v", stream.ID(), err)
}
Collaborator

The error is getting thrown even when it's not evenly distributed.

Comment on lines +319 to +322
val1 := convertUnicodeStringToInt(maxValPadded)
maxValBaseN.Set(&val1)
val2 := convertUnicodeStringToInt(minValPadded)
minValBaseN.Set(&val2)
Collaborator

Why are we setting values on minValBaseN and maxValBaseN when val1 and val2 are already big.Int?

Collaborator Author

solved

Comment on lines +325 to +327
if expectedChunks <= 0 {
expectedChunks = 1
}
Collaborator

can use ternary operator here

Collaborator Author

solved

pkg/jdbc/jdbc.go Outdated
Comment on lines +561 to +563
if len(values) == 0 {
return "", nil
}
Collaborator

The values slice will contain at least one value; when will this get triggered?

Collaborator Author

solved

}

unionParts := make([]string, 0, len(values))
args := make([]any, 0, len(values)+2)
Collaborator

Why are you defining it as a slice of any when all the values inserted into it are strings?

Collaborator Author

Because QueryContext itself expects args to be of type any in its definition.

Comment on lines +348 to +352
query, args := jdbc.MySQLDistinctValuesWithCollationQuery(rangeSlice, tableCollationType)
rows, err := m.client.QueryContext(ctx, query, args...)
if err != nil {
return fmt.Errorf("failed to run distinct query: %s", err)
}
Collaborator

Add a note here that a few of the chunks generated this way can have 0 values.


minFloat, err1 := typeutils.ReformatFloat64(minVal)
if err1 != nil {
return false, 0, 0, 0, fmt.Errorf("failed to parse minVal: %s", err1)
Collaborator

Instead of throwing an error, log that the reformat failed and that the string strategy is being tried.

Collaborator Author

solved

}

// checks if the pk column is numeric and evenly distributed
func IsNumericAndEvenDistributed(minVal any, maxVal any, approxRowCount int64, chunkSize int64) (bool, int64, float64, float64, error) {
Collaborator

Will a step size of 0 mean that isEvenlyDistributed is false? In that case, can we eliminate the isEvenlyDistributed variable?

Collaborator Author

solved

@@ -59,12 +64,18 @@ func (m *MySQL) ChunkIterator(ctx context.Context, stream types.StreamInterface,
}

Collaborator

Add a TODO here to separate the chunking functions from this one; that will help in creating unit tests for these functions.

Collaborator Author

solved

)

tableStatsQuery := jdbc.MySQLTableStatsQuery()
err := m.client.QueryRowContext(ctx, tableStatsQuery, stream.Name()).Scan(&approxRowCount, &avgRowSize, &avgSchemaSize, &tableCollationType)
Collaborator

Instead of using the table collation here, can we use the column-specific collation?

Collaborator Author

solved

@saksham-datazip saksham-datazip force-pushed the feat/mysql-chunking-optimization branch from debd4eb to 1876fb5 Compare March 7, 2026 15:33
@saksham-datazip saksham-datazip force-pushed the feat/mysql-chunking-optimization branch from 0f4e0da to 86a2d91 Compare March 7, 2026 19:13
Comment on lines 314 to 326
case []uint8:
strVal := string(v)
intValue, err := strconv.ParseInt(strVal, 10, 64)
if err == nil {
return intValue, nil
}
uintValue, err := strconv.ParseUint(strVal, 10, 64)
if err != nil {
return int64(0), fmt.Errorf("failed to change []byte %v to int64: %v", v, err)
}
//nolint:gosec // G115: converting []uint8 to int64 is safe and required for backward compatibility
return int64(uintValue), nil
case bool:
Collaborator

please check how this might affect backward compatibility
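
One behaviour worth checking for this concern: in the quoted `[]uint8` branch, a value above `math.MaxInt64` that parses as `uint64` is converted with `int64(uintValue)`, which wraps to a negative number. The standalone sketch below reproduces that branch under the hypothetical name `bytesToInt64`; it is not the PR's function, only a way to observe the conversion in isolation.

```go
package main

import (
	"fmt"
	"strconv"
)

// bytesToInt64 is a hypothetical standalone copy of the []uint8 branch
// quoted in the review. It tries a signed parse first, then an unsigned one.
func bytesToInt64(raw []byte) (int64, error) {
	s := string(raw)
	if v, err := strconv.ParseInt(s, 10, 64); err == nil {
		return v, nil
	}
	u, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("failed to convert %q to int64: %w", s, err)
	}
	// Values above math.MaxInt64 wrap around to negative numbers here.
	return int64(u), nil
}

func main() {
	for _, in := range []string{"42", "-7", "18446744073709551615", "abc"} {
		v, err := bytesToInt64([]byte(in))
		fmt.Printf("input=%s value=%d err=%v\n", in, v, err)
	}
}
```

Whether the wrap-around matches what older versions returned for unsigned `uint64` values is the part that needs confirming against the previous behaviour.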

func IsNumericAndEvenDistributed(minVal any, maxVal any, approxRowCount int64, chunkSize int64) (int64, float64, float64) {
minFloat, err := typeutils.ReformatFloat64(minVal)
func IsNumericAndEvenDistributed(minVal any, maxVal any, approxRowCount int64, chunkSize int64) (int64, int64, int64) {
if exceedsInt64Limits(minVal) || exceedsInt64Limits(maxVal) {
Collaborator

Why check whether min exceeds the limits? Won't checking max alone do the job?

Collaborator Author

@saksham-datazip saksham-datazip Mar 9, 2026

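It helps in the string-related case, where minVal can be numerically greater than maxVal because the values are sorted as strings.
For example, "2" sorts after a number larger than INT_MAX, such as "10000000000000000000", when the two are compared as strings.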
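
A small sketch of the case described in this reply, assuming plain byte-wise string ordering (MySQL collations may differ in detail, but digits compare the same way in the common ones). `fitsInt64` is a hypothetical stand-in for the PR's `exceedsInt64Limits` check, and `stringMinMax` imitates what `MIN`/`MAX` return on a string column.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// fitsInt64 is a hypothetical stand-in for the inverse of the PR's
// exceedsInt64Limits: it reports whether s parses as an int64.
func fitsInt64(s string) bool {
	_, err := strconv.ParseInt(s, 10, 64)
	return err == nil
}

// stringMinMax returns the smallest and largest values under byte-wise
// string ordering, the way MIN/MAX behave on a string column.
func stringMinMax(values []string) (string, string) {
	if len(values) == 0 {
		return "", ""
	}
	sorted := append([]string(nil), values...)
	sort.Strings(sorted)
	return sorted[0], sorted[len(sorted)-1]
}

func main() {
	minVal, maxVal := stringMinMax([]string{"2", "10000000000000000000", "15"})
	// The string minimum is the numerically largest value, so checking
	// only maxVal would miss the int64 overflow.
	fmt.Println("min:", minVal, "fits int64:", fitsInt64(minVal))
	fmt.Println("max:", maxVal, "fits int64:", fitsInt64(maxVal))
}
```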

return 0, 0, 0
}

minInt64, err := typeutils.ReformatInt64(minVal)
Collaborator

rename these as minBoundary and maxBoundary

Collaborator Author

solved


chunkDiff := new(big.Int).Sub(&maxEncodedBigIntValue, &minEncodedBigIntValue)
chunkDiff.Add(chunkDiff, new(big.Int).Sub(big.NewInt(expectedChunks), big.NewInt(1)))
chunkDiff.Div(chunkDiff, big.NewInt(expectedChunks)) //ceil division set up
Collaborator

chunkDiff -> stringChunkStepSize
temporaryChunkDiff -> adjustedStepSize

Collaborator Author

solved

return exceedsInt64Limits(string(v))
case string:
if _, err := strconv.ParseInt(v, 10, 64); err == nil {
return false
Collaborator

When the PK is a string, we will still get this log message:

minVal or maxVal exceeds int64 limits

Collaborator Author

Solved by improving the log message to

Not a numeric column or maxVal exceeds int64 limits

from

maxVal exceeds int64 limits

Comment on lines +326 to +333
chunkDiff := new(big.Int).Sub(&maxEncodedBigIntValue, &minEncodedBigIntValue)
chunkDiff.Add(chunkDiff, new(big.Int).Sub(big.NewInt(expectedChunks), big.NewInt(1)))
chunkDiff.Div(chunkDiff, big.NewInt(expectedChunks)) //ceil division set up

rangeSlice := []string{}
// Try up to 5 times to generate balanced chunks by slightly adjusting the chunk size each iteration.
for retryAttempt := int64(0); retryAttempt < int64(5); retryAttempt++ {
temporaryChunkDiff := new(big.Int).Set(chunkDiff)
Collaborator

Tested this on a table with a string primary key and the Parquet size set to 2 KB; the chunk diff still comes out as a very large number.

Collaborator Author

Yes, this result is expected. Both the min and max values are padded to the maximum length of the string data type. Because of this padding, the effective range between them becomes large.

When we compute the chunk difference using:

ceil((max - min) / expectedChunks)

where expectedChunks represents the number of rows per chunk, the calculated value is mostly determined by the padded range of max - min.

As a result, this calculation is not significantly influenced by the Parquet file size, and observing a large chunkDiff value in this case is an expected outcome.
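
The effect of padding can be seen in a minimal sketch. It uses a simplified base-256 byte encoding with zero-byte right padding, which is an assumption for illustration and not the PR's Unicode encoding; `encodePadded` and `ceilStep` are hypothetical names. The ceiling division follows the same add-then-divide pattern as the quoted `chunkDiff` lines.

```go
package main

import (
	"fmt"
	"math/big"
)

// encodePadded maps s to an integer by right-padding it with zero bytes to
// width and reading the result as a big-endian number. This is a simplified
// stand-in for the PR's encoding; longer strings are truncated to width.
func encodePadded(s string, width int) *big.Int {
	buf := make([]byte, width)
	copy(buf, s)
	return new(big.Int).SetBytes(buf)
}

// ceilStep computes ceil((maxEnc - minEnc) / expectedChunks) using
// (diff + expectedChunks - 1) / expectedChunks on non-negative values.
func ceilStep(minEnc, maxEnc *big.Int, expectedChunks int64) *big.Int {
	if expectedChunks < 1 {
		expectedChunks = 1
	}
	step := new(big.Int).Sub(maxEnc, minEnc)
	step.Add(step, big.NewInt(expectedChunks-1))
	return step.Div(step, big.NewInt(expectedChunks))
}

func main() {
	// Same min and max strings, different padded widths.
	for _, width := range []int{1, 10} {
		step := ceilStep(encodePadded("a", width), encodePadded("b", width), 4)
		fmt.Printf("width=%d step=%s\n", width, step.String())
	}
}
```

With the same two strings, widening the padded length alone multiplies the step by 256 per extra byte in this encoding, which is why the value is large regardless of the configured file size.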

Collaborator Author

@saksham-datazip saksham-datazip left a comment

self review

@@ -28,9 +28,13 @@ package constants
//
// - Version 4: (Current Version) Unsigned int/integer/bigint map to Int64.
Collaborator Author

remove current version from it

Collaborator Author

change this todo

4 participants