14
14
15
15
import os
16
16
import socket
17
+ from typing import Dict , List
17
18
18
19
from pytorch_lightning import _logger as log
19
20
from pytorch_lightning .plugins .environments import ClusterEnvironment
21
+ from pytorch_lightning .utilities import rank_zero_deprecation
22
+ from pytorch_lightning .utilities .cloud_io import get_filesystem
20
23
21
24
22
25
class LSFEnvironment (ClusterEnvironment ):
@@ -25,128 +28,161 @@ class LSFEnvironment(ClusterEnvironment):
25
28
It is expected that any execution using this ClusterEnvironment was executed
26
29
using the Job Step Manager i.e. ``jsrun``.
27
30
28
- This plugin expects the following environment variables.
31
+ This plugin expects the following environment variables:
29
32
30
- LSB_JOBID:
31
- The LSF assigned job ID
33
+ ``LSB_JOBID``
34
+ The LSF assigned job ID
32
35
33
- LSB_HOSTS:
34
- The hosts used in the job. This string is expected to have the format "batch <rank_0_host> ...."
36
+ ``LSB_DJOB_RANKFILE``
37
+ The OpenMPI compatible rank file for the LSF job
35
38
36
- JSM_NAMESPACE_LOCAL_RANK:
37
- The node local rank for the task. This environment variable is set by jsrun
39
+ ``JSM_NAMESPACE_LOCAL_RANK``
40
+ The node local rank for the task. This environment variable is set by ``jsrun``
38
41
39
- JSM_NAMESPACE_SIZE:
40
- The world size for the task. This environment variable is set by jsrun
41
- """
42
+ ``JSM_NAMESPACE_SIZE``
43
+ The world size for the task. This environment variable is set by ``jsrun``
42
44
43
- def __init__ (self ):
44
- self ._master_address = self ._get_master_address ()
45
- self ._master_port = self ._get_master_port ()
46
- log .debug (f"MASTER_ADDR: { self ._master_address } " )
47
- log .debug (f"MASTER_PORT: { self ._master_port } " )
45
+ ``JSM_NAMESPACE_RANK``
46
+ The global rank for the task. This environment variable is set by ``jsrun``
47
+ """
48
48
49
- @staticmethod
50
- def is_using_lsf () -> bool :
51
- """Returns ``True`` if the current process was launched using the jsrun command."""
52
- required_env_vars = ("LSB_JOBID" , "LSB_HOSTS" , "JSM_NAMESPACE_LOCAL_RANK" , "JSM_NAMESPACE_SIZE" )
53
- return all (v in os .environ for v in required_env_vars )
49
+ def __init__ (self ) -> None :
50
+ super ().__init__ ()
51
+ # TODO: remove in 1.7
52
+ if hasattr (self , "is_using_lsf" ) and callable (self .is_using_lsf ):
53
+ rank_zero_deprecation (
54
+ f"`{ self .__class__ .__name__ } .is_using_lsf` has been deprecated in v1.6 and will be removed in v1.7."
55
+ " Implement the static method `detect()` instead (do not forget to add the `@staticmethod` decorator)."
56
+ )
57
+ self ._main_address = self ._get_main_address ()
58
+ self ._main_port = self ._get_main_port ()
59
+ self ._node_rank = self ._get_node_rank ()
60
+ self ._set_init_progress_group_env_vars ()
61
+
62
+ def _set_init_progress_group_env_vars (self ) -> None :
63
+ # set environment variables needed for initializing torch distributed process group
64
+ os .environ ["MASTER_ADDR" ] = str (self ._main_address )
65
+ log .debug (f"MASTER_ADDR: { os .environ ['MASTER_ADDR' ]} " )
66
+ os .environ ["MASTER_PORT" ] = str (self ._main_port )
67
+ log .debug (f"MASTER_PORT: { os .environ ['MASTER_PORT' ]} " )
54
68
55
69
@property
56
70
def creates_processes_externally (self ) -> bool :
71
+ """LSF creates subprocesses, i.e., PyTorch Lightning does not need to spawn them."""
57
72
return True
58
73
59
- def master_address (self ):
60
- """The master address is read from a list of hosts contained in the environment variable `LSB_HOSTS`."""
61
- return self ._master_address
74
+ def master_address (self ) -> str :
75
+ """The main address is read from the OpenMPI host rank file whose path is given by the environment variable
76
+ ``LSB_DJOB_RANKFILE``."""
77
+ return self ._main_address
78
+
79
+ def master_port (self ) -> int :
80
+ """The main port is calculated from the LSF job ID."""
81
+ return self ._main_port
62
82
63
- def master_port (self ):
64
- """THe master port gets calculated from the LSF job ID."""
65
- return self ._master_port
83
+ @staticmethod
84
+ def is_using_lsf () -> bool :
85
+ """Returns ``True`` if the current process was launched using the ``jsrun`` command."""
86
+ required_env_vars = {"LSB_JOBID" , "LSB_DJOB_RANKFILE" , "JSM_NAMESPACE_LOCAL_RANK" , "JSM_NAMESPACE_SIZE" }
87
+ return required_env_vars .issubset (os .environ .keys ())
66
88
67
- def world_size (self ):
68
- """The world size is read from the environment variable `JSM_NAMESPACE_SIZE`."""
69
- var = "JSM_NAMESPACE_SIZE"
70
- world_size = os .environ .get (var )
89
+ def world_size (self ) -> int :
90
+ """The world size is read from the environment variable ``JSM_NAMESPACE_SIZE``."""
91
+ world_size = os .environ .get ("JSM_NAMESPACE_SIZE" )
71
92
if world_size is None :
72
93
raise ValueError (
73
- f "Cannot determine world size from environment variable { var } ."
74
- " Make sure you run your executable with `jsrun`"
94
+ "Cannot determine world size. Environment variable `JSM_NAMESPACE_SIZE` not found ."
95
+ "Make sure you run your executable with `jsrun`. "
75
96
)
76
97
return int (world_size )
77
98
78
99
def set_world_size (self , size : int ) -> None :
79
100
log .debug ("LSFEnvironment.set_world_size was called, but setting world size is not allowed. Ignored." )
80
101
81
- def global_rank (self ):
82
- """The world size is read from the environment variable `JSM_NAMESPACE_RANK`."""
83
- var = "JSM_NAMESPACE_RANK"
84
- global_rank = os .environ .get (var )
102
+ def global_rank (self ) -> int :
103
+ """The global rank is read from the environment variable ``JSM_NAMESPACE_RANK``."""
104
+ global_rank = os .environ .get ("JSM_NAMESPACE_RANK" )
85
105
if global_rank is None :
86
106
raise ValueError (
87
- f "Cannot determine global rank from environment variable { var } ."
88
- " Make sure you run your executable with `jsrun`"
107
+ "Cannot determine global rank. Environment variable `JSM_NAMESPACE_RANK` not found ."
108
+ "Make sure you run your executable with `jsrun`. "
89
109
)
90
110
return int (global_rank )
91
111
92
112
def set_global_rank (self , rank : int ) -> None :
93
113
log .debug ("LSFEnvironment.set_global_rank was called, but setting global rank is not allowed. Ignored." )
94
114
95
- def local_rank (self ):
115
+ def local_rank (self ) -> int :
96
116
"""The local rank is read from the environment variable `JSM_NAMESPACE_LOCAL_RANK`."""
97
- var = "JSM_NAMESPACE_LOCAL_RANK"
98
- local_rank = os .environ .get (var )
117
+ local_rank = os .environ .get ("JSM_NAMESPACE_LOCAL_RANK" )
99
118
if local_rank is None :
100
119
raise ValueError (
101
- f "Cannot determine local rank from environment variable { var } ."
102
- " Make sure you run your executable with `jsrun`"
120
+ "Cannot determine local rank. Environment variable `JSM_NAMESPACE_LOCAL_RANK` not found ."
121
+ "Make sure you run your executable with `jsrun`. "
103
122
)
104
123
return int (local_rank )
105
124
106
- def node_rank (self ):
107
- """The node rank is determined by the position of the current hostname in the list of hosts stored in the
108
- environment variable `LSB_HOSTS`."""
125
+ def node_rank (self ) -> int :
126
+ """The node rank is determined by the position of the current hostname in the OpenMPI host rank file stored
127
+ in ``LSB_DJOB_RANKFILE``."""
128
+ return self ._node_rank
129
+
130
+ def _get_node_rank (self ) -> int :
131
+ """A helper method for getting the node rank.
132
+
133
+ The node rank is determined by the position of the current node in the list of hosts used in the job. This is
134
+ calculated by reading all hosts from ``LSB_DJOB_RANKFILE`` and finding this node's hostname in the list.
135
+ """
109
136
hosts = self ._read_hosts ()
110
- count = {}
137
+ count : Dict [ str , int ] = {}
111
138
for host in hosts :
112
- if "batch" in host or "login" in host :
113
- continue
114
139
if host not in count :
115
140
count [host ] = len (count )
116
141
return count [socket .gethostname ()]
117
142
118
143
@staticmethod
119
- def _read_hosts ():
120
- hosts = os .environ .get ("LSB_HOSTS" )
121
- if not hosts :
122
- raise ValueError ("Could not find hosts in environment variable LSB_HOSTS" )
123
- hosts = hosts .split ()
124
- if len (hosts ) < 2 :
125
- raise ValueError (
126
- 'Cannot parse hosts from LSB_HOSTS environment variable. Expected format: "batch <rank_0_host> ..."'
127
- )
128
- return hosts
144
+ def _read_hosts () -> List [str ]:
145
+ """Read compute hosts that are a part of the compute job.
129
146
130
- def _get_master_address (self ):
147
+ LSF uses the Job Step Manager (JSM) to manage job steps. Job steps are executed by the JSM from "launch" nodes.
148
+ Each job is assigned a launch node. This launch node will be the first node in the list contained in
149
+ ``LSB_DJOB_RANKFILE``.
150
+ """
151
+ var = "LSB_DJOB_RANKFILE"
152
+ rankfile = os .environ .get (var )
153
+ if rankfile is None :
154
+ raise ValueError ("Did not find the environment variable `LSB_DJOB_RANKFILE`" )
155
+ if not rankfile :
156
+ raise ValueError ("The environment variable `LSB_DJOB_RANKFILE` is empty" )
157
+
158
+ fs = get_filesystem (rankfile )
159
+ with fs .open (rankfile , "r" ) as f :
160
+ ret = [line .strip () for line in f ]
161
+ # remove the launch node (i.e. the first node in LSB_DJOB_RANKFILE) from the list
162
+ return ret [1 :]
163
+
164
+ def _get_main_address (self ) -> str :
165
+ """A helper for getting the main address.
166
+
167
+ The main address is assigned to the first node in the list of nodes used for the job.
168
+ """
131
169
hosts = self ._read_hosts ()
132
- return hosts [1 ]
170
+ return hosts [0 ]
133
171
134
172
@staticmethod
135
- def _get_master_port () :
136
- """A helper function for accessing the master port.
173
+ def _get_main_port () -> int :
174
+ """A helper function for accessing the main port.
137
175
138
- Uses the LSF job ID so all ranks can compute the master port.
176
+ Uses the LSF job ID so all ranks can compute the main port.
139
177
"""
140
- # check for user-specified master port
141
- port = os .environ .get ("MASTER_PORT" )
142
- if not port :
143
- jobid = os .environ .get ("LSB_JOBID" )
144
- if not jobid :
145
- raise ValueError ("Could not find job id in environment variable LSB_JOBID" )
146
- port = int (jobid )
178
+ # check for user-specified main port
179
+ if "MASTER_PORT" in os .environ :
180
+ log .debug (f"Using externally specified main port: { os .environ ['MASTER_PORT' ]} " )
181
+ return int (os .environ ["MASTER_PORT" ])
182
+ if "LSB_JOBID" in os .environ :
183
+ port = int (os .environ ["LSB_JOBID" ])
147
184
# all ports should be in the 10k+ range
148
- port = int (port ) % 1000 + 10000
149
- log .debug (f"calculated LSF master port: { port } " )
150
- else :
151
- log .debug (f"using externally specified master port: { port } " )
152
- return int (port )
185
+ port = port % 1000 + 10000
186
+ log .debug (f"calculated LSF main port: { port } " )
187
+ return port
188
+ raise ValueError ("Could not find job id in environment variable LSB_JOBID" )
0 commit comments