python中信号量处理与优雅退出

youncyb 发布于 22 天前 99 次阅读 Python


文档列举了在实际编程中使用信号量控制进程结束的常见场景,并提供了一些代码示例来说明如何确保在程序接收到终止信号时,能够平稳地释放资源、保存状态并正确退出。

1. subprocess 的 preexec_fn

当使用 subprocess.Popen 设置 preexec_fn=os.setsid 时,子进程会在一个新的会话中启动,并且该子进程及其子进程组成一个独立的进程组。由于这些进程不再属于主进程组,因此通常的 SIGINT(如按 Ctrl+C)不会终止它们。

p = subprocess.Popen('sleep 30', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, preexec_fn=os.setsid)

退出方法:

terminate_event = threading.Event()

def start_subprocess():
    p = subprocess.Popen(
        'sleep 30', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, preexec_fn=os.setsid)

    def signal_handler(sig, frame):
        print("SIGINT received, setting terminate event...")
        terminate_event.set()  # 触发事件,通知主线程

    signal.signal(signal.SIGINT, signal_handler)

    # 等待子进程完成或接收到终止事件
    while p.poll() is None:
        if terminate_event.is_set():
            print("Terminating child process...")
            os.killpg(os.getpgid(p.pid), signal.SIGTERM)
            break

    out, _ = p.communicate()  # 等待子进程结束
    print(out.decode())

# 启动子进程
try:
    start_subprocess()
except Exception as e:
    print(f"Error: {e}")

如下述代码所示,闭包中的 signal_handler 经过 signal.signal 注册后,就成为了全局的 SIGINT 信号的处理器。假设后续仍然有其他的线程代码需要处理 SIGINT 信号,则需要在该 signal_handler 中进行处理。

def start_subprocess():
    ......
    def signal_handler(sig, frame):
        print("SIGINT received, setting terminate event...")
        terminate_event.set()  # 触发事件,通知主线程

    signal.signal(signal.SIGINT, signal_handler)
    ......

一个信号量只能被注册一次,后来的注册会覆盖之前的注册。

2. ThreadPoolExecutor 的优雅退出

2.1 问题描述与猜想

当同时使用 executor.shutdown(wait=False, cancel_future=True)as_completed() 时,脚本会陷入死锁状态。

wait=False,代表不管子线程的资源回收,程序的控制权会交到主线程。

cancel_future=True,则会将剩余的线程设置为 CANCELLED 状态。

当调用 shutdown 时,线程的运行控制权,会回归到主线程的 as_completed 源码,只有当函数处于:[CANCELLED_AND_NOTIFIED, FINISHED]) 两种状态时,才会被当做 finished。而 shutdown 只会将剩余的未开始的函数设置为 CANCELLED 状态,所以 as_completed 会一直卡在 waiter.event.wait(wait_timeout) 这一行代码上,导致整个程序进入死锁状态。[1]

    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

<center> code 2-1: as_completed 源码 </center>

为了避免 as_completed 死锁状态,可以将代码切换为手动遍历 futures 对象,如代码 2-3 所示。但所需要注意的是,当在 macos 上运行 future 任务超过 7 万个时,程序在处理异常退出时,容易卡住。

with ThreadPoolExecutor(2) as pool:
    futures = [
        pool.submit(task, i) for i in range(1,10)
    ]

    for future in as_completed(futures):
        print([f._state for f in futures])
        try:
            n = future.result()
            print(f"{datetime.now()} - {n}")
        except ValueError as e:
            print(f"{datetime.now()} - EXCEPTION! {e}")
            pool.shutdown(wait=False, cancel_futures=True)

<center> code 2-2: as_completed 遍历 futures 对象 </center>

with ThreadPoolExecutor(2) as pool:
    futures = [
        pool.submit(task, i) for i in range(1,10)
    ]

    for future in futures:
        print([f._state for f in futures])
        if future.cancelled():
            continue
        try:
            n = future.result()
            print(f"{datetime.now()} - {n}")
        except ValueError as e:
            print(f"{datetime.now()} - EXCEPTION! {e}")
            pool.shutdown(wait=False, cancel_futures=True)

<center> code 2-3: 手动遍历 futures 对象 </center>

2.2 解决方案

上述结论属于在 macos 下使用 ThreadPoolExecutor 时,设置总任务数量超过 7 万条时(即 futures 超过 7 万个),猜想 1.

但通过实践,将任务数量分隔为 1 万每次,则在遇到信号量时顺畅结束,如代码 2-4 所示,猜测是 futures 越多,python3 处理器则需要花费更多的时间销毁申请的资源,但为何间隔在 7 万,比较难以理解。

在此期间为了排除是 ulimit -n 的影响,已经将 open files 设置为了 1000、5000、10000、100000 等级别,都没有任务帮助。所以排除是 open files 带来的影响。

batch_size = 10000
executor = ThreadPoolExecutor(max_workers=20)
with tqdm(total=len(numbers), miniters=1) as pbar:
    for step in range(0, len(numbers), batch_size):
        batch = numbers[step: step + batch_size]
        futures = [executor.submit(info_workers, asnumber) for number in batch]
        for future in as_completed(futures):
            future.result()
            pbar.update(1)

<center> code 2-4: 分隔任务 </center>

参考


  1. Shutting Down Python’s ThreadPoolExecutor ↩︎