174 lines
5.2 KiB
Python
174 lines
5.2 KiB
Python
import os
|
|
import sys
|
|
import msvcrt
|
|
import _winapi
|
|
from pickle import load
|
|
from multiprocessing import process, util
|
|
from multiprocessing.context import set_spawning_popen
|
|
from multiprocessing.popen_spawn_win32 import Popen as _Popen
|
|
|
|
from . import reduction, spawn
|
|
|
|
|
|
__all__ = ["Popen"]
|
|
|
|
#
|
|
#
|
|
#
|
|
|
|
|
|
def _path_eq(p1, p2):
|
|
return p1 == p2 or os.path.normcase(p1) == os.path.normcase(p2)
|
|
|
|
|
|
WINENV = hasattr(sys, "_base_executable") and not _path_eq(
|
|
sys.executable, sys._base_executable
|
|
)
|
|
|
|
|
|
def _close_handles(*handles):
|
|
for handle in handles:
|
|
_winapi.CloseHandle(handle)
|
|
|
|
|
|
#
|
|
# We define a Popen class similar to the one from subprocess, but
|
|
# whose constructor takes a process object as its argument.
|
|
#
|
|
|
|
|
|
class Popen(_Popen):
|
|
"""
|
|
Start a subprocess to run the code of a process object.
|
|
|
|
We differ from cpython implementation with the way we handle environment
|
|
variables, in order to be able to modify then in the child processes before
|
|
importing any library, in order to control the number of threads in C-level
|
|
threadpools.
|
|
|
|
We also use the loky preparation data, in particular to handle main_module
|
|
inits and the loky resource tracker.
|
|
"""
|
|
|
|
method = "loky"
|
|
|
|
def __init__(self, process_obj):
|
|
prep_data = spawn.get_preparation_data(
|
|
process_obj._name, getattr(process_obj, "init_main_module", True)
|
|
)
|
|
|
|
# read end of pipe will be duplicated by the child process
|
|
# -- see spawn_main() in spawn.py.
|
|
#
|
|
# bpo-33929: Previously, the read end of pipe was "stolen" by the child
|
|
# process, but it leaked a handle if the child process had been
|
|
# terminated before it could steal the handle from the parent process.
|
|
rhandle, whandle = _winapi.CreatePipe(None, 0)
|
|
wfd = msvcrt.open_osfhandle(whandle, 0)
|
|
cmd = get_command_line(parent_pid=os.getpid(), pipe_handle=rhandle)
|
|
|
|
python_exe = spawn.get_executable()
|
|
|
|
# copy the environment variables to set in the child process
|
|
child_env = {**os.environ, **process_obj.env}
|
|
|
|
# bpo-35797: When running in a venv, we bypass the redirect
|
|
# executor and launch our base Python.
|
|
if WINENV and _path_eq(python_exe, sys.executable):
|
|
cmd[0] = python_exe = sys._base_executable
|
|
child_env["__PYVENV_LAUNCHER__"] = sys.executable
|
|
|
|
cmd = " ".join(f'"{x}"' for x in cmd)
|
|
|
|
with open(wfd, "wb") as to_child:
|
|
# start process
|
|
try:
|
|
hp, ht, pid, _ = _winapi.CreateProcess(
|
|
python_exe,
|
|
cmd,
|
|
None,
|
|
None,
|
|
False,
|
|
0,
|
|
child_env,
|
|
None,
|
|
None,
|
|
)
|
|
_winapi.CloseHandle(ht)
|
|
except BaseException:
|
|
_winapi.CloseHandle(rhandle)
|
|
raise
|
|
|
|
# set attributes of self
|
|
self.pid = pid
|
|
self.returncode = None
|
|
self._handle = hp
|
|
self.sentinel = int(hp)
|
|
self.finalizer = util.Finalize(
|
|
self, _close_handles, (self.sentinel, int(rhandle))
|
|
)
|
|
|
|
# send information to child
|
|
set_spawning_popen(self)
|
|
try:
|
|
reduction.dump(prep_data, to_child)
|
|
reduction.dump(process_obj, to_child)
|
|
finally:
|
|
set_spawning_popen(None)
|
|
|
|
|
|
def get_command_line(pipe_handle, parent_pid, **kwds):
|
|
"""Returns prefix of command line used for spawning a child process."""
|
|
if getattr(sys, "frozen", False):
|
|
return [sys.executable, "--multiprocessing-fork", pipe_handle]
|
|
else:
|
|
prog = (
|
|
"from joblib.externals.loky.backend.popen_loky_win32 import main; "
|
|
f"main(pipe_handle={pipe_handle}, parent_pid={parent_pid})"
|
|
)
|
|
opts = util._args_from_interpreter_flags()
|
|
return [
|
|
spawn.get_executable(),
|
|
*opts,
|
|
"-c",
|
|
prog,
|
|
"--multiprocessing-fork",
|
|
]
|
|
|
|
|
|
def is_forking(argv):
|
|
"""Return whether commandline indicates we are forking."""
|
|
if len(argv) >= 2 and argv[1] == "--multiprocessing-fork":
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def main(pipe_handle, parent_pid=None):
|
|
"""Run code specified by data received over pipe."""
|
|
assert is_forking(sys.argv), "Not forking"
|
|
|
|
if parent_pid is not None:
|
|
source_process = _winapi.OpenProcess(
|
|
_winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE, False, parent_pid
|
|
)
|
|
else:
|
|
source_process = None
|
|
new_handle = reduction.duplicate(
|
|
pipe_handle, source_process=source_process
|
|
)
|
|
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
|
|
parent_sentinel = source_process
|
|
|
|
with os.fdopen(fd, "rb", closefd=True) as from_parent:
|
|
process.current_process()._inheriting = True
|
|
try:
|
|
preparation_data = load(from_parent)
|
|
spawn.prepare(preparation_data, parent_sentinel)
|
|
self = load(from_parent)
|
|
finally:
|
|
del process.current_process()._inheriting
|
|
|
|
exitcode = self._bootstrap(parent_sentinel)
|
|
sys.exit(exitcode)
|