-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathminimalReader.py
More file actions
75 lines (57 loc) · 2.03 KB
/
minimalReader.py
File metadata and controls
75 lines (57 loc) · 2.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import logging
import select
import time
from imdclient.IMDClient import IMDClient
from .utils import parse_host_port
# Module-level logger. It is registered under the "imdclient.IMDClient" name
# rather than this module's own name, so its records go through that logger.
logger = logging.getLogger("imdclient.IMDClient")
class minimalReader:
    """
    Minimal reader for testing purposes

    Wraps an :class:`IMDClient`, keeps the most recently read frame in
    ``imd_frame`` and can accumulate every frame read in ``trajectory``.

    Parameters
    ----------
    filename : str
        a string of the form "host:port" where host is the hostname
        or IP address of the listening MD engine server and port
        is the port number.
    n_atoms : int (optional)
        number of atoms in the system. Stored on the reader and passed
        through unchanged to :class:`IMDClient` (``None`` by default).
        don't set this unless you know what you're doing.
        NOTE(review): earlier documentation said this defaults to the
        number of atoms in the topology; this class has no topology, so
        confirm how :class:`IMDClient` treats ``None``.
    process_stream : bool (optional)
        if True, the constructor reads frames until the client raises
        ``EOFError`` and appends each one to ``trajectory``. defaults to
        False, in which case the constructor reads no frames.
    kwargs : dict (optional)
        keyword arguments passed to the constructed :class:`IMDClient`

    Attributes
    ----------
    imd_frame
        the frame most recently returned by the client, or None before
        the first successful read.
    trajectory : list or None
        frames added so far; stays None until the first frame is added.
    n_atoms : int or None
        the ``n_atoms`` argument as given.
    """

    def __init__(self, filename, n_atoms=None, process_stream=False, **kwargs):
        self.imd_frame = None
        # a trajectory of imd frames; the list is created on first append
        self.trajectory = None
        self.n_atoms = n_atoms
        # Index of the last frame read; -1 means nothing has been read yet.
        self._frame = -1

        host, port = parse_host_port(filename)

        # This starts the simulation
        self._imdclient = IMDClient(host, port, n_atoms, **kwargs)

        # The returned session info is not used by this reader, so it is
        # not bound to a name. The call itself is kept.
        # NOTE(review): presumably it waits for / validates the session
        # handshake -- confirm against IMDClient before removing it.
        self._imdclient.get_imdsessioninfo()

        if process_stream:
            self._process_stream()

    def _read_next_frame(self):
        """Fetch the next frame from the client and make it current.

        Raises
        ------
        EOFError
            propagated unchanged from ``get_imdframe``. In that case
            neither ``_frame`` nor ``imd_frame`` is modified.
        """
        # No try/except here: catching EOFError only to re-raise it adds
        # nothing, the exception reaches the caller either way.
        imd_frame = self._imdclient.get_imdframe()

        self._frame += 1
        self.imd_frame = imd_frame

        logger.debug(f"minimalReader: Loaded frame {self._frame}")

    def _add_frame_to_trajectory(self):
        """Append the current ``imd_frame`` to ``trajectory``."""
        if self.trajectory is None:
            self.trajectory = []
        # The frame object is stored by reference, not copied.
        # NOTE(review): if the client reuses frame objects between reads,
        # stored entries would alias each other -- confirm.
        self.trajectory.append(self.imd_frame)
        logger.debug(
            f"minimalReader: Added frame {self._frame} to trajectory"
        )

    def _process_stream(self):
        """Read frames until ``EOFError`` is raised, storing each one.

        If the very first read raises ``EOFError``, ``trajectory`` is left
        as None.
        """
        while True:
            # Only the read is expected to signal end-of-stream, so only
            # the read is guarded.
            try:
                self._read_next_frame()
            except EOFError:
                break
            self._add_frame_to_trajectory()