并发编程4
2022/4/22 1:12:39
本文主要是介绍并发编程4,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
目录- GIL与普通互斥锁区别
- 验证多线程是否有用
- 单个CPU
- 多个CPU
- 死锁现象
- 信号量与event事件
- 信号量
- event事件
- 进程池与线程池
- 线程池
- 进程池
- 协程
- 基于协程实现TCP服务端并发
- 服务端
- 客户端
GIL与普通互斥锁区别
# 验证GIL的存在 from threading import Thread count = 100 def task(): global count count -= 1 for i in range(100): # 创建一百个线程 t = Thread(target=task) t.start() print(count) # 0 线程的执行不是同时进行的,没有造成数据错乱 # 验证不同数据加不同锁 from threading import Thread, Lock import time count = 100 mutex = Lock() def task(): global count # global关键字声明 mutex.acquire() temp = count time.sleep(0.1) count = temp - 1 mutex.release() t_list = [] current_time = time.time() for i in range(100): # 创建一百个线程 t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() print(count) # 100-0 # 等待所有的线程运行完毕再打印money print(count, time.time() - current_time) # 0 10.867115020751953
验证多线程是否有用
单个CPU
多个IO密集型任务
多进程:浪费资源 无法利用多个CPU
多线程:节省资源 切换+保存状态
多个计算密集型任务
多进程:耗时更长 创建进程的消耗+切换消耗
多线程:耗时较短 切换消耗
多个CPU
多个IO密集型任务
多进程:浪费资源 多个CPU利用率低
多线程:节省资源 切换+保存状态
多个计算密集型任务
多进程:利用多核 速度更快
多线程:速度较慢
# 计算密集型 from threading import Thread from multiprocessing import Process import os import time def work(): res = 1 for i in range(1, 2000000): res += i print(res) if __name__ == '__main__': print(os.cpu_count()) # 4 查看当前计算机CPU个数 # process_start_time = time.time() # p_list = [] # for i in range(4): # p = Process(target=work) # p.start() # p_list.append(p) # for p in p_list: # p.join() # print('多进程总耗时:%s' % (time.time() - process_start_time)) # 总耗时:1.0024194717407227 thread_start_time = time.time() t_list = [] for i in range(4): t = Thread(target=work) t.start() t_list.append(t) for t in t_list: t.join() print('多线程总耗时:%s' % (time.time() - thread_start_time)) # 总耗时:0.753563642501831
# IO密集型 from threading import Thread from multiprocessing import Process import time def work(): time.sleep(1) # 模拟IO操作 if __name__ == '__main__': thread_start_time = time.time() t_list = [] for i in range(100): t = Thread(target=work) t.start() for t in t_list: t.join() print('多线程总耗时:%s' % (time.time() - thread_start_time)) # 总耗时:0.017986774444580078 process_start_time = time.time() p_list = [] for i in range(100): p = Process(target=work) p.start() for p in p_list: p.join() print('多进程总耗时:%s' % (time.time() - process_start_time)) # 总耗时:0.6566195487976074
死锁现象
即便设定了如何抢锁和放锁也会产生死锁现象
from threading import Thread, Lock import time mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(f'{self.name}抢到了A锁') mutexB.acquire() print(f'{self.name}抢到了B锁') mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(f'{self.name}抢到了B锁') time.sleep(2) mutexA.acquire() print(f'{self.name}抢到了A锁') mutexA.release() mutexB.release() for i in range(20): t = MyThread() t.start()
信号量与event事件
信号量
from threading import Thread, Semaphore import time import random sp = Semaphore(5) # 创建一个有五个停车位的停车场 def task(car_num): sp.acquire() # 抢锁 print('%s正在倒车入库' % car_num) time.sleep(random.randint(1, 5)) sp.release() # 放锁 print(f'{car_num}开走了') for i in range(1, 10): t = Thread(target=task, args=('车辆沪B0000%s' % i, )) t.start()
event事件
from threading import Thread, Event import time event = Event() # 类似于造了一个红绿灯 def light(): print('红灯亮,所有车驻车等待') time.sleep(30) print('绿灯亮,可以通行') event.set() def car(name): print('%s正在等红灯' % name) event.wait() print('%s加油门,缓慢起步' % name) t = Thread(target=light) t.start() for i in range(1, 11): t = Thread(target=car, args=('五菱mini%s' % i,)) t.start()
进程池与线程池
线程池
from concurrent.futures import ThreadPoolExecutor import time import os # 线程池 pool = ThreadPoolExecutor(5) # 线程池线程数默认是CPU个数的五倍,支持自定义 def task(n): time.sleep(1) print(n) print(os.getpid()) return '任务的执行结果:%s' % n**2 def func(*args, **kwargs): print(args, kwargs) print(args[0].result()) for i in range(20): res = pool.submit(task, i) print(res.result()) pool.submit(task, i).add_done_callback(func)
进程池
from concurrent.futures import ProcessPoolExecutor from multiprocessing import current_process import time import os # 进程池 pool = ProcessPoolExecutor(5) # 进程池进程数默认是CPU个数,支持自定义 def task(n): time.sleep(2) print(n) print(os.getpid()) print(current_process().name) def func(*args, **kwargs): print(args, kwargs) print(args[0].result()) if __name__ == '__main__': for i in range(20): pool.submit(task, i).add_done_callback(func)
协程
协程,单线程下实现并发。程序员通过代码来检测程序的IO操作并处理,使得CPU感觉不到IO的存在从而最大幅度的占用CPU
from gevent import monkey;monkey.patch_all() from gevent import spawn import time def buy(name): print('%s buy uc' % name) time.sleep(5) print('%s buy eleme' % name) def found(name): print('%s found alibaba' % name) time.sleep(3) print('%s found alipay' % name) start_time = time.time() g1 = spawn(buy, 'jack_ma') g2 = spawn(found, 'jack_ma') g1.join() g2.join() print('总耗时:', time.time() - start_time) # 总耗时: 5.3707029819488525
基于协程实现TCP服务端并发
服务端
from gevent import monkey;monkey.patch_all() from gevent import spawn import socket def communication(sock): while True: data = sock.recv(1024) # IO操作 print(data.decode('utf8')) sock.send(data.upper()) def get_server(): server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) while True: sock, addr = server.accept() # IO操作 spawn(communication, sock) g1 = spawn(get_server) g1.join()
客户端
from threading import Thread, current_thread import socket def get_client(): client = socket.socket() client.connect(('127.0.0.1', 8080)) count = 0 while True: send_msg = input('send_msg>>>:').strip() msg = '%s %s %s' % (current_thread().name, send_msg, count) count += 1 client.send(msg.encode('utf8')) data = client.recv(1024) print(data.decode('utf8')) # 创建多线程 for i in range(20): t = Thread(target=get_client) t.start()
这篇关于并发编程4的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-16ShardingSphere 如何完美驾驭分布式事务与 XA 协议?
- 2024-11-16ShardingSphere如何轻松驾驭Seata柔性分布式事务?
- 2024-11-16Maven资料入门指南
- 2024-11-16Maven资料入门教程
- 2024-11-16MyBatis Plus资料:新手入门教程与实践指南
- 2024-11-16MyBatis-Plus资料入门教程:快速上手指南
- 2024-11-16Mybatis资料入门教程:新手必看指南
- 2024-11-16MyBatis资料详解:新手入门与初级实战指南
- 2024-11-16MyBatisPlus资料:初学者入门指南与实用教程
- 2024-11-16MybatisPlus资料详解:初学者入门指南