资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

【从0开始Python开发实战】Python集成Active

目录:

成都创新互联公司-专业网站定制、快速模板网站建设、高性价比南康网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式南康网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖南康地区。费用合理售后完善,十载实体公司更值得信赖。

1. Python集成ActiveMQ

2. 封装服务mq_service.py

3. 接收处理消息mq_listener.py

4. 启动消息监听服务mq.py

5. 单元测试test_mq_serivce.py

6. 发送消息功能调用

7. 常见问题和解决方法

ActiveMQ是一个非常流行的消息队列服务中间件,实现JMS规范,基于STOMP协议(端口为61613)支持Python访问。

JMS:Java Message Service

STOMP:Simple(or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议

JMS规范定义了2类消息发送接收模型:点对点queue,发布订阅topic,区别是能够重复消费和是否保存。

1,点对点queue:不可重复消费,消息被消费前一直保存。

生产者发送消息到queue,一个消费者取出并消费消息。

消息被消费后,queue中不再保存,所有只有一个消费者能够取到消息。

queue支持多个消费者存在,但是一个消息只有一个消费者可以消费。

当前没有消费者时,消息一直保存,直到被消费者消费。

【从0开始Python开发实战】Python集成Active

2,发布订阅topic:可重复消费,发布给所有订阅者。

生产者发布消息到topic中,多个订阅者收到并消费消息。

和queue不同,发布到topic中的消息会被所有订阅者消费。

当生产者发布消息时,不管是否有订阅者,都不保存消息。

【从0开始Python开发实战】Python集成Active

JMS规范定义的2类消息传输模型queue和topic比较:


Queue

Topic

模型

点对点Point-to-Point

发布订阅publish/subscribe

有无状态

queue消息在消费前被一直保存在mq服务器上的文件或者配置DB

topic数据默认不保存,是无状态的。

完整性保障

queue保证每条消息都被消费者接收到

topic不保证生产者发布的每条消息都被订阅者接收到

消息是否会丢失

生产者发送消息到queue,消费者接收到消息。如果没有消费者,将一直保存,不会丢失。

生产者发布消息到topic时,当前的订阅者都能够接收到消息。如果当前没有订阅者,该消息就丢失。

消息发布接收策略

一对一的消息发布接收策略,一个生产者发送的消息只被一个消费者接收。mq服务器收到回复后,将这个消息删除。

一对多的消息发布接收策略,同一个topic的多个订阅者都能收到生产者发布的消息。

Python集成ActiveMQ使用stomp.py,只需简单配置,本文在Django框架下进一步封装服务mq_service.py。典型系统架构示意图和消息队列:

【从0开始Python开发实战】Python集成Active

时序图如下:

【从0开始Python开发实战】Python集成Active

示例代码:https://github.com/rickding/HelloPython/tree/master/hello_activemq

├── settings.py

├── mq

│   └── mq_service.py

│   └── mq_listener.py

├── tests

│   └── test_mq_service.py

├── management

│   └── commands

│        └── mq.py

一,Python集成ActiveMQ


代码文件

功能要点

Python集成ActiveMQ

requirements.txt

安装stomp.py:

stomp.py >= 5.0.1

封装服务

mq_serivce.py

封装ActiveMQ的消息发送和处理功能。在Django框架下,将地址等配置在settings.py中集中管理,注意端口为61613

接收处理消息

mq_listener.py

增加消息接收处理类,继承stomp.ConnectionListener

启动消息监听服务

mq.py

在Django框架下,将启动服务代码封装成command,方便调用和维护。

单元测试

test_mq_serivce.py

测试封装的功能函数

功能调用

views.py

增加REST接口/chk/mq,调用mq_service发送消息

1. 新建Django项目,运行:django-admin startproject hello_activemq

2. 进到目录hello_activemq,增加应用:python manage.py startapp app

【从0开始Python开发实战】Python集成Active

项目的目录文件结构如下:

【从0开始Python开发实战】Python集成Active

3. 安装stomp.py,pip install stomp.py >= 5.0.1

二,封装服务mq_service.py,调用ActiveMQ发送消息

1. 增加mq_service.py:

importjson
importlogging
importstomp
fromdjango.confimportsettings

log = logging.getLogger(__name__)

defsend_msg(msg_dict,queue_or_topic=settings.MQ_QUEUE):
    conn = stomp.Connection10([(settings.MQ_URL,settings.MQ_PORT)])
    conn.connect(settings.MQ_USER,settings.MQ_PASSWORD)

    msg_str = json.dumps(msg_dict)
    log.info('Send msg: %s, %s, %s'% (type(msg_dict),type(msg_str),msg_str))
    conn.send(queue_or_topic,msg_str)
    conn.disconnect()

2. 打开settings.py,配置ActiveMQ信息:

MQ_URL ='127.0.0.1'
MQ_PORT =61613
MQ_USER ='admin'
MQ_PASSWORD ='admin'
MQ_QUEUE ='/queue/SampleQueue'
MQ_TOPIC ='/topic/SampleTopic'

3. 为了增加代码的兼容和容错能力,封装get_conn(), close_conn()等辅助函数,详见代码文件mq_service.py。

三,接收处理消息mq_listener.py

1. 增加mq_listener.py,声明消息处理类,继承stomp.ConnectionListener:

importjson
importlogging
importstomp

log = logging.getLogger(__name__)

classMqListener(stomp.ConnectionListener):
    defon_message(self,headers,msg_str):
        log.info('Receive msg: %s, %s, %s'% (type(msg_str),msg_str,headers))

        msg_dict =None
        try:
            msg_dict = json.loads(msg_str)
        exceptExceptionase:
            log.warning('Exception when parse msg: %s'%str(e))

        log.info('Parsed msg: {}, {}'.format(type(msg_dict),msg_dict))

    defon_error(self,headers,msg_str):
        log.info('Error msg: %s, %s, %s'% (type(msg_str),msg_str,headers))

2. 在on_message()函数中,将消息字符串解析为json,方便业务处理。

3. 声明on_error()函数处理错误信息。

四,启动消息监听服务mq.py

1. 将循环接收消息代码封装成函数consume_msg(),增加在服务中mq_serivce.py:

importlogging
importtime
importstomp
fromdjango.confimportsettings

log = logging.getLogger(__name__)

defconsume_msg(listener,queue=settings.MQ_QUEUE,topic=settings.MQ_TOPIC):
    conn = stomp.Connection10([(settings.MQ_URL,settings.MQ_PORT)])
    conn.connect(settings.MQ_USER,settings.MQ_PASSWORD)
    
    conn.set_listener('',listener)
    conn.subscribe(queue)
    conn.subscribe(topic)

    while1:
        time.sleep(1000)  # secs

    conn.disconnect()

2. 调用set_listener()设置消息接收类实例,使用之前创建的MqListener

3. 调用subscribe()订阅消息,启动循环监听。

4. 我们将启动服务代码封装成command,在目录management/commands中增加mq.py

importlogging
fromdjango.core.management.baseimportBaseCommand
fromhello_activemq.mqimportmq_serviceasmq
fromhello_activemq.mq.mq_listenerimportMqListener

log = logging.getLogger(__name__)


classCommand(BaseCommand):
    help ='mq starts listener'

    defhandle(self,*args,**options):
        log.info("mq starts")
        returnmq.consume_msg(MqListener())

5. 运行命令python manage.py mq,看到消息提示,启动监听服务成功。

【从0开始Python开发实战】Python集成Active

五,单元测试test_mq_service.py

增加测试函数,发送消息:

importlogging
fromdjango.testimportTestCase
fromhello_activemq.mqimportmq_serviceasmq

log = logging.getLogger(__name__)

classMQServiceTest(TestCase):
    deftest_send_msg(self):
        msg_dict = {'content':'test msg dict','msg':'msg from python'}
        mq.send_msg_to_queue(msg_dict)
        mq.send_msg_to_topic({'msg':"test msg from python"})

运行python manage.py test,同时看到监听服务收到并处理消息:

【从0开始Python开发实战】Python集成Active

六,发送消息功能调用

1. 在views.py中发送消息,调用mq_servcie.py

importjson
fromdjango.httpimportHttpResponse
fromhello_activemq.mqimportmq_serviceasmq

defchk_mq(req):
    msg_dict = {
        'url': req.get_raw_uri(),
        'path': req.get_full_path(),
        'host': req.get_host(),
    }

    mq.send_msg_to_queue(msg_dict)
    mq.send_msg_to_topic(msg_dict)

    returnHttpResponse(json.dumps(msg_dict))

2. 在urls.py中配置路由

fromdjango.urlsimportpath
fromapp.viewsimportchk_mq

urlpatterns = [
    path('',chk_mq,name='chk'),
]

3. 运行命令启动服务:python manage.py runserver 0.0.0.0:8001

【从0开始Python开发实战】Python集成Active

4. REST接口发送消息

【从0开始Python开发实战】Python集成Active

七,常见问题和解决方法

1. 启动服务错误:[transport.py: 787, attempt_connection] Could not connect to host 127.0.0.1, port 61613

解决:检查ActiveMQ是否正常启动,特别注意是否开启STOMP协议端口61613

原因:Python连接ActiveMQ使用STOMP协议,端口默认61613

2. 发送消息时错误:TypeError: message should be a string or bytes, found

解决:将消息内容序列化为JSON,发送时调用json.dumps(),接收时调用json.loads()

原因:Python连接ActiveMQ使用的是STOMP协议,消息格式为简单文本。

注:JMS规范定义的5类消息:

字符串TextMessage,

键值对MapMessage,

序列化对象ObjectMessage

字节流BytesMessage

数据流StreamMessage

【从0开始Python开发实战】Python集成Active

ActiveMQ支持5类JMS消息,增加了二进制大文件消息BlobMessage:

【从0开始Python开发实战】Python集成Active

3. 跨系统对接时接收到的消息类型不是TextMessage

Python开发的业务处理服务 -> Java开发的API服务,接收到的消息类型为BytesMessage,Python发送时设置conn.send('xx', msg_str, content_type="text/plain")仍然接收不到期望的类型TextMessage

解决:stomp建立连接时配置参数conn = stomp.Connection10([("localhost", 61613)],auto_content_length=False)

原因:Python连接ActiveMQ使用STOMP协议,消息格式为简单文本,不携带类型信息,只通过header中的content-length来判断TextMessage和BytesMessage,所以发送消息时不在header中添加content-length就可以了。


当前文章:【从0开始Python开发实战】Python集成Active
文章URL:http://cdkjz.cn/article/pgphcj.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

业务热线:400-028-6601 / 大客户专线   成都:13518219792   座机:028-86922220