chore: mqtt impl, fix more wiznet bugs

This commit is contained in:
2024-11-10 02:47:54 +06:00
parent 80cf21f143
commit 3ac9c62241
19 changed files with 1093 additions and 253 deletions

View File

@@ -14,6 +14,7 @@
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTClient.h"
#include <string.h>
static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
md->topicName = aTopicName;
@@ -56,7 +57,8 @@ void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeou
c->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = 0;
c->messageHandlers[i].topicFilter[0] = '\0';
c->messageHandlers[i].fp = NULL;
c->command_timeout_ms = command_timeout_ms;
c->buf = sendbuf;
c->buf_size = sendbuf_size;
@@ -108,12 +110,12 @@ static int readPacket(MQTTClient* c, Timer* timer)
int len = 0;
int rem_len = 0;
/* 1. read the header byte. This has the packet type in it */
/* 1. read the header byte. This has the packet type in it */
if (c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer)) != 1)
goto exit;
len = 1;
/* 2. read the remaining length. This is variable in itself */
/* 2. read the remaining length. This is variable in itself */
decodePacket(c, &rem_len, TimerLeftMS(timer));
len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
@@ -167,8 +169,8 @@ int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
// we have to find the right message handler - indexed by topic
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
if (c->messageHandlers[i].topicFilter[0] != '\0' && (MQTTPacket_equals(topicName, c->messageHandlers[i].topicFilter) ||
isTopicMatched(c->messageHandlers[i].topicFilter, topicName)))
{
if (c->messageHandlers[i].fp != NULL)
{
@@ -194,11 +196,14 @@ int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
int keepalive(MQTTClient* c)
{
int rc = FAILURE;
int rc = SUCCESSS;
if (c->keepAliveInterval == 0)
goto exit;
if (!c->isconnected)
{
rc = SUCCESSS;
rc = FAILURE;
goto exit;
}
@@ -209,9 +214,24 @@ int keepalive(MQTTClient* c)
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 1000);
int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESSS) // send the ping packet
if (len <= 0)
{
rc = FAILURE;
goto exit;
}
rc = sendPacket(c, len, &timer);
if (rc == SUCCESSS)
{
c->ping_outstanding = 1;
TimerCountdown(&c->ping_timer, c->keepAliveInterval);
}
}
else
{
rc = FAILURE;
}
}
@@ -436,9 +456,10 @@ int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageH
int i;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == 0)
if (c->messageHandlers[i].topicFilter[0] == '\0')
{
c->messageHandlers[i].topicFilter = topicFilter;
strncpy(c->messageHandlers[i].topicFilter, topicFilter, MAX_TOPIC_LENGTH - 1);
c->messageHandlers[i].topicFilter[MAX_TOPIC_LENGTH - 1] = '\0';
c->messageHandlers[i].fp = messageHandler;
rc = 0;
break;

View File

@@ -43,6 +43,8 @@
#define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */
#endif
#define MAX_TOPIC_LENGTH 64
enum QoS { QOS0, QOS1, QOS2 };
/* all failure return codes must be negative */
@@ -97,7 +99,7 @@ typedef struct MQTTClient
struct MessageHandlers
{
const char* topicFilter;
char topicFilter[MAX_TOPIC_LENGTH];
void (*fp) (MessageData*);
} messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */