Redis Streams Tutorial

Redis introduced Streams in version 5.0. I had never used them, or any kind of stream, in a project. In all honesty, I always thought of streams as queues, and wondered why Redis even added them when Redis Lists can already be used as queues. OK, apparently I was wrong.

I recently started reading about Streams, and about Apache Kafka as well, and I thought playing with Redis Streams might be a good starting point, simply because I already have Redis installed and Redis kind of copied a lot of the Streams functionality from Kafka.

Hopefully this tutorial will be updated with more examples and instructions from time to time, but I am going to start with something basic.

Difference between Queue and Stream

Queues

A queue is a data structure that usually represents a container of tasks or messages that need to be processed. Usually, when an item (task or message) is consumed from the queue, it is acknowledged and removed from the queue.

That's why queues are not usually supposed to keep messages for a long time, nor should they reserve much space for messages or care much about durability in most cases. Of course, when workers stop working, that's another story, but that's not our topic for today.

Because we are talking about Redis, it's worth mentioning that antirez, the original author of Redis, created a project called disque, an in-memory database for queues. You can watch this 21-minute talk for an introduction to the project. The project reused a lot of the Redis components written in C. It didn't get the attention it deserved and never managed to match Redis's popularity. It's not maintained anymore, but I thought it was worth mentioning.

OK, so to summarize, queues are supposed to represent a small list of tasks that need to be processed. In the best case, this list is only a few items long until the workers catch up, process those few items, and delete them.
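To make the "consumed means gone" idea concrete, here is a minimal in-memory sketch. It is not Redis; it only uses Python's deque, and the task names are made up for illustration.

```python
from collections import deque

# Hypothetical in-memory queue; the task names are invented for this sketch.
queue = deque(["task-1", "task-2", "task-3"])

def consume(q):
    """Pop the oldest item; once popped, no other worker can see it."""
    return q.popleft() if q else None

first = consume(queue)    # worker A takes "task-1"
second = consume(queue)   # worker B takes "task-2", never "task-1"
remaining = list(queue)   # only "task-3" is still waiting
```

Once both workers have taken an item, only one task is left in the queue, and nothing remembers the two that were consumed.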

Streams

The best way to think of a stream is to imagine it as an append-only log of messages. Producers write messages to the stream, and these messages are persisted for some time, regardless of whether consumers read them or not. But here is the catch: two (consumer groups) can read the same messages, which is useful in many cases. The stream database (Redis in this case) helps each consumer group remember the last message it read, so that it can hand over the next one. So the stream database doesn't only save the stream messages, but also the pagination state of the consumers, to help them page through the messages without having to remember any pagination token on their side.

I mentioned (consumer groups) here without defining them. A consumer group is a group of consumers (duh), or workers, that split the messages between them. In other words, two consumers in the same group never receive the same message. That's useful when the messages need to be processed once, but you need more than one worker to do the job.

Let's explain this with an example. You have a stream of 10 messages: [1,2,3,4,5,6,7,8,9,10]

Suppose we have a (consumer group) A that consists of three consumers (workers): a1, a2, a3. In this case a1 might read the messages [1, 4, 8, 10], a2 might read the messages [2, 5, 6, 9], and a3 will read the rest of the messages. But if we had another consumer group B that consists of only one worker, b1, then b1 would read all the messages from 1 to 10. Consumer group B has no idea about consumer group A.

As you can see, this is different from queues, where messages are removed and cannot be read by anyone else.
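The example above can be sketched with a toy model in plain Python. This is not how Redis implements streams; the ToyStream class and its method names are made up for illustration, and the workers of group A simply take turns, whereas in Redis whichever consumer asks first gets the next message.

```python
from collections import defaultdict

class ToyStream:
    """Append-only log plus one read cursor per consumer group."""

    def __init__(self):
        self.log = []                       # messages are never removed on read
        self.cursors = defaultdict(int)     # group name -> index of next message
        self.delivered = defaultdict(list)  # (group, consumer) -> messages seen

    def add(self, message):
        self.log.append(message)

    def read(self, group, consumer):
        """Give `consumer` the next message nobody in `group` has received yet."""
        position = self.cursors[group]
        if position >= len(self.log):
            return None
        self.cursors[group] = position + 1
        message = self.log[position]
        self.delivered[(group, consumer)].append(message)
        return message

stream = ToyStream()
for n in range(1, 11):
    stream.add(n)

# Group A: three workers take turns, so each message goes to exactly one of them.
for i in range(10):
    stream.read("A", ["a1", "a2", "a3"][i % 3])

# Group B: a single worker sees the whole stream, independently of group A.
while stream.read("B", "b1") is not None:
    pass
```

The point of the sketch is the per-group cursor: the log itself is shared and untouched by reads, and each group only advances its own position.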

Depending on the project's scope, streams are supposed to keep messages in the database for some time. Maybe your project needs the messages to be kept for 24 hours. Redis supports capping the stream at X messages. We will see how to use it later.

Code Example

My intention was to write a small, concise tutorial, so let's write some code. I will start with a Python script that fills the stream with a lot of messages.

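import datetime
import redis

# Connect to a local Redis server on the default port.
r = redis.Redis(host='localhost', port=6379, db=0)

# Add 999 messages; "*" lets Redis generate each ID, and maxlen with
# approximate=True trims the stream to roughly 500 entries.
for i in range(1, 1000):
    r.xadd("mystream", {"value": i, "time": datetime.datetime.now().isoformat()}, id="*", maxlen=500, approximate=True)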

This script uses the Redis Streams command XADD to add elements to the stream mystream, while keeping the size of the stream at approximately 500. Of course, in real-life projects you might want to limit the messages by age instead, like 24 hours.
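For the time-based limit, one possible approach is trimming by minimum ID, since auto-generated stream IDs start with a millisecond timestamp. The sketch below is only an assumption of how that could look: the helper names are made up, the client is assumed to be a redis-py redis.Redis instance whose xtrim accepts a minid argument, and the server is assumed to be Redis 6.2 or newer, where MINID trimming is available.

```python
import datetime

def min_id_for_max_age(max_age, now=None):
    """Return the smallest stream ID to keep, as a millisecond timestamp string."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - max_age
    return str(int(cutoff.timestamp() * 1000))

def trim_older_than(client, stream, max_age):
    """Ask Redis to drop entries whose ID is older than `max_age`."""
    # `client` is assumed to be a redis.Redis instance from redis-py;
    # approximate=True keeps the cheap "~" trimming used earlier in the post.
    return client.xtrim(stream, minid=min_id_for_max_age(max_age), approximate=True)
```

It could be called as trim_older_than(r, "mystream", datetime.timedelta(hours=24)). Note that the cutoff is computed from the client's clock while the IDs come from the server's clock, so the two machines need reasonably synchronized time.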

Now let's use the command XLEN to see how many messages are in the stream mystream:

redis-cli XLEN mystream

In my case, I get 598. You might get a different result. That's because we chose approximate trimming when adding new elements, which lets Redis remove entries only in whole internal blocks, so the length can stay somewhat above 500.

Now let's create a consumer group A that reads messages from the stream, using the command XGROUP:

redis-cli XGROUP CREATE mystream A 0

The 0 parameter here means that the group starts reading from the beginning of the stream. You could have passed $ instead to read only the messages added after the group was created.

We used this command to create a (consumer group), but we don't need to run any explicit command to add members to this group. They are created implicitly the first time they read.
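The same step can be done from Python. Here is a minimal sketch, assuming the client is a redis-py redis.Redis instance; the ensure_group helper is made up for illustration. Running XGROUP CREATE twice for the same group makes Redis reply with a BUSYGROUP error, so the helper treats that case as "already there".

```python
def ensure_group(client, stream, group, start_id="0"):
    """Create `group` on `stream`; return False if it already existed."""
    # `client` is assumed to be a redis.Redis instance from redis-py.
    try:
        client.xgroup_create(stream, group, id=start_id)
        return True
    except Exception as exc:
        # Redis replies with a BUSYGROUP error when the group already exists.
        if "BUSYGROUP" in str(exc):
            return False
        raise
```

It could be called as ensure_group(r, "mystream", "A"). The start_id is passed explicitly because redis-py defaults to "$" when it is omitted, which is not what the command above does.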

Now let's run these commands in an interactive redis-cli session to read messages as the consumers a1, a2, and a3 of group A.

XREADGROUP GROUP A a1 COUNT 1 STREAMS mystream >
XREADGROUP GROUP A a2 COUNT 1 STREAMS mystream >
XREADGROUP GROUP A a3 COUNT 1 STREAMS mystream >

The > here is a special ID that means "messages that were never delivered to any other consumer in the same group". Consumers can also pass the ID of a specific message instead. Hopefully I will expand on this part in the future.

You can see that each one of them returned a new message, because that's how consumer groups are supposed to work.
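A Python version of the same read might look like the sketch below. It assumes the client is a redis-py redis.Redis instance using the default reply format, where xreadgroup returns a list of [stream name, messages] pairs; the read_new helper is made up for illustration.

```python
def read_new(client, group, consumer, stream, count=1):
    """Return up to `count` (id, fields) pairs not yet delivered to `group`."""
    # `client` is assumed to be a redis.Redis instance from redis-py.
    # ">" asks only for messages never delivered to this group before.
    reply = client.xreadgroup(group, consumer, {stream: ">"}, count=count)
    entries = []
    for _stream_name, messages in reply or []:
        entries.extend(messages)
    return entries
```

Calling read_new(r, "A", "a1", "mystream") and then the same with "a2" and "a3" should hand each consumer a different message, like the three commands above. An empty list means the group has caught up with the stream.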

Now if we repeat the same steps for a consumer group (B), you will see that it reads the same messages that consumer group (A) has already read.

TODO

I will expand this post in the near future with more instructions about how to acknowledge messages.
