在Python3.9中,如果有多个计算任务需要并行计算,通常会采用多线程(multithreading)或者协程(coroutine)的方式来完成
本文是基于Python3.9官方文档说明,针对一些开发中常见的应用进行总结
线程是操作系统进行运算调度的最小单位,一个进程可以有多个线程,多个线程之间共享当前进程的全部系统资源
线程有时也称为“轻量级进程”
线程虽然共享进程的全部系统资源,但仍然拥有自己的独立资源部分
由于共享当前进程所有系统资源,所以一个线程可以轻易摧毁其他线程的独立资源部分
线程优点
线程缺点
Linux下可通过 ulimit -s
来查看为每一个线程默认分配内存大小
在Python3中启动一个线程非常简单
Python中的线程会在一个单独的系统级线程中执行(比如说一个 POSIX 线程或者一个 Windows 线程)
代码如下
import datetime
import time
from threading import Thread
letters = ['a', 'b', 'c', 'd', 'e', 'f']
def PrintLetter(sleep: int = 5):
while len(letters) > 0:
print('%s Pop letter: %s' % (datetime.datetime.now().strftime('%H:%M:%S'), letters.pop()))
time.sleep(sleep)
print('Print success')
t = Thread(target=PrintLetter, args=(1, ))
t.start()
输出如下
17:30:10 Pop letter: f
17:30:11 Pop letter: e
17:30:12 Pop letter: d
17:30:13 Pop letter: c
17:30:14 Pop letter: b
17:30:15 Pop letter: a
实际开发经常需要线程之间互相通信或订阅某些特定信息来决定是否执行下一步
线程之间的消息传递
queue.Queue
是线程安全的,可用于线程之间交互threding.Event
可用于状态监听,以满足特别数据在被线程处理完成后由其他线程进行下一步处理假设有2个线程,1个生产者线程负责推送字符串"hello",等待消费者线程在字符串后添加"world"后生产者线程立刻打印出来
线程间通信实际上是在线程间传递对象引用
代码实现如下
from multiprocessing import Event
from queue import Queue
from threading import Thread, Event
import time
def Producet(q: Queue):
while True:
e = Event()
hello = ['hello']
q.put((hello, e))
print('%s Producet push data "hello"' % time.time())
e.wait()
print('%s Producet say %s\n' % (time.time(), ' '.join(hello)))
def consumer(q: Queue):
while True:
# Until the queue has to value
data, e = q.get()
time.sleep(5)
data = data.append('world')
e.set()
print('%s Consumer add "world" to data \n' % time.time())
qe = Queue()
t1 = Thread(target=Producet, args=(qe, ))
t1.start()
t2 = Thread(target=consumer, args=(qe, ))
t2.start()
输出如下,可以看到生产者线程在加工后的1毫秒内拿到了”hello world“字符串
1652840455.1704156 Producet push data "hello"
1652840460.1712673 Consumer add "world" to data
1652840460.1713297 Producet say hello world
CPython解释器采用一种叫做“GIL全局解释锁”的互斥锁,全称是Global Interpreter Lock,用于防止多线程并发执行机器码
关于GIL在这里不展开,其结论有2点
对于IO密集型任务,多线程与多进程的性能区别不大(GIL锁优化了IO堵塞的情况)
对于计算密集型任务,多线程没有提升(由于线程调度上下文切换可能更慢),应采用多进程或协程处理
例如对于下列程序,计算 100000000 的累加值
import time
def Add(n: int):
count = 1
for i in range(2, n):
count = count + i
print('Count: %s' % count)
start = time.time()
Add(100000000)
Add(100000000)
print('Run time: %0.3f' % (time.time() - start))
输出如下
Count: 4999999950000000
Count: 4999999950000000
Run time: 15.024
我们采用线程的方式来并行计算2个 100000000 的累加值
import time
from threading import Thread
def Add(n: int):
count = 1
for i in range(2, n):
count = count + i
print('Count: %s ' % count)
start = time.time()
t1 = Thread(target=Add, args=(100000000, ))
t1.start()
t2 = Thread(target=Add, args=(100000000, ))
t2.start()
# Wait t1 and t2 completed
t1.join()
t2.join()
print('Run time: %0.3f' % (time.time() - start))
输出结果
Count: 4999999950000000
Count: 4999999950000000
Run time: 15.406
可以看到相对于顺序执行其速度没有区别,对于计算密集型任务而言,线程并不能使处理速度变快
如果一定要在Python中处理计算密集型任务,可以考虑进程池
现实中任何多线程调度的操作多半要考虑到原子操作(atomic operation),即固定步骤的执行顺序不应被线程调度机制所打断乃至出现异常
在Python3中,dict/tuple/list等对象都是线程安全的,可以大胆使用
如以下 Add
方法,每调用1次,则count的值增大500000,2条线程各调一次,理论上应该输出1000000
from threading import Thread
count = 0
def Add():
global count
i = 0
while i < 500000:
count += 1
i += 1
t1 = Thread(target=Add)
t1.start()
t2 = Thread(target=Add)
t2.start()
# Wait t1 and t2 completed
t1.join()
t2.join()
print('Count value : %d' % count)
输出如下,Count的值并不是1000000,多次执行可以发现这个数变化不一,这就是原子操作被破坏的体现
Count value : 804263
锁可以在实际开发中很好的避免这种问题,但通常也伴随着一定的性能损耗
from threading import Thread, Lock
count = 0
lock = Lock()
def Add():
global count
i = 0
while i < 500000:
with lock:
count += 1
i += 1
t1 = Thread(target=Add)
t1.start()
t2 = Thread(target=Add)
t2.start()
# Wait t1 and t2 completed
t1.join()
t2.join()
print('Count value : %d' % count)
输出如下,多次执行Count的值都是 1000000
Count value : 1000000
有时不好确认任务数量大小,那么线程池便派上用场了
以产生随机字符串为例,每次产生1个随机字符串需要1秒
我们使用2条线程来生成4个长度分别为5,6,7,8的随机字符串
代码如下
from concurrent.futures import ThreadPoolExecutor
import random
import threading
import time
def RomdomString(length: int):
romdom_Str = ''
while length > 0:
num = random.randint(0, 9)
s = str(random.choice([num, chr(random.randint(65, 90))]))
romdom_Str += s
length -= 1
time.sleep(1)
t = threading.currentThread()
print('Thread(%s) romdom string: %s' % (t.native_id, romdom_Str))
return romdom_Str
start = time.time()
pool = ThreadPoolExecutor(2)
t1 = pool.submit(RomdomString, 5)
t2 = pool.submit(RomdomString, 6)
t3 = pool.submit(RomdomString, 7)
t4 = pool.submit(RomdomString, 8)
print('All thread start')
print('T1 string: %s\nT2 string: %s\nT3 string: %s\nT4 string: %s' % (t1.result(), t2.result(), t3.result(), t4.result()))
print('Run time: %0.3f' % (time.time() - start))
输出如下
All thread start
Thread(13538) romdom string: VZFSV
Thread(13541) romdom string: E4PM0S
Thread(13538) romdom string: M2QN7XX
Thread(13541) romdom string: 2MZ33XH3
T1 string: VZFSV
T2 string: E4PM0S
T3 string: M2QN7XX
T4 string: 2MZ33XH3
Run time: 14.018
可以看到无论何时都只有2条线程在处理字符串的生成,且自行调度处理剩余任务,线程池非常适合处理大量IO堵塞型任务的场景
通常,你也应该只在I/O处理相关代码中使用线程池