개발/Messaging system

SQS with Spring

BEBONG 2018. 10. 31. 23:35

Background

이전 아티클에서 RabbitMQ 를 Kafka 로 마이그레이션하다가 비즈니스 성격상 맞지 않아 중단했습니다. 
그래서 새로운 대체 메시징 시스템이 필요했습니다. 
결과적으로 SQS를 사용하게됬는데,  그 경험을 정리하고자 합니다.  


Why SQS?

먼저, 왜 여러 MQ 구현체 중에 SQS 를 선택했을까?
>> SaaS(Software as a Service)
기존에 사용하던 솔루션은  SaaS가 아니였습니다. 기존에는 RabbitMQ 클러스터를 구성하고 유지하는 비용을 모두 감당해야했습니다. 
RabbitMQ에 요청이 늘고, 줄을 때마다 클러스터를 구성하는 인스턴스 늘려주는 작업도 필요하고, 인스턴스 관리도 필요합니다. 
하지만 SQS는 AWS에서 SaaS로 제공되어, 유지보수는 AWS가 알아서 해줍니다. 
또한 SQS 는 생각보다 요금도 저렴합니다. 매월 100만건에 대해서는 요금을 부과하지 않습니다. 
물론 가장 중요한 어플리케이션에 필요한 기능을 제공해주기도 합니다.

따라서 효율적인 면과, 관리적인 면에서 SQS로 마이그레이션 하기로 결정하였습니다. 



SQS Features
어떤 서비스든 각각 장단점이 있다고 생각합니다


SQS Queue Types 
SQS에서 제공하는 큐 타입은 스탠다드, FIFO 두가지가 있고, 특징은 아래와 같습니다. 
FIFO가 300 msg/s 속도제한은 있지만 저의 경우 이정도면 충분했습니다. 하지만 아직 일부 리전만 지원됩니다.
결국 어쩔 수 없이, FIFO가 지원되지 않는 리전이기 때문에, 스탠다드 큐로 구현하기로 결정했습니다. 

 특징

스탠다드 

 FIFO

 메시지 순서

섞일 수 있음 

보장 

속도 

무제한 

 Default : 300 msg/s ~ 3000 msg/s (batch consume: MAX 10)

아무래도 순서와 중복을 보장해야되서 처리속도가 느립니다.

 중복

발생가능 

중복없음 

지원여부 

모두지원 

리전별 지원 



Payload Size & 요금
요금은 Request 단위입니다.
메시지 Payload는 S3 에 저장되며, SQS는 레퍼런스를 전달합니다. 
하나의 메시지는 256KB 까지 전달 가능합니다.
64KB를 "chunk" 청크 라고 부르며 1개의 요청에 해당합니다. 즉, 한번의 API로 256K 메시지를 전달하면 결국 4개의 요청에 해당하여 4개 Request에 대한 요금을 지불합니다.


Polling Time - Long polling / Short polling
메시지를 받을때까지 대기하는 시간입니다.
1~20 초 사이에서 설정 가능합니다.
SQS 도 내부적으로 Push방식이 아니라 Poll 방식입니다. 
따라서 Poll 메소드를 얼마나 자주 호출하냐에 따라 Short / Long 폴링으로 구분됩니다.
Poll 메소드안에서 오래 머물 경우 Long 폴링 이라고 합니다.
대부분의 경우 Long polling 이 적합하며, Long polling 을 사용한다면 컨슈머마다 개별 스레드를 가져가는 것이 좋습니다.
Short polling은 하나의 스레드로 여러 큐를 처리할 경우 적합합니다. 
왜냐하면 하나의 큐가 Long polling이면 그 큐에서 많이 머무르므로 다른 큐가 처리가 안되기 때문입니다. 



Msg Locking 

어떤 컨슈머가 특정 메시지를 받는 순간, 해당 메시지는 락이 걸립니다. 

락을 통해 다른 컨슈머가 동시에 같은 메시지를 컨슈밍 하지않도록 제어합니다. 

Message Locking은 Visibility Time 파라미터를 이용해서 구현됩니다. 

Visibility Time 값은 SQS에서 큐서비스를 제공하기 위한 변수입니다. 

SQS 내부적으로는 카프카와 같이 분산 시스템에 데이터를 복제하여 저장하고 Visibility Time이라는 변수를 이용해서 큐를 흉내냅니다. 



Visibility Time / Retention Time
** Visibility Timeout : Default 30sec ~ MAX 12h  => 12시간 내에 처리 가능해야 함을 의미합니다. 
Visibility Time동안 다른 컨슈머는 해당 메시지를 볼 수 없습니다. 즉, 해당 메시지는 스킵하고 볼수 있는 다음 메시지를 처리합니다. 
** Retention Time : MAX 14 days => visibility time과는 다른 속성으로, 클러스터에 메시지가 남아있는 시간을 의미합니다. 

분산시스템에서 송신자가 수신자가 메시지를 처리했는지 확인하기는 힘듭니다. 
따라서 SQS 에서는 컨슈머가 메시지를 처리하고 큐에서 지우는 역할을 합니다. 
메시지가 컨슈머에 의해 처리되는 동안, 다른 컨슈머는 처리중인 메시지를 또 수신하면 안되므로, 메시지를 가립니다. 
Visibility Timeout = N 으로 설정된 시간 동안, 만약 해당 메시지를 가져간 특정 컨슈머가 메시지를 처리하지 못하면,
해당 메시지는 다른 컨슈머에게 노출되서 다른 컨슈머가 처리할 수 있게됩니다. 




Delay Queues - Delivery Delay

Visibility Timeout이 메시지가 소모된 이후 다른 컨슈머에게 숨기는 시간이라면,

Delay time은 메시지가 프로듀서에 의해 도착하자마자 갖는 숨기는 시간입니다. 

즉, 모든 컨슈머에 의해 처리가능한 상태가 될 때까지의 지연시간입니다.

Default: 0 ~ MAX 15 min.




Serializer
네트워크를 통해 데이터를 전송할 때는 Serializer가 필요합니다. 
SQS로 데이터를 송/수신할 때 데이터를 읽거나 쓸때 사용하는 Serializer 를 지정할 수 있습니다. 
Serializer에는 Byte Serializer / JSON Serializer 두 가지 종류가 있습니다. 
단, Byte Serializer 를 사용할 때는 주의해야할 점이 있습니다. 
- 바이트 시리얼라이저는, 필드가 추가되거나 삭제되면 에러를 발생시킬 수 있습니다.
- 따라서 배포때 메시지 필드가 추가/삭제되면 문제가 될 수 있습니다.
- 또한 발행된 메시지가 큐에서 제대로 발행되었는지 확인이 불가능합니다.
따라서 보통 JSON Serializer 를 사용합니다.





SQS with Spring

Spring 에서 Spring-cloud-aws 라이브러리를 이용하여 SQS를 구현한 예제를 소개합니다. 


spring-cloud-aws 개요

spring-cloud에서 제공하는 라이브러리입니다. 
Spring에서 aws지원하기 위한 REST API를 제공해줍니다. 
Producer는 Spring-cloud-aws에서 기본적으로 제공하는 방식을 사용해도 대부분 문제가 없습니다. 
대부분의 메시징 시스템이 그러하듯, 프로듀서는 간단합니다. 
하지만 Consumer의 경우 메시지를 어떻게 처리하냐에 따라, 비즈니스의 특성에 따라 성격이 많이 달라집니다. 
병렬로 메시지를 처리해야되는지, 순차적으로 처리해야되는지..등 
제가 구현해야될 어플리케이션에서는 기본으로 제공되는 컨슈머의 기능만으로는 제한이 있어 별도로 구현하였습니다. 
기본으로 제공되는 컨슈머는 워커-스레드 모델에서 워커의 수를 확인하지 않고 SQS에 메시지가 있을때마다 컨슈밍해와서 문제가 있습니다. 따라서 isAvailiable이란 메소드를 추가하여 워커의 수가 여유있을 때만 메시지를 컨슈밍하도록 구현했습니다. 


SQS Config
아래와 같이 SQS와 커넥션을 얻기 위한 설정 빈을 생성합니다.
물론, 해당 빈은 스프링 빈 컨텍스트 설정 객체에서 @Import 해줍니다. 
설정은 특별한게 없습니다. 프라퍼티만 바꾸고 거의 그대로 사용해도 됩니다.

이중에서 amazonSQSAsync 빈이 SQS 통신을 하기 위해 사용됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class SqsConfig {
   @Value("${aws.auth.access_key}")
   private String awsAccessKey;
   @Value("${aws.auth.secret_key}")
   private String awsSecretKey;
   @Bean
   public AWSCredentials credential() {
      return new BasicAWSCredentials(awsAccessKey, awsSecretKey);
   }
   @Bean
   public AWSCredentialsProvider awsCredentialsProvider() {
      return new AWSStaticCredentialsProvider(credential());
   }
   @Bean
   public RegionProvider regionProvider() {
      return new RegionProvider() {
         @Override
         public Region getRegion() {
            return Region.getRegion(Regions.AP_NORTHEAST_2);
         }
      };
   }
   @Bean
   public AmazonSQSAsync amazonSQSAsync() {
      return AmazonSQSAsyncClientBuilder
         .standard()
         .withCredentials(new AWSStaticCredentialsProvider(credential()))
         .withRegion(Regions.AP_NORTHEAST_2)
         .build();
   }
}




SQS Producer
위에서 설정한 AmazonSQSAsync  빈을 이용해서 메시지 템플릿을 생성하고 템플릿을 이용해서 메시지를 생성합니다. 

프로듀서도 설정과 마찬가지로 일반적인 내용이기 때문에 별도로 설명이 필요 없습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class SqsDownloadRequestProducer {
   private final QueueMessagingTemplate queueMessagingTemplate;
   @Autowired
   private SqsProducerQueueNameResolver queueNameResolver;
 
   @Autowired
   public SqsRequestProducer(AmazonSQSAsync amazonSQSAsync) {
      this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
   }
 
   public void send(String jobType, String message) {
      String queueName = queueNameResolver.getQueueName(jobType);
      this.queueMessagingTemplate.send(queueName, MessageBuilder.withPayload(message).build());
   }
}
cs



Check Message in SQS
위에까지 하면 이제 큐에 성공적으로 발행이 가능합니다. 
queueNameResolver는 jobType에 맞게 알맞는 큐 이름을 리턴하도록 만들면 됩니다. 
아래와 같이 AWS SQS 로 들어가면 큐의 내용을 볼 수 있습니다. 물론 큐 는 가장먼저 생성해놓아야 합니다. 
큐를 생성할 때는 Deadletter를 위한 큐 DLQ(Dead Letter Queue)를 먼저 생성해놓아야 큐를 생성할때 DLQ를 지정할 수 있습니다. 

**주의!
이때 주의할 점은, 큐 안의 메시지를 대쉬보드에서 폴링하게되면, 소모됩니다. 
즉, 다른 컨슈머가 못보므로 실제 운영환경에서는 조심해야 합니다. 
그리고 또한 Serializer가 ByteSerializer인 경우 내용을 봐도 알아볼 수 없습니다.




SQS Consumer Config
아래 코드는 컨슈머 설정 코드입니다. 
스프링에서 MQ 관련 프로그래밍을 해보신 분이라면 매우 비슷해서 빨리 적응할 것 같습니다. 
** QueueAtributes
QueueAtributes 객체는 하나의 큐를 처리하기 위한 정보를 가지고 있습니다. 
이번에는 example1, example2라는 두 개의 큐를 병렬로 컨슈밍하며 병렬로 처리합니다.
따라서 example1 큐용 워커스레드풀인 example1Worker, example2큐용 워커스레드풀인 example2Worker를 별도로 가져갑니다.
각각의 큐를 처리하는데 필요한 스레드풀이나, 큐정보는 QueueAttributes 객체가 담당합니다. 
따라서 QueueAtributes 도 두 개 생성되며, 이들은 리스너에 전달됩니다.
다시한번, QueueAttributes 가 큐에 대한 특징을 모두 갖고있습니다. 큐 이름, 큐를 담당하는 워커 스레드 풀 정보를 갖습니다. 

리스너에서는 start()라이프사이클 메소드에서 QueueAttributes를 반복문으로 돌며 각 큐를 처리하는 리스너 스레드를 생성합니다. 
리스너 스레드는 그저 메시지를 폴링하는 역할이고, 폴링된 메시지를 처리하는 역할은 QueueAttributes에 할당된 워커 스레드가 담당합니다. 
워커스레드가 있을 때만 메시지를 폴링해야 하기 때문에 isAvailable()이라는 메소드가 컨슈머코드에 추가됬습니다. 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
@Import({ SqsConfig.class })
@ComponentScan(basePackageClasses = { Example.class})
@Configuration
public class SqsConsumerConfig {
   @Value("${sqs.example1.queue.name}")
   private String EXAMPLE1_SQS_NAME;
   @Value("${sqs.example2.queue.name}")
   private String EXAMPLE2_SQS_NAME;
   @Value("${sqs.consumer.auto.startup}")
   private boolean consumerAutoStartUp;
   @Autowired
   private AmazonSQSAsync amazonSQSAsync;
 
   @Bean
   public SqsListener prodSqsListener() {
      String example1Destination = getDestination(EXAMPLE1_SQS_NAME);
      String example2Destination = getDestination(EXAMPLE2_SQS_NAME);
      QueueAttributes example1Queue = new QueueAttributes( example1Worker() , EXAMPLE1_SQS_NAME, example1Destination);
      QueueAttributes example2Queue = new QueueAttributes( example2Worker() , EXAMPLE2_SQS_NAME, example2Destination);
      SqsListener prodSqsListener = new SqsListener();
      prodSqsListener.setQueues(Lists.newArrayList(example1Queue, example2Queue));
      prodSqsListener.setAmazonSqs(amazonSQSAsync);
      prodSqsListener.setAutoStartup(consumerAutoStartUp);
      return prodSqsListener;
   }
 
   @Bean(name = "example1Worker")
   public ThreadPoolTaskExecutor example1Worker() {
      return makePool("example1-"5050);
   }
 
   @Bean(name = "example2Worker")
   public ThreadPoolTaskExecutor example2Worker() {
      return makePool("example2-"4040);
   }
 
 
   private ThreadPoolTaskExecutor makePool(String threadNamePrefix, int corePoolSize, int maxPoolSize) {
      ThreadPoolTaskExecutor executor =  new ThreadPoolTaskExecutor();
      executor.setThreadNamePrefix(threadNamePrefix);
      executor.setCorePoolSize(corePoolSize);
      executor.setMaxPoolSize(maxPoolSize);
      executor.setKeepAliveSeconds(60); // 60s
      return executor;
   }
 
   private String getDestination(String queueName) {
      CachingDestinationResolverProxy<String> destinationResolver = new CachingDestinationResolverProxy<>(new DynamicQueueUrlDestinationResolver(amazonSQSAsync));
      return destinationResolver.resolveDestination(queueName);
   }
}
cs




SQS Consumer
** isAutoStartup
컨슈머 입니다. SmartLifeCycle 인터페이스를 이용해서 빈으로 등록될 경우 isAutoStartup 이  true일 때만 start()를 실행하도록 합니다. 
이렇게 설정한 이유는 운영환경에서 큐 시스템을 배포할 때는, autoStartup을 false로 하고, 배포 단계에 맞춰서 큐를 실행시키고 중지시키는 과정이 필요하기 때문입니다. 

Start에서 위에 설정으로 등록한 example1, example2 큐를 반복문으로 돌면서 컨슈머 스레드를 실행시킵니다. 
또한 각 컨슈머 스레드는 동시에 처리할 수 있는 메시지 개수를 각각 지정할 수 있도록 하였습니다. 
각 컨슈머는 비동기로 메시지를 최대 지정한 개수 N개 까지 가져와서 별도의 워커 스레드로 처리합니다. 
메시지 큐 컨슈밍 모델에는 컨슈머 수를 늘리는 멀티 컨슈머모델과 워커스레드 모델이있습니다. 
지금 이것이 워커-스레드모델에 해당합니다. 

** isAvailable
isAvailable 메소드는 남아있는 워커 스레드가 있을 때만 SQS에서 메시지를 가져오기 위해 추가됬습니다. 
이게 없으면 SQS 에서 비동기로 메시지를 가져오기 때문에 메시지가 있을 때마다 가져와서 메모리의 스레드 작업 큐에 쌓아버립니다. 
이렇게되면 종료할 경우 메모리가 모두 날라가거나 스레드 잡 큐가 모두 찰 경우가 발생할 수 있어 메시지는 온전히 SQS 에서 대기하도록 구현하였습니다. 


Sleep before consume
Thread.sleep(10) 을 매번 주는 이유도 있습니다. 
이건 isAvailable 에서 현재 동작중인 스레드를 카운트하는 로직과 관련이 깊습니다. 

getActiveCount() 메소드는 해당 Executor 에서 현재 실행중인 스레드 개수를 가져옵니다. 
근데 생각해보면 스레드의 상태는 언제고 변할 수 있기 때문에 정확한 수치를 가져오기가 매우 힘듭니다. 
가져온다고 해도 가져오는 시점과 가져와서 사용하는 시점사이에 변화가 생길 수 있습니다. 

Thread.sleep(10) 을 제거하고 getActiveCount1, getActiveCount2  debug 로그를 찍어봅니다. 
그러면 드물게 getActiveCount2가 getActiveCount1 보다 1이 더 큰 경우가 발생합니다.
이는 activeCount를 구하는 그 짧은 시간 동안 스레드 상태가 바뀌어서 발생하는 것입니다. 
execute 를 실행하면 스레드 잡이 submit 되고 submit된 잡이 실행되야 activeCount에 카운트 됩니다. 
따라서 sleep이 없을 경우 submit되고 바로 다음 activeCount를 구하게 되는데, 그 중간에 submit된 잡이 실행됨으로써 activeCount에 오차가 발생할 수 있습니다. 
이런 오차때문에 가끔 워커스레드 개수가 N개로 제한되어도 N+1개가 실행되는 현상이 발생합니다. 

sleep(10)을 거는 이유는 activeCount 구하는 시간이 1ms도 안나오지만, 넉넉히 주어서 오차를 없애기 위함입니다.
다시한번 QueueAtributes 에 워커 스레드 정보를 포함하여 큐 정보를 모두 포함하는 것에 주목합니다. 
QueueAtributes 객체에 큐 컨슈밍에 필요한 모든 정보를 포함함으로써, 리스너는 단순히 QueueAtributes객체에서 정보를 가져다가 실행하기만 합니다. 
그래서 리스너 스레드를 보면, QueueAtributes에서 워커스레드를 가져다가 사용하며, 큐정보도 QueueAtributes에서 가져다가 사용합니다.  이렇게 구조를 잡은 덕분에, QueueAtributes 에 또 다른 큐 정보를 넣으면 여러 큐를 동시에 컨슈밍할 수도 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
 
@Setter
@Slf4j
@Component
public class SqsListener implements SmartLifecycle {
   @Autowired
   private MessageHandler messageHandler;
   private List<QueueAttributes> queues = new ArrayList<>();
   private AmazonSQSAsync amazonSqs;
   private boolean autoStartup = false;
   private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
@Override
   public void start() {
      isRunning.set(true);
      queues.forEach(queue -> {
         Thread listener = new Thread(new AsynchronousMessageListener(queue));
         listener.start();
      });
   }
 
   private class AsynchronousMessageListener implements Runnable {
      private final QueueAttributes queueAttributes;
 
      private AsynchronousMessageListener(QueueAttributes queueAttributes) {
         this.queueAttributes = queueAttributes;
      }
 
      @Override
      public void run() {
         while (isRunning.get()) {
            try{
               // Wait for isAvailable thread active count issue.
               Thread.sleep(10);
               if(isAvailable(queueAttributes.getWorkerExecutor())) {
                  ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(queueAttributes.getReceiveMessageRequest());
                  for (Message message : receiveMessageResult.getMessages()) {
                     queueAttributes.getWorkerExecutor().execute(messageHandler.createHandler(queueAttributes, message));
                  }
               }
            } catch (InterruptedException ie ) {
               log.error("[SqsListener] error during sleep", ie);
            } catch (Exception e) {
               log.error("[SqsListener] An Exception occurred while polling queue '{}'."
                  , this.queueAttributes.getQueueName(),  e);
            }
         }
      }
   }
 
   private boolean isAvailable(ThreadPoolTaskExecutor workerExecutor) {
      // log.debug("getActiveCount1 = ",  workerExecutor.getActiveCount());
      int activeCnt = workerExecutor.getActiveCount();
      // log.debug("getActiveCount2 = ",  workerExecutor.getActiveCount());
      int maxPoolSize = workerExecutor.getMaxPoolSize();
      if(activeCnt < maxPoolSize) {
         return true;
      } else {
         return false;
      }
   }
   private AmazonSQSAsync getAmazonSqs() {
      return this.amazonSqs;
   }
   public void setAutoStartup(boolean autoStartup) {
      this.autoStartup = autoStartup;
   }
   @Override
   public boolean isAutoStartup() {
      return this.autoStartup;
   }
   
   @Override
   public void stop() {
      isRunning.set(false);
   }
   @Override
   public boolean isRunning() {
      return isRunning.get();
   }
   @Override
   public void stop(Runnable callback) {
      this.stop();
      callback.run();
   }
   @Override
   public int getPhase() {
      return 0;
   }
}
 
cs



Message Handler
핸들러는 비즈니스 로직을 처리합니다. 
컨슈머가 컨슈밍한 메시지는 메시지 핸들러에게 전달합니다. 
QueueAtributes는 큐정보,스레드정보 등 큐처리에 관한정보를 갖습니다. 
컨슈머는 QueueAtributes를 이용해서 메시지를 읽기만 합니다.  
읽은 메시지의 처리는 핸들러가 담당합니다. 
workerExecution빈은 비즈니스 로직이 담긴 빈이므로 신경쓰지 않아도 됩니다.
 ( * 이 빈은 비동기 플랫폼인 아카(AKKA)액터로 구현되어있는데, 나중에 아카에 대한 포스팅도 할 예정입니다. ) 

** deleteMessage
비동기로 메시지를 컨슈밍해서 처리했기 때문에 deleteMessage()를 호출해서 메시지를 SQS에서 제거해줘야 합니다. 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
@Setter
@Component
public class MessageHandler implements Runnable {
   @Autowired
   private WorkerExecution workerExecution;
   @Autowired
   private RequestConverter requestConverter;
   @Autowired
   private AmazonSQSAsync amazonSQSAsync;
   private QueueAttributes queueAttributes;
   private Message message;
 
   @Override
   public void run() {
      RequestMessageDto request = requestConverter.convert(message.getBody());
      log.info("[SqsRequestConsumer] Start Consuming : {}", request);
      WorkflowJob workflowJob = requestConverter.convert(request.getRequestId());
      if(workflowJob != null) {
         workerExecution.execute(workflowJob);
      }
      deleteMessage();
   }
 
   protected void deleteMessage() {
      String receiptHandle = message.getReceiptHandle();
      String queueUrl = this.queueAttributes.getReceiveMessageRequest().getQueueUrl();
      amazonSQSAsync.deleteMessageAsync(new DeleteMessageRequest(queueUrl, receiptHandle));
   }
 
   public MessageHandler createHandler(QueueAttributes queueAttributes, Message message) {
      MessageHandler messageHandler = new MessageHandler();
      messageHandler.workerExecution = workerExecution;
      messageHandler.requestConverter = requestConverter;
      messageHandler.amazonSQSAsync = amazonSQSAsync;
      messageHandler.queueAttributes = queueAttributes;
      messageHandler.message = message;
      return fwMessageHandler;
   }
}
 
cs


Fail-over , Duplicate 처리
** Re-queue
MQ 는 잘 처리되지 못한 메시지에 대해 Re-queue 기능을 제공해줍니다. 
하지만 SQS 는 정말 심플하여… Re-queue기능은 제공하지 않습니다. 
FIFO가 아닌 경우 중복 방지 또한 visibility time을 이용해서 비완전한 기능을 제공합니다.
** 중복처리
Visibility time이 오버되면 또 한번 컨슈머에 의해 처리될 수 있으므로 중복 방지 로직도 추가해 줘야 합니다. 

다이어그램을 보면서 Visibility time에 대해 이해하고,
어떻게 Re-queue, Duplicate 를 처리해야되는지 순차적으로 알아봅니다.

1. Visibility time 이해하기

복잡한 오류처리를 하기 전 먼저 가장 간단하게 visibility time 을 이해해봅니다.
첫번째 다이어그램은 visibility time을 이해하기 위함입니다. 
아래처럼 Visibility time = 10min으로 설정된 경우 폴링 스레드는 1번 메시지를 소모한 뒤,
deleteMessage ACK를 호출하지 않으면 10분 뒤 visibility time이 끝나서 1번 메시지를 또 소모합니다.

그리고 비동기이기 때문에 두 워커스레드에 의해 나뉘어서 처리되고 ACK 또한 두번갑니다.
ACK 자체는 두번가도 idempotent (멱등) 하기 때문에 상관없습니다.




2. 중복처리
** 상태를 저장해서 중복방지
워커스레드에서 DB를 이용해서 메시지의 처리 상태를 저장하여 중복을 방지할 수 있습니다. 
메시지를 하나의 워커스레드가 받으면, DB에 상태를 처리중으로 업데이트 합니다. 
다른 워커 스레드가 동일한 메시지를 받으면 ACK를 보내지 않고 다음 메시지를 소모합니다. 
자기가 처리하는 메시지가 아니기 때문에, ACK를 보내면 안됩니다. 기존에 처리하던 워커스레드에서 실패할 수도 있기 때문입니다.)


3. 메시지 처리시간이 너무 긴 경우 => DLQ 
** 처리가 잘 되었는데도 메시지가 DLQ로
Visibility time에 비해 process time이 너무 긴 경우 처리가 잘 되더라도 DLQ 로 메시지가 전달될 수 있습니다. 
아래 다이어그램을 보면, 컨슈머가 메시지를 컨슈밍 할 때마다 프로듀써는 메시지 수신 카운터를 증가시킵니다. 
그리고 그 다음 메시지를 전송할 때 메시지 수신 카운터 N이 설정된 수치를 초과하면,  메시지는 데드레러 큐(DLQ)로 빠집니다. 
그리고 컨슈머는 비동기로 메시지를 처리하기때문에, 컨슈머가 오랜시간 뒤에 에크를 잘 리턴해도 메시지는 그 사이에 여러번 시도되어 DLQ로 빠질 수 있습니다.


 



회고
현재 SQS 를 이용해서 시스템을 운영중입니다.
하루에 시간당 많게는 000건, 적게는 000건을 처리하는 시스템임에도 이슈가 한번도 없을 정도로 안정적입니다. 
SaaS라 관리할 필요도 없어서 매우 편리합니다. 문제가 생기면 AWS에 Case Open을 통해 문의하기만 하면 됩니다.
라이브러리 또한 잘 제공되고 있어 개발하기도 편합니다. 
또한, 무엇보다 월 100만건에 대해 무료인 점이, 작은 규모로 큐 서비스를 제공하고자 하는 시스템에서 큰 장점이 됩니다. 

RabbitMQ보다는 조금 더 Simple하여 Re-queue / 중복방지 처리를 구현해야 하는 불편함이 있지만,
구조가 심플하기 때문에 확장성이 좋아 FIFO가 아닌 일반 스탠다드 큐의 경우 거의 무한대에 가까운 메시지 처리속도를 제공합니다. 
RabbitMQ 보다는 확장성에서 더 뛰어난지는 테스트가 필요하지만, 
카프카와 같은 개념이라면, RabbitMQ보다 Dumb pipe라 간단하기 때문에 확장성이 더 좋은 것 같습니다.
만약, 그렇지 않더라도 확장은 AWS에서 SaaS로 제공해주는 것이기 때문에, AWS가 생각할 일입니다.
엄청.. 많은 양의 요청을 처리해야한다면 먼저 AWS에 Case open을 통해 문의해보는 것을 권장합니다. 

정리하자면, RabbitMQ보다는 확장성이 좋고, 카프카보다 큐 서비스에 비슷한 서비스가 필요하다면 SQS도 괜찮은 것 같습니다. 
카프카 보다는 기존에 사용하던 큐 서비스에 조금더 가까운 기능을 제공하여, 마이그레이션이 수월했습니다. 
또한 SaaS로써 큐를 따로 관리할 필요가 없는 것도, 개발인력을 줄일 수 있어 큰 장점이 됩니다. 


그밖에...
** JMS
spring-cloud-aws 를 이용하기 전에 JMS를 고려했었습니다. 
JMS란 메시징 표준으로 자바에서 네트워크를 통해 데이터를 송수신할 때 사용하는 API 명세입니다.
SQS에서 기존에 JMS를 구현한 큐 서비스가 있을 경우 쉽게 마이그레이션 해주기 위해 JMS 통합 API를 제공합니다.
즉, 다른 큐 서비스여도 JMS 를 구현했으면 SQS로 쉽게 마이그레이션이 가능합니다.
따라서 어떤 큐 서비스든지 JMS를 이용해서 구현해놓는 것이 표준을 지킨다는 점에서 좋고,
그 외에도 JMS에서 제공하는 세션객체 등을 사용함으로써 부가적인 이점도 있습니다.

하지만!
Spring-cloud-aws는 REST API를 이용합니다. 
JMS는 메시지 프로토콜로, SQS에서 REST API를 제공한다면, 
메시징 표준보다 REST API가 더 넓은 범위의 통신규약이므로 REST API를 사용하는것이 더 좋습니다. 
또한, JMS는 지금 많이 사용 안한다고 합니다. 그래서 Spring-cloud-aws를 사용하기로 결정했습니다.