Receive messages from an Amazon SQS queue using Spring Boot

In this article we’ll use Spring Boot to create a listener that polls an Amazon Simple Queue Service (SQS) queue for messages. Each message contains a famous quote, which is saved to a database as it is received.

Amazon SQS and Spring Series

This article is part of a series:

  1. Create a queue in Amazon SQS
  2. Set up a Spring Boot app with Amazon SQS
  3. Send messages to an Amazon SQS queue using Spring Boot
  4. Receive messages from an Amazon SQS queue using Spring Boot

Set up the database

Prerequisites

You must have MySQL installed locally. If you need help with the installation, refer to How to install MySQL on Windows.

You can use a tool such as MySQL Workbench to run the following statements. They create a database and a user with full privileges on that database.

CREATE DATABASE spring_sqs;
CREATE USER 'spring'@'localhost' IDENTIFIED BY 'password';
GRANT ALL ON spring_sqs.* TO 'spring'@'localhost';

Back in the Spring app:

  1. Add the database connection information to application.properties.
  2. Remove the property we previously added to exclude data source auto-configuration (spring.autoconfigure.exclude).

### Database ###
spring.datasource.url=jdbc:mysql://localhost:3306/spring_sqs
spring.datasource.username=spring
spring.datasource.password=password
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
spring.jpa.hibernate.ddl-auto=validate

Create a table to store the quotes

We’re going to use Flyway, a database migration tool that uses plain SQL, to create the table that will store quotes.

In the Spring app:

  1. Go to src/main/resources/db/migration.
  2. Create a file named V1.0__create_tables.sql with the following statement.

CREATE TABLE quotes (
id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL,
text VARCHAR(400) NOT NULL,
author VARCHAR(100),
aws_message_id VARCHAR(100) UNIQUE NOT NULL,
date_received TIMESTAMP
);

Note: Flyway versioned migrations must be named V<version>__<description>.sql, with two underscores between the version and the description. Otherwise you will get an error when starting the application.

  3. Start the application. This will cause the Flyway script to run and create the table.
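
The naming rule for the migration file can be sketched as a small check. This is only an approximation of Flyway’s default convention (prefix V, a version made of digits separated by dots or underscores, a double underscore, a description, and the .sql suffix). The class and method names are hypothetical, and the regular expression is an assumption for illustration, not Flyway’s own parser.

```java
import java.util.regex.Pattern;

// Simplified check for Flyway's default versioned-migration naming:
// prefix "V", a version, the "__" separator, a description, and ".sql".
// Illustrative approximation only; this is not how Flyway itself parses names.
final class MigrationNames {

    private static final Pattern VERSIONED = Pattern.compile(
            "^V\\d+(?:[._]\\d+)*__[A-Za-z0-9_ ]+\\.sql$");

    private MigrationNames() {
    }

    // Returns true when the file name matches the simplified pattern above.
    static boolean looksLikeVersionedMigration(String fileName) {
        return VERSIONED.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String[] candidates = {
                "V1.0__create_tables.sql",  // the name used in this article
                "V1.0_create_tables.sql",   // only one underscore before the description
                "create_tables.sql"         // no version prefix
        };
        for (String name : candidates) {
            System.out.println(name + " -> " + looksLikeVersionedMigration(name));
        }
    }
}
```

A check like this could run in a unit test over the files in the migration folder, so that a misnamed script is caught before the application starts.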

Create a Quote entity

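Next, we’ll set up the entity that we’ll save to the database. The text and awsMessageId fields are required. Spring Boot’s default naming strategy maps camelCase fields such as awsMessageId to snake_case columns such as aws_message_id, so no @Column annotations are needed.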

package com.makolyte.springsqsstarter.entity;

import javax.persistence.*;
import javax.validation.constraints.NotNull;
import java.time.Instant;

@Entity
@Table(name = "quotes")
public class QuoteEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private long id;

    @NotNull
    private String text;

    private String author;

    @NotNull
    private String awsMessageId;

    private Instant dateReceived;

    //required by Hibernate
    public QuoteEntity() {
    }

    public QuoteEntity(
            @NotNull String text,
            String author,
            @NotNull String awsMessageId,
            Instant dateReceived) {
        this.text = text;
        this.author = author;
        this.awsMessageId = awsMessageId;
        this.dateReceived = dateReceived;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public String getAwsMessageId() {
        return awsMessageId;
    }

    public void setAwsMessageId(String awsMessageId) {
        this.awsMessageId = awsMessageId;
    }

    public Instant getDateReceived() {
        return dateReceived;
    }

    public void setDateReceived(Instant dateReceived) {
        this.dateReceived = dateReceived;
    }
}

Create a repository to save the message

Create the following repository to save the QuoteEntity to the database. The save method is provided by the repository by default, so we don’t need to declare it explicitly.

We’ve added one method, existsByAwsMessageId, which Spring Data JPA implements from its name. It lets us check whether the database already contains a quote with a given AWS message ID.

package com.makolyte.springsqsstarter.repository;

import com.makolyte.springsqsstarter.entity.QuoteEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface QuoteRepository extends JpaRepository<QuoteEntity, Long> {

    boolean existsByAwsMessageId(String awsMessageId);
}

Create a service to handle duplicate quotes

Messages are normally deleted from the queue once a consumer has received and processed them. However, an Amazon SQS standard queue provides at-least-once delivery, which means that on rare occasions a message can be delivered more than once.

For this reason, we need to build the app to handle duplicate quote messages. We’re going to use the AWS message ID, a unique identifier for the message that is included in the message headers, to detect duplicates. Duplicate messages will be ignored. We’ll add the logic to a service.

package com.makolyte.springsqsstarter.service;

import com.makolyte.springsqsstarter.dto.Quote;

public interface SqsService {
    void saveQuote(Quote incomingQuote,
                   String messageId,
                   String approximateFirstReceiveTimestamp);
}

package com.makolyte.springsqsstarter.service;

import com.makolyte.springsqsstarter.entity.QuoteEntity;
import com.makolyte.springsqsstarter.dto.Quote;
import com.makolyte.springsqsstarter.repository.QuoteRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.time.Instant;

@Service
public class SqsServiceImpl implements SqsService {
    private static final Logger LOG = LoggerFactory.getLogger(SqsServiceImpl.class);

    private final QuoteRepository quoteRepository;

    public SqsServiceImpl(QuoteRepository quoteRepository) {
        this.quoteRepository = quoteRepository;
    }

    @Override
    public void saveQuote(Quote incomingQuote, String messageId, String approximateFirstReceiveTimestamp) {
        if (quoteRepository.existsByAwsMessageId(messageId)) {
            LOG.warn("Quote with AWS Message ID {} already exists", messageId);
        } else {
            QuoteEntity quote = new QuoteEntity(
                    incomingQuote.getText(),
                    incomingQuote.getAuthor(),
                    messageId,
                    toInstant(approximateFirstReceiveTimestamp)
            );
            LOG.info("Saving quote with AWS Message ID {}", messageId);
            quoteRepository.save(quote);
        }
    }

    private Instant toInstant(String dateAsStr) {
        return Instant.ofEpochMilli(Long.parseLong(dateAsStr));
    }
}
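
To show the duplicate check in isolation, here is a minimal in-memory sketch of the same idea: remember each message ID and ignore any ID seen before. The class and method names are hypothetical, and the set stands in for the quotes table. It is not a replacement for the service above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for the duplicate check in saveQuote. The names are
// hypothetical; the article's service uses the database instead of a set.
final class SeenMessageIds {

    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time an ID is offered and false for repeats.
    // Set.add checks and records in one step, so two threads offering the
    // same ID cannot both get true.
    boolean markIfNew(String messageId) {
        return seen.add(messageId);
    }

    public static void main(String[] args) {
        SeenMessageIds ids = new SeenMessageIds();
        String[] deliveries = {"msg-1", "msg-2", "msg-1"};
        for (String id : deliveries) {
            String outcome = ids.markIfNew(id) ? "saved" : "ignored as duplicate";
            System.out.println(id + " -> " + outcome);
        }
    }
}
```

In the real application, the check (existsByAwsMessageId) and the insert (save) are two separate steps. If two copies of a message were processed at the same moment, both could pass the check. The UNIQUE constraint on aws_message_id is the safety net in that case, because the database rejects the second insert.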

Create an endpoint that polls the queue

We’re going to add a listener method to the SqsController. The @SqsListener annotation makes Spring Cloud AWS poll the queue and call the method for each new message. We’ll capture the AWS message ID and the received date/time from the message headers.

To learn about the layout of the message received from Amazon SQS and the headers available, see the API documentation for ReceiveMessage.

package com.makolyte.springsqsstarter.controller;

import com.makolyte.springsqsstarter.dto.Quote;
import com.makolyte.springsqsstarter.service.SqsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.bind.annotation.*;

import javax.validation.Valid;

@RestController
@RequestMapping("/sqs")
public class SqsController {
    private static final Logger LOG = LoggerFactory.getLogger(SqsController.class);
    public static final String QUOTE_QUEUE = "QuoteQueue";

    private final QueueMessagingTemplate queueMessagingTemplate;
    private final SqsService sqsService;

    public SqsController(
            QueueMessagingTemplate queueMessagingTemplate,
            SqsService sqsService) {
        this.queueMessagingTemplate = queueMessagingTemplate;
        this.sqsService = sqsService;
    }

    @PostMapping("/quotes")
    @ResponseStatus(HttpStatus.CREATED)
    public void sendQuote(@RequestBody @Valid Quote quote) {
        LOG.info("Sending quote {} to SQS", quote);
        this.queueMessagingTemplate.convertAndSend(QUOTE_QUEUE, quote);
    }

    // Called by Spring Cloud AWS for each message on the queue; this is not an HTTP endpoint,
    // so no @ResponseStatus is needed here.
    @SqsListener(QUOTE_QUEUE)
    public void receiveQuote(@Valid Quote quote,
                             @Header("MessageId") String messageId,
                             @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
        LOG.info("Received quote {} with messageId {}", quote, messageId);
        sqsService.saveQuote(quote, messageId, approximateFirstReceiveTimestamp);
    }
}
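
The ApproximateFirstReceiveTimestamp header arrives as a string holding epoch time in milliseconds. The service converts it with Long.parseLong, which throws if the value is missing or malformed. The following is a sketch of a more defensive conversion, assuming you would rather store no date than fail the whole message. The SqsTimestamps class and its method are hypothetical helpers, not part of the article’s code.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper: parses an SQS timestamp attribute (epoch milliseconds
// sent as a string) and returns an empty Optional instead of throwing.
final class SqsTimestamps {

    private SqsTimestamps() {
    }

    static Optional<Instant> parseEpochMillis(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(Instant.ofEpochMilli(Long.parseLong(value.trim())));
        } catch (NumberFormatException e) {
            // Not a number: let the caller decide how to handle it.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseEpochMillis("1585818038000"));
        System.out.println(parseEpochMillis("not-a-number"));
    }
}
```

With a helper like this, the service could call parseEpochMillis(header).orElse(null) when building the QuoteEntity, since the date_received column allows NULL.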

Send and receive a quote

Start your app and send a quote to the queue. In a few seconds, you should see in the console that the quote is received and saved to the database.


Add tests

We’re going to add unit tests for the SqsService to verify two main scenarios: 1) the quote received already exists, in which case it should not be saved to the database and 2) the quote received is a new quote and should be saved to the database.

package com.makolyte.springsqsstarter.service;

import com.makolyte.springsqsstarter.entity.QuoteEntity;
import com.makolyte.springsqsstarter.dto.Quote;
import com.makolyte.springsqsstarter.repository.QuoteRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
class SqsServiceImplTest {

    private SqsService sqsServiceImpl;

    @Mock
    private QuoteRepository quoteRepository;

    @BeforeEach
    public void setup() {
        sqsServiceImpl = new SqsServiceImpl(quoteRepository);
    }

    @Test
    public void saveQuote_quoteExists_itIsNotSaved() {
        String awsMessageId = "12345";
        when(quoteRepository.existsByAwsMessageId(awsMessageId)).thenReturn(true);
        Quote incomingQuote = mock(Quote.class);

        sqsServiceImpl.saveQuote(
                incomingQuote,
                awsMessageId,
                "1585818038");

        verify(quoteRepository, never()).save(any(QuoteEntity.class));
    }

    @Test
    public void saveQuote_quoteDoesNotExist_itIsSaved() {
        String awsMessageId = "12345";
        String receivedTimestamp = "1585818038";
        when(quoteRepository.existsByAwsMessageId(awsMessageId)).thenReturn(false);
        Quote incomingQuote = new Quote(
                "The greatest glory in living lies not in never falling, " +
                        "but in rising every time we fall.",
                "Nelson Mandela"
        );

        sqsServiceImpl.saveQuote(
                incomingQuote,
                awsMessageId,
                receivedTimestamp);

        ArgumentCaptor<QuoteEntity> quoteEntityCaptor = ArgumentCaptor.forClass(QuoteEntity.class);
        verify(quoteRepository).save(quoteEntityCaptor.capture());
        QuoteEntity quoteEntity = quoteEntityCaptor.getValue();
        assertEquals(incomingQuote.getText(), quoteEntity.getText());
        assertEquals(incomingQuote.getAuthor(), quoteEntity.getAuthor());
        assertEquals(awsMessageId, quoteEntity.getAwsMessageId());
        assertEquals(Long.parseLong(receivedTimestamp), quoteEntity.getDateReceived().toEpochMilli());
    }
}

If you’ve been working through the articles in the series, you will also need to fix the previous controller test by adding a mock for the SqsService. Here’s an excerpt of our test class highlighting the code that needs to be added.

@WebMvcTest
class SqsControllerTest {

    @Autowired
    private MockMvc mockMvc;

    @MockBean
    private QueueMessagingTemplate queueMessagingTemplate;

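    // Added: SqsController now requires an SqsService in its constructor
    @MockBean
    private SqsService sqsService;

    // ... existing tests unchanged ...
}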

I’ve decided not to add an integration test for the listener endpoint because of how difficult it would be to set up such a test.

Code

The code for this application is available on GitHub.
