본문 바로가기
개발/NestJS

[NestJS] Queue 완벽 가이드: 기초 개념부터 실전 활용까지

by coking 2024. 11. 25.

들어가며

안녕하세요! Queue 시스템을 처음 접하시는 분들을 위해 글을 써봅니다. 처음에는 저도 "그냥 바로 처리하면 되지, 왜 Queue가 필요하지?"라는 생각을 했었는데요. Queue의 동작 원리를 이해하고 나니 그 필요성이 확 와닿더라고요.

Queue 시스템의 기본 동작 원리

Queue는 어떻게 구성되어 있나요?

Queue 시스템은 크게 세 가지 컴포넌트로 구성됩니다:

  1. Producer (작업 등록자)
  2. Queue (작업 대기열)
  3. Consumer (작업 처리자)

가장 중요한 점은 Consumer가 Queue를 지속적으로 모니터링하고 있다는 것입니다.

// 실제 동작 흐름
1. Producer: "새 작업이 있어요!" → Queue에 작업 등록
2. Queue: "새 작업이 들어왔어요!" → Redis에 작업 저장
3. Consumer: "오, 새 작업이네요!" → 즉시 작업 처리 시작

Consumer는 어떻게 Queue를 감시하나요?

BullMQ는 Redis의 pub/sub 메커니즘을 사용합니다. 쉽게 말해서:

  • Consumer가 Queue를 계속 지켜보고 있어요 (구독 중)
  • 새 작업이 들어오면 Redis가 Consumer에게 알려줍니다 (발행)
// Consumer 예시
@Processor('email')
class EmailConsumer {
  @Process('send-email')
  async handleEmail(job: Job) {
    // 이 메서드는 새 작업이 들어올 때마다 자동으로 호출됩니다!
    const { to, subject, content } = job.data;
    await this.emailService.send(to, subject, content);
  }
}

장애 상황에서는 어떻게 동작하나요?

여러 가지 장애 상황을 살펴봅시다:

  1. Consumer가 죽었을 때
    // 1. Consumer 다운
    Consumer: "으악, 죽어버렸다! 😵"
    Queue: "괜찮아, 작업들 여기 안전하게 보관할게 📦"
    

// 2. Consumer 재시작
Consumer: "다시 살아났어! 🏃‍♂️"
Queue: "오! 자, 여기 처리 못한 작업들이야 📋"
Consumer: "알았어, 바로 처리할게! 💪"


2. **작업 처리 실패 시**
```typescript
@Process('send-email')
async handleEmail(job: Job) {
  try {
    await this.emailService.send(job.data);
  } catch (error) {
    // 실패하면 자동으로 재시도합니다
    throw error; // BullMQ가 재시도 로직을 처리해줍니다
  }
}

왜 Queue가 필요할까요? 실제 사례로 알아보기

1. 대량 이메일 발송 시스템

Queue 사용 전:

// ❌ 문제가 많은 코드
@Controller('newsletter')
class NewsletterController {
  @Post('send')
  async sendNewsletter(@Body() dto: SendNewsletterDto) {
    const users = await this.userService.getActiveUsers(); // 10,000명

    for (const user of users) {
      await this.emailService.send(user.email, dto.content);
      // 1명당 평균 500ms 소요
      // 10,000명 = 5,000,000ms = 83.3분 소요
    }

    return { message: '발송 완료' };  // 83분 후에야 응답
  }
}

이 방식의 문제점:

  1. API 타임아웃 (보통 30초)
  2. 서버 재시작/장애 시 진행상황 유실
  3. 하나의 이메일 발송 실패 시 전체 프로세스 중단
  4. 서버 자원 과다 점유

Queue 사용 후:

// ✅ 개선된 코드
@Controller('newsletter')
class NewsletterController {
  constructor(@InjectQueue('email') private emailQueue: Queue) {}

  @Post('send')
  async sendNewsletter(@Body() dto: SendNewsletterDto) {
    const users = await this.userService.getActiveUsers();

    // Queue에 작업 등록 (즉시 완료)
    const jobs = users.map(user => ({
      name: 'send-newsletter',
      data: { email: user.email, content: dto.content }
    }));

    await this.emailQueue.addBulk(jobs);

    return { 
      message: '뉴스레터 발송이 시작되었습니다',
      totalCount: users.length
    };  // 1초 이내 응답
  }
}

// 별도의 Consumer가 처리
@Processor('email')
class NewsletterConsumer {
  @Process({
    name: 'send-newsletter',
    concurrency: 5  // 동시에 5명에게 발송
  })
  async sendEmail(job: Job) {
    const { email, content } = job.data;
    await this.emailService.send(email, content);
  }
}

개선된 점:

  1. API 즉시 응답 (사용자 경험 향상)
  2. 서버 재시작해도 안전 (Redis에 데이터 보관)
  3. 특정 이메일 실패해도 다른 이메일은 정상 발송
  4. 동시 처리로 총 소요 시간 단축

Queue 시스템 구축하기

1. 기본 설정

// app.module.ts
@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: 'email',
      defaultJobOptions: {
        attempts: 3,  // 실패 시 최대 3번 재시도
        backoff: {
          type: 'exponential', 
          delay: 1000,  // 재시도 간격
        },
        removeOnComplete: true,  // 성공한 작업 제거
        removeOnFail: false,     // 실패한 작업은 보관
      },
    }),
  ],
})
export class AppModule {}

2. Producer 구현

@Injectable()
class NewsletterService {
  constructor(@InjectQueue('email') private emailQueue: Queue) {}

  async sendNewsletters(subscribers: User[]) {
    console.log('뉴스레터 발송 작업 시작');

    // Queue에 작업 등록
    const jobs = subscribers.map(subscriber => ({
      name: 'send-newsletter',
      data: {
        email: subscriber.email,
        name: subscriber.name
      }
    }));

    await this.emailQueue.addBulk(jobs);
    console.log(`${jobs.length}개의 뉴스레터 발송이 Queue에 등록됨`);

    return {
      message: '뉴스레터 발송이 시작되었습니다',
      jobCount: jobs.length
    };
  }
}

3. Consumer 구현

@Processor('email')
class NewsletterConsumer {
  constructor(private logger: Logger) {}

  @Process('send-newsletter')
  async handleNewsletter(job: Job) {
    const { email, name } = job.data;
    this.logger.log(`${email} 처리 시작`);

    try {
      await this.emailService.send(email, {
        template: 'newsletter',
        data: { name }
      });

      this.logger.log(`${email} 발송 완료`);
    } catch (error) {
      this.logger.error(`${email} 발송 실패: ${error.message}`);
      throw error; // 재시도를 위해 에러를 다시 던집니다
    }
  }

  // Queue 이벤트 핸들링
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(`작업 시작: ${job.id}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job) {
    this.logger.log(`작업 완료: ${job.id}`);
  }

  @OnQueueFailed()
  onFailed(job: Job, error: Error) {
    this.logger.error(`작업 실패: ${job.id}, 에러: ${error.message}`);
  }
}

Queue 시스템의 장점

  1. 안정성

    • Consumer가 죽어도 작업은 Redis에 안전하게 보관
    • 실패한 작업은 자동으로 재시도
    • 시스템 장애 상황에서도 데이터 유실 없음
  2. 확장성

    • 여러 Consumer를 동시에 실행 가능
    • 처리량에 따라 Consumer 수 조절 가능
      @Process({
      name: 'send-newsletter',
      concurrency: 5  // 동시에 5개 작업 처리
      })
  3. 모니터링과 디버깅

    • 작업 상태 실시간 추적
    • 실패한 작업의 원인 분석 용이
    • 처리 시간, 성공/실패율 등 지표 수집 가능

실전 팁

  1. 적절한 Concurrency 설정

    @Process({
    name: 'send-newsletter',
    concurrency: process.env.NODE_ENV === 'production' ? 10 : 3
    })
  2. 작업 우선순위 활용

    await this.emailQueue.add('send-email', data, {
    priority: data.isUrgent ? 1 : 5  // 낮은 숫자가 높은 우선순위
    });
  3. 에러 처리 전략

    @Process('send-email')
    async handleEmail(job: Job) {
    try {
     await this.emailService.send(job.data);
    } catch (error) {
     if (error.code === 'TEMPORARY_ERROR') {
       // 일시적 오류는 재시도
       throw error;
     } else {
       // 영구적 오류는 로깅 후 다음으로
       await this.logger.error(error);
       return { success: false, error: error.message };
     }
    }
    }

마무리

Queue는 단순히 '나중에 처리하는 시스템'이 아닌, 대규모 처리에서 안정성과 확장성을 보장하는 필수적인 아키텍처 요소입니다. 특히 다음과 같은 상황에서는 Queue 도입을 반드시 고려해보세요:

  1. 처리 시간이 오래 걸리는 작업 (이메일 발송, 파일 처리 등)
  2. 실패 시 재시도가 필요한 작업 (외부 API 호출, 결제 처리 등)
  3. 순차적 또는 병렬 처리가 필요한 대량 작업
  4. 시스템 안정성이 중요한 중요 작업

실제 프로젝트에 적용하면서 생기는 궁금증이나 어려움이 있다면 댓글로 남겨주세요!

참고 자료

댓글