You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

138 lines
5.3 KiB

using Container.Common;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
namespace Container.Business
{
public class MqServer
{
private static App app = ((App)Application.Current);
//public static RabbitConfig config;
public static ConnectionFactory factory = new ConnectionFactory
{
UserName = app.rabbitMQ.UserName,
Password = app.rabbitMQ.Password,
HostName = app.rabbitMQ.HostName,
Port = app.rabbitMQ.Port,
};
static IModel models = null;
private static Class_Log _log = new Class_Log();
static string exchangeName = "topic";
static string exchangeName_temp = "topic_immediately";
public MqServer()
{
Create();
}
static public bool Create()
{
factory.AutomaticRecoveryEnabled = true;
//创建连接
var conn = factory.CreateConnection();
conn.CallbackException += Connetion_CallbackException;
conn.RecoverySucceeded += Connetion_RecoverySucceeded;
conn.ConnectionRecoveryError += Connetion_ConnectionRecoveryError;
conn.ConnectionBlocked += Connetion_ConnectionBlocked;
conn.ConnectionUnblocked += Connetion_ConnectionUnblocked;
//连接关闭的时候
conn.ConnectionShutdown += Connetion_ConnectionShutdown;
{
models = conn.CreateModel();
//声明持久消息队列
models.ExchangeDeclare(exchangeName, "topic", true, false, null);
//声明即时消息队列
models.ExchangeDeclare(exchangeName_temp, "topic", true, false, null);
//models.ExchangeDeclare("topic_temp", "topic", true, false, null);
//models.QueueDelete();
//models.QueueDeclare("deviceInfo", true, false, false, null);
//Console.WriteLine("生产者启动成功");
_log.WriteLogFile("Start-"+"消息中心启动成功", "MqServer");
//for (int i = 0; i < 5; i++)
//{
// models.BasicPublish("message", "myRabbit", mandatory: true, null, Encoding.UTF8.GetBytes(i + ""));
//}
}
return true;
}
private static bool Publish(string Msg, string routingKey = "default", bool isDurable = false)
{
try
{
_log.WriteLogFile(routingKey);
_log.WriteLogFile(Msg);
models.BasicPublish(exchange: isDurable ? exchangeName : exchangeName_temp, routingKey: "device." + routingKey, mandatory: true, body: Encoding.UTF8.GetBytes(Msg));
//Console.WriteLine("推送消息成功");
}
catch (Exception ex)
{
_log.WriteLogFile(routingKey+ex.Message, "MqServer");
}
return true;
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="module"></param>
/// <param name="content"></param>
/// <param name="Queue"></param>
/// <param name="isDurable">是否持久化</param>
/// <returns></returns>
public static void PublishMessage(string module, object content, string[] Queue, bool isDurable = true)
{
try
{
string msg = JsonConvert.SerializeObject(new { content, module, MallCode = app.mallCode });
if (Queue.Length == 0)
Publish(msg, isDurable: isDurable);
else
foreach (var item in Queue)
{
Publish(msg, item, isDurable);
}
}
catch (Exception ex)
{
_log.WriteLogFile("rabbitMq--" + "PublishMessage"+ex.Message, "MqServer");
}
}
private static void Connetion_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
_log.WriteLogFile("rabbitMq--"+ "Connetion_ConnectionShutdown", "MqServer");
}
private static void Connetion_ConnectionUnblocked(object sender, EventArgs e)
{
_log.WriteLogFile("rabbitMq--" + "Connetion_ConnectionUnblocked", "MqServer");
}
private static void Connetion_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
{
_log.WriteLogFile("rabbitMq--" + "Connetion_ConnectionBlocked", "MqServer");
}
private static void Connetion_ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e)
{
_log.WriteLogFile("rabbitMq--" + "Connetion_ConnectionRecoveryError", "MqServer");
}
private static void Connetion_RecoverySucceeded(object sender, EventArgs e)
{
_log.WriteLogFile("rabbitMq--" + "Connetion_RecoverySucceeded", "MqServer");
}
private static void Connetion_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
{
_log.WriteLogFile("rabbitMq--" + "Connetion_CallbackException", "MqServer");
}
}
}