eventfd в Linux

Written by elwood

При разработке многопоточных приложений часто применяется следующий паттерн для синхронизации потоков : создается массив сигнализирующих объектов и вызывается wait для них всех, который блокирует выполнение потока и ожидает, пока один из этих объектов не даст сигнал. В это время другие потоки что-то делают, и рано или поздно сигнал вызывается. После этого wait завершает ожидание, возвращая индекс сработавшего объекта-события. Далее производятся какие-то действия по обработке этого конкретного события, и wait вызывается снова. В псевдокоде это выглядит примерно так:

event[] events = initializeEvents();
while (true) {
    int index = wait(events, TIMEOUT);
    if (0 == index) {
	// wait finished by timeout
    } else if (-1 == index) {
	// wait finished with error
    } else {
	event signalledEvent = events[index];
	// todo : working with signalled event
 
	if (signalledEvent == EXIT_EVENT) {
	    break;
	}
    }
}

Это очень удобный паттерн, который можно применять как в синхронизации потоков (различные очереди сообщений или просто разные действия), так и в обработке ввода (в Windows функции ожидания можно применить к STD_INPUT_HANDLE, поместив STD_INPUT_HANDLE в один массив с другими объектами-событиями).

И вот мне нужно было сделать аналогичное решение для обработки консольного ввода в Linux.

Была найдена функция poll(), которая работает с массивом файловых дескрипторов. Однако, мьютекс в этот массив засунуть нельзя, а отдельный файл или pipe создавать для этого дела не хочется. Должен же быть некий объект ядра, который будет достаточно легковесным для использования в этом сценарии ! И действительно, такой объект нашелся. Им оказался eventfd – сравнительно недавно появившийся в Linux API (первая версия появилась в ядре 2.6, а последние изменения в API были добавлены в версии 2.6.30).

Итак, как же работает eventfd. Обычный объект eventfd (не-семафорный, то есть не созданный с флагом EFD_SEMAPHORE) работает достаточно просто. Он содержит в себе 64-битный счетчик (изначально равный нулю) и позволяет писать в себя вызовом write() и читать вызовом read(). При записи (поддерживается только запись 64-битного значения) записываемое значение добавляется к счетчику. То есть вызвали 3 раза запись числа 10 – счетчик будет содержать значение 30. При вызове read() это значение будет считано, а счётчик вновь сброшен в ноль. Когда счетчик содержит 0, вызов poll с флагом POLLIN будет ожидать до тех пор, пока кто-то не запишет в eventfd значение. Если же счетчик содержит 1, вызов poll с флагом POLLIN вернёт управление сразу (раз есть, что считывать – eventfd “доступен для чтения”). Причем, так как при вызове poll() счётчик не сбрасывается, повторные вызовы будут тут же возвращать управление до тех пор, пока кто-то не вызовет read() и не сбросит тем самым значение внутреннего счетчика в ноль. Вызов же poll() с флагом POLLOUT будет, как правило, всегда возвращать true для флага POLLOUT (поскольку запись в него в принципе всегда разрешена, за исключением сценариев, при которых eventfd может быть заблокирован из-за другого активного в этот момент писателя).

Ну и так как eventfd представляет собой файловый дескриптор, связанный с объектом ядра, то его нужно закрывать вызовом close().

Итак, чтобы сделать аналогичную схему, нам достаточно простого алгоритма :

int fd = eventfd(0, EFD_CLOEXEC); // пусть это событие сигнализирует нам о продолжении некой работы
int fd2 = eventfd(0, EFD_CLOEXEC); // а это - о необходимости выйти из цикла приёма сообщений
pollfd pollfds[2];
pollfds[0].fd = fd;
pollfds[1].fd = fd2;
pollfds[0].events = POLLIN;
pollfds[1].events = POLLIN;
 
bool exitFlag = false;
while (!exitFlag) {
    int pollResult = poll(pollfds, 2, -1);
    if (pollResult == -1) {
	// handle error
    } else if (pollResult > 0) {
	// сработало одно или несколько событий - но без проверки revents -
	// флагов событий, действительно сработавших - нам не узнать индексы сработавших событий
	for (int i = 0; i < 2; i++) {
	    if (pollfds[i].revents != 0) {
		if (i == 0) {
		    // делаем работу, относящуюся к первому событию
		} else if (i == 1) {
		    exitFlag = true;
		    break;
		}
		// обнуляем внутренний счетчик, чтобы следующий вызов poll снова ожидал записи в eventfd
		uint64_t u;
		read(pollfds[i].fd, &u, sizeof(uint64_t));
	    }
	}
    }
}
 
...
 
// а тут где-то в другом потоке вызываем сигналы вот так
uint64_t u = 1;
write(fd, &u, sizeof(uint64_t));

При необходимости можно усовершенствовать схему, заменив третий аргумент poll с -1 (TIMEOUT_INDEFINITE) на конкретный таймаут и добавив обработку того случая, когда pollResult равен нулю (что как раз соответствует возврату по таймауту).

В общем, мы видим, что в принципе, ничего сложного в синхронизации через файловые дескрипторы нет, и можно легко построить цикл обработки сообщений из других потоков / процессов по аналогии с тем, как это привыкли делать в Windows с помощью объектов ядра Event и функций WaitForSingleObject / WaitForMultipleObjects.

PS. Возможно, производительность этого способа может быть хуже, чем при использовании мьютексов. Поэтому если производительность критична, наверное, стоит задуматься о том, как реализовать аналогичное поведение на POSIX мьютексах. На гитхабе есть проект https://github.com/NeoSmart/PEvents – он как раз про это (правда, там довольно много кода по сравнению с приведённым сниппетом).