bong-u/til

[모각코24하계] 03 : 결과

수정일 : 2024-07-16

Fastapi, RabbitMQ, Celery 연동

배경

  • 테커 부트캠프에서 팀프로젝트를 진행 중이다.
  • 웹소켓을 통해 클라이언트로부터 받은 데이터를 gpt를 통해 처리하고, 결과를 다시 클라이언트로 보내는 서비스를 구현하고 있다.
  • 여러 사용자의 요청을 원활하게 처리하기 위해 분산 비동기 시스템을 구축하려고 한다.

목표

  • Fastapi, RabbitMQ, Celery를 각자 docker 컨테이너로 구동시키고 연동한다.

docker-compose.yml

 1version: '3'
 2
 3services:
 4  rabbitmq:
 5    image: rabbitmq:3
 6    ports:
 7      - "5672:5672" # RabbitMQ의 AMQP 포트
 8      - "15672:15672" # RabbitMQ 관리 인터페이스 포트
 9    volumes:
10      - rabbitmq_data:/var/lib/rabbitmq
11    expose:
12      - "5672"
13      - "15672"
14
15  celery_worker:
16    build:
17      context: .
18      dockerfile: Dockerfile.worker
19    command: celery -A utils.celery_worker worker --loglevel=info
20    working_dir: /app
21    volumes:
22      - ./app/utils:/app/utils
23    environment:
24      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
25    depends_on:
26      - rabbitmq
27
28  celery_beat:
29    image: celery:4
30    command: celery -A celery_beat beat --loglevel=info
31    working_dir: /app
32    environment:
33      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
34    volumes:
35      - ./app/utils:/app
36    depends_on:
37      - rabbitmq
38  
39  web:
40    image: python:slim
41    working_dir: /app
42    # interactive mode
43    stdin_open: true
44    # tty mode
45    tty: true
46    environment:
47      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
48    volumes:
49      - ./app:/app
50    ports:
51      - "8000:8000"
52    depends_on:
53      - rabbitmq
54      - celery_worker
55      - celery_beat
56
57volumes:
58  rabbitmq_data:
  • Celery worker에만 Dockerfile.worker를 이미지로 사용한 이유
    1. worker에 추가적으로 python 라이브러리를 설치해야함
    2. Celery 공식 도커 이미지가 deprecated 되었음.
  • Fastapi는 시간 관계상 따로 이미지를 만들지 않고 python:slim 이미지를 사용했다.

Dockerfile.worker

 1FROM python:slim
 2
 3# 필요한 패키지 설치
 4# ffmpeg가 필요해서 추가하였다
 5RUN apt-get update && \
 6apt-get install -y --no-install-recommends gcc libpq-dev ffmpeg && \
 7rm -rf /var/lib/apt/lists/*
 8
 9# 필요한 파이썬 패키지 설치
10COPY requirements_celery_worker.txt ./
11RUN pip install --no-cache-dir -r requirements_celery_worker.txt

celery_worker.py

1import os
2from celery import Celery
3
4broker_url = os.getenv('CELERY_BROKER_URL')
5app = Celery('worker', broker=broker_url, backend="rpc://")
6
7@app.task
8def add(x, y):
9    return x + y
  • broker_url은 RabbitMQ의 AMQP 주소를 의미한다.
  • backend는 결과를 받기 위한 백엔드로 RabbitMQ를 사용한다.

Celery worker 사용 방법

 1from celery_worker import add
 2
 3# task를 비동기로 실행
 4result = add.delay(4, 4)
 5
 6# apply_async는 delay와 동일한 기능 수행
 7# delay와 달리 추가로 여러 옵션을 설정 가능
 8result = add.apply_async((4, 4))
 9
10# 결과를 받기 위해 get()을 사용, 블로킹 호출
11result.get()
12
13# 작업이 완료되었는지 확업
14result.ready()
15
16# 작업이 실패했는지 확인
17result.successful()
18# or
19result.failed()
20
21# 작업의 상태 확인 (PENDING, STARTED, SUCCESS, FAILURE)
22result.state()