[v1,1/3] test: queue_perf: prepare for multiple worker threads

Message ID 1535025607-21626-2-git-send-email-odpbot@yandex.ru
State New
Headers show
Series
  • Multi-thread support for queue perf test
Related show

Commit Message

Github ODP bot Aug. 23, 2018, noon
From: Petri Savolainen <petri.savolainen@linaro.org>


Split queue create, test run and queue destroy into separate
functions and use helper to create a single worker thread.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 680 (psavol:master-test-queue-perf-multithread)
 ** https://github.com/Linaro/odp/pull/680
 ** Patch: https://github.com/Linaro/odp/pull/680.patch
 ** Base sha: 1c36bf726387b291d73bee1448cf163527cf5fb0
 ** Merge commit sha: cce6d22c7a83b846bd9589a48869c664f75980ae
 **/
 test/performance/odp_queue_perf.c | 199 ++++++++++++++++++++++++------
 1 file changed, 161 insertions(+), 38 deletions(-)

Patch

diff --git a/test/performance/odp_queue_perf.c b/test/performance/odp_queue_perf.c
index 1ca639ebc..e1c02f33f 100644
--- a/test/performance/odp_queue_perf.c
+++ b/test/performance/odp_queue_perf.c
@@ -12,6 +12,9 @@ 
 #include <getopt.h>
 
 #include <odp_api.h>
+#include <odp/helper/odph_api.h>
+
+#define MAX_QUEUES (32 * 1024)
 
 typedef struct test_options_t {
 	uint32_t num_queue;
@@ -19,9 +22,23 @@  typedef struct test_options_t {
 	uint32_t num_round;
 	odp_nonblocking_t nonblock;
 	int single;
+	int num_cpu;
 
 } test_options_t;
 
+typedef struct test_global_t {
+	odp_barrier_t    barrier;
+	test_options_t   options;
+	odp_instance_t   instance;
+	odp_shm_t        shm;
+	odp_pool_t       pool;
+	odp_queue_t      queue[MAX_QUEUES];
+	odph_odpthread_t thread_tbl[ODP_THREAD_COUNT_MAX];
+
+} test_global_t;
+
+static test_global_t test_global;
+
 static void print_usage(void)
 {
 	printf("\n"
@@ -58,6 +75,7 @@  static int parse_options(int argc, char *argv[], test_options_t *test_options)
 
 	static const char *shortopts = "+q:e:r:lwsh";
 
+	test_options->num_cpu   = 1;
 	test_options->num_queue = 1;
 	test_options->num_event = 1;
 	test_options->num_round = 1000;
@@ -98,28 +116,31 @@  static int parse_options(int argc, char *argv[], test_options_t *test_options)
 		}
 	}
 
+	if (test_options->num_queue > MAX_QUEUES) {
+		printf("Too many queues %u. Test maximum %u.\n",
+		       test_options->num_queue, MAX_QUEUES);
+		return -1;
+	}
+
 	return ret;
 }
 
-static int test_queue(test_options_t *test_options)
+static int create_queues(test_global_t *global)
 {
 	odp_pool_capability_t pool_capa;
 	odp_queue_capability_t queue_capa;
 	odp_pool_param_t pool_param;
 	odp_queue_param_t queue_param;
 	odp_pool_t pool;
-	odp_event_t ev;
-	uint32_t i, j, rounds;
-	uint32_t max_size;
-	uint64_t c1, c2, diff, ops, nsec;
-	odp_time_t t1, t2;
-	uint64_t num_retry = 0;
+	uint32_t i, j, max_size;
+	test_options_t *test_options = &global->options;
 	odp_nonblocking_t nonblock = test_options->nonblock;
 	uint32_t num_queue = test_options->num_queue;
 	uint32_t num_event = test_options->num_event;
 	uint32_t num_round = test_options->num_round;
 	uint32_t tot_event = num_queue * num_event;
-	odp_queue_t queue[num_queue];
+	int ret = 0;
+	odp_queue_t *queue = global->queue;
 	odp_event_t event[tot_event];
 
 	printf("\nTesting %s queues\n",
@@ -128,7 +149,7 @@  static int test_queue(test_options_t *test_options)
 	       (nonblock == ODP_NONBLOCKING_WF ? "WAITFREE" : "???")));
 	printf("  num rounds           %u\n", num_round);
 	printf("  num queues           %u\n", num_queue);
-	printf("  num events per queue %u\n\n", num_event);
+	printf("  num events per queue %u\n", num_event);
 
 	for (i = 0; i < num_queue; i++)
 		queue[i] = ODP_QUEUE_INVALID;
@@ -215,6 +236,8 @@  static int test_queue(test_options_t *test_options)
 		return -1;
 	}
 
+	global->pool = pool;
+
 	odp_queue_param_init(&queue_param);
 	queue_param.type        = ODP_QUEUE_TYPE_PLAIN;
 	queue_param.nonblocking = nonblock;
@@ -230,7 +253,7 @@  static int test_queue(test_options_t *test_options)
 
 		if (queue[i] == ODP_QUEUE_INVALID) {
 			printf("Error: Queue create failed %u.\n", i);
-			goto error;
+			return -1;
 		}
 	}
 
@@ -239,7 +262,8 @@  static int test_queue(test_options_t *test_options)
 
 		if (event[i] == ODP_EVENT_INVALID) {
 			printf("Error: Event alloc failed %u.\n", i);
-			goto error;
+			ret = -1;
+			goto free_events;
 		}
 	}
 
@@ -249,13 +273,78 @@  static int test_queue(test_options_t *test_options)
 
 			if (odp_queue_enq(queue[i], event[id])) {
 				printf("Error: Queue enq failed %u/%u\n", i, j);
-				goto error;
+				ret = -1;
+				goto free_events;
 			}
 
 			event[id] = ODP_EVENT_INVALID;
 		}
 	}
 
+free_events:
+	/* Free events that were not stored into queues */
+	for (i = 0; i < tot_event; i++) {
+		if (event[i] != ODP_EVENT_INVALID)
+			odp_event_free(event[i]);
+	}
+
+	return ret;
+}
+
+static int destroy_queues(test_global_t *global)
+{
+	odp_event_t ev;
+	uint32_t i, j;
+	int ret = 0;
+	test_options_t *test_options = &global->options;
+	uint32_t num_queue = test_options->num_queue;
+	uint32_t num_event = test_options->num_event;
+	odp_queue_t *queue = global->queue;
+	odp_pool_t pool    = global->pool;
+
+	for (i = 0; i < num_queue; i++) {
+		if (queue[i] == ODP_QUEUE_INVALID) {
+			printf("Error: Invalid queue handle %u.\n", i);
+			ret = -1;
+			break;
+		}
+
+		for (j = 0; j < num_event; j++) {
+			ev = odp_queue_deq(queue[i]);
+
+			if (ev != ODP_EVENT_INVALID)
+				odp_event_free(ev);
+		}
+
+		if (odp_queue_destroy(queue[i])) {
+			printf("Error: Queue destroy failed %u.\n", i);
+			ret = -1;
+			break;
+		}
+	}
+
+	if (odp_pool_destroy(pool)) {
+		printf("Error: Pool destroy failed.\n");
+		ret = -1;
+	}
+
+	return ret;
+}
+
+static int run_test(void *arg)
+{
+	uint64_t c1, c2, diff, ops, nsec;
+	odp_time_t t1, t2;
+	odp_event_t ev;
+	uint32_t i, rounds;
+	test_global_t *global = arg;
+	test_options_t *test_options = &global->options;
+	odp_queue_t *queue = global->queue;
+	uint64_t num_retry = 0;
+	uint32_t num_queue = test_options->num_queue;
+	uint32_t num_round = test_options->num_round;
+	int ret = 0;
+
 	t1 = odp_time_local();
 	c1 = odp_cpu_cycles();
 
@@ -273,6 +362,7 @@  static int test_queue(test_options_t *test_options)
 				}
 
 				printf("Error: Queue deq failed %u\n", i);
+				ret = -1;
 				goto error;
 			}
 
@@ -280,6 +370,7 @@  static int test_queue(test_options_t *test_options)
 
 			if (odp_queue_enq(queue[i], ev)) {
 				printf("Error: Queue enq failed %u\n", i);
+				ret = -1;
 				goto error;
 			}
 		}
@@ -294,41 +385,52 @@  static int test_queue(test_options_t *test_options)
 
 	printf("RESULT:\n");
 	printf("  num deq + enq operations: %" PRIu64 "\n", ops);
+	printf("  num events:               %" PRIu64 "\n", ops);
 	printf("  duration (nsec):          %" PRIu64 "\n", nsec);
 	printf("  num cycles:               %" PRIu64 "\n", diff);
 	printf("  cycles per deq + enq:     %.3f\n", (double)diff / ops);
+	printf("  events per sec:           %.3f M\n", (1000.0 * ops) / nsec);
 	printf("  num retries:              %" PRIu64 "\n\n", num_retry);
 
 error:
 
-	for (i = 0; i < num_queue; i++) {
-		for (j = 0; j < num_event; j++) {
-			ev = odp_queue_deq(queue[i]);
+	return ret;
+}
 
-			if (ev != ODP_EVENT_INVALID)
-				odp_event_free(ev);
-		}
+static int start_workers(test_global_t *global)
+{
+	odph_odpthread_params_t thr_params;
+	odp_cpumask_t cpumask;
+	int ret;
+	test_options_t *test_options = &global->options;
+	int num_cpu = test_options->num_cpu;
+
+	memset(&thr_params, 0, sizeof(thr_params));
+	thr_params.thr_type = ODP_THREAD_WORKER;
+	thr_params.instance = global->instance;
+	thr_params.start    = run_test;
+	thr_params.arg      = global;
+
+	ret = odp_cpumask_default_worker(&cpumask, num_cpu);
+
+	if (num_cpu && ret != num_cpu) {
+		printf("Error: Too many workers. Max supported %i\n.", ret);
+		return -1;
 	}
 
-	for (i = 0; i < tot_event; i++) {
-		if (event[i] != ODP_EVENT_INVALID)
-			odp_event_free(event[i]);
+	/* Zero: all available workers */
+	if (num_cpu == 0) {
+		num_cpu = ret;
+		test_options->num_cpu = num_cpu;
 	}
 
-	for (i = 0; i < num_queue; i++) {
-		if (queue[i] == ODP_QUEUE_INVALID)
-			break;
+	printf("  num workers          %u\n\n", num_cpu);
 
-		if (odp_queue_destroy(queue[i])) {
-			printf("Error: Queue destroy failed %u.\n", i);
-			break;
-		}
-	}
+	odp_barrier_init(&global->barrier, num_cpu);
 
-	if (odp_pool_destroy(pool)) {
-		printf("Error: Pool destroy failed.\n");
+	if (odph_odpthreads_create(global->thread_tbl, &cpumask, &thr_params)
+	    != num_cpu)
 		return -1;
-	}
 
 	return 0;
 }
@@ -337,10 +439,7 @@  int main(int argc, char **argv)
 {
 	odp_instance_t instance;
 	odp_init_t init;
-	test_options_t test_options;
-
-	if (parse_options(argc, argv, &test_options))
-		return -1;
+	test_global_t *global;
 
 	/* List features not to be used */
 	odp_init_param_init(&init);
@@ -363,8 +462,32 @@  int main(int argc, char **argv)
 		return -1;
 	}
 
-	if (test_queue(&test_options))
-		printf("Error: Queue test failed.\n");
+	global = &test_global;
+	memset(global, 0, sizeof(test_global_t));
+
+	if (parse_options(argc, argv, &global->options))
+		return -1;
+
+	global->instance = instance;
+
+	if (create_queues(global)) {
+		printf("Error: Create queues failed.\n");
+		goto destroy;
+	}
+
+	if (start_workers(global)) {
+		printf("Error: Test start failed.\n");
+		return -1;
+	}
+
+	/* Wait workers to exit */
+	odph_odpthreads_join(global->thread_tbl);
+
+destroy:
+	if (destroy_queues(global)) {
+		printf("Error: Destroy queues failed.\n");
+		return -1;
+	}
 
 	if (odp_term_local()) {
 		printf("Error: term local failed.\n");