コルーチンは怖くない

Posted by rhoboro on 2019-02-09

コルーチンの基本的なアイディアを正確さより雰囲気重視で解説してみました。 一応コルーチンやイベントループによる非同期処理に関しては概念はPythonに限った話ではないのでNode.jsなどでも同じ理解でいいはず。。。

下記のサンプルを全部動かすにはPython 3.6以降を使ってください。(ローカルにない場合はdocker run --rm -it python:3.7とかでもOK)

コルーチンとは

サブルーチンの上位(?)概念です。 サブルーチン(いわゆる関数)はエントリポイントが一つで、一度呼び出したら値が返されるまで一気に動きます。 これに対しコルーチンはエントリポイントが複数箇所あるため、一度呼び出されても中断でき、その場所からまた再開できるというものです。

async/awaitとネイティブコルーチン

Python 3.5でasync/await構文とともに導入されたネイティブコルーチンの主な目的はイベントループを使った非同期処理を行うことです。

ちなみに「ネイティブコルーチン」呼んでいるのは「ジェネレータベースのコルーチン」というものがPythonには古くからあるためです。 @asyncio.coroutineをつけるとネイティブコルーチンと相互利用が可能ですが、ジェネレータベースのコルーチンはPython 3.10でなくなる予定です。

したがって、以降のコルーチンはネイティブコルーチンを指します。

最小のコルーチン

コルーチンはasync defで定義します。 次のf()async def f():と宣言しているのでこれはコルーチンです。 ただし、中断しないので事実上ただの関数と同じです。

async def f(n):
  return n

イベントループを使った非同期処理ではコルーチンは次のように利用されます。 まずイベントループを取得し、そのイベントループに対してコルーチンを渡します。

def main():
  loop = asyncio.get_event_loop()
  v = loop.run_until_complete(f(1))  # ループにコルーチンを渡す
  return v

それでは実行してみます。 動きもただの関数と同じように見えます。

>>> import asyncio
>>> main()
1

時間のかかる処理を追加する

冒頭で「コルーチンはエントリポイントが複数箇所あるため、一度呼び出されても中断でき、その場所からまた再開できる。」と書きました。 どういう時に中断させるかというとネットワークI/Oのような時間のかかる処理です。これはすなわちCPUが待機状態になる時と同義です。

以下では先ほどの例にCPUの待機が発生する処理を少し追加しました。 awaitが「この行で時間のかかる処理をします。」という指示になっています。 つまり、awaitのある行が中断ポイントかつエントリ(再開)ポイントになります。

async def get_content(n):
  """ネットワークI/Oなどの時間のかかる処理の代わり。実際にはHTTPリクエストなど。"""
  await asyncio.sleep(3)  # この行でget_content()は中断される
  return n + 1

async def f(n):
  content = await get_content(n)  # この行でf()は中断される
  return content
>>> main()  # 3秒後に出力される
2

待機してるCPUに他の処理をさせる

ここまででawaitの行で待機が発生することを伝えることができました。 しかし、CPUをただ待機させておくのはリソースが勿体無いです。 そこで「待ち時間が発生するならその間に他の処理をさせてればいいんじゃない?」という考えが浮かびます。 これがイベントループによる非同期処理の基本的なアイディアです。

前項では1つの処理しかなかったのでCPUは待機中に何もせず待っていました。 そこでやって欲しいことを複数登録して実行してみます。 awaitの行の実行には時間がかかるため、awaitの行に到達するとそのコルーチンの処理はそこで一度中断されます。

この時イベントループはCPUの手があいたと判断し、その時実行可能なコルーチンの処理を開始し(再開させ)ます。

async def get_content(n):
  print(f'start {n}')
  await asyncio.sleep(random.randint(1, 5))  # レスポンスタイムが違うことをシミュレート
  print(f'end {n}')
  return n

async def f(n):
  tasks = (
    asyncio.ensure_future(get_content('a')),
    asyncio.ensure_future(get_content('b')),
    asyncio.ensure_future(get_content('c')),
  )
  return n, await asyncio.gather(*tasks)  # asyncio.gather()の戻り値の順番は保証される
>>> import random
>>> main()
start a
start b
start c
end b
end a
end c
(1, ['a', 'b', 'c'])

ここでは最初のget_content('a')await asyncio...の行に到達すると時間のかかる処理だと判断され、処理がget_content('b')に移ります。 同様にget_content('c')await asyncio...の行まで進み中断されますが、この時は即座に実行できる処理がないためしばらく待機状態となります。

しばらくしてsleep()が終わると再開が可能になるため、中断されていた処理は再開可能になったものから順に処理を再開されます。

このようにイベントループとコルーチンを使った非同期処理では、1度に実行されているのは常に1行だけなので並列処理はしていません。 それでもこのmain()の呼び出しは最大でも5秒程度で終わります。

より実践的なサンプル

ここまでのことを踏まえて作ったサンプルプログラムをgistに置いています。 このプログラムはURLのリストを受け取り、非同期処理でリクエストを投げつつ、レスポンスに任意の加工をしてくれるものです。 aiohttpで同時接続数を制限するサンプルとしても利用できると思います。

まとめ

イベントループによる非同期処理はプロセスやスレッドを増やすわけではないので効率的です。便利ですね。 ただし、効果があるのはネットワークI/OなどCPUに依存しない処理という点は覚えておきましょう。 つっこみとかあればTwitterとかで@rhoboroにメンションしていただければ。