@@ -1049,6 +1049,9 @@ def inspect(path):
1049
1049
return obj .summary_table ()
1050
1050
1051
1051
1052
# Default compression for all Zarr arrays: Blosc wrapping zstd at a high
# compression level. Shuffle is chosen per-column later (see
# ZarrColumnSpec._choose_compressor_settings).
DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7)
1052
1055
@dataclasses .dataclass
1053
1056
class ZarrColumnSpec :
1054
1057
name : str
@@ -1058,20 +1061,46 @@ class ZarrColumnSpec:
1058
1061
dimensions : list
1059
1062
description : str
1060
1063
vcf_field : str
1061
- compressor : dict
1064
+ compressor : dict = None
1065
+ filters : list = None
1062
1066
# TODO add filters
1063
1067
1064
1068
def __post_init__(self):
    """Normalise sequence fields and fill in compression settings.

    ``shape``, ``chunks`` and ``dimensions`` are coerced to tuples.
    Compressor and filter settings are derived automatically only when
    the caller did not supply them, so values loaded from a stored
    (possibly hand-edited) schema via ``fromdict`` are preserved
    rather than silently clobbered — previously they were always
    overwritten with defaults, making the ``compressor``/``filters``
    constructor arguments dead.
    """
    self.shape = tuple(self.shape)
    self.chunks = tuple(self.chunks)
    self.dimensions = tuple(self.dimensions)
    if self.filters is None:
        self.filters = []
    if self.compressor is None:
        self.compressor = DEFAULT_ZARR_COMPRESSOR.get_config()
        # Heuristics only apply when we are choosing the defaults
        # ourselves; explicit settings are taken as-is.
        self._choose_compressor_settings()
1075
+
1076
def _choose_compressor_settings(self):
    """
    Choose compressor and filter settings based on the size and
    type of the array, plus some heuristics from observed properties
    of VCFs.

    See https://github.com/pystatgen/bio2zarr/discussions/74
    """
    dtype = np.dtype(self.dtype)
    # Shuffle is off by default: autoshuffle is not recognised by many
    # Zarr implementations, and shuffling can make performance worse in
    # some cases anyway, so turning it on should be a deliberate choice.
    # The one exception: any 1-byte field gets BITSHUFFLE.
    if dtype.itemsize == 1:
        self.compressor["shuffle"] = numcodecs.Blosc.BITSHUFFLE
    else:
        self.compressor["shuffle"] = numcodecs.Blosc.NOSHUFFLE

    # Booleans pack down to one bit each before compression.
    if dtype.name == "bool":
        self.filters.append(numcodecs.PackBits().get_config())
1068
1097
1069
1098
1070
1099
ZARR_SCHEMA_FORMAT_VERSION = "0.2"
1071
1100
1072
1101
1073
1102
@dataclasses .dataclass
1074
- class ZarrConversionSpec :
1103
+ class VcfZarrSchema :
1075
1104
format_version : str
1076
1105
samples_chunk_size : int
1077
1106
variants_chunk_size : int
@@ -1095,15 +1124,15 @@ def fromdict(d):
1095
1124
"Zarr schema format version mismatch: "
1096
1125
f"{ d ['format_version' ]} != { ZARR_SCHEMA_FORMAT_VERSION } "
1097
1126
)
1098
- ret = ZarrConversionSpec (** d )
1127
+ ret = VcfZarrSchema (** d )
1099
1128
ret .columns = {
1100
1129
key : ZarrColumnSpec (** value ) for key , value in d ["columns" ].items ()
1101
1130
}
1102
1131
return ret
1103
1132
1104
1133
@staticmethod
def fromjson(s):
    """Deserialize a JSON string into a ``VcfZarrSchema`` instance."""
    d = json.loads(s)
    return VcfZarrSchema.fromdict(d)
1107
1136
1108
1137
@staticmethod
1109
1138
def generate (pcvcf , variants_chunk_size = None , samples_chunk_size = None ):
@@ -1117,7 +1146,6 @@ def generate(pcvcf, variants_chunk_size=None, samples_chunk_size=None):
1117
1146
logger .info (
1118
1147
f"Generating schema with chunks={ variants_chunk_size , samples_chunk_size } "
1119
1148
)
1120
- compressor = core .default_compressor .get_config ()
1121
1149
1122
1150
def fixed_field_spec (
1123
1151
name , dtype , vcf_field = None , shape = (m ,), dimensions = ("variants" ,)
@@ -1130,7 +1158,6 @@ def fixed_field_spec(
1130
1158
description = "" ,
1131
1159
dimensions = dimensions ,
1132
1160
chunks = [variants_chunk_size ],
1133
- compressor = compressor ,
1134
1161
)
1135
1162
1136
1163
alt_col = pcvcf .columns ["ALT" ]
@@ -1206,7 +1233,6 @@ def fixed_field_spec(
1206
1233
chunks = chunks ,
1207
1234
dimensions = dimensions ,
1208
1235
description = field .description ,
1209
- compressor = compressor ,
1210
1236
)
1211
1237
colspecs .append (colspec )
1212
1238
@@ -1225,7 +1251,6 @@ def fixed_field_spec(
1225
1251
chunks = list (chunks ),
1226
1252
dimensions = list (dimensions ),
1227
1253
description = "" ,
1228
- compressor = compressor ,
1229
1254
)
1230
1255
)
1231
1256
shape += [ploidy ]
@@ -1239,7 +1264,6 @@ def fixed_field_spec(
1239
1264
chunks = list (chunks ),
1240
1265
dimensions = list (dimensions ),
1241
1266
description = "" ,
1242
- compressor = compressor ,
1243
1267
)
1244
1268
)
1245
1269
colspecs .append (
@@ -1251,11 +1275,10 @@ def fixed_field_spec(
1251
1275
chunks = list (chunks ),
1252
1276
dimensions = list (dimensions ),
1253
1277
description = "" ,
1254
- compressor = compressor ,
1255
1278
)
1256
1279
)
1257
1280
1258
- return ZarrConversionSpec (
1281
+ return VcfZarrSchema (
1259
1282
format_version = ZARR_SCHEMA_FORMAT_VERSION ,
1260
1283
samples_chunk_size = samples_chunk_size ,
1261
1284
variants_chunk_size = variants_chunk_size ,
@@ -1328,6 +1351,7 @@ def init_array(self, variable):
1328
1351
chunks = variable .chunks ,
1329
1352
dtype = variable .dtype ,
1330
1353
compressor = numcodecs .get_codec (variable .compressor ),
1354
+ filters = [numcodecs .get_codec (filt ) for filt in variable .filters ],
1331
1355
object_codec = object_codec ,
1332
1356
)
1333
1357
a .attrs ["_ARRAY_DIMENSIONS" ] = variable .dimensions
@@ -1446,7 +1470,7 @@ def encode_samples(self):
1446
1470
"sample_id" ,
1447
1471
self .schema .sample_id ,
1448
1472
dtype = "str" ,
1449
- compressor = core . default_compressor ,
1473
+ compressor = DEFAULT_ZARR_COMPRESSOR ,
1450
1474
chunks = (self .schema .samples_chunk_size ,),
1451
1475
)
1452
1476
array .attrs ["_ARRAY_DIMENSIONS" ] = ["samples" ]
@@ -1457,7 +1481,7 @@ def encode_contig_id(self):
1457
1481
"contig_id" ,
1458
1482
self .schema .contig_id ,
1459
1483
dtype = "str" ,
1460
- compressor = core . default_compressor ,
1484
+ compressor = DEFAULT_ZARR_COMPRESSOR ,
1461
1485
)
1462
1486
array .attrs ["_ARRAY_DIMENSIONS" ] = ["contigs" ]
1463
1487
if self .schema .contig_length is not None :
@@ -1474,7 +1498,7 @@ def encode_filter_id(self):
1474
1498
"filter_id" ,
1475
1499
self .schema .filter_id ,
1476
1500
dtype = "str" ,
1477
- compressor = core . default_compressor ,
1501
+ compressor = DEFAULT_ZARR_COMPRESSOR ,
1478
1502
)
1479
1503
array .attrs ["_ARRAY_DIMENSIONS" ] = ["filters" ]
1480
1504
return {v : j for j , v in enumerate (self .schema .filter_id )}
@@ -1647,7 +1671,7 @@ def service_completed_futures():
1647
1671
1648
1672
def mkschema(if_path, out):
    """Generate a default Zarr schema for the intermediate columnarised
    VCF at ``if_path`` and write it as JSON to the file-like ``out``."""
    pcvcf = PickleChunkedVcf.load(if_path)
    schema = VcfZarrSchema.generate(pcvcf)
    out.write(schema.asjson())
1652
1676
1653
1677
@@ -1664,7 +1688,7 @@ def encode(
1664
1688
):
1665
1689
pcvcf = PickleChunkedVcf .load (if_path )
1666
1690
if schema_path is None :
1667
- schema = ZarrConversionSpec .generate (
1691
+ schema = VcfZarrSchema .generate (
1668
1692
pcvcf ,
1669
1693
variants_chunk_size = variants_chunk_size ,
1670
1694
samples_chunk_size = samples_chunk_size ,
@@ -1674,7 +1698,7 @@ def encode(
1674
1698
if variants_chunk_size is not None or samples_chunk_size is not None :
1675
1699
raise ValueError ("Cannot specify schema along with chunk sizes" )
1676
1700
with open (schema_path , "r" ) as f :
1677
- schema = ZarrConversionSpec .fromjson (f .read ())
1701
+ schema = VcfZarrSchema .fromjson (f .read ())
1678
1702
zarr_path = pathlib .Path (zarr_path )
1679
1703
if zarr_path .exists ():
1680
1704
logger .warning (f"Deleting existing { zarr_path } " )
0 commit comments