nMqtt类实现客户端和服务端
服务端:
// Page label that preceded this listing in the original post: "客户端:" ("client").
// NOTE(review): the code below is the main form of the nMqttServer application — confirm the label.
using System;
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Windows.Forms;
using AsyncIocpServer;
using LitJson;
using nMqtt.Messages;

namespace nMqttServer
{
    /// <summary>
    /// Main window of the server application: starts/stops the socket server,
    /// shows counters, and lets the operator publish messages and manage clients.
    /// </summary>
    public partial class Form1 : Form
    {
        /// <summary>The single main form instance; assigned in the constructor.</summary>
        public static Form1 mainform = null;

        /// <summary>The socket server; created when listening is started.</summary>
        public static IocpServer serverSocket;

        /// <summary>The MQTT server object the UI delegates to.</summary>
        public static MqttServer mqttServer = new MqttServer();

        /// <summary>Address currently selected in the IP combo box (null until one is selected).</summary>
        public string IPAddr;

        /// <summary>Highest packet count observed in a single timer2 interval.</summary>
        public int maxRecePackIndex;

        /// <summary>Packets received since the last timer2 tick.</summary>
        public int recePackIndex;

        /// <summary>Number of local addresses that passed <see cref="IsCorrentIP"/>.</summary>
        public int ipNum;

        /// <summary>QoS value parsed from the QoS combo box selection.</summary>
        public int qos;

        /// <summary>Initialised to false in the constructor; not otherwise used in this file.</summary>
        public bool isShowUser;

        /// <summary>True while the server is listening (the button then acts as "stop").</summary>
        public bool startButton;

        /// <summary>Delegate type used to marshal a log line onto the UI thread.</summary>
        public delegate void SetListBoxCallBack(string str);

        /// <summary>Delegate instance bound to <see cref="SetListBox"/>.</summary>
        public SetListBoxCallBack setlistboxcallback;

        // Anchored so that only a complete dotted-quad string matches; cached to avoid
        // re-parsing the pattern on every call.
        private static readonly System.Text.RegularExpressions.Regex ipv4Regex =
            new System.Text.RegularExpressions.Regex(
                @"^(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])$");

        /// <summary>
        /// Appends a log line to the list box from any thread.
        /// </summary>
        /// <param name="str">Text to show.</param>
        /// <remarks>
        /// The line is dropped when the form does not exist, is disposed, or has no handle yet.
        /// The call is posted with BeginInvoke so the calling thread never blocks on the UI
        /// thread (a blocking Invoke can deadlock if the UI thread is itself waiting for that
        /// caller, e.g. while stopping the server).
        /// </remarks>
        public static void Dubeg(string str)
        {
            Form1 form = Form1.mainform;
            if (form == null || form.IsDisposed || !form.IsHandleCreated)
            {
                return;
            }

            form.BeginInvoke(form.setlistboxcallback, str);
        }

        /// <summary>
        /// Inserts a line at the top of the list box and selects it.
        /// The list is cleared first once it holds more than 100 entries.
        /// Must be called on the UI thread.
        /// </summary>
        /// <param name="str">Text to show.</param>
        public void SetListBox(string str)
        {
            if (listBox1.Items.Count > 100)
            {
                listBox1.Items.Clear();
            }

            listBox1.Items.Insert(0, str);
            listBox1.SelectedIndex = 0;
        }

        /// <summary>Returns the main form, or null if none has been created.</summary>
        public static Form1 GetMainForm()
        {
            return mainform;
        }

        /// <summary>
        /// Checks whether a string is a dotted-quad IPv4 address (each part 0-255).
        /// </summary>
        /// <param name="ip">Candidate address text.</param>
        /// <returns>True when the whole string is an IPv4 address.</returns>
        private static bool IsCorrentIP(string ip)
        {
            return ip != null && ipv4Regex.IsMatch(ip);
        }

        /// <summary>
        /// Builds the form, loads retained messages and fills the IP combo box with the
        /// machine's IPv4 addresses.
        /// </summary>
        public Form1()
        {
            InitializeComponent();
            setlistboxcallback = new SetListBoxCallBack(SetListBox);
            Form1.mainform = this;
            isShowUser = false;
            startButton = false;
            qosComboBox.SelectedIndex = 0;
            ipNum = 0;
            mqttServer.ReaderRetainMessages();

            PopulateLocalAddresses();

            // "Multiple IPs" is only true when more than one address was found.
            if (ipNum > 1)
            {
                msg.Text = "有多个IP!";
            }
        }

        // Adds every IPv4 address of this machine to the combo box and selects the last one.
        private void PopulateLocalAddresses()
        {
            IPAddress[] addressList;
            try
            {
                addressList = Dns.GetHostEntry(Environment.MachineName).AddressList;
            }
            catch (SocketException ex)
            {
                // Name resolution failed; keep the form usable and report the reason.
                SetListBox(ex.Message);
                return;
            }

            foreach (IPAddress ip in addressList)
            {
                if (IsCorrentIP(ip.ToString()))
                {
                    ipNum++;
                    ipComboBox.Items.Add(ip);
                }
            }

            if (ipComboBox.Items.Count > 0)
            {
                ipComboBox.SelectedIndex = ipComboBox.Items.Count - 1;
            }
        }

        /// <summary>
        /// Client-disconnected callback: removes the connection from the topic
        /// subscriptions and from the client table.
        /// </summary>
        public void disconnect(object sender, SocketAsyncEventArgs e)
        {
            AsyncUserToken userToken = e.UserToken as AsyncUserToken;
            if (userToken == null)
            {
                return;
            }

            try
            {
                mqttServer.RemoveIdFromSubscribeTopic(userToken.ConnectionId);
                mqttServer.DelMqttClientToConnectionId(userToken.ConnectionId);
            }
            catch (Exception ex)
            {
                // Clean-up stays best-effort, but the failure is no longer hidden.
                Dubeg(ex.ToString());
            }
        }

        /// <summary>
        /// Data-received callback: copies the received bytes out of the token's buffer
        /// and counts the packet.
        /// </summary>
        public void clientread(object sender, SocketAsyncEventArgs e)
        {
            AsyncUserToken userToken = e.UserToken as AsyncUserToken;
            if (userToken == null || e.BytesTransferred <= 0)
            {
                return;
            }

            try
            {
                byte[] destArray = new byte[e.BytesTransferred];
                Array.Copy(userToken.ReceiveBuffer, e.Offset, destArray, 0, e.BytesTransferred);

                // The length check prevents an out-of-range read on packets shorter than
                // three bytes (a 2-byte MQTT PINGREQ, for example).
                if (destArray.Length >= 3 && destArray[0] == 0xAB && destArray[2] == 0xAB)
                {
                    // Custom protocol: no handling implemented here.
                }
                else
                {
                    // MQTT decoding is disabled in this listing:
                    //mqttServer.DecodeMessage(destArray, userToken.ConnectionId);
                }

                // NOTE(review): assumes this callback runs off the UI thread while timer2
                // reads and resets the counter, hence the atomic increment — confirm.
                Interlocked.Increment(ref recePackIndex);
            }
            catch (Exception ex)
            {
                // Log instead of opening a modal dialog on the receive path.
                Dubeg(ex.ToString());
            }
        }

        /// <summary>Refreshes the topic, user and connection counters.</summary>
        private void timer1_Tick(object sender, EventArgs e)
        {
            topicNum.Text = mqttServer.SubscribeTopicCount.ToString();
            userNum.Text = mqttServer.ActiveMqttClientCount.ToString();
            if (serverSocket != null)
            {
                connCnt.Text = serverSocket.ClientCount.ToString();
            }
        }

        /// <summary>
        /// Shows the packet count of the interval that just ended and the highest
        /// count seen so far, then starts a new interval.
        /// </summary>
        private void timer2_Tick(object sender, EventArgs e)
        {
            // Read and reset in one step so packets counted in between are not lost.
            int received = Interlocked.Exchange(ref recePackIndex, 0);
            if (received > maxRecePackIndex)
            {
                maxRecePackIndex = received;
            }

            packIndex.Text = maxRecePackIndex.ToString();
            msg.Text = "每秒接收 " + received + " 个数据包";
        }

        /// <summary>Toggles between listening and stopped.</summary>
        private void startBtn_Click(object sender, EventArgs e)
        {
            if (startButton)
            {
                StopListening();
            }
            else
            {
                StartListening();
            }
        }

        // Validates the form inputs and starts the socket server. The UI only switches to
        // the "listening" state after the server has actually started.
        private void StartListening()
        {
            int count;
            int buffersize;
            int m_port;

            if (!int.TryParse(maxNum.Text, out count) || count < 1)
            {
                MessageBox.Show("连接数量错误!请重新填写!");
                return;
            }

            if (!int.TryParse(buffer.Text, out buffersize) || buffersize < 1)
            {
                MessageBox.Show("缓冲区填写错误!请重新填写!");
                return;
            }

            if (!int.TryParse(port.Text, out m_port))
            {
                MessageBox.Show("端口填写错误,请重新填写!");
                return;
            }

            if (count > 62000)
            {
                MessageBox.Show("连接数量最大62000!请重新填写!");
                return;
            }

            if (buffersize > 1024)
            {
                MessageBox.Show("缓冲区最大1024!请重新填写!");
                return;
            }

            if (m_port < IPEndPoint.MinPort || m_port > IPEndPoint.MaxPort)
            {
                MessageBox.Show("端口填写错误,请重新填写!");
                return;
            }

            if (string.IsNullOrEmpty(IPAddr))
            {
                MessageBox.Show("请先选择IP地址!");
                return;
            }

            try
            {
                serverSocket = new IocpServer(count, buffersize);

                // Handlers are attached before Start so that no early connection can be
                // served without them. NOTE(review): assumes Start does not reset them.
                serverSocket.OnClientRead = new EventHandler<SocketAsyncEventArgs>(clientread);
                serverSocket.OnClientDisconnect = new EventHandler<SocketAsyncEventArgs>(disconnect);
                serverSocket.mainForm = this;
                serverSocket.Start(IPAddr, m_port);
            }
            catch (Exception ex)
            {
                SetListBox("异常来自于 Form1.cs StartServer " + ex.ToString());
                return;
            }

            startButton = true;
            startBtn.Text = "停止监听";
            SetListBox("开始监听");
            timer2.Enabled = true;
        }

        // Stops the socket server and puts the UI back into the "stopped" state.
        private void StopListening()
        {
            try
            {
                if (serverSocket != null)
                {
                    serverSocket.Stop();
                }
            }
            catch (Exception ex)
            {
                SetListBox("异常来自于 Form1.cs StopServer " + ex.ToString());
            }

            startButton = false;
            startBtn.Text = "开始监听";
            SetListBox("停止监听");
            timer2.Enabled = false;
        }

        /// <summary>Remembers the selected address as text.</summary>
        private void ipComboBox_SelectedIndexChanged(object sender, EventArgs e)
        {
            object selected = ipComboBox.SelectedItem;
            if (selected != null)
            {
                IPAddr = selected.ToString();
            }
        }

        /// <summary>Parses the QoS from the first character of the selected item.</summary>
        private void qosComboBox_SelectedIndexChanged(object sender, EventArgs e)
        {
            object selected = qosComboBox.SelectedItem;
            if (selected == null)
            {
                // Nothing selected: keep the previous value.
                return;
            }

            string str = selected.ToString();
            if (str.Length == 0 || !int.TryParse(str.Substring(0, 1), out qos))
            {
                MessageBox.Show("值错误!");
            }
        }

        /// <summary>Clears the log list.</summary>
        private void cleanListBox_Click(object sender, EventArgs e)
        {
            listBox1.Items.Clear();
        }

        /// <summary>Asks the MQTT server to list its topics.</summary>
        private void peekTopic_Click(object sender, EventArgs e)
        {
            mqttServer.PeekTopic();
        }

        /// <summary>
        /// Publishes the entered text either to the subscribers of the entered topic or,
        /// when the multi-send box is checked, to all clients.
        /// </summary>
        private void sendBtn_Click(object sender, EventArgs e)
        {
            if (topicTextBox.Text == "")
            {
                SetListBox("主题为空");
                return;
            }

            if (sendTextBox.Text == "")
            {
                SetListBox("消息为空");
                return;
            }

            if (!startButton)
            {
                SetListBox("请开启服务器");
                return;
            }

            if (mqttServer.ActiveMqttClientCount == 0)
            {
                SetListBox("没有用户");
                return;
            }

            byte[] data = System.Text.Encoding.UTF8.GetBytes(sendTextBox.Text);

            if (multiSendCheckBox.Checked)
            {
                // Broadcasts select the first QoS entry; the selection handler updates qos.
                qosComboBox.SelectedIndex = 0;
                mqttServer.ToAllMqttClientPublishMsg(topicTextBox.Text, data, qos);
                SetListBox("群发完成");
                return;
            }

            mqttServer.ToPublishSubscribeTopicMsg(topicTextBox.Text, data, qos);
        }

        /// <summary>Looks up a client by the entered user name.</summary>
        private void findBtn_Click(object sender, EventArgs e)
        {
            if (inUser.Text == "")
            {
                SetListBox("请输入用户名");
                return;
            }

            mqttServer.FindMqttClient(inUser.Text);
        }

        /// <summary>Removes the client with the entered user name.</summary>
        private void delBtn_Click(object sender, EventArgs e)
        {
            if (inUser.Text == "")
            {
                SetListBox("请输入用户名");
                return;
            }

            mqttServer.DelMqttClienToUserName(inUser.Text);
        }

        /// <summary>
        /// Form-closed handler: stops the socket server if it is still listening and
        /// closes the MQTT publish thread.
        /// </summary>
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                if (startButton && serverSocket != null)
                {
                    serverSocket.Stop();
                    startButton = false;
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.Message);
            }

            // Kept in its own try block so it runs even when stopping the server failed.
            try
            {
                mqttServer.PubThreadClose();
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.Message);
            }
        }

        /// <summary>Operation kinds; not referenced elsewhere in this file.</summary>
        public enum stateFlag
        {
            None = 0,
            Add,
            Remove,
            Clear
        }
    }

    /// <summary>
    /// Pairs a user name with a connection id; displays as the user name.
    /// </summary>
    public class obj1
    {
        public string username;
        public string connectid;

        public override string ToString()
        {
            return username;
        }
    }
}

 
  
					
				
评论