并发编程(一) - threading

2021/6/14 12:50:55

本文主要是介绍并发编程(一) - threading,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

python的GIL导致python的并发不同于java,原因不说,下面直接说解决方案

concurrent.futures库提供了一个ProcessPoolExecutor` 类, 可被用来在一个单独的Python解释器中执行计算密集型函数

threading库 对I/O密集型接口做多线程并发

asyncio(协程) IO 密集型,主流的高性能并发库

threading

通过threading.Thread实现

# @Time    :  '2021-6-9 19:49'
# @Author  :  'pc.kang'

# 主线程与子线程
import threading
import time
def run(n):
    print('task',n,threading.current_thread())#threading.current_thread()打印当前线程
    time.sleep(2)
    print('task done',n)

for i in range(10):#循环里边全是子线程,t-1是子线程,不是主线程
    t= threading.Thread(target=run,args=('t-%s'%i,))
    t.start()

#threading.current_thread()打印当前线程
print('主线程就是程序本身,这个程序一共有11个线程:10个子线程,1个主线程',threading.current_thread())#主线程

print('查看当前活动线程的个数:',threading.active_count())

单线程

import threading

def f():
   print('我是一个函数:')

t = threading.Thread(target=f)
t.start()

多线程

多线程只需要通过循环创建多个线程,并循环启动线程执行就好

import threading,time

def f():
   print('我是第%s个线程函数' %threading.current_thread())

def many_thread():
   threads = []
   for _ in range(10):
      t = threading.Thread(target=f)
      threads.append(t)
   for t in threads:
      t.start()

if __name__ == '__main__':
   print('我是主线程:%s'%threading.current_thread())
    many_thread()

通过循环创建10个线程,并且执行了10次线程函数,但需要注意的是python的并发并非绝对意义上的同时处理,
因为启动线程是通过循环启动的,还是有先后顺序的,通过执行结果的时间可以看出还是有细微的差异,但可以忽略不记。
当然如果线程过多就会扩大这种差异。可以启动500个线程看下

import threading
from datetime import datetime


def thread_func():  # 线程函数
    print('我是一个线程函数', datetime.now())


def many_thread():
    threads = []
    for _ in range(500):  # 循环创建500个线程
        t = threading.Thread(target=thread_func)
        threads.append(t)
    for t in threads:  # 循环启动500个线程
        t.start()


if __name__ == '__main__':
    start = datetime.today().now()
    many_thread()
    duration = datetime.today().now() - start
    print(duration)

500个线程共执行了大约0.11秒

那么针对这种问题我们该如何优化呢?我们可以创建25个线程,每个线程执行20次线程函数,
这样在启动下一个线程的时候,上一个线程已经在循环执行了,这样就大大减少了并发的时间差异

import threading
from datetime import datetime


def thread_func():  # 线程函数,创建子线程
    print('我是一个线程函数', datetime.now())


def execute_func():
    for _ in range(20): # 循环20次
        thread_func()


def many_thread():
    start = datetime.now()
    threads = []
    for _ in range(25):  # 循环创建25个线程
        t = threading.Thread(target=execute_func)
        threads.append(t)
    for t in threads:  # 循环启动25个线程
        t.start()
    duration = datetime.now() - start
    print(duration)

if __name__ == '__main__':
    many_thread()  # 主线程,先启动主线程main(),然后执行线程函数子线程

守护线程

默认不设置的情况下是没有守护线程的,主线程执行完毕后,会等待子线程全部执行完毕,才会关闭结束程序

这里会有一个问题,理论上时间会打印10个线程x10次,但是现在看有些线程会超过10次,有些线程会不到10次

import threading
from datetime import datetime
import time


def thread_func():  # 线程函数
   time.sleep(2)
   i = 0
   while(i < 10):
      print(datetime.now())
      i += 1

def many_thread():
   threads = []
   for _ in range(10):
      t = threading.Thread(target=thread_func)
      threads.append(t)
   for t in threads:
      t.start()


# 可以看到主线程打印了“thread end”之后(主线程结束),子线程还在继续执行,并未随着主线程的结束而结束
if __name__ == '__main__':
   many_thread()
   print("thread end")

守护线程:子线程会随着主线程的结束而结束,无论子线程是否执行完毕
即当主线程执行完毕之后,所有的子线程也被关闭(无论子线程是否执行完成)。
下面通过 setDaemon方法给子线程添加守护线程,我们把循环改为死循环,再来看看输出结果(注意守护线程要加在start之前)

import threading
from datetime import datetime
def thread_func():  # 线程函数
    i = 0
    while(1):
        print(datetime.now())
        i += 1

def many_thread():
    threads = []
    for _ in range(10):
        t = threading.Thread(target=thread_func)
        threads.append(t)
        t.setDaemon(True)  # 给每个子线程添加守护线程
    for t in threads:
        t.start()


if __name__ == '__main__':
    many_thread()
    print("thread end")

通过结果我们可以发现,主线程关闭之后子线程也会随着关闭,并没有无限的循环下去,这就像程序执行到一半强制关闭执行一样,看似暴力却很有用,如果子线程发送一个请求未收到请求结果,那不可能永远等下去,这时候就需要强制关闭。
所以守护线程解决了主线程和子线程关闭的问题。

上面的厘子都是主线程先执行完毕打印后面的“thread end”,然后等待子线程执行完毕
下面通过加入阻塞线程,会看到一个效果:主线程等待子线程执行完毕,然后主线程继续执行打印thread end。

阻塞线程:

主线程会等待子线程的执行结束,才继续执行(控制子线程和主线程的执行顺序)
上面说了守护线程的作用,那么有没有别的方法来解决上述问题呢?

其实是有的,那就是阻塞线程,这种方式更加合理,

使用join()方法阻塞线程,让主线程等待子线程执行完成之后再往下执行,再关闭所有子线程,而不是只要主线程结束,不管子线程是否执行完成都终止子线程执行。
下面我们给子线程添加上join()(join要加到start之后)

import threading
from datetime import datetime
import time


def thread_func():  # 线程函数
   time.sleep(1)
   i = 0
   while(i < 11):
      print(datetime.now())
      i += 1

def many_thread():
   threads = []
   for _ in range(10):
      t = threading.Thread(target=thread_func)
      threads.append(t)
      t.setDaemon(True)
   for t in threads:
      t.start()
      # t.join()  # 每个子线程阻塞
   for t in threads:
      t.join()  # 阻塞线程

if __name__ == '__main__':
   many_thread()
   print("thread end")

对于死循环或者一直等待的情况,我们可以给join设置超时等待,我们设置join的参数为2,那么子线程会告诉主线程让其等待2秒,
如果2秒内子线程执行结束主线程就继续往下执行,
如果2秒内子线程未结束,主线程也会继续往下执行,执行完成后关闭子线程

import threading
from datetime import datetime
import time


def thread_func():  # 线程函数
    time.sleep(1)
    i = 0
    while(1):
        print(datetime.now())
        i += 1

def many_thread():
    threads = []
    for _ in range(5):
        t = threading.Thread(target=thread_func)
        threads.append(t)
        t.setDaemon(True)  # 给每个子线程添加守护线程
    for t in threads:
        t.start()
    for t in threads:
        t.join(2)  # 设置子线程超时2秒

if __name__ == '__main__':
    many_thread()
    print("thread end")

通过multiprocessing.dummy.Pool线程池实现

通过asyncio实现高并发



这篇关于并发编程(一) - threading的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程