Skip to content

Commit d00c480

Browse files
committed
Updates
1 parent a417a4a commit d00c480

File tree

1 file changed

+14
-36
lines changed

1 file changed

+14
-36
lines changed

hazelcast/internal/asyncio_reactor.py

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,7 @@ async def create_and_connect(
9090
return this
9191

9292
def _create_protocol(self):
93-
return HazelcastProtocol(
94-
self._loop,
95-
self._reader,
96-
self._address,
97-
self._update_read_time,
98-
self._update_write_time,
99-
self._update_sent,
100-
self._update_received,
101-
)
93+
return HazelcastProtocol(self)
10294

10395
async def _create_connection(self):
10496
loop = self._loop
@@ -133,23 +125,8 @@ class HazelcastProtocol(asyncio.BufferedProtocol):
133125

134126
PROTOCOL_STARTER = b"CP2"
135127

136-
def __init__(
137-
self,
138-
loop: AbstractEventLoop,
139-
reader,
140-
address,
141-
update_read_time,
142-
update_write_time,
143-
update_sent,
144-
update_received,
145-
):
146-
self._loop = loop
147-
self._reader = reader
148-
self._address = address
149-
self._update_read_time = update_read_time
150-
self._update_write_time = update_write_time
151-
self._update_sent = update_sent
152-
self._update_received = update_received
128+
def __init__(self, conn: AsyncioConnection):
129+
self._conn = conn
153130
self._transport: transports.BaseTransport | None = None
154131
self.start_time: float | None = None
155132
self._write_buf = io.BytesIO()
@@ -161,11 +138,12 @@ def connection_made(self, transport: transports.BaseTransport):
161138
self._transport = transport
162139
self.start_time = time.time()
163140
self.write(self.PROTOCOL_STARTER)
164-
_logger.debug("Connected to %s", self._address)
165-
self._loop.call_soon(self._write_loop)
141+
_logger.debug("Connected to %s", self._conn._address)
142+
self._conn._loop.call_soon(self._write_loop)
166143

167144
def connection_lost(self, exc):
168145
self._alive = False
146+
self._conn._loop.create_task(self._conn.close_connection(str(exc), None))
169147
return False
170148

171149
def close(self):
@@ -183,11 +161,11 @@ def get_buffer(self, sizehint):
183161

184162
def buffer_updated(self, nbytes):
185163
recv_bytes = self._recv_buf[:nbytes]
186-
self._update_read_time(time.time())
187-
self._update_received(nbytes)
188-
self._reader.read(recv_bytes)
189-
if self._reader.length:
190-
self._reader.process()
164+
self._conn._update_read_time(time.time())
165+
self._conn._update_received(nbytes)
166+
self._conn._reader.read(recv_bytes)
167+
if self._conn._reader.length:
168+
self._conn._reader.process()
191169

192170
def eof_received(self):
193171
self._alive = False
@@ -197,11 +175,11 @@ def _do_write(self):
197175
return
198176
buf_bytes = self._write_buf.getvalue()
199177
self._transport.write(buf_bytes[: self._write_buf_size])
200-
self._update_write_time(time.time())
201-
self._update_sent(self._write_buf_size)
178+
self._conn._update_write_time(time.time())
179+
self._conn._update_sent(self._write_buf_size)
202180
self._write_buf.seek(0)
203181
self._write_buf_size = 0
204182

205183
def _write_loop(self):
206184
self._do_write()
207-
return self._loop.call_later(0.01, self._write_loop)
185+
return self._conn._loop.call_later(0.01, self._write_loop)

0 commit comments

Comments
 (0)