Python进程间通讯与进程池超详细讲解 python进程池的作用

   2023-02-09 学习力0
核心提示:目录进程间通讯队列Queue管道Pipe进程池Pool在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。进程间通讯multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。队列Queue队列是一个可用

在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。

进程间通讯

multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。

队列Queue

队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似:

put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间);

  • 若设定了超时且队列已满,会抛出queue.Full异常;
  • 队列已关闭时,抛出ValueError异常

get([block[, timeout]]):读取并删除一个元素;

  • 若设定了超时且队列为空,会抛出queue.Empty异常;
  • 队列已关闭时,抛出ValueError异常;若已阻塞后,再关闭则会一直阻塞;

qsize():返回一个近似队列长度(因多进程原因,长度会有误差);

empty()/full():队列空或慢(因多进程原因,会有误差);

close():关闭队列;

当主进程(创建Queue的)关闭队列时,子进程中的队列并没有关闭,所以getElement进程会一直阻塞等待(为保证能正常退出,需要设为后台进程):

def putElement(name, qu: multiprocessing.Queue):
    try:
        for i in range(10):
            qu.put(f"{name}-{i + 1}")
            time.sleep(.1)
    except ValueError:
        print("queue closed")
    print(f"{name}: put complete")
def getElement(name, qu: multiprocessing.Queue):
    try:
        while True:
            r = qu.get()
            print(f"{name} recv: {r}")
    except ValueError:
        print("queue closed")
    print(f"{name}: get complete")
if __name__ == '__main__':
    qu = multiprocessing.Queue(100)
    puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)]
    gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)]
    list(map(lambda f: f.start(), puts))
    list(map(lambda f: f.start(), gets))
    for f in puts:
        f.join()
    print("To close")
    qu.close() # 只是main中的close了,其他进程中的并没有

管道Pipe

multiprocessing.Pipe([duplex])返回一个连接对象对(conn1, conn2)。若duplex为True(默认),创建的是双向管道;否则conn1只能用于接收消息,conn2只能用于发送消息:

  • send():发送消息;
  • recv():接收消息;

进程间的Pipe基于fork机制建立:

  • 主进程创建Pipe:Pipe的两个Connections连接的的都是主进程;
  • 创建子进程后,Pipe也被拷贝了一份:此时有了4个Connections;
  • 主进程关闭一个Out Connection,子进程关闭一个In Connection:就建立好了一个输入在主进程,输出在子进程的管道。
def pipeProc(pipe):
    outPipe, inPipe = pipe
    inPipe.close() # 必须关闭,否则结束时不会收到EOFError异常
    try:
        while True:
            r = outPipe.recv()
            print("Recv:", r)
    except EOFError:
        print("RECV end")
if __name__ == '__main__':
    outPipe, inPipe = multiprocessing.Pipe()
    sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),))
    sub.start()
    outPipe.close() # 必须在进程成功运行后,才可关闭
    with inPipe:
        for x in range(10):
            inPipe.send(x)
            time.sleep(.1)
    print("send complete")
    sub.join()

进程池Pool

虽然使用多进程能提高效率,但进程的创建与销毁会消耗较长时间;同时,过多进程会引起频繁的调度,也增加了开销。

进程池中有固定数量的进程:

  • 请求到来时,从池中取出一个进程来处理任务;理完毕后,进程并不立即关闭,而是再放回进程池中;
  • 当池中进程数量不够,请求就要等待,直到拿到空闲进程后才能继续执行;
  • 池中进程的数量是固定的,隐藏同一时间最多有固定数量的进程在运行。

multiprocessing.Pool([processes[, initializer[, initargs]]])

  • processes:要创建进程数量(默认os.cpu_count()个),在需要时才会创建;
  • initializer(*initargs):每个工作进程启动时执行的方法(一般processes为几就执行几次);

Pool类中主要方法:

  • apply(func[, args[, kwds]]):以阻塞方式,从池中获取进程并执行func(*args,**kwargs)
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):异步方式(从池中获取一个进程)执行func(*args,**kwargs),返回AsyncResult;
  • map(func, iterable[, chunksize])/map_async:map的并行版本(可同时处理多个任务),异步时返回MapResult;
  • starmap(func, iterable[, chunksize])/starmap_async:与map的区别是允许传入多个参数;
  • imap(func, iterable[, chunksize]):map的惰性版本(返回结果是可迭代对象),内存消耗会低些,返回迭代器IMapIterator;
  • imap_unordered(func, iterable[, chunksize]):imap返回的结果顺序与map顺序是相同的,而此方法返回的顺序是乱序的(不依次等待每个任务完成,先完成的先返回),返回迭代器IMapIterator;
  • close():关闭,禁止继续提交任务(已提交任务会继续执行完成);
  • terminate():立即终止所有任务;
  • join():等待工作进程完成(必须已close或terminate了);
def poolWorker():
    print(f"worker in process {os.getpid()}")
    time.sleep(1)
def poolWorkerOne(name):
    print(f"worker one {name} in process {os.getpid()}")
    time.sleep(random.random())
    return name
def poolWorkerTwo(first, second):
    res = first + second
    print(f"worker two {res} in process {os.getpid()}")
    time.sleep(1./(first+1))
    return res
def poolInit():
    print("pool init")
if __name__ == '__main__':
    workers = multiprocessing.Pool(5, poolInit) # poolInit会被调用5次(线程启动时)
    with workers:
        for i in range(5):
            workers.apply_async(poolWorker)
        arg = [(i, i) for i in range(10)]
        workers.map_async(poolWorkerOne, arg)
        results = workers.starmap_async(poolWorkerTwo, arg) # 每个元素(元组)会被拆分为独立的参数
        print("Starmap:", results.get())
        results = workers.imap_unordered(poolWorkerOne, arg)
        for r in results: # r是乱序的(若使用imap,则与输入arg的顺序相同)
            print("Unordered:", r)
    # 必须保证workers已close了
    workers.join()
原文地址:https://blog.csdn.net/alwaysrun/article/details/127185356
 
反对 0举报 0 评论 0
 

免责声明:本文仅代表作者个人观点,与乐学笔记(本网)无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
    本网站有部分内容均转载自其它媒体,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责,若因作品内容、知识产权、版权和其他问题,请及时提供相关证明等材料并与我们留言联系,本网站将在规定时间内给予删除等相关处理.

  • 如何在Abaqus的python中调用Matlab程序
    目录1. 确定版本信息2. 备份python3. 设置环境变量4. 安装程序5. 调试运行参考资料Abaqus2018操作系统Win10 64位Python版本2.7(路径C:\SIMULIA\CAE\2018\win_b64\tools\SMApy\python2.7)2. 备份python将上述的“python2.7”文件夹复制出来,避免因操作错误
    03-16
  • SICP:复数的直角和极坐标的表示(Python实现)
    SICP:复数的直角和极坐标的表示(Python实现)
    数据抽象屏障是控制复杂性的强有力工具,然而这种类型的数据抽象还不够强大有力。从一个另一个角度看,对于一个数据对象可能存在多种有用的表示方式,且我们希望所设计的系统能够处理多种表示形式。比如,复数就可以表示为两种几乎等价的形式:直角坐标形式(
    03-16
  • [个人发展] 我做了一个可以永远谈论任何事情的女士对话AI(TypeScript,Python)
    [个人发展] 我做了一个可以永远谈论任何事情的
    在个人发展中对话式人工智能服务 Eveki我做了虚构角色1这是一项以人工智能为特色的服务,可以再现并享受自然对话。这一次,作为第一个艾小姐发表了。请先尝试实物。服务概览与人工智能对话基本上只需输入您的信息是。对话是用女士的语言进行的,就像人类一样
    03-08
  • ruby写爬虫 ruby python
    ruby写爬虫 ruby python
    http://www.javaeye.com/topic/545160爬虫性能比较http://www.rubyrailways.com/data-extraction-for-web-20-screen-scraping-in-rubyrails/srcapihttp://huacnlee.com/blog/ruby-scrapi-collect-koubei  2009年4月22日 星期三用ruby写的一个网络爬虫程序前
    03-08
  • sf02_选择排序算法Java Python rust 实现
    Java 实现package common;public class SimpleArithmetic {/** * 选择排序 * 输入整形数组:a[n] 【4、5、3、7】 * 1. 取数组编号为i(i属于[0 , n-2])的数组值 a[i],即第一重循环 * 2. 假定a[i]为数组a[k](k属于[i,n-1])中的最小值a[min],即执行初始化 min =i
    02-09
  • Python vs Ruby: 谁是最好的 web 开发语言?
    Python 和 Ruby 都是目前用来开发 websites、web-based apps 和 web services 的流行编程语言之一。 这两种语言在许多方面有相似之处。它们都是高级的面向对象的编程语言,都是交互式脚本语言、都提供标准库且支持持久化。但是,Python 和 Ruby 的解决方法却
    02-09
  • 详解Python手写数字识别模型的构建与使用
    详解Python手写数字识别模型的构建与使用
    目录一:手写数字模型构建与保存1 加载数据集2 特征数据 标签数据3 训练集 测试集4 数据流图 输入层5 隐藏层6 损失函数7 梯度下降算法8 输出损失值 9 模型 保存与使用10 完整源码分享二:手写数字模型使用与测试一:手写数字模型构建与保存1 加载数据集# 1加
  • Python asyncore socket客户端实现方法详解
    Python asyncore socket客户端实现方法详解
    目录介绍1.定义类并且继承 asyncore.dispatcher2.实现类中的回调代码调用父类方法创建socket对象连接服务器3.创建对象并且执行asyncore.loop进入运行循环服务端示例代码运行结果注意介绍asyncore库是python的一个标准库,提供了以异步的方式写入套接字服务的
  • Python+Sklearn实现异常检测
    目录离群检测 与 新奇检测Sklearn 中支持的方法孤立森林 IsolationForestLocal Outlier FactorOneClassSVMElliptic Envelope离群检测 与 新奇检测很多应用场景都需要能够确定样本是否属于与现有的分布,或者应该被视为不同的分布。离群检测(Outlier detectio
  • Python基础教程之while循环用法讲解 Python中的while循环
    Python基础教程之while循环用法讲解 Python中的
    目录1.while 循环2.无限循环3、while 循环使用 else 语句4、简单语句组附小练习:总结1.while 循环Python 中 while 语句的一般形式:while 判断条件(condition):    执行语句(statements)……执行流程图如下:同样需要注意冒号和缩进。另外,在 Python 中
点击排行