Skip to content

Commit 582d008

Browse files
committed
CNAM-164 Added dosage-based cumulative exposures
CNAM-164: Fixes for running at CNAM; added an option to reuse flatEvents from a previous run; fixed tests; updated the DosageBased aggregation strategy.
1 parent 657c035 commit 582d008

17 files changed

+285
-26
lines changed

src/main/resources/config/filtering-default.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ default = {
22
env_name = "default"
33
drug_categories = ["A10"]
44
cancer_definition = "broad"
5+
# reuse_flat_events_path = /path/to/flat-events/
56
disease_code = "C67"
67
mco_death_code = 9
78
limits = {
@@ -56,6 +57,7 @@ default = {
5657
window = 1 #Number of months to quantile.
5758
start_threshold = 6 #Number of months within which more than one purchase has to be made
5859
end_threshold = 4 #Number of months with no purchases of the particular molecule required before the exposure ends
60+
dosage_level_intervals = [0,100,200,300,400,500] #list of consumption levels in mg; use [0] alone to set all weights to 1
5961
}
6062
}
6163

src/main/scala/fr/polytechnique/cmap/cnam/Main.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import org.apache.log4j.{Level, Logger}
55
import org.apache.spark.sql.Dataset
66
import org.apache.spark.sql.hive.HiveContext
77
import org.apache.spark.{SparkConf, SparkContext}
8-
import fr.polytechnique.cmap.cnam.flattening.FlatteningMain._
98

109
trait Main {
1110

src/main/scala/fr/polytechnique/cmap/cnam/filtering/FilteringConfig.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package fr.polytechnique.cmap.cnam.filtering
22

33
import java.sql.Timestamp
44
import scala.collection.JavaConverters._
5+
import scala.util.Try
56
import org.apache.spark.SparkContext
67
import org.apache.spark.sql.SQLContext
78
import com.typesafe.config.{Config, ConfigFactory}
@@ -92,7 +93,7 @@ object FilteringConfig {
9293
threshold: Int,
9394
delay: Int
9495
)
95-
96+
lazy val reuseFlatEventsPath: Option[String] = Try(conf.getString("reuse_flat_events_path")).toOption
9697
lazy val drugCategories: List[String] = conf.getStringList("drug_categories").asScala.toList
9798
lazy val cancerDefinition: String = conf.getString("cancer_definition")
9899
lazy val diseaseCode: String = conf.getString("disease_code")
@@ -152,6 +153,7 @@ object FilteringConfig {
152153
filterDelayedPatients = conf.getBoolean("filters.delayed_entries"),
153154
cumulativeExposureWindow = conf.getInt("exposures.cumulative.window"),
154155
cumulativeStartThreshold = conf.getInt("exposures.cumulative.start_threshold"),
155-
cumulativeEndThreshold = conf.getInt("exposures.cumulative.end_threshold")
156+
cumulativeEndThreshold = conf.getInt("exposures.cumulative.end_threshold"),
157+
dosageLevelIntervals = conf.getIntList("exposures.cumulative.dosage_level_intervals").asScala.toList.map(x => x.toInt)
156158
)
157159
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/FilteringMain.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.apache.spark.sql.Dataset
44
import org.apache.spark.sql.functions._
55
import org.apache.spark.sql.hive.HiveContext
66
import fr.polytechnique.cmap.cnam.Main
7+
import fr.polytechnique.cmap.cnam.filtering.FilteringConfig.{InputPaths, OutputPaths}
78
import fr.polytechnique.cmap.cnam.utilities.functions._
89

910
object FilteringMain extends Main {
@@ -30,10 +31,20 @@ object FilteringMain extends Main {
3031
FilteringConfig.init()
3132
*/
3233

33-
val inputPaths = FilteringConfig.inputPaths
34-
val outputPaths = FilteringConfig.outputPaths
35-
val cancerDefinition = FilteringConfig.cancerDefinition
36-
val upperBoundQuantityIrpha = FilteringConfig.limits.maxQuantityIrpha
34+
val reuseFlatEventsPath: Option[String] = FilteringConfig.reuseFlatEventsPath
35+
36+
if (reuseFlatEventsPath.isDefined) {
37+
val flatEventsPath = reuseFlatEventsPath.get
38+
logger.info(s"Reusing flatEvents from $flatEventsPath")
39+
val flatEvents = sqlContext.read.parquet(flatEventsPath).as[FlatEvent]
40+
return Some(flatEvents)
41+
}
42+
43+
val inputPaths: InputPaths = FilteringConfig.inputPaths
44+
val outputPaths: OutputPaths = FilteringConfig.outputPaths
45+
val cancerDefinition: String = FilteringConfig.cancerDefinition
46+
val upperBoundQuantityIrpha: Int = FilteringConfig.limits.maxQuantityIrpha
47+
3748
logger.info(s"Running for the $cancerDefinition cancer definition")
3849

3950
logger.info("(Lazy) Extracting sources...")

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxExposuresTransformer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ object CoxExposuresTransformer extends ExposuresTransformer {
105105
.withColumn("weight", lit(1.0))
106106
}
107107

108-
def withPurchaseBasedCumulativeExposure(cumulativePeriod: Integer): DataFrame = {
108+
def withPurchaseBasedCumulativeExposure(cumulativePeriod: Int): DataFrame = {
109109
val window = Window.partitionBy("patientID", "eventId")
110110
val windowCumulativeExposure = window.partitionBy("patientID", "eventId", "exposureStart")
111111

@@ -114,7 +114,7 @@ object CoxExposuresTransformer extends ExposuresTransformer {
114114
}
115115

116116
val normalizedExposureDate = udf(
117-
(normalizedMonth: Integer) => {
117+
(normalizedMonth: Int) => {
118118
val cal: Calendar = Calendar.getInstance()
119119
cal.setTime(StudyStart)
120120
cal.add(Calendar.MONTH, normalizedMonth * cumulativePeriod)
Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,36 @@
11
package fr.polytechnique.cmap.cnam.filtering.exposures
22

33
import java.sql.Timestamp
4-
import org.apache.spark.sql.DataFrame
4+
5+
import fr.polytechnique.cmap.cnam.filtering.cox.CoxConfig.CoxExposureDefinition
6+
import org.apache.spark.sql.{Column, DataFrame}
7+
import org.apache.spark.sql.expressions.Window
58
import org.apache.spark.sql.functions._
69

710
class DosageBasedWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data) {
811

12+
val minPurchases = 5
13+
14+
private def aggregateWeightImpl(dosageLevelIntervals: List[Int]): DataFrame = {
15+
16+
val window = Window.partitionBy("patientID", "eventId")
17+
val finalWindow = Window.partitionBy("patientID", "eventId", "weight")
18+
19+
val getLevel = udf{(Quantity:Double) => dosageLevelIntervals.filter(x => x <= Quantity).size}
20+
21+
data
22+
.withColumn("exposureStart", col("start")) // temporary (todo: "first-only" feature in unlimitedPeriodAdder)
23+
.withColumn("weight", (sum("weight").over(window.orderBy("exposureStart"))))
24+
.withColumn("weight", getLevel(col("weight")))
25+
.withColumn("exposureStart", min("exposureStart").over(finalWindow))
26+
}
927
def aggregateWeight(
10-
studyStart: Option[Timestamp],
11-
cumWindow: Option[Int],
12-
cumStartThreshold: Option[Int],
13-
cumEndThreshold: Option[Int]): DataFrame = {
28+
studyStart: Option[Timestamp] = None,
29+
cumWindow: Option[Int] = None,
30+
cumStartThreshold: Option[Int] = None,
31+
cumEndThreshold: Option[Int] = None,
32+
dosageLevelIntervals: Option[List[Int]]): DataFrame = {
1433

15-
data.withColumn("weight", lit("dosage-based cumulative weight"))
34+
aggregateWeightImpl(dosageLevelIntervals.get)
1635
}
1736
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/ExposuresConfig.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ case class ExposuresConfig(
1515
filterDelayedPatients: Boolean,
1616
cumulativeExposureWindow: Int,
1717
cumulativeStartThreshold: Int,
18-
cumulativeEndThreshold: Int)
18+
cumulativeEndThreshold: Int,
19+
dosageLevelIntervals: List[Int])
1920

2021
object ExposuresConfig {
2122
// todo: Remove filters from ExposuresConfig pipeline

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/ExposuresTransformer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ExposuresTransformer(config: ExposuresConfig)
3737
.where(col("category") === "molecule")
3838
.withStartEnd(config.minPurchases,config.startDelay,config.purchasesWindow)
3939
.where(col("exposureStart") !== col("exposureEnd")) // This also removes rows where exposureStart = null
40-
.aggregateWeight(Some(config.studyStart), Some(config.cumulativeExposureWindow), Some(config.cumulativeStartThreshold), Some(config.cumulativeEndThreshold))
40+
.aggregateWeight(Some(config.studyStart), Some(config.cumulativeExposureWindow), Some(config.cumulativeStartThreshold), Some(config.cumulativeEndThreshold), Some(config.dosageLevelIntervals))
4141
.dropDuplicates(Seq("patientID", "eventID", "exposureStart", "exposureEnd", "weight"))
4242
.select(outputColumns: _*)
4343
.as[FlatEvent]

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/NonCumulativeWeightAgg.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ class NonCumulativeWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data)
1212
studyStart: Option[Timestamp],
1313
cumWindow: Option[Int],
1414
cumStartThreshold: Option[Int],
15-
cumEndThreshold: Option[Int]): DataFrame = this.aggregateWeightImpl
15+
cumEndThreshold: Option[Int],
16+
dosageLevelIntervals: Option[List[Int]]): DataFrame = this.aggregateWeightImpl
1617

17-
def aggregateWeight: DataFrame = aggregateWeight(None, None, None, None)
18+
def aggregateWeight: DataFrame = aggregateWeight(None, None, None, None, None)
1819
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/exposures/PurchaseBasedWeightAgg.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class PurchaseBasedWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data)
2121
}
2222

2323
val normalizeStart = udf(
24-
(normalizedMonth: Integer) => {
24+
(normalizedMonth: Int) => {
2525
val cal: Calendar = Calendar.getInstance()
2626
cal.setTime(studyStart)
2727
cal.add(Calendar.MONTH, normalizedMonth * cumWindow)
@@ -41,7 +41,8 @@ class PurchaseBasedWeightAgg(data: DataFrame) extends WeightAggregatorImpl(data)
4141
studyStart: Option[Timestamp],
4242
cumWindow: Option[Int],
4343
cumStartThreshold: Option[Int] = None,
44-
cumEndThreshold: Option[Int] = None): DataFrame = {
44+
cumEndThreshold: Option[Int] = None,
45+
dosageLevelIntervals: Option[List[Int]]= None): DataFrame = {
4546

4647
this.aggregateWeightImpl(studyStart.get, cumWindow.get)
4748
}

0 commit comments

Comments
 (0)