Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
83ebf36
feat: mysql chunking optimization
saksham-datazip Jan 27, 2026
f5766f8
mysql optimization comment resolve
saksham-datazip Jan 27, 2026
443cf94
Merge branch 'staging' into feat/mysql-chunking-optimization
saksham-datazip Jan 27, 2026
6fc574c
Merge branch 'staging' into feat/mysql-chunking-optimization
saksham-datazip Feb 2, 2026
c09aee8
chore: formatting fix
saksham-datazip Feb 3, 2026
53520de
my-sql-chunking-formatting-resolved
saksham-datazip Feb 3, 2026
3b9fbe7
mysql-chunking-self-reviewed
saksham-datazip Feb 3, 2026
8e4ba6a
mysql-chunking-optimization-for-string-pk
saksham-datazip Feb 7, 2026
1707ae1
Merge branch 'staging' into feat/mysql-chunking-optimization
saksham-datazip Feb 7, 2026
feca5a0
Merge branch 'staging' into feat/mysql-chunking-optimization
vaibhav-datazip Feb 9, 2026
ccfb371
feat: solved lint issue
saksham-datazip Feb 9, 2026
fe4b4b2
Merge branch 'staging' into feat/mysql-chunking-optimization
saksham-datazip Feb 10, 2026
910246a
feat: mysql chunking optimization review resolved
saksham-datazip Feb 10, 2026
1eacf5a
feat: resolving-lint-extra-spaces
saksham-datazip Feb 10, 2026
964a2ee
feat: lint error resolved
saksham-datazip Feb 10, 2026
11a9f03
feat: self-reviewed
saksham-datazip Feb 10, 2026
348c21a
Merge branch 'staging' into feat/mysql-chunking-optimization
vaibhav-datazip Feb 12, 2026
94a6fd8
feat: recommiting the logical issue in mysql chunking calculation
saksham-datazip Feb 16, 2026
aa03463
Merge branch 'staging' into feat/mysql-chunking-optimization
ImDoubD-datazip Feb 20, 2026
bc1abf8
Merge branch 'staging' into feat/mysql-chunking-optimization
saksham-datazip Feb 23, 2026
6e5e82f
feat: added splitviaprimarykey function
saksham-datazip Feb 24, 2026
d9189b9
chore: Merge branch 'feat/mysql-chunking-optimization' of https://git…
saksham-datazip Feb 24, 2026
69a1714
chore: improved formatting and resolved calculating validationcount q…
saksham-datazip Feb 28, 2026
96b5689
chore: merge conflict resolved
saksham-datazip Mar 1, 2026
66749dd
chore: constant size readjusted
saksham-datazip Mar 1, 2026
fa24a2c
chore: saperated buildChunkConditionMySQL function from mssql
saksham-datazip Mar 2, 2026
8411568
chore: fixed buildChunkConditionMySQL function for multiple colummns
saksham-datazip Mar 3, 2026
64f31c1
chore: resolved comment for final-testing
saksham-datazip Mar 4, 2026
debd4eb
chore: resolved lint error
saksham-datazip Mar 4, 2026
86a2d91
fix: changes pulled from staging
saksham-datazip Mar 7, 2026
8ead67e
chore: float and uint8 issue resolved
saksham-datazip Mar 9, 2026
0caf2aa
chore: converted float64 to int64
saksham-datazip Mar 9, 2026
8ccfdd6
chore: added uint8[] block and took datatype for numeric value from i…
saksham-datazip Mar 11, 2026
7754d72
chore: self reviewed
saksham-datazip Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
EffectiveParquetSize = int64(256) * 1024 * 1024 * int64(8)
DB2StateTimestampFormat = "2006-01-02 15:04:05.000000"
DefaultStateTimestampFormat = "2006-01-02T15:04:05.000000000Z"
// DistributionLower is the lower bound for distribution factor
DistributionLower = 0.05
// DistributionUpper is the upper bound for distribution factor
DistributionUpper = 100.0
Comment on lines +38 to +39
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how did we choose these distribution factors ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is an assumption and airbyte used it as well

)

type DriverType string
Expand Down
88 changes: 74 additions & 14 deletions drivers/mysql/internal/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ func (m *MySQL) ChunkIterator(ctx context.Context, stream types.StreamInterface,
sort.Strings(pkColumns)

logger.Debugf("Starting backfill from %v to %v with filter: %s, args: %v", chunk.Min, chunk.Max, filter, args)
// Get chunks from state or calculate new ones
stmt := ""
// Get chunks from state or calculate new ones
var stmt string
if chunkColumn != "" {
stmt = jdbc.MysqlChunkScanQuery(stream, []string{chunkColumn}, chunk, filter)
} else if len(pkColumns) > 0 {
stmt = jdbc.MysqlChunkScanQuery(stream, pkColumns, chunk, filter)
} else {
stmt = jdbc.MysqlLimitOffsetScanQuery(stream, chunk, filter)
}

logger.Debugf("Executing chunk query: %s", stmt)
setter := jdbc.NewReader(ctx, stmt, func(ctx context.Context, query string, queryArgs ...any) (*sql.Rows, error) {
return tx.QueryContext(ctx, query, args...)
Expand Down Expand Up @@ -93,21 +94,39 @@ func (m *MySQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo
chunkSize := int64(math.Ceil(float64(constants.EffectiveParquetSize) / avgRowSizeFloat))
chunks := types.NewSet[types.Chunk]()
chunkColumn := stream.Self().StreamMetadata.ChunkColumn

var (
isEvenDistribution bool
step int64
minVal any // defines the lower bound of the chunk range
maxVal any // defines the upper bound of the chunk range
minFloat float64
maxFloat float64
)

pkColumns := stream.GetStream().SourceDefinedPrimaryKey.Array()
if chunkColumn != "" {
pkColumns = []string{chunkColumn}
}
sort.Strings(pkColumns)

if stream.GetStream().SourceDefinedPrimaryKey.Len() > 0 || chunkColumn != "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can directly check the pkcolumns array here , it has all the info

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

err = jdbc.WithIsolation(ctx, m.client, true, func(tx *sql.Tx) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the need of isolation here ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

var err error
minVal, maxVal, err = m.getTableExtremes(ctx, stream, pkColumns, tx)
return err
})
if err != nil {
return nil, fmt.Errorf("failed to get table extremes: %s", err)
}
}
if len(pkColumns) == 1 {
isEvenDistribution, step, minFloat, maxFloat = shouldUseEvenDistribution(minVal, maxVal, approxRowCount, chunkSize)
}
// Takes the user defined batch size as chunkSize
// TODO: common-out the chunking logic for db2, mssql, mysql
splitViaPrimaryKey := func(stream types.StreamInterface, chunks *types.Set[types.Chunk]) error {
Comment on lines +155 to 159
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leave a line

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

return jdbc.WithIsolation(ctx, m.client, true, func(tx *sql.Tx) error {
// Get primary key column using the provided function
pkColumns := stream.GetStream().SourceDefinedPrimaryKey.Array()
if chunkColumn != "" {
pkColumns = []string{chunkColumn}
}
sort.Strings(pkColumns)
// Get table extremes
minVal, maxVal, err := m.getTableExtremes(ctx, stream, pkColumns, tx)
if err != nil {
return fmt.Errorf("failed to get table extremes: %s", err)
}
if minVal == nil {
return nil
}
Expand Down Expand Up @@ -180,7 +199,31 @@ func (m *MySQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo
})
}

if stream.GetStream().SourceDefinedPrimaryKey.Len() > 0 || chunkColumn != "" {
// Use arithmetic splitting when the key distribution factor falls within the accepted range.
splitEvenlyForInt := func(minf, maxf float64, chunks *types.Set[types.Chunk], step float64) {
if minf+step > maxf {
chunks.Insert(types.Chunk{
Min: nil,
Max: nil,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does nil to nil chunk mean ? , at other places as well we have created chunk from nil to start and start to nil if there is no further value . similar can be done and if there is no value in table we don't need to proceed till this function

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

})
return
}
prev := minf
for next := minf + step; next <= maxf; next += step {
chunks.Insert(types.Chunk{
Min: utils.ConvertToString(prev),
Max: utils.ConvertToString(next),
})
prev = next
}
chunks.Insert(types.Chunk{
Min: utils.ConvertToString(prev),
Max: nil,
})
}
if len(pkColumns) == 1 && isEvenDistribution {
splitEvenlyForInt(minFloat, maxFloat, chunks, float64(step))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a space here between the above code block and start of this if block. also rename this variable as minFloat and maxFloat isn't current name for a pk which is just integer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be better if you use int as step so that there are well defined boundaries

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this float is because of float pk, their we will need it

} else if len(pkColumns) > 0 {
err = splitViaPrimaryKey(stream, chunks)
} else {
err = limitOffsetChunking(chunks)
Expand All @@ -193,3 +236,20 @@ func (m *MySQL) getTableExtremes(ctx context.Context, stream types.StreamInterfa
err = tx.QueryRowContext(ctx, query).Scan(&min, &max)
return min, max, err
}

func shouldUseEvenDistribution(minVal any, maxVal any, approxRowCount int64, chunkSize int64) (bool, int64, float64, float64) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment for what this function does and what it returns

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

if approxRowCount == 0 {
return false, 0, 0, 0
}
minFloat, err1 := typeutils.ReformatFloat64(minVal)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why err is defined every time why not use one error

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

maxFloat, err2 := typeutils.ReformatFloat64(maxVal)
if err1 != nil || err2 != nil {
return false, 0, 0, 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again why are we not sending the error back ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

}
distributionFactor := (maxFloat - minFloat + 1) / float64(approxRowCount)
if distributionFactor < constants.DistributionLower || distributionFactor > constants.DistributionUpper {
return false, 0, 0, 0
}
step := int64(math.Max(distributionFactor*float64(chunkSize), 1))
return true, step, minFloat, maxFloat
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the need of returning minfloat and max float from here ? isn't It the same value which you had sent to this function

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

}
3 changes: 3 additions & 0 deletions pkg/jdbc/jdbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,9 @@ func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk,
func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string {
condition := buildChunkConditionMySQL(filterColumns, chunk, extraFilter)
quotedTable := QuoteTable(stream.Namespace(), stream.Name(), constants.MySQL)
if condition == "" {
condition = utils.Ternary(extraFilter != "", extraFilter, "1 = 1").(string)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will create not so good query, instead you could have added the WHERE also here and appended it in last string

also can you give example of case where the condition string will be empty. because AFAIK condition creates query based on chunk , and always there will be a chunk like

from start to null

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is used in other drivers as well so i continued using it for sake of consistency

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved

return fmt.Sprintf("SELECT * FROM %s WHERE %s", quotedTable, condition)
}

Expand Down
Loading