5 changes: 5 additions & 0 deletions modules/local/analysis_split/resources/usr/bin/main.py
@@ -112,6 +112,11 @@ def map_analysis_dependencies(analyses,relational_mapping,data,debug):
for analysis in analyses:
if debug: print("#%s" % analysis)
for analysisType in analyses.get(analysis).get("analysis").get('data')['analysisType'].unique().tolist():
print("#1",relational_mapping.get("analysis"))
print("#2",relational_mapping.get("analysis").get("analysisType"))
print("#3",relational_mapping.get("analysis").get("analysisTypes").get(analysisType))
print("#4",relational_mapping.get("analysis").get("analysisTypes").get(analysisType).get("foreign"))
print("#5",relational_mapping.get("analysis").get("analysisTypes").get(analysisType).get("foreign").get("entity"))
foreign_entity=relational_mapping.get("analysis").get("analysisTypes").get(analysisType).get("foreign").get("entity")
foreign_key=relational_mapping.get("analysis").get("analysisTypes").get(analysisType).get("foreign").get("foreign")
foreign_values=analyses.get(analysis).get('analysis').get('data').loc[:,foreign_key].values.tolist()
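For context, a sketch of the nested mapping shape that these chained .get() calls assume. The key names come from the lookups above; the example analysisType and its values are assumptions for illustration:

```python
# Hypothetical shape of relational_mapping, inferred from the chained
# .get() calls above; the real mapping is built elsewhere in the pipeline.
relational_mapping = {
    "analysis": {
        "analysisTypes": {
            "sequenceAlignment": {                         # one entry per analysisType
                "foreign": {
                    "entity": "experiment",                # entity this analysis depends on
                    "foreign": "submitter_experiment_id",  # column used as the join key
                },
            },
        },
    },
}
```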
2 changes: 1 addition & 1 deletion modules/local/check_dependencies/resources/usr/bin/main.py
@@ -413,7 +413,7 @@ def main(args):

check_minimum_columns(
args.file_metadata,
["fileName","dataType",'fileMd5sum']
["fileName","dataType"]
)

update_relational_mapping(relational_mapping,analysis_types)
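Dropping fileMd5sum from the required columns matches the payload changes below, which verify provided checksums and calculate missing ones. For reference, a minimal sketch of what a check like this typically does; check_minimum_columns is defined elsewhere in this script, so the body here is an assumption:

```python
import sys

def check_minimum_columns(rows, required_columns):
    """Hypothetical sketch: exit if a required metadata column is absent."""
    # rows is assumed to be a list of dicts parsed from the metadata TSV/CSV.
    missing = [c for c in required_columns if rows and c not in rows[0]]
    if missing:
        print("ERROR: missing required columns: %s" % ", ".join(missing), file=sys.stderr)
        sys.exit(1)
```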
59 changes: 56 additions & 3 deletions modules/local/payload/generate/resources/usr/bin/main.py
@@ -5,7 +5,48 @@
import argparse
import csv
import re
import os
from pathlib import Path
import hashlib

def calculate_md5(file_path):
"""Calculate MD5 checksum of a file."""
hash_md5 = hashlib.md5()

print("calculating MD5 for %s" % file_path)
try:
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
hash_md5.update(chunk)
return hash_md5.hexdigest()
except Exception as e:
print(f'ERROR: Failed to calculate MD5 for {file_path}: {e}', file=sys.stderr)
return None


def calculate_filesize(file_path):
"""Calculate file size of a file."""
return(os.path.getsize(file_path))


def verify_filesize(file_path,provided_filesize):
"""Verify file size of a file."""
calculated_filesize=calculate_filesize(file_path)
if calculated_filesize!=provided_filesize:
print(f'ERROR: Mismatching filesize detected for {file_path}. Provided \'{str(provided_filesize)}\' vs Calculated \'{str(calculated_filesize)}\'', file=sys.stderr)
sys.exit(1)

return(provided_filesize)

def verify_md5sum(file_path,provided_md5):
"""Verify MD5 checksum of a file."""

calculated_md5=calculate_md5(file_path)
if calculated_md5!=provided_md5:
print(f'ERROR: Mismatching MD5 detected for {file_path}. Provided \'{provided_md5}\' vs Calculated \'{calculated_md5}\'', file=sys.stderr)
sys.exit(1)

return(provided_md5)
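A quick usage sketch of the helpers above, using values from the test metadata added in this PR (assumes the file is present in the working directory):

```python
# Both helpers exit the process on a mismatch and echo the provided
# value back on success, so they can be used inline when building file_info.
size = verify_filesize("test_file_001_1.bam", 6)
md5 = verify_md5sum("test_file_001_1.bam", "3ffac00f58862a7c06a705795e7ac26a")
```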

def detect_delimiter(file_path):
"""Detect delimiter by examining the first few lines of the file"""
@@ -104,12 +145,24 @@ def main():
# Process files from file metadata (TSV or CSV format)
files_info = []
for file_row in file_data:

file_name=file_row.get("fileName", None)

if file_row.get("fileName", None)!=None:
file_name=re.findall(r'[^\\/]+$',file_row.get("fileName"))[0]

file_path="%s/%s" % (os.getcwd(),file_name)

if not os.path.exists(file_path):
print(f"Error: File {file_path} is missing")
sys.exit(1)

file_info = {
"fileName": file_row.get("fileName", None),
"fileSize": int(file_row.get("fileSize")) if file_row.get("fileSize") and file_row.get("fileSize").isdigit() else None,
"fileName": file_name,
"fileSize": verify_filesize(file_path,int(float(file_row.get("fileSize")))) if file_row.get("fileSize") else calculate_filesize(file_path),
"dataType": file_row.get("dataType", None),
"fileAccess": file_row.get("fileAccess", "controlled"),
"fileMd5sum": file_row.get("fileMd5sum", None),
"fileMd5sum": verify_md5sum(file_path,file_row.get("fileMd5sum")) if file_row.get("fileMd5sum") else calculate_md5(file_path),
"fileType": file_row.get("fileType", None)
}
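One subtlety above: fileSize goes through int(float(...)) rather than int(...) so that sizes serialized as float strings still parse before verify_filesize compares them. A tiny illustration (the "69.0" value is invented):

```python
# Sizes exported from spreadsheets often arrive as float strings like "69.0".
raw = "69.0"
# int(raw) would raise ValueError: invalid literal for int() with base 10
size = int(float(raw))  # 69
```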

1 change: 1 addition & 0 deletions modules/local/score/upload/main.nf
@@ -3,6 +3,7 @@ process SCORE_UPLOAD {
tag "$meta.id"
label 'process_high'
label 'process_long'
maxForks params.fork_limit

container "${params.file_transfer_container}:${params.file_transfer_container_tag}"
containerOptions {
1 change: 1 addition & 0 deletions modules/local/song/getanalysis/main.nf
@@ -18,6 +18,7 @@
process SONG_GETANALYSIS {
tag "$meta.id"
label 'process_single'
maxForks params.fork_limit

container "${ params.file_manager_container }:${ params.file_manager_container_tag }"

1 change: 1 addition & 0 deletions modules/local/song/manifest/main.nf
@@ -1,6 +1,7 @@
process SONG_MANIFEST {
tag "$meta.id"
label 'process_single'
maxForks params.fork_limit

container "${ params.file_manager_container }:${ params.file_manager_container_tag }"
containerOptions {
1 change: 1 addition & 0 deletions modules/local/song/publish/main.nf
@@ -1,6 +1,7 @@
process SONG_PUBLISH {
tag "$meta.id"
label 'process_single'
maxForks params.fork_limit

container "${ params.file_manager_container }:${ params.file_manager_container_tag }"
containerOptions {
3 changes: 2 additions & 1 deletion modules/local/song/submit/main.nf
@@ -2,7 +2,8 @@
process SONG_SUBMIT {
tag "$meta.id"
label 'process_single'

maxForks params.fork_limit

container "${params.file_manager_container}:${params.file_manager_container_tag}"
containerOptions {
workflow.containerEngine == 'singularity' ?
5 changes: 3 additions & 2 deletions modules/local/validate_clinical/resources/usr/bin/main.py
@@ -68,8 +68,9 @@ def check_file_filename_duplicates(analysis):
analysis['status']=False
analysis['comments'].append("Multiple entries with the same fileName %s detected" % fileName)
def check_file_filename_filem5d(analysis):
if len(analysis['files'].groupby("fileMd5sum").count().query('fileName>1'))>1:
for fileName in analysis['files'].groupby("fileMd5sum").count().query('fileName>1').index.values.tolist():
###Exclude empty Md5
if len(analysis['files'].replace(np.nan,0).query("fileMd5sum!=0").groupby("fileMd5sum").count().query('fileName>1'))>0:
for fileMd5sum in analysis['files'].replace(np.nan,0).query("fileMd5sum!=0").groupby("fileMd5sum").count().query('fileName>1').index.values.tolist():
analysis['status']=False
analysis['comments'].append("Multiple entries with the same fileMd5sum %s detected" % fileMd5sum)
def check_study_id(analysis,study_id):
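The added replace/query filter excludes rows with an empty fileMd5sum before grouping, so missing checksums are no longer flagged as duplicates of one another. A small illustration of the pandas pattern with toy data:

```python
import numpy as np
import pandas as pd

files = pd.DataFrame({
    "fileName": ["a.bam", "b.bam", "c.bam", "d.bam"],
    "fileMd5sum": ["abc", "abc", np.nan, np.nan],
})

# Without the filter, the two NaN rows would also be grouped as "duplicates".
dupes = (
    files.replace(np.nan, 0)
    .query("fileMd5sum != 0")
    .groupby("fileMd5sum")
    .count()
    .query("fileName > 1")
)
print(dupes.index.tolist())  # ['abc']: only the genuine duplicate remains
```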
3 changes: 2 additions & 1 deletion modules/local/validation/crosscheck/main.nf
@@ -9,7 +9,7 @@ process VALIDATION_CROSSCHECK {
tuple val(meta), path(payload), path(payload_files)

output:
tuple val(meta), path(payload), path(payload_files), emit: ch_payload_files
tuple val(meta), path("updated*.json"), path(payload_files), emit: ch_payload_files
tuple val(meta), path("*_status.yml"), emit: status
path "versions.yml", emit: versions

@@ -30,6 +30,7 @@ process VALIDATION_CROSSCHECK {
# Check if upstream process was successful by checking meta.status
if [ "${meta.status ?: 'pass'}" != "pass" ]; then
echo "Upstream process failed (meta.status: ${meta.status ?: 'pass'}), skipping MD5 checksum validation"
touch updated_${payload}.json
CROSSCHECK_EXIT_CODE=1
ERROR_DETAILS="Skipped MD5 checksum validation due to upstream failure"
elif grep -q '"error".*"payload_generation_failed"' "${payload}" 2>/dev/null; then
72 changes: 59 additions & 13 deletions modules/local/validation/crosscheck/resources/usr/bin/main.py
@@ -40,8 +40,7 @@ def validate_md5_checksums(payload_file, files_list):
file_md5_map = {}

for pf in payload_files:
if 'fileName' in pf and 'fileMd5sum' in pf:
file_md5_map[pf['fileName']] = pf['fileMd5sum']
file_md5_map[pf['fileName']] = pf['fileMd5sum']

print(f'Found {len(file_md5_map)} files with MD5 checksums in payload')

@@ -56,18 +55,21 @@ def validate_md5_checksums(payload_file, files_list):
validation_errors.append(f'File {filename} not found in payload metadata')
continue

expected_md5 = file_md5_map[filename]
if not expected_md5:
validation_errors.append(f'File {filename} has no MD5 checksum in payload')
continue

print(f'Validating MD5 for {filename}...')
actual_md5 = calculate_md5(actual_file)

if actual_md5 is None:
validation_errors.append(f'Failed to calculate MD5 for {filename}')
continue


expected_md5 = file_md5_map[filename]

if not expected_md5:
print(f'File {filename} has no MD5 checksum in payload. Replacing with calculated value')
file_md5_map[filename]=actual_md5
continue

if actual_md5.lower() != expected_md5.lower():
validation_errors.append(f'MD5 mismatch for {filename}: expected {expected_md5}, got {actual_md5}')
else:
@@ -78,20 +80,49 @@ def validate_md5_checksums(payload_file, files_list):
print('MD5 VALIDATION ERRORS:', file=sys.stderr)
for error in validation_errors:
print(f' - {error}', file=sys.stderr)
return False, validation_errors
return False, validation_errors, {}
else:
print(f'MD5 checksum validation completed successfully for {files_validated} files')
return True, []
return True, [], file_md5_map

except json.JSONDecodeError as e:
error_msg = f'Invalid JSON in payload file: {e}'
print(f'ERROR: {error_msg}', file=sys.stderr)
return False, [error_msg]
return False, [error_msg], {}
except Exception as e:
error_msg = f'MD5 checksum validation failed: {e}'
print(f'ERROR: {error_msg}', file=sys.stderr)
return False, [error_msg]
return False, [error_msg], {}


def get_filesizes(payload_file, files_list):
"""Validate MD5 checksums of files against payload metadata."""
with open(payload_file, 'r') as f:
payload = json.load(f)

print(f'Loaded payload for Filesizes: {payload.get("studyId", "unknown")}')
print(f'Files to validate: {len(files_list)} files')

# Create payload file mapping by filename
payload_files = payload.get('files', [])
file_size_map = {}

for pf in payload_files:
file_size_map[pf['fileName']] = pf['fileSize']

print(f'Found {len(file_size_map)} files with file sizes in payload')

for actual_file in files_list:
filename = os.path.basename(actual_file)

if not file_size_map.get(filename):
file_size_map[filename]=os.path.getsize(actual_file)

return(file_size_map)





def main():
"""Main function for command line execution."""
@@ -102,9 +133,24 @@ def main():
args = parser.parse_args()

try:
success, errors = validate_md5_checksums(args.payload_file, args.files)

success, errors, file_md5_map = validate_md5_checksums(args.payload_file, args.files)
file_size_map = get_filesizes(args.payload_file, args.files)
if success:
with open(args.payload_file, 'r') as f:
payload = json.load(f)
updated_payload=payload

print(file_size_map)
for ind,file in enumerate(updated_payload['files']):
print("before",ind,updated_payload['files'][ind]['fileMd5sum'],updated_payload['files'][ind]['fileSize'])
updated_payload['files'][ind]['fileMd5sum']=file_md5_map[updated_payload['files'][ind]['fileName']]
updated_payload['files'][ind]['fileSize']=file_size_map[updated_payload['files'][ind]['fileName']]
print("after",ind,updated_payload['files'][ind]['fileMd5sum'],updated_payload['files'][ind]['fileSize'])


with open("updated_%s" % args.payload_file , "w") as f:
json.dump(updated_payload, f, indent=4)

sys.exit(0)
else:
sys.exit(1)
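Putting the pieces together: on success the script rewrites the payload with the reconciled checksum and size maps. A self-contained sketch of that update step (toy payload; the real one is loaded from args.payload_file):

```python
import json

file_md5_map = {"a.bam": "3ffac00f58862a7c06a705795e7ac26a"}
file_size_map = {"a.bam": 6}

payload = {
    "studyId": "PCGLST0003",
    "files": [{"fileName": "a.bam", "fileMd5sum": None, "fileSize": None}],
}

# Back-fill each file entry from the maps keyed by fileName.
for entry in payload["files"]:
    entry["fileMd5sum"] = file_md5_map[entry["fileName"]]
    entry["fileSize"] = file_size_map[entry["fileName"]]

# Written with an "updated_" prefix so the module's updated*.json glob picks it up.
with open("updated_payload.json", "w") as fh:
    json.dump(payload, fh, indent=4)
```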
1 change: 1 addition & 0 deletions nextflow.config
@@ -26,6 +26,7 @@ params {
file_manager_url = null
file_transfer_url = null
clinical_url = null
fork_limit = 10

// Container images - Docker containers work with both Docker and Singularity
// Singularity automatically converts Docker images, no separate images needed
26 changes: 5 additions & 21 deletions subworkflows/local/data_validation/main.nf
@@ -6,7 +6,7 @@

include { FILE_INTEGRITY } from '../file_integrity/main'
include { VALIDATION_METADATA } from '../../../modules/local/validation/metadata/main'
include { VALIDATION_CROSSCHECK } from '../../../modules/local/validation/crosscheck/main'


workflow DATA_VALIDATION {

@@ -26,26 +26,10 @@ workflow DATA_VALIDATION {
VALIDATION_METADATA ( ch_payload_files_biospecimen )
ch_versions = ch_versions.mix(VALIDATION_METADATA.out.versions.first())

// Step 2: MD5 checksum cross-check validation
// Update meta.status based on metadata validation result before passing to crosscheck
ch_crosscheck_input = VALIDATION_METADATA.out.ch_payload_files
.join(VALIDATION_METADATA.out.status, by: 0)
.map { meta, payload, payload_files, status_file ->
// Read status from YAML file and update meta
def status_content = status_file.text
def status_value = status_content.contains('status: "FAILED"') ? 'failed' : 'pass'
def updated_meta = meta.clone()
updated_meta.status = status_value
[updated_meta, payload, payload_files]
}

VALIDATION_CROSSCHECK ( ch_crosscheck_input )
ch_versions = ch_versions.mix(VALIDATION_CROSSCHECK.out.versions.first())

// Step 3: Validate file integrity (BAM, CRAM, VCF, FASTQ format checks)
// Step 2: Validate file integrity (BAM, CRAM, VCF, FASTQ format checks)
// Update meta.status based on metadata validation result before passing to file integrity
ch_integrity_input = VALIDATION_CROSSCHECK.out.ch_payload_files
.join(VALIDATION_CROSSCHECK.out.status, by: 0)
ch_integrity_input = VALIDATION_METADATA.out.ch_payload_files
.join(VALIDATION_METADATA.out.status, by: 0)
.map { meta, payload, files, status_file ->
// Read status from YAML file and update meta
def status_content = status_file.text
@@ -108,9 +92,9 @@ workflow DATA_VALIDATION {
// Use original FILE_INTEGRITY status files since we only update meta.status internally
ch_all_status = Channel.empty()
.mix(VALIDATION_METADATA.out.status)
.mix(VALIDATION_CROSSCHECK.out.status)
.mix(FILE_INTEGRITY.out.status)


emit:
// Output channels as specified
validated_payload_files = ch_validated_payload_files // channel: [ val(meta), payload(json), [files] ]
@@ -0,0 +1,7 @@
studyId submitter_analysis_id analysisType submitter_participant_id submitter_specimen_id submitter_sample_id submitter_experiment_id data_category variant_class variant_calling_strategy genome_build genome_annotation
PCGLST0003 analysis_001 sequenceExperiment PART_001 SPEC_001 SAMPLE_001 EXP_001 Genomics
PCGLST0003 analysis_002 sequenceAlignment PART_002 SPEC_002 SAMPLE_002 EXP_002 Genomics GRCh38
PCGLST0003 analysis_003 variantCall PART_003 SPEC_003 SAMPLE_003 EXP_003 Genomics Germline Single sample GRCh38 GENCODE v38
PCGLST0003 analysis_004 sequenceAlignment PART_004 SPEC_004 SAMPLE_004 EXP_004 Genomics GRCh38 GENCODE v38
PCGLST0003 analysis_005 sequenceExperiment PART_004 SPEC_004 SAMPLE_004 EXP_005 Genomics GRCh38 GENCODE v38
PCGLST0003 analysis_006 sequenceAlignment PART_004 SPEC_004 SAMPLE_004 EXP_004 Genomics GRCh38 GENCODE v38
9 changes: 9 additions & 0 deletions tests/test_data/analysis_meta/md5_fileSize_file_metadata.tsv
@@ -0,0 +1,9 @@
submitter_analysis_id fileName fileSize fileMd5sum fileType fileAccess dataType
analysis_001 test_file_001_1.bam 6 3ffac00f58862a7c06a705795e7ac26a BAM controlled Aligned reads
analysis_001 test_file_001_2.bam 69 39744760b92b097738ba4758961e0121 BAM controlled Aligned reads
analysis_002 test_file_002_1.bam 74 b3fa249fdcbbe7de479c4394132eb4cc BAM controlled Aligned reads
analysis_003 test_file_003_1.vcf 124 02b4359d79feb76d67b0ea66fbbc2c86 VCF controlled Single Nucleotide Variants (SNVs)
analysis_004 test_rg_3.v2.bam BAM controlled Aligned reads
analysis_005 test_sample_R1.fastq.gz 85 a1505aad2f04bd9594f43745ee5ee4a5 FASTQ controlled Raw Sequencing Reads
analysis_005 test_sample_R2.fastq.gz 87 38a41f5dbf5f1e1e09fb3024555ede4e FASTQ controlled Raw Sequencing Reads
analysis_006 test_file1.bam 311 02b4359d79feb76d67b0ea66fbbc2c86 BAM controlled Aligned reads
@@ -0,0 +1,6 @@
submitter_workflow_id submitter_analysis_id workflow_name workflow_version workflow_url
WF_001 analysis_001 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
WF_002 analysis_002 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
WF_003 analysis_003 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
WF_004 analysis_004 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
WF_006 analysis_006 molecular-data-submission-workflow 1.0.0 https://github.com/Pan-Canadian-Genome-Library/molecular-data-submission-workflow
6 changes: 6 additions & 0 deletions tests/test_data/biospecimen/md5_fileSize_experiment.tsv
@@ -0,0 +1,6 @@
submitter_experiment_id submitter_sample_id experiment_type experiment_design assay_type_code assay_type_term platform instrument instrument_metadata sequencing_protocol
EXP_001 SAMPLE_001 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0000759 (Illumina) HiSeq 2500 v4 TruSeq DNA PCR-Free
EXP_002 SAMPLE_002 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0000759 (Illumina) HiSeq 2500 v4 TruSeq DNA PCR-Free
EXP_003 SAMPLE_003 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0002630 (Illumina NovaSeq 6000) NovaSeq 6000 S4 TruSeq DNA PCR-Free
EXP_004 SAMPLE_004 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0002630 (Illumina NovaSeq 6000) NovaSeq 6000 S4 TruSeq DNA PCR-Free
EXP_005 SAMPLE_004 NCIT:C84343 (Genomics) OBI:0002117 Whole genome sequencing ILLUMINA OBI:0002630 (Illumina NovaSeq 6000) NovaSeq 6000 S4 TruSeq DNA PCR-Free
5 changes: 5 additions & 0 deletions tests/test_data/biospecimen/md5_fileSize_sample.tsv
@@ -0,0 +1,5 @@
submitter_sample_id submitter_specimen_id molecule_type_code molecule_type_term sample_status
SAMPLE_001 SPEC_001 NCIT:C449 Genomic DNA Case
SAMPLE_002 SPEC_002 NCIT:C449 Genomic DNA Case
SAMPLE_003 SPEC_003 NCIT:C449 Genomic DNA Case
SAMPLE_004 SPEC_004 NCIT:C449 Genomic DNA Case
5 changes: 5 additions & 0 deletions tests/test_data/biospecimen/md5_fileSize_specimen.tsv
@@ -0,0 +1,5 @@
submitter_participant_id submitter_specimen_id specimen_tissue_source_code specimen_tissue_source_term specimen_storage specimen_processing age_at_specimen_collection specimen_anatomic_location_code specimen_anatomic_location_term specimen_laterality
PART_001 SPEC_001 NCIT:C12434 Blood Frozen in -70 freezer Fresh 45 ICDO3:C50.9 Breast NOS Left
PART_002 SPEC_002 NCIT:C164014 Solid Tissue Sample Frozen in -70 freezer Fresh 52 ICDO3:C78.0 Secondary malignant neoplasm of lung Right
PART_003 SPEC_003 NCIT:C164014 Solid Tissue Sample Paraffin block Formalin fixed - buffered 38 ICDO3:C25.9 Pancreas NOS Not applicable
PART_004 SPEC_004 NCIT:C164014 Solid Tissue Sample Frozen in -70 freezer Fresh 60 ICDO3:C18.9 Colon NOS Not applicable