RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 1607058
Accepted
Flevtek
Flevtek
Asked:2025-02-17 05:52:50 +0000 UTC2025-02-17 05:52:50 +0000 UTC 2025-02-17 05:52:50 +0000 UTC

使用 pathos.multiprocessing.ProcessPool 作为使用其自身实例的函数

  • 772

是否可以在包含同一池的另一个管道调用的函数上使用pool.apipe?在这两种情况下,我都使用池来实现功能而无需等待它们完成。以下是我所想象的(最小可重现示例):

from pathos.multiprocessing import ProcessingPool as Pool
from sympy import sympify
import pyautogui as auto
import keyboard as k

class Worker:
    def __init__(self):
        self.pool = Pool(2)
    def execute(self, code):
        def execute_start(code):
            def timeout_risk(val):
                return sympify(val, rational=True)
            def expr(val):
                val = eval(f"f'{val}'")
                task = self.pool.apipe(timeout_risk, val)
                res = task.get(timeout=0.25)
                return res
            try:
                exec(code)
                return 200
            except Exception as ex:
                return ex.args[0] if ex.args else type(ex)
        task = self.pool.apipe(execute_start, code)
        return task.get()
    def stop(self):
        self.pool.terminate()
        self.pool.restart()

def check(e):
    if k.is_pressed('ctrl+q'):
        executor.stop()
    elif k.is_pressed('ctrl+1'):
        res = executor.execute('while True:\n\tauto.moveTo(expr("10**2"), 0)')
        print(res)

if __name__ == '__main__':
    k.on_press(check)
    executor = Worker()
    k.wait()

但这样的尝试,最终一切都归结于daemonic processes are not allowed to have children。帮我解决这个问题。没有必要使用 pathos,主要的是概念起作用并且在声明类时创建池。如果有人知道更好的方法来实现这一点,我会很高兴看到它。

python
  • 1 1 个回答
  • 57 Views

1 个回答

  • Voted
  1. Best Answer
    Pak Uula
    2025-02-17T15:47:43Z2025-02-17T15:47:43Z

    这样做是故意的,以便在出现错误的情况下不会生成无限的进程树。

    但绕过这个限制是可能的。要做到这一点,你需要两个技巧:

    • daemon创建一个忽略该属性的流程类型
    • 拦截池中新进程的创建。
    import logging
    
    from multiprocess.pool import Pool as PythonPool
    import multiprocess.context
    
    import pathos.multiprocessing as mp
    
    DefaultProcessFactory = PythonPool.Process
    
    _clazz_cache = {}
    
    class NoDaemonMixin:
        def _get_daemon(self):
            logging.getLogger("nodaemon.class").debug("NoDaemon._get_daemon()")
            return False
        def _set_daemon(self, value):
            logging.getLogger("nodaemon.class").debug("NoDaemon._set_daemon(%s) - ignored", value)
        daemon = property(_get_daemon, _set_daemon)
    
    if hasattr(multiprocess.context, "SpawnProcess"):
        class NoDaemonSpawnProcess(NoDaemonMixin, multiprocess.context.SpawnProcess):
            pass
        _clazz_cache[multiprocess.context.SpawnProcess] = NoDaemonSpawnProcess
    
    if hasattr(multiprocess.context, "ForkProcess"):
        class NoDaemonForkProcess(NoDaemonMixin, multiprocess.context.ForkProcess):
            pass
        _clazz_cache[multiprocess.context.ForkProcess] = NoDaemonForkProcess
    
    if hasattr(multiprocess.context, "ForkServerProcess"):
        class NoDaemonForkServerProcess(NoDaemonMixin, multiprocess.context.ForkServerProcess):
            pass
        _clazz_cache[multiprocess.context.ForkServerProcess] = NoDaemonForkServerProcess
    

    此代码为标准库中用于各种线程生成策略的类创建非守护进程包装器。有必要检查类的存在,因为multiprocess.context不同操作系统的类集不同。

    现在如何拦截子进程的生成。

    它在内部pathos.multiprocessing使用,因此您需要拦截对产生新进程的multiprocess.pool.Pool静态方法的调用。Pool.Process由于某种原因,简单的上下文替换是不够的 - 参数编组器停止工作pool.map

    from multiprocess.pool import Pool as PythonPool
    
    DefaultProcessFactory = PythonPool.Process
    
    
    def _process(_, ctx, *args, **kwargs):
        log = logging.getLogger("nodaemon.factory")
        log.debug("MyPool.Process(): ctx=%s, args=%s, kwargs=%s", ctx, args, kwargs)
        if ctx.Process in _clazz_cache:
            wrap_clz = _clazz_cache[ctx.Process]
        else:
            raise TypeError(f"Unsupported process class: {ctx.Process}")
                
        try:
            log.debug("Creating process instance: process class %s", wrap_clz)
            return wrap_clz(*args, **kwargs)
        finally:
            log.debug("MyPool.Process() done")
    
    
    class MyProcessingPool(mp.ProcessingPool):
        def _serve(self, nodes=None):
            log = logging.getLogger("nodaemon.pool")
            
            log.debug("MyProcessingPool._serve(nodes=%s)", nodes)
    
            restore = PythonPool.Process is not _process
            PythonPool.Process = _process
            try:
                return super(MyProcessingPool, self)._serve(nodes)
            finally:
                if restore:
                    log.debug("Restoring PythonPool.Process factory")
                    PythonPool.Process = DefaultProcessFactory
    
        def restart(self, force = False):
            log = logging.getLogger("nodaemon.pool")
    
            log.debug("MyProcessingPool.restart(force=%s)", force)
    
            restore = PythonPool.Process is not _process
            PythonPool.Process = _process
            try:
                return super().restart(force=force)
            finally:
                if restore:
                    log.debug("Restoring PythonPool.Process factory")
                    PythonPool.Process = DefaultProcessFactory
    

    工作原理:(完整示例)

    def sub_worker(x):
        print(f"sub_worker({x=})")
        return x * 2
    
    def worker(max_num):
        print(f"worker({max_num=})")
        
        sub_pool = MyProcessingPool()
        try:
            return sub_pool.map(sub_worker, range(max_num))
        finally:
            sub_pool.close()
            sub_pool.join()
            
    def main():
        # logging.basicConfig(level=logging.DEBUG)
        # logging.getLogger("nodaemon").setLevel(logging.INFO)
        pool = MyProcessingPool()
        result = pool.map(worker, [1, 2, 3, 4])
        print(result)
        pool.close()
        pool.join()
    

    结论:

    worker(max_num=1)
    worker(max_num=2)
    worker(max_num=3)
    worker(max_num=4)
    sub_worker(x=0)
    sub_worker(x=0)
    sub_worker(x=1)
    sub_worker(x=2)
    sub_worker(x=3)
    sub_worker(x=0)
    sub_worker(x=1)
    sub_worker(x=0)
    sub_worker(x=1)
    sub_worker(x=2)
    [[0], [0, 2], [0, 2, 4], [0, 2, 4, 6]]
    

    然而,尽管有可能在池中生成非守护进程,但仍应非常非常小心地使用。

    首先,对普罗斯工厂的拦截非常脆弱。如果同时使用多种类型的池,肯定会出现问题。

    其次,如果孩子不小心生成了垃圾信息,可能会造成整个系统崩溃,导致系统完全减速,甚至需要重新启动。

    • 1

相关问题

  • 是否可以以某种方式自定义 QTabWidget?

  • telebot.anihelper.ApiException 错误

  • Python。检查一个数字是否是 3 的幂。输出 无

  • 解析多个响应

  • 交换两个数组的元素,以便它们的新内容也反转

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    我看不懂措辞

    • 1 个回答
  • Marko Smith

    请求的模块“del”不提供名为“default”的导出

    • 3 个回答
  • Marko Smith

    "!+tab" 在 HTML 的 vs 代码中不起作用

    • 5 个回答
  • Marko Smith

    我正在尝试解决“猜词”的问题。Python

    • 2 个回答
  • Marko Smith

    可以使用哪些命令将当前指针移动到指定的提交而不更改工作目录中的文件?

    • 1 个回答
  • Marko Smith

    Python解析野莓

    • 1 个回答
  • Marko Smith

    问题:“警告:检查最新版本的 pip 时出错。”

    • 2 个回答
  • Marko Smith

    帮助编写一个用值填充变量的循环。解决这个问题

    • 2 个回答
  • Marko Smith

    尽管依赖数组为空,但在渲染上调用了 2 次 useEffect

    • 2 个回答
  • Marko Smith

    数据不通过 Telegram.WebApp.sendData 发送

    • 1 个回答
  • Martin Hope
    Alexandr_TT 2020年新年大赛! 2020-12-20 18:20:21 +0000 UTC
  • Martin Hope
    Alexandr_TT 圣诞树动画 2020-12-23 00:38:08 +0000 UTC
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5