こんにちは、最近 aiohttp をすごく使っています。 Web-DBな処理はDjangoで実装して、非同期処理が必要なサーバーやクライアントをaiohttpで書くという住み分けをしています (今までNode.jsを使っていたところをaiohttpで実装しています)。
平たく言うとaiohttpはかなり最高なので、今日はそのWebSocketクライアントの実装をソースコードリーディングしていきましょう。
(なぜ読むかと言うと、Dockerのexec start APIがWebSocketのようでそうでない変則的な仕様になっていて、それに対応するためにWebSocketクライアントの実装を読んでいました。 exec start wsのようなAPIを用意してくれると良いのですが)
ws_connectから読んでいこう
aiohttp の ClientSession にある .ws_connect() メソッドから実装を紐解いていきます。
まずはこのメソッドのドキュメントを読んでおきましょう。
https://aiohttp.readthedocs.io/en/stable/client_quickstart.html#websockets
今回は細かい仕様には焦点を当てずに、大まかな実装がどのようになっているかを見ていきましょう。
ClientSession.ws_connect の実装はここにあります
(_ws_connect というメソッドに処理があるのでこれを見ていきます)。
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L662
まずは必要になるヘッダーの処理が書かれています。
必須になる Connection: Upgrade や Upgrade: Websocket などが設定されます。
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L686-L711
ヘッダーの値はここにあります。
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/hdrs.py#L89-L90
次に、対象のURLにリクエストを送信する処理があります。
ここで Connection: Upgrade のリクエストを送って、WebSocket通信を開始します。
# send request resp = await self.request(method, url, headers=real_headers, read_until_eof=False, auth=auth, proxy=proxy, proxy_auth=proxy_auth, ssl=ssl, proxy_headers=proxy_headers)
Upgradeのリクエストは read_until_eof=False オプションを指定して、レスポンスを読み切るまで待たないようにしています。
レスポンスが返ってきた後は、ステータスコードが 101 かや、ヘッダーがWebSocketであるかどうかを検証します。
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h
その後はWebSocket通信のキーの計算などの処理が入りますが、ここは割愛します。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h
ここで、今後WebSocketで通信できるように先程のレスポンスからコネクションを取り出します。
conn = resp.connection
assert conn is not None
proto = conn.protocol
assert proto is not None
transport = conn.transport
assert transport is not None
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h
細かいことを抜きにしてしまえば、この transport の .write() で書きこめばUpgrade後の通信を通してサーバーに書き込みができます。
ですが、書き込み時にヘッダーをつけたり、圧縮したり、読み込んだ通信結果を解釈して WSMessage クラスでラップしたり、ping/pongをやりとりしたりする処理が必要になります。
そのために reader と writer を設定します。
reader = FlowControlDataQueue(
proto, limit=2 ** 16, loop=self._loop) # type: FlowControlDataQueue[WSMessage] # noqa
proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
writer = WebSocketWriter(
proto, transport, use_mask=True,
compress=compress, notakeover=notakeover)
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L797-L802
FlowControlDataQueue は読み込んだデータを貯めておくキューで、実際に読み込んだ通信を解釈しているのは WebSocketReader というクラスです。
また、書き込みを行う WebSocketWriter も作成されています。
readerとwriterの詳細は後述します。 変則的にWebSocketのプロトコルを少し変えたい場合などはこの reader/writer を差し替えれば良いわけですが、そのフックポイントはありません
あとはこの reader と writer をまとめた ClientWebSocketResponse というインスタンスで返せば ws_connect の処理は終了になります
(self._ws_response_class で返していますが、これは ClientSession のコンストラクタで受け取る ws_response_class 引数です。デフォルトで ClientWebSocketResponse です)。
return self._ws_response_class(reader,
writer,
protocol,
resp,
timeout,
autoclose,
autoping,
self._loop,
receive_timeout=receive_timeout,
heartbeat=heartbeat,
compress=compress,
client_notakeover=notakeover)
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L807-L818
ClientWebSocketResponse にまとめることで、ライブラリーのユーザーからはreader/writerを分けて考えずに使えるようになっています。また、コネクションをクローズする処理などまとめられています。
aiohttpのClientWebSocketResponseの実装はこちらです。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client_ws.py#L28
平たく言うと receive メソッドでは reader.read() を、 send_str メソッドでは writer.send() を呼び出すためクラスです。他にも例外を処理したり、 ping/pong を勝手に処理してくれたり使いやすくするためのクラスでもあります。
Reader/Writer
WebSocketReaderの実装はここにあります。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/http_websocket.py#L241
このreaderは feed_data と feed_eof を実装しています。
ws_connect で proto.set_parser することで、このメソッドに通信されてきた文字を渡すことができます。
役割としてはWebSocketの通信 (メッセージなのか、ping/pongなのか) を解釈して、 WSMessage クラスにラップした上でキューに書き込みます。
このキューというのは ClientWebSocketResponse の _reader に設定されている FlowControlDataQueue です。
WebSocketWriterの実装はここにあります。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/http_websocket.py#L544
writerは send や close、 ping, pong を実装しています。 ClientWebSocketReader の _writer に設定されているので、 self._writer.send のように呼ばれます。
WebSocketWriter は書き込まれた内容を self.transport.write を使ってサーバーに送信します。
writerはメッセージの圧縮や、ヘッダーを足す処理をしてtransportに書き込みます。
まとめ
ざっと処理の流れを確認するとこのようになっています。 細かいことは割愛しましたが、ますUpgradeのリクエストを送って、そのあとはその通信をそのまま利用してWebSocketのやり取りをするというのがよく分かりました。