비동기 기반인 FastAPI 프레임워크에서 mysql을 비동기적으로 사용하기 위해 사용하는 aiomysql에서는 Cursor 클래스 내 executemany
메서드로 다중 insert 문을 사용할 수 있게 지원합니다.
대부분의 기능에서는 excute
메서드로 단건 처리를 많이 하게되어 사용할 일이 거의 없었는데, 이번에 한 번에 많은 데이터를 insert
하게 되는 기능을 만들면서 executemany
의 효용성에 대해 체감을 좀 하게 되었습니다.
🚔 테스트
테스트 코드를 작성해볼게요.
테스트 환경
python 3.11.0
aiomysql 0.2.0
db : 8.0.mysql_aurora.3.04.1
db spec : AWS RDS db.t3.medium (core 1, vCPU 2, 4GB RAM)
먼저 테스트 쿼리를 날릴 테이블을 하나 만듭니다. 간단히 하나의 필드만 선언했습니다.
CREATE TABLE sptemp.excute_many_test
(
no int unsigned
)
;
그리고, 1,000개의 insert를 실행하는 코드를 짜고 실행시켰고, 평균 19초 후 실행이 완료되었습니다. 정말 간단한 쿼리인데도 시간이 꽤나 걸리죠?
각 INSERT 문마다 데이터베이스에 대한 네트워크 호출이 발생하기 때문인데요. 이는 네트워크 지연, 쿼리 파싱, 실행 계획 생성, 트랜잭션 로그 기록 등으로 인해 상대적으로 높은 오버헤드를 유발할 수 있습니다. 이런 오버헤드는 데이터를 대량으로 삽입할 때 성능 저하의 주요 원인이 됩니다.
from datetime import datetime
sql = """
INSERT INTO sptemp.excute_many_test (
no
) VALUES (
%(no)s
);
"""
start_time = datetime.now()
async with db.get_cursor() as cursor:
for _ in range(1000):
await cursor.execute(query=sql, args={'no': 1}
end_time = datetime.now()
print(f'걸린시간: {end_time - start_time}')
⚒️ 개선 1 - 트랜잭션 명시 (19초 -> 13초, 약 31% 개선)
그리고 명시적으로 트랜잭션을 선언하지 않았기 때문에 각 쿼리가 실행될 때마다 commit 관련 로직도 실행되어 늦어지는 것으로 보입니다. 일단 이것부터 개선해볼게요.
start_time = datetime.now()
# 트랜잭션 시작
await db.begin()
async with db.get_cursor() as cursor:
for _ in range(1000):
await cursor.execute(query=sql, args={'no': 1}
# 트랜잭션 종료 후 커밋
await db.commit()
end_time = datetime.now()
print(f'걸린시간: {end_time - start_time}')
시간 체크 로직 사이에 트랜잭션의 시작과 종료를 명시했습니다. 이번엔 평균 13초 정도가 소요됐습니다. 이전에는 각 INSERT 문마다 커밋이 이루어졌다면, 이제는 1000개의 쿼리가 실행된 후, 한 번에 커밋을 하게 되어 오버헤드가 줄어들었습니다.
다만 이런 개선은 비즈니스 로직 상 트랜잭션 구분이 필요한지 여부를 먼저 따져보고 적용해야합니다. 각각의 INSERT 마다 데이터가 반영되고, 그 외 시도 건과는 독립적이어야한다면, 트랜잭션을 적용하면 안돼요!
🪚 개선 2 - executemany (13초 -> 0.23초, 약 92% 개선)
이번에는 본 글의 주제처럼 aiomysql에서 제공하는 excutemany 메서드로 적용을 해볼게요.
async with db.get_cursor() as cursor:
insert_param_list = []
for _ in range(10000):
insert_param_list.append({'no':1})
start_time = datetime.now()
await cursor.executemany(
query=sql,
args=insert_param_list,
)
end_time = datetime.now()
print(f'걸린시간: {end_time - start_time}')
이전에는 반복문을 통해 전체 갯수 만큼의 쿼리를 실행했다면, 이번에는 모든 경우의 수를 리스트에 담아서 한 번에 실행하도록 excutemany
를 사용합니다.
그 결과는... 평균 0.23초 정도가 소요 됐습니다.
위에서 언급했던 것처럼 하나의 쿼리로 전체 과정을 처리하기 때문에, 네트워크 오버헤드가 1/1000으로 줄었고, 이로 인해 드라마틱한 개선이 이루어졌습니다.
excutemany
가 뭐길래 이런 일이 일어나는걸까요?
🔥 excutemany is...
aiomysql(A pure-Python MySQL client library for asyncio)
은 mysql을 사용하기 위한 여러가지 메서드를 제공하는데요, 그 중 executemany
라는 메서드가 있습니다.
간단히 요약하자면 executemany
는 mysql의 INSERT INTO 'table name' (field1, ... ) VALUES (%(value1)s...);
구문을 쉽게 말아서 실행해주는 역할을 합니다.
코드를 한 번 살펴볼까요? (코드 보기 - aiomysql/cursors.py)
# aiomysql/cursors.py
RE_INSERT_VALUES = re.compile(
r"\s*((?:INSERT|REPLACE)\s.+\sVALUES?\s+)" +
r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))" +
r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
re.IGNORECASE | re.DOTALL)
async def executemany(self, query, args):
"""Execute the given operation multiple times
The executemany() method will execute the operation iterating
over the list of parameters in seq_params.
...
"""
...
m = RE_INSERT_VALUES.match(query)
if m:
q_prefix = m.group(1) % ()
q_values = m.group(2).rstrip()
q_postfix = m.group(3) or ''
assert q_values[0] == '(' and q_values[-1] == ')'
return (await self._do_execute_many(
q_prefix, q_values, q_postfix, args, self.max_stmt_length,
self._get_db().encoding))
else:
rows = 0
for arg in args:
await self.execute(query, arg)
rows += self._rowcount
self._rowcount = rows
return self._rowcount
executemany
메서드를 보면, 정규표현식을 통해 변수들이 매핑되기 전의 sql 문을 INSERT ... VALUES...
형식에 맞는지 확인을 먼저 합니다. 이 형식이 맞다면 필드를 정의되어 있는 prefix, 매핑 변수 부분인 values, 그 뒷 부분인 postfix 로 그룹을 나누어 이후 과정을 진행합니다.
만약 정규표현식에 매칭되지 않는다면, 전달 받은 인자 리스트 기준으로 반복문을 통해 각각의 INSERT 쿼리를 실행합니다. 이렇게 되면 executemany
를 쓴 이유가 없죠.
이 부분이 사소해보이면서도 주의 깊게 살펴봐야할 부분인데요. 실제로 어떻게 실행됐는지 로그를 따로 찍지 않으면 알 수 없기 때문에, 나는 bulk insert 를 했는데 왜 이렇게 느리지
이슈가 발생할 수 있어요.
저도 이거 때문에 시간을 꽤나 썼는데요... 저희 팀에서는 sql 문 최상단에 주석으로 해당 쿼리를 설명하는 주석을 달아놓는데, 이 주석 때문에 정규표현식 매칭이 되지 않아 실제로는 for 문으로 쿼리가 실행되고 있었어요. 데이터가 적을 때는 인지하지 못했는데, 데이터가 많아지고 난 후 이슈가 확인되서 수정을 했죠.
만약에 비슷한 관습이 있으신 분들은 executemany
로 바통을 넘기기 전에 sql 문을 가공하는 로직이 있어야합니다.
요런식으로요.
sql = '\n'.join(line for line in sql.split('\n') if not line.strip().startswith('--'))
그리고나서 아래 로직이 실행되는데요. 각각의 값들을 DB 설정 값에 맞게 인코딩을 해주고(pymysql의 디폴트 값은 utf8mb4
) escaping 처리를 해줍니다.
async def _do_execute_many(self, prefix, values, postfix, args,
max_stmt_length, encoding):
conn = self._get_db()
escape = self._escape_args
if isinstance(prefix, str):
prefix = prefix.encode(encoding)
if isinstance(postfix, str):
postfix = postfix.encode(encoding)
sql = bytearray(prefix)
args = iter(args)
v = values % escape(next(args), conn)
if isinstance(v, str):
v = v.encode(encoding, 'surrogateescape')
sql += v
rows = 0
for arg in args:
v = values % escape(arg, conn)
if isinstance(v, str):
v = v.encode(encoding, 'surrogateescape')
if len(sql) + len(v) + len(postfix) + 1 > max_stmt_length:
r = await self.execute(sql + postfix)
rows += r
sql = bytearray(prefix)
else:
sql += b','
sql += v
r = await self.execute(sql + postfix)
rows += r
self._rowcount = rows
return rows
이 과정에서 각각의 부분을 bytearray
로 변환하여 매핑하는데요, 이는 대량 데이터를 처리하는데 좀 더 효율적으로 처리하기 위한걸로 보여요.
bytearray
에 대한 내용은 아래 부록 1
을 참고해보세요.encode Surrogateescape 방식
에 대한 내용은 아래 부록 2
를 참고해보세요.
인코딩처리까지 완료된 매핑 부분은 sql 문에 하나씩 추가가 됩니다.
INSERT INTO sptemp.excute_many_test (
no
) VALUES (
1
),
------- 이 부분 계속 추가
(
1
)
-------
...
;
그런데 이렇게 마냥 추가되면, mysql 이 감당할 수 없는 순간이 찾아올지도 모릅니다. 그래서 제한을 두고 있는데요, default max_stmt_length 를 1024000로 두고 있습니다. 대충 1mb 정도인데요, 이 한도를 넘으면 이때까지 모인 쿼리를 한 번 실행하고, 이후부터 새로 쿼리를 만들어서 실행해줍니다. 영어가 1바이트니 웬만하면 한도에 들지않을까 싶어요. 이 한도는 db가 감당할 수 있는 패킷의 한도와 관련이 있는데요, 좀 더 알아보려면 부록 3
을 찾아봐주세요.
이렇게 만들어진 쿼리는 한 번의 요청으로 실행되어 네트워크 오버헤드를 줄이게 되고, 이로 인해 실행 시간도 대폭 줄여주는 결과를 가져왔습니다.
배치와 같이 대용량 데이터를 처리하는 상황에서 사용하면 좋습니다.
유의사항
1. values 문 사용
일반적으로 INSERT 문으로 많이 사용하는 INSERT INTO 'table name' SET filed1=%(value1)s..;
구문은 사용할 수 없어요. INSERT INTO 'table name' (field1, ... ) VALUES (%(value1)s...);
문만 사용해야합니다.
2. 최초 라인 주석 등 문자 입력 금지
인자 매칭 내에서는 상관 없지만, prefix 앞 부분에 부가적인 문자를 집어넣으면 정규표현식 통과가 안됩니다.
3. 서브쿼리 사용 금지
아래와 같이 서브쿼리를 쿼리 내부에 사용하면 작동하지 않습니다. 서브쿼리 값이 필요하다면 외부에서 조회하여 미리 넣어주세요.
...
VALUES (
(
SELECT
no
FROM test_table
...
LIMIT 1
)
, %(year)s
, %(month)s
...
부록 1. bytearray
참고 >
bytearray
의 특징:
prefix
를bytearray
로 만드는 이유는 이후에 SQL 쿼리 문자열을 동적으로 구성하고 수정하는 과정에서bytearray
의 가변성(mutability)과 효율성을 활용하기 위함입니다.
- 가변성(Mutability):
bytearray
는 수정 가능한(mutable) 바이트 시퀀스입니다. 이는 한 번 생성된 후에도 내용을 변경하거나, 추가, 삭제가 가능하다는 것을 의미합니다. 반면, Python의 문자열(str)과 바이트 문자열(bytes)은 불변(immutable)입니다. 따라서 SQL 쿼리 구성 과정에서 쿼리의 일부를 변경하거나 추가적인 데이터를 쿼리에 삽입해야 할 경우,bytearray
를 사용하면 이러한 작업을 보다 효율적으로 수행할 수 있습니다. - 성능: 대규모 데이터를 다룰 때
bytearray
를 사용하면 메모리를 절약하고 성능을 개선할 수 있습니다.bytearray
는 동적으로 크기를 조정할 수 있으며, 여러 조각의 데이터를 추가하거나 변경할 때 새로운 객체를 계속 생성하지 않아도 되므로 처리 속도가 빨라집니다.사용 예시:
위 코드에서는 SQL 쿼리의prefix
부분을 먼저bytearray
로 변환합니다. 이는 이후 단계에서 쿼리의 다른 부분들, 예를 들어 삽입할 데이터 값(values
)이나 조건(postfix
) 등을 추가할 때, 기존bytearray
객체에 새로운 내용을 직접 추가하거나 변경할 수 있게 하기 위함입니다.
예를 들어, 다음과 같은 과정을 통해 동적으로 SQL 쿼리를 구성할 수 있습니다:
bytearray
로 변환된prefix
에 쿼리의values
부분을 추가.- 필요한 경우 더 많은 데이터나 조건을 추가하여 쿼리를 확장.
- 최종적으로 구성된 쿼리를 데이터베이스에 전송.
이 방식은 특히 대량의 데이터를 처리하거나, 복잡하게 구성된 쿼리를 동적으로 생성할 때 유용하게 사용됩니다.bytearray
를 사용함으로써, 쿼리 문자열의 생성과 수정을 보다 효율적이고 유연하게 처리할 수 있습니다.
부록 2. encode Surrogateescape 방식
Surrogateescape 사용 시나리오
surrogateescape
는 파이썬에서 바이트를 문자열로 디코딩하거나 문자열을 바이트로 인코딩할 때, 유효하지 않은 데이터를 처리하는 방식 중 하나입니다. 이 에러 핸들러를 사용하면, 유효하지 않은 바이트는 유니코드의 서로게이트 영역에 있는 코드 포인트로 변환되어, 나중에 다시 같은 바이트로 복원될 수 있습니다. 주로 파일 시스템 경로나 외부에서 들어온 데이터를 처리할 때 사용되며, 데이터 손실을 방지할 수 있습니다.
Prefix와 Postfix의 처리
prefix
와 postfix
는 쿼리의 시작부와 종료부를 나타내며, 일반적으로 쿼리의 구조를 정의하는 정적인 문자열입니다. 이들 문자열에는 외부에서 입력받은 데이터가 포함되지 않으므로, surrogateescape
와 같은 특별한 에러 핸들링이 필요하지 않습니다. 대신, 이 문자열들은 쿼리를 정의하는 데 사용되는 명확한 SQL 구문을 포함하므로, 정확한 인코딩을 유지하는 것이 중요합니다.
정리
따라서, prefix
와 postfix
를 인코딩할 때는 표준 encode
메소드만 사용하며, surrogateescape
는 사용하지 않는 것이 일반적입니다. 이는 이들 문자열이 데이터베이스 쿼리의 구조를 정의하는데 사용되며, 인코딩 과정에서 데이터 손실이 발생하지 않도록 보장하기 위함입니다. 반면, v
와 같이 외부 데이터를 포함할 수 있는 부분에서는 surrogateescape
를 사용하여 비정상적인 데이터를 안전하게 처리하고, 필요한 경우 복원할 수 있도록 합니다.
부록 3. max_allowed_packet
packet은 mysql 서버와 클라이언트가 주고 받을 수 있는 정보인데, mysql 8.0 기준으로는 1GB가 최대 값이라고 합니다. 디폴트 값으로는 16MB 가 지정되어 있구요.
aiomysql 에서는 오버헤드를 주지 않기 위해서 이를 일부 조정해서 적용해주고 있는거 같아요. aiomysql 내에서는 이 값을 수동으로 설정할 수는 없는거 같아서, 원한다면 따로 코드를 작성해서 적용해야할 것으로 보입니다.
mysql 문서
'개발' 카테고리의 다른 글
buy me a coffee로 후원받아 돈 벌기 (0) | 2024.11.19 |
---|---|
[intellij] 주석에서 맞춤법 검사 제거 (0) | 2024.04.09 |
[Streamlit] 변수를 기억하고 싶다면 Session State를 사용하십시다. (0) | 2024.01.26 |
[AWS] boto3 클라이언트로 Cloudwatch 등 각 서비스 사용량 가져오기 with python (0) | 2024.01.24 |
[AI 음성 생성] AWS Polly(폴리)를 이용한 TTS(Text To Speech, 음성 합성) 구현 (3) - Streamlit 프로젝트 (1) | 2024.01.24 |