mqtt_client.h 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. /*
  2. * Tencent is pleased to support the open source community by making IoT Hub
  3. available.
  4. * Copyright (C) 2018-2020 THL A29 Limited, a Tencent company. All rights
  5. reserved.
  6. * Licensed under the MIT License (the "License"); you may not use this file
  7. except in
  8. * compliance with the License. You may obtain a copy of the License at
  9. * http://opensource.org/licenses/MIT
  10. * Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is
  12. * distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. KIND,
  14. * either express or implied. See the License for the specific language
  15. governing permissions and
  16. * limitations under the License.
  17. *
  18. */
  19. #ifndef IOT_MQTT_CLIENT_H_
  20. #define IOT_MQTT_CLIENT_H_
  21. #ifdef __cplusplus
  22. extern "C" {
  23. #endif
  24. #include <stdbool.h>
  25. #include <stddef.h>
  26. #include <stdint.h>
  27. #include "mqtt_client_net.h"
  28. #include "qcloud_iot_common.h"
  29. #include "qcloud_iot_export.h"
  30. #include "qcloud_iot_import.h"
  31. #include "utils_list.h"
  32. #include "utils_param_check.h"
  33. #include "utils_timer.h"
  34. /* packet id, random from [1 - 65536] */
  35. #define MAX_PACKET_ID (65535)
  36. /* Max size of conn Id */
  37. #define MAX_CONN_ID_LEN (6)
  38. /* Max number of topic subscribed */
  39. #define MAX_MESSAGE_HANDLERS (10)
  40. /* Max number in repub list */
  41. #define MAX_REPUB_NUM (20)
  42. /* Minimal wait interval when reconnect */
  43. #define MIN_RECONNECT_WAIT_INTERVAL (1000)
  44. /* Minimal MQTT timeout value */
  45. #define MIN_COMMAND_TIMEOUT (500)
  46. /* Maxmal MQTT timeout value */
  47. #define MAX_COMMAND_TIMEOUT (20000)
  48. /* Max size of a topic name */
  49. #define MAX_SIZE_OF_CLOUD_TOPIC ((MAX_SIZE_OF_DEVICE_NAME) + (MAX_SIZE_OF_PRODUCT_ID) + 64 + 6)
  50. /* minimal TLS handshaking timeout value (unit: ms) */
  51. #define QCLOUD_IOT_TLS_HANDSHAKE_TIMEOUT (5 * 1000)
  52. #define MQTT_RMDUP_MSG_ENABLED
  53. /**
  54. * @brief MQTT Message Type
  55. */
  56. typedef enum msgTypes {
  57. RESERVED = 0, // Reserved
  58. CONNECT = 1, // Client request to connect to Server
  59. CONNACK = 2, // Connect Acknowledgment
  60. PUBLISH = 3, // Publish message
  61. PUBACK = 4, // Publish Acknowledgment
  62. PUBREC = 5, // Publish Received
  63. PUBREL = 6, // Publish Release
  64. PUBCOMP = 7, // Publish Complete
  65. SUBSCRIBE = 8, // Client Subscribe request
  66. SUBACK = 9, // Subscribe Acknowledgment
  67. UNSUBSCRIBE = 10, // Client Unsubscribe request
  68. UNSUBACK = 11, // Unsubscribe Acknowledgment
  69. PINGREQ = 12, // PING Request
  70. PINGRESP = 13, // PING Response
  71. DISCONNECT = 14 // Client is Disconnecting
  72. } MessageTypes;
  73. typedef enum { NOTCONNECTED = 0, CONNECTED = 1 } ConnStatus;
  74. /**
  75. * MQTT byte 1: fixed header
  76. * bits |7654: Message Type | 3:DUP flag | 21:QoS level | 0:RETAIN |
  77. */
  78. #define MQTT_HEADER_TYPE_SHIFT 0x04
  79. #define MQTT_HEADER_TYPE_MASK 0xF0
  80. #define MQTT_HEADER_DUP_SHIFT 0x03
  81. #define MQTT_HEADER_DUP_MASK 0x08
  82. #define MQTT_HEADER_QOS_SHIFT 0x01
  83. #define MQTT_HEADER_QOS_MASK 0x06
  84. #define MQTT_HEADER_RETAIN_MASK 0x01
  85. /**
  86. * @brief MQTT will options sturcture
  87. *
  88. */
  89. typedef struct {
  90. char struct_id[4]; // The eyecatcher for this structure. must be MQTW
  91. uint8_t struct_version; // struct version = 0
  92. char * topic_name;
  93. char * message;
  94. uint8_t retained;
  95. QoS qos;
  96. } WillOptions;
  97. /**
  98. * default MQTT will options
  99. */
  100. #define DEFAULT_WILL_OPTIONS \
  101. { \
  102. {'M', 'Q', 'T', 'W'}, 0, NULL, NULL, 0, QOS0 \
  103. }
  104. /**
  105. * @brief define MQTT connect parameters structure
  106. *
  107. */
  108. typedef struct {
  109. char *client_id; // unique client id
  110. char *username; // user name
  111. char *password; // passwrod
  112. char conn_id[MAX_CONN_ID_LEN];
  113. char struct_id[4]; // The eyecatcher for this structure. must be MQTC.
  114. uint8_t struct_version; // struct version = 0
  115. uint8_t mqtt_version; // MQTT protocol version: 4 = 3.1.1
  116. uint16_t keep_alive_interval; // keep alive interval, unit: second
  117. uint8_t clean_session; // flag of clean session, refer to MQTT spec 3.1.2.4
  118. uint8_t auto_connect_enable; // enable auto connection or not
  119. #ifdef AUTH_WITH_NOTLS
  120. char *device_secret; // PSK
  121. int device_secret_len; // length of PSK
  122. #endif
  123. } MQTTConnectParams;
  124. /**
  125. * default value of MQTT connect parameters structure
  126. */
  127. #ifdef AUTH_WITH_NOTLS
  128. #define DEFAULT_MQTTCONNECT_PARAMS \
  129. { \
  130. NULL, NULL, NULL, {0}, {'M', 'Q', 'T', 'C'}, 0, 4, 240, 1, 1, NULL, 0 \
  131. }
  132. #else
  133. #define DEFAULT_MQTTCONNECT_PARAMS \
  134. { \
  135. NULL, NULL, NULL, {0}, {'M', 'Q', 'T', 'C'}, 0, 4, 240, 1, 1 \
  136. }
  137. #endif
  138. /**
  139. * @brief data structure for topic subscription handle
  140. */
  141. typedef struct SubTopicHandle {
  142. const char * topic_filter; // topic name, wildcard filter is supported
  143. OnMessageHandler message_handler; // callback when msg of this subscription arrives
  144. OnSubEventHandler sub_event_handler; // callback when event of this subscription happens
  145. void * handler_user_data; // user context for callback
  146. QoS qos; // QoS
  147. } SubTopicHandle;
  148. /**
  149. * @brief MQTT QCloud IoT Client structure
  150. */
  151. typedef struct Client {
  152. uint8_t is_connected;
  153. uint8_t was_manually_disconnected;
  154. uint8_t is_ping_outstanding; // 1 = ping request is sent while ping response
  155. // not arrived yet
  156. uint16_t next_packet_id; // MQTT random packet id
  157. uint32_t command_timeout_ms; // MQTT command timeout, unit:ms
  158. uint32_t current_reconnect_wait_interval; // unit:ms
  159. uint32_t counter_network_disconnected; // number of disconnection
  160. size_t write_buf_size; // size of MQTT write buffer
  161. size_t read_buf_size; // size of MQTT read buffer
  162. unsigned char write_buf[QCLOUD_IOT_MQTT_TX_BUF_LEN]; // MQTT write buffer
  163. unsigned char read_buf[QCLOUD_IOT_MQTT_RX_BUF_LEN]; // MQTT read buffer
  164. void *lock_generic; // mutex/lock for this client struture
  165. void *lock_write_buf; // mutex/lock for write buffer
  166. void *lock_list_pub; // mutex/lock for puback waiting list
  167. void *lock_list_sub; // mutex/lock for suback waiting list
  168. List *list_pub_wait_ack; // puback waiting list
  169. List *list_sub_wait_ack; // suback waiting list
  170. MQTTEventHandler event_handle; // callback for MQTT event
  171. MQTTConnectParams options; // handle to connection parameters
  172. qcloud_Network network_stack; // MQTT network stack
  173. Timer ping_timer; // MQTT ping timer
  174. Timer reconnect_delay_timer; // MQTT reconnect delay timer
  175. SubTopicHandle sub_handles[MAX_MESSAGE_HANDLERS]; // subscription handle array
  176. char host_addr[HOST_STR_LENGTH];
  177. #ifdef AUTH_MODE_CERT
  178. char cert_file_path[FILE_PATH_MAX_LEN]; // full path of device cert file
  179. char key_file_path[FILE_PATH_MAX_LEN]; // full path of device key file
  180. #else
  181. unsigned char psk_decode[DECODE_PSK_LENGTH];
  182. #endif
  183. #ifdef MQTT_RMDUP_MSG_ENABLED
  184. #define MQTT_MAX_REPEAT_BUF_LEN 10
  185. uint16_t repeat_packet_id_buf[MQTT_MAX_REPEAT_BUF_LEN];
  186. unsigned int current_packet_id_cnt;
  187. #endif
  188. #ifdef MULTITHREAD_ENABLED
  189. bool yield_thread_running;
  190. int yield_thread_exit_code;
  191. #endif
  192. } Qcloud_IoT_Client;
  193. /**
  194. * @brief MQTT protocol version
  195. */
  196. typedef enum { MQTT_3_1_1 = 4 } MQTT_VERSION;
  197. typedef enum MQTT_NODE_STATE {
  198. MQTT_NODE_STATE_NORMANL = 0,
  199. MQTT_NODE_STATE_INVALID,
  200. } MQTTNodeState;
  201. /* topic publish info */
  202. typedef struct REPUBLISH_INFO {
  203. Timer pub_start_time; /* timer for puback waiting */
  204. MQTTNodeState node_state; /* node state in wait list */
  205. uint16_t msg_id; /* packet id */
  206. uint32_t len; /* msg length */
  207. unsigned char *buf; /* msg buffer */
  208. } QcloudIotPubInfo;
  209. /* topic subscribe/unsubscribe info */
  210. typedef struct SUBSCRIBE_INFO {
  211. enum msgTypes type; /* type: sub or unsub */
  212. uint16_t msg_id; /* packet id */
  213. Timer sub_start_time; /* timer for suback waiting */
  214. MQTTNodeState node_state; /* node state in wait list */
  215. SubTopicHandle handler; /* handle of topic subscribed(unsubcribed) */
  216. uint16_t len; /* msg length */
  217. unsigned char *buf; /* msg buffer */
  218. } QcloudIotSubInfo;
  219. /**
  220. * @brief Init MQTT client
  221. *
  222. * @param pClient handle to MQTT client
  223. * @param pParams MQTT init parameters
  224. *
  225. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  226. */
  227. int qcloud_iot_mqtt_init(Qcloud_IoT_Client *pClient, MQTTInitParams *pParams);
  228. /**
  229. * @brief Release resources of MQTT client
  230. *
  231. * @param pClient handle to MQTT client
  232. *
  233. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  234. */
  235. int qcloud_iot_mqtt_fini(Qcloud_IoT_Client *pClient);
  236. /**
  237. * @brief Connect with MQTT server
  238. *
  239. * @param pClient handle to MQTT client
  240. * @param pParams MQTT connect parameters
  241. *
  242. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  243. */
  244. int qcloud_iot_mqtt_connect(Qcloud_IoT_Client *pClient, MQTTConnectParams *pParams);
  245. /**
  246. * @brief Reconnect with MQTT server and re-subscribe topics if reconnected
  247. *
  248. * @param pClient handle to MQTT client
  249. *
  250. * @return QCLOUD_RET_MQTT_RECONNECTED for success, or err code for failure
  251. */
  252. int qcloud_iot_mqtt_attempt_reconnect(Qcloud_IoT_Client *pClient);
  253. /**
  254. * @brief Disconnect with MQTT server
  255. *
  256. * @param pClient handle to MQTT client
  257. *
  258. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  259. */
  260. int qcloud_iot_mqtt_disconnect(Qcloud_IoT_Client *pClient);
  261. /**
  262. * @brief Publish MQTT message
  263. *
  264. * @param pClient handle to MQTT client
  265. * @param topicName MQTT topic name
  266. * @param pParams publish parameters
  267. *
  268. * @return packet id (>=0) when success, or err code (<0) for failure
  269. */
  270. int qcloud_iot_mqtt_publish(Qcloud_IoT_Client *pClient, char *topicName, PublishParams *pParams);
  271. /**
  272. * @brief Subscribe MQTT topic
  273. *
  274. * @param pClient handle to MQTT client
  275. * @param topicFilter MQTT topic filter
  276. * @param pParams subscribe parameters
  277. *
  278. * @return packet id (>=0) when success, or err code (<0) for failure
  279. */
  280. int qcloud_iot_mqtt_subscribe(Qcloud_IoT_Client *pClient, char *topicFilter, SubscribeParams *pParams);
  281. /**
  282. * @brief Re-subscribe MQTT topics
  283. *
  284. * @param pClient handle to MQTT client
  285. *
  286. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  287. */
  288. int qcloud_iot_mqtt_resubscribe(Qcloud_IoT_Client *pClient);
  289. /**
  290. * @brief Unsubscribe MQTT topic
  291. *
  292. * @param pClient handle to MQTT client
  293. * @param topicFilter MQTT topic filter
  294. *
  295. * @return packet id (>=0) when success, or err code (<0) for failure
  296. */
  297. int qcloud_iot_mqtt_unsubscribe(Qcloud_IoT_Client *pClient, char *topicFilter);
  298. /**
  299. * @brief check if MQTT topic has been subscribed or not
  300. *
  301. * @param pClient handle to MQTT client
  302. * @param topicFilter MQTT topic filter
  303. *
  304. * @return true when successfully subscribed, or false if not yet
  305. */
  306. bool qcloud_iot_mqtt_is_sub_ready(Qcloud_IoT_Client *pClient, char *topicFilter);
  307. /**
  308. * @brief Check connection and keep alive state, read/handle MQTT message in
  309. * synchronized way
  310. *
  311. * @param pClient handle to MQTT client
  312. * @param timeout_ms timeout value (unit: ms) for this operation
  313. *
  314. * @return QCLOUD_RET_SUCCESS when success, QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT
  315. * when try reconnecing, or err code for
  316. * failure
  317. */
  318. int qcloud_iot_mqtt_yield(Qcloud_IoT_Client *pClient, uint32_t timeout_ms);
  319. // wrapper for qcloud_iot_mqtt_yield for multi-thread mode
  320. int qcloud_iot_mqtt_yield_mt(Qcloud_IoT_Client *mqtt_client, uint32_t timeout_ms);
  321. /**
  322. * @brief Check if auto reconnect is enabled or not
  323. *
  324. * @param pClient handle to MQTT client
  325. * @return true if auto reconnect is enabled
  326. */
  327. bool qcloud_iot_mqtt_is_autoreconnect_enabled(Qcloud_IoT_Client *pClient);
  328. /**
  329. * @brief Set to enable auto reconnect or not
  330. *
  331. * @param pClient handle to MQTT client
  332. * @param value enable or disable
  333. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  334. */
  335. int qcloud_iot_mqtt_set_autoreconnect(Qcloud_IoT_Client *pClient, bool value);
  336. /**
  337. * @brief Get the count of disconnection
  338. *
  339. * @param pClient handle to MQTT client
  340. * @return count of disconnection
  341. */
  342. int qcloud_iot_mqtt_get_network_disconnected_count(Qcloud_IoT_Client *pClient);
  343. /**
  344. * @brief Set the count of disconnection
  345. *
  346. * @param pClient handle to MQTT client
  347. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  348. */
  349. int qcloud_iot_mqtt_reset_network_disconnected_count(Qcloud_IoT_Client *pClient);
  350. /**
  351. * @brief Get next packet id
  352. *
  353. * @param pClient
  354. * @return
  355. */
  356. uint16_t get_next_packet_id(Qcloud_IoT_Client *pClient);
  357. /**
  358. * @brief Get next conn id
  359. *
  360. * @param options
  361. * @return
  362. */
  363. void get_next_conn_id(char *conn_id);
  364. /**
  365. * @brief Init packet header
  366. * @param header
  367. * @param message_type
  368. * @param qos
  369. * @param dup
  370. * @param retained
  371. * @return
  372. */
  373. int mqtt_init_packet_header(unsigned char *header, MessageTypes message_type, QoS qos, uint8_t dup, uint8_t retained);
  374. /**
  375. * @brief Read and handle one MQTT msg/ack from server
  376. *
  377. * @param pClient
  378. * @param timer
  379. * @param packet_type
  380. * @param qos
  381. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  382. */
  383. int cycle_for_read(Qcloud_IoT_Client *pClient, Timer *timer, uint8_t *packet_type, QoS qos);
  384. /**
  385. * @brief Send the packet in buffer
  386. *
  387. * @param pClient
  388. * @param length
  389. * @param timer
  390. * @return
  391. */
  392. int send_mqtt_packet(Qcloud_IoT_Client *pClient, size_t length, Timer *timer);
  393. /**
  394. * @brief wait for a specific packet with timeout
  395. *
  396. * only used in single-threaded mode where one command at a time is in process
  397. *
  398. * @param pClient MQTT Client
  399. * @param packet_type MQTT packet type
  400. * @param timer timer with timeout
  401. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  402. */
  403. int wait_for_read(Qcloud_IoT_Client *pClient, uint8_t packet_type, Timer *timer, QoS qos);
  404. /**
  405. * @brief Set MQTT connection state
  406. *
  407. * @param pClient MQTT Client
  408. * @param connected 0: disconnected 1: connected
  409. * @return
  410. */
  411. void set_client_conn_state(Qcloud_IoT_Client *pClient, uint8_t connected);
  412. /**
  413. * @brief Get MQTT connection state
  414. *
  415. * @param pClient MQTT Client
  416. * @return 0: disconnected 1: connected
  417. */
  418. uint8_t get_client_conn_state(Qcloud_IoT_Client *pClient);
  419. /**
  420. * @brief Check Publish ACK waiting list, remove the node if PUBACK received or
  421. * timeout
  422. *
  423. * @param pClient MQTT client
  424. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  425. */
  426. int qcloud_iot_mqtt_pub_info_proc(Qcloud_IoT_Client *pClient);
  427. /**
  428. * @brief Check Subscribe ACK waiting list, remove the node if SUBACK received
  429. * or timeout
  430. *
  431. * @param pClient MQTT client
  432. * @return QCLOUD_RET_SUCCESS for success, or err code for failure
  433. */
  434. int qcloud_iot_mqtt_sub_info_proc(Qcloud_IoT_Client *pClient);
  435. int push_sub_info_to(Qcloud_IoT_Client *c, int len, unsigned short msgId, MessageTypes type, SubTopicHandle *handler,
  436. ListNode **node);
  437. int serialize_pub_ack_packet(unsigned char *buf, size_t buf_len, MessageTypes packet_type, uint8_t dup,
  438. uint16_t packet_id, uint32_t *serialized_len);
  439. int serialize_packet_with_zero_payload(unsigned char *buf, size_t buf_len, MessageTypes packetType,
  440. uint32_t *serialized_len);
  441. int deserialize_publish_packet(unsigned char *dup, QoS *qos, uint8_t *retained, uint16_t *packet_id, char **topicName,
  442. uint16_t *topicNameLen, unsigned char **payload, size_t *payload_len, unsigned char *buf,
  443. size_t buf_len);
  444. int deserialize_suback_packet(uint16_t *packet_id, uint32_t max_count, uint32_t *count, QoS *grantedQoSs,
  445. unsigned char *buf, size_t buf_len);
  446. int deserialize_unsuback_packet(uint16_t *packet_id, unsigned char *buf, size_t buf_len);
  447. int deserialize_ack_packet(uint8_t *packet_type, uint8_t *dup, uint16_t *packet_id, unsigned char *buf, size_t buf_len);
  448. #ifdef MQTT_RMDUP_MSG_ENABLED
  449. void reset_repeat_packet_id_buffer(Qcloud_IoT_Client *pClient);
  450. #endif
  451. size_t get_mqtt_packet_len(size_t rem_len);
  452. size_t mqtt_write_packet_rem_len(unsigned char *buf, uint32_t length);
  453. int mqtt_read_packet_rem_len_form_buf(unsigned char *buf, uint32_t *value, uint32_t *readBytesLen);
  454. uint16_t mqtt_read_uint16_t(unsigned char **pptr);
  455. unsigned char mqtt_read_char(unsigned char **pptr);
  456. void mqtt_write_char(unsigned char **pptr, unsigned char c);
  457. void mqtt_write_uint_16(unsigned char **pptr, uint16_t anInt);
  458. void mqtt_write_utf8_string(unsigned char **pptr, const char *string);
  459. #ifdef __cplusplus
  460. }
  461. #endif
  462. #endif // IOT_MQTT_CLIENT_H_