|
22 | 22 | from .datatypes import S7WordLen |
23 | 23 | from .error import S7Error, S7ConnectionError, S7ProtocolError, S7StalePacketError |
24 | 24 | from .client_base import ClientMixin |
| 25 | +from .optimizer import ReadItem, ReadPacket, sort_items, merge_items, packetize, extract_results |
25 | 26 |
|
26 | 27 | from .type import ( |
27 | 28 | Area, |
|
43 | 44 | logger = logging.getLogger(__name__) |
44 | 45 |
|
45 | 46 |
|
| 47 | +class _OptimizationPlan: |
| 48 | + """Cached optimization plan for repeated read_multi_vars calls with the same layout.""" |
| 49 | + |
| 50 | + def __init__(self, cache_key: tuple[int, ...], packets: list[ReadPacket], read_items: list[ReadItem]): |
| 51 | + self.cache_key = cache_key |
| 52 | + self.packets = packets |
| 53 | + self.read_items = read_items |
| 54 | + |
| 55 | + |
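For reference, the cache key that drives plan reuse (see `_read_multi_vars_optimized` below) is just the flattened item layout, so any repeat call whose items match in area, DB number, offset, and length skips the sort/merge/packetize work. A minimal sketch of the key construction, mirroring the code further down:

```python
# Sketch: the plan cache key is the flattened (area, db, offset, length)
# layout, so identical request layouts produce identical keys across calls.
def layout_key(read_items):
    return tuple(
        v
        for ri in read_items
        for v in (ri.area, ri.db_number, ri.byte_offset, ri.byte_length)
    )
```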
46 | 56 | class Client(ClientMixin): |
47 | 57 | """ |
48 | 58 | Pure Python S7 client implementation. |
@@ -123,6 +133,10 @@ def __init__( |
123 | 133 | Parameter.PDURequest: 480, |
124 | 134 | } |
125 | 135 |
|
| 136 | + # Multi-read optimizer state |
| 137 | + self._opt_plan: Optional[_OptimizationPlan] = None |
 | 138 | + self.multi_read_max_gap: int = 5  # maximum byte gap between reads that may still be merged |
| 139 | + |
126 | 140 | # Async operation state |
127 | 141 | self._async_pending = False |
128 | 142 | self._async_result: Optional[bytearray] = None |
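The `multi_read_max_gap` default of 5 trades a few throwaway bytes for fewer protocol items: reads whose gap is no larger than that collapse into one block. A sketch of the heuristic, assuming the `.optimizer` helpers behave as their call sites in this patch suggest (the `ReadItem` fields and `merge_items` signature are taken from the code below):

```python
from .optimizer import ReadItem, merge_items, sort_items
from .type import Area

items = [
    ReadItem(area=int(Area.DB), db_number=1, byte_offset=0, bit_offset=0, byte_length=4, index=0),
    ReadItem(area=int(Area.DB), db_number=1, byte_offset=9, bit_offset=0, byte_length=2, index=1),
]
# Gap = 9 - (0 + 4) = 5 bytes, which is within max_gap, so both reads
# merge into a single block spanning bytes 0..10; the 5 unused bytes in
# the middle are cheaper than a second item header on the wire.
blocks = merge_items(sort_items(items), max_gap=5, max_block_size=200)
assert len(blocks) == 1
```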
@@ -665,55 +679,130 @@ def build_chunk_request(o: int = chunk_offset, cd: bytes = bytes(chunk_data)) -> |
665 | 679 | return 0 |
666 | 680 |
|
667 | 681 | def read_multi_vars(self, items: Union[List[dict[str, Any]], "Array[S7DataItem]"]) -> Tuple[int, Any]: |
668 | | - """ |
669 | | - Read multiple variables in a single request. |
| 682 | + """Read multiple variables in a single request. |
| 683 | +
|
 | 684 | + When given a list of two or more item dicts, the client uses the |
 | 685 | + multi-variable read optimizer to merge adjacent reads and pack them |
 | 686 | + into as few PDU exchanges as possible, significantly reducing the |
 | 687 | + number of round-trips compared to reading each variable individually. |
670 | 688 |
|
671 | 689 | Args: |
672 | | - items: List of item specifications or S7DataItem array |
| 690 | + items: List of item specifications (dicts with ``area``, ``start``, |
| 691 | + ``size``, and optionally ``db_number``) **or** a ctypes |
| 692 | + ``Array[S7DataItem]``. |
673 | 693 |
|
674 | 694 | Returns: |
675 | | - Tuple of (result, items with data) |
| 695 | + Tuple of (result_code, data) where *data* is either the updated |
| 696 | + ctypes array or a list of bytearrays in the original item order. |
676 | 697 |
|
677 | 698 | Raises: |
678 | | - ValueError: If more than MAX_VARS items are requested |
| 699 | + ValueError: If more than MAX_VARS items are requested. |
679 | 700 | """ |
680 | 701 | if not items: |
681 | 702 | return (0, items) |
682 | 703 |
|
683 | 704 | if len(items) > self.MAX_VARS: |
684 | 705 | raise ValueError(f"Too many items: {len(items)} exceeds MAX_VARS ({self.MAX_VARS})") |
685 | 706 |
|
686 | | - # Handle S7DataItem array (ctypes) |
| 707 | + # Handle S7DataItem array (ctypes) -- unchanged legacy path |
687 | 708 | if hasattr(items, "_type_") and hasattr(items[0], "Area"): |
688 | | - # This is a ctypes array of S7DataItem - use cast for type safety |
689 | 709 | s7_items = cast("Array[S7DataItem]", items) |
690 | 710 | for s7_item in s7_items: |
691 | 711 | area = Area(s7_item.Area) |
692 | 712 | db_number = s7_item.DBNumber |
693 | 713 | start = s7_item.Start |
694 | 714 | size = s7_item.Amount |
695 | 715 | data = self.read_area(area, db_number, start, size) |
696 | | - |
697 | | - # Copy data to pData buffer |
698 | 716 | if s7_item.pData: |
699 | 717 | for i, b in enumerate(data): |
700 | 718 | s7_item.pData[i] = b |
701 | | - |
702 | 719 | return (0, items) |
703 | 720 |
|
704 | | - # Handle dict list |
| 721 | + # Dict list path -- use optimizer for 2+ items |
705 | 722 | dict_items = cast(List[dict[str, Any]], items) |
706 | | - results = [] |
707 | | - for dict_item in dict_items: |
708 | | - area = dict_item["area"] |
709 | | - db_number = dict_item.get("db_number", 0) |
710 | | - start = dict_item["start"] |
711 | | - size = dict_item["size"] |
712 | | - data = self.read_area(area, db_number, start, size) |
713 | | - results.append(data) |
714 | 723 |
|
| 724 | + if len(dict_items) <= 1: |
| 725 | + # Single item: no optimization needed |
| 726 | + results: list[bytearray] = [] |
| 727 | + for dict_item in dict_items: |
| 728 | + area = dict_item["area"] |
| 729 | + db_number = dict_item.get("db_number", 0) |
| 730 | + start = dict_item["start"] |
| 731 | + size = dict_item["size"] |
| 732 | + data = self.read_area(area, db_number, start, size) |
| 733 | + results.append(data) |
| 734 | + return (0, results) |
| 735 | + |
| 736 | + return self._read_multi_vars_optimized(dict_items) |
| 737 | + |
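A hypothetical call on a connected client, showing the dict shape the optimizer path consumes (the `client` setup itself is assumed):

```python
# Hypothetical usage: three reads, two of them adjacent in DB1.
result, buffers = client.read_multi_vars([
    {"area": Area.DB, "db_number": 1, "start": 0, "size": 4},
    {"area": Area.DB, "db_number": 1, "start": 4, "size": 2},
    {"area": Area.DB, "db_number": 2, "start": 0, "size": 8},
])
# result is 0 on success; buffers is a list of bytearrays in the original
# request order, regardless of how the reads were regrouped on the wire.
```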
| 738 | + def _read_multi_vars_optimized(self, dict_items: List[dict[str, Any]]) -> Tuple[int, List[bytearray]]: |
| 739 | + """Optimized multi-variable read using merge + packetize strategy. |
| 740 | +
|
| 741 | + Args: |
| 742 | + dict_items: List of item dicts (area, db_number, start, size). |
| 743 | +
|
| 744 | + Returns: |
| 745 | + Tuple of (0, list of bytearrays in original order). |
| 746 | + """ |
| 747 | + # Build ReadItem list |
| 748 | + read_items: list[ReadItem] = [] |
| 749 | + for idx, d in enumerate(dict_items): |
| 750 | + area_val = int(d["area"]) |
| 751 | + db_number = d.get("db_number", 0) |
| 752 | + read_items.append( |
| 753 | + ReadItem( |
| 754 | + area=area_val, |
| 755 | + db_number=db_number, |
| 756 | + byte_offset=d["start"], |
| 757 | + bit_offset=0, |
| 758 | + byte_length=d["size"], |
| 759 | + index=idx, |
| 760 | + ) |
| 761 | + ) |
| 762 | + |
| 763 | + # Build cache key from the item layout |
| 764 | + cache_key = tuple(val for ri in read_items for val in (ri.area, ri.db_number, ri.byte_offset, ri.byte_length)) |
| 765 | + |
| 766 | + # Reuse cached plan if layout matches |
| 767 | + if self._opt_plan is not None and self._opt_plan.cache_key == cache_key: |
| 768 | + packets = self._opt_plan.packets |
| 769 | + else: |
| 770 | + sorted_ri = sort_items(read_items) |
| 771 | + max_block = self._max_read_size() |
| 772 | + blocks = merge_items(sorted_ri, max_gap=self.multi_read_max_gap, max_block_size=max_block) |
| 773 | + packets = packetize(blocks, self.pdu_length) |
| 774 | + self._opt_plan = _OptimizationPlan(cache_key, packets, read_items) |
| 775 | + |
| 776 | + # Execute each packet |
| 777 | + for packet in packets: |
| 778 | + block_specs = [(blk.area, blk.db_number, blk.start_offset, blk.byte_length) for blk in packet.blocks] |
| 779 | + |
| 780 | + if len(block_specs) == 1: |
| 781 | + # Single block: use regular read to avoid multi-read overhead |
| 782 | + blk = packet.blocks[0] |
| 783 | + data = self.read_area( |
 | 784 | + self._map_area_int(blk.area),  # raises ValueError on unknown area codes |
| 785 | + blk.db_number, |
| 786 | + blk.start_offset, |
| 787 | + blk.byte_length, |
| 788 | + ) |
| 789 | + blk.buffer = data |
| 790 | + else: |
| 791 | + # Multi-block: use multi-read PDU |
| 792 | + request = self.protocol.build_multi_read_request(block_specs) |
| 793 | + response = self._send_receive(request) |
| 794 | + block_data_list = self.protocol.extract_multi_read_data(response, len(block_specs)) |
| 795 | + for blk, buf in zip(packet.blocks, block_data_list): |
| 796 | + blk.buffer = buf |
| 797 | + |
| 798 | + # Extract per-item results in original order |
| 799 | + results = extract_results(packets, len(dict_items)) |
715 | 800 | return (0, results) |
716 | 801 |
|
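The planning stages can also be exercised in isolation, which is handy for testing merge behavior without a PLC on the wire. A sketch under the same assumptions as above, with 240 standing in for a typical negotiated PDU size:

```python
read_items = [
    ReadItem(area=int(Area.DB), db_number=1, byte_offset=off, bit_offset=0, byte_length=4, index=i)
    for i, off in enumerate((0, 4, 8, 100))
]
blocks = merge_items(sort_items(read_items), max_gap=5, max_block_size=200)
packets = packetize(blocks, 240)
# The contiguous reads at offsets 0/4/8 collapse into one block while the
# read at offset 100 stays separate; both blocks typically fit in a single
# packet, so one round-trip replaces four.
print(len(packets), [len(p.blocks) for p in packets])
```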
 | 802 | + def _map_area_int(self, area_int: int) -> Area: |
 | 803 | + """Map an integer area value to the Area enum (raises ValueError if unknown).""" |
 | 804 | + return Area(area_int) |
| 805 | + |
717 | 806 | def write_multi_vars(self, items: Union[List[dict[str, Any]], List[S7DataItem]]) -> int: |
718 | 807 | """ |
719 | 808 | Write multiple variables in a single request. |
|