menu Chancel's blog
rss_feed
Chancel's blog
有善始者实繁,能克终者盖寡。

Python3线程的使用

作者:Chancel Yang, 创建:2022-05-18, 字数:6429, 已阅:490, 最后更新:2024-03-10

在Python中,如果有多个计算任务需要并行计算,通常会采用多线程(multithreading)或者协程(coroutine)的方式来完成

本文是基于Python3.9官方文档说明,针对一些开发中常见的应用进行总结

1. 线程

1.1. 说明

线程是操作系统进行运算调度的最小单位,一个进程可以有多个线程,多个线程之间共享当前进程的全部系统资源

因此,线程也被称为"轻量级进程"

线程虽然共享进程的全部系统资源,但仍然拥有自己的独立资源部分

  • 调用栈(call stack)
  • 寄存器环境(register context)
  • 本地存储(thread-local storage)

由于共享当前进程所有系统资源,所以一个线程可以轻易摧毁其他线程的独立资源部分

线程优点

  • 相较于进程的方式节省系统资源
  • 多线程之间的数据共享
  • 充分利用CPU多核多线程

线程缺点

  • 内存占用固定
  • 切换线程上下文耗时

Linux下可通过 ulimit -s 来查看为每一个线程默认分配内存大小

2. 例子

在Python3中启动一个线程非常简单

编辑一个main.py,代码如下

Python
import datetime
import time
from threading import Thread

letters = ['a', 'b', 'c', 'd', 'e', 'f']


def PrintLetter(sleep: int = 5):
    while len(letters) > 0:
        print('%s Pop letter: %s' % (datetime.datetime.now().strftime('%H:%M:%S'), letters.pop()))
        time.sleep(sleep)
    print('Print success')


t = Thread(target=PrintLetter, args=(1, ))
t.start()

输出如下

Bash
17:30:10 Pop letter: f
17:30:11 Pop letter: e
17:30:12 Pop letter: d
17:30:13 Pop letter: c
17:30:14 Pop letter: b
17:30:15 Pop letter: a

但需要注意的是,Python中的线程是通过操作系统提供的底层线程机制来实现的。Python解释器会将线程映射到操作系统的底层线程,比如POSIX线程(在类Unix系统上)或Windows线程(在Windows系统上)

这会导致一些常见的注意事项,如全局GIL锁平台兼容性

2.1. 消息订阅

实际开发中,经常需要线程之间互相通信或订阅某些特定信息来决定是否执行下一步

线程之间的消息传递

  • 队列对象 queue.Queue 是线程安全的,可用于线程之间交互
  • 线程属性 threding.Event 可用于状态监听,以满足特别数据在被线程处理完成后由其他线程进行下一步处理

假设有2个线程,1个生产者线程负责推送字符串"hello",等待消费者线程在字符串后添加"world"后生产者线程立刻打印出来

线程间通信实际上是在线程间传递对象引用

编辑main.py,实现如下

Python
from multiprocessing import Event
from queue import Queue
from threading import Thread, Event
import time


def Producet(q: Queue):
    while True:
        e = Event()
        hello = ['hello']
        q.put((hello, e))
        print('%s Producet push data "hello"' % time.time())
        e.wait()
        print('%s Producet say %s\n' % (time.time(), ' '.join(hello)))


def consumer(q: Queue):
    while True:
        # Until the queue has to value
        data, e = q.get()
        time.sleep(5)
        data = data.append('world')
        e.set()
        print('%s Consumer add "world" to data \n' % time.time())


qe = Queue()
t1 = Thread(target=Producet, args=(qe, ))
t1.start()
t2 = Thread(target=consumer, args=(qe, ))
t2.start()

输出如下,可以看到生产者线程在加工后的1毫秒内拿到了”hello world“字符串

Bash
1652840455.1704156 Producet push data "hello"
1652840460.1712673 Consumer add "world" to data 
1652840460.1713297 Producet say hello world

2.2. 线程池

有时不好确认任务数量大小,那么线程池便派上用场了,以产生随机字符串为例,每次产生1个随机字符串需要1秒,我们使用2条线程来生成4个长度分别为5,6,7,8的随机字符串

编辑main.py,代码如下

Python
from concurrent.futures import ThreadPoolExecutor
import random
import threading
import time


def RomdomString(length: int):
    romdom_Str = ''
    while length > 0:
        num = random.randint(0, 9)
        s = str(random.choice([num, chr(random.randint(65, 90))]))
        romdom_Str += s
        length -= 1
        time.sleep(1)
    t = threading.currentThread()
    print('Thread(%s) romdom string: %s' % (t.native_id, romdom_Str))
    return romdom_Str


start = time.time()

pool = ThreadPoolExecutor(2)
t1 = pool.submit(RomdomString, 5)
t2 = pool.submit(RomdomString, 6)
t3 = pool.submit(RomdomString, 7)
t4 = pool.submit(RomdomString, 8)

print('All thread start')
print('T1 string: %s\nT2 string: %s\nT3 string: %s\nT4 string: %s' % (t1.result(), t2.result(), t3.result(), t4.result()))
print('Run time: %0.3f' % (time.time() - start))

输出如下

Bash
All thread start
Thread(13538) romdom string: VZFSV
Thread(13541) romdom string: E4PM0S
Thread(13538) romdom string: M2QN7XX
Thread(13541) romdom string: 2MZ33XH3
T1 string: VZFSV
T2 string: E4PM0S
T3 string: M2QN7XX
T4 string: 2MZ33XH3
Run time: 14.018

可以看到无论何时都只有2条线程在处理字符串的生成,且自行调度处理剩余任务,线程池非常适合处理大量IO堵塞型任务的场景

通常,你也应该只在I/O处理相关代码中使用线程池

3. 其他

3.1. GIL全局解释锁

GIL(Global Interpreter Lock),即全局解释锁,在Python中,GIL是一种线程级别的锁,用于保护解释器内部的数据结构不受并发访问的影响

简单来说,GIL限制了Python解释器同一时间只能执行一个线程的字节码,即同一时刻只有一个线程在解释和执行Python代码。这意味着在多线程的情况下,Python无法真正实现多核的并行计算

GIL的存在是因为CPython解释器的设计和实现方式,CPython是Python的一种主要实现,它使用了引用计数(reference counting)作为内存管理的方式

GIL的主要作用是保护引用计数的一致性,确保在多线程环境下引用计数的操作是线程安全的

由于同一时间只有一个线程在执行Python代码,所以在多线程的情况下,无法充分利用多核处理器的性能优势

因此,在需要并行执行计算密集型任务时,使用多线程可能并不能提高性能,反而可能会导致性能下降。

Python的解释器实现并不只有CPython一种,在JPython的实现中并没有GIL锁

综上,结论有2点

  • 对于IO密集型任务,多线程与多进程的性能区别不大(GIL锁优化了IO堵塞的情况)
  • 对于计算密集型任务,多线程没有提升(由于线程调度上下文切换可能更慢),应采用多进程或协程处理

例如对于下列程序,计算 100000000 的累加值

Python
import time

def Add(n: int):
    count = 1
    for i in range(2, n):
        count = count + i
    print('Count: %s' % count)

start = time.time()

Add(100000000)
Add(100000000)

print('Run time: %0.3f' % (time.time() - start))

输出如下

Bash
Count: 4999999950000000
Count: 4999999950000000
Run time: 15.024

我们采用线程的方式来并行计算2个 100000000 的累加值

Python
import time
from threading import Thread


def Add(n: int):
    count = 1
    for i in range(2, n):
        count = count + i
    print('Count: %s ' % count)


start = time.time()

t1 = Thread(target=Add, args=(100000000, ))
t1.start()
t2 = Thread(target=Add, args=(100000000, ))
t2.start()

# Wait t1 and t2 completed
t1.join()
t2.join()

print('Run time: %0.3f' % (time.time() - start))

输出结果

Bash
Count: 4999999950000000 
Count: 4999999950000000 
Run time: 15.406

可以看到相对于顺序执行其速度没有区别,对于计算密集型任务而言,线程并不能使处理速度变快

如果一定要在Python中处理计算密集型任务,可以考虑进程池

3.2. 线程锁

现实中任何多线程调度的操作多半要考虑到原子操作(atomic operation),即固定步骤的执行顺序不应被线程调度机制所打断乃至出现异常

如以下 Add 方法,每调用1次,则count的值增大500000,2条线程各调一次,理论上应该输出1000000

Python
from threading import Thread

count = 0

def Add():
    global count
    i = 0
    while i < 500000:
        count += 1
        i += 1


t1 = Thread(target=Add)
t1.start()
t2 = Thread(target=Add)
t2.start()

# Wait t1 and t2 completed
t1.join()
t2.join()

print('Count value : %d' % count)

输出如下,Count的值并不是1000000,多次执行可以发现这个数变化不一,这就是原子操作被破坏的体现

Bash
Count value : 804263

锁可以在实际开发中很好的避免这种问题,但通常也伴随着一定的性能损耗

Python
from threading import Thread, Lock

count = 0

lock = Lock()


def Add():
    global count
    i = 0
    while i < 500000:
        with lock:
            count += 1
        i += 1


t1 = Thread(target=Add)
t1.start()
t2 = Thread(target=Add)
t2.start()

# Wait t1 and t2 completed
t1.join()
t2.join()

print('Count value : %d' % count)

输出如下,多次执行Count的值都是 1000000

Bash
Count value : 1000000

4. 尾声

资料来源


[[replyMessage== null?"发表评论":"发表评论 @ " + replyMessage.m_author]]

account_circle
email
web_asset
textsms

评论列表([[messageResponse.total]])

还没有可以显示的留言...
[[messageItem.m_author]] [[messageItem.m_author]]
[[messageItem.create_time]]
[[getEnviron(messageItem.m_environ)]]
[[subMessage.m_author]] [[subMessage.m_author]] @ [[subMessage.parent_message.m_author]] [[subMessage.parent_message.m_author]]
[[subMessage.create_time]]
[[getEnviron(messageItem.m_environ)]]