본문 바로가기
개발/프로젝트-식당 웨이팅 앱 FOOD LINE

웨이팅API에서 동시성제어 구현하기 1

by 코딩하는짱구 2023. 8. 9.
반응형

식당 웨이팅 시스템은 여러 고객들의 예약 및 대기 리스트를 관리해야 한다. 동시에 여러 고객이 예약하거나 대기 목록에 추가되는 경우, 데이터의 일관성을 유지하고 예약 가능한 시간 또는 자리를 정확하게 추적하기 위해 동시성 제어가 필요하다.

 

✔Bullqueue를 사용하여 웨이팅 동시성 제어하기!

 

 

 

Bullqueue란 무엇인가?

Bull Queue는 백그라운드 작업 처리를 위한 분산 큐 시스템으로, Redis 데이터베이스를 기반으로 동작한다. 기본적인 원리는, Producer(생산자)가 Message를 Queue에 넣어두면, Consumer가 Message를 가져와 처리하는 방식이다. 

 

Bullqueue의 작동원리

  1. 프로듀서(Producer) : 작업을 생성하고 bull Queue에 추가하는 역할, 이 작업은 큐에 대기하며 작업에 대한 정보와 실행할 함수 등이 포함
  2. 큐(Queue) : 작업을 관리하고 저장하는 공간, 큐는 Redis db를 기반으로 동작, 프로듀서가 추가한 작업들이 여기에 저장된다
  3. 워커(Worker) = 컨슈머(Consumer) : 큐에서 작업을 추출하고 실행하는 역할, 워커는 '백그라운드'에서 실행되며 큐에서 작업을 가져와 지정된 함수를 실행하여 실제 작업을 처리한다

이로써 백그라운드에서 '비동기적'으로 작업을 처리한다

 

프로젝트 내 Bullqueue의 적용범위

웨이팅조회, 변경, 취소 등 동시성제어가 필요하지 않은 부분은 제외하고, 핵심적인 웨이팅 신청과 관리에 적용

 

Producer

waitings.module.ts - Bull Module을 import 하고, 추가적으로 큐의 이름을 'waitingQueue'로 지정하여 등록

import { Module } from '@nestjs/common';
.
.
.이하 생략 

const result = config();
if (result.error) {
  throw result.error;
}

const redisOptions4: RedisOptions = {
  host: process.env.EC2_REDIS_HOST,
  port: Number(process.env.EC2_REDIS_PORT),
  password: process.env.EC2_REDIS_PASSWORD,
};

@Module({
  imports: [
    TypeOrmModule.forFeature([Waitings, Stores, Reviews], {
      type: 'spanner',
      maxQueryExecutionTime: 50000,
    }),
    AuthModule,
    ScheduleModule.forRoot(),
    BullModule.forRoot({
      redis: redisOptions4,
    }),
    BullModule.registerQueue({
      name: 'waitingQueue',
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: true,
      },
      limiter: { max: 10, duration: 300 },
    }),
    CustomCacheModule,
  ],
  controllers: [WaitingsController],
  providers: [
    WaitingsService,
    WaitingsRepository,
    StoresRepository,
    WaitingConsumer,
    ReviewsRepository,
  ],
})
export class WaitingsModule {}

 

waitings.service.ts - 작업을 queue에 add하는 부분

import { ReviewsRepository } from './../reviews/reviews.repository';
.
.생략
import { Queue } from 'bull';
import { Redis } from 'ioredis';

@Injectable()
export class WaitingsService {
  constructor(
    @InjectRedis('ec2redis') private readonly redisClient: Redis,
    @InjectQueue('waitingQueue')
    private waitingQueue: Queue,
    private waitingsRepository: WaitingsRepository,
    private storesRepository: StoresRepository,
    private reviewsRepository: ReviewsRepository,
  ) {}
  
    //대기열 추가
  async postWaitings(
    storeId: number,
    peopleCnt: number,
    user: Users,
  ): Promise<string> {
    if (peopleCnt > 4) {
      throw new BadRequestException('최대 4명까지 신청할 수 있습니다');
    }
    const existsStore = await this.storesRepository.findStoreById(storeId);
    if (!existsStore) {
      throw new NotFoundException('음식점이 존재하지 않습니다');
    }
    let storeHash = await this.redisClient.hgetall(`store:${storeId}`);

    if (Object.keys(storeHash).length === 0) {
      const rating = await this.reviewsRepository.getAverageRating(storeId);
      storeHash = {
        maxWaitingCnt: `${existsStore.maxWaitingCnt}`,
        currentWaitingCnt: '0',
        cycleTime: `${existsStore.cycleTime}`,
        tableForTwo: `${existsStore.tableForTwo}`,
        tableForFour: `${existsStore.tableForFour}`,
        availableTableForTwo: `${existsStore.tableForTwo}`,
        availableTableForFour: `${existsStore.tableForFour}`,
        rating: `${rating}`,
      };
      await this.redisClient.hset(`store:${storeId}`, storeHash);
    }

    const peopleCntForTables =
      peopleCnt <= 2
        ? Number(storeHash.availableTableForTwo)
        : Number(storeHash.availableTableForFour);

    // if (
    //   Number(peopleCntForTables) !== 0 &&
    //   !Number(storeHash.currentWaitingCnt)
    // ) {
    //   throw new ConflictException('해당 인원수는 바로 입장하실 수 있습니다');
    // }

    console.log('storeHash.maxWaitingCnt:', storeHash.maxWaitingCnt);
    console.log('storeHash.currentWaitingCnt:', storeHash.currentWaitingCnt);
    if (
      Number(storeHash.maxWaitingCnt) <= Number(storeHash.currentWaitingCnt)
    ) {
      throw new ConflictException('최대 웨이팅 수를 초과했습니다');
    }

    const existsUser = await this.waitingsRepository.getWaitingByUser(user);
    if (existsUser) {
      throw new ConflictException('이미 웨이팅을 신청하셨습니다');
    }

    try {
      const job = await this.waitingQueue.add('postWaitingWithRedis', {
        storeId,
        peopleCnt,
        user,
      });
      const finished = await job.finished();
      return finished;
    } catch (err) {
      throw new InternalServerErrorException('대기열 추가에 실패했습니다');
    }
  }

 

Consumer

queue에 추가된 작업을 처리하는 클래스를 생성한다

waitings.consumer.ts

import { ConflictException } from '@nestjs/common';
import { Redis } from 'ioredis';
import { WaitingsRepository } from './waitings.repository';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { EventEmitter } from 'events';
import { InjectRedis } from '@liaoliaots/nestjs-redis';
EventEmitter.defaultMaxListeners = 100;
@Processor('waitingQueue')
export class WaitingConsumer {
  constructor(
    @InjectRedis('ec2redis') private readonly redisClient: Redis,
    private readonly waitingsRepository: WaitingsRepository,
  ) {}

  @Process('patchToDelayed')
  async patchToDelayed(job: Job): Promise<void> {
    const { storeId, waitingId } = job.data;
    console.log(`${job.id}의 작업을 수행하였습니다`);
    try {
      await this.waitingsRepository.patchToDelayed(storeId, waitingId);
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

  // with Redis
  @Process('getCurrentWaitingCntInRedis')
  async getCurrentWaitingCntInRedis(job: Job): Promise<number> {
    const storeId = job.data;
    console.log(`${job.id}번의 작업을 수행하였습니다`);
    try {
      const currentWaitingCnt = await this.redisClient.hget(
        `store:${storeId}`,
        'currentWaitingCnt',
      );
      if (currentWaitingCnt) return Number(currentWaitingCnt);
      else return 0;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }
  @Process('getStoreHashesFromRedis')
  async getStoreHashesFromRedis(job: Job): Promise<any> {
    const storeId = job.data;
    try {
      return await this.redisClient.hgetall(`store:${storeId}`);
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

  // one add at one api

  @Process('postWaitingWithRedis')
  async postWaitingWithRedis(job: Job): Promise<void> {
    const { storeId, peopleCnt, user } = job.data;
    try {
      const storeHashes = await this.redisClient.hgetall(`store:${storeId}`);
      if (
        Number(storeHashes.maxWaitingCnt) <=
        Number(storeHashes.currentWaitingCnt)
      ) {
        throw new ConflictException('최대 웨이팅 수를 초과했습니다');
      }
      await this.waitingsRepository.postWaitings(storeId, peopleCnt, user);
      await this.redisClient.hincrby(
        `store:${storeId}`,
        'currentWaitingCnt',
        1,
      );
      console.log(`${job.id}의 작업을 수행하였습니다`);
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

  @Process('addStoreHashAndPostEntered')
  async addStoreHashAndPostEntered(job: Job): Promise<void> {
    const { storeId, userId, peopleCnt, ...data } = job.data;
    try {
      await this.redisClient.hset(`store:${storeId}`, data);
      await this.waitingsRepository.postEntered(storeId, userId, peopleCnt);
      console.log(`${job.id}의 작업을 수행하였습니다`);
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

  @Process('exitedAndIncrementTable')
  async patchToExitedAndIncrementTable(job: Job): Promise<void> {
    const { storeId, peopleCnt, waitingId } = job.data;
    try {
      await this.waitingsRepository.patchToExited(storeId, waitingId);
      let availableTable: string;
      if (peopleCnt == 1 || peopleCnt == 2) {
        availableTable = 'availableTableForTwo';
      } else {
        availableTable = 'availableTableForFour';
      }
      await this.redisClient.hincrby(`store:${storeId}`, availableTable, 1);
      console.log(`${job.id}의 작업을 수행하였습니다`);
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

  @Process('enteredAndDecrementCnts')
  async enteredAndDecrementCnts(job: Job): Promise<void> {
    const { storeId, waitingId, status, peopleCnt } = job.data;
    try {
      await this.waitingsRepository.patchToEntered(storeId, waitingId, status);
      let availableTable: string;
      if (peopleCnt == 1 || peopleCnt == 2) {
        availableTable = 'availableTableForTwo';
      } else {
        availableTable = 'availableTableForFour';
      }
      await this.redisClient.hincrby(`store:${storeId}`, availableTable, -1);
      await this.redisClient.hincrby(
        `store:${storeId}`,
        'currentWaitingCnt',
        -1,
      );
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

  @Process('canceledAndDecrementWaitingCnt')
  async canceledAndDecrementWaitingCnt(job: Job): Promise<void> {
    const { storeId, waitingId } = job.data;
    try {
      await this.waitingsRepository.patchToCanceled(storeId, waitingId);
      await this.redisClient.hincrby(
        `store:${storeId}`,
        'currentWaitingCnt',
        -1,
      );
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

  @Process('saveNoshowAndDecrementWaitingCnt')
  async saveNoshowAndDecrementWaitingCnt(job: Job): Promise<void> {
    const { entity } = job.data;
    try {
      await this.waitingsRepository.saveNoshow(entity);
      const storeId = entity.StoreId;
      await this.redisClient.hincrby(
        `store:${storeId}`,
        'currentWaitingCnt',
        -1,
      );
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }
}

 

app.module.ts ==> 여기에 등록해줘야하는 것 아닌가?

import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { AuthModule } from './auth/auth.module';
import { StoresModule } from './stores/stores.module';
import { WaitingsModule } from './waitings/waitings.module';
import { ReviewsModule } from './reviews/reviews.module';
import { TypeOrmModule } from '@nestjs/typeorm';
import { typeORMConfig } from './configs/typeorm.config';
import { APP_GUARD } from '@nestjs/core';
import { AccessTokenGuard } from './auth/guards';
import { LoggerMiddleware } from './middlewares/logger.middleware';
import { CustomRedisModule } from './redis/custom-redis.module';
import { config } from 'dotenv';
import { CustomCacheModule } from './cache/cache.module';
import { AppController } from './app/app.controller';

const result = config();
if (result.error) {
  throw result.error;
}

@Module({
  imports: [
    TypeOrmModule.forRoot(typeORMConfig),
    AuthModule,
    StoresModule,
    WaitingsModule,
    ReviewsModule,
    CustomRedisModule,
    CustomCacheModule,
  ],
  providers: [
    {
      provide: APP_GUARD, // APP_GUARD: 애플리케이션의 전역 가드를 설정하는 토큰
      useClass: AccessTokenGuard, // AccessTokenGuard를 APP_GUARD에 등록
    },
  ],
  controllers: [AppController],
})
export class AppModule implements NestModule {
  configure(consumer: MiddlewareConsumer) {
    consumer.apply(LoggerMiddleware).forRoutes('*');
  }
}

 

결론

bullqueue의 consumer로 작업queue가 들어오는 순서는 보장이 되지만 완료시점을 보장할 수 없으므로 순서가 뒤바뀐다던지 max waiting cnt를 넘어서까지 예약이 되는 문제가 발생했다. 다수의 consumer가 존재할 경우와 처리속도가 매우 빠르면 완료하는 순간까지 보장하기는 힘들기 때문에 lock을 설정해주는 것이 최종적인 동시성제어라고 할 수 있기에 방법을 찾아야했다.

**디비 트랜잭션이란

하나이상의 데이터베이스 작업을 논리적으로 묶어서 실행하는 개념, 이러한 묶음은 ACID라고 불리는 속성들을 보장하며 데이터 베이스에서 안전하고 신뢰할 수 있는 작업을 수행하는데 중요한 역할을 한다.

  1. 원자성 (Atomicity) : 트랜잭션 내의 모든 작업은 하나의 논리적 단위로 간주되며, 모든 작업이 성공적으로 완료되거나 아무런 작업업도 수행되지 않은 것 처럼 보장. 만약 트랜잭션 내의 어느 한 작업이 실패하면, 트랜잭션 전체가 롤백되어 이전상태로 복원된다. 
  2. 일관성 (Consistency) : 트랜잭션이 실행된 후에도 데이터 베이스는 일관된 상태를 유지해야한다. 
  3. 격리성 (Isolation) : 동시에 여러 트랜잭션이 실행될 때 각 트랜잭션은 다른 트랜잭션에 영향을 주지 않도록 격리된다.
  4. 지속성 (Durability) : 트랜잭션이 성공적으로 완료되면 해당 변경사항은 영구적으로 유지되어야한다.

 

 

추천글

https://overcome-the-limits.tistory.com/673

 

[Project] 프로젝트 삽질기9 (feat Queue, bull)

들어가며 사이드 프로젝트에서 푸시 알림을 활용한 서비스를 개발하고 있습니다. 그 과정에서 생각하고 배웠던 점들을 하나씩 작성하고자 합니다. 먼저 푸시 알림 서비스를 구축하려면, Queue를

overcome-the-limits.tistory.com

 

https://donis-note.medium.com/nestjs-redis-message-queue-%EA%B5%AC%ED%98%84%ED%95%B4%EB%B3%B4%EA%B8%B0-4a0da04f57b7

 

[NestJS] Redis Message Queue 구현해보기

Bull Queue를 이용한 메시지큐 튜토리얼

donis-note.medium.com

 

https://heowc.tistory.com/35

 

메시지 큐(Message Queue) 훑어보기

이번글은 메시지 큐에 대한 개념과 여러가지 미들웨어를 훑어보기 위한 글 입니다. 웹 서버를 구성하게 되면 성능에 대한 고려는 빼먹을 수 없습니다. 데이터 처리를 하다보면 너무 많은 처리로

heowc.tistory.com

 

반응형