Skip to content

Commit 4235153

Browse files
committed
PLUGIN-1936: Refactor code to implement streaming logic
1 parent 1f2d9d7 commit 4235153

15 files changed

+241
-67
lines changed

src/main/java/io/cdap/plugin/servicenow/apiclient/ServiceNowTableAPIClientImpl.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public String getAccessTokenRetryableMode() throws ExecutionException, RetryExce
138138
* @param limit The number of records to be fetched
139139
* @return The list of Map; each Map representing a table row
140140
*/
141-
public List<Map<String, String>> fetchTableRecords(
141+
public RestAPIResponse fetchTableRecords(
142142
String tableName,
143143
SourceValueType valueType,
144144
String startDate,
@@ -161,8 +161,9 @@ public List<Map<String, String>> fetchTableRecords(
161161
String accessToken = getAccessToken();
162162
requestBuilder.setAuthHeader(accessToken);
163163
RestAPIResponse apiResponse = executeGetWithRetries(requestBuilder.build());
164+
return apiResponse;
164165
//return parseResponseToResultListOfMap(apiResponse.getResponseBody());
165-
return parseResponseStreamToResultListOfMap(apiResponse.getInputStream());
166+
// return parseResponseStreamToRecord(apiResponse.getInputStream());
166167

167168
}
168169

@@ -206,20 +207,20 @@ public List<Map<String, String>> parseResponseToResultListOfMap(String responseB
206207
return GSON.fromJson(ja, type);
207208
}
208209

209-
public List<Map<String, String>> parseResponseStreamToResultListOfMap(InputStream in) throws ServiceNowAPIException {
210-
List<Map<String, String>> records = new ArrayList<>();
210+
public Map<String, String> parseResponseStreamToRecord(InputStream in) throws ServiceNowAPIException {
211+
// List<Map<String, String>> records = new ArrayList<>();
211212
// InputStream in = httpResponse.getEntity().getContent();
212213
try (InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8);
213214
JsonReader jsonReader = new JsonReader(reader)) {
214215
jsonReader.setLenient(true);
215216
jsonReader.beginObject();
217+
Map<String, String> record = new HashMap<>();
216218
while (jsonReader.hasNext()) {
217219
String name = jsonReader.nextName();
218220
if (ServiceNowConstants.RESULT.equals(name) && jsonReader.peek() == JsonToken.BEGIN_ARRAY) {
219221
jsonReader.beginArray();
220222
while (jsonReader.hasNext()) {
221223
jsonReader.beginObject();
222-
Map<String, String> record = new HashMap<>();
223224
while (jsonReader.hasNext()) {
224225
String field = jsonReader.nextName();
225226
JsonToken token = jsonReader.peek();
@@ -228,7 +229,7 @@ public List<Map<String, String>> parseResponseStreamToResultListOfMap(InputStrea
228229
record.put(field, token == JsonToken.NULL ? null : jsonReader.nextString());
229230
}
230231
jsonReader.endObject();
231-
records.add(record);
232+
// records.add(record);
232233
}
233234
jsonReader.endArray();
234235
} else {
@@ -237,7 +238,7 @@ public List<Map<String, String>> parseResponseStreamToResultListOfMap(InputStrea
237238
}
238239
}
239240
jsonReader.endObject();
240-
return records;
241+
return record;
241242
} catch (IOException e) {
242243
throw new ServiceNowAPIException(e, null);
243244
}
@@ -277,12 +278,14 @@ private String getErrorMessage(String responseBody) {
277278
* @param limit The number of records to be fetched
278279
* @return The list of Map; each Map representing a table row
279280
*/
280-
public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType,
281+
public RestAPIResponse fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType,
281282
String startDate, String endDate, int offset,
282283
int limit) throws ServiceNowAPIException {
283-
final List<Map<String, String>> results = new ArrayList<>();
284+
//final List<Map<String, String>> results = new ArrayList<>();
285+
final RestAPIResponse[] restAPIResponse = new RestAPIResponse[1];
284286
Callable<Boolean> fetchRecords = () -> {
285-
results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit));
287+
// results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit));
288+
restAPIResponse[0] = fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit);
286289
return true;
287290
};
288291

@@ -300,7 +303,7 @@ public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName
300303
e, null, false);
301304
}
302305

303-
return results;
306+
return restAPIResponse[0];
304307
}
305308

306309
/**

src/main/java/io/cdap/plugin/servicenow/restapi/RestAPIResponse.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,11 @@ public class RestAPIResponse {
6464
public RestAPIResponse(
6565
Map<String, String> headers,
6666
@Nullable String responseBody,
67+
InputStream inputStream,
6768
@Nullable ServiceNowAPIException exception) {
6869
this.headers = headers;
6970
this.responseBody = responseBody;
71+
this.inputStream = inputStream;
7072
this.exception = exception;
7173
}
7274

@@ -101,26 +103,26 @@ public static RestAPIResponse parse(HttpResponse httpResponse, String... headerN
101103

102104
ServiceNowAPIException serviceNowAPIException = validateHttpResponse(httpResponse);
103105
if (serviceNowAPIException != null) {
104-
return new RestAPIResponse(headers, (InputStream) null, serviceNowAPIException);
106+
return new RestAPIResponse(headers, null, null, serviceNowAPIException);
105107
}
106108

107109
String responseBody = null;
108110
try {
109111
responseBody = EntityUtils.toString(httpResponse.getEntity());
110112
} catch (IOException e) {
111-
return new RestAPIResponse(headers, (String) null, new ServiceNowAPIException(e, httpResponse));
113+
return new RestAPIResponse(headers, null, null, new ServiceNowAPIException(e, httpResponse));
112114
}
113115
// Instead of reading the entire entity, store the stream
114116
HttpEntity httpEntity = httpResponse.getEntity();
115117
InputStream responseStream;
116118
try {
117119
responseStream = (httpEntity != null) ? httpEntity.getContent() : null;
118120
} catch (IOException e) {
119-
return new RestAPIResponse(headers, (InputStream) null, new ServiceNowAPIException(e, httpResponse));
121+
return new RestAPIResponse(headers, null, null, new ServiceNowAPIException(e, httpResponse));
120122
}
121123
serviceNowAPIException = validateRestApiResponse(httpResponse, responseBody);
122124
// return new RestAPIResponse(headers, responseBody, serviceNowAPIException);
123-
return new RestAPIResponse(headers, responseStream, serviceNowAPIException);
125+
return new RestAPIResponse(headers, responseBody, responseStream, serviceNowAPIException);
124126

125127
}
126128

src/main/java/io/cdap/plugin/servicenow/source/ServiceNowMultiRecordReader.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.cdap.plugin.servicenow.apiclient.ServiceNowAPIException;
2323
import io.cdap.plugin.servicenow.apiclient.ServiceNowTableAPIClientImpl;
2424
import io.cdap.plugin.servicenow.connector.ServiceNowRecordConverter;
25+
import io.cdap.plugin.servicenow.restapi.RestAPIResponse;
2526
import org.apache.hadoop.mapreduce.InputSplit;
2627
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2728

@@ -91,14 +92,14 @@ public StructuredRecord getCurrentValue() throws IOException {
9192
}
9293

9394
@VisibleForTesting
94-
void fetchData() throws ServiceNowAPIException {
95+
RestAPIResponse fetchData() throws ServiceNowAPIException {
9596
// Get the table data
96-
results = restApi.fetchTableRecordsRetryableMode(tableName, multiSourcePluginConf.getValueType(),
97-
multiSourcePluginConf.getStartDate(),
98-
multiSourcePluginConf.getEndDate(), split.getOffset(),
99-
multiSourcePluginConf.getPageSize());
97+
RestAPIResponse restAPIResponse = restApi.fetchTableRecordsRetryableMode(tableName,
98+
multiSourcePluginConf.getValueType(), multiSourcePluginConf.getStartDate(), multiSourcePluginConf.getEndDate(),
99+
split.getOffset(), multiSourcePluginConf.getPageSize());
100100

101-
iterator = results.iterator();
101+
// iterator = results.iterator();
102+
return restAPIResponse;
102103
}
103104

104105
private void fetchSchema(ServiceNowTableAPIClientImpl restApi) {

0 commit comments

Comments
 (0)