forked from yanw/App_win_iot_V2.0
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
352 lines
14 KiB
352 lines
14 KiB
using IOTContainer.Common;
|
|
using IOTContainer.Model;
|
|
using MQTTnet;
|
|
using MQTTnet.Client;
|
|
using MQTTnet.Client.Connecting;
|
|
using MQTTnet.Client.Disconnecting;
|
|
using MQTTnet.Client.Options;
|
|
using MQTTnet.Client.Receiving;
|
|
using Newtonsoft.Json;
|
|
using Newtonsoft.Json.Linq;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Security.Authentication;
|
|
using System.Text;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace IOTContainer.Communication
|
|
{
|
|
public class MqttClientCom
|
|
{
|
|
public MqttClient mqttClient = null;
|
|
|
|
public List<MqttClient> mqttClients = new List<MqttClient>();
|
|
|
|
public string TcpServer = "127.0.0.1";
|
|
|
|
public int TcpPort = 1833;
|
|
private string _clientId;
|
|
private string _pwd;
|
|
private string _username;
|
|
|
|
public MqttClientCom()
|
|
{
|
|
TcpServer = ComParameters.Parameters.mqttServer;
|
|
TcpPort = ComParameters.Parameters.mqttPort;
|
|
_pwd = HttpComm.Http.GetPwd();
|
|
LocalStorage.InsertPipTable("infos", ConfigKey.pwd, _pwd);
|
|
}
|
|
private Timer timer;
|
|
public void Start(string username)
|
|
{
|
|
ClientStart(username, _pwd);
|
|
timer = new Timer(_ => TimeRun(), null, 0, 30000);
|
|
}
|
|
private void TimeRun()
|
|
{
|
|
try
|
|
{
|
|
HttpComm.Http.IsMqttConnect();
|
|
}
|
|
catch (Exception)
|
|
{
|
|
|
|
throw;
|
|
}
|
|
if (mqttClient == null || mqttClient.IsConnected == false)
|
|
{
|
|
Log.MyLog.WriteLogFile("重新连接中...", "MqttMessage");
|
|
ClientStart(ComParameters.Parameters.devCode, _pwd);
|
|
}
|
|
}
|
|
/// <summary>
|
|
/// MQTT协议参数解释
|
|
/// https://docs.emqx.cn/broker/v4.3/development/protocol.html#mqtt%E5%8D%8F%E8%AE%AE
|
|
/// </summary>
|
|
/// <param name="username"></param>
|
|
/// <param name="password"></param>
|
|
public async void ClientStart(string username = "test-host:admin-test", string password = "test")
|
|
{
|
|
try
|
|
{
|
|
var tcpServer = TcpServer;
|
|
var tcpPort = TcpPort;
|
|
_username = username;
|
|
var mqttPassword = password;
|
|
_clientId = ComParameters.Parameters.devCode;
|
|
var mqttFactory = new MqttFactory();
|
|
Log.MyLog.WriteLogFile(tcpServer + "-" + tcpPort + "-" + _username + "-" + password, "MqttMessage");
|
|
|
|
//var options = new MqttClientOptions
|
|
//{
|
|
|
|
// ClientId = _clientId,
|
|
// ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
|
|
// ChannelOptions = new MqttClientTcpOptions
|
|
// {
|
|
// Server = tcpServer,
|
|
// Port = tcpPort,
|
|
// //TlsOptions = new MqttClientTlsOptions
|
|
// //{
|
|
// // UseTls = true,
|
|
// // AllowUntrustedCertificates = true,
|
|
// // IgnoreCertificateChainErrors = true,
|
|
// // IgnoreCertificateRevocationErrors = true,
|
|
// // CertificateValidationHandler = _ => { return true; },
|
|
// // SslProtocol = (System.Security.Authentication.SslProtocols)12288
|
|
// //}
|
|
// },
|
|
// WillDelayInterval = 10,
|
|
|
|
// //遗嘱,下线时发送
|
|
// WillMessage = new MqttApplicationMessage
|
|
// {
|
|
// //Topic = $"LastWill/{_clientId}",
|
|
// //Payload = Encoding.UTF8.GetBytes("I Lost the Connection!"),
|
|
// Topic = $"/{ComParameters.Parameters.projectCode}/{username}",
|
|
// Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MQTTMessageModel(_clientId,"app-offline"))),
|
|
// QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce
|
|
// }
|
|
//};
|
|
//if (options.ChannelOptions == null)
|
|
//{
|
|
// throw new InvalidOperationException();
|
|
//}
|
|
|
|
//if (!string.IsNullOrWhiteSpace(_username))
|
|
//{
|
|
// options.Credentials = new MqttClientCredentials
|
|
// {
|
|
// Username = _username,
|
|
// Password = Encoding.UTF8.GetBytes(mqttPassword)
|
|
// };
|
|
//}
|
|
#region
|
|
var optionss = new MqttClientOptionsBuilder()
|
|
.WithTcpServer(tcpServer, tcpPort)
|
|
.WithClientId(_clientId)
|
|
.WithCredentials(_username, mqttPassword)
|
|
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
|
|
.WithWillDelayInterval(10)
|
|
.WithCleanSession(true)
|
|
.WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
|
|
//.WithCommunicationTimeout(TimeSpan.FromSeconds(60))
|
|
.WithWillMessage(new MqttApplicationMessage
|
|
{
|
|
//Topic = $"LastWill/{_clientId}",
|
|
//Payload = Encoding.UTF8.GetBytes("I Lost the Connection!"),
|
|
Topic = $"/{ComParameters.Parameters.projectCode}/{username}",
|
|
Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MQTTMessageModel(_clientId, "app-offline"))),
|
|
QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce
|
|
})
|
|
.WithTls(
|
|
o =>
|
|
{
|
|
// The used public broker sometimes has invalid certificates. This sample accepts all
|
|
// certificates. This should not be used in live environments.
|
|
o.CertificateValidationHandler = _ => true;
|
|
|
|
// The default value is determined by the OS. Set manually to force version.
|
|
o.SslProtocol = SslProtocols.Tls12;
|
|
}
|
|
)
|
|
.Build();
|
|
#endregion
|
|
// 设置为false,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
|
|
// 设置为true,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。
|
|
//options.CleanSession = false;
|
|
|
|
// 连接保活心跳
|
|
//options.KeepAlivePeriod = TimeSpan.FromSeconds(10);
|
|
|
|
mqttClient = mqttFactory.CreateMqttClient() as MqttClient;
|
|
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Action<MqttClientConnectedEventArgs>(e =>
|
|
{
|
|
Log.MyLog.WriteLogFile("客户端已连接", "MqttMessage");
|
|
|
|
SubscribeDev();
|
|
PublishOnline();
|
|
//Subscribe($"/{mallCode}/{usname}");
|
|
//Subscribe($"/{ComParameters.Parameters.projectCode}/#");
|
|
}));
|
|
|
|
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Action<MqttClientDisconnectedEventArgs>(e =>
|
|
{
|
|
Log.MyLog.WriteLogFile("客户端已断开连接", "MqttMessage");
|
|
}));
|
|
|
|
mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(e =>
|
|
{
|
|
string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
|
|
string topic = e.ApplicationMessage.Topic;
|
|
string qos = e.ApplicationMessage.QualityOfServiceLevel.ToString();
|
|
string retained = e.ApplicationMessage.Retain.ToString();
|
|
Log.MyLog.WriteLogFile($"客户端接收消息 >>Topic:{topic}; Qos:{qos}; Retained:{retained}" + Environment.NewLine + $"客户端接收消息 >>Msg:{text}", "MqttMessage");
|
|
var res = JObject.Parse(text);
|
|
var timestamp = res.Value<long>("timestamp");
|
|
var type = res.Value<string>("type");
|
|
DeviceWork.TopicClassification(type, text);
|
|
//var now = DateTime.Now.AddMinutes(-2).GetMilliTimeStamp();
|
|
//if (now <= timestamp)
|
|
//{
|
|
// DeviceWork.TopicClassification(type, text);
|
|
//}
|
|
}));
|
|
|
|
//mqttClients.Add(mqttClient);
|
|
await mqttClient.ConnectAsync(optionss);
|
|
|
|
Log.MyLog.WriteLogFile($"客户端{optionss.ClientId}正在连接...", "MqttMessage");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.MyLog.WriteLogFile($"客户端尝试连接出错:" + ex.Message, "MqttMessage");
|
|
}
|
|
}
|
|
|
|
private Task OnMqttClientConnected(MqttClientConnectedEventArgs obj)
|
|
{
|
|
throw new NotImplementedException();
|
|
}
|
|
|
|
private void btnDisconnect_Click(object sender, EventArgs e)
|
|
{
|
|
ClientStop();
|
|
}
|
|
|
|
public async void ClientStop()
|
|
{
|
|
try
|
|
{
|
|
if (mqttClient == null)
|
|
{
|
|
return;
|
|
}
|
|
|
|
await mqttClient.DisconnectAsync();
|
|
mqttClient = null;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.MyLog.WriteLogFile($"客户端尝试断开连接出错:" + ex.Message, "MqttMessage");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 订阅
|
|
/// </summary>
|
|
public void Subscribe(string topic)
|
|
{
|
|
try
|
|
{
|
|
if (mqttClient == null || !mqttClient.IsConnected)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (string.IsNullOrWhiteSpace(topic))
|
|
{
|
|
Log.MyLog.WriteLogFile($"客户端订阅主题不能为空", "MqttMessage");
|
|
return;
|
|
}
|
|
|
|
ClientSubscribeTopic(topic);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.MyLog.WriteLogFile($"客户端订阅出错:" + ex.Message, "MqttMessage");
|
|
}
|
|
}
|
|
|
|
public async void ClientSubscribeTopic(string topic)
|
|
{
|
|
try
|
|
{
|
|
await mqttClient.SubscribeAsync(topic);
|
|
Log.MyLog.WriteLogFile($"客户端{mqttClient.Options.CleanSession}订阅{topic}成功!", "MqttMessage");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
|
|
Log.MyLog.WriteLogFile($"客户端订阅出错:" + ex.Message, "MqttMessage");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 发布
|
|
/// </summary>
|
|
public void Publish(string topic, string message)
|
|
{
|
|
try
|
|
{
|
|
if (mqttClient == null || !mqttClient.IsConnected)
|
|
{
|
|
return;
|
|
}
|
|
|
|
var topics = topic;
|
|
var payload = message;
|
|
|
|
if (string.IsNullOrWhiteSpace(topic))
|
|
{
|
|
Log.MyLog.WriteLogFile($"客户端发布主题不能为空", "MqttMessage");
|
|
return;
|
|
}
|
|
|
|
if (string.IsNullOrWhiteSpace(payload))
|
|
{
|
|
Log.MyLog.WriteLogFile($"客户端发布内容不能为空", "MqttMessage");
|
|
return;
|
|
}
|
|
|
|
ClientPublishTopic(topic, payload);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.MyLog.WriteLogFile($"客户端发布主题出错:" + ex.Message, "MqttMessage");
|
|
ClientStop();
|
|
}
|
|
}
|
|
|
|
public async void ClientPublishTopic(string topic, string payload)
|
|
{
|
|
try
|
|
{
|
|
var message = new MqttApplicationMessage
|
|
{
|
|
Topic = topic,
|
|
Payload = Encoding.UTF8.GetBytes(payload),
|
|
QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce,
|
|
Retain = false
|
|
};
|
|
|
|
await mqttClient.PublishAsync(message, CancellationToken.None);
|
|
|
|
Log.MyLog.WriteLogFile($"客户端{mqttClient.Options.ClientId}发布主题{topic}成功", "MqttMessage");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Log.MyLog.WriteLogFile($"客户端{mqttClient.Options.ClientId}发布主题{topic}出错:" + ex.Message, "MqttMessage");
|
|
}
|
|
}
|
|
#region Subscribe
|
|
public void SubscribeDev()
|
|
{
|
|
Subscribe($"/{ComParameters.Parameters.projectCode}/{ComParameters.Parameters.devCode}");
|
|
}
|
|
public void SubscribeGroup(string groupCode)
|
|
{
|
|
Subscribe($"/{ComParameters.Parameters.projectCode}/{groupCode}");
|
|
}
|
|
#endregion
|
|
#region Publish
|
|
public void PublishOnline()
|
|
{
|
|
var data = new MQTTMessageModel(_clientId);
|
|
Publish($"/{ComParameters.Parameters.projectCode}/{_username}", JsonConvert.SerializeObject(data));//online消息
|
|
}
|
|
#endregion
|
|
}
|
|
}
|