|
1 | 1 | from amaranth import * |
2 | | -from amaranth.lib import enum, data, wiring, stream, io |
| 2 | +from amaranth.lib import enum, data, wiring, stream, io, meta |
3 | 3 | from amaranth.lib.wiring import In, Out |
4 | 4 |
|
5 | | -from glasgow.gateware.ports import PortGroup |
| 5 | +from typing import List, Dict, Tuple, Self |
| 6 | +from collections.abc import Mapping |
| 7 | +from pprint import pformat |
6 | 8 |
|
| 9 | +__all__ = ["IOStreamer", "IOClocker", "IOShape", "PortAnnotation", "PortGroup"] |
7 | 10 |
|
8 | | -__all__ = ["IOStreamer"] |
| 11 | +class PortGroup: |
| 12 | + """Group of Amaranth library I/O ports. |
| 13 | +
|
| 14 | + This object is a stand-in for the object returned by the Amaranth :py:`platform.request()` |
| 15 | + function, as expected by the I/O cores. |
| 16 | + """ |
| 17 | + |
| 18 | + def __init__(self, **kwargs): |
| 19 | + for name, port in kwargs.items(): |
| 20 | + setattr(self, name, port) |
| 21 | + |
| 22 | + def __getitem__(self, key): |
| 23 | + return self.__dict__[key] |
| 24 | + |
| 25 | + def __setattr__(self, name, value): |
| 26 | + if not name.startswith("_"): |
| 27 | + assert value is None or isinstance(value, (io.PortLike, PortGroup)) |
| 28 | + object.__setattr__(self, name, value) |
| 29 | + |
| 30 | + def __repr__(self): |
| 31 | + attrs = [] |
| 32 | + for name, value in self.__dict__.items(): |
| 33 | + if not name.startswith('_'): |
| 34 | + attrs.append(f"{name}={value!r}") |
| 35 | + return f"{self.__class__.__name__}({', '.join(attrs)})" |
9 | 36 |
|
10 | 37 |
|
11 | 38 | def _filter_ioshape(direction, ioshape): |
@@ -65,7 +92,98 @@ def elaborate(self, platform): |
65 | 92 |
|
66 | 93 | return m |
67 | 94 |
|
| 95 | +class PortAnnotation(meta.Annotation): |
| 96 | + schema = { |
| 97 | + "$schema": "https://json-schema.org/draft/2020-12/schema", |
| 98 | + "$id": "https://api.chipflow.com/schema/chipflow-lib/0/port-layout.json", |
| 99 | + "type": "object", |
| 100 | + "properties": { |
| 101 | + "ports": { |
| 102 | + "description": "A list of ports required for a component, their directions and widths", |
| 103 | + "type": "object", |
| 104 | + "patternProperties": { |
| 105 | + "^.+$": { |
| 106 | + "type": "object", |
| 107 | + "properties": { |
| 108 | + "direction": {"enum": ["io", "i", "o"] }, |
| 109 | + "width": {"type": "number", "minimum": 1} |
| 110 | + } |
| 111 | + }, |
| 112 | + }, |
| 113 | + }, |
| 114 | + }, |
| 115 | + "requiredProperties": [ |
| 116 | + "ports", |
| 117 | + ], |
| 118 | + } |
| 119 | + |
| 120 | + def __init__(self, origin): |
| 121 | + self._origin = origin |
| 122 | + |
| 123 | + @property |
| 124 | + def origin(self): |
| 125 | + return self._origin |
| 126 | + |
| 127 | + def as_json(self): |
| 128 | + instance = { |
| 129 | + "ports": self._origin.ioshape, |
| 130 | + } |
| 131 | + # Validating the value returned by `as_json()` ensures its conformance. |
| 132 | + self.validate(instance) |
| 133 | + return instance |
| 134 | + |
| 135 | + |
| 136 | +class IOShapeException(Exception): |
| 137 | + """ |
| 138 | + This Exception is thrown whenever an :class`IOShape` and :class`PortLike` are mismatched. |
| 139 | +
|
| 140 | + """ |
| 141 | + |
68 | 142 |
|
| 143 | +class IOShape(Dict[str, Tuple[str, int]]): |
| 144 | + def check_ports(self, ports) -> PortGroup: |
| 145 | + print(f"IOShape.check_ports: {self}, {pformat(ports)}") |
| 146 | + |
| 147 | + def check_port(name, direction, width, port: io.PortLike): |
| 148 | + try: |
| 149 | + assert port.direction & io.Direction(direction) |
| 150 | + assert len(port) == width |
| 151 | + except Exception as e: |
| 152 | + raise IOShapeException(f"Port mismatch for {name}: shape = {direction}:{width}, port={port}: {e}") |
| 153 | + |
| 154 | + pg = PortGroup() |
| 155 | + |
| 156 | + if type(ports) == PortGroup: |
| 157 | + items = ports.__dict__.items() |
| 158 | + else: |
| 159 | + items = ports.items() |
| 160 | + |
| 161 | + for name, port in items: |
| 162 | + print (f"checking {name} {port}") |
| 163 | + if name not in self: |
| 164 | + print(f"splitting: {len(port)}") |
| 165 | + if len(port) > 1: |
| 166 | + for i in range(len(port)): |
| 167 | + pin = f"{name}{i}" |
| 168 | + print(f"checking: {i}, {pin}") |
| 169 | + check_port(pin, self[pin][0], 1, port[i]) |
| 170 | + setattr(pg, pin, port[i]) |
| 171 | + else: |
| 172 | + check_port(name, self[name][0], self[name][1], port) |
| 173 | + setattr(pg, name, port) |
| 174 | + |
| 175 | + return pg |
| 176 | + |
| 177 | +class PortSignature(wiring.Signature): |
| 178 | + def __init__(self, members): |
| 179 | + super().__init__(members) |
| 180 | + |
| 181 | + def annotations(self, obj, /): |
| 182 | + return wiring.Signature.annotations(self, obj) + (PortAnnotation(self),) |
| 183 | + |
| 184 | + def __repr__(self): |
| 185 | + return f"PortSignature({self._ioshape})" |
| 186 | + |
69 | 187 | class IOStreamer(wiring.Component): |
70 | 188 | """I/O buffer to stream adapter. |
71 | 189 |
|
|
0 commit comments