-
Notifications
You must be signed in to change notification settings - Fork 3
How the elements are linked
To illustrate how the elements described individually (extractors, transformers, events and studies) are linked,
we can take the `computeExposures` method of
the Fall study
as an example.
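The first step in conducting the study is to extract the necessary data. This step is performed by an Extractor, which retrieves the data and filters it if necessary. In `computeExposures`, drug purchases are only extracted when `fallConfig.runParameters.drugPurchases` is enabled: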
// Step 1: extract the drug purchases, but only when enabled in the run parameters.
val optionDrugPurchases = if (!fallConfig.runParameters.drugPurchases) {
  None
} else {
  // Cached because the dataset is both reported below and returned to the caller.
  val purchases = new DrugsExtractor(fallConfig.drugs).extract(sources).cache()
  // Record the "drug_purchases" operation (sourced from DCIR) in the study metadata.
  operationsMetadata += OperationReporter.report(
    "drug_purchases",
    List("DCIR"),
    OperationTypes.Dispensations,
    purchases.toDF,
    Path(fallConfig.output.outputSavePath),
    fallConfig.output.saveMode
  )
  Some(purchases)
}
The code above calls `DrugsExtractor`, a class that wraps the `DrugExtractor` extractor:
// Wrapper around DrugExtractor: always calls it with an empty set of codes.
class DrugsExtractor(drugConfig: DrugConfig) {

  /** Runs a [[DrugExtractor]] built from `drugConfig` over `sources`. */
  def extract(sources: Sources): Dataset[Event[Drug]] = {
    val extractor = new DrugExtractor(drugConfig)
    extractor.extract(sources, Set.empty)
  }
}
In this example, `DrugExtractor` extracts the following columns from the `dcir` and `irPha` (IR_PHA_R) sources:
patientID,
CIP13,
ATC5,
eventDate,
molecules,
conditioning,
FluxDate,
FluxProcessingDate,
EmitterType,
EmitterId,
FluxSeqNumber,
OrganisationOldId,
OrganisationDecompteNumber.
// Retrieves the columns listed above from the dcir and irPha sources.
override def getInput(sources: Sources): DataFrame = {
  // ColNames.GroupID is defined in the ColNames object (not here) as a list of these
  // column-name constants:
  //   FluxDate, FluxProcessingDate, EmitterType, EmitterId,
  //   FluxSeqNumber, OrganisationOldId, OrganisationDecompteNumber
  val neededColumns: List[Column] = List(
    col("NUM_ENQ").cast(StringType).as("patientID"),
    col("ER_PHA_F__PHA_PRS_C13").cast(StringType).as("CIP13"),
    col("PHA_ATC_C07").cast(StringType).as("ATC5"),
    col("EXE_SOI_DTD").cast(TimestampType).as("eventDate"),
    col("molecule_combination").cast(StringType).as("molecules"),
    col("PHA_CND_TOP").cast(StringType).as("conditioning")
  ) ::: ColNames.GroupID.map(col)

  // NOTE(review): both sources are assumed to be loaded; `.get` throws if either is absent.
  val dcir = sources.dcir.get
  val irPhaR = sources.irPha.get
  // Join each DCIR row with the irPha row that has the same CIP13 code.
  val df: DataFrame =
    dcir.join(irPhaR, dcir.col("ER_PHA_F__PHA_PRS_C13") === irPhaR.col("PHA_CIP_C13"))

  df
    .select(neededColumns: _*)
    // "GC" conditioning is encoded as 1; any other value (including null) as 2.
    .withColumn("conditioning", when(col("conditioning") === "GC", 1).otherwise(2))
    // Drop rows missing any of these three fields.
    .na.drop(Seq("eventDate", "CIP13", "ATC5"))
}
These columns are then used to build the drug `Event`s:
/** Builds the drug events from the columns returned by `getInput`.
  *
  * Rows are first restricted with `isInExtractorScope`. When `drugConfig.families` is
  * non-empty, they are additionally restricted with `isInStudy(codes)`. Each remaining row
  * is passed to `builder`, and the resulting events are de-duplicated.
  */
override def extract(
    sources: Sources,
    codes: Set[String]
)(implicit ctag: universe.TypeTag[Drug]): Dataset[Event[Drug]] = {
  val input: DataFrame = getInput(sources)
  import input.sqlContext.implicits._

  val inScope = input.filter(isInExtractorScope _)
  val retained =
    if (drugConfig.families.isEmpty) inScope
    else inScope.filter(isInStudy(codes) _)

  // `builder` turns each retained row into zero or more drug events.
  retained.flatMap(builder _).distinct()
}
Once the drug events are extracted, they are transformed into `DrugPrescription` events by grouping the purchases that share the same group ID, patient ID and start date:
/**
 * Transforms drug-purchase events into drug-prescription events.
 *
 * Purchases are grouped by (groupID, patientID, start). Each group is turned into one
 * prescription event by `fromDrugs`, and duplicate prescriptions are then removed.
 *
 * @param drugs the drug-purchase events, a `Dataset[Event[Drug]]`
 * @return the prescription events, a `Dataset[Event[DrugPrescription]]`
 */
def transform(drugs: Dataset[Event[Drug]]): Dataset[Event[DrugPrescription]] = {
  val ctx = drugs.sqlContext
  import ctx.implicits._

  val byPrescriptionKey =
    drugs.groupByKey(purchase => (purchase.groupID, purchase.patientID, purchase.start))

  byPrescriptionKey
    .mapGroups((_, purchases) => fromDrugs(purchases.toList))
    .distinct()
}
These prescription events are then used in the study:
// Step 2: turn the drug purchases into prescriptions.
// `.get` assumes drug-purchase extraction was enabled; on None it throws NoSuchElementException.
val prescriptions = new DrugPrescriptionTransformer()
  .transform(optionDrugPurchases.get)
  .cache()
// Record the "prescriptions" operation (derived from "drug_purchases") in the study metadata.
operationsMetadata += OperationReporter.report(
  "prescriptions",
  List("drug_purchases"),
  OperationTypes.Dispensations,
  prescriptions.toDF,
  Path(fallConfig.output.outputSavePath),
  fallConfig.output.saveMode
)
This is how the elements are used together.
- Section One: Events Mapping
- DCIR
- PMSI
- Referentials
- Section Two: Extraction Elements
- Section Three: Patients Mapping
- Section Four: Transformers