Studies

Anfor edited this page Apr 1, 2020 · 2 revisions

A study is built from Extractors, Transformers, Events, a Configuration and a main element that orchestrates the processing. To illustrate this process, we use the Fall study.

The entry point for this study is the FallMain object, which extends Main and FractureCodes. From Main it gets the elements needed to create, use and stop a Spark session; from FractureCodes it gets the codes used to identify fractures.

Main defines the main method, which starts and stops the SparkSession and parses the arguments used during the study: the path of the configuration file and the environment in which the processing is launched. Both are then passed as parameters to the run method.

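  def main(args: Array[String]): Unit = {
    // Start the SparkSession
    startContext()
    val sqlCtx = sqlContext
    // Parse the "key=value" arguments into a Map, splitting at the first "=" only
    val argsMap = args.map { arg =>
      arg.split("=", 2) match {
        case Array(key, value) => key -> value
        case _ => throw new IllegalArgumentException(s"Expected key=value, got: $arg")
      }
    }.toMap
    try {
      // Pass the SQLContext and the parsed arguments to the study
      run(sqlCtx, argsMap)
    }
    finally stopContext() // stop the SparkSession even if run fails
  }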
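The lifecycle handled by main can be sketched without Spark. In this minimal sketch, FakeContext, MainSketch and FallMainSketch are hypothetical stand-ins, not the project's API: main starts the context, parses the arguments, and always stops the context after run, even if run throws.

```scala
// Hypothetical stand-in for the Spark context: the real Main
// starts and stops a SparkSession instead.
final class FakeContext {
  var running: Boolean = false
}

// Sketch of the Main pattern: main() owns the context lifecycle,
// each study only implements run().
trait MainSketch {
  private var ctx: Option[FakeContext] = None

  def startContext(): Unit = {
    val c = new FakeContext
    c.running = true
    ctx = Some(c)
  }

  def stopContext(): Unit = ctx.foreach(c => c.running = false)

  def context: FakeContext = ctx.getOrElse(sys.error("context not started"))

  def run(ctx: FakeContext, argsMap: Map[String, String]): Unit

  def main(args: Array[String]): Unit = {
    startContext()
    // Parse "key=value" arguments, splitting at the first '=' only
    val argsMap = args.map { arg =>
      arg.split("=", 2) match {
        case Array(key, value) => key -> value
        case _ => throw new IllegalArgumentException(s"Expected key=value, got: $arg")
      }
    }.toMap
    // stopContext() runs even if run() throws
    try run(context, argsMap)
    finally stopContext()
  }
}

// Hypothetical study object that records what it received
object FallMainSketch extends MainSketch {
  var lastArgs: Map[String, String] = Map.empty
  var contextWasRunning: Boolean = false

  override def run(ctx: FakeContext, argsMap: Map[String, String]): Unit = {
    contextWasRunning = ctx.running
    lastArgs = argsMap
  }
}
```

With this design a study never has to remember to stop the context; the try/finally in the shared main takes care of it.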
Arguments

 "conf"="/src/main/resources/config/fall/default.conf","env"="test"

Once the processing is running, the first step is to load all the values from the configuration file into a config object.

The FallConfig object parametrizes the study: inputs, outputs and other parameters that control the study. A template is provided for the Fall study:

# Template configuration file for the Fall study. To override the defaults, copy this file to your working
#   directory, then uncomment the desired lines and pass the file path to spark-submit

# input.dcir = "src/test/resources/test-input/DCIR.parquet"
# input.mco = "src/test/resources/test-input/MCO.parquet"
# input.mco_ce = "src/test/resources/test-input/MCO_CE.parquet"
# input.ir_ben = "src/test/resources/test-input/IR_BEN_R.parquet"
# input.ir_imb = "src/test/resources/test-input/IR_IMB_R.parquet"
# input.ir_pha = "src/test/resources/test-input/IR_PHA_R_With_molecules.parquet"

# output.root = "target/test/output"
# output.save_mode = "errorIfExists"   // Possible values = [overwrite, append, errorIfExists, withTimestamp] Strategy for saving output data. errorIfExists by default

# exposures.start_delay: 0 months      // 0+ (Usually between 0 and 3). Represents the delay in months between a dispensation and its exposure start date.
# exposures.purchases_window: 0 months // 0+ (Usually 0 or 6) Represents the window size in months. Ignored when min_purchases=1.
# exposures.end_threshold_gc: 90 days  // If periodStrategy="limited", represents the period without purchases for an exposure to be considered "finished".
# exposures.end_threshold_ngc: 30 days // If periodStrategy="limited", represents the period without purchases for an exposure to be considered "finished".
# exposures.end_delay: 30 days         // Number of periods that we add to the exposure end to delay it (lag).

# interactions.level: 3 // Integer representing the maximum number of values of Interaction. Please be careful, as this does not scale well beyond 5 when the data contains a patient with a very high number of exposures

# drugs.level: "Therapeutic"           // Options are Therapeutic, Pharmacological, MoleculeCombination
# drugs.families: ["Antihypertenseurs", "Antidepresseurs", "Neuroleptiques", "Hypnotiques"]

# patients.start_gap_in_months: 2      // Removes all patients who have an event within N months after the study start.

# sites.sites: ["BodySites"]

# outcomes.fall_frame: 0 months        // Fractures are grouped if they happen at the same site within the period fallFrame (default value 0 means no grouping)

# run_parameters.outcome: ["Acts", "Diagnoses", "Outcomes"]                                           // pipeline of the calculation of outcome, possible values: Acts, Diagnoses, Outcomes
# run_parameters.exposure: ["Patients", "StartGapPatients", "DrugPurchases", "Exposures"]             // pipeline of the calculation of exposure, possible values: Patients, StartGapPatients, DrugPurchases, Exposures
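As a sketch of how two of these settings could be validated, assuming plain Scala types rather than the project's actual FallConfig classes: SaveMode, RunParametersSketch and runs are hypothetical names, while the accepted strings and the defaults are the ones listed in the template. The assumption that a step runs only when it is listed in its pipeline is ours.

```scala
// Hypothetical model of output.save_mode; the real FallConfig is
// loaded from the .conf file and is richer than this.
sealed trait SaveMode
object SaveMode {
  case object Overwrite extends SaveMode
  case object Append extends SaveMode
  case object ErrorIfExists extends SaveMode
  case object WithTimestamp extends SaveMode

  // Maps the strings allowed by output.save_mode to a SaveMode
  def fromString(s: String): SaveMode = s match {
    case "overwrite"     => Overwrite
    case "append"        => Append
    case "errorIfExists" => ErrorIfExists
    case "withTimestamp" => WithTimestamp
    case other => throw new IllegalArgumentException(s"Unknown save_mode: $other")
  }
}

// Defaults mirror the (commented-out) values of the template
final case class RunParametersSketch(
  outcome: List[String] = List("Acts", "Diagnoses", "Outcomes"),
  exposure: List[String] = List("Patients", "StartGapPatients", "DrugPurchases", "Exposures")
) {
  private val allowedOutcome = Set("Acts", "Diagnoses", "Outcomes")
  private val allowedExposure = Set("Patients", "StartGapPatients", "DrugPurchases", "Exposures")
  // Reject unknown step names as soon as the parameters are built
  require(outcome.forall(allowedOutcome), s"Invalid outcome step in $outcome")
  require(exposure.forall(allowedExposure), s"Invalid exposure step in $exposure")

  // Assumption: a step runs only if it is listed in one of the pipelines
  def runs(step: String): Boolean = outcome.contains(step) || exposure.contains(step)
}
```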

The paths read from the config object are used to load the sources into a Sources object, and the other parameters are used to filter the study's data. The study then runs three methods, each of which produces OperationMetadata.
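A minimal sketch of reading the input.* paths of the template into a Sources-like holder, assuming the configuration has already been flattened into a key/value Map. SourcePathsSketch is hypothetical and only keeps paths, whereas the real Sources object holds the loaded data; the source names it reports are also assumptions.

```scala
// Hypothetical holder of source paths; the real Sources object holds loaded data.
final case class SourcePathsSketch(
  dcir: Option[String],
  mco: Option[String],
  mcoCe: Option[String],
  irBen: Option[String],
  irImb: Option[String],
  irPha: Option[String]
) {
  // Names of the sources that have a configured path
  def configured: List[String] = List(
    "DCIR" -> dcir, "MCO" -> mco, "MCO_CE" -> mcoCe,
    "IR_BEN_R" -> irBen, "IR_IMB_R" -> irImb, "IR_PHA_R" -> irPha
  ).collect { case (name, Some(_)) => name }
}

object SourcePathsSketch {
  // Reads the input.* keys of the template; missing keys become None
  def fromConfig(conf: Map[String, String]): SourcePathsSketch = SourcePathsSketch(
    dcir = conf.get("input.dcir"),
    mco = conf.get("input.mco"),
    mcoCe = conf.get("input.mco_ce"),
    irBen = conf.get("input.ir_ben"),
    irImb = conf.get("input.ir_imb"),
    irPha = conf.get("input.ir_pha")
  )
}
```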

All three methods take the same parameters, Sources and FallConfig, and extract data from the sources. For example, in computeControls:


//computecontrols
    val opioids = OpioidsExtractor.extract(sources).cache()
    
object OpioidsExtractor {
  def extract(sources: Sources): Dataset[Event[Drug]] = {
    new DrugExtractor(DrugConfig(MoleculeCombinationLevel, List(Opioids))).extract(sources, Set.empty)
  }
}

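// DrugExtractor extends the generic Extractor trait
// (universe here refers to scala.reflect.runtime.universe)
class DrugExtractor(drugConfig: DrugConfig) extends Extractor[Drug] {

  override def extract(
    sources: Sources,
    codes: Set[String])
    (implicit ctag: universe.TypeTag[Drug]): Dataset[Event[Drug]] = {

    val input: DataFrame = getInput(sources)

    import input.sqlContext.implicits._

    // Keep the rows handled by this extractor; when drug families are
    // configured, also keep only the rows matching the requested codes
    val inScope = input.filter(isInExtractorScope _)
    val filtered =
      if (drugConfig.families.isEmpty) inScope
      else inScope.filter(isInStudy(codes) _)

    // Build the Drug events and remove duplicates
    filtered.flatMap(builder _).distinct()
  }

  // (rest of the class not shown)
}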
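The extractor logic above (scope filter, optional code filter, event building, deduplication) can be sketched on in-memory collections instead of Spark Datasets. ExtractorSketch, RawRow, EventSketch and DrugExtractorSketch are hypothetical; as a simplification, the code filter here is skipped when codes is empty, whereas DrugExtractor decides based on drugConfig.families.

```scala
// Simplified, hypothetical event type; the real Event[A] carries more fields
final case class EventSketch(patientId: String, value: String, start: Int)

// Hypothetical raw row as read from a source table
final case class RawRow(patientId: String, code: String, category: String, day: Int)

// Template of an extractor: scope filter, optional code filter,
// event building and deduplication
trait ExtractorSketch {
  def isInExtractorScope(row: RawRow): Boolean
  def isInStudy(codes: Set[String])(row: RawRow): Boolean = codes.contains(row.code)
  def builder(row: RawRow): Seq[EventSketch]

  def extract(input: Seq[RawRow], codes: Set[String]): Seq[EventSketch] = {
    val inScope = input.filter(isInExtractorScope)
    val filtered = if (codes.isEmpty) inScope else inScope.filter(isInStudy(codes))
    filtered.flatMap(builder).distinct
  }
}

// Hypothetical drug extractor: keeps the rows of category "drug"
object DrugExtractorSketch extends ExtractorSketch {
  def isInExtractorScope(row: RawRow): Boolean = row.category == "drug"
  def builder(row: RawRow): Seq[EventSketch] = Seq(EventSketch(row.patientId, row.code, row.day))
}
```

Concrete extractors only supply the scope test and the builder; the filtering and deduplication stay in one shared place.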

Some methods also use a Transformer to create Events. For instance, computeOutcomes extracts data from the sources and then, if the corresponding run parameter is enabled, transforms it into Events of type Outcome:

    if (fallConfig.runParameters.outcomes) {
      logger.info("Fractures")
      // Transform the extracted acts and diagnoses into fracture outcome events
      val fractures: Dataset[Event[Outcome]] = new FracturesTransformer(fallConfig)
        .transform(optionLiberalActs.get, optionActs.get, optionDiagnoses.get)
      // Report the fractures (output path and save mode come from the config)
      // and store the resulting metadata
      operationsMetadata += {
        OperationReporter
          .report(
            "fractures",
            List("acts"),
            OperationTypes.Outcomes,
            fractures.toDF,
            Path(fallConfig.output.outputSavePath),
            fallConfig.output.saveMode
          )
      }
    }
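A sketch of the same gate, transform and report flow on plain collections, assuming for simplicity that a fracture outcome is any act or diagnosis whose code belongs to a given set. FracturesTransformerSketch, OutcomesStepSketch, MedicalEvent, OutcomeEvent and OpMeta are hypothetical; the real FracturesTransformer also takes liberal acts and the FallConfig, and its matching rules are not shown here.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical simplified types
final case class MedicalEvent(patientId: String, code: String, day: Int)
final case class OutcomeEvent(patientId: String, name: String, day: Int)
final case class OpMeta(name: String, inputs: List[String], outputType: String)

// Turns extracted acts and diagnoses into outcome events (simplified matching)
final class FracturesTransformerSketch(fractureCodes: Set[String]) {
  def transform(acts: Seq[MedicalEvent], diagnoses: Seq[MedicalEvent]): Seq[OutcomeEvent] =
    (acts ++ diagnoses)
      .filter(e => fractureCodes.contains(e.code))
      .map(e => OutcomeEvent(e.patientId, "fracture", e.day))
      .distinct
}

object OutcomesStepSketch {
  // Mirrors the run-parameter gate: nothing is computed or reported when disabled
  def computeOutcomes(
      runOutcomes: Boolean,
      acts: Seq[MedicalEvent],
      diagnoses: Seq[MedicalEvent],
      fractureCodes: Set[String]
  ): (Seq[OutcomeEvent], List[OpMeta]) = {
    val metadata = ListBuffer.empty[OpMeta]
    val outcomes =
      if (runOutcomes) {
        val fractures = new FracturesTransformerSketch(fractureCodes).transform(acts, diagnoses)
        metadata += OpMeta("fractures", List("acts"), "outcomes")
        fractures
      } else Seq.empty[OutcomeEvent]
    (outcomes, metadata.toList)
  }
}
```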

Here is an example of the output of computeExposures:

OperationMetadata(drug_purchases,List(DCIR),dispensations,target/test/output/drug_purchases/data,target/test/output/drug_purchases/patients), 
OperationMetadata(extract_patients,List(DCIR, MCO, IR_BEN_R, MCO_CE),patients,target/test/output/extract_patients/data,), 
OperationMetadata(filter_patients,List(drug_purchases, extract_patients),patients,target/test/output/filter_patients/data,)

All the OperationMetadata produced by these methods are collected in operationsMetadata. This value is then stored as a list, together with other descriptive values (class name, start timestamp and end timestamp), in the MainMetadata case class:

case class MainMetadata(
  className: String,
  startTimestamp: java.util.Date,
  endTimestamp: java.util.Date,
  // List of the OperationMetadata produced by the study
  operations: List[OperationMetadata])
  extends JsonSerializable
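The way operationsMetadata accumulates the output of each step before being wrapped in MainMetadata can be sketched as follows. The field names of OperationMetadataSketch are an assumption inferred from the printed output of computeExposures (name, inputs, output type, data path, population path); MainMetadataSketch, collect and the step functions are hypothetical.

```scala
import java.util.Date
import scala.collection.mutable.ListBuffer

// Assumed field layout, inferred from the printed OperationMetadata values
final case class OperationMetadataSketch(
  name: String,
  inputs: List[String],
  outputType: String,
  outputPath: String,
  populationPath: String = ""
)

final case class MainMetadataSketch(
  className: String,
  startTimestamp: Date,
  endTimestamp: Date,
  operations: List[OperationMetadataSketch]
)

object MetadataCollectionSketch {
  // Each step appends its metadata to one shared buffer, in execution order
  def collect(
      className: String,
      steps: Seq[() => Seq[OperationMetadataSketch]],
      start: Date,
      end: Date
  ): MainMetadataSketch = {
    val operationsMetadata = ListBuffer.empty[OperationMetadataSketch]
    steps.foreach(step => operationsMetadata ++= step())
    MainMetadataSketch(className, start, end, operationsMetadata.toList)
  }
}
```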

MainMetadata is the result of the study. It is serialized to JSON and passed, together with the output file name and the environment, to the writeMetaData method of the OperationReporter object:

    // Write the metadata
    val metadata = MainMetadata(this.getClass.getName, startTimestamp, new java.util.Date(), operationsMetadata.toList)
    val metadataJson: String = metadata.toJsonString()

    OperationReporter
      .writeMetaData(metadataJson, "metadata_fall_" + format.format(startTimestamp) + ".json", argsMap("env"))
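The real toJsonString comes from JsonSerializable, and the pattern used by format is not shown here. As a library-free sketch under those unknowns, MetadataJsonSketch renders a reduced version of the metadata (operations as names only) with assumed JSON keys, and builds the file name with an assumed yyyy_MM_dd_HH_mm_ss pattern in UTC.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object MetadataJsonSketch {
  // Formatter fixed to UTC so the output does not depend on the machine's time zone
  private def formatter(pattern: String): SimpleDateFormat = {
    val f = new SimpleDateFormat(pattern)
    f.setTimeZone(TimeZone.getTimeZone("UTC"))
    f
  }

  // Escapes the characters that would break a JSON string
  private def escapeChar(c: Char): String = c match {
    case '"'   => "\\\""
    case '\\'  => "\\\\"
    case '\n'  => "\\n"
    case other => other.toString
  }

  private def quote(s: String): String = "\"" + s.toList.map(escapeChar).mkString + "\""

  // Reduced JSON rendering; the key names are assumptions
  def toJson(className: String, start: Date, end: Date, operations: List[String]): String = {
    val iso = formatter("yyyy-MM-dd'T'HH:mm:ss'Z'")
    val ops = operations.map(quote).mkString("[", ",", "]")
    s"""{"class_name":${quote(className)},"start_timestamp":${quote(iso.format(start))},"end_timestamp":${quote(iso.format(end))},"operations":$ops}"""
  }

  // Same shape as the original file name: prefix + formatted start + ".json"
  def fileName(start: Date): String =
    "metadata_fall_" + formatter("yyyy_MM_dd_HH_mm_ss").format(start) + ".json"
}
```

For example, a start timestamp of new Date(0L) gives the file name metadata_fall_1970_01_01_00_00_00.json.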
