Skip to content

Commit a8f68cf

Browse files
committed
start work on arrow flight sql client
1 parent aa4ba78 commit a8f68cf

File tree

18 files changed

+5890
-8
lines changed

18 files changed

+5890
-8
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@ node_modules/
1414
.DS_Store
1515

1616
# Scratchpad Files
17+
scratchpad/**/*.md
1718
scratchpad/**/*.ts
19+
!scratchpad/index.ts

eslint.config.js

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ export default defineConfig(
2121
plugins: {
2222
"simple-import-sort": simpleImportSort,
2323
"sort-destructure-keys": sortDestructureKeys,
24-
"unused-imports": unusedImports
2524
},
2625

2726
languageOptions: {
@@ -130,5 +129,27 @@ export default defineConfig(
130129
rules: {
131130
"no-console": "error"
132131
}
133-
}
132+
},
133+
{
134+
files: ["scratchpad/eslint/**/*"],
135+
plugins: {
136+
"unused-imports": unusedImports
137+
},
138+
rules: {
139+
"unused-imports/no-unused-imports": "error",
140+
"@effect/dprint": [
141+
"error",
142+
{
143+
config: {
144+
indentWidth: 2,
145+
lineWidth: 80,
146+
semiColons: "asi",
147+
quoteStyle: "alwaysDouble",
148+
trailingCommas: "never",
149+
operatorPosition: "maintain",
150+
"arrowFunction.useParentheses": "force"
151+
}
152+
}
153+
]
154+
}
134155
)

flake.nix

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
corepack
2222
nodejs_24
2323
python3
24+
25+
go
2426
];
2527
};
2628
});

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
"@effect/language-service": "^0.57.1",
1313
"@eslint/js": "^9.39.1",
1414
"@types/node": "^24.10.1",
15+
"effect": "^3.19.8",
1516
"eslint": "^9.39.1",
1617
"eslint-import-resolver-typescript": "^4.4.4",
1718
"eslint-plugin-import-x": "^4.16.1",
1819
"eslint-plugin-simple-import-sort": "^12.1.1",
1920
"eslint-plugin-sort-destructure-keys": "^2.0.0",
2021
"eslint-plugin-unused-imports": "^4.3.0",
2122
"ts-patch": "^3.3.0",
23+
"tsx": "^4.21.0",
2224
"typescript": "^5.9.3",
2325
"typescript-eslint": "^8.48.0"
2426
}

packages/amp/buf.gen.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
version: v2
2+
3+
inputs:
4+
- git_repo: https://github.com/apache/arrow
5+
subdir: format
6+
7+
plugins:
8+
- local: protoc-gen-es
9+
out: src/proto
10+
opt:
11+
- target=ts
12+
- import_extension=ts

packages/amp/package.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"url": "https://github.com/edgeandnode/amp-typescript",
1111
"directory": "packages/amp"
1212
},
13+
"sideEffects": [],
1314
"exports": {
1415
"./package.json": "./package.json",
1516
".": "./src/index.ts",
@@ -22,10 +23,30 @@
2223
"dist/**/*.d.ts",
2324
"dist/**/*.d.ts.map"
2425
],
26+
"publishConfig": {
27+
"provenance": true,
28+
"exports": {
29+
"./package.json": "./package.json",
30+
".": "./dist/index.js",
31+
"./*": "./dist/*.js"
32+
}
33+
},
2534
"peerDependencies": {
35+
"@bufbuild/protobuf": "^2.10.1",
36+
"@connectrpc/connect": "^2.1.1",
37+
"@connectrpc/connect-node": "^2.1.1",
2638
"effect": "^3.19.8"
2739
},
2840
"devDependencies": {
41+
"@bufbuild/buf": "^1.61.0",
42+
"@bufbuild/protobuf": "^2.10.1",
43+
"@bufbuild/protoc-gen-es": "^2.10.1",
44+
"@connectrpc/connect": "^2.1.1",
45+
"@connectrpc/connect-node": "^2.1.1",
2946
"effect": "^3.19.8"
47+
},
48+
"dependencies": {
49+
"@uwdata/flechette": "^2.2.6",
50+
"apache-arrow": "^21.1.0"
3051
}
3152
}

packages/amp/src/ArrowFlight.ts

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
import { RecordBatchReader, tableFromIPC } from 'apache-arrow'
2+
import { create, toBinary } from "@bufbuild/protobuf"
3+
import { anyPack, AnySchema } from "@bufbuild/protobuf/wkt"
4+
import { type Client, createClient, type Transport as ConnectTransport } from "@connectrpc/connect"
5+
import * as Context from "effect/Context"
6+
import * as Effect from "effect/Effect"
7+
import * as Layer from "effect/Layer"
8+
import * as Schema from "effect/Schema"
9+
import * as Stream from "effect/Stream"
10+
import {
11+
type FlightData,
12+
FlightDescriptor_DescriptorType,
13+
FlightDescriptorSchema,
14+
FlightService
15+
} from "./proto/Flight_pb.ts"
16+
import { CommandStatementQuerySchema } from "./proto/FlightSql_pb.ts"
17+
18+
/**
19+
* A service which abstracts the underlying transport for a given client.
20+
*
21+
* A transport implements a protocol, such as Connect or gRPC-web, and allows
22+
* for the concrete clients to be independent of the protocol.
23+
*/
24+
export class Transport extends Context.Tag("@edgeandnode/amp/Transport")<
25+
Transport,
26+
ConnectTransport
27+
>() {}
28+
29+
/**
30+
* Represents the possible errors that can occur when executing an Arrow Flight
31+
* query.
32+
*/
33+
export type ArrowFlightQueryError =
34+
| RpcError
35+
| NoEndpointsError
36+
| MultipleEndpointsError
37+
| TicketNotFoundError
38+
39+
/**
40+
* Represents an Arrow Flight RPC request that failed.
41+
*/
42+
export class RpcError extends Schema.TaggedError<RpcError>(
43+
"@edgeandnode/amp/RpcError"
44+
)("RpcError", {
45+
method: Schema.String,
46+
/**
47+
* The underlying reason for the failed RPC request.
48+
*/
49+
cause: Schema.Defect
50+
}) {}
51+
52+
/**
53+
* Represents an error that occurred as a result of a `FlightInfo` request
54+
* returning an empty list of endpoints from which data can be acquired.
55+
*/
56+
export class NoEndpointsError extends Schema.TaggedError<NoEndpointsError>(
57+
"@edgeandnode/amp/NoEndpointsError"
58+
)("NoEndpointsError", {
59+
/**
60+
* The SQL query that was requested.
61+
*/
62+
query: Schema.String
63+
}) {}
64+
65+
// TODO: determine if this is _really_ a logical error case
66+
/**
67+
* Represents an error that occurred as a result of a `FlightInfo` request
68+
* returning multiple endpoints from which data can be acquired.
69+
*
70+
* For Amp queries, there should only ever be **one** authoritative source
71+
* of data.
72+
*/
73+
export class MultipleEndpointsError extends Schema.TaggedError<MultipleEndpointsError>(
74+
"@edgeandnode/amp/MultipleEndpointsError"
75+
)("MultipleEndpointsError", {
76+
/**
77+
* The SQL query that was requested.
78+
*/
79+
query: Schema.String
80+
}) {}
81+
82+
/**
83+
* Represents an error that occurred as a result of a `FlightInfo` request
84+
* whose endpoint did not have a ticket.
85+
*/
86+
export class TicketNotFoundError extends Schema.TaggedError<TicketNotFoundError>(
87+
"@edgeandnode/amp/TicketNotFoundError"
88+
)("TicketNotFoundError", {
89+
/**
90+
* The SQL query that was requested.
91+
*/
92+
query: Schema.String
93+
}) {}
94+
95+
/**
96+
* A service which can be used to execute queries against an Arrow Flight API.
97+
*/
98+
export class ArrowFlight extends Context.Tag("@edgeandnode/amp/ArrowFlight")<ArrowFlight, {
99+
/**
100+
* The Connect `Client` that will be used to execute Arrow Flight queries.
101+
*/
102+
readonly client: Client<typeof FlightService>
103+
}>() {}
104+
105+
const make = Effect.gen(function*() {
  // The Connect client is bound to whichever `Transport` is in the context.
  const client = createClient(FlightService, yield* Transport)

  /**
   * Runs a SQL `query` through Arrow Flight: resolves the `FlightInfo` for the
   * query, requires exactly one endpoint carrying a ticket, then streams the
   * ticket's `FlightData` and prints each re-framed IPC message.
   */
  const request = Effect.fn("ArrowFlight.request")(function*(query: string) {
    // Flight SQL commands travel as a packed `Any` inside a CMD descriptor.
    const descriptor = create(FlightDescriptorSchema, {
      type: FlightDescriptor_DescriptorType.CMD,
      cmd: toBinary(
        AnySchema,
        anyPack(
          CommandStatementQuerySchema,
          create(CommandStatementQuerySchema, { query })
        )
      )
    })

    const { endpoint: endpoints } = yield* Effect.tryPromise({
      try: (signal) => client.getFlightInfo(descriptor, { signal }),
      catch: (cause) => new RpcError({ cause, method: "getFlightInfo" })
    })

    // Exactly one endpoint is expected; anything else is a typed failure.
    if (endpoints.length <= 0) {
      return yield* new NoEndpointsError({ query })
    }
    if (endpoints.length !== 1) {
      return yield* new MultipleEndpointsError({ query })
    }

    const ticket = endpoints[0]!.ticket
    if (ticket === undefined) {
      return yield* new TicketNotFoundError({ query })
    }

    // The abort controller lives in the stream's scope, so the `doGet` call is
    // aborted when the stream's scope closes.
    const scopedAbortController = Effect.acquireRelease(
      Effect.sync(() => new AbortController()),
      (abortController) => Effect.sync(() => abortController.abort())
    )

    const ipcMessages = scopedAbortController.pipe(
      Effect.map(({ signal }) =>
        Stream.fromAsyncIterable(
          client.doGet(ticket, { signal }),
          (cause) => new RpcError({ cause, method: "doGet" })
        )
      ),
      Stream.unwrapScoped,
      Stream.map(flightDataToIPC)
    )

    yield* Stream.runForEach(ipcMessages, (ipcMessage) =>
      Effect.sync(() => {
        console.dir(ipcMessage, { depth: null, colors: true })
      }))
  })

  // Test code
  // NOTE(review): this runs a query every time the service is constructed and
  // turns any failure into a defect — presumably temporary scaffolding; confirm
  // before release.
  yield* request("SELECT * FROM intTable").pipe(Effect.orDie)

  return { client } as const
})
161+
162+
/**
 * Layer that builds the `ArrowFlight` service from `make`.
 *
 * Requires a `Transport` implementation to be provided by the caller.
 */
export const layer: Layer.Layer<ArrowFlight, never, Transport> = make.pipe(
  Layer.effect(ArrowFlight)
)
167+
168+
/**
169+
* Converts a `FlightData` payload into Apache Arrow IPC format.
170+
*/
171+
const flightDataToIPC = (flightData: FlightData): Uint8Array => {
172+
// Arrow IPC Stream format requires:
173+
// 1. Continuation indicator (4 bytes): 0xFFFFFFFF
174+
// 2. Metadata length (4 bytes, little-endian)
175+
// 3. Metadata (padded to 8-byte boundary)
176+
// 4. Body data (already 8-byte aligned)
177+
178+
const continuationToken = new Uint8Array([0xFF, 0xFF, 0xFF, 0xFF])
179+
180+
// Get metadata length
181+
const metadataLength = flightData.dataHeader.length
182+
const metadataLengthBytes = new Uint8Array(4)
183+
new DataView(metadataLengthBytes.buffer).setUint32(0, metadataLength, true) // little-endian
184+
185+
// Calculate padding needed to align to 8 bytes
186+
const paddingSize = (8 - ((8 + metadataLength) % 8)) % 8
187+
const padding = new Uint8Array(paddingSize)
188+
189+
// Combine all parts
190+
const totalLength = continuationToken.length + // 4 bytes
191+
metadataLengthBytes.length + // 4 bytes
192+
metadataLength + // variable
193+
paddingSize + // 0-7 bytes
194+
flightData.dataBody.length // variable
195+
196+
const ipcMessage = new Uint8Array(totalLength)
197+
let offset = 0
198+
199+
// Write continuation token
200+
ipcMessage.set(continuationToken, offset)
201+
offset += continuationToken.length
202+
203+
// Write metadata length
204+
ipcMessage.set(metadataLengthBytes, offset)
205+
offset += metadataLengthBytes.length
206+
207+
// Write metadata
208+
ipcMessage.set(flightData.dataHeader, offset)
209+
offset += metadataLength
210+
211+
// Write padding
212+
ipcMessage.set(padding, offset)
213+
offset += paddingSize
214+
215+
// Write body
216+
ipcMessage.set(flightData.dataBody, offset)
217+
218+
return ipcMessage
219+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { createGrpcTransport, type GrpcTransportOptions } from "@connectrpc/connect-node"
2+
import * as Layer from "effect/Layer"
3+
import { Transport } from "../ArrowFlight.ts"
4+
5+
/**
 * Builds a layer that provides a gRPC `Transport`.
 *
 * The transport is created with `createGrpcTransport` from
 * `@connectrpc/connect-node` (Node.js `http2`-based) when the layer is built.
 *
 * @param options - Options forwarded unchanged to `createGrpcTransport`.
 */
export const layerTransportGrpc = (options: GrpcTransportOptions): Layer.Layer<Transport> => {
  const makeTransport = () => createGrpcTransport(options)
  return Layer.sync(Transport, makeTransport)
}

packages/amp/src/index.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
import * as Effect from "effect/Effect"
2-
3-
export const program = Effect.void
1+
/**
2+
* An implementation of the Arrow Flight protocol.
3+
*/
4+
export * as ArrowFlight from "./ArrowFlight.ts"

packages/amp/src/proto/FlightSql_pb.ts

Lines changed: 3632 additions & 0 deletions
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)