Files
ch32v203-eth-node/mqtt_handler.c
kuwoyuki 8adc726b0b chore: debloat (remove platformio), onewire improvements
Squashed commit of the following:

commit 5f16309f629b9928d2134b85ae64af69bc3ebbcd
Author: kuwoyuki <kuwoyuki@cock.li>
Date:   Sun Nov 24 22:55:15 2024 +0600

    fix: Makefile, improve onewire retries

commit 55496a3bda941b52ff349dc75c9c06eb5a37c07d
Author: kuwoyuki <kuwoyuki@cock.li>
Date:   Mon Nov 18 00:41:18 2024 +0600

    fix: make onewire validity less strict

commit 3428a9bc9792508972ce3e7e4e35a64f047bca10
Author: kuwoyuki <kuwoyuki@cock.li>
Date:   Sun Nov 17 23:57:55 2024 +0600

    chore: rm bins

commit 1594e5ed430522b15466c8afa62ff7fb1b28947c
Author: kuwoyuki <kuwoyuki@cock.li>
Date:   Sun Nov 17 23:32:01 2024 +0600

    chore: unplatformiofy
2024-11-24 22:56:05 +06:00

386 lines
12 KiB
C

#include "mqtt_handler.h"
#include <MQTT/MQTTClient.h>
#include <socket.h>
#include <string.h>
#include "debug.h"
#include "gpio.h"
#include "modbus.h"
#include "onewire_temp.h"
#include "systick.h"
// MQTT timing, in milliseconds (compared against millis() deltas in mqtt_process)
#define MQTT_YIELD_INTERVAL 100 // minimum gap between two MQTTYield calls
#define MQTT_MAX_PACKET_WAIT 20 // timeout handed to MQTTYield: wait at most 20ms per call
#define MQTT_RECONNECT_INTERVAL 5000 // 5 seconds between reconnection attempts
// Homie convention constants
#define HOMIE_VERSION "4.0" // payload of the device's $homie attribute
#define HOMIE_STATE_READY "ready" // $state payload published after connecting
#define HOMIE_STATE_LOST "lost" // $state payload registered as the last will
#define HOMIE_STATE_SLEEPING "sleeping" // not referenced in the visible part of this file
#define HOMIE_STATE_DISCONNECTED "disconnected" // not referenced in the visible part of this file
// Buffer in which publish_device_attributes assembles the comma-separated
// $nodes payload; its capacity bounds the length of that list.
char nodes_list[MAX_PAYLOAD_LENGTH];
// Parse a Homie command topic of the form
//   homie/<node-id>/<device-name>/<property>/<set|get>
//
// topic, topic_len : topic bytes; they need not be NUL-terminated.
// device_name      : receives the third segment, NUL-terminated.
// name_max         : capacity of device_name, terminator included.
// property         : receives the fourth segment, NUL-terminated.
// prop_max         : capacity of property, terminator included.
// is_set           : set to 1 only when the final segment is exactly "set",
//                    otherwise to 0.
//
// Returns true when all five segments are present and both names fit in their
// buffers, false otherwise. The output buffers may have been partially written
// when false is returned.
static bool parse_homie_topic(const char* topic, size_t topic_len,
                              char* device_name, size_t name_max,
                              char* property, size_t prop_max,
                              uint8_t* is_set) {
  static const char set_word[] = "set";
  const size_t set_len = sizeof(set_word) - 1;

  if (!topic || !device_name || !property || !is_set) {
    return false;
  }

  const char* segment_start = topic;
  const char* topic_end = topic + topic_len;
  uint8_t segment = 0;

  // Walk over "homie/", "<node-id>/" and "<device-name>/", keeping a copy of
  // the device name (segment index 2) on the way.
  while (segment < 3 && segment_start < topic_end) {
    const char* slash =
        memchr(segment_start, '/', (size_t)(topic_end - segment_start));
    if (!slash) {
      return false;
    }
    if (segment == 2) {
      size_t len = (size_t)(slash - segment_start);
      if (len >= name_max) {
        return false;
      }
      memcpy(device_name, segment_start, len);
      device_name[len] = '\0';
    }
    segment_start = slash + 1;
    segment++;
  }

  // Fourth segment: the property name. If the loop above ran out of input the
  // search length is 0 and memchr reports "not found".
  const char* slash =
      memchr(segment_start, '/', (size_t)(topic_end - segment_start));
  if (!slash) {
    return false;
  }
  size_t len = (size_t)(slash - segment_start);
  if (len >= prop_max) {
    return false;
  }
  memcpy(property, segment_start, len);
  property[len] = '\0';

  segment_start = slash + 1;
  if (segment_start >= topic_end) {
    return false;  // Missing set/get
  }

  // Compare the whole trailing segment rather than just its first character,
  // so that e.g. ".../status" or ".../set/extra" is not mistaken for a set.
  const size_t tail_len = (size_t)(topic_end - segment_start);
  *is_set = (tail_len == set_len &&
             memcmp(segment_start, set_word, set_len) == 0)
                ? 1
                : 0;
  return true;
}
// Open the TCP link to the broker and perform the MQTT CONNECT handshake.
//
// network : transport object, (re)initialised here on TCP_SOCKET.
// opts    : supplies the client id, username and password for CONNECT.
// client  : MQTT client, (re)initialised here with the static TX/RX buffers.
//
// A retained QoS1 last will carrying HOMIE_STATE_LOST is registered on
// homie/<node-id>/$state.
//
// Returns 0 on success, -1 when the network connection fails and -2 when
// MQTTConnect fails.
int setup_mqtt_client(Network* network, ch32_mqtt_options_t* opts,
                      MQTTClient* client) {
  // Static storage: these buffers are handed to MQTTClientInit and must
  // outlive this call.
  static unsigned char tx_buffer[MQTT_TX_BUFFER_SIZE];
  static unsigned char rx_buffer[MQTT_RX_BUFFER_SIZE];
  uint8_t broker_ip[] = MQTT_SERVER_IP;

  // Transport first: nothing else is touched if the socket cannot connect.
  NewNetwork(network, TCP_SOCKET);
  const int net_rc = ConnectNetwork(network, broker_ip, MQTT_PORT);
  if (net_rc != SOCK_OK) {
    DEBUG_PRINT("Network connection failed: %d\n", net_rc);
    return -1;
  }

  MQTTClientInit(client, network, MQTT_COMMAND_TIMEOUT_MS, tx_buffer,
                 sizeof(tx_buffer), rx_buffer, sizeof(rx_buffer));

  MQTTPacket_connectData conn = MQTTPacket_connectData_initializer;

  // Last Will and Testament: report the node as lost on its $state topic.
  char lwt_topic[MAX_TOPIC_LENGTH];
  snprintf(lwt_topic, sizeof(lwt_topic), "homie/%s/$state", NODE_CONFIG.id);
  conn.willFlag = 1;
  conn.will = (MQTTPacket_willOptions)MQTTPacket_willOptions_initializer;
  conn.will.topicName.cstring = lwt_topic;
  conn.will.message.cstring = HOMIE_STATE_LOST;
  conn.will.retained = 1;
  conn.will.qos = QOS1;

  // Session parameters and credentials.
  conn.MQTTVersion = 3;
  conn.clientID.cstring = opts->clientid;
  conn.username.cstring = opts->username;
  conn.password.cstring = opts->password;
  conn.keepAliveInterval = MQTT_KEEP_ALIVE_INTERVAL;
  conn.cleansession = 1;

  const int mqtt_rc = MQTTConnect(client, &conn);
  if (mqtt_rc != 0) {
    DEBUG_PRINT("Failed to connect: %d\n", mqtt_rc);
    return -2;
  }
  return 0;
}
// Subscribe `client` to `topic` at `qos`, registering `message_callback` as
// the handler. A failure is logged.
// Returns 0 on success, otherwise the non-zero result of MQTTSubscribe.
int subscribe_to_topic(MQTTClient* client, const char* topic, enum QoS qos,
                       messageHandler message_callback) {
  const int status = MQTTSubscribe(client, topic, qos, message_callback);
  if (status == 0) {
    return 0;
  }
  DEBUG_PRINT("Failed to subscribe to %s: %d\n", topic, status);
  return status;
}
// Publish the NUL-terminated string `payload` to `topic` as a non-retained
// QoS0 message. A failed publish is only logged.
void publish_message(MQTTClient* client, const char* payload,
                     const char* topic) {
  MQTTMessage msg = {0};
  msg.qos = QOS0;
  msg.retained = 0;
  msg.dup = 0;
  msg.payload = (void*)payload;
  msg.payloadlen = strlen(payload);

  const int rc = MQTTPublish(client, topic, &msg);
  if (rc != 0) {
    DEBUG_PRINT("Publish failed\n");
  }
}
// Publish the NUL-terminated string `payload` to `topic` as a retained QoS1
// message. A failed publish is only logged, together with the topic.
void publish_retained(MQTTClient* client, const char* topic,
                      const char* payload) {
  MQTTMessage msg = {0};
  msg.qos = QOS1;
  msg.retained = 1;
  msg.dup = 0;
  msg.payload = (void*)payload;
  msg.payloadlen = strlen(payload);

  const int rc = MQTTPublish(client, topic, &msg);
  if (rc != 0) {
    DEBUG_PRINT("Failed to publish to %s\n", topic);
  }
}
// Publish the Homie device-level attributes of this node as retained
// messages: $homie, $name, $state ("ready"), $mac, $nodes and, when a
// location is configured, $location.
//
// $nodes is a comma-separated list of the RS485 device names, assembled in the
// nodes_list buffer. If the names do not all fit, the list stops after the
// last name that fits completely (never with a trailing ',').
static void publish_device_attributes(MQTTClient* client) {
  char topic[MAX_TOPIC_LENGTH];
  char mac_str[18];  // "XX:XX:XX:XX:XX:XX" plus the terminator

  // Device attributes
  snprintf(topic, sizeof(topic), "homie/%s/$homie", NODE_CONFIG.id);
  publish_retained(client, topic, HOMIE_VERSION);
  snprintf(topic, sizeof(topic), "homie/%s/$name", NODE_CONFIG.id);
  publish_retained(client, topic, NODE_CONFIG.name);
  snprintf(topic, sizeof(topic), "homie/%s/$state", NODE_CONFIG.id);
  publish_retained(client, topic, HOMIE_STATE_READY);

  // Format MAC address as XX:XX:XX:XX:XX:XX
  snprintf(mac_str, sizeof(mac_str), "%02X:%02X:%02X:%02X:%02X:%02X",
           NODE_CONFIG.mac[0], NODE_CONFIG.mac[1], NODE_CONFIG.mac[2],
           NODE_CONFIG.mac[3], NODE_CONFIG.mac[4], NODE_CONFIG.mac[5]);
  snprintf(topic, sizeof(topic), "homie/%s/$mac", NODE_CONFIG.id);
  publish_retained(client, topic, mac_str);

  // Build the $nodes list from the RS485 device names.
  size_t used = 0;  // bytes written so far; always < sizeof(nodes_list)
  nodes_list[0] = '\0';
  for (int i = 0; i < RS485_DEVICE_COUNT; i++) {
    const char* name = RS485_DEVICES[i].name;
    const size_t name_len = strlen(name);
    const size_t sep_len = (i > 0) ? 1 : 0;

    // Make sure separator + name + terminator all fit *before* writing
    // anything, so a name that does not fit cannot leave a dangling ','.
    if (sep_len + name_len >= sizeof(nodes_list) - used) {
      break;
    }
    if (sep_len != 0) {
      nodes_list[used++] = ',';
    }
    memcpy(&nodes_list[used], name, name_len);
    used += name_len;
  }
  nodes_list[used] = '\0';

  snprintf(topic, sizeof(topic), "homie/%s/$nodes", NODE_CONFIG.id);
  publish_retained(client, topic, nodes_list);

  // loc attribute
  if (NODE_CONFIG.location[0] != '\0') {
    snprintf(topic, sizeof(topic), "homie/%s/$location", NODE_CONFIG.id);
    publish_retained(client, topic, NODE_CONFIG.location);
  }
}
// Publish the retained Homie node attributes for one RS485 device.
//
// Every device gets homie/<node-id>/<device>/$name. The remaining attributes
// depend on device->type:
//   DEVICE_RELAY       - one settable boolean property "state"
//   DEVICE_SOIL_SENSOR - integer "moisture" (%) and float "temperature" (°C)
//   DEVICE_THERMOMETER - nothing yet; only a debug message is printed
static void publish_rs485_node_attributes(MQTTClient* client,
                                          const rs485_device_t* device) {
  const char* node = device->name;
  char path[MAX_TOPIC_LENGTH];

  // Node base attributes
  snprintf(path, sizeof(path), "homie/%s/%s/$name", NODE_CONFIG.id, node);
  publish_retained(client, path, node);

  // Property set depends on the kind of device.
  switch (device->type) {
    case DEVICE_RELAY: {
      snprintf(path, sizeof(path), "homie/%s/%s/$properties", NODE_CONFIG.id,
               node);
      publish_retained(client, path, "state");

      snprintf(path, sizeof(path), "homie/%s/%s/state/$name", NODE_CONFIG.id,
               node);
      publish_retained(client, path, "State");

      snprintf(path, sizeof(path), "homie/%s/%s/state/$datatype",
               NODE_CONFIG.id, node);
      publish_retained(client, path, "boolean");

      snprintf(path, sizeof(path), "homie/%s/%s/state/$settable",
               NODE_CONFIG.id, node);
      publish_retained(client, path, "true");
      break;
    }

    case DEVICE_SOIL_SENSOR: {
      snprintf(path, sizeof(path), "homie/%s/%s/$properties", NODE_CONFIG.id,
               node);
      publish_retained(client, path, "moisture,temperature");

      // Moisture property
      snprintf(path, sizeof(path), "homie/%s/%s/moisture/$name",
               NODE_CONFIG.id, node);
      publish_retained(client, path, "Moisture Level");

      snprintf(path, sizeof(path), "homie/%s/%s/moisture/$datatype",
               NODE_CONFIG.id, node);
      publish_retained(client, path, "integer");

      snprintf(path, sizeof(path), "homie/%s/%s/moisture/$unit",
               NODE_CONFIG.id, node);
      publish_retained(client, path, "%");

      // Temperature property
      snprintf(path, sizeof(path), "homie/%s/%s/temperature/$name",
               NODE_CONFIG.id, node);
      publish_retained(client, path, "Temperature");

      snprintf(path, sizeof(path), "homie/%s/%s/temperature/$datatype",
               NODE_CONFIG.id, node);
      publish_retained(client, path, "float");

      snprintf(path, sizeof(path), "homie/%s/%s/temperature/$unit",
               NODE_CONFIG.id, node);
      publish_retained(client, path, "°C");
      break;
    }

    case DEVICE_THERMOMETER: {
      // TODO
      DEBUG_PRINT("not implemented\n");
    }
  }
}
// Reset `state` to its initial, disconnected condition.
//
// The node id doubles as the MQTT client id, username and password are empty
// strings, and QoS1 is selected. No network activity happens here: connecting,
// Homie discovery and subscribing are all done later by mqtt_process.
void mqtt_init(mqtt_state_t* state) {
  state->opts.clientid = (char*)NODE_CONFIG.id;  // Use node ID as client ID
  state->opts.username = "";
  state->opts.password = "";
  state->opts.qos = QOS1;
  state->last_reconnect = 0;
  state->last_yield = 0;
  state->is_connected = false;
  // mqtt_process tests this flag before publishing discovery; without an
  // explicit reset a non-zeroed state could skip discovery entirely.
  state->discovery_published = false;
}
// Look up an RS485 device by exact (case-sensitive) name.
// Returns its index in RS485_DEVICES, or -1 when no device matches.
static int8_t find_device_index(const char* name) {
  uint8_t idx = 0;
  while (idx < RS485_DEVICE_COUNT) {
    if (!strcmp(RS485_DEVICES[idx].name, name)) {
      return idx;
    }
    idx++;
  }
  return -1;
}
// MQTT message callback for Homie command topics.
//
// The topic is split into device name and property by parse_homie_topic.
// Only "set" commands with a non-empty payload addressed to a known RS485
// device are acted upon:
//   - DEVICE_RELAY: the payload must be exactly 16 characters, each '0' or
//     '1'. They are read most-significant bit first into a 16-bit value that
//     is passed to modbus_handler_send_request.
//   - any other device type: not implemented; the message is logged and
//     dropped without issuing a Modbus request.
// Malformed topics or payloads are dropped.
void message_arrived(MessageData* md) {
  enum { RELAY_PAYLOAD_BITS = 16 };

  if (!md || !md->message || !md->topicName) {
    DEBUG_PRINT("Error: MessageData is NULL.\n");
    return;
  }

  char device_name[16];
  char property[16];
  uint8_t is_set;
  if (!parse_homie_topic(md->topicName->lenstring.data,
                         md->topicName->lenstring.len, device_name,
                         sizeof(device_name), property, sizeof(property),
                         &is_set)) {
    DEBUG_PRINT("Failed to parse topic\n");
    return;
  }

  // Only set operations that actually carry a value are handled.
  if (!is_set || md->message->payloadlen == 0) {
    return;
  }

  int8_t idx = find_device_index(device_name);
  if (idx < 0) {
    return;
  }

  if (RS485_DEVICES[idx].type != DEVICE_RELAY) {
    // todo:
    // Return instead of falling through: there is no parsed value for these
    // devices, so sending a request would write a meaningless 0.
    DEBUG_PRINT("not implemented\n");
    return;
  }

  const char* bits = (const char*)md->message->payload;
  if (!bits || md->message->payloadlen != RELAY_PAYLOAD_BITS) {
    DEBUG_PRINT("Invalid relay payload\n");
    return;
  }

  // The payload comes from the network: accept nothing but binary digits so
  // stray characters cannot turn into arbitrary relay bits.
  uint16_t value = 0;
  for (int i = 0; i < RELAY_PAYLOAD_BITS; i++) {
    if (bits[i] != '0' && bits[i] != '1') {
      DEBUG_PRINT("Invalid relay payload\n");
      return;
    }
    value = (uint16_t)((value << 1) | (uint16_t)(bits[i] - '0'));
  }

  modbus_handler_send_request(idx, property, is_set, value);
}
// Service the MQTT connection once, based on the current millis() tick.
//
// Connected: when at least MQTT_YIELD_INTERVAL has passed since the last
// yield, call MQTTYield. A failing yield marks the state as disconnected and
// clears discovery_published so discovery is sent again after reconnecting.
// The LED is set to LED_STATE_ON after each yield.
//
// Disconnected: when at least MQTT_RECONNECT_INTERVAL has passed since the
// last attempt, connect via setup_mqtt_client, publish the Homie discovery
// data if it is not marked as published, and subscribe to
// homie/<node-id>/+/+/set. A failed subscription leaves the state
// disconnected. The LED is set to LED_STATE_BUSY on every disconnected call.
void mqtt_process(mqtt_state_t* state) {
  const uint32_t tick = millis();

  if (state->is_connected) {
    if (tick - state->last_yield >= MQTT_YIELD_INTERVAL) {
      const int yield_rc = MQTTYield(&state->client, MQTT_MAX_PACKET_WAIT);
      if (yield_rc != SUCCESS) {
        DEBUG_PRINT("MQTT Yield failed with rc=%d, ping_outstanding=%d\n",
                    yield_rc, state->client.ping_outstanding);
        state->is_connected = false;
        state->discovery_published = false;
      }
      state->last_yield = tick;
      led_status_set(LED_STATE_ON);
    }
    return;
  }

  // Not connected: rate-limit the reconnection attempts.
  if (tick - state->last_reconnect >= MQTT_RECONNECT_INTERVAL) {
    const int connect_rc =
        setup_mqtt_client(&state->network, &state->opts, &state->client);
    if (connect_rc == SUCCESS) {
      state->is_connected = true;

      if (!state->discovery_published) {
        publish_device_attributes(&state->client);
        for (int dev = 0; dev < RS485_DEVICE_COUNT; dev++) {
          publish_rs485_node_attributes(&state->client, &RS485_DEVICES[dev]);
        }
        // with onewire we can discover new devices on the bus during runtime
        // is it worth implementing?
        onewire_temp_publish_discovery(&state->client, NODE_CONFIG.id);
        state->discovery_published = 1;
      }

      char set_filter[MAX_TOPIC_LENGTH];
      snprintf(set_filter, sizeof(set_filter), "homie/%s/+/+/set",
               NODE_CONFIG.id);
      const int sub_rc = subscribe_to_topic(&state->client, set_filter, QOS1,
                                            message_arrived);
      if (sub_rc != SUCCESS) {
        state->is_connected = false;
      }
    }
    state->last_reconnect = tick;
  }
  led_status_set(LED_STATE_BUSY);
}