python multiprocessing

原创 ryan007  2017-04-16 01:07  阅读 579 次
好久都想写一篇多进程多线程的博客,今晚终于有时间了,由浅入深介绍一下,综合之前看到的所有的
所见所闻,略作总结,思想深度有限,仅用来抛砖引玉。

Python 解释器有一个全局解释器锁(PIL),导致每个 Python 进程中最多同时运行一个线程,因此 Python 多线程程序并不能改善程序性能,不能发挥多核系统的优势,可以通过这篇文章了解。

一,多进程:multiprocessing

由于Python设计的限制(我说的是咱们常用的CPython)。最多只能用满1个CPU核心。Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换,超简单,just try。

multiprocessing模块

multiprocessing包是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()、join([timeout])、run()、start()、terminate()等方法。

属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

1、新建一进程

import multiprocessing
import time

def hello(msg):
    for i in xrange(3):
	    print msg
	    time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=hello, args=("hello", ))
    p.start()
    p.join()
    print "Sub-process done."

2、使用进程池

是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。注意要用apply_async,如果落下async,就变成阻塞版本了。

import multiprocessing
import time

def hello(msg):
    for i in xrange(3):
	    print msg
	    time.sleep(1)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    for i in xrange(10):
        msg = "hello %d" %(i)
        pool.apply_async(hello, (msg, ))
    pool.close()
    pool.join()
    print "Sub-process(es) done."

如果需要获取结果,改进上面的代码:

import multiprocessing
import time

def hello(msg):
    for i in xrange(3):
	    print msg
	    time.sleep(1)
    return "done " + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = [] #存放结果
    for i in xrange(10):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(hello, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print res.get() #获取结果
    print "Sub-process(es) done."

 

 

 

二,多线程:multithread

 

Threading模块 包括Thread、Condition、Event、Lock、Rlock、Semaphore等类。

2.1、Thread类可以实例化一个线程t,(target=) t.start()

Thread方法如下:

getName:返回线程t的名称、setName设置线程t的名称

isAlive:判断一个线程是否是活动的,也就是线程的状态在t.start和t.run之间

isDaemon/setDaemon:如果线程t是一个驻留程序   setDaemon(True)

join:其他调用线程(非t)会直接挂起,直到t结束,只能在t.start之后调用t.join

start:让线程t活动

run:run是运行线程t主函数的方法,thread的子类通常会覆盖run,如果不被覆盖,run将调用在创建t时传递的target参数

 

先看一个例子,或者这个例子:http://www.cnblogs.com/fnng/p/3670789.html:

#-*- encoding:utf-8 -*-
import threading
from time import ctime
from random import randint 
from time import sleep 
from Queue import Queue

class ThreadTest(threading.Thread):
    def __init__(self,func,args,name=''):
        threading.Thread.__init__(self)
        self.func=func
        self.args=args
        self.name=name

    def getResult(self):
        return self.res
    def run(self):
        print('starting',self.name,'at',ctime())
        self.res=self.func(self.args[0],self.args[1])  ####此处需要修改
        print(self.name,'finished at:',ctime())

 

def writeQ(queue):
    print('producing object for Q...')
    queue.put('xxx',1)
    print('size now',queue.qsize())

def readQ(queue):
    val=queue.get(1)
    print('consumed object from Q...size now',queue.qsize())

def write(queue,loops):
    for i in range(loops):
        writeQ(queue)
        sleep(randint(1,3))

def reader(queue,loops):
    for i in range(loops):
        readQ(queue)
        sleep(randint(2,5))

funcs=[write,reader]
nfunc=range(len(funcs))

def main():
    nloops=randint(2,5)
    q=Queue(32)
    threads=[]
    for i in nfunc:
        t=ThreadTest(funcs[i],(q,nloops),funcs[i].__name__)
        threads.append(t)

    for i in nfunc:
        threads[i].start()

    for i in nfunc:
        threads[i].join()

    print('All Done')

if __name__=='__main__':
    main()

 

以下是一些基本观点和概念:

1.多线程采用的是分时复用技术,即不存在真正的多线程,cpu做的事是快速地切换线程,以达到类似同步运行的目的,因为高密集运算方面多线程是没有用的,但是对于存在延迟的情况(延迟IO,网络等)多线程可以大大减少等待时间,避免不必要的浪费。

2.原子操作:这件事情是不可再分的,如变量的赋值,不可能一个线程在赋值,到一半切到另外一个线程工作去了……但是一些数据结构的操作,如栈的push什么的,并非是原子操作,比如要经过栈顶指针上移、赋值、计数器加1等等,在其中的任何一步中断,切换到另一线程再操作这个栈时,就会产生严重的问题,因此要使用锁来避免这样的情况。比如加锁后的push操作就可以认为是原子的了……

3.阻塞:所谓的阻塞,就是这个线程等待,一直到可以运行为止。最简单的例子就是一线程原子操作下,其它线程都是阻塞状态,这是微观的情况。对于宏观的情况,比如服务器等待用户连接,如果始终没有连接,那么这个线程就在阻塞状态。同理,最简单的input语句,在等待输入时也是阻塞状态。

4.在创建线程后,执行p.start(),这个函数是非阻塞的,即主线程会继续执行以后的指令,相当于主线程和子线程都并行地执行。所以非阻塞的函数立刻返回值的~

 

对于资源,加锁是个重要的环节。因为python原生的list,dict等,都是not thread safe的。而Queue,是线程安全的,因此在满足使用条件下,建议使用队列。

本文地址:http://it.zhongduwang.com/articles/python-multiprocessing-thread
版权声明:本文为原创文章,版权归 ryan007 所有,欢迎分享本文,转载请保留出处!

发表评论


表情