Make組ブログ

Python、Webアプリや製品・サービス開発についてhirokikyが書きます。

Pythonのaiohttpでストリーム(aiohttp.web_ws.WebSocketResponse)をPipeする

最近Pythonの非同期処理、asyncioを使ったプログラムを書いています。 今までは非同期だとNode.jsを使っていたんですが、aiohttpや周辺ライブラリーが揃ってきたようなので使っています。

Node.jsの場合、Streamは stream.pipe(other_stream) のようにPipeできるのですが、それをaiohttpの WebSocketResponse でやる方法を書きます。

Server Reference — aiohttp 3.4.4 documentation

asyncioの Streams でも要領は同じなので参考になると思います。

(注意: Pythonのasyncioやaiohttpはまだ新しいものなので、情報が古くなってないか気をつけてください)

Pipeを1回やる(一方通行の)例

1つのストリームから読み込んで、もう片方のストリームに書き込むのは簡単です。

async for msg in ws:
    await to_ws.send_str(msg.data)

WebSocketでアクセスを受け付けて、バックエンドの別のWebSocketにつなぎ込むaiohttpのView関数を考えると、以下のようになります。

from aiohttp import web

routes = web.RouteTableDef()


@routes.post('...')
async def websocket_gateway(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    session = aiohttp.ClientSession()
    to_ws = await session.ws_connect('...')

    async for msg in ws:
        await to_ws.send_str(msg.data)

    await ws.close()
    await to_ws.close()

この場合、サーバー側のWebSocket ws がクローズすると処理が終了します。

他にも書き込み先のWebSocket to_ws が先にクローズした場合の処理や、 msg.data がバイト列の場合に to_ws.send_bytes にする処理なども必要そうです。

相互にPipeする例

上記の例ではWebSocketは一方通行にしか仲介されていません。ここで、 to_ws からの入力も、 ws に返すようにしましょう。 その場合、 2つの非同期処理を同時に実行する必要があります 。 1つの while ループの中で2つのストリームの読み書きをしても、片方が書き込まないと、もう片方の入力が受け取れないようになるので難しいです。 以下のように別々のループをして、それを asyncio.gather するとうまくいきます。

import asyncio

from aiohttp import web
from aiohttp.http import WSMsgType

WS_CLOSE_TYPES = (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED)

routes = web.RouteTableDef()


async def pipe(f, t):
    while not f.closed and not f.closed:
        msg = await f.receive()
        if msg.type in WS_CLOSE_TYPES:
            break

        await t.send_str(msg.data)
    return


@routes.post('...')
async def websocket_gateway(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    session = aiohttp.ClientSession()
    to_ws = await session.ws_connect('...')

    await asyncio.gather(
        pipe(ws, cs),
        pipe(cs, ws)
    )

    await ws.close()
    await to_ws.close()

asyncio.gather を使うことで、2つの処理を同時に実行して、両方が終了するのを待ってくれます。 優秀なやつですが、今回は片方のWebSocketが終わったら両方終了してほしいので、 pipe 関数内に工夫をしています。

pipe 関数の中で async for msg in ws せずに while ループを使っているのは、両方のWebSocketがクローズしていない限り処理を続けるようにするためです。 そうしないと、片方のWebSocketがクローズしてるのに、反対のWebSocketから書き込まれようとしてエラーになるからです。

こんな感じで、 asyncioaiohttp はかなり刺激的で面白いのでぜひ試してみてください。 誰得情報かもしれませんが、 aiohttpasyncio についての情報を出していけたらなと思います。