chore: debloat (remove platformio), onewire improvements
Squashed commit of the following: commit 5f16309f629b9928d2134b85ae64af69bc3ebbcd Author: kuwoyuki <kuwoyuki@cock.li> Date: Sun Nov 24 22:55:15 2024 +0600 fix: Makefile, improve onewire retries commit 55496a3bda941b52ff349dc75c9c06eb5a37c07d Author: kuwoyuki <kuwoyuki@cock.li> Date: Mon Nov 18 00:41:18 2024 +0600 fix: make onewire validity less strict commit 3428a9bc9792508972ce3e7e4e35a64f047bca10 Author: kuwoyuki <kuwoyuki@cock.li> Date: Sun Nov 17 23:57:55 2024 +0600 chore: rm bins commit 1594e5ed430522b15466c8afa62ff7fb1b28947c Author: kuwoyuki <kuwoyuki@cock.li> Date: Sun Nov 17 23:32:01 2024 +0600 chore: unplatformiofy
This commit is contained in:
386
mqtt_handler.c
Normal file
386
mqtt_handler.c
Normal file
@@ -0,0 +1,386 @@
|
||||
#include "mqtt_handler.h"
|
||||
|
||||
#include <MQTT/MQTTClient.h>
|
||||
#include <socket.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "debug.h"
|
||||
#include "gpio.h"
|
||||
#include "modbus.h"
|
||||
#include "onewire_temp.h"
|
||||
#include "systick.h"
|
||||
|
||||
// MQTT
|
||||
#define MQTT_YIELD_INTERVAL 100 // 100ms between yields in main loop
|
||||
#define MQTT_MAX_PACKET_WAIT 20 // Only wait up to 20ms for packet processing
|
||||
#define MQTT_RECONNECT_INTERVAL 5000 // 5 seconds between reconnection attempts
|
||||
|
||||
// Homie convention constants
|
||||
#define HOMIE_VERSION "4.0"
|
||||
#define HOMIE_STATE_READY "ready"
|
||||
#define HOMIE_STATE_LOST "lost"
|
||||
#define HOMIE_STATE_SLEEPING "sleeping"
|
||||
#define HOMIE_STATE_DISCONNECTED "disconnected"
|
||||
|
||||
// nodes list buffer
|
||||
char nodes_list[MAX_PAYLOAD_LENGTH];
|
||||
|
||||
// Parse Homie topic format: homie/node-id/device-name/property/[set|get]
|
||||
static bool parse_homie_topic(const char* topic, size_t topic_len,
|
||||
char* device_name, size_t name_max,
|
||||
char* property, size_t prop_max,
|
||||
uint8_t* is_set) {
|
||||
const char* segment_start = topic;
|
||||
const char* topic_end = topic + topic_len;
|
||||
uint8_t segment = 0;
|
||||
|
||||
// Skip first three segments (homie/node-id/device-name/)
|
||||
while (segment < 3 && segment_start < topic_end) {
|
||||
const char* slash = memchr(segment_start, '/', topic_end - segment_start);
|
||||
if (!slash) return false;
|
||||
|
||||
if (segment == 2) {
|
||||
size_t len = slash - segment_start;
|
||||
if (len >= name_max) return false;
|
||||
memcpy(device_name, segment_start, len);
|
||||
device_name[len] = '\0';
|
||||
}
|
||||
|
||||
segment_start = slash + 1;
|
||||
segment++;
|
||||
}
|
||||
|
||||
const char* slash = memchr(segment_start, '/', topic_end - segment_start);
|
||||
if (!slash) return false;
|
||||
|
||||
size_t len = slash - segment_start;
|
||||
if (len >= prop_max) return false;
|
||||
memcpy(property, segment_start, len);
|
||||
property[len] = '\0';
|
||||
|
||||
segment_start = slash + 1;
|
||||
if (segment_start >= topic_end) return false; // Missing set/get
|
||||
|
||||
*is_set = (*segment_start == 's');
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// MQTT client
|
||||
int setup_mqtt_client(Network* network, ch32_mqtt_options_t* opts,
|
||||
MQTTClient* client) {
|
||||
static unsigned char tx_buffer[MQTT_TX_BUFFER_SIZE];
|
||||
static unsigned char rx_buffer[MQTT_RX_BUFFER_SIZE];
|
||||
int rc;
|
||||
uint8_t target_ip[] = MQTT_SERVER_IP;
|
||||
|
||||
// Initialize network
|
||||
NewNetwork(network, TCP_SOCKET);
|
||||
rc = ConnectNetwork(network, target_ip, MQTT_PORT);
|
||||
if (rc != SOCK_OK) {
|
||||
DEBUG_PRINT("Network connection failed: %d\n", rc);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Initialize the MQTT client
|
||||
MQTTClientInit(client, network, MQTT_COMMAND_TIMEOUT_MS, tx_buffer,
|
||||
sizeof(tx_buffer), rx_buffer, sizeof(rx_buffer));
|
||||
|
||||
// Setup MQTT connection data
|
||||
MQTTPacket_connectData connect_data = MQTTPacket_connectData_initializer;
|
||||
|
||||
// Configure Last Will and Testament
|
||||
char will_topic[MAX_TOPIC_LENGTH];
|
||||
snprintf(will_topic, sizeof(will_topic), "homie/%s/$state", NODE_CONFIG.id);
|
||||
|
||||
connect_data.willFlag = 1; // Enable LWT
|
||||
connect_data.will =
|
||||
(MQTTPacket_willOptions)MQTTPacket_willOptions_initializer;
|
||||
connect_data.will.topicName.cstring = will_topic;
|
||||
connect_data.will.message.cstring = HOMIE_STATE_LOST;
|
||||
connect_data.will.retained = 1;
|
||||
connect_data.will.qos = QOS1;
|
||||
|
||||
// rest
|
||||
connect_data.MQTTVersion = 3;
|
||||
connect_data.clientID.cstring = opts->clientid;
|
||||
connect_data.username.cstring = opts->username;
|
||||
connect_data.password.cstring = opts->password;
|
||||
connect_data.keepAliveInterval = MQTT_KEEP_ALIVE_INTERVAL;
|
||||
connect_data.cleansession = 1;
|
||||
|
||||
// Connect to MQTT broker
|
||||
rc = MQTTConnect(client, &connect_data);
|
||||
if (rc != 0) {
|
||||
DEBUG_PRINT("Failed to connect: %d\n", rc);
|
||||
return -2;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int subscribe_to_topic(MQTTClient* client, const char* topic, enum QoS qos,
|
||||
messageHandler message_callback) {
|
||||
int rc = MQTTSubscribe(client, topic, qos, message_callback);
|
||||
if (rc != 0) {
|
||||
DEBUG_PRINT("Failed to subscribe to %s: %d\n", topic, rc);
|
||||
return rc;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void publish_message(MQTTClient* client, const char* payload,
|
||||
const char* topic) {
|
||||
MQTTMessage message = {.qos = QOS0,
|
||||
.retained = 0,
|
||||
.dup = 0,
|
||||
.payload = (void*)payload,
|
||||
.payloadlen = strlen(payload)
|
||||
|
||||
};
|
||||
|
||||
if (MQTTPublish(client, topic, &message) != 0) {
|
||||
DEBUG_PRINT("Publish failed\n");
|
||||
}
|
||||
}
|
||||
|
||||
// publish retained messages
|
||||
void publish_retained(MQTTClient* client, const char* topic,
|
||||
const char* payload) {
|
||||
MQTTMessage message = {.qos = QOS1,
|
||||
.retained = 1,
|
||||
.dup = 0,
|
||||
.payload = (void*)payload,
|
||||
.payloadlen = strlen(payload)};
|
||||
|
||||
if (MQTTPublish(client, topic, &message) != 0) {
|
||||
DEBUG_PRINT("Failed to publish to %s\n", topic);
|
||||
}
|
||||
}
|
||||
|
||||
static void publish_device_attributes(MQTTClient* client) {
|
||||
char topic[MAX_TOPIC_LENGTH];
|
||||
char mac_str[18];
|
||||
char* ptr = nodes_list;
|
||||
size_t remaining = sizeof(nodes_list);
|
||||
|
||||
// Device attributes
|
||||
snprintf(topic, sizeof(topic), "homie/%s/$homie", NODE_CONFIG.id);
|
||||
publish_retained(client, topic, HOMIE_VERSION);
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/$name", NODE_CONFIG.id);
|
||||
publish_retained(client, topic, NODE_CONFIG.name);
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/$state", NODE_CONFIG.id);
|
||||
publish_retained(client, topic, HOMIE_STATE_READY);
|
||||
|
||||
// Format MAC address as XX:XX:XX:XX:XX:XX
|
||||
snprintf(mac_str, sizeof(mac_str), "%02X:%02X:%02X:%02X:%02X:%02X",
|
||||
NODE_CONFIG.mac[0], NODE_CONFIG.mac[1], NODE_CONFIG.mac[2],
|
||||
NODE_CONFIG.mac[3], NODE_CONFIG.mac[4], NODE_CONFIG.mac[5]);
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/$mac", NODE_CONFIG.id);
|
||||
publish_retained(client, topic, mac_str);
|
||||
|
||||
ptr = nodes_list;
|
||||
*ptr = '\0';
|
||||
// add rs485 devices
|
||||
for (int i = 0; i < RS485_DEVICE_COUNT; i++) {
|
||||
if (i > 0 && remaining > 1) {
|
||||
*ptr++ = ',';
|
||||
remaining--;
|
||||
}
|
||||
size_t len = strlen(RS485_DEVICES[i].name);
|
||||
if (len >= remaining) break;
|
||||
memcpy(ptr, RS485_DEVICES[i].name, len);
|
||||
ptr += len;
|
||||
remaining -= len;
|
||||
}
|
||||
*ptr = '\0';
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/$nodes", NODE_CONFIG.id);
|
||||
publish_retained(client, topic, nodes_list);
|
||||
|
||||
// loc attribute
|
||||
if (NODE_CONFIG.location[0] != '\0') {
|
||||
snprintf(topic, sizeof(topic), "homie/%s/$location", NODE_CONFIG.id);
|
||||
publish_retained(client, topic, NODE_CONFIG.location);
|
||||
}
|
||||
}
|
||||
|
||||
static void publish_rs485_node_attributes(MQTTClient* client,
|
||||
const rs485_device_t* device) {
|
||||
char topic[MAX_TOPIC_LENGTH];
|
||||
|
||||
// Node base attributes
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/$name", NODE_CONFIG.id,
|
||||
device->name);
|
||||
publish_retained(client, topic, device->name);
|
||||
|
||||
// Set properties based on device type
|
||||
switch (device->type) {
|
||||
case DEVICE_RELAY:
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/$properties", NODE_CONFIG.id,
|
||||
device->name);
|
||||
publish_retained(client, topic, "state");
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/state/$name", NODE_CONFIG.id,
|
||||
device->name);
|
||||
publish_retained(client, topic, "State");
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/state/$datatype",
|
||||
NODE_CONFIG.id, device->name);
|
||||
publish_retained(client, topic, "boolean");
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/state/$settable",
|
||||
NODE_CONFIG.id, device->name);
|
||||
publish_retained(client, topic, "true");
|
||||
break;
|
||||
|
||||
case DEVICE_SOIL_SENSOR:
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/$properties", NODE_CONFIG.id,
|
||||
device->name);
|
||||
publish_retained(client, topic, "moisture,temperature");
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$name",
|
||||
NODE_CONFIG.id, device->name);
|
||||
publish_retained(client, topic, "Moisture Level");
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$datatype",
|
||||
NODE_CONFIG.id, device->name);
|
||||
publish_retained(client, topic, "integer");
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$unit",
|
||||
NODE_CONFIG.id, device->name);
|
||||
publish_retained(client, topic, "%");
|
||||
|
||||
// Temperature property
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$name",
|
||||
NODE_CONFIG.id, device->name);
|
||||
publish_retained(client, topic, "Temperature");
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$datatype",
|
||||
NODE_CONFIG.id, device->name);
|
||||
publish_retained(client, topic, "float");
|
||||
|
||||
snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$unit",
|
||||
NODE_CONFIG.id, device->name);
|
||||
publish_retained(client, topic, "°C");
|
||||
break;
|
||||
case DEVICE_THERMOMETER:
|
||||
// TODO
|
||||
DEBUG_PRINT("not implemented\n");
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize MQTT with Homie discovery
|
||||
void mqtt_init(mqtt_state_t* state) {
|
||||
state->opts.clientid = (char*)NODE_CONFIG.id; // Use node ID as client ID
|
||||
state->opts.username = "";
|
||||
state->opts.password = "";
|
||||
state->opts.qos = QOS1;
|
||||
state->last_reconnect = 0;
|
||||
state->last_yield = 0;
|
||||
state->is_connected = false;
|
||||
}
|
||||
|
||||
// Find device by name and return its index
|
||||
static int8_t find_device_index(const char* name) {
|
||||
for (uint8_t i = 0; i < RS485_DEVICE_COUNT; i++) {
|
||||
if (strcmp(RS485_DEVICES[i].name, name) == 0) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
void message_arrived(MessageData* md) {
|
||||
if (!md || !md->message || !md->topicName) {
|
||||
DEBUG_PRINT("Error: MessageData is NULL.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
char device_name[16];
|
||||
char property[16];
|
||||
uint8_t is_set;
|
||||
|
||||
if (!parse_homie_topic(md->topicName->lenstring.data,
|
||||
md->topicName->lenstring.len, device_name,
|
||||
sizeof(device_name), property, sizeof(property),
|
||||
&is_set)) {
|
||||
DEBUG_PRINT("Failed to parse topic\n");
|
||||
return;
|
||||
}
|
||||
|
||||
// For set operations, parse value
|
||||
if (is_set && md->message->payloadlen > 0) {
|
||||
uint16_t value = 0;
|
||||
int8_t idx = find_device_index(device_name);
|
||||
if (idx < 0) return;
|
||||
|
||||
if (RS485_DEVICES[idx].type == DEVICE_RELAY) {
|
||||
if (md->message->payloadlen != 16) return;
|
||||
char* str = (char*)md->message->payload;
|
||||
for (int i = 0; i < 16; i++) {
|
||||
value = (value << 1) | (str[i] - '0');
|
||||
}
|
||||
} else {
|
||||
// todo:
|
||||
DEBUG_PRINT("not implemented\n");
|
||||
}
|
||||
|
||||
modbus_handler_send_request(idx, property, is_set, value);
|
||||
}
|
||||
}
|
||||
|
||||
void mqtt_process(mqtt_state_t* state) {
|
||||
uint32_t now = millis();
|
||||
int rc;
|
||||
|
||||
if (!state->is_connected) {
|
||||
if (now - state->last_reconnect >= MQTT_RECONNECT_INTERVAL) {
|
||||
rc = setup_mqtt_client(&state->network, &state->opts, &state->client);
|
||||
|
||||
if (rc == SUCCESS) {
|
||||
state->is_connected = true;
|
||||
|
||||
if (!state->discovery_published) {
|
||||
publish_device_attributes(&state->client);
|
||||
|
||||
for (int i = 0; i < RS485_DEVICE_COUNT; i++) {
|
||||
publish_rs485_node_attributes(&state->client, &RS485_DEVICES[i]);
|
||||
}
|
||||
|
||||
// with onewire we can discover new devices on the bus during runtime
|
||||
// is it worth implementing?
|
||||
onewire_temp_publish_discovery(&state->client, NODE_CONFIG.id);
|
||||
|
||||
state->discovery_published = 1;
|
||||
}
|
||||
|
||||
char sub_topic[MAX_TOPIC_LENGTH];
|
||||
snprintf(sub_topic, sizeof(sub_topic), "homie/%s/+/+/set",
|
||||
NODE_CONFIG.id);
|
||||
|
||||
rc = subscribe_to_topic(&state->client, sub_topic, QOS1,
|
||||
message_arrived);
|
||||
if (rc != SUCCESS) {
|
||||
state->is_connected = false;
|
||||
}
|
||||
}
|
||||
state->last_reconnect = now;
|
||||
}
|
||||
led_status_set(LED_STATE_BUSY);
|
||||
|
||||
} else if (now - state->last_yield >= MQTT_YIELD_INTERVAL) {
|
||||
rc = MQTTYield(&state->client, MQTT_MAX_PACKET_WAIT);
|
||||
if (rc != SUCCESS) {
|
||||
DEBUG_PRINT("MQTT Yield failed with rc=%d, ping_outstanding=%d\n", rc,
|
||||
state->client.ping_outstanding);
|
||||
state->is_connected = false;
|
||||
state->discovery_published = false;
|
||||
}
|
||||
state->last_yield = now;
|
||||
led_status_set(LED_STATE_ON);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user