식당 웨이팅 시스템은 여러 고객들의 예약 및 대기 리스트를 관리해야 한다. 동시에 여러 고객이 예약하거나 대기 목록에 추가되는 경우, 데이터의 일관성을 유지하고 예약 가능한 시간 또는 자리를 정확하게 추적하기 위해 동시성 제어가 필요하다.
✔Bullqueue를 사용하여 웨이팅 동시성 제어하기!
Bullqueue란 무엇인가?
Bull Queue는 백그라운드 작업 처리를 위한 분산 큐 시스템으로, Redis 데이터베이스를 기반으로 동작한다. 기본적인 원리는, Producer(생산자)가 Message를 Queue에 넣어두면, Consumer가 Message를 가져와 처리하는 방식이다.
Bullqueue의 작동원리
- 프로듀서(Producer) : 작업을 생성하고 bull Queue에 추가하는 역할, 이 작업은 큐에 대기하며 작업에 대한 정보와 실행할 함수 등이 포함
- 큐(Queue) : 작업을 관리하고 저장하는 공간, 큐는 Redis db를 기반으로 동작, 프로듀서가 추가한 작업들이 여기에 저장된다
- 워커(Worker) = 컨슈머(Consumer) : 큐에서 작업을 추출하고 실행하는 역할, 워커는 '백그라운드'에서 실행되며 큐에서 작업을 가져와 지정된 함수를 실행하여 실제 작업을 처리한다
이로써 백그라운드에서 '비동기적'으로 작업을 처리한다
프로젝트 내 Bullqueue의 적용범위
웨이팅조회, 변경, 취소 등 동시성제어가 필요하지 않은 부분은 제외하고, 핵심적인 웨이팅 신청과 관리에 적용
Producer
waitings.module.ts - Bull Module을 import 하고, 추가적으로 큐의 이름을 'waitingQueue'로 지정하여 등록
import { Module } from '@nestjs/common';
.
.
.이하 생략
const result = config();
if (result.error) {
throw result.error;
}
const redisOptions4: RedisOptions = {
host: process.env.EC2_REDIS_HOST,
port: Number(process.env.EC2_REDIS_PORT),
password: process.env.EC2_REDIS_PASSWORD,
};
@Module({
imports: [
TypeOrmModule.forFeature([Waitings, Stores, Reviews], {
type: 'spanner',
maxQueryExecutionTime: 50000,
}),
AuthModule,
ScheduleModule.forRoot(),
BullModule.forRoot({
redis: redisOptions4,
}),
BullModule.registerQueue({
name: 'waitingQueue',
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
limiter: { max: 10, duration: 300 },
}),
CustomCacheModule,
],
controllers: [WaitingsController],
providers: [
WaitingsService,
WaitingsRepository,
StoresRepository,
WaitingConsumer,
ReviewsRepository,
],
})
export class WaitingsModule {}
waitings.service.ts - 작업을 queue에 add하는 부분
import { ReviewsRepository } from './../reviews/reviews.repository';
.
.생략
import { Queue } from 'bull';
import { Redis } from 'ioredis';
@Injectable()
export class WaitingsService {
constructor(
@InjectRedis('ec2redis') private readonly redisClient: Redis,
@InjectQueue('waitingQueue')
private waitingQueue: Queue,
private waitingsRepository: WaitingsRepository,
private storesRepository: StoresRepository,
private reviewsRepository: ReviewsRepository,
) {}
//대기열 추가
async postWaitings(
storeId: number,
peopleCnt: number,
user: Users,
): Promise<string> {
if (peopleCnt > 4) {
throw new BadRequestException('최대 4명까지 신청할 수 있습니다');
}
const existsStore = await this.storesRepository.findStoreById(storeId);
if (!existsStore) {
throw new NotFoundException('음식점이 존재하지 않습니다');
}
let storeHash = await this.redisClient.hgetall(`store:${storeId}`);
if (Object.keys(storeHash).length === 0) {
const rating = await this.reviewsRepository.getAverageRating(storeId);
storeHash = {
maxWaitingCnt: `${existsStore.maxWaitingCnt}`,
currentWaitingCnt: '0',
cycleTime: `${existsStore.cycleTime}`,
tableForTwo: `${existsStore.tableForTwo}`,
tableForFour: `${existsStore.tableForFour}`,
availableTableForTwo: `${existsStore.tableForTwo}`,
availableTableForFour: `${existsStore.tableForFour}`,
rating: `${rating}`,
};
await this.redisClient.hset(`store:${storeId}`, storeHash);
}
const peopleCntForTables =
peopleCnt <= 2
? Number(storeHash.availableTableForTwo)
: Number(storeHash.availableTableForFour);
// if (
// Number(peopleCntForTables) !== 0 &&
// !Number(storeHash.currentWaitingCnt)
// ) {
// throw new ConflictException('해당 인원수는 바로 입장하실 수 있습니다');
// }
console.log('storeHash.maxWaitingCnt:', storeHash.maxWaitingCnt);
console.log('storeHash.currentWaitingCnt:', storeHash.currentWaitingCnt);
if (
Number(storeHash.maxWaitingCnt) <= Number(storeHash.currentWaitingCnt)
) {
throw new ConflictException('최대 웨이팅 수를 초과했습니다');
}
const existsUser = await this.waitingsRepository.getWaitingByUser(user);
if (existsUser) {
throw new ConflictException('이미 웨이팅을 신청하셨습니다');
}
try {
const job = await this.waitingQueue.add('postWaitingWithRedis', {
storeId,
peopleCnt,
user,
});
const finished = await job.finished();
return finished;
} catch (err) {
throw new InternalServerErrorException('대기열 추가에 실패했습니다');
}
}
Consumer
queue에 추가된 작업을 처리하는 클래스를 생성한다
waitings.consumer.ts
import { ConflictException } from '@nestjs/common';
import { Redis } from 'ioredis';
import { WaitingsRepository } from './waitings.repository';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { EventEmitter } from 'events';
import { InjectRedis } from '@liaoliaots/nestjs-redis';
EventEmitter.defaultMaxListeners = 100;
@Processor('waitingQueue')
export class WaitingConsumer {
constructor(
@InjectRedis('ec2redis') private readonly redisClient: Redis,
private readonly waitingsRepository: WaitingsRepository,
) {}
@Process('patchToDelayed')
async patchToDelayed(job: Job): Promise<void> {
const { storeId, waitingId } = job.data;
console.log(`${job.id}의 작업을 수행하였습니다`);
try {
await this.waitingsRepository.patchToDelayed(storeId, waitingId);
return;
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
// with Redis
@Process('getCurrentWaitingCntInRedis')
async getCurrentWaitingCntInRedis(job: Job): Promise<number> {
const storeId = job.data;
console.log(`${job.id}번의 작업을 수행하였습니다`);
try {
const currentWaitingCnt = await this.redisClient.hget(
`store:${storeId}`,
'currentWaitingCnt',
);
if (currentWaitingCnt) return Number(currentWaitingCnt);
else return 0;
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
@Process('getStoreHashesFromRedis')
async getStoreHashesFromRedis(job: Job): Promise<any> {
const storeId = job.data;
try {
return await this.redisClient.hgetall(`store:${storeId}`);
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
// one add at one api
@Process('postWaitingWithRedis')
async postWaitingWithRedis(job: Job): Promise<void> {
const { storeId, peopleCnt, user } = job.data;
try {
const storeHashes = await this.redisClient.hgetall(`store:${storeId}`);
if (
Number(storeHashes.maxWaitingCnt) <=
Number(storeHashes.currentWaitingCnt)
) {
throw new ConflictException('최대 웨이팅 수를 초과했습니다');
}
await this.waitingsRepository.postWaitings(storeId, peopleCnt, user);
await this.redisClient.hincrby(
`store:${storeId}`,
'currentWaitingCnt',
1,
);
console.log(`${job.id}의 작업을 수행하였습니다`);
return;
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
@Process('addStoreHashAndPostEntered')
async addStoreHashAndPostEntered(job: Job): Promise<void> {
const { storeId, userId, peopleCnt, ...data } = job.data;
try {
await this.redisClient.hset(`store:${storeId}`, data);
await this.waitingsRepository.postEntered(storeId, userId, peopleCnt);
console.log(`${job.id}의 작업을 수행하였습니다`);
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
@Process('exitedAndIncrementTable')
async patchToExitedAndIncrementTable(job: Job): Promise<void> {
const { storeId, peopleCnt, waitingId } = job.data;
try {
await this.waitingsRepository.patchToExited(storeId, waitingId);
let availableTable: string;
if (peopleCnt == 1 || peopleCnt == 2) {
availableTable = 'availableTableForTwo';
} else {
availableTable = 'availableTableForFour';
}
await this.redisClient.hincrby(`store:${storeId}`, availableTable, 1);
console.log(`${job.id}의 작업을 수행하였습니다`);
return;
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
@Process('enteredAndDecrementCnts')
async enteredAndDecrementCnts(job: Job): Promise<void> {
const { storeId, waitingId, status, peopleCnt } = job.data;
try {
await this.waitingsRepository.patchToEntered(storeId, waitingId, status);
let availableTable: string;
if (peopleCnt == 1 || peopleCnt == 2) {
availableTable = 'availableTableForTwo';
} else {
availableTable = 'availableTableForFour';
}
await this.redisClient.hincrby(`store:${storeId}`, availableTable, -1);
await this.redisClient.hincrby(
`store:${storeId}`,
'currentWaitingCnt',
-1,
);
return;
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
@Process('canceledAndDecrementWaitingCnt')
async canceledAndDecrementWaitingCnt(job: Job): Promise<void> {
const { storeId, waitingId } = job.data;
try {
await this.waitingsRepository.patchToCanceled(storeId, waitingId);
await this.redisClient.hincrby(
`store:${storeId}`,
'currentWaitingCnt',
-1,
);
return;
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
@Process('saveNoshowAndDecrementWaitingCnt')
async saveNoshowAndDecrementWaitingCnt(job: Job): Promise<void> {
const { entity } = job.data;
try {
await this.waitingsRepository.saveNoshow(entity);
const storeId = entity.StoreId;
await this.redisClient.hincrby(
`store:${storeId}`,
'currentWaitingCnt',
-1,
);
return;
} catch (err) {
throw new Error('Redis 연결에 실패했습니다');
}
}
}
app.module.ts ==> 여기에 등록해줘야하는 것 아닌가?
import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { AuthModule } from './auth/auth.module';
import { StoresModule } from './stores/stores.module';
import { WaitingsModule } from './waitings/waitings.module';
import { ReviewsModule } from './reviews/reviews.module';
import { TypeOrmModule } from '@nestjs/typeorm';
import { typeORMConfig } from './configs/typeorm.config';
import { APP_GUARD } from '@nestjs/core';
import { AccessTokenGuard } from './auth/guards';
import { LoggerMiddleware } from './middlewares/logger.middleware';
import { CustomRedisModule } from './redis/custom-redis.module';
import { config } from 'dotenv';
import { CustomCacheModule } from './cache/cache.module';
import { AppController } from './app/app.controller';
const result = config();
if (result.error) {
throw result.error;
}
@Module({
imports: [
TypeOrmModule.forRoot(typeORMConfig),
AuthModule,
StoresModule,
WaitingsModule,
ReviewsModule,
CustomRedisModule,
CustomCacheModule,
],
providers: [
{
provide: APP_GUARD, // APP_GUARD: 애플리케이션의 전역 가드를 설정하는 토큰
useClass: AccessTokenGuard, // AccessTokenGuard를 APP_GUARD에 등록
},
],
controllers: [AppController],
})
export class AppModule implements NestModule {
configure(consumer: MiddlewareConsumer) {
consumer.apply(LoggerMiddleware).forRoutes('*');
}
}
결론
bullqueue의 consumer로 작업queue가 들어오는 순서는 보장이 되지만 완료시점을 보장할 수 없으므로 순서가 뒤바뀐다던지 max waiting cnt를 넘어서까지 예약이 되는 문제가 발생했다. 다수의 consumer가 존재할 경우와 처리속도가 매우 빠르면 완료하는 순간까지 보장하기는 힘들기 때문에 lock을 설정해주는 것이 최종적인 동시성제어라고 할 수 있기에 방법을 찾아야했다.
**디비 트랜잭션이란
하나이상의 데이터베이스 작업을 논리적으로 묶어서 실행하는 개념, 이러한 묶음은 ACID라고 불리는 속성들을 보장하며 데이터 베이스에서 안전하고 신뢰할 수 있는 작업을 수행하는데 중요한 역할을 한다.
- 원자성 (Atomicity) : 트랜잭션 내의 모든 작업은 하나의 논리적 단위로 간주되며, 모든 작업이 성공적으로 완료되거나 아무런 작업업도 수행되지 않은 것 처럼 보장. 만약 트랜잭션 내의 어느 한 작업이 실패하면, 트랜잭션 전체가 롤백되어 이전상태로 복원된다.
- 일관성 (Consistency) : 트랜잭션이 실행된 후에도 데이터 베이스는 일관된 상태를 유지해야한다.
- 격리성 (Isolation) : 동시에 여러 트랜잭션이 실행될 때 각 트랜잭션은 다른 트랜잭션에 영향을 주지 않도록 격리된다.
- 지속성 (Durability) : 트랜잭션이 성공적으로 완료되면 해당 변경사항은 영구적으로 유지되어야한다.
추천글
https://overcome-the-limits.tistory.com/673
'개발 > 프로젝트-식당 웨이팅 앱 FOOD LINE' 카테고리의 다른 글
[Typescript] TS로 식당 웨이팅 앱을 개발한 이유 (0) | 2024.01.16 |
---|---|
웨이팅 API에서 동시성제어 구현하기 2 (0) | 2023.08.15 |
230703 실전프로젝트25 [PostgreSQL] 프로젝트 기술적 의사결정 (0) | 2023.07.03 |
230628 실전프로젝트24 [Nest.js] 애플리케이션 최종 테스트 (0) | 2023.06.28 |
230627 실전프로젝트23 [Nest.js] Jmeter로 부하테스트하기4 (0) | 2023.06.28 |