home

동기 함수를 비동기로 변경하는 방법

yagmail이라고 손쉽게 이메일을 보내주는 라이브러리를 비동기 함수 속에서 실행하려고 하였으나, 애석하게도 동기 라이브러리라서 비동기 함수에서 사용하려면 추가적인 작업이 필요합니다. 그래서 그 방법에 대해서 몇 가지 가져왔습니다.

asyncio.to_thread사용

파이썬 3.9부터 사용할 수 있고, 가장 간단하고 직관적인 방법입니다. 내부적으로 ThreadPoolExecutor를 사용합니다.

class YagEmailHandler(AbstractEmailHandler):
    def __init__(self):
        self._mail_client = yagmail.SMTP(
            user=settings.smtp_username,
            password=settings.smtp_password,
        )

    async def send_email(self, to: str, subject: str, body: str):
        await asyncio.to_thread(
            self._mail_client.send,
            to=to,
            subject=subject,
            contents=body
        )

to_thread 를 단순화 하면 밑과 같이 볼 수 있어요.

async def to_thread(func, /, *args, **kwargs):
    loop = asyncio.get_running_loop()
    executor = loop._default_executor  # ThreadPoolExecutor 사용
    if executor is None:
        executor = ThreadPoolExecutor()
        loop.set_default_executor(executor)
    
    return await loop.run_in_executor(executor, func, *args, **kwargs)

이벤트 루프의 기본 ThreadPoolExecutor 를 사용하고, 스레드 수의 기본 값은 min(32, (os.cpu_count() or 1) + 4) 입니다.

이렇듯 자동으로 스레드 풀을 생성하고 관리해주기 때문에 복잡한 설정 없이도 쓸 수 있는 장점이 있어요.

run_in_executor 사용

더 세밀한 제어가 가능하고, 커스텀 Excutor를 사용할 수 있는 차이점이 있습니다. 예를 들면 ProcessPoolExecutor 과 같이 프로세스 풀을 사용할 수 있어요. 그래서 CPU 집약적인 작업을 할 때는 run_in_executor를 사용하는 것이 더 유리할 수 있습니다.

from concurrent.futures import ThreadPoolExecutor

class YagEmailHandler(AbstractEmailHandler):
    def __init__(self):
        self._mail_client = yagmail.SMTP(
            user=settings.smtp_username,
            password=settings.smtp_password,
        )
        # 커스텀 Executor 생성
        self._executor = ThreadPoolExecutor(
            max_workers=10,
            thread_name_prefix="email_sender_"
        )

    async def send_email(self, to: str, subject: str, body: str):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(
            self._executor,
            self._mail_client.send,
            to,
            subject,
            body
        )
    
    async def close(self):
        """리소스 정리"""
        self._executor.shutdown(wait=True)

executor를 파라미터로 받기 때문에, 설정 부분에서 max_workers, thread_name_prefix 등 설정이 가능하고, 특정 작업에 최적화된 스레드 풀 구성 가능해서 성능 튜닝이 가능한 장점이 있습니다.

하지만 트레이드 오프로는 당연하게도 관리 포인트가 늘어나는데요, 명시적으로 executor를 생성하고 해제해줘야 돼서 리소스 관리를 스스로 해야합니다.