获取 docker 镜像 最新安装emqx
docker pull emqx/emqx
启动容器docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
.net6 连接 添加nuget包
1883
连接端口18083
后台端口ip:18083
默认用户admin
默认密码public
首次登录会提示修改密码
封装
MQTTnet
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using System.Text;
////// mqtt客户端
/// public class MqttClientBase
{////// 客户端
/// public IMqttClient client;
////// 订阅主题列表
/// public ListTopics;
////// 初始化客户端
/// ///服务器地址///服务器端口 一般为1883///用户名///密码///客户端id///连接成功事件public MqttClientBase(string server,int port,string username,string password,string cliendID, Action? OnConnected = null)
{//创建客户端
client = new MqttFactory().CreateMqttClient();
//订阅主题列表
Topics = new List();
//客户端参数
var mqttOptions = new MqttClientOptions()
{ClientId = cliendID,//客户端id
ChannelOptions = new MqttClientTcpOptions()
{Server = server,//服务器地址
Port = port//服务器端口 一般为1883
},
//设置用户和密码
Credentials = new MqttClientCredentials(username, Encoding.UTF8.GetBytes(password)),
CleanSession = false,
//设置心跳
KeepAlivePeriod = TimeSpan.FromSeconds(30),
};
if (OnConnected != null)
{//Mqtt客户端连接成功
client.ConnectedAsync += (arg) =>{OnConnected(arg);
return Task.CompletedTask;
};
}
//连接服务器,这里需要等待异步完成连接,否则后面会报错
var res=client.ConnectAsync(mqttOptions).Result;
}
////// 连接断开 事件
/// public void OnDisconnected(Actionaction)
{//Mqtt客户端连接断开
client.DisconnectedAsync += (arg) =>{action(arg);
return Task.CompletedTask;
};
}
////// 接收到消息事件
/// 参数1 topic 订阅主题
/// 参数2 data 解析成字符串的内容
/// 参数3 arg 原始消息内容
/// public void OnMessage(Actionaction)
{//接收消息
client.ApplicationMessageReceivedAsync += (arg) =>{var buff = arg.ApplicationMessage.Payload;
var data = buff.BToString();
action(arg.ApplicationMessage.Topic, data ?? null, arg);
return Task.CompletedTask;
};
}
////// 添加订阅主题
/// ////// public async Task AddTopic(string topic)
{//将主题名称加入列表
Topics.Add(new MqttTopicFilter() {Topic = topic });
//更新订阅
client.SubscribeAsync(new MqttClientSubscribeOptions()
{TopicFilters = Topics
});
}
////// 发送消息
/// ///主题///内容/// public async Task Publish(string topic, string content)
{client.PublishAsync(new MqttApplicationMessage()
{Topic = topic,
Payload = Encoding.UTF8.GetBytes(content)
});
}
}
使用
创建客户端MqttClientBase mqtt = new MqttClientBase("ip", 端口, "用户名", "密码", "客户端id");
// 带连接成功事件
//MqttClientBase mqtt = new MqttClientBase("ip", 端口, "用户名", "密码", "客户端id", arg =>{// Console.WriteLine("连接成功");
//});
添加订阅mqtt.AddTopic("666");
mqtt.AddTopic("777");
连接断开事件mqtt.OnDisconnected(arg =>{Console.WriteLine("连接断开");
});
接收订阅消息事件mqtt.OnMessage((topic,data,arg) =>{Console.WriteLine($"{topic}===={data}");
});
发布消息mqtt.Publish("666",x);
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧