LCOV - code coverage report
Current view: top level - lib/thread - iobuf.c (source / functions) Hit Total Coverage
Test: ut_cov_unit.info Lines: 320 469 68.2 %
Date: 2024-12-02 08:25:18 Functions: 20 25 80.0 %

          Line data    Source code
       1             : /*   SPDX-License-Identifier: BSD-3-Clause
       2             :  *   Copyright (C) 2023 Intel Corporation.
       3             :  *   All rights reserved.
       4             :  */
       5             : 
       6             : #include "spdk/env.h"
       7             : #include "spdk/util.h"
       8             : #include "spdk/likely.h"
       9             : #include "spdk/log.h"
      10             : #include "spdk/thread.h"
      11             : 
      12             : #define IOBUF_MIN_SMALL_POOL_SIZE       64
      13             : #define IOBUF_MIN_LARGE_POOL_SIZE       8
      14             : #define IOBUF_DEFAULT_SMALL_POOL_SIZE   8192
      15             : #define IOBUF_DEFAULT_LARGE_POOL_SIZE   1024
      16             : #define IOBUF_ALIGNMENT                 4096
      17             : #define IOBUF_MIN_SMALL_BUFSIZE         4096
      18             : #define IOBUF_MIN_LARGE_BUFSIZE         8192
      19             : #define IOBUF_DEFAULT_SMALL_BUFSIZE     (8 * 1024)
      20             : /* 132k is a weird choice at first, but this needs to be large enough to accommodate
      21             :  * the default maximum size (128k) plus metadata everywhere. For code paths that
      22             :  * are explicitly configured, the math is instead done properly. This is only
      23             :  * for the default. */
      24             : #define IOBUF_DEFAULT_LARGE_BUFSIZE     (132 * 1024)
      25             : #define IOBUF_MAX_CHANNELS              64
      26             : 
      27             : SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_buffer) <= IOBUF_MIN_SMALL_BUFSIZE,
      28             :                    "Invalid data offset");
      29             : 
      30             : static bool g_iobuf_is_initialized = false;
      31             : 
      32             : struct iobuf_channel_node {
      33             :         spdk_iobuf_entry_stailq_t       small_queue;
      34             :         spdk_iobuf_entry_stailq_t       large_queue;
      35             : };
      36             : 
      37             : struct iobuf_channel {
      38             :         struct iobuf_channel_node       node[SPDK_CONFIG_MAX_NUMA_NODES];
      39             :         struct spdk_iobuf_channel       *channels[IOBUF_MAX_CHANNELS];
      40             : };
      41             : 
      42             : struct iobuf_module {
      43             :         char                            *name;
      44             :         TAILQ_ENTRY(iobuf_module)       tailq;
      45             : };
      46             : 
      47             : struct iobuf_node {
      48             :         struct spdk_ring                *small_pool;
      49             :         struct spdk_ring                *large_pool;
      50             :         void                            *small_pool_base;
      51             :         void                            *large_pool_base;
      52             : };
      53             : 
      54             : struct iobuf {
      55             :         struct spdk_iobuf_opts          opts;
      56             :         TAILQ_HEAD(, iobuf_module)      modules;
      57             :         spdk_iobuf_finish_cb            finish_cb;
      58             :         void                            *finish_arg;
      59             :         struct iobuf_node               node[SPDK_CONFIG_MAX_NUMA_NODES];
      60             : };
      61             : 
      62             : #define IOBUF_FOREACH_NUMA_ID(i)                                                \
      63             :         for (i = g_iobuf.opts.enable_numa ? spdk_env_get_first_numa_id() : 0;   \
      64             :              i < INT32_MAX;                                                  \
      65             :              i = g_iobuf.opts.enable_numa ? spdk_env_get_next_numa_id(i) : INT32_MAX)
      66             : 
      67             : static struct iobuf g_iobuf = {
      68             :         .modules = TAILQ_HEAD_INITIALIZER(g_iobuf.modules),
      69             :         .node = {},
      70             :         .opts = {
      71             :                 .small_pool_count = IOBUF_DEFAULT_SMALL_POOL_SIZE,
      72             :                 .large_pool_count = IOBUF_DEFAULT_LARGE_POOL_SIZE,
      73             :                 .small_bufsize = IOBUF_DEFAULT_SMALL_BUFSIZE,
      74             :                 .large_bufsize = IOBUF_DEFAULT_LARGE_BUFSIZE,
      75             :         },
      76             : };
      77             : 
      78             : struct iobuf_get_stats_ctx {
      79             :         struct spdk_iobuf_module_stats  *modules;
      80             :         uint32_t                        num_modules;
      81             :         spdk_iobuf_get_stats_cb         cb_fn;
      82             :         void                            *cb_arg;
      83             : };
      84             : 
      85             : static int
      86          78 : iobuf_channel_create_cb(void *io_device, void *ctx)
      87             : {
      88          78 :         struct iobuf_channel *ch = ctx;
      89             :         struct iobuf_channel_node *node;
      90             :         int32_t i;
      91             : 
      92         156 :         IOBUF_FOREACH_NUMA_ID(i) {
      93          78 :                 node = &ch->node[i];
      94          78 :                 STAILQ_INIT(&node->small_queue);
      95          78 :                 STAILQ_INIT(&node->large_queue);
      96          78 :         }
      97             : 
      98          78 :         return 0;
      99             : }
     100             : 
     101             : static void
     102          78 : iobuf_channel_destroy_cb(void *io_device, void *ctx)
     103             : {
     104          78 :         struct iobuf_channel *ch = ctx;
     105             :         struct iobuf_channel_node *node __attribute__((unused));
     106             :         int32_t i;
     107             : 
     108         156 :         IOBUF_FOREACH_NUMA_ID(i) {
     109          78 :                 node = &ch->node[i];
     110          78 :                 assert(STAILQ_EMPTY(&node->small_queue));
     111          78 :                 assert(STAILQ_EMPTY(&node->large_queue));
     112          78 :         }
     113          78 : }
     114             : 
     115             : static int
     116          70 : iobuf_node_initialize(struct iobuf_node *node, uint32_t numa_id)
     117             : {
     118          70 :         struct spdk_iobuf_opts *opts = &g_iobuf.opts;
     119             :         struct spdk_iobuf_buffer *buf;
     120             :         uint64_t i;
     121             :         int rc;
     122             : 
     123          70 :         if (!g_iobuf.opts.enable_numa) {
     124          70 :                 numa_id = SPDK_ENV_NUMA_ID_ANY;
     125          70 :         }
     126             : 
     127         140 :         node->small_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->small_pool_count,
     128          70 :                                             numa_id);
     129          70 :         if (!node->small_pool) {
     130           0 :                 SPDK_ERRLOG("Failed to create small iobuf pool\n");
     131           0 :                 rc = -ENOMEM;
     132           0 :                 goto error;
     133             :         }
     134             : 
     135         140 :         node->small_pool_base = spdk_malloc(opts->small_bufsize * opts->small_pool_count, IOBUF_ALIGNMENT,
     136          70 :                                             NULL, numa_id, SPDK_MALLOC_DMA);
     137          70 :         if (node->small_pool_base == NULL) {
     138           0 :                 SPDK_ERRLOG("Unable to allocate requested small iobuf pool size\n");
     139           0 :                 rc = -ENOMEM;
     140           0 :                 goto error;
     141             :         }
     142             : 
     143         140 :         node->large_pool = spdk_ring_create(SPDK_RING_TYPE_MP_MC, opts->large_pool_count,
     144          70 :                                             numa_id);
     145          70 :         if (!node->large_pool) {
     146           0 :                 SPDK_ERRLOG("Failed to create large iobuf pool\n");
     147           0 :                 rc = -ENOMEM;
     148           0 :                 goto error;
     149             :         }
     150             : 
     151         140 :         node->large_pool_base = spdk_malloc(opts->large_bufsize * opts->large_pool_count, IOBUF_ALIGNMENT,
     152          70 :                                             NULL, numa_id, SPDK_MALLOC_DMA);
     153          70 :         if (node->large_pool_base == NULL) {
     154           0 :                 SPDK_ERRLOG("Unable to allocate requested large iobuf pool size\n");
     155           0 :                 rc = -ENOMEM;
     156           0 :                 goto error;
     157             :         }
     158             : 
     159      548942 :         for (i = 0; i < opts->small_pool_count; i++) {
     160      548872 :                 buf = node->small_pool_base + i * opts->small_bufsize;
     161      548872 :                 spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL);
     162      548872 :         }
     163             : 
     164       68686 :         for (i = 0; i < opts->large_pool_count; i++) {
     165       68616 :                 buf = node->large_pool_base + i * opts->large_bufsize;
     166       68616 :                 spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL);
     167       68616 :         }
     168             : 
     169          70 :         return 0;
     170             : 
     171             : error:
     172           0 :         spdk_free(node->small_pool_base);
     173           0 :         spdk_ring_free(node->small_pool);
     174           0 :         spdk_free(node->large_pool_base);
     175           0 :         spdk_ring_free(node->large_pool);
     176           0 :         memset(node, 0, sizeof(*node));
     177             : 
     178           0 :         return rc;
     179          70 : }
     180             : 
     181             : static void
     182          70 : iobuf_node_free(struct iobuf_node *node)
     183             : {
     184          70 :         if (node->small_pool == NULL) {
     185             :                 /* This node didn't get allocated, so just return immediately. */
     186           0 :                 return;
     187             :         }
     188             : 
     189          70 :         if (spdk_ring_count(node->small_pool) != g_iobuf.opts.small_pool_count) {
     190           7 :                 SPDK_ERRLOG("small iobuf pool count is %zu, expected %"PRIu64"\n",
     191             :                             spdk_ring_count(node->small_pool), g_iobuf.opts.small_pool_count);
     192           7 :         }
     193             : 
     194          70 :         if (spdk_ring_count(node->large_pool) != g_iobuf.opts.large_pool_count) {
     195           2 :                 SPDK_ERRLOG("large iobuf pool count is %zu, expected %"PRIu64"\n",
     196             :                             spdk_ring_count(node->large_pool), g_iobuf.opts.large_pool_count);
     197           2 :         }
     198             : 
     199          70 :         spdk_free(node->small_pool_base);
     200          70 :         node->small_pool_base = NULL;
     201          70 :         spdk_ring_free(node->small_pool);
     202          70 :         node->small_pool = NULL;
     203             : 
     204          70 :         spdk_free(node->large_pool_base);
     205          70 :         node->large_pool_base = NULL;
     206          70 :         spdk_ring_free(node->large_pool);
     207          70 :         node->large_pool = NULL;
     208          70 : }
     209             : 
     210             : int
     211          70 : spdk_iobuf_initialize(void)
     212             : {
     213          70 :         struct spdk_iobuf_opts *opts = &g_iobuf.opts;
     214             :         struct iobuf_node *node;
     215             :         int32_t i;
     216          70 :         int rc = 0;
     217             : 
     218             :         /* Round up to the nearest alignment so that each element remains aligned */
     219          70 :         opts->small_bufsize = SPDK_ALIGN_CEIL(opts->small_bufsize, IOBUF_ALIGNMENT);
     220          70 :         opts->large_bufsize = SPDK_ALIGN_CEIL(opts->large_bufsize, IOBUF_ALIGNMENT);
     221             : 
     222         140 :         IOBUF_FOREACH_NUMA_ID(i) {
     223          70 :                 node = &g_iobuf.node[i];
     224          70 :                 rc = iobuf_node_initialize(node, i);
     225          70 :                 if (rc) {
     226           0 :                         goto err;
     227             :                 }
     228          70 :         }
     229             : 
     230          70 :         spdk_io_device_register(&g_iobuf, iobuf_channel_create_cb, iobuf_channel_destroy_cb,
     231             :                                 sizeof(struct iobuf_channel), "iobuf");
     232          70 :         g_iobuf_is_initialized = true;
     233             : 
     234          70 :         return 0;
     235             : 
     236             : err:
     237           0 :         IOBUF_FOREACH_NUMA_ID(i) {
     238           0 :                 node = &g_iobuf.node[i];
     239           0 :                 iobuf_node_free(node);
     240           0 :         }
     241           0 :         return rc;
     242          70 : }
     243             : 
     244             : static void
     245          70 : iobuf_unregister_cb(void *io_device)
     246             : {
     247             :         struct iobuf_module *module;
     248             :         struct iobuf_node *node;
     249             :         int32_t i;
     250             : 
     251         142 :         while (!TAILQ_EMPTY(&g_iobuf.modules)) {
     252          72 :                 module = TAILQ_FIRST(&g_iobuf.modules);
     253          72 :                 TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
     254          72 :                 free(module->name);
     255          72 :                 free(module);
     256             :         }
     257             : 
     258         140 :         IOBUF_FOREACH_NUMA_ID(i) {
     259          70 :                 node = &g_iobuf.node[i];
     260          70 :                 iobuf_node_free(node);
     261          70 :         }
     262             : 
     263          70 :         if (g_iobuf.finish_cb != NULL) {
     264          70 :                 g_iobuf.finish_cb(g_iobuf.finish_arg);
     265          70 :         }
     266          70 : }
     267             : 
     268             : void
     269          70 : spdk_iobuf_finish(spdk_iobuf_finish_cb cb_fn, void *cb_arg)
     270             : {
     271          70 :         if (!g_iobuf_is_initialized) {
     272           0 :                 cb_fn(cb_arg);
     273           0 :                 return;
     274             :         }
     275             : 
     276          70 :         g_iobuf_is_initialized = false;
     277          70 :         g_iobuf.finish_cb = cb_fn;
     278          70 :         g_iobuf.finish_arg = cb_arg;
     279             : 
     280          70 :         spdk_io_device_unregister(&g_iobuf, iobuf_unregister_cb);
     281          70 : }
     282             : 
     283             : int
     284           0 : spdk_iobuf_set_opts(const struct spdk_iobuf_opts *opts)
     285             : {
     286           0 :         if (!opts) {
     287           0 :                 SPDK_ERRLOG("opts cannot be NULL\n");
     288           0 :                 return -1;
     289             :         }
     290             : 
     291           0 :         if (!opts->opts_size) {
     292           0 :                 SPDK_ERRLOG("opts_size inside opts cannot be zero value\n");
     293           0 :                 return -1;
     294             :         }
     295             : 
     296           0 :         if (opts->small_pool_count < IOBUF_MIN_SMALL_POOL_SIZE) {
     297           0 :                 SPDK_ERRLOG("small_pool_count must be at least %" PRIu32 "\n",
     298             :                             IOBUF_MIN_SMALL_POOL_SIZE);
     299           0 :                 return -EINVAL;
     300             :         }
     301           0 :         if (opts->large_pool_count < IOBUF_MIN_LARGE_POOL_SIZE) {
     302           0 :                 SPDK_ERRLOG("large_pool_count must be at least %" PRIu32 "\n",
     303             :                             IOBUF_MIN_LARGE_POOL_SIZE);
     304           0 :                 return -EINVAL;
     305             :         }
     306             : 
     307           0 :         if (opts->small_bufsize < IOBUF_MIN_SMALL_BUFSIZE) {
     308           0 :                 SPDK_ERRLOG("small_bufsize must be at least %" PRIu32 "\n",
     309             :                             IOBUF_MIN_SMALL_BUFSIZE);
     310           0 :                 return -EINVAL;
     311             :         }
     312             : 
     313           0 :         if (opts->large_bufsize < IOBUF_MIN_LARGE_BUFSIZE) {
     314           0 :                 SPDK_ERRLOG("large_bufsize must be at least %" PRIu32 "\n",
     315             :                             IOBUF_MIN_LARGE_BUFSIZE);
     316           0 :                 return -EINVAL;
     317             :         }
     318             : 
     319           0 :         if (opts->enable_numa &&
     320           0 :             spdk_env_get_last_numa_id() >= SPDK_CONFIG_MAX_NUMA_NODES) {
     321           0 :                 SPDK_ERRLOG("max NUMA ID %" PRIu32 " cannot be supported with "
     322             :                             "SPDK_CONFIG_MAX_NUMA_NODES %" PRIu32 "\n",
     323             :                             spdk_env_get_last_numa_id(), SPDK_CONFIG_MAX_NUMA_NODES);
     324           0 :                 SPDK_ERRLOG("Re-configure with --max-numa-nodes=%" PRIu32 "\n",
     325             :                             spdk_env_get_last_numa_id() + 1);
     326           0 :                 return -EINVAL;
     327             :         }
     328             : 
     329             : #define SET_FIELD(field) \
     330             :         if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts->opts_size) { \
     331             :                 g_iobuf.opts.field = opts->field; \
     332             :         } \
     333             : 
     334           0 :         SET_FIELD(small_pool_count);
     335           0 :         SET_FIELD(large_pool_count);
     336           0 :         SET_FIELD(small_bufsize);
     337           0 :         SET_FIELD(large_bufsize);
     338           0 :         SET_FIELD(enable_numa);
     339             : 
     340           0 :         g_iobuf.opts.opts_size = opts->opts_size;
     341             : 
     342             : #undef SET_FIELD
     343             : 
     344           0 :         return 0;
     345           0 : }
     346             : 
     347             : void
     348         131 : spdk_iobuf_get_opts(struct spdk_iobuf_opts *opts, size_t opts_size)
     349             : {
     350         131 :         if (!opts) {
     351           0 :                 SPDK_ERRLOG("opts should not be NULL\n");
     352           0 :                 return;
     353             :         }
     354             : 
     355         131 :         if (!opts_size) {
     356           0 :                 SPDK_ERRLOG("opts_size should not be zero value\n");
     357           0 :                 return;
     358             :         }
     359             : 
     360         131 :         opts->opts_size = opts_size;
     361             : 
     362             : #define SET_FIELD(field) \
     363             :         if (offsetof(struct spdk_iobuf_opts, field) + sizeof(opts->field) <= opts_size) { \
     364             :                 opts->field = g_iobuf.opts.field; \
     365             :         } \
     366             : 
     367         131 :         SET_FIELD(small_pool_count);
     368         131 :         SET_FIELD(large_pool_count);
     369         131 :         SET_FIELD(small_bufsize);
     370         131 :         SET_FIELD(large_bufsize);
     371         131 :         SET_FIELD(enable_numa);
     372             : 
     373             : #undef SET_FIELD
     374             : 
     375             :         /* Do not remove this statement, you should always update this statement when you adding a new field,
     376             :          * and do not forget to add the SET_FIELD statement for your added field. */
     377             :         SPDK_STATIC_ASSERT(sizeof(struct spdk_iobuf_opts) == 40, "Incorrect size");
     378         131 : }
     379             : 
     380             : static void
     381          85 : iobuf_channel_node_init(struct spdk_iobuf_channel *ch, struct iobuf_channel *iobuf_ch,
     382             :                         int32_t numa_id, uint32_t small_cache_size, uint32_t large_cache_size)
     383             : {
     384          85 :         struct iobuf_node *node = &g_iobuf.node[numa_id];
     385          85 :         struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
     386          85 :         struct iobuf_channel_node *ch_node = &iobuf_ch->node[numa_id];
     387             : 
     388          85 :         cache->small.queue = &ch_node->small_queue;
     389          85 :         cache->large.queue = &ch_node->large_queue;
     390          85 :         cache->small.pool = node->small_pool;
     391          85 :         cache->large.pool = node->large_pool;
     392          85 :         cache->small.bufsize = g_iobuf.opts.small_bufsize;
     393          85 :         cache->large.bufsize = g_iobuf.opts.large_bufsize;
     394          85 :         cache->small.cache_size = small_cache_size;
     395          85 :         cache->large.cache_size = large_cache_size;
     396          85 :         cache->small.cache_count = 0;
     397          85 :         cache->large.cache_count = 0;
     398             : 
     399          85 :         STAILQ_INIT(&cache->small.cache);
     400          85 :         STAILQ_INIT(&cache->large.cache);
     401          85 : }
     402             : 
     403             : static int
     404          85 : iobuf_channel_node_populate(struct spdk_iobuf_channel *ch, const char *name, int32_t numa_id)
     405             : {
     406          85 :         struct iobuf_node *node = &g_iobuf.node[numa_id];
     407          85 :         struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
     408          85 :         uint32_t small_cache_size = cache->small.cache_size;
     409          85 :         uint32_t large_cache_size = cache->large.cache_size;
     410             :         struct spdk_iobuf_buffer *buf;
     411             :         uint32_t i;
     412             : 
     413        9317 :         for (i = 0; i < small_cache_size; ++i) {
     414        9234 :                 if (spdk_ring_dequeue(node->small_pool, (void **)&buf, 1) == 0) {
     415           2 :                         SPDK_ERRLOG("Failed to populate '%s' iobuf small buffer cache at %d/%d entries. "
     416             :                                     "You may need to increase spdk_iobuf_opts.small_pool_count (%"PRIu64")\n",
     417             :                                     name, i, small_cache_size, g_iobuf.opts.small_pool_count);
     418           2 :                         SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
     419             :                                     "this value.\n");
     420           2 :                         return -ENOMEM;
     421             :                 }
     422        9232 :                 STAILQ_INSERT_TAIL(&cache->small.cache, buf, stailq);
     423        9232 :                 cache->small.cache_count++;
     424        9232 :         }
     425        1250 :         for (i = 0; i < large_cache_size; ++i) {
     426        1168 :                 if (spdk_ring_dequeue(node->large_pool, (void **)&buf, 1) == 0) {
     427           1 :                         SPDK_ERRLOG("Failed to populate '%s' iobuf large buffer cache at %d/%d entries. "
     428             :                                     "You may need to increase spdk_iobuf_opts.large_pool_count (%"PRIu64")\n",
     429             :                                     name, i, large_cache_size, g_iobuf.opts.large_pool_count);
     430           1 :                         SPDK_ERRLOG("See scripts/calc-iobuf.py for guidance on how to calculate "
     431             :                                     "this value.\n");
     432           1 :                         return -ENOMEM;
     433             :                 }
     434        1167 :                 STAILQ_INSERT_TAIL(&cache->large.cache, buf, stailq);
     435        1167 :                 cache->large.cache_count++;
     436        1167 :         }
     437             : 
     438          82 :         return 0;
     439          85 : }
     440             : 
     441             : int
     442          85 : spdk_iobuf_channel_init(struct spdk_iobuf_channel *ch, const char *name,
     443             :                         uint32_t small_cache_size, uint32_t large_cache_size)
     444             : {
     445             :         struct spdk_io_channel *ioch;
     446             :         struct iobuf_channel *iobuf_ch;
     447             :         struct iobuf_module *module;
     448             :         uint32_t i;
     449             :         int32_t numa_id;
     450             :         int rc;
     451             : 
     452          90 :         TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
     453          90 :                 if (strcmp(name, module->name) == 0) {
     454          85 :                         break;
     455             :                 }
     456           5 :         }
     457             : 
     458          85 :         if (module == NULL) {
     459           0 :                 SPDK_ERRLOG("Couldn't find iobuf module: '%s'\n", name);
     460           0 :                 return -ENODEV;
     461             :         }
     462             : 
     463          85 :         ioch = spdk_get_io_channel(&g_iobuf);
     464          85 :         if (ioch == NULL) {
     465           0 :                 SPDK_ERRLOG("Couldn't get iobuf IO channel\n");
     466           0 :                 return -ENOMEM;
     467             :         }
     468             : 
     469          85 :         iobuf_ch = spdk_io_channel_get_ctx(ioch);
     470             : 
     471          90 :         for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
     472          90 :                 if (iobuf_ch->channels[i] == NULL) {
     473          85 :                         iobuf_ch->channels[i] = ch;
     474          85 :                         break;
     475             :                 }
     476           5 :         }
     477             : 
     478          85 :         if (i == IOBUF_MAX_CHANNELS) {
     479           0 :                 SPDK_ERRLOG("Max number of iobuf channels (%" PRIu32 ") exceeded.\n", i);
     480           0 :                 rc = -ENOMEM;
     481           0 :                 goto error;
     482             :         }
     483             : 
     484          85 :         ch->parent = ioch;
     485          85 :         ch->module = module;
     486             : 
     487         170 :         IOBUF_FOREACH_NUMA_ID(numa_id) {
     488         170 :                 iobuf_channel_node_init(ch, iobuf_ch, numa_id,
     489          85 :                                         small_cache_size, large_cache_size);
     490          85 :         }
     491             : 
     492         167 :         IOBUF_FOREACH_NUMA_ID(numa_id) {
     493          85 :                 rc = iobuf_channel_node_populate(ch, name, numa_id);
     494          85 :                 if (rc) {
     495           3 :                         goto error;
     496             :                 }
     497          82 :         }
     498             : 
     499          82 :         return 0;
     500             : error:
     501           3 :         spdk_iobuf_channel_fini(ch);
     502             : 
     503           3 :         return rc;
     504          85 : }
     505             : 
     506             : static void
     507          85 : iobuf_channel_node_fini(struct spdk_iobuf_channel *ch, int32_t numa_id)
     508             : {
     509          85 :         struct spdk_iobuf_node_cache *cache = &ch->cache[numa_id];
     510          85 :         struct iobuf_node *node = &g_iobuf.node[numa_id];
     511             :         struct spdk_iobuf_entry *entry __attribute__((unused));
     512             :         struct spdk_iobuf_buffer *buf;
     513             : 
     514             :         /* Make sure none of the wait queue entries are coming from this module */
     515          85 :         STAILQ_FOREACH(entry, cache->small.queue, stailq) {
     516           0 :                 assert(entry->module != ch->module);
     517           0 :         }
     518          85 :         STAILQ_FOREACH(entry, cache->large.queue, stailq) {
     519           0 :                 assert(entry->module != ch->module);
     520           0 :         }
     521             : 
     522             :         /* Release cached buffers back to the pool */
     523        9285 :         while (!STAILQ_EMPTY(&cache->small.cache)) {
     524        9200 :                 buf = STAILQ_FIRST(&cache->small.cache);
     525        9200 :                 STAILQ_REMOVE_HEAD(&cache->small.cache, stailq);
     526        9200 :                 spdk_ring_enqueue(node->small_pool, (void **)&buf, 1, NULL);
     527        9200 :                 cache->small.cache_count--;
     528             :         }
     529        1242 :         while (!STAILQ_EMPTY(&cache->large.cache)) {
     530        1157 :                 buf = STAILQ_FIRST(&cache->large.cache);
     531        1157 :                 STAILQ_REMOVE_HEAD(&cache->large.cache, stailq);
     532        1157 :                 spdk_ring_enqueue(node->large_pool, (void **)&buf, 1, NULL);
     533        1157 :                 cache->large.cache_count--;
     534             :         }
     535             : 
     536          85 :         assert(cache->small.cache_count == 0);
     537          85 :         assert(cache->large.cache_count == 0);
     538          85 : }
     539             : 
     540             : void
     541          85 : spdk_iobuf_channel_fini(struct spdk_iobuf_channel *ch)
     542             : {
     543             :         struct iobuf_channel *iobuf_ch;
     544             :         uint32_t i;
     545             : 
     546         170 :         IOBUF_FOREACH_NUMA_ID(i) {
     547          85 :                 iobuf_channel_node_fini(ch, i);
     548          85 :         }
     549             : 
     550          85 :         iobuf_ch = spdk_io_channel_get_ctx(ch->parent);
     551          90 :         for (i = 0; i < IOBUF_MAX_CHANNELS; ++i) {
     552          90 :                 if (iobuf_ch->channels[i] == ch) {
     553          85 :                         iobuf_ch->channels[i] = NULL;
     554          85 :                         break;
     555             :                 }
     556           5 :         }
     557             : 
     558          85 :         spdk_put_io_channel(ch->parent);
     559          85 :         ch->parent = NULL;
     560          85 : }
     561             : 
     562             : int
     563          73 : spdk_iobuf_register_module(const char *name)
     564             : {
     565             :         struct iobuf_module *module;
     566             : 
     567          75 :         TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
     568           2 :                 if (strcmp(name, module->name) == 0) {
     569           0 :                         return -EEXIST;
     570             :                 }
     571           2 :         }
     572             : 
     573          73 :         module = calloc(1, sizeof(*module));
     574          73 :         if (module == NULL) {
     575           0 :                 return -ENOMEM;
     576             :         }
     577             : 
     578          73 :         module->name = strdup(name);
     579          73 :         if (module->name == NULL) {
     580           0 :                 free(module);
     581           0 :                 return -ENOMEM;
     582             :         }
     583             : 
     584          73 :         TAILQ_INSERT_TAIL(&g_iobuf.modules, module, tailq);
     585             : 
     586          73 :         return 0;
     587          73 : }
     588             : 
     589             : int
     590           0 : spdk_iobuf_unregister_module(const char *name)
     591             : {
     592             :         struct iobuf_module *module;
     593             : 
     594           0 :         TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
     595           0 :                 if (strcmp(name, module->name) == 0) {
     596           0 :                         TAILQ_REMOVE(&g_iobuf.modules, module, tailq);
     597           0 :                         free(module->name);
     598           0 :                         free(module);
     599           0 :                         return 0;
     600             :                 }
     601           0 :         }
     602             : 
     603           0 :         return -ENOENT;
     604           0 : }
     605             : 
     606             : static int
     607         236 : iobuf_pool_for_each_entry(struct spdk_iobuf_channel *ch, struct spdk_iobuf_pool_cache *pool,
     608             :                           spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
     609             : {
     610             :         struct spdk_iobuf_entry *entry, *tmp;
     611             :         int rc;
     612             : 
     613         252 :         STAILQ_FOREACH_SAFE(entry, pool->queue, stailq, tmp) {
     614             :                 /* We only want to iterate over the entries requested by the module which owns ch */
     615          16 :                 if (entry->module != ch->module) {
     616           8 :                         continue;
     617             :                 }
     618             : 
     619           8 :                 rc = cb_fn(ch, entry, cb_ctx);
     620           8 :                 if (rc != 0) {
     621           0 :                         return rc;
     622             :                 }
     623           8 :         }
     624             : 
     625         236 :         return 0;
     626         236 : }
     627             : 
     628             : int
     629         118 : spdk_iobuf_for_each_entry(struct spdk_iobuf_channel *ch,
     630             :                           spdk_iobuf_for_each_entry_fn cb_fn, void *cb_ctx)
     631             : {
     632             :         struct spdk_iobuf_node_cache *cache;
     633             :         uint32_t i;
     634             :         int rc;
     635             : 
     636         236 :         IOBUF_FOREACH_NUMA_ID(i) {
     637         118 :                 cache = &ch->cache[i];
     638             : 
     639         118 :                 rc = iobuf_pool_for_each_entry(ch, &cache->small, cb_fn, cb_ctx);
     640         118 :                 if (rc != 0) {
     641           0 :                         return rc;
     642             :                 }
     643         118 :                 rc = iobuf_pool_for_each_entry(ch, &cache->large, cb_fn, cb_ctx);
     644         118 :                 if (rc != 0) {
     645           0 :                         return rc;
     646             :                 }
     647         118 :         }
     648             : 
     649         118 :         return 0;
     650         118 : }
     651             : 
     652             : static bool
     653          12 : iobuf_entry_abort_node(struct spdk_iobuf_channel *ch, int32_t numa_id,
     654             :                        struct spdk_iobuf_entry *entry, uint64_t len)
     655             : {
     656             :         struct spdk_iobuf_node_cache *cache;
     657             :         struct spdk_iobuf_pool_cache *pool;
     658             :         struct spdk_iobuf_entry *e;
     659             : 
     660          12 :         cache = &ch->cache[numa_id];
     661             : 
     662          12 :         if (len <= cache->small.bufsize) {
     663           6 :                 pool = &cache->small;
     664           6 :         } else {
     665           6 :                 assert(len <= cache->large.bufsize);
     666           6 :                 pool = &cache->large;
     667             :         }
     668             : 
     669          12 :         STAILQ_FOREACH(e, pool->queue, stailq) {
     670          12 :                 if (e == entry) {
     671          12 :                         STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
     672          12 :                         return true;
     673             :                 }
     674           0 :         }
     675             : 
     676           0 :         return false;
     677          12 : }
     678             : 
     679             : void
     680          12 : spdk_iobuf_entry_abort(struct spdk_iobuf_channel *ch, struct spdk_iobuf_entry *entry,
     681             :                        uint64_t len)
     682             : {
     683             :         uint32_t i;
     684             : 
     685          24 :         IOBUF_FOREACH_NUMA_ID(i) {
     686          12 :                 iobuf_entry_abort_node(ch, i, entry, len);
     687          12 :         }
     688          12 : }
     689             : 
     690             : #define IOBUF_BATCH_SIZE 32
     691             : 
     692             : void *
     693          99 : spdk_iobuf_get(struct spdk_iobuf_channel *ch, uint64_t len,
     694             :                struct spdk_iobuf_entry *entry, spdk_iobuf_get_cb cb_fn)
     695             : {
     696             :         struct spdk_iobuf_node_cache *cache;
     697             :         struct spdk_iobuf_pool_cache *pool;
     698             :         void *buf;
     699             : 
     700          99 :         cache = &ch->cache[0];
     701             : 
     702          99 :         assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
     703          99 :         if (len <= cache->small.bufsize) {
     704          62 :                 pool = &cache->small;
     705          62 :         } else {
     706          37 :                 assert(len <= cache->large.bufsize);
     707          37 :                 pool = &cache->large;
     708             :         }
     709             : 
     710          99 :         buf = (void *)STAILQ_FIRST(&pool->cache);
     711          99 :         if (buf) {
     712          50 :                 STAILQ_REMOVE_HEAD(&pool->cache, stailq);
     713          50 :                 assert(pool->cache_count > 0);
     714          50 :                 pool->cache_count--;
     715          50 :                 pool->stats.cache++;
     716          50 :         } else {
     717             :                 struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
     718             :                 size_t sz, i;
     719             : 
     720             :                 /* If we're going to dequeue, we may as well dequeue a batch. */
     721          49 :                 sz = spdk_ring_dequeue(pool->pool, (void **)bufs, spdk_min(IOBUF_BATCH_SIZE,
     722             :                                        spdk_max(pool->cache_size, 1)));
     723          49 :                 if (sz == 0) {
     724          28 :                         if (entry) {
     725          28 :                                 STAILQ_INSERT_TAIL(pool->queue, entry, stailq);
     726          28 :                                 entry->module = ch->module;
     727          28 :                                 entry->cb_fn = cb_fn;
     728          28 :                                 pool->stats.retry++;
     729          28 :                         }
     730             : 
     731          28 :                         return NULL;
     732             :                 }
     733             : 
     734          21 :                 pool->stats.main++;
     735          22 :                 for (i = 0; i < (sz - 1); i++) {
     736           1 :                         STAILQ_INSERT_HEAD(&pool->cache, bufs[i], stailq);
     737           1 :                         pool->cache_count++;
     738           1 :                 }
     739             : 
     740             :                 /* The last one is the one we'll return */
     741          21 :                 buf = bufs[i];
     742             :         }
     743             : 
     744          71 :         return (char *)buf;
     745          99 : }
     746             : 
     747             : void
     748          45 : spdk_iobuf_put(struct spdk_iobuf_channel *ch, void *buf, uint64_t len)
     749             : {
     750             :         struct spdk_iobuf_entry *entry;
     751             :         struct spdk_iobuf_buffer *iobuf_buf;
     752             :         struct spdk_iobuf_node_cache *cache;
     753             :         struct spdk_iobuf_pool_cache *pool;
     754             :         uint32_t numa_id;
     755             :         size_t sz;
     756             : 
     757          45 :         if (g_iobuf.opts.enable_numa) {
     758           0 :                 numa_id = spdk_mem_get_numa_id(buf, NULL);
     759           0 :         } else {
     760          45 :                 numa_id = 0;
     761             :         }
     762             : 
     763          45 :         cache = &ch->cache[numa_id];
     764             : 
     765          45 :         assert(spdk_io_channel_get_thread(ch->parent) == spdk_get_thread());
     766          45 :         if (len <= cache->small.bufsize) {
     767          24 :                 pool = &cache->small;
     768          24 :         } else {
     769          21 :                 pool = &cache->large;
     770             :         }
     771             : 
     772          45 :         if (STAILQ_EMPTY(pool->queue)) {
     773          29 :                 if (pool->cache_size == 0) {
     774          18 :                         spdk_ring_enqueue(pool->pool, (void **)&buf, 1, NULL);
     775          18 :                         return;
     776             :                 }
     777             : 
     778          11 :                 iobuf_buf = (struct spdk_iobuf_buffer *)buf;
     779             : 
     780          11 :                 STAILQ_INSERT_HEAD(&pool->cache, iobuf_buf, stailq);
     781          11 :                 pool->cache_count++;
     782             : 
     783             :                 /* The cache size may exceed the configured amount. We always dequeue from the
     784             :                  * central pool in batches of known size, so wait until at least a batch
     785             :                  * has been returned to actually return the buffers to the central pool. */
     786          11 :                 sz = spdk_min(IOBUF_BATCH_SIZE, pool->cache_size);
     787          11 :                 if (pool->cache_count >= pool->cache_size + sz) {
     788             :                         struct spdk_iobuf_buffer *bufs[IOBUF_BATCH_SIZE];
     789             :                         size_t i;
     790             : 
     791           7 :                         for (i = 0; i < sz; i++) {
     792           4 :                                 bufs[i] = STAILQ_FIRST(&pool->cache);
     793           4 :                                 STAILQ_REMOVE_HEAD(&pool->cache, stailq);
     794           4 :                                 assert(pool->cache_count > 0);
     795           4 :                                 pool->cache_count--;
     796           4 :                         }
     797             : 
     798           3 :                         spdk_ring_enqueue(pool->pool, (void **)bufs, sz, NULL);
     799           3 :                 }
     800          11 :         } else {
     801          16 :                 entry = STAILQ_FIRST(pool->queue);
     802          16 :                 STAILQ_REMOVE_HEAD(pool->queue, stailq);
     803          16 :                 entry->cb_fn(entry, buf);
     804          16 :                 if (spdk_unlikely(entry == STAILQ_LAST(pool->queue, spdk_iobuf_entry, stailq))) {
     805           3 :                         STAILQ_REMOVE(pool->queue, entry, spdk_iobuf_entry, stailq);
     806           2 :                         STAILQ_INSERT_HEAD(pool->queue, entry, stailq);
     807           2 :                 }
     808             :         }
     809          45 : }
     810             : 
     811             : static void
     812           0 : iobuf_get_channel_stats_done(struct spdk_io_channel_iter *iter, int status)
     813             : {
     814           0 :         struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);
     815             : 
     816           0 :         ctx->cb_fn(ctx->modules, ctx->num_modules, ctx->cb_arg);
     817           0 :         free(ctx->modules);
     818           0 :         free(ctx);
     819           0 : }
     820             : 
     821             : static void
     822           0 : iobuf_get_channel_stats(struct spdk_io_channel_iter *iter)
     823             : {
     824           0 :         struct iobuf_get_stats_ctx *ctx = spdk_io_channel_iter_get_ctx(iter);
     825           0 :         struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(iter);
     826           0 :         struct iobuf_channel *iobuf_ch = spdk_io_channel_get_ctx(ch);
     827             :         struct spdk_iobuf_channel *channel;
     828             :         struct iobuf_module *module;
     829             :         struct spdk_iobuf_module_stats *it;
     830             :         uint32_t i, j;
     831             : 
     832           0 :         for (i = 0; i < ctx->num_modules; ++i) {
     833           0 :                 for (j = 0; j < IOBUF_MAX_CHANNELS; ++j) {
     834           0 :                         channel = iobuf_ch->channels[j];
     835           0 :                         if (channel == NULL) {
     836           0 :                                 continue;
     837             :                         }
     838             : 
     839           0 :                         it = &ctx->modules[i];
     840           0 :                         module = (struct iobuf_module *)channel->module;
     841           0 :                         if (strcmp(it->module, module->name) == 0) {
     842             :                                 struct spdk_iobuf_pool_cache *cache;
     843             :                                 uint32_t i;
     844             : 
     845           0 :                                 IOBUF_FOREACH_NUMA_ID(i) {
     846           0 :                                         cache = &channel->cache[i].small;
     847           0 :                                         it->small_pool.cache += cache->stats.cache;
     848           0 :                                         it->small_pool.main += cache->stats.main;
     849           0 :                                         it->small_pool.retry += cache->stats.retry;
     850             : 
     851           0 :                                         cache = &channel->cache[i].large;
     852           0 :                                         it->large_pool.cache += cache->stats.cache;
     853           0 :                                         it->large_pool.main += cache->stats.main;
     854           0 :                                         it->large_pool.retry += cache->stats.retry;
     855           0 :                                 }
     856           0 :                                 break;
     857             :                         }
     858           0 :                 }
     859           0 :         }
     860             : 
     861           0 :         spdk_for_each_channel_continue(iter, 0);
     862           0 : }
     863             : 
     864             : int
     865           0 : spdk_iobuf_get_stats(spdk_iobuf_get_stats_cb cb_fn, void *cb_arg)
     866             : {
     867             :         struct iobuf_module *module;
     868             :         struct iobuf_get_stats_ctx *ctx;
     869             :         uint32_t i;
     870             : 
     871           0 :         ctx = calloc(1, sizeof(*ctx));
     872           0 :         if (ctx == NULL) {
     873           0 :                 return -ENOMEM;
     874             :         }
     875             : 
     876           0 :         TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
     877           0 :                 ++ctx->num_modules;
     878           0 :         }
     879             : 
     880           0 :         ctx->modules = calloc(ctx->num_modules, sizeof(struct spdk_iobuf_module_stats));
     881           0 :         if (ctx->modules == NULL) {
     882           0 :                 free(ctx);
     883           0 :                 return -ENOMEM;
     884             :         }
     885             : 
     886           0 :         i = 0;
     887           0 :         TAILQ_FOREACH(module, &g_iobuf.modules, tailq) {
     888           0 :                 ctx->modules[i].module = module->name;
     889           0 :                 ++i;
     890           0 :         }
     891             : 
     892           0 :         ctx->cb_fn = cb_fn;
     893           0 :         ctx->cb_arg = cb_arg;
     894             : 
     895           0 :         spdk_for_each_channel(&g_iobuf, iobuf_get_channel_stats, ctx,
     896             :                               iobuf_get_channel_stats_done);
     897           0 :         return 0;
     898           0 : }

Generated by: LCOV version 1.15