들어가며
안녕하세요! Queue 시스템을 처음 접하시는 분들을 위해 글을 써봅니다. 처음에는 저도 "그냥 바로 처리하면 되지, 왜 Queue가 필요하지?"라는 생각을 했었는데요. Queue의 동작 원리를 이해하고 나니 그 필요성이 확 와닿더라고요.
Queue 시스템의 기본 동작 원리
Queue는 어떻게 구성되어 있나요?
Queue 시스템은 크게 세 가지 컴포넌트로 구성됩니다:
- Producer (작업 등록자)
- Queue (작업 대기열)
- Consumer (작업 처리자)
가장 중요한 점은 Consumer가 Queue를 지속적으로 모니터링하고 있다는 것입니다.
// 실제 동작 흐름
1. Producer: "새 작업이 있어요!" → Queue에 작업 등록
2. Queue: "새 작업이 들어왔어요!" → Redis에 작업 저장
3. Consumer: "오, 새 작업이네요!" → 즉시 작업 처리 시작
Consumer는 어떻게 Queue를 감시하나요?
BullMQ는 Redis의 pub/sub 메커니즘을 사용합니다. 쉽게 말해서:
- Consumer가 Queue를 계속 지켜보고 있어요 (구독 중)
- 새 작업이 들어오면 Redis가 Consumer에게 알려줍니다 (발행)
// Consumer 예시
@Processor('email')
class EmailConsumer {
@Process('send-email')
async handleEmail(job: Job) {
// 이 메서드는 새 작업이 들어올 때마다 자동으로 호출됩니다!
const { to, subject, content } = job.data;
await this.emailService.send(to, subject, content);
}
}
장애 상황에서는 어떻게 동작하나요?
여러 가지 장애 상황을 살펴봅시다:
- Consumer가 죽었을 때
// 1. Consumer 다운 Consumer: "으악, 죽어버렸다! 😵" Queue: "괜찮아, 작업들 여기 안전하게 보관할게 📦"
// 2. Consumer 재시작
Consumer: "다시 살아났어! 🏃♂️"
Queue: "오! 자, 여기 처리 못한 작업들이야 📋"
Consumer: "알았어, 바로 처리할게! 💪"
2. **작업 처리 실패 시**
```typescript
@Process('send-email')
async handleEmail(job: Job) {
try {
await this.emailService.send(job.data);
} catch (error) {
// 실패하면 자동으로 재시도합니다
throw error; // BullMQ가 재시도 로직을 처리해줍니다
}
}
왜 Queue가 필요할까요? 실제 사례로 알아보기
1. 대량 이메일 발송 시스템
Queue 사용 전:
// ❌ 문제가 많은 코드
@Controller('newsletter')
class NewsletterController {
@Post('send')
async sendNewsletter(@Body() dto: SendNewsletterDto) {
const users = await this.userService.getActiveUsers(); // 10,000명
for (const user of users) {
await this.emailService.send(user.email, dto.content);
// 1명당 평균 500ms 소요
// 10,000명 = 5,000,000ms = 83.3분 소요
}
return { message: '발송 완료' }; // 83분 후에야 응답
}
}
이 방식의 문제점:
- API 타임아웃 (보통 30초)
- 서버 재시작/장애 시 진행상황 유실
- 하나의 이메일 발송 실패 시 전체 프로세스 중단
- 서버 자원 과다 점유
Queue 사용 후:
// ✅ 개선된 코드
@Controller('newsletter')
class NewsletterController {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
@Post('send')
async sendNewsletter(@Body() dto: SendNewsletterDto) {
const users = await this.userService.getActiveUsers();
// Queue에 작업 등록 (즉시 완료)
const jobs = users.map(user => ({
name: 'send-newsletter',
data: { email: user.email, content: dto.content }
}));
await this.emailQueue.addBulk(jobs);
return {
message: '뉴스레터 발송이 시작되었습니다',
totalCount: users.length
}; // 1초 이내 응답
}
}
// 별도의 Consumer가 처리
@Processor('email')
class NewsletterConsumer {
@Process({
name: 'send-newsletter',
concurrency: 5 // 동시에 5명에게 발송
})
async sendEmail(job: Job) {
const { email, content } = job.data;
await this.emailService.send(email, content);
}
}
개선된 점:
- API 즉시 응답 (사용자 경험 향상)
- 서버 재시작해도 안전 (Redis에 데이터 보관)
- 특정 이메일 실패해도 다른 이메일은 정상 발송
- 동시 처리로 총 소요 시간 단축
Queue 시스템 구축하기
1. 기본 설정
// app.module.ts
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
BullModule.registerQueue({
name: 'email',
defaultJobOptions: {
attempts: 3, // 실패 시 최대 3번 재시도
backoff: {
type: 'exponential',
delay: 1000, // 재시도 간격
},
removeOnComplete: true, // 성공한 작업 제거
removeOnFail: false, // 실패한 작업은 보관
},
}),
],
})
export class AppModule {}
2. Producer 구현
@Injectable()
class NewsletterService {
constructor(@InjectQueue('email') private emailQueue: Queue) {}
async sendNewsletters(subscribers: User[]) {
console.log('뉴스레터 발송 작업 시작');
// Queue에 작업 등록
const jobs = subscribers.map(subscriber => ({
name: 'send-newsletter',
data: {
email: subscriber.email,
name: subscriber.name
}
}));
await this.emailQueue.addBulk(jobs);
console.log(`${jobs.length}개의 뉴스레터 발송이 Queue에 등록됨`);
return {
message: '뉴스레터 발송이 시작되었습니다',
jobCount: jobs.length
};
}
}
3. Consumer 구현
@Processor('email')
class NewsletterConsumer {
constructor(private logger: Logger) {}
@Process('send-newsletter')
async handleNewsletter(job: Job) {
const { email, name } = job.data;
this.logger.log(`${email} 처리 시작`);
try {
await this.emailService.send(email, {
template: 'newsletter',
data: { name }
});
this.logger.log(`${email} 발송 완료`);
} catch (error) {
this.logger.error(`${email} 발송 실패: ${error.message}`);
throw error; // 재시도를 위해 에러를 다시 던집니다
}
}
// Queue 이벤트 핸들링
@OnQueueActive()
onActive(job: Job) {
this.logger.log(`작업 시작: ${job.id}`);
}
@OnQueueCompleted()
onCompleted(job: Job) {
this.logger.log(`작업 완료: ${job.id}`);
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
this.logger.error(`작업 실패: ${job.id}, 에러: ${error.message}`);
}
}
Queue 시스템의 장점
안정성
- Consumer가 죽어도 작업은 Redis에 안전하게 보관
- 실패한 작업은 자동으로 재시도
- 시스템 장애 상황에서도 데이터 유실 없음
확장성
- 여러 Consumer를 동시에 실행 가능
- 처리량에 따라 Consumer 수 조절 가능
@Process({ name: 'send-newsletter', concurrency: 5 // 동시에 5개 작업 처리 })
모니터링과 디버깅
- 작업 상태 실시간 추적
- 실패한 작업의 원인 분석 용이
- 처리 시간, 성공/실패율 등 지표 수집 가능
실전 팁
적절한 Concurrency 설정
@Process({ name: 'send-newsletter', concurrency: process.env.NODE_ENV === 'production' ? 10 : 3 })
작업 우선순위 활용
await this.emailQueue.add('send-email', data, { priority: data.isUrgent ? 1 : 5 // 낮은 숫자가 높은 우선순위 });
에러 처리 전략
@Process('send-email') async handleEmail(job: Job) { try { await this.emailService.send(job.data); } catch (error) { if (error.code === 'TEMPORARY_ERROR') { // 일시적 오류는 재시도 throw error; } else { // 영구적 오류는 로깅 후 다음으로 await this.logger.error(error); return { success: false, error: error.message }; } } }
마무리
Queue는 단순히 '나중에 처리하는 시스템'이 아닌, 대규모 처리에서 안정성과 확장성을 보장하는 필수적인 아키텍처 요소입니다. 특히 다음과 같은 상황에서는 Queue 도입을 반드시 고려해보세요:
- 처리 시간이 오래 걸리는 작업 (이메일 발송, 파일 처리 등)
- 실패 시 재시도가 필요한 작업 (외부 API 호출, 결제 처리 등)
- 순차적 또는 병렬 처리가 필요한 대량 작업
- 시스템 안정성이 중요한 중요 작업
실제 프로젝트에 적용하면서 생기는 궁금증이나 어려움이 있다면 댓글로 남겨주세요!
참고 자료
'개발 > NestJS' 카테고리의 다른 글
[NestJS] interface, type, class 도입기 (언제 뭘쓰지??) (1) | 2025.01.22 |
---|---|
JavaScript/TypeScript의 for...of와 for await...of 완벽 가이드 (0) | 2024.11.27 |
NestJS JWT 인증 시 발생하는 'secretOrPrivateKey must have value' 에러 완벽 해결하기 🔐 (1) | 2024.11.21 |
TypeORM에서 insert() 사용 시 @BeforeInsert가 동작하지 않는 문제 해결하기 🤔 (0) | 2024.11.20 |
NestJS 설정 관리의 진화: nestjs-library-config 도입기 (4) | 2024.11.18 |
댓글