这篇文章将为大家详细讲解有关RocketMQ中怎么实现权限控制,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
创新互联10多年企业网站制作服务;为您提供网站建设,网站制作,网页设计及高端网站定制服务,企业网站制作及推广,对成都电动窗帘等多个方面拥有丰富的网站制作经验的网站建设公司。
1、简单使用
ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?
用户:用户是访问控制的基础要素,RocketMQ ACL必然也会引入用户的概念,即支持用户名、密码。 资源:需要保护的对象,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。 权限:针对资源,能进行的操作。 角色:RocketMQ中,只定义两种角色:是否是管理员。
acl默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目录下
需要使用acl必须在服务端开启此功能,在Broker的配置文件中配置,aclEnable = true开启此功能
配置plain_acl.yml文件
globalWhiteRemoteAddresses: - 10.10.15.* - 192.168.0.* accounts: - accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192.168.1.* # if it is admin, it could access all resources admin: true
下面我们介绍一下plain_acl.yml文件中相关的参数含义及使用
字段 | 取值 | 含义 |
---|---|---|
globalWhiteRemoteAddresses | *;192.168.*.*;192.168.0.1 | 全局IP白名单 |
accessKey | 字符串 | Access Key 用户名 |
secretKey | 字符串 | Secret Key 密码 |
whiteRemoteAddress | *;192.168.*.*;192.168.0.1 | 用户IP白名单 |
admin | true;false | 是否管理员账户 |
defaultTopicPerm | DENY;PUB;SUB;PUB|SUB | 默认的Topic权限 |
defaultGroupPerm | DENY;PUB;SUB;PUB|SUB | 默认的ConsumerGroup权限 |
topicPerms | topic=权限 | 各个Topic的权限 |
groupPerms | group=权限 | 各个ConsumerGroup的权限 |
权限标识符的含义
权限 | 含义 |
---|---|
DENY | 拒绝 |
ANY | PUB 或者 SUB 权限 |
PUB | 发送权限 |
SUB | 订阅权限 |
处理流程
特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;
对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限
RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动Broker服务节点
如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的${ROCKETMQ_HOME}/store/conf/plain_acl.yml配置文件中 设置全局白名单信息,即为将Slave节点的ip地址设置至Master节点plain_acl.yml配置文件的全局白名单中
public class AclProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook()); producer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876"); producer.start(); for (int i = 0; i < 10; i++) { try { Message msg = new Message("topicA" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678")); } }
查看结果
报错提示topicA没有权限,我们在plain_acl.yml文件中配置的也确实是RocketMQ用户拒绝,生产消费topicA主题信息,我们改变主题为topicB,则发现发送消息成功,topicB=PUB|SUB设置的权限是生产消费都可以。
查看结果
public class AclConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupA", getAclRPCHook(),new AllocateMessageQueueAveragely()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topicB", "*"); consumer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678")); } }
查看结果:发现没有任何消息被消费,也没有报错信息,对于RocketMQ用户topicB设置的就是可以可以生产可以消费的,但是我们发现其groupA=DENY是拒绝的,说明消费组是groupA则拒绝消费任何消息,我们改成groupB或者groupC查看结果。
Broker端ACL原理图
Broker服务启动时创建BrokerController并初始化initialize()时调用acl相关的初始化方法initialAcl()
private void initialAcl() { //broker配置文件中是否开启ACL功能,默认关闭 if (!this.brokerConfig.isAclEnable()) { log.info("The broker dose not enable acl"); return; } //获取权限访问校验器的列表,加载的META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中指向 //org.apache.rocketmq.acl.plain.PlainAccessValidator,默认只有一个 ListaccessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The broker dose not load the AccessValidator"); return; } for (AccessValidator accessValidator: accessValidators) { final AccessValidator validator = accessValidator; //注册服务端就的“钩子”对象,对权限进行校验 this.registerServerRPCHook(new RPCHook() { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { //Do not catch the exception validator.validate(validator.parse(request, remoteAddr)); } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { } }); } }
源码中有相关的注解,我们查看一下注册registerServerRPCHook方法
public void registerServerRPCHook(RPCHook rpcHook) { //服务端的NettyRemotingServer服务注册“钩子”函数 getRemotingServer().registerRPCHook(rpcHook); this.fastRemotingServer.registerRPCHook(rpcHook); }
关于NettyRemotingServer服务和NettyRemotingClient服务配合使用,后面章节RocketMQ Remoting会重点分析
PlainAccessValidator.parse(),根据客户端不同的请求Code其需要的检验资源也不一样
switch (request.getCode()) { //发送消息需要校验当前的账户的topic是否具有PUB权限 case RequestCode.SEND_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); break; case RequestCode.SEND_MESSAGE_V2: accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; //拉取消息时需要知道该consumer账户下拉取的topic是否具有SUB权限,并且还要知道订阅组consumerGroup是否有sub权限 case RequestCode.PULL_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); break; case RequestCode.QUERY_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); break; case RequestCode.HEART_BEAT: HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); } } break; case RequestCode.UNREGISTER_CLIENT: final UnregisterClientRequestHeader unregisterClientRequestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.GET_CONSUMER_LIST_BY_GROUP: final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.UPDATE_CONSUMER_OFFSET: final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); break; default: break; }
根据request.getCode()获取当前的操作需要的权限标识集合,供后面与系统的权限配置文件plain_acl.yml中的权限标识符校验时使用
Broker初始化相关服务的时候创建了PlainAccessValidator,我们发现其默认的构造方法中调用了其权限资源加载器PlainPermissionLoader
public PlainAccessValidator() { aclPlugEngine = new PlainPermissionLoader(); }
创建PlainPermissionLoader对象
public PlainPermissionLoader() { //加载服务端的权限文件plain_acl.yml load(); //开启线程每500ms检测权限文件是否改变,若改变则执行load()从新加载权限文件 watch(); }
查看load方法流程
public void load() { MapplainAccessResourceMap = new HashMap<>(); List globalWhiteRemoteAddressStrategy = new ArrayList<>(); JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class); if (plainAclConfData == null || plainAclConfData.isEmpty()) { throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName)); } log.info("Broker plain acl conf data is : ", plainAclConfData.toString()); //获取全局白名单IP集合 JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) { for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) { globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory. getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i))); } } //获取账户权限集合 JSONArray accounts = plainAclConfData.getJSONArray("accounts"); if (accounts != null && !accounts.isEmpty()) { List plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { //构建每个账户的权限资源 PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig); //放入Map中AccessKey作为key,该账户的权限资源作为value plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource); } } this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.plainAccessResourceMap = plainAccessResourceMap; }
加载资源文件,解析其中的权限标识,等待权限校验器PlainAccessValidator调用其validate()对权限校验
核心的校验方法PlainPermissionLoader.validate()
public void validate(PlainAccessResource plainAccessResource) { //全局的白名单IP进行校验 for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { //匹配成功说明是全局的白名单IP,具有所有权限,直接返回。 if (remoteAddressStrategy.match(plainAccessResource)) { return; } } //判断用户名是否为空,null则抛出AclException异常 if (plainAccessResource.getAccessKey() == null) { throw new AclException(String.format("No accessKey is configured")); } //校验账户是否存在于服务端的权限资源文件中plain_acl.yml,不在则抛出异常 if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); } PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); //检查该账户的白名单IP是否匹配上客户端IP,匹配成功具有所有权限,除UPDATE_AND_CREATE_TOPIC等特殊权限需要管理员权限 if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { return; } //校验签名 String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); } //校验账户内的资源权限 checkPerm(plainAccessResource, ownedAccess); }
查看其对于当前账户内部的资源校验
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) { //判断请求的命令的Code是否需要管理员权限,并判断该用户是否是管理员 if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); } MapneedCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) { // If the needCheckedPermMap is null,then return return; } for (Map.Entry needCheckedEntry : needCheckedPermMap.entrySet()) { String resource = needCheckedEntry.getKey(); Byte neededPerm = needCheckedEntry.getValue(); //判断是否是group,在构建resourcePermMap时候,group的key=RETRY_GROUP_TOPIC_PREFIX + consumerGroup boolean isGroup = PlainAccessResource.isRetryTopic(resource); //系统的权限配置文件中配置项包不含该客户端命令请求需要的权限 if (!ownedPermMap.containsKey(resource)) { //判断其是否是topic还是group的权限标识,获取该类型的全局的权限是什么 byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() : needCheckedAccess.getDefaultTopicPerm(); //核对权限 if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } continue; } //系统的权限配置文件中配置项包含该客户端命令请求需要的权限,则直接判断其权限 if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } } }
所有的检验流程如果有一项不满足则抛出AclException异常
上面图中只是分析了Broker服务端的处理流程,客户端如何调用我们具体分析下我们以发送消息为例:
我们之前分析过Producer的消息发送的核心方法是DefaultMQProducerImpl.sendKernelImpl()该方法
//是否注册了“钩子” if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } //封装其ACL请求的参数信息 this.executeSendMessageHookBefore(context); }
hasSendMessageHook(),我们在构建Producer的时候创建了该对象,加入到DefaultMQProducerImpl的sendMessageHookList属性中。
我们查看其发送消息NettyRemotingClient类中调用AclClientRPCHook.doBeforeRequest()发送前的数据准备
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); request.addExtField(SIGNATURE, signature); request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one. if (sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); } }
关于RocketMQ中怎么实现权限控制就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。