고민은 격렬하게, 행동은 단순하게

[python] aiomysql > excutemany 로 multi insert 쿼리 효율성 개선 본문

개발

[python] aiomysql > excutemany 로 multi insert 쿼리 효율성 개선

jomminii 2024. 4. 4. 18:55

비동기 기반인 FastAPI 프레임워크에서 mysql을 비동기적으로 사용하기 위해 사용하는 aiomysql에서는 Cursor 클래스 내 executemany 메서드로 다중 insert 문을 사용할 수 있게 지원합니다.

대부분의 기능에서는 excute 메서드로 단건 처리를 많이 하게되어 사용할 일이 거의 없었는데, 이번에 한 번에 많은 데이터를 insert 하게 되는 기능을 만들면서 executemany 의 효용성에 대해 체감을 좀 하게 되었습니다.

🚔 테스트

테스트 코드를 작성해볼게요.

테스트 환경
python 3.11.0
aiomysql 0.2.0
db : 8.0.mysql_aurora.3.04.1
db spec : AWS RDS db.t3.medium (core 1, vCPU 2, 4GB RAM)

먼저 테스트 쿼리를 날릴 테이블을 하나 만듭니다. 간단히 하나의 필드만 선언했습니다.

CREATE TABLE sptemp.excute_many_test
 (
    no int unsigned 
 )
;

그리고, 1,000개의 insert를 실행하는 코드를 짜고 실행시켰고, 평균 19초 후 실행이 완료되었습니다. 정말 간단한 쿼리인데도 시간이 꽤나 걸리죠?

각 INSERT 문마다 데이터베이스에 대한 네트워크 호출이 발생하기 때문인데요. 이는 네트워크 지연, 쿼리 파싱, 실행 계획 생성, 트랜잭션 로그 기록 등으로 인해 상대적으로 높은 오버헤드를 유발할 수 있습니다. 이런 오버헤드는 데이터를 대량으로 삽입할 때 성능 저하의 주요 원인이 됩니다.

from datetime import datetime

sql = """
INSERT INTO sptemp.excute_many_test (
    no
) VALUES (
    %(no)s
);

"""

start_time = datetime.now()

async with db.get_cursor() as cursor:

    for _ in range(1000):
        await cursor.execute(query=sql, args={'no': 1}

end_time = datetime.now()
print(f'걸린시간: {end_time - start_time}')


⚒️ 개선 1 - 트랜잭션 명시 (19초 -> 13초, 약 31% 개선)

그리고 명시적으로 트랜잭션을 선언하지 않았기 때문에 각 쿼리가 실행될 때마다 commit 관련 로직도 실행되어 늦어지는 것으로 보입니다. 일단 이것부터 개선해볼게요.


start_time = datetime.now()

# 트랜잭션 시작
await db.begin()

async with db.get_cursor() as cursor:

    for _ in range(1000):
        await cursor.execute(query=sql, args={'no': 1}

# 트랜잭션 종료 후 커밋
await db.commit()

end_time = datetime.now()
print(f'걸린시간: {end_time - start_time}')

시간 체크 로직 사이에 트랜잭션의 시작과 종료를 명시했습니다. 이번엔 평균 13초 정도가 소요됐습니다. 이전에는 각 INSERT 문마다 커밋이 이루어졌다면, 이제는 1000개의 쿼리가 실행된 후, 한 번에 커밋을 하게 되어 오버헤드가 줄어들었습니다.

다만 이런 개선은 비즈니스 로직 상 트랜잭션 구분이 필요한지 여부를 먼저 따져보고 적용해야합니다. 각각의 INSERT 마다 데이터가 반영되고, 그 외 시도 건과는 독립적이어야한다면, 트랜잭션을 적용하면 안돼요!

🪚 개선 2 - executemany (13초 -> 0.23초, 약 92% 개선)

이번에는 본 글의 주제처럼 aiomysql에서 제공하는 excutemany 메서드로 적용을 해볼게요.

async with db.get_cursor() as cursor:
    insert_param_list = []

    for _ in range(10000):
        insert_param_list.append({'no':1})

    start_time = datetime.now()

    await cursor.executemany(
        query=sql,
        args=insert_param_list,
    )

    end_time = datetime.now()
    print(f'걸린시간: {end_time - start_time}')

이전에는 반복문을 통해 전체 갯수 만큼의 쿼리를 실행했다면, 이번에는 모든 경우의 수를 리스트에 담아서 한 번에 실행하도록 excutemany를 사용합니다.
그 결과는... 평균 0.23초 정도가 소요 됐습니다.


위에서 언급했던 것처럼 하나의 쿼리로 전체 과정을 처리하기 때문에, 네트워크 오버헤드가 1/1000으로 줄었고, 이로 인해 드라마틱한 개선이 이루어졌습니다.

excutemany가 뭐길래 이런 일이 일어나는걸까요?


🔥 excutemany is...

aiomysql(A pure-Python MySQL client library for asyncio) 은 mysql을 사용하기 위한 여러가지 메서드를 제공하는데요, 그 중 executemany라는 메서드가 있습니다.

간단히 요약하자면 executemany는 mysql의 INSERT INTO 'table name' (field1, ... ) VALUES (%(value1)s...); 구문을 쉽게 말아서 실행해주는 역할을 합니다.
코드를 한 번 살펴볼까요? (코드 보기 - aiomysql/cursors.py)

# aiomysql/cursors.py

RE_INSERT_VALUES = re.compile(
    r"\s*((?:INSERT|REPLACE)\s.+\sVALUES?\s+)" +
    r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))" +
    r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
    re.IGNORECASE | re.DOTALL)


async def executemany(self, query, args):
    """Execute the given operation multiple times

    The executemany() method will execute the operation iterating
    over the list of parameters in seq_params.

    ...

    """
    ...

    m = RE_INSERT_VALUES.match(query)
    if m:
        q_prefix = m.group(1) % ()
        q_values = m.group(2).rstrip()
        q_postfix = m.group(3) or ''
        assert q_values[0] == '(' and q_values[-1] == ')'
        return (await self._do_execute_many(
            q_prefix, q_values, q_postfix, args, self.max_stmt_length,
            self._get_db().encoding))
    else:
        rows = 0
        for arg in args:
            await self.execute(query, arg)
            rows += self._rowcount
        self._rowcount = rows
    return self._rowcount

executemany 메서드를 보면, 정규표현식을 통해 변수들이 매핑되기 전의 sql 문을 INSERT ... VALUES... 형식에 맞는지 확인을 먼저 합니다. 이 형식이 맞다면 필드를 정의되어 있는 prefix, 매핑 변수 부분인 values, 그 뒷 부분인 postfix 로 그룹을 나누어 이후 과정을 진행합니다.

만약 정규표현식에 매칭되지 않는다면, 전달 받은 인자 리스트 기준으로 반복문을 통해 각각의 INSERT 쿼리를 실행합니다. 이렇게 되면 executemany 를 쓴 이유가 없죠.

이 부분이 사소해보이면서도 주의 깊게 살펴봐야할 부분인데요. 실제로 어떻게 실행됐는지 로그를 따로 찍지 않으면 알 수 없기 때문에, 나는 bulk insert 를 했는데 왜 이렇게 느리지 이슈가 발생할 수 있어요.

저도 이거 때문에 시간을 꽤나 썼는데요... 저희 팀에서는 sql 문 최상단에 주석으로 해당 쿼리를 설명하는 주석을 달아놓는데, 이 주석 때문에 정규표현식 매칭이 되지 않아 실제로는 for 문으로 쿼리가 실행되고 있었어요. 데이터가 적을 때는 인지하지 못했는데, 데이터가 많아지고 난 후 이슈가 확인되서 수정을 했죠.

만약에 비슷한 관습이 있으신 분들은 executemany로 바통을 넘기기 전에 sql 문을 가공하는 로직이 있어야합니다.

요런식으로요.

 sql = '\n'.join(line for line in sql.split('\n') if not line.strip().startswith('--'))

그리고나서 아래 로직이 실행되는데요. 각각의 값들을 DB 설정 값에 맞게 인코딩을 해주고(pymysql의 디폴트 값은 utf8mb4) escaping 처리를 해줍니다.

async def _do_execute_many(self, prefix, values, postfix, args,
                            max_stmt_length, encoding):
    conn = self._get_db()
    escape = self._escape_args
    if isinstance(prefix, str):
        prefix = prefix.encode(encoding)
    if isinstance(postfix, str):
        postfix = postfix.encode(encoding)
    sql = bytearray(prefix)
    args = iter(args)
    v = values % escape(next(args), conn)

    if isinstance(v, str):
        v = v.encode(encoding, 'surrogateescape')
    sql += v
    rows = 0
    for arg in args:
        v = values % escape(arg, conn)
        if isinstance(v, str):
            v = v.encode(encoding, 'surrogateescape')
        if len(sql) + len(v) + len(postfix) + 1 > max_stmt_length:
            r = await self.execute(sql + postfix)
            rows += r
            sql = bytearray(prefix)
        else:
            sql += b','
        sql += v
    r = await self.execute(sql + postfix)
    rows += r
    self._rowcount = rows
    return rows

이 과정에서 각각의 부분을 bytearray 로 변환하여 매핑하는데요, 이는 대량 데이터를 처리하는데 좀 더 효율적으로 처리하기 위한걸로 보여요.

bytearray에 대한 내용은 아래 부록 1을 참고해보세요.
encode Surrogateescape 방식에 대한 내용은 아래 부록 2를 참고해보세요.

인코딩처리까지 완료된 매핑 부분은 sql 문에 하나씩 추가가 됩니다.


    INSERT INTO sptemp.excute_many_test (
        no
    ) VALUES (
        1
    ),
    ------- 이 부분 계속 추가
    (
        1
    )
    -------
    ...
    ;

그런데 이렇게 마냥 추가되면, mysql 이 감당할 수 없는 순간이 찾아올지도 모릅니다. 그래서 제한을 두고 있는데요, default max_stmt_length 를 1024000로 두고 있습니다. 대충 1mb 정도인데요, 이 한도를 넘으면 이때까지 모인 쿼리를 한 번 실행하고, 이후부터 새로 쿼리를 만들어서 실행해줍니다. 영어가 1바이트니 웬만하면 한도에 들지않을까 싶어요. 이 한도는 db가 감당할 수 있는 패킷의 한도와 관련이 있는데요, 좀 더 알아보려면 부록 3을 찾아봐주세요.

이렇게 만들어진 쿼리는 한 번의 요청으로 실행되어 네트워크 오버헤드를 줄이게 되고, 이로 인해 실행 시간도 대폭 줄여주는 결과를 가져왔습니다.

배치와 같이 대용량 데이터를 처리하는 상황에서 사용하면 좋습니다.

유의사항

1. values 문 사용

일반적으로 INSERT 문으로 많이 사용하는 INSERT INTO 'table name' SET filed1=%(value1)s..; 구문은 사용할 수 없어요. INSERT INTO 'table name' (field1, ... ) VALUES (%(value1)s...); 문만 사용해야합니다.

2. 최초 라인 주석 등 문자 입력 금지

인자 매칭 내에서는 상관 없지만, prefix 앞 부분에 부가적인 문자를 집어넣으면 정규표현식 통과가 안됩니다.

3. 서브쿼리 사용 금지

아래와 같이 서브쿼리를 쿼리 내부에 사용하면 작동하지 않습니다. 서브쿼리 값이 필요하다면 외부에서 조회하여 미리 넣어주세요.

...
VALUES (
    (
        SELECT
            no
        FROM test_table
        ...
        LIMIT 1
    )
    , %(year)s
    , %(month)s
    ...

부록 1. bytearray

참고 > bytearray의 특징:

prefixbytearray로 만드는 이유는 이후에 SQL 쿼리 문자열을 동적으로 구성하고 수정하는 과정에서 bytearray의 가변성(mutability)과 효율성을 활용하기 위함입니다.

  • 가변성(Mutability): bytearray는 수정 가능한(mutable) 바이트 시퀀스입니다. 이는 한 번 생성된 후에도 내용을 변경하거나, 추가, 삭제가 가능하다는 것을 의미합니다. 반면, Python의 문자열(str)과 바이트 문자열(bytes)은 불변(immutable)입니다. 따라서 SQL 쿼리 구성 과정에서 쿼리의 일부를 변경하거나 추가적인 데이터를 쿼리에 삽입해야 할 경우, bytearray를 사용하면 이러한 작업을 보다 효율적으로 수행할 수 있습니다.
  • 성능: 대규모 데이터를 다룰 때 bytearray를 사용하면 메모리를 절약하고 성능을 개선할 수 있습니다. bytearray는 동적으로 크기를 조정할 수 있으며, 여러 조각의 데이터를 추가하거나 변경할 때 새로운 객체를 계속 생성하지 않아도 되므로 처리 속도가 빨라집니다.

    사용 예시:

    위 코드에서는 SQL 쿼리의 prefix 부분을 먼저 bytearray로 변환합니다. 이는 이후 단계에서 쿼리의 다른 부분들, 예를 들어 삽입할 데이터 값(values)이나 조건(postfix) 등을 추가할 때, 기존 bytearray 객체에 새로운 내용을 직접 추가하거나 변경할 수 있게 하기 위함입니다.

    예를 들어, 다음과 같은 과정을 통해 동적으로 SQL 쿼리를 구성할 수 있습니다:
  1. bytearray로 변환된 prefix에 쿼리의 values 부분을 추가.
  2. 필요한 경우 더 많은 데이터나 조건을 추가하여 쿼리를 확장.
  3. 최종적으로 구성된 쿼리를 데이터베이스에 전송.

    이 방식은 특히 대량의 데이터를 처리하거나, 복잡하게 구성된 쿼리를 동적으로 생성할 때 유용하게 사용됩니다. bytearray를 사용함으로써, 쿼리 문자열의 생성과 수정을 보다 효율적이고 유연하게 처리할 수 있습니다.

부록 2. encode Surrogateescape 방식

Surrogateescape 사용 시나리오

surrogateescape는 파이썬에서 바이트를 문자열로 디코딩하거나 문자열을 바이트로 인코딩할 때, 유효하지 않은 데이터를 처리하는 방식 중 하나입니다. 이 에러 핸들러를 사용하면, 유효하지 않은 바이트는 유니코드의 서로게이트 영역에 있는 코드 포인트로 변환되어, 나중에 다시 같은 바이트로 복원될 수 있습니다. 주로 파일 시스템 경로나 외부에서 들어온 데이터를 처리할 때 사용되며, 데이터 손실을 방지할 수 있습니다.

Prefix와 Postfix의 처리

prefixpostfix는 쿼리의 시작부와 종료부를 나타내며, 일반적으로 쿼리의 구조를 정의하는 정적인 문자열입니다. 이들 문자열에는 외부에서 입력받은 데이터가 포함되지 않으므로, surrogateescape와 같은 특별한 에러 핸들링이 필요하지 않습니다. 대신, 이 문자열들은 쿼리를 정의하는 데 사용되는 명확한 SQL 구문을 포함하므로, 정확한 인코딩을 유지하는 것이 중요합니다.

정리

따라서, prefixpostfix를 인코딩할 때는 표준 encode 메소드만 사용하며, surrogateescape는 사용하지 않는 것이 일반적입니다. 이는 이들 문자열이 데이터베이스 쿼리의 구조를 정의하는데 사용되며, 인코딩 과정에서 데이터 손실이 발생하지 않도록 보장하기 위함입니다. 반면, v와 같이 외부 데이터를 포함할 수 있는 부분에서는 surrogateescape를 사용하여 비정상적인 데이터를 안전하게 처리하고, 필요한 경우 복원할 수 있도록 합니다.


부록 3. max_allowed_packet

packet은 mysql 서버와 클라이언트가 주고 받을 수 있는 정보인데, mysql 8.0 기준으로는 1GB가 최대 값이라고 합니다. 디폴트 값으로는 16MB 가 지정되어 있구요.
aiomysql 에서는 오버헤드를 주지 않기 위해서 이를 일부 조정해서 적용해주고 있는거 같아요. aiomysql 내에서는 이 값을 수동으로 설정할 수는 없는거 같아서, 원한다면 따로 코드를 작성해서 적용해야할 것으로 보입니다.
mysql 문서

반응형