Squashed commit of the following: commit 5f16309f629b9928d2134b85ae64af69bc3ebbcd Author: kuwoyuki <kuwoyuki@cock.li> Date: Sun Nov 24 22:55:15 2024 +0600 fix: Makefile, improve onewire retries commit 55496a3bda941b52ff349dc75c9c06eb5a37c07d Author: kuwoyuki <kuwoyuki@cock.li> Date: Mon Nov 18 00:41:18 2024 +0600 fix: make onewire validity less strict commit 3428a9bc9792508972ce3e7e4e35a64f047bca10 Author: kuwoyuki <kuwoyuki@cock.li> Date: Sun Nov 17 23:57:55 2024 +0600 chore: rm bins commit 1594e5ed430522b15466c8afa62ff7fb1b28947c Author: kuwoyuki <kuwoyuki@cock.li> Date: Sun Nov 17 23:32:01 2024 +0600 chore: unplatformiofy
386 lines
12 KiB
C
386 lines
12 KiB
C
#include "mqtt_handler.h"
|
|
|
|
#include <MQTT/MQTTClient.h>
|
|
#include <socket.h>
|
|
#include <string.h>
|
|
|
|
#include "debug.h"
|
|
#include "gpio.h"
|
|
#include "modbus.h"
|
|
#include "onewire_temp.h"
|
|
#include "systick.h"
|
|
|
|
// MQTT
|
|
#define MQTT_YIELD_INTERVAL 100 // 100ms between yields in main loop
|
|
#define MQTT_MAX_PACKET_WAIT 20 // Only wait up to 20ms for packet processing
|
|
#define MQTT_RECONNECT_INTERVAL 5000 // 5 seconds between reconnection attempts
|
|
|
|
// Homie convention constants
|
|
#define HOMIE_VERSION "4.0"
|
|
#define HOMIE_STATE_READY "ready"
|
|
#define HOMIE_STATE_LOST "lost"
|
|
#define HOMIE_STATE_SLEEPING "sleeping"
|
|
#define HOMIE_STATE_DISCONNECTED "disconnected"
|
|
|
|
// nodes list buffer
|
|
char nodes_list[MAX_PAYLOAD_LENGTH];
|
|
|
|
// Parse a Homie command topic of the form
//   homie/<node-id>/<device-name>/<property>/<set|get>
//
// topic, topic_len   Topic bytes; does not need to be NUL-terminated.
// device_name        Receives the third segment, NUL-terminated.
// name_max           Capacity of device_name, including the terminator.
// property           Receives the fourth segment, NUL-terminated.
// prop_max           Capacity of property, including the terminator.
// is_set             Set to 1 when the final segment is exactly "set",
//                    otherwise 0.
//
// Returns false when an argument is NULL, a segment is missing, or a segment
// does not fit into its output buffer. Output buffers may be partially
// written when false is returned.
static bool parse_homie_topic(const char* topic, size_t topic_len,
                              char* device_name, size_t name_max,
                              char* property, size_t prop_max,
                              uint8_t* is_set) {
  static const char set_suffix[] = "set";
  const size_t set_suffix_len = sizeof(set_suffix) - 1;

  if (!topic || !device_name || !property || !is_set) return false;

  const char* cursor = topic;
  const char* topic_end = topic + topic_len;

  // Walk "homie/", "<node-id>/" and "<device-name>/"; only the last one is
  // kept. memchr over an empty remainder returns NULL, which also covers
  // topics that end early.
  for (uint8_t segment = 0; segment < 3; segment++) {
    const char* slash = memchr(cursor, '/', (size_t)(topic_end - cursor));
    if (!slash) return false;

    if (segment == 2) {
      size_t name_len = (size_t)(slash - cursor);
      if (name_len >= name_max) return false;
      memcpy(device_name, cursor, name_len);
      device_name[name_len] = '\0';
    }

    cursor = slash + 1;
  }

  // Fourth segment: property name.
  const char* prop_end = memchr(cursor, '/', (size_t)(topic_end - cursor));
  if (!prop_end) return false;

  size_t prop_len = (size_t)(prop_end - cursor);
  if (prop_len >= prop_max) return false;
  memcpy(property, cursor, prop_len);
  property[prop_len] = '\0';

  // Final segment: the command verb.
  cursor = prop_end + 1;
  if (cursor >= topic_end) return false;  // Missing set/get

  // Compare the whole remainder instead of just its first character so that
  // e.g. ".../status" is not mistaken for a set command.
  size_t verb_len = (size_t)(topic_end - cursor);
  *is_set = (verb_len == set_suffix_len &&
             memcmp(cursor, set_suffix, set_suffix_len) == 0);

  return true;
}
|
|
|
|
// MQTT client
|
|
int setup_mqtt_client(Network* network, ch32_mqtt_options_t* opts,
|
|
MQTTClient* client) {
|
|
static unsigned char tx_buffer[MQTT_TX_BUFFER_SIZE];
|
|
static unsigned char rx_buffer[MQTT_RX_BUFFER_SIZE];
|
|
int rc;
|
|
uint8_t target_ip[] = MQTT_SERVER_IP;
|
|
|
|
// Initialize network
|
|
NewNetwork(network, TCP_SOCKET);
|
|
rc = ConnectNetwork(network, target_ip, MQTT_PORT);
|
|
if (rc != SOCK_OK) {
|
|
DEBUG_PRINT("Network connection failed: %d\n", rc);
|
|
return -1;
|
|
}
|
|
|
|
// Initialize the MQTT client
|
|
MQTTClientInit(client, network, MQTT_COMMAND_TIMEOUT_MS, tx_buffer,
|
|
sizeof(tx_buffer), rx_buffer, sizeof(rx_buffer));
|
|
|
|
// Setup MQTT connection data
|
|
MQTTPacket_connectData connect_data = MQTTPacket_connectData_initializer;
|
|
|
|
// Configure Last Will and Testament
|
|
char will_topic[MAX_TOPIC_LENGTH];
|
|
snprintf(will_topic, sizeof(will_topic), "homie/%s/$state", NODE_CONFIG.id);
|
|
|
|
connect_data.willFlag = 1; // Enable LWT
|
|
connect_data.will =
|
|
(MQTTPacket_willOptions)MQTTPacket_willOptions_initializer;
|
|
connect_data.will.topicName.cstring = will_topic;
|
|
connect_data.will.message.cstring = HOMIE_STATE_LOST;
|
|
connect_data.will.retained = 1;
|
|
connect_data.will.qos = QOS1;
|
|
|
|
// rest
|
|
connect_data.MQTTVersion = 3;
|
|
connect_data.clientID.cstring = opts->clientid;
|
|
connect_data.username.cstring = opts->username;
|
|
connect_data.password.cstring = opts->password;
|
|
connect_data.keepAliveInterval = MQTT_KEEP_ALIVE_INTERVAL;
|
|
connect_data.cleansession = 1;
|
|
|
|
// Connect to MQTT broker
|
|
rc = MQTTConnect(client, &connect_data);
|
|
if (rc != 0) {
|
|
DEBUG_PRINT("Failed to connect: %d\n", rc);
|
|
return -2;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int subscribe_to_topic(MQTTClient* client, const char* topic, enum QoS qos,
|
|
messageHandler message_callback) {
|
|
int rc = MQTTSubscribe(client, topic, qos, message_callback);
|
|
if (rc != 0) {
|
|
DEBUG_PRINT("Failed to subscribe to %s: %d\n", topic, rc);
|
|
return rc;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
// Publish `payload` (a NUL-terminated string) to `topic` as a non-retained
// QoS 0 message. Failures are logged and otherwise ignored.
//
// Note the argument order: payload comes before topic here.
void publish_message(MQTTClient* client, const char* payload,
                     const char* topic) {
  MQTTMessage msg = {0};
  msg.qos = QOS0;
  msg.retained = 0;
  msg.dup = 0;
  msg.payload = (void*)payload;
  msg.payloadlen = strlen(payload);

  const int rc = MQTTPublish(client, topic, &msg);
  if (rc != 0) {
    DEBUG_PRINT("Publish failed\n");
  }
}
|
|
|
|
// publish retained messages
|
|
void publish_retained(MQTTClient* client, const char* topic,
|
|
const char* payload) {
|
|
MQTTMessage message = {.qos = QOS1,
|
|
.retained = 1,
|
|
.dup = 0,
|
|
.payload = (void*)payload,
|
|
.payloadlen = strlen(payload)};
|
|
|
|
if (MQTTPublish(client, topic, &message) != 0) {
|
|
DEBUG_PRINT("Failed to publish to %s\n", topic);
|
|
}
|
|
}
|
|
|
|
// Publish the retained device-level attributes under homie/<node-id>/:
// $homie, $name, $state ("ready"), $mac, $nodes and, when configured,
// $location.
//
// $nodes is a comma-separated list of the RS485 device names, assembled in
// the file-level nodes_list buffer. Names that no longer fit into the buffer
// are left out; the list is never left with a dangling separator.
static void publish_device_attributes(MQTTClient* client) {
  char topic[MAX_TOPIC_LENGTH];
  char mac_str[18];  // "XX:XX:XX:XX:XX:XX" plus terminator

  // Device attributes
  snprintf(topic, sizeof(topic), "homie/%s/$homie", NODE_CONFIG.id);
  publish_retained(client, topic, HOMIE_VERSION);

  snprintf(topic, sizeof(topic), "homie/%s/$name", NODE_CONFIG.id);
  publish_retained(client, topic, NODE_CONFIG.name);

  snprintf(topic, sizeof(topic), "homie/%s/$state", NODE_CONFIG.id);
  publish_retained(client, topic, HOMIE_STATE_READY);

  // Format MAC address as XX:XX:XX:XX:XX:XX
  snprintf(mac_str, sizeof(mac_str), "%02X:%02X:%02X:%02X:%02X:%02X",
           NODE_CONFIG.mac[0], NODE_CONFIG.mac[1], NODE_CONFIG.mac[2],
           NODE_CONFIG.mac[3], NODE_CONFIG.mac[4], NODE_CONFIG.mac[5]);

  snprintf(topic, sizeof(topic), "homie/%s/$mac", NODE_CONFIG.id);
  publish_retained(client, topic, mac_str);

  // Build the $nodes list from the RS485 devices.
  char* ptr = nodes_list;
  size_t remaining = sizeof(nodes_list);  // includes room for the terminator
  *ptr = '\0';
  for (int i = 0; i < RS485_DEVICE_COUNT; i++) {
    const char* name = RS485_DEVICES[i].name;
    const size_t len = strlen(name);
    const size_t sep = (i > 0) ? 1 : 0;

    // Check separator and name together before writing either, so a name
    // that does not fit cannot leave a trailing ',' behind. One byte is
    // always kept free for the terminator.
    if (sep + len >= remaining) break;

    if (sep) {
      *ptr++ = ',';
    }
    memcpy(ptr, name, len);
    ptr += len;
    remaining -= sep + len;
  }
  *ptr = '\0';

  snprintf(topic, sizeof(topic), "homie/%s/$nodes", NODE_CONFIG.id);
  publish_retained(client, topic, nodes_list);

  // Location is optional: only published when configured.
  if (NODE_CONFIG.location[0] != '\0') {
    snprintf(topic, sizeof(topic), "homie/%s/$location", NODE_CONFIG.id);
    publish_retained(client, topic, NODE_CONFIG.location);
  }
}
|
|
|
|
static void publish_rs485_node_attributes(MQTTClient* client,
|
|
const rs485_device_t* device) {
|
|
char topic[MAX_TOPIC_LENGTH];
|
|
|
|
// Node base attributes
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/$name", NODE_CONFIG.id,
|
|
device->name);
|
|
publish_retained(client, topic, device->name);
|
|
|
|
// Set properties based on device type
|
|
switch (device->type) {
|
|
case DEVICE_RELAY:
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/$properties", NODE_CONFIG.id,
|
|
device->name);
|
|
publish_retained(client, topic, "state");
|
|
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/state/$name", NODE_CONFIG.id,
|
|
device->name);
|
|
publish_retained(client, topic, "State");
|
|
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/state/$datatype",
|
|
NODE_CONFIG.id, device->name);
|
|
publish_retained(client, topic, "boolean");
|
|
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/state/$settable",
|
|
NODE_CONFIG.id, device->name);
|
|
publish_retained(client, topic, "true");
|
|
break;
|
|
|
|
case DEVICE_SOIL_SENSOR:
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/$properties", NODE_CONFIG.id,
|
|
device->name);
|
|
publish_retained(client, topic, "moisture,temperature");
|
|
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$name",
|
|
NODE_CONFIG.id, device->name);
|
|
publish_retained(client, topic, "Moisture Level");
|
|
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$datatype",
|
|
NODE_CONFIG.id, device->name);
|
|
publish_retained(client, topic, "integer");
|
|
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$unit",
|
|
NODE_CONFIG.id, device->name);
|
|
publish_retained(client, topic, "%");
|
|
|
|
// Temperature property
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$name",
|
|
NODE_CONFIG.id, device->name);
|
|
publish_retained(client, topic, "Temperature");
|
|
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$datatype",
|
|
NODE_CONFIG.id, device->name);
|
|
publish_retained(client, topic, "float");
|
|
|
|
snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$unit",
|
|
NODE_CONFIG.id, device->name);
|
|
publish_retained(client, topic, "°C");
|
|
break;
|
|
case DEVICE_THERMOMETER:
|
|
// TODO
|
|
DEBUG_PRINT("not implemented\n");
|
|
}
|
|
}
|
|
|
|
// Initialize MQTT with Homie discovery
|
|
void mqtt_init(mqtt_state_t* state) {
|
|
state->opts.clientid = (char*)NODE_CONFIG.id; // Use node ID as client ID
|
|
state->opts.username = "";
|
|
state->opts.password = "";
|
|
state->opts.qos = QOS1;
|
|
state->last_reconnect = 0;
|
|
state->last_yield = 0;
|
|
state->is_connected = false;
|
|
}
|
|
|
|
// Find device by name and return its index
|
|
static int8_t find_device_index(const char* name) {
|
|
for (uint8_t i = 0; i < RS485_DEVICE_COUNT; i++) {
|
|
if (strcmp(RS485_DEVICES[i].name, name) == 0) {
|
|
return i;
|
|
}
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
// MQTT callback for incoming homie/<node-id>/<device>/<property>/<verb>
// messages.
//
// Only "set" messages with a non-empty payload are acted upon. For relay
// devices the payload must be exactly 16 characters, each '0' or '1', which
// are folded most-significant-bit first into a 16-bit value and forwarded to
// the Modbus handler. Messages for unknown devices, for device types without
// a set implementation, or with a malformed payload are dropped.
void message_arrived(MessageData* md) {
  enum { RELAY_BITMASK_LEN = 16 };

  if (!md || !md->message || !md->topicName) {
    DEBUG_PRINT("Error: MessageData is NULL.\n");
    return;
  }

  char device_name[16];
  char property[16];
  uint8_t is_set;

  if (!parse_homie_topic(md->topicName->lenstring.data,
                         md->topicName->lenstring.len, device_name,
                         sizeof(device_name), property, sizeof(property),
                         &is_set)) {
    DEBUG_PRINT("Failed to parse topic\n");
    return;
  }

  // Nothing to do for non-set verbs or empty payloads.
  if (!is_set || md->message->payloadlen == 0) return;

  int8_t idx = find_device_index(device_name);
  if (idx < 0) return;

  if (RS485_DEVICES[idx].type != DEVICE_RELAY) {
    // todo: set support for other device types. Do not forward a request
    // here: there is no parsed value to send.
    DEBUG_PRINT("not implemented\n");
    return;
  }

  if (md->message->payloadlen != RELAY_BITMASK_LEN || !md->message->payload) {
    return;
  }

  const char* bits = (const char*)md->message->payload;
  uint16_t value = 0;
  for (int i = 0; i < RELAY_BITMASK_LEN; i++) {
    // Reject anything that is not a binary digit instead of folding
    // arbitrary bytes into the relay bitmask.
    if (bits[i] != '0' && bits[i] != '1') {
      DEBUG_PRINT("Invalid relay payload\n");
      return;
    }
    value = (uint16_t)((value << 1) | (uint16_t)(bits[i] - '0'));
  }

  modbus_handler_send_request(idx, property, is_set, value);
}
|
|
|
|
// Drive the MQTT connection; intended to be called repeatedly.
//
// While connected: at most once per MQTT_YIELD_INTERVAL ms, let the client
// process traffic via MQTTYield. A failed yield marks the connection as lost
// and re-arms discovery publishing.
//
// While disconnected: at most once per MQTT_RECONNECT_INTERVAL ms, try to
// reconnect, publish the Homie discovery attributes if they are pending, and
// subscribe to homie/<node-id>/+/+/set. The status LED is set to busy on
// every call made in this state.
void mqtt_process(mqtt_state_t* state) {
  const uint32_t now = millis();

  if (state->is_connected) {
    if (now - state->last_yield < MQTT_YIELD_INTERVAL) {
      return;  // not due yet
    }

    const int yield_rc = MQTTYield(&state->client, MQTT_MAX_PACKET_WAIT);
    if (yield_rc != SUCCESS) {
      DEBUG_PRINT("MQTT Yield failed with rc=%d, ping_outstanding=%d\n",
                  yield_rc, state->client.ping_outstanding);
      state->is_connected = false;
      state->discovery_published = false;
    }
    state->last_yield = now;
    led_status_set(LED_STATE_ON);
    return;
  }

  // Disconnected: retry only once the back-off interval has elapsed.
  if (now - state->last_reconnect >= MQTT_RECONNECT_INTERVAL) {
    const int connect_rc =
        setup_mqtt_client(&state->network, &state->opts, &state->client);

    if (connect_rc == SUCCESS) {
      state->is_connected = true;

      if (!state->discovery_published) {
        publish_device_attributes(&state->client);

        for (int dev = 0; dev < RS485_DEVICE_COUNT; dev++) {
          publish_rs485_node_attributes(&state->client, &RS485_DEVICES[dev]);
        }

        // with onewire we can discover new devices on the bus during runtime
        // is it worth implementing?
        onewire_temp_publish_discovery(&state->client, NODE_CONFIG.id);

        state->discovery_published = 1;
      }

      char sub_topic[MAX_TOPIC_LENGTH];
      snprintf(sub_topic, sizeof(sub_topic), "homie/%s/+/+/set",
               NODE_CONFIG.id);

      const int sub_rc = subscribe_to_topic(&state->client, sub_topic, QOS1,
                                            message_arrived);
      if (sub_rc != SUCCESS) {
        state->is_connected = false;
      }
    }

    state->last_reconnect = now;
  }

  led_status_set(LED_STATE_BUSY);
}