mqtt_client.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. /*
  2. * Tencent is pleased to support the open source community by making IoT Hub
  3. available.
  4. * Copyright (C) 2018-2020 THL A29 Limited, a Tencent company. All rights
  5. reserved.
  6. * Licensed under the MIT License (the "License"); you may not use this file
  7. except in
  8. * compliance with the License. You may obtain a copy of the License at
  9. * http://opensource.org/licenses/MIT
  10. * Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is
  12. * distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. KIND,
  14. * either express or implied. See the License for the specific language
  15. governing permissions and
  16. * limitations under the License.
  17. *
  18. */
  19. #include <ctype.h>
  20. #include <stdbool.h>
  21. #include <stdint.h>
  22. #include <stdlib.h>
  23. #include <string.h>
  24. #include <time.h>
  25. #ifdef __cplusplus
  26. extern "C" {
  27. #endif
  28. #include "lite-utils.h"
  29. #include "log_upload.h"
  30. #include "mqtt_client.h"
  31. #include "qcloud_iot_ca.h"
  32. #include "qcloud_iot_common.h"
  33. #include "qcloud_iot_device.h"
  34. #include "qcloud_iot_export.h"
  35. #include "qcloud_iot_import.h"
  36. #include "utils_base64.h"
  37. #include "utils_list.h"
  38. static uint16_t _get_random_start_packet_id(void)
  39. {
  40. srand((unsigned)HAL_GetTimeMs());
  41. return rand() % 65536 + 1;
  42. }
  43. // currently return a constant value
  44. int IOT_MQTT_GetErrCode(void)
  45. {
  46. return QCLOUD_ERR_FAILURE;
  47. }
  48. void *IOT_MQTT_Construct(MQTTInitParams *pParams)
  49. {
  50. POINTER_SANITY_CHECK(pParams, NULL);
  51. STRING_PTR_SANITY_CHECK(pParams->product_id, NULL);
  52. STRING_PTR_SANITY_CHECK(pParams->device_name, NULL);
  53. Qcloud_IoT_Client *mqtt_client = NULL;
  54. char *client_id = NULL;
  55. // create and init MQTTClient
  56. if ((mqtt_client = (Qcloud_IoT_Client *)HAL_Malloc(sizeof(Qcloud_IoT_Client))) == NULL) {
  57. Log_e("malloc MQTTClient failed");
  58. return NULL;
  59. }
  60. int rc = qcloud_iot_mqtt_init(mqtt_client, pParams);
  61. if (rc != QCLOUD_RET_SUCCESS) {
  62. Log_e("mqtt init failed: %d", rc);
  63. HAL_Free(mqtt_client);
  64. return NULL;
  65. }
  66. MQTTConnectParams connect_params = DEFAULT_MQTTCONNECT_PARAMS;
  67. client_id = HAL_Malloc(MAX_SIZE_OF_CLIENT_ID + 1);
  68. if (client_id == NULL) {
  69. Log_e("malloc client_id failed");
  70. HAL_Free(mqtt_client);
  71. return NULL;
  72. }
  73. memset(client_id, 0, MAX_SIZE_OF_CLIENT_ID + 1);
  74. HAL_Snprintf(client_id, MAX_SIZE_OF_CLIENT_ID, "%s%s", pParams->product_id, pParams->device_name);
  75. connect_params.client_id = client_id;
  76. // Upper limit of keep alive interval is (11.5 * 60) seconds
  77. connect_params.keep_alive_interval = Min(pParams->keep_alive_interval_ms / 1000, 690);
  78. connect_params.clean_session = pParams->clean_session;
  79. connect_params.auto_connect_enable = pParams->auto_connect_enable;
  80. #if defined(AUTH_WITH_NOTLS) && defined(AUTH_MODE_KEY)
  81. if (pParams->device_secret == NULL) {
  82. Log_e("Device secret is null!");
  83. qcloud_iot_mqtt_fini(mqtt_client);
  84. HAL_Free(mqtt_client);
  85. HAL_Free(client_id);
  86. return NULL;
  87. }
  88. size_t src_len = strlen(pParams->device_secret);
  89. size_t len;
  90. memset(mqtt_client->psk_decode, 0x00, DECODE_PSK_LENGTH);
  91. rc = qcloud_iot_utils_base64decode(mqtt_client->psk_decode, DECODE_PSK_LENGTH, &len,\
  92. (unsigned char *)pParams->device_secret, src_len);
  93. connect_params.device_secret = (char *)mqtt_client->psk_decode;
  94. connect_params.device_secret_len = len;
  95. if (rc != QCLOUD_RET_SUCCESS) {
  96. Log_e("Device secret decode err, secret:%s", pParams->device_secret);
  97. qcloud_iot_mqtt_fini(mqtt_client);
  98. HAL_Free(mqtt_client);
  99. HAL_Free(client_id);
  100. return NULL;
  101. }
  102. #endif
  103. rc = qcloud_iot_mqtt_connect(mqtt_client, &connect_params);
  104. if (rc != QCLOUD_RET_SUCCESS) {
  105. Log_e("mqtt connect with id: %s failed: %d", mqtt_client->options.conn_id, rc);
  106. qcloud_iot_mqtt_fini(mqtt_client);
  107. HAL_Free(mqtt_client);
  108. HAL_Free(client_id);
  109. return NULL;
  110. } else {
  111. Log_i("mqtt connect with id: %s success", mqtt_client->options.conn_id);
  112. }
  113. #ifdef LOG_UPLOAD
  114. // log subscribe topics
  115. if (is_log_uploader_init()) {
  116. set_log_mqtt_client((void *)mqtt_client);
  117. int log_level;
  118. rc = qcloud_get_log_level(&log_level);
  119. if (rc < 0) {
  120. Log_e("client get log topic failed: %d", rc);
  121. }
  122. IOT_Log_Upload(true);
  123. }
  124. #endif
  125. return mqtt_client;
  126. }
  127. int IOT_MQTT_Destroy(void **pClient)
  128. {
  129. POINTER_SANITY_CHECK(*pClient, QCLOUD_ERR_INVAL);
  130. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)(*pClient);
  131. int rc = qcloud_iot_mqtt_disconnect(mqtt_client);
  132. // disconnect network stack by force
  133. if (rc != QCLOUD_RET_SUCCESS) {
  134. mqtt_client->network_stack.disconnect(&(mqtt_client->network_stack));
  135. set_client_conn_state(mqtt_client, NOTCONNECTED);
  136. }
  137. int i = 0;
  138. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
  139. /* notify this event to topic subscriber */
  140. if (NULL != mqtt_client->sub_handles[i].topic_filter && NULL != mqtt_client->sub_handles[i].sub_event_handler)
  141. mqtt_client->sub_handles[i].sub_event_handler(mqtt_client, MQTT_EVENT_CLIENT_DESTROY,
  142. mqtt_client->sub_handles[i].handler_user_data);
  143. if (NULL != mqtt_client->sub_handles[i].topic_filter) {
  144. HAL_Free((void *)mqtt_client->sub_handles[i].topic_filter);
  145. mqtt_client->sub_handles[i].topic_filter = NULL;
  146. }
  147. }
  148. #ifdef MQTT_RMDUP_MSG_ENABLED
  149. reset_repeat_packet_id_buffer(mqtt_client);
  150. #endif
  151. HAL_MutexDestroy(mqtt_client->lock_generic);
  152. HAL_MutexDestroy(mqtt_client->lock_write_buf);
  153. HAL_MutexDestroy(mqtt_client->lock_list_sub);
  154. HAL_MutexDestroy(mqtt_client->lock_list_pub);
  155. list_destroy(mqtt_client->list_pub_wait_ack);
  156. list_destroy(mqtt_client->list_sub_wait_ack);
  157. HAL_Free(mqtt_client->options.client_id);
  158. HAL_Free(*pClient);
  159. *pClient = NULL;
  160. #ifdef LOG_UPLOAD
  161. set_log_mqtt_client(NULL);
  162. #endif
  163. Log_i("mqtt release!");
  164. return rc;
  165. }
  166. int IOT_MQTT_Yield(void *pClient, uint32_t timeout_ms)
  167. {
  168. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
  169. int rc = qcloud_iot_mqtt_yield(mqtt_client, timeout_ms);
  170. #ifdef LOG_UPLOAD
  171. /* do instant log uploading if MQTT communication error */
  172. if (rc == QCLOUD_RET_SUCCESS)
  173. IOT_Log_Upload(false);
  174. else
  175. IOT_Log_Upload(true);
  176. #endif
  177. return rc;
  178. }
  179. int IOT_MQTT_Yield_MT(void *pClient, uint32_t timeout_ms)
  180. {
  181. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
  182. int rc = qcloud_iot_mqtt_yield_mt(mqtt_client, timeout_ms);
  183. #ifdef LOG_UPLOAD
  184. /* do instant log uploading if MQTT communication error */
  185. if (rc == QCLOUD_RET_SUCCESS)
  186. IOT_Log_Upload(false);
  187. else
  188. IOT_Log_Upload(true);
  189. #endif
  190. return rc;
  191. }
  192. #ifdef MULTITHREAD_ENABLED
  193. void IOT_MQTT_Set_Yield_Thread_State(void *pClient, bool state)
  194. {
  195. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
  196. mqtt_client->yield_thread_running = state;
  197. }
  198. #endif
  199. int IOT_MQTT_Publish(void *pClient, char *topicName, PublishParams *pParams)
  200. {
  201. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
  202. return qcloud_iot_mqtt_publish(mqtt_client, topicName, pParams);
  203. }
  204. int IOT_MQTT_Subscribe(void *pClient, char *topicFilter, SubscribeParams *pParams)
  205. {
  206. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
  207. return qcloud_iot_mqtt_subscribe(mqtt_client, topicFilter, pParams);
  208. }
  209. int IOT_MQTT_Unsubscribe(void *pClient, char *topicFilter)
  210. {
  211. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
  212. return qcloud_iot_mqtt_unsubscribe(mqtt_client, topicFilter);
  213. }
  214. bool IOT_MQTT_IsSubReady(void *pClient, char *topicFilter)
  215. {
  216. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
  217. return qcloud_iot_mqtt_is_sub_ready(mqtt_client, topicFilter);
  218. }
  219. bool IOT_MQTT_IsConnected(void *pClient)
  220. {
  221. IOT_FUNC_ENTRY;
  222. POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
  223. Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
  224. IOT_FUNC_EXIT_RC(get_client_conn_state(mqtt_client) == 1)
  225. }
  226. int qcloud_iot_mqtt_init(Qcloud_IoT_Client *pClient, MQTTInitParams *pParams)
  227. {
  228. IOT_FUNC_ENTRY;
  229. POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
  230. POINTER_SANITY_CHECK(pParams, QCLOUD_ERR_INVAL);
  231. memset(pClient, 0x0, sizeof(Qcloud_IoT_Client));
  232. int size = HAL_Snprintf(pClient->host_addr, HOST_STR_LENGTH, "%s.%s", pParams->product_id,
  233. iot_get_mqtt_domain(pParams->region));
  234. if (size < 0 || size > HOST_STR_LENGTH - 1) {
  235. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  236. }
  237. int i = 0;
  238. for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
  239. pClient->sub_handles[i].topic_filter = NULL;
  240. pClient->sub_handles[i].message_handler = NULL;
  241. pClient->sub_handles[i].sub_event_handler = NULL;
  242. pClient->sub_handles[i].qos = QOS0;
  243. pClient->sub_handles[i].handler_user_data = NULL;
  244. }
  245. if (pParams->command_timeout < MIN_COMMAND_TIMEOUT)
  246. pParams->command_timeout = MIN_COMMAND_TIMEOUT;
  247. if (pParams->command_timeout > MAX_COMMAND_TIMEOUT)
  248. pParams->command_timeout = MAX_COMMAND_TIMEOUT;
  249. pClient->command_timeout_ms = pParams->command_timeout;
  250. // packet id, random from [1 - 65536]
  251. pClient->next_packet_id = _get_random_start_packet_id();
  252. pClient->write_buf_size = QCLOUD_IOT_MQTT_TX_BUF_LEN;
  253. pClient->read_buf_size = QCLOUD_IOT_MQTT_RX_BUF_LEN;
  254. pClient->is_ping_outstanding = 0;
  255. pClient->was_manually_disconnected = 0;
  256. pClient->counter_network_disconnected = 0;
  257. #ifdef MULTITHREAD_ENABLED
  258. pClient->yield_thread_running = false;
  259. #endif
  260. pClient->event_handle = pParams->event_handle;
  261. pClient->lock_generic = HAL_MutexCreate();
  262. if (NULL == pClient->lock_generic) {
  263. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  264. }
  265. set_client_conn_state(pClient, NOTCONNECTED);
  266. if ((pClient->lock_write_buf = HAL_MutexCreate()) == NULL) {
  267. Log_e("create write buf lock failed.");
  268. goto error;
  269. }
  270. if ((pClient->lock_list_sub = HAL_MutexCreate()) == NULL) {
  271. Log_e("create sub list lock failed.");
  272. goto error;
  273. }
  274. if ((pClient->lock_list_pub = HAL_MutexCreate()) == NULL) {
  275. Log_e("create pub list lock failed.");
  276. goto error;
  277. }
  278. if ((pClient->list_pub_wait_ack = list_new()) == NULL) {
  279. Log_e("create pub wait list failed.");
  280. goto error;
  281. }
  282. pClient->list_pub_wait_ack->free = HAL_Free;
  283. if ((pClient->list_sub_wait_ack = list_new()) == NULL) {
  284. Log_e("create sub wait list failed.");
  285. goto error;
  286. }
  287. pClient->list_sub_wait_ack->free = HAL_Free;
  288. #ifndef AUTH_WITH_NOTLS// device param for TLS connection
  289. #ifdef AUTH_MODE_CERT
  290. strncpy(pClient->cert_file_path, pParams->cert_file, FILE_PATH_MAX_LEN - 1);
  291. strncpy(pClient->key_file_path, pParams->key_file, FILE_PATH_MAX_LEN - 1);
  292. pClient->network_stack.ssl_connect_params.cert_file = pClient->cert_file_path;
  293. pClient->network_stack.ssl_connect_params.key_file = pClient->key_file_path;
  294. pClient->network_stack.ssl_connect_params.ca_crt = iot_ca_get();
  295. pClient->network_stack.ssl_connect_params.ca_crt_len = strlen(pClient->network_stack.ssl_connect_params.ca_crt);
  296. #else
  297. if (pParams->device_secret != NULL) {
  298. size_t src_len = strlen(pParams->device_secret);
  299. size_t len;
  300. memset(pClient->psk_decode, 0x00, DECODE_PSK_LENGTH);
  301. qcloud_iot_utils_base64decode(pClient->psk_decode, DECODE_PSK_LENGTH, &len,
  302. (unsigned char *)pParams->device_secret, src_len);
  303. pClient->network_stack.ssl_connect_params.psk = (char *)pClient->psk_decode;
  304. pClient->network_stack.ssl_connect_params.psk_length = len;
  305. } else {
  306. Log_e("psk is empty!");
  307. IOT_FUNC_EXIT_RC(QCLOUD_ERR_INVAL);
  308. }
  309. memset( pClient->network_stack.ssl_connect_params.psk_id, 0, MAX_SIZE_OF_CLIENT_ID);
  310. HAL_Snprintf( pClient->network_stack.ssl_connect_params.psk_id, MAX_SIZE_OF_CLIENT_ID, \
  311. "%s%s", pParams->product_id, pParams->device_name);
  312. pClient->network_stack.ssl_connect_params.ca_crt = NULL;
  313. pClient->network_stack.ssl_connect_params.ca_crt_len = 0;
  314. #endif
  315. pClient->network_stack.host = pClient->host_addr;
  316. pClient->network_stack.port = MQTT_SERVER_PORT_TLS;
  317. pClient->network_stack.ssl_connect_params.timeout_ms =
  318. (pClient->command_timeout_ms > QCLOUD_IOT_TLS_HANDSHAKE_TIMEOUT) ? pClient->command_timeout_ms
  319. : QCLOUD_IOT_TLS_HANDSHAKE_TIMEOUT;
  320. #else
  321. pClient->network_stack.host = pClient->host_addr;
  322. pClient->network_stack.port = MQTT_SERVER_PORT_NOTLS;
  323. #endif
  324. // init network stack
  325. qcloud_iot_mqtt_network_init(pParams->profile_idx, &(pClient->network_stack));
  326. // ping timer and reconnect delay timer
  327. InitTimer(&(pClient->ping_timer));
  328. InitTimer(&(pClient->reconnect_delay_timer));
  329. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
  330. error:
  331. if (pClient->list_pub_wait_ack) {
  332. pClient->list_pub_wait_ack->free(pClient->list_pub_wait_ack);
  333. pClient->list_pub_wait_ack = NULL;
  334. }
  335. if (pClient->list_sub_wait_ack) {
  336. pClient->list_sub_wait_ack->free(pClient->list_sub_wait_ack);
  337. pClient->list_sub_wait_ack = NULL;
  338. }
  339. if (pClient->lock_generic) {
  340. HAL_MutexDestroy(pClient->lock_generic);
  341. pClient->lock_generic = NULL;
  342. }
  343. if (pClient->lock_list_sub) {
  344. HAL_MutexDestroy(pClient->lock_list_sub);
  345. pClient->lock_list_sub = NULL;
  346. }
  347. if (pClient->lock_list_pub) {
  348. HAL_MutexDestroy(pClient->lock_list_pub);
  349. pClient->lock_list_pub = NULL;
  350. }
  351. if (pClient->lock_write_buf) {
  352. HAL_MutexDestroy(pClient->lock_write_buf);
  353. pClient->lock_write_buf = NULL;
  354. }
  355. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE)
  356. }
  357. int qcloud_iot_mqtt_fini(Qcloud_IoT_Client *mqtt_client)
  358. {
  359. IOT_FUNC_ENTRY;
  360. POINTER_SANITY_CHECK(mqtt_client, QCLOUD_ERR_INVAL);
  361. HAL_MutexDestroy(mqtt_client->lock_generic);
  362. HAL_MutexDestroy(mqtt_client->lock_write_buf);
  363. HAL_MutexDestroy(mqtt_client->lock_list_sub);
  364. HAL_MutexDestroy(mqtt_client->lock_list_pub);
  365. list_destroy(mqtt_client->list_pub_wait_ack);
  366. list_destroy(mqtt_client->list_sub_wait_ack);
  367. Log_i("release mqtt client resources");
  368. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
  369. }
  370. int qcloud_iot_mqtt_set_autoreconnect(Qcloud_IoT_Client *pClient, bool value)
  371. {
  372. IOT_FUNC_ENTRY;
  373. POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
  374. pClient->options.auto_connect_enable = (uint8_t)value;
  375. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
  376. }
  377. bool qcloud_iot_mqtt_is_autoreconnect_enabled(Qcloud_IoT_Client *pClient)
  378. {
  379. IOT_FUNC_ENTRY;
  380. POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
  381. bool is_enabled = false;
  382. if (pClient->options.auto_connect_enable == 1) {
  383. is_enabled = true;
  384. }
  385. IOT_FUNC_EXIT_RC(is_enabled);
  386. }
  387. int qcloud_iot_mqtt_get_network_disconnected_count(Qcloud_IoT_Client *pClient)
  388. {
  389. IOT_FUNC_ENTRY;
  390. POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
  391. IOT_FUNC_EXIT_RC(pClient->counter_network_disconnected);
  392. }
  393. int qcloud_iot_mqtt_reset_network_disconnected_count(Qcloud_IoT_Client *pClient)
  394. {
  395. IOT_FUNC_ENTRY;
  396. POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
  397. pClient->counter_network_disconnected = 0;
  398. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
  399. }
  400. #ifdef __cplusplus
  401. }
  402. #endif