Aller au contenu

Les Queues

Les queues sont des structures de données qui permettent d’échanger des données entre plusieurs tâches. Les queues sont implémentées par des FIFOs (First In First Out) et des mécanismes de synchronisation permettent à plusieurs tâches d’écrire dans une queue et à plusieurs autres de lire une queue.

Les routines de traitement d’interruptions (ISR - Interrupt Service Routine) peuvent elles aussi, avec quelques restrictions, écrire dans des queues. Dans la pratique, c’est même une démarche très courante.

Création d’une queue

Avec FreeRTOS, une queue est créée avec la fonction xQueueCreate().

QueueHandle_t xQueueCreate(UBaseType_t uxQueueLength,
                           UBaseType_t uxItemSize);

Exemple :

struct AMessage {
    char ucMessageID;
    char ucData[ 20 ];
};

void vATask( void *pvParameters ) {
QueueHandle_t xQueue1, xQueue2;

    // Create a queue capable of containing 10 unsigned long values.
    xQueue1 = xQueueCreate( 10, sizeof( unsigned long ) );

    if( xQueue1 == NULL ) {
        // Queue was not created and must not be used.
    }

    // Create a queue capable of containing 10 pointers to AMessage
    // structures.  These are to be queued by pointers as they are
    // relatively large structures.
    xQueue2 = xQueueCreate( 10, sizeof( struct AMessage * ) );

    if( xQueue2 == NULL ) {
        // Queue was not created and must not be used.
    }

    // ... Rest of task code.
 }

Envoi de messages

La macro xQueueSend() permet d’envoyer un message dans une queue :

BaseType_t xQueueSend(QueueHandle_t xQueue,
                      const void* pvItemToQueue,
                      TickType_t xTicksToWait);

Exemple:

struct AMessage {
    char ucMessageID;
    char ucData[20];
} xMessage;

unsigned long ulVar = 10UL;

void vATask(void* pvParameters) {
    QueueHandle_t xQueue1, xQueue2;
    struct AMessage* pxMessage;

    // Create a queue capable of containing 10 unsigned long values.
    xQueue1 = xQueueCreate(10, sizeof(unsigned long));

    // Create a queue capable of containing 10 pointers to AMessage structures.
    // These should be passed by pointer as they contain a lot of data.
    xQueue2 = xQueueCreate(10, sizeof(struct AMessage*));

    /* ... */

    if (xQueue1 != 0) {
        // Send an unsigned long.  Wait for 10 ticks for space to become
        // available if necessary.
        if (xQueueSend(xQueue1, (void*)&ulVar, (TickType_t)10) != pdPASS) {
            // Failed to post the message, even after 10 ticks.
        }
    }

    if (xQueue2 != 0) {
        // Send a pointer to a struct AMessage object.  Don't block if the
        // queue is already full.
        pxMessage = &xMessage;
        xQueueSend(xQueue2, (void*)&pxMessage, (TickType_t)0);
    }

    // ... Rest of task code.
}

Note sur les timeouts

Les timeouts ainsi que la plupart des temps gérés par FreeRTOS sont donnés en tick et non en micro- ou millisecondes. La fréquence des ticks est spécifiée dans le fichier FreeRTOSConfig.h par la constante configTICK_RATE_HZ. Dans notre configuration, la fréquence des ticks est de 1kHz et 1 tick vaut bien une milliseconde, mais ça ne sera pas toujours le cas. Si vous avez besoin d’un temps en milliseconde, c’est une bonne pratique de diviser ce temps par la constante portTICK_PERIOD_MS pour obtenir le nombre de ticks correspondant.

Par exemple, si vous avez besoin d’un délais de 500ms, le nombre de ticks correspondant est de 500 / portTICK_PERIOD_MS.

Cette division fait du sens si la fréquence est de 1000Kz, 500Kz, ou 100Hz, … et que la période est un nombre entier, mais dans la pratique c’est très souvent le cas. Il est rare de voir des fréquences différentes de 1000Hz et si on a besoin de quelque chose de plus rapide, on fait appel aux timers du matériel.

Une autre technique pour convertir des millisecondes en tick consiste à utiliser la macro pdMS_TO_TICKS(ms) ou alors la constante configTICK_RATE_HZ.

Réception de messages

Pour recevoir des données, on utilise la macro xQueueReceive() :

BaseType_t xQueueReceive(QueueHandle_t xQueue,
                         void* pvBuffer,
                         TickType_t xTicksToWait);

exemple :

// Define a variable of type struct AMMessage.  The examples below demonstrate
// how to pass the whole variable through the queue, and as the structure is
// moderately large, also how to pass a reference to the variable through a
// queue.

struct AMessage {
    char ucMessageID;
    char ucData[20];
} xMessage;

// Queue used to send and receive complete struct AMessage structures.
QueueHandle_t xStructQueue = NULL;

// Queue used to send and receive pointers to struct AMessage structures.
QueueHandle_t xPointerQueue = NULL;

void vCreateQueues(void) {
    xMessage.ucMessageID = 0xab;
    memset(&(xMessage.ucData), 0x12, 20);

    // Create the queue used to send complete struct AMessage structures.  This
    // can also be created after the schedule starts, but care must be task to
    // ensure nothing uses the queue until after it has been created.
    xStructQueue = xQueueCreate(
        // The number of items the queue can hold.
        10,
        // Size of each item is big enough to hold the
        // whole structure.
        sizeof(xMessage));

    // Create the queue used to send pointers to struct AMessage structures.
    xPointerQueue = xQueueCreate(
        // The number of items the queue can hold.
        10,
        // Size of each item is big enough to hold only a
        // pointer.
        sizeof(&xMessage));

    if ((xStructQueue == NULL) || (xPointerQueue == NULL)) {
        // One or more queues were not created successfully as there was not
        // enough heap memory available.  Handle the error here.  Queues can
        // also be created statically.
    }
}

// Task that writes to the queues.
void vATask(void* pvParameters) {
    struct AMessage* pxPointerToxMessage;

    // Send the entire structure to the queue created to hold 10 structures. */
    xQueueSend(  // The handle of the queue.
        xStructQueue,
        // The address of the xMessage variable.  sizeof(struct AMessage)
        // bytes are copied from here into the queue.
        (void*)&xMessage,
        // Block time of 0 says don't block if the queue is already full.
        // Check the value returned by xQueueSend() to know if the message
        // was sent to the queue successfully.
        (TickType_t)0);

    // Store the address of the xMessage variable in a pointer variable.
    pxPointerToxMessage =
        &xMessage

            // Send the address of xMessage to the queue created to hold 10
            // pointers.
            xQueueSend(  // The handle of the queue.
                xPointerQueue,
                // The address of the variable that holds the address
                // of xMessage. sizeof( &xMessage ) bytes are copied
                // from here into the queue. As the variable holds the
                // address of xMessage it is the address of xMessage
                // that is copied into the queue.
                (void*)&pxPointerToxMessage,
                (TickType_t)0);

    // ... Rest of task code goes here.
}

// Task that reads from the queues.
void vADifferentTask(void* pvParameters) {
    struct AMessage xRxedStructure, *pxRxedPointer;

    if (xStructQueue != NULL) {
        // Receive a message from the created queue to hold complex struct
        // AMessage structure.  Block for 10 ticks if a message is not
        // immediately available. The value is read into a struct AMessage
        // variable, so after calling xQueueReceive() xRxedStructure will hold a
        // copy of xMessage.
        if (xQueueReceive(xStructQueue, &(xRxedStructure), (TickType_t)10) ==
            pdPASS) {
            // xRxedStructure now contains a copy of xMessage.
        }
    }

    if (xPointerQueue != NULL) {
        // Receive a message from the created queue to hold pointers.  Block for
        // 10 ticks if a message is not immediately available.  The value is
        // read into a pointer variable, and as the value received is the
        // address of the xMessage variable, after this call pxRxedPointer will
        // point to xMessage.
        if (xQueueReceive(xPointerQueue, &(pxRxedPointer), (TickType_t)10) ==
            pdPASS) {
            // *pxRxedPointer now points to xMessage.
        }
    }

    // ... Rest of task code goes here.
}

Exemple 1 avec des petits messages

Si les messages sont petits (jusqu’à 8 Bytes), on peut sans autre passer les messages directement dans la queue :

const int kQueueSize = 10;

struct SmallItem {
    uint32_t value;
};

xQueueHandle smallItemsQueue;

void Producer(void* params) {
    SmallItem item = {0};
    while (1) {
        printf("Sending %lu\n", item.value);
        xQueueSend(smallItemsQueue, &item, portMAX_DELAY);
        item.value++;
        vTaskDelay(pdMS_TO_TICKS(1000));
    }
}

void Consumer(void* params) {
    SmallItem item;
    while (1) {
        xQueueReceive(smallItemsQueue, &item, portMAX_DELAY);
        printf("Received %lu\n", item.value);
    }
}

int main(void) {
    osKernelInitialize();
    smallItemsQueue  = xQueueCreate(kQueueSize, sizeof(SmallItem));
    xTaskCreate(Producer, "Producer", 1024, NULL, osPriorityNormal, NULL);
    xTaskCreate(Consumer, "Consumer", 1024, NULL, osPriorityNormal, NULL);
    osKernelStart();
}

Exemple 2 avec de gros messages

Si on a de plus gros messages, c’est possible de copier ces messages dans la queue, mais ça ne sera pas efficace, car ces copies de messages prennent beaucoup de temps:

const int kQueueSize = 10;

struct BigItem {
    char message[100];
};

xQueueHandle bigItemsQueue;

void Producer(void* params) {
    BigItem item = {0};
    uint32_t nr  = 0;
    while (1) {
        sprintf(item.message, "Message %lu", nr++);
        printf("Sending %s\n", item.message);
        xQueueSend(bigItemsQueue, &item, portMAX_DELAY); // Expensive copy
        vTaskDelay(pdMS_TO_TICKS(1000));
    }
}

void Consumer(void* params) {
    BigItem item;
    while (1) {
        xQueueReceive(bigItemsQueue, &item, portMAX_DELAY); // Expensive copy
        printf("Received %s\n", item.message);
    }
}

int main(void) {
    osKernelInitialize();
    bigItemsQueue    = xQueueCreate(kQueueSize, sizeof(BigItem));
    xTaskCreate(Producer, "Producer", 1024, NULL, osPriorityNormal, NULL);
    xTaskCreate(Consumer, "Consumer", 1024, NULL, osPriorityNormal, NULL);
    osKernelStart();
}

Exemple 3 avec des pointeurs

Pour résoudre le problème des copies, on peut utiliser des pointeurs. Mais attention, le code ci-dessous est faux! Dans la procédure Producer, nous n’avons qu’un seul BigItem et nous passons toujours la même adresse dans la queue.

// WARNING : WRONG CODE

const int kQueueSize = 10;

struct BigItem {
    char message[100];
};

xQueueHandle bigItemsPtrQueue;

void Producer(void* params) {
    BigItem item     = {0};
    BigItem* itemPtr = &item;
    uint32_t nr      = 0;
    while (1) {
        sprintf(item.message, "Message %lu", nr++);
        printf("Sending %s\n", item.message);
        xQueueSend(bigItemsPtrQueue, &itemPtr, portMAX_DELAY); // WRONG !
        vTaskDelay(pdMS_TO_TICKS(1000));
    }
}

void Consumer(void* params) {
    BigItem* itemPtr;
    while (1) {
        xQueueReceive(bigItemsPtrQueue, &itemPtr, portMAX_DELAY); // WRONG !!
        printf("Received %s\n", itemPtr->message);
    }
}

int main(void) {
    osKernelInitialize();
    bigItemsPtrQueue = xQueueCreate(kQueueSize, sizeof(BigItem*));
    xTaskCreate(Producer, "Producer", 1024, NULL, osPriorityNormal, NULL);
    xTaskCreate(Consumer, "Consumer", 1024, NULL, osPriorityNormal, NULL);
    osKernelStart();
}

Exemple 4 avec des pointeurs et de l’allocation dynamique

Pour résoudre le problème précédent, nous pouvons allouer dynamiquement chaque message que nous souhaitons envoyer. Cette technique fonctionne, mais l’allocation et la libération de la mémoire coûtent du temps et nous savons que c’est une mauvaise pratique, car la mémoire risque d’être fragmentée :

const int kQueueSize = 10;

struct BigItem {
    char message[100];
};

xQueueHandle bigItemsPtrQueue;

void Producer(void* params) {
    BigItem* itemPtr;
    uint32_t nr = 0;
    while (1) {
        itemPtr = (BigItem*)malloc(sizeof(BigItem)); // Dynamic allocation
        sprintf(itemPtr->message, "Message %lu", nr++);
        printf("Sending %s\n", itemPtr->message);
        xQueueSend(bigItemsPtrQueue, &itemPtr, portMAX_DELAY);
        vTaskDelay(pdMS_TO_TICKS(1000));
    }
}

void Consumer(void* params) {
    BigItem* itemPtr;
    while (1) {
        xQueueReceive(bigItemsPtrQueue, &itemPtr, portMAX_DELAY);
        printf("Received %s\n", itemPtr->message);
        free(itemPtr); // Don't forget the free
    }
}

int main(void) {
    osKernelInitialize();
    bigItemsPtrQueue = xQueueCreate(kQueueSize, sizeof(BigItem*));
    xTaskCreate(Producer, "Producer", 1024, NULL, osPriorityNormal, NULL);
    xTaskCreate(Consumer, "Consumer", 1024, NULL, osPriorityNormal, NULL);
    osKernelStart();
}

Exemple 5 avec une memory pool

La meilleure solution avec de gros messages consiste à utiliser une memory pool. Il s’agit d’un array d’éléments, alloué une seule fois à l’initialisation, et utilisé pour contenir les messages. La memory pool est aussi longue que la taille de la queue plus un1.

const int kQueueSize = 10;

struct BigItem {
    char message[100];
};

xQueueHandle bigItemsPtrQueue;

void Producer(void* params) {
    BigItem* itemsPool = (BigItem*)malloc(sizeof(BigItem) * (kQueueSize + 1));
    int pos            = 0;
    uint32_t nr        = 0;
    while (1) {
        BigItem* itemPtr = &itemsPool[pos];
        pos              = (pos + 1) % (kQueueSize + 1);
        sprintf(itemPtr->message, "Message %lu", nr++);
        printf("Sending %s\n", itemPtr->message);
        xQueueSend(bigItemsPtrQueue, &itemPtr, portMAX_DELAY);
        vTaskDelay(pdMS_TO_TICKS(1000));
    }
}

void Consumer(void* params) {
    BigItem* itemPtr;
    while (1) {
        xQueueReceive(bigItemsPtrQueue, &itemPtr, portMAX_DELAY);
        printf("Received %s\n", itemPtr->message);
    }
}

int main(void) {
    osKernelInitialize();
    bigItemsPtrQueue = xQueueCreate(kQueueSize, sizeof(BigItem*));
    xTaskCreate(Producer, "Producer", 1024, NULL, osPriorityNormal, NULL);
    xTaskCreate(Consumer, "Consumer", 1024, NULL, osPriorityNormal, NULL);
    osKernelStart();
}

  1. La taille de la memory pool est égale à la taille de la queue plus un, car nous devons pouvoir préparer le prochain élément à mettre dans la queue, même si celle-ci est pleine.