1. 多線程模塊#
python 中主要用 threading 模塊提供對線程的支持
threading 模塊#
threading 模塊常用函數
- threading.current_thread (): 返回當前的線程對象。
- threading.enumerate (): 返回一個包含正在運行的線程的 list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
- threading.active_count (): 返回正在運行的線程數量,與 len (threading.enumerate ()) 有相同的結果。
Thread 類#
通過 threading.Thread () 創建線程對象
主要參數:
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group 默認為 None,為了日後擴展 ThreadGroup 類實現而保留。
- target 是用於 run () 方法調用的可調用對象。默認是 None,表示不需要調用任何方法。
- name 是線程名稱。默認情況下,由 "Thread-N" 格式構成一個唯一的名稱,其中 N 是小的十進制數
- args 是用於調用目標函數的參數元組。默認是 ()
- kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}。
- daemon表示線程是不是守護線程。
Thread 類常用方法與屬性
- run(): 用以表示線程活動的方法。
- start(): 啟動線程活動。
- join(*timeout=None*): 等待至線程中止。這阻塞調用線程直至線程的 join () 方法被調用中止 - 正常退出或者拋出未處理的異常 - 或者是可選的超時發生。
- isAlive(): 返回線程是否活動的。
- getName(): 返回線程名。
- setName(): 設置線程名。
- name:線程對象名字
- setDaemon():設置是否為守護線程
2. GIL 鎖#
GIL, 全局解釋器鎖(Global Interpreter Lock),是 CPython 解釋器特有一個玩意,讓一個進程中在同一個時刻只能有一個線程可以被 CPU 調用。
如果程序想利用 計算機的多核優勢,讓 CPU 同時處理一些任務,適合用多進程開發(即使資源開銷大)。
如果程序不利用 計算機的多核優勢,適合用多線程開發。
常見的程序開發中,計算操作需要使用 CPU 多核優勢,IO 操作不需要利用 CPU 的多核優勢,所以,就有這一句話:
- 計算密集型,用多進程,例如:大量的數據計算【累加計算示例】。
- IO 密集型,用多線程,例如:文件讀寫、網絡數據傳輸【下載抖音視頻示例】。
串行處理:
import time
start = time.time()
result = 0
for i in range(100000000):
result += i
print(result)
end = time.time()
print("耗時:", end - start)
多線程處理:
import time
import multiprocessing
def task(start, end, queue):
result = 0
for i in range(start, end):
result += i
queue.put(result)
if __name__ == '__main__':
queue = multiprocessing.Queue()
start_time = time.time()
p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
p1.start()
p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
p2.start()
v1 = queue.get(block=True) #阻塞
v2 = queue.get(block=True) #阻塞
print(v1 + v2)
end_time = time.time()
print("耗時:", end_time - start_time)
多線程與多進程結合使用:
import multiprocessing
import threading
def thread_task():
pass
def task(start, end):
t1 = threading.Thread(target=thread_task)
t1.start()
t2 = threading.Thread(target=thread_task)
t2.start()
t3 = threading.Thread(target=thread_task)
t3.start()
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task, args=(0, 50000000))
p1.start()
p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
p2.start()
3. 多線程開發#
import threading
def task(arg):
pass
# 創建一個Thread對象(線程),並封裝線程被CPU調度時應該執行的任務和相關參數。
t = threading.Thread(target=task,args=('xxx',))
# 線程準備就緒(等待CPU調度),代碼繼續向下執行。
t.start()
print("繼續執行...") # 主線程執行完所有代碼,不結束(等待子線程)
線程的常用方法#
t.start()
,當前線程準備就緒(等待 CPU 調度,具體時間是由 CPU 來決定)
import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
t = threading.Thread(target=_add,args=(loop,))
t.start()
print(number)
t.join()
,等待當前線程的任務執行完畢後再向下繼續執行
import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
t = threading.Thread(target=_add)
t.start()
t.join() # 主線程等待中...
print(number)
import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
def _sub():
global number
for i in range(10000000):
number -= 1
t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join() # t1線程執行完畢,才繼續往後走
t2.start()
t2.join() # t2線程執行完畢,才繼續往後走
print(number)
import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
def _sub(count):
global number
for i in range(count):
number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1線程執行完畢,才繼續往後走
t2.join() # t2線程執行完畢,才繼續往後走
print(number)
t.setDaemon(布爾值)
,守護線程(必須放在 start 之前)t.setDaemon(True)
,設置為守護線程,主線程執行完畢後,子線程也自動關閉。t.setDaemon(False)
,設置為非守護線程,主線程等待子線程,子線程執行完畢後,主線程才結束。(默認)
import threading
import time
def task(arg):
time.sleep(5)
print('任務')
t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()
print('END')
- 線程名稱的設置和獲取
import threading
def task(arg):
# 獲取當前執行此代碼的線程
name = threading.current_thread().getName()
print(name)
for i in range(10):
t = threading.Thread(target=task, args=(11,))
t.setName('日魔-{}'.format(i))
t.start()
- 自定義線程類,直接將線程需要做的事寫到 run 方法中
import threading
class MyThread(threading.Thread):
def run(self):
print('執行此線程', self._args)
t = MyThread(args=(100,))
t.start()
4. 線程鎖#
多個線程操作全局變量的時候,如果一個線程對全局變量操作分幾個步驟,當還沒有得到最後結果時,這個線程就被撤下 CPU,使用其他線程繼續上 CPU 操作,最後就會搞亂全局變量數據
import time, threading
balance = 0
def change_it(n):
# 先存後取,結果應該為0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(1000):
change_it(n)
#這裡重複實驗100次
for i in range(100):
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
import time, threading
balance = 0
lock = threading.Lock()
def change_it(n):
# 先存後取,結果應該為0:
global balance
#獲取鎖,用於線程同步
lock.acquire()
balance = balance + n
balance = balance - n
# 釋放鎖,開啟下一個線程
lock.release()
def run_thread(n):
for i in range(1000):
change_it(n)
#這裡重複實驗100次
for i in range(100):
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
在程序中如果想要自己手動加鎖,一般有兩種:Lock 和 RLock
- Lock,同步鎖。(不支持鎖的嵌套)
import threading
num = 0
lock_object = threading.Lock()
def task():
print("開始")
lock_object.acquire() # 第1個抵達的線程進入並上鎖,其他線程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 線程出去,並解開鎖,其他線程就可以進入並執行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
- RLock,遞歸鎖。(支持鎖的嵌套)
import threading
num = 0
lock_object = threading.RLock()
def task():
print("開始")
lock_object.acquire() # 第1個抵達的線程進入並上鎖,其他線程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 線程出去,並解開鎖,其他線程就可以進入並執行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
RLock 支持多次申請鎖和多次釋放;Lock 不支持
import threading
lock = threading.RLock()
# 程序員A開發了一個函數,函數可以被其他開發者調用,內部需要基於鎖保證數據安全。
def func():
with lock:
pass
# 程序員B開發了一個函數,可以直接調用這個函數。
def run():
print("其他功能")
func() # 調用程序員A寫的func函數,內部用到了鎖。
print("其他功能")
# 程序員C開發了一個函數,自己需要加鎖,同時也需要調用func函數。
def process():
with lock:
print("其他功能")
func() # ----------------> 此時就會出現多次鎖的情況,只有RLock支持(Lock不支持)。
5. 隊列(Queue)#
隊列可以看作事件同步與條件同步的簡單版,線程 A 可以往隊列裡面存數據,線程 B 則可以從隊列裡面取,不用關心同步以及鎖的問題,主要方法有:
- put (): 向隊列中添加一個項
- get (): 從隊列中刪除並返回一個項
- task_done (): 當某一項任務完成時調用,表示前面排隊的任務已經被完成
- join (): 阻塞直到所有的項目都被處理完
import threading
import queue
import random
import time
class Producer(threading.Thread):
def __init__(self, queue):
super(Producer,self).__init__()
self.queue = queue
def run(self):
while True:
integer = random.randint(0, 1000)
self.queue.put(integer)
print('%d put to queue by %s' % (integer, self.name))
time.sleep(1)
class Consumer(threading.Thread):
def __init__(self, queue):
super(Consumer, self).__init__()
self.queue = queue
def run(self):
while True:
integer = self.queue.get()
print('%d popped from list by %s' % (integer, self.name))
self.queue.task_done()
if __name__ == '__main__':
queue = queue.Queue()
t1 = Producer(queue)
t2 = Consumer(queue)
t1.start()
t2.start()
t1.join()
t2.join()