1+ .rs.restartR()
12library(tidyverse )
23library(targets )
34library(glue )
45
56result_directory = " /vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024"
67
8+
79tar_script(
810 {
911
@@ -27,19 +29,16 @@ tar_script(
2729 " glue" , " qs" , " purrr" , " tidybulk" , " tidySummarizedExperiment" , " crew" , " magrittr" , " digest" , " readr" , " forcats"
2830 ),
2931
30- garbage_collection = TRUE ,
31- # trust_object_timestamps = TRUE,
32- memory = " transient" ,
33- storage = " worker" ,
34- retrieval = " worker" ,
35- # error = "continue",
32+ memory = " transient" ,
33+ garbage_collection = 100 ,
34+ storage = " worker" ,
35+ retrieval = " worker" ,
36+ error = " continue" ,
37+ # debug = "dataset_id_sce_b5312463451d7ee3",
38+ cue = tar_cue(mode = " never" ),
3639 format = " qs" ,
37-
38- # Set the target you want to debug.
39- # debug = "metadata_sample",
4040
41- cue = tar_cue(mode = " never" ) ,
42-
41+
4342 # -----------------------#
4443 # SLURM
4544 # -----------------------#
@@ -227,7 +226,8 @@ tar_script(
227226 " assay" ,
228227 " experiment___" ,
229228 " disease" ,
230- " run_from_cell_id"
229+ " run_from_cell_id" ,
230+ " is_primary_data"
231231 ), na.rm = TRUE , sep = " ___" , remove = F ) | >
232232
233233
@@ -246,12 +246,20 @@ tar_script(
246246
247247 cache.path = " /vast/scratch/users/mangiola.s/cellxgenedp"
248248 dir.create(cache.path , recursive = TRUE , showWarnings = FALSE )
249-
249+
250250 h5_path = .x | > files_download(dry.run = FALSE , cache.path = cache.path )
251251
252252 sce =
253253 h5_path | >
254- readH5AD(use_hdf5 = TRUE , obs = FALSE , raw = FALSE , skip_assays = TRUE , layers = FALSE , reader = " R" )
254+ readH5AD(use_hdf5 = TRUE , raw = FALSE , skip_assays = TRUE , layers = FALSE , reader = " R" )
255+
256+
257+ if (is.null(sce ) || ! " donor_id" %in% colnames(colData(sce )))
258+ sce =
259+ h5_path | >
260+ readH5AD(use_hdf5 = TRUE , raw = FALSE , skip_assays = TRUE , layers = FALSE )
261+
262+
255263
256264 file.remove(h5_path )
257265
@@ -297,7 +305,7 @@ tar_script(
297305 # select(sample_, any_of(sample_column_to_preserve)) |>
298306 # distinct()
299307
300-
308+
301309
302310 metadata
303311 }
@@ -312,7 +320,7 @@ tar_script(
312320 colnames()
313321
314322 # Select only sample_ columns
315- metadata | >
323+ metadata | >
316324 select(sample_ , any_of(sample_column_to_preserve )) | >
317325 distinct()
318326
@@ -339,7 +347,7 @@ tar_script(
339347 ) | >
340348 group_split(dataset_id ),
341349 iteration = " list" ,
342- resources = tar_resources(crew = tar_resources_crew(" slurm_1_20 " ))
350+ resources = tar_resources(crew = tar_resources_crew(" slurm_1_80 " ))
343351 ),
344352
345353 # Get SCE SMALL
@@ -348,9 +356,11 @@ tar_script(
348356 get_metadata(files_dataset_id ),
349357 pattern = map(files_dataset_id ),
350358 iteration = " list" ,
351- resources = tar_resources(crew = tar_resources_crew(" slurm_1_20" ))
359+ resources = tar_resources(crew = tar_resources_crew(" slurm_1_20" )),
360+ deployment = " main"
352361 ),
353362
363+ # Select only the columns present in at least half of the datasets (i.e. the common columns)
354364 tar_target(
355365 common_columns ,
356366 metadata_dataset_id | >
@@ -365,20 +375,37 @@ tar_script(
365375 tar_target(
366376 metadata_dataset_id_common_sample_columns ,
367377 metadata_dataset_id | >
378+
379+ # Only get primary data
380+ # filter(is_primary_data=="TRUE") |>
381+
368382 mutate(cell_ = as.character(cell_ )) | >
369383 select(any_of(common_columns )) | >
384+
385+ # Drop some clearly cell-wise columns
386+ select(- any_of(c(" observation_joinid" , " cell_" )), - contains(" cell_type" )) | >
387+
370388 select_sample_columns(),
371389 pattern = map(metadata_dataset_id ),
372- resources = tar_resources(crew = tar_resources_crew(" slurm_1_20 " ) )
390+ resources = tar_resources(crew = tar_resources_crew(" slurm_1_80 " ) )
373391 ),
374392
375393 tar_target(
376394 metadata_dataset_id_cell_to_sample_mapping ,
377395 metadata_dataset_id | >
378- mutate(cell_ = as.character(cell_ )) | >
379- select(cell_ , sample_ , donor_id ),
396+
397+ # Only get primary data
398+ # filter(is_primary_data=="TRUE") |>
399+
400+ mutate(
401+ cell_ = as.character(cell_ ),
402+ observation_joinid = as.character(observation_joinid )
403+ ) | >
404+ # select(cell_, observation_joinid, sample_, donor_id),
405+ select(observation_joinid , cell_ , sample_ , donor_id , dataset_id , is_primary_data , sample_heuristic , cell_type , cell_type_ontology_term_id ),
380406 pattern = map(metadata_dataset_id ),
381- resources = tar_resources(crew = tar_resources_crew(" slurm_1_20" ))
407+ resources = tar_resources(crew = tar_resources_crew(" slurm_1_80" )),
408+ deployment = " main"
382409 )
383410
384411 )
@@ -389,19 +416,162 @@ tar_script(
389416 script = glue(" {result_directory}/_targets.R" )
390417)
391418
# Run the full targets pipeline as a background job (job::job) so the
# interactive session is not blocked while tar_make() executes the workflow
# script written by tar_script() above; the pipeline store lives next to it
# under {result_directory}/_targets.
419+ job :: job({
420+
421+ tar_make(
422+ # callr_function = NULL,
423+ reporter = " summary" ,
424+ script = glue(" {result_directory}/_targets.R" ),
425+ store = glue(" {result_directory}/_targets" )
426+ )
427+
428+ })
392429
393- tar_make(
394- # callr_function = NULL,
395- reporter = " verbose_positives" ,
396- script = glue(" {result_directory}/_targets.R" ),
397- store = glue(" {result_directory}/_targets" )
398- )
399430
400- # Sample metadata
401- tar_read(metadata_dataset_id_common_sample_columns , store = glue(" {result_directory}/_targets" ))
431+ library(arrow )
432+ library(dplyr )
433+ library(duckdb )
434+
435+ # # Sample metadata
436+ # tar_read(metadata_dataset_id_common_sample_columns, store = glue("{result_directory}/_targets")) |>
437+ # write_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/sample_metadata.parquet")
438+
439+ # # Sample to cell link
440+ # tar_read(metadata_dataset_id_cell_to_sample_mapping, store = glue("{result_directory}/_targets")) |>
441+ # write_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_ids_for_metadata.parquet")
442+
443+
444+
445+
446+ # sample_metadata <- tbl(
447+ # dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
448+ # sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/sample_metadata.parquet')")
449+ # )
450+ # #
451+ # cell_ids_for_metadata <- tbl(
452+ # dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
453+ # sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_ids_for_metadata.parquet')")
454+ # )
455+ #
456+ # cell_to_refined_sample_from_Mengyuan <- tbl(
457+ # dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
458+ # sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/census_samples_to_download.parquet')")
459+ # ) |>
460+ # select(cell_, observation_joinid, dataset_id, sample_id = sample_2) |>
461+ #
462+
463+ #
# Stream a data frame / dplyr tbl (`data_tbl`) straight to a Parquet file via
# DuckDB's COPY command, without materialising a copy inside DuckDB.
#
# @param data_tbl       A data frame (or duckdb-backed tbl) to export.
# @param output_parquet Path of the Parquet file to create.
# @param compression    Parquet compression codec (default "gzip").
# @return Invisibly, the value of dbExecute() for the COPY statement.
write_parquet_to_parquet <- function(data_tbl, output_parquet, compression = "gzip") {

  # In-memory DuckDB connection used only for this export.
  con_write <- dbConnect(duckdb::duckdb(), dbdir = ":memory:")

  # FIX: the original only unregistered the view / disconnected on the happy
  # path, leaking the connection if COPY failed. on.exit() guarantees cleanup.
  on.exit({
    try(duckdb::duckdb_unregister(con_write, "data_tbl_view"), silent = TRUE)
    dbDisconnect(con_write, shutdown = TRUE)
  }, add = TRUE)

  # Register `data_tbl` as a view (zero-copy; not loaded into DuckDB memory).
  duckdb::duckdb_register(con_write, "data_tbl_view", data_tbl)

  # COPY ... TO writes the view directly to Parquet with the requested codec.
  copy_query <- paste0(
    "COPY data_tbl_view TO '", output_parquet,
    "' (FORMAT PARQUET, COMPRESSION '", compression, "');"
  )

  invisible(dbExecute(con_write, copy_query))
}
486+
487+ # Establish a connection to DuckDB in memory
488+ con <- dbConnect(duckdb :: duckdb(), dbdir = " :memory:" )
489+
# The three CREATE VIEW statements below are lazy: no Parquet data is read
# until the final COPY runs, so nothing is loaded into R or DuckDB memory here.
490+ # Create views for each of the datasets in DuckDB
491+ dbExecute(con , "
492+ CREATE VIEW cell_to_refined_sample_from_Mengyuan AS
493+ SELECT cell_, observation_joinid, dataset_id, sample_2 AS sample_id, cell_type, cell_type_ontology_term_id
494+ FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/census_samples_to_download.parquet')
495+ " )
496+
497+ dbExecute(con , "
498+ CREATE VIEW cell_ids_for_metadata AS
499+ SELECT cell_, observation_joinid, sample_, donor_id, dataset_id FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_ids_for_metadata.parquet')
500+ " )
501+
502+ dbExecute(con , "
503+ CREATE VIEW sample_metadata AS
504+ SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/sample_metadata.parquet')
505+ " )
506+
507+
508+
# Join per-cell refined sample IDs to the cell->sample mapping (on cell_,
# observation_joinid, dataset_id) and then to sample-level metadata (on
# sample_, donor_id, dataset_id), writing the result straight to Parquet.
# NOTE(review): `SELECT *` pulls every column from all three views, so the
# join-key columns (cell_, observation_joinid, dataset_id, sample_, donor_id)
# appear more than once with duplicated names in the output — confirm
# downstream readers tolerate this, or list the columns explicitly.
509+ # Perform joins within DuckDB
510+ copy_query <- "
511+ COPY (
512+ SELECT *
513+ FROM cell_to_refined_sample_from_Mengyuan
514+
515+ LEFT JOIN cell_ids_for_metadata
516+ ON cell_ids_for_metadata.cell_ = cell_to_refined_sample_from_Mengyuan.cell_
517+ AND cell_ids_for_metadata.observation_joinid = cell_to_refined_sample_from_Mengyuan.observation_joinid
518+ AND cell_ids_for_metadata.dataset_id = cell_to_refined_sample_from_Mengyuan.dataset_id
519+
520+ LEFT JOIN sample_metadata
521+ ON cell_ids_for_metadata.sample_ = sample_metadata.sample_
522+ AND cell_ids_for_metadata.donor_id = sample_metadata.donor_id
523+ AND cell_ids_for_metadata.dataset_id = sample_metadata.dataset_id
524+
525+ ) TO '/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata.parquet'
526+ (FORMAT PARQUET, COMPRESSION 'gzip');
527+ "
528+
# NOTE(review): if this COPY fails, the dbDisconnect() below never runs and
# the connection leaks — consider wrapping in tryCatch/on.exit in a refactor.
529+ # Execute the final query to write the result to a Parquet file
530+ dbExecute(con , copy_query )
531+
532+ # Disconnect from the database
533+ dbDisconnect(con , shutdown = TRUE )
534+
535+ # cell_metadata <- tbl(
536+ # dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
537+ # sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata.parquet')")
538+ # )
539+ #
540+ # system("~/bin/rclone copy /vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata.parquet box_adelaide:/Mangiola_ImmuneAtlas/reannotation_consensus/")
541+
542+
543+
544+ # Get Dharmesh metadata consensus
545+ # system("~/bin/rclone copy box_adelaide:/Mangiola_ImmuneAtlas/reannotation_consensus/consensus_output_new.parquet /vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/ ")
546+ # system("~/bin/rclone copy box_adelaide:/Mangiola_ImmuneAtlas/reannotation_consensus/data_driven_consensus_new.parquet /vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/ ")
547+ # system("~/bin/rclone copy box_adelaide:/Mangiola_ImmuneAtlas/reannotation_consensus/data_driven_consensus.parquet /vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/ ")
548+
549+ # Non immune harmonisation to Dharmesh immune harmonisation
# Load the manually curated CSV mapping non-immune labels to harmonised cell
# types; consumed by the consensus-merge pipeline further down this script.
550+ non_immune_harmonisation =
551+ read_csv(" /vast/projects/mangiola_immune_map/PostDoc/CuratedAtlasQueryR/dev/cell_type_harmonisation_non_immune.csv" )
552+
# Back the curated CSV up to Box via rclone (shell side effect only).
553+ system(" ~/bin/rclone copy /vast/projects/mangiola_immune_map/PostDoc/CuratedAtlasQueryR/dev/cell_type_harmonisation_non_immune.csv box_adelaide:/Mangiola_ImmuneAtlas/reannotation_consensus/" )
554+
555+
# Merge the immune reannotation consensus with the curated non-immune
# harmonisation table and persist the combined annotation as Parquet.
tbl(
  dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
  sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/consensus_output_new.parquet')")
) |>

  # BUG FIX: `is_immune` was computed as `reannotation_consensus == "non immune"`,
  # which flagged exactly the NON-immune cells as immune. Invert the test so
  # the flag matches its name.
  mutate(is_immune = reannotation_consensus != "non immune") |>

  # Add non immune harmonisation to Dharmesh immune harmonisation
  # (`copy = TRUE` uploads the local data frame into the DuckDB connection).
  left_join(non_immune_harmonisation, copy = TRUE) |>

  # Replace the "non immune" placeholder label with the harmonised cell type.
  mutate(reannotation_consensus = case_when(
    reannotation_consensus == "non immune" ~ non_immune_harmonised_cell_type,
    TRUE ~ reannotation_consensus
  )) |>
  select(-non_immune_harmonised_cell_type) |>
  write_parquet_to_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/consensus_output_new_plus_non_immune_harmonisation.parquet")
568+
569+
# Lazy DuckDB-backed view over the harmonised consensus Parquet written above.
# NOTE(review): the dbConnect() handle is created inline and never stored, so
# it cannot be explicitly disconnected later — consider keeping the connection
# in a variable and closing it when done.
570+ annotation_with_harmonised <- tbl(
571+ dbConnect(duckdb :: duckdb(), dbdir = " :memory:" ),
572+ sql(" SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/consensus_output_new_plus_non_immune_harmonisation.parquet')" )
573+ )
402574
403- # Sample to cell link
404- tar_read(metadata_dataset_id_cell_to_sample_mapping , store = glue(" {result_directory}/_targets" ))
405575
406576#
407577# # a test to see whether donor ID is present in the new metadata
# (GitHub page residue: "0 commit comments" — not part of the script)