Google 태그 관리자 아이콘

새로운 로직접근

.findout(Redis를 이용해 CyclicBarrier만들어보기)

silvergoni 2020. 12. 21. 21:32
반응형

고민

  • 물리적/논리적으로 분리되어 있는 서버에서 특정 request가 몇번 들어올걸 체크하고 그 이후에 로직이 실행되었으면 좋겠다고 생각했다.

  • 프로세스를 진행을 특정 request수가 총족될때까지 대기하는 시스템을 만들고 싶었다.

  • 처음에는 이런건 메세징시스템에서 가능할거라 생각했지만 잘 아이디어가 안떠올랐고 지식이 없었다.

  • 그래서 익숙한 redis를 사용해보기로 했다.

결론

  • 자바에서는 스레드를 제어하는 방식이 여러개 있는데(CountDownLatch, CyclicBarrier, Phaser 등) 여기서 아이디어를 얻어서 Redis로 구현을 성공하였다.
  • CyclicBarrier가 각 스레드에서 대기했다가 특정시점에 다시 시작되도록 하는 로직이어서 그렇게 구현했다.

접근방법

  • 기본적으로 대기할 request 숫자를 갖고 있게 하고 한번 request가 올때마다 감소시켜서 0이되면 완료로 본다.
    • parties는 전체 카운트다운될 숫자
    • redisCacheKey는 레디스에 사용될 유니크한 키라고 생각하자.
        public class CyclicBarrierRedisTemplate {
        public void setBarrier(RedisCacheable redisCacheKey, int parties) {
            String readyCacheKey = addReadySuffix(redisCacheKey);
            redisTemplate.opsForValue().set(readyCacheKey, parties, redisCacheKey.getDuration());
            String finishCacheKey = addFinishSuffix(redisCacheKey);
            redisTemplate.opsForValue().set(finishCacheKey, parties, redisCacheKey.getDuration());
        }
  • setBarrier에서 카운트할 숫자와 키를 등록한다.

    • 이때 readyCacheKey, finishCacheKey를 두개 등록하는데 이유는 아래서 설명한다.
  • 이때 duration을 함께 설정해 특정시간이 나면 자동으로 실패하게 한다.

    public void cancelBarrier(RedisCacheable redisCacheKey) {
      String readyCacheKey = addReadySuffix(redisCacheKey);
      String finishCacheKey = addFinishSuffix(redisCacheKey);
    
      redisTemplate.delete(readyCacheKey);
      redisTemplate.delete(finishCacheKey);
    }
  • 혹시 뭔가 잘못되었을때는 cancelBarrier를 통해 키를 지워주면 된다.

    public void await(RedisCacheable redisCacheKey, Duration timeout, Duration period) {  
      String readyCacheKey = addReadySuffix(redisCacheKey);  
      String finishCacheKey = addFinishSuffix(redisCacheKey);
    
      if (redisTemplate.hasKey(readyCacheKey)) {
          Long remain = redisTemplate.opsForValue().decrement(readyCacheKey);
          log.info("남아있는 웨이팅 갯수: " + remain + "개");
      } else {
          throw new Exceptino("Has no key.");
      }
    
      redisTemplate.expire(readyCacheKey, timeout);
      redisTemplate.expire(finishCacheKey, timeout);
    
      LocalDateTime startTime = LocalDateTime.now();
      for(;;) {
          LocalDateTime maxWaitTime = startTime.plusSeconds(timeout.getSeconds());
          if (LocalDateTime.now().isBefore(maxWaitTime)) {
              if (redisTemplate.hasKey(readyCacheKey)) {
                  int countdown = (int)redisTemplate.opsForValue().get(readyCacheKey);
                  if (countdown == 0) {
                      try {
                          int finishCountdown = redisTemplate.opsForValue().decrement(finishCacheKey).intValue();
                          if (finishCountdown == 0) {
                              redisTemplate.delete(readyCacheKey);
                              redisTemplate.delete(finishCacheKey);
                              log.info("정상적으로 CyclicBarrier의 키도 삭제되었습니다. readyCacheKey: " + readyCacheKey, ", finishCacheKey: " + finishCacheKey);
                          }
                      } catch (Exception e) {
                          log.error("CyclicBarrier의 키 삭제가 필요합니다. readyCacheKey: " + readyCacheKey, ", finishCacheKey: " + finishCacheKey);
                      }
                      return;
                  }
    
                  try {
                      Thread.sleep(period.toMillis());
                  } catch (InterruptedException e) {
                  }
              } else {
                  throw new RuntimeException("해당 키가 없어 카운트다운을 지속할 수 없습니다. readyCacheKey: " + readyCacheKey);
              }
          } else {
              throw new RuntimeException("시간초과로 실패하였습니다. readyCacheKey: " + readyCacheKey + ", timeout: " + timeout.getSeconds());
          }
      }
    }
  • CyclicBarrier답게 await 메소드가 실행될때 카운트가 다운되고 기다리게 된다.

    • 일단 readyCacheKey가 있는지 확인한다. 시간이 지나거나 특별한 이유로 취소되어 삭제되면 애초에 대기를 할 필요가 없어지기 때문이다.
    • await가 실행되면 시간을 다시 duration만큼 세팅해줘서 생명연장을 해준다.
    • startTime으로 시작시간을 체크하고 timeout을 더한 maxWaitTime을 구하는데 이 maxWaitTime시간이 지나면 시간초과 실패를 낸다.
    • maxWaitTime시간 안이라면 (int)redisTemplate.opsForValue().get(readyCacheKey)으로 카운트를 가져와서 0인지 아닌지 체크한다.
      • 0 이면 정상적으로 완료가 된것으로 본다.
      • 0 이 아니면 period만큼 기다렸다가 다시 시도한다. 너무 많은 호출을 꺼릴때 period를 적정하게 설정해준다.
      • 중간에라도 키가 없어지면 실패를 떨어뜨린다.
  • 0이 되어 성공할때 finishCacheKey를 드디어 사용한다.

    • 모두 성공했기때문에 키를 삭제하는 과정이나 혹은 모두 성공한 시점에 한번만 호출되기를 바라는 모든것을 위해 finishCacheKey가 사용된다.
      • 여기서는 삭제시점을 finishCacheKey가 0이 되는 시점으로 잡았다.

참고

  • CountDownLatch, CylicBarrier에 대해서 한번 찾아서 보면 된다.
    • CountDownLatch는 메인스레드가 있고 다른 스레드에서 특정 카운트를 채우면 다시 메인스레드로 넘어가는 방식이고 CylicBarrier는 각 스레드에서 특정 카운트를 채우기를 대기했다가 채우면 그때 다시 진행되는 차이가 있다.