|
2 | 2 |
|
3 | 3 | import io |
4 | 4 | import os |
5 | | -import six |
6 | 5 | import subprocess |
7 | 6 | import time |
8 | 7 |
|
9 | | -try: |
10 | | - import psutil |
11 | | -except ImportError: |
12 | | - psutil = None |
13 | | - |
14 | 8 | from shutil import rmtree |
15 | | -from six import raise_from |
| 9 | +from six import raise_from, iteritems |
16 | 10 | from tempfile import mkstemp, mkdtemp |
17 | 11 |
|
18 | 12 | from .enums import NodeStatus, ProcessType |
|
70 | 64 | from .backup import NodeBackup |
71 | 65 |
|
72 | 66 |
|
| 67 | +class ProcessProxy(object): |
| 68 | + """ |
| 69 | + Wrapper for psutil.Process |
| 70 | +
|
| 71 | + Attributes: |
| 72 | + process: wrapped psutill.Process object |
| 73 | + ptype: instance of ProcessType |
| 74 | + """ |
| 75 | + |
| 76 | + def __init__(self, process): |
| 77 | + self.process = process |
| 78 | + self.ptype = ProcessType.from_process(process) |
| 79 | + |
| 80 | + def __getattr__(self, name): |
| 81 | + return getattr(self.process, name) |
| 82 | + |
| 83 | + def __str__(self): |
| 84 | + pid = self.process.pid |
| 85 | + cmdline = ' '.join(self.process.cmdline()).strip() |
| 86 | + return '{} [{}]'.format(cmdline, pid) |
| 87 | + |
| 88 | + |
73 | 89 | class PostgresNode(object): |
74 | 90 | def __init__(self, name=None, port=None, base_dir=None): |
75 | 91 | """ |
@@ -122,11 +138,88 @@ def __exit__(self, type, value, traceback): |
122 | 138 |
|
123 | 139 | @property |
124 | 140 | def pid(self): |
125 | | - return self.get_main_pid() |
| 141 | + """ |
| 142 | + Return postmaster's PID if node is running, else 0. |
| 143 | + """ |
| 144 | + |
| 145 | + if self.status(): |
| 146 | + pid_file = os.path.join(self.data_dir, PG_PID_FILE) |
| 147 | + with io.open(pid_file) as f: |
| 148 | + return int(f.readline()) |
| 149 | + |
| 150 | + # for clarity |
| 151 | + return 0 |
126 | 152 |
|
127 | 153 | @property |
128 | 154 | def auxiliary_pids(self): |
129 | | - return self.get_auxiliary_pids() |
| 155 | + """ |
| 156 | + Returns a dict of { ProcessType : PID }. |
| 157 | + """ |
| 158 | + |
| 159 | + result = {} |
| 160 | + |
| 161 | + for process in self.auxiliary_processes: |
| 162 | + if process.ptype not in result: |
| 163 | + result[process.ptype] = [] |
| 164 | + |
| 165 | + result[process.ptype].append(process.pid) |
| 166 | + |
| 167 | + return result |
| 168 | + |
| 169 | + @property |
| 170 | + def auxiliary_processes(self): |
| 171 | + """ |
| 172 | + Returns a list of auxiliary processes. |
| 173 | + Each process is represented by ProcessProxy object. |
| 174 | + """ |
| 175 | + |
| 176 | + def is_aux(process): |
| 177 | + return process.ptype != ProcessType.Unknown |
| 178 | + |
| 179 | + return list(filter(is_aux, self.child_processes)) |
| 180 | + |
| 181 | + @property |
| 182 | + def child_processes(self): |
| 183 | + """ |
| 184 | + Returns a list of all child processes. |
| 185 | + Each process is represented by ProcessProxy object. |
| 186 | + """ |
| 187 | + |
| 188 | + try: |
| 189 | + import psutil |
| 190 | + except ImportError: |
| 191 | + raise TestgresException("psutil module is not installed") |
| 192 | + |
| 193 | + # get a list of postmaster's children |
| 194 | + children = psutil.Process(self.pid).children() |
| 195 | + |
| 196 | + return [ProcessProxy(p) for p in children] |
| 197 | + |
| 198 | + @property |
| 199 | + def source_walsender(self): |
| 200 | + """ |
| 201 | + Returns master's walsender feeding this replica. |
| 202 | + """ |
| 203 | + |
| 204 | + sql = """ |
| 205 | + select pid |
| 206 | + from pg_catalog.pg_stat_replication |
| 207 | + where application_name = $1 |
| 208 | + """ |
| 209 | + |
| 210 | + if not self.master: |
| 211 | + raise TestgresException("Node doesn't have a master") |
| 212 | + |
| 213 | + # master should be on the same host |
| 214 | + assert self.master.host == self.host |
| 215 | + |
| 216 | + with self.master.connect() as con: |
| 217 | + for row in con.execute(sql, self.name): |
| 218 | + for child in self.master.auxiliary_processes: |
| 219 | + if child.pid == int(row[0]): |
| 220 | + return child |
| 221 | + |
| 222 | + raise QueryException("Master doesn't send WAL to {}", self.name) |
130 | 223 |
|
131 | 224 | @property |
132 | 225 | def master(self): |
@@ -427,98 +520,6 @@ def status(self): |
427 | 520 | elif e.exit_code == 4: |
428 | 521 | return NodeStatus.Uninitialized |
429 | 522 |
|
430 | | - def get_main_pid(self): |
431 | | - """ |
432 | | - Return postmaster's PID if node is running, else 0. |
433 | | - """ |
434 | | - |
435 | | - if self.status(): |
436 | | - pid_file = os.path.join(self.data_dir, PG_PID_FILE) |
437 | | - with io.open(pid_file) as f: |
438 | | - return int(f.readline()) |
439 | | - |
440 | | - # for clarity |
441 | | - return 0 |
442 | | - |
443 | | - def get_child_processes(self): |
444 | | - ''' Returns child processes for this node ''' |
445 | | - |
446 | | - if psutil is None: |
447 | | - raise TestgresException("psutil module is not installed") |
448 | | - |
449 | | - try: |
450 | | - postmaster = psutil.Process(self.pid) |
451 | | - except psutil.NoSuchProcess: |
452 | | - return None |
453 | | - |
454 | | - return postmaster.children(recursive=True) |
455 | | - |
456 | | - def get_auxiliary_pids(self): |
457 | | - ''' Returns dict with pids of auxiliary processes ''' |
458 | | - |
459 | | - alternative_names = { |
460 | | - ProcessType.LogicalReplicationLauncher: [ |
461 | | - 'postgres: bgworker: logical replication launcher' |
462 | | - ], |
463 | | - ProcessType.BackgroundWriter: [ |
464 | | - 'postgres: writer', |
465 | | - ], |
466 | | - ProcessType.WalWriter: [ |
467 | | - 'postgres: wal writer', |
468 | | - ], |
469 | | - ProcessType.WalReceiver: [ |
470 | | - 'postgres: wal receiver', |
471 | | - ], |
472 | | - } |
473 | | - |
474 | | - children = self.get_child_processes() |
475 | | - if children is None: |
476 | | - return None |
477 | | - |
478 | | - result = {} |
479 | | - for child in children: |
480 | | - line = ' '.join(child.cmdline()) |
481 | | - for ptype in ProcessType: |
482 | | - if ptype == ProcessType.WalSender \ |
483 | | - and (line.startswith(ptype.value) or |
484 | | - line.startswith('postgres: wal sender')): |
485 | | - result.setdefault(ptype, []) |
486 | | - result[ptype].append(child.pid) |
487 | | - break |
488 | | - elif line.startswith(ptype.value): |
489 | | - result[ptype] = child.pid |
490 | | - break |
491 | | - elif ptype in alternative_names: |
492 | | - names = alternative_names[ptype] |
493 | | - for name in names: |
494 | | - if line.startswith(name): |
495 | | - result[ptype] = child.pid |
496 | | - break |
497 | | - |
498 | | - return result |
499 | | - |
500 | | - def get_walsender_pid(self): |
501 | | - ''' Returns pid of according walsender for replica ''' |
502 | | - |
503 | | - if not self._master: |
504 | | - raise TestgresException("This node is not a replica") |
505 | | - |
506 | | - children = self._master.get_child_processes() |
507 | | - if children is None: |
508 | | - return None |
509 | | - |
510 | | - sql = 'select application_name, client_port from pg_stat_replication' |
511 | | - for name, client_port in self._master.execute(sql): |
512 | | - if name == self.name: |
513 | | - for child in children: |
514 | | - line = ' '.join(child.cmdline()) |
515 | | - if (line.startswith(ProcessType.WalSender.value) or |
516 | | - line.startswith('postgres: wal sender')) and \ |
517 | | - str(client_port) in line: |
518 | | - return child.pid |
519 | | - |
520 | | - return None |
521 | | - |
522 | 523 | def get_control_data(self): |
523 | 524 | """ |
524 | 525 | Return contents of pg_control file. |
@@ -1079,7 +1080,7 @@ def pgbench_run(self, |
1079 | 1080 | "-U", username, |
1080 | 1081 | ] + options |
1081 | 1082 |
|
1082 | | - for key, value in six.iteritems(kwargs): |
| 1083 | + for key, value in iteritems(kwargs): |
1083 | 1084 | # rename keys for pgbench |
1084 | 1085 | key = key.replace('_', '-') |
1085 | 1086 |
|
|
0 commit comments