Python 實作簡易訊息佇列服務


建立時間: 2023年2月21日 02:45
更新時間: 2023年2月21日 03:44

說明

最近閱讀《Python 非同步設計|使用 Asyncio》看到了一個不錯的訊息佇列服務範例,對於初學者來說可能會有很多沒看過的語法,不過內容還是相當有趣,想要了解非同步程式設計訊息佇列服務的人,值得一看。

本篇不會詳細說明每行程式在做什麼,而是會大概說明每個腳本的用途,有興趣的讀者們可以購買此書或上網查詢。

系統說明

我先簡單說明這個服務會有兩種使用者,一種是發佈者,另一種是訂閱者,訂閱者會訂閱某個頻道,發佈者則是代表某個頻道發布訊息給訂閱者的用戶,而不管是發送訊息或是接收訊息都會由伺服器處理,這裡的伺服器只是一個 Python 腳本而已,不用擔心。

目錄結構

因為需要在終端下指令,為了方便,等等實作請把檔案放在同一個資料夾底下

  • Chapter4/
    • message_proto.py
    • mq_client_listen.py
    • mq_client_sender.py
    • mq_server_plus.py

訊息協定

message_proto.py

from asyncio import StreamReader
from asyncio import StreamWriter


async def read_message(stream: StreamReader) -> bytes:
    size_bytes = await stream.readexactly(4)
    size = int.from_bytes(size_bytes, byteorder='big')
    data = await stream.readexactly(size)

    return data


async def send_message(stream: StreamWriter, data: bytes):
    size_bytes = len(data).to_bytes(4, byteorder='big')
    stream.writelines([size_bytes, data])
    await stream.drain()

這個腳本是用來處理訊息的,無論是發佈者或是訂閱者,發送訊息和接收訊息都會用這個腳本裡面的方法, size_bytes 就是資料長度,這個是自己定義的,所以可以看到發送訊息和接收訊息是相似的。

伺服器

更具體來說這個是訊息代理器

mq_server_plus.py

from asyncio import CancelledError
from asyncio import IncompleteReadError
from asyncio import Queue
from asyncio import StreamReader
from asyncio import StreamWriter
from asyncio import create_task
from asyncio import run
from asyncio import sleep
from asyncio import start_server
from collections import defaultdict
from collections import deque
from contextlib import suppress
from message_proto import read_message
from message_proto import send_message

SUBSCRIBERS: defaultdict[bytes, deque] = defaultdict(deque)
SEND_QUEUES: defaultdict[StreamWriter, Queue] = defaultdict(Queue)
CHANNEL_QUEUES: dict[bytes, Queue] = {}


async def client(reader: StreamReader, writer: StreamWriter):
    peername = writer.get_extra_info('peername')
    subscribe_channel = await read_message(reader)
    SUBSCRIBERS[subscribe_channel].append(writer)
    send_task = create_task(send_client(writer, SEND_QUEUES[writer]))
    print(f'Remote {peername} subscribed to {subscribe_channel}')

    try:
        while channel_name := await read_message(reader):
            data = await read_message(reader)

            if channel_name not in CHANNEL_QUEUES:
                CHANNEL_QUEUES[channel_name] = Queue(maxsize=10)
                create_task(channel_sender(channel_name))

            await CHANNEL_QUEUES[channel_name].put(data)
    except CancelledError:
        print(f'Remote {peername} connection cancelled')
    except IncompleteReadError:
        print(f'Remote {peername} disconnected')
    finally:
        print(f'Remote {peername} closed')
        await SEND_QUEUES[writer].put(None)
        await send_task
        del SEND_QUEUES[writer]
        SUBSCRIBERS[subscribe_channel].remove(writer)


async def send_client(writer: StreamWriter, queue: Queue):
    while True:
        try:
            data = await queue.get()
        except CancelledError:
            continue

        if not data:
            break

        try:
            await send_message(writer, data)
        except CancelledError:
            await send_message(writer, data)

    writer.close()
    await writer.wait_closed()


async def channel_sender(name: bytes):
    with suppress(CancelledError):
        while True:
            writers = SUBSCRIBERS[name]

            if not writers:
                await sleep(1)
                continue

            if name.startswith(b'/queue'):
                writers.rotate()
                writers = [writers[0]]

            if not (message := await CHANNEL_QUEUES[name].get()):
                break

            for writer in writers:
                if not SEND_QUEUES[writer].full():
                    print(f'Sending to {name}: {message[:19]}...')
                    await SEND_QUEUES[writer].put(message)


async def main(*args, **kwargs):
    server = await start_server(*args, **kwargs)

    async with server:
        await server.serve_forever()

try:
    run(main(client, host='127.0.0.1', port=25000))
except KeyboardInterrupt:
    print('Bye!')

這個伺服器是書中已經優化的版本,書中總共有兩個版本,一個是原版,另一個是優化後的 plus 版。

main() 為主要程式的入口,相當於伺服器開始運作。每當有一個用戶端(發佈者或訂閱者)跟伺服器連線時就會觸發一次 client()send_client() 為發送訊息給用戶端的方法, channel_sender() 為接收頻道的方法,這麼做的用意是不用因為接收訊息太慢,整個塞車。

傾聽器

mq_client_listen.py

from argparse import ArgumentParser
from asyncio import IncompleteReadError
from asyncio import open_connection
from asyncio import run
from message_proto import read_message
from message_proto import send_message
from uuid import uuid4


class Args():
    host: str
    port: int
    listen: str


async def main(args: Args):
    me = uuid4().hex[:8]
    print(f'Starting up {me}')
    reader, writer = await open_connection(args.host, args.port)
    print(f'I am {writer.get_extra_info("sockname")}')
    channel = args.listen.encode()
    await send_message(writer, channel)

    try:
        while data := await read_message(reader):
            print(f'Received by {me}: {data[:20]}')

        print('Connection closed.')
    except IncompleteReadError:
        print('Server closed.')
    finally:
        writer.close()
        await writer.wait_closed()

if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('--host', type=str, default='localhost')
    parser.add_argument('--port', type=int, default=25000)
    parser.add_argument('--listen', type=str, default='/toptic/foo')

    try:
        run(main(parser.parse_args()))
    except KeyboardInterrupt:
        print('Bye!')

這就是訂閱者,傾聽訂閱的頻道的訊息。

傳送器

mq_client_sender.py

from argparse import ArgumentParser
from asyncio import CancelledError
from asyncio import sleep
from asyncio import open_connection
from asyncio import run
from itertools import count
from message_proto import send_message
from uuid import uuid4


class Args():
    channel: str
    host: str
    interval: float
    port: int
    size: int


async def main(args: Args):
    me = uuid4().hex[:8]
    print(f'Starting up {me}')
    _, writer = await open_connection(args.host, args.port)
    print(f'I am {writer.get_extra_info("sockname")}')
    channel = b'/null'
    await send_message(writer, channel)
    target_channel = args.channel.encode()

    try:
        for i in count():
            await sleep(args.interval)
            data = b'X' * args.size or f'Message {i} from {me}'.encode()

            try:
                await send_message(writer, target_channel)
                await send_message(writer, data)
            except OSError:
                print('Connection ended.')
                break
    except CancelledError:
        writer.close()
        await writer.wait_closed()

if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('--channel', default='/toptic/foo', type=str)
    parser.add_argument('--host', default='localhost', type=str)
    parser.add_argument('--interval', default=1, type=float)
    parser.add_argument('--port', default=25000, type=int)
    parser.add_argument('--size', default=0, type=int)

    try:
        run(main(parser.parse_args()))
    except KeyboardInterrupt:
        print('Bye!')

這就是發佈者,頻道發佈訊息出去給伺服器,伺服器會發送訊息給傾聽器(訂閱者),讓傾聽器接收到訊息。

輸出

這裡會開多個終端,因為每個終端開啟後會一直運行,直到你結束行程為止。

注意 $ 後面的是指令,^C 是 control + C 組合鍵,用來終止行程。

終端1

$ python mq_server_plus.py
Remote ('127.0.0.1', 56215) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 56220) subscribed to b'/queue/blah'
Remote ('127.0.0.1', 56225) subscribed to b'/null'
Sending to b'/queue/blah': b'Message 0 from fe10'...
Sending to b'/queue/blah': b'Message 1 from fe10'...
Sending to b'/queue/blah': b'Message 2 from fe10'...
Sending to b'/queue/blah': b'Message 3 from fe10'...
^C(手動終止)
Remote ('127.0.0.1', 56220) connection cancelled
Remote ('127.0.0.1', 56220) closed
Remote ('127.0.0.1', 56225) connection cancelled
Remote ('127.0.0.1', 56225) closed
Remote ('127.0.0.1', 56215) connection cancelled
Remote ('127.0.0.1', 56215) closed
Bye!

終端2

python mq_client_listen.py --listen /queue/blah
Starting up 2270068c
I am ('127.0.0.1', 56215)
Received by 2270068c: b'Message 1 from fe10c'
Received by 2270068c: b'Message 3 from fe10c'
Received by 2270068c: b'Message 5 from fe10c'
Received by 2270068c: b'Message 7 from fe10c'
Received by 2270068c: b'Message 9 from fe10c'
Server closed.

這是第1個傾聽者。

終端3

python mq_client_listen.py --listen /queue/blah
Starting up b63cfb7a
I am ('127.0.0.1', 56220)
Received by b63cfb7a: b'Message 0 from fe10c'
Received by b63cfb7a: b'Message 2 from fe10c'
Received by b63cfb7a: b'Message 4 from fe10c'
Received by b63cfb7a: b'Message 6 from fe10c'
Received by b63cfb7a: b'Message 8 from fe10c'
Server closed.

這是第2個傾聽者。

終端4

python mq_client_sender.py --channel /queue/blah
Starting up fe10cc27
I am ('127.0.0.1', 56225)
Connection ended.

這是第一個發佈者。

結論

非同步程式設計較為複雜,考慮的事情也較多,無法以單向思維的方式思考,這也是我自己第一次使用 Python 實作訊息服務,體驗相當不錯。

參考

觀看次數: 872
asyncioclientmessagepythonqueueserver
按讚追蹤 Enjoy 軟體 Facebook 粉絲專頁
每週分享資訊技術

一杯咖啡的力量,勝過千言萬語的感謝。

支持我一杯咖啡,讓我繼續創作優質內容,與您分享更多知識與樂趣!