🏷️ python
🏷️ fastapi
🏷️ rabbitmq
🏷️ celery
수정일 : 2024-11-15
배경
- 테커 부트캠프에서 팀프로젝트를 진행 중이다.
- 웹소켓을 통해 클라이언트로부터 받은 데이터를 gpt를 통해 처리하고, 결과를 다시 클라이언트로 보내는 서비스를 구현하고 있다.
- 여러 사용자의 요청을 원활하게 처리하기 위해 분산 비동기 시스템을 구축하려고 한다.
목표
- Fastapi, RabbitMQ, Celery를 각자 docker 컨테이너로 구동시키고 연동한다.
docker-compose.yml
1version: '3'
2
3services:
4 rabbitmq:
5 image: rabbitmq:3
6 ports:
7 - "5672:5672" # RabbitMQ의 AMQP 포트
8 - "15672:15672" # RabbitMQ 관리 인터페이스 포트
9 volumes:
10 - rabbitmq_data:/var/lib/rabbitmq
11 expose:
12 - "5672"
13 - "15672"
14
15 celery_worker:
16 build:
17 context: .
18 dockerfile: Dockerfile.worker
19 command: celery -A utils.celery_worker worker --loglevel=info
20 working_dir: /app
21 volumes:
22 - ./app/utils:/app/utils
23 environment:
24 - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
25 depends_on:
26 - rabbitmq
27
28 celery_beat:
29 image: celery:4
30 command: celery -A celery_beat beat --loglevel=info
31 working_dir: /app
32 environment:
33 - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
34 volumes:
35 - ./app/utils:/app
36 depends_on:
37 - rabbitmq
38
39 web:
40 image: python:slim
41 working_dir: /app
42 # interactive mode
43 stdin_open: true
44 # tty mode
45 tty: true
46 environment:
47 - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
48 volumes:
49 - ./app:/app
50 ports:
51 - "8000:8000"
52 depends_on:
53 - rabbitmq
54 - celery_worker
55 - celery_beat
56
57volumes:
58 rabbitmq_data:
- Celery worker에만 Dockerfile.worker를 이미지로 사용한 이유
- worker에 추가적으로 python 라이브러리를 설치해야함
- Celery 공식 도커 이미지가 deprecated 되었음.
- Fastapi는 시간 관계상 따로 이미지를 만들지 않고 python:slim 이미지를 사용했다.
Dockerfile.worker
1FROM python:slim
2
3# 필요한 패키지 설치
4# ffmpeg가 필요해서 추가하였다
5RUN apt-get update && \
6apt-get install -y --no-install-recommends gcc libpq-dev ffmpeg && \
7rm -rf /var/lib/apt/lists/*
8
9# 필요한 파이썬 패키지 설치
10COPY requirements_celery_worker.txt ./
11RUN pip install --no-cache-dir -r requirements_celery_worker.txt
celery_worker.py
1import os
2from celery import Celery
3
4broker_url = os.getenv('CELERY_BROKER_URL')
5app = Celery('worker', broker=broker_url, backend="rpc://")
6
7@app.task
8def add(x, y):
9 return x + y
- broker_url은 RabbitMQ의 AMQP 주소를 의미한다.
- backend는 결과를 받기 위한 백엔드로 RabbitMQ를 사용한다.
Celery worker 사용 방법
1from celery_worker import add
2
3# task를 비동기로 실행
4result = add.delay(4, 4)
5
6# apply_async는 delay와 동일한 기능 수행
7# delay와 달리 추가로 여러 옵션을 설정 가능
8result = add.apply_async((4, 4))
9
10# 결과를 받기 위해 get()을 사용, 블로킹 호출
11result.get()
12
13# 작업이 완료되었는지 확업
14result.ready()
15
16# 작업이 실패했는지 확인
17result.successful()
18# or
19result.failed()
20
21# 작업의 상태 확인 (PENDING, STARTED, SUCCESS, FAILURE)
22result.state()