Skip to content

Commit c203bda

Browse files
lezzagogoyameghaaarone90ahkcssongkant-aws
authored
[Experimental] Support direct query data sources (#4375)
Co-authored-by: Megha Goyal <[email protected]> Co-authored-by: Aaron Alvarez <[email protected]> Co-authored-by: Kai Huang <[email protected]> Co-authored-by: Songkan Tang <[email protected]> Co-authored-by: Simeon Widdis <[email protected]> Co-authored-by: Chen Dai <[email protected]> Co-authored-by: Jialiang Liang <[email protected]> Co-authored-by: ritvibhatt <[email protected]> Co-authored-by: Aaron Alvarez <[email protected]> Co-authored-by: Vamsi Manohar <[email protected]> Co-authored-by: Tomoyuki MORITA <[email protected]> Co-authored-by: Lantao Jin <[email protected]> Co-authored-by: qianheng <[email protected]> Co-authored-by: Shenoy Pratik <[email protected]> Co-authored-by: Yuanchun Shen <[email protected]> Co-authored-by: Peng Huo <[email protected]> Co-authored-by: Xinyuan Lu <[email protected]> Co-authored-by: Joshua Li <[email protected]>
1 parent 2383684 commit c203bda

File tree

98 files changed

+10475
-489
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+10475
-489
lines changed

async-query-core/src/main/java/org/opensearch/sql/spark/rest/model/LangType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
/** Language type accepted in async query apis. */
99
public enum LangType {
1010
SQL("sql"),
11-
PPL("ppl");
11+
PPL("ppl"),
12+
PROMQL("promql");
1213
private final String text;
1314

1415
LangType(String text) {

direct-query-core/build.gradle

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
plugins {
7+
id 'java-library'
8+
id "io.freefair.lombok"
9+
id 'jacoco'
10+
id 'java-test-fixtures'
11+
}
12+
13+
repositories {
14+
mavenCentral()
15+
}
16+
17+
dependencies {
18+
api project(':core')
19+
implementation project(':datasources')
20+
implementation project(':async-query-core')
21+
22+
// Common dependencies
23+
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
24+
implementation group: 'org.json', name: 'json', version: '20231013'
25+
implementation group: 'commons-io', name: 'commons-io', version: "${commons_io_version}"
26+
27+
// Test dependencies
28+
testImplementation(platform("org.junit:junit-bom:5.9.3"))
29+
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.3'
30+
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
31+
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${mockito_version}"
32+
testImplementation group: 'com.squareup.okhttp3', name: 'mockwebserver', version: '4.12.0'
33+
34+
testCompileOnly('junit:junit:4.13.1') {
35+
exclude group: 'org.hamcrest', module: 'hamcrest-core'
36+
}
37+
testRuntimeOnly("org.junit.vintage:junit-vintage-engine") {
38+
exclude group: 'org.hamcrest', module: 'hamcrest-core'
39+
}
40+
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine") {
41+
exclude group: 'org.hamcrest', module: 'hamcrest-core'
42+
}
43+
testImplementation("org.opensearch.test:framework:${opensearch_version}")
44+
}
45+
46+
test {
47+
useJUnitPlatform()
48+
testLogging {
49+
events "failed"
50+
exceptionFormat "full"
51+
}
52+
}
53+
task junit4(type: Test) {
54+
useJUnitPlatform {
55+
includeEngines("junit-vintage")
56+
}
57+
systemProperty 'tests.security.manager', 'false'
58+
testLogging {
59+
events "failed"
60+
exceptionFormat "full"
61+
}
62+
}
63+
64+
jacocoTestReport {
65+
dependsOn test, junit4
66+
executionData test, junit4
67+
reports {
68+
html.required = true
69+
xml.required = true
70+
}
71+
afterEvaluate {
72+
classDirectories.setFrom(files(classDirectories.files.collect {
73+
fileTree(dir: it)
74+
}))
75+
}
76+
}
77+
78+
jacocoTestCoverageVerification {
79+
dependsOn test, junit4
80+
executionData test, junit4
81+
violationRules {
82+
rule {
83+
element = 'CLASS'
84+
excludes = [
85+
'org.opensearch.sql.prometheus.model.*',
86+
'org.opensearch.sql.directquery.rest.model.*'
87+
]
88+
limit {
89+
counter = 'LINE'
90+
minimum = 1.0
91+
}
92+
limit {
93+
counter = 'BRANCH'
94+
minimum = 1.0
95+
}
96+
}
97+
}
98+
afterEvaluate {
99+
classDirectories.setFrom(files(classDirectories.files.collect {
100+
fileTree(dir: it)
101+
}))
102+
}
103+
}
104+
check.dependsOn jacocoTestCoverageVerification
105+
jacocoTestCoverageVerification.dependsOn jacocoTestReport
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.datasource.client;
7+
8+
/**
9+
* Base interface for all data source clients. This interface serves as a marker interface for all
10+
* client implementations.
11+
*
12+
* @opensearch.experimental
13+
*/
14+
public interface DataSourceClient {}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.datasource.client;
7+
8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
10+
import org.opensearch.common.inject.Inject;
11+
import org.opensearch.sql.common.setting.Settings;
12+
import org.opensearch.sql.datasource.DataSourceService;
13+
import org.opensearch.sql.datasource.client.exceptions.DataSourceClientException;
14+
import org.opensearch.sql.datasource.model.DataSourceMetadata;
15+
import org.opensearch.sql.datasource.model.DataSourceType;
16+
import org.opensearch.sql.prometheus.utils.PrometheusClientUtils;
17+
18+
/**
19+
* Factory for creating data source clients based on the data source type.
20+
*
21+
* @opensearch.experimental
22+
*/
23+
public class DataSourceClientFactory {
24+
25+
private static final Logger LOG = LogManager.getLogger();
26+
27+
private final Settings settings;
28+
private final DataSourceService dataSourceService;
29+
30+
@Inject
31+
public DataSourceClientFactory(DataSourceService dataSourceService, Settings settings) {
32+
this.settings = settings;
33+
this.dataSourceService = dataSourceService;
34+
}
35+
36+
/**
37+
* Creates a client for the specified data source with appropriate type.
38+
*
39+
* @param <T> The type of client to create, must implement DataSourceClient
40+
* @param dataSourceName The name of the data source
41+
* @return The appropriate client for the data source type
42+
* @throws DataSourceClientException If client creation fails
43+
*/
44+
@SuppressWarnings("unchecked")
45+
public <T extends DataSourceClient> T createClient(String dataSourceName)
46+
throws DataSourceClientException {
47+
try {
48+
if (!dataSourceService.dataSourceExists(dataSourceName)) {
49+
throw new DataSourceClientException("Data source does not exist: " + dataSourceName);
50+
}
51+
52+
DataSourceMetadata metadata =
53+
dataSourceService.verifyDataSourceAccessAndGetRawMetadata(dataSourceName, null);
54+
DataSourceType dataSourceType = metadata.getConnector();
55+
56+
return (T) createClientForType(dataSourceType.name(), metadata);
57+
} catch (Exception e) {
58+
if (e instanceof DataSourceClientException) {
59+
throw e;
60+
}
61+
LOG.error("Failed to create client for data source: " + dataSourceName, e);
62+
throw new DataSourceClientException(
63+
"Failed to create client for data source: " + dataSourceName, e);
64+
}
65+
}
66+
67+
/**
68+
* Gets the data source type for a given data source name.
69+
*
70+
* @param dataSourceName The name of the data source
71+
* @return The type of the data source
72+
* @throws DataSourceClientException If the data source doesn't exist
73+
*/
74+
public DataSourceType getDataSourceType(String dataSourceName) throws DataSourceClientException {
75+
if (!dataSourceService.dataSourceExists(dataSourceName)) {
76+
throw new DataSourceClientException("Data source does not exist: " + dataSourceName);
77+
}
78+
79+
return dataSourceService.getDataSourceMetadata(dataSourceName).getConnector();
80+
}
81+
82+
private DataSourceClient createClientForType(String dataSourceType, DataSourceMetadata metadata)
83+
throws DataSourceClientException {
84+
switch (dataSourceType) {
85+
case "PROMETHEUS":
86+
return PrometheusClientUtils.createPrometheusClient(metadata, settings);
87+
default:
88+
throw new DataSourceClientException("Unsupported data source type: " + dataSourceType);
89+
}
90+
}
91+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.datasource.client.exceptions;
7+
8+
/**
9+
* Exception thrown when there are issues with data source client operations.
10+
*
11+
* @opensearch.experimental
12+
*/
13+
public class DataSourceClientException extends RuntimeException {
14+
15+
public DataSourceClientException(String message) {
16+
super(message);
17+
}
18+
19+
public DataSourceClientException(String message, Throwable cause) {
20+
super(message, cause);
21+
}
22+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.datasource.query;
7+
8+
import java.io.IOException;
9+
import org.opensearch.sql.datasource.client.DataSourceClient;
10+
import org.opensearch.sql.datasource.model.DataSourceType;
11+
import org.opensearch.sql.directquery.rest.model.ExecuteDirectQueryRequest;
12+
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesRequest;
13+
import org.opensearch.sql.directquery.rest.model.GetDirectQueryResourcesResponse;
14+
import org.opensearch.sql.directquery.rest.model.WriteDirectQueryResourcesRequest;
15+
import org.opensearch.sql.directquery.rest.model.WriteDirectQueryResourcesResponse;
16+
17+
/**
18+
* Interface for handling queries for specific data source types.
19+
*
20+
* @param <T> The client type this handler works with, extending DataSourceClient
21+
*
22+
* @opensearch.experimental
23+
*/
24+
public interface QueryHandler<T extends DataSourceClient> {
25+
26+
/**
27+
* Returns the data source type this handler supports.
28+
*
29+
* @return The supported data source type
30+
*/
31+
DataSourceType getSupportedDataSourceType();
32+
33+
/**
34+
* Executes a query for the supported data source type.
35+
*
36+
* @param client The client instance to use
37+
* @param request The query request
38+
* @return JSON string result of the query
39+
* @throws IOException If query execution fails
40+
*/
41+
String executeQuery(T client, ExecuteDirectQueryRequest request) throws IOException;
42+
43+
/**
44+
* Gets resources from the data source.
45+
*
46+
* @param client The client instance to use
47+
* @param request The resources request
48+
* @return Response containing the requested resources
49+
* @throws IOException If resource retrieval fails
50+
*/
51+
GetDirectQueryResourcesResponse<?> getResources(T client, GetDirectQueryResourcesRequest request)
52+
throws IOException;
53+
54+
/**
55+
* Writes resources to the data source.
56+
*
57+
* @param client The client instance to use
58+
* @param request The resources request
59+
* @return Response containing the requested resources
60+
* @throws IOException If resource retrieval fails
61+
*/
62+
WriteDirectQueryResourcesResponse<?> writeResources(T client, WriteDirectQueryResourcesRequest request)
63+
throws IOException;
64+
65+
/**
66+
* Checks if this handler can handle the given client type.
67+
*
68+
* @param client The client to check
69+
* @return true if this handler can handle the client
70+
*/
71+
boolean canHandle(DataSourceClient client);
72+
73+
/**
74+
* Gets the client class this handler supports.
75+
*
76+
* @return The class of client this handler supports
77+
*/
78+
Class<T> getClientClass();
79+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.datasource.query;
7+
8+
import java.util.List;
9+
import java.util.Optional;
10+
import org.opensearch.common.inject.Inject;
11+
import org.opensearch.sql.datasource.client.DataSourceClient;
12+
13+
/**
14+
* Registry for all query handlers.
15+
*
16+
* @opensearch.experimental
17+
*/
18+
public class QueryHandlerRegistry {
19+
20+
private final List<QueryHandler<?>> handlers;
21+
22+
@Inject
23+
public QueryHandlerRegistry(List<QueryHandler<?>> handlers) {
24+
this.handlers = handlers;
25+
}
26+
27+
/**
28+
* Finds a handler that can process the given client.
29+
*
30+
* @param client The client to find a handler for
31+
* @param <T> The type of client, extending DataSourceClient
32+
* @return An optional containing the handler if found
33+
*/
34+
@SuppressWarnings("unchecked")
35+
public <T extends DataSourceClient> Optional<QueryHandler<T>> getQueryHandler(T client) {
36+
return handlers.stream()
37+
.filter(
38+
handler -> {
39+
try {
40+
// Get the handler's client class and check if it's compatible with our client
41+
Class<?> handlerClientClass = handler.getClientClass();
42+
return handlerClientClass.isInstance(client)
43+
&& ((QueryHandler<T>) handler).canHandle(client);
44+
} catch (ClassCastException e) {
45+
return false;
46+
}
47+
})
48+
.map(handler -> (QueryHandler<T>) handler)
49+
.findFirst();
50+
}
51+
}

0 commit comments

Comments
 (0)