Skip to content

Commit 0e8a76a

Browse files
authored
feat(cubesql): Whole SQL query push down to data sources (#6629)
* feat(cubesql): Whole SQL query push down to data sources * Fix clippy * Fix clippy * Fix cost function * Add sqlGenerators mock * Fix tests * SQL Generation structure * Fix tests * SQL transport API * Query sent to load method but member re-projection is required * Simple querying works * Move to minijinja * Clippy and formatter * Linter * Update chrono * Restructure templates * Simple projection rules * Introduce SQL push down flag and make sure rules still work with it disabled * Fix projection positive feedback loop and implement coalesce function as an example of function pushdown * Parameters implementation * Missing jsdoc param * SQL API load support * SQL API load streaming support: fix unit tests * SQL API load streaming support: fix driver tests * SQL API load streaming support: update mssql snapshot * Revert "SQL API load streaming support: update mssql snapshot" This reverts commit 91ed17e. * SQL API load streaming support: update schema * Bump rust toolchain to address tarpaulin issue
1 parent baabecd commit 0e8a76a

File tree

45 files changed

+4243
-760
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+4243
-760
lines changed

.github/workflows/rust-cubesql.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ jobs:
6161
- name: Install Rust
6262
uses: actions-rs/toolchain@v1
6363
with:
64-
toolchain: nightly-2022-03-22
64+
toolchain: nightly-2022-06-22
6565
override: true
6666
components: rustfmt
6767
- uses: Swatinem/rust-cache@v2
@@ -80,6 +80,7 @@ jobs:
8080
CUBESQL_TESTING_CUBE_TOKEN: ${{ secrets.CUBESQL_TESTING_CUBE_TOKEN }}
8181
CUBESQL_TESTING_CUBE_URL: ${{ secrets.CUBESQL_TESTING_CUBE_URL }}
8282
CUBESQL_REWRITE_ENGINE: true
83+
CUBESQL_SQL_PUSH_DOWN: true
8384
CUBESQL_REWRITE_TIMEOUT: 60
8485
run: cd rust/cubesql && cargo tarpaulin --workspace --no-fail-fast --avoid-cfg-tarpaulin --out Xml
8586
- name: Upload code coverage

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 186 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import {
3838
PreAggJob,
3939
PreAggJobStatusItem,
4040
PreAggJobStatusObject,
41-
PreAggJobStatusResponse,
41+
PreAggJobStatusResponse, SqlApiRequest,
4242
} from './types/request';
4343
import {
4444
CheckAuthInternalOptions,
@@ -1169,7 +1169,7 @@ class ApiGateway {
11691169
return [queryType, normalizedQueries];
11701170
}
11711171

1172-
public async sql({ query, context, res }: QueryRequest) {
1172+
public async sql({ query, context, res, memberToAlias, exportAnnotatedSql }: QueryRequest) {
11731173
const requestStarted = new Date();
11741174

11751175
try {
@@ -1180,8 +1180,11 @@ class ApiGateway {
11801180

11811181
const sqlQueries = await Promise.all<any>(
11821182
normalizedQueries.map((normalizedQuery) => this.getCompilerApi(context).getSql(
1183-
this.coerceForSqlQuery(normalizedQuery, context),
1184-
{ includeDebugInfo: getEnv('devMode') || context.signedWithPlaygroundAuthSecret }
1183+
this.coerceForSqlQuery({ ...normalizedQuery, memberToAlias }, context),
1184+
{
1185+
includeDebugInfo: getEnv('devMode') || context.signedWithPlaygroundAuthSecret,
1186+
exportAnnotatedSql,
1187+
}
11851188
))
11861189
);
11871190

@@ -1200,6 +1203,30 @@ class ApiGateway {
12001203
}
12011204
}
12021205

1206+
public async sqlGenerators({ context, res }: { context: RequestContext, res: ResponseResultFn }) {
1207+
const requestStarted = new Date();
1208+
1209+
try {
1210+
const compilerApi = this.getCompilerApi(context);
1211+
const query = {
1212+
requestId: context.requestId,
1213+
};
1214+
const cubeNameToDataSource = await compilerApi.cubeNameToDataSource(query);
1215+
1216+
let dataSources = Object.keys(cubeNameToDataSource).map(c => cubeNameToDataSource[c]);
1217+
dataSources = [...new Set(dataSources)];
1218+
const dataSourceToSqlGenerator = (await Promise.all(
1219+
dataSources.map(async dataSource => ({ [dataSource]: (await compilerApi.getSqlGenerator(query, dataSource)).sqlGenerator }))
1220+
)).reduce((a, b) => ({ ...a, ...b }), {});
1221+
1222+
res({ cubeNameToDataSource, dataSourceToSqlGenerator });
1223+
} catch (e) {
1224+
this.handleError({
1225+
e, context, res, requestStarted
1226+
});
1227+
}
1228+
}
1229+
12031230
protected createSecurityContextExtractor(options?: JWTOptions): SecurityContextExtractorFn {
12041231
if (options?.claimsNamespace) {
12051232
return (ctx: Readonly<RequestContext>) => {
@@ -1609,6 +1636,161 @@ class ApiGateway {
16091636
}
16101637
}
16111638

1639+
public async sqlApiLoad(request: SqlApiRequest) {
1640+
let query: Query | Query[] | null = null;
1641+
const {
1642+
context,
1643+
res,
1644+
apiType = 'sql',
1645+
} = request;
1646+
const requestStarted = new Date();
1647+
1648+
try {
1649+
await this.assertApiScope('data', context.securityContext);
1650+
1651+
query = this.parseQueryParam(request.query);
1652+
let resType: ResultType = ResultType.DEFAULT;
1653+
1654+
if (!Array.isArray(query) && query.responseFormat) {
1655+
resType = query.responseFormat;
1656+
}
1657+
1658+
this.log({
1659+
type: 'Load Request',
1660+
query,
1661+
streaming: request.streaming,
1662+
}, context);
1663+
1664+
const [queryType, normalizedQueries] =
1665+
await this.getNormalizedQueries(query, context);
1666+
1667+
let metaConfigResult = await this
1668+
.getCompilerApi(context).metaConfig({
1669+
requestId: context.requestId
1670+
});
1671+
1672+
metaConfigResult = this.filterVisibleItemsInMeta(context, metaConfigResult);
1673+
1674+
const sqlQueries = await this
1675+
.getSqlQueriesInternal(context, normalizedQueries);
1676+
1677+
let results;
1678+
1679+
let slowQuery = false;
1680+
1681+
const streamResponse = async (sqlQuery) => {
1682+
const q = {
1683+
...sqlQuery,
1684+
query: sqlQuery.query || sqlQuery.sql[0],
1685+
values: sqlQuery.values || sqlQuery.sql[1],
1686+
continueWait: true,
1687+
renewQuery: false,
1688+
requestId: context.requestId,
1689+
context,
1690+
persistent: true,
1691+
forceNoCache: true,
1692+
};
1693+
return {
1694+
stream: await this.getAdapterApi(context).streamQuery(q),
1695+
};
1696+
};
1697+
1698+
if (request.sqlQuery) {
1699+
const finalQuery = {
1700+
query: request.sqlQuery[0],
1701+
values: request.sqlQuery[1],
1702+
continueWait: true,
1703+
renewQuery: normalizedQueries[0].renewQuery,
1704+
requestId: context.requestId,
1705+
context,
1706+
...sqlQueries[0],
1707+
// TODO Can we just pass through data? Ensure hidden members can't be queried
1708+
aliasNameToMember: null,
1709+
};
1710+
1711+
if (request.streaming) {
1712+
results = [await streamResponse(finalQuery)];
1713+
} else {
1714+
const response = await this
1715+
.getAdapterApi(context)
1716+
.executeQuery(finalQuery);
1717+
1718+
const annotation = prepareAnnotation(
1719+
metaConfigResult, normalizedQueries[0]
1720+
);
1721+
1722+
// TODO Can we just pass through data? Ensure hidden members can't be queried
1723+
results = [{
1724+
data: response.data,
1725+
annotation
1726+
}];
1727+
}
1728+
} else {
1729+
results = await Promise.all(
1730+
normalizedQueries.map(async (normalizedQuery, index) => {
1731+
slowQuery = slowQuery ||
1732+
Boolean(sqlQueries[index].slowQuery);
1733+
1734+
const annotation = prepareAnnotation(
1735+
metaConfigResult, normalizedQuery
1736+
);
1737+
1738+
if (request.streaming) {
1739+
return streamResponse(sqlQueries[index]);
1740+
}
1741+
1742+
const response = await this.getSqlResponseInternal(
1743+
context,
1744+
normalizedQuery,
1745+
sqlQueries[index],
1746+
);
1747+
1748+
return this.getResultInternal(
1749+
context,
1750+
queryType,
1751+
normalizedQuery,
1752+
sqlQueries[index],
1753+
annotation,
1754+
response,
1755+
resType,
1756+
);
1757+
})
1758+
);
1759+
}
1760+
1761+
this.log(
1762+
{
1763+
type: 'Load Request Success',
1764+
query,
1765+
duration: this.duration(requestStarted),
1766+
apiType,
1767+
isPlayground: Boolean(
1768+
context.signedWithPlaygroundAuthSecret
1769+
),
1770+
queries: results.length,
1771+
queriesWithPreAggregations:
1772+
results.filter(
1773+
(r: any) => Object.keys(
1774+
r.usedPreAggregations || {}
1775+
).length
1776+
).length,
1777+
queriesWithData:
1778+
results.filter((r: any) => r.data?.length).length,
1779+
dbType: results.map(r => r.dbType),
1780+
},
1781+
context,
1782+
);
1783+
1784+
res(request.streaming ? results[0] : {
1785+
results,
1786+
});
1787+
} catch (e) {
1788+
this.handleError({
1789+
e, context, query, res, requestStarted
1790+
});
1791+
}
1792+
}
1793+
16121794
public subscribeQueueEvents({ context, signedWithPlaygroundAuthSecret, connectionId, res }) {
16131795
if (this.enforceSecurityChecks && !signedWithPlaygroundAuthSecret) {
16141796
throw new CubejsHandlerError(

packages/cubejs-api-gateway/src/helpers/transformData.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,6 @@ function getCompactRow(
204204
getDateRangeValue(timeDimensions)
205205
);
206206
} else if (queryType === QueryTypeEnum.BLENDING_QUERY) {
207-
console.log(getBlendingResponseKey(timeDimensions));
208-
console.log(dbRow[
209-
getBlendingResponseKey(timeDimensions)
210-
]);
211207
row.push(
212208
dbRow[
213209
membersToAliasMap[

packages/cubejs-api-gateway/src/sql-server.ts

Lines changed: 83 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,24 @@ export class SQLServer {
4545
const canSwitchSqlUser: CanSwitchSQLUserFn = options.canSwitchSqlUser
4646
|| this.createDefaultCanSwitchSqlUserFn(options);
4747

48+
const contextByRequest = async (request, session) => {
49+
let userForContext = session.user;
50+
51+
if (request.meta.changeUser && request.meta.changeUser !== session.user) {
52+
const canSwitch = session.superuser || await canSwitchSqlUser(session.user, request.meta.changeUser);
53+
if (canSwitch) {
54+
userForContext = request.meta.changeUser;
55+
} else {
56+
throw new Error(
57+
`You cannot change security context via __user from ${session.user} to ${request.meta.changeUser}, because it's not allowed.`
58+
);
59+
}
60+
}
61+
// @todo Store security context in native for session's user, but not for switching
62+
const current = await checkSqlAuth(request, userForContext);
63+
return this.contextByNativeReq(request, current.securityContext, request.id);
64+
};
65+
4866
this.sqlInterfaceInstance = await registerInterface({
4967
port: options.sqlPort,
5068
pgPort: options.pgSqlPort,
@@ -78,22 +96,7 @@ export class SQLServer {
7896
});
7997
},
8098
load: async ({ request, session, query }) => {
81-
let userForContext = session.user;
82-
83-
if (request.meta.changeUser && request.meta.changeUser !== session.user) {
84-
const canSwitch = session.superuser || await canSwitchSqlUser(session.user, request.meta.changeUser);
85-
if (canSwitch) {
86-
userForContext = request.meta.changeUser;
87-
} else {
88-
throw new Error(
89-
`You cannot change security context via __user from ${session.user} to ${request.meta.changeUser}, because it's not allowed.`
90-
);
91-
}
92-
}
93-
94-
// @todo Store security context in native for session's user, but not for switching
95-
const current = await checkSqlAuth(request, userForContext);
96-
const context = await this.contextByNativeReq(request, current.securityContext, request.id);
99+
const context = await contextByRequest(request, session);
97100

98101
// eslint-disable-next-line no-async-promise-executor
99102
return new Promise(async (resolve, reject) => {
@@ -112,23 +115,51 @@ export class SQLServer {
112115
}
113116
});
114117
},
115-
stream: async ({ request, session, query }) => {
116-
let userForContext = session.user;
117-
118-
if (request.meta.changeUser && request.meta.changeUser !== session.user) {
119-
const canSwitch = session.superuser || await canSwitchSqlUser(session.user, request.meta.changeUser);
120-
if (canSwitch) {
121-
userForContext = request.meta.changeUser;
122-
} else {
123-
throw new Error(
124-
`You cannot change security context via __user from ${session.user} to ${request.meta.changeUser}, because it's not allowed.`
125-
);
118+
sqlApiLoad: async ({ request, session, query, sqlQuery, streaming }) => {
119+
const context = await contextByRequest(request, session);
120+
121+
// eslint-disable-next-line no-async-promise-executor
122+
return new Promise(async (resolve, reject) => {
123+
try {
124+
await this.apiGateway.sqlApiLoad({
125+
query,
126+
sqlQuery,
127+
streaming,
128+
context,
129+
res: (message) => {
130+
resolve(message);
131+
},
132+
apiType: 'sql',
133+
});
134+
} catch (e) {
135+
reject(e);
126136
}
127-
}
137+
});
138+
},
139+
sql: async ({ request, session, query, memberToAlias }) => {
140+
const context = await contextByRequest(request, session);
128141

129-
// @todo Store security context in native for session's user, but not for switching
130-
const current = await checkSqlAuth(request, userForContext);
131-
const context = await this.contextByNativeReq(request, current.securityContext, request.id);
142+
// eslint-disable-next-line no-async-promise-executor
143+
return new Promise(async (resolve, reject) => {
144+
try {
145+
await this.apiGateway.sql({
146+
query,
147+
memberToAlias,
148+
exportAnnotatedSql: true,
149+
queryType: 'multi',
150+
context,
151+
res: (message) => {
152+
resolve(message);
153+
},
154+
apiType: 'sql',
155+
});
156+
} catch (e) {
157+
reject(e);
158+
}
159+
});
160+
},
161+
stream: async ({ request, session, query }) => {
162+
const context = await contextByRequest(request, session);
132163

133164
// eslint-disable-next-line no-async-promise-executor
134165
return new Promise(async (resolve, reject) => {
@@ -139,6 +170,27 @@ export class SQLServer {
139170
}
140171
});
141172
},
173+
sqlGenerators: async (paramsJson: string) => {
174+
// TODO get rid of it
175+
const { request, session } = JSON.parse(paramsJson);
176+
// @todo Store security context in native
177+
const { securityContext } = await checkSqlAuth(request, session.user);
178+
const context = await this.apiGateway.contextByReq(<any> request, securityContext, request.id);
179+
180+
// eslint-disable-next-line no-async-promise-executor
181+
return new Promise(async (resolve, reject) => {
182+
try {
183+
await this.apiGateway.sqlGenerators({
184+
context,
185+
res: (queries) => {
186+
resolve(queries);
187+
},
188+
});
189+
} catch (e) {
190+
reject(e);
191+
}
192+
});
193+
},
142194
});
143195
}
144196

0 commit comments

Comments
 (0)