Программа-пример Queue
Эта программа, «09 Queue.exe» (см. листинг па рис. 9-2), управляет очередью обраба тываемых элементов данных, используя мьютекс и семафор. Файлы исходного кода и ресурсов этой программы находятся в каталоге 09-Queue на компакт-диске, прилд гасмом к книге. После запуска Queue открывается окно, показанное ниже.
При инициализации Queue создает четыре клиентских и два серверных потока. Каждый клиентский поток засыпает на определенный период времени, а затем поме щает в очередь элемент данных. Когда в очередь ставится новый элемент, содержи мое списка Client Threads обновляется Каждый элемент данных состоит из номера клиентского потока и порядкового номера запроса, выданного этим потоком. Напри мер, первая запись в списке сообщает, что клиентский поток 0 поставил в очередь свой первый запрос. Следующие записи свидетельствуют, что далее свои первые зап росы выдают потоки 1-3, потом поток 0 помещает второй запрос, то же самое дела ют остальные потоки, и все повторяется.
Серверные потоки ничего не делают, пока в очереди не появится хотя бы один элемент данных. Как только он появляется, для его обработки пробуждается один из серверных потоков. Состояние серверных потоков отражается в списке Server Threads Первая запись говорит о том, что первый запрос от клиентского потока 0 обрабаты вается серверным потоком 0, вторая запись — что первый запрос от клиентского потока 1 обрабатывается серверным потоком 1, и т. д.
В этом примере серверные потоки не успевают обрабатывать клиентские запро сы и очередь в конечном счете заполняется до максимума. Я установил максималь ную длину очереди равной 10 элементам, что приводит к быстрому заполнению этой очереди. Кроме того, на четыре клиентских потока приходится лишь два серверных. В итоге очередь полностью заполняется к тому моменту, когда клиентский поток 3 пытается выдать свой пятый запрос.
О'кэй, что делает программа, Вы поняли-, теперь посмотрим — как она это делает (что гораздо интереснее). Очередью управляет С++-класс CQueue:
class CQueue
{
public:
Struct ELEMENT
{
int m_nThreadNum, m_nRequestNum;
// другие элементы данных должны быть определены здесь
};
typedef ELEMENT* PELEMENT;
private:
PELEMENT m_pElements; // массив элементе, подлежащих обработке
int m_nMaxElements; // количество элементов в массиве
HANDLE m_h[2]; // описатели мьютекса и семафора
HANDLE &m_hmtxQ; // ссылка на m_h[0]
HANDLE &rn_hsemNumElemenls; // ссылка на rc_h[1]
public:
COueue(int nMaxElements);
~CQueue();
BOOL Append(PELtMLNT pElement, DWORD dwMilHseconds);
BOOL Remove(PELEMENT pElement, DWORD dwMilliseconds);
};
Открытая структура ELEMENT внутри этого класса определяет, что представляет собой элемент данных, помещаемый в очередь. Его реальное содержимое в данном случае не имеет значения. В этой программе-примере клиентские потоки записыва ют в элемент данных собственный номер и порядковый номер своего очередного запроса, а серверные потоки, обрабатывая запросы, показывают эту информацию в списке. В реальном приложении такая информация вряд ли бы понадобилась.
Что касается закрытых элементов класса, мы имеем т_pElements, который указы вает на массив (фиксированного размера) структур ELEMENT. Эти данные как раз и нужно защищать от одновременного доступа к ним со стороны клиентских и сервер ных потоков. Элемент m_nMaxElements определяет размер массива при создании объекта CQueue. Следующий элемент, m_h, — это массив из двух описателей объек тов ядра. Для корректной защиты элементов данных в очереди нам нужно два объек та ядра: мьютекс и семафор. Эти дня объекта создаются в конструкторе CQueuc; в нем же их описатели помещаются в массив m_h.
Как Вы вскоре увидите, программа периодически вызывает WaitForMultipleObjectS, передавая этой функции адрес массива описателей. Вы также убедитесь, что програм ме время от времени приходится ссылаться только на один из этих описателей. Что бы облегчить чтение кода и его модификацию, я объявил два элемента, каждый из которых содержит ссылку на один из описателей, — m_bmtxQ и m_hsemNumElements. Конструктор CQueue инициализирует эти элементы содержимым m_h[0] и m_h[l] соответственно.
Теперь Вы и сами без труда разберетесь в методах конструктора и деструктора CQueue, поэтому я перейду сразу к методу Append. Этот метод пытается добавить ELEMENT в очередь. Но сначала он должен убедиться, что вызывающему потоку раз решен монопольный доступ к очереди. Для этого метод Append вызывает WaitFor~ SingleObject, передавая ей описатель объекта-мьютекса, m_hmlxQ. Если функция воз вращает WAIT_OBJECT_0, значит, поток получил монопольный доступ к очереди.
Далее метод Append должен попытаться увеличить число элементов в очереди, вызвав функцию ReleaseSemaphore и передав ей счетчик числа освобождений (release count), равный 1. Если вызов ReleaseSemaphore проходит успешно, в очереди еще есть место, и в нее можно поместить новый элемент. К счастью, ReleaseSemapbore возвра щает в переменной lPreviousCount предыдущее количество элементов в очереди. Бла годаря этому Вы точно знаете, в какой элемент массива следует записать новый эле
мент данных. Скопировав элемент в массив очсрсди, функция возвращает управле ние. По окончании этой операции Append вызывает ReleaseMutex, чтобы и другие потоки могли получить доступ к очереди. Остальной код в методе Append отвечает за обработку ошибок и неудачных вызовов.
Теперь посмотрим, как серверный поток вызывает метод Remove для выборки эле мента из очереди. Сначала этот метод должен убедиться, что вызывающий поток по лучил монопольный доступ к очереди и что в ней есть хотя бы один элемент. Разуме ется, серверному потоку нст смысла пробуждаться, если очередь пуста. Поэтому ме- i тод Remove предварительно обращается к WaitForMultipleObjects, передавая ей описа тели мьютекса и семафора. И только после освобождения обоих объектов серверный поток может пробудиться.
Если возвращается WAIT_OBJECT_0, значит, поток получил-монопольный доступ к очереди и в ней есть хотя бы один элемент. В этот момент программа извлекает из массива элемент с индексом 0, а остяльные элементы сдвигает вниз на одну позицию. Это, конечно, не самый эффективный способ реализации очереди, так как требует слишком большого количества операций копирования в памяти, но наша цсль зак лючается лишь в том, чтобы продемонстрировать синхронизацию потоков. По окон чании этих операций вызывается ReleaseMutex, и очередь становится доступной дру гим потокам.
Заметьте, что объект-семафор отслеживает, сколько элементов находится в оче реди. Вы, наверное, сразу же поняли, что это значение увеличивается, когда метод Append вызывает ReleaseSemaphore после добавления нового элемента к очереди. Но как оно уменьшается после удаления элемента из очереди, уже не столь очевидно. Эта операция выполняется вызовом WaitForMultipleObjects из метода Remove. Тут надо вспомнить, что побочный эффект успешного ожидания семафора заключается в уменьшении его счетчика на 1. Очень удобно для нас.
Теперь, когда Вы понимаете, как работает класс CQueue, Вы легко разберетесь в остальном коде этой программы.
Queue