文档列举了在实际编程中使用信号量控制进程结束的常见场景,并提供了一些代码示例来说明如何确保在程序接收到终止信号时,能够平稳地释放资源、保存状态并正确退出。
1. subprocess 的 preexec_fn
当使用 subprocess.Popen
设置 preexec_fn=os.setsid
时,子进程会在一个新的会话中启动,并且该子进程及其子进程组成一个独立的进程组。由于这些进程不再属于主进程组,因此通常的 SIGINT
(如按 Ctrl+C
)不会终止它们。
p = subprocess.Popen('sleep 30', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, preexec_fn=os.setsid)
退出方法:
terminate_event = threading.Event()
def start_subprocess():
p = subprocess.Popen(
'sleep 30', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, preexec_fn=os.setsid)
def signal_handler(sig, frame):
print("SIGINT received, setting terminate event...")
terminate_event.set() # 触发事件,通知主线程
signal.signal(signal.SIGINT, signal_handler)
# 等待子进程完成或接收到终止事件
while p.poll() is None:
if terminate_event.is_set():
print("Terminating child process...")
os.killpg(os.getpgid(p.pid), signal.SIGTERM)
break
out, _ = p.communicate() # 等待子进程结束
print(out.decode())
# 启动子进程
try:
start_subprocess()
except Exception as e:
print(f"Error: {e}")
如下述代码所示,闭包中的 signal_handler
经过 signal.signal
注册后,就成为了全局的 SIGINT 信号的处理器。假设后续仍然有其他的线程代码需要处理 SIGINT 信号,则需要在该 signal_handler
中进行处理。
def start_subprocess():
......
def signal_handler(sig, frame):
print("SIGINT received, setting terminate event...")
terminate_event.set() # 触发事件,通知主线程
signal.signal(signal.SIGINT, signal_handler)
......
一个信号量只能被注册一次,后来的注册会覆盖之前的注册。
2. ThreadPoolExecutor 的优雅退出
2.1 问题描述与猜想
当同时使用 executor.shutdown(wait=False, cancel_future=True)
与 as_completed()
时,脚本会陷入死锁状态。
wait=False
,代表不管子线程的资源回收,程序的控制权会交到主线程。
cancel_future=True
,则会将剩余的线程设置为 CANCELLED
状态。
当调用 shutdown
时,线程的运行控制权,会回归到主线程的 as_completed
源码,只有当函数处于:[CANCELLED_AND_NOTIFIED, FINISHED])
两种状态时,才会被当做 finished。而 shutdown
只会将剩余的未开始的函数设置为 CANCELLED
状态,所以 as_completed
会一直卡在 waiter.event.wait(wait_timeout)
这一行代码上,导致整个程序进入死锁状态。[1]
fs = set(fs)
total_futures = len(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try:
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs,))
while pending:
if timeout is None:
wait_timeout = None
else:
wait_timeout = end_time - time.monotonic()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), total_futures))
waiter.event.wait(wait_timeout)
<center> code 2-1: as_completed 源码 </center>
为了避免 as_completed
死锁状态,可以将代码切换为手动遍历 futures 对象,如代码 2-3 所示。但所需要注意的是,当在 macos 上运行 future 任务超过 7 万个时,程序在处理异常退出时,容易卡住。
with ThreadPoolExecutor(2) as pool:
futures = [
pool.submit(task, i) for i in range(1,10)
]
for future in as_completed(futures):
print([f._state for f in futures])
try:
n = future.result()
print(f"{datetime.now()} - {n}")
except ValueError as e:
print(f"{datetime.now()} - EXCEPTION! {e}")
pool.shutdown(wait=False, cancel_futures=True)
<center> code 2-2: as_completed 遍历 futures 对象 </center>
with ThreadPoolExecutor(2) as pool:
futures = [
pool.submit(task, i) for i in range(1,10)
]
for future in futures:
print([f._state for f in futures])
if future.cancelled():
continue
try:
n = future.result()
print(f"{datetime.now()} - {n}")
except ValueError as e:
print(f"{datetime.now()} - EXCEPTION! {e}")
pool.shutdown(wait=False, cancel_futures=True)
<center> code 2-3: 手动遍历 futures 对象 </center>
2.2 解决方案
上述结论属于在 macos 下使用 ThreadPoolExecutor
时,设置总任务数量超过 7 万条时(即 futures 超过 7 万个),猜想 1.
但通过实践,将任务数量分隔为 1 万每次,则在遇到信号量时顺畅结束,如代码 2-4 所示,猜测是 futures 越多,python3 处理器则需要花费更多的时间销毁申请的资源,但为何间隔在 7 万,比较难以理解。
在此期间为了排除是 ulimit -n
的影响,已经将 open files 设置为了 1000、5000、10000、100000 等级别,都没有任务帮助。所以排除是 open files 带来的影响。
batch_size = 10000
executor = ThreadPoolExecutor(max_workers=20)
with tqdm(total=len(numbers), miniters=1) as pbar:
for step in range(0, len(numbers), batch_size):
batch = numbers[step: step + batch_size]
futures = [executor.submit(info_workers, asnumber) for number in batch]
for future in as_completed(futures):
future.result()
pbar.update(1)
<center> code 2-4: 分隔任务 </center>
Comments NOTHING