Skip to content

Commit b062b06

Browse files
committed
CNAM-164 Fixed purchase-based cumulative exposures
Now it discretizes the number of purchases CNAM-164 Removed untested file
1 parent 582d008 commit b062b06

File tree

14 files changed

+92
-305
lines changed

14 files changed

+92
-305
lines changed

src/main/resources/config/filtering-default.conf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ default = {
5757
window = 1 #Number of months to quantile.
5858
start_threshold = 6 #Number of months within which more than one purchases have to made
5959
end_threshold = 4 #Number of months during which no purchases of the particular molecule have to be made
60-
dosage_level_intervals = [0,100,200,300,400,500] #list of consumption levels in mg / put only 0 when we want all the weights to 1
60+
dosage_level_intervals = [0, 100, 200, 300, 400, 500] #list of consumption levels in mg / put only 0 when we want all the weights to 1
61+
purchase_intervals = [0, 3, 5] #list of consumption levels in mg / put only 0 when we want all the weights to 1
6162
}
6263
}
6364

src/main/scala/fr/polytechnique/cmap/cnam/filtering/FilteringConfig.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ object FilteringConfig {
154154
cumulativeExposureWindow = conf.getInt("exposures.cumulative.window"),
155155
cumulativeStartThreshold = conf.getInt("exposures.cumulative.start_threshold"),
156156
cumulativeEndThreshold = conf.getInt("exposures.cumulative.end_threshold"),
157-
dosageLevelIntervals = conf.getIntList("exposures.cumulative.dosage_level_intervals").asScala.toList.map(x => x.toInt)
157+
dosageLevelIntervals = conf.getIntList("exposures.cumulative.dosage_level_intervals").asScala.map(_.toInt).toList,
158+
purchaseIntervals = conf.getIntList("exposures.cumulative.purchase_intervals").asScala.map(_.toInt).toList
158159
)
159160
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/NewCoxMain.scala

Lines changed: 0 additions & 102 deletions
This file was deleted.

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/DosageBasedWeightAgg.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ class DosageBasedWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data) {
1616
val window = Window.partitionBy("patientID", "eventId")
1717
val finalWindow = Window.partitionBy("patientID", "eventId", "weight")
1818

19-
val getLevel = udf{(Quantity:Double) => dosageLevelIntervals.filter(x => x <= Quantity).size}
19+
val getLevel = udf {
20+
(weight: Double) => dosageLevelIntervals.count(_ <= weight).toDouble
21+
}
2022

2123
data
2224
.withColumn("exposureStart", col("start")) // temporary (todo: "first-only" feature in unlimitedPeriodAdder)
@@ -29,7 +31,8 @@ class DosageBasedWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data) {
2931
cumWindow: Option[Int] = None,
3032
cumStartThreshold: Option[Int] = None,
3133
cumEndThreshold: Option[Int] = None,
32-
dosageLevelIntervals: Option[List[Int]]): DataFrame = {
34+
dosageLevelIntervals: Option[List[Int]],
35+
purchaseIntervals: Option[List[Int]]): DataFrame = {
3336

3437
aggregateWeightImpl(dosageLevelIntervals.get)
3538
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/ExposuresConfig.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ case class ExposuresConfig(
1616
cumulativeExposureWindow: Int,
1717
cumulativeStartThreshold: Int,
1818
cumulativeEndThreshold: Int,
19-
dosageLevelIntervals: List[Int])
19+
dosageLevelIntervals: List[Int],
20+
purchaseIntervals: List[Int])
2021

2122
object ExposuresConfig {
2223
// todo: Remove filters from ExposuresConfig pipeline

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/ExposuresTransformer.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,14 @@ class ExposuresTransformer(config: ExposuresConfig)
3737
.where(col("category") === "molecule")
3838
.withStartEnd(config.minPurchases,config.startDelay,config.purchasesWindow)
3939
.where(col("exposureStart") !== col("exposureEnd")) // This also removes rows where exposureStart = null
40-
.aggregateWeight(Some(config.studyStart), Some(config.cumulativeExposureWindow), Some(config.cumulativeStartThreshold), Some(config.cumulativeEndThreshold), Some(config.dosageLevelIntervals))
40+
.aggregateWeight(
41+
Some(config.studyStart),
42+
Some(config.cumulativeExposureWindow),
43+
Some(config.cumulativeStartThreshold),
44+
Some(config.cumulativeEndThreshold),
45+
Some(config.dosageLevelIntervals),
46+
Some(config.purchaseIntervals)
47+
)
4148
.dropDuplicates(Seq("patientID", "eventID", "exposureStart", "exposureEnd", "weight"))
4249
.select(outputColumns: _*)
4350
.as[FlatEvent]

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/NonCumulativeWeightAgg.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ class NonCumulativeWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data)
1313
cumWindow: Option[Int],
1414
cumStartThreshold: Option[Int],
1515
cumEndThreshold: Option[Int],
16-
dosageLevelIntervals: Option[List[Int]]): DataFrame = this.aggregateWeightImpl
16+
dosageLevelIntervals: Option[List[Int]],
17+
purchaseIntervals: Option[List[Int]]): DataFrame = this.aggregateWeightImpl
1718

18-
def aggregateWeight: DataFrame = aggregateWeight(None, None, None, None, None)
19+
def aggregateWeight: DataFrame = aggregateWeight(None, None, None, None, None, None)
1920
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/PurchaseBasedWeightAgg.scala

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,40 +10,31 @@ import fr.polytechnique.cmap.cnam.utilities.functions.makeTS
1010

1111
class PurchaseBasedWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data) {
1212

13-
private def aggregateWeightImpl(studyStart: Timestamp, cumWindow: Int): DataFrame = {
13+
private def aggregateWeightImpl(purchaseIntervals: List[Int]): DataFrame = {
1414

15-
val outputColumns: Seq[Column] = data.columns.map(col) :+ col("weight")
1615
val window = Window.partitionBy("patientID", "eventId")
17-
val windowCumulativeExposure = window.partitionBy("patientID", "eventId", "exposureStart")
16+
val finalWindow = Window.partitionBy("patientID", "eventId", "weight")
1817

19-
def normalizeStartMonth(start: Column) = {
20-
floor(months_between(start, lit(studyStart)) / cumWindow).cast(IntegerType)
18+
val getLevel = udf {
19+
(weight: Double) => purchaseIntervals.count(_ <= weight).toDouble
2120
}
2221

23-
val normalizeStart = udf(
24-
(normalizedMonth: Int) => {
25-
val cal: Calendar = Calendar.getInstance()
26-
cal.setTime(studyStart)
27-
cal.add(Calendar.MONTH, normalizedMonth * cumWindow)
28-
makeTS(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, 1)
29-
}
30-
)
31-
3222
data
33-
.withColumn("normalizedMonth", normalizeStartMonth(col("start")))
34-
.withColumn("exposureStart", normalizeStart(col("normalizedMonth")))
35-
.withColumn("weight", row_number().over(window.orderBy("start")))
36-
.withColumn("weight", max("weight").over(windowCumulativeExposure).cast(DoubleType))
37-
.select(outputColumns: _*)
23+
.withColumn("exposureStart", col("start")) // temporary (todo: "first-only" feature in unlimitedPeriodAdder)
24+
.withColumn("weight", (sum(lit(1.0)).over(window.orderBy("exposureStart"))))
25+
.withColumn("weight", getLevel(col("weight")))
26+
.withColumn("exposureStart", min("exposureStart").over(finalWindow))
3827
}
3928

4029
def aggregateWeight(
41-
studyStart: Option[Timestamp],
42-
cumWindow: Option[Int],
43-
cumStartThreshold: Option[Int] = None,
44-
cumEndThreshold: Option[Int] = None,
45-
dosageLevelIntervals: Option[List[Int]]= None): DataFrame = {
46-
47-
this.aggregateWeightImpl(studyStart.get, cumWindow.get)
30+
studyStart: Option[Timestamp],
31+
cumWindow: Option[Int],
32+
cumStartThreshold: Option[Int] = None,
33+
cumEndThreshold: Option[Int] = None,
34+
dosageLevelIntervals: Option[List[Int]]= None,
35+
purchaseIntervals: Option[List[Int]]): DataFrame = {
36+
37+
this.aggregateWeightImpl(purchaseIntervals.get)
4838
}
4939
}
40+

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/TimeBasedWeightAgg.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ class TimeBasedWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data) {
1212
cumWindow: Option[Int],
1313
cumStartThreshold: Option[Int],
1414
cumEndThreshold: Option[Int],
15-
dosageLevelIntervals: Option[List[Int]]): DataFrame = {
15+
dosageLevelIntervals: Option[List[Int]],
16+
purchaseIntervals: Option[List[Int]]): DataFrame = {
1617

1718
val window = Window.partitionBy("patientID", "eventId").orderBy("exposureStart", "exposureEnd")
1819
data
@@ -22,5 +23,5 @@ class TimeBasedWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data) {
2223
.withColumn("exposureEnd", col("followUpEnd"))
2324
}
2425

25-
def aggregateWeight: DataFrame = aggregateWeight(None, None, None, None, None)
26+
def aggregateWeight: DataFrame = aggregateWeight(None, None, None, None, None, None)
2627
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/WeightAggregator.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ trait WeightAggregator {
1919

2020
abstract class WeightAggregatorImpl(data: DataFrame) {
2121

22+
// todo: refactor the parametrization (maybe passing a single config object).
23+
// The current approach is not maintainable nor scalable
2224
def aggregateWeight(
23-
studyStart: Option[Timestamp],
24-
cumWindow: Option[Int],
25-
cumStartThreshold: Option[Int],
26-
cumEndThreshold: Option[Int],
27-
dosageLevelIntervals: Option[List[Int]]): DataFrame
25+
studyStart: Option[Timestamp] = None,
26+
cumWindow: Option[Int] = None,
27+
cumStartThreshold: Option[Int] = None,
28+
cumEndThreshold: Option[Int] = None,
29+
dosageLevelIntervals: Option[List[Int]] = None,
30+
purchaseIntervals: Option[List[Int]] = None): DataFrame
2831
}

0 commit comments

Comments
 (0)