Skip to content

Commit 8485678

Browse files
DataSubscriber code cleanup
1 parent a1b8e69 commit 8485678

File tree

1 file changed

+37
-50
lines changed

1 file changed

+37
-50
lines changed

src/sttp/transport/datasubscriber.py

Lines changed: 37 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
from ..ticks import Ticks
3838
from ..version import Version
3939
from typing import List, Callable, Optional
40-
from datetime import datetime
4140
from time import time
4241
from uuid import UUID
4342
from threading import Lock, Thread, Event
@@ -315,7 +314,7 @@ def connect(self, hostname: str, port: np.uint16) -> Optional[Exception]:
315314
# User requests to connection are not an auto-reconnect attempt
316315
return self._connect(hostname, port, False)
317316

318-
def _connect(self, hostname: str, port: np.uint16, autoreconnecting: bool) -> Optional[Exception]:
317+
def _connect(self, hostname: str, port: np.uint16, autoreconnecting: bool) -> Optional[Exception]: # sourcery skip: extract-method
319318
if self._connected:
320319
return RuntimeError("subscriber is already connected; disconnect first")
321320

@@ -390,17 +389,18 @@ def subscribe(self) -> Optional[Exception]:
390389
self._total_measurementsreceived = np.uint64(0)
391390

392391
subscription = self._subscription
393-
connectionbuilder: List[str] = []
394-
395-
connectionbuilder.append(f"throttled={subscription.throttled}")
396-
connectionbuilder.append(f";publishInterval={subscription.publishinterval:.6f}")
397-
connectionbuilder.append(f";includeTime={subscription.includetime}")
398-
connectionbuilder.append(f";processingInterval={subscription.processinginterval}")
399-
connectionbuilder.append(f";useMillisecondResolution={subscription.use_millisecondresolution}")
400-
connectionbuilder.append(f";requestNaNValueFilter={subscription.request_nanvaluefilter}")
401-
connectionbuilder.append(f";assemblyInfo={{source={self.sttp_sourceinfo}")
402-
connectionbuilder.append(f";version={self.sttp_versioninfo}")
403-
connectionbuilder.append(f";updatedOn={self.sttp_updatedoninfo}}}")
392+
393+
connectionbuilder: List[str] = [
394+
f"throttled={subscription.throttled}",
395+
f";publishInterval={subscription.publishinterval:.6f}",
396+
f";includeTime={subscription.includetime}",
397+
f";processingInterval={subscription.processinginterval}",
398+
f";useMillisecondResolution={subscription.use_millisecondresolution}",
399+
f";requestNaNValueFilter={subscription.request_nanvaluefilter}",
400+
f";assemblyInfo={{source={self.sttp_sourceinfo}",
401+
f";version={self.sttp_versioninfo}",
402+
f";updatedOn={self.sttp_updatedoninfo}}}"
403+
]
404404

405405
if len(subscription.filterexpression) > 0:
406406
connectionbuilder.append(f";filterExpression={{{subscription.filterexpression}}}")
@@ -443,7 +443,7 @@ def subscribe(self) -> Optional[Exception]:
443443

444444
# Reset TSSC decompressor on successful (re)subscription
445445
self._tssc_lastoosreport_mutex.acquire()
446-
self._tssc_lastoosreport = datetime.utcnow()
446+
self._tssc_lastoosreport = time()
447447
self._tssc_lastoosreport_mutex.release()
448448
self._tssc_resetrequested = True
449449

@@ -488,7 +488,7 @@ def disconnect(self):
488488
def _disconnect(self, jointhread: bool, autoreconnecting: bool):
489489
# Check if disconnect thread is running or subscriber has already disconnected
490490
if self._disconnecting:
491-
if not autoreconnecting and self._disconnecting and not self._disconnected:
491+
if not autoreconnecting and not self._disconnected:
492492
self._connector.cancel()
493493

494494
self._disconnectthread_mutex.acquire()
@@ -516,7 +516,7 @@ def _disconnect(self, jointhread: bool, autoreconnecting: bool):
516516
if jointhread and disconnectthread.is_alive():
517517
disconnectthread.join()
518518

519-
def _run_disconnectthread(self, autoreconnecting: bool):
519+
def _run_disconnectthread(self, autoreconnecting: bool): # sourcery skip: extract-method
520520
# Let any pending connect operation complete before disconnect - prevents destruction disconnect before connection is completed
521521
if not autoreconnecting:
522522
self._connector.cancel()
@@ -644,8 +644,8 @@ def _read_payloadheader(self):
644644
# subscription, data packets get handled from this thread.
645645
def _run_datachannel_responsethread(self):
646646
reader = StreamEncoder(
647-
(lambda length: self._datachannel_socket.recv(length)),
648-
(lambda buffer: self._datachannel_socket.send(buffer)))
647+
(lambda length: self._datachannel_socket.recvfrom(length)),
648+
(lambda buffer: self._datachannel_socket.sendto(buffer)))
649649

650650
buffer = bytearray(MAXPACKET_SIZE)
651651

@@ -658,12 +658,12 @@ def _run_datachannel_responsethread(self):
658658

659659
# Process response
660660
self._process_serverresponse(bytes(self._readbuffer[:length]))
661-
except:
661+
except Exception:
662662
# Read error, connection may have been closed by peer; terminate connection
663663
self._dispatch_connectionterminated()
664664
return
665665

666-
def _process_serverresponse(self, buffer: bytes):
666+
def _process_serverresponse(self, buffer: bytes): # sourcery skip: remove-pass-elif
667667
if self._disconnecting:
668668
return
669669

@@ -705,10 +705,10 @@ def _handle_succeeded(self, commandcode: ServerCommand, data: bytes):
705705

706706
if commandcode == ServerCommand.METADATAREFRESH:
707707
self._handle_metadatarefresh(data)
708-
elif commandcode == ServerCommand.SUBSCRIBE or commandcode == ServerCommand.UNSUBSCRIBE:
708+
elif commandcode in [ServerCommand.SUBSCRIBE, ServerCommand.UNSUBSCRIBE]:
709709
self._subscribed = commandcode == ServerCommand.SUBSCRIBE
710710
has_response_message = True
711-
elif commandcode == ServerCommand.ROTATECIPHERKEYS or commandcode == ServerCommand.UPDATEPROCESSINGINTERVAL:
711+
elif commandcode in [ServerCommand.ROTATECIPHERKEYS, ServerCommand.UPDATEPROCESSINGINTERVAL]:
712712
has_response_message = True
713713
else:
714714
# If we don't know what the message is, we can't interpret
@@ -721,10 +721,9 @@ def _handle_succeeded(self, commandcode: ServerCommand, data: bytes):
721721

722722
# Each of these responses come with a message that will
723723
# be delivered to the user via the status message callback.
724-
message: List[str] = []
725-
message.append(f"Received success code in response to server command: {normalize_enumname(commandcode)}")
724+
message: List[str] = [f"Received success code in response to server command: {normalize_enumname(commandcode)}"]
726725

727-
if len(data) > 0:
726+
if data: # len > 0
728727
message.append("\n")
729728
message.append(self.decodestr(data))
730729

@@ -738,13 +737,13 @@ def _handle_failed(self, commandcode: ServerCommand, data: bytes):
738737
else:
739738
message.append(f"Received failure code in response to server command: {normalize_enumname(commandcode)}")
740739

741-
if len(data) > 0:
742-
if len(message) > 0:
740+
if data: # len > 0
741+
if message: # len > 0
743742
message.append("\n")
744743

745744
message.append(self.decodestr(data))
746745

747-
if len(message) > 0:
746+
if message: # len > 0
748747
self._dispatch_errormessage("".join(message))
749748

750749
def _handle_metadatarefresh(self, data: bytes):
@@ -775,7 +774,7 @@ def _handle_processingcomplete(self, data: bytes):
775774
self.processingcomplete_callback(self.decodestr(data))
776775

777776
def _handle_update_signalindexcache(self, data: bytes):
778-
if len(data) == 0:
777+
if not data: # len == 0
779778
return
780779

781780
version = self.version
@@ -814,7 +813,7 @@ def _handle_update_signalindexcache(self, data: bytes):
814813
self.subscriptionupdated_callback(signalindexcache)
815814

816815
def _handle_update_basetimes(self, data: bytes):
817-
if len(data) == 0:
816+
if not data: # len == 0
818817
return
819818

820819
self._timeindex = 0 if BigEndian.to_uint32(data) == 0 else 1
@@ -887,10 +886,7 @@ def _handle_datapacket(self, data: bytes):
887886
if self._key_ivs is not None:
888887
# Get a local copy keyIVs - these can change at any time
889888
key_ivs = self._key_ivs
890-
cipherindex = 0
891-
892-
if datapacketflags & DataPacketFlags.CIPHERINDEX > 0:
893-
cipherindex = 1
889+
cipherindex = 1 if datapacketflags & DataPacketFlags.CIPHERINDEX > 0 else 0
894890

895891
try:
896892
cipher = AES.new(key_ivs[cipherindex][KEY_INDEX], AES.MODE_CBC, key_ivs[cipherindex][IV_INDEX])
@@ -901,10 +897,7 @@ def _handle_datapacket(self, data: bytes):
901897
return
902898

903899
count = BigEndian.to_uint32(data)
904-
cacheindex = 0
905-
906-
if datapacketflags & DataPacketFlags.CACHEINDEX > 0:
907-
cacheindex = 1
900+
cacheindex = 1 if datapacketflags & DataPacketFlags.CACHEINDEX > 0 else 0
908901

909902
self._signalindexcache_mutex.acquire()
910903
signalindexcache = self._signalindexcache[cacheindex]
@@ -943,7 +936,7 @@ def _parse_compact_measurements(self, signalindexcache: SignalIndexCache, data:
943936
includetime = self.subscription.includetime
944937
index = 0
945938

946-
for i in range(count):
939+
for _ in range(count):
947940
# Deserialize compact measurement format
948941
measurement = CompactMeasurement(signalindexcache, includetime, usemillisecondresolution, self._basetimeoffsets)
949942
(bytesdecoded, err) = measurement.decode(data[index:])
@@ -958,24 +951,18 @@ def _parse_compact_measurements(self, signalindexcache: SignalIndexCache, data:
958951

959952
return measurements
960953

961-
def _handle_bufferblock(self, data: bytes): # sourcery skip: low-code-quality
954+
def _handle_bufferblock(self, data: bytes): # sourcery skip: low-code-quality, extract-method
962955
# Buffer block received - wrap as a BufferBlockMeasurement and expose back to consumer
963956
sequencenumber = BigEndian.to_uint32(data)
964957
buffercacheindex = int(sequencenumber - self._bufferblock_expectedsequencenumber)
965-
signalindexcacheindex = 0
966-
967-
if self.version > 1 and data[4:][0] > 0:
968-
signalindexcacheindex = 1
969958

970959
# Check if this buffer block has already been processed (e.g., mistaken retransmission due to timeout)
971960
if buffercacheindex >= 0 and (buffercacheindex >= len(self._bufferblock_cache) and self._bufferblock_cache[buffercacheindex].buffer is None):
972961
# Send confirmation that buffer block is received
973962
self.send_servercommand(ServerCommand.CONFIRMBUFFERBLOCK, data[:4])
974963

975-
if self.version > 1:
976-
data = data[5:]
977-
else:
978-
data = data[4:]
964+
signalindexcacheindex = 1 if self.version > 1 and data[4:][0] > 0 else 0
965+
data = data[5:] if self.version > 1 else data[4:]
979966

980967
# Get measurement key from signal index cache
981968
signalindex = BigEndian.to_uint32(data)
@@ -1016,7 +1003,7 @@ def _handle_bufferblock(self, data: bytes): # sourcery skip: low-code-quality
10161003
# Ensure that the list has at least as many elements as it needs to cache this measurement.
10171004
# This edge case handles possible dropouts and/or out of order packet deliver when data
10181005
# transport is UDP - this use case is not expected when using a TCP only connection.
1019-
for i in range(len(self._bufferblock_cache), buffercacheindex + 1):
1006+
for _ in range(len(self._bufferblock_cache), buffercacheindex + 1):
10201007
self._bufferblock_cache.append(BufferBlock())
10211008

10221009
# Insert this buffer block into the proper location in the list
@@ -1057,7 +1044,7 @@ def send_servercommand(self, commandcode: ServerCommand, data: bytes = None):
10571044
# Insert command code
10581045
self._writebuffer[4] = commandcode
10591046

1060-
if data is not None and len(data) > 0:
1047+
if data is not None and data: # len > 0
10611048
self._writebuffer[5:commandbuffersize] = data
10621049

10631050
if commandcode == ServerCommand.METADATAREFRESH:

0 commit comments

Comments
 (0)