python多线程详解

简介

多进程和多线程都可以执行多个任务,线程是进程的一部分。线程的特点是线程之间可以共享内存和变量,资源消耗少,缺点是线程之间的同步和加锁比较麻烦。
在cpython中,截止到3.12为止依然存在全局解释器锁(GIL),不能发挥多核的优势,因此python多线程更适合IO密集型任务并发提高效率,CPU密集型任务推荐使用多进程并行解决。

  • 注:此说法仅适用于python(如:c++的多线程可以利用到多核并行计算)。

使用

新建:使用线程的第一步就是创建线程,创建后的线程只是进入可执行的状态,也就是Runnable
Runnable:进入此状态的线程还并未开始运行,一旦CPU分配时间片给这个线程后,该线程才正式的开始运行
Running:线程正式开始运行,在运行过程中线程可能会进入阻塞的状态,即Blocked
Blocked:在该状态下,线程暂停运行,解除阻塞后,线程会进入Runnable状态,等待CPU再次分配时间片给它
结束:线程方法执行完毕或者因为异常终止返回
其中最复杂的是线程从Running进入Blocked状态,通常有三种情况:

睡眠:线程主动调用**sleep()或join()方法后.
等待:线程中调用
wait()方法,此时需要有其他线程通过notify()**方法来唤醒
同步:线程中获取线程锁,但是因为资源已经被其他线程占用时.

示例

import threading
import time
#定义线程需要做的内容,写在函数里面
def target():
    print('当前的线程%s 在运行' % threading.current_thread().name)
    time.sleep(1)
    print('当前的线程 %s 结束' % threading.current_thread().name)

print('当前的线程 %s 在运行' % threading.current_thread().name)
t = threading.Thread(target=target,args = [])
 
t.start()  #线程启动
print('当前的线程 %s 结束' % threading.current_thread().name)

输出

当前的线程 MainThread 在运行
当前的线程Thread-13 在运行
当前的线程 MainThread 结束
当前的线程 Thread-13 结束

从结果可以看出,主线程结束后,子线程还没有运行结束;这时候为了让子进程先与主进程结束,可以调用join( )函数;
join( )函数:join所完成的工作就是线程同步,即主线程任务结束之前,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程再终止。
join有一个timeout参数:

当设置守护线程时【setDaemon(True)】,含义是主线程对于子线程等待timeout的时间将会杀死该子线程,最后退出程序。所以说,如果有10个子线程,全部的等待时间就是每个timeout的累加和。简单的来说,就是给每个子线程一个timeout的时间,让他去执行,时间一到,不管任务有没有完成,直接杀死。
没有设置守护线程时,主线程将会等待timeout的累加和这样的一段时间,时间一到,主线程结束,但是并没有杀死子线程,子线程依然可以继续执行,直到子线程全部结束,程序退出。

import threading
import time
 
def target():
    print('当前的线程%s 在运行' % threading.current_thread().name)
    time.sleep(1)
    print('当前的线程 %s 结束' % threading.current_thread().name)
 
print('当前的线程 %s 在运行' % threading.current_thread().name)
t = threading.Thread(target=target,args = [])
 
t.start()
t.join() #阻塞进程
print('当前的线程 %s 结束' % threading.current_thread().name)

添加join( )阻塞进程后输出:

当前的线程 MainThread 在运行
当前的线程Thread-11 在运行
当前的线程 Thread-11 结束
当前的线程 MainThread 结束

但如果为线程实例添加t.setDaemon(True)守护进程之后,如果不加join语句,那么当主线程结束之后,会杀死子线程。如果加上join,并设置等待时间,就会等待线程一段时间再退出:

首先看一下守护进程机制:

import time

def run():

    time.sleep(2)
    print('当前线程的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':

    start_time = time.time()

    print('这是主线程:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)

    for t in thread_list:
        t.setDaemon(True)
        t.start()

    print('主线程结束了!' , threading.current_thread().name)
    print('一共用时:', time.time()-start_time)
这是主线程: MainThread
主线程结束了! MainThread
一共用时: 0.015624046325683594
当前线程的名字是: 当前线程的名字是:  Thread-30
当前线程的名字是:  Thread-29
 Thread-31
当前线程的名字是:  Thread-33
当前线程的名字是:  Thread-32

可以看出当设置守护进程t.setDaemon(True)后,主进程结束就结束了整个进程,子进程依旧打印。

然后加上join( ),如果加上join,并设置等待时间,就会等待线程一段时间再退出

import threading

import time
 
def target():
    print('当前的线程%s 在运行' % threading.current_thread().name)
    time.sleep(4)
    print('当前的线程 %s 结束' % threading.current_thread().name)
 
print('当前的线程 %s 在运行' % threading.current_thread().name)
t = threading.Thread(target=target,args = [])

t.setDaemon(False)
t.start()
t.join(5)  #为子线程设定运行的时间,5s后就退出子线程,如果不加等待时间,就会一直等待子线程运行结束后,再运行主线程
print('当前的线程 %s 结束' % threading.current_thread().name)
当前的线程 MainThread 在运行
当前的线程Thread-35 在运行
当前的线程 Thread-35 结束
当前的线程 MainThread 结束

线程池

1.1 什么是线程池?

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。 例如,线程数一般取 cpu 数量+2 比较合适,线程数过多会导致额外的线程切换开销。

线程池的优点:
降低资源消耗,复用已创建的线程来降低创建和销毁线程的消耗。
提高响应速度,任务到达时,可以不需要等待线程的创建立即执行。
提高线程的可管理性,使用线程池能够统一的分配、调优和监控。

import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def task(i):
    sleep_seconds = random.randint(1, 3)  # 随机睡眠时间
    print('线程名称:%s,参数:%s,睡眠时间:%s' % (threading.current_thread().name, i, sleep_seconds))
    time.sleep(sleep_seconds)  # 定义睡眠时间
    return i


def task_callback(i):
    print(i)


with ThreadPoolExecutor(max_workers=2) as pool:  # 定义两个线程,这样就建立了一条简单的线程池,其中最大线程数为 2
    for i in range(10):  # 创建十个任务
        future_result = pool.submit(task, i)  # 提交线程需要执行的任务(函数名和参数)到线程池中,立刻返回一个future对象。
        # future_result.cancel()  # 取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
        print(future_result.cancelled())  # 返回 Future 代表的线程任务是否被成功取消。
        print(future_result.running())
        print(future_result.result(timeout=1))  # 获取函数执行结果,可以设置超时时间
        future_result.add_done_callback(task_callback)  # 设置回调函数
with ThreadPoolExecutor(max_workers=2) as pool:
    # map() 函数会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数. 同时,使用 map 函数,还会自动获取返回值。
    pool.map(task, [i for i in range(11, 15)])

扩展

在使用多线程应用下为了保证线程安全以及线程之间的同步和共享变量问题,python提供了Lock 、Rlock 、Semaphore 、Event 、Condition 用来保证线程之间的同步,后者保证访问共享变量的互斥问题。

  • Lock & RLock:互斥锁,用来保证多线程访问共享变量的问题
  • Semaphore对象:Lock互斥锁的加强版,可以被多个线程同时拥有,而Lock只能被某一个线程同时拥有。
  • Event对象:它是线程间通信的方式,相当于信号,一个线程可以给另外一个线程发送信号后让其执行操作。
  • Condition对象:其可以在某些事件触发或者达到特定的条件后才处理数据

Lock(互斥锁)

请求锁定 — 进入锁定池等待 — — 获取锁 — 已锁定— — 释放锁
Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
构造方法:mylock = Threading.Lock( )
实例方法:

acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
实例一(未使用锁):

import threading
import time

num = 0

def show(arg):
    global num
    time.sleep(1)
    num +=1
    print('bb :{}'.format(num))

for i in range(5):
    t = threading.Thread(target=show, args=(i,))  # 注意传入参数后一定要有【,】逗号
    t.start()

print('main thread stop')

--------------------------------------------------------------------------
main thread stop
bb :1
bb :2
bb :3bb :4
bb :5

实例二(使用锁)

import threading
import time

num = 0

lock = threading.RLock()


# 调用acquire([timeout])时,线程将一直阻塞,
# 直到获得锁定或者直到timeout秒后(timeout参数可选)。
# 返回是否获得锁。
def Func():
    lock.acquire()
    global num
    num += 1
    time.sleep(1)
    print(num)
    lock.release()


for i in range(10):
    t = threading.Thread(target=Func)
    t.start()
------------------------------------------------------------------
1
2
3
4
5
6
7
8
9
10
#可以看出,全局变量在在每次被调用时都要获得锁,才能操作,因此保证了共享数据的安全性

对于Lock对象而言,如果一个线程连续两次release,使得线程死锁。所以Lock不常用,一般采用Rlock进行线程锁的设定。

import threading
import time

class MyThread(threading.Thread):
    def run(self):
        global num 
        time.sleep(1)

        if lock.acquire(1):  
            num = num+1
            msg = self.name+' set num to '+str(num)
            print(msg)
            lock.acquire()
            lock.release()
            lock.release()
num = 0
lock = threading.Lock()
def test():
    for i in range(5):
        t = MyThread()
        t.start()
if __name__ == '__main__':
    test()
------------------------------------------------------
Thread-12 set num to 1

RLock(可重入锁)

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
构造方法:mylock = Threading.RLock()
实例方法:acquire([timeout])/release(): 跟Lock差不多。

实例解决死锁,调用相同次数的acquire和release,保证成对出现

import threading
rLock = threading.RLock()  #RLock对象
rLock.acquire()
rLock.acquire() #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()

print(rLock.acquire())

详细实例:

import threading
mylock = threading.RLock()
num = 0
class WorkThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.t_name = name
    def run(self):
        global num
        while True:
            mylock.acquire()
            print('\n%s locked, number: %d' % (self.t_name, num))
            if num >= 2:
                mylock.release()
                print('\n%s released, number: %d' % (self.t_name, num))
                break
            num += 1
            print('\n%s released, number: %d' % (self.t_name, num))
            mylock.release()
def test():
    thread1 = WorkThread('A-Worker')
    thread2 = WorkThread('B-Worker')
    thread1.start()
    thread2.start()
if __name__ == '__main__':
    test() 
--------------------------------------------------
A-Worker locked, number: 0

A-Worker released, number: 1

A-Worker locked, number: 1

A-Worker released, number: 2

A-Worker locked, number: 2

A-Worker released, number: 2

B-Worker locked, number: 2

B-Worker released, number: 2

Event

  • threading.Event()
    Python并未提供能够立马阻塞的函数,我们可以使用 threading.Event()
    threading.Event()可以产生一个event对象,并带一个默认值为False的标志位,通过方法set()可以设置为True;
    event.wait(timeout) 阻塞标记,可以设置延迟时间,程序执行到此处就会决定是否阻塞
    event.set() 设置非阻塞状态
    event.clear() 设置阻塞状态
import threading
import time
class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()
 
        # 定义event对象,并使用set()设置标志位为True
        self.event = threading.Event()
        self.event.set()
 
    def run(self):
        print("start:")
        for i in range(1, 11):
            self.event.wait()
            print("第%d行" % i)
            time.sleep(1)
 
    def pause(self):
        print("阻塞")
        self.event.clear()
 
    def restart(self):
        print("不阻塞")
        self.event.set()
 
 
m = MyThread()
m.start()
time.sleep(3.1)
m.pause()
time.sleep(3)
print('阻塞3秒后')
m.restart()
  • event.wait()如何放置
    假设下面有个10000次插入数据库的需求,你要保证能够迅速的被阻断,又要保证最小事务的完整性。
def insert():
    insert_DB()
 
def requirement():
    for i in range(10000):
        event.wait()
        insert()

ERROR

def requirement():
    event.wait()
    for i in range(10000):
        insert()

ERROR

def insert():
    #'''
    event.wait()
    #'''   

Condition

但是互斥锁是最简单的线程同步机制,Python提供的Condition对象提供了对复杂线程同步问题的支持。
Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。
Condition的处理流程如下:
首先acquire一个条件变量,然后判断一些条件。
如果条件不满足则wait;
如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。
不断的重复这一过程,从而解决复杂的同步问题。
Condition的基本原理如下:
可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有的线程永远处于沉默状态。

演示条件变量同步的经典问题是生产者与消费者问题:假设有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交互产品。生产者的”策略“是如果市场上剩余的产品少于1000个,那么就生产100个产品放到市场上;而消费者的”策略“是如果市场上剩余产品的数量多余100个,那么就消费3个产品。用Condition解决生产者与消费者问题的代码如下:

import threading
import time

class Producer(threading.Thread):
    # 生产者函数
    def run(self):
        global count
        while True:
            if con.acquire():
                # 当count 小于等于1000 的时候进行生产
                if count > 1000:
                    con.wait()
                else:
                    count = count+100
                    msg = self.name+' produce 100, count=' + str(count)
                    print(msg)
                    # 完成生成后唤醒waiting状态的线程,
                    # 从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁
                    con.notify()
                con.release()
                time.sleep(1)

class Consumer(threading.Thread):
    # 消费者函数
    def run(self):
        global count
        while True:
            # 当count 大于等于100的时候进行消费
            if con.acquire():
                if count < 100:
                    con.wait()
                
                else:
                    count = count-5
                    msg = self.name+' consume 5, count='+str(count)
                    print(msg)
                    con.notify()
                    # 完成生成后唤醒waiting状态的线程,
                    # 从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁
                con.release()
                time.sleep(1)

count = 500
con = threading.Condition()

def test():
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()
if __name__ == '__main__':
    test()

信号量(Semaphore)

信号量是一个内部数据,它有一个内置的计数器,它标明当前的共享资源可以有多少线程同时读取。

示例:定义一个只能同时执行5个线程的信号量。

semaphore = threading.Semaphore(5)  # 创建信号量对象,5个线程并发

当线程需要读取关联信号量的共享资源时,需调用acquire(),这时信号量的计数器会-1。

semaphore.acquire() # 获取共享资源,信号量计数器-1

在线程不需要共享资源时,需释放信号release(),这时信号量的计数器会+1,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。。

semaphore.release()  # 释放共享资源,信号量计数器+1

信号量控制规则:当计数器大于0时,那么可以为线程分配资源权限;当计数器小于0时,未获得权限的线程会被挂起,直到其他线程释放资源。

示例1: 信号量运行3个线程并行

import time
import threading
import random

# 创建信号量对象,信号量设置为3,需要有3个线程才启动
semaphore = threading.Semaphore(3)


def func():

    if semaphore.acquire():  # 获取信号 -1
        print(threading.currentThread().getName() + '获得信号量')
        time.sleep(random.randint(1, 5))
        semaphore.release()  # 释放信号 +1


for i in range(10):
    t1 = threading.Thread(target=func)
    t1.start()

注意观察程序运行,开始只有3个线程获得了资源的权限,后面当释放几个资源时就有几个获得资源权限。

示例2:运用信号量进行线程同步

import threading
import time
import random

# 同步两个不同线程,信号量被初始化0
semaphore = threading.Semaphore(0)


def consumer():
    print("-----等待producer运行------")
    semaphore.acquire()  # 获取资源,信号量为0被挂起,等待信号量释放
    print("----consumer 结束----- 编号: %s" % item )


def producer():
    global item  # 全局变量
    time.sleep(3)
    item = random.randint(0, 100)  # 随机编号
    print("producer运行编号: %s" % item)
    semaphore.release()


if __name__ == "__main__":
    for i in range(0, 4):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("程序终止")

信号量被初始化为0,目的是同步两个或多个线程。线程必须并行运行,所以需要信号量同步。这种运用场景有时会用到,比较难理解,多运行示例仔细观察打印结果。

拓展:

信号量的一个特殊用法是互斥量。互斥量是初始值为1的信号量,可以实现数据、资源的互斥访问。

信号量在在多线程的编程语言中应用很广,他也有可能造成死锁的情况。例如,有一个线程t1,先等待信号量s1,然后等待信号量s2,而线程t2会先等待信号量s2,然后再等待信号量s1,这样就会发生死锁,导致t1等待s2,但是t2在等待s1。

Barrie 障碍锁

也可以叫屏障或者栅栏,可以想象成路障、道闸。当线程达到设定的数值时放开道闸允许继续运行。

创建Barrier障碍锁

barrier=threading.Barrier(parties, action=None, timeout=None)

参数:

parties 参与线程的数量
action 全部线程被释放时可被其中一条线程调用的可调用对象
timeout 调用wait方法时未指定时超时的默认值
Barrier实例的方法

wait(timeout=None) 等待通过栅栏,返回值是一个0到parties-1之间的整数, 每个线程返回不同。如果wait方法设置了超时,并超时发送,栅栏将处于broken状态。
reset() 重置障碍,返回默认的 False 状态,即重新开始阻塞线程。
abort() 将障碍置为断开状态,这将导致已调用wait()和之后调用wait()引发BrokenBarrierError异常。
Barrier实例的属性

partier 通过障碍的线程数
n_waiting 当前在屏障中等待的线程数
broken 布尔值,表明barrier是否broken。
示例1:

import threading

def work(barrier):
    print("n_waiting = {}".format(barrier.n_waiting))  # 等待的线程数
    bid = barrier.wait()  # 参与者的id,返回0到线程数减1的数值
    print("障碍后运行 {}".format(bid))  # 障碍之后

barrier = threading.Barrier(3)  # 3个参与者,每3个开闸放行

for x in range(6):  # 这里启动线程只能是3的倍数,你可以试5
    threading.Thread(target=work, args=(barrier,)).start()

注意:当我们设置Barrier的放行的线程数时,我们启动的线程只能是他的倍数,不然会有线程一直陷入等待中。

怎么解决这个问题,我们需要在wait加入timeout参数,如下示例。

示例2: 解决线程不是Barrier的倍数问题

import threading

def work(barrier):
    bid = None
    print("n_waiting = {}".format(barrier.n_waiting))  # 等待的线程数
    try:
        # 设置超时时间,线程达不到数量会抛出异常
        bid = barrier.wait(1)  # 设置线程超时时间
        print("障碍后运行 {}".format(bid))  # 障碍之后
    except threading.BrokenBarrierError:  # 接收超时的异常
        barrier.abort()  # 取消屏障
        print("打破障碍运行 {}".format(threading.current_thread()))


barrier = threading.Barrier(3)  # 3个参与者,每3个开闸放行
for x in range(5):  # 5个线程,剩余2个会抛出超时异常
    threading.Event().wait(0.1)
    threading.Thread(target=work, args=(barrier,)).start()

队列

Queue 模块可以实现多生产者与多消费者队列,它可以实现多个线程之间的信息安全交换。

它有几种队列模式:

FIFO队列,先进先出

q = queue.Queue(10)  # FIFO 队列,最多放入10个项目
q.put(123) # 队列中存入项目123

LIFO队列,后进先出,如同栈

q = queue.LifoQueue()   # LIFO 队列,项目数无限 
q.put(123) # 队列中存入项目123

Priority队列,对着中的数据始终保持排序,优先检索最低值。 通常使用 (优先序号, 数据)形式存储数据。不带需要默认对其值进行排序。
注意:Priority队列因为是顺序的,存储的数据必须是要能排序,即相同类型数据
q = queue.PriorityQueue(10) # Priority 队列,最多放入10个项目
q.put((1,‘a’)) # 队列中存入项目(1,‘a’)
注意: 创建队列时会指定队列中最多存储项目的个数,就是设定一个非常大的值也比无限制好。

队列的公共方法:

  • qsize() 队列大大致大小,非准确值
  • empty() 当前是否为空
  • full() 当前是否已满
  • put(item, block=True, timeout=None) 将item放入队列。
    block=True, timeout=None 在必要时阻塞,直到有空位可用,timeout 为阻止的时间,超时抛出Full异常。
    block=False 立即将item放入队列,队列已满引发Full异常。
  • put_nowait(item) 立即放入队列,同put(item,False)
    get(block=True, timeout=None) 从队列中删除并返回一个item。
    block=True, timeout=None 在必要时阻塞,直到有可用数据为止,timeout 为阻止的时间,超时抛出Empty异常。
    block=False 立即获取队列中的可用数据,否则抛出Empty异常。
  • get_nowait() 立即获取队列中的数据,同get(False)。
  • task_done() 向已完成的队列任务发送一个信号。一般是告诉join() 我以完成任务。
  • join() 阻塞线程,直到队列为空才放行。
    注意:join() 与 task_done() 是套组合拳,有使用 join() 必须在任务结束后执行 task_done() 通知队列。

示例:生产 消费模式

import threading, time
import queue

# 最多存入10个
q = queue.PriorityQueue(10)


def producer(name):
    ''' 生产者 '''
    count = 1
    while True:
        #  生产袜子
        q.put("袜子 %s" % count)  # 将生产的袜子方法队列
        print(name, "---生产了袜子", count)
        count += 1
        time.sleep(0.2)


def consumer(name):
    ''' 消费者 '''
    while True:
        print("%s ***卖掉了[%s]" % (name, q.get()))  # 消费生产的袜子
        time.sleep(1)
        q.task_done()  # 告知这个任务执行完了


# 生产线程
z = threading.Thread(target=producer, args=("张三",))
# 消费线程
l = threading.Thread(target=consumer, args=("李四",))
w = threading.Thread(target=consumer, args=("王五",))

# 执行线程
z.start()
l.start()
w.start()

线程安全

线程安全是指多线程并发访问共享资源时,不会导致数据的错误或不一致性。在多线程编程中,由于多个线程同时访问共享数据,可能会导致数据竞争、死锁等问题。线程安全的实现是为了避免这些问题的发生。

实现线程安全有多种方式,常见的包括使用锁、条件变量、原子操作、线程本地存储等。

锁: 锁是一种最常见的线程同步机制,用于控制对共享资源的访问。在Python中,锁有两种类型:互斥锁和读写锁。互斥锁在同一时刻只允许一个线程对共享数据进行操作,而读写锁则允许多个线程同时读取共享数据,但是只允许一个线程进行写操作。可以使用threading.Lock()或threading.RLock()来创建互斥锁或读写锁。
条件变量: 条件变量是一种用于线程间通信的同步机制,用于在多个线程之间共享状态信息,以便线程可以等待某个条件的出现并被通知。通常情况下,一个线程会等待另一个线程触发某个条件,当条件满足时,等待的线程被通知并继续执行。
在Python中,条件变量是通过threading.Condition类实现的。Condition类实际上是一个锁对象,它允许多个线程等待某个条件的出现。当条件被满足时,Condition会自动通知等待的线程继续执行。可以使用Condition对象的wait()方法使线程等待某个条件的出现,使用notify()或notify_all()方法通知等待的线程。
原子操作: 原子操作是指一种不可中断的操作,可以保证在多线程并发执行时不会导致数据的竞争和冲突。在Python中,可以使用atomic模块提供的atomic_add()、atomic_sub()、atomic_swap()等函数来实现原子操作。
线程本地存储: 线程本地存储是指为每个线程分配独立的存储空间,这样可以避免多个线程访问同一份数据的情况。在Python中,可以使用threading.local()类创建一个线程本地存储对象,每个线程都有一个独立的存储空间,可以在其中存储线程特定的数据。使用线程本地存储可以有效地避免线程安全问题。
在Python中,由于全局解释器锁(GIL)的存在,多线程并发执行时只有一个线程能够执行Python字节码,因此对于CPU密集型的操作,多线程并不能够提高程序的执行速度。但是,对于IO密集型的操作,多线程能够在等待IO时释放GIL,从而提高程序的执行效率。

为了实现线程安全,我们可以采取以下几个措施:

使用线程安全的数据结构: Python中的一些内置数据结构,例如list、dict、set等,都是非线程安全的。但是Python标准库中也提供了一些线程安全的数据结构,例如queue、threading.local等。使用这些数据结构可以有效地避免多个线程同时读写同一份数据的情况。
使用锁: 在Python中,可以使用锁来保证对共享数据的访问是同步的。锁可以分为互斥锁和读写锁两种类型。互斥锁在同一时刻只允许一个线程对共享数据进行操作,而读写锁则允许多个线程同时读取共享数据,但是只允许一个线程进行写操作。
避免竞争条件: 竞争条件是在多线程环境下常见的问题。例如,在多个线程之间对同一份数据进行自增操作时,可能会出现数据不一致的情况。为了避免竞争条件,可以使用锁、条件变量等同步原语来确保操作的原子性,避免多个线程同时对同一份数据进行操作。
使用线程池: 在Python中,可以使用线程池来管理线程的数量。线程池可以有效地避免创建过多的线程,从而避免因为线程数量过多而导致的性能下降和资源浪费。使用线程池还可以更加方便地管理线程的生命周期和执行的任务。
总之,在Python中,实现线程安全是非常重要的。通过使用线程安全的数据结构、锁、条件变量等同步原语,可以有效地避免多线程并发时的问题,保证程序的正确性和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/567573.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

gcc make makefile cmake之间的关系梳理

gcc是GNU Compiler Collection&#xff08;GNU编译器套件&#xff09;&#xff0c;也可以简单认为是编译器&#xff0c;它可以编译很多编程语言&#xff08;包括C、C、Object-C、Fortran、Java等&#xff09;当你的程序只有一个源文件&#xff0c;直接用gcc命令编译它。但是当你…

【Java--数据结构】提升你的编程段位:泛型入门指南,一看就会!

前言 泛型是一种编程概念&#xff0c;它允许我们编写可以适用于多种数据类型的代码。通过使用泛型&#xff0c;我们可以在编译时期将具体的数据类型作为参数传递给代码&#xff0c;从而实现代码的复用和灵活性。 在传统的编程中&#xff0c;我们通常需要为不同的数据类型编写不…

总结一下背包里的顺序和是否逆序

1.对于01背包而言&#xff0c;一维压缩态只能物品到背包且需要逆序 2.对应多重背包而言&#xff0c;组合数物品到背包&#xff0c;排列数背包到物品&#xff0c;且都需要正序

【北京迅为】《iTOP-3588开发板系统编程手册》-第20章 socket 应用编程

RK3588是一款低功耗、高性能的处理器&#xff0c;适用于基于arm的PC和Edge计算设备、个人移动互联网设备等数字多媒体应用&#xff0c;RK3588支持8K视频编解码&#xff0c;内置GPU可以完全兼容OpenGLES 1.1、2.0和3.2。RK3588引入了新一代完全基于硬件的最大4800万像素ISP&…

Mudem,打造私密安全、高效稳定的私人空间

Mudem 是 Codigger 平台中的一个关键组件&#xff0c;它提供基础通讯服务&#xff0c;确保不同类型的机器之间可以进行安全和高效的连接。它其设计理念在于将本地机器、公有云以及私有云上的设备无缝地整合为一个可远程在线访问的工作站&#xff08;Workstation&#xff09;。这…

UE4_常见动画节点学习_Two Bone IK双骨骼IK

学习资料&#xff0c;仅供参考&#xff01; Two Bone IK 控制器将逆运动&#xff08;IK&#xff09;解算器应用于到如角色四肢等3关节链。 变量&#xff08; HandIKWeight &#xff09;被用于在角色的 hand_l 和 hand_r 控制器上驱动 关节目标位置&#xff08;Joint Target Lo…

Java常见输入输出练习

1.AB(1) 计算ab 数据范围&#xff1a; 数据组数 1≤ t ≤100 , 数据大小满足 1≤ n ≤1000 输入描述&#xff1a; 输入包括两个正整数a,b(1 < a, b < 1000),输入数据包括多组。 输出描述&#xff1a; 输出ab的结果 输入例子&#xff1a; 1 5 10 20 输出例子&#xff…

ctfshow 每周大挑战RCE极限挑战

讨厌SQl看到这个了想来玩玩 rce1 <?phperror_reporting(0); highlight_file(__FILE__);$code $_POST[code];$code str_replace("(","括号",$code);$code str_replace(".","点",$code);eval($code);?>括号过滤点过滤&…

qt;lt;等xml|Html转义字符

在写Android布局文件时&#xff0c;左右尖括号<>&#xff0c;括号在XML中没办法直接使用&#xff0c;需要进行转义&#xff0c;收集一些转义符&#xff0c;以便查询使用。 常用表&#xff1a; **对于文章出现的任何问题请大家批评指出&#xff0c;一定及时修改 **可联系…

牛客网刷题 | BC60 判断是不是字母

描述 KiKi想判断输入的字符是不是字母&#xff0c;请帮他编程实现。 输入描述&#xff1a; 多组输入&#xff0c;每一行输入一个字符。 输出描述&#xff1a; 针对每组输入&#xff0c;输出单独占一行&#xff0c;判断输入字符是否为字母&#xff0c;输出内容详见输出样例…

加密、解密、签名、验签、数字证书、CA浅析

一、加密和解密 加密和解密应用的很广&#xff0c;主要作用就是防止数据或者明文被泄露。 加解密算法主要有两大类&#xff0c;对称加密和非对称加密。对称加密就是加密和解密的密钥都是一个&#xff0c;典型的有AES算法。非对称加密就是有公钥和私钥&#xff0c;公钥可以发布…

在线测径仪的六类测头组合形式!哪种适合你?

在线测径仪&#xff0c;这一现代工业的精密仪器&#xff0c;犹如一位技艺高超的工匠&#xff0c;以其卓越的性能和精准度&#xff0c;为工业生产提供了坚实的保障。它的出现&#xff0c;不仅提高了生产效率&#xff0c;更保证了产品质量&#xff0c;为企业的可持续发展注入了强…

1张图片+3090显卡微调Qwen-VL视觉语言大模型(仅做演示、效果还需加大数据量)

原项目地址&#xff1a;https://github.com/QwenLM/Qwen-VL/blob/master/README_CN.md 环境本地部署&#xff08;见之前博文&#xff09; 【本地部署 】23.08 阿里Qwen-VL&#xff1a;能对图片理解、定位物体、读取文字的视觉语言模型 (推理最低12G显存) 一、数据集格式说明 …

『视觉感官盛宴』3D线上商场全方位展示商品与互动购买体验

随着技术的进步和消费者需求的多样化&#xff0c;3D线上商场作为一种新兴的电子商务平台&#xff0c;正逐渐改变传统的在线购物模式。 一、商品展示革命 在3D线上商场中&#xff0c;商品展示不再局限于静态图片和文字描述。借助先进的3D建模技术&#xff0c;商家能够创建商…

从0到1带你玩转pandas

学习 pandas 的过程可以分为几个阶段&#xff0c;每个阶段都围绕着不同的核心技能和概念。下面是一个为初学者设计的学习大纲&#xff1a; 一. 基础介绍 学习如何安装和设置 pandas 以及了解它的基本概念是开始使用 pandas 进行数据分析的第一步。下面我将详细介绍这些步骤&am…

【MySQL】A01、性能优化-语句分析

1、数据库优化方向 A、SQL及索引优化 根据需求写出良好的SQL&#xff0c;并创建有效的索引&#xff0c;实现某一种需求可以多种写法&#xff0c;这时候我们就要选择一种效率最高的写法。这个时候就要了解sql优化 B、数据库表结构优化 根据数据库的范式&#xff0c;设计表结构&…

mac电脑搭建vue环境(上篇)

第一步&#xff1a;mac电脑要有homebrew&#xff0c;如何安装homebrew 点击下方 MAC安装homebrew-CSDN博客 第二步&#xff1a;homebrew安装node.js 第三步&#xff1a;安装npm 第四步&#xff1a;安装webpack 第五步&#xff1a;安装vue脚手架 第六步&#xff1a;可以在…

翻译《The Old New Thing》 - Some reasons not to do anything scary in your DllMain

Some reasons not to do anything scary in your DllMain - The Old New Thing (microsoft.com)https://devblogs.microsoft.com/oldnewthing/20040127-00/?p40873 Raymond Chen 2004年01月27日 简介 这篇文章讨论了为什么不应该在DLL的DllMain函数中执行复杂的操作 正文 众所…

Java中的重写

package day34; ​ public class Father {String name;int age;public void 输出(){System.out.println("father");} } ​ package day34; ​ public class Son extends Father{Overridepublic void 输出() {System.out.println("son");} } ​ package d…

C++:构造函数和析构函数

一、构造函数 1.1概念 构造函数是一个特殊的成员函数&#xff0c;名字与类相同&#xff0c;创建类类型对象时由编译器自动调用&#xff0c;保证每个数据成员都由一个合适的初始值。在对象的生命周期内只调用一次。 不使用构造函数 #include<iostream> using namespac…