Python多线程多进程

2021/9/20 7:09:08

本文主要是介绍Python多线程多进程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

多线程多进程

基本概念
  1. 并发与并行

    并发:在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一时刻点上只有一个程序在处理机上运行。例如吃饭的时候打电话,你只能吃完一口饭,再说一句话,再吃一口饭,再说一句话,并发说明你有处理多个任务的能力,不一定要同时。

    并行:指的是系统具有同时处理多个任务的能力。例如上面的吃饭打电话的例子,你可以一边吃饭一边说话。

  2. 同步与异步

    同步和异步关注的是消息通信机制。同步就是调用消息,调用方必须等到这个消息返回结果才能继续往后执行。异步与同步相反,调用方不会等待调用消息返回的结果,而是通过调用发出后,调用者可以继续执行后续操作。消息调用完成后通知调用者。

    同步:当进程执行IO(等待外部数据)的时候,程序进行等待。

    异步:当进程执行IO的时候,进程可以执行其他任务,一直等到数据接受成功。

  3. 阻塞与非阻塞

    阻塞与非阻塞指的是执行一个操作是等操作结束再返回,还是立即返回。对于同步和异步的事件,阻塞和非阻塞都是可以的。非阻塞有两种方式:主动查询和被动接受消息。

python GIL

GIL官方解释翻译:在Cpython解释器中,同一个进程下开启多线程,同一时刻只能有一个现成执行,无法利用多核优势。GIL锁本质是一把互斥锁,将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务修改,进而保证数据安全。保护不同数据安全,应该加不同的锁。

每执行一个Python程序,就是开启一个进程,在一个python进程内,不仅有其主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器界别的线程。所有线程都运行在这一个进程内。因此:

  1. 所有数据都是共享的,这其中代码作为一种数据也是被所有线程共享的。
  2. 所有线程的任务,都需要将任务代码当做参数传给解释器代码去执行。即所有线程要想运行自己的任务,首先需要解决的是能够访问解释器代码。

有了GIL,Python的进程可以利用多核,但是开销大,同时多线程开销小,但是无法利用多核优势。

python的多线程是假的多线程,Python解释器虽然可以开启多个线程,但是同一时间只有一个线程能在解释器中执行,GIL锁的存在使得CPU资源同一时间只会给一个线程使用。如果是I/O密集型任务即使开再多进程也没有用,所以可以利用Python多线程。如果是计算密集型任务,可以直接使用多进程。

要理解GIL的含义,我们需要从Python的基础讲起。像C++这样的语言是编译型语言,所谓编译型语言,是指程序输入到编译器,编译器再根据语言的语 法进行解析,然后翻译成语言独立的中间表示,最终链接成具有高度优化的机器码的可执行程序。编译器之所以可以深层次的对代码进行优化,是因为它可以看到整 个程序(或者一大块独立的部分)。这使得它可以对不同的语言指令之间的交互进行推理,从而给出更有效的优化手段。

与此相反,Python是解释型语言。程序被输入到解释器来运行。解释器在程序执行之前对其并不了解;它所知道的只是Python的规则,以及在执行过程 中怎样去动态的应用这些规则。它也有一些优化,但是这基本上只是另一个级别的优化。由于解释器没法很好的对程序进行推导,Python的大部分优化其实是 解释器自身的优化。

现在我们来看一下问题的症结所在。要想利用多核系统,Python必须支持多线程运行。作为解释型语言,Python的解释器必须做到既安全又高效。我们都知道多线程编程会遇到的问题,解释器要留意的是避免在不同的线程操作内部共享的数据,同时它还要保证在管理用户线程时保证总是有最大化的计算资源。

那么,不同线程同时访问时,数据的保护机制是怎样的呢?答案是解释器全局锁。从名字上看能告诉我们很多东西,很显然,这是一个加在解释器上的全局(从解释器的角度看)锁(从互斥或者类似角度看)。这种方式当然很安全,但是它有一层隐含的意思(Python初学者需要了解这个):对于任何Python程序,不管有多少的处理器,任何时候都总是只有一个线程在执行。

”为什么我全新的多线程Python程序运行得比其只有一个线程的时候还要慢?“许多人在问这个问题时还是非常犯晕的,因为显然一个具有两个线程的程序要比其只有一个线程时要快(假设该程序确实是可并行的)。事实上,这个问题被问得如此频繁以至于Python的专家们精心制作了一个标准答案:”不要使用多线程,请使用多进程”。

所以,对于计算密集型的,我还是建议不要使用python的多线程而是使用多进程方式,而对于IO密集型的,还是劝你使用多进程方式,因为使用多线程方式出了问题,最后都不知道问题出在了哪里,这是多么让人沮丧的一件事情!

多线程

让一个进程同时执行一段代码,用起来类似于多进程,但是区别在于线程与线程之间能够共享资源。python不太推荐用多线程,因为GIL的存在。推荐使用multiprocessing或者concurrent.futures.ProcessPoolExecutor。但是如果想要同时运行多个I/O密集型任务,多线程仍然是一个合适的模型。线程之间的对于共享进程的数据需要考虑线程安全的问题,由于进程之间是隔离的,拥有独立的内存空间资源,相对比较安全。

多线程官网地址:python多线程参考文档

多进程

通信方式:管道,FIFO,消息队列,信号,共享内存,socket,stream流。同步方式是PV信号量,管程。

多进程官网地址:python3.8 多进程,python3.8 多进程共享内存

协程

协程运行与线程之上,当一个协程完成后,可以选择主动让出,让另一个协程运行在当前线程上。协程并没有增加线程数量,只是在线程的基础上通过分时复用的方式运行多核协程,而且协程的切换在用户态完成,切换的代价比线程从用户态到内核态的代价小很多。最有效的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

协程最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。其次协程不需要多线程的机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

gevent官网

简单的协程例子:

import time

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'

def produce(c):
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)
实现方式
import threading
import time

def func(arg):
    time.sleep(0.5)
    print('%s running....' % arg)
  1. threading实现多线程

    t1 = threading.Thread(target=func, args=('This is thread 1', ))
    t2 = threading.Thread(target=func, args=('This is thread 2', ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('This is a main function')
    

    继承threading.Thread,并重写run

    # 继承threading.Thread 并重写run
    class CutomerThread(threading.Thread):
        def __init__(self, thread_name):
            # step 1: CALL BASE __init__ function
            super(CutomerThread, self).__init__(name=thread_name)
            self.__name = thread_name
        def run(self):
            # step 2: override run function
            time.sleep(0.5)
            print('%s running...\n' % self.__name)
    t1 = CutomerThread("thread 1")
    t2 = CutomerThread("thread 2")
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('this is main function')
    

    threading.Thread提供的线程对象方法和属性:

    • start()::创建线程后通过start启动线程,等待CPU调度,为run函数执行做准备。
    • run():线程开始执行的入口函数,函数体中会调用用户编写的target函数,或者执行被重载的run函数。
    • join([timeout]):阻塞挂起调用该函数的线程,知道被调用线程执行完成或超时。通常会在主线程中调用该方法,等待其他线程执行完成。
    • name,getName()&setName():线程名称相关的操作。
    • isAlive(), is_alive():start函数执行之后到run函数执行完之前都为True
    • demon、isDaemon()&setDaemon():守护线程相关。
  2. multiprocess实现多进程

    from multiprocessing import Process
    import os, time
    
    def pstart(name):
        print('Process name: %s, pid: %s'%(name, os.getpid()))
    subproc =Process(target=pstart, args=('subprocess', ))
    subproc.start()
    subproc.join()
    print(f'subprocess pid: {subproc.pid}')
    print(f'current process pid: {os.getpid()}')
    

    继承方式实现多进程

    # 继承方式创建多进程
    from multiprocessing import Process
    import os, time
    
    class CustomerProcess(Process):
        def __init__(self, p_name, target=None):
            super(CustomerProcess, self).__init__(name = p_name, target = target, args=(p_name, ))
        def run(self):
            print(f'Custom Process name: {self.name}, {os.getpid()}')
    p1 = CustomerProcess('process_1')
    p1.start()
    p1.join()
    print(f"subprocess pid: {p1.pid}")
    print(f"current process pid: {os.getpid()}" )
    
  3. gevent实现协程

    from gevent import monkey; monkey.patch_socket()
    import gevent
    
    def f(n):
        for i in range(n):
            print(gevent.getcurrent(), i)
    
    g1 = gevent.spawn(f, 5)
    g2 = gevent.spawn(f, 5)
    g3 = gevent.spawn(f, 5)
    g1.join()
    g2.join()
    g3.join()
    
    import gevent
    
    def f1():
        for i in range(5):
            print(f'f1:{i}')
            gevent.sleep(0)
    
    def f2():
        for i in range(10):
            print(f'f2:{i}')
            gevent.sleep(0)
    
    t1 = gevent.spawn(f1)
    t2 = gevent.spawn(f2)
    gevent.joinall([t1, t2])
    
  4. concurrent实现并发

    异步执行可以由ThreadPoolExecutor使用线程或者由ProcessPoolExecutor使用单独的进程来实现。

    • Executor对象

      class concurrent.futures.Executor

      1. submit(fn, *args, **kwargs):fn表示要执行的函数,后面两个表示参数。方法执行返回Future对象表示可调用对象的执行。

        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(pow, 323, 1235)
            print(future.result())
        
      2. map(func, *iterables, timeout = None, chunksize=1)

        类似python的map高级函数,但是iterales是立即执行而不是延迟执行的;func是异步执行的,对func的多个调用可以并发执行。如果设置了timeout,当超时的时候,会报concurrent.futures.TimeoutError。timeout可以是整数或者浮点数。

        使用ProcessPoolExecutor时,这个方法会将iterables分割任务快并作为独立的任务并提交到执行池中。这些块的大概数量可以由chunksize指定正整数设置。

      3. shutrdown(wait=True)

        当待执行的future对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。在关闭后调用Executor.submit()和Executor.map()将会引发RuntimeError。

        如果wait为True,则此方法只有在所有待执行的future对象完成执行且释放已分配的资源后才会返回。使用with语句可以避免显式调用这个方法。

    • ThreadPoolExecutor对象

      ThreadPoolExecutor是Executor的子类,它使用线程池来异步执行调用。class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=’’, initializer=None, initargs=())

      from concurrent import futures
      import time
      
      def func1(n):
          print(f'this is func1')
          time.sleep(2)
          return i
      
      fs = []
      with futures.ThreadPoolExecutor(max_workers=3) as executor:
          # for i in range(3):
          #    future = executor.submit(func1, i)
          #    fs.append(future) 
          result = [executor.submit(func1, i) for i in range(3)]
          for future in futures.as_completed(result):
              print(future.result()) 
      
    • ProcessPoolExecutor对象

      ProcessPoolExecutor类是Executor的子类,它使用进程池来异步地执行调用。它可以绕过全局解释器锁,也意味着可以处理和返回可封存的对象。在juptynotebook上运行代码好像不行。

      from concurrent import futures
      import time
      
      def func1(n):
          i =1
          print(f'this is func1')
          return i
      if __name__ == '__main__':
          fs = []
          with futures.ProcessPoolExecutor(max_workers=3) as executor:
              # for i in range(3):
              #    future = executor.submit(func1, i)
              #    fs.append(future) 
              result = [executor.submit(func1, i) for i in range(3)]
              for future in futures.as_completed(result):
                  print(future.result())
      
    • Future对象

      Future类可将调用对象封装为异步执行。

      class concurrent.futures.Future:

      • cancel():尝试取消调用。如果去掉正在执行或已结束云顶不能取消,则该方法将返回False,否则调用会被取消并且该方法将返回True。
      • cancelled():如果成功调用则返回True
      • running():如果调用正在执行而且不能被取消那么返回Ture
      • done():如果调用已被取消或正常结束那么返回True
      • result():返回调用返回值。


这篇关于Python多线程多进程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程