Skip to content

Commit e202574

Browse files
change drug molecule map into a struct
Co-authored-by: project-defiant <szymonszyszkowski@gmail.com>
1 parent 3c169d4 commit e202574

File tree

4 files changed

+35
-7
lines changed

4 files changed

+35
-7
lines changed

src/main/scala/io/opentargets/etl/backend/Search.scala

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,24 @@ object Transformers {
447447

448448
}
449449

450+
/** Collect cross reference ids for the drugs
451+
* @param crossReferences
452+
* crossReferences column from the drug dataframe
453+
* @return
454+
* a column with an array of cross reference ids
455+
*/
456+
def collectCrossReferenceIds(crossReferences: Column): Column =
457+
sort_array(
458+
array_distinct(
459+
flatten(
460+
transform(
461+
crossReferences,
462+
x => x.getField("ids")
463+
)
464+
)
465+
)
466+
).alias("crossReferences")
467+
450468
// uses target_ids, drug_id, target_labels, disease_id, disease_labels
451469
def setIdAndSelectFromDrugs(
452470
associatedDrugs: DataFrame,
@@ -502,6 +520,10 @@ object Transformers {
502520
when(col("disease_labels").isNull, Array.empty[String])
503521
.otherwise(col("disease_labels"))
504522
)
523+
.withColumn(
524+
"crossReferences",
525+
collectCrossReferenceIds(col("crossReferences"))
526+
)
505527
SearchIndex(
506528
id = col("id"),
507529
name = col("name"),
@@ -514,9 +536,7 @@ object Transformers {
514536
"array(name)",
515537
"array(id)",
516538
"childChemblIds",
517-
"crossReferences.PubChem",
518-
"crossReferences.drugbank",
519-
"crossReferences.chEBI"
539+
"crossReferences"
520540
),
521541
prefixes = C.flattenCat(
522542
"synonyms",

src/main/scala/io/opentargets/etl/backend/drug/Drug.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ object Drug extends Serializable with LazyLogging {
7070
)
7171

7272
// We define a drug as having either a drugbank id, a mechanism of action, an indication, or if it is a chemical probe.
73-
val isDrugMolecule: Column = array_contains(map_keys(col("crossReferences")), "drugbank") ||
73+
val isDrugMolecule: Column = array_contains(col("crossReferences.source"), "drugbank") ||
7474
col("indications").isNotNull ||
7575
col("mechanismsOfAction").isNotNull ||
7676
col("chemicalProbeDrugId").isNotNull

src/main/scala/io/opentargets/etl/backend/drug/Molecule.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import org.apache.spark.sql.functions.{
1515
explode,
1616
lit,
1717
map_concat,
18+
struct,
1819
typedLit,
1920
udf,
2021
upper,
@@ -164,7 +165,14 @@ object Molecule extends LazyLogging {
164165
.foldLeft(chemblCrossReferences)((agg, a) => mergeCrossReferenceMaps(agg, a))
165166
.filter(col(XREF_COLUMN_NAME).isNotNull)
166167
.withColumnRenamed(XREF_COLUMN_NAME, "crossReferences")
167-
references
168+
169+
val transformedCrossReference = references
170+
.select(col("id"), explode(col("crossReferences")))
171+
.withColumnRenamed("key", "source")
172+
.withColumnRenamed("value", "ids")
173+
.groupBy("id")
174+
.agg(collect_set(struct(col("source"), col("ids"))).as("crossReferences"))
175+
transformedCrossReference
168176
}
169177

170178
/** @param preProcessedMolecules

src/test/scala/io/opentargets/etl/backend/Drug/MoleculeTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,10 @@ class MoleculeTest extends EtlSparkUnitTest {
202202
.json(this.getClass.getResource("/sample_mol_after_preprocessing.json").getPath)
203203
// when
204204
val results = Molecule invokePrivate processMoleculeCrossReferences(sampleMolecule)
205-
val xrefMap = results.head.getMap(1)
205+
val xrefMap = results.head.getList(1)
206206
// then
207207
assertResult(4) {
208-
xrefMap.keySet.size
208+
xrefMap.size
209209
}
210210
}
211211

0 commit comments

Comments
 (0)