You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

352 lines
14 KiB

using IOTContainer.Common;
using IOTContainer.Model;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Authentication;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IOTContainer.Communication
{
public class MqttClientCom
{
// Currently active MQTT client; assigned by ClientStart and set back to null by ClientStop.
public MqttClient mqttClient = null;
// This class never adds to the list (the only Add call, in ClientStart, is commented out).
public List<MqttClient> mqttClients = new List<MqttClient>();
// Broker host. The constructor overwrites this default with ComParameters.Parameters.mqttServer.
public string TcpServer = "127.0.0.1";
// Broker port. The constructor overwrites this default with ComParameters.Parameters.mqttPort.
// NOTE(review): the conventional MQTT port is 1883, so 1833 looks like a typo — confirm.
public int TcpPort = 1833;
// MQTT client id; ClientStart sets it from ComParameters.Parameters.devCode.
private string _clientId;
// Password obtained from HttpComm.Http.GetPwd() in the constructor.
private string _pwd;
// Username passed to the most recent ClientStart call.
private string _username;
/// <summary>
/// Initialises the broker address from <c>ComParameters.Parameters</c>, obtains the
/// password through <c>HttpComm.Http.GetPwd()</c> and hands it to
/// <c>LocalStorage.InsertPipTable</c> together with "infos" and <c>ConfigKey.pwd</c>.
/// </summary>
public MqttClientCom()
{
    this.TcpServer = ComParameters.Parameters.mqttServer;
    this.TcpPort = ComParameters.Parameters.mqttPort;

    string fetchedPassword = HttpComm.Http.GetPwd();
    this._pwd = fetchedPassword;
    LocalStorage.InsertPipTable("infos", ConfigKey.pwd, fetchedPassword);
}
// Periodic timer that drives TimeRun; created by Start.
private Timer timer;

/// <summary>
/// Kicks off the first connection attempt and arms the periodic connection check,
/// which runs immediately and then every 30 seconds.
/// </summary>
/// <param name="username">MQTT username used for the initial connection.</param>
public void Start(string username)
{
    // Release a timer left over from an earlier Start call; otherwise the old timer
    // keeps firing next to the new one and is never disposed.
    timer?.Dispose();

    ClientStart(username, _pwd);

    // NOTE(review): a due time of 0 runs TimeRun right away, possibly before the
    // connection started above has completed, which can trigger a second ClientStart.
    // Consider a non-zero due time if that duplicate attempt is unwanted.
    timer = new Timer(_ => TimeRun(), null, 0, 30000);
}
/// <summary>
/// Timer callback: calls <c>HttpComm.Http.IsMqttConnect()</c> and then reconnects
/// when there is no client or the client reports it is not connected.
/// </summary>
/// <remarks>
/// Nothing is allowed to escape this method: an exception thrown from a
/// <see cref="Timer"/> callback is unhandled on a thread-pool thread and
/// terminates the process.
/// </remarks>
private void TimeRun()
{
    try
    {
        HttpComm.Http.IsMqttConnect();
    }
    catch (Exception ex)
    {
        // A failed HTTP check must not prevent the reconnect check below.
        Log.MyLog.WriteLogFile("定时检查连接状态出错:" + ex.Message, "MqttMessage");
    }

    try
    {
        // Read the field once so a concurrent ClientStop cannot null it between
        // the null check and the IsConnected call.
        var client = mqttClient;
        if (client == null || !client.IsConnected)
        {
            Log.MyLog.WriteLogFile("重新连接中...", "MqttMessage");
            ClientStart(ComParameters.Parameters.devCode, _pwd);
        }
    }
    catch (Exception ex)
    {
        Log.MyLog.WriteLogFile("定时重连出错:" + ex.Message, "MqttMessage");
    }
}
/// <summary>
/// Creates a new MQTT client, wires its connected / disconnected / message handlers
/// and connects it to the broker at <see cref="TcpServer"/>:<see cref="TcpPort"/> over TLS 1.2.
/// MQTT protocol parameter reference:
/// https://docs.emqx.cn/broker/v4.3/development/protocol.html#mqtt%E5%8D%8F%E8%AE%AE
/// </summary>
/// <remarks>
/// The method is <c>async void</c>, so callers cannot await it; every failure is caught
/// here and written to the "MqttMessage" log instead of propagating.
/// </remarks>
/// <param name="username">MQTT username; also used as the last segment of the will topic.</param>
/// <param name="password">MQTT password.</param>
public async void ClientStart(string username = "test-host:admin-test", string password = "test")
{
    try
    {
        _username = username;
        _clientId = ComParameters.Parameters.devCode;

        // The password is deliberately kept out of the log file.
        Log.MyLog.WriteLogFile(TcpServer + "-" + TcpPort + "-" + _username, "MqttMessage");

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(TcpServer, TcpPort)
            .WithClientId(_clientId)
            .WithCredentials(_username, password)
            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
            .WithWillDelayInterval(10)
            // true  = temporary session that is destroyed when the client disconnects;
            // false = persistent session that keeps offline messages until it expires.
            .WithCleanSession(true)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
            // Last-will message published by the broker when this client goes offline.
            .WithWillMessage(new MqttApplicationMessage
            {
                Topic = $"/{ComParameters.Parameters.projectCode}/{username}",
                Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MQTTMessageModel(_clientId, "app-offline"))),
                QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce
            })
            .WithTls(o =>
            {
                // NOTE(review): this accepts every server certificate, which disables
                // server authentication. The original comment already warned that this
                // should not be used in live environments.
                o.CertificateValidationHandler = _ => true;
                // The default value is determined by the OS. Set manually to force version.
                o.SslProtocol = SslProtocols.Tls12;
            })
            .Build();

        var previousClient = mqttClient;

        // Direct cast: a wrong runtime type fails here with InvalidCastException
        // instead of a NullReferenceException further down.
        var client = (MqttClient)new MqttFactory().CreateMqttClient();

        client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Action<MqttClientConnectedEventArgs>(e =>
        {
            Log.MyLog.WriteLogFile("客户端已连接", "MqttMessage");
            SubscribeDev();
            PublishOnline();
        }));

        client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Action<MqttClientDisconnectedEventArgs>(e =>
        {
            Log.MyLog.WriteLogFile("客户端已断开连接", "MqttMessage");
        }));

        client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(e =>
        {
            try
            {
                // A message may carry no payload at all; treat that as empty text.
                var rawPayload = e.ApplicationMessage.Payload;
                string text = rawPayload == null ? string.Empty : Encoding.UTF8.GetString(rawPayload);
                string topic = e.ApplicationMessage.Topic;
                string qos = e.ApplicationMessage.QualityOfServiceLevel.ToString();
                string retained = e.ApplicationMessage.Retain.ToString();
                Log.MyLog.WriteLogFile($"客户端接收消息 >>Topic:{topic}; Qos:{qos}; Retained:{retained}" + Environment.NewLine + $"客户端接收消息 >>Msg:{text}", "MqttMessage");

                // The payload is expected to be a JSON object with a "type" field that
                // selects the handler. (It also carries a "timestamp" field, which is
                // currently not used for filtering.)
                var res = JObject.Parse(text);
                var type = res.Value<string>("type");
                DeviceWork.TopicClassification(type, text);
            }
            catch (Exception ex)
            {
                // One malformed message must not escape into the client's receive loop.
                Log.MyLog.WriteLogFile("客户端处理消息出错:" + ex.Message, "MqttMessage");
            }
        }));

        mqttClient = client;

        // Release the client being replaced so its connection and handlers do not linger.
        if (previousClient != null)
        {
            try
            {
                previousClient.Dispose();
            }
            catch (Exception disposeEx)
            {
                Log.MyLog.WriteLogFile("释放旧客户端出错:" + disposeEx.Message, "MqttMessage");
            }
        }

        // Logged before the await so the log order matches what actually happens.
        Log.MyLog.WriteLogFile($"客户端{options.ClientId}正在连接...", "MqttMessage");
        await client.ConnectAsync(options);
    }
    catch (Exception ex)
    {
        Log.MyLog.WriteLogFile($"客户端尝试连接出错:" + ex.Message, "MqttMessage");
    }
}
/// <summary>
/// Unimplemented connected-callback stub; nothing in this class references it.
/// </summary>
/// <param name="obj">Connected event arguments (unused).</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
private Task OnMqttClientConnected(MqttClientConnectedEventArgs obj)
{
    var notImplemented = new NotImplementedException();
    throw notImplemented;
}
/// <summary>
/// Event-handler-shaped wrapper that simply delegates to <see cref="ClientStop"/>.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event arguments (unused).</param>
private void btnDisconnect_Click(object sender, EventArgs e) => ClientStop();
/// <summary>
/// Disconnects the current MQTT client, if any, then disposes it and clears
/// <see cref="mqttClient"/>.
/// </summary>
/// <remarks>
/// <c>async void</c>: failures are logged to "MqttMessage" rather than propagated.
/// </remarks>
public async void ClientStop()
{
    // Work on a snapshot so a concurrent reconnect that swaps the field cannot
    // cause a NullReferenceException or make us tear down the wrong client.
    var client = mqttClient;
    if (client == null)
    {
        return;
    }

    try
    {
        await client.DisconnectAsync();
    }
    catch (Exception ex)
    {
        Log.MyLog.WriteLogFile($"客户端尝试断开连接出错:" + ex.Message, "MqttMessage");
    }
    finally
    {
        // Clear the field only if it still points at the client we stopped;
        // a newer client installed during the await must stay in place.
        if (ReferenceEquals(mqttClient, client))
        {
            mqttClient = null;
        }

        try
        {
            // Release the underlying connection even when the disconnect failed.
            client.Dispose();
        }
        catch (Exception disposeEx)
        {
            Log.MyLog.WriteLogFile($"客户端尝试断开连接出错:" + disposeEx.Message, "MqttMessage");
        }
    }
}
/// <summary>
/// Subscribes to <paramref name="topic"/> when a connected client exists.
/// Returns silently when there is no connected client; logs and returns when the
/// topic is null, empty or whitespace.
/// </summary>
/// <param name="topic">Topic filter to subscribe to.</param>
public void Subscribe(string topic)
{
    try
    {
        bool hasLiveClient = mqttClient != null && mqttClient.IsConnected;
        if (!hasLiveClient)
        {
            return;
        }

        if (!string.IsNullOrWhiteSpace(topic))
        {
            ClientSubscribeTopic(topic);
            return;
        }

        Log.MyLog.WriteLogFile($"客户端订阅主题不能为空", "MqttMessage");
    }
    catch (Exception ex)
    {
        Log.MyLog.WriteLogFile($"客户端订阅出错:" + ex.Message, "MqttMessage");
    }
}
/// <summary>
/// Sends the subscribe request for <paramref name="topic"/> on the current client
/// and logs the outcome.
/// </summary>
/// <remarks>
/// <c>async void</c>: failures are logged to "MqttMessage" rather than propagated.
/// </remarks>
/// <param name="topic">Topic filter to subscribe to.</param>
public async void ClientSubscribeTopic(string topic)
{
    try
    {
        // Snapshot the field: ClientStop may null it while the await is pending.
        var client = mqttClient;
        if (client == null)
        {
            Log.MyLog.WriteLogFile($"客户端订阅出错:" + "客户端未初始化", "MqttMessage");
            return;
        }

        await client.SubscribeAsync(topic);

        // Log the client id (as ClientPublishTopic does); the previous message
        // interpolated the CleanSession flag in its place.
        Log.MyLog.WriteLogFile($"客户端{client.Options.ClientId}订阅{topic}成功!", "MqttMessage");
    }
    catch (Exception ex)
    {
        Log.MyLog.WriteLogFile($"客户端订阅出错:" + ex.Message, "MqttMessage");
    }
}
/// <summary>
/// Publishes <paramref name="message"/> to <paramref name="topic"/> when a connected
/// client exists. Returns silently when there is no connected client; logs and
/// returns when the topic or the message is null, empty or whitespace.
/// On an unexpected exception the error is logged and <see cref="ClientStop"/> is called.
/// </summary>
/// <param name="topic">Destination topic.</param>
/// <param name="message">Text payload.</param>
public void Publish(string topic, string message)
{
    try
    {
        bool hasLiveClient = mqttClient != null && mqttClient.IsConnected;
        if (!hasLiveClient)
        {
            return;
        }

        // The topic is validated before the payload, so a blank topic wins when both are blank.
        string rejection =
            string.IsNullOrWhiteSpace(topic) ? "客户端发布主题不能为空"
            : string.IsNullOrWhiteSpace(message) ? "客户端发布内容不能为空"
            : null;

        if (rejection != null)
        {
            Log.MyLog.WriteLogFile(rejection, "MqttMessage");
            return;
        }

        ClientPublishTopic(topic, message);
    }
    catch (Exception ex)
    {
        Log.MyLog.WriteLogFile($"客户端发布主题出错:" + ex.Message, "MqttMessage");
        ClientStop();
    }
}
/// <summary>
/// Publishes <paramref name="payload"/> (UTF-8 encoded) to <paramref name="topic"/>
/// with QoS "at most once" and no retain flag, then logs the outcome.
/// </summary>
/// <remarks>
/// <c>async void</c>: failures are logged to "MqttMessage" rather than propagated.
/// </remarks>
/// <param name="topic">Destination topic.</param>
/// <param name="payload">Text payload.</param>
public async void ClientPublishTopic(string topic, string payload)
{
    // Resolved up front so the catch block never has to dereference the client:
    // a NullReferenceException thrown inside the catch of an async void method
    // would be unhandled and bring the process down.
    string clientId = _clientId;
    try
    {
        // Snapshot the field: ClientStop may null it while the await is pending.
        var client = mqttClient;
        if (client == null)
        {
            Log.MyLog.WriteLogFile($"客户端{clientId}发布主题{topic}出错:" + "客户端未初始化", "MqttMessage");
            return;
        }

        clientId = client.Options?.ClientId ?? clientId;

        var message = new MqttApplicationMessage
        {
            Topic = topic,
            Payload = Encoding.UTF8.GetBytes(payload),
            QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce,
            Retain = false
        };

        await client.PublishAsync(message, CancellationToken.None);
        Log.MyLog.WriteLogFile($"客户端{clientId}发布主题{topic}成功", "MqttMessage");
    }
    catch (Exception ex)
    {
        Log.MyLog.WriteLogFile($"客户端{clientId}发布主题{topic}出错:" + ex.Message, "MqttMessage");
    }
}
#region Subscribe
/// <summary>
/// Subscribes to the topic "/{projectCode}/{devCode}" built from <c>ComParameters.Parameters</c>.
/// </summary>
public void SubscribeDev()
{
    var deviceTopic = $"/{ComParameters.Parameters.projectCode}/{ComParameters.Parameters.devCode}";
    Subscribe(deviceTopic);
}
/// <summary>
/// Subscribes to the topic "/{projectCode}/{groupCode}".
/// </summary>
/// <param name="groupCode">Last segment of the topic to subscribe to.</param>
public void SubscribeGroup(string groupCode)
{
    var groupTopic = $"/{ComParameters.Parameters.projectCode}/{groupCode}";
    Subscribe(groupTopic);
}
#endregion
#region Publish
/// <summary>
/// Publishes the "online" message: a JSON-serialised <c>MQTTMessageModel</c> built from
/// the client id, sent to "/{projectCode}/{username}".
/// </summary>
public void PublishOnline()
{
    var onlineNotice = new MQTTMessageModel(_clientId);
    var topic = $"/{ComParameters.Parameters.projectCode}/{_username}";
    var json = JsonConvert.SerializeObject(onlineNotice);
    Publish(topic, json); // online message
}
#endregion
}
}