[v2,1/2] linux-dpdk: queue: implement using multi-producer/consumer ring

Message ID 1533538815-25409-2-git-send-email-odpbot@yandex.ru
State New
Headers show
Series
  • Implement queues using multi-producer/consumer rings
Related show

Commit Message

Github ODP bot Aug. 6, 2018, 7 a.m.
From: Matias Elo <matias.elo@nokia.com>

Implement queues using multi-producer/consumer rings. Enables
removing unnecessary locking.

Signed-off-by: Matias Elo <matias.elo@nokia.com>
---
/** Email created from pull request 55 (matiaselo:dev/queue_opt)
 ** https://github.com/Linaro/odp-dpdk/pull/55
 ** Patch: https://github.com/Linaro/odp-dpdk/pull/55.patch
 ** Base sha: d0bd42aa817eb79dffeab77cbe3ea2ea6f5e3db4
 ** Merge commit sha: bcca17023e693975a61eaecfc99d90fe63e8970b
 **/
 platform/linux-dpdk/Makefile.am               |   4 +-
 .../include/odp_queue_basic_internal.h        | 112 ++++++++++++++++++
 ...st_internal.h => odp_ring_mpmc_internal.h} |  40 +++----
 .../include/odp_ring_spsc_internal.h          |   2 +-
 platform/linux-dpdk/odp_queue_basic.c         |  25 ++--
 5 files changed, 147 insertions(+), 36 deletions(-)
 create mode 100644 platform/linux-dpdk/include/odp_queue_basic_internal.h
 rename platform/linux-dpdk/include/{odp_ring_st_internal.h => odp_ring_mpmc_internal.h} (51%)

Patch

diff --git a/platform/linux-dpdk/Makefile.am b/platform/linux-dpdk/Makefile.am
index 0b75c630..95265df6 100644
--- a/platform/linux-dpdk/Makefile.am
+++ b/platform/linux-dpdk/Makefile.am
@@ -110,12 +110,12 @@  noinst_HEADERS = \
 		  ${top_srcdir}/platform/linux-generic/include/odp_pkt_queue_internal.h \
 		  include/odp_pool_internal.h \
 		  include/odp_posix_extensions.h \
-		  ${top_srcdir}/platform/linux-generic/include/odp_queue_basic_internal.h \
+		  include/odp_queue_basic_internal.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_queue_if.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_queue_lf.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_ring_internal.h \
+		  include/odp_ring_mpmc_internal.h \
 		  include/odp_ring_spsc_internal.h \
-		  include/odp_ring_st_internal.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_schedule_if.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_sorted_list_internal.h \
 		  ${top_srcdir}/platform/linux-generic/include/odp_sysinfo_internal.h \
diff --git a/platform/linux-dpdk/include/odp_queue_basic_internal.h b/platform/linux-dpdk/include/odp_queue_basic_internal.h
new file mode 100644
index 00000000..c0d16840
--- /dev/null
+++ b/platform/linux-dpdk/include/odp_queue_basic_internal.h
@@ -0,0 +1,112 @@ 
+/* Copyright (c) 2013-2018, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#ifndef ODP_QUEUE_BASIC_INTERNAL_H_
+#define ODP_QUEUE_BASIC_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp/api/plat/strong_types.h>
+#include <odp/api/queue.h>
+#include <odp_forward_typedefs_internal.h>
+#include <odp_queue_if.h>
+#include <odp_buffer_internal.h>
+#include <odp_align_internal.h>
+#include <odp/api/packet_io.h>
+#include <odp/api/align.h>
+#include <odp/api/hints.h>
+#include <odp/api/ticketlock.h>
+#include <odp_config_internal.h>
+#include <odp_ring_mpmc_internal.h>
+#include <odp_ring_spsc_internal.h>
+#include <odp_queue_lf.h>
+
+#define QUEUE_STATUS_FREE         0
+#define QUEUE_STATUS_DESTROYED    1
+#define QUEUE_STATUS_READY        2
+#define QUEUE_STATUS_NOTSCHED     3
+#define QUEUE_STATUS_SCHED        4
+
+struct queue_entry_s {
+	odp_ticketlock_t  ODP_ALIGNED_CACHE lock;
+	union {
+		ring_mpmc_t       ring_mpmc;
+		ring_spsc_t       ring_spsc;
+	};
+	int               status;
+
+	queue_enq_fn_t       ODP_ALIGNED_CACHE enqueue;
+	queue_deq_fn_t       dequeue;
+	queue_enq_multi_fn_t enqueue_multi;
+	queue_deq_multi_fn_t dequeue_multi;
+
+	uint32_t          index;
+	odp_queue_t       handle;
+	odp_queue_type_t  type;
+	odp_queue_param_t param;
+	odp_pktin_queue_t pktin;
+	odp_pktout_queue_t pktout;
+	void             *queue_lf;
+	int               spsc;
+	char              name[ODP_QUEUE_NAME_LEN];
+};
+
+union queue_entry_u {
+	struct queue_entry_s s;
+	uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))];
+};
+
+typedef struct queue_global_t {
+	queue_entry_t   queue[ODP_CONFIG_QUEUES];
+	uint32_t        *ring_data;
+	uint32_t        queue_lf_num;
+	uint32_t        queue_lf_size;
+	queue_lf_func_t queue_lf_func;
+	odp_shm_t       queue_gbl_shm;
+	odp_shm_t       queue_ring_shm;
+
+	struct {
+		uint32_t max_queue_size;
+		uint32_t default_queue_size;
+	} config;
+
+} queue_global_t;
+
+extern queue_global_t *queue_glb;
+
+static inline uint32_t queue_to_index(odp_queue_t handle)
+{
+	queue_entry_t *qentry = (queue_entry_t *)(uintptr_t)handle;
+
+	return qentry->s.index;
+}
+
+static inline queue_entry_t *qentry_from_index(uint32_t queue_id)
+{
+	return &queue_glb->queue[queue_id];
+}
+
+static inline odp_queue_t queue_from_index(uint32_t queue_id)
+{
+	return (odp_queue_t)qentry_from_index(queue_id);
+}
+
+void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size);
+
+/* Functions for schedulers */
+void sched_queue_destroy_finalize(uint32_t queue_index);
+void sched_queue_set_status(uint32_t queue_index, int status);
+int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int num,
+		    int update_status);
+int sched_queue_empty(uint32_t queue_index);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-dpdk/include/odp_ring_st_internal.h b/platform/linux-dpdk/include/odp_ring_mpmc_internal.h
similarity index 51%
rename from platform/linux-dpdk/include/odp_ring_st_internal.h
rename to platform/linux-dpdk/include/odp_ring_mpmc_internal.h
index 1a92aa0b..93cfcfb2 100644
--- a/platform/linux-dpdk/include/odp_ring_st_internal.h
+++ b/platform/linux-dpdk/include/odp_ring_mpmc_internal.h
@@ -4,15 +4,13 @@ 
  * SPDX-License-Identifier:     BSD-3-Clause
  */
 
-#ifndef ODP_RING_ST_INTERNAL_H_
-#define ODP_RING_ST_INTERNAL_H_
+#ifndef ODP_RING_MPMC_INTERNAL_H_
+#define ODP_RING_MPMC_INTERNAL_H_
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
-#include "config.h"
-
 #include <odp/api/queue.h>
 
 #include <odp_debug_internal.h>
@@ -21,34 +19,31 @@  extern "C" {
 #include <rte_ring.h>
 #include <rte_errno.h>
 
-typedef struct rte_ring *ring_st_t;
-
-/* Basic ring for single thread usage. Operations must be synchronized by using
- * locks (or other means), when multiple threads use the same ring. */
+/* Lock-free ring for multi-producer / multi-consumer usage. */
+typedef struct rte_ring *ring_mpmc_t;
 
-static void name_to_mz_name(const char *name, char *ring_name)
+static void ring_mpmc_name_to_mz_name(const char *name, char *ring_name)
 {
 	int i = 0;
 	int max_len = ODP_QUEUE_NAME_LEN < RTE_RING_NAMESIZE  ?
 			ODP_QUEUE_NAME_LEN : RTE_RING_NAMESIZE;
 
 	do {
-		snprintf(ring_name, max_len, "%d-%s", i++, name);
+		snprintf(ring_name, max_len, "%d-mpmc-%s", i++, name);
 		ring_name[max_len - 1] = 0;
 	} while (rte_ring_lookup(ring_name) != NULL);
 }
 
 /* Initialize ring. Ring size must be a power of two. */
-static inline ring_st_t ring_st_create(const char *name, uint32_t size)
+static inline ring_mpmc_t ring_mpmc_create(const char *name, uint32_t size)
 {
 	struct rte_ring *rte_ring;
 	char ring_name[RTE_RING_NAMESIZE];
 
 	/* Ring name must be unique */
-	name_to_mz_name(name, ring_name);
+	ring_mpmc_name_to_mz_name(name, ring_name);
 
-	rte_ring =  rte_ring_create(ring_name, size, rte_socket_id(),
-				    RING_F_SP_ENQ | RING_F_SC_DEQ);
+	rte_ring = rte_ring_create(ring_name, size, rte_socket_id(), 0);
 	if (rte_ring == NULL) {
 		ODP_ERR("Creating DPDK ring failed: %s\n",
 			rte_strerror(rte_errno));
@@ -58,27 +53,28 @@  static inline ring_st_t ring_st_create(const char *name, uint32_t size)
 	return rte_ring;
 }
 
-static inline void ring_st_free(ring_st_t ring)
+/*  Free all memory used by the ring. */
+static inline void ring_mpmc_free(ring_mpmc_t ring)
 {
 	rte_ring_free(ring);
 }
 
 /* Dequeue data from the ring head. Max_num is smaller than ring size.*/
-static inline uint32_t ring_st_deq_multi(ring_st_t ring, void **data,
-					 uint32_t max_num)
+static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t ring, void **data,
+					   uint32_t max_num)
 {
-	return rte_ring_dequeue_burst(ring, data, max_num, NULL);
+	return rte_ring_mc_dequeue_burst(ring, data, max_num, NULL);
 }
 
 /* Enqueue data into the ring tail. Num_data is smaller than ring size. */
-static inline uint32_t ring_st_enq_multi(ring_st_t ring, void **data,
-					 uint32_t num_data)
+static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t ring, void **data,
+					   uint32_t num_data)
 {
-	return rte_ring_enqueue_burst(ring, data, num_data, NULL);
+	return rte_ring_mp_enqueue_burst(ring, data, num_data, NULL);
 }
 
 /* Check if ring is empty */
-static inline int ring_st_is_empty(ring_st_t ring)
+static inline int ring_mpmc_is_empty(ring_mpmc_t ring)
 {
 	return rte_ring_empty(ring);
 }
diff --git a/platform/linux-dpdk/include/odp_ring_spsc_internal.h b/platform/linux-dpdk/include/odp_ring_spsc_internal.h
index 99dfebfa..8ea8c218 100644
--- a/platform/linux-dpdk/include/odp_ring_spsc_internal.h
+++ b/platform/linux-dpdk/include/odp_ring_spsc_internal.h
@@ -38,7 +38,7 @@  static void ring_spsc_name_to_mz_name(const char *name, char *ring_name)
 			ODP_QUEUE_NAME_LEN : RTE_RING_NAMESIZE;
 
 	do {
-		snprintf(ring_name, max_len, "%d-%s", i++, name);
+		snprintf(ring_name, max_len, "%d-spsc-%s", i++, name);
 		ring_name[max_len - 1] = 0;
 	} while (rte_ring_lookup(ring_name) != NULL);
 }
diff --git a/platform/linux-dpdk/odp_queue_basic.c b/platform/linux-dpdk/odp_queue_basic.c
index 2a33e194..2bc6688d 100644
--- a/platform/linux-dpdk/odp_queue_basic.c
+++ b/platform/linux-dpdk/odp_queue_basic.c
@@ -389,7 +389,7 @@  static int queue_destroy(odp_queue_t handle)
 	if (queue->s.spsc)
 		empty = ring_spsc_is_empty(queue->s.ring_spsc);
 	else
-		empty = ring_st_is_empty(queue->s.ring_st);
+		empty = ring_mpmc_is_empty(queue->s.ring_mpmc);
 
 	if (!empty) {
 		UNLOCK(queue);
@@ -397,9 +397,9 @@  static int queue_destroy(odp_queue_t handle)
 		return -1;
 	}
 	if (queue->s.spsc)
-		ring_spsc_free(queue->s.ring_st);
+		ring_spsc_free(queue->s.ring_spsc);
 	else
-		ring_st_free(queue->s.ring_st);
+		ring_mpmc_free(queue->s.ring_mpmc);
 
 	switch (queue->s.status) {
 	case QUEUE_STATUS_READY:
@@ -478,7 +478,8 @@  static inline int enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
 		return -1;
 	}
 
-	num_enq = ring_st_enq_multi(queue->s.ring_st, (void **)buf_hdr, num);
+	num_enq = ring_mpmc_enq_multi(queue->s.ring_mpmc, (void **)buf_hdr,
+				      num);
 
 	if (odp_unlikely(num_enq == 0)) {
 		UNLOCK(queue);
@@ -551,7 +552,8 @@  static inline int plain_queue_deq(queue_entry_t *queue,
 		return -1;
 	}
 
-	num_deq = ring_st_deq_multi(queue->s.ring_st, (void **)buf_hdr, num);
+	num_deq = ring_mpmc_deq_multi(queue->s.ring_mpmc, (void **)buf_hdr,
+				      num);
 
 	UNLOCK(queue);
 
@@ -682,8 +684,9 @@  static int queue_init(queue_entry_t *queue, const char *name,
 		queue->s.enqueue_multi = queue_int_enq_multi;
 		queue->s.dequeue_multi = queue_int_deq_multi;
 
-		queue->s.ring_st = ring_st_create(queue->s.name, queue_size);
-		if (queue->s.ring_st == NULL)
+		queue->s.ring_mpmc = ring_mpmc_create(queue->s.name,
+						      queue_size);
+		if (queue->s.ring_mpmc == NULL)
 			return -1;
 	}
 
@@ -745,11 +748,11 @@  int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
 		    int update_status)
 {
 	int num_deq;
-	ring_st_t ring_st;
+	ring_mpmc_t ring_mpmc;
 	queue_entry_t *queue = qentry_from_index(queue_index);
 	int status_sync = sched_fn->status_sync;
 
-	ring_st = queue->s.ring_st;
+	ring_mpmc = queue->s.ring_mpmc;
 
 	LOCK(queue);
 
@@ -760,7 +763,7 @@  int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
 		return -1;
 	}
 
-	num_deq = ring_st_deq_multi(ring_st, (void **)ev, max_num);
+	num_deq = ring_mpmc_deq_multi(ring_mpmc, (void **)ev, max_num);
 
 	if (num_deq == 0) {
 		/* Already empty queue */
@@ -797,7 +800,7 @@  int sched_queue_empty(uint32_t queue_index)
 		return -1;
 	}
 
-	if (ring_st_is_empty(queue->s.ring_st)) {
+	if (ring_mpmc_is_empty(queue->s.ring_mpmc)) {
 		/* Already empty queue. Update status. */
 		if (queue->s.status == QUEUE_STATUS_SCHED)
 			queue->s.status = QUEUE_STATUS_NOTSCHED;