r"""
|
|
`torch.distributed.launch` is a module that spawns up multiple distributed
|
|
training processes on each of the training nodes.
|
|
|
|
The utility can be used for single-node distributed training, in which one or
|
|
more processes per node will be spawned. The utility can be used for either
|
|
CPU training or GPU training. If the utility is used for GPU training,
|
|
each distributed process will be operating on a single GPU. This can achieve
|
|
well-improved single-node training performance. It can also be used in
|
|
multi-node distributed training, by spawning up multiple processes on each node
|
|
for well-improved multi-node distributed training performance as well.
|
|
This will especially be benefitial for systems with multiple Infiniband
|
|
interfaces that have direct-GPU support, since all of them can be utilized for
|
|
aggregated communication bandwidth.
|
|
|
|
In both cases of single-node distributed training or multi-node distributed
|
|
training, this utility will launch the given number of processes per node
|
|
(``--nproc_per_node``). If used for GPU training, this number needs to be less
|
|
or equal to the number of GPUs on the current system (``nproc_per_node``),
|
|
and each process will be operating on a single GPU from *GPU 0 to
|
|
GPU (nproc_per_node - 1)*.
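
For example, one convenient way to find a suitable ``--nproc_per_node`` value
is to query the GPU count of the node (any value up to that count also works):

::

    >>> python -c "import torch; print(torch.cuda.device_count())"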

**How to use this module:**

1. Single-Node multi-process distributed training

::

    >>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
               YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
               arguments of your training script)

2. Multi-Node multi-process distributed training: (e.g. two nodes)

Node 1: *(IP: 192.168.1.1, and has a free port: 1234)*

::

    >>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
               --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
               --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
               and all other arguments of your training script)

Node 2:

::

    >>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
               --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
               --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
               and all other arguments of your training script)

3. To look up what optional arguments this module offers:

::

    >>> python -m torch.distributed.launch --help


**Important Notices:**

1. This utility and multi-process distributed (single-node or
multi-node) GPU training currently only achieves the best performance using
the NCCL distributed backend. Thus NCCL backend is the recommended backend to
use for GPU training.
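
For illustration only, a minimal sketch of choosing a backend accordingly
(NCCL when GPUs are available, Gloo as a common CPU fallback):

::

    >>> import torch
    >>> import torch.distributed as dist
    >>> backend = 'nccl' if torch.cuda.is_available() else 'gloo'
    >>> dist.init_process_group(backend=backend, init_method='env://')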

2. In your training program, you must parse the command-line argument:
``--local_rank=LOCAL_PROCESS_RANK``, which will be provided by this module.
If your training program uses GPUs, you should ensure that your code only
runs on the GPU device of LOCAL_PROCESS_RANK. This can be done by:

Parsing the local_rank argument

::

    >>> import argparse
    >>> parser = argparse.ArgumentParser()
    >>> parser.add_argument("--local_rank", type=int)
    >>> args = parser.parse_args()

Set your device to local rank using either

::

    >>> torch.cuda.set_device(args.local_rank)  # before your code runs

or

::

    >>> with torch.cuda.device(args.local_rank):
    >>>     # your code to run

3. In your training program, you are supposed to call the following function
at the beginning to start the distributed backend. You need to make sure that
the init_method uses ``env://``, which is the only supported ``init_method``
by this module.

::

    torch.distributed.init_process_group(backend='YOUR BACKEND',
                                         init_method='env://')
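
The ``env://`` method bootstraps the process group from environment variables
that this launcher exports for every subprocess: ``MASTER_ADDR``,
``MASTER_PORT``, ``WORLD_SIZE`` and ``RANK`` (``LOCAL_RANK`` is exported as
well). As a rough illustration, inside a launched process you could inspect
them like this:

::

    >>> import os
    >>> # values are filled in by the launcher; shown here only for inspection
    >>> print(os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"])
    >>> print(os.environ["RANK"], os.environ["WORLD_SIZE"])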

4. In your training program, you can either use regular distributed functions
or use the :func:`torch.nn.parallel.DistributedDataParallel` module. If your
training program uses GPUs for training and you would like to use the
:func:`torch.nn.parallel.DistributedDataParallel` module,
here is how to configure it.

::

    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[args.local_rank],
                                                      output_device=args.local_rank)

Please ensure that the ``device_ids`` argument is set to the only GPU device id
that your code will be operating on. This is generally the local rank of the
process. In other words, ``device_ids`` needs to be ``[args.local_rank]``,
and ``output_device`` needs to be ``args.local_rank`` in order to use this
utility.
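
Putting notices 2, 3, and 4 together, a minimal sketch of a training script
that works with this launcher could look like the following (``MyModel`` and
the rest of the training loop are placeholders you supply yourself):

::

    import argparse
    import torch
    import torch.distributed as dist

    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=0)
    args = parser.parse_args()

    # bind this process to its GPU and join the process group
    torch.cuda.set_device(args.local_rank)
    dist.init_process_group(backend='nccl', init_method='env://')

    model = MyModel().cuda(args.local_rank)  # MyModel is a placeholder
    model = torch.nn.parallel.DistributedDataParallel(
        model, device_ids=[args.local_rank], output_device=args.local_rank)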

5. Another way to pass ``local_rank`` to the subprocesses is via the
environment variable ``LOCAL_RANK``. This behavior is enabled when you launch
the script with ``--use_env=True``. You must adjust the subprocess example
above to replace ``args.local_rank`` with ``os.environ['LOCAL_RANK']``; the
launcher will not pass ``--local_rank`` when you specify this flag.
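
For example, with ``--use_env=True`` the device-binding snippet from notice 2
becomes (a minimal sketch):

::

    >>> import os
    >>> local_rank = int(os.environ["LOCAL_RANK"])
    >>> torch.cuda.set_device(local_rank)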

.. warning::

    ``local_rank`` is NOT globally unique: it is only unique per process
    on a machine. Thus, don't use it to decide if you should, e.g.,
    write to a networked filesystem. See
    https://github.com/pytorch/pytorch/issues/12042 for an example of
    how things can go wrong if you don't do this correctly.
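
    A common pattern is to key such one-off work on the *global* rank instead,
    for example (a sketch, assuming the process group has already been
    initialized as in notice 3):

    ::

        import torch.distributed as dist

        if dist.get_rank() == 0:
            # only the globally unique rank 0 process writes shared files
            write_checkpoint()  # placeholder for your own I/O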

"""

import time
import signal
import sys
import subprocess
import os
from argparse import ArgumentParser, REMAINDER
from typing import Optional, IO, List, Any

node_local_rank_stdout_filename = "node_{}_local_rank_{}_stdout"
node_local_rank_stderr_filename = "node_{}_local_rank_{}_stderr"


def parse_args():
    """
    Helper function parsing the command line options.

    @retval argparse.Namespace
    """
    parser = ArgumentParser(description="PyTorch distributed training launch "
                                        "helper utility that will spawn up "
                                        "multiple distributed processes")

    # Optional arguments for the launch helper
    parser.add_argument("--nnodes", type=int, default=1,
                        help="The number of nodes to use for distributed "
                             "training")
    parser.add_argument("--node_rank", type=int, default=0,
                        help="The rank of the node for multi-node distributed "
                             "training")
    parser.add_argument("--nproc_per_node", type=int, default=1,
                        help="The number of processes to launch on each node, "
                             "for GPU training, this is recommended to be set "
                             "to the number of GPUs in your system so that "
                             "each process can be bound to a single GPU.")
    parser.add_argument("--master_addr", default="127.0.0.1", type=str,
                        help="Master node (rank 0)'s address, should be either "
                             "the IP address or the hostname of node 0, for "
                             "single node multi-proc training, the "
                             "--master_addr can simply be 127.0.0.1")
    parser.add_argument("--master_port", default=29500, type=int,
                        help="Master node (rank 0)'s free port that needs to "
                             "be used for communication during distributed "
                             "training")
    parser.add_argument("--use_env", default=False, action="store_true",
                        help="Use environment variable to pass "
                             "'local rank'. For legacy reasons, the default value is False. "
                             "If set to True, the script will not pass "
                             "--local_rank as argument, and will instead set LOCAL_RANK.")
    parser.add_argument("-m", "--module", default=False, action="store_true",
                        help="Changes each process to interpret the launch script "
                             "as a python module, executing with the same behavior as "
                             "'python -m'.")
    parser.add_argument("--no_python", default=False, action="store_true",
                        help="Do not prepend the training script with \"python\" - just exec "
                             "it directly. Useful when the script is not a Python script.")
    parser.add_argument(
        "--logdir",
        default=None,
        type=str,
        help=f"""Relative path to write subprocess logs to. Passing in a relative
        path will create a directory if needed, and write the stdout and stderr to files
        {node_local_rank_stdout_filename} and {node_local_rank_stderr_filename}. Note that
        successive runs with the same path to write logs to will overwrite existing logs,
        so be sure to save logs as needed.""",
    )

    # positional
    parser.add_argument("training_script", type=str,
                        help="The full path to the single GPU training "
                             "program/script to be launched in parallel, "
                             "followed by all the arguments for the "
                             "training script")

    # rest from the training program
    parser.add_argument('training_script_args', nargs=REMAINDER)
    return parser.parse_args()


def main():
    args = parse_args()

    # world size in terms of number of processes
    dist_world_size = args.nproc_per_node * args.nnodes

    # set PyTorch distributed related environmental variables
    current_env = os.environ.copy()
    current_env["MASTER_ADDR"] = args.master_addr
    current_env["MASTER_PORT"] = str(args.master_port)
    current_env["WORLD_SIZE"] = str(dist_world_size)

    processes: List[Any] = []

    if 'OMP_NUM_THREADS' not in os.environ and args.nproc_per_node > 1:
        current_env["OMP_NUM_THREADS"] = str(1)
        print("*****************************************\n"
              "Setting OMP_NUM_THREADS environment variable for each process "
              "to be {} by default, to avoid your system being overloaded. "
              "Please further tune the variable for optimal performance in "
              "your application as needed. \n"
              "*****************************************".format(current_env["OMP_NUM_THREADS"]))

    if args.logdir:
        # Possibly create the directory to write subprocess log output to.
        if os.path.exists(args.logdir):
            if not os.path.isdir(args.logdir):
                raise ValueError("argument --logdir must be a path to a directory.")
        else:
            # create the relative directory
            os.mkdir(os.path.join(os.getcwd(), args.logdir))

    subprocess_file_handles = []

    for local_rank in range(0, args.nproc_per_node):
        # each process's rank
        dist_rank = args.nproc_per_node * args.node_rank + local_rank
        current_env["RANK"] = str(dist_rank)
        current_env["LOCAL_RANK"] = str(local_rank)

        # spawn the processes
        with_python = not args.no_python
        cmd = []
        if with_python:
            cmd = [sys.executable, "-u"]
            if args.module:
                cmd.append("-m")
        else:
            if not args.use_env:
                raise ValueError("When using the '--no_python' flag, you must also set the '--use_env' flag.")
            if args.module:
                raise ValueError("Don't use both the '--no_python' flag and the '--module' flag at the same time.")

        cmd.append(args.training_script)

        if not args.use_env:
            cmd.append("--local_rank={}".format(local_rank))

        cmd.extend(args.training_script_args)

        stdout_handle: Optional[IO]
        stderr_handle: Optional[IO]
        if args.logdir:
            directory_path = os.path.join(os.getcwd(), args.logdir)
            node_rank = args.node_rank
            stdout_file_name = node_local_rank_stdout_filename.format(node_rank, local_rank)
            stderr_file_name = node_local_rank_stderr_filename.format(node_rank, local_rank)
            stdout_handle = open(os.path.join(directory_path, stdout_file_name), "w")
            stderr_handle = open(os.path.join(directory_path, stderr_file_name), "w")
            subprocess_file_handles.append((stdout_handle, stderr_handle))
            stdout_name = stdout_handle.name
            stderr_name = stderr_handle.name
            print(f"""Note: Stdout and stderr for node {node_rank} rank {local_rank} will
            be written to {stdout_name}, {stderr_name} respectively.""")

        sig_names = {2: "SIGINT", 15: "SIGTERM"}
        last_return_code = None

        def sigkill_handler(signum, frame):
            for process in processes:
                print(f"Killing subprocess {process.pid}")
                try:
                    process.kill()
                except Exception:
                    pass
            if last_return_code is not None:
                raise subprocess.CalledProcessError(returncode=last_return_code, cmd=cmd)
            if signum in sig_names:
                print(f"Main process received {sig_names[signum]}, exiting")
            sys.exit(1)

        # pass SIGINT/SIGTERM to children if the parent is being terminated
        signal.signal(signal.SIGINT, sigkill_handler)
        signal.signal(signal.SIGTERM, sigkill_handler)

        stdout_handle = None if not subprocess_file_handles else subprocess_file_handles[local_rank][0]
        stderr_handle = None if not subprocess_file_handles else subprocess_file_handles[local_rank][1]
        process = subprocess.Popen(cmd, env=current_env, stdout=stdout_handle, stderr=stderr_handle)
        processes.append(process)

    try:
        alive_processes = set(processes)
        while len(alive_processes):
            finished_processes = []
            for process in alive_processes:
                if process.poll() is None:
                    # the process is still running
                    continue
                else:
                    if process.returncode != 0:
                        last_return_code = process.returncode  # for sigkill_handler
                        sigkill_handler(signal.SIGTERM, None)  # not coming back
                    else:
                        # exited cleanly
                        finished_processes.append(process)
            alive_processes = set(alive_processes) - set(finished_processes)

            time.sleep(1)
    finally:
        # close open file descriptors
        for (stdout_handle, stderr_handle) in subprocess_file_handles:
            stdout_handle.close()
            stderr_handle.close()


if __name__ == "__main__":
    main()