Skip to content

Commit 2a4dfe4

Browse files
committed
refactor: enhance validation logic for UNB segments and introduce detailed schema checks in segment definitions
1 parent d8a4dc9 commit 2a4dfe4

File tree

2 files changed

+211
-120
lines changed

2 files changed

+211
-120
lines changed

pydifact/segmentcollection.py

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from collections.abc import Callable, Iterable, Iterator, Sequence
2626
from typing import Type, TypeVar
2727

28-
from pydifact.exceptions import EDISyntaxError
28+
from pydifact.exceptions import EDISyntaxError, ValidationError
2929
from pydifact.control import Characters
3030
from pydifact.parser import Parser
3131
from pydifact.segments import Segment
@@ -284,6 +284,9 @@ class RawSegmentCollection(AbstractSegmentsContainer):
284284
If you are handling an Interchange or a Message, you may want to prefer
285285
those classes to RawSegmentCollection, as they offer more features and
286286
checks.
287+
288+
There are no header and footer segments in this collection, and validation is
289+
omitted.
287290
"""
288291

289292
def get_header_segment(self) -> Segment | None:
@@ -513,33 +516,73 @@ def from_segments(
513516
unb = first_segment
514517
else:
515518
raise EDISyntaxError("An interchange must start with UNB or UNA and UNB")
516-
# Loosy syntax check :
517-
if len(unb.elements) < 4:
518-
raise EDISyntaxError("Missing elements in UNB header")
519519

520-
# In syntax version 3 the year is formatted using two digits, while in version 4 four digits are used.
520+
# extract syntax identifier to know which version to validate against
521+
if (
522+
not isinstance(unb.elements[0], list)
523+
or len(unb.elements[0]) < 2
524+
or not unb.elements[0][1].isdecimal()
525+
):
526+
raise EDISyntaxError("Syntax identifier malformed.")
527+
528+
syntax_version = unb.elements[0][1]
529+
530+
# Validate UNB segment according to the applicable syntax version
531+
try:
532+
unb.validate(syntax_version=syntax_version, directory="")
533+
except (ValidationError, FileNotFoundError) as e:
534+
raise EDISyntaxError(f"Invalid UNB header: {e}") from e
535+
536+
# In syntax version 3 and earlier the year is formatted using two digits,
537+
# while in version 4 four digits are used.
521538
# Since some EDIFACT files in the wild don't adhere to this specification, we just use whatever format seems
522539
# more appropriate according to the length of the date string.
523-
if isinstance(unb.elements[3], list) and len(unb.elements[3]) > 0:
524-
if len(unb.elements[3][0]) == 6:
525-
datetime_fmt = "%y%m%d-%H%M"
526-
elif len(unb.elements[3][0]) == 8:
527-
datetime_fmt = "%Y%m%d-%H%M"
528-
else:
529-
raise EDISyntaxError("Timestamp of file-creation malformed.")
530-
else:
531-
raise EDISyntaxError("Timestamp of file-creation malformed.")
532-
540+
# Element 3 of UNB is the date/time of preparation.
541+
# In syntax v4 it's a composite S004 (0017 date, 0019 time)
542+
# In syntax v3 it's also a composite S004 (0017 date, 0019 time)
543+
# Note that Segment.elements might contain strings or lists of strings.
544+
preparation_datetime = unb.elements[3]
533545
if (
534-
isinstance(unb.elements[0], list)
535-
and len(unb.elements[0]) == 2
536-
and unb.elements[0][1].isdecimal()
546+
isinstance(preparation_datetime, (list, tuple))
547+
and len(preparation_datetime) > 0
537548
):
538-
syntax_identifier = (unb.elements[0][0], int(unb.elements[0][1]))
549+
date_str = preparation_datetime[0]
550+
if len(date_str) == 6:
551+
datetime_fmt = "%y%m%d"
552+
elif len(date_str) == 8:
553+
datetime_fmt = "%Y%m%d"
554+
else:
555+
raise EDISyntaxError(
556+
f"Timestamp of file-creation malformed: {date_str}"
557+
)
558+
559+
if len(preparation_datetime) > 1:
560+
time_str = preparation_datetime[1]
561+
datetime_fmt += "-%H%M"
562+
datetime_str = f"{date_str}-{time_str}"
563+
else:
564+
datetime_str = date_str
565+
elif isinstance(preparation_datetime, str) and preparation_datetime:
566+
# Fallback if it's not a composite but a single string
567+
if len(preparation_datetime) == 6:
568+
datetime_fmt = "%y%m%d"
569+
datetime_str = preparation_datetime
570+
elif len(preparation_datetime) == 8:
571+
datetime_fmt = "%Y%m%d"
572+
datetime_str = preparation_datetime
573+
elif len(preparation_datetime) == 10:
574+
datetime_fmt = "%y%m%d%H%M"
575+
datetime_str = preparation_datetime
576+
elif len(preparation_datetime) == 12:
577+
datetime_fmt = "%Y%m%d%H%M"
578+
datetime_str = preparation_datetime
579+
else:
580+
raise EDISyntaxError(
581+
f"Timestamp of file-creation malformed: {preparation_datetime}"
582+
)
539583
else:
540-
raise EDISyntaxError("Syntax identifier malformed.")
541-
542-
datetime_str = "-".join(unb.elements[3])
584+
raise EDISyntaxError("Timestamp of file-creation malformed.")
585+
syntax_identifier = (unb.elements[0][0], int(unb.elements[0][1]))
543586
timestamp = datetime.datetime.strptime(datetime_str, datetime_fmt)
544587
interchange = Interchange(
545588
syntax_identifier=syntax_identifier,

pydifact/segments.py

Lines changed: 146 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -188,114 +188,162 @@ def validate(self, syntax_version: str, directory: str) -> None:
188188
if not directory and self.tag in service_segments:
189189
directory = f"service/v{release_version}"
190190

191-
if directory:
192-
try:
193-
# load segments xml (or cache it)
194-
xml_root = _load_segments_xml(directory)
195-
196-
# Find the segment definition in XML
197-
segment_def = xml_root.find(f".//segment[@id='{self.tag}']")
198-
199-
if segment_def is None:
200-
logger.warning(f"No definition found for segment {self.tag}")
201-
else:
202-
# Validate against XML schema
203-
204-
# get sub elements (data_element or composite_data_element)
205-
xml_elements = segment_def.findall("./*")
206-
# get count of required elements
207-
required_element_count = len(
208-
[
209-
e
210-
for e in xml_elements
211-
if e.get("required", "false").lower() == "true"
212-
]
191+
if not directory:
192+
# no directory given to compare against
193+
return
194+
195+
try:
196+
# load segments xml (or cache it)
197+
xml_root = _load_segments_xml(directory)
198+
199+
if self.tag == "UNA":
200+
# UNA is special
201+
return
202+
203+
# Find the segment definition in XML
204+
segment_def = xml_root.find(f".//segment[@id='{self.tag}']")
205+
206+
if segment_def is None:
207+
logger.warning(f"No definition found for segment {self.tag}")
208+
raise ValidationError(
209+
f"No definition found for segment "
210+
f"{self.tag} in directory {directory}."
211+
)
212+
213+
# Validate against XML schema
214+
# first get sub elements (data_element or composite_data_element)
215+
xml_elements = segment_def.findall("./*")
216+
# get count of required elements
217+
required_element_count = len(
218+
[
219+
e
220+
for e in xml_elements
221+
if e.get("required", "false").lower() == "true"
222+
]
223+
)
224+
225+
# check if we have less than the required number of elements
226+
# defined in XML
227+
if len(self.elements) < required_element_count:
228+
raise ValidationError(
229+
f"{self.tag}: Too few elements. Expected at least {required_element_count}, "
230+
f"got {len(self.elements)}"
231+
)
232+
233+
# check if we have more elements than defined in XML
234+
if len(self.elements) > len(xml_elements):
235+
raise ValidationError(
236+
f"{self.tag}: Too many elements. Expected {len(xml_elements)}, "
237+
f"got {len(self.elements)}: {self.elements}"
238+
)
239+
240+
for index, xml_element in enumerate(xml_elements):
241+
element = self.elements[index] if index < len(self.elements) else None
242+
is_mandatory = xml_element.get("required", "false").lower() == "true"
243+
# repeat = int(xml_element.get("repeat", "1")) # not used yet
244+
245+
if is_mandatory and (element is None or element == ""):
246+
raise ValidationError(
247+
f"{self.tag} Segment, pos. {index}: "
248+
f"element {xml_element.get('id')} ({xml_element.get('name')}) "
249+
f"is required."
213250
)
214251

215-
# check if we have less than the required number of elements
216-
# defined in XML
217-
if len(self.elements) < required_element_count:
218-
raise ValidationError(
219-
f"{self.tag}: Too few elements. Expected at least {required_element_count}, "
220-
f"got {len(self.elements)}"
221-
)
222-
223-
# check if we have more elements than defined in XML
224-
if len(self.elements) > len(xml_elements):
225-
raise ValidationError(
226-
f"{self.tag}: Too many elements. Expected {len(xml_elements)}, "
227-
f"got {len(self.elements)}: {self.elements}"
228-
)
229-
230-
for index, xml_element in enumerate(xml_elements):
231-
element = (
232-
self.elements[index] if index < len(self.elements) else None
233-
)
234-
is_mandatory = (
235-
xml_element.get("required", "false").lower() == "true"
236-
)
237-
# repeat = int(xml_element.get("repeat", "1")) # not used yet
238-
239-
if is_mandatory and (element is None or element == ""):
252+
if element:
253+
if xml_element.tag == "data_element":
254+
if not isinstance(element, str):
240255
raise ValidationError(
241256
f"{self.tag} Segment, pos. {index}: "
242257
f"element {xml_element.get('id')} ({xml_element.get('name')}) "
243-
f"is required."
258+
f"should be a simple data element, but got: "
259+
f"{element}"
244260
)
245-
246-
if element:
247-
if xml_element.tag == "composite_data_element":
248-
if not isinstance(element, (list, str)):
249-
raise ValidationError(
250-
f"{self.tag} Segment, pos. {index}: "
251-
f"Element {xml_element.get('id')} must be a "
252-
f"composite data element (list or str), "
253-
f"but got '{type(element).__name__}': '{element}'"
254-
)
255-
# TODO: validate internal structure of composite
256-
elif xml_element.tag == "data_element":
257-
if isinstance(element, list):
261+
repeat = xml_element.get("repeat", "")
262+
if not repeat.isdigit():
263+
logger.warning(
264+
"'repeat' attribute missing for "
265+
f"element {directory}."
266+
f"{xml_element.get('id')}"
267+
)
268+
repeat = "1"
269+
# TODO: validate repeats
270+
271+
# validate data element (length, type)
272+
# read the declared type code and length limits from the XML definition
273+
type_code = xml_element.get("type")
274+
length = int(xml_element.get("length", "0"))
275+
maxlength = int(xml_element.get("maxlength", "0"))
276+
match type_code:
277+
case "an":
278+
# no validation necessary, all is allowed.
279+
280+
# this is dangerous, as supposedly many EDIFACT
281+
# senders do not comply with standards and send all
282+
# types of chars...
283+
284+
# for char in element:
285+
# if not char.isalnum():
286+
# raise ValidationError(
287+
# f"{self.tag} Segment, pos. {index}: "
288+
# f"element {xml_element.get('id')} ({xml_element.get('name')}) "
289+
# f"contains invalid character: {char}"
290+
# )
291+
pass
292+
case "n":
293+
# make sure the element only consists of numbers
294+
if not element.strip().isdigit():
258295
raise ValidationError(
259296
f"{self.tag} Segment, pos. {index}: "
260297
f"element {xml_element.get('id')} ({xml_element.get('name')}) "
261-
f"must be a data element, but got a list:"
262-
f" {element}"
298+
f"should only contain numbers, but got: "
299+
f"{element}"
263300
)
264-
265-
# validate data element (length, type)
266-
# convert type and maxlength/minlength to repr string (e.g. "an..3")
267-
type_code = xml_element.get("type", "an")
268-
maxlength = xml_element.get("maxlength")
269-
minlength = xml_element.get("minlength")
270-
if maxlength:
271-
if minlength and minlength == maxlength:
272-
repr_str = f"{type_code}{maxlength}"
273-
else:
274-
repr_str = f"{type_code}..{maxlength}"
275-
276-
# we use the DataElement class from common.py to validate
277-
# but we don't want to create an instance if we don't have to
278-
# however, DataElement.validate is an instance method.
279-
# Since we have the value as a string (or Element), we can use it.
280-
from pydifact.syntax.common import DataElement
281-
282-
de = DataElement(str(element))
283-
# provide a fake code and title for better error messages
284-
de.code = xml_element.get("id")
285-
de.title = xml_element.get("name")
286-
de.validate(mandatory=is_mandatory, repr=repr_str)
287-
288-
except FileNotFoundError:
289-
warnings.warn(
290-
f"segments.xml not found for directory '{directory}'. "
291-
f"Falling back to schema-based validation.",
292-
category=MissingImplementationWarning,
293-
)
294-
except ET.ParseError as e:
295-
warnings.warn(
296-
f"Failed to parse segments.xml: {e}. ",
297-
category=MissingImplementationWarning,
298-
)
301+
case "a":
302+
# Alphabetic data element: every character must pass
303+
# str.isalpha(), so digits, whitespace, punctuation and
304+
# control characters are all rejected.
305+
for char in element:
306+
if not char.isalpha():
307+
raise ValidationError(
308+
f"{self.tag} Segment, pos. {index}: "
309+
f"element {xml_element.get('id')} ({xml_element.get('name')}) "
310+
f"contains invalid character: {char}"
311+
)
312+
313+
if maxlength:
314+
if len(element) > maxlength:
315+
raise ValidationError(
316+
f"{self.tag} Segment, pos. {index}: "
317+
f"element {xml_element.get('id')} "
318+
f"({xml_element.get('name')}) "
319+
f"exceeds maximum length of {maxlength}: {element}"
320+
)
321+
elif length:
322+
if len(element) != length:
323+
raise ValidationError(
324+
f"{self.tag} Segment, pos. {index}: "
325+
f"element {xml_element.get('id')} "
326+
f"({xml_element.get('name')}) "
327+
f"should be {length} characters long, but is {len(
328+
element)}: {element}"
329+
)
330+
331+
except FileNotFoundError as e:
332+
warnings.warn(
333+
f"segments.xml not found for directory '{directory}'. "
334+
f"Falling back to schema-based validation.",
335+
category=MissingImplementationWarning,
336+
)
337+
if self.tag in service_segments:
338+
raise ValidationError(
339+
f"Schema for service segment {self.tag} not found "
340+
f"(directory '{directory}')"
341+
) from e
342+
except ET.ParseError as e:
343+
warnings.warn(
344+
f"Failed to parse segments.xml: {e}. ",
345+
category=MissingImplementationWarning,
346+
)
299347

300348

301349
class SegmentFactory:

0 commit comments

Comments
 (0)