Skip to content

Commit 723604e

Browse files
committed
CNAM-153 Removes filterDiagnosedPatients flag.
1 parent 8b7d942 commit 723604e

File tree

10 files changed

+67
-23
lines changed

10 files changed

+67
-23
lines changed

src/main/resources/config/filtering-default.conf

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ default = {
2424
follow-up-delay = 6 #Number of months after the observation start that is considered to be follow-up
2525
filter_delayed_patients = true #Patients who are exposed after a certain number of months from the study start.
2626
delayed_entries_threshold = 12 #Months that signifies the delayed entries.
27-
filter_diagonized_patients = true #Patients who are diagnosed with cancer before the study period.
2827

2928
exposures = {
3029
min_purchases = 2 #Minimum number of purchases that have to be made in order to be considered exposed.
@@ -77,12 +76,16 @@ default = {
7776
}
7877

7978
cnam = ${default}
80-
cnam = include "filtering-cnam.conf"
79+
cnam = {
80+
include "filtering-cnam.conf"
81+
}
8182

8283
cmap = ${default}
83-
cmap = include "filtering-cmap.conf"
84+
cmap = {
85+
include "filtering-cmap.conf"
86+
}
8487

8588
test = ${default}
8689
test = {
87-
include "filtering-test.conf"
88-
}
90+
include "filtering-test.conf"
91+
}
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package fr.polytechnique.cmap.cnam.filtering
22

3+
import java.sql.Timestamp
34
import org.apache.spark.sql.Dataset
4-
import fr.polytechnique.cmap.cnam.utilities.functions._
55

66
trait ExposuresTransformer extends DatasetTransformer[FlatEvent, FlatEvent] {
77

8-
// Constant definitions. Should be verified before compiling.
9-
// In the future, we may want to export them to an external file.
10-
val StudyStart = makeTS(2006, 1, 1)
8+
val StudyStart: Timestamp = FilteringConfig.dates.studyStart
119

1210
def transform(input: Dataset[FlatEvent]): Dataset[FlatEvent]
1311
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/FilteringConfig.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import scala.collection.JavaConverters._
55
import org.apache.spark.SparkContext
66
import org.apache.spark.sql.SQLContext
77
import com.typesafe.config.{Config, ConfigFactory}
8-
import fr.polytechnique.cmap.cnam.filtering.mlpp.MLPPFeature
98
import fr.polytechnique.cmap.cnam.utilities.functions._
109

1110
object FilteringConfig {

src/main/scala/fr/polytechnique/cmap/cnam/filtering/ObservationPeriodTransformer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package fr.polytechnique.cmap.cnam.filtering
22

3+
import java.sql.Timestamp
34
import org.apache.spark.sql.expressions.Window
45
import org.apache.spark.sql.functions._
56
import org.apache.spark.sql.{DataFrame, Dataset}
6-
import fr.polytechnique.cmap.cnam.utilities.functions._
77

88
trait ObservationPeriodTransformer extends DatasetTransformer[FlatEvent, FlatEvent]{
99

10-
final val StudyStart = makeTS(2006, 1, 1)
11-
final val StudyEnd = makeTS(2009, 12, 31, 23, 59, 59)
10+
final val StudyStart: Timestamp = FilteringConfig.dates.studyStart
11+
final val StudyEnd: Timestamp = FilteringConfig.dates.studyEnd
1212

1313
val outputColumns = List(
1414
col("patientID"),

src/main/scala/fr/polytechnique/cmap/cnam/filtering/TrackLossTransformer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import org.apache.spark.sql.{Column, DataFrame, Dataset}
99

1010
object TrackLossTransformer extends Transformer[Event]{
1111

12-
val EmptyMonths: Int = 4
13-
val TracklossMonthDelay: Int = 2
14-
val LastDay = Timestamp.valueOf("2010-01-01 00:00:00")
12+
val EmptyMonths: Int = FilteringConfig.tracklossDefinition.threshold
13+
val TracklossMonthDelay: Int = FilteringConfig.tracklossDefinition.delay
14+
val LastDay: Timestamp = FilteringConfig.dates.studyEnd
1515

1616
val inputColumns: List[Column] = List(
1717
col("NUM_ENQ").as("patientID"),

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxConfig.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,21 @@ object CoxConfig {
1919
lazy val followUpMonthsDelay: Int = modelParams.getInt("follow-up-delay")
2020
lazy val filterDelayedPatients: Boolean = modelParams.getBoolean("filter_delayed_patients")
2121
lazy val delayedEntriesThreshold: Int = modelParams.getInt("delayed_entries_threshold")
22-
lazy val filterDiagnosedPatients: Boolean = modelParams.getBoolean("filter_diagnosed_patients")
2322

2423
lazy val exposureDefinition = CoxExposureDefinition(
2524
minPurchases = modelParams.getInt("exposures.min_purchases"),
2625
startDelay = modelParams.getInt("exposures.start_delay"),
2726
purchasesWindow = modelParams.getInt("exposures.purchases_window")
2827
)
28+
29+
def summarize: Map[String, AnyVal] = {
30+
Map(
31+
"filterDelayedPatients" -> filterDelayedPatients,
32+
"delayedEntriesThreshold" -> delayedEntriesThreshold,
33+
"followUpMonthsDelay" -> followUpMonthsDelay,
34+
"exposureDefinition.minPurchases" -> exposureDefinition.minPurchases,
35+
"exposureDefinition.startDelay" -> exposureDefinition.startDelay,
36+
"exposureDefinition.purchasesWindow" -> exposureDefinition.purchasesWindow
37+
)
38+
}
2939
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxExposuresTransformer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ object CoxExposuresTransformer extends ExposuresTransformer {
4646
lit(StudyStart),
4747
DelayedEntriesThreshold
4848
).cast(TimestampType)
49+
4950
val drugFilter = max(
5051
when(
5152
col("category") === "molecule" && (col("start") <= firstYearObservation),

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxMain.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ object CoxMain extends Main {
1515

1616
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]] = {
1717

18+
logger.info("Running FilteringMain...")
1819
val flatEvents: Dataset[FlatEvent] = FilteringMain.run(sqlContext, argsMap).get
1920
coxFeaturing(flatEvents, argsMap)
2021
}
@@ -23,16 +24,15 @@ object CoxMain extends Main {
2324
import flatEvents.sqlContext.implicits._
2425

2526
val sqlContext = flatEvents.sqlContext
27+
val sc = flatEvents.sqlContext.sparkContext
2628

2729
argsMap.get("conf").foreach(sqlContext.setConf("conf", _))
2830
argsMap.get("env").foreach(sqlContext.setConf("env", _))
2931

3032
val cancerDefinition: String = FilteringConfig.cancerDefinition
3133
val filterDelayedPatients: Boolean = CoxConfig.filterDelayedPatients
3234
val outputRoot = FilteringConfig.outputPaths.coxFeatures
33-
val outputDir = s"$outputRoot/$cancerDefinition/$filterDelayedPatients"
34-
35-
logger.info("Running FilteringMain...")
35+
val outputDir = s"$outputRoot/$cancerDefinition"
3636

3737
val dcirFlat: DataFrame = sqlContext.read.parquet(FilteringConfig.inputPaths.dcir)
3838

@@ -52,7 +52,9 @@ object CoxMain extends Main {
5252
logger.info("Caching disease events...")
5353
logger.info("Number of disease events: " + diseaseFlatEvents.count)
5454

55-
logger.info("Preparing for Cox")
55+
logger.info("Preparing for Cox with the following parameters:")
56+
logger.info(CoxConfig.summarize.foreach(println))
57+
5658
logger.info("(Lazy) Transforming Follow-up events...")
5759
val observationFlatEvents = CoxObservationPeriodTransformer.transform(drugFlatEvents)
5860

@@ -89,8 +91,9 @@ object CoxMain extends Main {
8991
.union(observationFlatEvents)
9092
.union(tracklossFlatEvents)
9193

92-
logger.info("Writing summary of all cox events...")
94+
logger.info("Writing summary of all cox events and config...")
9395
flatEventsSummary.toDF.write.parquet(s"$outputDir/eventsSummary")
96+
sc.parallelize(CoxConfig.summarize.toSeq).coalesce(1).saveAsTextFile(s"$outputDir/config.txt")
9497
logger.info("Writing Exposures...")
9598
exposures.toDF.write.parquet(s"$outputDir/exposures")
9699

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package fr.polytechnique.cmap.cnam.filtering.cox
2+
3+
import fr.polytechnique.cmap.cnam.SharedContext
4+
5+
/**
6+
* Created by sathiya on 29/11/16.
7+
*/
8+
class CoxConfigSuite extends SharedContext {
9+
10+
"summarize" should "correctly return all the default Cox Parameters from the config file" in {
11+
// Given
12+
val expectedResult = Map(
13+
"filterDelayedPatients" -> true,
14+
"delayedEntriesThreshold" -> 12,
15+
"followUpMonthsDelay" -> 6,
16+
"exposureDefinition.minPurchases" -> 2,
17+
"exposureDefinition.startDelay" -> 3,
18+
"exposureDefinition.purchasesWindow" -> 6
19+
)
20+
21+
// When
22+
val result = CoxConfig.summarize
23+
24+
// Then
25+
println("Result:")
26+
result.foreach(println)
27+
println("Expected:")
28+
expectedResult.foreach(println)
29+
assert(result == expectedResult)
30+
}
31+
}

src/test/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxMainSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ class CoxMainSuite extends SharedContext {
3232

3333
// Then
3434
import RichDataFrames._
35-
result.printSchema()
3635
assert(result === expectedResult)
3736
}
3837

0 commit comments

Comments
 (0)