Skip to content

Commit 4eac303

Browse files
committed
chore: fixup
1 parent ad6ba65 commit 4eac303

File tree

12 files changed

+331
-260
lines changed

12 files changed

+331
-260
lines changed

packages/app-kit-ui/src/react/hooks/__tests__/use-chart-data.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ describe("useChartData", () => {
205205
);
206206
});
207207

208-
test("auto-selects ARROW by default when no heuristics match", () => {
208+
test("auto-selects JSON by default when no heuristics match", () => {
209209
mockUseAnalyticsQuery.mockReturnValue({
210210
data: [],
211211
loading: false,
@@ -223,11 +223,11 @@ describe("useChartData", () => {
223223
expect(mockUseAnalyticsQuery).toHaveBeenCalledWith(
224224
"test",
225225
{ limit: 100 },
226-
expect.objectContaining({ format: "ARROW" }),
226+
expect.objectContaining({ format: "JSON" }),
227227
);
228228
});
229229

230-
test("defaults to auto format (ARROW) when format is not specified", () => {
230+
test("defaults to auto format (JSON) when format is not specified", () => {
231231
mockUseAnalyticsQuery.mockReturnValue({
232232
data: [],
233233
loading: false,
@@ -243,7 +243,7 @@ describe("useChartData", () => {
243243
expect(mockUseAnalyticsQuery).toHaveBeenCalledWith(
244244
"test",
245245
undefined,
246-
expect.objectContaining({ format: "ARROW" }),
246+
expect.objectContaining({ format: "JSON" }),
247247
);
248248
});
249249
});

packages/app-kit-ui/src/react/hooks/use-analytics-query.ts

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,24 @@ export function useAnalyticsQuery<
126126

127127
// success - Arrow format
128128
if (parsed.type === "arrow") {
129-
const arrowData = await ArrowClient.fetchArrow(
130-
getArrowStreamUrl(parsed.statement_id),
131-
);
132-
const table = await ArrowClient.processArrowBuffer(arrowData);
133-
setLoading(false);
134-
// Table is cast to TypedArrowTable with row type from QueryRegistry
135-
setData(table as ResultType);
136-
return;
129+
try {
130+
const arrowData = await ArrowClient.fetchArrow(
131+
getArrowStreamUrl(parsed.statement_id),
132+
);
133+
const table = await ArrowClient.processArrowBuffer(arrowData);
134+
setLoading(false);
135+
// Table is cast to TypedArrowTable with row type from QueryRegistry
136+
setData(table as ResultType);
137+
return;
138+
} catch (error) {
139+
console.error(
140+
"[useAnalyticsQuery] Failed to fetch Arrow data",
141+
error,
142+
);
143+
setLoading(false);
144+
setError("Unable to load data, please try again");
145+
return;
146+
}
137147
}
138148

139149
// error

packages/app-kit-ui/src/react/hooks/use-chart-data.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,10 @@ function resolveFormat(
7272
return "ARROW";
7373
}
7474

75-
// Default to Arrow for better performance
76-
return "ARROW";
75+
return "JSON";
7776
}
7877

79-
return "ARROW";
78+
return "JSON";
8079
}
8180

8281
// ============================================================================

packages/app-kit/src/analytics/analytics.ts

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import type {
88
import { SQLWarehouseConnector } from "../connectors";
99
import { Plugin, toPlugin } from "../plugin";
1010
import type { Request, Response } from "../utils";
11-
import { getRequestContext } from "../utils";
11+
import { getRequestContext, getWorkspaceClient } from "../utils";
1212
import { queryDefaults } from "./defaults";
1313
import { QueryProcessor } from "./query";
1414
import type {
@@ -41,8 +41,21 @@ export class AnalyticsPlugin extends Plugin {
4141
}
4242

4343
injectRoutes(router: IAppRouter) {
44-
// Inject core Arrow routes first (provides /arrow-result/:jobId endpoint)
45-
this.injectCoreArrowRoutes(router);
44+
this.route(router, {
45+
method: "get",
46+
path: "/arrow-result/:jobId",
47+
handler: async (req: Request, res: Response) => {
48+
await this._handleArrowRoute(req, res);
49+
},
50+
});
51+
52+
this.route(router, {
53+
method: "get",
54+
path: "/users/me/arrow-result/:jobId",
55+
handler: async (req: Request, res: Response) => {
56+
await this._handleArrowRoute(req, res, { asUser: true });
57+
},
58+
});
4659

4760
this.route<AnalyticsQueryResponse>(router, {
4861
method: "post",
@@ -61,17 +74,58 @@ export class AnalyticsPlugin extends Plugin {
6174
});
6275
}
6376

77+
private async _handleArrowRoute(
78+
req: Request,
79+
res: Response,
80+
{ asUser = false }: { asUser?: boolean } = {},
81+
): Promise<void> {
82+
try {
83+
const { jobId } = req.params;
84+
85+
const workspaceClient = getWorkspaceClient(asUser);
86+
87+
console.log(
88+
`Processing Arrow job request: ${jobId} for plugin: ${this.name}`,
89+
);
90+
91+
const result = await this.getArrowData(workspaceClient, jobId);
92+
93+
res.setHeader("Content-Type", "application/octet-stream");
94+
res.setHeader("Content-Length", result.data.length.toString());
95+
res.setHeader("Cache-Control", "public, max-age=3600");
96+
97+
console.log(
98+
`Sending Arrow buffer: ${result.data.length} bytes for job ${jobId}`,
99+
);
100+
res.send(Buffer.from(result.data));
101+
} catch (error) {
102+
console.error(`Arrow job error for ${this.name}:`, error);
103+
res.status(404).json({
104+
error: error instanceof Error ? error.message : "Arrow job not found",
105+
plugin: this.name,
106+
});
107+
}
108+
}
109+
64110
private async _handleQueryRoute(
65111
req: Request,
66112
res: Response,
67113
{ asUser = false }: { asUser?: boolean } = {},
68114
): Promise<void> {
69115
const { query_key } = req.params;
70116
const { parameters, format = "JSON" } = req.body as IAnalyticsQueryRequest;
71-
const formatParameters =
117+
const queryParameters =
72118
format === "ARROW"
73-
? { disposition: "EXTERNAL_LINKS", format: "ARROW_STREAM" }
74-
: {};
119+
? {
120+
formatParameters: {
121+
disposition: "EXTERNAL_LINKS",
122+
format: "ARROW_STREAM",
123+
},
124+
type: "arrow",
125+
}
126+
: {
127+
type: "result",
128+
};
75129

76130
const requestContext = getRequestContext();
77131
const userKey = asUser
@@ -126,17 +180,14 @@ export class AnalyticsPlugin extends Plugin {
126180
const result = await this.query(
127181
query,
128182
processedParams,
129-
formatParameters,
183+
queryParameters.formatParameters,
130184
signal,
131185
{
132186
asUser,
133187
},
134188
);
135189

136-
const type =
137-
formatParameters.format === "ARROW_STREAM" ? "arrow" : "result";
138-
139-
return { type, ...result };
190+
return { type: queryParameters.type, ...result };
140191
},
141192
streamExecutionSettings,
142193
userKey,
@@ -151,21 +202,11 @@ export class AnalyticsPlugin extends Plugin {
151202
{ asUser = false }: { asUser?: boolean } = {},
152203
): Promise<any> {
153204
const requestContext = getRequestContext();
205+
const workspaceClient = getWorkspaceClient(asUser);
206+
154207
const { statement, parameters: sqlParameters } =
155208
this.queryProcessor.convertToSQLParameters(query, parameters);
156209

157-
let workspaceClient: WorkspaceClient;
158-
if (asUser) {
159-
if (!requestContext.userDatabricksClient) {
160-
throw new Error(
161-
`User token passthrough feature is not enabled for workspace ${requestContext.workspaceId}.`,
162-
);
163-
}
164-
workspaceClient = requestContext.userDatabricksClient;
165-
} else {
166-
workspaceClient = requestContext.serviceDatabricksClient;
167-
}
168-
169210
const response = await this.SQLClient.executeStatement(
170211
workspaceClient,
171212
{
@@ -180,11 +221,14 @@ export class AnalyticsPlugin extends Plugin {
180221
return response.result;
181222
}
182223

224+
// If we need arrow stream in more plugins we can define this as a base method in the core plugin class
225+
// and have a generic endpoint for each plugin that consumes this arrow data.
183226
protected async getArrowData(
184227
workspaceClient: WorkspaceClient,
185228
jobId: string,
186-
): Promise<any> {
187-
return await this.SQLClient.getArrowData(workspaceClient, jobId);
229+
signal?: AbortSignal,
230+
): Promise<ReturnType<typeof this.SQLClient.getArrowData>> {
231+
return await this.SQLClient.getArrowData(workspaceClient, jobId, signal);
188232
}
189233

190234
async shutdown(): Promise<void> {

packages/app-kit/src/analytics/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ export interface IAnalyticsConfig extends BasePluginConfig {
44
timeout?: number;
55
}
66

7+
export type AnalyticsFormat = "JSON" | "ARROW";
78
export interface IAnalyticsQueryRequest {
89
parameters?: Record<string, any>;
9-
format?: "JSON" | "ARROW";
10+
format?: AnalyticsFormat;
1011
}
1112

1213
export interface AnalyticsQueryResponse {

packages/app-kit/src/connectors/sql-warehouse/client.ts

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ export class SQLWarehouseConnector {
6363
private get arrowProcessor(): ArrowStreamProcessor {
6464
if (!this._arrowProcessor) {
6565
this._arrowProcessor = new ArrowStreamProcessor({
66-
maxConcurrentDownloads: 5,
6766
timeout: this.config.timeout || executeStatementDefaults.timeout,
68-
retries: 3,
67+
maxConcurrentDownloads:
68+
ArrowStreamProcessor.DEFAULT_MAX_CONCURRENT_DOWNLOADS,
69+
retries: ArrowStreamProcessor.DEFAULT_RETRIES,
6970
});
7071
}
7172
return this._arrowProcessor;
@@ -403,25 +404,68 @@ export class SQLWarehouseConnector {
403404
jobId: string,
404405
signal?: AbortSignal,
405406
): Promise<ReturnType<typeof this.arrowProcessor.processChunks>> {
406-
try {
407-
const response = await workspaceClient.statementExecution.getStatement(
408-
{
409-
statement_id: jobId,
407+
const startTime = Date.now();
408+
409+
return this.telemetry.startActiveSpan(
410+
"arrow.getData",
411+
{
412+
kind: SpanKind.CLIENT,
413+
attributes: {
414+
"db.system": "databricks",
415+
"arrow.job_id": jobId,
410416
},
411-
this._createContext(signal),
412-
);
413-
const chunks = response.result?.external_links;
414-
const schema = response.manifest?.schema;
415-
416-
if (!chunks || !schema) {
417-
throw new Error("No chunks or schema found in response");
418-
}
419-
420-
return await this.arrowProcessor.processChunks(chunks, schema);
421-
} catch (error) {
422-
console.error(`Failed Arrow job: ${jobId}`, error);
423-
throw error;
424-
}
417+
},
418+
async (span: Span) => {
419+
try {
420+
const response =
421+
await workspaceClient.statementExecution.getStatement(
422+
{ statement_id: jobId },
423+
this._createContext(signal),
424+
);
425+
426+
const chunks = response.result?.external_links;
427+
const schema = response.manifest?.schema;
428+
429+
if (!chunks || !schema) {
430+
throw new Error("No chunks or schema found in response");
431+
}
432+
433+
span.setAttribute("arrow.chunk_count", chunks.length);
434+
435+
const result = await this.arrowProcessor.processChunks(
436+
chunks,
437+
schema,
438+
signal,
439+
);
440+
441+
span.setAttribute("arrow.data_size_bytes", result.data.length);
442+
span.setStatus({ code: SpanStatusCode.OK });
443+
444+
const duration = Date.now() - startTime;
445+
this.telemetryMetrics.queryDuration.record(duration, {
446+
operation: "arrow.getData",
447+
status: "success",
448+
});
449+
450+
return result;
451+
} catch (error) {
452+
span.setStatus({
453+
code: SpanStatusCode.ERROR,
454+
message: error instanceof Error ? error.message : "Unknown error",
455+
});
456+
span.recordException(error as Error);
457+
458+
const duration = Date.now() - startTime;
459+
this.telemetryMetrics.queryDuration.record(duration, {
460+
operation: "arrow.getData",
461+
status: "error",
462+
});
463+
464+
console.error(`Failed Arrow job: ${jobId}`, error);
465+
throw error;
466+
}
467+
},
468+
);
425469
}
426470

427471
// create context for cancellation token

0 commit comments

Comments
 (0)