Skip to content

Commit acc659a

Browse files
committed
ENH: Add writer for Siemens CSA header
Allows us to take a parsed CSA header and convert it back into a string. Useful for things like DICOM anonymization, or perhaps round tripping DICOM -> Nifti -> DICOM.
1 parent 65d5fc6 commit acc659a

File tree

2 files changed

+121
-0
lines changed

2 files changed

+121
-0
lines changed

nibabel/nicom/csareader.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
'''
44
import numpy as np
5+
import struct
56

67
from .structreader import Unpacker
78
from .utils import find_private_section
@@ -29,6 +30,10 @@ class CSAReadError(CSAError):
2930
pass
3031

3132

33+
class CSAWriteError(CSAError):
34+
pass
35+
36+
3237
def get_csa_header(dcm_data, csa_type='image'):
3338
''' Get CSA header information from DICOM header
3439
@@ -162,6 +167,96 @@ def read(csa_str):
162167
return csa_dict
163168

164169

170+
def write(csa_header):
171+
''' Write string from CSA header `csa_header`
172+
173+
Parameters
174+
----------
175+
csa_header : dict
176+
header information as dict, where `header` has fields (at least)
177+
``type, n_tags, tags``. ``header['tags']`` is also a dictionary
178+
with one key, value pair for each tag in the header.
179+
180+
Returns
181+
-------
182+
csa_str : str
183+
byte string containing CSA header information
184+
'''
185+
result = []
186+
if csa_header['type'] == 2:
187+
result.append(b'SV10')
188+
result.append(csa_header['unused0'])
189+
if not 0 < csa_header['n_tags'] <= 128:
190+
raise CSAWriteError('Number of tags `t` should be '
191+
'0 < t <= 128')
192+
result.append(struct.pack('2I',
193+
csa_header['n_tags'],
194+
csa_header['check'])
195+
)
196+
197+
# Build list of tags in correct order
198+
tags = list(csa_header['tags'].items())
199+
tags.sort(key=lambda x: x[1]['tag_no'])
200+
tag0_n_items = tags[0][1]['n_items']
201+
202+
# Add the information for each tag
203+
for tag_name, tag_dict in tags:
204+
vm = tag_dict['vm']
205+
vr = tag_dict['vr']
206+
n_items = tag_dict['n_items']
207+
assert n_items < 100
208+
result.append(struct.pack('64si4s3i',
209+
make_nt_str(tag_name),
210+
vm,
211+
make_nt_str(vr),
212+
tag_dict['syngodt'],
213+
n_items,
214+
tag_dict['last3'])
215+
)
216+
217+
# Figure out the number of values for this tag
218+
if vm == 0:
219+
n_values = n_items
220+
else:
221+
n_values = vm
222+
223+
# Add each item for this tag
224+
for item_no in range(n_items):
225+
# Figure out the item length
226+
if item_no >= n_values or tag_dict['items'][item_no] == '':
227+
item_len = 0
228+
else:
229+
item = tag_dict['items'][item_no]
230+
if not isinstance(item, str):
231+
item = str(item)
232+
item_nt_str = make_nt_str(item)
233+
item_len = len(item_nt_str)
234+
235+
# These values aren't actually preserved in the dict
236+
# representation of the header. Best we can do is set the ones
237+
# that determine the item length appropriately.
238+
x0, x1, x2, x3 = 0, 0, 0, 0
239+
if csa_header['type'] == 1: # CSA1 - odd length calculation
240+
x0 = tag0_n_items + item_len
241+
if item_len < 0 or (ptr + item_len) > csa_len:
242+
if item_no < vm:
243+
items.append('')
244+
break
245+
else: # CSA2
246+
x1 = item_len
247+
result.append(struct.pack('4i', x0, x1, x2, x3))
248+
249+
if item_len == 0:
250+
continue
251+
252+
result.append(item_nt_str)
253+
# go to 4 byte boundary
254+
plus4 = item_len % 4
255+
if plus4 != 0:
256+
result.append(b'\x00' * (4 - plus4))
257+
return b''.join(result)
258+
259+
165260
def get_scalar(csa_dict, tag_name):
166261
try:
167262
items = csa_dict['tags'][tag_name]['items']
@@ -259,3 +354,18 @@ def nt_str(s):
259354
if zero_pos == -1:
260355
return s
261356
return s[:zero_pos].decode('latin-1')
357+
358+
359+
def make_nt_str(s):
360+
''' Create a null terminated byte string from a unicode object.
361+
362+
Parameters
363+
----------
364+
s : unicode
365+
366+
Returns
367+
-------
368+
result : bytes
369+
s encoded as latin-1 with a null char appended
370+
'''
371+
return s.encode('latin-1') + b'\x00'

nibabel/nicom/tests/test_csareader.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,14 @@ def test_missing_csa_elem():
136136
del dcm[csa_tag]
137137
hdr = csa.get_csa_header(dcm, 'image')
138138
assert hdr is None
139+
140+
141+
def test_read_write_rt():
142+
# Try doing a read-write-read round trip and make sure the dictionary
143+
# representation of the header is the same. We can't exactly reproduce the
144+
# original string representation currently.
145+
for csa_str in (CSA2_B0, CSA2_B1000):
146+
csa_info = csa.read(csa_str)
147+
new_csa_str = csa.write(csa_info)
148+
new_csa_info = csa.read(new_csa_str)
149+
assert csa_info == new_csa_info

0 commit comments

Comments
 (0)