Using Redis Streams to implement group chat

created at 07-28-2021

First, let's look at what our instant messaging (IM) program needs to do. Think of the IM that we use the most: WeChat. It has two forms of chat:

  • Single chat (one-to-one)
  • Group chat

Single chat is not difficult in terms of data. In the simplest case, we can store each chat record in a relational database, or keep an append-only file (AOF) for each one-to-one conversation, and so on. Today our focus is on group chat. Let's continue the analysis:

  • WeChat group chat does not keep history: after you switch from one phone to another, the earlier messages are gone.
  • Group chat is many-to-many: everyone can send and receive messages.

The Redis Stream (a built-in data type since Redis 5.0) provides the functions we need. Before using a Stream, we have to understand what it is. The following is a brief description; for more detail, refer to the official Redis documentation on Streams.

You can think of a Stream as a stream of water: water flows through a pipe, and messages flow through a Redis Stream. Just as we can pour water into one end of a pipe and let it out at the other, in Redis we can put messages in at one end, which we call producing messages (that end is the producer), and take them out at the other end, which we call consuming messages (that end is the consumer).

To produce a message in a Stream, use the XADD command; to consume messages, use XREAD.
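As a rough sketch of how the two commands map onto the redis-py client: the helper names post_message and read_history and the stream name demo_stream are made up for illustration, and any object with xadd and xread methods can stand in for the client.

```python
def post_message(client, stream, sender, text):
    """Append one entry to the stream with XADD and return its ID."""
    # The entry has a single field: the sender's name maps to the text.
    return client.xadd(stream, {sender: text})


def read_history(client, stream):
    """Return every entry already in the stream with XREAD.

    Passing "0" as the ID asks for all entries with a greater ID,
    which is the whole stream. Without `block`, the call returns at once.
    """
    return client.xread({stream: "0"})


# Usage against a local server (assumes `pip install redis`):
#   from redis import Redis
#   r = Redis()
#   post_message(r, "demo_stream", "alice", "hi")
#   print(read_history(r, "demo_stream"))
```

XADD generates the entry ID itself by default, which is why the IDs in the output later in this article look like a millisecond timestamp followed by a sequence number.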

Next, we will use these two commands to implement a simple group chat conversation.

MVP: blocking version

from redis import Redis

r = Redis()  # no arguments, so this connects to the local Redis at 127.0.0.1:6379

name = input("what's your name? ")  # the name identifies the sender in the group chat
chat_stream = "my_chat_stream"  # the stream key, which plays the role of the group's name

while True:  # loop forever
    user_input = input("what you wanna say? ")  # prompt the user for a message

    r.xadd(chat_stream, {name: user_input})  # append the message to the stream

    # "$" means "only entries added after this call"; block=0 waits with no timeout
    print(r.xread({chat_stream: "$"}, block=0))  # print the next message that arrives

Run it: open a terminal and execute python main.py:

$ python main.py
what's your name? jhon
what you wanna say? hello world
[[b'my_chat_stream', [(b'1586415762129-0', {b'marry': b'nothing just kidding'})]]]
what you wanna say? hello, marry
[[b'my_chat_stream', [(b'1586415773558-0', {b'marry': b'well'})]]]

Start another terminal and execute python main.py as well:

$ python main.py
what's your name? marry
what you wanna say? nothing just kidding
[[b'my_chat_stream', [(b'1586415768566-0', {b'jhon': b'hello, marry'})]]]
what you wanna say? well

As you can see, each user sees the other's messages. The drawback is that each side has to wait for a read to complete before it can type the next message. Let's improve this.

Improvement: send and receive at the same time

When we read messages, we pass 0 as the block timeout, which means "wait until a message arrives". So if nobody posts to the group, we can only wait forever and cannot type a new message. A simpler improvement is to not wait when nothing has arrived and go straight to the next loop iteration. That has its own drawback: new messages are only checked after each input, so if you type nothing, you never see them.
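A rough sketch of that non-waiting variant follows. It assumes redis-py with its default reply format, and the helper name poll_new_entries is made up for illustration. One detail matters here: "$" means "only entries added after this call", so a non-blocking read with "$" would always come back empty. The sketch therefore remembers the ID of the last entry it saw and asks for anything newer:

```python
def poll_new_entries(client, stream, last_id):
    """Non-blocking read: return (entries, new_last_id).

    Omitting `block` makes XREAD return immediately; an empty reply
    means nothing newer than `last_id` exists yet.
    """
    reply = client.xread({stream: last_id})
    if not reply:
        return [], last_id
    _, entries = reply[0]           # we asked for exactly one stream
    return entries, entries[-1][0]  # resume after the newest ID we saw


# Hypothetical usage inside the input loop (not from the original code):
#   last_id = "0"  # "0" replays the stream from the beginning
#   while True:
#       r.xadd(chat_stream, {name: input("what you wanna say? ")})
#       entries, last_id = poll_new_entries(r, chat_stream, last_id)
#       print(entries)
```

Even with this change, the loop only polls after each input, which is the limitation described above.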

Is there a way to keep receiving and sending from interfering with each other? There is: we use threads. One thread is responsible for receiving messages, and another is responsible for sending them:

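import threading

from redis import Redis

r = Redis()

name = input("what's your name? ")
chat_stream = "my_chat_stream"


def send_msgs():
    # Runs in the main thread: read input and append it to the stream.
    while True:
        user_input = input("what you wanna say? ")
        r.xadd(chat_stream, {name: user_input})


def recv_msgs():
    # Runs in a background thread: block until messages arrive, then print them.
    last_id = "$"  # start with messages that arrive after we join
    while True:
        msgs = r.xread({chat_stream: last_id}, block=0)
        last_id = msgs[0][1][-1][0]  # resume after the newest entry, so none are skipped
        print(msgs)


if __name__ == "__main__":
    # daemon=True lets the program exit even though the receiver is blocked in xread
    threading.Thread(target=recv_msgs, daemon=True).start()
    send_msgs()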

If you run it, you will find that sending and receiving now work at the same time. There is still a shortcoming: each side also prints its own messages, and in raw form, because we print whatever we receive without processing it. The next step is to fix this:

import threading

from redis import Redis  

r = Redis()  

name = input("what's your name? ")  
chat_stream = "my_chat_stream"  


def send_msgs():
    while True:  
        user_input = input("what you wanna say? ") 
        if user_input:
            r.xadd(chat_stream, {name: user_input})  


def handle_msgs(msgs):
    # msgs has the structure: [[b'my_chat_stream', [(b'1586416610013-0', {b'jhon': b'nothing'})]]]
    for msg in msgs:  # each msg is [b'my_chat_stream', [(b'1586416610013-0', {b'jhon': b'nothing'})]]
        _, msg_list = msg  # unpack: msg_list is [(b'1586416610013-0', {b'jhon': b'nothing'})]
        for _, content in msg_list:  # unpack each entry: content is {b'jhon': b'nothing'}
            for user_name, user_input in content.items():  # user_name is b'jhon', user_input is b'nothing'
                decoded_user_name = user_name.decode()
                decoded_user_input = user_input.decode()
                if decoded_user_name == name:  # skip our own messages
                    continue

                print("[{}]: {}".format(decoded_user_name, decoded_user_input))


def recv_msgs():
    last_id = "$"  # start with messages that arrive after we join
    while True:
        msgs = r.xread({chat_stream: last_id}, block=0)  # block until new messages arrive
        last_id = msgs[0][1][-1][0]  # resume after the newest entry, so none are skipped
        handle_msgs(msgs)  # the processing logic lives in its own function to keep this loop simple


if __name__ == "__main__":
    # daemon=True lets the program exit even though the receiver is blocked in xread
    threading.Thread(target=recv_msgs, daemon=True).start()
    send_msgs()
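The chat above never needs old messages, yet XADD keeps every entry unless the stream is trimmed. One possible refinement, sketched here under the assumption that roughly the last 1000 messages are enough, is to pass maxlen to xadd. The helper name post_capped and the default limit are illustrative and not part of the original program.

```python
def post_capped(client, stream, sender, text, max_entries=1000):
    """Append a message and let Redis trim the stream to about max_entries.

    `max_entries` is an assumed limit chosen for illustration.
    """
    return client.xadd(
        stream,
        {sender: text},
        maxlen=max_entries,  # cap on the number of entries kept in the stream
        approximate=True,    # allow Redis to trim lazily, which is cheaper
    )


# Hypothetical usage in send_msgs(), replacing the plain r.xadd(...) call:
#   post_capped(r, chat_stream, name, user_input)
```

With approximate trimming the stream may briefly hold somewhat more than max_entries entries, which is harmless for a chat that does not depend on an exact history length.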

Summary

In this article, we implemented a simple group chat step by step with the help of Redis Streams. First, we used a simple infinite loop that reads input and then prints what it receives. Then we improved it so that input and output are handled separately, without interfering with each other. Finally, we refined the message-handling logic so that the output looks clean and tidy.

edited at: 07-28-2021