Skip to content

Commit 2369834

Browse files
authored
Merge branch 'staging' into feat/postgres_ssl_ca
2 parents 69e5825 + fa32a77 commit 2369834

File tree

3 files changed

+67
-4
lines changed

3 files changed

+67
-4
lines changed

drivers/oracle/internal/incremental.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package driver
33
import (
44
"context"
55
"fmt"
6+
"strings"
7+
"time"
68

79
"github.com/datazip-inc/olake/constants"
810
"github.com/datazip-inc/olake/drivers/abstract"
911
"github.com/datazip-inc/olake/pkg/jdbc"
1012
"github.com/datazip-inc/olake/types"
13+
"github.com/datazip-inc/olake/utils/logger"
1114
)
1215

1316
// StreamIncrementalChanges implements incremental sync for Oracle
@@ -43,9 +46,41 @@ func (o *Oracle) StreamIncrementalChanges(ctx context.Context, stream types.Stre
4346
}
4447

4548
func (o *Oracle) FetchMaxCursorValues(ctx context.Context, stream types.StreamInterface) (any, any, error) {
46-
maxPrimaryCursorValue, maxSecondaryCursorValue, err := jdbc.GetMaxCursorValues(ctx, o.client, constants.Oracle, stream)
49+
maxPrimary, maxSecondary, err := jdbc.GetMaxCursorValues(ctx, o.client, constants.Oracle, stream)
4750
if err != nil {
4851
return nil, nil, err
4952
}
50-
return maxPrimaryCursorValue, maxSecondaryCursorValue, nil
53+
54+
primaryCursor, secondaryCursor := stream.Cursor()
55+
maxPrimary = o.normalizeCursorValue(ctx, stream, primaryCursor, maxPrimary)
56+
if secondaryCursor != "" {
57+
maxSecondary = o.normalizeCursorValue(ctx, stream, secondaryCursor, maxSecondary)
58+
}
59+
return maxPrimary, maxSecondary, nil
60+
}
61+
62+
// normalizeCursorTime converts a cursor time.Time from GetMaxCursorValues to plain UTC,
63+
// applying wall-clock strip for TZ-naive Oracle columns and .UTC() for TZ-aware ones.
64+
func (o *Oracle) normalizeCursorValue(ctx context.Context, stream types.StreamInterface, cursorField string, value any) any {
65+
t, ok := value.(time.Time)
66+
if !ok {
67+
return value
68+
}
69+
var dataType string
70+
query := jdbc.OracleColumnDataTypeQuery(stream.Namespace(), stream.Name(), strings.ToUpper(cursorField))
71+
if err := o.client.QueryRowContext(ctx, query).Scan(&dataType); err != nil {
72+
logger.Warnf("normalizeCursorValue: failed to get DATA_TYPE for %s.%s.%s, cursor may be incorrect: %s",
73+
stream.Namespace(), stream.Name(), cursorField, err)
74+
return value
75+
}
76+
// FetchMaxCursorValues bypasses dataTypeConverter, so go-ora attaches the session timezone
77+
// to TZ-naive columns and the original timezone to TZ-aware columns. Normalize here so
78+
// the cursor values entering FormatCursorValue are already plain UTC.
79+
upper := strings.ToUpper(dataType)
80+
if strings.Contains(upper, "WITH TIME ZONE") || strings.Contains(upper, "WITH LOCAL TIME ZONE") {
81+
// TZ-aware: convert to the correct UTC instant.
82+
return t.UTC()
83+
}
84+
// TZ-naive (TIMESTAMP, DATE): strip session-TZ offset, preserve wall-clock as UTC.
85+
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
5186
}

drivers/oracle/internal/oracle.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,33 @@ func (o *Oracle) ProduceSchema(ctx context.Context, streamName string) (*types.S
211211
return stream, pkRows.Err()
212212
}
213213

214+
// tzNaiveOracleTypes are Oracle column types that store only wall-clock time with no timezone.
215+
// go-ora attaches dbServerTimeZone when decoding all three via the same code path:
216+
//
217+
// DATE (12) → "DATE" — Oracle DATE columns
218+
// TimeStampDTY (180) → "TimeStampDTY" — TIMESTAMP columns in regular SELECT results
219+
// TODO: Add support for TIMESTAMP (187) → "TIMESTAMP" — TIMESTAMP in RETURNING INTO / PL/SQL OUT params
220+
//
221+
// Strip the attached offset to preserve wall-clock digits consistently as UTC.
222+
var tzNaiveOracleTypes = map[string]bool{
223+
"date": true,
224+
"timestampdty": true,
225+
}
226+
214227
func (o *Oracle) dataTypeConverter(value interface{}, columnType string) (interface{}, error) {
215228
if value == nil {
216229
return nil, typeutils.ErrNullValue
217230
}
218231
olakeType := typeutils.ExtractAndMapColumnType(columnType, oracleTypeToDataTypes)
219-
return typeutils.ReformatValue(olakeType, value)
232+
result, err := typeutils.ReformatValue(olakeType, value)
233+
if err != nil {
234+
return result, err
235+
}
236+
// Strip the session-timezone offset that go-ora attaches to timezone-naive columns.
237+
if tzNaiveOracleTypes[strings.ToLower(columnType)] {
238+
if t, ok := result.(time.Time); ok {
239+
result = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
240+
}
241+
}
242+
return result, nil
220243
}

pkg/jdbc/jdbc.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -941,6 +941,11 @@ func OracleTableDetailsQuery(schemaName, tableName string) string {
941941
return fmt.Sprintf("SELECT column_name, data_type, nullable, data_precision, data_scale FROM all_tab_columns WHERE owner = '%s' AND table_name = '%s'", schemaName, tableName)
942942
}
943943

944+
// OracleColumnDataTypeQuery returns the query to fetch the data type of a column in OracleDB
945+
func OracleColumnDataTypeQuery(schemaName, tableName, columnName string) string {
946+
return fmt.Sprintf("SELECT DATA_TYPE FROM ALL_TAB_COLUMNS WHERE OWNER = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'", schemaName, tableName, columnName)
947+
}
948+
944949
// OraclePrimaryKeyQuery returns the query to fetch all the primary key columns of a table in OracleDB
945950
func OraclePrimaryKeyColummsQuery(schemaName, tableName string) string {
946951
return fmt.Sprintf(`SELECT cols.column_name FROM all_constraints cons, all_cons_columns cols WHERE cons.constraint_type = 'P' AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner AND cons.owner = '%s' AND cols.table_name = '%s'`, schemaName, tableName)
@@ -1076,7 +1081,7 @@ func IncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceho
10761081
var dbDatatype string
10771082
switch opts.Driver {
10781083
case constants.Oracle:
1079-
query := fmt.Sprintf("SELECT DATA_TYPE FROM ALL_TAB_COLUMNS WHERE OWNER = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'", stream.Namespace(), stream.Name(), cursorField)
1084+
query := OracleColumnDataTypeQuery(stream.Namespace(), stream.Name(), cursorField)
10801085
err = opts.Client.QueryRowContext(ctx, query).Scan(&dbDatatype)
10811086
if err != nil {
10821087
return "", nil, fmt.Errorf("failed to get column datatype: %s", err)

0 commit comments

Comments
 (0)