Skip to content

Commit 59777d1

Browse files
authored
Major improvement to cohort import time (#1796)
- addresses the long import time for TEMPO cohorts - switched to using queries that handle batches of samples - cleaned up redundancies Signed-off-by: Angelica Ochoa <15623749+ao508@users.noreply.github.com>
1 parent c7cff35 commit 59777d1

File tree

14 files changed

+253
-87
lines changed

14 files changed

+253
-87
lines changed

persistence/src/main/java/org/mskcc/smile/persistence/neo4j/CohortCompleteRepository.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ public interface CohortCompleteRepository extends Neo4jRepository<Cohort, Long>
3434
List<Cohort> findCohortsBySamplePrimaryId(@Param("primaryId") String primaryId);
3535

3636
@Query("""
37-
MATCH (s: Sample)-[:HAS_METADATA]->(sm: SampleMetadata {primaryId: $primaryId})
3837
MATCH (c: Cohort {cohortId: $cohortId})
38+
MATCH (s: Sample)-[:HAS_METADATA]->(sm: SampleMetadata)
39+
WHERE sm.primaryId IN $primaryIds
40+
WITH s, c
3941
MERGE (c)-[hcs:HAS_COHORT_SAMPLE]->(s)
4042
""")
4143
void addCohortSampleRelationship(@Param("cohortId") String cohortId,
42-
@Param("primaryId") String primaryId);
44+
@Param("primaryIds") List<String> primaryIds);
4345
}

persistence/src/main/java/org/mskcc/smile/persistence/neo4j/SmileSampleRepository.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.mskcc.smile.persistence.neo4j;
22

33
import java.util.List;
4+
import java.util.Map;
45
import java.util.UUID;
56
import org.mskcc.smile.model.SampleMetadata;
67
import org.mskcc.smile.model.SmileSample;
@@ -196,4 +197,41 @@ SmileSample updateRevisableBySampleId(@Param("smileSampleId") UUID smileSampleId
196197
RETURN DISTINCT sm.primaryId LIMIT 1
197198
""")
198199
String findSamplePrimaryIdByInputId(@Param("inputId") String inputId);
200+
201+
@Query("""
202+
MATCH (s: Sample)<-[:IS_ALIAS]-(sa: SampleAlias)
203+
WITH s, sa, COLLECT {
204+
MATCH (s)-[:HAS_METADATA]->(sm: SampleMetadata)
205+
RETURN sm ORDER BY sm.importDate DESC LIMIT 1
206+
} as latestSm
207+
WITH s, sa, latestSm[0] AS latestSm, $inputIds AS inputIds
208+
WHERE latestSm.primaryId IN $inputIds
209+
OR latestSm.cmoSampleName IN $inputIds
210+
OR latestSm.investigatorSampleId IN $inputIds
211+
OR s.smileSampleId IN $inputIds
212+
OR sa.value IN $inputIds
213+
WITH s, latestSm, inputIds
214+
WITH ({
215+
smileSampleId: s.smileSampleId,
216+
primaryId: latestSm.primaryId,
217+
cmoSampleName: latestSm.cmoSampleName,
218+
investigatorSampleId: latestSm.investigatorSampleId
219+
}) as matchedSample, inputIds
220+
UNWIND inputIds as inputId
221+
WITH inputId, matchedSample, inputIds
222+
WHERE ANY(prop IN keys(matchedSample) WHERE toString(matchedSample[prop]) = inputId)
223+
WITH
224+
COLLECT(DISTINCT inputId) AS matchedIds,
225+
COLLECT(DISTINCT matchedSample.primaryId) as matchedPrimaryIds,
226+
inputIds
227+
WITH matchedIds, matchedPrimaryIds,
228+
[item IN inputIds WHERE NOT item IN matchedIds] as unmatchedIds
229+
WITH ({
230+
matchedIds: matchedIds,
231+
matchedPrimaryIds: matchedPrimaryIds,
232+
unmatchedIds: unmatchedIds
233+
}) AS result
234+
RETURN result
235+
""")
236+
Map<String, Object> findMatchedAndUnmatchedInputSampleIds(List<String> inputIds);
199237
}

persistence/src/main/java/org/mskcc/smile/persistence/neo4j/TempoRepository.java

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,99 @@ OPTIONAL MATCH (p)<-[:IS_ALIAS]-(pa:PatientAlias{namespace: 'dmpId'})
179179
Map<String, Object> findTempoSampleDataBySamplePrimaryId(@Param("primaryId") String primaryId);
180180

181181
@Query("""
182-
MATCH (t: Tempo{smileTempoId: $smileTempoId})
183-
SET t.initialPipelineRunDate = $initialPipelineRunDate,
184-
t.embargoDate = $embargoDate,
185-
t.accessLevel = $accessLevel
186-
""")
187-
void updateTempoData(@Param("smileTempoId") UUID smileTempoId,
188-
@Param("initialPipelineRunDate") String initialPipelineRunDate,
189-
@Param("embargoDate") String embargoDate,
190-
@Param("accessLevel") String accessLevel);
182+
MATCH (s:Sample)-[:HAS_METADATA]->(sm:SampleMetadata)
183+
WHERE sm.primaryId IN $primaryIds
184+
WITH s, sm.primaryId as primaryId, EXISTS {MATCH (s)-[:HAS_TEMPO]->(t:Tempo)} AS hasTempo
185+
WITH hasTempo AS tempoStatus, COLLECT(DISTINCT primaryId) AS samplePrimaryIds
186+
RETURN ({ tempoStatus: tempoStatus, samples: samplePrimaryIds}) as result
187+
""")
188+
List<Map<String, Object>> sortSamplesByTempoStatus(@Param("primaryIds") List<String> primaryIds);
189+
190+
191+
@Query("""
192+
MATCH (r:Request)-[:HAS_SAMPLE]->(s:Sample)-[:HAS_METADATA]->(sm:SampleMetadata)
193+
WHERE sm.primaryId in $primaryIds
194+
WITH r, s,
195+
CASE WHEN (r.labHeadName IS NULL OR r.labHeadName = "")
196+
THEN r.investigatorName ELSE r.labHeadName END AS custodianInformation,
197+
COLLECT {
198+
OPTIONAL MATCH (cc:CohortComplete)<-[:HAS_COHORT_COMPLETE]-(c:Cohort)-[:HAS_COHORT_SAMPLE]->(s)
199+
RETURN cc.date ORDER BY cc.date ASC LIMIT 1
200+
}[0] AS earliestDeliveryDate
201+
WITH custodianInformation, s.smileSampleId as smileSampleId, earliestDeliveryDate,
202+
CASE WHEN (earliestDeliveryDate IS NULL OR earliestDeliveryDate = "")
203+
THEN $ccDeliveryDate ELSE earliestDeliveryDate END AS initialPipelineRunDate
204+
WITH custodianInformation, smileSampleId, earliestDeliveryDate, initialPipelineRunDate,
205+
CASE WHEN (initialPipelineRunDate IS NULL OR initialPipelineRunDate = "") THEN ""
206+
ELSE apoc.temporal.format(datetime(apoc.date.format(
207+
apoc.date.parse(initialPipelineRunDate, "ms", "yyyy-MM-dd HH:mm"),"ms", "yyyy-MM-dd"))
208+
+ Duration({months:18}), 'YYYY-MM-dd HH:mm') END AS embargoDate,
209+
apoc.date.format(apoc.date.currentTimestamp(), 'ms', 'yyyy-MM-dd HHM:mm') AS today
210+
WITH smileSampleId, today, custodianInformation, initialPipelineRunDate, embargoDate,
211+
CASE WHEN (today <= embargoDate OR initialPipelineRunDate = "")
212+
THEN "MSK Embargo" ELSE "MSK Public"
213+
END AS accessLevel
214+
WITH DISTINCT smileSampleId, custodianInformation,
215+
initialPipelineRunDate, embargoDate, accessLevel
216+
CREATE (t:Tempo {
217+
smileTempoId: randomUUID(),
218+
custodianInformation: custodianInformation,
219+
accessLevel: accessLevel,
220+
initialPipelineRunDate: initialPipelineRunDate,
221+
embargoDate: embargoDate,
222+
billedBy: "", costCenter: ""})
223+
WITH smileSampleId, t
224+
MATCH (s:Sample {smileSampleId: smileSampleId})
225+
MERGE (s)-[x:HAS_TEMPO]->(t)
226+
RETURN COUNT(DISTINCT t)
227+
""")
228+
Integer batchCreateTempoNodesForSamplePrimaryIds(@Param("primaryIds") List<String> primaryIds,
229+
@Param("ccDeliveryDate") String ccDeliveryDate);
230+
231+
@Query("""
232+
MATCH (s:Sample)-[:HAS_METADATA]->(sm:SampleMetadata)
233+
WHERE sm.primaryId IN $primaryIds
234+
WITH s,
235+
COLLECT {
236+
OPTIONAL MATCH (cc:CohortComplete)<-[:HAS_COHORT_COMPLETE]-(c:Cohort)-[:HAS_COHORT_SAMPLE]->(s)
237+
RETURN cc.date ORDER BY cc.date ASC LIMIT 1
238+
}[0] AS earliestDeliveryDate
239+
WITH s, earliestDeliveryDate,
240+
apoc.date.format(apoc.date.currentTimestamp(), 'ms', 'yyyy-MM-dd HHM:mm') AS today
241+
MATCH (s)-[:HAS_TEMPO]->(t:Tempo)
242+
WITH s, t, earliestDeliveryDate, today,
243+
CASE WHEN ((t.initialPipelineRunDate IS NULL OR t.initialPipelineRunDate = "")
244+
AND (earliestDeliveryDate IS NULL)) THEN ""
245+
ELSE CASE WHEN (t.initialPipelineRunDate IS NULL OR t.initialPipelineRunDate = ""
246+
OR earliestDeliveryDate < t.initialPipelineRunDate)
247+
THEN earliestDeliveryDate
248+
ELSE t.initialPipelineRunDate
249+
END
250+
END AS updatedInitRundate
251+
SET t.initialPipelineRunDate = updatedInitRundate
252+
WITH s, t, today,
253+
CASE WHEN (t.initialPipelineRunDate IS NULL OR t.initialPipelineRunDate = "")
254+
THEN ""
255+
ELSE apoc.temporal.format(datetime(
256+
apoc.date.format(apoc.date.parse(t.initialPipelineRunDate, "ms", "yyyy-MM-dd HH:mm"),
257+
"ms", "yyyy-MM-dd")) + Duration({months:18}), 'YYYY-MM-dd HH:mm')
258+
END as updatedEmbargoDate
259+
SET t.embargoDate = updatedEmbargoDate
260+
WITH s,t,today,
261+
CASE WHEN (t.initialPipelineRunDate IS NULL OR t.initialPipelineRunDate = "")
262+
THEN t.accessLevel
263+
ELSE
264+
CASE
265+
WHEN (today > t.embargoDate
266+
AND (t.accessLevel = "" OR t.accessLevel IS NULL OR t.accessLevel = "MSK Embargo"))
267+
THEN "MSK Public"
268+
ELSE
269+
CASE WHEN (today <= t.embargoDate AND (t.accessLevel = "" OR t.accessLevel IS NULL))
270+
THEN "MSK Embargo"
271+
END
272+
END
273+
END AS updatedAccessLevel
274+
SET t.accessLevel = updatedAccessLevel
275+
""")
276+
void batchUpdateTempoDataForSamplePrimaryIds(@Param("primaryIds") List<String> primaryIds);
191277
}

service/src/main/java/org/mskcc/smile/service/SmileSampleService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.mskcc.smile.service;
22

33
import java.util.List;
4+
import java.util.Map;
45
import java.util.UUID;
56
import org.mskcc.smile.model.SampleMetadata;
67
import org.mskcc.smile.model.SmileSample;
@@ -40,4 +41,5 @@ List<SmileSample> getSamplesByCategoryAndCmoPatientId(String cmoPatientId,
4041
SampleMetadata getLatestSampleMetadataByPrimaryId(String primaryId) throws Exception;
4142
List<String> getSamplePrimaryIdsBySmileTempoIds(List<UUID> smileTempoIds) throws Exception;
4243
String getSamplePrimaryIdBySampleInputId(String inputId) throws Exception;
44+
Map<String, Object> getMatchedAndUnmatchedInputSampleIds(List<String> inputIds) throws Exception;
4345
}

service/src/main/java/org/mskcc/smile/service/TempoService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.mskcc.smile.service;
22

33
import java.util.List;
4+
import java.util.Map;
45
import java.util.UUID;
56
import org.mskcc.smile.commons.generated.Smile.TempoSample;
67
import org.mskcc.smile.model.SmileSample;
@@ -26,5 +27,8 @@ public interface TempoService {
2627
List<UUID> getTempoIdsNoLongerEmbargoed() throws Exception;
2728
void updateTempoAccessLevel(List<String> samplePrimaryIds, String accessLevel) throws Exception;
2829
TempoSample getTempoSampleDataBySamplePrimaryId(String primaryId) throws Exception;
29-
void updateSampleInitRunDate(String primaryId) throws Exception;
30+
Map<String, Object> sortSamplesByTempoStatus(List<String> primaryIds) throws Exception;
31+
Integer batchCreateTempoNodesForSamplePrimaryIds(List<String> primaryIds, String ccDeliveryDate)
32+
throws Exception;
33+
void batchUpdateTempoDataForSamplePrimaryIds(List<String> primaryIds) throws Exception;
3034
}

service/src/main/java/org/mskcc/smile/service/impl/CohortCompleteServiceImpl.java

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package org.mskcc.smile.service.impl;
22

33
import com.fasterxml.jackson.databind.ObjectMapper;
4-
import java.util.HashSet;
4+
import java.util.ArrayList;
55
import java.util.List;
6+
import java.util.Map;
67
import java.util.Set;
78
import org.apache.commons.lang3.StringUtils;
89
import org.apache.commons.logging.Log;
@@ -48,28 +49,49 @@ public Cohort saveCohort(Cohort cohort, Set<String> sampleIds) throws Exception
4849
if (sampleIds == null) {
4950
return getCohortByCohortId(cohort.getCohortId());
5051
}
51-
Set<String> unknownSamples = new HashSet<>(); // tracks unknown samples in smile
52-
// create cohort-sample relationships
53-
LOG.info("Adding cohort-sample edges in database for " + sampleIds.size() + " samples...");
54-
for (String sampleId : sampleIds) {
55-
// confirm sample exists by primary id and then link to cohort
56-
if (sampleService.sampleExistsByInputId(sampleId)) {
57-
String primaryId = sampleService.getSamplePrimaryIdBySampleInputId(sampleId);
58-
// init default tempo data for sample if sample does not already have tempo data
59-
if (tempoService.getTempoDataBySamplePrimaryId(primaryId) == null) {
60-
tempoService.initAndSaveDefaultTempoData(primaryId,
61-
cohort.getLatestCohortComplete().getDate());
62-
}
63-
cohortCompleteRepository.addCohortSampleRelationship(cohort.getCohortId(), primaryId);
64-
// the update needs to happen after the new cohort-sample relationship
65-
// is established so that all possible cohort delivery dates are taken into consideration
66-
// when potentially updating the pipeline run date
67-
tempoService.updateSampleInitRunDate(primaryId);
52+
53+
Map<String, Object> result
54+
= sampleService.getMatchedAndUnmatchedInputSampleIds(new ArrayList<>(sampleIds));
55+
if (result.isEmpty()) {
56+
LOG.error("None of the samples provided in the cohort sample list are known to SMILE: "
57+
+ mapper.writeValueAsString(result));
58+
throw new RuntimeException("Cohort does not have any known samples in SMILE"
59+
+ " - check data before reattempting.");
60+
}
61+
62+
// merge cohort-samples
63+
List<String> primaryIds = (List<String>) result.get("matchedPrimaryIds");
64+
LOG.info("Adding cohort-sample edges in database for " + primaryIds.size() + " samples...");
65+
cohortCompleteRepository.addCohortSampleRelationship(cohort.getCohortId(), primaryIds);
66+
LOG.info("Done.");
67+
68+
// create tempo nodes for samples that do not already have tempo data in smile
69+
Map<String, Object> samplesByTempoStatus = tempoService.sortSamplesByTempoStatus(primaryIds);
70+
if (samplesByTempoStatus.containsKey("false")) {
71+
LOG.info("Creating TEMPO nodes for cohort samples...");
72+
List<String> samplesMissingTempoData = (List<String>) samplesByTempoStatus.get("false");
73+
Integer actual = tempoService.batchCreateTempoNodesForSamplePrimaryIds(samplesMissingTempoData,
74+
cohort.getLatestCohortComplete().getDate());
75+
if (actual != samplesMissingTempoData.size()) {
76+
LOG.error("Actual number of TEMPO nodes created does not match expected. "
77+
+ "Actual = " + actual + ", expected = " + samplesMissingTempoData.size());
6878
} else {
69-
unknownSamples.add(sampleId);
79+
LOG.info("Number of TEMPO nodes created = " + samplesMissingTempoData.size());
7080
}
81+
LOG.info("Done");
7182
}
83+
84+
// re-calculate the intiial pipeline rundate embargo date, and access level for samples
85+
// that already have tempo data in smile
86+
if (samplesByTempoStatus.containsKey("true")) {
87+
LOG.info("Updating TEMPO nodes for cohort samples...");
88+
List<String> samplesWithTempoData = (List<String>) samplesByTempoStatus.get("true");
89+
tempoService.batchUpdateTempoDataForSamplePrimaryIds(samplesWithTempoData);
90+
LOG.info("Done. Number of TEMPO nodes updated = " + samplesWithTempoData.size());
91+
}
92+
7293
// log and report unknown samples for reference
94+
List<String> unknownSamples = (List<String>) result.get("unmatchedIds");
7395
if (!unknownSamples.isEmpty()) {
7496
StringBuilder builder = new StringBuilder();
7597
builder.append("[TEMPO COHORT COMPLETE FAILED SAMPLES] Could not import ")

service/src/main/java/org/mskcc/smile/service/impl/SampleServiceImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import java.util.ArrayList;
55
import java.util.List;
6+
import java.util.Map;
67
import java.util.UUID;
78
import java.util.regex.Matcher;
89
import java.util.regex.Pattern;
@@ -540,4 +541,9 @@ public List<String> getSamplePrimaryIdsBySmileTempoIds(List<UUID> smileTempoIds)
540541
public String getSamplePrimaryIdBySampleInputId(String inputId) throws Exception {
541542
return sampleRepository.findSamplePrimaryIdByInputId(inputId);
542543
}
544+
545+
@Override
546+
public Map<String, Object> getMatchedAndUnmatchedInputSampleIds(List<String> inputIds) throws Exception {
547+
return sampleRepository.findMatchedAndUnmatchedInputSampleIds(inputIds);
548+
}
543549
}

0 commit comments

Comments
 (0)