Line data Source code
1 : /* SPDX-License-Identifier: BSD-3-Clause
2 : * Copyright (C) 2016 Intel Corporation.
3 : * All rights reserved.
4 : */
5 :
6 : #include "spdk/stdinc.h"
7 : #include "spdk/likely.h"
8 :
9 : #include "event_internal.h"
10 :
11 : #include "spdk_internal/event.h"
12 : #include "spdk_internal/usdt.h"
13 :
14 : #include "spdk/log.h"
15 : #include "spdk/thread.h"
16 : #include "spdk/env.h"
17 : #include "spdk/util.h"
18 : #include "spdk/scheduler.h"
19 : #include "spdk/string.h"
20 : #include "spdk/fd_group.h"
21 : #include "spdk/trace.h"
22 : #include "spdk_internal/trace_defs.h"
23 :
24 : #ifdef __linux__
25 : #include <sys/prctl.h>
26 : #include <sys/eventfd.h>
27 : #endif
28 :
29 : #ifdef __FreeBSD__
30 : #include <pthread_np.h>
31 : #endif
32 :
/* Capacity of the on-stack batch array in event_queue_run_batch(), i.e. the
 * maximum number of events dequeued and executed per call. */
#define SPDK_EVENT_BATCH_SIZE 8

/* Reactor array allocated in spdk_reactors_init() and indexed by lcore. */
static struct spdk_reactor *g_reactors;
/* Number of entries in g_reactors (last env core index + 1). */
static uint32_t g_reactor_count;
static struct spdk_cpuset g_reactor_core_mask;
/* Lifecycle state; set to INITIALIZED at the end of spdk_reactors_init(). */
static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;

/* Toggled by spdk_framework_enable_context_switch_monitor(). */
static bool g_framework_context_switch_monitor_enabled = true;

/* Pool of struct spdk_event objects handed out by spdk_event_allocate(). */
static struct spdk_mempool *g_spdk_event_mempool = NULL;

/* Schedulers added through spdk_scheduler_register(). */
TAILQ_HEAD(, spdk_scheduler) g_scheduler_list
	= TAILQ_HEAD_INITIALIZER(g_scheduler_list);

/* Currently selected scheduler (NULL when none); see spdk_scheduler_set(). */
static struct spdk_scheduler *g_scheduler = NULL;
/* Reactor that drives scheduling rounds; defaults to the reactor of the core
 * that called spdk_reactors_init(). */
static struct spdk_reactor *g_scheduling_reactor;
/* Cleared by _reactors_scheduler_fini() and _reactors_scheduler_cancel() when a
 * scheduling round ends. */
bool g_scheduling_in_progress = false;
/* Scheduling period expressed in TSC ticks; 0 until spdk_scheduler_set_period(). */
static uint64_t g_scheduler_period_in_tsc = 0;
/* Scheduling period in microseconds, as passed to spdk_scheduler_set_period(). */
static uint64_t g_scheduler_period_in_us;
/* Core at which _reactors_scheduler_update_core_mode() resumes its walk. */
static uint32_t g_scheduler_core_number;
/* Per-core metrics array (g_reactor_count entries) passed to the scheduler. */
static struct spdk_scheduler_core_info *g_core_infos = NULL;
/* Cores excluded from thread migration; see scheduler_set_isolated_core_mask(). */
static struct spdk_cpuset g_scheduler_isolated_core_mask;

/* NOTE(review): presumably filled by a governor registration routine outside
 * this view - confirm. */
TAILQ_HEAD(, spdk_governor) g_governor_list
	= TAILQ_HEAD_INITIALIZER(g_governor_list);

static struct spdk_governor *g_governor = NULL;

/* Defined further down in the file; used by reactor_construct() and
 * spdk_reactors_fini(). */
static int reactor_interrupt_init(struct spdk_reactor *reactor);
static void reactor_interrupt_fini(struct spdk_reactor *reactor);

/* NOTE(review): the mutex presumably guards g_stopping_reactors; neither is
 * used in the visible part of the file - confirm. */
static pthread_mutex_t g_stopping_reactors_mtx = PTHREAD_MUTEX_INITIALIZER;
static bool g_stopping_reactors = false;
66 :
67 : static struct spdk_scheduler *
68 5 : _scheduler_find(const char *name)
69 : {
70 5 : struct spdk_scheduler *tmp;
71 :
72 9 : TAILQ_FOREACH(tmp, &g_scheduler_list, link) {
73 7 : if (strcmp(name, tmp->name) == 0) {
74 3 : return tmp;
75 : }
76 4 : }
77 :
78 2 : return NULL;
79 5 : }
80 :
81 : int
82 3 : spdk_scheduler_set(const char *name)
83 : {
84 3 : struct spdk_scheduler *scheduler;
85 3 : int rc = 0;
86 :
87 : /* NULL scheduler was specifically requested */
88 3 : if (name == NULL) {
89 0 : if (g_scheduler) {
90 0 : g_scheduler->deinit();
91 0 : }
92 0 : g_scheduler = NULL;
93 0 : return 0;
94 : }
95 :
96 3 : scheduler = _scheduler_find(name);
97 3 : if (scheduler == NULL) {
98 0 : SPDK_ERRLOG("Requested scheduler is missing\n");
99 0 : return -EINVAL;
100 : }
101 :
102 3 : if (g_scheduler == scheduler) {
103 2 : return 0;
104 : }
105 :
106 1 : if (g_scheduler) {
107 0 : g_scheduler->deinit();
108 0 : }
109 :
110 1 : rc = scheduler->init();
111 1 : if (rc == 0) {
112 1 : g_scheduler = scheduler;
113 1 : } else {
114 : /* Could not switch to the new scheduler, so keep the old
115 : * one. We need to check if it wasn't NULL, and ->init() it again.
116 : */
117 0 : if (g_scheduler) {
118 0 : SPDK_ERRLOG("Could not ->init() '%s' scheduler, reverting to '%s'\n",
119 : name, g_scheduler->name);
120 0 : g_scheduler->init();
121 0 : } else {
122 0 : SPDK_ERRLOG("Could not ->init() '%s' scheduler.\n", name);
123 : }
124 : }
125 :
126 1 : return rc;
127 3 : }
128 :
129 : struct spdk_scheduler *
130 6 : spdk_scheduler_get(void)
131 : {
132 6 : return g_scheduler;
133 : }
134 :
135 : uint64_t
136 0 : spdk_scheduler_get_period(void)
137 : {
138 0 : return g_scheduler_period_in_us;
139 : }
140 :
141 : void
142 0 : spdk_scheduler_set_period(uint64_t period)
143 : {
144 0 : g_scheduler_period_in_us = period;
145 0 : g_scheduler_period_in_tsc = period * spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
146 0 : }
147 :
148 : void
149 2 : spdk_scheduler_register(struct spdk_scheduler *scheduler)
150 : {
151 2 : if (_scheduler_find(scheduler->name)) {
152 0 : SPDK_ERRLOG("scheduler named '%s' already registered.\n", scheduler->name);
153 0 : assert(false);
154 : return;
155 : }
156 :
157 2 : TAILQ_INSERT_TAIL(&g_scheduler_list, scheduler, link);
158 2 : }
159 :
160 : uint32_t
161 18 : spdk_scheduler_get_scheduling_lcore(void)
162 : {
163 18 : return g_scheduling_reactor->lcore;
164 : }
165 :
166 : bool
167 0 : spdk_scheduler_set_scheduling_lcore(uint32_t core)
168 : {
169 0 : struct spdk_reactor *reactor = spdk_reactor_get(core);
170 0 : if (reactor == NULL) {
171 0 : SPDK_ERRLOG("Failed to set scheduling reactor. Reactor(lcore:%d) does not exist", core);
172 0 : return false;
173 : }
174 :
175 0 : g_scheduling_reactor = reactor;
176 0 : return true;
177 0 : }
178 :
179 : bool
180 3 : scheduler_set_isolated_core_mask(struct spdk_cpuset isolated_core_mask)
181 : {
182 3 : struct spdk_cpuset tmp_mask;
183 :
184 3 : spdk_cpuset_copy(&tmp_mask, spdk_app_get_core_mask());
185 3 : spdk_cpuset_or(&tmp_mask, &isolated_core_mask);
186 3 : if (spdk_cpuset_equal(&tmp_mask, spdk_app_get_core_mask()) == false) {
187 2 : SPDK_ERRLOG("Isolated core mask is not included in app core mask.\n");
188 2 : return false;
189 : }
190 1 : spdk_cpuset_copy(&g_scheduler_isolated_core_mask, &isolated_core_mask);
191 1 : return true;
192 3 : }
193 :
194 : const char *
195 0 : scheduler_get_isolated_core_mask(void)
196 : {
197 0 : return spdk_cpuset_fmt(&g_scheduler_isolated_core_mask);
198 : }
199 :
200 : static bool
201 15 : scheduler_is_isolated_core(uint32_t core)
202 : {
203 15 : return spdk_cpuset_get_cpu(&g_scheduler_isolated_core_mask, core);
204 : }
205 :
206 : static void
207 31 : reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
208 : {
209 31 : reactor->lcore = lcore;
210 31 : reactor->flags.is_valid = true;
211 :
212 31 : TAILQ_INIT(&reactor->threads);
213 31 : reactor->thread_count = 0;
214 31 : spdk_cpuset_zero(&reactor->notify_cpuset);
215 :
216 31 : reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_NUMA_ID_ANY);
217 31 : if (reactor->events == NULL) {
218 0 : SPDK_ERRLOG("Failed to allocate events ring\n");
219 0 : assert(false);
220 : }
221 :
222 : /* Always initialize interrupt facilities for reactor */
223 31 : if (reactor_interrupt_init(reactor) != 0) {
224 : /* Reactor interrupt facilities are necessary if setting app to interrupt mode. */
225 0 : if (spdk_interrupt_mode_is_enabled()) {
226 0 : SPDK_ERRLOG("Failed to prepare intr facilities\n");
227 0 : assert(false);
228 : }
229 0 : return;
230 : }
231 :
232 : /* If application runs with full interrupt ability,
233 : * all reactors are going to run in interrupt mode.
234 : */
235 31 : if (spdk_interrupt_mode_is_enabled()) {
236 0 : uint32_t i;
237 :
238 0 : SPDK_ENV_FOREACH_CORE(i) {
239 0 : spdk_cpuset_set_cpu(&reactor->notify_cpuset, i, true);
240 0 : }
241 0 : reactor->in_interrupt = true;
242 0 : }
243 31 : }
244 :
245 : struct spdk_reactor *
246 410 : spdk_reactor_get(uint32_t lcore)
247 : {
248 410 : struct spdk_reactor *reactor;
249 :
250 410 : if (g_reactors == NULL) {
251 0 : SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
252 0 : return NULL;
253 : }
254 :
255 410 : if (lcore >= g_reactor_count) {
256 0 : return NULL;
257 : }
258 :
259 410 : reactor = &g_reactors[lcore];
260 :
261 410 : if (reactor->flags.is_valid == false) {
262 0 : return NULL;
263 : }
264 :
265 410 : return reactor;
266 410 : }
267 :
/* Thread-library callbacks handed to spdk_thread_lib_init_ext() in
 * spdk_reactors_init(); defined later in the file. */
static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
static bool reactor_thread_op_supported(enum spdk_thread_op op);

/* Power of 2 minus 1 is optimal for memory consumption */
#define EVENT_MSG_MEMPOOL_SHIFT 14 /* 2^14 = 16384 */
/* Element count of the spdk_event mempool created in spdk_reactors_init(). */
#define EVENT_MSG_MEMPOOL_SIZE ((1 << EVENT_MSG_MEMPOOL_SHIFT) - 1)
274 :
275 : int
276 11 : spdk_reactors_init(size_t msg_mempool_size)
277 : {
278 11 : struct spdk_reactor *reactor;
279 11 : int rc;
280 11 : uint32_t i, current_core;
281 11 : char mempool_name[32];
282 :
283 11 : snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
284 22 : g_spdk_event_mempool = spdk_mempool_create(mempool_name,
285 11 : EVENT_MSG_MEMPOOL_SIZE,
286 : sizeof(struct spdk_event),
287 : SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
288 : SPDK_ENV_NUMA_ID_ANY);
289 :
290 11 : if (g_spdk_event_mempool == NULL) {
291 0 : SPDK_ERRLOG("spdk_event_mempool creation failed\n");
292 0 : return -1;
293 : }
294 :
295 : /* struct spdk_reactor must be aligned on 64 byte boundary */
296 11 : g_reactor_count = spdk_env_get_last_core() + 1;
297 11 : rc = posix_memalign((void **)&g_reactors, 64,
298 11 : g_reactor_count * sizeof(struct spdk_reactor));
299 11 : if (rc != 0) {
300 0 : SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
301 : g_reactor_count);
302 0 : spdk_mempool_free(g_spdk_event_mempool);
303 0 : return -1;
304 : }
305 :
306 11 : g_core_infos = calloc(g_reactor_count, sizeof(*g_core_infos));
307 11 : if (g_core_infos == NULL) {
308 0 : SPDK_ERRLOG("Could not allocate memory for g_core_infos\n");
309 0 : spdk_mempool_free(g_spdk_event_mempool);
310 0 : free(g_reactors);
311 0 : return -ENOMEM;
312 : }
313 :
314 11 : memset(g_reactors, 0, (g_reactor_count) * sizeof(struct spdk_reactor));
315 :
316 11 : rc = spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
317 11 : sizeof(struct spdk_lw_thread), msg_mempool_size);
318 11 : if (rc != 0) {
319 0 : SPDK_ERRLOG("Initialize spdk thread lib failed\n");
320 0 : spdk_mempool_free(g_spdk_event_mempool);
321 0 : free(g_reactors);
322 0 : free(g_core_infos);
323 0 : return rc;
324 : }
325 :
326 41 : SPDK_ENV_FOREACH_CORE(i) {
327 30 : reactor_construct(&g_reactors[i], i);
328 30 : }
329 :
330 11 : current_core = spdk_env_get_current_core();
331 11 : reactor = spdk_reactor_get(current_core);
332 11 : assert(reactor != NULL);
333 11 : g_scheduling_reactor = reactor;
334 :
335 11 : g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
336 :
337 11 : return 0;
338 11 : }
339 :
340 : void
341 11 : spdk_reactors_fini(void)
342 : {
343 11 : uint32_t i;
344 11 : struct spdk_reactor *reactor;
345 :
346 11 : if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
347 0 : return;
348 : }
349 :
350 11 : spdk_thread_lib_fini();
351 :
352 41 : SPDK_ENV_FOREACH_CORE(i) {
353 30 : reactor = spdk_reactor_get(i);
354 30 : assert(reactor != NULL);
355 30 : assert(reactor->thread_count == 0);
356 30 : if (reactor->events != NULL) {
357 30 : spdk_ring_free(reactor->events);
358 30 : }
359 :
360 30 : reactor_interrupt_fini(reactor);
361 :
362 30 : if (g_core_infos != NULL) {
363 30 : free(g_core_infos[i].thread_infos);
364 30 : }
365 30 : }
366 :
367 11 : spdk_mempool_free(g_spdk_event_mempool);
368 :
369 11 : free(g_reactors);
370 11 : g_reactors = NULL;
371 11 : free(g_core_infos);
372 11 : g_core_infos = NULL;
373 11 : }
374 :
375 : static void _reactor_set_interrupt_mode(void *arg1, void *arg2);
376 :
377 : static void
378 4 : _reactor_set_notify_cpuset(void *arg1, void *arg2)
379 : {
380 4 : struct spdk_reactor *target = arg1;
381 4 : struct spdk_reactor *reactor = spdk_reactor_get(spdk_env_get_current_core());
382 :
383 4 : assert(reactor != NULL);
384 4 : spdk_cpuset_set_cpu(&reactor->notify_cpuset, target->lcore, target->new_in_interrupt);
385 4 : }
386 :
387 : static void
388 22 : _event_call(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
389 : {
390 22 : struct spdk_event *ev;
391 :
392 22 : ev = spdk_event_allocate(lcore, fn, arg1, arg2);
393 22 : assert(ev);
394 22 : spdk_event_call(ev);
395 22 : }
396 :
397 : static void
398 2 : _reactor_set_notify_cpuset_cpl(void *arg1, void *arg2)
399 : {
400 2 : struct spdk_reactor *target = arg1;
401 :
402 2 : if (target->new_in_interrupt == false) {
403 1 : target->set_interrupt_mode_in_progress = false;
404 2 : _event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
405 1 : target->set_interrupt_mode_cb_arg, NULL);
406 1 : } else {
407 1 : _event_call(target->lcore, _reactor_set_interrupt_mode, target, NULL);
408 : }
409 2 : }
410 :
411 : static void
412 0 : _reactor_set_thread_interrupt_mode(void *ctx)
413 : {
414 0 : struct spdk_reactor *reactor = ctx;
415 :
416 0 : spdk_thread_set_interrupt_mode(reactor->in_interrupt);
417 0 : }
418 :
419 : static void
420 2 : _reactor_set_interrupt_mode(void *arg1, void *arg2)
421 : {
422 2 : struct spdk_reactor *target = arg1;
423 2 : struct spdk_thread *thread;
424 2 : struct spdk_fd_group *grp;
425 2 : struct spdk_lw_thread *lw_thread, *tmp;
426 :
427 2 : assert(target == spdk_reactor_get(spdk_env_get_current_core()));
428 2 : assert(target != NULL);
429 2 : assert(target->in_interrupt != target->new_in_interrupt);
430 2 : SPDK_DEBUGLOG(reactor, "Do reactor set on core %u from %s to state %s\n",
431 : target->lcore, target->in_interrupt ? "intr" : "poll", target->new_in_interrupt ? "intr" : "poll");
432 :
433 2 : target->in_interrupt = target->new_in_interrupt;
434 :
435 2 : if (spdk_interrupt_mode_is_enabled()) {
436 : /* Align spdk_thread with reactor to interrupt mode or poll mode */
437 0 : TAILQ_FOREACH_SAFE(lw_thread, &target->threads, link, tmp) {
438 0 : thread = spdk_thread_get_from_ctx(lw_thread);
439 0 : if (target->in_interrupt) {
440 0 : grp = spdk_thread_get_interrupt_fd_group(thread);
441 0 : spdk_fd_group_nest(target->fgrp, grp);
442 0 : } else {
443 0 : grp = spdk_thread_get_interrupt_fd_group(thread);
444 0 : spdk_fd_group_unnest(target->fgrp, grp);
445 : }
446 :
447 0 : spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, target);
448 0 : }
449 0 : }
450 :
451 2 : if (target->new_in_interrupt == false) {
452 : /* Reactor is no longer in interrupt mode. Refresh the tsc_last to accurately
453 : * track reactor stats. */
454 1 : target->tsc_last = spdk_get_ticks();
455 1 : spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
456 1 : } else {
457 1 : uint64_t notify = 1;
458 1 : int rc = 0;
459 :
460 : /* Always trigger spdk_event and resched event in case of race condition */
461 1 : rc = write(target->events_fd, ¬ify, sizeof(notify));
462 1 : if (rc < 0) {
463 0 : SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
464 0 : }
465 1 : rc = write(target->resched_fd, ¬ify, sizeof(notify));
466 1 : if (rc < 0) {
467 0 : SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
468 0 : }
469 :
470 1 : target->set_interrupt_mode_in_progress = false;
471 2 : _event_call(spdk_scheduler_get_scheduling_lcore(), target->set_interrupt_mode_cb_fn,
472 1 : target->set_interrupt_mode_cb_arg, NULL);
473 1 : }
474 2 : }
475 :
476 : int
477 2 : spdk_reactor_set_interrupt_mode(uint32_t lcore, bool new_in_interrupt,
478 : spdk_reactor_set_interrupt_mode_cb cb_fn, void *cb_arg)
479 : {
480 2 : struct spdk_reactor *target;
481 :
482 2 : target = spdk_reactor_get(lcore);
483 2 : if (target == NULL) {
484 0 : return -EINVAL;
485 : }
486 :
487 : /* Eventfd has to be supported in order to use interrupt functionality. */
488 2 : if (target->fgrp == NULL) {
489 0 : return -ENOTSUP;
490 : }
491 :
492 2 : if (spdk_env_get_current_core() != g_scheduling_reactor->lcore) {
493 0 : SPDK_ERRLOG("It is only permitted within scheduling reactor.\n");
494 0 : return -EPERM;
495 : }
496 :
497 2 : if (target->in_interrupt == new_in_interrupt) {
498 0 : cb_fn(cb_arg, NULL);
499 0 : return 0;
500 : }
501 :
502 2 : if (target->set_interrupt_mode_in_progress) {
503 0 : SPDK_NOTICELOG("Reactor(%u) is already in progress to set interrupt mode\n", lcore);
504 0 : return -EBUSY;
505 : }
506 2 : target->set_interrupt_mode_in_progress = true;
507 :
508 2 : target->new_in_interrupt = new_in_interrupt;
509 2 : target->set_interrupt_mode_cb_fn = cb_fn;
510 2 : target->set_interrupt_mode_cb_arg = cb_arg;
511 :
512 2 : SPDK_DEBUGLOG(reactor, "Starting reactor event from %d to %d\n",
513 : spdk_env_get_current_core(), lcore);
514 :
515 2 : if (new_in_interrupt == false) {
516 : /* For potential race cases, when setting the reactor to poll mode,
517 : * first change the mode of the reactor and then clear the corresponding
518 : * bit of the notify_cpuset of each reactor.
519 : */
520 1 : _event_call(lcore, _reactor_set_interrupt_mode, target, NULL);
521 1 : } else {
522 : /* For race cases, when setting the reactor to interrupt mode, first set the
523 : * corresponding bit of the notify_cpuset of each reactor and then change the mode.
524 : */
525 1 : spdk_for_each_reactor(_reactor_set_notify_cpuset, target, NULL, _reactor_set_notify_cpuset_cpl);
526 : }
527 :
528 2 : return 0;
529 2 : }
530 :
531 : struct spdk_event *
532 54 : spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
533 : {
534 54 : struct spdk_event *event = NULL;
535 54 : struct spdk_reactor *reactor = spdk_reactor_get(lcore);
536 :
537 54 : if (!reactor) {
538 0 : assert(false);
539 : return NULL;
540 : }
541 :
542 54 : event = spdk_mempool_get(g_spdk_event_mempool);
543 54 : if (event == NULL) {
544 0 : assert(false);
545 : return NULL;
546 : }
547 :
548 54 : event->lcore = lcore;
549 54 : event->fn = fn;
550 54 : event->arg1 = arg1;
551 54 : event->arg2 = arg2;
552 :
553 108 : return event;
554 54 : }
555 :
556 : void
557 54 : spdk_event_call(struct spdk_event *event)
558 : {
559 54 : int rc;
560 54 : struct spdk_reactor *reactor;
561 54 : struct spdk_reactor *local_reactor = NULL;
562 54 : uint32_t current_core = spdk_env_get_current_core();
563 :
564 54 : reactor = spdk_reactor_get(event->lcore);
565 :
566 54 : assert(reactor != NULL);
567 54 : assert(reactor->events != NULL);
568 :
569 54 : rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
570 54 : if (rc != 1) {
571 0 : assert(false);
572 : }
573 :
574 54 : if (current_core != SPDK_ENV_LCORE_ID_ANY) {
575 54 : local_reactor = spdk_reactor_get(current_core);
576 54 : }
577 :
578 : /* If spdk_event_call isn't called on a reactor, always send a notification.
579 : * If it is called on a reactor, send a notification if the destination reactor
580 : * is indicated in interrupt mode state.
581 : */
582 108 : if (spdk_unlikely(local_reactor == NULL) ||
583 54 : spdk_unlikely(spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, event->lcore))) {
584 4 : uint64_t notify = 1;
585 :
586 4 : rc = write(reactor->events_fd, ¬ify, sizeof(notify));
587 4 : if (rc < 0) {
588 0 : SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
589 0 : }
590 4 : }
591 54 : }
592 :
593 : static inline int
594 147 : event_queue_run_batch(void *arg)
595 : {
596 147 : struct spdk_reactor *reactor = arg;
597 147 : size_t count, i;
598 147 : void *events[SPDK_EVENT_BATCH_SIZE];
599 :
600 : #ifdef DEBUG
601 : /*
602 : * spdk_ring_dequeue() fills events and returns how many entries it wrote,
603 : * so we will never actually read uninitialized data from events, but just to be sure
604 : * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
605 : */
606 147 : memset(events, 0, sizeof(events));
607 : #endif
608 :
609 : /* Operate event notification if this reactor currently runs in interrupt state */
610 147 : if (spdk_unlikely(reactor->in_interrupt)) {
611 3 : uint64_t notify = 1;
612 3 : int rc;
613 :
614 3 : count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
615 :
616 3 : if (spdk_ring_count(reactor->events) != 0) {
617 : /* Trigger new notification if there are still events in event-queue waiting for processing. */
618 0 : rc = write(reactor->events_fd, ¬ify, sizeof(notify));
619 0 : if (rc < 0) {
620 0 : SPDK_ERRLOG("failed to notify event queue: %s.\n", spdk_strerror(errno));
621 0 : return -errno;
622 : }
623 0 : }
624 3 : } else {
625 144 : count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
626 : }
627 :
628 147 : if (count == 0) {
629 95 : return 0;
630 : }
631 :
632 106 : for (i = 0; i < count; i++) {
633 54 : struct spdk_event *event = events[i];
634 :
635 54 : assert(event != NULL);
636 54 : assert(spdk_get_thread() == NULL);
637 : SPDK_DTRACE_PROBE3(event_exec, event->fn,
638 : event->arg1, event->arg2);
639 54 : event->fn(event->arg1, event->arg2);
640 54 : }
641 :
642 52 : spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);
643 :
644 52 : return (int)count;
645 147 : }
646 :
647 : /* 1s */
648 : #define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
649 :
650 : static int
651 8 : get_rusage(struct spdk_reactor *reactor)
652 : {
653 8 : struct rusage rusage;
654 :
655 8 : if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
656 0 : return -1;
657 : }
658 :
659 8 : if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
660 0 : SPDK_INFOLOG(reactor,
661 : "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
662 : reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
663 : rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
664 0 : }
665 8 : reactor->rusage = rusage;
666 :
667 8 : return -1;
668 8 : }
669 :
670 : void
671 0 : spdk_framework_enable_context_switch_monitor(bool enable)
672 : {
673 : /* This global is being read by multiple threads, so this isn't
674 : * strictly thread safe. However, we're toggling between true and
675 : * false here, and if a thread sees the value update later than it
676 : * should, it's no big deal. */
677 0 : g_framework_context_switch_monitor_enabled = enable;
678 0 : }
679 :
680 : bool
681 0 : spdk_framework_context_switch_monitor_enabled(void)
682 : {
683 0 : return g_framework_context_switch_monitor_enabled;
684 : }
685 :
686 : static void
687 9 : _set_thread_name(const char *thread_name)
688 : {
689 : #if defined(__linux__)
690 9 : prctl(PR_SET_NAME, thread_name, 0, 0, 0);
691 : #elif defined(__FreeBSD__)
692 : pthread_set_name_np(pthread_self(), thread_name);
693 : #else
694 : pthread_setname_np(pthread_self(), thread_name);
695 : #endif
696 9 : }
697 :
698 : static void
699 15 : _init_thread_stats(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
700 : {
701 15 : struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
702 15 : struct spdk_thread_stats prev_total_stats;
703 :
704 : /* Read total_stats before updating it to calculate stats during the last scheduling period. */
705 15 : prev_total_stats = lw_thread->total_stats;
706 :
707 15 : spdk_set_thread(thread);
708 15 : spdk_thread_get_stats(&lw_thread->total_stats);
709 15 : spdk_set_thread(NULL);
710 :
711 15 : lw_thread->current_stats.busy_tsc = lw_thread->total_stats.busy_tsc - prev_total_stats.busy_tsc;
712 15 : lw_thread->current_stats.idle_tsc = lw_thread->total_stats.idle_tsc - prev_total_stats.idle_tsc;
713 15 : }
714 :
715 : static void
716 8 : _threads_reschedule_thread(struct spdk_scheduler_thread_info *thread_info)
717 : {
718 8 : struct spdk_lw_thread *lw_thread;
719 8 : struct spdk_thread *thread;
720 :
721 8 : thread = spdk_thread_get_by_id(thread_info->thread_id);
722 8 : if (thread == NULL) {
723 : /* Thread no longer exists. */
724 0 : return;
725 : }
726 8 : lw_thread = spdk_thread_get_ctx(thread);
727 8 : assert(lw_thread != NULL);
728 :
729 8 : lw_thread->lcore = thread_info->lcore;
730 8 : lw_thread->resched = true;
731 8 : }
732 :
733 : static void
734 6 : _threads_reschedule(struct spdk_scheduler_core_info *cores_info)
735 : {
736 6 : struct spdk_scheduler_core_info *core;
737 6 : struct spdk_scheduler_thread_info *thread_info;
738 6 : uint32_t i, j;
739 :
740 21 : SPDK_ENV_FOREACH_CORE(i) {
741 15 : core = &cores_info[i];
742 30 : for (j = 0; j < core->threads_count; j++) {
743 15 : thread_info = &core->thread_infos[j];
744 15 : if (thread_info->lcore != i) {
745 8 : if (core->isolated || cores_info[thread_info->lcore].isolated) {
746 0 : SPDK_ERRLOG("A thread cannot be moved from an isolated core or \
747 : moved to an isolated core. Skip rescheduling thread\n");
748 0 : continue;
749 : }
750 8 : _threads_reschedule_thread(thread_info);
751 8 : }
752 15 : }
753 15 : core->threads_count = 0;
754 15 : free(core->thread_infos);
755 15 : core->thread_infos = NULL;
756 15 : }
757 6 : }
758 :
759 : static void
760 6 : _reactors_scheduler_fini(void)
761 : {
762 : /* Reschedule based on the balancing output */
763 6 : _threads_reschedule(g_core_infos);
764 :
765 6 : g_scheduling_in_progress = false;
766 6 : }
767 :
768 : static void
769 8 : _reactors_scheduler_update_core_mode(void *ctx1, void *ctx2)
770 : {
771 8 : struct spdk_reactor *reactor;
772 8 : uint32_t i;
773 8 : int rc = 0;
774 :
775 21 : for (i = g_scheduler_core_number; i < SPDK_ENV_LCORE_ID_ANY; i = spdk_env_get_next_core(i)) {
776 15 : reactor = spdk_reactor_get(i);
777 15 : assert(reactor != NULL);
778 15 : if (reactor->in_interrupt != g_core_infos[i].interrupt_mode) {
779 : /* Switch next found reactor to new state */
780 2 : rc = spdk_reactor_set_interrupt_mode(i, g_core_infos[i].interrupt_mode,
781 : _reactors_scheduler_update_core_mode, NULL);
782 2 : if (rc == 0) {
783 : /* Set core to start with after callback completes */
784 2 : g_scheduler_core_number = spdk_env_get_next_core(i);
785 2 : return;
786 : }
787 0 : }
788 13 : }
789 6 : _reactors_scheduler_fini();
790 8 : }
791 :
792 : static void
793 0 : _reactors_scheduler_cancel(void *arg1, void *arg2)
794 : {
795 0 : struct spdk_scheduler_core_info *core;
796 0 : uint32_t i;
797 :
798 0 : SPDK_ENV_FOREACH_CORE(i) {
799 0 : core = &g_core_infos[i];
800 0 : core->threads_count = 0;
801 0 : free(core->thread_infos);
802 0 : core->thread_infos = NULL;
803 0 : }
804 :
805 0 : g_scheduling_in_progress = false;
806 0 : }
807 :
808 : static void
809 6 : _reactors_scheduler_balance(void *arg1, void *arg2)
810 : {
811 6 : struct spdk_scheduler *scheduler = spdk_scheduler_get();
812 :
813 6 : if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING || scheduler == NULL) {
814 0 : _reactors_scheduler_cancel(NULL, NULL);
815 0 : return;
816 : }
817 :
818 6 : scheduler->balance(g_core_infos, g_reactor_count);
819 :
820 6 : g_scheduler_core_number = spdk_env_get_first_core();
821 6 : _reactors_scheduler_update_core_mode(NULL, NULL);
822 6 : }
823 :
824 : /* Phase 1 of thread scheduling is to gather metrics on the existing threads */
825 : static void
826 15 : _reactors_scheduler_gather_metrics(void *arg1, void *arg2)
827 : {
828 15 : struct spdk_scheduler_core_info *core_info;
829 15 : struct spdk_lw_thread *lw_thread;
830 15 : struct spdk_thread *thread;
831 15 : struct spdk_reactor *reactor;
832 15 : uint32_t next_core;
833 15 : uint32_t i = 0;
834 :
835 15 : reactor = spdk_reactor_get(spdk_env_get_current_core());
836 15 : assert(reactor != NULL);
837 15 : core_info = &g_core_infos[reactor->lcore];
838 15 : core_info->lcore = reactor->lcore;
839 15 : core_info->current_idle_tsc = reactor->idle_tsc - core_info->total_idle_tsc;
840 15 : core_info->total_idle_tsc = reactor->idle_tsc;
841 15 : core_info->current_busy_tsc = reactor->busy_tsc - core_info->total_busy_tsc;
842 15 : core_info->total_busy_tsc = reactor->busy_tsc;
843 15 : core_info->interrupt_mode = reactor->in_interrupt;
844 15 : core_info->threads_count = 0;
845 15 : core_info->isolated = scheduler_is_isolated_core(reactor->lcore);
846 :
847 15 : SPDK_DEBUGLOG(reactor, "Gathering metrics on %u\n", reactor->lcore);
848 :
849 15 : spdk_trace_record(TRACE_SCHEDULER_CORE_STATS, reactor->trace_id, 0, 0,
850 : core_info->current_busy_tsc,
851 : core_info->current_idle_tsc);
852 :
853 15 : if (reactor->thread_count > 0) {
854 11 : core_info->thread_infos = calloc(reactor->thread_count, sizeof(*core_info->thread_infos));
855 11 : if (core_info->thread_infos == NULL) {
856 0 : SPDK_ERRLOG("Failed to allocate memory when gathering metrics on %u\n", reactor->lcore);
857 :
858 : /* Cancel this round of schedule work */
859 0 : _event_call(spdk_scheduler_get_scheduling_lcore(), _reactors_scheduler_cancel, NULL, NULL);
860 0 : return;
861 : }
862 :
863 26 : TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
864 15 : _init_thread_stats(reactor, lw_thread);
865 :
866 15 : core_info->thread_infos[i].lcore = lw_thread->lcore;
867 15 : thread = spdk_thread_get_from_ctx(lw_thread);
868 15 : assert(thread != NULL);
869 15 : core_info->thread_infos[i].thread_id = spdk_thread_get_id(thread);
870 15 : core_info->thread_infos[i].total_stats = lw_thread->total_stats;
871 15 : core_info->thread_infos[i].current_stats = lw_thread->current_stats;
872 15 : core_info->threads_count++;
873 15 : assert(core_info->threads_count <= reactor->thread_count);
874 :
875 15 : spdk_trace_record(TRACE_SCHEDULER_THREAD_STATS, spdk_thread_get_trace_id(thread), 0, 0,
876 : lw_thread->current_stats.busy_tsc,
877 : lw_thread->current_stats.idle_tsc);
878 :
879 15 : i++;
880 15 : }
881 11 : }
882 :
883 15 : next_core = spdk_env_get_next_core(reactor->lcore);
884 15 : if (next_core == UINT32_MAX) {
885 6 : next_core = spdk_env_get_first_core();
886 6 : }
887 :
888 : /* If we've looped back around to the scheduler thread, move to the next phase */
889 15 : if (next_core == spdk_scheduler_get_scheduling_lcore()) {
890 : /* Phase 2 of scheduling is rebalancing - deciding which threads to move where */
891 6 : _event_call(next_core, _reactors_scheduler_balance, NULL, NULL);
892 6 : return;
893 : }
894 :
895 9 : _event_call(next_core, _reactors_scheduler_gather_metrics, NULL, NULL);
896 15 : }
897 :
898 : static int _reactor_schedule_thread(struct spdk_thread *thread);
899 : static uint64_t g_rusage_period;
900 :
901 : static void
902 20 : _reactor_remove_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
903 : {
904 20 : struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
905 20 : struct spdk_fd_group *grp;
906 :
907 20 : TAILQ_REMOVE(&reactor->threads, lw_thread, link);
908 20 : assert(reactor->thread_count > 0);
909 20 : reactor->thread_count--;
910 :
911 : /* Operate thread intr if running with full interrupt ability */
912 20 : if (spdk_interrupt_mode_is_enabled()) {
913 0 : if (reactor->in_interrupt) {
914 0 : grp = spdk_thread_get_interrupt_fd_group(thread);
915 0 : spdk_fd_group_unnest(reactor->fgrp, grp);
916 0 : }
917 0 : }
918 20 : }
919 :
920 : static bool
921 59 : reactor_post_process_lw_thread(struct spdk_reactor *reactor, struct spdk_lw_thread *lw_thread)
922 : {
923 59 : struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
924 :
925 59 : if (spdk_unlikely(spdk_thread_is_exited(thread) &&
926 : spdk_thread_is_idle(thread))) {
927 12 : _reactor_remove_lw_thread(reactor, lw_thread);
928 12 : spdk_thread_destroy(thread);
929 12 : return true;
930 : }
931 :
932 47 : if (spdk_unlikely(lw_thread->resched && !spdk_thread_is_bound(thread))) {
933 8 : lw_thread->resched = false;
934 8 : _reactor_remove_lw_thread(reactor, lw_thread);
935 8 : _reactor_schedule_thread(thread);
936 8 : return true;
937 : }
938 :
939 39 : return false;
940 59 : }
941 :
942 : static void
943 0 : reactor_interrupt_run(struct spdk_reactor *reactor)
944 : {
945 0 : int block_timeout = -1; /* _EPOLL_WAIT_FOREVER */
946 :
947 0 : spdk_fd_group_wait(reactor->fgrp, block_timeout);
948 0 : }
949 :
950 : static void
951 45 : _reactor_run(struct spdk_reactor *reactor)
952 : {
953 45 : struct spdk_thread *thread;
954 45 : struct spdk_lw_thread *lw_thread, *tmp;
955 45 : uint64_t now;
956 45 : int rc;
957 :
958 45 : event_queue_run_batch(reactor);
959 :
960 : /* If no threads are present on the reactor,
961 : * tsc_last gets outdated. Update it to track
962 : * thread execution time correctly. */
963 45 : if (spdk_unlikely(TAILQ_EMPTY(&reactor->threads))) {
964 4 : now = spdk_get_ticks();
965 4 : reactor->idle_tsc += now - reactor->tsc_last;
966 4 : reactor->tsc_last = now;
967 4 : return;
968 : }
969 :
970 100 : TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
971 59 : thread = spdk_thread_get_from_ctx(lw_thread);
972 59 : rc = spdk_thread_poll(thread, 0, reactor->tsc_last);
973 :
974 59 : now = spdk_thread_get_last_tsc(thread);
975 59 : if (rc == 0) {
976 51 : reactor->idle_tsc += now - reactor->tsc_last;
977 59 : } else if (rc > 0) {
978 8 : reactor->busy_tsc += now - reactor->tsc_last;
979 8 : }
980 59 : reactor->tsc_last = now;
981 :
982 59 : reactor_post_process_lw_thread(reactor, lw_thread);
983 59 : }
984 45 : }
985 :
986 : static int
987 9 : reactor_run(void *arg)
988 : {
989 9 : struct spdk_reactor *reactor = arg;
990 9 : struct spdk_thread *thread;
991 9 : struct spdk_lw_thread *lw_thread, *tmp;
992 9 : char thread_name[32];
993 9 : uint64_t last_sched = 0;
994 :
995 9 : SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
996 :
997 : /* Rename the POSIX thread because the reactor is tied to the POSIX
998 : * thread in the SPDK event library.
999 : */
1000 9 : snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
1001 9 : _set_thread_name(thread_name);
1002 :
1003 9 : reactor->trace_id = spdk_trace_register_owner(OWNER_TYPE_REACTOR, thread_name);
1004 :
1005 9 : reactor->tsc_last = spdk_get_ticks();
1006 :
1007 9 : while (1) {
1008 : /* Execute interrupt process fn if this reactor currently runs in interrupt state */
1009 9 : if (spdk_unlikely(reactor->in_interrupt)) {
1010 0 : reactor_interrupt_run(reactor);
1011 0 : } else {
1012 9 : _reactor_run(reactor);
1013 : }
1014 :
1015 9 : if (g_framework_context_switch_monitor_enabled) {
1016 9 : if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
1017 8 : get_rusage(reactor);
1018 8 : reactor->last_rusage = reactor->tsc_last;
1019 8 : }
1020 9 : }
1021 :
1022 9 : if (spdk_unlikely(g_scheduler_period_in_tsc > 0 &&
1023 : (reactor->tsc_last - last_sched) > g_scheduler_period_in_tsc &&
1024 : reactor == g_scheduling_reactor &&
1025 : !g_scheduling_in_progress)) {
1026 0 : last_sched = reactor->tsc_last;
1027 0 : g_scheduling_in_progress = true;
1028 0 : spdk_trace_record(TRACE_SCHEDULER_PERIOD_START, 0, 0, 0);
1029 0 : _reactors_scheduler_gather_metrics(NULL, NULL);
1030 0 : }
1031 :
1032 9 : if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
1033 9 : break;
1034 : }
1035 : }
1036 :
1037 9 : TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
1038 0 : thread = spdk_thread_get_from_ctx(lw_thread);
1039 : /* All threads should have already had spdk_thread_exit() called on them, except
1040 : * for the app thread.
1041 : */
1042 0 : if (spdk_thread_is_running(thread)) {
1043 0 : if (!spdk_thread_is_app_thread(thread)) {
1044 0 : SPDK_ERRLOG("spdk_thread_exit() was not called on thread '%s'\n",
1045 : spdk_thread_get_name(thread));
1046 0 : SPDK_ERRLOG("This will result in a non-zero exit code in a future release.\n");
1047 0 : }
1048 0 : spdk_set_thread(thread);
1049 0 : spdk_thread_exit(thread);
1050 0 : }
1051 0 : }
1052 :
1053 9 : while (!TAILQ_EMPTY(&reactor->threads)) {
1054 0 : TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
1055 0 : thread = spdk_thread_get_from_ctx(lw_thread);
1056 0 : spdk_set_thread(thread);
1057 0 : if (spdk_thread_is_exited(thread)) {
1058 0 : _reactor_remove_lw_thread(reactor, lw_thread);
1059 0 : spdk_thread_destroy(thread);
1060 0 : } else {
1061 0 : if (spdk_unlikely(reactor->in_interrupt)) {
1062 0 : reactor_interrupt_run(reactor);
1063 0 : } else {
1064 0 : spdk_thread_poll(thread, 0, 0);
1065 : }
1066 : }
1067 0 : }
1068 : }
1069 :
1070 9 : return 0;
1071 9 : }
1072 :
/*
 * Parse the core mask string `mask` into `cpumask` and then AND the result
 * with the mask returned by spdk_app_get_core_mask().
 *
 * Returns the negative value from spdk_cpuset_parse() if parsing fails,
 * 0 otherwise.
 */
int
spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
{
	int rc = spdk_cpuset_parse(cpumask, mask);

	if (rc < 0) {
		return rc;
	}

	spdk_cpuset_and(cpumask, spdk_app_get_core_mask());

	return 0;
}
1089 :
1090 : const struct spdk_cpuset *
1091 6 : spdk_app_get_core_mask(void)
1092 : {
1093 6 : return &g_reactor_core_mask;
1094 : }
1095 :
1096 : void
1097 0 : spdk_reactors_start(void)
1098 : {
1099 0 : struct spdk_reactor *reactor;
1100 0 : uint32_t i, current_core;
1101 0 : int rc;
1102 :
1103 0 : g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
1104 0 : g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
1105 : /* Reinitialize to false, in case the app framework is restarting in the same process. */
1106 0 : g_stopping_reactors = false;
1107 :
1108 0 : current_core = spdk_env_get_current_core();
1109 0 : SPDK_ENV_FOREACH_CORE(i) {
1110 0 : if (i != current_core) {
1111 0 : reactor = spdk_reactor_get(i);
1112 0 : if (reactor == NULL) {
1113 0 : continue;
1114 : }
1115 :
1116 0 : rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
1117 0 : if (rc < 0) {
1118 0 : SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
1119 0 : assert(false);
1120 : return;
1121 : }
1122 0 : }
1123 0 : spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
1124 0 : }
1125 :
1126 : /* Start the main reactor */
1127 0 : reactor = spdk_reactor_get(current_core);
1128 0 : assert(reactor != NULL);
1129 0 : reactor_run(reactor);
1130 :
1131 0 : spdk_env_thread_wait_all();
1132 :
1133 0 : g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
1134 0 : }
1135 :
1136 : static void
1137 0 : _reactors_stop(void *arg1, void *arg2)
1138 : {
1139 0 : uint32_t i;
1140 0 : int rc;
1141 0 : struct spdk_reactor *reactor;
1142 0 : struct spdk_reactor *local_reactor;
1143 0 : uint64_t notify = 1;
1144 :
1145 0 : g_reactor_state = SPDK_REACTOR_STATE_EXITING;
1146 0 : local_reactor = spdk_reactor_get(spdk_env_get_current_core());
1147 :
1148 0 : SPDK_ENV_FOREACH_CORE(i) {
1149 : /* If spdk_event_call isn't called on a reactor, always send a notification.
1150 : * If it is called on a reactor, send a notification if the destination reactor
1151 : * is indicated in interrupt mode state.
1152 : */
1153 0 : if (local_reactor == NULL || spdk_cpuset_get_cpu(&local_reactor->notify_cpuset, i)) {
1154 0 : reactor = spdk_reactor_get(i);
1155 0 : assert(reactor != NULL);
1156 0 : rc = write(reactor->events_fd, ¬ify, sizeof(notify));
1157 0 : if (rc < 0) {
1158 0 : SPDK_ERRLOG("failed to notify event queue for reactor(%u): %s.\n", i, spdk_strerror(errno));
1159 0 : continue;
1160 : }
1161 0 : }
1162 0 : }
1163 0 : }
1164 :
/*
 * Event function that intentionally does nothing.  spdk_reactors_stop() uses
 * it as the per-reactor function of spdk_for_each_reactor().
 */
static void
nop(void *arg1, void *arg2)
{
	(void)arg1;
	(void)arg2;
}
1169 :
1170 : void
1171 0 : spdk_reactors_stop(void *arg1)
1172 : {
1173 0 : spdk_for_each_reactor(nop, NULL, NULL, _reactors_stop);
1174 0 : }
1175 :
/* Held by _reactor_schedule_thread() while it picks a target core and advances g_next_core. */
static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
/* Next core to try when a thread has no specific target core.  Whenever the value is
 * >= g_reactor_count (as the initial UINT32_MAX is), _reactor_schedule_thread() resets it
 * to the first core before using it.
 */
static uint32_t g_next_core = UINT32_MAX;
1178 :
1179 : static void
1180 22 : _schedule_thread(void *arg1, void *arg2)
1181 : {
1182 22 : struct spdk_lw_thread *lw_thread = arg1;
1183 22 : struct spdk_thread *thread;
1184 22 : struct spdk_reactor *reactor;
1185 22 : uint32_t current_core;
1186 22 : struct spdk_fd_group *grp;
1187 :
1188 22 : current_core = spdk_env_get_current_core();
1189 22 : reactor = spdk_reactor_get(current_core);
1190 22 : assert(reactor != NULL);
1191 :
1192 : /* Update total_stats to reflect state of thread
1193 : * at the end of the move. */
1194 22 : thread = spdk_thread_get_from_ctx(lw_thread);
1195 22 : spdk_set_thread(thread);
1196 22 : spdk_thread_get_stats(&lw_thread->total_stats);
1197 22 : spdk_set_thread(NULL);
1198 :
1199 22 : if (lw_thread->initial_lcore == SPDK_ENV_LCORE_ID_ANY) {
1200 14 : lw_thread->initial_lcore = current_core;
1201 14 : }
1202 22 : lw_thread->lcore = current_core;
1203 :
1204 22 : TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
1205 22 : reactor->thread_count++;
1206 :
1207 : /* Operate thread intr if running with full interrupt ability */
1208 22 : if (spdk_interrupt_mode_is_enabled()) {
1209 0 : int rc;
1210 :
1211 0 : if (reactor->in_interrupt) {
1212 0 : grp = spdk_thread_get_interrupt_fd_group(thread);
1213 0 : rc = spdk_fd_group_nest(reactor->fgrp, grp);
1214 0 : if (rc < 0) {
1215 0 : SPDK_ERRLOG("Failed to schedule spdk_thread: %s.\n", spdk_strerror(-rc));
1216 0 : }
1217 0 : }
1218 :
1219 : /* Align spdk_thread with reactor to interrupt mode or poll mode */
1220 0 : spdk_thread_send_msg(thread, _reactor_set_thread_interrupt_mode, reactor);
1221 0 : }
1222 22 : }
1223 :
1224 : static int
1225 22 : _reactor_schedule_thread(struct spdk_thread *thread)
1226 : {
1227 22 : uint32_t core, initial_core;
1228 22 : struct spdk_lw_thread *lw_thread;
1229 22 : struct spdk_event *evt = NULL;
1230 22 : struct spdk_cpuset *cpumask;
1231 22 : uint32_t i;
1232 22 : struct spdk_reactor *local_reactor = NULL;
1233 22 : uint32_t current_lcore = spdk_env_get_current_core();
1234 22 : struct spdk_cpuset polling_cpumask;
1235 22 : struct spdk_cpuset valid_cpumask;
1236 :
1237 22 : cpumask = spdk_thread_get_cpumask(thread);
1238 :
1239 22 : lw_thread = spdk_thread_get_ctx(thread);
1240 22 : assert(lw_thread != NULL);
1241 22 : core = lw_thread->lcore;
1242 22 : initial_core = lw_thread->initial_lcore;
1243 22 : memset(lw_thread, 0, sizeof(*lw_thread));
1244 22 : lw_thread->initial_lcore = initial_core;
1245 :
1246 22 : if (current_lcore != SPDK_ENV_LCORE_ID_ANY) {
1247 22 : local_reactor = spdk_reactor_get(current_lcore);
1248 22 : assert(local_reactor);
1249 22 : }
1250 :
1251 : /* When interrupt ability of spdk_thread is not enabled and the current
1252 : * reactor runs on DPDK thread, skip reactors which are in interrupt mode.
1253 : */
1254 22 : if (!spdk_interrupt_mode_is_enabled() && local_reactor != NULL) {
1255 : /* Get the cpumask of all reactors in polling */
1256 22 : spdk_cpuset_zero(&polling_cpumask);
1257 78 : SPDK_ENV_FOREACH_CORE(i) {
1258 56 : spdk_cpuset_set_cpu(&polling_cpumask, i, true);
1259 56 : }
1260 22 : spdk_cpuset_xor(&polling_cpumask, &local_reactor->notify_cpuset);
1261 :
1262 22 : if (core == SPDK_ENV_LCORE_ID_ANY) {
1263 : /* Get the cpumask of all valid reactors which are suggested and also in polling */
1264 15 : spdk_cpuset_copy(&valid_cpumask, &polling_cpumask);
1265 15 : spdk_cpuset_and(&valid_cpumask, spdk_thread_get_cpumask(thread));
1266 :
1267 : /* If there are any valid reactors, spdk_thread should be scheduled
1268 : * into one of the valid reactors.
1269 : * If there is no valid reactors, spdk_thread should be scheduled
1270 : * into one of the polling reactors.
1271 : */
1272 15 : if (spdk_cpuset_count(&valid_cpumask) != 0) {
1273 15 : cpumask = &valid_cpumask;
1274 15 : } else {
1275 0 : cpumask = &polling_cpumask;
1276 : }
1277 22 : } else if (!spdk_cpuset_get_cpu(&polling_cpumask, core)) {
1278 : /* If specified reactor is not in polling, spdk_thread should be scheduled
1279 : * into one of the polling reactors.
1280 : */
1281 0 : core = SPDK_ENV_LCORE_ID_ANY;
1282 0 : cpumask = &polling_cpumask;
1283 0 : }
1284 22 : }
1285 :
1286 22 : pthread_mutex_lock(&g_scheduler_mtx);
1287 22 : if (core == SPDK_ENV_LCORE_ID_ANY) {
1288 20 : for (i = 0; i < spdk_env_get_core_count(); i++) {
1289 20 : if (g_next_core >= g_reactor_count) {
1290 6 : g_next_core = spdk_env_get_first_core();
1291 6 : }
1292 20 : core = g_next_core;
1293 20 : g_next_core = spdk_env_get_next_core(g_next_core);
1294 :
1295 20 : if (spdk_cpuset_get_cpu(cpumask, core)) {
1296 15 : break;
1297 : }
1298 5 : }
1299 15 : }
1300 :
1301 22 : evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
1302 :
1303 22 : if (current_lcore != core) {
1304 9 : spdk_trace_record(TRACE_SCHEDULER_MOVE_THREAD, spdk_thread_get_trace_id(thread), 0, 0,
1305 : current_lcore, core);
1306 9 : }
1307 :
1308 22 : pthread_mutex_unlock(&g_scheduler_mtx);
1309 :
1310 22 : assert(evt != NULL);
1311 22 : if (evt == NULL) {
1312 0 : SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
1313 0 : return -1;
1314 : }
1315 :
1316 22 : lw_thread->tsc_start = spdk_get_ticks();
1317 :
1318 22 : spdk_event_call(evt);
1319 :
1320 22 : return 0;
1321 22 : }
1322 :
1323 : static void
1324 2 : _reactor_request_thread_reschedule(struct spdk_thread *thread)
1325 : {
1326 2 : struct spdk_lw_thread *lw_thread;
1327 2 : struct spdk_reactor *reactor;
1328 2 : uint32_t current_core;
1329 :
1330 2 : assert(thread == spdk_get_thread());
1331 :
1332 2 : lw_thread = spdk_thread_get_ctx(thread);
1333 :
1334 2 : assert(lw_thread != NULL);
1335 2 : lw_thread->resched = true;
1336 2 : lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1337 :
1338 2 : current_core = spdk_env_get_current_core();
1339 2 : reactor = spdk_reactor_get(current_core);
1340 2 : assert(reactor != NULL);
1341 :
1342 : /* Send a notification if the destination reactor is indicated in intr mode state */
1343 2 : if (spdk_unlikely(spdk_cpuset_get_cpu(&reactor->notify_cpuset, reactor->lcore))) {
1344 0 : uint64_t notify = 1;
1345 :
1346 0 : if (write(reactor->resched_fd, ¬ify, sizeof(notify)) < 0) {
1347 0 : SPDK_ERRLOG("failed to notify reschedule: %s.\n", spdk_strerror(errno));
1348 0 : }
1349 0 : }
1350 2 : }
1351 :
1352 : static int
1353 16 : reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
1354 : {
1355 16 : struct spdk_lw_thread *lw_thread;
1356 :
1357 16 : switch (op) {
1358 : case SPDK_THREAD_OP_NEW:
1359 14 : lw_thread = spdk_thread_get_ctx(thread);
1360 14 : lw_thread->lcore = SPDK_ENV_LCORE_ID_ANY;
1361 14 : lw_thread->initial_lcore = SPDK_ENV_LCORE_ID_ANY;
1362 14 : return _reactor_schedule_thread(thread);
1363 : case SPDK_THREAD_OP_RESCHED:
1364 2 : _reactor_request_thread_reschedule(thread);
1365 2 : return 0;
1366 : default:
1367 0 : return -ENOTSUP;
1368 : }
1369 16 : }
1370 :
1371 : static bool
1372 16 : reactor_thread_op_supported(enum spdk_thread_op op)
1373 : {
1374 16 : switch (op) {
1375 : case SPDK_THREAD_OP_NEW:
1376 : case SPDK_THREAD_OP_RESCHED:
1377 16 : return true;
1378 : default:
1379 0 : return false;
1380 : }
1381 16 : }
1382 :
/* State carried from core to core by spdk_for_each_reactor() / on_reactor(). */
struct call_reactor {
	/* Core currently being visited; on_reactor() advances it to the next core. */
	uint32_t cur_core;
	/* Function invoked on every visited core as fn(arg1, arg2). */
	spdk_event_fn fn;
	/* Arguments passed to both fn and cpl. */
	void *arg1;
	void *arg2;

	/* Core on which spdk_for_each_reactor() was called; cpl is sent back to it. */
	uint32_t orig_core;
	/* Completion function, scheduled on orig_core once the iteration is complete. */
	spdk_event_fn cpl;
};
1392 :
1393 : static void
1394 9 : on_reactor(void *arg1, void *arg2)
1395 : {
1396 9 : struct call_reactor *cr = arg1;
1397 9 : struct spdk_event *evt;
1398 :
1399 9 : cr->fn(cr->arg1, cr->arg2);
1400 :
1401 9 : cr->cur_core = spdk_env_get_next_core(cr->cur_core);
1402 :
1403 9 : if (cr->cur_core >= g_reactor_count) {
1404 3 : SPDK_DEBUGLOG(reactor, "Completed reactor iteration\n");
1405 :
1406 3 : evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
1407 3 : free(cr);
1408 3 : } else {
1409 6 : SPDK_DEBUGLOG(reactor, "Continuing reactor iteration to %d\n",
1410 : cr->cur_core);
1411 :
1412 6 : evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
1413 : }
1414 9 : assert(evt != NULL);
1415 9 : spdk_event_call(evt);
1416 9 : }
1417 :
1418 : void
1419 3 : spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
1420 : {
1421 3 : struct call_reactor *cr;
1422 :
1423 : /* When the application framework is shutting down, we will send one
1424 : * final for_each_reactor operation with completion callback _reactors_stop,
1425 : * to flush any existing for_each_reactor operations to avoid any memory
1426 : * leaks. We use a mutex here to protect a boolean flag that will ensure
1427 : * we don't start any more operations once we've started shutting down.
1428 : */
1429 3 : pthread_mutex_lock(&g_stopping_reactors_mtx);
1430 3 : if (g_stopping_reactors) {
1431 0 : pthread_mutex_unlock(&g_stopping_reactors_mtx);
1432 0 : return;
1433 3 : } else if (cpl == _reactors_stop) {
1434 0 : g_stopping_reactors = true;
1435 0 : }
1436 3 : pthread_mutex_unlock(&g_stopping_reactors_mtx);
1437 :
1438 3 : cr = calloc(1, sizeof(*cr));
1439 3 : if (!cr) {
1440 0 : SPDK_ERRLOG("Unable to perform reactor iteration\n");
1441 0 : cpl(arg1, arg2);
1442 0 : return;
1443 : }
1444 :
1445 3 : cr->fn = fn;
1446 3 : cr->arg1 = arg1;
1447 3 : cr->arg2 = arg2;
1448 3 : cr->cpl = cpl;
1449 3 : cr->orig_core = spdk_env_get_current_core();
1450 3 : cr->cur_core = spdk_env_get_first_core();
1451 :
1452 3 : SPDK_DEBUGLOG(reactor, "Starting reactor iteration from %d\n", cr->orig_core);
1453 :
1454 3 : _event_call(cr->cur_core, on_reactor, cr, NULL);
1455 3 : }
1456 :
1457 : #ifdef __linux__
1458 : static int
1459 0 : reactor_schedule_thread_event(void *arg)
1460 : {
1461 0 : struct spdk_reactor *reactor = arg;
1462 0 : struct spdk_lw_thread *lw_thread, *tmp;
1463 0 : uint32_t count = 0;
1464 :
1465 0 : assert(reactor->in_interrupt);
1466 :
1467 0 : TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
1468 0 : count += reactor_post_process_lw_thread(reactor, lw_thread) ? 1 : 0;
1469 0 : }
1470 :
1471 0 : return count;
1472 0 : }
1473 :
1474 : static int
1475 31 : reactor_interrupt_init(struct spdk_reactor *reactor)
1476 : {
1477 31 : struct spdk_event_handler_opts opts = {};
1478 31 : int rc;
1479 :
1480 31 : rc = spdk_fd_group_create(&reactor->fgrp);
1481 31 : if (rc != 0) {
1482 0 : return rc;
1483 : }
1484 :
1485 31 : reactor->resched_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1486 31 : if (reactor->resched_fd < 0) {
1487 0 : rc = -EBADF;
1488 0 : goto err;
1489 : }
1490 :
1491 31 : spdk_fd_group_get_default_event_handler_opts(&opts, sizeof(opts));
1492 31 : opts.fd_type = SPDK_FD_TYPE_EVENTFD;
1493 :
1494 31 : rc = SPDK_FD_GROUP_ADD_EXT(reactor->fgrp, reactor->resched_fd,
1495 : reactor_schedule_thread_event, reactor, &opts);
1496 31 : if (rc) {
1497 0 : close(reactor->resched_fd);
1498 0 : goto err;
1499 : }
1500 :
1501 31 : reactor->events_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
1502 31 : if (reactor->events_fd < 0) {
1503 0 : spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1504 0 : close(reactor->resched_fd);
1505 :
1506 0 : rc = -EBADF;
1507 0 : goto err;
1508 : }
1509 :
1510 31 : rc = SPDK_FD_GROUP_ADD_EXT(reactor->fgrp, reactor->events_fd,
1511 : event_queue_run_batch, reactor, &opts);
1512 31 : if (rc) {
1513 0 : spdk_fd_group_remove(reactor->fgrp, reactor->resched_fd);
1514 0 : close(reactor->resched_fd);
1515 0 : close(reactor->events_fd);
1516 0 : goto err;
1517 : }
1518 :
1519 31 : return 0;
1520 :
1521 : err:
1522 0 : spdk_fd_group_destroy(reactor->fgrp);
1523 0 : reactor->fgrp = NULL;
1524 0 : return rc;
1525 31 : }
1526 : #else
/*
 * Variant compiled when __linux__ is not defined: no interrupt facilities
 * are set up and -ENOTSUP is always returned.
 */
static int
reactor_interrupt_init(struct spdk_reactor *reactor)
{
	return -ENOTSUP;
}
1532 : #endif
1533 :
1534 : static void
1535 31 : reactor_interrupt_fini(struct spdk_reactor *reactor)
1536 : {
1537 31 : struct spdk_fd_group *fgrp = reactor->fgrp;
1538 :
1539 31 : if (!fgrp) {
1540 0 : return;
1541 : }
1542 :
1543 31 : spdk_fd_group_remove(fgrp, reactor->events_fd);
1544 31 : spdk_fd_group_remove(fgrp, reactor->resched_fd);
1545 :
1546 31 : close(reactor->events_fd);
1547 31 : close(reactor->resched_fd);
1548 :
1549 31 : spdk_fd_group_destroy(fgrp);
1550 31 : reactor->fgrp = NULL;
1551 31 : }
1552 :
1553 : static struct spdk_governor *
1554 3 : _governor_find(const char *name)
1555 : {
1556 3 : struct spdk_governor *governor, *tmp;
1557 :
1558 3 : TAILQ_FOREACH_SAFE(governor, &g_governor_list, link, tmp) {
1559 1 : if (strcmp(name, governor->name) == 0) {
1560 1 : return governor;
1561 : }
1562 0 : }
1563 :
1564 2 : return NULL;
1565 3 : }
1566 :
1567 : int
1568 2 : spdk_governor_set(const char *name)
1569 : {
1570 2 : struct spdk_governor *governor;
1571 2 : int rc = 0;
1572 :
1573 : /* NULL governor was specifically requested */
1574 2 : if (name == NULL) {
1575 0 : if (g_governor) {
1576 0 : g_governor->deinit();
1577 0 : }
1578 0 : g_governor = NULL;
1579 0 : return 0;
1580 : }
1581 :
1582 2 : governor = _governor_find(name);
1583 2 : if (governor == NULL) {
1584 1 : return -EINVAL;
1585 : }
1586 :
1587 1 : if (g_governor == governor) {
1588 0 : return 0;
1589 : }
1590 :
1591 1 : rc = governor->init();
1592 1 : if (rc == 0) {
1593 1 : if (g_governor) {
1594 0 : g_governor->deinit();
1595 0 : }
1596 1 : g_governor = governor;
1597 1 : }
1598 :
1599 1 : return rc;
1600 2 : }
1601 :
1602 : struct spdk_governor *
1603 10 : spdk_governor_get(void)
1604 : {
1605 10 : return g_governor;
1606 : }
1607 :
1608 : void
1609 1 : spdk_governor_register(struct spdk_governor *governor)
1610 : {
1611 1 : if (_governor_find(governor->name)) {
1612 0 : SPDK_ERRLOG("governor named '%s' already registered.\n", governor->name);
1613 0 : assert(false);
1614 : return;
1615 : }
1616 :
1617 1 : TAILQ_INSERT_TAIL(&g_governor_list, governor, link);
1618 1 : }
1619 :
/* Registers the "reactor" log component named by the SPDK_DEBUGLOG(reactor, ...) calls in this file. */
SPDK_LOG_REGISTER_COMPONENT(reactor)
1621 :
1622 : static void
1623 0 : scheduler_trace(void)
1624 : {
1625 0 : struct spdk_trace_tpoint_opts opts[] = {
1626 : {
1627 : "SCHEDULER_PERIOD_START", TRACE_SCHEDULER_PERIOD_START,
1628 : OWNER_TYPE_NONE, OBJECT_NONE, 0,
1629 : {
1630 :
1631 : }
1632 : },
1633 : {
1634 : "SCHEDULER_CORE_STATS", TRACE_SCHEDULER_CORE_STATS,
1635 : OWNER_TYPE_REACTOR, OBJECT_NONE, 0,
1636 : {
1637 : { "busy", SPDK_TRACE_ARG_TYPE_INT, 8},
1638 : { "idle", SPDK_TRACE_ARG_TYPE_INT, 8}
1639 : }
1640 : },
1641 : {
1642 : "SCHEDULER_THREAD_STATS", TRACE_SCHEDULER_THREAD_STATS,
1643 : OWNER_TYPE_THREAD, OBJECT_NONE, 0,
1644 : {
1645 : { "busy", SPDK_TRACE_ARG_TYPE_INT, 8},
1646 : { "idle", SPDK_TRACE_ARG_TYPE_INT, 8}
1647 : }
1648 : },
1649 : {
1650 : "SCHEDULER_MOVE_THREAD", TRACE_SCHEDULER_MOVE_THREAD,
1651 : OWNER_TYPE_THREAD, OBJECT_NONE, 0,
1652 : {
1653 : { "src", SPDK_TRACE_ARG_TYPE_INT, 8 },
1654 : { "dst", SPDK_TRACE_ARG_TYPE_INT, 8 }
1655 : }
1656 : }
1657 : };
1658 :
1659 0 : spdk_trace_register_owner_type(OWNER_TYPE_REACTOR, 'r');
1660 0 : spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
1661 :
1662 0 : }
1663 :
/* Registers scheduler_trace() under the name "scheduler" for the TRACE_GROUP_SCHEDULER trace group. */
SPDK_TRACE_REGISTER_FN(scheduler_trace, "scheduler", TRACE_GROUP_SCHEDULER)
|