using Container.Common;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;

namespace Container.Business
{
    /// <summary>
    /// Static RabbitMQ publisher. Opens one connection/channel, declares the two
    /// topic exchanges ("topic" and "topic_immediately") and publishes JSON
    /// messages to them under routing keys of the form "device.&lt;key&gt;".
    /// </summary>
    public class MqServer
    {
        private static App app = ((App)Application.Current);

        /// <summary>Connection factory configured from the application's RabbitMQ settings.</summary>
        public static ConnectionFactory factory = new ConnectionFactory
        {
            UserName = app.rabbitMQ.UserName,
            Password = app.rabbitMQ.Password,
            HostName = app.rabbitMQ.HostName,
            Port = app.rabbitMQ.Port,
        };

        // Connection that owns the channel below; kept so it can be disposed when replaced.
        static IConnection connection = null;

        // Channel used for every publish.
        static IModel models = null;

        // Guards creation/replacement of the connection and serializes use of the
        // shared channel (channels must not be used by several threads at once).
        private static readonly object syncRoot = new object();

        private static Class_Log _log = new Class_Log();

        // Exchange selected when isDurable is true.
        static string exchangeName = "topic";

        // Exchange selected when isDurable is false.
        static string exchangeName_temp = "topic_immediately";

        /// <summary>
        /// Ensures the connection and channel exist (see <see cref="Create"/>).
        /// </summary>
        public MqServer()
        {
            Create();
        }

        /// <summary>
        /// Opens the connection and channel and declares both exchanges.
        /// If an open channel already exists it is reused, so repeated calls
        /// (for example from several <c>new MqServer()</c>) no longer leak connections.
        /// </summary>
        /// <returns>Always <c>true</c>; connection failures propagate as exceptions.</returns>
        static public bool Create()
        {
            lock (syncRoot)
            {
                if (models != null && models.IsOpen)
                {
                    return true;
                }

                factory.AutomaticRecoveryEnabled = true;

                // Establish the replacement first: if this throws, the previous
                // (possibly still recovering) connection is left untouched.
                IConnection conn = factory.CreateConnection();
                IModel channel = null;
                try
                {
                    conn.CallbackException += Connetion_CallbackException;
                    conn.RecoverySucceeded += Connetion_RecoverySucceeded;
                    conn.ConnectionRecoveryError += Connetion_ConnectionRecoveryError;
                    conn.ConnectionBlocked += Connetion_ConnectionBlocked;
                    conn.ConnectionUnblocked += Connetion_ConnectionUnblocked;
                    // Raised when the connection is closed.
                    conn.ConnectionShutdown += Connetion_ConnectionShutdown;

                    channel = conn.CreateModel();
                    // Exchange for "durable" messages.
                    channel.ExchangeDeclare(exchangeName, "topic", true, false, null);
                    // Exchange for "immediate" messages.
                    channel.ExchangeDeclare(exchangeName_temp, "topic", true, false, null);
                }
                catch
                {
                    // Do not leak the half-initialized connection.
                    SafeDispose(channel);
                    SafeDispose(conn);
                    throw;
                }

                IModel oldChannel = models;
                IConnection oldConnection = connection;
                connection = conn;
                models = channel;

                // Release whatever was replaced (no-op on the first call).
                SafeDispose(oldChannel);
                SafeDispose(oldConnection);

                _log.WriteLogFile("Start-" + "消息中心启动成功", "MqServer");
            }
            return true;
        }

        /// <summary>
        /// Disposes a channel or connection, logging instead of throwing if disposal fails.
        /// </summary>
        private static void SafeDispose(IDisposable resource)
        {
            if (resource == null)
            {
                return;
            }
            try
            {
                resource.Dispose();
            }
            catch (Exception ex)
            {
                _log.WriteLogFile("rabbitMq--" + "Dispose" + ex.Message, "MqServer");
            }
        }

        /// <summary>
        /// Publishes one UTF-8 encoded message with routing key "device." + <paramref name="routingKey"/>.
        /// </summary>
        /// <param name="Msg">Message text.</param>
        /// <param name="routingKey">Routing key suffix.</param>
        /// <param name="isDurable">
        /// <c>true</c> publishes to the "topic" exchange, <c>false</c> to "topic_immediately".
        /// NOTE(review): no basic properties are passed, so the message itself is not
        /// flagged persistent - confirm whether that is intended.
        /// </param>
        /// <returns><c>true</c> if the publish call completed, <c>false</c> if it failed (the failure is logged).</returns>
        private static bool Publish(string Msg, string routingKey = "default", bool isDurable = false)
        {
            try
            {
                _log.WriteLogFile(routingKey);
                _log.WriteLogFile(Msg);

                byte[] payload = Encoding.UTF8.GetBytes(Msg);
                string exchange = isDurable ? exchangeName : exchangeName_temp;

                lock (syncRoot)
                {
                    if (models == null)
                    {
                        _log.WriteLogFile(routingKey + "channel not created, call MqServer.Create() first", "MqServer");
                        return false;
                    }

                    models.BasicPublish(exchange: exchange, routingKey: "device." + routingKey, mandatory: true, body: payload);
                }
                return true;
            }
            catch (Exception ex)
            {
                _log.WriteLogFile(routingKey+ex.Message, "MqServer");
                return false;
            }
        }

        /// <summary>
        /// Publishes a message. The payload is the JSON serialization of
        /// <c>{ content, module, MallCode }</c>, where MallCode comes from the application.
        /// </summary>
        /// <param name="module">Module name placed in the payload.</param>
        /// <param name="content">Content object placed in the payload.</param>
        /// <param name="Queue">
        /// Routing key suffixes; the message is published once per entry.
        /// When null or empty the message is published once with the default routing key.
        /// </param>
        /// <param name="isDurable">Selects the exchange: <c>true</c> = "topic", <c>false</c> = "topic_immediately".</param>
        public static void PublishMessage(string module, object content, string[] Queue, bool isDurable = true)
        {
            try
            {
                string msg = JsonConvert.SerializeObject(new { content, module, MallCode = app.mallCode });

                if (Queue == null || Queue.Length == 0)
                {
                    Publish(msg, isDurable: isDurable);
                    return;
                }

                foreach (var item in Queue)
                {
                    Publish(msg, item, isDurable);
                }
            }
            catch (Exception ex)
            {
                _log.WriteLogFile("rabbitMq--" + "PublishMessage"+ex.Message, "MqServer");
            }
        }

        /// <summary>Formats an exception for appending to a log line; empty when there is none.</summary>
        private static string DescribeException(Exception ex)
        {
            return ex == null ? string.Empty : ": " + ex.Message;
        }

        /// <summary>Logs connection shutdown together with the reply code and text.</summary>
        private static void Connetion_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            string detail = e == null ? string.Empty : ": " + e.ReplyCode + " " + e.ReplyText;
            _log.WriteLogFile("rabbitMq--" + "Connetion_ConnectionShutdown" + detail, "MqServer");
        }

        /// <summary>Logs that the connection is no longer blocked.</summary>
        private static void Connetion_ConnectionUnblocked(object sender, EventArgs e)
        {
            _log.WriteLogFile("rabbitMq--" + "Connetion_ConnectionUnblocked", "MqServer");
        }

        /// <summary>Logs that the connection was blocked, with the reason supplied by the event.</summary>
        private static void Connetion_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
            string detail = e == null ? string.Empty : ": " + e.Reason;
            _log.WriteLogFile("rabbitMq--" + "Connetion_ConnectionBlocked" + detail, "MqServer");
        }

        /// <summary>Logs a failed automatic recovery attempt, with the exception message.</summary>
        private static void Connetion_ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e)
        {
            _log.WriteLogFile("rabbitMq--" + "Connetion_ConnectionRecoveryError" + DescribeException(e == null ? null : e.Exception), "MqServer");
        }

        /// <summary>Logs a successful automatic recovery.</summary>
        private static void Connetion_RecoverySucceeded(object sender, EventArgs e)
        {
            _log.WriteLogFile("rabbitMq--" + "Connetion_RecoverySucceeded", "MqServer");
        }

        /// <summary>Logs an exception raised inside a connection callback, with the exception message.</summary>
        private static void Connetion_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
        {
            _log.WriteLogFile("rabbitMq--" + "Connetion_CallbackException" + DescribeException(e == null ? null : e.Exception), "MqServer");
        }
    }
}