Background
Why SQS?
특징 |
스탠다드 |
FIFO |
메시지 순서 |
섞일 수 있음 |
보장 |
속도 |
무제한 |
Default : 300 msg/s ~ 3000 msg/s (batch consume: MAX 10) 아무래도 순서와 중복을 보장해야되서 처리속도가 느립니다. |
중복 |
발생가능 |
중복없음 |
지원여부 |
모두지원 |
리전별 지원 |
Msg Locking
어떤 컨슈머가 특정 메시지를 받는 순간, 해당 메시지는 락이 걸립니다.
락을 통해 다른 컨슈머가 동시에 같은 메시지를 컨슈밍 하지않도록 제어합니다.
Message Locking은 Visibility Time 파라미터를 이용해서 구현됩니다.
Visibility Time 값은 SQS에서 큐서비스를 제공하기 위한 변수입니다.
SQS 내부적으로는 카프카와 같이 분산 시스템에 데이터를 복제하여 저장하고 Visibility Time이라는 변수를 이용해서 큐를 흉내냅니다.
Delay Queues - Delivery Delay
Visibility Timeout이 메시지가 소모된 이후 다른 컨슈머에게 숨기는 시간이라면,
Delay time은 메시지가 프로듀서에 의해 도착하자마자 갖는 숨기는 시간입니다.
즉, 모든 컨슈머에 의해 처리가능한 상태가 될 때까지의 지연시간입니다.
Default: 0 ~ MAX 15 min.
Spring 에서 Spring-cloud-aws 라이브러리를 이용하여 SQS를 구현한 예제를 소개합니다.
spring-cloud-aws 개요
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | @Configuration public class SqsConfig { @Value("${aws.auth.access_key}") private String awsAccessKey; @Value("${aws.auth.secret_key}") private String awsSecretKey; @Bean public AWSCredentials credential() { return new BasicAWSCredentials(awsAccessKey, awsSecretKey); } @Bean public AWSCredentialsProvider awsCredentialsProvider() { return new AWSStaticCredentialsProvider(credential()); } @Bean public RegionProvider regionProvider() { return new RegionProvider() { @Override public Region getRegion() { return Region.getRegion(Regions.AP_NORTHEAST_2); } }; } @Bean public AmazonSQSAsync amazonSQSAsync() { return AmazonSQSAsyncClientBuilder .standard() .withCredentials(new AWSStaticCredentialsProvider(credential())) .withRegion(Regions.AP_NORTHEAST_2) .build(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | @Service public class SqsDownloadRequestProducer { private final QueueMessagingTemplate queueMessagingTemplate; @Autowired private SqsProducerQueueNameResolver queueNameResolver; @Autowired public SqsRequestProducer(AmazonSQSAsync amazonSQSAsync) { this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync); } public void send(String jobType, String message) { String queueName = queueNameResolver.getQueueName(jobType); this.queueMessagingTemplate.send(queueName, MessageBuilder.withPayload(message).build()); } } | cs |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | @Slf4j @Import({ SqsConfig.class }) @ComponentScan(basePackageClasses = { Example.class}) @Configuration public class SqsConsumerConfig { @Value("${sqs.example1.queue.name}") private String EXAMPLE1_SQS_NAME; @Value("${sqs.example2.queue.name}") private String EXAMPLE2_SQS_NAME; @Value("${sqs.consumer.auto.startup}") private boolean consumerAutoStartUp; @Autowired private AmazonSQSAsync amazonSQSAsync; @Bean public SqsListener prodSqsListener() { String example1Destination = getDestination(EXAMPLE1_SQS_NAME); String example2Destination = getDestination(EXAMPLE2_SQS_NAME); QueueAttributes example1Queue = new QueueAttributes( example1Worker() , EXAMPLE1_SQS_NAME, example1Destination); QueueAttributes example2Queue = new QueueAttributes( example2Worker() , EXAMPLE2_SQS_NAME, example2Destination); SqsListener prodSqsListener = new SqsListener(); prodSqsListener.setQueues(Lists.newArrayList(example1Queue, example2Queue)); prodSqsListener.setAmazonSqs(amazonSQSAsync); prodSqsListener.setAutoStartup(consumerAutoStartUp); return prodSqsListener; } @Bean(name = "example1Worker") public ThreadPoolTaskExecutor example1Worker() { return makePool("example1-", 50, 50); } @Bean(name = "example2Worker") public ThreadPoolTaskExecutor example2Worker() { return makePool("example2-", 40, 40); } private ThreadPoolTaskExecutor makePool(String threadNamePrefix, int corePoolSize, int maxPoolSize) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setThreadNamePrefix(threadNamePrefix); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setKeepAliveSeconds(60); // 60s return executor; } private String getDestination(String queueName) { CachingDestinationResolverProxy<String> destinationResolver = new CachingDestinationResolverProxy<>(new DynamicQueueUrlDestinationResolver(amazonSQSAsync)); return destinationResolver.resolveDestination(queueName); } } | cs |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | @Setter @Slf4j @Component public class SqsListener implements SmartLifecycle { @Autowired private MessageHandler messageHandler; private List<QueueAttributes> queues = new ArrayList<>(); private AmazonSQSAsync amazonSqs; private boolean autoStartup = false; private final AtomicBoolean isRunning = new AtomicBoolean(false); @Override public void start() { isRunning.set(true); queues.forEach(queue -> { Thread listener = new Thread(new AsynchronousMessageListener(queue)); listener.start(); }); } private class AsynchronousMessageListener implements Runnable { private final QueueAttributes queueAttributes; private AsynchronousMessageListener(QueueAttributes queueAttributes) { this.queueAttributes = queueAttributes; } @Override public void run() { while (isRunning.get()) { try{ // Wait for isAvailable thread active count issue. Thread.sleep(10); if(isAvailable(queueAttributes.getWorkerExecutor())) { ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(queueAttributes.getReceiveMessageRequest()); for (Message message : receiveMessageResult.getMessages()) { queueAttributes.getWorkerExecutor().execute(messageHandler.createHandler(queueAttributes, message)); } } } catch (InterruptedException ie ) { log.error("[SqsListener] error during sleep", ie); } catch (Exception e) { log.error("[SqsListener] An Exception occurred while polling queue '{}'." , this.queueAttributes.getQueueName(), e); } } } } private boolean isAvailable(ThreadPoolTaskExecutor workerExecutor) { // log.debug("getActiveCount1 = ", workerExecutor.getActiveCount()); int activeCnt = workerExecutor.getActiveCount(); // log.debug("getActiveCount2 = ", workerExecutor.getActiveCount()); int maxPoolSize = workerExecutor.getMaxPoolSize(); if(activeCnt < maxPoolSize) { return true; } else { return false; } } private AmazonSQSAsync getAmazonSqs() { return this.amazonSqs; } public void setAutoStartup(boolean autoStartup) { this.autoStartup = autoStartup; } @Override public boolean isAutoStartup() { return this.autoStartup; } @Override public void stop() { isRunning.set(false); } @Override public boolean isRunning() { return isRunning.get(); } @Override public void stop(Runnable callback) { this.stop(); callback.run(); } @Override public int getPhase() { return 0; } } | cs |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | @Slf4j @Setter @Component public class MessageHandler implements Runnable { @Autowired private WorkerExecution workerExecution; @Autowired private RequestConverter requestConverter; @Autowired private AmazonSQSAsync amazonSQSAsync; private QueueAttributes queueAttributes; private Message message; @Override public void run() { RequestMessageDto request = requestConverter.convert(message.getBody()); log.info("[SqsRequestConsumer] Start Consuming : {}", request); WorkflowJob workflowJob = requestConverter.convert(request.getRequestId()); if(workflowJob != null) { workerExecution.execute(workflowJob); } deleteMessage(); } protected void deleteMessage() { String receiptHandle = message.getReceiptHandle(); String queueUrl = this.queueAttributes.getReceiveMessageRequest().getQueueUrl(); amazonSQSAsync.deleteMessageAsync(new DeleteMessageRequest(queueUrl, receiptHandle)); } public MessageHandler createHandler(QueueAttributes queueAttributes, Message message) { MessageHandler messageHandler = new MessageHandler(); messageHandler.workerExecution = workerExecution; messageHandler.requestConverter = requestConverter; messageHandler.amazonSQSAsync = amazonSQSAsync; messageHandler.queueAttributes = queueAttributes; messageHandler.message = message; return fwMessageHandler; } } | cs |
1. Visibility time 이해하기
'개발 > Messaging system' 카테고리의 다른 글
Kafka Consumer Design (1) | 2018.10.31 |
---|