#include "mqtt_handler.h" #include #include #include #include "debug.h" #include "gpio.h" #include "modbus.h" #include "onewire_temp.h" #include "systick.h" // MQTT #define MQTT_YIELD_INTERVAL 100 // 100ms between yields in main loop #define MQTT_MAX_PACKET_WAIT 20 // Only wait up to 20ms for packet processing #define MQTT_RECONNECT_INTERVAL 5000 // 5 seconds between reconnection attempts // Homie convention constants #define HOMIE_VERSION "4.0" #define HOMIE_STATE_READY "ready" #define HOMIE_STATE_LOST "lost" #define HOMIE_STATE_SLEEPING "sleeping" #define HOMIE_STATE_DISCONNECTED "disconnected" // nodes list buffer char nodes_list[MAX_PAYLOAD_LENGTH]; // Parse Homie topic format: homie/node-id/device-name/property/[set|get] static bool parse_homie_topic(const char* topic, size_t topic_len, char* device_name, size_t name_max, char* property, size_t prop_max, uint8_t* is_set) { const char* segment_start = topic; const char* topic_end = topic + topic_len; uint8_t segment = 0; // Skip first three segments (homie/node-id/device-name/) while (segment < 3 && segment_start < topic_end) { const char* slash = memchr(segment_start, '/', topic_end - segment_start); if (!slash) return false; if (segment == 2) { size_t len = slash - segment_start; if (len >= name_max) return false; memcpy(device_name, segment_start, len); device_name[len] = '\0'; } segment_start = slash + 1; segment++; } const char* slash = memchr(segment_start, '/', topic_end - segment_start); if (!slash) return false; size_t len = slash - segment_start; if (len >= prop_max) return false; memcpy(property, segment_start, len); property[len] = '\0'; segment_start = slash + 1; if (segment_start >= topic_end) return false; // Missing set/get *is_set = (*segment_start == 's'); return true; } // MQTT client int setup_mqtt_client(Network* network, ch32_mqtt_options_t* opts, MQTTClient* client) { static unsigned char tx_buffer[MQTT_TX_BUFFER_SIZE]; static unsigned char rx_buffer[MQTT_RX_BUFFER_SIZE]; int rc; uint8_t target_ip[] = MQTT_SERVER_IP; // Initialize network NewNetwork(network, TCP_SOCKET); rc = ConnectNetwork(network, target_ip, MQTT_PORT); if (rc != SOCK_OK) { DEBUG_PRINT("Network connection failed: %d\n", rc); return -1; } // Initialize the MQTT client MQTTClientInit(client, network, MQTT_COMMAND_TIMEOUT_MS, tx_buffer, sizeof(tx_buffer), rx_buffer, sizeof(rx_buffer)); // Setup MQTT connection data MQTTPacket_connectData connect_data = MQTTPacket_connectData_initializer; // Configure Last Will and Testament char will_topic[MAX_TOPIC_LENGTH]; snprintf(will_topic, sizeof(will_topic), "homie/%s/$state", NODE_CONFIG.id); connect_data.willFlag = 1; // Enable LWT connect_data.will = (MQTTPacket_willOptions)MQTTPacket_willOptions_initializer; connect_data.will.topicName.cstring = will_topic; connect_data.will.message.cstring = HOMIE_STATE_LOST; connect_data.will.retained = 1; connect_data.will.qos = QOS1; // rest connect_data.MQTTVersion = 3; connect_data.clientID.cstring = opts->clientid; connect_data.username.cstring = opts->username; connect_data.password.cstring = opts->password; connect_data.keepAliveInterval = MQTT_KEEP_ALIVE_INTERVAL; connect_data.cleansession = 1; // Connect to MQTT broker rc = MQTTConnect(client, &connect_data); if (rc != 0) { DEBUG_PRINT("Failed to connect: %d\n", rc); return -2; } return 0; } int subscribe_to_topic(MQTTClient* client, const char* topic, enum QoS qos, messageHandler message_callback) { int rc = MQTTSubscribe(client, topic, qos, message_callback); if (rc != 0) { DEBUG_PRINT("Failed to subscribe to %s: %d\n", topic, rc); return rc; } return 0; } void publish_message(MQTTClient* client, const char* payload, const char* topic) { MQTTMessage message = {.qos = QOS0, .retained = 0, .dup = 0, .payload = (void*)payload, .payloadlen = strlen(payload) }; if (MQTTPublish(client, topic, &message) != 0) { DEBUG_PRINT("Publish failed\n"); } } // publish retained messages void publish_retained(MQTTClient* client, const char* topic, const char* payload) { MQTTMessage message = {.qos = QOS1, .retained = 1, .dup = 0, .payload = (void*)payload, .payloadlen = strlen(payload)}; if (MQTTPublish(client, topic, &message) != 0) { DEBUG_PRINT("Failed to publish to %s\n", topic); } } static void publish_device_attributes(MQTTClient* client) { char topic[MAX_TOPIC_LENGTH]; char mac_str[18]; char* ptr = nodes_list; size_t remaining = sizeof(nodes_list); // Device attributes snprintf(topic, sizeof(topic), "homie/%s/$homie", NODE_CONFIG.id); publish_retained(client, topic, HOMIE_VERSION); snprintf(topic, sizeof(topic), "homie/%s/$name", NODE_CONFIG.id); publish_retained(client, topic, NODE_CONFIG.name); snprintf(topic, sizeof(topic), "homie/%s/$state", NODE_CONFIG.id); publish_retained(client, topic, HOMIE_STATE_READY); // Format MAC address as XX:XX:XX:XX:XX:XX snprintf(mac_str, sizeof(mac_str), "%02X:%02X:%02X:%02X:%02X:%02X", NODE_CONFIG.mac[0], NODE_CONFIG.mac[1], NODE_CONFIG.mac[2], NODE_CONFIG.mac[3], NODE_CONFIG.mac[4], NODE_CONFIG.mac[5]); snprintf(topic, sizeof(topic), "homie/%s/$mac", NODE_CONFIG.id); publish_retained(client, topic, mac_str); ptr = nodes_list; *ptr = '\0'; // add rs485 devices for (int i = 0; i < RS485_DEVICE_COUNT; i++) { if (i > 0 && remaining > 1) { *ptr++ = ','; remaining--; } size_t len = strlen(RS485_DEVICES[i].name); if (len >= remaining) break; memcpy(ptr, RS485_DEVICES[i].name, len); ptr += len; remaining -= len; } *ptr = '\0'; snprintf(topic, sizeof(topic), "homie/%s/$nodes", NODE_CONFIG.id); publish_retained(client, topic, nodes_list); // loc attribute if (NODE_CONFIG.location[0] != '\0') { snprintf(topic, sizeof(topic), "homie/%s/$location", NODE_CONFIG.id); publish_retained(client, topic, NODE_CONFIG.location); } } static void publish_rs485_node_attributes(MQTTClient* client, const rs485_device_t* device) { char topic[MAX_TOPIC_LENGTH]; // Node base attributes snprintf(topic, sizeof(topic), "homie/%s/%s/$name", NODE_CONFIG.id, device->name); publish_retained(client, topic, device->name); // Set properties based on device type switch (device->type) { case DEVICE_RELAY: snprintf(topic, sizeof(topic), "homie/%s/%s/$properties", NODE_CONFIG.id, device->name); publish_retained(client, topic, "state"); snprintf(topic, sizeof(topic), "homie/%s/%s/state/$name", NODE_CONFIG.id, device->name); publish_retained(client, topic, "State"); snprintf(topic, sizeof(topic), "homie/%s/%s/state/$datatype", NODE_CONFIG.id, device->name); publish_retained(client, topic, "boolean"); snprintf(topic, sizeof(topic), "homie/%s/%s/state/$settable", NODE_CONFIG.id, device->name); publish_retained(client, topic, "true"); break; case DEVICE_SOIL_SENSOR: snprintf(topic, sizeof(topic), "homie/%s/%s/$properties", NODE_CONFIG.id, device->name); publish_retained(client, topic, "moisture,temperature"); snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$name", NODE_CONFIG.id, device->name); publish_retained(client, topic, "Moisture Level"); snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$datatype", NODE_CONFIG.id, device->name); publish_retained(client, topic, "integer"); snprintf(topic, sizeof(topic), "homie/%s/%s/moisture/$unit", NODE_CONFIG.id, device->name); publish_retained(client, topic, "%"); // Temperature property snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$name", NODE_CONFIG.id, device->name); publish_retained(client, topic, "Temperature"); snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$datatype", NODE_CONFIG.id, device->name); publish_retained(client, topic, "float"); snprintf(topic, sizeof(topic), "homie/%s/%s/temperature/$unit", NODE_CONFIG.id, device->name); publish_retained(client, topic, "°C"); break; case DEVICE_THERMOMETER: // TODO DEBUG_PRINT("not implemented\n"); } } // Initialize MQTT with Homie discovery void mqtt_init(mqtt_state_t* state) { state->opts.clientid = (char*)NODE_CONFIG.id; // Use node ID as client ID state->opts.username = ""; state->opts.password = ""; state->opts.qos = QOS1; state->last_reconnect = 0; state->last_yield = 0; state->is_connected = false; } // Find device by name and return its index static int8_t find_device_index(const char* name) { for (uint8_t i = 0; i < RS485_DEVICE_COUNT; i++) { if (strcmp(RS485_DEVICES[i].name, name) == 0) { return i; } } return -1; } void message_arrived(MessageData* md) { if (!md || !md->message || !md->topicName) { DEBUG_PRINT("Error: MessageData is NULL.\n"); return; } char device_name[16]; char property[16]; uint8_t is_set; if (!parse_homie_topic(md->topicName->lenstring.data, md->topicName->lenstring.len, device_name, sizeof(device_name), property, sizeof(property), &is_set)) { DEBUG_PRINT("Failed to parse topic\n"); return; } // For set operations, parse value if (is_set && md->message->payloadlen > 0) { uint16_t value = 0; int8_t idx = find_device_index(device_name); if (idx < 0) return; if (RS485_DEVICES[idx].type == DEVICE_RELAY) { if (md->message->payloadlen != 16) return; char* str = (char*)md->message->payload; for (int i = 0; i < 16; i++) { value = (value << 1) | (str[i] - '0'); } } else { // todo: DEBUG_PRINT("not implemented\n"); } modbus_handler_send_request(idx, property, is_set, value); } } void mqtt_process(mqtt_state_t* state) { uint32_t now = millis(); int rc; if (!state->is_connected) { if (now - state->last_reconnect >= MQTT_RECONNECT_INTERVAL) { rc = setup_mqtt_client(&state->network, &state->opts, &state->client); if (rc == SUCCESS) { state->is_connected = true; if (!state->discovery_published) { publish_device_attributes(&state->client); for (int i = 0; i < RS485_DEVICE_COUNT; i++) { publish_rs485_node_attributes(&state->client, &RS485_DEVICES[i]); } // with onewire we can discover new devices on the bus during runtime // is it worth implementing? onewire_temp_publish_discovery(&state->client, NODE_CONFIG.id); state->discovery_published = 1; } char sub_topic[MAX_TOPIC_LENGTH]; snprintf(sub_topic, sizeof(sub_topic), "homie/%s/+/+/set", NODE_CONFIG.id); rc = subscribe_to_topic(&state->client, sub_topic, QOS1, message_arrived); if (rc != SUCCESS) { state->is_connected = false; } } state->last_reconnect = now; } led_status_set(LED_STATE_BUSY); } else if (now - state->last_yield >= MQTT_YIELD_INTERVAL) { rc = MQTTYield(&state->client, MQTT_MAX_PACKET_WAIT); if (rc != SUCCESS) { DEBUG_PRINT("MQTT Yield failed with rc=%d, ping_outstanding=%d\n", rc, state->client.ping_outstanding); state->is_connected = false; state->discovery_published = false; } state->last_yield = now; led_status_set(LED_STATE_ON); } }