Skip to content

Commit c2d754d

Browse files
Youcef Sebiatysebiat
authored andcommitted
CNAM-410: Add start delay for LimitedExposurePeriodAdder strategy
1 parent 9a839f0 commit c2d754d

File tree

3 files changed

+56
-11
lines changed

3 files changed

+56
-11
lines changed

src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/ExposuresTransformer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class ExposuresTransformer(config: ExposuresTransformerConfig)
5656
when(col(FollowUpEnd) < col(ExposureEnd), col(FollowUpEnd)).otherwise(col(ExposureEnd))
5757
).drop(ExposureEnd) // This makes sure that all the exposures end at the followup end date
5858
.withColumnRenamed("Correct_Exposure_End", ExposureEnd)
59+
.where(col(ExposureStart) <= col(ExposureEnd))
5960
.aggregateWeight(
6061
cumulativeExposureWindow,
6162
cumulativeStartThreshold,

src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdder.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod
2222
* A. The first DrugPurchase defines a new Exposure.
2323
* B. If there is a DrugPurchase within the defined window of the first DrugPurchase, then expand the current
2424
* Exposure with the DrugPurchase.
25-
* C. Else, close and set the end of the Exposure as the reach of the current and create a new Exposure with the
26-
* DrugPurchase as the new Exposure.
25+
* C. Else, close and set the end of the Exposure as the reach of the latest Drug Purchase and create a new
26+
* Exposure with the next DrugPurchase as the new Exposure.
2727
* This strategy is suited for short term effects.
2828
* !!! WARNING: THIS ONLY RETURNS EXPOSURES.
2929
*
3030
* @param minPurchases : Not used.
31-
* @param startDelay : Not used.
31+
* @param startDelay : period to be added to the start of each DrugPurchase.
3232
* @param purchasesWindow : Not used.
3333
* @param endThresholdGc : the period that defines the reach for Grand Condtionnement.
3434
* @param endThresholdNgc : the period that defines the reach for Non Grand Condtionnement.
@@ -37,7 +37,7 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod
3737
*/
3838
def withStartEnd(
3939
minPurchases: Int = 2,
40-
startDelay: Period = 3.months,
40+
startDelay: Period = 5.days,
4141
purchasesWindow: Period = 4.months,
4242
endThresholdGc: Option[Period] = Some(120.days),
4343
endThresholdNgc: Option[Period] = Some(40.days),
@@ -46,11 +46,19 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod
4646

4747
val outputColumns = (data.columns.toList ++ List(ExposureStart, ExposureEnd)).map(col)
4848

49-
val firstLastPurchase = getFirstAndLastPurchase(data, endThresholdGc.get, endThresholdNgc.get, endDelay.get)
49+
val delayedDrugPurchases = delayStart(data, startDelay)
50+
51+
val firstLastPurchase = getFirstAndLastPurchase(delayedDrugPurchases, endThresholdGc.get, endThresholdNgc.get, endDelay.get)
5052

5153
toExposure(firstLastPurchase).select(outputColumns: _*)
5254
}
5355

56+
def delayStart(data: DataFrame, startDelay: Period): DataFrame = {
57+
data.withColumn("NewStart", col("start").addPeriod(startDelay))
58+
.drop(col("start"))
59+
.withColumnRenamed("NewStart", "start")
60+
}
61+
5462
def toExposure(firstLastPurchase: DataFrame): DataFrame = {
5563
val condition = (col("Status") === "first"
5664
&& lead(col("Status"), 1).over(orderedWindow) === "last")

src/test/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdderSuite.scala

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,42 @@ class LimitedExposurePeriodAdderSuite extends SharedContext {
1212
// instance created from a mock DataFrame, to allow testing the InnerImplicits implicit class
1313
private val mockInstance = new LimitedExposurePeriodAdder(mock(classOf[DataFrame]))
1414

15+
16+
"delayStart" should "delay the start of events by the given period" in {
17+
val sqlCtx = sqlContext
18+
import sqlCtx.implicits._
19+
20+
// Given
21+
val input = Seq(
22+
("A", "P", makeTS(2008, 1, 1), 1.0),
23+
("A", "P", makeTS(2008, 2, 1), 1.0),
24+
("A", "P", makeTS(2008, 5, 1), 1.0),
25+
("A", "P", makeTS(2008, 6, 1), 1.0),
26+
("A", "P", makeTS(2008, 7, 1), 1.0),
27+
("A", "P", makeTS(2009, 1, 1), 1.0),
28+
("A", "P", makeTS(2009, 7, 1), 1.0),
29+
("A", "P", makeTS(2009, 8, 1), 1.0),
30+
("A", "S", makeTS(2008, 2, 1), 1.0)
31+
).toDF(PatientID, Value, Start, Weight)
32+
33+
34+
val expected = Seq(
35+
("A", "P", makeTS(2008, 1, 11), 1.0),
36+
("A", "P", makeTS(2008, 2, 11), 1.0),
37+
("A", "P", makeTS(2008, 5, 11), 1.0),
38+
("A", "P", makeTS(2008, 6, 11), 1.0),
39+
("A", "P", makeTS(2008, 7, 11), 1.0),
40+
("A", "P", makeTS(2009, 1, 11), 1.0),
41+
("A", "P", makeTS(2009, 7, 11), 1.0),
42+
("A", "P", makeTS(2009, 8, 11), 1.0),
43+
("A", "S", makeTS(2008, 2, 11), 1.0)
44+
).toDF(PatientID, Value, Start, Weight).select(PatientID, Value, Weight, Start)
45+
46+
val result = mockInstance.delayStart(input, 10.days)
47+
48+
assertDFs(expected, result)
49+
}
50+
1551
"getFirstAndLastPurchase" should "return the first and the last purchase of each potential exposure" in {
1652

1753
val sqlCtx = sqlContext
@@ -93,15 +129,15 @@ class LimitedExposurePeriodAdderSuite extends SharedContext {
93129
).toDF(PatientID, Value, Start, Weight)
94130

95131
val expected = Seq(
96-
("A", "P", makeTS(2008, 1, 1), 1.0, makeTS(2008, 1, 1), makeTS(2008, 3, 11)),
97-
("A", "P", makeTS(2008, 5, 1), 1.0, makeTS(2008, 5, 1), makeTS(2008, 8, 11)),
98-
("A", "P", makeTS(2009, 1, 1), 1.0, makeTS(2009, 1, 1), makeTS(2009, 2, 11)),
99-
("A", "P", makeTS(2009, 7, 1), 1.0, makeTS(2009, 7, 1), makeTS(2009, 9, 11)),
100-
("A", "S", makeTS(2008, 2, 1), 1.0, makeTS(2008, 2, 1), makeTS(2008, 3, 11))
132+
("A", "P", makeTS(2008, 1, 6), 1.0, makeTS(2008, 1, 6), makeTS(2008, 3, 16)),
133+
("A", "P", makeTS(2008, 5, 6), 1.0, makeTS(2008, 5, 6), makeTS(2008, 8, 16)),
134+
("A", "P", makeTS(2009, 1, 6), 1.0, makeTS(2009, 1, 6), makeTS(2009, 2, 16)),
135+
("A", "P", makeTS(2009, 7, 6), 1.0, makeTS(2009, 7, 6), makeTS(2009, 9, 16)),
136+
("A", "S", makeTS(2008, 2, 6), 1.0, makeTS(2008, 2, 6), makeTS(2008, 3, 16))
101137
).toDF(PatientID, Value, Start, Weight, ExposureStart, ExposureEnd)
102138

103139
val result = new LimitedExposurePeriodAdder(input)
104-
.withStartEnd(0, 0.months, 0.months, Some(1.months), Some(3.months), Some(10.days))
140+
.withStartEnd(0, 5.days, 0.months, Some(1.months), Some(3.months), Some(10.days))
105141

106142
assertDFs(expected, result)
107143
}

0 commit comments

Comments
 (0)