Pythonをとりまく並行/非同期の話

tell-k
Pycon JP 2017

で、誰?

https://pbs.twimg.com/profile_images/775582814871752704/J1TaucBz_400x400.jpg https://dl.dropboxusercontent.com/spa/ghyn87yb4ejn5yy/vxjmiemo.png

Beproud.inc - connpass

PyQ - Pythonオンライン学習サービス

PyQ - 機械学習はじめました

_images/nishio.png

目的/動機

対象

目標

https://pbs.twimg.com/media/CTBnrdoUcAACGqL.jpg

前提

目次

  • 並行/並列処理とは?
  • 負荷の種類
  • マルチスレッド/マルチプロセス
  • コルーチン/イベントループ/I/O多重化
  • Pythonの非同期ライブラリ
  • asyncioの話

並行/並列処理とは?

並行/並列処理とは?

並行/並列処理とは?

雑に言うと

負荷の種類

負荷の種類

  • 数値計算のようにCPUを使い続けるような処理 … CPUバウンド
  • ファイルの読み書き、DBへの接続、ネットワーク通信 … I/Oバウンド

concurrent.futures

concurrent.futures

  • マルチスレッドを扱う ThreadPoolExecuter
  • マルチプロセスを扱う ProcessPoolExecuter

ThreadPoolExecuter

例えば

というの考える

ThreadPoolExecuter - I/Oバウンドの例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from concurrent.futures import ThreadPoolExecutor, as_completed

def busy_io(index):
    # 巨大なファイルを読み込む
    with open('large{}.txt'.format(index)) as fp:
        content = fp.read()
    return 'Finish thread{}'.format(index)

if __name__ == '__main__':
    futures = []
    worker_num = 3  # ワーカースレッド数
    task_num = 6    # 実行タスク数

    with ThreadPoolExecutor(worker_num) as executer:
        for index in range(task_num):
            futures.append(executer.submit(busy_io, index))

    # 実行が終わったものから結果を表示
    for x in as_completed(futures):
        print(x.result())

ThreadPoolExecuter - I/Oバウンドの例

実行するとどうなるか?

ThreadPoolExecuter - I/Oバウンドの例

_images/multithred_iobound.png

ThreadPoolExecuter - CPUバウンドの例

例えば

というのを考える

ThreadPoolExecuter - CPUバウンドの例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ThreadPoolExecutor, as_completed

def busy_cpu(index):
    # 大きめの計算処理をする
    sum(range(10**8))
    return 'Finish thread{}'.format(index)

if __name__ == '__main__':
    futures = []
    worker_num = 3  # ワーカースレッド数
    task_num = 6    # 実行タスク数

    with ThreadPoolExecutor(worker_num) as executer:
        for index in range(task_num):
            futures.append(executer.submit(busy_cpu, index))

    # 実行が終わったのものから結果を表示
    for x in as_completed(futures):
        print(x.result())

ThreadPoolExecuter - CPUバウンドの例

実行するとどうなるか?

なぜ?

結果的に

ThreadPoolExecuter - CPUバウンドの例

_images/multithred_cpubound.png

GILとは?

なぜ制限あるのか?

ちなみに

GIL解放なんてできるの?

注釈
GIL を解放するのはほとんどがシステムのI/O関数を呼び出す時ですが、メモリバッファに対する圧縮や
暗号化のように、Pythonのオブジェクトにアクセスしない長時間かかる計算処理を呼び出すときもGIL
を解放することは有益です。例えば、 zlib や hashlib モジュールは圧縮やハッシュ計算の前にGILを解放します。

CPUバウンドな処理を並列にするには?

注意点

1
2
3
# コア数を確かめる時は?
import os
os.cpu_count()

ProcessPoolExecuter - CPUバウンドな例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from concurrent.futures import ProcessPoolExecutor, as_completed

def busy_cpu(index):
    # 大きめの計算処理をする
    sum(range(10**8))
    return 'Finish process{}'.format(index)

if __name__ == '__main__':
    futures = []
    worker_num = 3  # ワーカープロセス数
    task_num = 6    # 実行タスク数

    # ProcessPoolExecuterに変えるだけで良い
    with ProcesssPoolExecutor(worker_num) as executer:
        for index in range(task_num):
            futures.append(executer.submit(busy_cpu, index))

    # 実行が終わったのから結果を表示
    for x in as_completed(futures):
        print(x.result())

ProcessPoolExecuter - CPUバウンド

_images/multiprocess_cpubound.png

ProcessPoolExecuter - CPUバウンド

もしCPUが2コアしかなかったら?

_images/multiprocess_out_of_core.png

マルチスレッド - メリット・デメリット

  • I/Oバウンドな処理には効果的
  • プロセスに比べて起動コストが低い
  • GILがあるためCPUバウンドな処理には不向き
  • スレッド間でのデータ競合を気をつけなければいけない

マルプロセス - メリット・デメリット

  • GILの影響を受けずに並列に処理することができる
  • メモリ空間を別プロセスと共有しない
  • CPUのコア数を超えて並列化はできない
  • スレッドよりもメモリを消費する
  • プロセス間通信しないとデータを受け渡せない

シングルスレッドでの処理効率の要求

非同期I/O

非同期I/O

  • イベントループ
  • I/O多重化
  • コルーチン

イベントループ

どんな事をするのか?

どんなのがイベント?

イベントループ

_images/event_loop.png

I/O多重化

主にイベントループのライブラリが内部で

コルーチン

ジェネレーターベースのコルーチン

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def coro():
    world = yield "Hello"
    yield world

c = coro()
print(c) # => <generator object coro at 0x10f2df620>

print(next(c))  # => Hello

print(c.send('World')) # => World

非同期ライブラリ

ライブラリ名 イベントループ コルーチン
gevent libev greentlet
Twisted reactor inlineCallbacks
Tornado tornado.gen.coroutine tornado.ioloop

たくさんフレンズがいるのは良いことだが・・・

_images/friends.png

asyncio

async/await

Python 3.4 以前

@asyncio.coroutine
def hello_world():
    print("Hello World!")
    yield from asyncio.sleep(1)

Python 3.5 以降

async def hello_world():
    print("Hello World!")
    await asyncio.sleep(1)

asyncio 挙動の確認

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

asyncio 挙動の確認

https://docs.python.org/ja/3/_images/tulip_coro.png

https://docs.python.org/3/library/asyncio-task.html#example-chain-coroutines

asyncioへの対応(1)

普段に使うようなライブラリの置き換え

asyncioへの対応(2)

非同期系だと

asyncioへの対応(3)

1
2
3
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

まとめ - 並行/並列処理

まとめ - 非同期I/O

感謝

忙しい中、レビューしてくださった。@shimizukawaさん、@crochacoさん、@mahataさん、@kamekoさん ありがとうございました。

また参考にさせていただいた資料、本の著者の皆さま。本当ににありがとうございました。

ご静聴ありがとうございました