gateway_api.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. /*
  2. * Tencent is pleased to support the open source community by making IoT Hub
  3. available.
  4. * Copyright (C) 2018-2020 THL A29 Limited, a Tencent company. All rights
  5. reserved.
  6. * Licensed under the MIT License (the "License"); you may not use this file
  7. except in
  8. * compliance with the License. You may obtain a copy of the License at
  9. * http://opensource.org/licenses/MIT
  10. * Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is
  12. * distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. KIND,
  14. * either express or implied. See the License for the specific language
  15. governing permissions and
  16. * limitations under the License.
  17. *
  18. */
  19. #include <string.h>
  20. #include "gateway_common.h"
  21. #include "mqtt_client.h"
  22. #include "utils_param_check.h"
  23. #ifdef MULTITHREAD_ENABLED
  24. /**
  25. * gateway yield thread runner
  26. */
  27. static void gateway_yield_thread(void *pClient)
  28. {
  29. #define THREAD_SLEEP_INTERVAL_MS 1
  30. int rc = QCLOUD_RET_SUCCESS;
  31. Gateway *pGateway = (Gateway *)pClient;
  32. Log_d("gateway yield thread start ...");
  33. while (pGateway->yield_thread_running) {
  34. rc = IOT_Gateway_Yield(pGateway, 200);
  35. if (rc == QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT) {
  36. HAL_SleepMs(THREAD_SLEEP_INTERVAL_MS);
  37. continue;
  38. } else if (rc == QCLOUD_RET_MQTT_MANUALLY_DISCONNECTED || rc == QCLOUD_ERR_MQTT_RECONNECT_TIMEOUT) {
  39. Log_e("Gateway yield thread exit with error: %d", rc);
  40. break;
  41. } else if (rc != QCLOUD_RET_SUCCESS && rc != QCLOUD_RET_MQTT_RECONNECTED) {
  42. Log_e("Something goes error: %d", rc);
  43. }
  44. HAL_SleepMs(THREAD_SLEEP_INTERVAL_MS);
  45. }
  46. pGateway->yield_thread_running = false;
  47. pGateway->yield_thread_exit_code = rc;
  48. #undef THREAD_SLEEP_INTERVAL_MS
  49. }
  50. int IOT_Gateway_Start_Yield_Thread(void *pClient)
  51. {
  52. POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
  53. Gateway *pGateway = (Gateway *)pClient;
  54. ThreadParams thread_params = {0};
  55. thread_params.thread_func = gateway_yield_thread;
  56. thread_params.thread_name = "gateway_yield_thread";
  57. thread_params.user_arg = pClient;
  58. thread_params.stack_size = 4096;
  59. thread_params.priority = 1;
  60. pGateway->yield_thread_running = true;
  61. IOT_MQTT_Set_Yield_Thread_State(pGateway->mqtt, true);
  62. int rc = HAL_ThreadCreate(&thread_params);
  63. if (rc) {
  64. Log_e("create template_yield_thread fail: %d", rc);
  65. return QCLOUD_ERR_FAILURE;
  66. }
  67. HAL_SleepMs(500);
  68. return QCLOUD_RET_SUCCESS;
  69. }
  70. void IOT_Gateway_Stop_Yield_Thread(void *pClient)
  71. {
  72. POINTER_SANITY_CHECK_RTN(pClient);
  73. Gateway *pGateway = (Gateway *)pClient;
  74. pGateway->yield_thread_running = false;
  75. IOT_MQTT_Set_Yield_Thread_State(pGateway->mqtt, false);
  76. HAL_SleepMs(1000);
  77. return;
  78. }
  79. bool IOT_Gateway_Get_Yield_Status(void *pClient, int *exit_code)
  80. {
  81. POINTER_SANITY_CHECK(pClient, false);
  82. Gateway *pGateway = (Gateway *)pClient;
  83. *exit_code = pGateway->yield_thread_exit_code;
  84. return pGateway->yield_thread_running;
  85. }
  86. #endif
  87. void _gateway_event_handler(void *client, void *context, MQTTEventMsg *msg)
  88. {
  89. uintptr_t packet_id = (uintptr_t)msg->msg;
  90. Gateway * gateway = (Gateway *)context;
  91. POINTER_SANITY_CHECK_RTN(context);
  92. POINTER_SANITY_CHECK_RTN(msg);
  93. MQTTMessage *topic_info = (MQTTMessage *)msg->msg;
  94. switch (msg->event_type) {
  95. case MQTT_EVENT_SUBCRIBE_SUCCESS:
  96. case MQTT_EVENT_UNSUBCRIBE_SUCCESS:
  97. Log_d("gateway sub|unsub(%d) success, packet-id=%u", msg->event_type, (unsigned int)packet_id);
  98. if (gateway->gateway_data.sync_status == packet_id) {
  99. gateway->gateway_data.sync_status = 0;
  100. return;
  101. }
  102. break;
  103. case MQTT_EVENT_SUBCRIBE_TIMEOUT:
  104. case MQTT_EVENT_UNSUBCRIBE_TIMEOUT:
  105. case MQTT_EVENT_SUBCRIBE_NACK:
  106. case MQTT_EVENT_UNSUBCRIBE_NACK:
  107. Log_d("gateway timeout|nack(%d) event, packet-id=%u", msg->event_type, (unsigned int)packet_id);
  108. if (gateway->gateway_data.sync_status == packet_id) {
  109. gateway->gateway_data.sync_status = -1;
  110. return;
  111. }
  112. break;
  113. case MQTT_EVENT_PUBLISH_RECVEIVED:
  114. Log_d(
  115. "gateway topic message arrived but without any related handle: "
  116. "topic=%.*s, topic_msg=%.*s",
  117. topic_info->topic_len, topic_info->ptopic, topic_info->payload_len, topic_info->payload);
  118. break;
  119. default:
  120. break;
  121. }
  122. if (gateway->event_handle.h_fp != NULL) {
  123. gateway->event_handle.h_fp(client, gateway->event_handle.context, msg);
  124. }
  125. return;
  126. }
  127. void *IOT_Gateway_Construct(GatewayInitParam *init_param)
  128. {
  129. int rc = 0;
  130. GatewayParam param = DEFAULT_GATEWAY_PARAMS;
  131. POINTER_SANITY_CHECK(init_param, NULL);
  132. Gateway *gateway = (Gateway *)HAL_Malloc(sizeof(Gateway));
  133. if (gateway == NULL) {
  134. Log_e("gateway malloc failed");
  135. IOT_FUNC_EXIT_RC(NULL);
  136. }
  137. memset(gateway, 0, sizeof(Gateway));
  138. /* replace user event handle */
  139. gateway->event_handle.h_fp = init_param->init_param.event_handle.h_fp;
  140. gateway->event_handle.context = init_param->init_param.event_handle.context;
  141. /* set _gateway_event_handler as mqtt event handle */
  142. init_param->init_param.event_handle.h_fp = _gateway_event_handler;
  143. init_param->init_param.event_handle.context = gateway;
  144. /* construct MQTT client */
  145. gateway->mqtt = IOT_MQTT_Construct(&init_param->init_param);
  146. if (NULL == gateway->mqtt) {
  147. Log_e("construct MQTT failed");
  148. HAL_Free(gateway);
  149. IOT_FUNC_EXIT_RC(NULL);
  150. }
  151. /* subscribe default topic */
  152. param.product_id = init_param->init_param.product_id;
  153. param.device_name = init_param->init_param.device_name;
  154. rc = gateway_subscribe_unsubscribe_default(gateway, &param);
  155. if (QCLOUD_RET_SUCCESS != rc) {
  156. Log_e("subscribe default topic failed");
  157. IOT_Gateway_Destroy((void *)gateway);
  158. IOT_FUNC_EXIT_RC(NULL);
  159. }
  160. #ifdef MULTITHREAD_ENABLED
  161. gateway->yield_thread_running = false;
  162. #endif
  163. return (void *)gateway;
  164. }
  165. int IOT_Gateway_Subdev_Online(void *client, GatewayParam *param)
  166. {
  167. int rc = 0;
  168. char topic[MAX_SIZE_OF_CLOUD_TOPIC + 1] = {0};
  169. char payload[GATEWAY_PAYLOAD_BUFFER_LEN + 1] = {0};
  170. int size = 0;
  171. SubdevSession *session = NULL;
  172. PublishParams params = DEFAULT_PUB_PARAMS;
  173. Gateway * gateway = (Gateway *)client;
  174. POINTER_SANITY_CHECK(gateway, QCLOUD_ERR_INVAL);
  175. POINTER_SANITY_CHECK(param, QCLOUD_ERR_INVAL);
  176. STRING_PTR_SANITY_CHECK(param->product_id, QCLOUD_ERR_INVAL);
  177. STRING_PTR_SANITY_CHECK(param->device_name, QCLOUD_ERR_INVAL);
  178. STRING_PTR_SANITY_CHECK(param->subdev_product_id, QCLOUD_ERR_INVAL);
  179. STRING_PTR_SANITY_CHECK(param->subdev_device_name, QCLOUD_ERR_INVAL);
  180. session = subdev_find_session(gateway, param->subdev_product_id, param->subdev_device_name);
  181. if (NULL == session) {
  182. Log_d("there is no session, create a new session");
  183. /* create subdev session */
  184. session = subdev_add_session(gateway, param->subdev_product_id, param->subdev_device_name);
  185. if (NULL == session) {
  186. Log_e("create session error!");
  187. IOT_FUNC_EXIT_RC(QCLOUD_ERR_GATEWAY_CREATE_SESSION_FAIL);
  188. }
  189. } else {
  190. if (SUBDEV_SEESION_STATUS_ONLINE == session->session_status) {
  191. Log_i("device have online");
  192. IOT_FUNC_EXIT_RC(QCLOUD_ERR_GATEWAY_SUBDEV_ONLINE);
  193. }
  194. }
  195. size = HAL_Snprintf(topic, MAX_SIZE_OF_CLOUD_TOPIC + 1, GATEWAY_TOPIC_OPERATION_FMT, param->product_id,
  196. param->device_name);
  197. if (size < 0 || size > MAX_SIZE_OF_CLOUD_TOPIC) {
  198. Log_e("buf size < topic length!");
  199. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  200. }
  201. size = HAL_Snprintf(payload, GATEWAY_PAYLOAD_BUFFER_LEN + 1, GATEWAY_PAYLOAD_STATUS_FMT, GATEWAY_ONLINE_OP_STR,
  202. param->subdev_product_id, param->subdev_device_name);
  203. if (size < 0 || size > GATEWAY_PAYLOAD_BUFFER_LEN) {
  204. Log_e("buf size < payload length!");
  205. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  206. }
  207. size = HAL_Snprintf(gateway->gateway_data.online.client_id, MAX_SIZE_OF_CLIENT_ID, GATEWAY_CLIENT_ID_FMT,
  208. param->subdev_product_id, param->subdev_device_name);
  209. if (size < 0 || size > MAX_SIZE_OF_CLIENT_ID) {
  210. Log_e("buf size < client_id length!");
  211. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  212. }
  213. gateway->gateway_data.online.result = -2;
  214. params.qos = QOS0;
  215. params.payload_len = strlen(payload);
  216. params.payload = (char *)payload;
  217. /* publish packet */
  218. rc = gateway_publish_sync(gateway, topic, &params, &gateway->gateway_data.online.result);
  219. if (QCLOUD_RET_SUCCESS != rc) {
  220. subdev_remove_session(gateway, param->subdev_product_id, param->subdev_device_name);
  221. IOT_FUNC_EXIT_RC(rc);
  222. }
  223. session->session_status = SUBDEV_SEESION_STATUS_ONLINE;
  224. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
  225. }
  226. int IOT_Gateway_Subdev_Offline(void *client, GatewayParam *param)
  227. {
  228. int rc = 0;
  229. char topic[MAX_SIZE_OF_CLOUD_TOPIC + 1] = {0};
  230. char payload[GATEWAY_PAYLOAD_BUFFER_LEN + 1] = {0};
  231. int size = 0;
  232. SubdevSession *session = NULL;
  233. Gateway * gateway = (Gateway *)client;
  234. POINTER_SANITY_CHECK(gateway, QCLOUD_ERR_INVAL);
  235. POINTER_SANITY_CHECK(param, QCLOUD_ERR_INVAL);
  236. STRING_PTR_SANITY_CHECK(param->product_id, QCLOUD_ERR_INVAL);
  237. STRING_PTR_SANITY_CHECK(param->device_name, QCLOUD_ERR_INVAL);
  238. STRING_PTR_SANITY_CHECK(param->subdev_product_id, QCLOUD_ERR_INVAL);
  239. STRING_PTR_SANITY_CHECK(param->subdev_device_name, QCLOUD_ERR_INVAL);
  240. session = subdev_find_session(gateway, param->subdev_product_id, param->subdev_device_name);
  241. if (NULL == session) {
  242. Log_d("no session, can not offline");
  243. IOT_FUNC_EXIT_RC(QCLOUD_ERR_GATEWAY_SESSION_NO_EXIST);
  244. }
  245. if (SUBDEV_SEESION_STATUS_OFFLINE == session->session_status) {
  246. Log_i("device have offline");
  247. /* free session */
  248. subdev_remove_session(gateway, param->subdev_product_id, param->subdev_device_name);
  249. IOT_FUNC_EXIT_RC(QCLOUD_ERR_GATEWAY_SUBDEV_OFFLINE);
  250. }
  251. size = HAL_Snprintf(topic, MAX_SIZE_OF_CLOUD_TOPIC + 1, GATEWAY_TOPIC_OPERATION_FMT, param->product_id,
  252. param->device_name);
  253. if (size < 0 || size > MAX_SIZE_OF_CLOUD_TOPIC) {
  254. Log_e("buf size < topic length!");
  255. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  256. }
  257. size = HAL_Snprintf(payload, GATEWAY_PAYLOAD_BUFFER_LEN + 1, GATEWAY_PAYLOAD_STATUS_FMT, GATEWAY_OFFLIN_OP_STR,
  258. param->subdev_product_id, param->subdev_device_name);
  259. if (size < 0 || size > GATEWAY_PAYLOAD_BUFFER_LEN) {
  260. Log_e("buf size < payload length!");
  261. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  262. }
  263. size = HAL_Snprintf(gateway->gateway_data.offline.client_id, MAX_SIZE_OF_CLIENT_ID, GATEWAY_CLIENT_ID_FMT,
  264. param->subdev_product_id, param->subdev_device_name);
  265. if (size < 0 || size > MAX_SIZE_OF_CLIENT_ID) {
  266. Log_e("buf size < client_id length!");
  267. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  268. }
  269. gateway->gateway_data.offline.result = -2;
  270. PublishParams params = DEFAULT_PUB_PARAMS;
  271. params.qos = QOS0;
  272. params.payload_len = strlen(payload);
  273. params.payload = (char *)payload;
  274. /* publish packet */
  275. rc = gateway_publish_sync(gateway, topic, &params, &gateway->gateway_data.offline.result);
  276. if (QCLOUD_RET_SUCCESS != rc) {
  277. IOT_FUNC_EXIT_RC(rc);
  278. }
  279. session->session_status = SUBDEV_SEESION_STATUS_OFFLINE;
  280. /* free session */
  281. subdev_remove_session(gateway, param->subdev_product_id, param->subdev_device_name);
  282. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
  283. }
  284. int IOT_Gateway_Subdev_Bind(void *client, GatewayParam *param, DeviceInfo *pBindSubDevInfo)
  285. {
  286. char topic[MAX_SIZE_OF_CLOUD_TOPIC + 1];
  287. char payload[GATEWAY_PAYLOAD_BUFFER_LEN + 1];
  288. int size = 0;
  289. Gateway * gateway = (Gateway *)client;
  290. memset(topic, 0, MAX_SIZE_OF_CLOUD_TOPIC);
  291. size = HAL_Snprintf(topic, MAX_SIZE_OF_CLOUD_TOPIC + 1, GATEWAY_TOPIC_OPERATION_FMT, param->product_id, param->device_name);
  292. if (size < 0 || size > MAX_SIZE_OF_CLOUD_TOPIC) {
  293. Log_e("buf size < topic length!");
  294. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  295. }
  296. srand((unsigned)HAL_GetTimeMs());
  297. int nonce = rand();
  298. long timestamp = HAL_Timer_current_sec();
  299. /*cal sign*/
  300. char sign[SUBDEV_BIND_SIGN_LEN];
  301. memset(sign, 0, SUBDEV_BIND_SIGN_LEN);
  302. if (QCLOUD_RET_SUCCESS != subdev_bind_hmac_sha1_cal(pBindSubDevInfo, sign, SUBDEV_BIND_SIGN_LEN, nonce, timestamp)) {
  303. Log_e("cal sign fail");
  304. return QCLOUD_ERR_FAILURE;
  305. }
  306. memset(payload, 0, GATEWAY_PAYLOAD_BUFFER_LEN);
  307. #ifdef AUTH_MODE_CERT
  308. size = HAL_Snprintf(payload, GATEWAY_PAYLOAD_BUFFER_LEN + 1, GATEWAY_PAYLOAD_OP_FMT, GATEWAY_BIND_OP_STR,
  309. pBindSubDevInfo->product_id, pBindSubDevInfo->device_name, sign, nonce, timestamp, "hmacsha1", "certificate");
  310. #else
  311. size = HAL_Snprintf(payload, GATEWAY_PAYLOAD_BUFFER_LEN + 1, GATEWAY_PAYLOAD_OP_FMT, GATEWAY_BIND_OP_STR,
  312. pBindSubDevInfo->product_id, pBindSubDevInfo->device_name, sign, nonce, timestamp, "hmacsha1", "psk");
  313. #endif
  314. if (size < 0 || size > GATEWAY_PAYLOAD_BUFFER_LEN) {
  315. Log_e("buf size < payload length!");
  316. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  317. }
  318. size = HAL_Snprintf(gateway->gateway_data.bind.client_id, MAX_SIZE_OF_CLIENT_ID, GATEWAY_CLIENT_ID_FMT,
  319. pBindSubDevInfo->product_id, pBindSubDevInfo->device_name);
  320. if (size < 0 || size > MAX_SIZE_OF_CLIENT_ID) {
  321. Log_e("buf size < client_id length!");
  322. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  323. }
  324. PublishParams params = DEFAULT_PUB_PARAMS;
  325. params.qos = QOS0;
  326. params.payload_len = strlen(payload);
  327. params.payload = (char *)payload;
  328. /* publish packet */
  329. gateway->gateway_data.bind.result = -2;
  330. int rc = gateway_publish_sync(gateway, topic, &params, &gateway->gateway_data.bind.result);
  331. if (QCLOUD_RET_SUCCESS != rc) {
  332. IOT_FUNC_EXIT_RC(gateway->gateway_data.bind.result);
  333. }
  334. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
  335. }
  336. int IOT_Gateway_Subdev_Unbind(void *client, GatewayParam *param, DeviceInfo *pSubDevInfo)
  337. {
  338. char topic[MAX_SIZE_OF_CLOUD_TOPIC + 1];
  339. char payload[GATEWAY_PAYLOAD_BUFFER_LEN + 1];
  340. int size = 0;
  341. Gateway * gateway = (Gateway *)client;
  342. memset(topic, 0, MAX_SIZE_OF_CLOUD_TOPIC);
  343. size = HAL_Snprintf(topic, MAX_SIZE_OF_CLOUD_TOPIC + 1, GATEWAY_TOPIC_OPERATION_FMT, param->product_id, param->device_name);
  344. if (size < 0 || size > MAX_SIZE_OF_CLOUD_TOPIC) {
  345. Log_e("buf size < topic length!");
  346. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  347. }
  348. memset(payload, 0, GATEWAY_PAYLOAD_BUFFER_LEN);
  349. size = HAL_Snprintf(payload, GATEWAY_PAYLOAD_BUFFER_LEN + 1, GATEWAY_PAYLOAD_STATUS_FMT, GATEWAY_UNBIND_OP_STR,
  350. pSubDevInfo->product_id, pSubDevInfo->device_name);
  351. if (size < 0 || size > GATEWAY_PAYLOAD_BUFFER_LEN) {
  352. Log_e("buf size < payload length!");
  353. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  354. }
  355. size = HAL_Snprintf(gateway->gateway_data.unbind.client_id, MAX_SIZE_OF_CLIENT_ID, GATEWAY_CLIENT_ID_FMT,
  356. pSubDevInfo->product_id, pSubDevInfo->device_name);
  357. if (size < 0 || size > MAX_SIZE_OF_CLIENT_ID) {
  358. Log_e("buf size < client_id length!");
  359. IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
  360. }
  361. PublishParams params = DEFAULT_PUB_PARAMS;
  362. params.qos = QOS0;
  363. params.payload_len = strlen(payload);
  364. params.payload = (char *)payload;
  365. /* publish packet */
  366. gateway->gateway_data.unbind.result = -2;
  367. int rc = gateway_publish_sync(gateway, topic, &params, &gateway->gateway_data.unbind.result);
  368. if (QCLOUD_RET_SUCCESS != rc) {
  369. IOT_FUNC_EXIT_RC(gateway->gateway_data.unbind.result);
  370. }
  371. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
  372. }
  373. void *IOT_Gateway_Get_Mqtt_Client(void *client)
  374. {
  375. POINTER_SANITY_CHECK(client, NULL);
  376. Gateway *gateway = (Gateway *)client;
  377. return gateway->mqtt;
  378. }
  379. int IOT_Gateway_Destroy(void *client)
  380. {
  381. Gateway *gateway = (Gateway *)client;
  382. POINTER_SANITY_CHECK(gateway, QCLOUD_ERR_INVAL);
  383. SubdevSession *cur_session = gateway->session_list;
  384. while (cur_session) {
  385. SubdevSession *session = cur_session;
  386. cur_session = cur_session->next;
  387. HAL_Free(session);
  388. }
  389. IOT_MQTT_Destroy(&gateway->mqtt);
  390. HAL_Free(client);
  391. IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS)
  392. }
  393. int IOT_Gateway_Yield(void *client, uint32_t timeout_ms)
  394. {
  395. Gateway *gateway = (Gateway *)client;
  396. POINTER_SANITY_CHECK(gateway, QCLOUD_ERR_INVAL);
  397. return IOT_MQTT_Yield(gateway->mqtt, timeout_ms);
  398. }
  399. int IOT_Gateway_Subscribe(void *client, char *topic_filter, SubscribeParams *params)
  400. {
  401. Gateway *gateway = (Gateway *)client;
  402. POINTER_SANITY_CHECK(gateway, QCLOUD_ERR_INVAL);
  403. return IOT_MQTT_Subscribe(gateway->mqtt, topic_filter, params);
  404. }
  405. int IOT_Gateway_Unsubscribe(void *client, char *topic_filter)
  406. {
  407. Gateway *gateway = (Gateway *)client;
  408. POINTER_SANITY_CHECK(gateway, QCLOUD_ERR_INVAL);
  409. return IOT_MQTT_Unsubscribe(gateway->mqtt, topic_filter);
  410. }
  411. int IOT_Gateway_IsSubReady(void *client, char *topic_filter)
  412. {
  413. Gateway *gateway = (Gateway *)client;
  414. POINTER_SANITY_CHECK(gateway, QCLOUD_ERR_INVAL);
  415. return IOT_MQTT_IsSubReady(gateway->mqtt, topic_filter);
  416. }
  417. int IOT_Gateway_Publish(void *client, char *topic_name, PublishParams *params)
  418. {
  419. Gateway *gateway = (Gateway *)client;
  420. POINTER_SANITY_CHECK(gateway, QCLOUD_ERR_INVAL);
  421. return IOT_MQTT_Publish(gateway->mqtt, topic_name, params);
  422. }