MQTT Task

this module manages the connection to the broker MQTT server. this includes initial connection and retries.

on connecting to a broker, this module subscrives to a command channel.

once connected this task runs its own internal schedule with which to send messages. finaly this module also provides a handler for messages received from the command channel.

Configuration

MQTT_VERBOSE_LOGGING

if set to 1, aditional debug logs will be printed to the console whenever an mqtt event fires (connect, disconnect, message, etc). message data will not be printed.

TOPIC_*

these are the topics that the mqtt task will subscribe to.

#define TOPIC_STATUS "/egym/status"
#define TOPIC_METRICS "/egym/metrics"
#define TOPIC_SESION "/egym/sessions"
#define TOPIC_USER "/egym/user"

Public Functions

mqtt_task_init

currently this function does nothing, but is here as a placeholder for if any initialization is needed in the future.

void mqtt_task_init(void)
{
}

mqtt_task

this is the main function of the mqtt task. it is registered with FreeRTOS extenraly

void mqtt_task(void* arg)
{
    while (true)
    {
        mqtt_tick();
        esp_task_wdt_reset();
        vTaskDelay(100);
    };
}

Private Types

message_event_t

a callback function for the internal message schedule. simply a void function that takes no arguments.

typedef void (*message_event_t)(void);

schedule_t

a struct to hold a callback function and a number n which is used to determine how often the callback function is called.

typedef struct schedule_s
{
    message_event_t event;
    uint32_t n;
} schedule_t;

Private Variables

schedule

the schedule of messages to be sent, units are in 100ms. so status is sent every 10 seconds, metrics every 500ms, session every 500ms, and user every 1 second.

static schedule_t schedule[] = {
    {send_status, 100},
    {send_metric, 5},
    {send_session, 5},
    {send_user, 10},
};

client and server keys

these are the keys used to authenticate with the mqtt server. they are stored in the flash memory of the esp32. the server_cert_pem authenticates the server, and the client_cert_pem authenticates the client (us).

extern const uint8_t client_cert_pem_start[] asm("_binary_esp_crt_start");
extern const uint8_t client_cert_pem_end[] asm("_binary_esp_crt_end");
extern const uint8_t client_key_pem_start[] asm("_binary_esp_key_start");
extern const uint8_t client_key_pem_end[] asm("_binary_esp_key_end");
extern const uint8_t server_cert_pem_start[] asm("_binary_egymCA_crt_start");
extern const uint8_t server_cert_pem_end[] asm("_binary_egymCA_crt_end");

command_channel

stores the name of the topic that the ESP subscribes to for commands. is a null terminated string in ther format "/egym/command/[device_id]" where device_id takes the form regen_[bike_mac], and bike mac is in the format XXXXXXXXXXXX where X is a hex digit, this is the same exact format and string as the bikes mdns hostname, see the mdns module for details.

static char command_channel[64] = {0};

handle

a handle to the esp_mqtt_client_t object provided by the esp32 mqtt library.

static esp_mqtt_client_handle_t handle;

username

the username used to authenticate with the mqtt server.

static const char* username = "esp";

password

the password used to authenticate with the mqtt server.

static const char* password = <obfuscated>;

mqtt_connected

boolean flag to indicate if the mqtt client is connected to the server.

static bool mqtt_connected = false;

config

stores the configuration for the mqtt client. is stored statically in memory because the MQTT library may be restarted at any time due to a connection error. initialised to all 0's but is populated in set_config().

static esp_mqtt_client_config_t config = {0};

short_id_cache

a cache of the short id of the bike. we cache this because we need to send it in every user message, and we have to get it from the flash memory, which is slow.

static uint16_t short_id_chache = 0;

short_id_cache_age

the age of the short id cache in terms of how many messages we have sent the same value for, this is used to determine when to refresh the cache. currently we refresh the cache every 10 messages. GMTODO: make this a config option.

static uint8_t short_id_cache_age = 0;

mqtt_current_tick

the current tick of the mqtt task. used for running the internal message schedule.

static uint32_t mqtt_current_tick = 0;

Private Functions

mqtt_send

helper function, sends a protocol buffer message to the mqtt server. this is also where we check if the mqtt client is connected before sending the message, and serialise the protocol buffer message into a buffer before sending it. argument "fields" is a pointer to a "pb_msgdesc_t" from the protocol buffer library. and are all defined staticaly in messages.pb.h

GMTODO: move buffer off of the stack, will let us reduce the size of the task stack.

static int mqtt_send(const char* channel, const pb_msgdesc_t* fields, const void* message)
{
    if (handle == NULL)
    {
        return -1;
    }
    if(!mqtt_connected)
    {
        return -1;
    }
    uint8_t buffer[256] = {0};
    pb_ostream_t stream = pb_ostream_from_buffer(buffer, sizeof(buffer));
    if (pb_encode(&stream, fields, message))
    {
        return esp_mqtt_client_publish(handle, channel, (const char*)buffer, stream.bytes_written, 0, 0);
    }
    else
    {
        return -1;
    }
}

on_command

callback function for when a message is received on the command channel. decodes the protocol buffer message and then takes the appropriate action based off of the message command.

as we add more commands, this should probably be refactored into a number of command specific callback functions, and a switch statement to call the correct one.

static void on_command(uint8_t* data, size_t len)
{
    MQTT_LOG_VERBOSE("got command of %d \n", len);
    for (int n = 0; n < len; n++)
    {
        MQTT_LOG_VERBOSE("%02X", data[n]);
    }
    MQTT_LOG_VERBOSE("\n");
    Command command = Command_init_zero;
    pb_istream_t stream = pb_istream_from_buffer(data, len);
    if (!pb_decode(&stream, Command_fields, &command))
    {
        MQTT_LOG_VERBOSE("decode error %s\n", stream.errmsg);
        return;
    }
    switch (command.command_id)
    {
        case 0:
            MQTT_LOG_VERBOSE("invalid command\n");
            break;
        case 1:
            MQTT_LOG_VERBOSE("reboot command\n");
            esp_restart();
        case 2:
            MQTT_LOG_VERBOSE("firmware command, depricated, ignroing\n");
            break;
        case 3:
            MQTT_LOG_VERBOSE("psoc command, depreicated, ignoring\n");
            break;
        case 4:
            MQTT_LOG_VERBOSE("got reset command\n");
            session_end();
            session_start();
            break;
        case 5:
            if (!command.has_set_resistance)
            {
                MQTT_LOG_VERBOSE("no resistance on resistance command\n");
                break;
            }
            resistance_count_global = boundi(command.set_resistance.resistance, 0, 100);
            MQTT_LOG_VERBOSE("got resistance command\n");
            break;
        default:
            MQTT_LOG_VERBOSE("unknown command\n");
            break;
    }
}

mqtt_event_handler

event handler registered to the ESP mqtt client library. handles re-subscribing to the command channel on reconnect, setting the mqtt_connected flag, and receiving messages on the command channel.

static void mqtt_event_handler(void* handler_args, esp_event_base_t base, int32_t event_id, void* event_data)
{
    esp_mqtt_event_handle_t event = event_data;
    char buff[256] = {0};
    switch ((esp_mqtt_event_id_t)event_id)
    {
        case MQTT_EVENT_CONNECTED:
            mqtt_connected = true;
            esp_mqtt_client_subscribe(handle, command_channel, 2);
            MQTT_LOG_VERBOSE("MQTT_EVENT_CONNECTED\n");
            break;
        case MQTT_EVENT_DISCONNECTED:
            mqtt_connected = false;
            MQTT_LOG_VERBOSE("MQTT_EVENT_DISCONNECTED\n");
            break;
        case MQTT_EVENT_SUBSCRIBED:
            MQTT_LOG_VERBOSE("MQTT_EVENT_SUBSCRIBED, msg_id=%d\n", event->msg_id);
            break;
        case MQTT_EVENT_UNSUBSCRIBED:
            MQTT_LOG_VERBOSE("MQTT_EVENT_UNSUBSCRIBED, msg_id=%d\n", event->msg_id);
            break;
        case MQTT_EVENT_PUBLISHED:
            break;
        case MQTT_EVENT_DATA:
            memset(buff, 0, sizeof(buff));
            memcpy(buff, event->topic, event->topic_len);
            if (event->topic_len && strcmp(buff, command_channel) == 0)
            {
                memset(buff, 0, sizeof(buff));
                memcpy(buff, event->data, event->data_len);
                on_command((uint8_t*)(buff), event->data_len);
            }
            break;
        case MQTT_EVENT_ERROR:
            MQTT_LOG_VERBOSE("MQTT_EVENT_ERROR\n");
            MQTT_LOG_VERBOSE("error type (%s)", strerror(event->error_handle->error_type));
            break;
        default:
            MQTT_LOG_VERBOSE("Other event id:%d\n", event->event_id);
            break;
    }
}

populate_header

helpder function to fill in the "header" field (sub-message) which is common to all messages, populates the mac address of the ESP32, so messages cannot be mixed up between bikes.

static void populate_header(Header* header)
{
    esp_read_mac(header->mac.bytes, ESP_MAC_BT);
    header->mac.size = 8;
}

set_config

helper function to set the configuration for the mqtt client. most of this is static, but the host ip of the server is dynamic, and is set in this function.

static void set_config()
{
    config.host = server_ip;
    config.client_id = hostname;
    config.port = 8883;
    config.keepalive = 30;
    config.username = username;
    config.password = password;
    config.transport = MQTT_TRANSPORT_OVER_SSL;
    config.skip_cert_common_name_check = true;
    config.cert_pem = (const char*)server_cert_pem_start;
    config.cert_len = server_cert_pem_end - server_cert_pem_start;
    config.client_cert_pem = (const char*)client_cert_pem_start;
    config.client_cert_len = client_cert_pem_end - client_cert_pem_start;
    config.client_key_pem = (const char*)client_key_pem_start;
    config.client_key_len = client_key_pem_end - client_key_pem_start;
}

start_mqtt

helper function to look for an MQTT server and if found, connect to it. this function is called by the mqtt_task() function until a connection is established. once a client object is created, this function stops being called, and the reconnect logic in mqtt_tick is used instead.

static void start_mqtt()
{
    find_broker();
    set_config();
    if(config.host == NULL || strlen(config.host) == 0)
    {
        MQTT_LOG_VERBOSE("no broker found, skipping mqtt");
        vTaskDelay(5000);
        return;
    }
    handle = esp_mqtt_client_init(&config);
    if(handle == NULL)
    {
        MQTT_LOG_VERBOSE("failed to create mqtt client");
        vTaskDelay(5000);
        return;
    }
    esp_mqtt_client_register_event(handle, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
    esp_mqtt_client_start(handle);
    sprintf(command_channel, "/egym/command/%s", hostname);
}

mqtt_tick

main tick function for the mqtt task. this function is called every 100ms by the mqtt_task() function. it is responsible for managing the connection to the mqtt server, and running an internal schedule for sending messages.

if handle isnt set, we try to connect to the mqtt server with start_mqtt().

if wifi is not connected, we wait for it to connect.

if mqtt is not connected, we try to reconnect to the mqtt server.

if all of the above is not the case, we run the internal schedule for sending messages.

static void mqtt_tick()
{
    if (handle == NULL)
    {
        if(wifi_is_connected())
        {
            start_mqtt();
        }
        return;
    }

    if(!wifi_is_connected())
    {
        return;
    }

    if(!mqtt_connected)
    {
        // wait for 5 seconds before trying to reconnect
        vTaskDelay(5000);
        if(!mqtt_connected)
        {
            // check again for the broker ip, in case it has changed
            find_broker();
            esp_mqtt_client_stop(handle);
            esp_mqtt_client_set_uri(handle, server_ip);
            esp_mqtt_client_start(handle);
            sprintf(command_channel, "/egym/command/%s", hostname);
            vTaskDelay(5000);
            return;
        }
    }

    for (size_t n = 0; n < sizeof(schedule) / sizeof(schedule_t); n++)
    {
        schedule_t entry = schedule[n];
        if (mqtt_current_tick % entry.n == 0)
        {
            entry.event();
        }
    }
    mqtt_current_tick++;
}

send_x functions

these have been given their own section because they are all very similar. they handle gathering data for a specific message type into a protocol buffer message, then sending that message to the mqtt server with mqtt_send().

all of the messages are documented in lib comms, and are defined in messages.pb.h, see here for more information.

send_status

sends the status message to the mqtt server. the status message contains information pertanent to the technical status of the bike:

  • firmware version
  • ip address
  • mac of paired BLE device
  • uptime
static void send_status(void)
{
    Status status = Status_init_zero;
    populate_header(&status.header);
    strcpy(status.firmware_version, fw_version);
    strcpy(status.ip_address, current_ip);

    bool found = false;
    // report the first connected device's mac address
    for (int i = 0; i < CONFIG_BT_NIMBLE_MAX_CONNECTIONS; i++)
    {
        if(connected_devices[i].mac[0] != 0)
        {
            memcpy(status.paired_device_mac.bytes, connected_devices[i].mac, 6);
            status.paired_device_mac.size = 6;
            found = true;
            break;
        }
    }
    if(!found)
    {
        memset(status.paired_device_mac.bytes, 0, 6);
        status.paired_device_mac.size = 6;
    }

    status.uptime = xTaskGetTickCount();
    mqtt_send(TOPIC_STATUS, Status_fields, &status);
}

send_metrics

sends the metrics message to the mqtt server. the metrics message contains information pertanent to the current workout session:

  • average power
  • battery state of charge
  • break power (power sent to the resistor)
  • cadence
  • charge power (power sent to the battery)
  • distance
  • gradient
  • instant power
  • resistance
  • resistance change (this is legacy and should be removed)
  • velocity
static void send_metric(void)
{
    Metric _metrics = Metric_init_zero;
    populate_header(&_metrics.header);
    _metrics.average_power = metrics.avg_power;
    _metrics.batt_soc = metrics.bat_soc;
    _metrics.break_power = metrics.break_power;
    _metrics.cadance = metrics.cadence;
    _metrics.charge_power = metrics.battery_power;
    _metrics.distance = metrics.distance;
    _metrics.gradient = metrics.gradient;
    _metrics.instantanious_power = metrics.inst_power;
    _metrics.resistance = metrics.resistance;
    _metrics.resistance_change = 0;
    _metrics.velocity = metrics.velocity;
    mqtt_send(TOPIC_METRICS, Metric_fields, &_metrics);
}

send_session

sends the session message to the mqtt server. the session message contains information pertanent to the current workout session:

  • time elapsed
  • total energy
  • battery energy
static void send_session(void)
{
    Session _session = Session_init_zero;
    populate_header(&_session.header);
    _session.time_elapsed = session.time;
    _session.total_energy = session.energy;
    _session.battery_energy = session.battery;
    mqtt_send(TOPIC_SESION, Session_fields, &_session);
}

send_user

sends the user message to the mqtt server. the user message contains information pertanent to the current bike and rider:

  • bike short id
  • ftp

bike_short_id is populated from the flash memory by the io task periodically. this is because executing a flash read in the mqtt task sometimes causes a deadlock crash.

static void send_user(void)
{
    User user = User_init_zero;
    populate_header(&user.header);
    user.ftp = global_ftp;
    user.bike_short_id = bike_short_id;
    mqtt_send(TOPIC_USER, User_fields, &user);
}