PythonでコールバックなAPIをasync/awaitなAPIにする方法

Posted by rhoboro on 2023-12-22

この記事は RevComm Advent Calendar 2023 の22日目の記事です。 21日目の記事は宇佐美さんの「本番リリースを週一から随時に変えるためにやったこと」でした。

コールバックなAPIとasync/awaitなAPI

Pythonで非同期I/Oを使ったプログラムを書く場合、asyncio1を使うことが多いと思います。 asyncioを始めとするネイティブコルーチンを使うライブラリを使う場合、async/awaitキーワードを使いながらコードを書きます。 async/awaitキーワードを使うと、非同期処理を使ったロジックであっても同期処理に近い見た目で書けるため、コールバックAPIを使って非同期処理を利用する場合と比較して可読性を保ちやすいです。 

視点を変えると、コールバックAPIしか提供されていない処理であっても自分でasync/awaitキーワードに対応したAPIにラップすると、より可読性の高いコードにできるでしょう。

asyncioのUDP通信を題材に実践

この記事ではPython 3.12で動作確認しています。

asyncioはTCPやUDPなどの低レベルなプロトコルに限定してネットワーク通信機能を提供しています。2 このうち、TCPに関しては高レベルAPIとしてasyncio.open_connection()が提供されており、この関数からはasync/awaitで使いやすいStreamReaderStreamWriterが返されます。

一方、UDPに関してはそのような高レベルAPIは提供されていません。 UDPで提供されているのはasyncio.open_connection()のベースになっているようなトランスポートとプロトコルを使ったコールバックスタイルのAPIのみです。

下記はPythonの公式ドキュメントにあるUDPエコークライアントの実装例です。 asyncioにおけるトランスポートやプロトコルの詳細を知らない方でも、コールバックスタイルになっていることは何となくわかると思います。

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

実装するAPIのイメージ

この記事では上のコードと同等の処理をasync/awaitを使って実装できるようにするudpモジュールを実装します。 完成後のudpモジュールを使うと、次のようにwriter.sendto()でデータを送信でき、await reader.recv()で受信したデータを受け取れます。 コードは下記のGistで公開しています。 https://gist.github.com/rhoboro/64ab3863562032028c7a6e6e14a043e8

import asyncio

from udp import DatagramReader, DatagramWriter, create_udp_client


async def listen_forever(
    writer: DatagramWriter,
    reader: DatagramReader,
) -> None:
    while not writer.is_closing():
        data = await reader.recv(2048)
        print("Received:", data.decode())

async def main() -> None:
    reader, writer = await create_udp_client("127.0.0.1", 9999)
    listen_task = asyncio.create_task(listen_forever(writer, reader))

    writer.sendto(b"Hello World!")
    await asyncio.sleep(0.1)
    writer.close()
    try:
        await listen_task
    except asyncio.CancelledError:
        pass


if __name__ == "__main__":
    asyncio.run(main())

なお、writer.sendto()の実体となるDatagramTransport.sendto()はデータをバッファーし、非同期に送信する準備を行うメソッドでブロックされません。3 したがって、以降はawait reader.recv()のように使われるDatagramReader.recv()に焦点を当てます。

udpモジュール実装

DatagramReader.recv()の実装は次のようになっています。

class DatagramReader:
    ...
    async def recv(self, n: int) -> bytes:
        if self._buffer:
            data = bytes(memoryview(self._buffer)[:n])
            del self._buffer[:n]
            return data

        await self._wait_for_data()
        data = bytes(memoryview(self._buffer)[:n])
        del self._buffer[:n]
        return data

self._bufferにすでに受信済みのデータがある場合は即座にそれを返し、受信済みのデータがない場合は次のデータが到着してself._bufferに格納されるまでawait self._wait_for_data()の行で待機します。 つまり、次のデータを受信したタイミングでは「データをself._bufferに格納し、self._wait_for_data()の処理を完了させ」れば良いわけです。

つづいて、self._wait_for_data()を見ていきます。 self._wait_for_data()ではasyncio.Futureオブジェクトであるself._waiterを作成し、await self._waiterで完了するまで待機しています。 これを完了させるのはself._wakeup_waiter()の役目です。

class DatagramReader:
    ...
    async def _wait_for_data(self) -> None:
        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    def _wakeup_waiter(self) -> None:
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(True)

Futureは非同期処理の最終的な結果を表現するもので、Pythonに限らず非同期処理の文脈でよく出てくる概念のひとつです。 Pythonのasyncio.Futureはawaitableなオブジェクトになっており、await式を使うとFuture.set_result()によって結果がセットされ完了状態になるまで待機できます。4

ここで重要な点は、_wakeup_waiter()は通常のメソッドであり、コルーチンではない点です。 つまり、このメソッドはプロトコルに実装された(コルーチンではない)コールバック関数DatagramProtocol.datagram_received()からでも直接呼び出せます。

class DatagramReaderProtocol(DatagramProtocol):

    def __init__(
        self,
        reader: "DatagramReader",
        loop: AbstractEventLoop
    ) -> None:
        self._loop = loop
        self._reader = reader
        self._transport = None

    def datagram_received(self, data, addr):
        self._reader.feed_data(data)
    ...


class DatagramReader:
    ...
    def feed_data(self, data: bytes) -> None:
        self._buffer.extend(data)
        self._wakeup_waiter()

DatagramProtocol.datagram_received()は、新しいデータを受信したタイミングで呼び出されるコールバック関数です。 先ほど説明したように、これが呼び出されたタイミングで「受信したデータをself._bufferに格納してからself._wait_for_data()を完了させる」処理を呼び出すと、await reader.recv()で待機していたコルーチンはデータを受け取り処理を再開できます。 データをself._bufferに格納し、self._wait_for_data()の処理を完了させれば良いわけです。 これでコールバックなAPIをasync/awaitなAPIに変換できました。

残りの部分を含む全体像に関してはGistを見ていただければと思います。 https://gist.github.com/rhoboro/64ab3863562032028c7a6e6e14a043e8

動作確認

動作確認も公式ドキュメントの例を使いました。

下記はPythonの公式ドキュメントにあるUDPエコーサーバーの実装例を動かした状態で、Gistにあるmain.pyを実行してみたときの画面です。

it_works_well.png

main.pyから送信した「Hello World!」をエコーサーバーで受信できています。 さらにエコーサーバーから返された同じ内容をmain.pyで受信できていることも確認できました。

今回のテーマの背景

冒頭にも記載していますが、この記事は RevComm Advent Calendar 2023 の22日目の記事になります。

わたしが所属しているRevCommでは音声データを扱った事業を展開しており、提供しているサービスのなかにはPythonと非同期I/Oをフル活用して実装しているクラウド型のIP電話システムもあります。

電話は一番単純な1対1の通話であってもセッション管理、音声データの送信、音声データの受信と3つの経路が必要です。 また、どちらからでも呼び出しや終了ができる必要があるなど、その制御は想像以上に複雑なものになります。 さらに、IP電話システムに欠かせないSIPと呼ばれるプロトコルではセッション確立のためにUDPも広く使われています。 このような背景から、UDPに関してもTCPのようにもう少し扱いやすいAPIが欲しくなり実装したものです。 UDPクライアントという題材自体はニッチなものかもしれませんが、コールバックなAPIをasync/awaitなAPIにすること自体は汎用的なパターンなのでどこかで誰かの参考になれば幸いです。


  1. asyncioと同等の機能を提供するライブラリにはAnyIOTRIOなどがあります 

  2. 例えば高レベルなプロトコルであるHTTPを使う場合はHTTPXaiohttpなどのライブラリを使うことが多いです 

  3. UDPは送信先にデータが届いたことを保証する必要がないため、StreamWriter.drain()に相当するメソッドは不要でしょう 

  4. future.set_result(value)を実行したときawait futureの戻り値はvalueとなります 

tags: Python