最近Pythonの非同期処理、asyncioを使ったプログラムを書いています。 今までは非同期だとNode.jsを使っていたんですが、aiohttpや周辺ライブラリーが揃ってきたようなので使っています。
Node.jsの場合、Streamは stream.pipe(other_stream)
のようにPipeできるのですが、それをaiohttpの WebSocketResponse でやる方法を書きます。
Server Reference — aiohttp 3.4.4 documentation
asyncioの Streams でも要領は同じなので参考になると思います。
(注意: Pythonのasyncioやaiohttpはまだ新しいものなので、情報が古くなってないか気をつけてください)
Pipeを1回やる(一方通行の)例
1つのストリームから読み込んで、もう片方のストリームに書き込むのは簡単です。
async for msg in ws: await to_ws.send_str(msg.data)
WebSocketでアクセスを受け付けて、バックエンドの別のWebSocketにつなぎ込むaiohttpのView関数を考えると、以下のようになります。
from aiohttp import web routes = web.RouteTableDef() @routes.post('...') async def websocket_gateway(request): ws = web.WebSocketResponse() await ws.prepare(request) session = aiohttp.ClientSession() to_ws = await session.ws_connect('...') async for msg in ws: await to_ws.send_str(msg.data) await ws.close() await to_ws.close()
この場合、サーバー側のWebSocket ws
がクローズすると処理が終了します。
他にも書き込み先のWebSocket to_ws
が先にクローズした場合の処理や、 msg.data
がバイト列の場合に to_ws.send_bytes
にする処理なども必要そうです。
相互にPipeする例
上記の例ではWebSocketは一方通行にしか仲介されていません。ここで、 to_ws
からの入力も、 ws
に返すようにしましょう。
その場合、 2つの非同期処理を同時に実行する必要があります 。
1つの while
ループの中で2つのストリームの読み書きをしても、片方が書き込まないと、もう片方の入力が受け取れないようになるので難しいです。
以下のように別々のループをして、それを asyncio.gather
するとうまくいきます。
import asyncio from aiohttp import web from aiohttp.http import WSMsgType WS_CLOSE_TYPES = (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED) routes = web.RouteTableDef() async def pipe(f, t): while not f.closed and not f.closed: msg = await f.receive() if msg.type in WS_CLOSE_TYPES: break await t.send_str(msg.data) return @routes.post('...') async def websocket_gateway(request): ws = web.WebSocketResponse() await ws.prepare(request) session = aiohttp.ClientSession() to_ws = await session.ws_connect('...') await asyncio.gather( pipe(ws, cs), pipe(cs, ws) ) await ws.close() await to_ws.close()
asyncio.gather
を使うことで、2つの処理を同時に実行して、両方が終了するのを待ってくれます。
優秀なやつですが、今回は片方のWebSocketが終わったら両方終了してほしいので、 pipe
関数内に工夫をしています。
pipe
関数の中で async for msg in ws
せずに while
ループを使っているのは、両方のWebSocketがクローズしていない限り処理を続けるようにするためです。
そうしないと、片方のWebSocketがクローズしてるのに、反対のWebSocketから書き込まれようとしてエラーになるからです。
こんな感じで、 asyncio
や aiohttp
はかなり刺激的で面白いのでぜひ試してみてください。
誰得情報かもしれませんが、 aiohttp
や asyncio
についての情報を出していけたらなと思います。