본문 바로가기
개발/프로젝트-식당 웨이팅 앱 FOOD LINE

웨이팅 API에서 동시성제어 구현하기 2

by 코딩하는짱구 2023. 8. 15.
반응형

Bullqueue 의 적용 범위와 load balancing 과의 병합 때문에 많은 trouble 을 겪은 bullqueue 였지만 요청 처리를 시작하는 기준을 bullqueue 로 제어한다고 하여도, 처리가 끝나는 latency 가 존재하기 때문에 데이터의 무결성, 안정성을 확실하게 보장하기 위해서는 Lock 등 처리를 하는 것이 좋다는 결론에 도달했다.

 

웨이팅 API에서 동시성제어 구현하기 2

 

웨이팅 API에서 동시성제어 구현하기 2

1. 문제상황 1

2. 문제해결

3. 문제상황 2

4. 문제해결

관련글

위의 목차를 클릭하면 해당 글로 자동 이동 합니다.

 

👉문제상황 1

Bullqueue 의 적용 범위와 message queue add 갯수의 문제

기존코드 문제점

  • 하나의 요청에서 두개 이상의 queue.add 수행, redis cloud를 참조하는 횟수가 늘어나 latency  손해
  • 두 개 이상의 consumer를 설정하게 되었을 때 두개의 queue가 각각 다른 node에서 처리되어 관리 불가
waitings.service.ts

async postWaiting(storeId: number, peopleCnt: number, user: User) : Promise<void> {
	await this.waitingQueue.add('postWaiting', {storeId, peopleCnt, user})
	await this.waitingQueue.add('incrementCurrentWaitingCnt', {storeId, peopleCnt, user}
}
waitings.consumer.ts

@Process('postWaiting')
async postWaiting(job:Job) : Promise<void> {
	const {storeId, peopleCnt, user} = job.data
	this.waitingRepository.postWaitings(storeId, peopleCnt, user)
}

@Process('incrementCurrentWaitingCnt')
async incrementCurrentWaitingCnt(job:Job) : Promise<void> {
	const {storeId, peopleCnt, user} = job.data
	await this.redisClient.hincrby(`store:${storeId}`,'currentWaitingCnt', 1);
}

 

☑문제해결

하나의 API에서 한번의 add를 통해 처리하도록 코드 수정

//waitings.service.ts

async postWaiting(storeId: number, peopleCnt: number, user: User) : Promise<void> {
	await this.waitingQueue.add('postWaitingAndIncrementCurrentWaitingCnt', 
	{storeId, peopleCnt, user})
}
//waitings.consumer.ts

@Process('postWaitingAndIncrementCurrentWaitingCnt')
  async postWaitingWithRedis(job: Job): Promise<void> {
    const { storeId, peopleCnt, user } = job.data;
    try {
      const storeHashes = await this.redisClient.hgetall(`store:${storeId}`);
      if (
        Number(storeHashes.maxWaitingCnt) <=
        Number(storeHashes.currentWaitingCnt)
      ) {
        throw new ConflictException('최대 웨이팅 수를 초과했습니다');
      }
      await this.waitingsRepository.postWaitings(storeId, peopleCnt, user);
      await this.redisClient.hincrby(
        `store:${storeId}`,
        'currentWaitingCnt',
        1,
      );
      console.log(`${job.id}의 작업을 수행하였습니다`);
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

 

👉문제상황 2

기존코드 문제점

  • consumer가 복수존재하는 상황에서 load balancing 하는 경우 순서가 보장되지 않음
  • bullqueue가 적용되어 있음에도 불구하고 maxWaitingCnt를 초과하여 등록됌, 분산된 처리속도가 너무 빠르기 때문에 error 처리전에 등록되어 버리는 것 
// waitings.module.ts

@Module({
  imports: [
    TypeOrmModule.forFeature([Waitings, Stores, Reviews], {
      type: 'spanner',
      maxQueryExecutionTime: 50000,
    }),
    AuthModule,
    ScheduleModule.forRoot(),
    BullModule.forRoot({
      redis: redisOptions4,
    }),
    BullModule.registerQueue({
      name: 'waitingQueue',
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: true,
      },
    }),
    CustomCacheModule,
  ],
  controllers: [WaitingsController],
  providers: [
    WaitingsService,
    WaitingsRepository,
    StoresRepository,
    WaitingConsumer,
    ReviewsRepository,
  ],
})
export class WaitingsModule {}
// waitings.service.ts

async postWaitings(
    storeId: number,
    peopleCnt: number,
    user: Users,
  ): Promise<string> {
    if (peopleCnt > 4) {
      throw new BadRequestException('최대 4명까지 신청할 수 있습니다');
    }
    const existsStore = await this.storesRepository.findStoreById(storeId);
    if (!existsStore) {
      throw new NotFoundException('음식점이 존재하지 않습니다');
    }
    let storeHash = await this.redisClient.hgetall(`store:${storeId}`);

    if (Object.keys(storeHash).length === 0) {
      const rating = await this.reviewsRepository.getAverageRating(storeId);
      storeHash = {
        maxWaitingCnt: `${existsStore.maxWaitingCnt}`,
        currentWaitingCnt: '0',
        cycleTime: `${existsStore.cycleTime}`,
        tableForTwo: `${existsStore.tableForTwo}`,
        tableForFour: `${existsStore.tableForFour}`,
        availableTableForTwo: `${existsStore.tableForTwo}`,
        availableTableForFour: `${existsStore.tableForFour}`,
        rating: `${rating}`,
      };
      await this.redisClient.hset(`store:${storeId}`, storeHash);
    }

    const peopleCntForTables =
      peopleCnt <= 2
        ? Number(storeHash.availableTableForTwo)
        : Number(storeHash.availableTableForFour);

    // if (
    //   Number(peopleCntForTables) !== 0 &&
    //   !Number(storeHash.currentWaitingCnt)
    // ) {
    //   throw new ConflictException('해당 인원수는 바로 입장하실 수 있습니다');
    // }

    if (
      Number(storeHash.maxWaitingCnt) <= Number(storeHash.currentWaitingCnt)
    ) {
      throw new ConflictException('최대 웨이팅 수를 초과했습니다');
    }

    const existsUser = await this.waitingsRepository.getWaitingByUser(user);
    if (existsUser) {
      throw new ConflictException('이미 웨이팅을 신청하셨습니다');
    }

    try {
      const job = await this.waitingQueue.add('postWaitingWithRedis', {
        storeId,
        peopleCnt,
        user,
      });
      const finished = await job.finished();
      return finished;
    } catch (err) {
      throw new InternalServerErrorException('대기열 추가에 실패했습니다');
    }
  }
// waiting.consumer.ts

@Process('postWaitingWithRedis')
  async postWaitingWithRedis(job: Job): Promise<void> {
    const { storeId, peopleCnt, user } = job.data;
    try {
      await this.waitingsRepository.postWaitings(storeId, peopleCnt, user);
      await this.redisClient.hincrby(
        `store:${storeId}`,
        'currentWaitingCnt',
        1,
      );
      console.log(`${job.id}의 작업을 수행하였습니다`);
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

 

☑문제해결

*Redis spin lock의 원리는?

다른 작업이 해당 락을 얻을 때까지 반복적으로 lock 점유를 시도하는 방식으로 작동, 스핀 락은 주로 리소스에 대한 접근 시간이 짧고 락을 기다리는 시간이 예측 가능한 경우에 유용

쓰레드1 락 획득 시도-> 이미 다른 쓰레드가 보유 -> 쓰레드1은 계속해서 루프를 돌며(스핀) 락을 기다림 -> 락 해제 -> 다른 쓰레드가 접근

무한루프에 빠지지 않도록 스핀 락 횟수를 조정해주었다. 

// lock.service.ts

import { InjectRedis } from '@liaoliaots/nestjs-redis';
import { Injectable } from '@nestjs/common';
import { Redis } from 'ioredis';

@Injectable()
export class LockService {
  constructor(@InjectRedis('ec2redis') private readonly redisClient: Redis) {}

  async acquireLock(key: string): Promise<boolean> {
    const acquired = await this.redisClient.set(key, 'locked', 'NX');
    return acquired !== null;
  }

  async releaseLock(key: string): Promise<boolean> {
    const released = await this.redisClient.del(key);
    return released > 0;
  }
}
  • acquired를 'NX' 로 세팅하여 이미 locked가 존재할 경우 null을 반환, 아닐 경우 OK를 반환하는 속성을 이용하여 acquire를 boolean으로 반환한다.
// waiting.consumer.ts

@Process('postWaitingWithRedis')
  async postWaitingWithRedis(job: Job): Promise<void> {
    const { storeId, peopleCnt, user } = job.data;
    const lockKey = `lock:${storeId}`;
    const maxRetryAttempts = 10;
    const retryDelay = 100;
    const storeHashes = await this.redisClient.hgetall(`store:${storeId}`);
    if (
      Number(storeHashes.maxWaitingCnt) <= Number(storeHashes.currentWaitingCnt)
    ) {
      throw new ConflictException('최대 웨이팅 수를 초과했습니다');
    }
    try {
      const lockService = new LockService(this.redisClient);

      const tryAcquireLock = async (attempt = 1): Promise<boolean> => {
        const acquireLock = await lockService.acquireLock(lockKey);
        if (acquireLock) {
          return true;
        }
        if (attempt < maxRetryAttempts) {
          await new Promise((resolve) => setTimeout(resolve, retryDelay));
          return tryAcquireLock(attempt + 1);
        }
        return false;
      };

      const acquireLock = await tryAcquireLock();
      if (!acquireLock) {
        await lockService.releaseLock(lockKey);
        throw new Error('락을 확보하지 못했습니다.');
      }

      const storeHashes = await this.redisClient.hgetall(`store:${storeId}`);
      if (
        Number(storeHashes.maxWaitingCnt) <=
        Number(storeHashes.currentWaitingCnt)
      ) {
        await lockService.releaseLock(lockKey);
        throw new ConflictException('최대 웨이팅 수를 초과했습니다');
      }
      await this.waitingsRepository.postWaitings(storeId, peopleCnt, user);
      await this.redisClient.hincrby(
        `store:${storeId}`,
        'currentWaitingCnt',
        1,
      );
      await lockService.releaseLock(lockKey);
      console.log(`${job.id}의 작업을 수행하였습니다`);
      return;
    } catch (err) {
      throw new Error('Redis 연결에 실패했습니다');
    }
  }

 

  • acquireLock이 boolean 으로 반환되는 값을 이용하여 lock:${storeId} 를 찾지 못할 경우 설정된 retryDelay 와 maxTryAttempts 를 반영하여 스핀 락을 설정하여 주었다.
  • 설정한 스핀 락을 다 돌 때까지 Lock 을 확보하지 못할 경우 Error 를 반환하고 종료되고, Lock 을 확보하면 로직이 진행되고 releaseLock 을 통해 다시 Lock 을 지우는 로직이다.

Bullqueue 의 적용 범위와 load balancing 과의 병합 때문에 많은 trouble 을 겪은 bullqueue 였지만 요청 처리를 시작하는 기준을 bullqueue 로 제어한다고 하여도, 처리가 끝나는 latency 가 존재하기 때문에 데이터의 무결성, 안정성을 확실하게 보장하기 위해서는 Lock 등 처리를 하는 것이 좋다는 결론에 도달했다. ==> DB Transaction 구현

 

관련글

2023.08.09 - [실전프로젝트] - 웨이팅API에서 동시성제어 구현하기 1

 

웨이팅API에서 동시성제어 구현하기 1

식당 웨이팅 시스템은 여러 고객들의 예약 및 대기 리스트를 관리해야 한다. 동시에 여러 고객이 예약하거나 대기 목록에 추가되는 경우, 데이터의 일관성을 유지하고 예약 가능한 시간 또는 자

veritas-crystal.tistory.com

 

반응형