- FCM 서버 성능 개선 과정(+FastAPI, HTTP v1 API)2025년 01월 19일
- 째하Develop
- 작성자
- 2025.01.19.:04
반응형개요
현재 회사에서는 중,고등학생들 내신을 위한 문제 풀이 앱을 운영하고 있습니다.
해당 앱에서 유저들에게 FCM을 개인 맞춤화 메시지로 보내는 기획이 생겨
급하게 FastAPI 프레임워크를 사용해 만들었습니다. 하지만 시간이 점점
지날수록 유저들에게 보낼 메시지 수가 많아지고 점점 보내는데 오래 걸려
Timeout이 발생했고 Request가 비정상으로 끊겨 Memory Leak
현상이 발생했습니다.- 문제에 대한 설명 (What)
- 문제의 원인 (Why)
- 어떻게 문제를 풀었는지 (Who)
- 구현 및 결과 (Implement and Result)
로 정리해서 설명할려고 합니다.
1. 문제에 대한 설명(What)
유저들에게 개인 맞춤화 메시지를 보낼 때 처리하는 로직은 다음과 같습니다.
# 보내고 싶은 Target 에 대한 User Data 가져온다 target_user_data = user_repo.find(target) # 데이터 전처리 후 list로 형 변환 df = pd.DataFrame(....) send_data : list = [...] # 전처리한 데이터를 Firebase SDK를 사용해 Send def send_fcm_by_token(data): noti = messaging.Notification(title=..., body=..., image=...) message = messaging.MulticastMessage( tokens=..., notification=noti, fcm_options=messaging.FCMOptions(analytics_label=...), ) data = messaging.send_multicast(message, dry_run=...) return data with concurrent.futures.ThreadPoolExecutor( max_workes= (os.cpu_count() or 1 ) +2 ) as executor: results = list(executor.map(self.send_fcm_by_token, send_data)) ## 보낸 결과 값을 Slack 전송 failure_count = 0 success_count = 0 if results.__iter__: for result in results: # if result.failure_count and result.success_count: failure_count += result.failure_count success_count += result.success_count send_slack(f"FCM Send Success: {success_count}, Failure: {failure_count}")
2. 문제에 대한 원인(Why)
1. Send 하는 과정 중에 발생하는 CPU 사용율 증가
원인
현재 구조가 ThreadPool을 열어 함수를 실행하고 해당 함수(send_each_for_multicast)는 또 ThreadPool을 열어 실행한다. 이렇게 한 이유는 Request가 Sync I/O 이기 때문에 했습니다.
Firbase SDK에서 제공해주는 send_each_for_multicast 라는 함수가 존재합니다. 해당 함수 소스 코드를 까보면
def send_multicast(multicast_message, dry_run=False, app=None): messages = [Message( data=multicast_message.data, notification=multicast_message.notification, android=multicast_message.android, webpush=multicast_message.webpush, apns=multicast_message.apns, fcm_options=multicast_message.fcm_options, token=token ) for token in multicast_message.tokens] return _get_messaging_service(app).send_each(messages, dry_run) def send_each(self, messages, dry_run=False): """Sends the given messages to FCM via the FCM v1 API.""" if not isinstance(messages, list): raise ValueError('messages must be a list of messaging.Message instances.') if len(messages) > 500: raise ValueError('messages must not contain more than 500 elements.') def send_data(data): try: resp = self._client.body( 'post', url=self._fcm_url, headers=self._fcm_headers, json=data) except requests.exceptions.RequestException as exception: return SendResponse(resp=None, exception=self._handle_fcm_error(exception)) else: return SendResponse(resp, exception=None) message_data = [self._message_data(message, dry_run) for message in messages] try: with concurrent.futures.ThreadPoolExecutor(max_workers=len(message_data)) as executor: responses = [resp for resp in executor.map(send_data, message_data)] return BatchResponse(responses) except Exception as error: raise exceptions.UnknownError( message='Unknown error while making remote service calls: {0}'.format(error), cause=error)
이 코드에서 확인해야하는 부분은 이 부분입니다.
concurrent.futures.ThreadPoolExecutor(max_workers=len(message_data)) as executor:
ThreadPool을 생성을 하되 해당 Pool 의 Max Thread 개수는
보낼 메시지의 양입니다. 매 번 한 번씩 보낼 떄 30만개의 List를 500개씩 Chunk 한 후 보내니 Thread 개수만 500개씩 생성을 한다는 것입니다.2. Memory Leak
- 주황색 : Max Memory
- 초록색 : Min Memory
- 파란색 : Avg Memory
FCM 서버는 현재 ECS 서비스 단에서 운영하고 있습니다.
메모리가 점점 차올라서 MemoryOverFlow가 발생하여 ECS 단에서 다시
컨테이너를 띄우는 것을 볼 수가 있습니다.
AWS ELB는 Request 보낼 때 해당 Request의 Timeout을 이렇게 설정할 수 있습니다.Target이 적으면 Max Timeout 이내에 보낼 수 있지만, Target이 많을수록 1시간이 넘어가는 케이스도 있습니다. 해당 케이스는 Request Timeout이 발생하기 때문에 비정상적인 Connection Less로 인해 데이터를 보내기 위한 TCP Socket이 제대로 Close가 안되서 Memory Leak이 발생하는 걸 볼 수가 있습니다.
3. 어떻게 문제를 풀었는지(Who)
문제를 생긴 원인을 정리하자면
- Thread 과다 생산으로 인한 CPU 사용율 증가
- Socket Less으로 인한 Memory Leak 발생
Thread 과다 생산으로 인한 CPU 사용율 증가
- ThreadPool을 생성하는 send_each_for_multicast 함수 사용 X
- Python Event Loop를 사용하여 Thread 1개로 처리
Socket Less으로 인한 Memory Leak 발생
- Request Timeout 이 발생하기 전에 끝내기
- BackGround or Celery or lambda 같은 외부 작업자에 일임하기
4. 구현 및 결과(Implement and Result)
최종 아키텍처는 FCM 서버에서 모든 걸 처리하는 게 아닌
FCM 서버 데이터 전처리 -> Request -> Lambda -> FCM Send -> FCM 서버 결과 처리로 했습니다.FCM 서버
# AS-IS def send_fcm_by_token(data): noti = messaging.Notification(title=..., body=..., image=...) message = messaging.MulticastMessage( tokens=..., notification=noti, fcm_options=messaging.FCMOptions(analytics_label=...), ) data = messaging.send_multicast(message, dry_run=...) return data with concurrent.futures.ThreadPoolExecutor( max_workes= (os.cpu_count() or 1 ) +2 ) as executor: results = list(executor.map(self.send_fcm_by_token, send_data)) # TO-BE def find_optimal_split(self, lst, split_number): ''' Lambda Request Body Max Size : 6MB로 인해 Request Data를 5MB 단위로 Chunk함 ''' max_size_mb = 5 def spliter(): nonlocal lst nonlocal split_number splits = np.array_split(lst, split_number) first_split_size = sys.getsizeof(splits[0]) / 1024 / 1024 if first_split_size > max_size_mb: split_number += 10 return spliter(split_number) else: return splits return spliter() def send_to_lambda(self, data): client = config.lambda_client response = client.invoke( FunctionName=lambda_send_fcm, Payload=json.dumps({"data": data}) ) return response def send_fcm_by_token( self, target, data: list, cnt_send_fcm_data, ): data = self.list_append(data) data = self.find_optimal_split(data, 50) def temp_func(jobs): jobs = [list(job) for job in jobs] self.send_to_lambda(jobs) with concurrent.futures.ThreadPoolExecutor( max_workers=(os.cpu_count() or 1) + 3 ) as executor: executor.map(temp_func, data)
람다
async def post_data(session, headers, url, data, semaphore): async with semaphore: async with session.post(url, headers=headers, json=data) as response: return await response.text() def list_convert_fcm_message(datas): return [ { "validate_only": data[5], "message": { "notification": { "title": data[1], "body": data[2], }, "fcm_options": {"analytics_label": data[3]}, "token": data[0], }, } for data in datas ] async def main(event): FCM_URL = "https://fcm.googleapis.com/v1/projects/{Project Name}/messages:send" cred = credentials.Certificate("./firebase.json") # # 헤더 설정 headers = { "Authorization": "Bearer " + cred.get_access_token().access_token, "Content-Type": "application/json; UTF-8", } data = event["data"] data = list_convert_fcm_message(data) data_cnt = len(data) urls = [FCM_URL] * data_cnt total_request = data_cnt batch_size = 15_000 semaphore = asyncio.Semaphore(batch_size) async with aiohttp.ClientSession() as session: tasks = [] try: for i in range(total_request): task = post_data(session, headers, urls[i], data[i], semaphore) tasks.append(task) if len(tasks) >= batch_size: await asyncio.gather(*tasks) tasks = [] if tasks: await asyncio.gather(*tasks) except Exception as e: print(e) def lambda_handler(event, context): loop = asyncio.get_event_loop() loop.run_until_complete(main(event)) return {"statusCode": 200}
Semaphore 사용한 이유
Semaphore를 따로 설정 안하면 해당 List의 len만큼 데이터를 보낸다.
만약 등록된 Task가 20,000개가 있다고 한다.이벤트 루프 시작 │ ▼ 작업 등록 (총 20,000개) │ ▼ ┌───────────────────────────────────┐ │ 이벤트 루프가 작업을 관리하고 실행 │ │ │ │ │ ▼ ▼ │[ Task 1 ] ... [ Task 100 ] [ Task 101 ] ... [ Task 20000 ] │(실행 중) └───────────────────────────────────┘ │ ▼
동시에 20,000개가 실행이 되면 연결할 수 있는 Port의 수가 없어
에러가 발생한다. ( Port의 수가 왜 연관있는 지는 다른 포스트에 적겠습니다)만약 Semaphore를 설정을 하면
이벤트 루프 시작 │ ▼ 세마포어 설정 (동시 실행 작업 수 제한) │ ▼ 작업 등록 (총 20,000개) │ ▼ ┌───────────────────────────────────┐ │ 이벤트 루프가 작업을 관리하고 실행 │ │ │ │ │ ▼ ▼ │[ Task 1 ] ... [ Task 100 ] [ Task 101 ] ... [ Task 20000 ] │(실행 중, 세마포어로 제한됨) (대기 중) └───────────────────────────────────┘ │ ▼ 작업 완료 시 대기 중 작업 실행
동시에 실행되는 테스크가 Semaphore만큼 실행이 되니 Port의 개수도
널널해 에러가 발생을 안한다.정리
따로 사진을 없지만 Token을 약 40만개 보낼 때
- AS-IS : 70분
- T0-BE : 4분
으로 10 배 ~ 20배 정도 성능 개선을 했다.
이렇게 성능 개선을 하면서 Semaphore, TCP Socket, Buffer Size, Port의 개수 등
책으로만 공부했던 걸 실제 코드에 구현을 하니 그 의미가 훨씬 와닿았던 것 같다.추가) 면접에서 해당 이슈를 처리하면서 단기간에 이렇게 처리량을 늘리면 서버 과부하 나지 않냐??
라는 질문을 받았다. 이 떄 해당 이슈를 처리했을 떄는 개발자의 관점으로만 성능 개선!! 에만 집중했던 거 같아다른 사이드 이펙트를 생각을 안했던 것 같다.
반응형'Python' 카테고리의 다른 글
AWS ECR 비용 최적화 해보기(Python 스크립트를 곁들인) (0) 2025.01.19 FastAPI에서 APP Mount + CORSMiddleware (0) 2025.01.19 FastAPI + Sqlalchemy를 활용한 Pytest( + async) (0) 2025.01.19 Iterator 와 Generator (0) 2025.01.19 Python을 활용한 FCM Bulk Push (0) 2025.01.19 다음글이전글이전 글이 없습니다.댓글