@@ -8,7 +8,7 @@ import type {
88import { SQLWarehouseConnector } from "../connectors" ;
99import { Plugin , toPlugin } from "../plugin" ;
1010import type { Request , Response } from "../utils" ;
11- import { getRequestContext } from "../utils" ;
11+ import { getRequestContext , getWorkspaceClient } from "../utils" ;
1212import { queryDefaults } from "./defaults" ;
1313import { QueryProcessor } from "./query" ;
1414import type {
@@ -41,8 +41,21 @@ export class AnalyticsPlugin extends Plugin {
4141 }
4242
4343 injectRoutes ( router : IAppRouter ) {
44- // Inject core Arrow routes first (provides /arrow-result/:jobId endpoint)
45- this . injectCoreArrowRoutes ( router ) ;
44+ this . route ( router , {
45+ method : "get" ,
46+ path : "/arrow-result/:jobId" ,
47+ handler : async ( req : Request , res : Response ) => {
48+ await this . _handleArrowRoute ( req , res ) ;
49+ } ,
50+ } ) ;
51+
52+ this . route ( router , {
53+ method : "get" ,
54+ path : "/users/me/arrow-result/:jobId" ,
55+ handler : async ( req : Request , res : Response ) => {
56+ await this . _handleArrowRoute ( req , res , { asUser : true } ) ;
57+ } ,
58+ } ) ;
4659
4760 this . route < AnalyticsQueryResponse > ( router , {
4861 method : "post" ,
@@ -61,17 +74,58 @@ export class AnalyticsPlugin extends Plugin {
6174 } ) ;
6275 }
6376
77+ private async _handleArrowRoute (
78+ req : Request ,
79+ res : Response ,
80+ { asUser = false } : { asUser ?: boolean } = { } ,
81+ ) : Promise < void > {
82+ try {
83+ const { jobId } = req . params ;
84+
85+ const workspaceClient = getWorkspaceClient ( asUser ) ;
86+
87+ console . log (
88+ `Processing Arrow job request: ${ jobId } for plugin: ${ this . name } ` ,
89+ ) ;
90+
91+ const result = await this . getArrowData ( workspaceClient , jobId ) ;
92+
93+ res . setHeader ( "Content-Type" , "application/octet-stream" ) ;
94+ res . setHeader ( "Content-Length" , result . data . length . toString ( ) ) ;
95+ res . setHeader ( "Cache-Control" , "public, max-age=3600" ) ;
96+
97+ console . log (
98+ `Sending Arrow buffer: ${ result . data . length } bytes for job ${ jobId } ` ,
99+ ) ;
100+ res . send ( Buffer . from ( result . data ) ) ;
101+ } catch ( error ) {
102+ console . error ( `Arrow job error for ${ this . name } :` , error ) ;
103+ res . status ( 404 ) . json ( {
104+ error : error instanceof Error ? error . message : "Arrow job not found" ,
105+ plugin : this . name ,
106+ } ) ;
107+ }
108+ }
109+
64110 private async _handleQueryRoute (
65111 req : Request ,
66112 res : Response ,
67113 { asUser = false } : { asUser ?: boolean } = { } ,
68114 ) : Promise < void > {
69115 const { query_key } = req . params ;
70116 const { parameters, format = "JSON" } = req . body as IAnalyticsQueryRequest ;
71- const formatParameters =
117+ const queryParameters =
72118 format === "ARROW"
73- ? { disposition : "EXTERNAL_LINKS" , format : "ARROW_STREAM" }
74- : { } ;
119+ ? {
120+ formatParameters : {
121+ disposition : "EXTERNAL_LINKS" ,
122+ format : "ARROW_STREAM" ,
123+ } ,
124+ type : "arrow" ,
125+ }
126+ : {
127+ type : "result" ,
128+ } ;
75129
76130 const requestContext = getRequestContext ( ) ;
77131 const userKey = asUser
@@ -126,17 +180,14 @@ export class AnalyticsPlugin extends Plugin {
126180 const result = await this . query (
127181 query ,
128182 processedParams ,
129- formatParameters ,
183+ queryParameters . formatParameters ,
130184 signal ,
131185 {
132186 asUser,
133187 } ,
134188 ) ;
135189
136- const type =
137- formatParameters . format === "ARROW_STREAM" ? "arrow" : "result" ;
138-
139- return { type, ...result } ;
190+ return { type : queryParameters . type , ...result } ;
140191 } ,
141192 streamExecutionSettings ,
142193 userKey ,
@@ -151,21 +202,11 @@ export class AnalyticsPlugin extends Plugin {
151202 { asUser = false } : { asUser ?: boolean } = { } ,
152203 ) : Promise < any > {
153204 const requestContext = getRequestContext ( ) ;
205+ const workspaceClient = getWorkspaceClient ( asUser ) ;
206+
154207 const { statement, parameters : sqlParameters } =
155208 this . queryProcessor . convertToSQLParameters ( query , parameters ) ;
156209
157- let workspaceClient : WorkspaceClient ;
158- if ( asUser ) {
159- if ( ! requestContext . userDatabricksClient ) {
160- throw new Error (
161- `User token passthrough feature is not enabled for workspace ${ requestContext . workspaceId } .` ,
162- ) ;
163- }
164- workspaceClient = requestContext . userDatabricksClient ;
165- } else {
166- workspaceClient = requestContext . serviceDatabricksClient ;
167- }
168-
169210 const response = await this . SQLClient . executeStatement (
170211 workspaceClient ,
171212 {
@@ -180,11 +221,14 @@ export class AnalyticsPlugin extends Plugin {
180221 return response . result ;
181222 }
182223
224+ // If we need arrow stream in more plugins we can define this as a base method in the core plugin class
225+ // and have a generic endpoint for each plugin that consumes this arrow data.
183226 protected async getArrowData (
184227 workspaceClient : WorkspaceClient ,
185228 jobId : string ,
186- ) : Promise < any > {
187- return await this . SQLClient . getArrowData ( workspaceClient , jobId ) ;
229+ signal ?: AbortSignal ,
230+ ) : Promise < ReturnType < typeof this . SQLClient . getArrowData > > {
231+ return await this . SQLClient . getArrowData ( workspaceClient , jobId , signal ) ;
188232 }
189233
190234 async shutdown ( ) : Promise < void > {
0 commit comments