Blog
ブログ

2024年12月08日

Pythonの非同期プログラミング、asyncio のエラーデバッグ

おはようございます!AI/BI部のCです。

最近Advent Calendarイベントが行われており、仕事でPythonのasyncioライブラリを使用する際にいくつかの問題に遭遇しました。問題解決のプロセスとその後の考察を皆さんと共有します。

 

asyncioについて


ご存知の通り、Pythonの並行プログラミングは長い間批判されてきました。例えば、Pythonのマルチスレッドは実際には真のマルチスレッドではなく、GILを使用してスレッドを切り替えています。CPUの複数のコアを本当に活用できるのはマルチプロセスだけです。しかし、2014年のPython 3.4以降、asyncioモジュールが導入され、スレッドはPythonの非同期プログラミングの中心となりました。

早期のバージョン(Python 3.7以前)では、コルーチンの呼び出しには明示的にイベントマネージャを制御する必要がありました。その時のコードはおおよそ以下のような感じです。

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as client:
        resp = awaitclient.get(‘http://httpbin.org/ip’)
        ip = awaitresp.json()
        print(ip)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

get_event_loop 関数は明示的にイベントループを作成し、その後 run_until_complete でコルーチンを登録して実行します。しかし、Python 3.7以降では run メソッドが導入され、イベントループの作成と管理が自動的に行われるようになり、使用方法は次のように変わりました。以前よりシンプルになりました。

 

import asyncio
import aiohttp
asyncdefmain():
    async with aiohttp.ClientSession() as client:
        resp = awaitclient.get('http://httpbin.org/ip')
        ip = awaitresp.json()
        print(ip)
asyncio.run(main())


asyncio.runを使用する際に直面した問題


import asyncio
import motor.motor_asyncio

classMongoUtil:
    def__init__(self):
        conn = motor.motor_asyncio.AsyncIOMotorClient()
        db = conn.exercise
        self.collection = db.person_info

    asyncdefread_people(self):
        asyncfordocinself.collection.find({}, {‘_id’: 0}):
        print(doc)

util = MongoUtil()
asyncio.run(util.read_people())
上記のコードを実行すると、次のようなエラーが発生しました:
RuntimeError: Task <Task pending name=’Task-1′ coro=<MongoUtil.read_people() running at /Users/mika/py_workspace/test/main.py:12> cb=[_run_until_complete_cb() at /Users/mika/opt/anaconda3/envs/py38/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/mika/opt/anaconda3/envs/py38/lib/python3.8/asyncio/futures.py:360]> attached to a different loop
こちらのエラーメッセージの意味は、現在のスレッドに複数のイベントループが存在するということです。この RuntimeError が発生する理由を考えてみましょう。まず、asyncio の公式ドキュメントによると、asyncio.run() は他のイベントループが実行中のときには実行できないと記載されています。
https://docs.python.org/3.10/library/asyncio-task.html#asyncio.run
では、asyncio.run() によって自動的に作成されたイベントループ以外で、どこでイベントループが作成されているのでしょうか。ここで motor.motor_asyncio.AsyncIOMotorClient のソースコードを確認してみると、イベントループの引数を渡すことができ、渡さない場合には新たにイベントループを作成することが分かります。これにより、__init__ 関数内で新しいイベントループが作成され、その結果後続の asyncio.run() がエラーを起こすことになります。
https://github.com/mongodb/motor/blob/4c7534c6200e4f160268ea6c0e8a9038dcc69e0f/motor/core.py#L154

解決方法


この問題を解決する方法は2つあります。一つは、外部で asyncio.get_event_loop() を使用して、現在のスレッドのイベントループ(つまり、motor が作成したイベントループ)を取得することです。これにより、複数のイベントループが存在することを避けることができます。

import asyncio
import motor.motor_asyncio

class MongoUtil:
    def __init__(self):
        conn = motor.motor_asyncio.AsyncIOMotorClient()
        db = conn.exercise
        self.collection = db.person_info

    async def read_people(self):
        async for doc in self.collection.find({}, {‘_id’: 0}):
            print(doc)

util = MongoUtil()
loop = asyncio.get_event_loop()
loop.run_until_complete(util.read_people())

もう一つの方法は、motor パッケージをアップグレードすることです。バージョン 3.0 以降では、AsyncIOMotorClient の初期化時にイベントループを自動的に作成しなくなったため、衝突が発生しなくなります。

 

最後


このエラーについてさらに asyncio の仕組みを掘り下げていくと、コルーチンにとってイベントループはスケジューラのような役割を果たしていることがわかります。コルーチンは awaitable 特性を持ち、await キーワードで自らを一時停止させ、制御をイベントループに返すことができます。これにより、イベントループは他のコルーチンをスケジュールして実行することができ、並行処理が実現されます。これらはすべて1つのスレッド内で行われ、スレッド間のコンテキストスイッチが不要なため、コルーチンは軽量で効率的な並行処理の手段となります。

では、最後に、マルチスレッド環境においてコルーチン同士はどのように管理されるのでしょうか?それらは完全に独立したリソースなのでしょうか?この点については、後続の記事でさらに詳しく議論していきたいと思います。

 

このページの先頭へ