Fix bpo-30589: improve Process.exitcode with forkserver (#1989) · python/cpython@dfd5f34

@@ -6,6 +6,7 @@

66

import struct

77

import sys

88

import threading

9+

import warnings

9101011

from . import connection

1112

from . import process

@@ -22,7 +23,7 @@

2223

#

23242425

MAXFDS_TO_SEND = 256

25-

UNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t

26+

SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t

26272728

#

2829

# Forkserver class

@@ -148,21 +149,33 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):

148149149150

util._close_stdin()

150151151-

# ignoring SIGCHLD means no need to reap zombie processes;

152+

sig_r, sig_w = os.pipe()

153+

os.set_blocking(sig_w, False)

154+155+

def sigchld_handler(*_unused):

156+

try:

157+

os.write(sig_w, b'.')

158+

except BlockingIOError:

159+

pass

160+152161

# letting SIGINT through avoids KeyboardInterrupt tracebacks

153162

handlers = {

154-

signal.SIGCHLD: signal.SIG_IGN,

163+

signal.SIGCHLD: sigchld_handler,

155164

signal.SIGINT: signal.SIG_DFL,

156165

}

157166

old_handlers = {sig: signal.signal(sig, val)

158167

for (sig, val) in handlers.items()}

159168169+

# map child pids to client fds

170+

pid_to_fd = {}

171+160172

with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \

161173

selectors.DefaultSelector() as selector:

162174

_forkserver._forkserver_address = listener.getsockname()

163175164176

selector.register(listener, selectors.EVENT_READ)

165177

selector.register(alive_r, selectors.EVENT_READ)

178+

selector.register(sig_r, selectors.EVENT_READ)

166179167180

while True:

168181

try:

@@ -176,62 +189,100 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):

176189

assert os.read(alive_r, 1) == b''

177190

raise SystemExit

178191179-

assert listener in rfds

180-

with listener.accept()[0] as s:

181-

code = 1

182-

if os.fork() == 0:

192+

if sig_r in rfds:

193+

# Got SIGCHLD

194+

os.read(sig_r, 65536) # exhaust

195+

while True:

196+

# Scan for child processes

183197

try:

184-

_serve_one(s, listener, alive_r, old_handlers)

185-

except Exception:

186-

sys.excepthook(*sys.exc_info())

187-

sys.stderr.flush()

188-

finally:

189-

os._exit(code)

198+

pid, sts = os.waitpid(-1, os.WNOHANG)

199+

except ChildProcessError:

200+

break

201+

if pid == 0:

202+

break

203+

child_w = pid_to_fd.pop(pid, None)

204+

if child_w is not None:

205+

if os.WIFSIGNALED(sts):

206+

returncode = -os.WTERMSIG(sts)

207+

else:

208+

assert os.WIFEXITED(sts)

209+

returncode = os.WEXITSTATUS(sts)

210+

# Write the exit code to the pipe

211+

write_signed(child_w, returncode)

212+

os.close(child_w)

213+

else:

214+

# This shouldn't happen really

215+

warnings.warn('forkserver: waitpid returned '

216+

'unexpected pid %d' % pid)

217+218+

if listener in rfds:

219+

# Incoming fork request

220+

with listener.accept()[0] as s:

221+

# Receive fds from client

222+

fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)

223+

assert len(fds) <= MAXFDS_TO_SEND

224+

child_r, child_w, *fds = fds

225+

s.close()

226+

pid = os.fork()

227+

if pid == 0:

228+

# Child

229+

code = 1

230+

try:

231+

listener.close()

232+

code = _serve_one(child_r, fds,

233+

(alive_r, child_w, sig_r, sig_w),

234+

old_handlers)

235+

except Exception:

236+

sys.excepthook(*sys.exc_info())

237+

sys.stderr.flush()

238+

finally:

239+

os._exit(code)

240+

else:

241+

# Send pid to client processes

242+

write_signed(child_w, pid)

243+

pid_to_fd[pid] = child_w

244+

os.close(child_r)

245+

for fd in fds:

246+

os.close(fd)

190247191248

except OSError as e:

192249

if e.errno != errno.ECONNABORTED:

193250

raise

194251195-

def _serve_one(s, listener, alive_r, handlers):

252+253+

def _serve_one(child_r, fds, unused_fds, handlers):

196254

# close unnecessary stuff and reset signal handlers

197-

listener.close()

198-

os.close(alive_r)

199255

for sig, val in handlers.items():

200256

signal.signal(sig, val)

257+

for fd in unused_fds:

258+

os.close(fd)

201259202-

# receive fds from parent process

203-

fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)

204-

s.close()

205-

assert len(fds) <= MAXFDS_TO_SEND

206-

(child_r, child_w, _forkserver._forkserver_alive_fd,

207-

stfd, *_forkserver._inherited_fds) = fds

208-

semaphore_tracker._semaphore_tracker._fd = stfd

209-210-

# send pid to client processes

211-

write_unsigned(child_w, os.getpid())

260+

(_forkserver._forkserver_alive_fd,

261+

semaphore_tracker._semaphore_tracker._fd,

262+

*_forkserver._inherited_fds) = fds

212263213-

# run process object received over pipe

264+

# Run process object received over pipe

214265

code = spawn._main(child_r)

215266216-

# write the exit code to the pipe

217-

write_unsigned(child_w, code)

267+

return code

268+218269219270

#

220-

# Read and write unsigned numbers

271+

# Read and write signed numbers

221272

#

222273223-

def read_unsigned(fd):

274+

def read_signed(fd):

224275

data = b''

225-

length = UNSIGNED_STRUCT.size

276+

length = SIGNED_STRUCT.size

226277

while len(data) < length:

227278

s = os.read(fd, length - len(data))

228279

if not s:

229280

raise EOFError('unexpected EOF')

230281

data += s

231-

return UNSIGNED_STRUCT.unpack(data)[0]

282+

return SIGNED_STRUCT.unpack(data)[0]

232283233-

def write_unsigned(fd, n):

234-

msg = UNSIGNED_STRUCT.pack(n)

284+

def write_signed(fd, n):

285+

msg = SIGNED_STRUCT.pack(n)

235286

while msg:

236287

nbytes = os.write(fd, msg)

237288

if nbytes == 0: