こんにちは、最近 aiohttp をすごく使っています。 Web-DBな処理はDjangoで実装して、非同期処理が必要なサーバーやクライアントをaiohttpで書くという住み分けをしています (今までNode.jsを使っていたところをaiohttpで実装しています)。
平たく言うとaiohttpはかなり最高なので、今日はそのWebSocketクライアントの実装をソースコードリーディングしていきましょう。
(なぜ読むかと言うと、Dockerのexec start APIがWebSocketのようでそうでない変則的な仕様になっていて、それに対応するためにWebSocketクライアントの実装を読んでいました。 exec start wsのようなAPIを用意してくれると良いのですが)
ws_connectから読んでいこう
aiohttp
の ClientSession
にある .ws_connect()
メソッドから実装を紐解いていきます。
まずはこのメソッドのドキュメントを読んでおきましょう。
https://aiohttp.readthedocs.io/en/stable/client_quickstart.html#websockets
今回は細かい仕様には焦点を当てずに、大まかな実装がどのようになっているかを見ていきましょう。
ClientSession.ws_connect
の実装はここにあります
(_ws_connect
というメソッドに処理があるのでこれを見ていきます)。
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L662
まずは必要になるヘッダーの処理が書かれています。
必須になる Connection: Upgrade
や Upgrade: Websocket
などが設定されます。
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L686-L711
ヘッダーの値はここにあります。
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/hdrs.py#L89-L90
次に、対象のURLにリクエストを送信する処理があります。
ここで Connection: Upgrade
のリクエストを送って、WebSocket通信を開始します。
# send request resp = await self.request(method, url, headers=real_headers, read_until_eof=False, auth=auth, proxy=proxy, proxy_auth=proxy_auth, ssl=ssl, proxy_headers=proxy_headers)
Upgradeのリクエストは read_until_eof=False
オプションを指定して、レスポンスを読み切るまで待たないようにしています。
レスポンスが返ってきた後は、ステータスコードが 101
かや、ヘッダーがWebSocketであるかどうかを検証します。
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h
その後はWebSocket通信のキーの計算などの処理が入りますが、ここは割愛します。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h
ここで、今後WebSocketで通信できるように先程のレスポンスからコネクションを取り出します。
conn = resp.connection assert conn is not None proto = conn.protocol assert proto is not None transport = conn.transport assert transport is not None
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h
細かいことを抜きにしてしまえば、この transport
の .write()
で書きこめばUpgrade後の通信を通してサーバーに書き込みができます。
ですが、書き込み時にヘッダーをつけたり、圧縮したり、読み込んだ通信結果を解釈して WSMessage
クラスでラップしたり、ping/pongをやりとりしたりする処理が必要になります。
そのために reader
と writer
を設定します。
reader = FlowControlDataQueue( proto, limit=2 ** 16, loop=self._loop) # type: FlowControlDataQueue[WSMessage] # noqa proto.set_parser(WebSocketReader(reader, max_msg_size), reader) writer = WebSocketWriter( proto, transport, use_mask=True, compress=compress, notakeover=notakeover)
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L797-L802
FlowControlDataQueue
は読み込んだデータを貯めておくキューで、実際に読み込んだ通信を解釈しているのは WebSocketReader
というクラスです。
また、書き込みを行う WebSocketWriter
も作成されています。
readerとwriterの詳細は後述します。 変則的にWebSocketのプロトコルを少し変えたい場合などはこの reader/writer を差し替えれば良いわけですが、そのフックポイントはありません
あとはこの reader と writer をまとめた ClientWebSocketResponse
というインスタンスで返せば ws_connect
の処理は終了になります
(self._ws_response_class
で返していますが、これは ClientSession
のコンストラクタで受け取る ws_response_class
引数です。デフォルトで ClientWebSocketResponse
です)。
return self._ws_response_class(reader,
writer,
protocol,
resp,
timeout,
autoclose,
autoping,
self._loop,
receive_timeout=receive_timeout,
heartbeat=heartbeat,
compress=compress,
client_notakeover=notakeover)
https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L807-L818
ClientWebSocketResponse
にまとめることで、ライブラリーのユーザーからはreader/writerを分けて考えずに使えるようになっています。また、コネクションをクローズする処理などまとめられています。
aiohttpのClientWebSocketResponseの実装はこちらです。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client_ws.py#L28
平たく言うと receive
メソッドでは reader.read()
を、 send_str
メソッドでは writer.send()
を呼び出すためクラスです。他にも例外を処理したり、 ping/pong
を勝手に処理してくれたり使いやすくするためのクラスでもあります。
Reader/Writer
WebSocketReaderの実装はここにあります。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/http_websocket.py#L241
このreaderは feed_data
と feed_eof
を実装しています。
ws_connect
で proto.set_parser
することで、このメソッドに通信されてきた文字を渡すことができます。
役割としてはWebSocketの通信 (メッセージなのか、ping/pongなのか) を解釈して、 WSMessage
クラスにラップした上でキューに書き込みます。
このキューというのは ClientWebSocketResponse
の _reader
に設定されている FlowControlDataQueue
です。
WebSocketWriterの実装はここにあります。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/http_websocket.py#L544
writerは send
や close
、 ping
, pong
を実装しています。 ClientWebSocketReader
の _writer
に設定されているので、 self._writer.send
のように呼ばれます。
WebSocketWriter
は書き込まれた内容を self.transport.write
を使ってサーバーに送信します。
writerはメッセージの圧縮や、ヘッダーを足す処理をしてtransportに書き込みます。
まとめ
ざっと処理の流れを確認するとこのようになっています。 細かいことは割愛しましたが、ますUpgradeのリクエストを送って、そのあとはその通信をそのまま利用してWebSocketのやり取りをするというのがよく分かりました。