Line data Source code
1 : /* SPDX-License-Identifier: BSD-3-Clause
2 : * Copyright (C) 2023 Intel Corporation.
3 : * All rights reserved.
4 : */
5 :
6 : #include "spdk/env.h"
7 : #include "spdk/util.h"
8 : #include "spdk/likely.h"
9 : #include "spdk/log.h"
10 : #include "spdk/thread.h"
11 :
12 : #define IOBUF_MIN_SMALL_POOL_SIZE 64
13 : #define IOBUF_MIN_LARGE_POOL_SIZE 8
14 : #define IOBUF_DEFAULT_SMALL_POOL_SIZE 8192
15 : #define IOBUF_DEFAULT_LARGE_POOL_SIZE 1024
16 : #define IOBUF_ALIGNMENT 4096
17 : #define IOBUF_MIN_SMALL_BUFSIZE 4096
18 : #define IOBUF_MIN_LARGE_BUFSIZE 8192
19 : #define IOBUF_DEFAULT_SMALL_BUFSIZE (8 * 1024)
20 : /* 132k is a weird choice at first, but this needs to be large enough to accommodate
21 : * the default maximum size (128k) plus metadata everywhere. For code paths that
22 : * are explicitly configured, the math is instead done properly. This is only
23 : * for the default. */
24 : #define IOBUF_DEFAULT_LARGE_BUFSIZE (132 * 1024)
25 : #define IOBUF_MAX_CHANNELS 64
26 :
27 : SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
28 : "Invalid data offset");
29 :
30 : static bool g_iobuf_is_initialized = false;
31 :
32 : struct iobuf_channel_node {
33 : spdk_iobuf_entry_stailq_t small_queue;
34 : spdk_iobuf_entry_stailq_t large_queue;
35 : };
36 :
37 : struct iobuf_channel {
38 : struct iobuf_channel_node node[SPDK_CONFIG_MAX_NUMA_NODES];
39 : struct spdk_iobuf_channel *channels[IOBUF_MAX_CHANNELS];
40 : };
41 :
42 : struct iobuf_module {
43 : char *name;
44 : TAILQ_ENTRY(iobuf_module) tailq;
45 : };
46 :
47 : struct iobuf_node {
48 : struct spdk_ring *small_pool;
49 : struct spdk_ring *large_pool;
50 : void *small_pool_base;
51 : void *large_pool_base;
52 : };
53 :
54 : struct iobuf {
55 : struct spdk_iobuf_opts opts;
56 : TAILQ_HEAD(, iobuf_module) modules;
57 : spdk_iobuf_finish_cb finish_cb;
58 : void *finish_arg;
59 : struct iobuf_node node[SPDK_CONFIG_MAX_NUMA_NODES];
60 : };
61 :
62 : #define IOBUF_FOREACH_NUMA_ID(i) \
63 : for (i = g_iobuf.opts.enable_numa ? spdk_env_get_first_numa_id() : 0; \
64 : i < INT32_MAX; \
65 : i = g_iobuf.opts.enable_numa ? spdk_env_get_next_numa_id(i) : INT32_MAX)
66 :
67 : static struct iobuf g_iobuf = {
68 : .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
69 : .node = {},
70 : .opts = {
71 : .small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
72 : .large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
73 : .small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
74 : .large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
75 : },
76 : };
77 :
78 : struct iobuf_get_stats_ctx {
79 : struct spdk_iobuf_module_stats *modules;
80 : uint32_t num_modules;
81 : spdk_iobuf_get_stats_cb cb_fn;
82 : void *cb_arg;
83 : };
84 :
85 : static int
86 78 : iobuf_channel_create_cb(void *io_device, void *ctx)
87 : {
88 78 : struct iobuf_channel *ch = ctx;
89 : struct iobuf_channel_node *node;
90 : int32_t i;
91 :
92 156 : IOBUF_FOREACH_NUMA_ID(i) {
93 78 : node = &ch->node[i];
94 78 : STAILQ_INIT(&node->small_queue);
95 78 : STAILQ_INIT(&node->large_queue);
96 : }
97 :
98 78 : return 0;
99 : }
100 :
101 : static void
102 78 : iobuf_channel_destroy_cb(void *io_device, void *ctx)
103 : {
104 78 : struct iobuf_channel *ch = ctx;
105 : struct iobuf_channel_node *node __attribute__((unused));
106 : int32_t i;
107 :
108 156 : IOBUF_FOREACH_NUMA_ID(i) {
109 78 : node = &ch->node[i];
110 78 : assert(STAILQ_EMPTY(&node->small_queue));
111 78 : assert(STAILQ_EMPTY(&node->large_queue));
112 : }
113 78 : }
114 :
115 : static int
116 70 : iobuf_node_initialize(struct iobuf_node *node, uint32_t numa_id)
117 : {
118 70 : struct spdk_iobuf_opts *opts = &g_iobuf.opts;
119 70 : struct spdk_iobuf_buffer *buf;
120 : uint64_t i;
121 : int rc;
122 :
123 70 : if (!g_iobuf.opts.enable_numa) {
124 70 : numa_id = SPDK_ENV_NUMA_ID_ANY;
125 : }
126 :
127 70 : node->small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
128 : numa_id);
129 70 : if (!node->small_pool) {
130 0 : SPDK_ERRLOG("Failed to create small iobuf pool\n");
131 0 : rc = -ENOMEM;
132 0 : goto error;
133 : }
134 :
135 70 : node->small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
136 : NULL, numa_id, SPDK_MALLOC_DMA);
137 70 : if (node->small_pool_base == NULL) {
138 0 : SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
139 0 : rc = -ENOMEM;
140 0 : goto error;
141 : }
142 :
143 70 : node->large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
144 : numa_id);
145 70 : if (!node->large_pool) {
146 0 : SPDK_ERRLOG("Failed to create large iobuf pool\n");
147 0 : rc = -ENOMEM;
148 0 : goto error;
149 : }
150 :
151 70 : node->large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
152 : NULL, numa_id, SPDK_MALLOC_DMA);
153 70 : if (node->large_pool_base == NULL) {
154 0 : SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
155 0 : rc = -ENOMEM;
156 0 : goto error;
157 : }
158 :
159 548942 : for (i = 0; i < opts->small_pool_count; i++) {
160 548872 : buf = node->small_pool_base + i * opts->small_bufsize;
161 548872 : spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL);
162 : }
163 :
164 68686 : for (i = 0; i < opts->large_pool_count; i++) {
165 68616 : buf = node->large_pool_base + i * opts->large_bufsize;
166 68616 : spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL);
167 : }
168 :
169 70 : return 0;
170 :
171 0 : error:
172 0 : spdk_free(node->small_pool_base);
173 0 : spdk_ring_free(node->small_pool);
174 0 : spdk_free(node->large_pool_base);
175 0 : spdk_ring_free(node->large_pool);
176 0 : memset(node, 0, sizeof(*node));
177 :
178 0 : return rc;
179 : }
180 :
181 : static void
182 70 : iobuf_node_free(struct iobuf_node *node)
183 : {
184 70 : if (node->small_pool == NULL) {
185 : /* This node didn't get allocated, so just return immediately. */
186 0 : return;
187 : }
188 :
189 70 : if (spdk_ring_count(node->small_pool) != g_iobuf.opts.small_pool_count) {
190 7 : SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
191 : spdk_ring_count(node->small_pool), g_iobuf.opts.small_pool_count);
192 : }
193 :
194 70 : if (spdk_ring_count(node->large_pool) != g_iobuf.opts.large_pool_count) {
195 2 : SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
196 : spdk_ring_count(node->large_pool), g_iobuf.opts.large_pool_count);
197 : }
198 :
199 70 : spdk_free(node->small_pool_base);
200 70 : node->small_pool_base = NULL;
201 70 : spdk_ring_free(node->small_pool);
202 70 : node->small_pool = NULL;
203 :
204 70 : spdk_free(node->large_pool_base);
205 70 : node->large_pool_base = NULL;
206 70 : spdk_ring_free(node->large_pool);
207 70 : node->large_pool = NULL;
208 : }
209 :
210 : int
211 70 : spdk_iobuf_initialize(void)
212 : {
213 70 : struct spdk_iobuf_opts *opts = &g_iobuf.opts;
214 : struct iobuf_node *node;
215 : int32_t i;
216 70 : int rc = 0;
217 :
218 : /* Round up to the nearest alignment so that each element remains aligned */
219 70 : opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
220 70 : opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);
221 :
222 140 : IOBUF_FOREACH_NUMA_ID(i) {
223 70 : node = &g_iobuf.node[i];
224 70 : rc = iobuf_node_initialize(node, i);
225 70 : if (rc) {
226 0 : goto err;
227 : }
228 : }
229 :
230 70 : spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
231 : sizeof(struct iobuf_channel), "iobuf");
232 70 : g_iobuf_is_initialized = true;
233 :
234 70 : return 0;
235 :
236 0 : err:
237 0 : IOBUF_FOREACH_NUMA_ID(i) {
238 0 : node = &g_iobuf.node[i];
239 0 : iobuf_node_free(node);
240 : }
241 0 : return rc;
242 : }
243 :
244 : static void
245 70 : iobuf_unregister_cb(void *io_device)
246 : {
247 : struct iobuf_module *module;
248 : struct iobuf_node *node;
249 : int32_t i;
250 :
251 142 : while (!TAILQ_EMPTY(&g_iobuf.modules)) {
252 72 : module = TAILQ_FIRST(&g_iobuf.modules);
253 72 : TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
254 72 : free(module->name);
255 72 : free(module);
256 : }
257 :
258 140 : IOBUF_FOREACH_NUMA_ID(i) {
259 70 : node = &g_iobuf.node[i];
260 70 : iobuf_node_free(node);
261 : }
262 :
263 70 : if (g_iobuf.finish_cb != NULL) {
264 70 : g_iobuf.finish_cb(g_iobuf.finish_arg);
265 : }
266 70 : }
267 :
268 : void
269 70 : spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
270 : {
271 70 : if (!g_iobuf_is_initialized) {
272 0 : cb_fn(cb_arg);
273 0 : return;
274 : }
275 :
276 70 : g_iobuf_is_initialized = false;
277 70 : g_iobuf.finish_cb = cb_fn;
278 70 : g_iobuf.finish_arg = cb_arg;
279 :
280 70 : spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
281 : }
282 :
283 : int
284 0 : spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
285 : {
286 0 : if (!opts) {
287 0 : SPDK_ERRLOG("opts cannot be NULL\n");
288 0 : return -1;
289 : }
290 :
291 0 : if (!opts->opts_size) {
292 0 : SPDK_ERRLOG("opts_size inside opts cannot be zero value\n");
293 0 : return -1;
294 : }
295 :
296 0 : if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
297 0 : SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
298 : IOBUF_MIN_SMALL_POOL_SIZE);
299 0 : return -EINVAL;
300 : }
301 0 : if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
302 0 : SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
303 : IOBUF_MIN_LARGE_POOL_SIZE);
304 0 : return -EINVAL;
305 : }
306 :
307 0 : if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
308 0 : SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
309 : IOBUF_MIN_SMALL_BUFSIZE);
310 0 : return -EINVAL;
311 : }
312 :
313 0 : if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
314 0 : SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
315 : IOBUF_MIN_LARGE_BUFSIZE);
316 0 : return -EINVAL;
317 : }
318 :
319 0 : if (opts->enable_numa &&
320 0 : spdk_env_get_last_numa_id() >= SPDK_CONFIG_MAX_NUMA_NODES) {
321 0 : SPDK_ERRLOG("max NUMA ID %" PRIu32 " cannot be supported with "
322 : "SPDK_CONFIG_MAX_NUMA_NODES %" PRIu32 "\n",
323 : spdk_env_get_last_numa_id(), SPDK_CONFIG_MAX_NUMA_NODES);
324 0 : SPDK_ERRLOG("Re-configure with --max-numa-nodes=%" PRIu32 "\n",
325 : spdk_env_get_last_numa_id() + 1);
326 0 : return -EINVAL;
327 : }
328 :
329 : #define SET_FIELD(field) \
330 : if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts->opts_size) { \
331 : g_iobuf.opts.field = opts->field; \
332 : } \
333 :
334 0 : SET_FIELD(small_pool_count);
335 0 : SET_FIELD(large_pool_count);
336 0 : SET_FIELD(small_bufsize);
337 0 : SET_FIELD(large_bufsize);
338 0 : SET_FIELD(enable_numa);
339 :
340 0 : g_iobuf.opts.opts_size = opts->opts_size;
341 :
342 : #undef SET_FIELD
343 :
344 0 : return 0;
345 : }
346 :
347 : void
348 131 : spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts, size_t opts_size)
349 : {
350 131 : if (!opts) {
351 0 : SPDK_ERRLOG("opts should not be NULL\n");
352 0 : return;
353 : }
354 :
355 131 : if (!opts_size) {
356 0 : SPDK_ERRLOG("opts_size should not be zero value\n");
357 0 : return;
358 : }
359 :
360 131 : opts->opts_size = opts_size;
361 :
362 : #define SET_FIELD(field) \
363 : if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts_size) { \
364 : opts->field = g_iobuf.opts.field; \
365 : } \
366 :
367 131 : SET_FIELD(small_pool_count);
368 131 : SET_FIELD(large_pool_count);
369 131 : SET_FIELD(small_bufsize);
370 131 : SET_FIELD(large_bufsize);
371 131 : SET_FIELD(enable_numa);
372 :
373 : #undef SET_FIELD
374 :
375 : /* Do not remove this statement, you should always update this statement when you adding a new field,
376 : * and do not forget to add the SET_FIELD statement for your added field. */
377 : SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_opts) == 40, "Incorrect size");
378 : }
379 :
380 : static void
381 85 : iobuf_channel_node_init(struct spdk_iobuf_channel *ch, struct iobuf_channel *iobuf_ch,
382 : int32_t numa_id, uint32_t small_cache_size, uint32_t large_cache_size)
383 : {
384 85 : struct iobuf_node *node = &g_iobuf.node[numa_id];
385 85 : struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
386 85 : struct iobuf_channel_node *ch_node = &iobuf_ch->node[numa_id];
387 :
388 85 : cache->small.queue = &ch_node->small_queue;
389 85 : cache->large.queue = &ch_node->large_queue;
390 85 : cache->small.pool = node->small_pool;
391 85 : cache->large.pool = node->large_pool;
392 85 : cache->small.bufsize = g_iobuf.opts.small_bufsize;
393 85 : cache->large.bufsize = g_iobuf.opts.large_bufsize;
394 85 : cache->small.cache_size = small_cache_size;
395 85 : cache->large.cache_size = large_cache_size;
396 85 : cache->small.cache_count = 0;
397 85 : cache->large.cache_count = 0;
398 :
399 85 : STAILQ_INIT(&cache->small.cache);
400 85 : STAILQ_INIT(&cache->large.cache);
401 85 : }
402 :
403 : static int
404 85 : iobuf_channel_node_populate(struct spdk_iobuf_channel *ch, const char *name, int32_t numa_id)
405 : {
406 85 : struct iobuf_node *node = &g_iobuf.node[numa_id];
407 85 : struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
408 85 : uint32_t small_cache_size = cache->small.cache_size;
409 85 : uint32_t large_cache_size = cache->large.cache_size;
410 85 : struct spdk_iobuf_buffer *buf;
411 : uint32_t i;
412 :
413 9317 : for (i = 0; i < small_cache_size; ++i) {
414 9234 : if (spdk_ring_dequeue(node->small_pool, (void **)&buf, 1) == 0) {
415 2 : SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. "
416 : "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
417 : name, i, small_cache_size, g_iobuf.opts.small_pool_count);
418 2 : SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
419 : "this value.\n");
420 2 : return -ENOMEM;
421 : }
422 9232 : STAILQ_INSERT_TAIL(&cache->small.cache, buf, stailq);
423 9232 : cache->small.cache_count++;
424 : }
425 1250 : for (i = 0; i < large_cache_size; ++i) {
426 1168 : if (spdk_ring_dequeue(node->large_pool, (void **)&buf, 1) == 0) {
427 1 : SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. "
428 : "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
429 : name, i, large_cache_size, g_iobuf.opts.large_pool_count);
430 1 : SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
431 : "this value.\n");
432 1 : return -ENOMEM;
433 : }
434 1167 : STAILQ_INSERT_TAIL(&cache->large.cache, buf, stailq);
435 1167 : cache->large.cache_count++;
436 : }
437 :
438 82 : return 0;
439 : }
440 :
441 : int
442 85 : spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
443 : uint32_t small_cache_size, uint32_t large_cache_size)
444 : {
445 : struct spdk_io_channel *ioch;
446 : struct iobuf_channel *iobuf_ch;
447 : struct iobuf_module *module;
448 : uint32_t i;
449 : int32_t numa_id;
450 : int rc;
451 :
452 90 : TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
453 90 : if (strcmp(name, module->name) == 0) {
454 85 : break;
455 : }
456 : }
457 :
458 85 : if (module == NULL) {
459 0 : SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
460 0 : return -ENODEV;
461 : }
462 :
463 85 : ioch = spdk_get_io_channel(&g_iobuf);
464 85 : if (ioch == NULL) {
465 0 : SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
466 0 : return -ENOMEM;
467 : }
468 :
469 85 : iobuf_ch = spdk_io_channel_get_ctx(ioch);
470 :
471 90 : for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
472 90 : if (iobuf_ch->channels[i] == NULL) {
473 85 : iobuf_ch->channels[i] = ch;
474 85 : break;
475 : }
476 : }
477 :
478 85 : if (i == IOBUF_MAX_CHANNELS) {
479 0 : SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i);
480 0 : rc = -ENOMEM;
481 0 : goto error;
482 : }
483 :
484 85 : ch->parent = ioch;
485 85 : ch->module = module;
486 :
487 170 : IOBUF_FOREACH_NUMA_ID(numa_id) {
488 85 : iobuf_channel_node_init(ch, iobuf_ch, numa_id,
489 : small_cache_size, large_cache_size);
490 : }
491 :
492 167 : IOBUF_FOREACH_NUMA_ID(numa_id) {
493 85 : rc = iobuf_channel_node_populate(ch, name, numa_id);
494 85 : if (rc) {
495 3 : goto error;
496 : }
497 : }
498 :
499 82 : return 0;
500 3 : error:
501 3 : spdk_iobuf_channel_fini(ch);
502 :
503 3 : return rc;
504 : }
505 :
506 : static void
507 85 : iobuf_channel_node_fini(struct spdk_iobuf_channel *ch, int32_t numa_id)
508 : {
509 85 : struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
510 85 : struct iobuf_node *node = &g_iobuf.node[numa_id];
511 : struct spdk_iobuf_entry *entry __attribute__((unused));
512 85 : struct spdk_iobuf_buffer *buf;
513 :
514 : /* Make sure none of the wait queue entries are coming from this module */
515 85 : STAILQ_FOREACH(entry, cache->small.queue, stailq) {
516 0 : assert(entry->module != ch->module);
517 : }
518 85 : STAILQ_FOREACH(entry, cache->large.queue, stailq) {
519 0 : assert(entry->module != ch->module);
520 : }
521 :
522 : /* Release cached buffers back to the pool */
523 9285 : while (!STAILQ_EMPTY(&cache->small.cache)) {
524 9200 : buf = STAILQ_FIRST(&cache->small.cache);
525 9200 : STAILQ_REMOVE_HEAD(&cache->small.cache, stailq);
526 9200 : spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL);
527 9200 : cache->small.cache_count--;
528 : }
529 1242 : while (!STAILQ_EMPTY(&cache->large.cache)) {
530 1157 : buf = STAILQ_FIRST(&cache->large.cache);
531 1157 : STAILQ_REMOVE_HEAD(&cache->large.cache, stailq);
532 1157 : spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL);
533 1157 : cache->large.cache_count--;
534 : }
535 :
536 85 : assert(cache->small.cache_count == 0);
537 85 : assert(cache->large.cache_count == 0);
538 85 : }
539 :
540 : void
541 85 : spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
542 : {
543 : struct iobuf_channel *iobuf_ch;
544 : uint32_t i;
545 :
546 170 : IOBUF_FOREACH_NUMA_ID(i) {
547 85 : iobuf_channel_node_fini(ch, i);
548 : }
549 :
550 85 : iobuf_ch = spdk_io_channel_get_ctx(ch->parent);
551 90 : for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
552 90 : if (iobuf_ch->channels[i] == ch) {
553 85 : iobuf_ch->channels[i] = NULL;
554 85 : break;
555 : }
556 : }
557 :
558 85 : spdk_put_io_channel(ch->parent);
559 85 : ch->parent = NULL;
560 85 : }
561 :
562 : int
563 73 : spdk_iobuf_register_module(const char *name)
564 : {
565 : struct iobuf_module *module;
566 :
567 75 : TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
568 2 : if (strcmp(name, module->name) == 0) {
569 0 : return -EEXIST;
570 : }
571 : }
572 :
573 73 : module = calloc(1, sizeof(*module));
574 73 : if (module == NULL) {
575 0 : return -ENOMEM;
576 : }
577 :
578 73 : module->name = strdup(name);
579 73 : if (module->name == NULL) {
580 0 : free(module);
581 0 : return -ENOMEM;
582 : }
583 :
584 73 : TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);
585 :
586 73 : return 0;
587 : }
588 :
589 : int
590 0 : spdk_iobuf_unregister_module(const char *name)
591 : {
592 : struct iobuf_module *module;
593 :
594 0 : TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
595 0 : if (strcmp(name, module->name) == 0) {
596 0 : TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
597 0 : free(module->name);
598 0 : free(module);
599 0 : return 0;
600 : }
601 : }
602 :
603 0 : return -ENOENT;
604 : }
605 :
606 : static int
607 236 : iobuf_pool_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool_cache *pool,
608 : spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
609 : {
610 : struct spdk_iobuf_entry *entry, *tmp;
611 : int rc;
612 :
613 252 : STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
614 : /* We only want to iterate over the entries requested by the module which owns ch */
615 16 : if (entry->module != ch->module) {
616 8 : continue;
617 : }
618 :
619 8 : rc = cb_fn(ch, entry, cb_ctx);
620 8 : if (rc != 0) {
621 0 : return rc;
622 : }
623 : }
624 :
625 236 : return 0;
626 : }
627 :
628 : int
629 118 : spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch,
630 : spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
631 : {
632 : struct spdk_iobuf_node_cache *cache;
633 : uint32_t i;
634 : int rc;
635 :
636 236 : IOBUF_FOREACH_NUMA_ID(i) {
637 118 : cache = &ch->cache[i];
638 :
639 118 : rc = iobuf_pool_for_each_entry(ch, &cache->small, cb_fn, cb_ctx);
640 118 : if (rc != 0) {
641 0 : return rc;
642 : }
643 118 : rc = iobuf_pool_for_each_entry(ch, &cache->large, cb_fn, cb_ctx);
644 118 : if (rc != 0) {
645 0 : return rc;
646 : }
647 : }
648 :
649 118 : return 0;
650 : }
651 :
652 : static bool
653 12 : iobuf_entry_abort_node(struct spdk_iobuf_channel *ch, int32_t numa_id,
654 : struct spdk_iobuf_entry *entry, uint64_t len)
655 : {
656 : struct spdk_iobuf_node_cache *cache;
657 : struct spdk_iobuf_pool_cache *pool;
658 : struct spdk_iobuf_entry *e;
659 :
660 12 : cache = &ch->cache[numa_id];
661 :
662 12 : if (len <= cache->small.bufsize) {
663 6 : pool = &cache->small;
664 : } else {
665 6 : assert(len <= cache->large.bufsize);
666 6 : pool = &cache->large;
667 : }
668 :
669 12 : STAILQ_FOREACH(e, pool->queue, stailq) {
670 12 : if (e == entry) {
671 12 : STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
672 12 : return true;
673 : }
674 : }
675 :
676 0 : return false;
677 : }
678 :
679 : void
680 12 : spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
681 : uint64_t len)
682 : {
683 : uint32_t i;
684 :
685 24 : IOBUF_FOREACH_NUMA_ID(i) {
686 12 : iobuf_entry_abort_node(ch, i, entry, len);
687 : }
688 12 : }
689 :
690 : #define IOBUF_BATCH_SIZE 32
691 :
692 : void *
693 99 : spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
694 : struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
695 : {
696 : struct spdk_iobuf_node_cache *cache;
697 : struct spdk_iobuf_pool_cache *pool;
698 : void *buf;
699 :
700 99 : cache = &ch->cache[0];
701 :
702 99 : assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
703 99 : if (len <= cache->small.bufsize) {
704 62 : pool = &cache->small;
705 : } else {
706 37 : assert(len <= cache->large.bufsize);
707 37 : pool = &cache->large;
708 : }
709 :
710 99 : buf = (void *)STAILQ_FIRST(&pool->cache);
711 99 : if (buf) {
712 50 : STAILQ_REMOVE_HEAD(&pool->cache, stailq);
713 50 : assert(pool->cache_count > 0);
714 50 : pool->cache_count--;
715 50 : pool->stats.cache++;
716 : } else {
717 49 : struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
718 : size_t sz, i;
719 :
720 : /* If we're going to dequeue, we may as well dequeue a batch. */
721 49 : sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
722 : spdk_max(pool->cache_size, 1)));
723 49 : if (sz == 0) {
724 28 : if (entry) {
725 28 : STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
726 28 : entry->module = ch->module;
727 28 : entry->cb_fn = cb_fn;
728 28 : pool->stats.retry++;
729 : }
730 :
731 28 : return NULL;
732 : }
733 :
734 21 : pool->stats.main++;
735 22 : for (i = 0; i < (sz - 1); i++) {
736 1 : STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
737 1 : pool->cache_count++;
738 : }
739 :
740 : /* The last one is the one we'll return */
741 21 : buf = bufs[i];
742 : }
743 :
744 71 : return (char *)buf;
745 : }
746 :
747 : void
748 45 : spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
749 : {
750 : struct spdk_iobuf_entry *entry;
751 : struct spdk_iobuf_buffer *iobuf_buf;
752 : struct spdk_iobuf_node_cache *cache;
753 : struct spdk_iobuf_pool_cache *pool;
754 : uint32_t numa_id;
755 : size_t sz;
756 :
757 45 : if (g_iobuf.opts.enable_numa) {
758 0 : numa_id = spdk_mem_get_numa_id(buf, NULL);
759 : } else {
760 45 : numa_id = 0;
761 : }
762 :
763 45 : cache = &ch->cache[numa_id];
764 :
765 45 : assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
766 45 : if (len <= cache->small.bufsize) {
767 24 : pool = &cache->small;
768 : } else {
769 21 : pool = &cache->large;
770 : }
771 :
772 45 : if (STAILQ_EMPTY(pool->queue)) {
773 29 : if (pool->cache_size == 0) {
774 18 : spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
775 18 : return;
776 : }
777 :
778 11 : iobuf_buf = (struct spdk_iobuf_buffer *)buf;
779 :
780 11 : STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
781 11 : pool->cache_count++;
782 :
783 : /* The cache size may exceed the configured amount. We always dequeue from the
784 : * central pool in batches of known size, so wait until at least a batch
785 : * has been returned to actually return the buffers to the central pool. */
786 11 : sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
787 11 : if (pool->cache_count >= pool->cache_size + sz) {
788 3 : struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
789 : size_t i;
790 :
791 7 : for (i = 0; i < sz; i++) {
792 4 : bufs[i] = STAILQ_FIRST(&pool->cache);
793 4 : STAILQ_REMOVE_HEAD(&pool->cache, stailq);
794 4 : assert(pool->cache_count > 0);
795 4 : pool->cache_count--;
796 : }
797 :
798 3 : spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
799 : }
800 : } else {
801 16 : entry = STAILQ_FIRST(pool->queue);
802 16 : STAILQ_REMOVE_HEAD(pool->queue, stailq);
803 16 : entry->cb_fn(entry, buf);
804 16 : if (spdk_unlikely(entry == STAILQ_LAST(pool->queue, spdk_iobuf_entry, stailq))) {
805 2 : STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
806 2 : STAILQ_INSERT_HEAD(pool->queue, entry, stailq);
807 : }
808 : }
809 : }
810 :
811 : static void
812 0 : iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status)
813 : {
814 0 : struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);
815 :
816 0 : ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg);
817 0 : free(ctx->modules);
818 0 : free(ctx);
819 0 : }
820 :
821 : static void
822 0 : iobuf_get_channel_stats(struct spdk_io_channel_iter *iter)
823 : {
824 0 : struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);
825 0 : struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter);
826 0 : struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch);
827 : struct spdk_iobuf_channel *channel;
828 : struct iobuf_module *module;
829 : struct spdk_iobuf_module_stats *it;
830 : uint32_t i, j;
831 :
832 0 : for (i = 0; i < ctx->num_modules; ++i) {
833 0 : for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) {
834 0 : channel = iobuf_ch->channels[j];
835 0 : if (channel == NULL) {
836 0 : continue;
837 : }
838 :
839 0 : it = &ctx->modules[i];
840 0 : module = (struct iobuf_module *)channel->module;
841 0 : if (strcmp(it->module, module->name) == 0) {
842 : struct spdk_iobuf_pool_cache *cache;
843 : uint32_t i;
844 :
845 0 : IOBUF_FOREACH_NUMA_ID(i) {
846 0 : cache = &channel->cache[i].small;
847 0 : it->small_pool.cache += cache->stats.cache;
848 0 : it->small_pool.main += cache->stats.main;
849 0 : it->small_pool.retry += cache->stats.retry;
850 :
851 0 : cache = &channel->cache[i].large;
852 0 : it->large_pool.cache += cache->stats.cache;
853 0 : it->large_pool.main += cache->stats.main;
854 0 : it->large_pool.retry += cache->stats.retry;
855 : }
856 0 : break;
857 : }
858 : }
859 : }
860 :
861 0 : spdk_for_each_channel_continue(iter, 0);
862 0 : }
863 :
864 : int
865 0 : spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg)
866 : {
867 : struct iobuf_module *module;
868 : struct iobuf_get_stats_ctx *ctx;
869 : uint32_t i;
870 :
871 0 : ctx = calloc(1, sizeof(*ctx));
872 0 : if (ctx == NULL) {
873 0 : return -ENOMEM;
874 : }
875 :
876 0 : TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
877 0 : ++ctx->num_modules;
878 : }
879 :
880 0 : ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats));
881 0 : if (ctx->modules == NULL) {
882 0 : free(ctx);
883 0 : return -ENOMEM;
884 : }
885 :
886 0 : i = 0;
887 0 : TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
888 0 : ctx->modules[i].module = module->name;
889 0 : ++i;
890 : }
891 :
892 0 : ctx->cb_fn = cb_fn;
893 0 : ctx->cb_arg = cb_arg;
894 :
895 0 : spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx,
896 : iobuf_get_channel_stats_done);
897 0 : return 0;
898 : }
|