eventfd в Linux
При разработке многопоточных приложений часто применяется следующий паттерн для синхронизации потоков : создается массив сигнализирующих объектов и вызывается wait для них всех, который блокирует выполнение потока и ожидает, пока один из этих объектов не даст сигнал. В это время другие потоки что-то делают, и рано или поздно сигнал вызывается. После этого wait завершает ожидание, возвращая индекс сработавшего объекта-события. Далее производятся какие-то действия по обработке этого конкретного события, и wait вызывается снова. В псевдокоде это выглядит примерно так:
event[] events = initializeEvents(); while (true) { int index = wait(events, TIMEOUT); if (0 == index) { // wait finished by timeout } else if (-1 == index) { // wait finished with error } else { event signalledEvent = events[index]; // todo : working with signalled event if (signalledEvent == EXIT_EVENT) { break; } } } |
Это очень удобный паттерн, который можно применять как в синхронизации потоков (различные очереди сообщений или просто разные действия), так и в обработке ввода (в Windows функции ожидания можно применить к STD_INPUT_HANDLE, поместив STD_INPUT_HANDLE в один массив с другими объектами-событиями).
И вот мне нужно было сделать аналогичное решение для обработки консольного ввода в Linux.
Была найдена функция poll(), которая работает с массивом файловых дескрипторов. Однако, мьютекс в этот массив засунуть нельзя, а отдельный файл или pipe создавать для этого дела не хочется. Должен же быть некий объект ядра, который будет достаточно легковесным для использования в этом сценарии ! И действительно, такой объект нашелся. Им оказался eventfd – сравнительно недавно появившийся в Linux API (первая версия появилась в ядре 2.6, а последние изменения в API были добавлены в версии 2.6.30).
Итак, как же работает eventfd. Обычный объект eventfd (не-семафорный, то есть не созданный с флагом EFD_SEMAPHORE) работает достаточно просто. Он содержит в себе 64-битный счетчик (изначально равный нулю) и позволяет писать в себя вызовом write() и читать вызовом read(). При записи (поддерживается только запись 64-битного значения) записываемое значение добавляется к счетчику. То есть вызвали 3 раза запись числа 10 – счетчик будет содержать значение 30. При вызове read() это значение будет считано, а счётчик вновь сброшен в ноль. Когда счетчик содержит 0, вызов poll с флагом POLLIN будет ожидать до тех пор, пока кто-то не запишет в eventfd значение. Если же счетчик содержит 1, вызов poll с флагом POLLIN вернёт управление сразу (раз есть, что считывать – eventfd “доступен для чтения”). Причем, так как при вызове poll() счётчик не сбрасывается, повторные вызовы будут тут же возвращать управление до тех пор, пока кто-то не вызовет read() и не сбросит тем самым значение внутреннего счетчика в ноль. Вызов же poll() с флагом POLLOUT будет, как правило, всегда возвращать true для флага POLLOUT (поскольку запись в него в принципе всегда разрешена, за исключением сценариев, при которых eventfd может быть заблокирован из-за другого активного в этот момент писателя).
Ну и так как eventfd представляет собой файловый дескриптор, связанный с объектом ядра, то его нужно закрывать вызовом close().
Итак, чтобы сделать аналогичную схему, нам достаточно простого алгоритма :
int fd = eventfd(0, EFD_CLOEXEC); // пусть это событие сигнализирует нам о продолжении некой работы int fd2 = eventfd(0, EFD_CLOEXEC); // а это - о необходимости выйти из цикла приёма сообщений pollfd pollfds[2]; pollfds[0].fd = fd; pollfds[1].fd = fd2; pollfds[0].events = POLLIN; pollfds[1].events = POLLIN; bool exitFlag = false; while (!exitFlag) { int pollResult = poll(pollfds, 2, -1); if (pollResult == -1) { // handle error } else if (pollResult > 0) { // сработало одно или несколько событий - но без проверки revents - // флагов событий, действительно сработавших - нам не узнать индексы сработавших событий for (int i = 0; i < 2; i++) { if (pollfds[i].revents != 0) { if (i == 0) { // делаем работу, относящуюся к первому событию } else if (i == 1) { exitFlag = true; break; } // обнуляем внутренний счетчик, чтобы следующий вызов poll снова ожидал записи в eventfd uint64_t u; read(pollfds[i].fd, &u, sizeof(uint64_t)); } } } } ... // а тут где-то в другом потоке вызываем сигналы вот так uint64_t u = 1; write(fd, &u, sizeof(uint64_t)); |
При необходимости можно усовершенствовать схему, заменив третий аргумент poll с -1 (TIMEOUT_INDEFINITE) на конкретный таймаут и добавив обработку того случая, когда pollResult равен нулю (что как раз соответствует возврату по таймауту).
В общем, мы видим, что в принципе, ничего сложного в синхронизации через файловые дескрипторы нет, и можно легко построить цикл обработки сообщений из других потоков / процессов по аналогии с тем, как это привыкли делать в Windows с помощью объектов ядра Event и функций WaitForSingleObject / WaitForMultipleObjects.
PS. Возможно, производительность этого способа может быть хуже, чем при использовании мьютексов. Поэтому если производительность критична, наверное, стоит задуматься о том, как реализовать аналогичное поведение на POSIX мьютексах. На гитхабе есть проект https://github.com/NeoSmart/PEvents – он как раз про это (правда, там довольно много кода по сравнению с приведённым сниппетом).
3