
How to Manage Queues in SQL Database Efficiently

The idea of a queue is very straightforward: a sender puts messages in, and a receiver takes them out. The delivery semantics still matter, though.

There are three semantic options:

  • At-most-once;
  • At-least-once;
  • Exactly-once.

At-most-once is also known as fire-and-forget. It is appropriate for analytics systems or notification delivery, where lost messages do no harm. Because there is no need to track whether a message was delivered or even processed, this approach keeps performance at a higher level than the other semantics.

At-least-once – the message is not marked as read, and stays that way, until it has been successfully processed, so after a failure it will be delivered again. Because it is straightforward and highly reliable (paired with idempotent message handling), this semantic is the most widely used.

Exactly-once – the message is delivered (and processed) no less and no more than once. It is very hard to achieve, and most message brokers do not offer this capability. It requires synchronization between the sender and the receiver and atomic processing on the receiver’s side: for instance, running the message handler and the operation that marks the message as read inside a single ACID transaction.

Systems that need exactly-once or at-least-once delivery are the typical use case for the solution described below; another is the Transactional Outbox pattern. A failover job, also known as a dead-letter queue, is another frequent case: here it is crucial not to lose messages while processing continues after a failure.

Although the concept I implement below is not original and has been discussed before, I am sharing my practical experience and outlining the variations and trade-offs. Every choice involves compromises, so in each situation you should lean on what you and your team already know. However, if you decide to use the database as a queue, the implementation will probably end up close to what I show in this article.

Implementation

Message Sender

A straightforward SQL INSERT is enough to send a message to a queue in a SQL database. In its most basic form it might look like this:

private final ObjectMapper objectMapper;
private final SimpleJdbcInsert jdbcInsert;

public void send(String topic, T message) {
    try {
        MessageEntity entity = new MessageEntity();
        entity.setTopic(topic);
        // serialize the payload; JSON here, but any format would do
        entity.setPayload(objectMapper.writeValueAsString(message));
        entity.setCreated(Instant.now().toEpochMilli());
        // a plain INSERT into the queue table
        jdbcInsert.execute(new BeanPropertySqlParameterSource(entity));
    } catch (Exception e) {
        log.error("Unable to send the message {}", message, e);
        throw new RuntimeException(e);
    }
}
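
The SimpleJdbcInsert above has to be pointed at the queue table. A minimal sketch of that wiring, assuming an already configured dataSource and a messages table with topic, payload, and created columns plus a generated id (the article does not show the schema):

SimpleJdbcInsert jdbcInsert = new SimpleJdbcInsert(dataSource)
        .withTableName("messages")
        .usingColumns("topic", "payload", "created")
        .usingGeneratedKeyColumns("id");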

To send a message we need the following:

  • The destination – a topic.
  • The message itself, serialized in any practical way. JSON is used here because it is a human-readable text format, but a binary format might be more space-efficient.
  • The creation date, which helps when investigating problems.

To support scheduled or delayed messages, you can add a field holding the earliest date at which the message may be processed. Sending the message in the same transaction as the business logic gives the “send” operation full ACID guarantees.
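
For example, a minimal sketch (the order-related names are purely illustrative) of sending a message inside the business transaction:

@Transactional
public void placeOrder(Order order) {
    orderRepository.save(order);                // business write
    messageSender.send("order-created", order); // same transaction: the order and
                                                // the message commit or roll back together
}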

Message Receiver

Running multiple instances of the application, to scale or to avoid a single point of failure, is common practice. As a result, we must prevent several consumers from picking up the same message at once, and a lock is the most effective way to do this.

You have a choice between two options, depending on how much time is needed to process the message:

  1. Lock the database row with SELECT ... FOR UPDATE – this works for short transactions (no longer than a couple of seconds), because the lock is held only while the transaction is open, so the message must be marked as completed or deleted within that same transaction.
  2. Update the row, setting a status column to IN_PROGRESS, and use that value as the lock. Start the processing, then finish it by setting the status to DONE or deleting the row. This involves several transactions, and the intermediate state can be lost because of a server restart or an error, so the lock should have an expiration time that is refreshed while the message is being handled. Remembering fencing tokens and other distributed-locking nuances is also important (see the sketch right after this list).
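
Assuming PostgreSQL and two extra columns, status and lock_expires_at, which are not part of the schema used in the rest of this article, the claim step of the second option might look like this:

// Claim one message by flipping its status and setting a lock expiry; an
// expired IN_PROGRESS lock is treated as free so a crashed consumer does not
// block the queue forever (column names are illustrative).
private static final String CLAIM =
        "UPDATE messages SET status = 'IN_PROGRESS', lock_expires_at = :expiresAt " +
        "WHERE id = (SELECT id FROM messages " +
        "            WHERE topic = :topic " +
        "              AND (status = 'NEW' " +
        "                   OR (status = 'IN_PROGRESS' AND lock_expires_at < :now)) " +
        "            FOR UPDATE SKIP LOCKED LIMIT 1) " +
        "RETURNING *";

Processing then happens outside the claiming transaction, and for long-running handlers the lock_expires_at value has to be pushed forward periodically.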

I put the first choice into practice below. It’s a straightforward approach that would be suitable for the vast majority of use cases for SQL-based queues.

private static final String SELECT =
        "SELECT * FROM messages WHERE topic = :topic FOR UPDATE SKIP LOCKED LIMIT 1";
private static final String DELETE =
        "DELETE FROM messages WHERE id = :id";

private final DataClassRowMapper<MessageEntity> rowMapper =
        new DataClassRowMapper<>(MessageEntity.class);

private final String topic;
private final Class<T> messageType;
private final MessageHandler<T> handler;

private final ObjectMapper objectMapper;
private final NamedParameterJdbcOperations jdbcOperations;

@Transactional
public void processMessage() {
    try {
        // fetch and lock at most one unlocked message for this topic
        List<MessageEntity> messages =
                jdbcOperations.query(SELECT, Map.of("topic", topic), rowMapper);
        if (messages.isEmpty()) {
            return;
        }
        MessageEntity messageEntity = messages.get(0);
        String payload = messageEntity.getPayload();
        // deserialize the payload and hand it to the business handler
        T message = objectMapper.readValue(payload, messageType);
        handler.handle(message);
        // on success, remove the message; the delete commits together with
        // the handler's work when the transaction ends
        jdbcOperations.update(DELETE, Map.of("id", messageEntity.getId()));
    } catch (Throwable e) {
        // on failure the row is left in place and will be picked up again later
        log.error("Unable to handle message of {}", topic, e);
    }
}
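
The MessageHandler used above is not shown in the article; it is simply the callback that carries the business logic. It might look like this minimal sketch:

public interface MessageHandler<T> {
    // invoked for each fetched message; throwing an exception leaves the row
    // in the table, so the message will be retried later
    void handle(T message);
}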

Let’s take a closer look at the fetching query.

SELECT * FROM messages WHERE topic = :topic FOR UPDATE SKIP LOCKED LIMIT 1

FOR UPDATE, as stated above, takes a lock on the selected row that is held for the duration of the active transaction. SKIP LOCKED tells the database to skip rows already locked by other transactions, so the query immediately returns an unlocked row, or an empty result if every matching row is locked or nothing matches the WHERE clause.

Only one message is meant to be fetched at a time, so LIMIT 1 restricts both the number of results and the number of rows a single transaction can lock at once.

Because the row stores the message in serialized form, the payload must be deserialized before it is passed to the handler. After successful handling, the code above deletes the row and commits the transaction. Deleting processed messages leaves no trace, which can make investigations harder; an alternative is to keep the row and mark it with a flag and a handling timestamp.
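
A minimal sketch of that alternative (the processed and processed_at columns are assumptions, and the fetching query would also need an AND processed = FALSE condition):

private static final String MARK_DONE =
        "UPDATE messages SET processed = TRUE, processed_at = :now WHERE id = :id";

// replaces the DELETE after successful handling
jdbcOperations.update(MARK_DONE,
        Map.of("now", Instant.now().toEpochMilli(), "id", messageEntity.getId()));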

processMessage should be called in a loop, with whatever optimizations are needed, such as reducing the number of handlers when the queue is empty or increasing it to absorb peak load.
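
One simple way to drive that loop is a scheduled poller, sketched below; consumer stands for the receiver class shown above, and the delay value and property name are illustrative:

@Scheduled(fixedDelayString = "${queue.poll-delay-ms:500}")
public void poll() {
    // handles at most one message per call; an empty queue simply means
    // nothing happens until the next poll
    consumer.processMessage();
}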

Conclusion

I keep saying that unconventional solutions should not become a problem for a growing project. The key is to evaluate the expected load and choose a solution that can handle it. Below is a list of the solution’s benefits and drawbacks.

Advantages

  • The database’s well-understood ACID guarantees make writing and reading messages simple; failures and server restarts are nothing to fear.
  • You can always query the table directly without affecting how messages are handled.
  • With a database-backed queue there is no additional system to learn or maintain.
  • Features such as delayed messages are quick to add.

Disadvantages

  • Low throughput. The database can be scaled with read-only replicas, but for a queue all reads and writes must go to the primary node. Memory usage and the number of connections therefore have to be strictly limited, because the sender and the receiver, along with every other database user, compete for the same resources.
  • The load on the queue table consists of frequent inserts and deletions (or soft deletions), and rows stay locked during processing, which leads to either long transactions or complicated locking schemes. With cloud-hosted databases this can translate into very high maintenance costs.