Skip to content

Commit 46ecf6d

Browse files
KyleAMathewsclaude
andauthored
Fix columnMapper to support loading subsets (#3564)
When using columnMapper with ShapeStream, the columns parameter was not being encoded from app column names to database column names. This fix ensures that when a user specifies columns using app format (e.g., camelCase like `userId`), they are properly converted to the database format (e.g., snake_case like `user_id`) before being sent to the server. This makes columnMapper work correctly with column subsets. --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 7ffb32f commit 46ecf6d

File tree

6 files changed

+364
-7
lines changed

6 files changed

+364
-7
lines changed

packages/typescript-client/src/client.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ import {
99
SnapshotMetadata,
1010
} from './types'
1111
import { MessageParser, Parser, TransformFunction } from './parser'
12-
import { ColumnMapper, encodeWhereClause } from './column-mapper'
12+
import {
13+
ColumnMapper,
14+
encodeWhereClause,
15+
quoteIdentifier,
16+
} from './column-mapper'
1317
import { getOffset, isUpToDateMessage, isChangeMessage } from './helpers'
1418
import {
1519
FetchError,
@@ -855,8 +859,27 @@ export class ShapeStream<T extends Row<unknown> = Row>
855859
)
856860
setQueryParam(fetchUrl, WHERE_QUERY_PARAM, encodedWhere)
857861
}
858-
if (params.columns)
859-
setQueryParam(fetchUrl, COLUMNS_QUERY_PARAM, params.columns)
862+
if (params.columns) {
863+
// Get original columns array from options (before toInternalParams converted to string)
864+
const originalColumns = await resolveValue(this.options.params?.columns)
865+
if (Array.isArray(originalColumns)) {
866+
// Apply columnMapper encoding if present
867+
let encodedColumns = originalColumns.map(String)
868+
if (this.options.columnMapper) {
869+
encodedColumns = encodedColumns.map(
870+
this.options.columnMapper.encode
871+
)
872+
}
873+
// Quote each column name to handle special characters (commas, etc.)
874+
const serializedColumns = encodedColumns
875+
.map(quoteIdentifier)
876+
.join(`,`)
877+
setQueryParam(fetchUrl, COLUMNS_QUERY_PARAM, serializedColumns)
878+
} else {
879+
// Fallback: columns was already a string
880+
setQueryParam(fetchUrl, COLUMNS_QUERY_PARAM, params.columns)
881+
}
882+
}
860883
if (params.replica) setQueryParam(fetchUrl, REPLICA_PARAM, params.replica)
861884
if (params.params)
862885
setQueryParam(fetchUrl, WHERE_PARAMS_PARAM, params.params)

packages/typescript-client/src/column-mapper.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,31 @@ import { Schema } from './types'
33
type DbColumnName = string
44
type AppColumnName = string
55

6+
/**
7+
* Quote a PostgreSQL identifier for safe use in query parameters.
8+
*
9+
* Wraps the identifier in double quotes and escapes any internal
10+
* double quotes by doubling them. This ensures identifiers with
11+
* special characters (commas, spaces, etc.) are handled correctly.
12+
*
13+
* @param identifier - The identifier to quote
14+
* @returns The quoted identifier
15+
*
16+
* @example
17+
* ```typescript
18+
* quoteIdentifier('user_id') // '"user_id"'
19+
* quoteIdentifier('foo,bar') // '"foo,bar"'
20+
* quoteIdentifier('has"quote') // '"has""quote"'
21+
* ```
22+
*
23+
* @internal
24+
*/
25+
export function quoteIdentifier(identifier: string): string {
26+
// Escape internal double quotes by doubling them
27+
const escaped = identifier.replace(/"/g, `""`)
28+
return `"${escaped}"`
29+
}
30+
631
/**
732
* A bidirectional column mapper that handles transforming column **names**
833
* between database format (e.g., snake_case) and application format (e.g., camelCase).

packages/typescript-client/test/column-mapper.test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,34 @@ import {
55
createColumnMapper,
66
snakeCamelMapper,
77
encodeWhereClause,
8+
quoteIdentifier,
89
} from '../src/column-mapper'
910
import type { Schema } from '../src/types'
1011

12+
describe(`quoteIdentifier`, () => {
13+
it(`should wrap identifier in double quotes`, () => {
14+
expect(quoteIdentifier(`user_id`)).toBe(`"user_id"`)
15+
expect(quoteIdentifier(`simple`)).toBe(`"simple"`)
16+
})
17+
18+
it(`should escape internal double quotes by doubling`, () => {
19+
expect(quoteIdentifier(`has"quote`)).toBe(`"has""quote"`)
20+
expect(quoteIdentifier(`two"quotes"here`)).toBe(`"two""quotes""here"`)
21+
})
22+
23+
it(`should handle identifiers with commas`, () => {
24+
expect(quoteIdentifier(`foo,bar`)).toBe(`"foo,bar"`)
25+
})
26+
27+
it(`should handle identifiers with spaces`, () => {
28+
expect(quoteIdentifier(`has space`)).toBe(`"has space"`)
29+
})
30+
31+
it(`should handle empty string`, () => {
32+
expect(quoteIdentifier(``)).toBe(`""`)
33+
})
34+
})
35+
1136
describe(`snakeToCamel`, () => {
1237
it(`should convert snake_case to camelCase`, () => {
1338
expect(snakeToCamel(`user_id`)).toBe(`userId`)

packages/typescript-client/test/integration.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
IssueRow,
1414
testWithIssuesTable as it,
1515
testWithMultitypeTable as mit,
16+
testWithSpecialColumnsTable as scit,
1617
} from './support/test-context'
1718
import * as h from './support/test-helpers'
1819

@@ -817,6 +818,48 @@ describe(`HTTP Sync`, () => {
817818
}
818819
)
819820

821+
scit(
822+
`should handle columns with special characters (commas, quotes, spaces)`,
823+
async ({ dbClient, aborter, tableSql, tableUrl }) => {
824+
// Insert data with special column names
825+
await dbClient.query(
826+
`INSERT INTO ${tableSql} (id, "normal", "has,comma", "has""quote", "has space") VALUES ($1, $2, $3, $4, $5)`,
827+
[1, `normal_value`, `comma_value`, `quote_value`, `space_value`]
828+
)
829+
830+
// Get initial data, selecting only some columns including ones with special chars
831+
const shapeData = new Map()
832+
const stream = new ShapeStream({
833+
url: `${BASE_URL}/v1/shape`,
834+
params: {
835+
table: tableUrl,
836+
columns: [`id`, `normal`, `has,comma`, `has"quote`],
837+
},
838+
signal: aborter.signal,
839+
})
840+
841+
await h.forEachMessage(stream, aborter, async (res, msg, nth) => {
842+
if (!isChangeMessage(msg)) return
843+
shapeData.set(msg.key, msg.value)
844+
845+
if (nth === 0) {
846+
// Verify we got the correct columns, including those with special chars
847+
expect(msg.value).toStrictEqual({
848+
id: 1,
849+
normal: `normal_value`,
850+
'has,comma': `comma_value`,
851+
'has"quote': `quote_value`,
852+
})
853+
// Verify the column with space was NOT included
854+
expect(msg.value).not.toHaveProperty(`has space`)
855+
res()
856+
}
857+
})
858+
859+
expect([...shapeData.values()]).toHaveLength(1)
860+
}
861+
)
862+
820863
it(`should chunk a large log with reasonably sized chunks`, async ({
821864
insertIssues,
822865
waitForIssues,

0 commit comments

Comments
 (0)