本帖最后由 jiangzhiwei 于 2018-12-6 14:46 编辑
前两篇介绍了如何去连接MQTT,以及MQTT与TCP协议的关系
本篇讲解一下如何在阿里云上搭建mqtt
首先进入阿里云官网
https://www.aliyun.com/?utm_content=se_1000301881
点击云服务器ECS 进入购买界面 怎么购买就不说了吧
如果是学生的话可进入 阿里云翼计划 (本人就是学生购买的优惠9.5一个月)
https://promotion.aliyun.com/ntms/act/campus2018.html
实名学生验证过后可以优惠很多
购买之后右上角进入控制台管理你的云服务器
进入点击 右侧的 “实例”管理 云服务器 ,
得到你的公网 IP,然后通过远程桌面连接到服务器
/****下面来介绍搭建MQTT服务器**********************/
http://activemq.apache.org/apollo/download.html 进入下载
根据自己购买的服务器的系统选择版本类型。我的是 Windows,所以选择第二个,下载好后解压
注意 在配置apollo 之前,还需配置java 环境变量 前往
https://www.oracle.com/technetwork/java/javase/downloads/index.html
自行下载 JDK 并配置环境变量
https://www.cnblogs.com/smyhvae/p/3788534.html 环境变量配置
配置完java环境变量之后,将下载的apollo 解压到电脑上,打开win+R 运行cmd
先进入所在目录的bin文件夹
之后再输入 apollo create mybroker 即创建一个名为mybroker 的文件夹
创建成功会出现以下提示并多了一个mybroker的文件夹
之后 用命令行进入该文件夹 下的 bin 目录 ,
最后运行mybroker.cmd run
请留意箭头处所指的IP和端口号
这样就配置成功了 打开
后台Web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/ 登录服务器后,如果MQTT服务器有客户端连接,后台会显示如下
最后在阿里云的安全组里添加入站规则,放行刚才 cmd 命令行中箭头所指的 TCP 端口号,一般是 61613(注意:下文示例代码连接的是 1883 端口,请改成你的服务器实际监听的 MQTT 端口)
最后连接测试成功
/********************************下面我来贴上我的AT指令开发MQTT的代码*******************************************************/
不过一切前提是你能建立TCP连接
主要分为2大块
一,建立TCP客户端连接
二,MQTT 连接包封装函数。这是我从找到的 32 源码上移植过来的封装函数,原来的连接包格式有一处小错误,这里已经改正,测试能正常连接
这边传感器没有接线上去,但是数据传输成功了!
kittenblock中小学创客名师推荐的图形化编程软件
#include <LobotServoController.h>
#include <SoftwareSerial.h>  // declares SoftwareSerial, which `debug` below is an instance of

/*
 * Wiring as described by the original author:
 *  1. DHT11  -> Arduino: VCC to 3.3V, DATA to A0, GND to GND
 *  2. ESP8266 -> Arduino: TX to pin 0, RX to pin 1, EN and 3V3 to 3.3V, GND to GND
 *
 * NOTE(review): every AT command in this sketch is written to the
 * SoftwareSerial port `debug` (pins 2/3) while `Serial` only carries log
 * text - confirm which port the ESP8266 is really wired to.
 */
#define DEBUG
#define _baudrate 9600

// Pins of the SoftwareSerial port (originally labelled "USB TTL, for debugging").
#define _rxpin 2
#define _txpin 3

// Short integer type names used by the MQTT packet builders.
#define u8 unsigned char
#define u16 unsigned short
#define u32 unsigned long  // `unsigned int` is only 16 bits wide on 8-bit AVR boards

SoftwareSerial debug( _rxpin, _txpin ); // RX, TX

// DHT temperature / humidity sensor
#define dht_dpin A0
byte bGlobalErr;   // status of the last ReadDHT() call, 0 = success
byte dht_dat[5];   // the five raw bytes received from the sensor

// ESP8266 Wi-Fi module
// NOTE(review): credentials and broker address are hard-coded; replace them
// with your own and avoid publishing real passwords.
/*#define SSID "ONEPLUS3"
#define PASS "12345678"
#define IP "192.168.43.125"*/
#define SSID "POP"
#define PASS "thxy8888"
#define IP "47.93.19.134"

u8 count;          // loop cycles since the last keep-alive ping
int flame=A5;      // flame sensor on analog pin 5
int qiti=A3;       // gas ("qiti") sensor on analog pin 3
int val=0;         // last flame sensor reading
int cal=0;         // last gas sensor reading
u8 mqtt_msg[200]={0};  // shared scratch buffer for outgoing MQTT packets

// Left over from an earlier GET-style upload; not referenced in this sketch.
String GET = "\rRead sensor: OK\n";
boolean isconnect=false;  // result of the last mqtt_connect() attempt

boolean mqtt_connect();
void serial_read();
/*
 * One-time initialisation: prepares the DHT pin, opens both serial ports,
 * probes the ESP8266 with a bare "AT", joins the Wi-Fi network if the module
 * answered, and finally attempts the MQTT connection.
 */
void setup() {
  InitDHT();

  Serial.begin(_baudrate);
  debug.begin(_baudrate);
  while (!Serial) {
    // wait for the hardware serial port to become ready
  }
  Serial.println("ok");
  delay(300);

  // Probe the module and give it half a second to reply.
  debug.listen();
  sendDebug("AT");
  delay(500);

  const bool moduleAnswered = debug.find("OK");
  if (moduleAnswered) {
    Serial.println("RECEIVED: OK\nMQTT ready to conncet!");
    connectWiFi();
    Serial.println("WIFI OK\n is connect");
    debug.listen();
  }

  // Drop whatever is still buffered on either port before talking MQTT.
  while (Serial.read() != -1) {}
  while (debug.read() != -1) {}
  delay(700);

  mqtt_connect();
  while (debug.read() != -1) {}
}
void loop() {
debug.listen();
serial_read();
if(!debug.available()){
ReadDHT();
String TT,HH;
HH = String(dht_dat[0])+'.'+String(dht_dat[1]);
TT = String(dht_dat[2])+'.'+String(dht_dat[3]);
val=analogRead(flame);//读取火焰传感器的模拟值
cal=analogRead(qiti);
Serial.println(val);
Serial.println(cal);//输出模拟值,并将其打印出来
serial_read();
updateDHT11( TT, HH );
while(debug.read()>= 0){}
debug.listen();
delay(500);
count++;
if(count>=3){
u8 ping[2];
ping[0]=0xc0;
ping[1]=0x00;
if(ESP_SendTcp(ping,2))//·心跳包
{
while(debug.read()>= 0){}
Serial.println("MQTT_Ping ..Keep Alive.");
}
}
}
while(debug.read()>= 0){}
debug.listen();
}
/*
 * Formats the current readings into one text line and publishes it to the
 * MQTT topic "mydata" over the already-open TCP link.
 *
 * T - temperature as text, H - humidity as text. The flame and gas values
 * are taken from the globals `val` and `cal`.
 */
void updateDHT11( String T, String H )
{
  while (debug.read() >= 0) {}
  debug.listen();
  delay(1000);

  String cmd = "";
  cmd ="Temperature="+T+"oC Humidity="+H+"%"+ " Flame="+val+ " Qiti: "+cal+"\r\n" ;

  // Bounded copy: toCharArray() never writes past the buffer and always
  // NUL-terminates, unlike the unchecked strcpy() used before.
  char mg[100];
  cmd.toCharArray(mg, sizeof(mg));

  Serial.print( "Begin send chuan gan qi" );
  u16 len = mqtt_publish_message(mqtt_msg, "mydata", mg, 0);
  if (ESP_SendTcp(mqtt_msg, len))
    Serial.print("send data sucessfull");
}
/*
 * Writes one command line to the SoftwareSerial port `debug`: the text of
 * `cmd` followed by a line ending.
 */
void sendDebug(String cmd)
{
  debug.print(cmd);
  debug.println();
}
/*
 * Resets the ESP8266, switches it to station mode, joins the access point
 * SSID/PASS and selects single-connection mode.
 *
 * Returns true when the final AT+CIPMUX=0 command was acknowledged with
 * "OK", false otherwise. Blocks (printing a dot per attempt) until the
 * join command has been acknowledged.
 */
boolean connectWiFi()
{
  // Start from empty receive buffers on both ports.
  while (Serial.read() >= 0) {}
  while (debug.read() >= 0) {}

  debug.println("AT+RST");
  delay(1000);
  debug.println("AT+CWMODE=1");
  delay(500);

  String cmd = "AT+CWJAP=\"";
  cmd += SSID;
  cmd += "\",\"";
  cmd += PASS;
  cmd += "\"";

  while (debug.read() >= 0) {}
  debug.listen();
  sendDebug(cmd);
  delay(3000);
  while (!debug.find("OK"))
  {
    // Progress marker while waiting for the join to be acknowledged.
    Serial.println(".");
  }

  cmd = "AT+CIPMUX=0";
  while (debug.read() >= 0) {}
  debug.listen();
  sendDebug( cmd );

  if (debug.find("OK"))
  {
    Serial.println("one OK");
    return true;
  }

  // find("OK") has already consumed the reply up to its timeout, so a
  // second find() for an error text could never match; a missing "OK" is
  // reported as the failure directly.
  Serial.print( "RECEIVED: Error" );
  return false;
}
/*
 * Prepares the DHT data pin: configured as an output and driven HIGH.
 */
void InitDHT()
{
  pinMode(dht_dpin, OUTPUT);      // take control of the data line
  digitalWrite(dht_dpin, HIGH);   // leave it HIGH until a read starts
}
/*
 * Performs one DHT read cycle and stores the five received bytes in
 * dht_dat[]. The outcome is reported through bGlobalErr:
 *   0 - success
 *   1 - data line was not LOW after the start pulse
 *   2 - data line was not HIGH 80 us later
 *   3 - checksum mismatch (dht_dat[4] != low byte of dht_dat[0..3] sum)
 */
void ReadDHT(){
  bGlobalErr = 0;
  byte dht_in;
  byte i;

  // An earlier failed read returns while the pin is still an INPUT; make
  // sure the start pulse below is actually driven onto the line.
  pinMode(dht_dpin, OUTPUT);

  // Start pulse: LOW for 20 ms, then HIGH for 40 us, then release the line.
  digitalWrite(dht_dpin, LOW);
  delay(20);
  digitalWrite(dht_dpin, HIGH);
  delayMicroseconds(40);
  pinMode(dht_dpin, INPUT);

  dht_in = digitalRead(dht_dpin);
  if (dht_in) {
    bGlobalErr = 1;
    return;
  }
  delayMicroseconds(80);

  dht_in = digitalRead(dht_dpin);
  if (!dht_in) {
    bGlobalErr = 2;
    return;
  }
  delayMicroseconds(80);

  // Five bytes follow; each must land in its own slot of dht_dat.
  for (i = 0; i < 5; i++)
    dht_dat[i] = read_dht_dat();

  // Return the line to its idle state (output, HIGH).
  pinMode(dht_dpin, OUTPUT);
  digitalWrite(dht_dpin, HIGH);

  byte dht_check_sum =
      dht_dat[0] + dht_dat[1] + dht_dat[2] + dht_dat[3];
  if (dht_dat[4] != dht_check_sum) {
    bGlobalErr = 3;
  }
}
// Reads eight bits from the DHT data pin and returns them as one byte,
// first bit received in the most significant position.
// NOTE(review): both busy-wait loops have no timeout, so this call never
// returns if the line stops toggling.
byte read_dht_dat(){
byte i = 0;
byte result=0;
for(i=0; i< 8; i++){
// Wait for the line to leave LOW, i.e. for the bit's HIGH phase to begin.
while(digitalRead(dht_dpin)==LOW);
// Sample 30 us into the HIGH phase: still HIGH means the bit is recorded as 1.
delayMicroseconds(30);
if (digitalRead(dht_dpin)==HIGH)
result |=(1<<(7-i));
// Wait for the HIGH phase to end before handling the next bit.
while (digitalRead(dht_dpin)==HIGH);
}
return result;
}
/*************MQTT连接包*******************/
/*
 * Builds an MQTT 3.1.1 CONNECT packet into mqtt_message.
 *
 * client_id - client identifier, always written
 * username  - written (and flagged) only when non-empty
 * password  - written (and flagged) only when non-empty
 *
 * Clean-session is always requested and the keep-alive is 60 seconds.
 * Returns the total packet length in bytes, or 0 when the packet would not
 * fit in the u8 return value (more than 255 bytes); in that case nothing is
 * written. The caller must provide a buffer large enough for the packet.
 *
 * NOTE(review): MQTT 3.1.1 does not allow a password without a user name;
 * this builder does not enforce that.
 */
u8 mqtt_connect_message(u8 *mqtt_message,char *client_id,char *username,char *password)
{
  Serial.println(" GO TO CON");

  const size_t client_id_length = strlen(client_id);
  const size_t username_length = strlen(username);
  const size_t password_length = strlen(password);

  // Remaining length = 10-byte variable header + length-prefixed payload fields.
  size_t remaining = 10 + 2 + client_id_length;
  if (username_length > 0)
    remaining += 2 + username_length;
  if (password_length > 0)
    remaining += 2 + password_length;

  // Fixed header = 1 type byte + 1 or 2 remaining-length bytes.
  const size_t total = 1 + (remaining > 127 ? 2 : 1) + remaining;
  if (total > 255)
    return 0;

  u8 i;
  u8 baseIndex = 0;
  mqtt_message[baseIndex++] = 0x10;   // message type CONNECT

  // Remaining length, MQTT variable-length encoding: 7 bits per byte, high
  // bit set while more bytes follow.
  size_t rest = remaining;
  do
  {
    u8 digit = rest % 128;
    rest /= 128;
    if (rest > 0)
      digit |= 0x80;
    mqtt_message[baseIndex++] = digit;
  } while (rest > 0);

  // Connect flags: clean session, plus user-name / password flags only when
  // the matching field is actually present in the payload.
  u8 flags = 0x02;
  if (username_length > 0)
    flags |= 0x80;
  if (password_length > 0)
    flags |= 0x40;

  mqtt_message[baseIndex++] = 0;      // protocol name length MSB
  mqtt_message[baseIndex++] = 4;      // protocol name length LSB
  mqtt_message[baseIndex++] = 77;     // 'M'
  mqtt_message[baseIndex++] = 81;     // 'Q'
  mqtt_message[baseIndex++] = 84;     // 'T'
  mqtt_message[baseIndex++] = 84;     // 'T'
  mqtt_message[baseIndex++] = 4;      // protocol level 4
  mqtt_message[baseIndex++] = flags;  // connect flags
  mqtt_message[baseIndex++] = 0;      // keep-alive MSB
  mqtt_message[baseIndex++] = 60;     // keep-alive LSB (60 s)

  // Client ID
  mqtt_message[baseIndex++] = (client_id_length >> 8) & 0xff;
  mqtt_message[baseIndex++] = client_id_length & 0xff;
  for (i = 0; i < client_id_length; i++)
  {
    mqtt_message[baseIndex + i] = client_id[i];
  }
  baseIndex = baseIndex + client_id_length;

  if (username_length > 0)
  {
    // User name
    mqtt_message[baseIndex++] = (username_length >> 8) & 0xff;
    mqtt_message[baseIndex++] = username_length & 0xff;
    for (i = 0; i < username_length; i++)
    {
      mqtt_message[baseIndex + i] = username[i];
    }
    baseIndex = baseIndex + username_length;
  }

  if (password_length > 0)
  {
    // Password
    mqtt_message[baseIndex++] = (password_length >> 8) & 0xff;
    mqtt_message[baseIndex++] = password_length & 0xff;
    for (i = 0; i < password_length; i++)
    {
      mqtt_message[baseIndex + i] = password[i];
    }
    baseIndex += password_length;
  }

  return baseIndex;
}
/*********MQTT发布消息包******************************/
/*
 * Builds an MQTT PUBLISH packet into mqtt_message.
 *
 * topic   - topic name
 * message - payload, a NUL-terminated string
 * qos     - 0 sends without a packet identifier; a non-zero value sets the
 *           QoS bits in the header and appends a packet identifier
 *
 * Returns the total packet length in bytes, or 0 when the packet would not
 * fit in the u8 return value (more than 255 bytes); in that case nothing is
 * written. The caller must provide a buffer large enough for the packet.
 */
u8 mqtt_publish_message(u8 *mqtt_message, char * topic, char * message, u8 qos)
{
  static u16 id = 0;   // last packet identifier handed out

  const size_t topic_length = strlen(topic);
  const size_t message_length = strlen(message);

  // Remaining length = topic length field + topic [+ packet id] + payload.
  size_t remaining = 2 + topic_length + message_length;
  if (qos)
    remaining += 2;

  // Fixed header = 1 type byte + 1 or 2 remaining-length bytes.
  const size_t total = 1 + (remaining > 127 ? 2 : 1) + remaining;
  if (total > 255)
    return 0;

  u16 i, index = 0;

  // Message type PUBLISH (0x30) with the QoS level in bits 1-2, so that a
  // receiver knows whether a packet identifier follows the topic.
  mqtt_message[index++] = 0x30 | ((qos & 0x03) << 1);

  // Remaining length, MQTT variable-length encoding.
  size_t rest = remaining;
  do
  {
    u8 digit = rest % 128;
    rest /= 128;
    if (rest > 0)
      digit |= 0x80;
    mqtt_message[index++] = digit;
  } while (rest > 0);

  // Topic
  mqtt_message[index++] = (topic_length >> 8) & 0xff;
  mqtt_message[index++] = topic_length & 0xff;
  for (i = 0; i < topic_length; i++)
  {
    mqtt_message[index + i] = topic[i];
  }
  index += topic_length;

  if (qos)
  {
    // Packet identifiers must be non-zero, so advance first and skip 0.
    id++;
    if (id == 0)
      id = 1;
    mqtt_message[index++] = (id >> 8) & 0xff;
    mqtt_message[index++] = id & 0xff;
  }

  // Message
  for (i = 0; i < message_length; i++)
  {
    mqtt_message[index + i] = message[i];
  }
  index += message_length;

  return index;
}
/*****************发布确认包****************************/
/*
 * Writes a 4-byte PUBACK packet into mqtt_message and returns its length (4).
 * The message identifier comes from a function-local counter that starts at
 * 0 and advances by one per call.
 * NOTE(review): the identifier is not taken from a received PUBLISH - confirm
 * this is what callers expect.
 */
u8 mqtt_puback_message(u8 *mqtt_message)
{
  static u16 id = 0;
  const u16 current = id++;                  // use the value, then advance

  mqtt_message[0] = 0x40;                    // message type and flags: PUBACK
  mqtt_message[1] = 0x02;                    // remaining length (fixed header excluded)
  mqtt_message[2] = (u8)(current >> 8);      // message identifier, high byte
  mqtt_message[3] = (u8)(current & 0x00ff);  // message identifier, low byte
  return 4;
}
/*********构建订阅请求*********/
/*
 * Builds a SUBSCRIBE (whether != 0) or UNSUBSCRIBE (whether == 0) packet
 * for a single topic into mqtt_message.
 *
 * topic   - topic filter
 * qos     - requested QoS; only written for SUBSCRIBE
 * whether - non-zero to subscribe, zero to unsubscribe
 *
 * Returns the total packet length in bytes, or 0 when it would not fit in
 * the u16 return value. The caller must provide a buffer large enough for
 * the packet.
 */
u16 mqtt_subscribe_message(u8 *mqtt_message,char *topic,u8 qos,u8 whether)
{
  static u16 id = 0;   // last packet identifier handed out

  const u16 topic_len = strlen(topic);

  // Remaining length = packet id + topic length field + topic, plus the
  // requested-QoS byte that only SUBSCRIBE carries.
  unsigned long remaining = 2UL + 2UL + topic_len;
  if (whether)
    remaining += 1;

  const unsigned long length_bytes =
      (remaining < 128UL) ? 1 : ((remaining < 16384UL) ? 2 : 3);
  if (1UL + length_bytes + remaining > 0xFFFFUL)
    return 0;

  // Packet identifiers must be non-zero, so skip 0 when the counter wraps.
  id++;
  if (id == 0)
    id = 1;

  u16 i, index = 0;
  if (whether)
    mqtt_message[index++] = 130;   // 0x82 SUBSCRIBE
  else
    mqtt_message[index++] = 162;   // 0xA2 UNSUBSCRIBE

  // Remaining length, MQTT variable-length encoding.
  unsigned long rest = remaining;
  do
  {
    u8 digit = rest % 128;
    rest /= 128;
    if (rest > 0)
      digit |= 0x80;
    mqtt_message[index++] = digit;
  } while (rest > 0);

  mqtt_message[index++] = (id >> 8) & 0xff;
  mqtt_message[index++] = id & 0xff;
  mqtt_message[index++] = (topic_len >> 8) & 0xff;
  mqtt_message[index++] = topic_len & 0xff;
  for (i = 0; i < topic_len; i++)
  {
    mqtt_message[index + i] = topic[i];
  }
  index += topic_len;

  if (whether)
  {
    mqtt_message[index] = qos;   // requested QoS level
    index++;
  }
  return index;
}
/***********构建MQTT请求PING包********************/
/*
 * Writes the two-byte PING request into mqtt_message and returns its
 * length (2).
 */
u8 mqtt_ping_message(u8 *mqtt_message)
{
  static const u8 kPingReq[2] = {
    0xC0,   // message type and flags: PING
    0x00    // remaining length (fixed header excluded)
  };
  for (u8 k = 0; k < 2; k++)
  {
    mqtt_message[k] = kPingReq[k];
  }
  return 2;
}
/************构建断开包******************/
/*
 * Writes the two-byte DISCONNECT packet into mqtt_message and returns its
 * length (2).
 */
u8 mqtt_disconnect_message(u8 *mqtt_message)
{
  static const u8 kDisconnect[2] = {
    0xE0,   // message type and flags: DISCONNECT
    0x00    // remaining length (fixed header excluded)
  };
  for (u8 k = 0; k < 2; k++)
  {
    mqtt_message[k] = kDisconnect[k];
  }
  return 2;
}
/***************单路连接****************/
/*
 * Sends `len` bytes from `data` over the module's single TCP connection
 * using AT+CIPSEND, echoing the bytes to `Serial` as well.
 *
 * Returns true when the module showed the ">" prompt and an "OK" was seen
 * after the payload, false otherwise.
 */
u8 ESP_SendTcp(u8 *data,u16 len)
{
  while (debug.read() >= 0) {}
  debug.listen();

  debug.print("AT+CIPSEND=");
  debug.println(len);

  // Print the label and the number separately: "len:" + len would be
  // pointer arithmetic on the string literal, not concatenation.
  Serial.print("len:");
  Serial.println(len);
  Serial.println("...ing2................");
  delay(1000);

  if (!debug.find( ">" ))
  {
    Serial.println("send error");
    return false;
  }

  Serial.println("is come >>>>");
  for (u16 i = 0; i < len; i++)
  {
    // Forward the payload one byte at a time.
    debug.write(data[i]);
    Serial.write(data[i]);
  }
  debug.listen();
  // NOTE(review): this line ending is sent in addition to the `len` payload
  // bytes announced above; kept as in the original - confirm it is wanted.
  debug.println();
  delay(500);

  if ( debug.find("OK") )
  {
    Serial.println("send succfful!!");
    return true;
  }

  Serial.println("send wrong");
  return false;
}
/*
 * Opens a TCP connection to IP:1883, sends the MQTT CONNECT packet and then
 * a SUBSCRIBE for topic "test".
 *
 * Sets the global `isconnect` to whether the CONNECT packet was sent
 * successfully and returns that value. The subscribe step is attempted in
 * either case.
 */
boolean mqtt_connect()
{
  Serial.println("is connect mqtt");

  // Step 1: open the TCP socket.
  const String openCmd = String("AT+CIPSTART=\"TCP\",\"") + IP + "\",1883";
  while (debug.read() != -1) {}
  debug.listen();
  sendDebug(openCmd);
  delay(1500);
  const bool tcpAcked = debug.find( "OK" );
  if (tcpAcked)
  {
    Serial.print( "RECEIVED: OK\nExit1" );
  }

  // Step 2: MQTT CONNECT (client id 123456 with user name and password).
  while (debug.read() != -1) {}
  debug.listen();
  u16 packetLen = mqtt_connect_message(mqtt_msg, "123456", "admin", "password");
  isconnect = ESP_SendTcp(mqtt_msg, packetLen) ? true : false;
  if (isconnect)
  {
    Serial.print( "RECEIVED: MQTT OK!\nMQTT connect sussfull" );
    delay(1000);
    while (debug.read() != -1) {}
  }

  // Step 3: subscribe to "test" with QoS 0.
  Serial.print( "Begin to dingyue" );
  packetLen = mqtt_subscribe_message(mqtt_msg, "test", 0, 1);
  Serial.println("...ing................");
  const bool subscribed = ESP_SendTcp(mqtt_msg, packetLen);
  if (subscribed)
  {
    Serial.print("dingyue sucessfull");
  }

  delay(1000);
  while (debug.read() != -1) {}
  return isconnect;
}
void serial_read()
{
if (debug.available()>0) {
delay(100);
if(debug.find('P')){
debug.readStringUntil(':');
String msg="";
msg=debug.readStringUntil('!');
Serial.println(msg);
while(debug.read()>= 0){}
}
}
while(debug.read()>= 0){}
}
qq群 439237650
qq 804034739 有问题可以留言或者私戳
|