Skip to content

Commit 151d597

Browse files
author
Youcef Sebiat
authored
Merge pull request #200 from X-DataInitiative/CNAM-410-Start-Delay-LimitedExposure
CNAM-410: Add start delay for LimitedExposurePeriodAdder strategy
2 parents 9a839f0 + 0e8216c commit 151d597

File tree

3 files changed

+81
-31
lines changed

3 files changed

+81
-31
lines changed

src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/ExposuresTransformer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class ExposuresTransformer(config: ExposuresTransformerConfig)
5656
when(col(FollowUpEnd) < col(ExposureEnd), col(FollowUpEnd)).otherwise(col(ExposureEnd))
5757
).drop(ExposureEnd) // This makes sure that all the exposures end at the followup end date
5858
.withColumnRenamed("Correct_Exposure_End", ExposureEnd)
59+
.where(col(ExposureStart) <= col(ExposureEnd))
5960
.aggregateWeight(
6061
cumulativeExposureWindow,
6162
cumulativeStartThreshold,

src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdder.scala

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,29 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod
1515
private val orderedWindow = window.orderBy(col(Start))
1616

1717
/** *
18-
* This strategy works as the following:
19-
* 1. Each DrugPurchase will have a corresponding Exposure.
20-
* 2. Each Exposure has one or multiple DrugPurchases.
21-
* 3. An Exposure is defined recursively as follows:
22-
* A. The first DrugPurchase defines a new Exposure.
23-
* B. If there is a DrugPurchase within the defined window of the first DrugPurchase, then expand the current
24-
* Exposure with the DrugPurchase.
25-
* C. Else, close and set the end of the Exposure as the reach of the current and create a new Exposure with the
26-
* DrugPurchase as the new Exposure.
27-
* This strategy is suited for short term effects.
28-
* !!! WARNING: THIS ONLY RETURNS EXPOSURES.
29-
*
30-
* @param minPurchases : Not used.
31-
* @param startDelay : Not used.
32-
* @param purchasesWindow : Not used.
33-
* @param endThresholdGc : the period that defines the reach for Grand Condtionnement.
34-
* @param endThresholdNgc : the period that defines the reach for Non Grand Condtionnement.
35-
* @param endDelay : period added to the end of an exposure.
36-
* @return: A DataFrame of Exposures.
37-
*/
18+
* This strategy works as follows:
19+
* 1. Each DrugPurchase will have a corresponding Exposure.
20+
* 2. Each Exposure has one or multiple DrugPurchases.
21+
* 3. An Exposure is defined recursively as follows:
22+
* A. The first DrugPurchase defines a new Exposure.
23+
* B. If there is a DrugPurchase within the defined window of the first DrugPurchase, then expand the current
24+
* Exposure with the DrugPurchase.
25+
* C. Else, close and set the end of the Exposure as the reach of the latest DrugPurchase and create a new
26+
* Exposure starting with the next DrugPurchase.
27+
* This strategy is suited for short term effects.
28+
* !!! WARNING: THIS ONLY RETURNS EXPOSURES.
29+
*
30+
* @param minPurchases : Not used.
31+
* @param startDelay : period to be added to delay the start of each DrugPurchase.
32+
* @param purchasesWindow : Not used.
33+
* @param endThresholdGc : the period that defines the reach for Grand Conditionnement.
34+
* @param endThresholdNgc : the period that defines the reach for Non Grand Conditionnement.
35+
* @param endDelay : period added to the end of an exposure.
36+
* @return A DataFrame of Exposures.
37+
*/
3838
def withStartEnd(
3939
minPurchases: Int = 2,
40-
startDelay: Period = 3.months,
40+
startDelay: Period = 5.days,
4141
purchasesWindow: Period = 4.months,
4242
endThresholdGc: Option[Period] = Some(120.days),
4343
endThresholdNgc: Option[Period] = Some(40.days),
@@ -46,11 +46,24 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod
4646

4747
val outputColumns = (data.columns.toList ++ List(ExposureStart, ExposureEnd)).map(col)
4848

49-
val firstLastPurchase = getFirstAndLastPurchase(data, endThresholdGc.get, endThresholdNgc.get, endDelay.get)
49+
val delayedDrugPurchases = delayStart(data, startDelay)
50+
51+
val firstLastPurchase = getFirstAndLastPurchase(
52+
delayedDrugPurchases,
53+
endThresholdGc.get,
54+
endThresholdNgc.get,
55+
endDelay.get
56+
)
5057

5158
toExposure(firstLastPurchase).select(outputColumns: _*)
5259
}
5360

61+
def delayStart(data: DataFrame, startDelay: Period): DataFrame = {
62+
data.withColumn("NewStart", col("start").addPeriod(startDelay))
63+
.drop(col("start"))
64+
.withColumnRenamed("NewStart", "start")
65+
}
66+
5467
def toExposure(firstLastPurchase: DataFrame): DataFrame = {
5568
val condition = (col("Status") === "first"
5669
&& lead(col("Status"), 1).over(orderedWindow) === "last")

src/test/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdderSuite.scala

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ class LimitedExposurePeriodAdderSuite extends SharedContext {
1212
// instance created from a mock DataFrame, to allow testing the InnerImplicits implicit class
1313
private val mockInstance = new LimitedExposurePeriodAdder(mock(classOf[DataFrame]))
1414

15-
"getFirstAndLastPurchase" should "return the first and the last purchase of each potential exposure" in {
1615

16+
"delayStart" should "delay the start of events by the given period" in {
1717
val sqlCtx = sqlContext
1818
import sqlCtx.implicits._
1919

@@ -30,15 +30,51 @@ class LimitedExposurePeriodAdderSuite extends SharedContext {
3030
("A", "S", makeTS(2008, 2, 1), 1.0)
3131
).toDF(PatientID, Value, Start, Weight)
3232

33+
34+
val expected = Seq(
35+
("A", "P", makeTS(2008, 1, 11), 1.0),
36+
("A", "P", makeTS(2008, 2, 11), 1.0),
37+
("A", "P", makeTS(2008, 5, 11), 1.0),
38+
("A", "P", makeTS(2008, 6, 11), 1.0),
39+
("A", "P", makeTS(2008, 7, 11), 1.0),
40+
("A", "P", makeTS(2009, 1, 11), 1.0),
41+
("A", "P", makeTS(2009, 7, 11), 1.0),
42+
("A", "P", makeTS(2009, 8, 11), 1.0),
43+
("A", "S", makeTS(2008, 2, 11), 1.0)
44+
).toDF(PatientID, Value, Start, Weight).select(PatientID, Value, Weight, Start)
45+
46+
val result = mockInstance.delayStart(input, 10.days)
47+
48+
assertDFs(expected, result)
49+
}
50+
51+
"getFirstAndLastPurchase" should "return the first and the last purchase of each potential exposure" in {
52+
53+
val sqlCtx = sqlContext
54+
import sqlCtx.implicits._
55+
56+
// Given
57+
val input = Seq(
58+
("A", "P", makeTS(2008, 1, 1), 2.0),
59+
("A", "P", makeTS(2008, 2, 1), 1.0),
60+
("A", "P", makeTS(2008, 5, 1), 1.0),
61+
("A", "P", makeTS(2008, 6, 1), 1.0),
62+
("A", "P", makeTS(2008, 7, 1), 1.0),
63+
("A", "P", makeTS(2009, 1, 1), 1.0),
64+
("A", "P", makeTS(2009, 7, 1), 1.0),
65+
("A", "P", makeTS(2009, 8, 1), 1.0),
66+
("A", "S", makeTS(2008, 2, 1), 2.0)
67+
).toDF(PatientID, Value, Start, Weight)
68+
3369
val expected = Seq(
34-
("A", "P", makeTS(2008, 1, 1), 1.0, "first", makeTS(2008, 2, 11)),
70+
("A", "P", makeTS(2008, 1, 1), 2.0, "first", makeTS(2008, 4, 11)),
3571
("A", "P", makeTS(2008, 2, 1), 1.0, "last", makeTS(2008, 3, 11)),
3672
("A", "P", makeTS(2008, 5, 1), 1.0, "first", makeTS(2008, 6, 11)),
3773
("A", "P", makeTS(2008, 7, 1), 1.0, "last", makeTS(2008, 8, 11)),
3874
("A", "P", makeTS(2009, 1, 1), 1.0, "first", makeTS(2009, 2, 11)),
3975
("A", "P", makeTS(2009, 7, 1), 1.0, "first", makeTS(2009, 8, 11)),
4076
("A", "P", makeTS(2009, 8, 1), 1.0, "last", makeTS(2009, 9, 11)),
41-
("A", "S", makeTS(2008, 2, 1), 1.0, "first", makeTS(2008, 3, 11))
77+
("A", "S", makeTS(2008, 2, 1), 2.0, "first", makeTS(2008, 5, 11))
4278
).toDF(PatientID, Value, Start, Weight, "Status", "purchaseReach")
4379

4480
val result = mockInstance.getFirstAndLastPurchase(input, 1.months, 3.month, 10.days)
@@ -93,15 +129,15 @@ class LimitedExposurePeriodAdderSuite extends SharedContext {
93129
).toDF(PatientID, Value, Start, Weight)
94130

95131
val expected = Seq(
96-
("A", "P", makeTS(2008, 1, 1), 1.0, makeTS(2008, 1, 1), makeTS(2008, 3, 11)),
97-
("A", "P", makeTS(2008, 5, 1), 1.0, makeTS(2008, 5, 1), makeTS(2008, 8, 11)),
98-
("A", "P", makeTS(2009, 1, 1), 1.0, makeTS(2009, 1, 1), makeTS(2009, 2, 11)),
99-
("A", "P", makeTS(2009, 7, 1), 1.0, makeTS(2009, 7, 1), makeTS(2009, 9, 11)),
100-
("A", "S", makeTS(2008, 2, 1), 1.0, makeTS(2008, 2, 1), makeTS(2008, 3, 11))
132+
("A", "P", makeTS(2008, 1, 6), 1.0, makeTS(2008, 1, 6), makeTS(2008, 3, 16)),
133+
("A", "P", makeTS(2008, 5, 6), 1.0, makeTS(2008, 5, 6), makeTS(2008, 8, 16)),
134+
("A", "P", makeTS(2009, 1, 6), 1.0, makeTS(2009, 1, 6), makeTS(2009, 2, 16)),
135+
("A", "P", makeTS(2009, 7, 6), 1.0, makeTS(2009, 7, 6), makeTS(2009, 9, 16)),
136+
("A", "S", makeTS(2008, 2, 6), 1.0, makeTS(2008, 2, 6), makeTS(2008, 3, 16))
101137
).toDF(PatientID, Value, Start, Weight, ExposureStart, ExposureEnd)
102138

103139
val result = new LimitedExposurePeriodAdder(input)
104-
.withStartEnd(0, 0.months, 0.months, Some(1.months), Some(3.months), Some(10.days))
140+
.withStartEnd(0, 5.days, 0.months, Some(1.months), Some(3.months), Some(10.days))
105141

106142
assertDFs(expected, result)
107143
}

0 commit comments

Comments
 (0)