This is the mail archive of the libc-alpha@sourceware.org mailing list for the glibc project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

[PATCH v2] rwlock: Fix explicit hand-over.


On Sat, 2017-03-25 at 21:17 +0100, Florian Weimer wrote:
> * Torvald Riegel:
> 
> > +  bool registered_while_in_write_phase = false;
> >    if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
> >      return 0;
> > +  else
> > +    registered_while_in_write_phase = true;
> 
> Sorry, this doesn't look quite right.  Isn't
> registered_while_in_write_phase always true?

Attached is a v2 patch.  It's the same logic, but bigger.  Most of this
increase is due to reformatting, but I also adapted some of the
comments.
I get two test failures, but I guess these are due either to the bad internet
connectivity I currently have or to something at the resolver:
FAIL: resolv/mtrace-tst-leaks
FAIL: resolv/tst-leaks


commit 68cb32e0202eb8f3c647e56e0a2eb88a4e903e23
Author: Torvald Riegel <triegel@redhat.com>
Date:   Sat Mar 25 00:36:46 2017 +0100

    rwlock: Fix explicit hand-over.
    
    Without this patch, the rwlock can fail to execute the explicit hand-over
    in certain cases (e.g., empty critical sections that switch quickly between
    read and write phases).  This can then lead to errors in how __wrphase_futex
    is accessed, which in turn can lead to deadlocks.
    
    	[BZ #21298]
    	* nptl/pthread_rwlock_common.c (__pthread_rwlock_rdlock_full): Fix
    	explicit hand-over.
    	(__pthread_rwlock_wrlock_full): Likewise.
    	* nptl/tst-rwlock20.c: New file.
    	* nptl/Makefile: Add new test.

diff --git a/nptl/Makefile b/nptl/Makefile
index 6d48c0c..0cc2873 100644
--- a/nptl/Makefile
+++ b/nptl/Makefile
@@ -241,7 +241,7 @@ tests = tst-typesizes \
 	tst-rwlock4 tst-rwlock5 tst-rwlock6 tst-rwlock7 tst-rwlock8 \
 	tst-rwlock9 tst-rwlock10 tst-rwlock11 tst-rwlock12 tst-rwlock13 \
 	tst-rwlock14 tst-rwlock15 tst-rwlock16 tst-rwlock17 tst-rwlock18 \
-	tst-rwlock19 \
+	tst-rwlock19 tst-rwlock20 \
 	tst-once1 tst-once2 tst-once3 tst-once4 tst-once5 \
 	tst-key1 tst-key2 tst-key3 tst-key4 \
 	tst-sem1 tst-sem2 tst-sem3 tst-sem4 tst-sem5 tst-sem6 tst-sem7 \
diff --git a/nptl/pthread_rwlock_common.c b/nptl/pthread_rwlock_common.c
index 256508c..c07eae4 100644
--- a/nptl/pthread_rwlock_common.c
+++ b/nptl/pthread_rwlock_common.c
@@ -70,15 +70,16 @@
    ---------------------------
    #1    0   0   0   0   Lock is idle (and in a read phase).
    #2    0   0   >0  0   Readers have acquired the lock.
-   #3    0   1   0   0   Lock is not acquired; a writer is waiting for a write
-			 phase to start or will try to start one.
+   #3    0   1   0   0   Lock is not acquired; a writer will try to start a
+			 write phase.
    #4    0   1   >0  0   Readers have acquired the lock; a writer is waiting
 			 and explicit hand-over to the writer is required.
    #4a   0   1   >0  1   Same as #4 except that there are further readers
 			 waiting because the writer is to be preferred.
    #5    1   0   0   0   Lock is idle (and in a write phase).
-   #6    1   0   >0  0   Write phase; readers are waiting for a read phase to
-			 start or will try to start one.
+   #6    1   0   >0  0   Write phase; readers will try to start a read phase
+			 (requires explicit hand-over to all readers that
+			 do not start the read phase).
    #7    1   1   0   0   Lock is acquired by a writer.
    #8    1   1   >0  0   Lock acquired by a writer and readers are waiting;
 			 explicit hand-over to the readers is required.
@@ -375,9 +376,9 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
      complexity.  */
   if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
     return 0;
-
-  /* If there is no primary writer but we are in a write phase, we can try
-     to install a read phase ourself.  */
+  /* Otherwise, if we were in a write phase (states #6 or #8), we must wait
+     for explicit hand-over of the read phase; the only exception is that we
+     can start a read phase if there is currently no primary writer.  */
   while (((r & PTHREAD_RWLOCK_WRPHASE) != 0)
       && ((r & PTHREAD_RWLOCK_WRLOCKED) == 0))
     {
@@ -390,15 +391,18 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
 	{
 	  /* We started the read phase, so we are also responsible for
 	     updating the write-phase futex.  Relaxed MO is sufficient.
-	     Note that there can be no other reader that we have to wake
-	     because all other readers will see the read phase started by us
-	     (or they will try to start it themselves); if a writer started
-	     the read phase, we cannot have started it.  Furthermore, we
-	     cannot discard a PTHREAD_RWLOCK_FUTEX_USED flag because we will
-	     overwrite the value set by the most recent writer (or the readers
-	     before it in case of explicit hand-over) and we know that there
-	     are no waiting readers.  */
-	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 0);
+	     We have to do the same steps as a writer would when handing
+	     over the read phase to us because other readers cannot
+	     distinguish between us and the writer; this includes
+	     explicit hand-over and potentially having to wake other readers
+	     (but we can pretend to do the setting and unsetting of WRLOCKED
+	     atomically, and thus can skip this step).  */
+	  if ((atomic_exchange_relaxed (&rwlock->__data.__wrphase_futex, 0)
+	      & PTHREAD_RWLOCK_FUTEX_USED) != 0)
+	    {
+	      int private = __pthread_rwlock_get_private (rwlock);
+	      futex_wake (&rwlock->__data.__wrphase_futex, INT_MAX, private);
+	    }
 	  return 0;
 	}
       else
@@ -407,102 +411,98 @@ __pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
 	}
     }
 
-  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
+  /* We were in a write phase but did not install the read phase.  We cannot
+     distinguish between a writer and another reader starting the read phase,
+     so we must wait for explicit hand-over via __wrphase_futex.
+     However, __wrphase_futex might not have been set to 1 yet (either
+     because explicit hand-over to the writer is still ongoing, or because
+     the writer has started the write phase but has not yet updated
+     __wrphase_futex).  The least recent value of __wrphase_futex we can
+     read from here is the modification of the last read phase (because
+     we synchronize with the last reader in this read phase through
+     __readers; see the use of acquire MO on the fetch_add above).
+     Therefore, if we observe a value of 0 for __wrphase_futex, we need
+     to subsequently check that __readers now indicates a read phase; we
+     need to use acquire MO for this so that if we observe a read phase,
+     we will also see the modification of __wrphase_futex by the previous
+     writer.  We then need to load __wrphase_futex again and continue to
+     wait if it is not 0, so that we do not skip explicit hand-over.
+     Relaxed MO is sufficient for the load from __wrphase_futex because
+     we just use it as an indicator for when we can proceed; we use
+     __readers and the acquire MO accesses to it to eventually read from
+     the proper stores to __wrphase_futex.  */
+  unsigned int wpf;
+  bool ready = false;
+  for (;;)
     {
-      /* We are in a write phase, and there must be a primary writer because
-	 of the previous loop.  Block until the primary writer gives up the
-	 write phase.  This case requires explicit hand-over using
-	 __wrphase_futex.
-	 However, __wrphase_futex might not have been set to 1 yet (either
-	 because explicit hand-over to the writer is still ongoing, or because
-	 the writer has started the write phase but does not yet have updated
-	 __wrphase_futex).  The least recent value of __wrphase_futex we can
-	 read from here is the modification of the last read phase (because
-	 we synchronize with the last reader in this read phase through
-	 __readers; see the use of acquire MO on the fetch_add above).
-	 Therefore, if we observe a value of 0 for __wrphase_futex, we need
-	 to subsequently check that __readers now indicates a read phase; we
-	 need to use acquire MO for this so that if we observe a read phase,
-	 we will also see the modification of __wrphase_futex by the previous
-	 writer.  We then need to load __wrphase_futex again and continue to
-	 wait if it is not 0, so that we do not skip explicit hand-over.
-	 Relaxed MO is sufficient for the load from __wrphase_futex because
-	 we just use it as an indicator for when we can proceed; we use
-	 __readers and the acquire MO accesses to it to eventually read from
-	 the proper stores to __wrphase_futex.  */
-      unsigned int wpf;
-      bool ready = false;
-      for (;;)
+      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
+	  | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
 	{
-	  while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
-	      | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
+	  int private = __pthread_rwlock_get_private (rwlock);
+	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
+	      && !atomic_compare_exchange_weak_relaxed
+		  (&rwlock->__data.__wrphase_futex,
+		   &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
+	    continue;
+	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
+	      1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
+	  if (err == ETIMEDOUT)
 	    {
-	      int private = __pthread_rwlock_get_private (rwlock);
-	      if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
-		  && !atomic_compare_exchange_weak_relaxed
-		      (&rwlock->__data.__wrphase_futex,
-		       &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
-		continue;
-	      int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
-		  1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
-	      if (err == ETIMEDOUT)
+	      /* If we timed out, we need to unregister.  If no read phase
+		 has been installed while we waited, we can just decrement
+		 the number of readers.  Otherwise, we just acquire the
+		 lock, which is allowed because we give no precise timing
+		 guarantees, and because the timeout is only required to
+		 be in effect if we would have had to wait for other
+		 threads (e.g., if futex_wait would time-out immediately
+		 because the given absolute time is in the past).  */
+	      r = atomic_load_relaxed (&rwlock->__data.__readers);
+	      while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
 		{
-		  /* If we timed out, we need to unregister.  If no read phase
-		     has been installed while we waited, we can just decrement
-		     the number of readers.  Otherwise, we just acquire the
-		     lock, which is allowed because we give no precise timing
-		     guarantees, and because the timeout is only required to
-		     be in effect if we would have had to wait for other
-		     threads (e.g., if futex_wait would time-out immediately
-		     because the given absolute time is in the past).  */
-		  r = atomic_load_relaxed (&rwlock->__data.__readers);
-		  while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
-		    {
-		      /* We don't need to make anything else visible to
-			 others besides unregistering, so relaxed MO is
-			 sufficient.  */
-		      if (atomic_compare_exchange_weak_relaxed
-			  (&rwlock->__data.__readers, &r,
-			   r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
-			return ETIMEDOUT;
-		      /* TODO Back-off.  */
-		    }
-		  /* Use the acquire MO fence to mirror the steps taken in the
-		     non-timeout case.  Note that the read can happen both
-		     in the atomic_load above as well as in the failure case
-		     of the CAS operation.  */
-		  atomic_thread_fence_acquire ();
-		  /* We still need to wait for explicit hand-over, but we must
-		     not use futex_wait anymore because we would just time out
-		     in this case and thus make the spin-waiting we need
-		     unnecessarily expensive.  */
-		  while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
-		      | PTHREAD_RWLOCK_FUTEX_USED)
-		      == (1 | PTHREAD_RWLOCK_FUTEX_USED))
-		    {
-		      /* TODO Back-off?  */
-		    }
-		  ready = true;
-		  break;
+		  /* We don't need to make anything else visible to
+		     others besides unregistering, so relaxed MO is
+		     sufficient.  */
+		  if (atomic_compare_exchange_weak_relaxed
+		      (&rwlock->__data.__readers, &r,
+		       r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
+		    return ETIMEDOUT;
+		  /* TODO Back-off.  */
+		}
+	      /* Use the acquire MO fence to mirror the steps taken in the
+		 non-timeout case.  Note that the read can happen both
+		 in the atomic_load above as well as in the failure case
+		 of the CAS operation.  */
+	      atomic_thread_fence_acquire ();
+	      /* We still need to wait for explicit hand-over, but we must
+		 not use futex_wait anymore because we would just time out
+		 in this case and thus make the spin-waiting we need
+		 unnecessarily expensive.  */
+	      while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
+		  | PTHREAD_RWLOCK_FUTEX_USED)
+		  == (1 | PTHREAD_RWLOCK_FUTEX_USED))
+		{
+		  /* TODO Back-off?  */
 		}
-	      /* If we got interrupted (EINTR) or the futex word does not have the
-		 expected value (EAGAIN), retry.  */
+	      ready = true;
+	      break;
 	    }
-	  if (ready)
-	    /* See below.  */
-	    break;
-	  /* We need acquire MO here so that we synchronize with the lock
-	     release of the writer, and so that we observe a recent value of
-	     __wrphase_futex (see below).  */
-	  if ((atomic_load_acquire (&rwlock->__data.__readers)
-	      & PTHREAD_RWLOCK_WRPHASE) == 0)
-	    /* We are in a read phase now, so the least recent modification of
-	       __wrphase_futex we can read from is the store by the writer
-	       with value 1.  Thus, only now we can assume that if we observe
-	       a value of 0, explicit hand-over is finished. Retry the loop
-	       above one more time.  */
-	    ready = true;
+	  /* If we got interrupted (EINTR) or the futex word does not have the
+	     expected value (EAGAIN), retry.  */
 	}
+      if (ready)
+	/* See below.  */
+	break;
+      /* We need acquire MO here so that we synchronize with the lock
+	 release of the writer, and so that we observe a recent value of
+	 __wrphase_futex (see below).  */
+      if ((atomic_load_acquire (&rwlock->__data.__readers)
+	  & PTHREAD_RWLOCK_WRPHASE) == 0)
+	/* We are in a read phase now, so the least recent modification of
+	   __wrphase_futex we can read from is the store by the writer
+	   with value 1.  Thus, only now we can assume that if we observe
+	   a value of 0, explicit hand-over is finished.  Retry the loop
+	   above one more time.  */
+	ready = true;
     }
 
   return 0;
@@ -741,10 +741,23 @@ __pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
 	  r = atomic_load_relaxed (&rwlock->__data.__readers);
 	}
       /* Our snapshot of __readers is up-to-date at this point because we
-	 either set WRLOCKED using a CAS or were handed over WRLOCKED from
+	 either set WRLOCKED using a CAS (and update r accordingly below,
+	 which was used as expected value for the CAS) or got WRLOCKED from
 	 another writer whose snapshot of __readers we inherit.  */
+      r |= PTHREAD_RWLOCK_WRLOCKED;
     }
 
+  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
+     MO is sufficient for futex words; acquire MO on the previous
+     modifications of __readers ensures that this store happens after the
+     store of value 0 by the previous primary writer.  */
+  atomic_store_relaxed (&rwlock->__data.__writers_futex,
+      1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));
+
+  /* If we are in a write phase, we have acquired the lock.  */
+  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
+    goto done;
+
   /* If we are in a read phase and there are no readers, try to start a write
      phase.  */
   while (((r & PTHREAD_RWLOCK_WRPHASE) == 0)
@@ -758,166 +771,156 @@ __pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
 	  &r, r | PTHREAD_RWLOCK_WRPHASE))
 	{
 	  /* We have started a write phase, so need to enable readers to wait.
-	     See the similar case in__pthread_rwlock_rdlock_full.  */
+	     See the similar case in __pthread_rwlock_rdlock_full.  Unlike in
+	     that similar case, we are the (only) primary writer and so do
+	     not need to wake another writer.  */
 	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 1);
-	  /* Make sure we fall through to the end of the function.  */
-	  r |= PTHREAD_RWLOCK_WRPHASE;
-	  break;
+
+	  goto done;
 	}
       /* TODO Back-off.  */
     }
 
-  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
-     MO is sufficient for futex words; acquire MO on the previous
-     modifications of __readers ensures that this store happens after the
-     store of value 0 by the previous primary writer.  */
-  atomic_store_relaxed (&rwlock->__data.__writers_futex,
-      1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));
-
-  if (__glibc_unlikely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
+  /* We became the primary writer in a read phase and there were readers when
+     we did (because of the previous loop).  Thus, we have to wait for
+     explicit hand-over from one of these readers.
+     We basically do the same steps as for the similar case in
+     __pthread_rwlock_rdlock_full, except that we additionally might try
+     to directly hand over to another writer and need to wake up
+     other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
+  unsigned int wpf;
+  bool ready = false;
+  for (;;)
     {
-      /* We are not in a read phase and there are readers (because of the
-	 previous loop).  Thus, we have to wait for explicit hand-over from
-	 one of these readers.
-	 We basically do the same steps as for the similar case in
-	 __pthread_rwlock_rdlock_full, except that we additionally might try
-	 to directly hand over to another writer and need to wake up
-	 other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
-      unsigned int wpf;
-      bool ready = false;
-      for (;;)
+      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
+	  | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
 	{
-	  while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
-	      | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
+	  int private = __pthread_rwlock_get_private (rwlock);
+	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
+	      && !atomic_compare_exchange_weak_relaxed
+		  (&rwlock->__data.__wrphase_futex, &wpf,
+		   PTHREAD_RWLOCK_FUTEX_USED))
+	    continue;
+	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
+	      PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
+	  if (err == ETIMEDOUT)
 	    {
-	      int private = __pthread_rwlock_get_private (rwlock);
-	      if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
-		  && !atomic_compare_exchange_weak_relaxed
-		      (&rwlock->__data.__wrphase_futex, &wpf,
-		       PTHREAD_RWLOCK_FUTEX_USED))
-		continue;
-	      int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
-		  PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
-	      if (err == ETIMEDOUT)
+	      if (rwlock->__data.__flags
+		  != PTHREAD_RWLOCK_PREFER_READER_NP)
 		{
-		  if (rwlock->__data.__flags
-		      != PTHREAD_RWLOCK_PREFER_READER_NP)
-		    {
-		      /* We try writer--writer hand-over.  */
-		      unsigned int w = atomic_load_relaxed
-			  (&rwlock->__data.__writers);
-		      if (w != 0)
-			{
-			  /* We are about to hand over WRLOCKED, so we must
-			     release __writers_futex too; otherwise, we'd have
-			     a pending store, which could at least prevent
-			     other threads from waiting using the futex
-			     because it could interleave with the stores
-			     by subsequent writers.  In turn, this means that
-			     we have to clean up when we do not hand over
-			     WRLOCKED.
-			     Release MO so that another writer that gets
-			     WRLOCKED from us can take over our view of
-			     __readers.  */
-			  unsigned int wf = atomic_exchange_relaxed
-			      (&rwlock->__data.__writers_futex, 0);
-			  while (w != 0)
-			    {
-			      if (atomic_compare_exchange_weak_release
-				  (&rwlock->__data.__writers, &w,
-				      w | PTHREAD_RWLOCK_WRHANDOVER))
-				{
-				  /* Wake other writers.  */
-				  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
-				    futex_wake
-					(&rwlock->__data.__writers_futex, 1,
-					 private);
-				  return ETIMEDOUT;
-				}
-			      /* TODO Back-off.  */
-			    }
-			  /* We still own WRLOCKED and someone else might set
-			     a write phase concurrently, so enable waiting
-			     again.  Make sure we don't loose the flag that
-			     signals whether there are threads waiting on
-			     this futex.  */
-			  atomic_store_relaxed
-			      (&rwlock->__data.__writers_futex, wf);
-			}
-		    }
-		  /* If we timed out and we are not in a write phase, we can
-		     just stop being a primary writer.  Otherwise, we just
-		     acquire the lock.  */
-		  r = atomic_load_relaxed (&rwlock->__data.__readers);
-		  if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		  /* We try writer--writer hand-over.  */
+		  unsigned int w = atomic_load_relaxed
+		      (&rwlock->__data.__writers);
+		  if (w != 0)
 		    {
-		      /* We are about to release WRLOCKED, so we must release
-			 __writers_futex too; see the handling of
-			 writer--writer hand-over above.  */
+		      /* We are about to hand over WRLOCKED, so we must
+			 release __writers_futex too; otherwise, we'd have
+			 a pending store, which could at least prevent
+			 other threads from waiting using the futex
+			 because it could interleave with the stores
+			 by subsequent writers.  In turn, this means that
+			 we have to clean up when we do not hand over
+			 WRLOCKED.
+			 Release MO so that another writer that gets
+			 WRLOCKED from us can take over our view of
+			 __readers.  */
 		      unsigned int wf = atomic_exchange_relaxed
 			  (&rwlock->__data.__writers_futex, 0);
-		      while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		      while (w != 0)
 			{
-			  /* While we don't need to make anything from a
-			     caller's critical section visible to other
-			     threads, we need to ensure that our changes to
-			     __writers_futex are properly ordered.
-			     Therefore, use release MO to synchronize with
-			     subsequent primary writers.  Also wake up any
-			     waiting readers as they are waiting because of
-			     us.  */
 			  if (atomic_compare_exchange_weak_release
-			      (&rwlock->__data.__readers, &r,
-			       (r ^ PTHREAD_RWLOCK_WRLOCKED)
-			       & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
+			      (&rwlock->__data.__writers, &w,
+				  w | PTHREAD_RWLOCK_WRHANDOVER))
 			    {
 			      /* Wake other writers.  */
 			      if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
 				futex_wake (&rwlock->__data.__writers_futex,
-				    1, private);
-			      /* Wake waiting readers.  */
-			      if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
-				futex_wake (&rwlock->__data.__readers,
-				    INT_MAX, private);
+					    1, private);
 			      return ETIMEDOUT;
 			    }
+			  /* TODO Back-off.  */
 			}
-		      /* We still own WRLOCKED and someone else might set a
-			 write phase concurrently, so enable waiting again.
-			 Make sure we don't loose the flag that signals
-			 whether there are threads waiting on this futex.  */
-		      atomic_store_relaxed (&rwlock->__data.__writers_futex,
-			  wf);
+		      /* We still own WRLOCKED and someone else might set
+			 a write phase concurrently, so enable waiting
+			 again.  Make sure we don't lose the flag that
+			 signals whether there are threads waiting on
+			 this futex.  */
+		      atomic_store_relaxed
+			  (&rwlock->__data.__writers_futex, wf);
 		    }
-		  /* Use the acquire MO fence to mirror the steps taken in the
-		     non-timeout case.  Note that the read can happen both
-		     in the atomic_load above as well as in the failure case
-		     of the CAS operation.  */
-		  atomic_thread_fence_acquire ();
-		  /* We still need to wait for explicit hand-over, but we must
-		     not use futex_wait anymore.  */
-		  while ((atomic_load_relaxed
-		      (&rwlock->__data.__wrphase_futex)
-		       | PTHREAD_RWLOCK_FUTEX_USED)
-		      == PTHREAD_RWLOCK_FUTEX_USED)
+		}
+	      /* If we timed out and we are not in a write phase, we can
+		 just stop being a primary writer.  Otherwise, we just
+		 acquire the lock.  */
+	      r = atomic_load_relaxed (&rwlock->__data.__readers);
+	      if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
+		{
+		  /* We are about to release WRLOCKED, so we must release
+		     __writers_futex too; see the handling of
+		     writer--writer hand-over above.  */
+		  unsigned int wf = atomic_exchange_relaxed
+		      (&rwlock->__data.__writers_futex, 0);
+		  while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
 		    {
-		      /* TODO Back-off.  */
+		      /* While we don't need to make anything from a
+			 caller's critical section visible to other
+			 threads, we need to ensure that our changes to
+			 __writers_futex are properly ordered.
+			 Therefore, use release MO to synchronize with
+			 subsequent primary writers.  Also wake up any
+			 waiting readers as they are waiting because of
+			 us.  */
+		      if (atomic_compare_exchange_weak_release
+			  (&rwlock->__data.__readers, &r,
+			   (r ^ PTHREAD_RWLOCK_WRLOCKED)
+			   & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
+			{
+			  /* Wake other writers.  */
+			  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
+			    futex_wake (&rwlock->__data.__writers_futex,
+				1, private);
+			  /* Wake waiting readers.  */
+			  if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
+			    futex_wake (&rwlock->__data.__readers,
+				INT_MAX, private);
+			  return ETIMEDOUT;
+			}
 		    }
-		  ready = true;
-		  break;
+		  /* We still own WRLOCKED and someone else might set a
+		     write phase concurrently, so enable waiting again.
+		     Make sure we don't lose the flag that signals
+		     whether there are threads waiting on this futex.  */
+		  atomic_store_relaxed (&rwlock->__data.__writers_futex, wf);
 		}
-	      /* If we got interrupted (EINTR) or the futex word does not have
-		 the expected value (EAGAIN), retry.  */
+	      /* Use the acquire MO fence to mirror the steps taken in the
+		 non-timeout case.  Note that the read can happen both
+		 in the atomic_load above as well as in the failure case
+		 of the CAS operation.  */
+	      atomic_thread_fence_acquire ();
+	      /* We still need to wait for explicit hand-over, but we must
+		 not use futex_wait anymore.  */
+	      while ((atomic_load_relaxed
+		  (&rwlock->__data.__wrphase_futex)
+		   | PTHREAD_RWLOCK_FUTEX_USED)
+		  == PTHREAD_RWLOCK_FUTEX_USED)
+		{
+		  /* TODO Back-off.  */
+		}
+	      ready = true;
+	      break;
 	    }
-	  /* See pthread_rwlock_rdlock_full.  */
-	  if (ready)
-	    break;
-	  if ((atomic_load_acquire (&rwlock->__data.__readers)
-	      & PTHREAD_RWLOCK_WRPHASE) != 0)
-	    ready = true;
+	  /* If we got interrupted (EINTR) or the futex word does not have
+	     the expected value (EAGAIN), retry.  */
 	}
+      /* See pthread_rwlock_rdlock_full.  */
+      if (ready)
+	break;
+      if ((atomic_load_acquire (&rwlock->__data.__readers)
+	  & PTHREAD_RWLOCK_WRPHASE) != 0)
+	ready = true;
     }
 
+ done:
   atomic_store_relaxed (&rwlock->__data.__cur_writer,
       THREAD_GETMEM (THREAD_SELF, tid));
   return 0;
diff --git a/nptl/tst-rwlock20.c b/nptl/tst-rwlock20.c
new file mode 100644
index 0000000..fc636e3
--- /dev/null
+++ b/nptl/tst-rwlock20.c
@@ -0,0 +1,128 @@
+/* Test program for a read-phase / write-phase explicit hand-over.
+   Copyright (C) 2000-2017 Free Software Foundation, Inc.
+
+   The GNU C Library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public License as
+   published by the Free Software Foundation; either version 2.1 of the
+   License, or (at your option) any later version.
+
+   The GNU C Library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with the GNU C Library; see the file COPYING.LIB.  If
+   not, see <http://www.gnu.org/licenses/>.  */
+
+#include <errno.h>
+#include <error.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <time.h>
+#include <atomic.h>
+
+#define THREADS 2
+
+#define KIND PTHREAD_RWLOCK_PREFER_READER_NP
+
+static pthread_rwlock_t lock;
+static int done = 0;
+
+static void*
+tf (void* arg)
+{
+  while (atomic_load_relaxed (&done) == 0)
+    {
+      int rcnt = 0;
+      int wcnt = 100;
+      if ((uintptr_t) arg == 0)
+	{
+	  rcnt = 1;
+	  wcnt = 1;
+	}
+
+      do
+	{
+	  if (wcnt)
+	    {
+	      pthread_rwlock_wrlock (&lock);
+	      pthread_rwlock_unlock (&lock);
+	      wcnt--;
+	  }
+	  if (rcnt)
+	    {
+	      pthread_rwlock_rdlock (&lock);
+	      pthread_rwlock_unlock (&lock);
+	      rcnt--;
+	  }
+	}
+      while ((atomic_load_relaxed (&done) == 0) && (rcnt + wcnt > 0));
+
+    }
+    return NULL;
+}
+
+
+
+static int
+do_test (void)
+{
+  pthread_t thr[THREADS];
+  int n;
+  void *res;
+  pthread_rwlockattr_t a;
+
+  if (pthread_rwlockattr_init (&a) != 0)
+    {
+      puts ("rwlockattr_t failed");
+      exit (1);
+    }
+
+  if (pthread_rwlockattr_setkind_np (&a, KIND) != 0)
+    {
+      puts ("rwlockattr_setkind failed");
+      exit (1);
+    }
+
+  if (pthread_rwlock_init (&lock, &a) != 0)
+    {
+      puts ("rwlock_init failed");
+      exit (1);
+    }
+
+  /* Make standard error the same as standard output.  */
+  dup2 (1, 2);
+
+  /* Make sure we see all messages, even those on stdout.  */
+  setvbuf (stdout, NULL, _IONBF, 0);
+
+  for (n = 0; n < THREADS; ++n)
+    if (pthread_create (&thr[n], NULL, tf, (void *) (uintptr_t) n) != 0)
+      {
+	puts ("thread create failed");
+	exit (1);
+      }
+  struct timespec delay;
+  delay.tv_sec = 10;
+  delay.tv_nsec = 0;
+  nanosleep (&delay, NULL);
+  atomic_store_relaxed (&done, 1);
+
+  /* Wait for all the threads.  */
+  for (n = 0; n < THREADS; ++n)
+    if (pthread_join (thr[n], &res) != 0)
+      {
+	puts ("writer join failed");
+	exit (1);
+      }
+
+  return 0;
+}
+
+#define TIMEOUT 15
+#define TEST_FUNCTION do_test ()
+#include "../test-skeleton.c"

Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]