Take advantage of consumer groups in Redis Streams to distribute the processing of a data stream among multiple consumers

Redis Streams is a new data structure, introduced in Redis 5.0, that allows you to create and manage data streams. In a previous article, I showed how to add data to a stream, and how to read the data in multiple ways. In this article, I’ll explain how to use consumer groups in Redis Streams. A consumer group is a way to split a stream of messages among multiple clients to speed up processing or lighten the load for slower consumers.

In a perfect world, both data producers and consumers work at the same pace, and there’s no data loss or data backlog. Unfortunately, that’s not the case in the real world. In nearly all real-time data stream processing use cases, producers and consumers work at different speeds. In addition, there is often more than one type of consumer, each with its own requirements and processing pace. Redis Streams addresses this need with a feature set that leans heavily towards supporting consumers. One of its most important features is the consumer group.

When to use a Redis Streams consumer group

The purpose of consumer groups is to scale out your data consumption process. Let’s consider one example: an image processing application. The solution requires three main components: a producer (one or more cameras, perhaps) that captures and stores images; a Redis Stream that saves the images in the order they arrive; and an image processor that processes each image.

Figure 1. Sample image processing solution with a single producer and consumer. (Image: Redis Labs)

Suppose your producer saves 500 images per second, and the image processor processes only 100 images per second at its full capacity. This rate difference creates a backlog that grows by 400 images every second, so your image processor will never be able to catch up.
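You can check the arithmetic behind this example in a few lines of Python. The sketch assumes constant rates and a stream that starts out empty. The helper names are hypothetical and are not part of any Redis API.

```python
import math


def consumers_needed(produce_rate: float, consume_rate: float) -> int:
    """Smallest number of identical consumers whose combined rate keeps pace."""
    return math.ceil(produce_rate / consume_rate)


def backlog_after(seconds: float, produce_rate: float, consume_rate: float,
                  consumers: int = 1) -> float:
    """Entries left waiting after `seconds`, assuming constant rates and an
    empty stream at the start (a simplification for illustration)."""
    surplus = produce_rate - consumers * consume_rate
    return float(max(0, surplus * seconds))


# Rates from the example: 500 images/s produced, 100 images/s per processor.
print(backlog_after(60, 500, 100))               # 24000.0 images behind after one minute
print(consumers_needed(500, 100))                # 5
print(backlog_after(60, 500, 100, consumers=5))  # 0.0
```

Five processors is only the break-even point under these idealized assumptions. In practice you might add some headroom, because any dip in a processor's throughput lets the backlog start growing again.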
An easy way to address this problem is to run five image processors (as shown in Figure 2), each processing a mutually exclusive set of images. You can achieve this through a consumer group, which enables you to partition your workloads and route them to different consumers.

Figure 2. Scaling the solution with more consumers to match the production rate. (Image: Redis Labs)

A consumer group does more than partition data. It also ensures data safety and enables disaster recovery, because it keeps track of every entry that has been delivered to a consumer but not yet acknowledged.

How a Redis Streams consumer group works

A consumer group is a data structure within a Redis Stream. As shown in Figure 3, you can think about a consumer group as a collection of lists. One of them holds the items that no consumer has received yet; for our discussion, let’s call it the “unconsumed list.” As data arrives in the stream, it is immediately pushed to the unconsumed list.

Figure 3. How a Redis Streams consumer group is structured. (Image: Redis Labs)

The consumer group maintains a separate list for each consumer, typically with an application attached. In Figure 3, our solution has N identical applications (App 1, App 2, …, App n) that read data via Consumer 1, Consumer 2, …, Consumer n, respectively. When an app reads data using the XREADGROUP command, specific data entries are removed from the unconsumed list and pushed into the pending entries list that belongs to the respective consumer. Thus, each new entry is delivered to only one consumer in the group. Finally, when the app acknowledges an entry with the XACK command, Redis removes it from the consumer’s pending entries list.

Now that I’ve explained the basics of consumer groups, let’s dig deeper into how this data lifecycle works.

Creating a Redis Streams consumer group

You can create a new consumer group using the command XGROUP CREATE, as shown below.

XGROUP CREATE mystream mygroup $ MKSTREAM

As with XREAD, the $ ID tells the group to deliver only data that arrives from that point in time forward.
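The start ID passed to XGROUP CREATE becomes the group’s last-delivered ID. XREADGROUP with > then hands out only entries whose IDs are greater than it. The Python model below illustrates that rule. It is a sketch, not a Redis client, and the helper names and short IDs such as "1-0" are hypothetical.

```python
def parse_id(entry_id: str) -> tuple[int, int]:
    # Stream IDs look like "<milliseconds>-<sequence>"; a bare "0" means "0-0".
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


def delivered_by_new_group(ids_at_creation, start, ids_added_later=()):
    """IDs a group created with XGROUP CREATE ... <start> would eventually hand
    out via XREADGROUP ... > (toy model; ignores consumers and acknowledgments)."""
    if start == "$":
        # "$": treat the stream's current last entry as already delivered.
        last_delivered = max(map(parse_id, ids_at_creation), default=(0, 0))
    else:
        last_delivered = parse_id(start)
    all_ids = list(ids_at_creation) + list(ids_added_later)
    return [e for e in all_ids if parse_id(e) > last_delivered]


existing = ["1-0", "2-0", "3-0"]  # hypothetical entries present at creation time
later = ["4-0"]                   # hypothetical entry added afterwards with XADD
print(delivered_by_new_group(existing, "$", later))    # ['4-0']
print(delivered_by_new_group(existing, "0", later))    # ['1-0', '2-0', '3-0', '4-0']
print(delivered_by_new_group(existing, "2-0", later))  # ['3-0', '4-0']
```

IDs are compared as (milliseconds, sequence) number pairs rather than as strings. With string comparison, "10-0" would incorrectly sort before "9-0".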
Instead of $, you can pass 0 or the ID of another entry in the stream. With 0, the group delivers all data from the beginning of the stream; with a specific ID, it delivers the entries that come after that ID. MKSTREAM creates the stream, mystream in this case, if it does not already exist.

Reading and managing Redis Stream data

Assume you have a Redis Stream (mystream), and you have already created a consumer group (mygroup) as shown above. You can now add items with names a, b, c, d, and e, as in the following example.

XADD mystream * name a

Running this command for names a through e will populate the Redis Stream mystream and the unconsumed list of the consumer group mygroup. This is illustrated in Figure 4.

Figure 4. A new Redis Streams consumer group called mygroup. (Image: Redis Labs)

Here you can see that consumers Alice and Bob haven’t started their jobs yet. App A consumes data through the consumer Alice, while App B consumes data through Bob.

Consuming Redis Streams data

The command to read data from a group is XREADGROUP. In our example, when App A starts processing data, it reads as the consumer Alice and asks for up to two entries:

XREADGROUP GROUP mygroup Alice COUNT 2 STREAMS mystream >

Similarly, App B reads data as Bob:

XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >

The special ID > at the end tells Redis Streams to fetch only entries that have never been delivered to any consumer in the group. Because each entry goes to only one consumer, these reads move data from the unconsumed list to Alice and Bob, as shown in Figure 5.

Figure 5. Reading data to the consumer in a Redis Streams consumer group. (Image: Redis Labs)

Removing processed messages from pending entries lists

The data in your consumers’ pending entries lists will remain there until App A and App B acknowledge to Redis Streams that they have successfully consumed it. This is done using the command XACK.
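The lifecycle in this section has three steps. XADD fills the unconsumed list, XREADGROUP with > moves entries into a consumer’s pending entries list, and XACK removes them. The toy Python class below models that flow. It follows the article’s mental model rather than Redis internals: Redis tracks a last-delivered ID per group instead of storing a separate unconsumed list. The class, its method names, and the sequential IDs are all hypothetical.

```python
from collections import OrderedDict


class ToyConsumerGroup:
    """Hypothetical in-memory sketch of the consumer-group picture.

    Not a Redis client: it only mimics which entries XREADGROUP (with >)
    and XACK would move between the lists described in the text.
    """

    def __init__(self):
        self.unconsumed = OrderedDict()  # entry_id -> fields, never delivered yet
        self.pending = {}                # consumer -> OrderedDict(entry_id -> fields)
        self._seq = 0

    def xadd(self, fields):
        # Hypothetical IDs "1-0", "2-0", ...; real Redis derives them from the clock.
        self._seq += 1
        entry_id = f"{self._seq}-0"
        self.unconsumed[entry_id] = fields
        return entry_id

    def xreadgroup_new(self, consumer, count):
        # ">" semantics: take the oldest never-delivered entries and record
        # them in this consumer's pending entries list.
        pel = self.pending.setdefault(consumer, OrderedDict())
        batch = []
        while self.unconsumed and len(batch) < count:
            entry_id, fields = self.unconsumed.popitem(last=False)
            pel[entry_id] = fields
            batch.append((entry_id, fields))
        return batch

    def xack(self, *entry_ids):
        # Acknowledgment is scoped to the group: remove each ID from whichever
        # pending list holds it and return how many were removed.
        removed = 0
        for entry_id in entry_ids:
            for pel in self.pending.values():
                if pel.pop(entry_id, None) is not None:
                    removed += 1
                    break
        return removed


group = ToyConsumerGroup()
for name in "abcde":
    group.xadd({"name": name})

print(group.xreadgroup_new("Alice", 2))  # [('1-0', {'name': 'a'}), ('2-0', {'name': 'b'})]
print(group.xreadgroup_new("Bob", 2))    # [('3-0', {'name': 'c'}), ('4-0', {'name': 'd'})]
print(group.xack("1-0", "2-0"))          # 2
print(list(group.pending["Alice"]), list(group.pending["Bob"]))  # [] ['3-0', '4-0']
```

XACK names a group, not a consumer, so the sketch searches every pending list when acknowledging. Anything Bob has read but not acknowledged stays in his list until someone acknowledges or claims it.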
Suppose App A has finished consuming d and e, which have the IDs 1526569411111-0 and 1526569411112-0. It acknowledges them as follows:

XACK mystream mygroup 1526569411111-0 1526569411112-0

The combination of XREADGROUP and XACK is analogous to starting a transaction and committing it. An entry leaves the pending entries list only after the application confirms that it was processed, so a crash between the two steps does not lose data.

After running XACK, let’s assume App A executed XREADGROUP again, as shown below. Now the data structure looks like Figure 6.

XREADGROUP GROUP mygroup Alice COUNT 2 STREAMS mystream >

Figure 6. The Redis Streams consumer group after XACK and XREADGROUP by App A. (Image: Redis Labs)

Recovering from failures

If App B terminated due to a failure while processing b and c, the data structure would look like Figure 7.

Figure 7. App B hasn’t acknowledged with the XACK command. (Image: Redis Labs)

Now you are left with two options:

1. Restart App B and reload the data from the consumer (Bob). In this case, App B must read data as Bob using the XREADGROUP command, but with one difference: instead of > at the end, App B passes 0 (or the ID of the last entry it has already processed). Remember that > delivers new data from the unconsumed list to the consumer. Any other ID returns entries from the consumer’s own pending entries list whose IDs are greater than the one given.

XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream 0

The above command retrieves data entries that are already stored in Bob’s pending entries list. It will not fetch new data from the unconsumed list. App B can iterate through all of Bob’s pending entries this way before it starts fetching new data with >.

2. Force Alice to claim all the data from Bob and process it via App A. This is particularly helpful if you cannot recover App B due to a node, disk, or network failure. In such cases, any other consumer (such as Alice) can claim Bob’s data and continue processing it, thus preventing service downtime. Claiming Bob’s data takes two commands. The first lists Bob’s pending entries:

XPENDING mystream mygroup - + 10 Bob

This fetches up to 10 of Bob’s pending entries. For each one it reports how long the entry has been idle and how many times it has been delivered. The - and + arguments select the entire ID range.
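Both recovery paths work on a consumer’s pending entries. The Python sketch below models them under simplifying assumptions:

- A pending entry records only its owner and when it was last delivered.
- Times are plain millisecond integers that you pass in.
- Delivery counts are omitted.
- The class and method names and the short IDs are hypothetical.

`history` mirrors XREADGROUP with an explicit ID, `pending` mirrors the XPENDING range form, and `claim` mirrors XCLAIM’s minimum-idle-time check.

```python
from dataclasses import dataclass


def parse_id(entry_id: str) -> tuple[int, int]:
    # "<milliseconds>-<sequence>"; a bare "0" is shorthand for "0-0".
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


@dataclass
class PendingEntry:
    owner: str
    delivered_at_ms: int  # restarts whenever the entry is (re)assigned


class ToyPendingEntries:
    """Hypothetical model of one group's pending entries (no Redis calls)."""

    def __init__(self) -> None:
        self.entries: dict[str, PendingEntry] = {}

    def deliver(self, entry_id: str, consumer: str, now_ms: int) -> None:
        # What XREADGROUP ... > records when it hands an entry to a consumer.
        self.entries[entry_id] = PendingEntry(consumer, now_ms)

    def history(self, consumer: str, after_id: str = "0", count: int = 10) -> list[str]:
        # Like XREADGROUP with an explicit ID: only this consumer's own pending
        # IDs greater than after_id; undelivered data is never touched.
        floor = parse_id(after_id)
        mine = [e for e, p in self.entries.items()
                if p.owner == consumer and parse_id(e) > floor]
        return sorted(mine, key=parse_id)[:count]

    def pending(self, consumer: str, now_ms: int, count: int = 10) -> list[tuple[str, str, int]]:
        # Like XPENDING key group - + <count> <consumer>: (ID, owner, idle ms).
        return [(e, consumer, now_ms - self.entries[e].delivered_at_ms)
                for e in self.history(consumer, "0", count)]

    def claim(self, new_owner: str, min_idle_ms: int, ids: list[str], now_ms: int) -> list[str]:
        # Like XCLAIM: skip entries idle for less than min_idle_ms; claimed
        # entries change owner and their idle clock restarts.
        claimed = []
        for e in ids:
            p = self.entries.get(e)
            if p is None or now_ms - p.delivered_at_ms < min_idle_ms:
                continue
            self.entries[e] = PendingEntry(new_owner, now_ms)
            claimed.append(e)
        return claimed


pel = ToyPendingEntries()
pel.deliver("2-0", "Bob", now_ms=1_000)  # b
pel.deliver("3-0", "Bob", now_ms=1_000)  # c
print(pel.history("Bob", "0"))            # ['2-0', '3-0']
print(pel.pending("Bob", now_ms=61_000))  # [('2-0', 'Bob', 60000), ('3-0', 'Bob', 60000)]
print(pel.claim("Alice", 120_000, ["2-0", "3-0"], now_ms=61_000))  # [] (idle only 60 s)
print(pel.claim("Alice", 0, ["2-0", "3-0"], now_ms=61_000))        # ['2-0', '3-0']
print(pel.history("Bob", "0"), pel.history("Alice", "0"))          # [] ['2-0', '3-0']
```

For option 1, a restarted App B would call `history` repeatedly, passing the last ID it received as `after_id`, until it gets an empty list back.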
The second command, XCLAIM, reassigns pending entries to another consumer. If b and c had the IDs 1526569411113-0 and 1526569411114-0 respectively, the command that moves Bob’s data to Alice is as follows:

XCLAIM mystream mygroup Alice 0 1526569411113-0 1526569411114-0

Consumer groups maintain a running clock for every entry in a pending entries list. For example, when App B reads b, the clock starts and keeps running until App B acknowledges b with XACK. The fourth argument of XCLAIM is a minimum idle time in milliseconds. The consumer group moves only entries that have been idle longer than that, which protects entries that another consumer is still actively working on. Passing 0, as in the example above, claims the entries regardless of idle time. Claiming an entry also resets its idle clock. The result of these commands is illustrated in Figure 8. XCLAIM also comes in handy when one of your consumer processes is slow, resulting in a backlog of unprocessed data.

Figure 8. Alice claimed all of the data from Bob. (Image: Redis Labs)

In the previous article, we covered the basics of how to use Redis Streams. We went a bit deeper in this article and explained when to use consumer groups and how they work. Consumer groups in Redis Streams reduce your burden when it comes to managing data partitions, their lifecycles, and data safety. Plus, the scale-out capabilities of consumer groups can benefit many real-time applications.

In a forthcoming third article on Redis Streams, I will demonstrate how to develop a real-time classification application using Redis Streams and Lettuce, a Java-based open source library for Redis. Meanwhile, you can learn more by working through the Redis Streams tutorial on the Redis project website.

Roshan Kumar is a senior product manager at Redis Labs. He has extensive experience in software development and technology marketing. Roshan has worked at Hewlett-Packard and many successful Silicon Valley startups including ZillionTV, Salorix, Alopa, and ActiveVideo. As an enthusiastic programmer, he designed and developed mindzeal.com, an online platform hosting computer programming courses for young students.
Roshan holds a bachelor’s degree in computer science and an MBA from Santa Clara University.

New Tech Forum provides a venue to explore and discuss emerging enterprise technology in unprecedented depth and breadth. The selection is subjective, based on our pick of the technologies we believe to be important and of greatest interest to InfoWorld readers. InfoWorld does not accept marketing collateral for publication and reserves the right to edit all contributed content. Send all inquiries to newtechforum@infoworld.com.