Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ node_modules/
.DS_Store

# Scratchpad Files
scratchpad/**/*.md
scratchpad/**/*.ts
!scratchpad/index.ts
27 changes: 24 additions & 3 deletions eslint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ export default defineConfig(
{
plugins: {
"simple-import-sort": simpleImportSort,
"sort-destructure-keys": sortDestructureKeys,
"unused-imports": unusedImports
"sort-destructure-keys": sortDestructureKeys
},

languageOptions: {
Expand Down Expand Up @@ -71,7 +70,6 @@ export default defineConfig(
"import-x/order": "off",
"simple-import-sort/imports": "off",
"sort-destructure-keys/sort-destructure-keys": "error",
"unused-imports/no-unused-imports": "error",
"deprecation/deprecation": "off",

"@typescript-eslint/array-type": [
Expand Down Expand Up @@ -130,5 +128,28 @@ export default defineConfig(
rules: {
"no-console": "error"
}
},
{
files: ["scratchpad/eslint/**/*"],
plugins: {
"unused-imports": unusedImports
},
rules: {
"unused-imports/no-unused-imports": "error",
"@effect/dprint": [
"error",
{
config: {
indentWidth: 2,
lineWidth: 80,
semiColons: "asi",
quoteStyle: "alwaysDouble",
trailingCommas: "never",
operatorPosition: "maintain",
"arrowFunction.useParentheses": "force"
}
}
]
}
}
)
2 changes: 2 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
corepack
nodejs_24
python3

go
];
};
});
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
"@effect/language-service": "^0.57.1",
"@eslint/js": "^9.39.1",
"@types/node": "^24.10.1",
"effect": "^3.19.8",
"eslint": "^9.39.1",
"eslint-import-resolver-typescript": "^4.4.4",
"eslint-plugin-import-x": "^4.16.1",
"eslint-plugin-simple-import-sort": "^12.1.1",
"eslint-plugin-sort-destructure-keys": "^2.0.0",
"eslint-plugin-unused-imports": "^4.3.0",
"ts-patch": "^3.3.0",
"tsx": "^4.21.0",
"typescript": "^5.9.3",
"typescript-eslint": "^8.48.0"
}
Expand Down
12 changes: 12 additions & 0 deletions packages/amp/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: v2

inputs:
- git_repo: https://github.com/apache/arrow
subdir: format

plugins:
- local: protoc-gen-es
out: src/proto
opt:
- target=ts
- import_extension=ts
22 changes: 22 additions & 0 deletions packages/amp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"url": "https://github.com/edgeandnode/amp-typescript",
"directory": "packages/amp"
},
"sideEffects": [],
"exports": {
"./package.json": "./package.json",
".": "./src/index.ts",
Expand All @@ -22,10 +23,31 @@
"dist/**/*.d.ts",
"dist/**/*.d.ts.map"
],
"publishConfig": {
"provenance": true,
"exports": {
"./package.json": "./package.json",
".": "./dist/index.js",
"./*": "./dist/*.js"
}
},
"peerDependencies": {
"@bufbuild/protobuf": "^2.10.1",
"@connectrpc/connect": "^2.1.1",
"@connectrpc/connect-node": "^2.1.1",
"effect": "^3.19.8"
},
"devDependencies": {
"@bufbuild/buf": "^1.61.0",
"@bufbuild/protobuf": "^2.10.1",
"@bufbuild/protoc-gen-es": "^2.10.1",
"@connectrpc/connect": "^2.1.1",
"@connectrpc/connect-node": "^2.1.1",
"effect": "^3.19.8"
},
"dependencies": {
"@uwdata/flechette": "^2.2.6",
"apache-arrow": "^21.1.0",
"@edgeandnode/arrow-flight-ipc": "workspace:*"
}
}
190 changes: 190 additions & 0 deletions packages/amp/src/ArrowFlight.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import { create, toBinary } from "@bufbuild/protobuf"
import { anyPack, AnySchema } from "@bufbuild/protobuf/wkt"
import { type Client, createClient, type Transport as ConnectTransport } from "@connectrpc/connect"
import { getMessageType, MessageHeaderType } from "@edgeandnode/arrow-flight-ipc/core/types"
import { decodeRecordBatch } from "@edgeandnode/arrow-flight-ipc/record-batch/decoder"
import { recordBatchToJson } from "@edgeandnode/arrow-flight-ipc/record-batch/json"
import { parseRecordBatch } from "@edgeandnode/arrow-flight-ipc/record-batch/parser"
import { parseSchema } from "@edgeandnode/arrow-flight-ipc/schema/parser"
import type { ArrowSchema } from "@edgeandnode/arrow-flight-ipc/schema/types"
import * as Console from "effect/Console"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Schema from "effect/Schema"
import * as Stream from "effect/Stream"
import { FlightDescriptor_DescriptorType, FlightDescriptorSchema, FlightService } from "./proto/Flight_pb.ts"
import { CommandStatementQuerySchema } from "./proto/FlightSql_pb.ts"

/**
* A service which abstracts the underlying transport for a given client.
*
* A transport implements a protocol, such as Connect or gRPC-web, and allows
* for the concrete clients to be independent of the protocol.
*/
export class Transport extends Context.Tag("@edgeandnode/amp/Transport")<
Transport,
ConnectTransport
>() {}

/**
* Represents the possible errors that can occur when executing an Arrow Flight
* query.
*/
export type ArrowFlightQueryError =
| RpcError
| NoEndpointsError
| MultipleEndpointsError
| TicketNotFoundError

/**
* Represents an Arrow Flight RPC request that failed.
*/
export class RpcError extends Schema.TaggedError<RpcError>(
"@edgeandnode/amp/RpcError"
)("RpcError", {
method: Schema.String,
/**
* The underlying reason for the failed RPC request.
*/
cause: Schema.Defect
}) {}

/**
* Represents an error that occurred as a result of a `FlightInfo` request
* returning an empty list of endpoints from which data can be acquired.
*/
export class NoEndpointsError extends Schema.TaggedError<NoEndpointsError>(
"@edgeandnode/amp/NoEndpointsError"
)("NoEndpointsError", {
/**
* The SQL query that was requested.
*/
query: Schema.String
}) {}

// TODO: determine if this is _really_ a logical error case
/**
* Represents an error that occured as a result of a `FlightInfo` request
* returning multiple endpoints from which data can be acquired.
*
* For Amp queries, there should only ever be **one** authoritative source
* of data.
*/
export class MultipleEndpointsError extends Schema.TaggedError<MultipleEndpointsError>(
"@edgeandnode/amp/MultipleEndpointsError"
)("MultipleEndpointsError", {
/**
* The SQL query that was requested.
*/
query: Schema.String
}) {}

/**
* Represents an error that occurred as a result of a `FlightInfo` request
* whose endpoint did not have a ticket.
*/
export class TicketNotFoundError extends Schema.TaggedError<TicketNotFoundError>(
"@edgeandnode/amp/TicketNotFoundError"
)("TicketNotFoundError", {
/**
* The SQL query that was requested.
*/
query: Schema.String
}) {}

/**
* A service which can be used to execute queries against an Arrow Flight API.
*/
export class ArrowFlight extends Context.Tag("@edgeandnode/amp/ArrowFlight")<ArrowFlight, {
/**
* The Connect `Client` that will be used to execute Arrow Flight queries.
*/
readonly client: Client<typeof FlightService>

readonly query: (query: string) => Effect.Effect<any>
}>() {}

const make = Effect.gen(function*() {
const transport = yield* Transport
const client = createClient(FlightService, transport)

/**
* Execute a SQL query and return a stream of rows.
*/
const request = Effect.fn("ArrowFlight.request")(function*(query: string) {
const cmd = create(CommandStatementQuerySchema, { query })
const any = anyPack(CommandStatementQuerySchema, cmd)
const desc = create(FlightDescriptorSchema, {
type: FlightDescriptor_DescriptorType.CMD,
cmd: toBinary(AnySchema, any)
})

const flightInfo = yield* Effect.tryPromise({
try: (signal) => client.getFlightInfo(desc, { signal }),
catch: (cause) => new RpcError({ cause, method: "getFlightInfo" })
})

if (flightInfo.endpoint.length !== 1) {
return yield* flightInfo.endpoint.length <= 0
? new NoEndpointsError({ query })
: new MultipleEndpointsError({ query })
}

const { ticket } = flightInfo.endpoint[0]!

if (ticket === undefined) {
return yield* new TicketNotFoundError({ query })
}

const flightDataStream = Stream.unwrapScoped(Effect.gen(function*() {
const controller = yield* Effect.acquireRelease(
Effect.sync(() => new AbortController()),
(controller) => Effect.sync(() => controller.abort())
)
return Stream.fromAsyncIterable(
client.doGet(ticket, { signal: controller.signal }),
(cause) => new RpcError({ cause, method: "doGet" })
)
}))

let schema: ArrowSchema | undefined

// Convert FlightData stream to a stream of rows
return yield* flightDataStream.pipe(
Stream.runForEach(Effect.fnUntraced(function*(flightData) {
const messageType = yield* getMessageType(flightData)

switch (messageType) {
case MessageHeaderType.SCHEMA: {
schema = yield* parseSchema(flightData)
break
}
case MessageHeaderType.RECORD_BATCH: {
const recordBatch = yield* parseRecordBatch(flightData)
const decodedRecordBatch = decodeRecordBatch(recordBatch, flightData.dataBody, schema!)
const json = recordBatchToJson(decodedRecordBatch)
yield* Console.dir(json, { depth: null, colors: true })
break
}
}

return yield* Effect.void
}))
)
})

return {
client,
/**
* Execute a SQL query and return a stream of rows.
*/
query: request
} as const
})

/**
* A layer which constructs a concrete implementation of an `ArrowFlight`
* service and depends upon some implementation of a `Transport`.
*/
export const layer: Layer.Layer<ArrowFlight, ArrowFlightQueryError, Transport> = Layer.effect(ArrowFlight, make)
9 changes: 9 additions & 0 deletions packages/amp/src/ArrowFlight/Node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { createGrpcTransport, type GrpcTransportOptions } from "@connectrpc/connect-node"
import * as Layer from "effect/Layer"
import { Transport } from "../ArrowFlight.ts"

/**
* Create a `Transport` for the gRPC protocol using the Node.js `http2` module.
*/
export const layerTransportGrpc = (options: GrpcTransportOptions): Layer.Layer<Transport> =>
Layer.sync(Transport, () => createGrpcTransport(options))
7 changes: 4 additions & 3 deletions packages/amp/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as Effect from "effect/Effect"

export const program = Effect.void
/**
* An implementation of the Arrow Flight protocol.
*/
export * as ArrowFlight from "./ArrowFlight.ts"
3,632 changes: 3,632 additions & 0 deletions packages/amp/src/proto/FlightSql_pb.ts

Large diffs are not rendered by default.

Loading