マルチスレッド

マルチスレッドの例

C++

#include <thread>

void thread1() {
  printf("thread1\n");
}

void thread2() {
  printf("thread2\n");
}

int main() {
  std::thread th1(thread1);
  std::thread th2(thread2);
  // joinを呼ぶと、そのスレッドが終了するまで現在のコンテクストが中断する
  th1.join();
  th2.join();
  return 0;
}

Python

import threading

def thread1():
    print("thread1")

def thread2():
    print("thread2")

t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
print("start")
t1.start()
t2.start()
t1.join()
t2.join()
print("end")

グローバル変数を複数スレッドから書き込む失敗例

#include <thread>

int cnt = 0;

void thread1() {
  printf("thread1 start cnt = %d\n", cnt);
  for (int i=0; i<100000; i++) {
    cnt++;
  }
  printf("thread1 end cnt = %d\n", cnt);
}

void thread2() {
  printf("thread2 start cnt = %d\n", cnt);
  for (int i=0; i<100000; i++) {
    cnt++;
  }
  printf("thread2 end cnt = %d\n", cnt);
}

int main() {
  std::thread th1(thread1);
  std::thread th2(thread2);
  // joinを呼ぶと、そのスレッドが終了するまで現在のコンテクストが中断する
  th1.join();
  th2.join();
  return 0;
}

出力例

thread1 start cnt = 0
thread2 start cnt = 422
thread2 end cnt = 77428
thread2 end cnt = 115025

排他処理を入れてグローバル変数を更新する例

C++

#include <thread>
#include <mutex>

int cnt = 0;
std::mutex mtx;

void add_count() {
  std::lock_guard<std::mutex> lock(mtx);
  cnt++;
}

void thread1() {
  printf("thread1 start cnt = %d\n", cnt);
  for (int i=0; i<100000; i++) {
    add_count();
  }
  printf("thread1 end cnt = %d\n", cnt);
}

void thread2() {
  printf("thread2 start cnt = %d\n", cnt);
  for (int i=0; i<100000; i++) {
    add_count();
  }
  printf("thread2 end cnt = %d\n", cnt);
}

int main() {
  std::thread th1(thread1);
  std::thread th2(thread2);
  // joinを呼ぶと、そのスレッドが終了するまで現在のコンテクストが中断する
  th1.join();
  th2.join();
  return 0;
}

出力例

thread1 start cnt = 0
thread2 start cnt = 0
thread2 end cnt = 195262
thread1 end cnt = 200000

最終的にグローバル変数が意図通りの値になっている。

Python

import threading
import time

lock = threading.Lock()
value = 0

def thread(n):
    lock.acquire()
    global value
    value = value + n
    value = value - n
    lock.release()

print(f"start value={value}")
ts = []
N = 4
for i in range(N):
    t = threading.Thread(target=thread, args=(i,))
    t.start()
    ts.append(t)

for i in range(N):
    ts[i].join()

print(f"end value={value}")

セマフォ

import threading
import time

# Semaphoreというオブジェクトもある                                                                                          
# Semaphore: acquireせずにreleaseして残セマフォ数を初期値以上に増やすことができる                                            
# BoundedSemaphore: qcquireしないとreleaseできない                                                                           
s = threading.BoundedSemaphore(2)

def thread(n):
    s.acquire()
    time.sleep(5)
    print(f"thread: {n}")
    s.release()

# こちらの獲得方法も可能                                                                                                     
def thread2(n):
    with s:
        time.sleep(5)
        print(f'thread2: {n}')

print("start")
for i in range(10):
    t = threading.Thread(target=thread, args=(i,))
    t.start()
print("end")

とやると、文字が2つずつ画面に出力される。

実行されるタイミングの確認

std::threadをコールした段階でマルチスレッドが立ち上がりスレッド処理が始まる。joinをコールした段階で開始するわけではないことに注意。

#include <thread>
#include <mutex>
#include <unistd.h>

int cnt = 0;
std::mutex mtx;

void add_count() {
  std::lock_guard<std::mutex> lock(mtx);
  cnt++;
}

void thread1() {
  printf("thread1 start cnt = %d\n", cnt);
  for (int i=0; i<100000; i++) {
    add_count();
  }
  printf("thread1 end cnt = %d\n", cnt);
}

void thread2() {
  printf("thread2 start cnt = %d\n", cnt);
  for (int i=0; i<100000; i++) {
    add_count();
  }
  printf("thread2 end cnt = %d\n", cnt);
}

int main() {
  std::thread th1(thread1);
  printf("cnt before sleep = %d\n", cnt);
  usleep(1000000); // 1 sec
  printf("cnt after sleep = %d\n", cnt);
  std::thread th2(thread2);
  // joinを呼ぶと、そのスレッドが終了するまで現在のコンテクストが中断する
  th1.join();
  th2.join();
  return 0;
}

出力例

cnt before sleep = 0
thread1 start cnt = 0
thread1 end cnt = 100000
cnt after sleep = 100000
thread2 start cnt = 100000
thread2 end cnt = 200000

usleep直後の段階ではまだth1.join()は呼んでいないが、thread1の関数の処理が実行完了していることがわかる。

マルチスレッドで実行する関数の返り値を設定する

PythonではThreadを継承したクラスを作りrunメソッドをオーバーロードすればよい

import threading
import time

def myfunc(n):
    ans = 0
    for i in range(n):
        ans += i
    return ans

class MyThread(threading.Thread):
    def __init__(self, func, args):
    # 親クラスのコンストラクタを呼ぶことを忘れない                                                     
        super().__init__()
        self.func = func
        self.args = args

    def run(self):
        self.result = self.func(*self.args)

    def get(self):
        return self.result

t1 = MyThread(myfunc, args=(10,))
t2 = MyThread(myfunc, args=(100,))
t1.start()
t2.start()
t1.join()
t2.join()

print(f"result1={t1.get()}")
print(f"result2={t2.get()}")

参考

Pythonマルチスレッドの戻り値を取得する2つの方法 - JPDEBUG.COM

デッドロック

Pythonでのデッドロックの例

import threading
import time

l1 = threading.Lock()
l2 = threading.Lock()

def func1():
    l1.acquire()
    time.sleep(2) # 時間のかかる処理を模擬                                                                                   
    g1 = 10
    l2.acquire()
    g2 = 10
    l2.release()
    l1.release()

def func2():
    l2.acquire()
    time.sleep(2) # 時間のかかる処理を模擬                                                                                   
    g2 = 10
    l1.acquire()
    g1 = 10
    l1.release()
    l2.release()

t1 = threading.Thread(target=func1)
t2 = threading.Thread(target=func2)
t1.start()
t2.start()
t1.join()
t2.join()

デッドロック対策

  • プロセスが利用するリソースをまとめて1つのロックで管理するようにし、実行開始前にリソース全体のロックを取得する。これによりリソースの利用率は低下する。
  • リソースのロックをかける順番を揃える。

参考

デッドロックと回避策

コメント

  • 複数スレッドから同じ変数の値を書き出す時だけでなく、読み込む時にむ排他処理を入れた方が良い。読み込み中に他スレッドによって値が更新されると値がundefinedになることがある。

参考

Pythonのthreadingとmultiprocessingを完全理解 - Qiita