Skip to content

Commit 4d13f70

Browse files
authored
add logic to date evidence based on publication dates
* feat: adding logic to date evidence
* feat: adding tests for new method
* doc: adding docstring to function
* chore: refactoring processed publication data
* chore: moving date resolution logic to the right place
* fix: removing broadcasting to enable joining
* fix: addressing reviewer comments
1 parent 96e2b9d commit 4d13f70

File tree

3 files changed

+213
-0
lines changed

3 files changed

+213
-0
lines changed

src/main/resources/reference.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,10 @@ steps: {
515515
}
516516
targets: ${steps.target.output.target}
517517
mechanism_of_action: ${steps.drug.output.mechanism_of_action}
518+
literature_dating: {
519+
format: "json"
520+
path: "gs://otar000-evidence_input/literature_export/*"
521+
}
518522
}
519523
direction_of_effect: {
520524
var_filter_lof: [

src/main/scala/io/opentargets/etl/backend/evidence/Evidence.scala

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,76 @@ object Evidence extends LazyLogging {
197197
resolved
198198
}
199199

200+
/** Resolves publication dates for evidence based on literature identifiers.
  *
  * Evidence records carry an optional `literature` array of publication
  * identifiers (PMIDs, EuropePMC ids, PMC ids). These are matched against a
  * publication-date mapping restricted to the MED, PPR (pre-print) and AGR
  * sources, and the earliest matching `firstPublicationDate` is attached to
  * each evidence row as `publicationDate`. A derived `evidenceDate` column is
  * also added, preferring `publicationDate` and falling back to the
  * evidence's `releaseDate` when no publication could be resolved.
  *
  * @param df the evidence DataFrame; must contain `id`, `literature` and `releaseDate` columns
  * @param publication_date_mapping DataFrame with columns `source`, `firstPublicationDate`, `pmid`, `id`, `pmcid`
  * @param context the ETL session context containing the Spark session
  * @return the input evidence with `publicationDate` and `evidenceDate` columns added
  */
def resolvePublicationDates(
    df: DataFrame,
    publication_date_mapping: DataFrame
)(implicit context: ETLSessionContext): DataFrame = {
  logger.info("resolve publication dates for evidences based on literature identifiers")

  implicit val session: SparkSession = context.sparkSession

  // Keep MED, AGR and pre-print (PPR) sources only, and explode every
  // non-null identifier (pmid, id, pmcid) into one (publicationId,
  // publicationDate) row per identifier.
  // Fix: normalise the mapping ids with the same trim + upper-case applied to
  // the evidence ids below; without this, identifiers differing only in case
  // or surrounding whitespace would silently fail to join.
  val processedPublicationData = publication_date_mapping
    .filter(col("source").isin("MED", "PPR", "AGR"))
    .select(
      col("firstPublicationDate").alias("publicationDate"),
      explode(
        expr("filter(array(pmid, id, pmcid), x -> x is not null)")
      ).alias("publicationId")
    )
    .withColumn("publicationId", upper(trim(col("publicationId"))))

  // One row per (evidence id, literature identifier), normalised for joining.
  val evidenceWithPubIds = df
    .select(col("id"), explode(col("literature")).as("publicationId"))
    .withColumn(
      "publicationId",
      upper(trim(col("publicationId")))
    )

  // Attach dates, then keep only the earliest publication for each evidence
  // id (rank 1 within the id partition, ordered by ascending date).
  val datedEvidence = evidenceWithPubIds
    .join(processedPublicationData, Seq("publicationId"), "inner")
    .withColumn(
      "rank",
      row_number().over(
        Window.partitionBy("id").orderBy(col("publicationDate").asc)
      )
    )
    .filter(col("rank") === 1)
    .select("id", "publicationDate")

  // The per-id lookup table is small relative to the evidence set, so it is
  // broadcast to avoid a shuffle on the final join. (The previous orderBy
  // before broadcasting was dropped: sort order cannot affect join results.)
  val datedEvidenceLut = broadcast(datedEvidence)

  // Left join so evidence without resolvable literature is preserved;
  // evidenceDate falls back to releaseDate when publicationDate is null.
  df
    .join(datedEvidenceLut, Seq("id"), "left_outer")
    .withColumn(
      "evidenceDate",
      coalesce(
        col("publicationDate"),
        col("releaseDate")
      )
    )
}
269+
200270
def normaliseDatatypes(df: DataFrame)(implicit context: ETLSessionContext): DataFrame = {
201271
implicit val ss: SparkSession = context.sparkSession
202272
import ss.implicits._
@@ -371,6 +441,7 @@ object Evidence extends LazyLogging {
371441
.transform(score(_, sc))
372442
.transform(checkNullifiedScores(_, sc, ns))
373443
.transform(markDuplicates(_, id, md))
444+
.transform(resolvePublicationDates(_, dfs("literature_dating").data))
374445
.transform(
375446
DirectionOfEffect(_, dfs("targets").data, dfs("mechanism_of_action").data)
376447
)
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package io.opentargets.etl.backend.evidence
2+
3+
4+
5+
import io.opentargets.etl.backend.{Configuration, ETLSessionContext}
6+
import io.opentargets.etl.backend.Configuration.OTConfig
7+
import io.opentargets.etl.backend.spark.{IOResource, IoHelpers}
8+
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
9+
import org.apache.spark.sql.functions._
10+
import org.apache.spark.sql.types._
11+
import org.scalatest.flatspec.AnyFlatSpec
12+
import org.scalatest.matchers.should.Matchers
13+
14+
/** Unit tests for [[Evidence.resolvePublicationDates]].
  *
  * Builds a small evidence fixture and a publication-date mapping, runs the
  * resolution once, and asserts on the resulting columns and per-row values.
  */
class EvidenceDatingTest extends AnyFlatSpec with Matchers {

  implicit val sparkSession: SparkSession = SparkSession
    .builder()
    .appName("EvidenceTest")
    .master("local[*]")
    // Adaptive execution is disabled so plans (and partition counts) stay
    // deterministic across test runs.
    .config("spark.sql.adaptive.enabled", "false")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
    .getOrCreate()

  // Mock ETLSessionContext: resolvePublicationDates only reads sparkSession,
  // so the configuration is left null here.
  // NOTE(review): this relies on the function never touching `configuration`;
  // revisit if the implementation changes.
  implicit val mockContext: ETLSessionContext = ETLSessionContext(
    configuration = null.asInstanceOf[OTConfig],
    sparkSession = sparkSession
  )

  // Shared evidence fixture: id, optional releaseDate, literature id array.
  val evidenceSchema = StructType(Array(
    StructField("id", StringType, nullable = false),
    StructField("releaseDate", StringType, nullable = true),
    StructField("literature", ArrayType(StringType), nullable = true)
  ))

  val testEvidenceData = sparkSession.createDataFrame(
    sparkSession.sparkContext.parallelize(
      Seq(
        Row("e1", null, Array.empty[String]), // No dates, empty array instead of null
        Row("e2", "2021-02-03", Array.empty[String]), // Release date is given, empty array
        Row("e3", "2021-02-03", Array("123", "PMC456")), // Both release date and literature given
        Row("e4", null, Array("123", "PMC456")), // Only literature is given
        Row("e5", null, Array("PMC456")) // Only literature, and only one resolvable source
      )
    ),
    evidenceSchema
  )

  // Publication-date mapping fixture mirroring the literature export schema.
  val literatureMapSchema = StructType(
    Array(
      StructField("source", StringType, nullable = false),
      StructField("firstPublicationDate", StringType, nullable = true),
      StructField("pmid", StringType, nullable = true),
      StructField("id", StringType, nullable = true),
      StructField("pmcid", StringType, nullable = true)
    )
  )

  val testPublicationData = sparkSession.createDataFrame(
    sparkSession.sparkContext.parallelize(
      Seq(
        Row("MED", "2021-06-15", "123", "123", "PMC9936"),
        Row("MED", "2021-08-15", null, "PMC456", "PMC456"),
        Row("AGR", "2021-07-30", "AGR001", "AGR001", null)
      )
    ),
    literatureMapSchema
  )

  // Run the function once; all tests below assert against this result.
  val result = Evidence.resolvePublicationDates(testEvidenceData, testPublicationData)

  "resolvePublicationDates" should "return dataframe" in {
    // Compile-time type assertion
    implicitly[result.type <:< DataFrame]
  }

  it should "return all evidence" in {
    // The left join must preserve every input row, matched or not.
    result.count() should be(5)

    // All original columns must survive the transformation.
    val expectedColumns = testEvidenceData.columns
    result.columns should contain allElementsOf (expectedColumns)
  }

  it should "have new `publicationDate` column with the right type" in {
    result.columns should contain("publicationDate")

    result.schema("publicationDate").dataType should be(StringType)
    result.schema("publicationDate").nullable should be(true)
  }

  it should "have new `evidenceDate` column with the right type" in {
    result.columns should contain("evidenceDate")

    result.schema("evidenceDate").dataType should be(StringType)
    result.schema("evidenceDate").nullable should be(true)
  }

  it should "correctly resolve publication dates for specific evidence" in {
    // e1: no literature, no releaseDate -> both dates null
    val evidence1 = result.filter(col("id") === "e1").collect().head
    evidence1.getString(evidence1.fieldIndex("publicationDate")) should be(null)
    evidence1.getString(evidence1.fieldIndex("evidenceDate")) should be(null)

    // e2: no literature, has releaseDate -> evidenceDate falls back to releaseDate
    val evidence2 = result.filter(col("id") === "e2").collect().head
    evidence2.getString(evidence2.fieldIndex("publicationDate")) should be(null)
    evidence2.getString(evidence2.fieldIndex("evidenceDate")) should be("2021-02-03")

    // e3: two resolvable ids (123 -> 2021-06-15, PMC456 -> 2021-08-15);
    // the earliest publication date wins and overrides releaseDate
    val evidence3 = result.filter(col("id") === "e3").collect().head
    evidence3.getString(evidence3.fieldIndex("publicationDate")) should be("2021-06-15")
    evidence3.getString(evidence3.fieldIndex("evidenceDate")) should be("2021-06-15")

    // e4: same literature as e3 but no releaseDate; earliest publication date used
    val evidence4 = result.filter(col("id") === "e4").collect().head
    evidence4.getString(evidence4.fieldIndex("publicationDate")) should be("2021-06-15")
    evidence4.getString(evidence4.fieldIndex("evidenceDate")) should be("2021-06-15")

    // e5: single resolvable id (PMC456) -> its publication date (2021-08-15)
    // is used directly; it is the only candidate, so no ranking applies.
    // (Previous comment wrongly described this as "an older date".)
    val evidence5 = result.filter(col("id") === "e5").collect().head
    evidence5.getString(evidence5.fieldIndex("publicationDate")) should be("2021-08-15")
    evidence5.getString(evidence5.fieldIndex("evidenceDate")) should be("2021-08-15")
  }
}

0 commit comments

Comments
 (0)