[v1,5/6] linux-gen: queue: prepare for separate queue operations

Message ID 1534510807-11066-6-git-send-email-odpbot@yandex.ru
State New
Headers show
Series
  • Separate queue operations for plain and scheduled queues
Related show

Commit Message

Github ODP bot Aug. 17, 2018, 1 p.m.
From: Petri Savolainen <petri.savolainen@linaro.org>


Prepare for separate plain and scheduled queue enqueue and
dequeue operations. Plain queue enq/deq operations will be
simplified and scheduled queues do not have dequeue
functions (error functions instead). Enqueue/dequeue
functionality is not changed yet, functions are only renamed
and moved.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 675 (psavol:master-queue-inline)
 ** https://github.com/Linaro/odp/pull/675
 ** Patch: https://github.com/Linaro/odp/pull/675.patch
 ** Base sha: dc28824415ea510e3ef62e47f7640bf4a8420fde
 ** Merge commit sha: a68fc608f4c460289d09e09e68f3646d1846737a
 **/
 platform/linux-generic/odp_queue_basic.c | 293 ++++++++++++++---------
 1 file changed, 184 insertions(+), 109 deletions(-)

Patch

diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index a881766a7..d2de677df 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -485,8 +485,8 @@  static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
 	}
 }
 
-static inline int enq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
-			    int num)
+static inline int _sched_queue_enq_multi(odp_queue_t handle,
+					 odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	int sched = 0;
 	int ret;
@@ -532,17 +532,17 @@  static inline int enq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
 	return num_enq;
 }
 
-static int queue_int_enq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
-			       int num)
+static int plain_queue_enq_multi(odp_queue_t handle,
+				 odp_buffer_hdr_t *buf_hdr[], int num)
 {
-	return enq_multi(handle, buf_hdr, num);
+	return _sched_queue_enq_multi(handle, buf_hdr, num);
 }
 
-static int queue_int_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
+static int plain_queue_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
 {
 	int ret;
 
-	ret = enq_multi(handle, &buf_hdr, 1);
+	ret = _sched_queue_enq_multi(handle, &buf_hdr, 1);
 
 	if (ret == 1)
 		return 0;
@@ -550,30 +550,8 @@  static int queue_int_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
 		return -1;
 }
 
-static int queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
-{
-	queue_entry_t *queue = qentry_from_handle(handle);
-
-	if (odp_unlikely(num == 0))
-		return 0;
-
-	if (num > QUEUE_MULTI_MAX)
-		num = QUEUE_MULTI_MAX;
-
-	return queue->s.enqueue_multi(handle,
-				      (odp_buffer_hdr_t **)(uintptr_t)ev, num);
-}
-
-static int queue_enq(odp_queue_t handle, odp_event_t ev)
-{
-	queue_entry_t *queue = qentry_from_handle(handle);
-
-	return queue->s.enqueue(handle,
-				(odp_buffer_hdr_t *)(uintptr_t)ev);
-}
-
-static inline int plain_queue_deq(odp_queue_t handle,
-				  odp_buffer_hdr_t *buf_hdr[], int num)
+static inline int _plain_queue_deq_multi(odp_queue_t handle,
+					 odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	int num_deq;
 	queue_entry_t *queue;
@@ -603,18 +581,18 @@  static inline int plain_queue_deq(odp_queue_t handle,
 	return num_deq;
 }
 
-static int queue_int_deq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
-			       int num)
+static int plain_queue_deq_multi(odp_queue_t handle,
+				 odp_buffer_hdr_t *buf_hdr[], int num)
 {
-	return plain_queue_deq(handle, buf_hdr, num);
+	return _plain_queue_deq_multi(handle, buf_hdr, num);
 }
 
-static odp_buffer_hdr_t *queue_int_deq(odp_queue_t handle)
+static odp_buffer_hdr_t *plain_queue_deq(odp_queue_t handle)
 {
 	odp_buffer_hdr_t *buf_hdr = NULL;
 	int ret;
 
-	ret = plain_queue_deq(handle, &buf_hdr, 1);
+	ret = _plain_queue_deq_multi(handle, &buf_hdr, 1);
 
 	if (ret == 1)
 		return buf_hdr;
@@ -622,89 +600,46 @@  static odp_buffer_hdr_t *queue_int_deq(odp_queue_t handle)
 		return NULL;
 }
 
-static int queue_deq_multi(odp_queue_t handle, odp_event_t ev[], int num)
+static int error_enqueue(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
 {
-	queue_entry_t *queue = qentry_from_handle(handle);
+	(void)buf_hdr;
 
-	if (num > QUEUE_MULTI_MAX)
-		num = QUEUE_MULTI_MAX;
+	ODP_ERR("Enqueue not supported (" PRIu64 ")\n",
+		odp_queue_to_u64(handle));
 
-	return queue->s.dequeue_multi(handle,
-				      (odp_buffer_hdr_t **)ev, num);
+	return -1;
 }
 
-static odp_event_t queue_deq(odp_queue_t handle)
+static int error_enqueue_multi(odp_queue_t handle,
+			       odp_buffer_hdr_t *buf_hdr[], int num)
 {
-	queue_entry_t *queue = qentry_from_handle(handle);
+	(void)buf_hdr;
+	(void)num;
 
-	return (odp_event_t)queue->s.dequeue(handle);
+	ODP_ERR("Enqueue multi not supported (" PRIu64 ")\n",
+		odp_queue_to_u64(handle));
+
+	return -1;
 }
 
-static int queue_init(queue_entry_t *queue, const char *name,
-		      const odp_queue_param_t *param)
+static odp_buffer_hdr_t *error_dequeue(odp_queue_t handle)
 {
-	uint64_t offset;
-	uint32_t queue_size;
-	int spsc;
-
-	if (name == NULL) {
-		queue->s.name[0] = 0;
-	} else {
-		strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1);
-		queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0;
-	}
-	memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
-	if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks())
-		return -1;
-
-	if (param->type == ODP_QUEUE_TYPE_SCHED)
-		queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
-
-	queue->s.type = queue->s.param.type;
+	ODP_ERR("Dequeue not supported (" PRIu64 ")\n",
+		odp_queue_to_u64(handle));
 
-	queue->s.pktin = PKTIN_INVALID;
-	queue->s.pktout = PKTOUT_INVALID;
-
-	/* Use default size for all small queues to quarantee performance
-	 * level. */
-	queue_size = queue_glb->config.default_queue_size;
-	if (param->size > queue_glb->config.default_queue_size)
-		queue_size = param->size;
-
-	/* Round up if not already a power of two */
-	queue_size = ROUNDUP_POWER2_U32(queue_size);
-
-	if (queue_size > queue_glb->config.max_queue_size) {
-		ODP_ERR("Too large queue size %u\n", queue_size);
-		return -1;
-	}
-
-	offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
-
-	/* Single-producer / single-consumer plain queue has simple and
-	 * lock-free implementation */
-	spsc = (param->type == ODP_QUEUE_TYPE_PLAIN) &&
-	       (param->enq_mode == ODP_QUEUE_OP_MT_UNSAFE) &&
-	       (param->deq_mode == ODP_QUEUE_OP_MT_UNSAFE);
-
-	queue->s.spsc = spsc;
-	queue->s.queue_lf = NULL;
-
-	if (spsc) {
-		queue_spsc_init(queue, queue_size);
-	} else {
-		queue->s.enqueue = queue_int_enq;
-		queue->s.dequeue = queue_int_deq;
-		queue->s.enqueue_multi = queue_int_enq_multi;
-		queue->s.dequeue_multi = queue_int_deq_multi;
+	return NULL;
+}
 
-		queue->s.orig_dequeue_multi = queue_int_deq_multi;
+static int error_dequeue_multi(odp_queue_t handle,
+			       odp_buffer_hdr_t *buf_hdr[], int num)
+{
+	(void)buf_hdr;
+	(void)num;
 
-		ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset],
-			     queue_size);
-	}
+	ODP_ERR("Dequeue multi not supported (" PRIu64 ")\n",
+		odp_queue_to_u64(handle));
 
-	return 0;
+	return -1;
 }
 
 static void queue_param_init(odp_queue_param_t *params)
@@ -758,6 +693,24 @@  static int queue_info(odp_queue_t handle, odp_queue_info_t *info)
 	return 0;
 }
 
+static int sched_queue_enq_multi(odp_queue_t handle,
+				 odp_buffer_hdr_t *buf_hdr[], int num)
+{
+	return _sched_queue_enq_multi(handle, buf_hdr, num);
+}
+
+static int sched_queue_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
+{
+	int ret;
+
+	ret = _sched_queue_enq_multi(handle, &buf_hdr, 1);
+
+	if (ret == 1)
+		return 0;
+	else
+		return -1;
+}
+
 int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
 		    int update_status)
 {
@@ -830,6 +783,87 @@  int sched_queue_empty(uint32_t queue_index)
 	return ret;
 }
 
+static int queue_init(queue_entry_t *queue, const char *name,
+		      const odp_queue_param_t *param)
+{
+	uint64_t offset;
+	uint32_t queue_size;
+	odp_queue_type_t queue_type;
+	int spsc;
+
+	queue_type = param->type;
+
+	if (name == NULL) {
+		queue->s.name[0] = 0;
+	} else {
+		strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1);
+		queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0;
+	}
+	memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
+	if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks())
+		return -1;
+
+	if (queue_type == ODP_QUEUE_TYPE_SCHED)
+		queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
+
+	queue->s.type = queue_type;
+
+	queue->s.pktin = PKTIN_INVALID;
+	queue->s.pktout = PKTOUT_INVALID;
+
+	/* Use default size for all small queues to quarantee performance
+	 * level. */
+	queue_size = queue_glb->config.default_queue_size;
+	if (param->size > queue_glb->config.default_queue_size)
+		queue_size = param->size;
+
+	/* Round up if not already a power of two */
+	queue_size = ROUNDUP_POWER2_U32(queue_size);
+
+	if (queue_size > queue_glb->config.max_queue_size) {
+		ODP_ERR("Too large queue size %u\n", queue_size);
+		return -1;
+	}
+
+	offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
+
+	/* Single-producer / single-consumer plain queue has simple and
+	 * lock-free implementation */
+	spsc = (queue_type == ODP_QUEUE_TYPE_PLAIN) &&
+	       (param->enq_mode == ODP_QUEUE_OP_MT_UNSAFE) &&
+	       (param->deq_mode == ODP_QUEUE_OP_MT_UNSAFE);
+
+	queue->s.spsc = spsc;
+	queue->s.queue_lf = NULL;
+
+	/* Default to error functions */
+	queue->s.enqueue            = error_enqueue;
+	queue->s.enqueue_multi      = error_enqueue_multi;
+	queue->s.dequeue            = error_dequeue;
+	queue->s.dequeue_multi      = error_dequeue_multi;
+	queue->s.orig_dequeue_multi = error_dequeue_multi;
+
+	if (spsc) {
+		queue_spsc_init(queue, queue_size);
+	} else {
+		if (queue_type == ODP_QUEUE_TYPE_PLAIN) {
+			queue->s.enqueue            = plain_queue_enq;
+			queue->s.enqueue_multi      = plain_queue_enq_multi;
+			queue->s.dequeue            = plain_queue_deq;
+			queue->s.dequeue_multi      = plain_queue_deq_multi;
+			queue->s.orig_dequeue_multi = plain_queue_deq_multi;
+		} else {
+			queue->s.enqueue            = sched_queue_enq;
+			queue->s.enqueue_multi      = sched_queue_enq_multi;
+		}
+
+		ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset],
+			     queue_size);
+	}
+
+	return 0;
+}
+
 static uint64_t queue_to_u64(odp_queue_t hdl)
 {
 	return _odp_pri(hdl);
@@ -894,6 +928,47 @@  static int queue_orig_multi(odp_queue_t handle,
 	return queue->s.orig_dequeue_multi(handle, buf_hdr, num);
 }
 
+static int queue_api_enq_multi(odp_queue_t handle,
+			       const odp_event_t ev[], int num)
+{
+	queue_entry_t *queue = qentry_from_handle(handle);
+
+	if (odp_unlikely(num == 0))
+		return 0;
+
+	if (num > QUEUE_MULTI_MAX)
+		num = QUEUE_MULTI_MAX;
+
+	return queue->s.enqueue_multi(handle,
+				      (odp_buffer_hdr_t **)(uintptr_t)ev, num);
+}
+
+static int queue_api_enq(odp_queue_t handle, odp_event_t ev)
+{
+	queue_entry_t *queue = qentry_from_handle(handle);
+
+	return queue->s.enqueue(handle,
+				(odp_buffer_hdr_t *)(uintptr_t)ev);
+}
+
+static int queue_api_deq_multi(odp_queue_t handle, odp_event_t ev[], int num)
+{
+	queue_entry_t *queue = qentry_from_handle(handle);
+
+	if (num > QUEUE_MULTI_MAX)
+		num = QUEUE_MULTI_MAX;
+
+	return queue->s.dequeue_multi(handle,
+				      (odp_buffer_hdr_t **)ev, num);
+}
+
+static odp_event_t queue_api_deq(odp_queue_t handle)
+{
+	queue_entry_t *queue = qentry_from_handle(handle);
+
+	return (odp_event_t)queue->s.dequeue(handle);
+}
+
 /* API functions */
 _odp_queue_api_fn_t queue_basic_api = {
 	.queue_create = queue_create,
@@ -901,10 +976,10 @@  _odp_queue_api_fn_t queue_basic_api = {
 	.queue_lookup = queue_lookup,
 	.queue_capability = queue_capability,
 	.queue_context_set = queue_context_set,
-	.queue_enq = queue_enq,
-	.queue_enq_multi = queue_enq_multi,
-	.queue_deq = queue_deq,
-	.queue_deq_multi = queue_deq_multi,
+	.queue_enq = queue_api_enq,
+	.queue_enq_multi = queue_api_enq_multi,
+	.queue_deq = queue_api_deq,
+	.queue_deq_multi = queue_api_deq_multi,
 	.queue_type = queue_type,
 	.queue_sched_type = queue_sched_type,
 	.queue_sched_prio = queue_sched_prio,