Blocking queue race condition?

From tomazos <tomazos@gmail.com>
Newsgroups comp.os.linux.development.apps
Subject Blocking queue race condition?
Date 2012-01-04 22:08 -0800
Organization http://groups.google.com
Message-ID <74ec83e6-eafc-440e-bb60-8a1ed6412811@h13g2000vbn.googlegroups.com>


Hey all,

I'm trying to implement a high performance blocking queue backed by a
circular buffer on top of pthreads, semaphore.h and gcc atomic
builtins.  The queue needs to handle multiple simultaneous readers
and writers from different threads.
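A side note on the circular buffer indexing: the listing below indexes cells with `head % QUEUE_CAPACITY` and `tail % QUEUE_CAPACITY`, where `head` and `tail` are `unsigned int` counters that are never reset and eventually wrap to 0. That is presumably why `QUEUE_CAPACITY` is marked "must be power of 2". The sketch below, assuming a 32-bit `unsigned int` as on x86_64 Linux, computes the cell index just before and just after the wrap. `WrapIndices` and `indices_at_wrap` are hypothetical names used only for illustration.

```cpp
#include <climits>

// Cell indices produced by `counter % capacity` on either side of the point
// where an unsigned int counter wraps from UINT_MAX back to 0.
struct WrapIndices {
    unsigned before;  // UINT_MAX % capacity
    unsigned after;   // 0 % capacity, the index right after the wrap
};

WrapIndices indices_at_wrap(unsigned capacity) {
    unsigned last = UINT_MAX;   // largest value the counter can hold
    unsigned next = last + 1u;  // unsigned arithmetic wraps to 0 (well defined)
    return WrapIndices{last % capacity, next % capacity};
}
```

With a capacity of 8 the index goes from 7 to 0, so the ring order is preserved. With a capacity of 6 it would jump from 3 to 0 and skip cells 4 and 5, because 6 does not divide 2^32. This has nothing to do with the corruption described below, which appears after only about 122,685 operations, long before the counters wrap.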

I've isolated some sort of race condition, and I'm not sure whether it
comes from a faulty assumption about the behavior of the atomic
operations and semaphores, or whether my design is fundamentally flawed.

I've extracted and simplified it into the standalone example below.  I
would expect this program to run forever.  However, it exits after a
few hundred thousand iterations, having detected corruption in the
queue.

For exposition, the example below doesn't actually store anything: it
sets a cell to 1 where the real data would go, and 0 marks an empty
cell.  One counting semaphore (vacancies) holds the number of vacant
cells, and another (occupants) holds the number of occupied cells.

Writers do the following:
	(1) decrement vacancies
	(2) atomically get next head position
	(3) write to it
	(4) increment occupants

Readers do the opposite:
	(1) decrement occupants
	(2) atomically get next tail position
	(3) read from it
	(4) increment vacancies

Given the above, I would expect that at most one thread can be
reading or writing any given cell at a time.
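The counting semaphores limit how many threads are past step (1), but they say nothing about which index each thread gets at step (2) or when it reaches step (3). The hypothetical replay below walks through, single-threaded, one interleaving that the steps above appear to allow with two writers (A, B) and one reader (R). Plain integers stand in for the semaphores and the atomic counters, and `Replay` and `replay_interleaving` are illustrative names, not part of the original code.

```cpp
// Hypothetical single-threaded replay of one possible interleaving of the
// put()/take() steps. Plain ints stand in for the two semaphores and for
// the atomic head/tail counters.
struct Replay {
    unsigned head = 0, tail = 0;
    int vacancies = 8, occupants = 0;
    int cell[8] = {0};
};

// Returns the value the reader finds in the cell it claimed:
// 1 would mean "occupied" (expected), 0 means it got there before the writer.
int replay_interleaving() {
    Replay q;
    // Writer A: (1) decrement vacancies, (2) claim head position 0, then is preempted
    q.vacancies--;
    unsigned a = q.head++ % 8;
    // Writer B: all four steps for head position 1
    q.vacancies--;
    unsigned b = q.head++ % 8;
    q.cell[b] = 1;               // (3) write cell 1
    q.occupants++;               // (4) occupants is now 1
    // Reader R: (1) succeeds because occupants > 0, (2) claims tail position 0
    q.occupants--;
    unsigned r = q.tail++ % 8;   // r == a == 0, a cell A has not written yet
    int seen = q.cell[r];        // (3) reads an empty cell
    // Writer A resumes: (3) write cell 0, (4) increment occupants
    q.cell[a] = 1;
    q.occupants++;
    return seen;
}
```

In this replay the reader and writer A both own cell 0 at the same moment, and the reader's compare-and-swap would fail. This would also be consistent with the sample output further down, where `get` fails on cell 6 even though the dump shows `cell[6] = 1`: the writer may have set the cell between the failed swap and the printout.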

Any ideas about why it doesn't work, or debugging strategies, would be
appreciated.  Code and output below...

#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2

struct CountingSemaphore
{
	sem_t m;
	CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
	void post() { sem_post(&m); }
	void wait() { sem_wait(&m); }
	~CountingSemaphore() { sem_destroy(&m); }
};

struct BlockingQueue
{
	unsigned int head; // (head % capacity) is next head position
	unsigned int tail; // (tail % capacity) is next tail position
	CountingSemaphore vacancies; // how many cells are vacant
	CountingSemaphore occupants; // how many cells are occupied

	// cell[x] == 1 if cell x is occupied, cell[x] == 0 if cell x is vacant
	int cell[QUEUE_CAPACITY];

	BlockingQueue() :
		head(0),
		tail(0),
		vacancies(QUEUE_CAPACITY),
		occupants(0)
	{
		for (size_t i = 0; i < QUEUE_CAPACITY; i++)
			cell[i] = 0;
	}

	// put an item in the queue
	void put()
	{
		vacancies.wait();

		// __sync_fetch_and_add(&head, 1) is an atomic post-increment, i.e. head++
		set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

		occupants.post();
	}

	// take an item from the queue
	void take()
	{
		occupants.wait();

		// __sync_fetch_and_add(&tail, 1) is an atomic post-increment, i.e. tail++
		get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

		vacancies.post();
	}

	// set cell i
	void set(unsigned int i)
	{
		// __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
		// swap 1 for 0 or die
		if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
		{
			corrupt("set", i);
			exit(-1);
		}
	}

	// get cell i
	void get(unsigned int i)
	{
		// __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
		// swap 0 for 1 or die
		if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
		{
			corrupt("get", i);
			exit(-1);
		}
	}

	// corruption detected
	void corrupt(const char* action, unsigned int i)
	{
		static CountingSemaphore sem(1);
		sem.wait();

		cerr << "corruption detected" << endl;
		cerr << "action = " << action << endl;
		cerr << "i = " << i << endl;
		cerr << "head = " << head << endl;
		cerr << "tail = " << tail << endl;

		for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
			cerr << "cell[" << j << "] = " << cell[j] << endl;
	}
};

BlockingQueue q;

// keep posting to the queue forever
void* Source(void*)
{
	while (true)
		q.put();

	return 0;
}

// keep taking from the queue forever
void* Sink(void*)
{
	while (true)
		q.take();

	return 0;
}

int main()
{
	pthread_t id;

	// start some pthreads to run Source function
	for (int i = 0; i < NUM_THREADS; i++)
		if (pthread_create(&id, NULL, &Source, 0))
			abort();

	// start some pthreads to run Sink function
	for (int i = 0; i < NUM_THREADS; i++)
		if (pthread_create(&id, NULL, &Sink, 0))
			abort();

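	// block on the last Sink thread instead of busy-spinning; the process
	// only ends via exit(-1) when corruption is detected
	pthread_join(id, NULL);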
}

Compile the above as follows:
	$ g++ -pthread AboveCode.cpp
	$ ./a.out

The output is different every time, but here is one example:

	corruption detected
	action = get
	i = 6
	head = 122685
	tail = 122685
	cell[0] = 0
	cell[1] = 0
	cell[2] = 1
	cell[3] = 0
	cell[4] = 1
	cell[5] = 0
	cell[6] = 1
	cell[7] = 1
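One possible repair, which is not part of the original post, is to keep the two semaphores for blocking but stop assuming that a claimed index is ready to use. In the sketch below each cell gets a sequence number, similar to the per-cell sequence numbers in Dmitry Vyukov's bounded MPMC queue: a writer holding position `pos` waits until the cell's sequence equals `pos`, and a reader holding position `pos` waits until it equals `pos + 1`. It assumes C++11 `<atomic>`; `SeqBlockingQueue`, `seq_` and `data_` are hypothetical names, and it is a sketch rather than a tested drop-in replacement.

```cpp
#include <atomic>
#include <sched.h>
#include <semaphore.h>

// Sketch: semaphore-blocking ring buffer where a per-cell sequence number
// decides whose turn it is, so a claimed index is never used too early.
template <unsigned Capacity>
class SeqBlockingQueue {
    static_assert(Capacity >= 2 && (Capacity & (Capacity - 1)) == 0,
                  "Capacity must be a power of 2 and at least 2");

    std::atomic<unsigned> head_{0}, tail_{0};
    std::atomic<unsigned> seq_[Capacity];  // which lap/turn each cell is on
    int data_[Capacity];
    sem_t vacancies_, occupants_;

public:
    SeqBlockingQueue() {
        for (unsigned i = 0; i < Capacity; i++)
            seq_[i].store(i, std::memory_order_relaxed);  // cell i expects writer i first
        sem_init(&vacancies_, 0, Capacity);
        sem_init(&occupants_, 0, 0);
    }
    ~SeqBlockingQueue() {
        sem_destroy(&vacancies_);
        sem_destroy(&occupants_);
    }

    void put(int value) {
        while (sem_wait(&vacancies_) != 0) {}  // retry while sem_wait fails, e.g. EINTR
        unsigned pos = head_.fetch_add(1, std::memory_order_relaxed);
        unsigned i = pos % Capacity;
        // Wait for the reader from the previous lap to release this cell.
        while (seq_[i].load(std::memory_order_acquire) != pos)
            sched_yield();
        data_[i] = value;
        seq_[i].store(pos + 1, std::memory_order_release);  // now full for reader `pos`
        sem_post(&occupants_);
    }

    int take() {
        while (sem_wait(&occupants_) != 0) {}
        unsigned pos = tail_.fetch_add(1, std::memory_order_relaxed);
        unsigned i = pos % Capacity;
        // Wait for the writer that claimed position `pos` to fill this cell.
        while (seq_[i].load(std::memory_order_acquire) != pos + 1)
            sched_yield();
        int value = data_[i];
        seq_[i].store(pos + Capacity, std::memory_order_release);  // free for writer pos + Capacity
        sem_post(&vacancies_);
        return value;
    }
};
```

Threads still block on the semaphores when the queue is full or empty; the `sched_yield()` loop only covers the window in which the thread owning the previous turn on the same cell has claimed its index but not yet released the cell. Usage mirrors the original: `SeqBlockingQueue<8> q; q.put(42); int v = q.take();`.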

My system is Ubuntu 11.10 on Intel Core 2:
	$ uname -a
	Linux 3.0.0-14-generic #23-Ubuntu SMP Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
	$ cat /proc/cpuinfo | grep Intel
	model name : Intel(R) Core(TM)2 Quad  CPU   Q9300  @ 2.50GHz
	$ g++ --version
	g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

Thanks,
Andrew.



Thread

Blocking queue race condition? tomazos <tomazos@gmail.com> - 2012-01-04 22:08 -0800
  Re: Blocking queue race condition? Rainer Weikusat <rweikusat@mssgmbh.com> - 2012-01-05 20:39 +0000
  Re: Blocking queue race condition? tomazos <andrew@tomazos.com> - 2012-01-05 14:01 -0800
