Make組ブログ

Python、Webサービスや製品開発、ライブラリー開発についてhirokikyが書きます

aiohttpのWebSocketクライアントの実装をソースコードリーディングしていく

こんにちは、最近 aiohttp をすごく使っています。 Web-DBな処理はDjangoで実装して、非同期処理が必要なサーバーやクライアントをaiohttpで書くという住み分けをしています (今までNode.jsを使っていたところをaiohttpで実装しています)。

平たく言うとaiohttpはかなり最高なので、今日はそのWebSocketクライアントの実装をソースコードリーディングしていきましょう。

(なぜ読むかと言うと、Dockerのexec start APIがWebSocketのようでそうでない変則的な仕様になっていて、それに対応するためにWebSocketクライアントの実装を読んでいました。 exec start wsのようなAPIを用意してくれると良いのですが)

ws_connectから読んでいこう

aiohttpClientSession にある .ws_connect() メソッドから実装を紐解いていきます。 まずはこのメソッドのドキュメントを読んでおきましょう。 https://aiohttp.readthedocs.io/en/stable/client_quickstart.html#websockets

今回は細かい仕様には焦点を当てずに、大まかな実装がどのようになっているかを見ていきましょう。 ClientSession.ws_connect の実装はここにあります (_ws_connect というメソッドに処理があるのでこれを見ていきます)。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L662

まずは必要になるヘッダーの処理が書かれています。 必須になる Connection: UpgradeUpgrade: Websocket などが設定されます。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L686-L711 ヘッダーの値はここにあります。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/hdrs.py#L89-L90

次に、対象のURLにリクエストを送信する処理があります。 ここで Connection: Upgrade のリクエストを送って、WebSocket通信を開始します。

        # send request
        resp = await self.request(method, url,
                                  headers=real_headers,
                                  read_until_eof=False,
                                  auth=auth,
                                  proxy=proxy,
                                  proxy_auth=proxy_auth,
                                  ssl=ssl,
                                  proxy_headers=proxy_headers)

Upgradeのリクエストは read_until_eof=False オプションを指定して、レスポンスを読み切るまで待たないようにしています。

レスポンスが返ってきた後は、ステータスコード101 かや、ヘッダーがWebSocketであるかどうかを検証します。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h

その後はWebSocket通信のキーの計算などの処理が入りますが、ここは割愛します。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h

ここで、今後WebSocketで通信できるように先程のレスポンスからコネクションを取り出します。

            conn = resp.connection
            assert conn is not None
            proto = conn.protocol
            assert proto is not None
            transport = conn.transport
            assert transport is not None

https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L714-L721h

細かいことを抜きにしてしまえば、この transport.write() で書きこめばUpgrade後の通信を通してサーバーに書き込みができます。 ですが、書き込み時にヘッダーをつけたり、圧縮したり、読み込んだ通信結果を解釈して WSMessage クラスでラップしたり、ping/pongをやりとりしたりする処理が必要になります。

そのために readerwriter を設定します。

            reader = FlowControlDataQueue(
                proto, limit=2 ** 16, loop=self._loop)  # type: FlowControlDataQueue[WSMessage]  # noqa
            proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
            writer = WebSocketWriter(
                proto, transport, use_mask=True,
                compress=compress, notakeover=notakeover)

https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L797-L802

FlowControlDataQueue は読み込んだデータを貯めておくキューで、実際に読み込んだ通信を解釈しているのは WebSocketReader というクラスです。 また、書き込みを行う WebSocketWriter も作成されています。

readerとwriterの詳細は後述します。 変則的にWebSocketのプロトコルを少し変えたい場合などはこの reader/writer を差し替えれば良いわけですが、そのフックポイントはありません

あとはこの reader と writer をまとめた ClientWebSocketResponse というインスタンスで返せば ws_connect の処理は終了になります (self._ws_response_class で返していますが、これは ClientSession のコンストラクタで受け取る ws_response_class 引数です。デフォルトで ClientWebSocketResponse です)。

            return self._ws_response_class(reader,
                                           writer,
                                           protocol,
                                           resp,
                                           timeout,
                                           autoclose,
                                           autoping,
                                           self._loop,
                                           receive_timeout=receive_timeout,
                                           heartbeat=heartbeat,
                                           compress=compress,
                                           client_notakeover=notakeover)

https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client.py#L807-L818

ClientWebSocketResponse にまとめることで、ライブラリーのユーザーからはreader/writerを分けて考えずに使えるようになっています。また、コネクションをクローズする処理などまとめられています。

aiohttpのClientWebSocketResponseの実装はこちらです。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/client_ws.py#L28

平たく言うと receive メソッドでは reader.read() を、 send_str メソッドでは writer.send() を呼び出すためクラスです。他にも例外を処理したり、 ping/pong を勝手に処理してくれたり使いやすくするためのクラスでもあります。

Reader/Writer

WebSocketReaderの実装はここにあります。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/http_websocket.py#L241

このreaderは feed_datafeed_eof を実装しています。 ws_connectproto.set_parser することで、このメソッドに通信されてきた文字を渡すことができます。 役割としてはWebSocketの通信 (メッセージなのか、ping/pongなのか) を解釈して、 WSMessage クラスにラップした上でキューに書き込みます。 このキューというのは ClientWebSocketResponse_reader に設定されている FlowControlDataQueue です。

WebSocketWriterの実装はここにあります。 https://github.com/aio-libs/aiohttp/blob/v3.5.4/aiohttp/http_websocket.py#L544

writerは sendcloseping, pong を実装しています。 ClientWebSocketReader_writer に設定されているので、 self._writer.send のように呼ばれます。

WebSocketWriter は書き込まれた内容を self.transport.write を使ってサーバーに送信します。 writerはメッセージの圧縮や、ヘッダーを足す処理をしてtransportに書き込みます。

まとめ

ざっと処理の流れを確認するとこのようになっています。 細かいことは割愛しましたが、ますUpgradeのリクエストを送って、そのあとはその通信をそのまま利用してWebSocketのやり取りをするというのがよく分かりました。