Skip to content

Commit 850a7da

Browse files
committed
refactor: use state machine and a more readable parser for segment parsing
1 parent 49b3f61 commit 850a7da

File tree

1 file changed

+258
-26
lines changed

1 file changed

+258
-26
lines changed

scripts/generator/pydifact_generator.py

Lines changed: 258 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -544,32 +544,264 @@ def parse_composite_dir(text, only_code: str | None = None):
544544
# ------------------ Segment ------------------
545545

546546

547-
# def get_segment_desc(directory: str, segment_tag: str) -> str:
548-
# """Returns the URL for the provided segment code.
549-
#
550-
# Attributes:
551-
# directory: The service directory where the segment is found, e.g. "d11a",
552-
# "d24a", "d11b"
553-
# segment_tag: The segment tag as lowercase string, e.g. "bgm", "unb"
554-
# """
555-
# # some tags don't seem to have a downloadable description...
556-
# # however, the "d23a" directory has it...
557-
# # fmt: off
558-
# if segment_tag in [
559-
# "UCD", "UCF", "UCI", "UCM", "UCS", "UGH", "UGT", "UIB", "UIH", "UIR",
560-
# "UIT", "UIZ", "UNB", "UNE", "UNG", "UNH", "UNO", "UNP", "UNS", "UNT",
561-
# "UNZ", "USA", "USB", "USC", "USD", "USE", "USF", "USH", "USL", "USR",
562-
# "UST", "USU", "USX", "USY",
563-
# ]:
564-
# return ""
565-
# # fmt: on
566-
# text = _retrieve_or_get_cached_file(
567-
# f"{base_url}/{directory}/trsd/trsd" f"{segment_tag.lower()}.htm",
568-
# f"{directory}/trsd{segment_tag.lower()}.txt",
569-
# )
570-
# if not text:
571-
# logger.warning(f"No description found for segment: {segment_tag}")
572-
# return text
547+
class SegmentParserState:
    """Mutable flags and buffers shared by the steps of the segment parser."""

    def __init__(self):
        # Values that are left untouched when a new segment starts.
        self.in_segment = False
        self.multiline = False
        self.keep_next_line = False
        self.url = ""
        # The per-segment fields start out exactly as they look after a reset.
        self.reset_for_new_segment()

    def reset_for_new_segment(self):
        """Forget the composite flag, pending element and its sub-elements."""
        self.sub_elements = []
        self.last_toplevel_element = None
        self.in_composite = False
563+
564+
565+
def parse_segment_title(line: str) -> tuple[str, str] | None:
566+
"""Extract segment tag and title from a line."""
567+
# find pattern for title
568+
# IDE IDENTITY
569+
# UCD DATA ELEMENT ERROR INDICATION
570+
# UGH ANTI-COLLISION SEGMENT GROUP HEADER
571+
# ACT ALTERNATIVE CURRENCY TOTAL AMOUNT 88.1
572+
# ACA ALTERNATIVE CURRENCY AMOUNT 88.1
573+
# first check, if it generally matches a title line
574+
if not re.match(r"\s*[A-Z]{3}\s+[A-Z,-].*$", line):
575+
return None
576+
577+
# Try syntax v4 trsd.* file pattern
578+
pattern = re.match(r"^[ +*#|X]{5,8}([A-Z]{3})\s+([A-Z,-].*)$", line)
579+
if not pattern:
580+
# try syntax v1 edsd.* file pattern
581+
pattern = re.match(r"^([A-Z]{3})\s+([A-Z,-].*)\s+(?:[\d.]{2,4})?$", line)
582+
583+
if pattern:
584+
tag, title = pattern.groups()
585+
return tag, processed_title(title)
586+
return None
587+
588+
589+
def parse_segment_description(
    lines, line_number: int, segment: SegmentSpec, line: str | None = None
) -> tuple[int, str]:
    """Parse the ``Function:`` description of a segment.

    The description is stored on ``segment.description``; it is *not* part of
    the return value.

    Args:
        lines: The lines of the segment text. They are forwarded to
            ``parse_multiline_until`` and are only indexed with
            ``line_number`` when ``line`` is not supplied.
        line_number: The current line number, forwarded to
            ``parse_multiline_until``.
        segment: The segment spec that receives the description.
        line: The current line. Pass it when ``lines`` is an iterator, which
            cannot be indexed to look the current line up again.

    Returns:
        A tuple of:
        * the line number reported by ``parse_multiline_until``,
        * the line reported by ``parse_multiline_until`` (presumably the line
          that ended the description - confirm against that helper).

        When the current line is no ``Function:`` line, the unchanged
        ``line_number`` and an empty string are returned.
    """
    if line is None:
        line = lines[line_number]

    match = re.match(r"^\s+Function:\s(.*?)\s*$", line)
    if not match:
        return line_number, ""

    first_part = match.group(1)
    # The description runs until the element table header or the first
    # numbered element line.
    remainder, line_number, line = parse_multiline_until(
        r"^(?:Pos\s+TAG\s+Name\s+S|\d{3}[ +*#|X]+[A-Z\d]\d{3}\s+\w+).*",
        lines,
        line_number,
    )
    segment.description = " ".join([first_part, remainder])
    return line_number, line
610+
611+
612+
def parse_toplevel_data_element(
    line: str, segment_tag: str
) -> SegmentDataElementUsage | None:
    """Parse a top-level (non-composite) data element line of a segment.

    Such a line looks like this::

        030 3164 CITY NAME C 1 an..35

    Args:
        line: A single line of the segment description text.
        segment_tag: Tag of the segment being parsed; passed to
            ``ensure_data_element_spec_exists`` and used in log messages.

    Returns:
        The usage of the data element within the segment, or ``None`` when
        the line is not a top-level data element line.
    """
    match = re.match(
        r"^(\d{3})[ +*#|X]+(\d{4})\s+(.*?)\s{2,30}([MC])\s+(\d+)\s+"
        r"([an]+\.?\.?\d+)(?:\s+[\d,]+|\s*)?$",
        line,
    )
    if not match:
        return None

    pos, code, title, mandatory, repeat, repr_line = match.groups()
    title = processed_title(title)
    ensure_data_element_spec_exists(code, title, segment_tag)
    spec = data_element_specs[code]

    # Only compare against non-stub specs, the same way sub-elements of
    # composites are checked.
    if not spec.stub and title != spec.title:
        logger.warning(
            f"{segment_tag}.{code} title mismatch: '{title}' != '{spec.title}'"
        )

    return SegmentDataElementUsage(
        pos=pos,
        element=spec,
        mandatory=mandatory == "M",
        repeat=int(repeat),
        repr_line=repr_line,
    )
641+
642+
643+
def parse_toplevel_composite_element(
    line: str, segment_tag: str
) -> tuple[SegmentCompositeElementUsage, list] | None:
    """Recognise the line that opens a top-level composite element.

    Example of such a line::

        060 C819 COUNTRY SUBDIVISION DETAILS C 5

    Returns:
        ``None`` if the line does not open a composite. Otherwise the
        composite usage together with the (still empty) list that is also
        handed to the usage as its ``schema``.
    """
    found = re.match(
        r"^(\d{3})[ +*#|X]+([A-Z]\d{3})\s+(.*?)\s+([MC])\s+(\d+)(?:\s+[\d,]+|\s*)?$",
        line,
    )
    if found is None:
        return None

    pos, code, raw_title, status, count = found.groups()
    ensure_composite_spec_exists(code, processed_title(raw_title), segment_tag)

    # The very same list object is returned and stored as the schema.
    children = []
    usage = SegmentCompositeElementUsage(
        pos=pos,
        element=composite_specs[code],
        mandatory=(status == "M"),
        repeat=int(count),
        schema=children,
    )
    return usage, children
668+
669+
670+
def parse_sub_element(
    line: str, lines_iter, segment_tag: str
) -> SegmentInlineDataElementUsage | None:
    """Parse a sub-element line of a composite.

    Sub-element lines look like this; the title may wrap onto the following
    line::

        3299 Address purpose code C an..3
        + 3131 Address type code C an..3
        5105 Monetary amount function detail
        description code C an..17

    Args:
        line: The current line.
        lines_iter: Iterator over the remaining lines. One further line is
            consumed from it when the current line looks like a sub-element
            but does not end with status and representation.
        segment_tag: Tag of the segment being parsed; passed to
            ``ensure_data_element_spec_exists`` and used in log messages.

    Returns:
        The usage of the data element inside the composite, or ``None`` when
        the line is not a sub-element line.
    """
    # First check whether the line looks like a sub-element at all.
    if not re.match(r"^[ +*#|X]+(\d{4})\s+(.+)$", line):
        return None

    # Wrapped title: glue the continuation line on. The default keeps an
    # exhausted iterator from leaking StopIteration out of this helper; the
    # joined line then simply fails to match below.
    if not re.search(r"([MC])\s{2,}([an]+\.?\.?\d+)\s*$", line):
        line = line + " " + next(lines_iter, "").strip()

    match = re.match(
        r"^[ +*#|X]+(\d{4})\s+(.+)\s+([MC])\s{2,}([an]+\.?\.?\d+)\s*$",
        line,
    )
    if not match:
        return None

    code, title, mandatory, repr_line = match.groups()
    title = processed_title(title)
    ensure_data_element_spec_exists(code, title, segment_tag)
    spec = data_element_specs[code]

    if not spec.stub and title != spec.title:
        logger.warning(
            f"{segment_tag}.{code} title mismatch: '{title}' != '{spec.title}'"
        )

    return SegmentInlineDataElementUsage(
        element=spec,
        mandatory=mandatory == "M",
        repr_line=repr_line,
    )
710+
711+
712+
def parse_segment_dir(text: str, only_segment_tag: str = ""):
    """Parse a description text containing one or more segments.

    The text is walked line by line. Every segment found is stored in
    ``segment_specs`` under its tag, provided an entry for that tag already
    exists there; otherwise a warning is logged and the segment is dropped.

    Args:
        text: The segment description text. Nothing happens for empty text.
        only_segment_tag: When non-empty, parsing stops at the second title
            line. Note that the value itself is not compared with the tags
            found in the text.

    Note:
        ``parse_sub_element`` may consume a continuation line from the
        iterator itself; ``line_number`` is not advanced for that line.
    """
    if not text:
        return

    lines = iter(text.strip().splitlines())
    line_number = 0
    state = SegmentParserState()
    segment = SegmentSpec(tag="", title="", schema=[], url="")

    def save_current_segment():
        """Store the current segment if its tag is known in segment_specs."""
        if not segment.tag:
            return
        if segment.tag not in segment_specs:
            logger.warning(f"Could not fill segment {segment.tag} schema")
        else:
            segment.stub = False
            segment_specs[segment.tag] = segment

    def save_toplevel_element():
        """Append the pending top-level element to the current segment."""
        if state.last_toplevel_element is not None:
            segment.schema.append(state.last_toplevel_element)
        # Clear the pending element so that it can never be appended twice.
        state.last_toplevel_element = None
        state.sub_elements = []

    while True:
        try:
            if not state.keep_next_line:
                line, line_number = get_next_not_empty_line(lines, line_number)
            state.keep_next_line = False

            # Parse URL if not in segment yet
            if not state.in_segment and not state.in_composite and not state.url:
                state.url = parse_url(line)

            # ---------------------------- parse title -------------------------
            if title_match := parse_segment_title(line):
                tag, title = title_match

                # Flush the pending element first: it still belongs to the
                # previous segment and would be lost by the reset below.
                save_toplevel_element()
                save_current_segment()

                # Stop if we only want one segment and found another
                if state.in_segment and only_segment_tag:
                    break

                segment = SegmentSpec(
                    tag=tag, title=title, url=state.url, schema=[]
                )
                state.reset_for_new_segment()
                state.in_segment = True
                continue

            if not state.in_segment:
                continue

            # ------------------------ function description --------------------
            # The line is already in hand, so the description is parsed right
            # here; `lines` is an iterator and cannot be indexed to read the
            # current line once more.
            if function_match := re.match(r"^\s+Function:\s(.*?)\s*$", line):
                desc, line_number, line = parse_multiline_until(
                    r"^(?:Pos\s+TAG\s+Name\s+S|\d{3}[ +*#|X]+[A-Z\d]\d{3}\s+\w+).*",
                    lines,
                    line_number,
                )
                segment.description = " ".join([function_match.group(1), desc])
                # `line` now holds the line returned by parse_multiline_until;
                # evaluate it in the next round instead of fetching a new one.
                state.keep_next_line = True
                continue

            # ----------------------- top level data element -------------------
            if data_elem := parse_toplevel_data_element(line, segment.tag):
                save_toplevel_element()
                state.last_toplevel_element = data_elem
                state.in_composite = False
                continue

            # ------------------- top level composite data element -------------
            if composite_result := parse_toplevel_composite_element(
                line, segment.tag
            ):
                save_toplevel_element()
                state.last_toplevel_element, state.sub_elements = composite_result
                state.in_composite = True
                continue

            # ------------------------- sub element of a composite -------------
            if state.in_composite:
                if sub_elem := parse_sub_element(line, lines, segment.tag):
                    state.sub_elements.append(sub_elem)
                continue

        except StopIteration:
            break

    # Save last segment
    save_toplevel_element()
    save_current_segment()
573805

574806

575807
def parse_segment_dir(text: str, only_segment_tag: str = ""):

0 commit comments

Comments
 (0)