Friday, July 29, 2016

Implementing a FIFO mutex in pthreads

I'm trying to implement a binary tree supporting concurrent insertions (which could occur even between nodes), but without having to allocate a global lock or a separate mutex or mutexes for each node. Rather, the quantity of such locks allocated should be on the order of the quantity of threads using the tree.

Consequently, I end up with a type of lock convoy problem. Explained more simply, it's what potentially happens when two or more threads do the following:

  1 for(;;) {
  2   lock(mutex)
  3   do_stuff
  4   unlock(mutex)
  5 }

That is, if Thread#1 executes instructions 4->5->1->2 all in one "cpu burst" then Thread#2 gets starved from execution.

On the other hand, if there was a FIFO-type locking option for mutexes in pthreads, then such a problem could be avoided. So, is there a way to implement FIFO-type mutex locking in pthreads? Can altering thread priorities accomplish this?

Answer by Macmade for Implementing a FIFO mutex in pthreads


You might take a look at the pthread_mutexattr_setprioceiling function.

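int pthread_mutexattr_setprioceiling(
    pthread_mutexattr_t *attr,
    int prioceiling,
    int *oldceiling
);

From the documentation (note that the POSIX prototype takes only attr and prioceiling; the three-argument form above is as quoted):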

pthread_mutexattr_setprioceiling(3THR) sets the priority ceiling attribute of a mutex attribute object.

attr points to a mutex attribute object created by an earlier call to pthread_mutexattr_init().

prioceiling specifies the priority ceiling of initialized mutexes. The ceiling defines the minimum priority level at which the critical section guarded by the mutex is executed. prioceiling will be within the maximum range of priorities defined by SCHED_FIFO. To avoid priority inversion, prioceiling will be set to a priority higher than or equal to the highest priority of all the threads that might lock the particular mutex.

oldceiling contains the old priority ceiling value.
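A minimal sketch of setting up such a mutex, assuming the two-argument POSIX form of pthread_mutexattr_setprioceiling and a platform that supports PTHREAD_PRIO_PROTECT (glibc on Linux is assumed here). The names init_ceiling_mutex and ceiling_demo are made up for illustration. Whether a ceiling actually helps with the starvation described in the question depends on the scheduling policy the threads run under.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <sched.h>

/* Initialise *m as a priority-ceiling mutex. Returns 0 or an errno value. */
int init_ceiling_mutex(pthread_mutex_t *m, int ceiling)
{
    pthread_mutexattr_t attr;
    int rc = pthread_mutexattr_init(&attr);
    if (rc != 0)
        return rc;

    /* The ceiling only takes effect under the PTHREAD_PRIO_PROTECT protocol. */
    rc = pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_PROTECT);
    if (rc == 0)
        rc = pthread_mutexattr_setprioceiling(&attr, ceiling);
    if (rc == 0)
        rc = pthread_mutex_init(m, &attr);

    pthread_mutexattr_destroy(&attr);
    return rc;
}

/* Demo: create a mutex with the highest SCHED_FIFO ceiling and read it back. */
int ceiling_demo(void)
{
    pthread_mutex_t m;
    int got = -1;
    int rc = init_ceiling_mutex(&m, sched_get_priority_max(SCHED_FIFO));
    assert(rc == 0);
    rc = pthread_mutex_getprioceiling(&m, &got);
    assert(rc == 0);
    (void)rc;
    pthread_mutex_destroy(&m);
    return got;
}
```

The ceiling has to lie within the SCHED_FIFO priority range, which is why the demo asks sched_get_priority_max for it instead of hard-coding a number.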

Answer by David Gelhar for Implementing a FIFO mutex in pthreads


You could do something like this:

  • define a "queued lock" that consists of a free/busy flag plus a linked list of pthread condition variables. Access to the queued_lock is protected by a mutex

  • to lock the queued_lock:

    • seize the mutex
    • check the 'busy' flag
    • if not busy; set busy = true; release mutex; done
    • if busy; create a new condition @ end of queue & wait on it (releasing mutex)
  • to unlock:

    • seize the mutex
    • if no other thread is queued, busy = false; release mutex; done
    • pthread_cond_signal the first waiting condition
    • do not clear the 'busy' flag - ownership is passing to the other thread
    • release mutex
  • when waiting thread unblocked by pthread_cond_signal:

    • remove our condition var from head of queue
    • release mutex

Note that the mutex is locked only while the state of the queued_lock is being altered, not for the whole duration that the queued_lock is held.
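The steps above could be sketched roughly as follows. This is only one possible reading of the description, not the answerer's code: the names waiter_t, queued_lock_t, queued_lock_acquire and queued_lock_release are invented for illustration, each waiter keeps its condition variable on its own stack, and a per-waiter granted flag is added to cope with spurious wakeups. One deliberate difference: the releasing thread dequeues the waiter itself, rather than leaving that to the woken thread.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct waiter {            /* one queue entry per blocked thread */
    pthread_cond_t cond;
    bool granted;                  /* set by the releaser when ownership is handed over */
    struct waiter *next;
} waiter_t;

typedef struct queued_lock {
    pthread_mutex_t mutex;         /* protects every field below */
    bool busy;
    waiter_t *head, *tail;         /* FIFO queue of waiters */
} queued_lock_t;

#define QUEUED_LOCK_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, false, NULL, NULL }

void queued_lock_acquire(queued_lock_t *q)
{
    pthread_mutex_lock(&q->mutex);
    if (!q->busy) {
        q->busy = true;
    } else {
        waiter_t self;
        pthread_cond_init(&self.cond, NULL);
        self.granted = false;
        self.next = NULL;
        if (q->tail != NULL) q->tail->next = &self; else q->head = &self;
        q->tail = &self;
        while (!self.granted)      /* loop guards against spurious wakeups */
            pthread_cond_wait(&self.cond, &q->mutex);
        pthread_cond_destroy(&self.cond);
        /* busy is still true: ownership was passed straight to us */
    }
    pthread_mutex_unlock(&q->mutex);
}

void queued_lock_release(queued_lock_t *q)
{
    pthread_mutex_lock(&q->mutex);
    waiter_t *w = q->head;
    if (w == NULL) {
        q->busy = false;           /* nobody is queued */
    } else {
        q->head = w->next;         /* dequeue the oldest waiter */
        if (q->head == NULL) q->tail = NULL;
        w->granted = true;
        pthread_cond_signal(&w->cond);
    }
    pthread_mutex_unlock(&q->mutex);
}

static queued_lock_t ql_demo_lock = QUEUED_LOCK_INITIALIZER;
static long ql_demo_counter;

static void *ql_demo_worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 1000; i++) {
        queued_lock_acquire(&ql_demo_lock);
        ql_demo_counter++;
        queued_lock_release(&ql_demo_lock);
    }
    return NULL;
}

/* Demo: four threads bump a shared counter under the queued lock. */
long queued_lock_demo(void)
{
    pthread_t th[4];
    ql_demo_counter = 0;
    for (int i = 0; i < 4; i++) {
        int rc = pthread_create(&th[i], NULL, ql_demo_worker, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 4; i++)
        pthread_join(th[i], NULL);
    return ql_demo_counter;
}
```

Because each waiter sleeps on its own condition variable, a release wakes exactly one thread, the one that has been queued longest.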

Answer by caf for Implementing a FIFO mutex in pthreads


You can implement a fair queuing system where each thread is added to a queue when it blocks, and the first thread on the queue always gets the resource when it becomes available. Such a "fair" ticket lock built on pthreads primitives might look like this:

#include <pthread.h>

typedef struct ticket_lock {
    pthread_cond_t cond;
    pthread_mutex_t mutex;
    unsigned long queue_head, queue_tail;
} ticket_lock_t;

/* queue_head and queue_tail are implicitly zero-initialized */
#define TICKET_LOCK_INITIALIZER { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER }

void ticket_lock(ticket_lock_t *ticket)
{
    unsigned long queue_me;

    pthread_mutex_lock(&ticket->mutex);
    queue_me = ticket->queue_tail++;          /* take the next ticket */
    while (queue_me != ticket->queue_head)    /* wait until it is our turn */
    {
        pthread_cond_wait(&ticket->cond, &ticket->mutex);
    }
    pthread_mutex_unlock(&ticket->mutex);
}

void ticket_unlock(ticket_lock_t *ticket)
{
    pthread_mutex_lock(&ticket->mutex);
    ticket->queue_head++;                     /* serve the next ticket */
    pthread_cond_broadcast(&ticket->cond);
    pthread_mutex_unlock(&ticket->mutex);
}

Answer by Jens Gustedt for Implementing a FIFO mutex in pthreads


The example as you posted it has no solution. Basically you only have one critical section and there is no room for parallelism.

That said, you see that it is important to reduce the period that your threads hold the mutex to a minimum, just a handful of instructions. This is difficult for insertion in a dynamic data structure such as a tree. The conceptually simplest solution is to have one read-write lock per tree node.

If you don't want to have individual locks per tree node, you could have one lock structure per level of the tree. I'd experiment with read-write locks for that. You may use just read-locking of the level of the node in hand (plus the next level) when you traverse the tree. Then, when you have found the right place to insert, lock that level for writing.
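A rough sketch of the per-level idea, under several assumptions that are not in the answer: the tree is insert-only (no deletion, no rebalancing), its height is capped at a made-up MAX_DEPTH, and level[d] guards the links found at depth d (level[0] guards the root pointer). All names are hypothetical. Since pthread read-write locks cannot be upgraded in place, the sketch drops the read lock, takes the write lock, and re-checks the link.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_DEPTH 64  /* hypothetical cap on tree height */

typedef struct node { int key; struct node *child[2]; } node_t;

typedef struct tree {
    node_t *root;
    pthread_rwlock_t level[MAX_DEPTH + 1];  /* level[d] guards links at depth d */
} tree_t;

void tree_init(tree_t *t)
{
    t->root = NULL;
    for (int d = 0; d <= MAX_DEPTH; d++)
        pthread_rwlock_init(&t->level[d], NULL);
}

/* Returns 1 if inserted, 0 if the key already exists, -1 on failure. */
int tree_insert(tree_t *t, int key)
{
    node_t **slot = &t->root;  /* the link currently being examined */
    int d = 0;

    pthread_rwlock_rdlock(&t->level[0]);
    for (;;) {
        node_t *n = *slot;
        if (n == NULL) {
            /* No in-place upgrade: drop the read lock, write-lock, re-check. */
            pthread_rwlock_unlock(&t->level[d]);
            pthread_rwlock_wrlock(&t->level[d]);
            if (*slot == NULL) {
                node_t *fresh = calloc(1, sizeof *fresh);
                if (fresh != NULL) {
                    fresh->key = key;
                    *slot = fresh;
                }
                pthread_rwlock_unlock(&t->level[d]);
                return fresh != NULL ? 1 : -1;
            }
            /* Another thread filled the link first; carry on as a reader. */
            pthread_rwlock_unlock(&t->level[d]);
            pthread_rwlock_rdlock(&t->level[d]);
            continue;
        }
        if (key == n->key || d == MAX_DEPTH) {
            pthread_rwlock_unlock(&t->level[d]);
            return key == n->key ? 0 : -1;
        }
        /* Lock coupling: read-lock the next level before releasing this one. */
        pthread_rwlock_rdlock(&t->level[d + 1]);
        pthread_rwlock_unlock(&t->level[d]);
        slot = &n->child[key > n->key];
        d++;
    }
}

static tree_t tree_demo_tree;

static void *tree_demo_worker(void *arg)
{
    (void)arg;
    long inserted = 0;
    for (int i = 0; i < 256; i++)   /* (i * 37) % 256 visits 0..255 in scrambled order */
        if (tree_insert(&tree_demo_tree, (i * 37) % 256) == 1)
            inserted++;
    return (void *)(intptr_t)inserted;
}

/* Demo: four threads race to insert the same 256 keys; returns total successful inserts. */
long tree_demo(void)
{
    pthread_t th[4];
    long total = 0;
    tree_init(&tree_demo_tree);
    for (int i = 0; i < 4; i++) {
        int rc = pthread_create(&th[i], NULL, tree_demo_worker, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 4; i++) {
        void *r;
        pthread_join(th[i], &r);
        total += (long)(intptr_t)r;
    }
    return total;
}
```

The number of locks here grows with the height of the tree, not with the number of nodes, which is the trade-off the answer is pointing at. Writers at the same depth still serialize against each other, though.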

Answer by johnnycrash for Implementing a FIFO mutex in pthreads


The solution could be to use atomic operations. No locking, no context switching, no sleeping, and much, much faster than mutexes or condition variables. Atomic ops are not the be-all and end-all solution to everything, but we have created a lot of thread-safe versions of common data structures using just atomic ops. They are very fast.

Atomic ops are a set of simple operations, such as increment, decrement, or assignment, that are guaranteed to execute atomically in a multithreaded environment. If two threads hit the op at the same time, the CPU makes sure only one thread executes it at a time. Atomic ops are hardware instructions, so they are fast. "Compare and swap" is very useful for thread-safe data structures. In our testing, atomic compare and swap was about as fast as a 32-bit integer assignment, maybe 2x slower. When you consider how much CPU is consumed with mutexes, atomic ops are far faster.

It's not trivial to do rotations to balance your tree with atomic operations, but it's not impossible. I ran into this requirement in the past and cheated by making a thread-safe skiplist, since a skiplist can be done really easily with atomic operations. Sorry I can't give you a copy of our code...my company would fire me, but it's easy enough to do yourself.

How atomic ops work to make lock-free data structures can be visualized with a simple thread-safe linked list example. To add an item to a global linked list (_pHead) without using locks, first save a copy of _pHead in pOld. I think of these copies as "the state of the world" when executing concurrent ops. Next create a new node, pNew, and set its pNext to the COPY. Then use atomic "compare and swap" to change _pHead to pNew ONLY IF _pHead IS STILL pOld. The atomic op will succeed only if _pHead hasn't changed. If it fails, loop back to get a copy of the new _pHead and repeat.

If the op succeeds, the rest of the world will now see a new head. If a thread got the old head a nanosecond before, that thread won't see the new item, but the list will still be safe to iterate through. Since we preset the pNext to the old head BEFORE we added our new item to the list, if a thread sees the new head a nanosecond after we added it, the list is safe to traverse.

Global stuff:

typedef struct _TList {
  int data;
  struct _TList *pNext;
} TList;

TList *_pHead;

Add to list:

TList *pOld, *pNew;
...
// allocate/fill/whatever to make pNew
...
while (1) { // concurrency loop
  pOld = _pHead;  // copy the state of the world. We operate on the copy
  pNew->pNext = pOld; // chain the new node to the current head of recycled items
  if (CAS(&_pHead, pOld, pNew))  // switch head of recycled items to new node
    break; // success
}

CAS is shorthand for __sync_bool_compare_and_swap or the like. See how easy? No mutexes...no locks! In the rare event that two threads hit that code at the same time, one simply loops a second time. We only see the second loop when the scheduler swaps a thread out while it is in the concurrency loop, so it is rare and inconsequential.

Things can be pulled off the head of a linked list in a similar way. You can atomically change more than one value if you use unions, and you can use up to 128-bit atomic ops. We have tested 128-bit ops on 32-bit Red Hat Linux and they are about the same speed as the 32- and 64-bit atomic ops.
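For anyone who wants something that compiles as-is, here is a minimal self-contained sketch of the same push loop. It assumes C11 <stdatomic.h> in place of the __sync builtin, and the names lf_node_t, lf_head, lf_push and lf_demo are made up for this example. It only pushes; it never removes or frees nodes.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

typedef struct lf_node {
    int data;
    struct lf_node *next;
} lf_node_t;

static _Atomic(lf_node_t *) lf_head = NULL;

/* Push one value onto the list; returns 0 on success, -1 if allocation fails. */
int lf_push(int data)
{
    lf_node_t *fresh = malloc(sizeof *fresh);
    if (fresh == NULL)
        return -1;
    fresh->data = data;
    fresh->next = atomic_load(&lf_head);   /* snapshot of the current head */
    /* If the head moved, the CAS fails and writes the current head into
       fresh->next, so the loop simply retries against the new snapshot. */
    while (!atomic_compare_exchange_weak(&lf_head, &fresh->next, fresh))
        ;
    return 0;
}

static void *lf_demo_worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 1000; i++)
        lf_push(i);
    return NULL;
}

/* Demo: four threads push concurrently; returns the resulting list length. */
long lf_demo(void)
{
    pthread_t th[4];
    for (int i = 0; i < 4; i++) {
        int rc = pthread_create(&th[i], NULL, lf_demo_worker, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 4; i++)
        pthread_join(th[i], NULL);

    long n = 0;
    for (lf_node_t *p = atomic_load(&lf_head); p != NULL; p = p->next)
        n++;
    return n;
}
```

Unlike the __sync builtin, atomic_compare_exchange_weak updates the expected value on failure, so there is no separate re-read of the head inside the loop.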

You will have to figure out how to use this type of technique with your tree. A binary tree node will have two pointers to child nodes. You can CAS them to change them. The balancing problem is tough. I can see how you could analyze a tree branch before you add something and make a copy of the branch from a certain point. When you finish changing the branch, you CAS the new one in. This would be a problem for large branches. Maybe balancing can be done "later" when the threads are not fighting over the tree. Maybe you can make it so the tree is still searchable even though you haven't cascaded the rotation all the way...in other words, if thread A added a node and is recursively rotating nodes, thread B can still read or add nodes. Just some ideas. In some cases, we make a structure that has version numbers or lock flags in the 32 bits after the 32 bits of pNext. We use 64-bit CAS then. Maybe you could make the tree safe to read at all times without locks, but you might have to use the versioning technique on a branch that is being modified.
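To illustrate the versioning trick, here is a small sketch that packs a 32-bit value and a 32-bit version counter into one 64-bit word and updates both with a single CAS. It assumes C11 atomics, and it stores a 32-bit value (for example an index into a node array) where the answer stores a 32-bit pNext, since pointers are 64 bits wide on most current systems. Every name in it is invented for the example.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

typedef _Atomic uint64_t versioned_t;   /* low 32 bits: value, high 32 bits: version */

static inline uint64_t pack(uint32_t value, uint32_t version)
{
    return ((uint64_t)version << 32) | value;
}
static inline uint32_t value_of(uint64_t w)   { return (uint32_t)w; }
static inline uint32_t version_of(uint64_t w) { return (uint32_t)(w >> 32); }

/* Store new_value only if neither the value nor the version has changed
   since `seen` was read. The version is bumped on every successful update. */
bool versioned_update(versioned_t *slot, uint64_t seen, uint32_t new_value)
{
    uint64_t desired = pack(new_value, version_of(seen) + 1);
    return atomic_compare_exchange_strong(slot, &seen, desired);
}

/* Demo: one update succeeds, a second one based on a stale snapshot is refused.
   Returns the final version number. */
uint32_t versioned_demo(void)
{
    versioned_t slot = pack(7, 0);
    uint64_t seen = atomic_load(&slot);

    bool ok = versioned_update(&slot, seen, 9);      /* fresh snapshot: accepted */
    assert(ok);
    bool stale = versioned_update(&slot, seen, 11);  /* same snapshot again: refused */
    assert(!stale);
    (void)ok;
    (void)stale;
    assert(value_of(atomic_load(&slot)) == 9);
    return version_of(atomic_load(&slot));
}
```

The version counter is what lets a thread notice that something changed and then changed back, which a CAS on the bare value alone would miss.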

Here are a bunch of posts I have made talking about the advantages of atomic ops:

Pthreads and mutexes; locking part of an array

Efficient and fast way for thread argument

Configuration auto reloading with pthreads

Advantages of using condition variables over mutex

single bit manipulation

Is memory allocation in linux non-blocking?

Answer by Flavio for Implementing a FIFO mutex in pthreads


You can obtain a fair Mutex with the idea sketched by @caf, but using atomic operations to acquire the ticket before doing the actual lock.

#if defined(_MSC_VER)
typedef volatile LONG Sync32_t;
#define SyncFetchAndIncrement32(V) (InterlockedIncrement(V) - 1)
#elif (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) > 40100
typedef volatile uint32_t Sync32_t;
#define SyncFetchAndIncrement32(V) __sync_fetch_and_add(V, 1)
#else
#error No atomic operations
#endif
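The snippet above only defines the atomic fetch-and-increment. A sketch of how a ticket could be taken atomically before touching the mutex might look like the following. It assumes C11 atomic_fetch_add in place of the macros above, and the fair_mutex_t type and its functions are hypothetical names, not the answerer's code.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

typedef struct fair_mutex {
    atomic_ulong next_ticket;     /* taken without holding the mutex */
    unsigned long now_serving;    /* protected by mutex */
    pthread_mutex_t mutex;
    pthread_cond_t cond;
} fair_mutex_t;

#define FAIR_MUTEX_INITIALIZER \
    { 0, 0, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }

void fair_mutex_lock(fair_mutex_t *m)
{
    /* Arrival order is fixed here, before any blocking can happen. */
    unsigned long me = atomic_fetch_add(&m->next_ticket, 1);

    pthread_mutex_lock(&m->mutex);
    while (m->now_serving != me)
        pthread_cond_wait(&m->cond, &m->mutex);
    pthread_mutex_unlock(&m->mutex);
}

void fair_mutex_unlock(fair_mutex_t *m)
{
    pthread_mutex_lock(&m->mutex);
    m->now_serving++;                    /* hand over to the next ticket */
    pthread_cond_broadcast(&m->cond);    /* all waiters re-check their ticket */
    pthread_mutex_unlock(&m->mutex);
}

static fair_mutex_t fm_demo_lock = FAIR_MUTEX_INITIALIZER;
static long fm_demo_counter;

static void *fm_demo_worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 1000; i++) {
        fair_mutex_lock(&fm_demo_lock);
        fm_demo_counter++;
        fair_mutex_unlock(&fm_demo_lock);
    }
    return NULL;
}

/* Demo: four threads bump a shared counter under the fair mutex. */
long fair_mutex_demo(void)
{
    pthread_t th[4];
    fm_demo_counter = 0;
    for (int i = 0; i < 4; i++) {
        int rc = pthread_create(&th[i], NULL, fm_demo_worker, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 4; i++)
        pthread_join(th[i], NULL);
    return fm_demo_counter;
}
```

Taking the ticket with an atomic increment means a thread's place in line no longer depends on who wins the race for the internal mutex.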
