Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,14 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -247,6 +246,7 @@ public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName
}

/**
* Fetch schema for actual value type
* @param tableName ServiceNow table name for which schema is getting fetched
* @param collector FailureCollector
* @return schema for given ServiceNow table
Expand All @@ -255,7 +255,7 @@ public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName
public Schema fetchTableSchema(String tableName, FailureCollector collector) {
Schema schema = null;
try {
schema = fetchTableSchema(tableName);
schema = fetchTableSchema(tableName, SourceValueType.SHOW_ACTUAL_VALUE);
} catch (Exception e) {
LOG.error("Failed to fetch schema on table {}", tableName, e);
collector.addFailure(String.format("Connection failed. Unable to fetch schema for table: %s. Cause: %s",
Expand All @@ -276,35 +276,42 @@ public SchemaResponse parseSchemaResponse(String responseBody) {
* @return schema for given ServiceNow table
* @throws ServiceNowAPIException
*/
public Schema fetchTableSchema(String tableName)
public Schema fetchTableSchema(String tableName, SourceValueType valueType)
throws ServiceNowAPIException {
return fetchTableSchema(tableName, getAccessToken());
return fetchTableSchema(tableName, getAccessToken(), valueType);
}

/**
* Fetches the table schema from ServiceNow
*
* @param tableName ServiceNow table name for which schema is getting fetched
* @param accessToken Access Token to use
* @param valueType Type of value (Actual/Display)
* @return schema for given ServiceNow table
*/
public Schema fetchTableSchema(String tableName, String accessToken)
public Schema fetchTableSchema(String tableName, String accessToken, SourceValueType valueType)
throws ServiceNowAPIException {
ServiceNowTableAPIRequestBuilder requestBuilder = new ServiceNowTableAPIRequestBuilder(
this.conf.getRestApiEndpoint(), tableName, true)
.setExcludeReferenceLink(true);

RestAPIResponse apiResponse;
RestAPIResponse restAPIResponse;
requestBuilder.setAuthHeader(accessToken);
apiResponse = executeGetWithRetries(requestBuilder.build());
SchemaResponse response = parseSchemaResponse(apiResponse.getResponseBody());
restAPIResponse = executeGetWithRetries(requestBuilder.build());
SchemaResponse schemaResponse = parseSchemaResponse(restAPIResponse.getResponseBody());
List<ServiceNowColumn> columns = new ArrayList<>();

if (response.getResult() == null && response.getResult().isEmpty()) {
if (schemaResponse.getResult() == null && schemaResponse.getResult().getColumns().isEmpty()) {
throw new RuntimeException("Error - Schema Response does not contain any result");
}
for (ServiceNowSchemaField field : response.getResult()) {
columns.add(new ServiceNowColumn(field.getName(), field.getInternalType()));

for (ServiceNowSchemaField field : schemaResponse.getResult().getColumns().values()) {
if (valueType.equals(SourceValueType.SHOW_DISPLAY_VALUE) &&
!Objects.equals(field.getType(), field.getInternalType())) {
columns.add(new ServiceNowColumn(field.getName(), field.getType()));
} else {
columns.add(new ServiceNowColumn(field.getName(), field.getInternalType()));
}
}
return SchemaBuilder.constructSchema(tableName, columns);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ public class ServiceNowTableAPIRequestBuilder extends RestAPIRequest.Builder {
*/
private static final String SCHEMA_API_URL_TEMPLATE = "%s/api/now/doc/table/schema/%s";

/**
* ServiceNow API URL to fetch column metadata
*/
private static final String METADATA_API_URL_TEMPLATE = "%s/api/now/ui/meta/%s";

public ServiceNowTableAPIRequestBuilder(String instanceBaseUrl, String tableName, boolean isSchemaRequired) {
if (isSchemaRequired) {
this.setUrl(String.format(SCHEMA_API_URL_TEMPLATE, instanceBaseUrl, tableName));
this.setUrl(String.format(METADATA_API_URL_TEMPLATE, instanceBaseUrl, tableName));
} else {
this.setUrl(String.format(TABLE_API_URL_TEMPLATE, instanceBaseUrl, tableName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.cdap.plugin.servicenow.connector;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
Expand Down Expand Up @@ -206,8 +205,14 @@ private List<StructuredRecord> getTableData(String tableName, int limit)
@Nullable
private Schema getSchema(String tableName) {
SourceQueryMode mode = SourceQueryMode.TABLE;
List<ServiceNowTableInfo> tableInfo = ServiceNowInputFormat.fetchTableInfo(mode, config, tableName,
null);
// Use display type schema as connector shows a limited number of values
// and display value type provides easy to read values
List<ServiceNowTableInfo> tableInfo = ServiceNowInputFormat.fetchTableInfo(
mode,
config,
tableName,
null,
SourceValueType.SHOW_DISPLAY_VALUE);
Schema schema = tableInfo.stream().findFirst().isPresent() ? tableInfo.stream().findFirst().get().getSchema() :
null;
return schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@

package io.cdap.plugin.servicenow.sink.model;

import java.util.List;

/**
* Model class for Schema Response from Schema API
*/
public class SchemaResponse {

private final List<ServiceNowSchemaField> result;
private final ServiceNowSchemaResult result;

public SchemaResponse(List<ServiceNowSchemaField> result) {
public SchemaResponse(ServiceNowSchemaResult result) {
this.result = result;
}

public List<ServiceNowSchemaField> getResult() {
public ServiceNowSchemaResult getResult() {
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,38 @@

package io.cdap.plugin.servicenow.sink.model;

import com.google.gson.annotations.SerializedName;

/**
* Model class for Schema Field from Schema API
*/
public class ServiceNowSchemaField {
private final String label;
private final String exampleValue;
@SerializedName("internal_type")
private final String internalType;
private final String name;
private final String type;

public ServiceNowSchemaField(String label, String exampleValue, String internalType, String name) {
public ServiceNowSchemaField(String label, String internalType, String name, String type) {
this.label = label;
this.exampleValue = exampleValue;
this.internalType = internalType;
this.name = name;
this.type = type;
}

public String getLabel() {
return label;
}

public String getExampleValue() {
return exampleValue;
}

public String getInternalType() {
return internalType;
}

public String getName() {
return name;
}

public String getType() {
return type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.servicenow.sink.model;

import java.util.Map;

/**
* Model class for Schema Result returned by the ServiceNow Column Schema API.
*
* <p>The {@code columns} map contains metadata for each column in the ServiceNow table.
* The key of the map is the column's internal name (as used in the table schema),
* and the value is a {@link ServiceNowSchemaField} object containing the details for that column.
*
* <p>Example JSON from ServiceNow:
* <pre>
* {
* "result": {
* "columns": {
* "state": {
* "label": "State",
* "type": "string",
* "internal_type": "integer",
* "name": "state"
* },
* "active": {
* "label": "Active",
* "type": "boolean",
* "internal_type": "boolean",
* "name": "active"
* }
* }
* }
* }
* </pre>
*
* In this example, the map will contain keys like {@code "state"} and {@code "active"},
* each pointing to a {@code ServiceNowSchemaField} instance with metadata about that field.
*/
public class ServiceNowSchemaResult {
private final Map<String, ServiceNowSchemaField> columns;

public ServiceNowSchemaResult(Map<String, ServiceNowSchemaField> columns) {
this.columns = columns;
}

public Map<String, ServiceNowSchemaField> getColumns() {
return columns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@
import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException;
import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl;
import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig;
import io.cdap.plugin.servicenow.util.ServiceNowConstants;
import io.cdap.plugin.servicenow.util.ServiceNowTableInfo;
import io.cdap.plugin.servicenow.util.SourceApplication;
import io.cdap.plugin.servicenow.util.SourceQueryMode;
import io.cdap.plugin.servicenow.util.SourceValueType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.oltu.oauth2.common.exception.OAuthProblemException;
import org.apache.oltu.oauth2.common.exception.OAuthSystemException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,18 +63,19 @@ public static List<ServiceNowTableInfo> setInput(Configuration jobConfig, Source
// Depending on conf value fetch the list of fields for each table and create schema object
// return the schema object for each table as ServiceNowTableInfo
List<ServiceNowTableInfo> tableInfos = fetchTableInfo(mode, conf.getConnection(), conf.getTableName(),
conf.getApplicationName());
conf.getApplicationName(), conf.getValueType());
jobConf.setTableInfos(tableInfos);

return tableInfos;
}

public static List<ServiceNowTableInfo> fetchTableInfo(SourceQueryMode mode, ServiceNowConnectorConfig conf,
@Nullable String tableName,
@Nullable SourceApplication application) {
@Nullable SourceApplication application,
@Nullable SourceValueType valueType) {
// When mode = Table, fetch details from the table name provided in plugin config
if (mode == SourceQueryMode.TABLE) {
ServiceNowTableInfo tableInfo = getTableMetaData(tableName, conf);
ServiceNowTableInfo tableInfo = getTableMetaData(tableName, conf, valueType);
return (tableInfo == null) ? Collections.emptyList() : Collections.singletonList(tableInfo);
}

Expand All @@ -86,7 +85,7 @@ public static List<ServiceNowTableInfo> fetchTableInfo(SourceQueryMode mode, Ser

List<String> tableNames = application.getTableNames();
for (String table : tableNames) {
ServiceNowTableInfo tableInfo = getTableMetaData(table, conf);
ServiceNowTableInfo tableInfo = getTableMetaData(table, conf, valueType);
if (tableInfo == null) {
continue;
}
Expand All @@ -96,14 +95,16 @@ public static List<ServiceNowTableInfo> fetchTableInfo(SourceQueryMode mode, Ser
return tableInfos;
}

private static ServiceNowTableInfo getTableMetaData(String tableName, ServiceNowConnectorConfig conf) {
private static ServiceNowTableInfo getTableMetaData(String tableName,
ServiceNowConnectorConfig conf,
SourceValueType valueType) {
// Call API to fetch first record from the table
ServiceNowTableAPIClientImpl restApi = new ServiceNowTableAPIClientImpl(conf);

Schema schema = null;
int recordCount = 0;
try {
schema = restApi.fetchTableSchema(tableName);
schema = restApi.fetchTableSchema(tableName, valueType);
recordCount = restApi.getTableRecordCount(tableName);
} catch (ServiceNowAPIException e) {
throw new RuntimeException(String.format("Error in fetching table metadata due to reason: %s", e.getMessage()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig;
import io.cdap.plugin.servicenow.util.ServiceNowConstants;
import io.cdap.plugin.servicenow.util.ServiceNowTableInfo;
import io.cdap.plugin.servicenow.util.SourceValueType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
Expand Down Expand Up @@ -96,7 +97,10 @@ private static ServiceNowTableInfo getTableMetaData(String tableName, ServiceNow
Schema schema;
int recordCount;
try {
schema = restApi.fetchTableSchema(tableName);
// Use actual value type as connector config does not have an option to select value type
// This is used for ServiceNowMultiSource and provides structure of the table and being dependent on
// connector config, makes it a read only function
schema = restApi.fetchTableSchema(tableName, SourceValueType.SHOW_ACTUAL_VALUE);
recordCount = restApi.getTableRecordCount(tableName);
} catch (ServiceNowAPIException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void fetchData() throws ServiceNowAPIException {
private void fetchSchema(ServiceNowTableAPIClientImpl restApi) {
// Fetch the schema
try {
Schema tempSchema = restApi.fetchTableSchema(tableName);
Schema tempSchema = restApi.fetchTableSchema(tableName, multiSourcePluginConf.getValueType());
tableFields = tempSchema.getFields();
List<Schema.Field> schemaFields = new ArrayList<>(tableFields);
schemaFields.add(Schema.Field.of(tableNameField, Schema.of(Schema.Type.STRING)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
List<ServiceNowTableInfo> tableInfo = ServiceNowInputFormat.fetchTableInfo(conf.getQueryMode(collector),
conf.getConnection(),
conf.getTableName(),
conf.getApplicationName());
conf.getApplicationName(),
conf.getValueType());
stageConfigurer.setOutputSchema(tableInfo.stream().findFirst().get().getSchema());
} else if (conf.getQueryMode() == SourceQueryMode.REPORTING) {
stageConfigurer.setOutputSchema(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ private Schema createSchema(ServiceNowColumn column) {
case "guid":
case "translated_html":
case "journal":
case "choice":
case "string":
default:
return Schema.of(Schema.Type.STRING);
Expand Down
Loading
Loading