[v1,4/4] linux-gen: ring_spsc: move ring mask and data pointer

Message ID 1535119206-23556-5-git-send-email-odpbot@yandex.ru
State New
Headers show
Series
  • Optimize plain queues by using mpmc ring instead of lock
Related show

Commit Message

Github ODP bot Aug. 24, 2018, 2 p.m.
From: Petri Savolainen <petri.savolainen@linaro.org>


Store mask and data pointer in queue entry instead of ring
structure. Data is constant and can be stored among other
frequently used read only data. Also other ring type use
the same variables.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 683 (psavol:master-queue-lockless-enqdeq-3)
 ** https://github.com/Linaro/odp/pull/683
 ** Patch: https://github.com/Linaro/odp/pull/683.patch
 ** Base sha: 989df5d2f97ab4711328b11282dcc743f5740e00
 ** Merge commit sha: 28073c54671148efdd01c9cf38c1a235d5a133f0
 **/
 .../include/odp_ring_spsc_internal.h          | 33 +++++++++----------
 platform/linux-generic/odp_queue_spsc.c       | 11 ++++---
 2 files changed, 22 insertions(+), 22 deletions(-)

Patch

diff --git a/platform/linux-generic/include/odp_ring_spsc_internal.h b/platform/linux-generic/include/odp_ring_spsc_internal.h
index e38bda1da..de122bf57 100644
--- a/platform/linux-generic/include/odp_ring_spsc_internal.h
+++ b/platform/linux-generic/include/odp_ring_spsc_internal.h
@@ -29,31 +29,27 @@  extern "C" {
 typedef struct {
 	odp_atomic_u32_t head;
 	odp_atomic_u32_t tail;
-	uint32_t mask;
-	uint32_t *data;
 
 } ring_spsc_t;
 
 /* Initialize ring. Ring size must be a power of two. */
-static inline void ring_spsc_init(ring_spsc_t *ring, uint32_t *data,
-				  uint32_t size)
+static inline void ring_spsc_init(ring_spsc_t *ring)
 {
 	odp_atomic_init_u32(&ring->head, 0);
 	odp_atomic_init_u32(&ring->tail, 0);
-	ring->mask = size - 1;
-	ring->data = data;
 }
 
 /* Dequeue data from the ring head. Max_num is smaller than ring size.*/
-static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[],
+static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring,
+					   uint32_t *ring_data,
+					   uint32_t ring_mask, uint32_t data[],
 					   uint32_t max_num)
 {
-	uint32_t head, tail, mask, idx;
+	uint32_t head, tail, idx;
 	uint32_t num, i;
 
 	tail = odp_atomic_load_acq_u32(&ring->tail);
 	head = odp_atomic_load_u32(&ring->head);
-	mask = ring->mask;
 	num  = tail - head;
 
 	/* Empty */
@@ -63,11 +59,11 @@  static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[],
 	if (num > max_num)
 		num = max_num;
 
-	idx = head & mask;
+	idx = head & ring_mask;
 
 	for (i = 0; i < num; i++) {
-		data[i] = ring->data[idx];
-		idx = (idx + 1) & mask;
+		data[i] = ring_data[idx];
+		idx = (idx + 1) & ring_mask;
 	}
 
 	odp_atomic_store_rel_u32(&ring->head, head + num);
@@ -77,16 +73,17 @@  static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[],
 
 /* Enqueue data into the ring tail. Num_data is smaller than ring size. */
 static inline uint32_t ring_spsc_enq_multi(ring_spsc_t *ring,
+					   uint32_t *ring_data,
+					   uint32_t ring_mask,
 					   const uint32_t data[],
 					   uint32_t num_data)
 {
-	uint32_t head, tail, mask, size, idx;
+	uint32_t head, tail, size, idx;
 	uint32_t num, i;
 
 	head = odp_atomic_load_acq_u32(&ring->head);
 	tail = odp_atomic_load_u32(&ring->tail);
-	mask = ring->mask;
-	size = mask + 1;
+	size = ring_mask + 1;
 	num  = size - (tail - head);
 
 	/* Full */
@@ -96,11 +93,11 @@  static inline uint32_t ring_spsc_enq_multi(ring_spsc_t *ring,
 	if (num > num_data)
 		num = num_data;
 
-	idx = tail & mask;
+	idx = tail & ring_mask;
 
 	for (i = 0; i < num; i++) {
-		ring->data[idx] = data[i];
-		idx = (idx + 1) & mask;
+		ring_data[idx] = data[i];
+		idx = (idx + 1) & ring_mask;
 	}
 
 	odp_atomic_store_rel_u32(&ring->tail, tail + num);
diff --git a/platform/linux-generic/odp_queue_spsc.c b/platform/linux-generic/odp_queue_spsc.c
index 0fd8d85a7..002561a49 100644
--- a/platform/linux-generic/odp_queue_spsc.c
+++ b/platform/linux-generic/odp_queue_spsc.c
@@ -49,7 +49,8 @@  static inline int spsc_enq_multi(odp_queue_t handle,
 		return -1;
 	}
 
-	return ring_spsc_enq_multi(ring_spsc, buf_idx, num);
+	return ring_spsc_enq_multi(ring_spsc, queue->s.ring_data,
+				   queue->s.ring_mask, buf_idx, num);
 }
 
 static inline int spsc_deq_multi(odp_queue_t handle,
@@ -68,7 +69,8 @@  static inline int spsc_deq_multi(odp_queue_t handle,
 		return -1;
 	}
 
-	num_deq = ring_spsc_deq_multi(ring_spsc, buf_idx, num);
+	num_deq = ring_spsc_deq_multi(ring_spsc, queue->s.ring_data,
+				      queue->s.ring_mask, buf_idx, num);
 
 	if (num_deq == 0)
 		return 0;
@@ -127,6 +129,7 @@  void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
 
 	offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
 
-	ring_spsc_init(&queue->s.ring_spsc, &queue_glb->ring_data[offset],
-		       queue_size);
+	queue->s.ring_data = &queue_glb->ring_data[offset];
+	queue->s.ring_mask = queue_size - 1;
+	ring_spsc_init(&queue->s.ring_spsc);
 }