5 changes: 5 additions & 0 deletions modules/local/analysis_split/resources/usr/bin/main.py
@@ -112,6 +112,11 @@ def map_analysis_dependencies(analyses,relational_mapping,data,debug):
     for analysis in analyses:
         if debug: print("#%s" % analysis)
         for analysisType in analyses.get(analysis).get("analysis").get('data')['analysisType'].unique().tolist():
+            print("#1",relational_mapping.get("analysis"))
+            print("#2",relational_mapping.get("analysis").get("analysisTypes"))
+            print("#3",relational_mapping.get("analysis").get("analysisTypes").get(analysisType))
+            print("#4",relational_mapping.get("analysis").get("analysisTypes").get(analysisType).get("foreign"))
+            print("#5",relational_mapping.get("analysis").get("analysisTypes").get(analysisType).get("foreign").get("entity"))
             foreign_entity=relational_mapping.get("analysis").get("analysisTypes").get(analysisType).get("foreign").get("entity")
             foreign_key=relational_mapping.get("analysis").get("analysisTypes").get(analysisType).get("foreign").get("foreign")
             foreign_values=analyses.get(analysis).get('analysis').get('data').loc[:,foreign_key].values.tolist()
2 changes: 1 addition & 1 deletion modules/local/check_dependencies/resources/usr/bin/main.py
@@ -413,7 +413,7 @@ def main(args):
 
     check_minimum_columns(
         args.file_metadata,
-        ["fileName","dataType",'fileMd5sum']
+        ["fileName","dataType"]
     )
 
     update_relational_mapping(relational_mapping,analysis_types)
49 changes: 46 additions & 3 deletions modules/local/payload/generate/resources/usr/bin/main.py
@@ -5,7 +5,38 @@
 import argparse
 import csv
 import re
+import os
+from pathlib import Path
+import hashlib
 
+def calculate_md5(file_path):
+    """Calculate MD5 checksum of a file."""
+    hash_md5 = hashlib.md5()
+
+    print("calculating MD5 for %s" % file_path)
+    try:
+        with open(file_path, 'rb') as f:
+            for chunk in iter(lambda: f.read(4096), b''):
+                hash_md5.update(chunk)
+        return hash_md5.hexdigest()
+    except Exception as e:
+        print(f'ERROR: Failed to calculate MD5 for {file_path}: {e}', file=sys.stderr)
+        return None
+
+
+def calculate_filesize(file_path):
+    """Return the size of a file in bytes."""
+    return os.path.getsize(file_path)
+
+def verify_md5sum(file_path,provided_md5):
+    """Verify a provided MD5 checksum against the one calculated from disk."""
+    calculated_md5=calculate_md5(file_path)
+    if calculated_md5!=provided_md5:
+        print(f'ERROR: Mismatching MD5 detected for {file_path}. Provided \'{provided_md5}\' vs Calculated \'{calculated_md5}\'', file=sys.stderr)
+        sys.exit(1)
+
+    return provided_md5
+
 def detect_delimiter(file_path):
     """Detect delimiter by examining the first few lines of the file"""
@@ -104,12 +135,24 @@ def main():
     # Process files from file metadata (TSV or CSV format)
     files_info = []
     for file_row in file_data:
+
+        file_name=file_row.get("fileName", None)
+
+        if file_name is not None:
+            file_name=re.findall(r'[^\\/]+$',file_row.get("fileName"))[0]
+
+        file_path="%s/%s" % (os.getcwd(),file_name)
+
+        if not os.path.exists(file_path):
+            print(f"Error: File {file_path} is missing")
+            sys.exit(1)
+
         file_info = {
-            "fileName": file_row.get("fileName", None),
-            "fileSize": int(file_row.get("fileSize")) if file_row.get("fileSize") and file_row.get("fileSize").isdigit() else None,
+            "fileName": file_name,
+            "fileSize": int(file_row.get("fileSize")) if file_row.get("fileSize") and file_row.get("fileSize").isdigit() else calculate_filesize(file_path),
             "dataType": file_row.get("dataType", None),
             "fileAccess": file_row.get("fileAccess", "controlled"),
-            "fileMd5sum": file_row.get("fileMd5sum", None),
+            "fileMd5sum": verify_md5sum(file_path,file_row.get("fileMd5sum")) if file_row.get("fileMd5sum") else calculate_md5(file_path),
             "fileType": file_row.get("fileType", None)
         }
 

Collaborator comment on the new fileSize fallback: We can add a similar verify_filesize() function.
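A minimal sketch of what that verify_filesize() could look like, mirroring verify_md5sum() above (hypothetical helper, not part of this PR):

```python
def verify_filesize(file_path, provided_size):
    """Verify a provided file size against the size on disk (sketch)."""
    calculated_size = os.path.getsize(file_path)
    # provided_size arrives as a string from the TSV/CSV metadata
    if int(provided_size) != calculated_size:
        print(f'ERROR: Mismatching file size detected for {file_path}. '
              f'Provided \'{provided_size}\' vs Calculated \'{calculated_size}\'', file=sys.stderr)
        sys.exit(1)
    return int(provided_size)
```

The "fileSize" entry could then use `verify_filesize(file_path, file_row.get("fileSize")) if file_row.get("fileSize") else calculate_filesize(file_path)`, matching the fileMd5sum pattern.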
1 change: 1 addition & 0 deletions modules/local/score/upload/main.nf
@@ -3,6 +3,7 @@ process SCORE_UPLOAD {
     tag "$meta.id"
     label 'process_high'
     label 'process_long'
+    maxForks params.fork_limit
 
     container "${params.file_transfer_container}:${params.file_transfer_container_tag}"
     containerOptions {
1 change: 1 addition & 0 deletions modules/local/song/getanalysis/main.nf
@@ -18,6 +18,7 @@
 process SONG_GETANALYSIS {
     tag "$meta.id"
     label 'process_single'
+    maxForks params.fork_limit
 
     container "${ params.file_manager_container }:${ params.file_manager_container_tag }"
 
1 change: 1 addition & 0 deletions modules/local/song/manifest/main.nf
@@ -1,6 +1,7 @@
 process SONG_MANIFEST {
     tag "$meta.id"
     label 'process_single'
+    maxForks params.fork_limit
 
     container "${ params.file_manager_container }:${ params.file_manager_container_tag }"
     containerOptions {
1 change: 1 addition & 0 deletions modules/local/song/publish/main.nf
@@ -1,6 +1,7 @@
 process SONG_PUBLISH {
     tag "$meta.id"
     label 'process_single'
+    maxForks params.fork_limit
 
     container "${ params.file_manager_container }:${ params.file_manager_container_tag }"
     containerOptions {
3 changes: 2 additions & 1 deletion modules/local/song/submit/main.nf
@@ -2,7 +2,8 @@
 process SONG_SUBMIT {
     tag "$meta.id"
     label 'process_single'
-
+    maxForks params.fork_limit
+
     container "${params.file_manager_container}:${params.file_manager_container_tag}"
     containerOptions {
         workflow.containerEngine == 'singularity' ?
5 changes: 3 additions & 2 deletions modules/local/validate_clinical/resources/usr/bin/main.py
@@ -68,8 +68,9 @@ def check_file_filename_duplicates(analysis):
             analysis['status']=False
             analysis['comments'].append("Multiple entries with the same fileName %s detected" % fileName)
 def check_file_filename_filem5d(analysis):
-    if len(analysis['files'].groupby("fileMd5sum").count().query('fileName>1'))>1:
-        for fileName in analysis['files'].groupby("fileMd5sum").count().query('fileName>1').index.values.tolist():
+    ### Exclude empty Md5
+    if len(analysis['files'].replace(np.nan,0).query("fileMd5sum!=0").groupby("fileMd5sum").count().query('fileName>1'))>0:
+        for fileMd5sum in analysis['files'].replace(np.nan,0).query("fileMd5sum!=0").groupby("fileMd5sum").count().query('fileName>1').index.values.tolist():
             analysis['status']=False
             analysis['comments'].append("Multiple entries with the same fileMd5sum %s detected" % fileMd5sum)
 def check_study_id(analysis,study_id):
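For comparison, the NaN exclusion in check_file_filename_filem5d() can also be written with dropna, avoiding the 0 sentinel (sketch, assuming analysis['files'] is a pandas DataFrame):

```python
md5_counts = (analysis['files']
              .dropna(subset=["fileMd5sum"])   # ignore rows with an empty MD5
              .groupby("fileMd5sum")["fileName"]
              .count())
duplicated_md5s = md5_counts[md5_counts > 1].index.tolist()
```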
3 changes: 2 additions & 1 deletion modules/local/validation/crosscheck/main.nf
@@ -9,7 +9,7 @@ process VALIDATION_CROSSCHECK {
     tuple val(meta), path(payload), path(payload_files)
 
     output:
-    tuple val(meta), path(payload), path(payload_files), emit: ch_payload_files
+    tuple val(meta), path("updated*.json"), path(payload_files), emit: ch_payload_files
     tuple val(meta), path("*_status.yml"), emit: status
     path "versions.yml", emit: versions
 
@@ -30,6 +30,7 @@ process VALIDATION_CROSSCHECK {
     # Check if upstream process was successful by checking meta.status
     if [ "${meta.status ?: 'pass'}" != "pass" ]; then
         echo "Upstream process failed (meta.status: ${meta.status ?: 'pass'}), skipping MD5 checksum validation"
+        touch updated_${payload}.json
         CROSSCHECK_EXIT_CODE=1
         ERROR_DETAILS="Skipped MD5 checksum validation due to upstream failure"
     elif grep -q '"error".*"payload_generation_failed"' "${payload}" 2>/dev/null; then
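The touch matters because the output block above now declares path("updated*.json"): creating a placeholder on the failure path keeps Nextflow from failing the task on a missing declared output before the status file is emitted.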
72 changes: 59 additions & 13 deletions modules/local/validation/crosscheck/resources/usr/bin/main.py
@@ -40,8 +40,7 @@ def validate_md5_checksums(payload_file, files_list):
         file_md5_map = {}
 
         for pf in payload_files:
-            if 'fileName' in pf and 'fileMd5sum' in pf:
-                file_md5_map[pf['fileName']] = pf['fileMd5sum']
+            file_md5_map[pf['fileName']] = pf['fileMd5sum']
 
         print(f'Found {len(file_md5_map)} files with MD5 checksums in payload')
 
@@ -56,18 +55,21 @@ def validate_md5_checksums(payload_file, files_list):
                 validation_errors.append(f'File {filename} not found in payload metadata')
                 continue
 
-            expected_md5 = file_md5_map[filename]
-            if not expected_md5:
-                validation_errors.append(f'File {filename} has no MD5 checksum in payload')
-                continue
-
             print(f'Validating MD5 for {filename}...')
             actual_md5 = calculate_md5(actual_file)
 
             if actual_md5 is None:
                 validation_errors.append(f'Failed to calculate MD5 for {filename}')
                 continue
 
+
+            expected_md5 = file_md5_map[filename]
+
+            if not expected_md5:
+                print(f'File {filename} has no MD5 checksum in payload. Replacing with calculated')
+                file_md5_map[filename] = actual_md5
+                continue
+
             if actual_md5.lower() != expected_md5.lower():
                 validation_errors.append(f'MD5 mismatch for {filename}: expected {expected_md5}, got {actual_md5}')
             else:
@@ -78,20 +80,49 @@ def validate_md5_checksums(payload_file, files_list):
             print('MD5 VALIDATION ERRORS:', file=sys.stderr)
             for error in validation_errors:
                 print(f'  - {error}', file=sys.stderr)
-            return False, validation_errors
+            return False, validation_errors, {}
         else:
             print(f'MD5 checksum validation completed successfully for {files_validated} files')
-            return True, []
+            return True, [], file_md5_map
 
     except json.JSONDecodeError as e:
         error_msg = f'Invalid JSON in payload file: {e}'
         print(f'ERROR: {error_msg}', file=sys.stderr)
-        return False, [error_msg]
+        return False, [error_msg], {}
    except Exception as e:
         error_msg = f'MD5 checksum validation failed: {e}'
         print(f'ERROR: {error_msg}', file=sys.stderr)
-        return False, [error_msg]
+        return False, [error_msg], {}
 
 
+def get_filesizes(payload_file, files_list):
+    """Collect file sizes from the payload metadata, falling back to the size on disk."""
+    with open(payload_file, 'r') as f:
+        payload = json.load(f)
+
+    print(f'Loaded payload for Filesizes: {payload.get("studyId", "unknown")}')
+    print(f'Files to validate: {len(files_list)} files')
+
+    # Create payload file mapping by filename
+    payload_files = payload.get('files', [])
+    file_size_map = {}
+
+    for pf in payload_files:
+        file_size_map[pf['fileName']] = pf['fileSize']
+
+    print(f'Found {len(file_size_map)} files with FileSizes in payload')
+
+    for actual_file in files_list:
+        filename = os.path.basename(actual_file)
+
+        if not file_size_map.get(filename):
+            file_size_map[filename] = os.path.getsize(actual_file)
+
+    return file_size_map
+
+
 def main():
     """Main function for command line execution."""
@@ -102,9 +133,24 @@ def main():
     args = parser.parse_args()
 
     try:
-        success, errors = validate_md5_checksums(args.payload_file, args.files)
+        success, errors, file_md5_map = validate_md5_checksums(args.payload_file, args.files)
+        file_size_map = get_filesizes(args.payload_file, args.files)
         if success:
+            with open(args.payload_file, 'r') as f:
+                payload = json.load(f)
+            updated_payload = payload
+
+            print(file_size_map)
+            for ind, file in enumerate(updated_payload['files']):
+                print("before", ind, updated_payload['files'][ind]['fileMd5sum'], updated_payload['files'][ind]['fileSize'])
+                updated_payload['files'][ind]['fileMd5sum'] = file_md5_map[updated_payload['files'][ind]['fileName']]
+                updated_payload['files'][ind]['fileSize'] = file_size_map[updated_payload['files'][ind]['fileName']]
+                print("after", ind, updated_payload['files'][ind]['fileMd5sum'], updated_payload['files'][ind]['fileSize'])
+
+            with open("updated_%s" % args.payload_file, "w") as f:
+                json.dump(updated_payload, f, indent=4)
+
             sys.exit(0)
         else:
             sys.exit(1)
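With the new three-tuple contract, a caller can exercise the module standalone; an illustrative snippet (file names are placeholders):

```python
success, errors, md5_map = validate_md5_checksums("payload.json", ["test_file1.bam"])
if success:
    # md5_map now also carries calculated MD5s for payload entries that had none
    print(md5_map)
```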
1 change: 1 addition & 0 deletions nextflow.config
@@ -26,6 +26,7 @@ params {
     file_manager_url = null
     file_transfer_url = null
     clinical_url = null
+    fork_limit = 10
 
     // Container images - Docker containers work with both Docker and Singularity
     // Singularity automatically converts Docker images, no separate images needed
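For context: the maxForks directive added to the SCORE/SONG processes above caps how many instances of each process run concurrently, and fork_limit is a regular pipeline parameter, so it can be overridden at launch, e.g. `nextflow run main.nf -profile docker --fork_limit 5` (entrypoint and profile shown are illustrative).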
26 changes: 5 additions & 21 deletions subworkflows/local/data_validation/main.nf
@@ -6,7 +6,7 @@
 
 include { FILE_INTEGRITY } from '../file_integrity/main'
 include { VALIDATION_METADATA } from '../../../modules/local/validation/metadata/main'
-include { VALIDATION_CROSSCHECK } from '../../../modules/local/validation/crosscheck/main'
+
 
 workflow DATA_VALIDATION {

@@ -26,26 +26,10 @@ workflow DATA_VALIDATION {
     VALIDATION_METADATA ( ch_payload_files_biospecimen )
     ch_versions = ch_versions.mix(VALIDATION_METADATA.out.versions.first())
 
-    // Step 2: MD5 checksum cross-check validation
-    // Update meta.status based on metadata validation result before passing to crosscheck
-    ch_crosscheck_input = VALIDATION_METADATA.out.ch_payload_files
-        .join(VALIDATION_METADATA.out.status, by: 0)
-        .map { meta, payload, payload_files, status_file ->
-            // Read status from YAML file and update meta
-            def status_content = status_file.text
-            def status_value = status_content.contains('status: "FAILED"') ? 'failed' : 'pass'
-            def updated_meta = meta.clone()
-            updated_meta.status = status_value
-            [updated_meta, payload, payload_files]
-        }
-
-    VALIDATION_CROSSCHECK ( ch_crosscheck_input )
-    ch_versions = ch_versions.mix(VALIDATION_CROSSCHECK.out.versions.first())
-
-    // Step 3: Validate file integrity (BAM, CRAM, VCF, FASTQ format checks)
+    // Step 2: Validate file integrity (BAM, CRAM, VCF, FASTQ format checks)
     // Update meta.status based on crosscheck validation result before passing to file integrity
-    ch_integrity_input = VALIDATION_CROSSCHECK.out.ch_payload_files
-        .join(VALIDATION_CROSSCHECK.out.status, by: 0)
+    ch_integrity_input = VALIDATION_METADATA.out.ch_payload_files
+        .join(VALIDATION_METADATA.out.status, by: 0)
         .map { meta, payload, files, status_file ->
             // Read status from YAML file and update meta
             def status_content = status_file.text
@@ -108,9 +92,9 @@ workflow DATA_VALIDATION {
     // Use original FILE_INTEGRITY status files since we only update meta.status internally
     ch_all_status = Channel.empty()
         .mix(VALIDATION_METADATA.out.status)
-        .mix(VALIDATION_CROSSCHECK.out.status)
         .mix(FILE_INTEGRITY.out.status)
 
+
     emit:
     // Output channels as specified
     validated_payload_files = ch_validated_payload_files // channel: [ val(meta), payload(json), [files] ]
11 changes: 6 additions & 5 deletions tests/test_data/analysis_meta/analysis_metadata.tsv
@@ -1,6 +1,7 @@
 studyId submitter_analysis_id analysisType submitter_participant_id submitter_specimen_id submitter_sample_id submitter_experiment_id data_category variant_class variant_calling_strategy genome_build genome_annotation
-TEST-CA analysis_001 sequenceExperiment PART_001 SPEC_001 SAMPLE_001 EXP_001 Genomics
-TEST-CA analysis_002 sequenceAlignment PART_002 SPEC_002 SAMPLE_002 EXP_002 Genomics GRCh38
-TEST-CA analysis_003 variantCall PART_003 SPEC_003 SAMPLE_003 EXP_003 Genomics Germline Single sample GRCh38 GENCODE v38
-TEST-CA analysis_004 sequenceAlignment PART_004 SPEC_004 SAMPLE_004 EXP_004 Genomics GRCh38 GENCODE v38
-TEST-CA analysis_005 sequenceExperiment PART_004 SPEC_004 SAMPLE_004 EXP_005 Genomics GRCh38 GENCODE v38
+PCGLST0003 analysis_001 sequenceExperiment PART_001 SPEC_001 SAMPLE_001 EXP_001 Genomics
+PCGLST0003 analysis_002 sequenceAlignment PART_002 SPEC_002 SAMPLE_002 EXP_002 Genomics GRCh38
+PCGLST0003 analysis_003 variantCall PART_003 SPEC_003 SAMPLE_003 EXP_003 Genomics Germline Single sample GRCh38 GENCODE v38
+PCGLST0003 analysis_004 sequenceAlignment PART_004 SPEC_004 SAMPLE_004 EXP_004 Genomics GRCh38 GENCODE v38
+PCGLST0003 analysis_005 sequenceExperiment PART_004 SPEC_004 SAMPLE_004 EXP_005 Genomics GRCh38 GENCODE v38
+PCGLST0003 analysis_006 sequenceAlignment PART_004 SPEC_004 SAMPLE_004 EXP_004 Genomics GRCh38 GENCODE v38
Collaborator comment: Can we create new metadata test files for analysis, file and workflow?

3 changes: 2 additions & 1 deletion tests/test_data/analysis_meta/file_metadata.tsv
@@ -3,6 +3,7 @@ analysis_001 test_file_001_1.bam 69 3ffac00f58862a7c06a705795e7ac26a BAM control
 analysis_001 test_file_001_2.bam 69 39744760b92b097738ba4758961e0121 BAM controlled Aligned reads
 analysis_002 test_file_002_1.bam 74 b3fa249fdcbbe7de479c4394132eb4cc BAM controlled Aligned reads
 analysis_003 test_file_003_1.vcf 124 02b4359d79feb76d67b0ea66fbbc2c86 VCF controlled Single Nucleotide Variants (SNVs)
-analysis_004 test_rg_3.v2.bam 14911 178f97f7b1ca8bfc28fd5586bdd56799 BAM controlled Aligned reads
+analysis_004 test_rg_3.v2.bam BAM controlled Aligned reads
 analysis_005 test_sample_R1.fastq.gz 85 a1505aad2f04bd9594f43745ee5ee4a5 FASTQ controlled Raw Sequencing Reads
 analysis_005 test_sample_R2.fastq.gz 87 38a41f5dbf5f1e1e09fb3024555ede4e FASTQ controlled Raw Sequencing Reads
+analysis_006 test_file1.bam 311 02b4359d79feb76d67b0ea66fbbc2c86 BAM controlled Aligned reads
1 change: 1 addition & 0 deletions tests/test_data/analysis_meta/workflow_metadata.tsv
@@ -3,3 +3,4 @@ WF_001 analysis_001 molecular-data-submission-workflow 1.0.0 https://github.com/
 WF_002 analysis_002 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
 WF_003 analysis_003 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
 WF_004 analysis_004 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
+WF_006 analysis_006 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
6 changes: 5 additions & 1 deletion tests/test_data/biospecimen/experiment.tsv
@@ -1,2 +1,6 @@
 submitter_experiment_id submitter_sample_id experiment_type experiment_design assay_type_code assay_type_term platform instrument instrument_metadata sequencing_protocol
-EXP001 SAMPLE001 Sequencing Whole genome sequencing WGS Whole genome sequencing ILLUMINA Illumina HiSeq 2000 {"run_name": "test_run"} Illumina TruSeq DNA PCR-Free
+EXP_001 SAMPLE_001 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0000759 (Illumina) HiSeq 2500 v4 TruSeq DNA PCR-Free
+EXP_002 SAMPLE_002 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0000759 (Illumina) HiSeq 2500 v4 TruSeq DNA PCR-Free
+EXP_003 SAMPLE_003 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0002630 (Illumina NovaSeq 6000) NovaSeq 6000 S4 TruSeq DNA PCR-Free
+EXP_004 SAMPLE_004 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0002630 (Illumina NovaSeq 6000) NovaSeq 6000 S4 TruSeq DNA PCR-Free
+EXP_005 SAMPLE_004 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0002630 (Illumina NovaSeq 6000) NovaSeq 6000 S4 TruSeq DNA PCR-Free
5 changes: 4 additions & 1 deletion tests/test_data/biospecimen/sample.tsv
@@ -1,2 +1,5 @@
 submitter_sample_id submitter_specimen_id molecule_type_code molecule_type_term sample_status
-SAMPLE001 SPECIMEN001 1 Total DNA Fresh
+SAMPLE_001 SPEC_001 NCIT:C449 Genomic DNA Case
+SAMPLE_002 SPEC_002 NCIT:C449 Genomic DNA Case
+SAMPLE_003 SPEC_003 NCIT:C449 Genomic DNA Case
+SAMPLE_004 SPEC_004 NCIT:C449 Genomic DNA Case
5 changes: 4 additions & 1 deletion tests/test_data/biospecimen/specimen.tsv
@@ -1,2 +1,5 @@
 submitter_participant_id submitter_specimen_id specimen_tissue_source_code specimen_tissue_source_term specimen_storage specimen_processing age_at_specimen_collection specimen_anatomic_location_code specimen_anatomic_location_term specimen_laterality
-PARTICIPANT001 SPECIMEN001 2 Blood derived - peripheral blood Fresh Cryopreservation in liquid nitrogen 45 C94 Blood Not applicable
+PART_001 SPEC_001 NCIT:C12434 Blood Frozen in -70 freezer Fresh 45 ICDO3:C50.9 Breast NOS Left
+PART_002 SPEC_002 NCIT:C164014 Solid Tissue Sample Frozen in -70 freezer Fresh 52 ICDO3:C78.0 Secondary malignant neoplasm of lung Right
+PART_003 SPEC_003 NCIT:C164014 Solid Tissue Sample Paraffin block Formalin fixed - buffered 38 ICDO3:C25.9 Pancreas NOS Not applicable
+PART_004 SPEC_004 NCIT:C164014 Solid Tissue Sample Frozen in -70 freezer Fresh 60 ICDO3:C18.9 Colon NOS Not applicable