1. マルチスレッドモジュール#
Python では、主に threading モジュールがスレッドのサポートを提供します。
threading モジュール#
threading モジュールの一般的な関数
- threading.current_thread (): 現在のスレッドオブジェクトを返します。
- threading.enumerate (): 実行中のスレッドを含むリストを返します。実行中とは、スレッドが開始された後、終了する前を指し、開始前や終了後のスレッドは含まれません。
- threading.active_count (): 実行中のスレッドの数を返します。len (threading.enumerate ()) と同じ結果です。
Thread クラス#
threading.Thread () を使用してスレッドオブジェクトを作成します。
主なパラメータ:
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group デフォルトは None で、将来的に ThreadGroup クラスを実装するために保持されています。
- target run () メソッドで呼び出される呼び出し可能オブジェクトです。デフォルトは None で、何も呼び出す必要がないことを示します。
- name スレッドの名前です。デフォルトでは、"Thread-N" 形式で一意の名前が構成され、N は小さな 10 進数です。
- args 目標関数を呼び出すための引数のタプルです。デフォルトは () です。
- kwargs 目標関数を呼び出すためのキーワード引数の辞書です。デフォルトは {} です。
- daemon スレッドがデーモンスレッドかどうかを示します。
Thread クラスの一般的なメソッドと属性
- run(): スレッドの活動を示すメソッドです。
- start(): スレッドの活動を開始します。
- join(*timeout=None*): スレッドが終了するまで待機します。これは、スレッドの join () メソッドが呼び出されて終了するまで、またはオプションのタイムアウトが発生するまで、呼び出しスレッドをブロックします。
- isAlive(): スレッドが活動中かどうかを返します。
- getName(): スレッド名を返します。
- setName(): スレッド名を設定します。
- name:スレッドオブジェクトの名前
- setDaemon():デーモンスレッドかどうかを設定します。
2. GIL ロック#
GIL、グローバルインタプリタロック(Global Interpreter Lock)は、CPython インタプリタ特有のもので、1 つのプロセス内で同時に CPU によって呼び出されるスレッドは 1 つだけです。
プログラムがコンピュータのマルチコアの利点を利用して、CPU が同時にいくつかのタスクを処理する場合は、マルチプロセス開発(リソースのオーバーヘッドが大きくても)を使用するのが適しています。
プログラムがコンピュータのマルチコアの利点を利用しない場合は、マルチスレッド開発が適しています。
一般的なプログラム開発では、計算操作は CPU のマルチコアの利点を利用する必要があり、IO 操作は CPU のマルチコアの利点を利用する必要がないため、次のようなことが言えます:
- 計算集約型はマルチプロセスを使用します。例:大量のデータ計算【累積計算の例】。
- IO 集約型はマルチスレッドを使用します。例:ファイルの読み書き、ネットワークデータ転送【TikTok 動画のダウンロード例】。
直列処理:
import time
start = time.time()
result = 0
for i in range(100000000):
result += i
print(result)
end = time.time()
print("時間:", end - start)
マルチスレッド処理:
import time
import multiprocessing
def task(start, end, queue):
result = 0
for i in range(start, end):
result += i
queue.put(result)
if __name__ == '__main__':
queue = multiprocessing.Queue()
start_time = time.time()
p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
p1.start()
p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
p2.start()
v1 = queue.get(block=True) #ブロック
v2 = queue.get(block=True) #ブロック
print(v1 + v2)
end_time = time.time()
print("時間:", end_time - start_time)
マルチスレッドとマルチプロセスの組み合わせ使用:
import multiprocessing
import threading
def thread_task():
pass
def task(start, end):
t1 = threading.Thread(target=thread_task)
t1.start()
t2 = threading.Thread(target=thread_task)
t2.start()
t3 = threading.Thread(target=thread_task)
t3.start()
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task, args=(0, 50000000))
p1.start()
p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
p2.start()
3. マルチスレッド開発#
import threading
def task(arg):
pass
# Threadオブジェクト(スレッド)を作成し、CPUスケジュール時に実行すべきタスクと関連パラメータをカプセル化します。
t = threading.Thread(target=task,args=('xxx',))
# スレッドが準備完了(CPUスケジュール待機)、コードは続行します。
t.start()
print("続行中...") # メインスレッドがすべてのコードを実行し終わるまで終了しません(サブスレッドを待機)
スレッドの一般的なメソッド#
t.start()
、現在のスレッドが準備完了(CPU スケジュール待機、具体的な時間は CPU によって決定されます)
import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
t = threading.Thread(target=_add,args=(loop,))
t.start()
print(number)
t.join()
、現在のスレッドのタスクが完了するまで待機してから続行します
import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
t = threading.Thread(target=_add)
t.start()
t.join() # メインスレッドが待機中...
print(number)
import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
def _sub():
global number
for i in range(10000000):
number -= 1
t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join() # t1スレッドが完了するまで、次に進みません
t2.start()
t2.join() # t2スレッドが完了するまで、次に進みません
print(number)
import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
def _sub(count):
global number
for i in range(count):
number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1スレッドが完了するまで、次に進みません
t2.join() # t2スレッドが完了するまで、次に進みません
print(number)
t.setDaemon(ブール値)
、デーモンスレッド(start の前に設定する必要があります)t.setDaemon(True)
、デーモンスレッドに設定し、メインスレッドが完了すると、サブスレッドも自動的に終了します。t.setDaemon(False)
、非デーモンスレッドに設定し、メインスレッドがサブスレッドを待機し、サブスレッドが完了した後にメインスレッドが終了します。(デフォルト)
import threading
import time
def task(arg):
time.sleep(5)
print('タスク')
t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()
print('END')
- スレッド名の設定と取得
import threading
def task(arg):
# 現在このコードを実行しているスレッドを取得
name = threading.current_thread().getName()
print(name)
for i in range(10):
t = threading.Thread(target=task, args=(11,))
t.setName('スレッド-{}'.format(i))
t.start()
- カスタムスレッドクラス、スレッドが行うべきことを run メソッドに直接書きます
import threading
class MyThread(threading.Thread):
def run(self):
print('このスレッドを実行中', self._args)
t = MyThread(args=(100,))
t.start()
4. スレッドロック#
複数のスレッドがグローバル変数を操作する場合、1 つのスレッドがグローバル変数を操作するのに複数のステップを要する場合、最終結果を得る前にそのスレッドが CPU から撤去され、他のスレッドが CPU を使用し続けると、最終的にグローバル変数のデータが混乱することがあります。
import time, threading
balance = 0
def change_it(n):
# 先に入金し、後に引き出す。結果は0であるべきです:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(1000):
change_it(n)
#ここで100回繰り返し実験します
for i in range(100):
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
import time, threading
balance = 0
lock = threading.Lock()
def change_it(n):
# 先に入金し、後に引き出す。結果は0であるべきです:
global balance
#ロックを取得し、スレッドの同期を行います
lock.acquire()
balance = balance + n
balance = balance - n
# ロックを解放し、次のスレッドを開始します
lock.release()
def run_thread(n):
for i in range(1000):
change_it(n)
#ここで100回繰り返し実験します
for i in range(100):
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
プログラム内で手動でロックを加えたい場合、一般的に 2 つの方法があります:Lock と RLock
- Lock、同期ロック。(ネストされたロックはサポートされていません)
import threading
num = 0
lock_object = threading.Lock()
def task():
print("開始")
lock_object.acquire() # 最初に到達したスレッドが入ってロックし、他のスレッドはここで待機する必要があります。
global num
for i in range(1000000):
num += 1
lock_object.release() # スレッドが出て、ロックを解放し、他のスレッドが入って実行できるようになります
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
- RLock、再帰ロック。(ネストされたロックをサポートします)
import threading
num = 0
lock_object = threading.RLock()
def task():
print("開始")
lock_object.acquire() # 最初に到達したスレッドが入ってロックし、他のスレッドはここで待機する必要があります。
global num
for i in range(1000000):
num += 1
lock_object.release() # スレッドが出て、ロックを解放し、他のスレッドが入って実行できるようになります
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
RLock は複数回のロック取得と解放をサポートしますが、Lock はサポートしません。
import threading
lock = threading.RLock()
# プログラマーAが関数を開発し、他の開発者が呼び出すことができ、内部でロックを使用してデータの安全性を保証します。
def func():
with lock:
pass
# プログラマーBが関数を開発し、この関数を直接呼び出すことができます。
def run():
print("他の機能")
func() # プログラマーAが書いたfunc関数を呼び出し、内部でロックを使用します。
print("他の機能")
# プログラマーCが関数を開発し、自分でロックを加え、同時にfunc関数を呼び出す必要があります。
def process():
with lock:
print("他の機能")
func() # ----------------> この時、複数回のロックが発生し、RLockのみがサポートします(Lockはサポートしません)。
5. キュー(Queue)#
キューはイベント同期と条件同期の簡易版と見なすことができ、スレッド A はキューにデータを格納し、スレッド B はキューからデータを取得することができ、同期やロックの問題を気にする必要がありません。主なメソッドは次の通りです:
- put (): キューに項目を追加します
- get (): キューから項目を削除して返します
- task_done (): 特定のタスクが完了したときに呼び出し、前のタスクが完了したことを示します
- join (): すべての項目が処理されるまでブロックします
import threading
import queue
import random
import time
class Producer(threading.Thread):
def __init__(self, queue):
super(Producer,self).__init__()
self.queue = queue
def run(self):
while True:
integer = random.randint(0, 1000)
self.queue.put(integer)
print('%d をキューに追加しました %s' % (integer, self.name))
time.sleep(1)
class Consumer(threading.Thread):
def __init__(self, queue):
super(Consumer, self).__init__()
self.queue = queue
def run(self):
while True:
integer = self.queue.get()
print('%d をリストから取り出しました %s' % (integer, self.name))
self.queue.task_done()
if __name__ == '__main__':
queue = queue.Queue()
t1 = Producer(queue)
t2 = Consumer(queue)
t1.start()
t2.start()
t1.join()
t2.join()