
Commit bbc4a2d

ryanjdew authored and MarkLogic Builder committed
DHFPROD-3068: Be more responsible with our streams
1 parent e8414cd commit bbc4a2d

8 files changed: +266 −266 lines changed
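The common thread across these files: file and zip handles that were closed by hand, or not closed at all on exception paths, are moved into try-with-resources blocks so they are released on every exit. For reference, a minimal sketch of the construct (the file name is illustrative):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class TryWithResourcesDemo {
    public static void main(String[] args) throws IOException {
        // The reader is closed when the block exits, normally or exceptionally.
        try (BufferedReader reader = new BufferedReader(new FileReader("example.txt"))) {
            System.out.println(reader.readLine());
        }
    }
}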

marklogic-data-hub/src/main/java/com/marklogic/hub/collector/impl/FileCollector.java

Lines changed: 6 additions & 5 deletions
@@ -10,6 +10,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
+import java.util.stream.Stream;
 
 public class FileCollector {
     private String filePath;
@@ -54,19 +55,19 @@ public DiskQueue<String> run() {
             if(!(Files.exists(dirPath)) || !(Files.isDirectory(dirPath))) {
                 throw new RuntimeException("The path doesn't exist or is not a directory");
             }
-            Files.find(dirPath,
+            try (Stream<Path> files = Files.find(dirPath,
                 Integer.MAX_VALUE,
-                (filePath, fileAttr) -> fileAttr.isRegularFile())
-                .forEach(path -> {
+                (filePath, fileAttr) -> fileAttr.isRegularFile())) {
+                files.forEach(path -> {
                     File file = path.toFile();
                     String fileName = FilenameUtils.getExtension(file.getName()).toLowerCase();
                     if (fileFormat.containsKey(inputFormat.toLowerCase()) && fileFormat.get(inputFormat.toLowerCase()).contains(fileName)) {
                         results.add(path.toFile().getAbsolutePath());
-                    }
-                    else if("binary".equalsIgnoreCase(inputFormat) && !csvExts.contains(fileName) && !jsonExts.contains(fileName) && !xmlExts.contains(fileName)){
+                    } else if ("binary".equalsIgnoreCase(inputFormat) && !csvExts.contains(fileName) && !jsonExts.contains(fileName) && !xmlExts.contains(fileName)) {
                         results.add(path.toFile().getAbsolutePath());
                     }
                 });
+            }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
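Unlike a plain collection, the Stream<Path> returned by Files.find holds open directory handles, and its Javadoc directs callers to close it promptly, typically with try-with-resources, which is exactly what this hunk adds. A self-contained sketch (the starting directory is illustrative):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class FindDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Paths.get("."); // illustrative starting directory
        // Closing the stream releases the directory handles opened by the walk.
        try (Stream<Path> files = Files.find(dir, Integer.MAX_VALUE,
                (path, attrs) -> attrs.isRegularFile())) {
            files.forEach(System.out::println);
        }
    }
}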

marklogic-data-hub/src/main/java/com/marklogic/hub/impl/DataHubImpl.java

Lines changed: 2 additions & 2 deletions
@@ -347,12 +347,12 @@ public void clearUserModules() {
 
         HashSet<String> services = new HashSet<>();
         for (Resource r : resolver.getResources("classpath*:/ml-modules/root/marklogic.rest.resource/*")) {
-            services.add(r.getFilename().replaceAll("\\.(xqy|sjs)", ""));
+            services.add(r.getFilename());
         }
 
         HashSet<String> transforms = new HashSet<>();
         for (Resource r : resolver.getResources("classpath*:/ml-modules/root/marklogic.rest.transform/*")) {
-            transforms.add(r.getFilename().replaceAll("\\.(xqy|sjs)", ""));
+            transforms.add(r.getFilename());
         }
 
         ServerConfigurationManager configMgr = hubConfig.newStagingClient().newServerConfigManager();
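This hunk is not about stream handling: clearUserModules() now records REST service and transform filenames verbatim instead of stripping their .xqy/.sjs extensions. The practical difference, with an illustrative module name:

public class FilenameDemo {
    public static void main(String[] args) {
        String name = "my-resource.xqy"; // illustrative module filename
        // Before this commit the extension was stripped before the name was recorded:
        System.out.println(name.replaceAll("\\.(xqy|sjs)", "")); // my-resource
        // After, r.getFilename() is stored verbatim:
        System.out.println(name); // my-resource.xqy
    }
}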

marklogic-data-hub/src/main/java/com/marklogic/hub/impl/HubProjectImpl.java

Lines changed: 177 additions & 177 deletions
Large diffs are not rendered by default.

marklogic-data-hub/src/main/java/com/marklogic/hub/impl/MappingManagerImpl.java

Lines changed: 5 additions & 7 deletions
@@ -124,10 +124,10 @@ public class MappingManagerImpl extends LoggingObject implements MappingManager
             ObjectMapper objectMapper = new ObjectMapper();
             objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
             Object json = objectMapper.readValue(mappingString, Object.class);
-            FileOutputStream fileOutputStream = new FileOutputStream(file);
-            fileOutputStream.write(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json).getBytes());
-            fileOutputStream.flush();
-            fileOutputStream.close();
+            try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
+                fileOutputStream.write(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json).getBytes());
+                fileOutputStream.flush();
+            }
         } catch (JsonProcessingException e) {
             throw new DataHubProjectException("Could not serialize mapping for project.");
         } catch (IOException e){
@@ -182,9 +182,7 @@ private Mapping getMappingVersion(String mappingName, int version){
             }
         }
         if(targetFileName !=null ){
-            try {
-                //String jsonMap = new String(Files.readAllBytes(mappingPath.resolve(targetFileName)), StandardCharsets.UTF_8);
-                FileInputStream fileInputStream = new FileInputStream(mappingPath.resolve(targetFileName).toFile());
+            try (FileInputStream fileInputStream = new FileInputStream(mappingPath.resolve(targetFileName).toFile())) {
                 ObjectMapper objectMapper = new ObjectMapper();
                 JsonNode node = objectMapper.readTree(fileInputStream);
                 Mapping newMap = createMappingFromJSON(node);
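The try-with-resources form is shorthand for a compiler-generated try/finally that closes the stream whether the write succeeds or throws; since FileOutputStream does no user-space buffering, the retained flush() call is effectively a no-op and close() alone would suffice. A sketch of the expansion, with file and bytes standing in for the values above:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class CloseExpansionDemo {
    // Roughly what the compiler generates for:
    //   try (FileOutputStream out = new FileOutputStream(file)) { out.write(bytes); }
    static void write(File file, byte[] bytes) throws IOException {
        FileOutputStream out = new FileOutputStream(file);
        Throwable primary = null;
        try {
            out.write(bytes);
        } catch (Throwable t) {
            primary = t;
            throw t;
        } finally {
            if (primary != null) {
                try {
                    out.close();
                } catch (Throwable suppressed) {
                    primary.addSuppressed(suppressed); // close() failure does not mask the write failure
                }
            } else {
                out.close();
            }
        }
    }
}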

marklogic-data-hub/src/main/java/com/marklogic/hub/impl/ScaffoldingImpl.java

Lines changed: 4 additions & 5 deletions
@@ -166,9 +166,9 @@ public void createDefaultFlow(String flowName) {
                     fileContents = fileContents.replace(key, value);
                 }
             }
-            FileWriter writer = new FileWriter(flowFile);
-            writer.write(fileContents);
-            writer.close();
+            try (FileWriter writer = new FileWriter(flowFile)) {
+                writer.write(fileContents);
+            }
         }
         catch (IOException e) {
             throw new RuntimeException(e);
@@ -241,8 +241,7 @@ public void createDefaultFlow(String flowName) {
     }
 
     private Document readLegacyFlowXml(File file) {
-        try {
-            FileInputStream is = new FileInputStream(file);
+        try (FileInputStream is = new FileInputStream(file)) {
             DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
             factory.setNamespaceAware(true);
             DocumentBuilder builder = factory.newDocumentBuilder();
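Both hunks apply the same fix; the second is the more interesting one because the stream now closes even when it is the XML parser, not the I/O, that throws. A self-contained sketch of the resulting shape (class name illustrative):

import java.io.File;
import java.io.FileInputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

public class ReadXmlDemo {
    static Document read(File file) {
        // The FileInputStream closes whether parse() succeeds or throws.
        try (FileInputStream is = new FileInputStream(file)) {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            return builder.parse(is);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}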

marklogic-data-hub/src/main/java/com/marklogic/hub/legacy/job/impl/LegacyJobManagerImpl.java

Lines changed: 54 additions & 54 deletions
@@ -173,66 +173,66 @@ public LegacyJobManagerImpl(DatabaseClient jobClient) {
     }
 
     @Override public void importJobs(Path importFilePath) throws IOException {
-        ZipFile importZip = new ZipFile(importFilePath.toFile());
-        Enumeration<? extends ZipEntry> entries = importZip.entries();
+        try(ZipFile importZip = new ZipFile(importFilePath.toFile())) {
+            Enumeration<? extends ZipEntry> entries = importZip.entries();
 
-        DataMovementManager dmm = jobClient.newDataMovementManager();
-        WriteBatcher writer = dmm
-            .newWriteBatcher()
-            .withJobName("Load jobs")
-            .withBatchSize(50);
-        JobTicket ticket = dmm.startJob(writer);
-
-        // Add each Job entry to the writer; set aside the Trace entries.
-        ArrayList<ZipEntry> traceEntries = new ArrayList<ZipEntry>();
-        DocumentMetadataHandle jobMetadata = new DocumentMetadataHandle().withCollections("job");
-        while (entries.hasMoreElements()) {
-            ZipEntry entry = entries.nextElement();
-
-            if (entry.getName().startsWith("/jobs/")) {
-                // Delimiter = \A, which is the beginning of the input
-                Scanner s = new Scanner(importZip.getInputStream(entry)).useDelimiter("\\A");
-                String entryText = s.hasNext() ? s.next() : "";
-
-                writer.add(
-                    entry.getName(),
-                    jobMetadata,
-                    new StringHandle(entryText).withFormat(Format.JSON)
-                );
-            }
-            else {
-                traceEntries.add(entry);
-            }
-        }
-
-        writer.flushAndWait();
-        dmm.stopJob(ticket);
-        dmm.release();
-
-        if (traceEntries.size() > 0) {
-            dmm = this.jobClient.newDataMovementManager();
-            writer = dmm
-                .newWriteBatcher()
-                .withJobName("Load traces");
-            ticket = dmm.startJob(writer);
-
-            DocumentMetadataHandle traceMetadata = new DocumentMetadataHandle().withCollections("trace");
-
-            for (ZipEntry entry: traceEntries) {
-                // Delimiter = \A, which is the beginning of the input
-                Scanner s = new Scanner(importZip.getInputStream(entry)).useDelimiter("\\A");
-                String entryText = s.hasNext() ? s.next() : "";
-
-                writer.add(
-                    entry.getName(),
-                    traceMetadata,
-                    new StringHandle(entryText)
-                        .withFormat(entry.getName().endsWith(".json") ? Format.JSON : Format.XML)
-                );
-            }
+            DataMovementManager dmm = jobClient.newDataMovementManager();
+            WriteBatcher writer = dmm
+                .newWriteBatcher()
+                .withJobName("Load jobs")
+                .withBatchSize(50);
+            JobTicket ticket = dmm.startJob(writer);
+
+            // Add each Job entry to the writer; set aside the Trace entries.
+            ArrayList<ZipEntry> traceEntries = new ArrayList<ZipEntry>();
+            DocumentMetadataHandle jobMetadata = new DocumentMetadataHandle().withCollections("job");
+            while (entries.hasMoreElements()) {
+                ZipEntry entry = entries.nextElement();
+
+                if (entry.getName().startsWith("/jobs/")) {
+                    // Delimiter = \A, which is the beginning of the input
+                    Scanner s = new Scanner(importZip.getInputStream(entry)).useDelimiter("\\A");
+                    String entryText = s.hasNext() ? s.next() : "";
+
+                    writer.add(
+                        entry.getName(),
+                        jobMetadata,
+                        new StringHandle(entryText).withFormat(Format.JSON)
+                    );
+                } else {
+                    traceEntries.add(entry);
+                }
+            }
+
             writer.flushAndWait();
             dmm.stopJob(ticket);
             dmm.release();
+
+            if (traceEntries.size() > 0) {
+                dmm = this.jobClient.newDataMovementManager();
+                writer = dmm
+                    .newWriteBatcher()
+                    .withJobName("Load traces");
+                ticket = dmm.startJob(writer);
+
+                DocumentMetadataHandle traceMetadata = new DocumentMetadataHandle().withCollections("trace");
+
+                for (ZipEntry entry : traceEntries) {
+                    // Delimiter = \A, which is the beginning of the input
+                    Scanner s = new Scanner(importZip.getInputStream(entry)).useDelimiter("\\A");
+                    String entryText = s.hasNext() ? s.next() : "";
+
+                    writer.add(
+                        entry.getName(),
+                        traceMetadata,
+                        new StringHandle(entryText)
+                            .withFormat(entry.getName().endsWith(".json") ? Format.JSON : Format.XML)
+                    );
+                }
+                writer.flushAndWait();
+                dmm.stopJob(ticket);
+                dmm.release();
+            }
         }
     }
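ZipFile implements AutoCloseable, and per its Javadoc closing it also closes every stream previously handed out by getInputStream(), so the single try block above covers the Scanner-wrapped entry streams as well. A condensed sketch (method name and arguments illustrative):

import java.io.IOException;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.Scanner;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class ZipReadDemo {
    static void dump(Path zipPath) throws IOException {
        // Closing the ZipFile closes all entry streams obtained from it.
        try (ZipFile zip = new ZipFile(zipPath.toFile())) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                // Delimiter \A makes the Scanner read the whole entry as one token.
                Scanner s = new Scanner(zip.getInputStream(entry)).useDelimiter("\\A");
                System.out.println(entry.getName() + ": " + (s.hasNext() ? s.next() : ""));
            }
        }
    }
}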
238238

marklogic-data-hub/src/main/java/com/marklogic/hub/step/impl/WriteStepRunner.java

Lines changed: 12 additions & 10 deletions
@@ -39,6 +39,7 @@
 import com.marklogic.hub.step.*;
 import com.marklogic.hub.util.json.JSONObject;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.slf4j.Logger;
@@ -658,40 +659,41 @@ private void processCsv(JacksonHandle jacksonHandle, File file) {
     }
 
     private void addToBatcher(File file, Format fileFormat) throws IOException{
+        // This docStream must not be closed, or use try-resource due to WriteBatcher needing the stream open
         FileInputStream docStream = new FileInputStream(file);
         //note these ORs are for forward compatibility if we swap out the filecollector for another lib
-        if(inputFileType.equalsIgnoreCase("csv") || inputFileType.equalsIgnoreCase("tsv") || inputFileType.equalsIgnoreCase("psv")) {
+        if (inputFileType.equalsIgnoreCase("csv") || inputFileType.equalsIgnoreCase("tsv") || inputFileType.equalsIgnoreCase("psv")) {
            CsvSchema schema = CsvSchema.emptySchema()
                .withHeader()
                .withColumnSeparator(separator.charAt(0));
            JacksonCSVSplitter splitter = new JacksonCSVSplitter().withCsvSchema(schema);
            try {
-               if(! writeBatcher.isStopped()) {
+               if (!writeBatcher.isStopped()) {
                    Stream<JacksonHandle> contentStream = splitter.split(docStream);
                    contentStream.forEach(jacksonHandle -> this.processCsv(jacksonHandle, file));
                }
            } catch (Exception e) {
+               IOUtils.closeQuietly(docStream);
                throw new RuntimeException(e);
            }
-        }
-        else {
+        } else {
            InputStreamHandle handle = new InputStreamHandle(docStream);
-           handle.setFormat(fileFormat);
            try {
-               if(! writeBatcher.isStopped()) {
+               handle.setFormat(fileFormat);
+               if (!writeBatcher.isStopped()) {
                    try {
                        String uri = file.getAbsolutePath();
                        //In case of Windows, C:\\Documents\\abc.json will be converted to /c/Documents/abc.json
-                       if(SystemUtils.OS_NAME.toLowerCase().contains("windows")){
+                       if (SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
                            uri = "/" + FilenameUtils.separatorsToUnix(StringUtils.replaceOnce(uri, ":", ""));
                        }
                        writeBatcher.add(generateAndEncodeURI(outputURIReplace(uri)), handle);
-                   }
-                   catch (IllegalStateException e) {
+                   } catch (IllegalStateException e) {
                        logger.error("WriteBatcher has been stopped");
                    }
                }
           } catch (URISyntaxException e) {
+              IOUtils.closeQuietly(handle);
              throw new RuntimeException(e);
           }
        }
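This file is the deliberate exception, as the new comment in the hunk says: WriteBatcher keeps reading docStream after addToBatcher returns, so wrapping it in try-with-resources would close it too early. The commit instead closes the stream only on the failure paths, before the exception propagates. A sketch of that hand-off pattern (the consumer type and method names are illustrative):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;

public class HandoffDemo {
    // The consumer takes ownership of the stream and closes it when done;
    // the producer closes it only if the handoff itself fails.
    static void handOff(File file, Consumer<InputStream> consumer) throws IOException {
        InputStream in = new FileInputStream(file);
        try {
            consumer.accept(in);
        } catch (RuntimeException e) {
            IOUtils.closeQuietly(in); // handoff failed, so we still own the stream
            throw e;
        }
    }
}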

web/src/main/java/com/marklogic/hub/web/service/LegacyFlowManagerService.java

Lines changed: 6 additions & 6 deletions
@@ -143,9 +143,9 @@ public void saveOrUpdateHarmonizeFlowOptionsToFile(String entityName, String flo
         }
         Path filePath = getHarmonizeOptionsFilePath(destFolder, entityName, flowName);
         FileWriter fw = new FileWriter(filePath.toString());
-        BufferedWriter bw = new BufferedWriter(fw);
-        bw.write(harmonizeOptionsFileContent);
-        bw.close();
+        try (BufferedWriter bw = new BufferedWriter(fw)) {
+            bw.write(harmonizeOptionsFileContent);
+        }
     }
 
@@ -161,9 +161,9 @@ public void saveOrUpdateFlowMlcpOptionsToFile(String entityName, String flowName
         }
         Path filePath = getMlcpOptionsFilePath(destFolder, entityName, flowName);
         FileWriter fw = new FileWriter(filePath.toString());
-        BufferedWriter bw = new BufferedWriter(fw);
-        bw.write(mlcpOptionsFileContent);
-        bw.close();
+        try (BufferedWriter bw = new BufferedWriter(fw)) {
+            bw.write(mlcpOptionsFileContent);
+        }
     }
 
     public Map<String, Object> getFlowMlcpOptionsFromFile(String entityName, String flowName) throws IOException {
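Note that only the BufferedWriter sits in the try header while the FileWriter fw is created outside it; that still leaks nothing, because closing a BufferedWriter closes the writer it wraps. A common further tightening is to build the chain inside the header (path and content illustrative):

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class WriteOptionsDemo {
    static void write(String path, String content) throws IOException {
        // Closing the BufferedWriter transitively closes the FileWriter.
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(path))) {
            bw.write(content);
        }
    }
}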
