LCOV - code coverage report
Current view: top level - lib/reduce - reduce.c (source / functions)
Test: ut_cov_unit.info
Date: 2024-12-14 20:56:31

                 Hit    Total    Coverage
  Lines:         813     1063      76.5 %
  Functions:      62       73      84.9 %

          Line data    Source code
       1             : /*   SPDX-License-Identifier: BSD-3-Clause
       2             :  *   Copyright (C) 2018 Intel Corporation.
       3             :  *   All rights reserved.
       4             :  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
       5             :  */
       6             : 
       7             : #include "spdk/stdinc.h"
       8             : 
       9             : #include "queue_internal.h"
      10             : 
      11             : #include "spdk/reduce.h"
      12             : #include "spdk/env.h"
      13             : #include "spdk/string.h"
      14             : #include "spdk/bit_array.h"
      15             : #include "spdk/util.h"
      16             : #include "spdk/log.h"
      17             : #include "spdk/memory.h"
      18             : #include "spdk/tree.h"
      19             : 
      20             : #include "libpmem.h"
      21             : 
      22             : /* Always round up the size of the PM region to the nearest cacheline. */
      23             : #define REDUCE_PM_SIZE_ALIGNMENT        64
      24             : 
      25             : /* Offset into the backing device where the persistent memory file's path is stored. */
      26             : #define REDUCE_BACKING_DEV_PATH_OFFSET  4096
      27             : 
      28             : #define REDUCE_EMPTY_MAP_ENTRY  -1ULL
      29             : 
      30             : #define REDUCE_NUM_VOL_REQUESTS 256
      31             : 
      32             : /* Structure written to offset 0 of both the pm file and the backing device. */
      33             : struct spdk_reduce_vol_superblock {
      34             :         uint8_t                         signature[8];
      35             :         struct spdk_reduce_vol_params   params;
      36             :         uint8_t                         reserved[4040];
      37             : };
      38             : SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect");
      39             : 
      40             : #define SPDK_REDUCE_SIGNATURE "SPDKREDU"
       42             : /* The null terminator accounts for one byte. */
      42             : SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 ==
      43             :                    SPDK_SIZEOF_MEMBER(struct spdk_reduce_vol_superblock, signature), "size incorrect");
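
A hedged aside on the arithmetic these asserts pin down: an 8-byte signature plus 4040 bytes of reserved padding in a 4096-byte block implies sizeof(struct spdk_reduce_vol_params) is 48 bytes, and since the pm file path is written at REDUCE_BACKING_DEV_PATH_OFFSET (4096), the first two 4 KiB blocks of the backing device hold metadata. A standalone sketch of the math (names here are illustrative, not from the source):

    #include <stdio.h>

    int main(void)
    {
            /* Implied by the asserts: params fills whatever the signature and
             * reserved padding leave of the 4096-byte block. */
            printf("implied sizeof(spdk_reduce_vol_params): %d\n",
                   4096 - 8 - 4040);                              /* 48 bytes */

            /* The superblock occupies [0, 4096) on the backing device; the pm
             * file path (REDUCE_PATH_MAX bytes) is written at offset 4096. */
            printf("backing dev metadata region: [0, %d)\n", 4096 + 4096);
            return 0;
    }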
      44             : 
      45             : #define REDUCE_PATH_MAX 4096
      46             : 
      47             : #define REDUCE_ZERO_BUF_SIZE 0x100000
      48             : 
      49             : /**
      50             :  * Describes a persistent memory file used to hold metadata associated with a
      51             :  *  compressed volume.
      52             :  */
      53             : struct spdk_reduce_pm_file {
      54             :         char                    path[REDUCE_PATH_MAX];
      55             :         void                    *pm_buf;
      56             :         int                     pm_is_pmem;
      57             :         uint64_t                size;
      58             : };
      59             : 
      60             : #define REDUCE_IO_READV         1
      61             : #define REDUCE_IO_WRITEV        2
      62             : #define REDUCE_IO_UNMAP         3
      63             : 
      64             : struct spdk_reduce_chunk_map {
      65             :         uint32_t                compressed_size;
      66             :         uint32_t                reserved;
      67             :         uint64_t                io_unit_index[0];
      68             : };
      69             : 
      70             : struct spdk_reduce_vol_request {
      71             :         /**
      72             :          *  Scratch buffer used for uncompressed chunk.  This is used for:
      73             :          *   1) source buffer for compression operations
      74             :          *   2) destination buffer for decompression operations
      75             :          *   3) data buffer when writing uncompressed chunk to disk
      76             :          *   4) data buffer when reading uncompressed chunk from disk
      77             :          */
      78             :         uint8_t                                 *decomp_buf;
      79             :         struct iovec                            *decomp_buf_iov;
      80             : 
      81             :         /**
      82             :          * These are used to construct the iovecs that are sent to
       83             :          *  the decomp engine; they point to a mix of the scratch buffer
       84             :          *  and the user buffer.
      85             :          */
      86             :         struct iovec                            decomp_iov[REDUCE_MAX_IOVECS + 2];
      87             :         int                                     decomp_iovcnt;
      88             : 
      89             :         /**
      90             :          *  Scratch buffer used for compressed chunk.  This is used for:
      91             :          *   1) destination buffer for compression operations
      92             :          *   2) source buffer for decompression operations
      93             :          *   3) data buffer when writing compressed chunk to disk
      94             :          *   4) data buffer when reading compressed chunk from disk
      95             :          */
      96             :         uint8_t                                 *comp_buf;
      97             :         struct iovec                            *comp_buf_iov;
      98             :         struct iovec                            *iov;
      99             :         bool                                    rmw;
     100             :         struct spdk_reduce_vol                  *vol;
     101             :         int                                     type;
     102             :         int                                     reduce_errno;
     103             :         int                                     iovcnt;
     104             :         int                                     num_backing_ops;
     105             :         uint32_t                                num_io_units;
     106             :         struct spdk_reduce_backing_io           *backing_io;
     107             :         bool                                    chunk_is_compressed;
     108             :         bool                                    copy_after_decompress;
     109             :         uint64_t                                offset;
     110             :         uint64_t                                logical_map_index;
     111             :         uint64_t                                length;
     112             :         uint64_t                                chunk_map_index;
     113             :         struct spdk_reduce_chunk_map            *chunk;
     114             :         spdk_reduce_vol_op_complete             cb_fn;
     115             :         void                                    *cb_arg;
     116             :         TAILQ_ENTRY(spdk_reduce_vol_request)    tailq;
     117             :         RB_ENTRY(spdk_reduce_vol_request)       rbnode;
     118             :         struct spdk_reduce_vol_cb_args          backing_cb_args;
     119             : };
     120             : 
     121             : struct spdk_reduce_vol {
     122             :         struct spdk_reduce_vol_params           params;
     123             :         struct spdk_reduce_vol_info             info;
     124             :         uint32_t                                backing_io_units_per_chunk;
     125             :         uint32_t                                backing_lba_per_io_unit;
     126             :         uint32_t                                logical_blocks_per_chunk;
     127             :         struct spdk_reduce_pm_file              pm_file;
     128             :         struct spdk_reduce_backing_dev          *backing_dev;
     129             :         struct spdk_reduce_vol_superblock       *backing_super;
     130             :         struct spdk_reduce_vol_superblock       *pm_super;
     131             :         uint64_t                                *pm_logical_map;
     132             :         uint64_t                                *pm_chunk_maps;
     133             : 
     134             :         struct spdk_bit_array                   *allocated_chunk_maps;
      135             :         /* The starting position when looking for a free chunk map in allocated_chunk_maps */
     136             :         uint64_t                                find_chunk_offset;
     137             :         /* Cache free chunks to speed up lookup of free chunk. */
     138             :         struct reduce_queue                     free_chunks_queue;
     139             :         struct spdk_bit_array                   *allocated_backing_io_units;
      140             :         /* The starting position when looking for a free backing io unit in allocated_backing_io_units */
     141             :         uint64_t                                find_block_offset;
     142             :         /* Cache free blocks for backing bdev to speed up lookup of free backing blocks. */
     143             :         struct reduce_queue                     free_backing_blocks_queue;
     144             : 
     145             :         struct spdk_reduce_vol_request          *request_mem;
     146             :         TAILQ_HEAD(, spdk_reduce_vol_request)   free_requests;
     147             :         RB_HEAD(executing_req_tree, spdk_reduce_vol_request) executing_requests;
     148             :         TAILQ_HEAD(, spdk_reduce_vol_request)   queued_requests;
     149             : 
     150             :         /* Single contiguous buffer used for all request buffers for this volume. */
     151             :         uint8_t                                 *buf_mem;
     152             :         struct iovec                            *buf_iov_mem;
     153             :         /* Single contiguous buffer used for backing io buffers for this volume. */
     154             :         uint8_t                                 *buf_backing_io_mem;
     155             : };
     156             : 
     157             : static void _start_readv_request(struct spdk_reduce_vol_request *req);
     158             : static void _start_writev_request(struct spdk_reduce_vol_request *req);
     159             : static uint8_t *g_zero_buf;
     160             : static int g_vol_count = 0;
     161             : 
     162             : /*
     163             :  * Allocate extra metadata chunks and corresponding backing io units to account for
 *  outstanding IO in the worst-case scenario where the logical map is completely allocated
     165             :  *  and no data can be compressed.  We need extra chunks in this case to handle
     166             :  *  in-flight writes since reduce never writes data in place.
     167             :  */
     168             : #define REDUCE_NUM_EXTRA_CHUNKS 128
     169             : 
     170             : static void
     171          48 : _reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len)
     172             : {
     173          48 :         if (vol->pm_file.pm_is_pmem) {
     174          48 :                 pmem_persist(addr, len);
     175             :         } else {
     176           0 :                 pmem_msync(addr, len);
     177             :         }
     178          48 : }
     179             : 
     180             : static uint64_t
     181          49 : _get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size)
     182             : {
     183             :         uint64_t chunks_in_logical_map, logical_map_size;
     184             : 
     185          49 :         chunks_in_logical_map = vol_size / chunk_size;
     186          49 :         logical_map_size = chunks_in_logical_map * sizeof(uint64_t);
     187             : 
     188             :         /* Round up to next cacheline. */
     189          49 :         return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) *
     190             :                REDUCE_PM_SIZE_ALIGNMENT;
     191             : }
     192             : 
     193             : static uint64_t
     194         349 : _get_total_chunks(uint64_t vol_size, uint64_t chunk_size)
     195             : {
     196             :         uint64_t num_chunks;
     197             : 
     198         349 :         num_chunks = vol_size / chunk_size;
     199         349 :         num_chunks += REDUCE_NUM_EXTRA_CHUNKS;
     200             : 
     201         349 :         return num_chunks;
     202             : }
     203             : 
     204             : static inline uint32_t
     205         341 : _reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk)
     206             : {
     207         341 :         return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk;
     208             : }
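
The chunk map ends in a zero-length array ([0] is the GNU spelling of a C99 flexible array member), so its allocation size must be computed by hand, which is what the helper above does. A minimal standalone sketch of allocating one chunk map, assuming a 16 KiB chunk with 4 KiB backing io units:

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    struct chunk_map {
            uint32_t compressed_size;
            uint32_t reserved;
            uint64_t io_unit_index[];   /* C99 spelling of the [0] idiom above */
    };

    int main(void)
    {
            uint64_t io_units_per_chunk = 16384 / 4096;   /* 16 KiB chunk, 4 KiB io units */
            size_t size = sizeof(struct chunk_map) +
                          sizeof(uint64_t) * io_units_per_chunk;
            struct chunk_map *map = malloc(size);

            if (map == NULL) {
                    return 1;
            }
            for (uint64_t i = 0; i < io_units_per_chunk; i++) {
                    map->io_unit_index[i] = -1ULL;        /* REDUCE_EMPTY_MAP_ENTRY */
            }
            printf("chunk map size: %zu bytes\n", size);  /* 8 + 4 * 8 = 40 */
            free(map);
            return 0;
    }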
     209             : 
     210             : static uint64_t
     211          25 : _get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size)
     212             : {
     213             :         uint64_t io_units_per_chunk, num_chunks, total_chunks_size;
     214             : 
     215          25 :         num_chunks = _get_total_chunks(vol_size, chunk_size);
     216          25 :         io_units_per_chunk = chunk_size / backing_io_unit_size;
     217             : 
     218          25 :         total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk);
     219             : 
     220          25 :         return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) *
     221             :                REDUCE_PM_SIZE_ALIGNMENT;
     222             : }
     223             : 
     224             : static struct spdk_reduce_chunk_map *
     225         300 : _reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
     226             : {
     227             :         uintptr_t chunk_map_addr;
     228             : 
     229         300 :         assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size));
     230             : 
     231         300 :         chunk_map_addr = (uintptr_t)vol->pm_chunk_maps;
     232         300 :         chunk_map_addr += chunk_map_index *
     233         300 :                           _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
     234             : 
     235         300 :         return (struct spdk_reduce_chunk_map *)chunk_map_addr;
     236             : }
     237             : 
     238             : static int
     239          21 : _validate_vol_params(struct spdk_reduce_vol_params *params)
     240             : {
     241          21 :         if (params->vol_size > 0) {
     242             :                 /**
                 * The user must not pass in the vol size - it gets calculated by libreduce from
     244             :                  *  values in this structure plus the size of the backing device.
     245             :                  */
     246           0 :                 return -EINVAL;
     247             :         }
     248             : 
     249          21 :         if (params->chunk_size == 0 || params->backing_io_unit_size == 0 ||
     250          21 :             params->logical_block_size == 0) {
     251           0 :                 return -EINVAL;
     252             :         }
     253             : 
     254             :         /* Chunk size must be an even multiple of the backing io unit size. */
     255          21 :         if ((params->chunk_size % params->backing_io_unit_size) != 0) {
     256           0 :                 return -EINVAL;
     257             :         }
     258             : 
     259             :         /* Chunk size must be an even multiple of the logical block size. */
     260          21 :         if ((params->chunk_size % params->logical_block_size) != 0) {
      261           0 :                 return -EINVAL;
     262             :         }
     263             : 
     264          21 :         return 0;
     265             : }
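
For illustration, a params struct that satisfies every check above (the specific sizes are example assumptions; only the divisibility rules are required):

    struct spdk_reduce_vol_params params = {
            .vol_size = 0,                     /* must be 0 - libreduce computes it */
            .chunk_size = 16 * 1024,
            .backing_io_unit_size = 4 * 1024,  /* evenly divides chunk_size */
            .logical_block_size = 512,         /* evenly divides chunk_size */
    };
    /* _validate_vol_params(&params) returns 0 */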
     266             : 
     267             : static uint64_t
     268          27 : _get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size)
     269             : {
     270             :         uint64_t num_chunks;
     271             : 
     272          27 :         num_chunks = backing_dev_size / chunk_size;
     273          27 :         if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) {
     274           1 :                 return 0;
     275             :         }
     276             : 
     277          26 :         num_chunks -= REDUCE_NUM_EXTRA_CHUNKS;
     278          26 :         return num_chunks * chunk_size;
     279             : }
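
A worked example of this sizing with assumed numbers: a 1 GiB backing device and 16 KiB chunks yield 65536 chunks; reserving REDUCE_NUM_EXTRA_CHUNKS (128) for in-flight, never-in-place writes leaves 65408 chunks, so the advertised volume size is about 1022 MiB:

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
            uint64_t chunk_size = 16 * 1024;                     /* assumed */
            uint64_t backing_dev_size = 1024ULL * 1024 * 1024;   /* 1 GiB */
            uint64_t num_chunks = backing_dev_size / chunk_size; /* 65536 */

            num_chunks -= 128;                                   /* REDUCE_NUM_EXTRA_CHUNKS */
            printf("vol_size = %llu bytes\n",
                   (unsigned long long)(num_chunks * chunk_size)); /* 1071644672 (~1022 MiB) */
            return 0;
    }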
     280             : 
     281             : static uint64_t
     282          25 : _get_pm_file_size(struct spdk_reduce_vol_params *params)
     283             : {
     284             :         uint64_t total_pm_size;
     285             : 
     286          25 :         total_pm_size = sizeof(struct spdk_reduce_vol_superblock);
     287          25 :         total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size);
     288          25 :         total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size,
     289          25 :                          params->backing_io_unit_size);
     290          25 :         return total_pm_size;
     291             : }
     292             : 
     293             : const struct spdk_uuid *
     294           1 : spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol)
     295             : {
     296           1 :         return &vol->params.uuid;
     297             : }
     298             : 
     299             : static void
     300          24 : _initialize_vol_pm_pointers(struct spdk_reduce_vol *vol)
     301             : {
     302             :         uint64_t logical_map_size;
     303             : 
     304             :         /* Superblock is at the beginning of the pm file. */
     305          24 :         vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf;
     306             : 
     307             :         /* Logical map immediately follows the super block. */
     308          24 :         vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1);
     309             : 
     310             :         /* Chunks maps follow the logical map. */
     311          24 :         logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size);
     312          24 :         vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size);
     313          24 : }
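
Putting the last few helpers together, the pm file is laid out as superblock, then logical map, then chunk maps, each region rounded up to a 64-byte cacheline. A standalone sketch computing the region offsets for the same assumed geometry as above (1 GiB backing device, 16 KiB chunks, 4 KiB io units):

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    #define ALIGN_UP(v, a) (((v) + (a) - 1) / (a) * (a))

    int main(void)
    {
            uint64_t vol_size = 1071644672ULL;  /* from the _get_vol_size example */
            uint64_t chunk_size = 16 * 1024;
            uint64_t io_unit_size = 4 * 1024;

            uint64_t superblock = 4096;
            uint64_t logical_map = ALIGN_UP(vol_size / chunk_size * sizeof(uint64_t), 64);
            /* chunk map struct: 8-byte header + one uint64_t per io unit */
            uint64_t chunk_struct = 8 + sizeof(uint64_t) * (chunk_size / io_unit_size);
            uint64_t total_chunks = vol_size / chunk_size + 128; /* extra chunks included */
            uint64_t chunk_maps = ALIGN_UP(total_chunks * chunk_struct, 64);

            printf("superblock:  [0, %" PRIu64 ")\n", superblock);
            printf("logical map: [%" PRIu64 ", %" PRIu64 ")\n",
                   superblock, superblock + logical_map);
            printf("chunk maps:  [%" PRIu64 ", %" PRIu64 ")\n",
                   superblock + logical_map, superblock + logical_map + chunk_maps);
            return 0;
    }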
     314             : 
     315             : /* We need 2 iovs during load - one for the superblock, another for the path */
     316             : #define LOAD_IOV_COUNT  2
     317             : 
     318             : struct reduce_init_load_ctx {
     319             :         struct spdk_reduce_vol                  *vol;
     320             :         struct spdk_reduce_vol_cb_args          backing_cb_args;
     321             :         spdk_reduce_vol_op_with_handle_complete cb_fn;
     322             :         void                                    *cb_arg;
     323             :         struct iovec                            iov[LOAD_IOV_COUNT];
     324             :         void                                    *path;
     325             :         struct spdk_reduce_backing_io           *backing_io;
     326             : };
     327             : 
     328             : static inline bool
     329       14342 : _addr_crosses_huge_page(const void *addr, size_t *size)
     330             : {
     331             :         size_t _size;
     332             :         uint64_t rc;
     333             : 
     334       14342 :         assert(size);
     335             : 
     336       14342 :         _size = *size;
     337       14342 :         rc = spdk_vtophys(addr, size);
     338             : 
     339       14342 :         return rc == SPDK_VTOPHYS_ERROR || _size != *size;
     340             : }
     341             : 
     342             : static inline int
     343       14336 : _set_buffer(uint8_t **vol_buffer, uint8_t **_addr, uint8_t *addr_range, size_t buffer_size)
     344             : {
     345             :         uint8_t *addr;
     346       14336 :         size_t size_tmp = buffer_size;
     347             : 
     348       14336 :         addr = *_addr;
     349             : 
     350             :         /* Verify that addr + buffer_size doesn't cross huge page boundary */
     351       14336 :         if (_addr_crosses_huge_page(addr, &size_tmp)) {
      352             :                 /* Memory start is aligned on 2 MiB, so the buffer should be located at the end of the page.
      353             :                  * Skip the remaining bytes and continue from the beginning of the next page. */
     354           6 :                 addr += size_tmp;
     355             :         }
     356             : 
     357       14336 :         if (addr + buffer_size > addr_range) {
     358           0 :                 SPDK_ERRLOG("Vol buffer %p out of range %p\n", addr, addr_range);
     359           0 :                 return -ERANGE;
     360             :         }
     361             : 
     362       14336 :         *vol_buffer = addr;
     363       14336 :         *_addr = addr + buffer_size;
     364             : 
     365       14336 :         return 0;
     366             : }
     367             : 
     368             : static int
     369          28 : _allocate_vol_requests(struct spdk_reduce_vol *vol)
     370             : {
     371             :         struct spdk_reduce_vol_request *req;
     372          28 :         struct spdk_reduce_backing_dev *backing_dev = vol->backing_dev;
     373             :         uint32_t reqs_in_2mb_page, huge_pages_needed;
     374          28 :         uint8_t *buffer, *buffer_end;
     375          28 :         int i = 0;
     376          28 :         int rc = 0;
     377             : 
      378             :         /* Comp and decomp buffers must be allocated so that they do not cross physical
      379             :          * page boundaries. Assume that the system uses the default 2 MiB pages and that
      380             :          * chunk_size is not necessarily a power of 2.
      381             :          * Allocate 2x since we need buffers for both read/write and compress/decompress
      382             :          * intermediate buffers. */
     383          28 :         reqs_in_2mb_page = VALUE_2MB / (vol->params.chunk_size * 2);
     384          28 :         if (!reqs_in_2mb_page) {
     385           0 :                 return -EINVAL;
     386             :         }
     387          28 :         huge_pages_needed = SPDK_CEIL_DIV(REDUCE_NUM_VOL_REQUESTS, reqs_in_2mb_page);
     388             : 
     389          28 :         vol->buf_mem = spdk_dma_malloc(VALUE_2MB * huge_pages_needed, VALUE_2MB, NULL);
     390          28 :         if (vol->buf_mem == NULL) {
     391           0 :                 return -ENOMEM;
     392             :         }
     393             : 
     394          28 :         vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req));
     395          28 :         if (vol->request_mem == NULL) {
     396           0 :                 spdk_free(vol->buf_mem);
     397           0 :                 vol->buf_mem = NULL;
     398           0 :                 return -ENOMEM;
     399             :         }
     400             : 
     401             :         /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate
     402             :          *  buffers.
     403             :          */
     404          28 :         vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS,
     405          28 :                                   2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk);
     406          28 :         if (vol->buf_iov_mem == NULL) {
     407           0 :                 free(vol->request_mem);
     408           0 :                 spdk_free(vol->buf_mem);
     409           0 :                 vol->request_mem = NULL;
     410           0 :                 vol->buf_mem = NULL;
     411           0 :                 return -ENOMEM;
     412             :         }
     413             : 
     414          28 :         vol->buf_backing_io_mem = calloc(REDUCE_NUM_VOL_REQUESTS, (sizeof(struct spdk_reduce_backing_io) +
     415          28 :                                          backing_dev->user_ctx_size) * vol->backing_io_units_per_chunk);
     416          28 :         if (vol->buf_backing_io_mem == NULL) {
     417           0 :                 free(vol->request_mem);
     418           0 :                 free(vol->buf_iov_mem);
     419           0 :                 spdk_free(vol->buf_mem);
     420           0 :                 vol->request_mem = NULL;
     421           0 :                 vol->buf_iov_mem = NULL;
     422           0 :                 vol->buf_mem = NULL;
     423           0 :                 return -ENOMEM;
     424             :         }
     425             : 
     426          28 :         buffer = vol->buf_mem;
     427          28 :         buffer_end = buffer + VALUE_2MB * huge_pages_needed;
     428             : 
     429        7196 :         for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) {
     430        7168 :                 req = &vol->request_mem[i];
     431        7168 :                 TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
     432       14336 :                 req->backing_io = (struct spdk_reduce_backing_io *)(vol->buf_backing_io_mem + i *
     433        7168 :                                   (sizeof(struct spdk_reduce_backing_io) + backing_dev->user_ctx_size) *
     434        7168 :                                   vol->backing_io_units_per_chunk);
     435             : 
     436        7168 :                 req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk];
     437        7168 :                 req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk];
     438             : 
     439        7168 :                 rc = _set_buffer(&req->comp_buf, &buffer, buffer_end, vol->params.chunk_size);
     440        7168 :                 if (rc) {
     441           0 :                         SPDK_ERRLOG("Failed to set comp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
     442             :                                     vol->buf_mem, buffer_end);
     443           0 :                         break;
     444             :                 }
     445        7168 :                 rc = _set_buffer(&req->decomp_buf, &buffer, buffer_end, vol->params.chunk_size);
     446        7168 :                 if (rc) {
     447           0 :                         SPDK_ERRLOG("Failed to set decomp buffer for req idx %u, addr %p, start %p, end %p\n", i, buffer,
     448             :                                     vol->buf_mem, buffer_end);
     449           0 :                         break;
     450             :                 }
     451             :         }
     452             : 
     453          28 :         if (rc) {
     454           0 :                 free(vol->buf_backing_io_mem);
     455           0 :                 free(vol->buf_iov_mem);
     456           0 :                 free(vol->request_mem);
     457           0 :                 spdk_free(vol->buf_mem);
     458           0 :                 vol->buf_mem = NULL;
     459           0 :                 vol->buf_backing_io_mem = NULL;
     460           0 :                 vol->buf_iov_mem = NULL;
     461           0 :                 vol->request_mem = NULL;
     462             :         }
     463             : 
     464          28 :         return rc;
     465             : }
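
To make the huge-page math above concrete, again assuming 16 KiB chunks: each request consumes chunk_size * 2 bytes of buffer (one comp, one decomp), so 64 requests fit in one 2 MiB page and the 256 requests (REDUCE_NUM_VOL_REQUESTS) need 4 huge pages:

    #include <stdio.h>

    int main(void)
    {
            unsigned chunk_size = 16 * 1024;                                   /* assumed */
            unsigned reqs_in_2mb_page = (2u * 1024 * 1024) / (chunk_size * 2); /* 64 */
            unsigned huge_pages_needed =
                    (256 + reqs_in_2mb_page - 1) / reqs_in_2mb_page;           /* 4 */

            printf("%u reqs per 2 MiB page, %u huge pages for 256 reqs\n",
                   reqs_in_2mb_page, huge_pages_needed);
            return 0;
    }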
     466             : 
     467             : const struct spdk_reduce_vol_info *
     468           0 : spdk_reduce_vol_get_info(const struct spdk_reduce_vol *vol)
     469             : {
     470           0 :         return &vol->info;
     471             : }
     472             : 
     473             : static void
     474          55 : _init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx)
     475             : {
     476          55 :         if (ctx != NULL) {
     477          26 :                 spdk_free(ctx->path);
     478          26 :                 free(ctx->backing_io);
     479          26 :                 free(ctx);
     480             :         }
     481             : 
     482          55 :         if (vol != NULL) {
     483          30 :                 if (vol->pm_file.pm_buf != NULL) {
     484          24 :                         pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size);
     485             :                 }
     486             : 
     487          30 :                 spdk_free(vol->backing_super);
     488          30 :                 spdk_bit_array_free(&vol->allocated_chunk_maps);
     489          30 :                 spdk_bit_array_free(&vol->allocated_backing_io_units);
     490          30 :                 free(vol->request_mem);
     491          30 :                 free(vol->buf_backing_io_mem);
     492          30 :                 free(vol->buf_iov_mem);
     493          30 :                 spdk_free(vol->buf_mem);
     494          30 :                 free(vol);
     495             :         }
     496          55 : }
     497             : 
     498             : static int
     499          26 : _alloc_zero_buff(void)
     500             : {
     501          26 :         int rc = 0;
     502             : 
     503             :         /* The zero buffer is shared between all volumes and just used
         * for reads, so allocate one global instance here if not already
         * allocated when another vol was init'd or loaded.
     506             :          */
     507          26 :         if (g_vol_count++ == 0) {
     508          24 :                 g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE,
     509             :                                           64, NULL, SPDK_ENV_LCORE_ID_ANY,
     510             :                                           SPDK_MALLOC_DMA);
     511          24 :                 if (g_zero_buf == NULL) {
     512           0 :                         g_vol_count--;
     513           0 :                         rc = -ENOMEM;
     514             :                 }
     515             :         }
     516          26 :         return rc;
     517             : }
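
g_vol_count reference-counts the shared zero buffer across volumes. The matching release helper falls outside this excerpt; a sketch of what it presumably looks like (decrement, free on the last volume):

    static void
    _dealloc_zero_buff(void)
    {
            g_vol_count--;
            if (g_vol_count == 0) {
                    spdk_free(g_zero_buf);
            }
    }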
     518             : 
     519             : static void
     520          15 : _init_write_super_cpl(void *cb_arg, int reduce_errno)
     521             : {
     522          15 :         struct reduce_init_load_ctx *init_ctx = cb_arg;
     523          15 :         int rc = 0;
     524             : 
     525          15 :         if (reduce_errno != 0) {
     526           0 :                 rc = reduce_errno;
     527           0 :                 goto err;
     528             :         }
     529             : 
     530          15 :         rc = _allocate_vol_requests(init_ctx->vol);
     531          15 :         if (rc != 0) {
     532           0 :                 goto err;
     533             :         }
     534             : 
     535          15 :         rc = _alloc_zero_buff();
     536          15 :         if (rc != 0) {
     537           0 :                 goto err;
     538             :         }
     539             : 
     540          15 :         init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, rc);
     541             :         /* Only clean up the ctx - the vol has been passed to the application
     542             :          *  for use now that initialization was successful.
     543             :          */
     544          15 :         _init_load_cleanup(NULL, init_ctx);
     545             : 
     546          15 :         return;
     547           0 : err:
     548           0 :         if (unlink(init_ctx->path)) {
     549           0 :                 SPDK_ERRLOG("%s could not be unlinked: %s\n",
     550             :                             (char *)init_ctx->path, spdk_strerror(errno));
     551             :         }
     552             : 
     553           0 :         init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc);
     554           0 :         _init_load_cleanup(init_ctx->vol, init_ctx);
     555             : }
     556             : 
     557             : static void
     558          15 : _init_write_path_cpl(void *cb_arg, int reduce_errno)
     559             : {
     560          15 :         struct reduce_init_load_ctx *init_ctx = cb_arg;
     561          15 :         struct spdk_reduce_vol *vol = init_ctx->vol;
     562          15 :         struct spdk_reduce_backing_io *backing_io = init_ctx->backing_io;
     563             : 
     564          15 :         if (reduce_errno != 0) {
     565           0 :                 _init_write_super_cpl(cb_arg, reduce_errno);
     566           0 :                 return;
     567             :         }
     568             : 
     569          15 :         init_ctx->iov[0].iov_base = vol->backing_super;
     570          15 :         init_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
     571          15 :         init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl;
     572          15 :         init_ctx->backing_cb_args.cb_arg = init_ctx;
     573             : 
     574          15 :         backing_io->dev = vol->backing_dev;
     575          15 :         backing_io->iov = init_ctx->iov;
     576          15 :         backing_io->iovcnt = 1;
     577          15 :         backing_io->lba = 0;
     578          15 :         backing_io->lba_count = sizeof(*vol->backing_super) / vol->backing_dev->blocklen;
     579          15 :         backing_io->backing_cb_args = &init_ctx->backing_cb_args;
     580          15 :         backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
     581             : 
     582          15 :         vol->backing_dev->submit_backing_io(backing_io);
     583             : }
     584             : 
     585             : static int
     586          24 : _allocate_bit_arrays(struct spdk_reduce_vol *vol)
     587             : {
     588             :         uint64_t total_chunks, total_backing_io_units;
     589             :         uint32_t i, num_metadata_io_units;
     590             : 
     591          24 :         total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
     592          24 :         vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks);
     593          24 :         vol->find_chunk_offset = 0;
     594          24 :         total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size);
     595          24 :         vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units);
     596          24 :         vol->find_block_offset = 0;
     597             : 
     598          24 :         if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) {
     599           0 :                 return -ENOMEM;
     600             :         }
     601             : 
     602             :         /* Set backing io unit bits associated with metadata. */
     603           0 :         num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
     604          24 :                                 vol->params.backing_io_unit_size;
     605         198 :         for (i = 0; i < num_metadata_io_units; i++) {
     606         174 :                 spdk_bit_array_set(vol->allocated_backing_io_units, i);
     607         174 :                 vol->info.allocated_io_units++;
     608             :         }
     609             : 
     610          24 :         return 0;
     611             : }
     612             : 
     613             : static int
     614           1 : overlap_cmp(struct spdk_reduce_vol_request *req1, struct spdk_reduce_vol_request *req2)
     615             : {
     616           2 :         return (req1->logical_map_index < req2->logical_map_index ? -1 : req1->logical_map_index >
     617           1 :                 req2->logical_map_index);
     618             : }
     619        1145 : RB_GENERATE_STATIC(executing_req_tree, spdk_reduce_vol_request, rbnode, overlap_cmp);
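
overlap_cmp supplies the three-way ordering the RB tree macros require, keyed on the logical map index so overlapping requests land on the same node. The compact expression above is equivalent to the more explicit:

    if (req1->logical_map_index < req2->logical_map_index) {
            return -1;  /* req1 sorts before req2 */
    }
    return req1->logical_map_index > req2->logical_map_index;  /* 1 if after, 0 if equal */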
     620             : 
     621             : 
     622             : void
     623          17 : spdk_reduce_vol_init(struct spdk_reduce_vol_params *params,
     624             :                      struct spdk_reduce_backing_dev *backing_dev,
     625             :                      const char *pm_file_dir,
     626             :                      spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
     627             : {
     628             :         struct spdk_reduce_vol *vol;
     629             :         struct reduce_init_load_ctx *init_ctx;
     630             :         struct spdk_reduce_backing_io *backing_io;
     631             :         uint64_t backing_dev_size;
     632          17 :         size_t mapped_len;
     633             :         int dir_len, max_dir_len, rc;
     634             : 
     635             :         /* We need to append a path separator and the UUID to the supplied
     636             :          * path.
     637             :          */
     638          17 :         max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1;
     639          17 :         dir_len = strnlen(pm_file_dir, max_dir_len);
     640             :         /* Strip trailing slash if the user provided one - we will add it back
     641             :          * later when appending the filename.
     642             :          */
     643          17 :         if (pm_file_dir[dir_len - 1] == '/') {
     644           0 :                 dir_len--;
     645             :         }
     646          17 :         if (dir_len == max_dir_len) {
     647           0 :                 SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir);
     648           0 :                 cb_fn(cb_arg, NULL, -EINVAL);
     649           0 :                 return;
     650             :         }
     651             : 
     652          17 :         rc = _validate_vol_params(params);
     653          17 :         if (rc != 0) {
     654           0 :                 SPDK_ERRLOG("invalid vol params\n");
     655           0 :                 cb_fn(cb_arg, NULL, rc);
     656           0 :                 return;
     657             :         }
     658             : 
     659          17 :         backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen;
     660          17 :         params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size);
     661          17 :         if (params->vol_size == 0) {
     662           1 :                 SPDK_ERRLOG("backing device is too small\n");
     663           1 :                 cb_fn(cb_arg, NULL, -EINVAL);
     664           1 :                 return;
     665             :         }
     666             : 
     667          16 :         if (backing_dev->submit_backing_io == NULL) {
     668           1 :                 SPDK_ERRLOG("backing_dev function pointer not specified\n");
     669           1 :                 cb_fn(cb_arg, NULL, -EINVAL);
     670           1 :                 return;
     671             :         }
     672             : 
     673          15 :         vol = calloc(1, sizeof(*vol));
     674          15 :         if (vol == NULL) {
     675           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     676           0 :                 return;
     677             :         }
     678             : 
     679          15 :         TAILQ_INIT(&vol->free_requests);
     680          15 :         RB_INIT(&vol->executing_requests);
     681          15 :         TAILQ_INIT(&vol->queued_requests);
     682          15 :         queue_init(&vol->free_chunks_queue);
     683          15 :         queue_init(&vol->free_backing_blocks_queue);
     684             : 
     685          15 :         vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL,
     686             :                                           SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
     687          15 :         if (vol->backing_super == NULL) {
     688           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     689           0 :                 _init_load_cleanup(vol, NULL);
     690           0 :                 return;
     691             :         }
     692             : 
     693          15 :         init_ctx = calloc(1, sizeof(*init_ctx));
     694          15 :         if (init_ctx == NULL) {
     695           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     696           0 :                 _init_load_cleanup(vol, NULL);
     697           0 :                 return;
     698             :         }
     699             : 
     700          15 :         backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
     701          15 :         if (backing_io == NULL) {
     702           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     703           0 :                 _init_load_cleanup(vol, init_ctx);
     704           0 :                 return;
     705             :         }
     706          15 :         init_ctx->backing_io = backing_io;
     707             : 
     708          15 :         init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL,
     709             :                                       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
     710          15 :         if (init_ctx->path == NULL) {
     711           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     712           0 :                 _init_load_cleanup(vol, init_ctx);
     713           0 :                 return;
     714             :         }
     715             : 
     716          15 :         if (spdk_uuid_is_null(&params->uuid)) {
     717           1 :                 spdk_uuid_generate(&params->uuid);
     718             :         }
     719             : 
     720          15 :         memcpy(vol->pm_file.path, pm_file_dir, dir_len);
     721          15 :         vol->pm_file.path[dir_len] = '/';
     722          15 :         spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN,
     723          15 :                             &params->uuid);
     724          15 :         vol->pm_file.size = _get_pm_file_size(params);
     725          15 :         vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size,
     726             :                                             PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600,
     727             :                                             &mapped_len, &vol->pm_file.pm_is_pmem);
     728          15 :         if (vol->pm_file.pm_buf == NULL) {
     729           0 :                 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n",
     730             :                             vol->pm_file.path, strerror(errno));
     731           0 :                 cb_fn(cb_arg, NULL, -errno);
     732           0 :                 _init_load_cleanup(vol, init_ctx);
     733           0 :                 return;
     734             :         }
     735             : 
     736          15 :         if (vol->pm_file.size != mapped_len) {
     737           0 :                 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
     738             :                             vol->pm_file.size, mapped_len);
     739           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     740           0 :                 _init_load_cleanup(vol, init_ctx);
     741           0 :                 return;
     742             :         }
     743             : 
     744          15 :         vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size;
     745          15 :         vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size;
     746          15 :         vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen;
     747          15 :         memcpy(&vol->params, params, sizeof(*params));
     748             : 
     749          15 :         vol->backing_dev = backing_dev;
     750             : 
     751          15 :         rc = _allocate_bit_arrays(vol);
     752          15 :         if (rc != 0) {
     753           0 :                 cb_fn(cb_arg, NULL, rc);
     754           0 :                 _init_load_cleanup(vol, init_ctx);
     755           0 :                 return;
     756             :         }
     757             : 
     758          15 :         memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE,
     759             :                sizeof(vol->backing_super->signature));
     760          15 :         memcpy(&vol->backing_super->params, params, sizeof(*params));
     761             : 
     762          15 :         _initialize_vol_pm_pointers(vol);
     763             : 
     764          15 :         memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super));
      765             :         /* Writing 0xFFs is equivalent to filling it all with REDUCE_EMPTY_MAP_ENTRY.
      766             :          * Note that this writes 0xFF not just to the logical map but to the chunk maps as well.
     767             :          */
     768          15 :         memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super));
     769          15 :         _reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size);
     770             : 
     771          15 :         init_ctx->vol = vol;
     772          15 :         init_ctx->cb_fn = cb_fn;
     773          15 :         init_ctx->cb_arg = cb_arg;
     774             : 
     775          15 :         memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX);
     776          15 :         init_ctx->iov[0].iov_base = init_ctx->path;
     777          15 :         init_ctx->iov[0].iov_len = REDUCE_PATH_MAX;
     778          15 :         init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl;
     779          15 :         init_ctx->backing_cb_args.cb_arg = init_ctx;
     780             :         /* Write path to offset 4K on backing device - just after where the super
     781             :          *  block will be written.  We wait until this is committed before writing the
         *  super block to guarantee we don't get the super block written without
         *  the path if the system crashes in the middle of a write operation.
     784             :          */
     785          15 :         backing_io->dev = vol->backing_dev;
     786          15 :         backing_io->iov = init_ctx->iov;
     787          15 :         backing_io->iovcnt = 1;
     788          15 :         backing_io->lba = REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen;
     789          15 :         backing_io->lba_count = REDUCE_PATH_MAX / vol->backing_dev->blocklen;
     790          15 :         backing_io->backing_cb_args = &init_ctx->backing_cb_args;
     791          15 :         backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
     792             : 
     793          15 :         vol->backing_dev->submit_backing_io(backing_io);
     794             : }
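
A hedged usage sketch of this init path, assuming the caller has already wired up a struct spdk_reduce_backing_dev (the geometry and the "/mnt/pmem" directory are placeholders for this example, not values mandated by libreduce):

    #include "spdk/reduce.h"
    #include "spdk/log.h"

    static void
    init_done(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
    {
            if (reduce_errno != 0) {
                    SPDK_ERRLOG("reduce vol init failed: %d\n", reduce_errno);
                    return;
            }
            /* vol now belongs to the caller and is ready for I/O. */
    }

    static void
    create_vol(struct spdk_reduce_backing_dev *backing_dev)
    {
            struct spdk_reduce_vol_params params = {
                    .vol_size = 0,                    /* must be 0; computed from the backing dev */
                    .chunk_size = 16 * 1024,          /* example geometry */
                    .backing_io_unit_size = 4 * 1024,
                    .logical_block_size = 512,
            };

            /* "/mnt/pmem" is a placeholder directory for the pm metadata file. */
            spdk_reduce_vol_init(&params, backing_dev, "/mnt/pmem", init_done, NULL);
    }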
     795             : 
     796             : static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno);
     797             : 
     798             : static void
     799          11 : _load_read_super_and_path_cpl(void *cb_arg, int reduce_errno)
     800             : {
     801          11 :         struct reduce_init_load_ctx *load_ctx = cb_arg;
     802          11 :         struct spdk_reduce_vol *vol = load_ctx->vol;
     803             :         uint64_t backing_dev_size;
     804             :         uint64_t i, num_chunks, logical_map_index;
     805             :         struct spdk_reduce_chunk_map *chunk;
     806          11 :         size_t mapped_len;
     807             :         uint32_t j;
     808             :         int rc;
     809             : 
     810          11 :         if (reduce_errno != 0) {
     811           0 :                 rc = reduce_errno;
     812           0 :                 goto error;
     813             :         }
     814             : 
     815          11 :         rc = _alloc_zero_buff();
     816          11 :         if (rc) {
     817           0 :                 goto error;
     818             :         }
     819             : 
     820          11 :         if (memcmp(vol->backing_super->signature,
     821             :                    SPDK_REDUCE_SIGNATURE,
     822             :                    sizeof(vol->backing_super->signature)) != 0) {
     823             :                 /* This backing device isn't a libreduce backing device. */
     824           1 :                 rc = -EILSEQ;
     825           1 :                 goto error;
     826             :         }
     827             : 
      828             :         /* If the cb_fn is destroy_load_cb, it means we intend to destroy this compress bdev.
     829             :          *  So don't bother getting the volume ready to use - invoke the callback immediately
     830             :          *  so destroy_load_cb can delete the metadata off of the block device and delete the
     831             :          *  persistent memory file if it exists.
     832             :          */
     833          10 :         memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path));
     834          10 :         if (load_ctx->cb_fn == (*destroy_load_cb)) {
     835           1 :                 load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
     836           1 :                 _init_load_cleanup(NULL, load_ctx);
     837           1 :                 return;
     838             :         }
     839             : 
     840           9 :         memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params));
     841           9 :         vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size;
     842           9 :         vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size;
     843           9 :         vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen;
     844             : 
     845           9 :         rc = _allocate_bit_arrays(vol);
     846           9 :         if (rc != 0) {
     847           0 :                 goto error;
     848             :         }
     849             : 
     850           9 :         backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen;
     851           9 :         if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) {
      852           0 :                 SPDK_ERRLOG("backing device size %" PRIu64 " smaller than expected\n",
     853             :                             backing_dev_size);
     854           0 :                 rc = -EILSEQ;
     855           0 :                 goto error;
     856             :         }
     857             : 
     858           9 :         vol->pm_file.size = _get_pm_file_size(&vol->params);
     859           9 :         vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len,
     860             :                                             &vol->pm_file.pm_is_pmem);
     861           9 :         if (vol->pm_file.pm_buf == NULL) {
     862           0 :                 SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno));
     863           0 :                 rc = -errno;
     864           0 :                 goto error;
     865             :         }
     866             : 
     867           9 :         if (vol->pm_file.size != mapped_len) {
     868           0 :                 SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n",
     869             :                             vol->pm_file.size, mapped_len);
     870           0 :                 rc = -ENOMEM;
     871           0 :                 goto error;
     872             :         }
     873             : 
     874           9 :         rc = _allocate_vol_requests(vol);
     875           9 :         if (rc != 0) {
     876           0 :                 goto error;
     877             :         }
     878             : 
     879           9 :         _initialize_vol_pm_pointers(vol);
     880             : 
     881           9 :         num_chunks = vol->params.vol_size / vol->params.chunk_size;
     882        1161 :         for (i = 0; i < num_chunks; i++) {
     883        1152 :                 logical_map_index = vol->pm_logical_map[i];
     884        1152 :                 if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) {
     885        1146 :                         continue;
     886             :                 }
     887           6 :                 spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index);
     888           6 :                 chunk = _reduce_vol_get_chunk_map(vol, logical_map_index);
     889          30 :                 for (j = 0; j < vol->backing_io_units_per_chunk; j++) {
     890          24 :                         if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) {
     891          12 :                                 spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]);
     892          12 :                                 vol->info.allocated_io_units++;
     893             :                         }
     894             :                 }
     895             :         }
     896             : 
     897           9 :         load_ctx->cb_fn(load_ctx->cb_arg, vol, 0);
     898             :         /* Only clean up the ctx - the vol has been passed to the application
     899             :          *  for use now that volume load was successful.
     900             :          */
     901           9 :         _init_load_cleanup(NULL, load_ctx);
     902           9 :         return;
     903             : 
     904           1 : error:
     905           1 :         load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc);
     906           1 :         _init_load_cleanup(vol, load_ctx);
     907             : }
     908             : 
     909             : void
     910          11 : spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev,
     911             :                      spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg)
     912             : {
     913             :         struct spdk_reduce_vol *vol;
     914             :         struct reduce_init_load_ctx *load_ctx;
     915             :         struct spdk_reduce_backing_io *backing_io;
     916             : 
     917          11 :         if (backing_dev->submit_backing_io == NULL) {
     918           0 :                 SPDK_ERRLOG("backing_dev function pointer not specified\n");
     919           0 :                 cb_fn(cb_arg, NULL, -EINVAL);
     920           0 :                 return;
     921             :         }
     922             : 
     923          11 :         vol = calloc(1, sizeof(*vol));
     924          11 :         if (vol == NULL) {
     925           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     926           0 :                 return;
     927             :         }
     928             : 
     929          11 :         TAILQ_INIT(&vol->free_requests);
     930          11 :         RB_INIT(&vol->executing_requests);
     931          11 :         TAILQ_INIT(&vol->queued_requests);
     932          11 :         queue_init(&vol->free_chunks_queue);
     933          11 :         queue_init(&vol->free_backing_blocks_queue);
     934             : 
     935          11 :         vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL,
     936             :                                           SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
     937          11 :         if (vol->backing_super == NULL) {
     938           0 :                 _init_load_cleanup(vol, NULL);
     939           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     940           0 :                 return;
     941             :         }
     942             : 
     943          11 :         vol->backing_dev = backing_dev;
     944             : 
     945          11 :         load_ctx = calloc(1, sizeof(*load_ctx));
     946          11 :         if (load_ctx == NULL) {
     947           0 :                 _init_load_cleanup(vol, NULL);
     948           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     949           0 :                 return;
     950             :         }
     951             : 
     952          11 :         backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
     953          11 :         if (backing_io == NULL) {
     954           0 :                 _init_load_cleanup(vol, load_ctx);
     955           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     956           0 :                 return;
     957             :         }
     958             : 
     959          11 :         load_ctx->backing_io = backing_io;
     960             : 
     961          11 :         load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL,
     962             :                                       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
     963          11 :         if (load_ctx->path == NULL) {
     964           0 :                 _init_load_cleanup(vol, load_ctx);
     965           0 :                 cb_fn(cb_arg, NULL, -ENOMEM);
     966           0 :                 return;
     967             :         }
     968             : 
     969          11 :         load_ctx->vol = vol;
     970          11 :         load_ctx->cb_fn = cb_fn;
     971          11 :         load_ctx->cb_arg = cb_arg;
     972             : 
     973          11 :         load_ctx->iov[0].iov_base = vol->backing_super;
     974          11 :         load_ctx->iov[0].iov_len = sizeof(*vol->backing_super);
     975          11 :         load_ctx->iov[1].iov_base = load_ctx->path;
     976          11 :         load_ctx->iov[1].iov_len = REDUCE_PATH_MAX;
     977          11 :         backing_io->dev = vol->backing_dev;
     978          11 :         backing_io->iov = load_ctx->iov;
     979          11 :         backing_io->iovcnt = LOAD_IOV_COUNT;
     980          11 :         backing_io->lba = 0;
     981          11 :         backing_io->lba_count = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) /
     982          11 :                                 vol->backing_dev->blocklen;
     983          11 :         backing_io->backing_cb_args = &load_ctx->backing_cb_args;
     984          11 :         backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
     985             : 
     986          11 :         load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl;
     987          11 :         load_ctx->backing_cb_args.cb_arg = load_ctx;
     988          11 :         vol->backing_dev->submit_backing_io(backing_io);
     989             : }
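                      : /* Usage sketch (a reading aid, not part of reduce.c; my_backing_dev,
                      :  * my_load_done, and my_ctx are hypothetical names): loading an existing
                      :  * compressed volume.  The only function pointer spdk_reduce_vol_load()
                      :  * itself requires is submit_backing_io, as checked above.
                      :  *
                      :  *   static void
                      :  *   my_load_done(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
                      :  *   {
                      :  *           if (reduce_errno != 0) {
                      :  *                   SPDK_ERRLOG("reduce vol load failed: %d\n", reduce_errno);
                      :  *                   return;
                      :  *           }
                      :  *           // vol now belongs to the caller until spdk_reduce_vol_unload()
                      :  *   }
                      :  *
                      :  *   spdk_reduce_vol_load(&my_backing_dev, my_load_done, my_ctx);
                      :  */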
     990             : 
     991             : void
     992          25 : spdk_reduce_vol_unload(struct spdk_reduce_vol *vol,
     993             :                        spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
     994             : {
     995          25 :         if (vol == NULL) {
     996             :                 /* This indicates a programming error. */
     997           0 :                 assert(false);
     998             :                 cb_fn(cb_arg, -EINVAL);
     999             :                 return;
    1000             :         }
    1001             : 
    1002          25 :         if (--g_vol_count == 0) {
    1003          23 :                 spdk_free(g_zero_buf);
    1004             :         }
    1005          25 :         assert(g_vol_count >= 0);
    1006          25 :         _init_load_cleanup(vol, NULL);
    1007          25 :         cb_fn(cb_arg, 0);
    1008             : }
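                      : /* Note: g_zero_buf is shared by all loaded volumes; it is only released
                      :  * once the last volume unloads and g_vol_count drops to zero.
                      :  */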
    1009             : 
    1010             : struct reduce_destroy_ctx {
    1011             :         spdk_reduce_vol_op_complete             cb_fn;
    1012             :         void                                    *cb_arg;
    1013             :         struct spdk_reduce_vol                  *vol;
    1014             :         struct spdk_reduce_vol_superblock       *super;
    1015             :         struct iovec                            iov;
    1016             :         struct spdk_reduce_vol_cb_args          backing_cb_args;
    1017             :         int                                     reduce_errno;
    1018             :         char                                    pm_path[REDUCE_PATH_MAX];
    1019             :         struct spdk_reduce_backing_io           *backing_io;
    1020             : };
    1021             : 
    1022             : static void
    1023           1 : destroy_unload_cpl(void *cb_arg, int reduce_errno)
    1024             : {
    1025           1 :         struct reduce_destroy_ctx *destroy_ctx = cb_arg;
    1026             : 
    1027           1 :         if (destroy_ctx->reduce_errno == 0) {
    1028           1 :                 if (unlink(destroy_ctx->pm_path)) {
    1029           0 :                         SPDK_ERRLOG("%s could not be unlinked: %s\n",
    1030             :                                     destroy_ctx->pm_path, strerror(errno));
    1031             :                 }
    1032             :         }
    1033             : 
    1034             :         /* Even if the unload somehow failed, we still pass the destroy_ctx
    1035             :          * reduce_errno since that indicates whether or not the volume was
    1036             :          * actually destroyed.
    1037             :          */
    1038           1 :         destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno);
    1039           1 :         spdk_free(destroy_ctx->super);
    1040           1 :         free(destroy_ctx->backing_io);
    1041           1 :         free(destroy_ctx);
    1042           1 : }
    1043             : 
    1044             : static void
    1045           1 : _destroy_zero_super_cpl(void *cb_arg, int reduce_errno)
    1046             : {
    1047           1 :         struct reduce_destroy_ctx *destroy_ctx = cb_arg;
    1048           1 :         struct spdk_reduce_vol *vol = destroy_ctx->vol;
    1049             : 
    1050           1 :         destroy_ctx->reduce_errno = reduce_errno;
    1051           1 :         spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx);
    1052           1 : }
    1053             : 
    1054             : static void
    1055           1 : destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
    1056             : {
    1057           1 :         struct reduce_destroy_ctx *destroy_ctx = cb_arg;
    1058           1 :         struct spdk_reduce_backing_io *backing_io = destroy_ctx->backing_io;
    1059             : 
    1060           1 :         if (reduce_errno != 0) {
    1061           0 :                 destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno);
     1062           0 :                 spdk_free(destroy_ctx->super);
                      :                 /* Also free the backing_io allocated in spdk_reduce_vol_destroy(),
                      :                  * which would otherwise leak on this early-error path. */
                      :                 free(destroy_ctx->backing_io);
     1063           0 :                 free(destroy_ctx);
    1064           0 :                 return;
    1065             :         }
    1066             : 
    1067           1 :         destroy_ctx->vol = vol;
    1068           1 :         memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path));
    1069           1 :         destroy_ctx->iov.iov_base = destroy_ctx->super;
    1070           1 :         destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super);
    1071           1 :         destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl;
    1072           1 :         destroy_ctx->backing_cb_args.cb_arg = destroy_ctx;
    1073             : 
    1074           1 :         backing_io->dev = vol->backing_dev;
    1075           1 :         backing_io->iov = &destroy_ctx->iov;
    1076           1 :         backing_io->iovcnt = 1;
    1077           1 :         backing_io->lba = 0;
    1078           1 :         backing_io->lba_count = sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen;
    1079           1 :         backing_io->backing_cb_args = &destroy_ctx->backing_cb_args;
    1080           1 :         backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
    1081             : 
    1082           1 :         vol->backing_dev->submit_backing_io(backing_io);
    1083             : }
    1084             : 
    1085             : void
    1086           1 : spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev,
    1087             :                         spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
    1088             : {
    1089             :         struct reduce_destroy_ctx *destroy_ctx;
    1090             :         struct spdk_reduce_backing_io *backing_io;
    1091             : 
    1092           1 :         destroy_ctx = calloc(1, sizeof(*destroy_ctx));
    1093           1 :         if (destroy_ctx == NULL) {
    1094           0 :                 cb_fn(cb_arg, -ENOMEM);
    1095           0 :                 return;
    1096             :         }
    1097             : 
    1098           1 :         backing_io = calloc(1, sizeof(*backing_io) + backing_dev->user_ctx_size);
    1099           1 :         if (backing_io == NULL) {
    1100           0 :                 free(destroy_ctx);
    1101           0 :                 cb_fn(cb_arg, -ENOMEM);
    1102           0 :                 return;
    1103             :         }
    1104             : 
    1105           1 :         destroy_ctx->backing_io = backing_io;
    1106             : 
    1107           1 :         destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL,
    1108             :                                           SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
    1109           1 :         if (destroy_ctx->super == NULL) {
    1110           0 :                 free(destroy_ctx);
    1111           0 :                 free(backing_io);
    1112           0 :                 cb_fn(cb_arg, -ENOMEM);
    1113           0 :                 return;
    1114             :         }
    1115           1 :         destroy_ctx->cb_fn = cb_fn;
    1116           1 :         destroy_ctx->cb_arg = cb_arg;
    1117           1 :         spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx);
    1118             : }
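                      : /* Flow summary (a reading aid, not source): spdk_reduce_vol_destroy() loads
                      :  * the volume via destroy_load_cb() to learn its pm file path, writes a
                      :  * zeroed superblock over lba 0 of the backing device, unloads the volume in
                      :  * _destroy_zero_super_cpl(), and finally unlinks the pm file in
                      :  * destroy_unload_cpl().  Afterwards the backing device no longer carries
                      :  * the SPDKREDU signature and cannot be loaded as a reduce volume.
                      :  */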
    1119             : 
    1120             : static bool
    1121         283 : _request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length)
    1122             : {
    1123             :         uint64_t start_chunk, end_chunk;
    1124             : 
    1125         283 :         start_chunk = offset / vol->logical_blocks_per_chunk;
    1126         283 :         end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk;
    1127             : 
    1128         283 :         return (start_chunk != end_chunk);
    1129             : }
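                      : /* Worked example (assuming logical_blocks_per_chunk == 16): offset 14 with
                      :  * length 4 touches logical blocks 14..17, so start_chunk == 14 / 16 == 0
                      :  * and end_chunk == 17 / 16 == 1.  The request spans a chunk boundary and
                      :  * the readv/writev entry points below reject it with -EINVAL.
                      :  */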
    1130             : 
    1131             : typedef void (*reduce_request_fn)(void *_req, int reduce_errno);
    1132             : static void _start_unmap_request_full_chunk(void *ctx);
    1133             : 
    1134             : static void
    1135         288 : _reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno)
    1136             : {
    1137             :         struct spdk_reduce_vol_request *next_req;
    1138         288 :         struct spdk_reduce_vol *vol = req->vol;
    1139             : 
    1140         288 :         req->cb_fn(req->cb_arg, reduce_errno);
    1141         288 :         RB_REMOVE(executing_req_tree, &vol->executing_requests, req);
    1142             : 
    1143         288 :         TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) {
    1144           1 :                 if (next_req->logical_map_index == req->logical_map_index) {
    1145           1 :                         TAILQ_REMOVE(&vol->queued_requests, next_req, tailq);
    1146           1 :                         if (next_req->type == REDUCE_IO_READV) {
    1147           0 :                                 _start_readv_request(next_req);
    1148           1 :                         } else if (next_req->type == REDUCE_IO_WRITEV) {
    1149           1 :                                 _start_writev_request(next_req);
    1150             :                         } else {
    1151           0 :                                 assert(next_req->type == REDUCE_IO_UNMAP);
    1152           0 :                                 _start_unmap_request_full_chunk(next_req);
    1153             :                         }
    1154           1 :                         break;
    1155             :                 }
    1156             :         }
    1157             : 
    1158         288 :         TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq);
    1159         288 : }
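                      : /* Note: only the first queued request targeting the same logical chunk is
                      :  * restarted here (the loop breaks after one match), so requests queued
                      :  * against a chunk drain one per completion, preserving per-chunk ordering.
                      :  */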
    1160             : 
    1161             : static void
    1162           7 : _reduce_vol_reset_chunk(struct spdk_reduce_vol *vol, uint64_t chunk_map_index)
    1163             : {
    1164             :         struct spdk_reduce_chunk_map *chunk;
    1165             :         uint64_t index;
    1166             :         bool success;
    1167             :         uint32_t i;
    1168             : 
    1169           7 :         chunk = _reduce_vol_get_chunk_map(vol, chunk_map_index);
    1170          20 :         for (i = 0; i < vol->backing_io_units_per_chunk; i++) {
    1171          18 :                 index = chunk->io_unit_index[i];
    1172          18 :                 if (index == REDUCE_EMPTY_MAP_ENTRY) {
    1173           5 :                         break;
    1174             :                 }
    1175          13 :                 assert(spdk_bit_array_get(vol->allocated_backing_io_units,
    1176             :                                           index) == true);
    1177          13 :                 spdk_bit_array_clear(vol->allocated_backing_io_units, index);
    1178          13 :                 vol->info.allocated_io_units--;
    1179          13 :                 success = queue_enqueue(&vol->free_backing_blocks_queue, index);
    1180          13 :                 if (!success && index < vol->find_block_offset) {
    1181           0 :                         vol->find_block_offset = index;
    1182             :                 }
    1183          13 :                 chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY;
    1184             :         }
    1185           7 :         success = queue_enqueue(&vol->free_chunks_queue, chunk_map_index);
    1186           7 :         if (!success && chunk_map_index < vol->find_chunk_offset) {
    1187           0 :                 vol->find_chunk_offset = chunk_map_index;
    1188             :         }
    1189           7 :         spdk_bit_array_clear(vol->allocated_chunk_maps, chunk_map_index);
    1190           7 : }
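                      : /* Allocation note: freed chunk maps and backing io units are recycled via
                      :  * the fixed-size free_*_queues.  When queue_enqueue() fails because a
                      :  * queue is full, the matching find_*_offset is lowered so the slower
                      :  * spdk_bit_array_find_first_clear() scan in the write path can still
                      :  * rediscover the freed index.
                      :  */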
    1191             : 
    1192             : static void
    1193          16 : _write_write_done(void *_req, int reduce_errno)
    1194             : {
    1195          16 :         struct spdk_reduce_vol_request *req = _req;
    1196          16 :         struct spdk_reduce_vol *vol = req->vol;
    1197             :         uint64_t old_chunk_map_index;
    1198             : 
    1199          16 :         if (reduce_errno != 0) {
    1200           0 :                 req->reduce_errno = reduce_errno;
    1201             :         }
    1202             : 
    1203          16 :         assert(req->num_backing_ops > 0);
    1204          16 :         if (--req->num_backing_ops > 0) {
    1205           0 :                 return;
    1206             :         }
    1207             : 
    1208          16 :         if (req->reduce_errno != 0) {
    1209           0 :                 _reduce_vol_reset_chunk(vol, req->chunk_map_index);
    1210           0 :                 _reduce_vol_complete_req(req, req->reduce_errno);
    1211           0 :                 return;
    1212             :         }
    1213             : 
    1214          16 :         old_chunk_map_index = vol->pm_logical_map[req->logical_map_index];
    1215          16 :         if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
    1216           6 :                 _reduce_vol_reset_chunk(vol, old_chunk_map_index);
    1217             :         }
    1218             : 
    1219             :         /*
    1220             :          * We don't need to persist the clearing of the old chunk map here.  The old chunk map
    1221             :          * becomes invalid after we update the logical map, since the old chunk map will no
    1222             :          * longer have a reference to it in the logical map.
    1223             :          */
    1224             : 
    1225             :         /* Persist the new chunk map.  This must be persisted before we update the logical map. */
    1226          16 :         _reduce_persist(vol, req->chunk,
    1227          16 :                         _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk));
    1228             : 
    1229          16 :         vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index;
    1230             : 
    1231          16 :         _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
    1232             : 
    1233          16 :         _reduce_vol_complete_req(req, 0);
    1234             : }
    1235             : 
    1236             : static struct spdk_reduce_backing_io *
    1237         283 : _reduce_vol_req_get_backing_io(struct spdk_reduce_vol_request *req, uint32_t index)
    1238             : {
    1239         283 :         struct spdk_reduce_backing_dev *backing_dev = req->vol->backing_dev;
    1240             :         struct spdk_reduce_backing_io *backing_io;
    1241             : 
    1242         283 :         backing_io = (struct spdk_reduce_backing_io *)((uint8_t *)req->backing_io +
    1243         283 :                         (sizeof(*backing_io) + backing_dev->user_ctx_size) * index);
    1244             : 
    1245         283 :         return backing_io;
    1246             : 
    1247             : }
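                      : /* Layout note: req->backing_io is a single allocation holding
                      :  * backing_io_units_per_chunk variable-sized slots, each occupying
                      :  * sizeof(struct spdk_reduce_backing_io) + user_ctx_size bytes.  The
                      :  * pointer arithmetic above is array indexing with that stride, placing
                      :  * slot i at byte offset (sizeof(*backing_io) + user_ctx_size) * i.
                      :  */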
    1248             : 
    1249             : struct reduce_merged_io_desc {
    1250             :         uint64_t io_unit_index;
    1251             :         uint32_t num_io_units;
    1252             : };
    1253             : 
    1254             : static void
    1255           0 : _issue_backing_ops_without_merge(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
    1256             :                                  reduce_request_fn next_fn, bool is_write)
    1257             : {
    1258             :         struct iovec *iov;
    1259             :         struct spdk_reduce_backing_io *backing_io;
    1260             :         uint8_t *buf;
    1261             :         uint32_t i;
    1262             : 
    1263           0 :         if (req->chunk_is_compressed) {
    1264           0 :                 iov = req->comp_buf_iov;
    1265           0 :                 buf = req->comp_buf;
    1266             :         } else {
    1267           0 :                 iov = req->decomp_buf_iov;
    1268           0 :                 buf = req->decomp_buf;
    1269             :         }
    1270             : 
    1271           0 :         req->num_backing_ops = req->num_io_units;
    1272           0 :         req->backing_cb_args.cb_fn = next_fn;
    1273           0 :         req->backing_cb_args.cb_arg = req;
    1274           0 :         for (i = 0; i < req->num_io_units; i++) {
    1275           0 :                 backing_io = _reduce_vol_req_get_backing_io(req, i);
    1276           0 :                 iov[i].iov_base = buf + i * vol->params.backing_io_unit_size;
    1277           0 :                 iov[i].iov_len = vol->params.backing_io_unit_size;
    1278           0 :                 backing_io->dev  = vol->backing_dev;
    1279           0 :                 backing_io->iov = &iov[i];
    1280           0 :                 backing_io->iovcnt = 1;
    1281           0 :                 backing_io->lba = req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit;
    1282           0 :                 backing_io->lba_count = vol->backing_lba_per_io_unit;
    1283           0 :                 backing_io->backing_cb_args = &req->backing_cb_args;
    1284           0 :                 if (is_write) {
    1285           0 :                         backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
    1286             :                 } else {
    1287           0 :                         backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
    1288             :                 }
    1289           0 :                 vol->backing_dev->submit_backing_io(backing_io);
    1290             :         }
    1291           0 : }
    1292             : 
    1293             : static void
    1294         283 : _issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol,
    1295             :                    reduce_request_fn next_fn, bool is_write)
    1296             : {
    1297             :         struct iovec *iov;
    1298             :         struct spdk_reduce_backing_io *backing_io;
    1299         283 :         struct reduce_merged_io_desc merged_io_desc[4];
    1300             :         uint8_t *buf;
    1301         283 :         bool merge = false;
    1302         283 :         uint32_t num_io = 0;
    1303         283 :         uint32_t io_unit_counts = 0;
    1304         283 :         uint32_t merged_io_idx = 0;
    1305             :         uint32_t i;
    1306             : 
     1307             :         /* merged_io_desc is a fixed array of four elements, so merging is only
     1308             :          * possible when the chunk spans at most four backing io units.  If the
     1309             :          * chunk covers more io units than that, issue the IO without merging.
     1310             :          */
    1311         283 :         if (vol->backing_io_units_per_chunk > 4) {
    1312           0 :                 _issue_backing_ops_without_merge(req, vol, next_fn, is_write);
    1313           0 :                 return;
    1314             :         }
    1315             : 
    1316         283 :         if (req->chunk_is_compressed) {
    1317         279 :                 iov = req->comp_buf_iov;
    1318         279 :                 buf = req->comp_buf;
    1319             :         } else {
    1320           4 :                 iov = req->decomp_buf_iov;
    1321           4 :                 buf = req->decomp_buf;
    1322             :         }
    1323             : 
    1324         295 :         for (i = 0; i < req->num_io_units; i++) {
    1325         295 :                 if (!merge) {
    1326         283 :                         merged_io_desc[merged_io_idx].io_unit_index = req->chunk->io_unit_index[i];
    1327         283 :                         merged_io_desc[merged_io_idx].num_io_units = 1;
    1328         283 :                         num_io++;
    1329             :                 }
    1330             : 
    1331         295 :                 if (i + 1 == req->num_io_units) {
    1332         283 :                         break;
    1333             :                 }
    1334             : 
    1335          12 :                 if (req->chunk->io_unit_index[i] + 1 == req->chunk->io_unit_index[i + 1]) {
    1336          12 :                         merged_io_desc[merged_io_idx].num_io_units += 1;
    1337          12 :                         merge = true;
    1338          12 :                         continue;
    1339             :                 }
    1340           0 :                 merge = false;
    1341           0 :                 merged_io_idx++;
    1342             :         }
    1343             : 
    1344         283 :         req->num_backing_ops = num_io;
    1345         283 :         req->backing_cb_args.cb_fn = next_fn;
    1346         283 :         req->backing_cb_args.cb_arg = req;
    1347         566 :         for (i = 0; i < num_io; i++) {
    1348         283 :                 backing_io = _reduce_vol_req_get_backing_io(req, i);
    1349         283 :                 iov[i].iov_base = buf + io_unit_counts * vol->params.backing_io_unit_size;
    1350         283 :                 iov[i].iov_len = vol->params.backing_io_unit_size * merged_io_desc[i].num_io_units;
    1351         283 :                 backing_io->dev  = vol->backing_dev;
    1352         283 :                 backing_io->iov = &iov[i];
    1353         283 :                 backing_io->iovcnt = 1;
    1354         283 :                 backing_io->lba = merged_io_desc[i].io_unit_index * vol->backing_lba_per_io_unit;
    1355         283 :                 backing_io->lba_count = vol->backing_lba_per_io_unit * merged_io_desc[i].num_io_units;
    1356         283 :                 backing_io->backing_cb_args = &req->backing_cb_args;
    1357         283 :                 if (is_write) {
    1358          16 :                         backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_WRITE;
    1359             :                 } else {
    1360         267 :                         backing_io->backing_io_type = SPDK_REDUCE_BACKING_IO_READ;
    1361             :                 }
    1362         283 :                 vol->backing_dev->submit_backing_io(backing_io);
    1363             : 
     1364             :                 /* Accumulate the number of io units issued so far; used to offset into buf. */
    1365         283 :                 io_unit_counts += merged_io_desc[i].num_io_units;
    1366             :         }
    1367             : }
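                      : /* Merge example (hypothetical io unit indices): a chunk mapped to backing
                      :  * io units {7, 8, 9, 42} is issued as two backing IOs: one covering units
                      :  * 7-9 (num_io_units == 3) and one covering unit 42, since only physically
                      :  * contiguous io unit indices are coalesced into a single IO.
                      :  */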
    1368             : 
    1369             : static void
    1370          16 : _reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn,
    1371             :                         uint32_t compressed_size)
    1372             : {
    1373          16 :         struct spdk_reduce_vol *vol = req->vol;
    1374             :         uint32_t i;
    1375          16 :         uint64_t chunk_offset, remainder, free_index, total_len = 0;
    1376             :         uint8_t *buf;
    1377             :         bool success;
    1378             :         int j;
    1379             : 
    1380          16 :         success = queue_dequeue(&vol->free_chunks_queue, &free_index);
    1381          16 :         if (success) {
    1382           0 :                 req->chunk_map_index = free_index;
    1383             :         } else {
    1384          16 :                 req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps,
    1385          16 :                                        vol->find_chunk_offset);
    1386          16 :                 vol->find_chunk_offset = req->chunk_map_index + 1;
    1387             :         }
    1388             : 
    1389             :         /* TODO: fail if no chunk map found - but really this should not happen if we
    1390             :          * size the number of requests similarly to number of extra chunk maps
    1391             :          */
    1392          16 :         assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
    1393          16 :         spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index);
    1394             : 
    1395          16 :         req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
    1396          16 :         req->num_io_units = spdk_divide_round_up(compressed_size,
    1397          16 :                             vol->params.backing_io_unit_size);
    1398          16 :         req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
    1399          16 :         req->chunk->compressed_size =
    1400          16 :                 req->chunk_is_compressed ? compressed_size : vol->params.chunk_size;
    1401             : 
     1402             :         /* If the chunk is uncompressed, we need to copy the data from the host buffers. */
    1403          16 :         if (req->chunk_is_compressed == false) {
    1404           4 :                 chunk_offset = req->offset % vol->logical_blocks_per_chunk;
    1405           4 :                 buf = req->decomp_buf;
    1406           4 :                 total_len = chunk_offset * vol->params.logical_block_size;
    1407             : 
    1408             :                 /* zero any offset into chunk */
    1409           4 :                 if (req->rmw == false && chunk_offset) {
    1410           0 :                         memset(buf, 0, total_len);
    1411             :                 }
    1412           4 :                 buf += total_len;
    1413             : 
    1414             :                 /* copy the data */
    1415           8 :                 for (j = 0; j < req->iovcnt; j++) {
    1416           4 :                         memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len);
    1417           4 :                         buf += req->iov[j].iov_len;
    1418           4 :                         total_len += req->iov[j].iov_len;
    1419             :                 }
    1420             : 
    1421             :                 /* zero any remainder */
    1422           4 :                 remainder = vol->params.chunk_size - total_len;
    1423           4 :                 total_len += remainder;
    1424           4 :                 if (req->rmw == false && remainder) {
    1425           0 :                         memset(buf, 0, remainder);
    1426             :                 }
    1427           4 :                 assert(total_len == vol->params.chunk_size);
    1428             :         }
    1429             : 
    1430          44 :         for (i = 0; i < req->num_io_units; i++) {
    1431          28 :                 success = queue_dequeue(&vol->free_backing_blocks_queue, &free_index);
    1432          28 :                 if (success) {
    1433           0 :                         req->chunk->io_unit_index[i] = free_index;
    1434             :                 } else {
    1435          28 :                         req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units,
    1436          28 :                                                        vol->find_block_offset);
    1437          28 :                         vol->find_block_offset = req->chunk->io_unit_index[i] + 1;
    1438             :                 }
    1439             :                 /* TODO: fail if no backing block found - but really this should also not
    1440             :                  * happen (see comment above).
    1441             :                  */
    1442          28 :                 assert(req->chunk->io_unit_index[i] != REDUCE_EMPTY_MAP_ENTRY);
    1443          28 :                 spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]);
    1444          28 :                 vol->info.allocated_io_units++;
    1445             :         }
    1446             : 
    1447          16 :         _issue_backing_ops(req, vol, next_fn, true /* write */);
    1448          16 : }
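                      : /* Sizing example (assuming a 16KiB chunk and 4KiB backing io units): data
                      :  * that compresses to 9000 bytes needs ceil(9000 / 4096) == 3 io units,
                      :  * fewer than backing_io_units_per_chunk == 4, so the chunk is stored
                      :  * compressed.  If the compressed size still requires all 4 io units, the
                      :  * chunk is written uncompressed (chunk_is_compressed == false).
                      :  */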
    1449             : 
    1450             : static void
    1451          16 : _write_compress_done(void *_req, int reduce_errno)
    1452             : {
    1453          16 :         struct spdk_reduce_vol_request *req = _req;
    1454             : 
     1455             :         /* Negative reduce_errno indicates the compression operation failed.
     1456             :          * Write the uncompressed data instead, by passing the full chunk
     1457             :          * size to _reduce_vol_write_chunk.  When it sees the data couldn't
     1458             :          * be compressed, it will write the uncompressed buffer to disk.
     1459             :          */
    1461          16 :         if (reduce_errno < 0) {
    1462           4 :                 req->backing_cb_args.output_size = req->vol->params.chunk_size;
    1463             :         }
    1464             : 
    1465          16 :         _reduce_vol_write_chunk(req, _write_write_done, req->backing_cb_args.output_size);
    1466          16 : }
    1467             : 
    1468             : static void
    1469          16 : _reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
    1470             : {
    1471          16 :         struct spdk_reduce_vol *vol = req->vol;
    1472             : 
    1473          16 :         req->backing_cb_args.cb_fn = next_fn;
    1474          16 :         req->backing_cb_args.cb_arg = req;
    1475          16 :         req->comp_buf_iov[0].iov_base = req->comp_buf;
    1476          16 :         req->comp_buf_iov[0].iov_len = vol->params.chunk_size;
    1477          32 :         vol->backing_dev->compress(vol->backing_dev,
    1478          16 :                                    req->decomp_iov, req->decomp_iovcnt, req->comp_buf_iov, 1,
    1479             :                                    &req->backing_cb_args);
    1480          16 : }
    1481             : 
    1482             : static void
    1483           4 : _reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
    1484             : {
    1485           4 :         struct spdk_reduce_vol *vol = req->vol;
    1486             : 
    1487           4 :         req->backing_cb_args.cb_fn = next_fn;
    1488           4 :         req->backing_cb_args.cb_arg = req;
    1489           4 :         req->comp_buf_iov[0].iov_base = req->comp_buf;
    1490           4 :         req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
    1491           4 :         req->decomp_buf_iov[0].iov_base = req->decomp_buf;
    1492           4 :         req->decomp_buf_iov[0].iov_len = vol->params.chunk_size;
    1493           4 :         vol->backing_dev->decompress(vol->backing_dev,
    1494             :                                      req->comp_buf_iov, 1, req->decomp_buf_iov, 1,
    1495             :                                      &req->backing_cb_args);
    1496           4 : }
    1497             : 
    1498             : static void
    1499         271 : _reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
    1500             : {
    1501         271 :         struct spdk_reduce_vol *vol = req->vol;
    1502         271 :         uint64_t chunk_offset, remainder = 0;
    1503         271 :         uint64_t ttl_len = 0;
    1504         271 :         size_t iov_len;
    1505             :         int i;
    1506             : 
    1507         271 :         req->decomp_iovcnt = 0;
    1508         271 :         chunk_offset = req->offset % vol->logical_blocks_per_chunk;
    1509             : 
     1510             :         /* If the backing device doesn't support SGL output, we should copy the result of
     1511             :          * decompression into the user's buffer when at least one of the conditions below is true:
     1512             :          * 1. The user's buffer is fragmented
     1513             :          * 2. The user's buffer is shorter than the chunk
     1514             :          * 3. The user's buffer is contiguous and equal to chunk_size, but crosses a huge page boundary */
    1515         271 :         iov_len = req->iov[0].iov_len;
    1516         273 :         req->copy_after_decompress = !vol->backing_dev->sgl_out && (req->iovcnt > 1 ||
    1517           4 :                                      req->iov[0].iov_len < vol->params.chunk_size ||
    1518           2 :                                      _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len));
    1519         271 :         if (req->copy_after_decompress) {
    1520           4 :                 req->decomp_iov[0].iov_base = req->decomp_buf;
    1521           4 :                 req->decomp_iov[0].iov_len = vol->params.chunk_size;
    1522           4 :                 req->decomp_iovcnt = 1;
    1523           4 :                 goto decompress;
    1524             :         }
    1525             : 
    1526         267 :         if (chunk_offset) {
     1527             :                 /* The first iov points at our scratch buffer, covering any offset into the chunk */
    1528         249 :                 req->decomp_iov[0].iov_base = req->decomp_buf;
    1529         249 :                 req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size;
    1530         249 :                 ttl_len += req->decomp_iov[0].iov_len;
    1531         249 :                 req->decomp_iovcnt = 1;
    1532             :         }
    1533             : 
    1534             :         /* now the user data iov, direct to the user buffer */
    1535         537 :         for (i = 0; i < req->iovcnt; i++) {
    1536         270 :                 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
    1537         270 :                 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
    1538         270 :                 ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len;
    1539             :         }
    1540         267 :         req->decomp_iovcnt += req->iovcnt;
    1541             : 
    1542             :         /* send the rest of the chunk to our scratch buffer */
    1543         267 :         remainder = vol->params.chunk_size - ttl_len;
    1544         267 :         if (remainder) {
    1545         252 :                 req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len;
    1546         252 :                 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
    1547         252 :                 ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len;
    1548         252 :                 req->decomp_iovcnt++;
    1549             :         }
    1550         267 :         assert(ttl_len == vol->params.chunk_size);
    1551             : 
    1552         271 : decompress:
    1553         271 :         assert(!req->copy_after_decompress || (req->copy_after_decompress && req->decomp_iovcnt == 1));
    1554         271 :         req->backing_cb_args.cb_fn = next_fn;
    1555         271 :         req->backing_cb_args.cb_arg = req;
    1556         271 :         req->comp_buf_iov[0].iov_base = req->comp_buf;
    1557         271 :         req->comp_buf_iov[0].iov_len = req->chunk->compressed_size;
    1558         542 :         vol->backing_dev->decompress(vol->backing_dev,
    1559         271 :                                      req->comp_buf_iov, 1, req->decomp_iov, req->decomp_iovcnt,
    1560             :                                      &req->backing_cb_args);
    1561         271 : }
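                      : /* SGL example for the direct (no-copy) path, assuming a 16KiB chunk and
                      :  * 4KiB logical blocks: reading one block at chunk offset 2 builds
                      :  * decomp_iov as [scratch 0-8KiB][user buffer 8-12KiB][scratch 12-16KiB],
                      :  * so the decompressor writes only the requested block into the caller's
                      :  * memory and the rest lands in req->decomp_buf.
                      :  */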
    1562             : 
    1563             : static inline void
    1564           8 : _prepare_compress_chunk_copy_user_buffers(struct spdk_reduce_vol_request *req, bool zero_paddings)
    1565             : {
    1566           8 :         struct spdk_reduce_vol *vol = req->vol;
    1567           8 :         uint64_t chunk_offset, ttl_len = 0;
    1568           8 :         uint64_t remainder = 0;
    1569           8 :         char *copy_offset = NULL;
    1570           8 :         uint32_t lbsize = vol->params.logical_block_size;
    1571             :         int i;
    1572             : 
    1573           8 :         req->decomp_iov[0].iov_base = req->decomp_buf;
    1574           8 :         req->decomp_iov[0].iov_len = vol->params.chunk_size;
    1575           8 :         req->decomp_iovcnt = 1;
    1576           8 :         copy_offset = req->decomp_iov[0].iov_base;
    1577           8 :         chunk_offset = req->offset % vol->logical_blocks_per_chunk;
    1578             : 
    1579           8 :         if (chunk_offset) {
    1580           2 :                 ttl_len += chunk_offset * lbsize;
    1581             :                 /* copy_offset already points to the correct buffer if zero_paddings=false */
    1582           2 :                 if (zero_paddings) {
    1583           1 :                         memset(copy_offset, 0, ttl_len);
    1584             :                 }
    1585           2 :                 copy_offset += ttl_len;
    1586             :         }
    1587             : 
    1588             :         /* now the user data iov, direct from the user buffer */
    1589          22 :         for (i = 0; i < req->iovcnt; i++) {
    1590          14 :                 memcpy(copy_offset, req->iov[i].iov_base, req->iov[i].iov_len);
    1591          14 :                 copy_offset += req->iov[i].iov_len;
    1592          14 :                 ttl_len += req->iov[i].iov_len;
    1593             :         }
    1594             : 
    1595           8 :         remainder = vol->params.chunk_size - ttl_len;
    1596           8 :         if (remainder) {
    1597             :                 /* copy_offset already points to the correct buffer if zero_paddings=false */
    1598           4 :                 if (zero_paddings) {
    1599           2 :                         memset(copy_offset, 0, remainder);
    1600             :                 }
    1601           4 :                 ttl_len += remainder;
    1602             :         }
    1603             : 
    1604           8 :         assert(ttl_len == req->vol->params.chunk_size);
    1605           8 : }
    1606             : 
     1607             : /* This function can be called when we are compressing new data or during a read-modify-write.
     1608             :  * In the first case any padding should be filled with zeroes; in the second case the padding
     1609             :  * should point at the already-read and decompressed buffer. */
    1610             : static inline void
    1611          33 : _prepare_compress_chunk(struct spdk_reduce_vol_request *req, bool zero_paddings)
    1612             : {
    1613          33 :         struct spdk_reduce_vol *vol = req->vol;
    1614          33 :         char *padding_buffer = zero_paddings ? g_zero_buf : req->decomp_buf;
    1615          33 :         uint64_t chunk_offset, ttl_len = 0;
    1616          33 :         uint64_t remainder = 0;
    1617          33 :         uint32_t lbsize = vol->params.logical_block_size;
    1618          33 :         size_t iov_len;
    1619             :         int i;
    1620             : 
     1621             :         /* If the backing device doesn't support SGL input, we should copy the user's buffer into
     1622             :          * decomp_buf when at least one of the conditions below is true:
     1623             :          * 1. The user's buffer is fragmented
     1624             :          * 2. The user's buffer is shorter than the chunk
     1625             :          * 3. The user's buffer is contiguous and equal to chunk_size, but crosses a huge page boundary */
    1626          33 :         iov_len = req->iov[0].iov_len;
    1627          33 :         if (!vol->backing_dev->sgl_in && (req->iovcnt > 1 ||
    1628           8 :                                           req->iov[0].iov_len < vol->params.chunk_size ||
    1629           4 :                                           _addr_crosses_huge_page(req->iov[0].iov_base, &iov_len))) {
    1630           8 :                 _prepare_compress_chunk_copy_user_buffers(req, zero_paddings);
    1631           8 :                 return;
    1632             :         }
    1633             : 
    1634          25 :         req->decomp_iovcnt = 0;
    1635          25 :         chunk_offset = req->offset % vol->logical_blocks_per_chunk;
    1636             : 
    1637          25 :         if (chunk_offset != 0) {
    1638          11 :                 ttl_len += chunk_offset * lbsize;
    1639          11 :                 req->decomp_iov[0].iov_base = padding_buffer;
    1640          11 :                 req->decomp_iov[0].iov_len = ttl_len;
    1641          11 :                 req->decomp_iovcnt = 1;
    1642             :         }
    1643             : 
    1644             :         /* now the user data iov, direct from the user buffer */
    1645          57 :         for (i = 0; i < req->iovcnt; i++) {
    1646          32 :                 req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base;
    1647          32 :                 req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len;
    1648          32 :                 ttl_len += req->iov[i].iov_len;
    1649             :         }
    1650          25 :         req->decomp_iovcnt += req->iovcnt;
    1651             : 
    1652          25 :         remainder = vol->params.chunk_size - ttl_len;
    1653          25 :         if (remainder) {
    1654          14 :                 req->decomp_iov[req->decomp_iovcnt].iov_base = padding_buffer + ttl_len;
    1655          14 :                 req->decomp_iov[req->decomp_iovcnt].iov_len = remainder;
    1656          14 :                 req->decomp_iovcnt++;
    1657          14 :                 ttl_len += remainder;
    1658             :         }
    1659          25 :         assert(ttl_len == req->vol->params.chunk_size);
    1660             : }
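                      : /* SGL example (assuming a 16KiB chunk and 4KiB logical blocks): writing one
                      :  * block at chunk offset 1 builds decomp_iov as [padding 0-4KiB]
                      :  * [user buffer 4-8KiB][padding 8-16KiB].  The padding iovecs point at
                      :  * g_zero_buf for a fresh write (zero_paddings == true) or at
                      :  * req->decomp_buf for read-modify-write (zero_paddings == false).
                      :  */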
    1661             : 
    1662             : static void
    1663           4 : _write_decompress_done(void *_req, int reduce_errno)
    1664             : {
    1665           4 :         struct spdk_reduce_vol_request *req = _req;
    1666             : 
     1667             :         /* Negative reduce_errno indicates failure of the decompression operation. */
    1668           4 :         if (reduce_errno < 0) {
    1669           0 :                 _reduce_vol_complete_req(req, reduce_errno);
    1670           0 :                 return;
    1671             :         }
    1672             : 
     1673             :         /* A non-negative reduce_errno indicates that backing_cb_args.output_size
     1674             :          * holds the decompressed length, which must equal the chunk size.
     1675             :          */
    1676           4 :         if (req->backing_cb_args.output_size != req->vol->params.chunk_size) {
    1677           0 :                 _reduce_vol_complete_req(req, -EIO);
    1678           0 :                 return;
    1679             :         }
    1680             : 
    1681           4 :         _prepare_compress_chunk(req, false);
    1682           4 :         _reduce_vol_compress_chunk(req, _write_compress_done);
    1683             : }
    1684             : 
    1685             : static void
    1686           4 : _write_read_done(void *_req, int reduce_errno)
    1687             : {
    1688           4 :         struct spdk_reduce_vol_request *req = _req;
    1689             : 
    1690           4 :         if (reduce_errno != 0) {
    1691           0 :                 req->reduce_errno = reduce_errno;
    1692             :         }
    1693             : 
    1694           4 :         assert(req->num_backing_ops > 0);
    1695           4 :         if (--req->num_backing_ops > 0) {
    1696           0 :                 return;
    1697             :         }
    1698             : 
    1699           4 :         if (req->reduce_errno != 0) {
    1700           0 :                 _reduce_vol_complete_req(req, req->reduce_errno);
    1701           0 :                 return;
    1702             :         }
    1703             : 
    1704           4 :         if (req->chunk_is_compressed) {
    1705           4 :                 _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done);
    1706             :         } else {
    1707           0 :                 req->backing_cb_args.output_size = req->chunk->compressed_size;
    1708             : 
    1709           0 :                 _write_decompress_done(req, 0);
    1710             :         }
    1711             : }
    1712             : 
    1713             : static void
    1714         271 : _read_decompress_done(void *_req, int reduce_errno)
    1715             : {
    1716         271 :         struct spdk_reduce_vol_request *req = _req;
    1717         271 :         struct spdk_reduce_vol *vol = req->vol;
    1718             : 
     1719             :         /* Negative reduce_errno indicates failure of the decompression operation. */
    1720         271 :         if (reduce_errno < 0) {
    1721           0 :                 _reduce_vol_complete_req(req, reduce_errno);
    1722           0 :                 return;
    1723             :         }
    1724             : 
     1725             :         /* A non-negative reduce_errno indicates that backing_cb_args.output_size
     1726             :          * holds the decompressed length, which must equal the chunk size.
     1727             :          */
    1728         271 :         if (req->backing_cb_args.output_size != vol->params.chunk_size) {
    1729           0 :                 _reduce_vol_complete_req(req, -EIO);
    1730           0 :                 return;
    1731             :         }
    1732             : 
    1733         271 :         if (req->copy_after_decompress) {
    1734           4 :                 uint64_t chunk_offset = req->offset % vol->logical_blocks_per_chunk;
    1735           4 :                 char *decomp_buffer = (char *)req->decomp_buf + chunk_offset * vol->params.logical_block_size;
    1736             :                 int i;
    1737             : 
    1738          11 :                 for (i = 0; i < req->iovcnt; i++) {
    1739           7 :                         memcpy(req->iov[i].iov_base, decomp_buffer, req->iov[i].iov_len);
    1740           7 :                         decomp_buffer += req->iov[i].iov_len;
    1741           7 :                         assert(decomp_buffer <= (char *)req->decomp_buf + vol->params.chunk_size);
    1742             :                 }
    1743             :         }
    1744             : 
    1745         271 :         _reduce_vol_complete_req(req, 0);
    1746             : }
    1747             : 
    1748             : static void
    1749         263 : _read_read_done(void *_req, int reduce_errno)
    1750             : {
    1751         263 :         struct spdk_reduce_vol_request *req = _req;
    1752             : 
    1753         263 :         if (reduce_errno != 0) {
    1754           0 :                 req->reduce_errno = reduce_errno;
    1755             :         }
    1756             : 
    1757         263 :         assert(req->num_backing_ops > 0);
    1758         263 :         if (--req->num_backing_ops > 0) {
    1759           0 :                 return;
    1760             :         }
    1761             : 
    1762         263 :         if (req->reduce_errno != 0) {
    1763           0 :                 _reduce_vol_complete_req(req, req->reduce_errno);
    1764           0 :                 return;
    1765             :         }
    1766             : 
    1767         263 :         if (req->chunk_is_compressed) {
    1768         263 :                 _reduce_vol_decompress_chunk(req, _read_decompress_done);
    1769             :         } else {
    1770             : 
     1771             :                 /* If the chunk was compressed, the decompression operation would have
     1772             :                  *  delivered the data directly to the host buffers; since it wasn't,
     1773             :                  *  we need to memcpy from req->decomp_buf.
     1774             :                  */
    1775           0 :                 req->copy_after_decompress = true;
    1776           0 :                 req->backing_cb_args.output_size = req->chunk->compressed_size;
    1777             : 
    1778           0 :                 _read_decompress_done(req, 0);
    1779             :         }
    1780             : }
    1781             : 
    1782             : static void
    1783         267 : _reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn)
    1784             : {
    1785         267 :         struct spdk_reduce_vol *vol = req->vol;
    1786             : 
    1787         267 :         req->chunk_map_index = vol->pm_logical_map[req->logical_map_index];
    1788         267 :         assert(req->chunk_map_index != REDUCE_EMPTY_MAP_ENTRY);
    1789             : 
    1790         267 :         req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index);
    1791         267 :         req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size,
    1792         267 :                             vol->params.backing_io_unit_size);
    1793         267 :         req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk);
    1794             : 
    1795         267 :         _issue_backing_ops(req, vol, next_fn, false /* read */);
    1796         267 : }
    1797             : 
    1798             : static bool
    1799         280 : _iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt,
    1800             :                     uint64_t length)
    1801             : {
    1802         280 :         uint64_t size = 0;
    1803             :         int i;
    1804             : 
    1805         280 :         if (iovcnt > REDUCE_MAX_IOVECS) {
    1806           0 :                 return false;
    1807             :         }
    1808             : 
    1809         560 :         for (i = 0; i < iovcnt; i++) {
    1810         280 :                 size += iov[i].iov_len;
    1811             :         }
    1812             : 
    1813         280 :         return size == (length * vol->params.logical_block_size);
    1814             : }
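                      : /* Example (assuming logical_block_size == 512): a request of length 8
                      :  * logical blocks must supply at most REDUCE_MAX_IOVECS iovecs whose
                      :  * iov_len values sum to exactly 8 * 512 == 4096 bytes; any other total
                      :  * fails validation and the request is rejected with -EINVAL.
                      :  */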
    1815             : 
    1816             : static bool
    1817         281 : _check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index)
    1818             : {
    1819         281 :         struct spdk_reduce_vol_request req;
    1820             : 
    1821         281 :         req.logical_map_index = logical_map_index;
    1822             : 
    1823         281 :         return (NULL != RB_FIND(executing_req_tree, &vol->executing_requests, &req));
    1824             : }
    1825             : 
    1826             : static void
    1827         263 : _start_readv_request(struct spdk_reduce_vol_request *req)
    1828             : {
    1829         263 :         RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
    1830         263 :         _reduce_vol_read_chunk(req, _read_read_done);
    1831         263 : }
    1832             : 
    1833             : void
    1834         264 : spdk_reduce_vol_readv(struct spdk_reduce_vol *vol,
    1835             :                       struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
    1836             :                       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
    1837             : {
    1838             :         struct spdk_reduce_vol_request *req;
    1839             :         uint64_t logical_map_index;
    1840             :         bool overlapped;
    1841             :         int i;
    1842             : 
    1843         264 :         if (length == 0) {
    1844           0 :                 cb_fn(cb_arg, 0);
    1845           0 :                 return;
    1846             :         }
    1847             : 
    1848         264 :         if (_request_spans_chunk_boundary(vol, offset, length)) {
    1849           0 :                 cb_fn(cb_arg, -EINVAL);
    1850           0 :                 return;
    1851             :         }
    1852             : 
    1853         264 :         if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
    1854           0 :                 cb_fn(cb_arg, -EINVAL);
    1855           0 :                 return;
    1856             :         }
    1857             : 
    1858         264 :         logical_map_index = offset / vol->logical_blocks_per_chunk;
    1859         264 :         overlapped = _check_overlap(vol, logical_map_index);
    1860             : 
    1861         264 :         if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
    1862             :         /*
    1863             :          * This chunk hasn't been allocated, so its data reads as all
    1864             :          * zeroes.  Zero the caller's buffers and complete the operation
    1865             :          * immediately.
    1866             :          */
    1867           2 :                 for (i = 0; i < iovcnt; i++) {
    1868           1 :                         memset(iov[i].iov_base, 0, iov[i].iov_len);
    1869             :                 }
    1870           1 :                 cb_fn(cb_arg, 0);
    1871           1 :                 return;
    1872             :         }
    1873             : 
    1874         263 :         req = TAILQ_FIRST(&vol->free_requests);
    1875         263 :         if (req == NULL) {
    1876           0 :                 cb_fn(cb_arg, -ENOMEM);
    1877           0 :                 return;
    1878             :         }
    1879             : 
    1880         263 :         TAILQ_REMOVE(&vol->free_requests, req, tailq);
    1881         263 :         req->type = REDUCE_IO_READV;
    1882         263 :         req->vol = vol;
    1883         263 :         req->iov = iov;
    1884         263 :         req->iovcnt = iovcnt;
    1885         263 :         req->offset = offset;
    1886         263 :         req->logical_map_index = logical_map_index;
    1887         263 :         req->length = length;
    1888         263 :         req->copy_after_decompress = false;
    1889         263 :         req->cb_fn = cb_fn;
    1890         263 :         req->cb_arg = cb_arg;
    1891         263 :         req->reduce_errno = 0;
    1892             : 
    1893         263 :         if (!overlapped) {
    1894         263 :                 _start_readv_request(req);
    1895             :         } else {
    1896           0 :                 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
    1897             :         }
    1898             : }
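/*
 * Usage sketch (hypothetical caller, not part of reduce.c): read one logical
 * block from the start of the volume.  The 512-byte block size, the buffer,
 * and the callback names are illustrative assumptions.
 */
static void
example_read_done(void *cb_arg, int reduce_errno)
{
        if (reduce_errno != 0) {
                SPDK_ERRLOG("readv failed: %d\n", reduce_errno);
        }
}

static void
example_read_block(struct spdk_reduce_vol *vol, void *buf)
{
        struct iovec iov = { .iov_base = buf, .iov_len = 512 };

        /* offset and length are in logical blocks, not bytes. */
        spdk_reduce_vol_readv(vol, &iov, 1, 0, 1, example_read_done, NULL);
}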
    1899             : 
    1900             : static void
    1901          16 : _start_writev_request(struct spdk_reduce_vol_request *req)
    1902             : {
    1903          16 :         struct spdk_reduce_vol *vol = req->vol;
    1904             : 
    1905          16 :         RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
    1906          16 :         if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) {
    1907           6 :                 if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) {
    1908             :                         /* Read old chunk, then overwrite with data from this write
    1909             :                          *  operation.
    1910             :                          */
    1911           4 :                         req->rmw = true;
    1912           4 :                         _reduce_vol_read_chunk(req, _write_read_done);
    1913           4 :                         return;
    1914             :                 }
    1915             :         }
    1916             : 
    1917          12 :         req->rmw = false;
    1918             : 
    1919          12 :         _prepare_compress_chunk(req, true);
    1920          12 :         _reduce_vol_compress_chunk(req, _write_compress_done);
    1921             : }
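/*
 * Worked example (hypothetical parameters): with chunk_size = 16 KiB and
 * logical_block_size = 512, a chunk spans 32 logical blocks.  An 8-block
 * write to an already-allocated chunk takes the rmw path above, since
 * 8 * 512 = 4096 < 16384: the old chunk is read (and decompressed if
 * needed), the new 4 KiB is overlaid, and the merged chunk is compressed
 * and written back.  A full 32-block write, or any write to a chunk that
 * has never been allocated, compresses the incoming data directly and
 * skips the read.
 */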
    1922             : 
    1923             : void
    1924          18 : spdk_reduce_vol_writev(struct spdk_reduce_vol *vol,
    1925             :                        struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length,
    1926             :                        spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
    1927             : {
    1928             :         struct spdk_reduce_vol_request *req;
    1929             :         uint64_t logical_map_index;
    1930             :         bool overlapped;
    1931             : 
    1932          18 :         if (length == 0) {
    1933           0 :                 cb_fn(cb_arg, 0);
    1934           0 :                 return;
    1935             :         }
    1936             : 
    1937          18 :         if (_request_spans_chunk_boundary(vol, offset, length)) {
    1938           2 :                 cb_fn(cb_arg, -EINVAL);
    1939           2 :                 return;
    1940             :         }
    1941             : 
    1942          16 :         if (!_iov_array_is_valid(vol, iov, iovcnt, length)) {
    1943           0 :                 cb_fn(cb_arg, -EINVAL);
    1944           0 :                 return;
    1945             :         }
    1946             : 
    1947          16 :         logical_map_index = offset / vol->logical_blocks_per_chunk;
    1948          16 :         overlapped = _check_overlap(vol, logical_map_index);
    1949             : 
    1950          16 :         req = TAILQ_FIRST(&vol->free_requests);
    1951          16 :         if (req == NULL) {
    1952           0 :                 cb_fn(cb_arg, -ENOMEM);
    1953           0 :                 return;
    1954             :         }
    1955             : 
    1956          16 :         TAILQ_REMOVE(&vol->free_requests, req, tailq);
    1957          16 :         req->type = REDUCE_IO_WRITEV;
    1958          16 :         req->vol = vol;
    1959          16 :         req->iov = iov;
    1960          16 :         req->iovcnt = iovcnt;
    1961          16 :         req->offset = offset;
    1962          16 :         req->logical_map_index = logical_map_index;
    1963          16 :         req->length = length;
    1964          16 :         req->copy_after_decompress = false;
    1965          16 :         req->cb_fn = cb_fn;
    1966          16 :         req->cb_arg = cb_arg;
    1967          16 :         req->reduce_errno = 0;
    1968             : 
    1969          16 :         if (!overlapped) {
    1970          15 :                 _start_writev_request(req);
    1971             :         } else {
    1972           1 :                 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
    1973             :         }
    1974             : }
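/*
 * Sketch (hypothetical caller, reusing example_read_done from the readv
 * sketch above): overlap is tracked per chunk, so two in-flight writes to
 * the same chunk serialize - the second sits on queued_requests until the
 * first completes - while writes to different chunks run concurrently.
 * Assumes 512-byte logical blocks and caller-owned single-block iovecs.
 */
static void
example_overlapping_writes(struct spdk_reduce_vol *vol, struct iovec iovs[3],
                           uint64_t blocks_per_chunk)
{
        /* Blocks 0 and 1 share chunk 0: the second call is queued. */
        spdk_reduce_vol_writev(vol, &iovs[0], 1, 0, 1, example_read_done, NULL);
        spdk_reduce_vol_writev(vol, &iovs[1], 1, 1, 1, example_read_done, NULL);

        /* First block of chunk 1: proceeds concurrently with chunk 0. */
        spdk_reduce_vol_writev(vol, &iovs[2], 1, blocks_per_chunk, 1,
                               example_read_done, NULL);
}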
    1975             : 
    1976             : static void
    1977           1 : _start_unmap_request_full_chunk(void *ctx)
    1978             : {
    1979           1 :         struct spdk_reduce_vol_request *req = ctx;
    1980           1 :         struct spdk_reduce_vol *vol = req->vol;
    1981             :         uint64_t chunk_map_index;
    1982             : 
    1983           1 :         RB_INSERT(executing_req_tree, &req->vol->executing_requests, req);
    1984             : 
    1985           1 :         chunk_map_index = vol->pm_logical_map[req->logical_map_index];
    1986           1 :         if (chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) {
    1987           1 :                 _reduce_vol_reset_chunk(vol, chunk_map_index);
    1988           1 :                 vol->pm_logical_map[req->logical_map_index] = REDUCE_EMPTY_MAP_ENTRY;
    1989           1 :                 _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t));
    1990             :         }
    1991           1 :         _reduce_vol_complete_req(req, 0);
    1992           1 : }
    1993             : 
    1994             : static void
    1995           1 : _reduce_vol_unmap_full_chunk(struct spdk_reduce_vol *vol,
    1996             :                              uint64_t offset, uint64_t length,
    1997             :                              spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
    1998             : {
    1999             :         struct spdk_reduce_vol_request *req;
    2000             :         uint64_t logical_map_index;
    2001             :         bool overlapped;
    2002             : 
    2003           1 :         if (_request_spans_chunk_boundary(vol, offset, length)) {
    2004           0 :                 cb_fn(cb_arg, -EINVAL);
    2005           0 :                 return;
    2006             :         }
    2007             : 
    2008           1 :         logical_map_index = offset / vol->logical_blocks_per_chunk;
    2009           1 :         overlapped = _check_overlap(vol, logical_map_index);
    2010             : 
    2011           1 :         if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) {
    2012             :                 /*
    2013             :                  * This chunk hasn't been allocated. Nothing needs to be done.
    2014             :                  */
    2015           0 :                 cb_fn(cb_arg, 0);
    2016           0 :                 return;
    2017             :         }
    2018             : 
    2019           1 :         req = TAILQ_FIRST(&vol->free_requests);
    2020           1 :         if (req == NULL) {
    2021           0 :                 cb_fn(cb_arg, -ENOMEM);
    2022           0 :                 return;
    2023             :         }
    2024             : 
    2025           1 :         TAILQ_REMOVE(&vol->free_requests, req, tailq);
    2026           1 :         req->type = REDUCE_IO_UNMAP;
    2027           1 :         req->vol = vol;
    2028           1 :         req->iov = NULL;
    2029           1 :         req->iovcnt = 0;
    2030           1 :         req->offset = offset;
    2031           1 :         req->logical_map_index = logical_map_index;
    2032           1 :         req->length = length;
    2033           1 :         req->copy_after_decompress = false;
    2034           1 :         req->cb_fn = cb_fn;
    2035           1 :         req->cb_arg = cb_arg;
    2036           1 :         req->reduce_errno = 0;
    2037             : 
    2038           1 :         if (!overlapped) {
    2039           1 :                 _start_unmap_request_full_chunk(req);
    2040             :         } else {
    2041           0 :                 TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq);
    2042             :         }
    2043             : }
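/*
 * Note on the !overlapped guard above (the same pattern appears in
 * spdk_reduce_vol_readv): the empty-map fast path is only safe when no
 * request is executing against the same chunk, because an in-flight write
 * may be about to allocate it.  Overlapped requests are therefore always
 * queued and dispatched after the earlier request completes.
 */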
    2044             : 
    2045             : struct unmap_partial_chunk_ctx {
    2046             :         struct spdk_reduce_vol *vol;
    2047             :         struct iovec iov;
    2048             :         spdk_reduce_vol_op_complete cb_fn;
    2049             :         void *cb_arg;
    2050             : };
    2051             : 
    2052             : static void
    2053           1 : _reduce_unmap_partial_chunk_complete(void *_ctx, int reduce_errno)
    2054             : {
    2055           1 :         struct unmap_partial_chunk_ctx *ctx = _ctx;
    2056             : 
    2057           1 :         ctx->cb_fn(ctx->cb_arg, reduce_errno);
    2058           1 :         free(ctx);
    2059           1 : }
    2060             : 
    2061             : static void
    2062           1 : _reduce_vol_unmap_partial_chunk(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length,
    2063             :                                 spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
    2064             : {
    2065             :         struct unmap_partial_chunk_ctx *ctx;
    2066             : 
    2067           1 :         ctx = calloc(1, sizeof(struct unmap_partial_chunk_ctx));
    2068           1 :         if (ctx == NULL) {
    2069           0 :                 cb_fn(cb_arg, -ENOMEM);
    2070           0 :                 return;
    2071             :         }
    2072             : 
    2073           1 :         ctx->vol = vol;
    2074           1 :         ctx->iov.iov_base = g_zero_buf;
    2075           1 :         ctx->iov.iov_len = length * vol->params.logical_block_size;
    2076           1 :         ctx->cb_fn = cb_fn;
    2077           1 :         ctx->cb_arg = cb_arg;
    2078             : 
    2079           1 :         spdk_reduce_vol_writev(vol, &ctx->iov, 1, offset, length, _reduce_unmap_partial_chunk_complete,
    2080             :                                ctx);
    2081             : }
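/*
 * Design note: a sub-chunk unmap cannot simply drop the chunk, because
 * neighboring blocks in the same chunk must keep their data.  Routing the
 * request through spdk_reduce_vol_writev() with g_zero_buf reuses the
 * read-modify-write merge in _start_writev_request(): the affected blocks
 * become zeroes while the rest of the chunk survives, at the cost of one
 * chunk rewrite instead of a pure metadata update.  For example, with
 * 512-byte logical blocks, unmapping 8 blocks at offset 4 becomes a writev
 * of a single 4096-byte iovec of zeroes at logical block 4.
 */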
    2082             : 
    2083             : void
    2084           3 : spdk_reduce_vol_unmap(struct spdk_reduce_vol *vol,
    2085             :                       uint64_t offset, uint64_t length,
    2086             :                       spdk_reduce_vol_op_complete cb_fn, void *cb_arg)
    2087             : {
    2088           3 :         if (length < vol->logical_blocks_per_chunk) {
    2089           1 :                 _reduce_vol_unmap_partial_chunk(vol, offset, length, cb_fn, cb_arg);
    2090           2 :         } else if (length == vol->logical_blocks_per_chunk) {
    2091           1 :                 _reduce_vol_unmap_full_chunk(vol, offset, length, cb_fn, cb_arg);
    2092             :         } else {
    2093           1 :                 cb_fn(cb_arg, -EINVAL);
    2094             :         }
    2095           3 : }
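/*
 * Worked example (hypothetical parameters): with chunk_size = 16 KiB and
 * logical_block_size = 512, logical_blocks_per_chunk = 32.  An unmap of
 * length 8 takes the partial path (a zero-fill write), length 32 takes the
 * full-chunk path (a pure metadata update, assuming offset is chunk-aligned
 * so _request_spans_chunk_boundary() accepts it), and length 40 fails with
 * -EINVAL because unmaps larger than one chunk are not supported.
 */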
    2096             : 
    2097             : const struct spdk_reduce_vol_params *
    2098           0 : spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol)
    2099             : {
    2100           0 :         return &vol->params;
    2101             : }
    2102             : 
    2103             : const char *
    2104           0 : spdk_reduce_vol_get_pm_path(const struct spdk_reduce_vol *vol)
    2105             : {
    2106           0 :         return vol->pm_file.path;
    2107             : }
    2108             : 
    2109             : void
    2110           0 : spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol)
    2111             : {
    2112             :         uint64_t logical_map_size, num_chunks, ttl_chunk_sz;
    2113             :         uint32_t struct_size;
    2114             :         uint64_t chunk_map_size;
    2115             : 
    2116           0 :         SPDK_NOTICELOG("vol info:\n");
    2117           0 :         SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size);
    2118           0 :         SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size);
    2119           0 :         SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size);
    2120           0 :         SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size);
    2121           0 :         num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size);
    2122           0 :         SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks);
    2123           0 :         SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n",
    2124             :                        vol->params.vol_size / vol->params.chunk_size);
    2125           0 :         ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
    2126           0 :                         vol->params.backing_io_unit_size);
    2127           0 :         SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz);
    2128           0 :         struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk);
    2129           0 :         SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size);
    2130             : 
    2131           0 :         SPDK_NOTICELOG("pmem info:\n");
    2132           0 :         SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size);
    2133           0 :         SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf);
    2134           0 :         SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super);
    2135           0 :         SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map);
    2136           0 :         logical_map_size = _get_pm_logical_map_size(vol->params.vol_size,
    2137           0 :                            vol->params.chunk_size);
    2138           0 :         SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size);
    2139           0 :         SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps);
    2140           0 :         chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size,
    2141           0 :                          vol->params.backing_io_unit_size);
    2142           0 :         SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size);
    2143           0 : }
    2144             : 
    2145           1 : SPDK_LOG_REGISTER_COMPONENT(reduce)

Generated by: LCOV version 1.15