Skip to content

Commit 2274e97

Browse files
committed
PLUGIN-1936: Rework based on new changes suggested in the design
1 parent cf89907 commit 2274e97

21 files changed

+515
-234
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@
9999
<version>${cdap.version}</version>
100100
<scope>test</scope>
101101
</dependency>
102+
<dependency>
103+
<groupId>commons-io</groupId>
104+
<artifactId>commons-io</artifactId>
105+
<version>2.5</version>
106+
</dependency>
102107
<dependency>
103108
<groupId>org.apache.hadoop</groupId>
104109
<artifactId>hadoop-common</artifactId>

src/main/java/io/cdap/plugin/servicenow/ServiceNowBaseConfig.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
package io.cdap.plugin.servicenow;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.gson.stream.JsonReader;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
2223
import io.cdap.cdap.api.annotation.Name;
2324
import io.cdap.cdap.api.plugin.PluginConfig;
2425
import io.cdap.cdap.etl.api.FailureCollector;
2526
import io.cdap.plugin.common.ConfigUtil;
27+
import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException;
2628
import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl;
2729
import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIRequestBuilder;
2830
import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig;
@@ -31,14 +33,21 @@
3133
import io.cdap.plugin.servicenow.util.SchemaType;
3234
import io.cdap.plugin.servicenow.util.ServiceNowConstants;
3335
import io.cdap.plugin.servicenow.util.SourceValueType;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
3438

39+
import java.io.IOException;
40+
import java.io.InputStream;
41+
import java.io.InputStreamReader;
42+
import java.nio.charset.StandardCharsets;
3543
import javax.annotation.Nullable;
3644

3745
/**
3846
* ServiceNow Base Config. Contains connection properties and methods.
3947
*/
4048
public class ServiceNowBaseConfig extends PluginConfig {
4149

50+
private static final Logger log = LoggerFactory.getLogger(ServiceNowBaseConfig.class);
4251
@Name(ConfigUtil.NAME_USE_CONNECTION)
4352
@Nullable
4453
@Description("Whether to use an existing connection.")
@@ -140,7 +149,7 @@ public void validateTable(String tableName, SourceValueType valueType, FailureCo
140149
requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT);
141150

142151
apiResponse = serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build());
143-
if (serviceNowTableAPIClient.parseResponseToResultListOfMap(apiResponse.getResponseBody()).isEmpty()) {
152+
if (isResultEmpty(apiResponse)) {
144153
// Removed config property as in case of MultiSource, only first table error was populating.
145154
collector.addFailure("Table: " + tableName + " is empty.", "");
146155
}
@@ -152,4 +161,26 @@ public void validateTable(String tableName, SourceValueType valueType, FailureCo
152161
}
153162
}
154163

164+
/**
165+
* Check whether the result is empty or not.
166+
* @param restAPIResponse
167+
* @return true if result is empty
168+
* @throws IOException
169+
*/
170+
public boolean isResultEmpty(RestAPIResponse restAPIResponse) throws IOException {
171+
JsonReader reader = new JsonReader(new InputStreamReader(restAPIResponse.getBodyAsStream(),
172+
StandardCharsets.UTF_8));
173+
reader.beginObject();
174+
while (reader.hasNext()) {
175+
String name = reader.nextName();
176+
if (ServiceNowConstants.RESULT.equals(name)) {
177+
reader.beginArray();
178+
return !reader.hasNext();
179+
} else {
180+
reader.skipValue();
181+
}
182+
}
183+
return true;
184+
}
185+
155186
}

src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,6 @@ public RestAPIResponse fetchTableRecords(
161161
requestBuilder.setAuthHeader(accessToken);
162162
RestAPIResponse apiResponse = executeGetWithRetries(requestBuilder.build());
163163
return apiResponse;
164-
//return parseResponseToResultListOfMap(apiResponse.getResponseBody());
165-
// return parseResponseStreamToRecord(apiResponse.getInputStream());
166-
167164
}
168165

169166
private void applyDateRangeToRequest(ServiceNowTableAPIRequestBuilder requestBuilder, String startDate,
@@ -206,38 +203,35 @@ public List<Map<String, String>> parseResponseToResultListOfMap(String responseB
206203
return GSON.fromJson(ja, type);
207204
}
208205

209-
public Map<String, String> parseResponseStreamToRecord(InputStream in) throws ServiceNowAPIException {
210-
// List<Map<String, String>> records = new ArrayList<>();
211-
// InputStream in = httpResponse.getEntity().getContent();
206+
public List<Map<String, String>> parseResponseToResultListOfMap(InputStream in) throws ServiceNowAPIException {
212207
try (InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8);
213208
JsonReader jsonReader = new JsonReader(reader)) {
214209
jsonReader.setLenient(true);
215210
jsonReader.beginObject();
216-
Map<String, String> record = new HashMap<>();
211+
212+
List<Map<String, String>> records = new ArrayList<>();
217213
while (jsonReader.hasNext()) {
218214
String name = jsonReader.nextName();
219215
if (ServiceNowConstants.RESULT.equals(name) && jsonReader.peek() == JsonToken.BEGIN_ARRAY) {
220216
jsonReader.beginArray();
221217
while (jsonReader.hasNext()) {
222218
jsonReader.beginObject();
223219
while (jsonReader.hasNext()) {
220+
Map<String, String> record = new HashMap<>();
224221
String field = jsonReader.nextName();
225222
JsonToken token = jsonReader.peek();
226-
// JsonElement resultElement = GSON.fromJson(jsonReader, JsonElement.class);
227-
// responseBody = resultElement.toString();
228223
record.put(field, token == JsonToken.NULL ? null : jsonReader.nextString());
224+
records.add(record);
229225
}
230226
jsonReader.endObject();
231-
// records.add(record);
232227
}
233228
jsonReader.endArray();
234229
} else {
235-
// skip other fields (e.g., metadata like result_count)
236230
jsonReader.skipValue();
237231
}
238232
}
239233
jsonReader.endObject();
240-
return record;
234+
return records;
241235
} catch (IOException e) {
242236
throw new ServiceNowAPIException(e, null);
243237
}
@@ -325,8 +319,8 @@ public Schema fetchTableSchema(String tableName, FailureCollector collector) {
325319
}
326320

327321
@VisibleForTesting
328-
public MetadataAPISchemaResponse parseSchemaResponse(String responseBody) {
329-
return GSON.fromJson(responseBody, MetadataAPISchemaResponse.class);
322+
public MetadataAPISchemaResponse parseSchemaResponse(InputStream responseStream) {
323+
return GSON.fromJson(createJsonReader(responseStream), MetadataAPISchemaResponse.class);
330324
}
331325

332326
/**
@@ -400,7 +394,7 @@ public Schema fetchTableSchema(String tableName, String accessToken, SourceValue
400394
private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
401395
String tableName) throws ServiceNowAPIException {
402396
SchemaAPISchemaResponse schemaAPISchemaResponse =
403-
GSON.fromJson(restAPIResponse.getResponseBody(), SchemaAPISchemaResponse.class);
397+
GSON.fromJson(createJsonReader(restAPIResponse.getBodyAsStream()), SchemaAPISchemaResponse.class);
404398

405399
if (schemaAPISchemaResponse.getResult() == null || schemaAPISchemaResponse.getResult().isEmpty()) {
406400
throw new ServiceNowAPIException(
@@ -434,8 +428,7 @@ private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List<
434428
private Schema prepareSchemaWithMetadataAPI(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
435429
String tableName, SourceValueType valueType) throws
436430
ServiceNowAPIException {
437-
MetadataAPISchemaResponse metadataAPISchemaResponse = parseSchemaResponse(restAPIResponse.getResponseBody());
438-
431+
MetadataAPISchemaResponse metadataAPISchemaResponse = parseSchemaResponse(restAPIResponse.getBodyAsStream());
439432
if (metadataAPISchemaResponse.getResult() == null || metadataAPISchemaResponse.getResult().getColumns() == null ||
440433
metadataAPISchemaResponse.getResult().getColumns().isEmpty()) {
441434
throw new ServiceNowAPIException(
@@ -461,6 +454,11 @@ private Schema prepareSchemaWithMetadataAPI(RestAPIResponse restAPIResponse, Lis
461454
return SchemaBuilder.constructSchema(tableName, columns);
462455
}
463456

457+
public JsonReader createJsonReader(InputStream inputStream) {
458+
Objects.requireNonNull(inputStream, "InputStream must not be null");
459+
return new JsonReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
460+
}
461+
464462
/**
465463
* Get the total number of records in the table
466464
*
@@ -551,8 +549,8 @@ public String createRecordInDisplayMode(String tableName, HttpEntity entity) thr
551549
}
552550

553551
private String getSystemId(RestAPIResponse restAPIResponse) {
554-
CreateRecordAPIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(),
555-
CreateRecordAPIResponse.class);
552+
CreateRecordAPIResponse apiResponse = GSON.fromJson(
553+
new InputStreamReader(restAPIResponse.getBodyAsStream(), StandardCharsets.UTF_8), CreateRecordAPIResponse.class);
556554
return apiResponse.getResult().get(ServiceNowConstants.SYSTEM_ID).toString();
557555
}
558556

@@ -575,7 +573,8 @@ public Map<String, String> getRecordFromServiceNowTable(String tableName, String
575573
requestBuilder.setAuthHeader(accessToken);
576574
restAPIResponse = executeGetWithRetries(requestBuilder.build());
577575

578-
APIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(), APIResponse.class);
576+
APIResponse apiResponse = GSON.fromJson(
577+
new InputStreamReader(restAPIResponse.getBodyAsStream(), StandardCharsets.UTF_8), APIResponse.class);
579578
return apiResponse.getResult().get(0);
580579
}
581580

@@ -593,8 +592,9 @@ public Map<String, String> getRecordFromServiceNowTable(String tableName, String
593592
* @throws RuntimeException if the schema response is null or contains no result.
594593
*/
595594
private Schema prepareStringBasedSchema(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns,
596-
String tableName) {
597-
List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody());
595+
String tableName) throws ServiceNowAPIException {
596+
List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getBodyAsStream());
597+
// List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody());
598598
if (result != null && !result.isEmpty()) {
599599
Map<String, String> firstRecord = result.get(0);
600600
for (String key : firstRecord.keySet()) {

src/main/java/io/cdap/plugin/servicenow/connector/ServiceNowConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ private TableList listTables(String accessToken) throws ServiceNowAPIException {
135135
ServiceNowTableAPIClientImpl serviceNowTableAPIClient = new ServiceNowTableAPIClientImpl(config, true);
136136
RestAPIResponse apiResponse =
137137
serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build());
138-
return GSON.fromJson(apiResponse.getResponseBody(), TableList.class);
138+
return GSON.fromJson(serviceNowTableAPIClient.createJsonReader(apiResponse.getBodyAsStream()), TableList.class);
139139
}
140140

141141
public ConnectorSpec generateSpec(ConnectorContext connectorContext, ConnectorSpecRequest connectorSpecRequest) {
@@ -184,7 +184,7 @@ private List<StructuredRecord> getTableData(String tableName, int limit)
184184
requestBuilder.setResponseHeaders(ServiceNowConstants.HEADER_NAME_TOTAL_COUNT);
185185
RestAPIResponse apiResponse = serviceNowTableAPIClient.executeGetWithRetries(requestBuilder.build());
186186
List<Map<String, String>> result = serviceNowTableAPIClient.parseResponseToResultListOfMap
187-
(apiResponse.getResponseBody());
187+
(apiResponse.getBodyAsStream());
188188
List<StructuredRecord> recordList = new ArrayList<>();
189189
Schema schema = getSchema(tableName);
190190
if (schema != null) {

src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public RestAPIResponse executeGet(RestAPIRequest request) throws IOException {
8383
}
8484
} catch (ConnectTimeoutException | SocketException e) {
8585
ServiceNowAPIException exception = new ServiceNowAPIException(e, null);
86-
return new RestAPIResponse(Collections.emptyMap(), (InputStream) null, exception);
86+
return new RestAPIResponse(Collections.emptyMap(), null, exception);
8787
}
8888
}
8989

src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIResponse.java

Lines changed: 36 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
import com.google.gson.JsonObject;
2121
import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException;
2222
import io.cdap.plugin.servicenow.util.ServiceNowConstants;
23+
import org.apache.commons.io.IOUtils;
24+
import org.apache.commons.io.input.BoundedInputStream;
2325
import org.apache.http.Header;
2426
import org.apache.http.HttpEntity;
2527
import org.apache.http.HttpResponse;
2628
import org.apache.http.HttpStatus;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2731

2832
import java.io.ByteArrayInputStream;
29-
import java.io.ByteArrayOutputStream;
3033
import java.io.IOException;
3134
import java.io.InputStream;
3235
import java.nio.charset.StandardCharsets;
@@ -44,42 +47,33 @@
4447
* Pojo class to capture the API response.
4548
*/
4649
public class RestAPIResponse {
50+
private static final Logger LOG = LoggerFactory.getLogger(RestAPIResponse.class);
4751
private static final Gson GSON = new Gson();
4852
private static final String HTTP_ERROR_MESSAGE = "Http call to ServiceNow instance returned status code %d.";
4953
private static final String REST_ERROR_MESSAGE = "Rest Api response has errors. Error message: %s.";
5054
private static final Set<Integer> SUCCESS_CODES = new HashSet<>(Arrays.asList(HttpStatus.SC_CREATED,
5155
HttpStatus.SC_OK));
56+
private static final long MAX_PAGE_BYTES = 50L * 1024 * 1024; // 50 MB (Upper Bound)
5257
private final Map<String, String> headers;
53-
// Deprecated: storing full body as String can cause OOM
54-
@Deprecated
55-
private String responseBody;
5658
@Nullable private final ServiceNowAPIException exception;
5759

58-
// New: store InputStream for streaming consumption
59-
private InputStream inputStream;
60-
61-
public RestAPIResponse(
62-
Map<String, String> headers,
63-
@Nullable String responseBody,
64-
InputStream inputStream,
65-
@Nullable ServiceNowAPIException exception) {
66-
this.headers = headers;
67-
this.responseBody = responseBody;
68-
this.inputStream = inputStream;
69-
this.exception = exception;
70-
}
60+
// New: store byte array
61+
private byte[] responseBody;
7162

7263
public RestAPIResponse(
7364
Map<String, String> headers,
74-
InputStream inputStream,
65+
byte[] responseBody,
7566
@Nullable ServiceNowAPIException exception) {
7667
this.headers = headers;
77-
this.inputStream = inputStream;
68+
this.responseBody = responseBody;
7869
this.exception = exception;
7970
}
8071

8172
/**
8273
* Parses HttpResponse into RestAPIResponse object when no errors occur.
74+
* The RESTAPIResponse contains the HTTP response body as a stream. This stream is:
75+
* single-use, forward-only and owned by the caller. Caller is responsible for consuming and closing it.
76+
*
8377
* Throws a {@link ServiceNowAPIException}.
8478
*
8579
* @param httpResponse The HttpResponse object to parse
@@ -100,43 +94,33 @@ public static RestAPIResponse parse(HttpResponse httpResponse, String... headerN
10094

10195
ServiceNowAPIException serviceNowAPIException = validateHttpResponse(httpResponse);
10296
if (serviceNowAPIException != null) {
103-
return new RestAPIResponse(headers, null, null, serviceNowAPIException);
97+
return new RestAPIResponse(headers, null, serviceNowAPIException);
10498
}
105-
/*try {
106-
responseBody = EntityUtils.toString(httpResponse.getEntity());
107-
} catch (IOException e) {
108-
return new RestAPIResponse(headers, null, null, new ServiceNowAPIException(e, httpResponse));
109-
}*/
11099
try {
111-
return prepareResponseWithBodyAndStream(httpResponse, headers, serviceNowAPIException);
100+
return prepareResponseStream(httpResponse, headers, serviceNowAPIException);
112101
} catch (IOException e) {
113-
return new RestAPIResponse(headers, null, null, new ServiceNowAPIException(e, httpResponse));
102+
return new RestAPIResponse(headers, null, new ServiceNowAPIException(e, httpResponse));
114103
}
115104
}
116105

117-
public static RestAPIResponse prepareResponseWithBodyAndStream(HttpResponse httpResponse, Map<String, String> headers,
106+
public static RestAPIResponse prepareResponseStream(HttpResponse httpResponse, Map<String, String> headers,
118107
ServiceNowAPIException serviceNowAPIException) throws IOException {
119108
HttpEntity httpEntity = httpResponse.getEntity();
109+
byte[] responseBody = new byte[0];
110+
InputStream inputStream;
120111
if (httpEntity != null) {
121-
try (InputStream inputStream = httpEntity.getContent();
122-
ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
123-
124-
// Copy the InputStream into the ByteArrayOutputStream
125-
byte[] data = new byte[8192];
126-
int bytesRead;
127-
while ((bytesRead = inputStream.read(data)) != -1) {
128-
buffer.write(data, 0, bytesRead);
129-
}
130-
// Convert the buffer to a String for the responseBody
131-
String responseBody = buffer.toString(String.valueOf(StandardCharsets.UTF_8));
132-
serviceNowAPIException = validateRestApiResponse(httpResponse, responseBody);
133-
// Create a new InputStream from the buffer for further processing
134-
InputStream reusableStream = new ByteArrayInputStream(buffer.toByteArray());
135-
// return new RestAPIResponse(headers, responseBody, serviceNowAPIException);
136-
return new RestAPIResponse(headers, responseBody, reusableStream, serviceNowAPIException);
112+
inputStream = httpEntity.getContent();
113+
BoundedInputStream boundedInputStream = new BoundedInputStream(
114+
inputStream, MAX_PAGE_BYTES + 1); // +1 to detect overflow
115+
responseBody = IOUtils.toByteArray(boundedInputStream);
116+
LOG.info("RAW JSON: {}", new String(responseBody, StandardCharsets.UTF_8));
117+
if (responseBody.length > MAX_PAGE_BYTES) {
118+
throw new IOException(
119+
"ServiceNow page exceeded max allowed size: " + MAX_PAGE_BYTES);
137120
}
121+
return new RestAPIResponse(headers, responseBody, serviceNowAPIException);
138122
} else {
139-
return new RestAPIResponse(headers, null, null, serviceNowAPIException);
123+
return new RestAPIResponse(headers, responseBody, serviceNowAPIException);
140124
}
141125
}
142126

@@ -174,12 +158,16 @@ public Map<String, String> getHeaders() {
174158
}
175159

176160
@Nullable
177-
public String getResponseBody() {
161+
public byte[] getResponseBody() {
178162
return responseBody;
179163
}
180164

181-
public InputStream getInputStream() {
182-
return inputStream;
165+
/**
166+
* Returns a fresh InputStream for the response body. Caller must close the stream.
167+
* @return InputStream
168+
*/
169+
public InputStream getBodyAsStream() {
170+
return responseBody == null ? null : new ByteArrayInputStream(responseBody);
183171
}
184172

185173
@Nullable

0 commit comments

Comments
 (0)