LCOV - code coverage report
Current view: top level - module/sock/uring - uring.c (source / functions) Hit Total Coverage
Test: ut_cov_unit.info Lines: 99 1182 8.4 %
Date: 2024-12-15 10:36:05 Functions: 7 52 13.5 %

          Line data    Source code
       1             : /*   SPDX-License-Identifier: BSD-3-Clause
       2             :  *   Copyright (C) 2019 Intel Corporation.
       3             :  *   All rights reserved.
       4             :  */
       5             : 
       6             : #include "spdk/stdinc.h"
       7             : #include "spdk/config.h"
       8             : 
       9             : #include <linux/errqueue.h>
      10             : #include <liburing.h>
      11             : 
      12             : #include "spdk/barrier.h"
      13             : #include "spdk/env.h"
      14             : #include "spdk/log.h"
      15             : #include "spdk/pipe.h"
      16             : #include "spdk/sock.h"
      17             : #include "spdk/string.h"
      18             : #include "spdk/util.h"
      19             : #include "spdk/net.h"
      20             : #include "spdk/file.h"
      21             : 
      22             : #include "spdk_internal/sock.h"
      23             : #include "spdk_internal/assert.h"
      24             : #include "spdk/net.h"
      25             : 
      26             : #define MAX_TMPBUF 1024
      27             : #define PORTNUMLEN 32
      28             : #define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
      29             : #define SPDK_SOCK_CMG_INFO_SIZE (sizeof(struct cmsghdr) + sizeof(struct sock_extended_err))
      30             : 
      31             : enum uring_task_type {
      32             :         URING_TASK_READ = 0,
      33             :         URING_TASK_ERRQUEUE,
      34             :         URING_TASK_WRITE,
      35             :         URING_TASK_CANCEL,
      36             : };
      37             : 
      38             : #if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
      39             : #define SPDK_ZEROCOPY
      40             : #endif
      41             : 
      42             : /* We don't know how big the buffers that the user posts will be, but this
      43             :  * is the maximum we'll ever allow it to receive in a single command.
      44             :  * If the user buffers are smaller, it will just receive less. */
      45             : #define URING_MAX_RECV_SIZE (128 * 1024)
      46             : 
      47             : /* We don't know how many buffers the user will post, but this is the
      48             :  * maximum number we'll take from the pool to post per group. */
      49             : #define URING_BUF_POOL_SIZE 128
      50             : 
      51             : /* We use 1 just so it's not zero and we can validate it's right. */
      52             : #define URING_BUF_GROUP_ID 1
      53             : 
      54             : enum spdk_uring_sock_task_status {
      55             :         SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
      56             :         SPDK_URING_SOCK_TASK_IN_PROCESS,
      57             : };
      58             : 
      59             : struct spdk_uring_task {
      60             :         enum spdk_uring_sock_task_status        status;
      61             :         enum uring_task_type            type;
      62             :         struct spdk_uring_sock                  *sock;
      63             :         struct msghdr                           msg;
      64             :         struct iovec                            iovs[IOV_BATCH_SIZE];
      65             :         int                                     iov_cnt;
      66             :         struct spdk_sock_request                *last_req;
      67             :         bool                                    is_zcopy;
      68             :         STAILQ_ENTRY(spdk_uring_task)           link;
      69             : };
      70             : 
      71             : struct spdk_uring_sock {
      72             :         struct spdk_sock                        base;
      73             :         int                                     fd;
      74             :         uint32_t                                sendmsg_idx;
      75             :         struct spdk_uring_sock_group_impl       *group;
      76             :         STAILQ_HEAD(, spdk_uring_buf_tracker)   recv_stream;
      77             :         size_t                                  recv_offset;
      78             :         struct spdk_uring_task                  write_task;
      79             :         struct spdk_uring_task                  errqueue_task;
      80             :         struct spdk_uring_task                  read_task;
      81             :         struct spdk_uring_task                  cancel_task;
      82             :         struct spdk_pipe                        *recv_pipe;
      83             :         void                                    *recv_buf;
      84             :         int                                     recv_buf_sz;
      85             :         bool                                    zcopy;
      86             :         bool                                    pending_recv;
      87             :         bool                                    pending_group_remove;
      88             :         int                                     zcopy_send_flags;
      89             :         int                                     connection_status;
      90             :         int                                     placement_id;
      91             :         uint8_t                                 reserved[4];
      92             :         uint8_t                                 buf[SPDK_SOCK_CMG_INFO_SIZE];
      93             :         TAILQ_ENTRY(spdk_uring_sock)            link;
      94             :         char                                    interface_name[IFNAMSIZ];
      95             : };
      96             : /* 'struct cmsghdr' is mapped to the buffer 'buf', and while first element
      97             :  * of this control message header has a size of 8 bytes, 'buf'
      98             :  * must be 8-byte aligned.
      99             :  */
     100             : SPDK_STATIC_ASSERT(offsetof(struct spdk_uring_sock, buf) % 8 == 0,
     101             :                    "Incorrect alignment: `buf` must be aligned to 8 bytes");
     102             : 
     103             : TAILQ_HEAD(pending_recv_list, spdk_uring_sock);
     104             : 
     105             : struct spdk_uring_buf_tracker {
     106             :         void                                    *buf;
     107             :         size_t                                  buflen;
     108             :         size_t                                  len;
     109             :         void                                    *ctx;
     110             :         int                                     id;
     111             :         STAILQ_ENTRY(spdk_uring_buf_tracker)    link;
     112             : };
     113             : 
     114             : struct spdk_uring_sock_group_impl {
     115             :         struct spdk_sock_group_impl             base;
     116             :         struct io_uring                         uring;
     117             :         uint32_t                                io_inflight;
     118             :         uint32_t                                io_queued;
     119             :         uint32_t                                io_avail;
     120             :         struct pending_recv_list                pending_recv;
     121             : 
     122             :         struct io_uring_buf_ring                *buf_ring;
     123             :         uint32_t                                buf_ring_count;
     124             :         struct spdk_uring_buf_tracker           *trackers;
     125             :         STAILQ_HEAD(, spdk_uring_buf_tracker)   free_trackers;
     126             : };
     127             : 
     128             : static struct spdk_sock_impl_opts g_spdk_uring_sock_impl_opts = {
     129             :         .recv_buf_size = DEFAULT_SO_RCVBUF_SIZE,
     130             :         .send_buf_size = DEFAULT_SO_SNDBUF_SIZE,
     131             :         .enable_recv_pipe = true,
     132             :         .enable_quickack = false,
     133             :         .enable_placement_id = PLACEMENT_NONE,
     134             :         .enable_zerocopy_send_server = false,
     135             :         .enable_zerocopy_send_client = false,
     136             :         .zerocopy_threshold = 0,
     137             :         .tls_version = 0,
     138             :         .enable_ktls = false,
     139             :         .psk_key = NULL,
     140             :         .psk_identity = NULL
     141             : };
     142             : 
     143             : static struct spdk_sock_map g_map = {
     144             :         .entries = STAILQ_HEAD_INITIALIZER(g_map.entries),
     145             :         .mtx = PTHREAD_MUTEX_INITIALIZER
     146             : };
     147             : 
     148             : __attribute((destructor)) static void
     149           1 : uring_sock_map_cleanup(void)
     150             : {
     151           1 :         spdk_sock_map_cleanup(&g_map);
     152           1 : }
     153             : 
     154             : #define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
     155             : 
     156             : #define __uring_sock(sock) (struct spdk_uring_sock *)sock
     157             : #define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
     158             : 
     159             : static void
     160           0 : uring_sock_copy_impl_opts(struct spdk_sock_impl_opts *dest, const struct spdk_sock_impl_opts *src,
     161             :                           size_t len)
     162             : {
     163             : #define FIELD_OK(field) \
     164             :         offsetof(struct spdk_sock_impl_opts, field) + sizeof(src->field) <= len
     165             : 
     166             : #define SET_FIELD(field) \
     167             :         if (FIELD_OK(field)) { \
     168             :                 dest->field = src->field; \
     169             :         }
     170             : 
     171           0 :         SET_FIELD(recv_buf_size);
     172           0 :         SET_FIELD(send_buf_size);
     173           0 :         SET_FIELD(enable_recv_pipe);
     174           0 :         SET_FIELD(enable_quickack);
     175           0 :         SET_FIELD(enable_placement_id);
     176           0 :         SET_FIELD(enable_zerocopy_send_server);
     177           0 :         SET_FIELD(enable_zerocopy_send_client);
     178           0 :         SET_FIELD(zerocopy_threshold);
     179           0 :         SET_FIELD(tls_version);
     180           0 :         SET_FIELD(enable_ktls);
     181           0 :         SET_FIELD(psk_key);
     182           0 :         SET_FIELD(psk_identity);
     183             : 
     184             : #undef SET_FIELD
     185             : #undef FIELD_OK
     186           0 : }
     187             : 
     188             : static int
     189           0 : uring_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
     190             : {
     191           0 :         if (!opts || !len) {
     192           0 :                 errno = EINVAL;
     193           0 :                 return -1;
     194             :         }
     195             : 
     196           0 :         assert(sizeof(*opts) >= *len);
     197           0 :         memset(opts, 0, *len);
     198             : 
     199           0 :         uring_sock_copy_impl_opts(opts, &g_spdk_uring_sock_impl_opts, *len);
     200           0 :         *len = spdk_min(*len, sizeof(g_spdk_uring_sock_impl_opts));
     201             : 
     202           0 :         return 0;
     203           0 : }
     204             : 
     205             : static int
     206           0 : uring_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
     207             : {
     208           0 :         if (!opts) {
     209           0 :                 errno = EINVAL;
     210           0 :                 return -1;
     211             :         }
     212             : 
     213           0 :         assert(sizeof(*opts) >= len);
     214           0 :         uring_sock_copy_impl_opts(&g_spdk_uring_sock_impl_opts, opts, len);
     215             : 
     216           0 :         return 0;
     217           0 : }
     218             : 
     219             : static void
     220           0 : uring_opts_get_impl_opts(const struct spdk_sock_opts *opts, struct spdk_sock_impl_opts *dest)
     221             : {
     222             :         /* Copy the default impl_opts first to cover cases when user's impl_opts is smaller */
     223           0 :         memcpy(dest, &g_spdk_uring_sock_impl_opts, sizeof(*dest));
     224             : 
     225           0 :         if (opts->impl_opts != NULL) {
     226           0 :                 assert(sizeof(*dest) >= opts->impl_opts_size);
     227           0 :                 uring_sock_copy_impl_opts(dest, opts->impl_opts, opts->impl_opts_size);
     228           0 :         }
     229           0 : }
     230             : 
     231             : static int
     232           0 : uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
     233             :                    char *caddr, int clen, uint16_t *cport)
     234             : {
     235           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
     236             : 
     237           0 :         assert(sock != NULL);
     238           0 :         return spdk_net_getaddr(sock->fd, saddr, slen, sport, caddr, clen, cport);
     239           0 : }
     240             : 
     241             : static const char *
     242           0 : uring_sock_get_interface_name(struct spdk_sock *_sock)
     243             : {
     244           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
     245           0 :         char saddr[64];
     246           0 :         int rc;
     247             : 
     248           0 :         rc = spdk_net_getaddr(sock->fd, saddr, sizeof(saddr), NULL, NULL, 0, NULL);
     249           0 :         if (rc != 0) {
     250           0 :                 return NULL;
     251             :         }
     252             : 
     253           0 :         rc = spdk_net_get_interface_name(saddr, sock->interface_name,
     254             :                                          sizeof(sock->interface_name));
     255           0 :         if (rc != 0) {
     256           0 :                 return NULL;
     257             :         }
     258             : 
     259           0 :         return sock->interface_name;
     260           0 : }
     261             : 
     262             : static int32_t
     263           0 : uring_sock_get_numa_id(struct spdk_sock *sock)
     264             : {
     265           0 :         const char *interface_name;
     266           0 :         uint32_t numa_id;
     267           0 :         int rc;
     268             : 
     269           0 :         interface_name = uring_sock_get_interface_name(sock);
     270           0 :         if (interface_name == NULL) {
     271           0 :                 return SPDK_ENV_NUMA_ID_ANY;
     272             :         }
     273             : 
     274           0 :         rc = spdk_read_sysfs_attribute_uint32(&numa_id,
     275           0 :                                               "/sys/class/net/%s/device/numa_node", interface_name);
     276           0 :         if (rc == 0 && numa_id <= INT32_MAX) {
     277           0 :                 return (int32_t)numa_id;
     278             :         } else {
     279           0 :                 return SPDK_ENV_NUMA_ID_ANY;
     280             :         }
     281           0 : }
     282             : 
     283             : enum uring_sock_create_type {
     284             :         SPDK_SOCK_CREATE_LISTEN,
     285             :         SPDK_SOCK_CREATE_CONNECT,
     286             : };
     287             : 
     288             : static int
     289           0 : uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
     290             : {
     291           0 :         uint8_t *new_buf;
     292           0 :         struct spdk_pipe *new_pipe;
     293           0 :         struct iovec siov[2];
     294           0 :         struct iovec diov[2];
     295           0 :         int sbytes;
     296           0 :         ssize_t bytes;
     297           0 :         int rc;
     298             : 
     299           0 :         if (sock->recv_buf_sz == sz) {
     300           0 :                 return 0;
     301             :         }
     302             : 
     303             :         /* If the new size is 0, just free the pipe */
     304           0 :         if (sz == 0) {
     305           0 :                 spdk_pipe_destroy(sock->recv_pipe);
     306           0 :                 free(sock->recv_buf);
     307           0 :                 sock->recv_pipe = NULL;
     308           0 :                 sock->recv_buf = NULL;
     309           0 :                 return 0;
     310           0 :         } else if (sz < MIN_SOCK_PIPE_SIZE) {
     311           0 :                 SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
     312           0 :                 return -1;
     313             :         }
     314             : 
     315             :         /* Round up to next 64 byte multiple */
     316           0 :         rc = posix_memalign((void **)&new_buf, 64, sz);
     317           0 :         if (rc != 0) {
     318           0 :                 SPDK_ERRLOG("socket recv buf allocation failed\n");
     319           0 :                 return -ENOMEM;
     320             :         }
     321           0 :         memset(new_buf, 0, sz);
     322             : 
     323           0 :         new_pipe = spdk_pipe_create(new_buf, sz);
     324           0 :         if (new_pipe == NULL) {
     325           0 :                 SPDK_ERRLOG("socket pipe allocation failed\n");
     326           0 :                 free(new_buf);
     327           0 :                 return -ENOMEM;
     328             :         }
     329             : 
     330           0 :         if (sock->recv_pipe != NULL) {
     331             :                 /* Pull all of the data out of the old pipe */
     332           0 :                 sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
     333           0 :                 if (sbytes > sz) {
     334             :                         /* Too much data to fit into the new pipe size */
     335           0 :                         spdk_pipe_destroy(new_pipe);
     336           0 :                         free(new_buf);
     337           0 :                         return -EINVAL;
     338             :                 }
     339             : 
     340           0 :                 sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
     341           0 :                 assert(sbytes == sz);
     342             : 
     343           0 :                 bytes = spdk_iovcpy(siov, 2, diov, 2);
     344           0 :                 spdk_pipe_writer_advance(new_pipe, bytes);
     345             : 
     346           0 :                 spdk_pipe_destroy(sock->recv_pipe);
     347           0 :                 free(sock->recv_buf);
     348           0 :         }
     349             : 
     350           0 :         sock->recv_buf_sz = sz;
     351           0 :         sock->recv_buf = new_buf;
     352           0 :         sock->recv_pipe = new_pipe;
     353             : 
     354           0 :         return 0;
     355           0 : }
     356             : 
     357             : static int
     358           0 : uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
     359             : {
     360           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
     361           0 :         int min_size;
     362           0 :         int rc;
     363             : 
     364           0 :         assert(sock != NULL);
     365             : 
     366           0 :         if (_sock->impl_opts.enable_recv_pipe) {
     367           0 :                 rc = uring_sock_alloc_pipe(sock, sz);
     368           0 :                 if (rc) {
     369           0 :                         SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
     370           0 :                         return rc;
     371             :                 }
     372           0 :         }
     373             : 
     374             :         /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE and
     375             :          * g_spdk_uring_sock_impl_opts.recv_buf_size. */
     376           0 :         min_size = spdk_max(MIN_SO_RCVBUF_SIZE, g_spdk_uring_sock_impl_opts.recv_buf_size);
     377             : 
     378           0 :         if (sz < min_size) {
     379           0 :                 sz = min_size;
     380           0 :         }
     381             : 
     382           0 :         rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
     383           0 :         if (rc < 0) {
     384           0 :                 return rc;
     385             :         }
     386             : 
     387           0 :         _sock->impl_opts.recv_buf_size = sz;
     388             : 
     389           0 :         return 0;
     390           0 : }
     391             : 
     392             : static int
     393           0 : uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
     394             : {
     395           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
     396           0 :         int min_size;
     397           0 :         int rc;
     398             : 
     399           0 :         assert(sock != NULL);
     400             : 
     401             :         /* Set kernel buffer size to be at least MIN_SO_SNDBUF_SIZE and
     402             :          * g_spdk_uring_sock_impl_opts.seend_buf_size. */
     403           0 :         min_size = spdk_max(MIN_SO_SNDBUF_SIZE, g_spdk_uring_sock_impl_opts.send_buf_size);
     404             : 
     405           0 :         if (sz < min_size) {
     406           0 :                 sz = min_size;
     407           0 :         }
     408             : 
     409           0 :         rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
     410           0 :         if (rc < 0) {
     411           0 :                 return rc;
     412             :         }
     413             : 
     414           0 :         _sock->impl_opts.send_buf_size = sz;
     415             : 
     416           0 :         return 0;
     417           0 : }
     418             : 
     419             : static struct spdk_uring_sock *
     420           0 : uring_sock_alloc(int fd, struct spdk_sock_impl_opts *impl_opts, bool enable_zero_copy)
     421             : {
     422           0 :         struct spdk_uring_sock *sock;
     423             : #if defined(__linux__)
     424           0 :         int flag;
     425           0 :         int rc;
     426             : #endif
     427             : 
     428           0 :         sock = calloc(1, sizeof(*sock));
     429           0 :         if (sock == NULL) {
     430           0 :                 SPDK_ERRLOG("sock allocation failed\n");
     431           0 :                 return NULL;
     432             :         }
     433             : 
     434           0 :         sock->fd = fd;
     435           0 :         memcpy(&sock->base.impl_opts, impl_opts, sizeof(*impl_opts));
     436             : 
     437           0 :         STAILQ_INIT(&sock->recv_stream);
     438             : 
     439             : #if defined(__linux__)
     440           0 :         flag = 1;
     441             : 
     442           0 :         if (sock->base.impl_opts.enable_quickack) {
     443           0 :                 rc = setsockopt(sock->fd, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(flag));
     444           0 :                 if (rc != 0) {
     445           0 :                         SPDK_ERRLOG("quickack was failed to set\n");
     446           0 :                 }
     447           0 :         }
     448             : 
     449           0 :         spdk_sock_get_placement_id(sock->fd, sock->base.impl_opts.enable_placement_id,
     450           0 :                                    &sock->placement_id);
     451             : #ifdef SPDK_ZEROCOPY
     452             :         /* Try to turn on zero copy sends */
     453           0 :         flag = 1;
     454             : 
     455           0 :         if (enable_zero_copy) {
     456           0 :                 rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
     457           0 :                 if (rc == 0) {
     458           0 :                         sock->zcopy = true;
     459           0 :                         sock->zcopy_send_flags = MSG_ZEROCOPY;
     460           0 :                 }
     461           0 :         }
     462             : #endif
     463             : #endif
     464             : 
     465           0 :         return sock;
     466           0 : }
     467             : 
     468             : static struct spdk_sock *
     469           0 : uring_sock_create(const char *ip, int port,
     470             :                   enum uring_sock_create_type type,
     471             :                   struct spdk_sock_opts *opts)
     472             : {
     473           0 :         struct spdk_uring_sock *sock;
     474           0 :         struct spdk_sock_impl_opts impl_opts;
     475           0 :         char buf[MAX_TMPBUF];
     476           0 :         char portnum[PORTNUMLEN];
     477           0 :         char *p;
     478           0 :         const char *src_addr;
     479           0 :         uint16_t src_port;
     480           0 :         struct addrinfo hints, *res, *res0, *src_ai;
     481           0 :         int fd, flag;
     482           0 :         int val = 1;
     483           0 :         int rc;
     484           0 :         bool enable_zcopy_impl_opts = false;
     485           0 :         bool enable_zcopy_user_opts = true;
     486             : 
     487           0 :         assert(opts != NULL);
     488           0 :         uring_opts_get_impl_opts(opts, &impl_opts);
     489             : 
     490           0 :         if (ip == NULL) {
     491           0 :                 return NULL;
     492             :         }
     493           0 :         if (ip[0] == '[') {
     494           0 :                 snprintf(buf, sizeof(buf), "%s", ip + 1);
     495           0 :                 p = strchr(buf, ']');
     496           0 :                 if (p != NULL) {
     497           0 :                         *p = '\0';
     498           0 :                 }
     499           0 :                 ip = (const char *) &buf[0];
     500           0 :         }
     501             : 
     502           0 :         snprintf(portnum, sizeof portnum, "%d", port);
     503           0 :         memset(&hints, 0, sizeof hints);
     504           0 :         hints.ai_family = PF_UNSPEC;
     505           0 :         hints.ai_socktype = SOCK_STREAM;
     506           0 :         hints.ai_flags = AI_NUMERICSERV;
     507           0 :         hints.ai_flags |= AI_PASSIVE;
     508           0 :         hints.ai_flags |= AI_NUMERICHOST;
     509           0 :         rc = getaddrinfo(ip, portnum, &hints, &res0);
     510           0 :         if (rc != 0) {
     511           0 :                 SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n", gai_strerror(rc), rc);
     512           0 :                 return NULL;
     513             :         }
     514             : 
     515             :         /* try listen */
     516           0 :         fd = -1;
     517           0 :         for (res = res0; res != NULL; res = res->ai_next) {
     518             : retry:
     519           0 :                 fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
     520           0 :                 if (fd < 0) {
     521             :                         /* error */
     522           0 :                         continue;
     523             :                 }
     524             : 
     525           0 :                 val = impl_opts.recv_buf_size;
     526           0 :                 rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
     527           0 :                 if (rc) {
     528             :                         /* Not fatal */
     529           0 :                 }
     530             : 
     531           0 :                 val = impl_opts.send_buf_size;
     532           0 :                 rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
     533           0 :                 if (rc) {
     534             :                         /* Not fatal */
     535           0 :                 }
     536             : 
     537           0 :                 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
     538           0 :                 if (rc != 0) {
     539           0 :                         close(fd);
     540           0 :                         fd = -1;
     541             :                         /* error */
     542           0 :                         continue;
     543             :                 }
     544           0 :                 rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
     545           0 :                 if (rc != 0) {
     546           0 :                         close(fd);
     547           0 :                         fd = -1;
     548             :                         /* error */
     549           0 :                         continue;
     550             :                 }
     551             : 
     552           0 :                 if (opts->ack_timeout) {
     553             : #if defined(__linux__)
     554           0 :                         val = opts->ack_timeout;
     555           0 :                         rc = setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &val, sizeof val);
     556           0 :                         if (rc != 0) {
     557           0 :                                 close(fd);
     558           0 :                                 fd = -1;
     559             :                                 /* error */
     560           0 :                                 continue;
     561             :                         }
     562             : #else
     563             :                         SPDK_WARNLOG("TCP_USER_TIMEOUT is not supported.\n");
     564             : #endif
     565           0 :                 }
     566             : 
     567             : 
     568             : 
     569             : #if defined(SO_PRIORITY)
     570           0 :                 if (opts != NULL && opts->priority) {
     571           0 :                         rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
     572           0 :                         if (rc != 0) {
     573           0 :                                 close(fd);
     574           0 :                                 fd = -1;
     575             :                                 /* error */
     576           0 :                                 continue;
     577             :                         }
     578           0 :                 }
     579             : #endif
     580           0 :                 if (res->ai_family == AF_INET6) {
     581           0 :                         rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
     582           0 :                         if (rc != 0) {
     583           0 :                                 close(fd);
     584           0 :                                 fd = -1;
     585             :                                 /* error */
     586           0 :                                 continue;
     587             :                         }
     588           0 :                 }
     589             : 
     590           0 :                 if (type == SPDK_SOCK_CREATE_LISTEN) {
     591           0 :                         rc = bind(fd, res->ai_addr, res->ai_addrlen);
     592           0 :                         if (rc != 0) {
     593           0 :                                 SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
     594           0 :                                 switch (errno) {
     595             :                                 case EINTR:
     596             :                                         /* interrupted? */
     597           0 :                                         close(fd);
     598           0 :                                         goto retry;
     599             :                                 case EADDRNOTAVAIL:
     600           0 :                                         SPDK_ERRLOG("IP address %s not available. "
     601             :                                                     "Verify IP address in config file "
     602             :                                                     "and make sure setup script is "
     603             :                                                     "run before starting spdk app.\n", ip);
     604             :                                 /* FALLTHROUGH */
     605             :                                 default:
     606             :                                         /* try next family */
     607           0 :                                         close(fd);
     608           0 :                                         fd = -1;
     609           0 :                                         continue;
     610             :                                 }
     611             :                         }
     612             :                         /* bind OK */
     613           0 :                         rc = listen(fd, 512);
     614           0 :                         if (rc != 0) {
     615           0 :                                 SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
     616           0 :                                 close(fd);
     617           0 :                                 fd = -1;
     618           0 :                                 break;
     619             :                         }
     620             : 
     621           0 :                         flag = fcntl(fd, F_GETFL);
     622           0 :                         if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
     623           0 :                                 SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
     624           0 :                                 close(fd);
     625           0 :                                 fd = -1;
     626           0 :                                 break;
     627             :                         }
     628             : 
     629           0 :                         enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_server;
     630           0 :                 } else if (type == SPDK_SOCK_CREATE_CONNECT) {
     631           0 :                         src_addr = SPDK_GET_FIELD(opts, src_addr, NULL, opts->opts_size);
     632           0 :                         src_port = SPDK_GET_FIELD(opts, src_port, 0, opts->opts_size);
     633           0 :                         if (src_addr != NULL || src_port != 0) {
     634           0 :                                 snprintf(portnum, sizeof(portnum), "%"PRIu16, src_port);
     635           0 :                                 memset(&hints, 0, sizeof hints);
     636           0 :                                 hints.ai_family = AF_UNSPEC;
     637           0 :                                 hints.ai_socktype = SOCK_STREAM;
     638           0 :                                 hints.ai_flags = AI_NUMERICSERV | AI_NUMERICHOST | AI_PASSIVE;
     639           0 :                                 rc = getaddrinfo(src_addr, src_port > 0 ? portnum : NULL,
     640             :                                                  &hints, &src_ai);
     641           0 :                                 if (rc != 0 || src_ai == NULL) {
     642           0 :                                         SPDK_ERRLOG("getaddrinfo() failed %s (%d)\n",
     643             :                                                     rc != 0 ? gai_strerror(rc) : "", rc);
     644           0 :                                         close(fd);
     645           0 :                                         fd = -1;
     646           0 :                                         break;
     647             :                                 }
     648           0 :                                 rc = bind(fd, src_ai->ai_addr, src_ai->ai_addrlen);
     649           0 :                                 if (rc != 0) {
     650           0 :                                         SPDK_ERRLOG("bind() failed errno %d (%s:%s)\n", errno,
     651             :                                                     src_addr ? src_addr : "", portnum);
     652           0 :                                         close(fd);
     653           0 :                                         fd = -1;
     654           0 :                                         freeaddrinfo(src_ai);
     655           0 :                                         src_ai = NULL;
     656           0 :                                         break;
     657             :                                 }
     658           0 :                                 freeaddrinfo(src_ai);
     659           0 :                                 src_ai = NULL;
     660           0 :                         }
     661             : 
     662           0 :                         rc = connect(fd, res->ai_addr, res->ai_addrlen);
     663           0 :                         if (rc != 0) {
     664           0 :                                 SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
     665             :                                 /* try next family */
     666           0 :                                 close(fd);
     667           0 :                                 fd = -1;
     668           0 :                                 continue;
     669             :                         }
     670             : 
     671           0 :                         flag = fcntl(fd, F_GETFL);
     672           0 :                         if (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0) {
     673           0 :                                 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
     674           0 :                                 close(fd);
     675           0 :                                 fd = -1;
     676           0 :                                 break;
     677             :                         }
     678             : 
     679           0 :                         enable_zcopy_impl_opts = impl_opts.enable_zerocopy_send_client;
     680           0 :                 }
     681           0 :                 break;
     682             :         }
     683           0 :         freeaddrinfo(res0);
     684             : 
     685           0 :         if (fd < 0) {
     686           0 :                 return NULL;
     687             :         }
     688             : 
     689           0 :         enable_zcopy_user_opts = opts->zcopy && !spdk_net_is_loopback(fd);
     690           0 :         sock = uring_sock_alloc(fd, &impl_opts, enable_zcopy_user_opts && enable_zcopy_impl_opts);
     691           0 :         if (sock == NULL) {
     692           0 :                 SPDK_ERRLOG("sock allocation failed\n");
     693           0 :                 close(fd);
     694           0 :                 return NULL;
     695             :         }
     696             : 
     697           0 :         return &sock->base;
     698           0 : }
     699             : 
     700             : static struct spdk_sock *
     701           0 : uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
     702             : {
     703           0 :         if (spdk_interrupt_mode_is_enabled()) {
     704           0 :                 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.");
     705           0 :                 return NULL;
     706             :         }
     707             : 
     708           0 :         return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
     709           0 : }
     710             : 
     711             : static struct spdk_sock *
     712           0 : uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
     713             : {
     714           0 :         if (spdk_interrupt_mode_is_enabled()) {
     715           0 :                 SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.");
     716           0 :                 return NULL;
     717             :         }
     718             : 
     719           0 :         return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
     720           0 : }
     721             : 
     722             : static struct spdk_sock *
     723           0 : uring_sock_accept(struct spdk_sock *_sock)
     724             : {
     725           0 :         struct spdk_uring_sock          *sock = __uring_sock(_sock);
     726           0 :         struct sockaddr_storage         sa;
     727           0 :         socklen_t                       salen;
     728           0 :         int                             rc, fd;
     729           0 :         struct spdk_uring_sock          *new_sock;
     730           0 :         int                             flag;
     731             : 
     732           0 :         memset(&sa, 0, sizeof(sa));
     733           0 :         salen = sizeof(sa);
     734             : 
     735           0 :         assert(sock != NULL);
     736             : 
     737           0 :         rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
     738             : 
     739           0 :         if (rc == -1) {
     740           0 :                 return NULL;
     741             :         }
     742             : 
     743           0 :         fd = rc;
     744             : 
     745           0 :         flag = fcntl(fd, F_GETFL);
     746           0 :         if ((flag & O_NONBLOCK) && (fcntl(fd, F_SETFL, flag & ~O_NONBLOCK) < 0)) {
     747           0 :                 SPDK_ERRLOG("fcntl can't set blocking mode for socket, fd: %d (%d)\n", fd, errno);
     748           0 :                 close(fd);
     749           0 :                 return NULL;
     750             :         }
     751             : 
     752             : #if defined(SO_PRIORITY)
     753             :         /* The priority is not inherited, so call this function again */
     754           0 :         if (sock->base.opts.priority) {
     755           0 :                 rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
     756           0 :                 if (rc != 0) {
     757           0 :                         close(fd);
     758           0 :                         return NULL;
     759             :                 }
     760           0 :         }
     761             : #endif
     762             : 
     763           0 :         new_sock = uring_sock_alloc(fd, &sock->base.impl_opts, sock->zcopy);
     764           0 :         if (new_sock == NULL) {
     765           0 :                 close(fd);
     766           0 :                 return NULL;
     767             :         }
     768             : 
     769           0 :         return &new_sock->base;
     770           0 : }
     771             : 
     772             : static int
     773           0 : uring_sock_close(struct spdk_sock *_sock)
     774             : {
     775           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
     776             : 
     777           0 :         assert(TAILQ_EMPTY(&_sock->pending_reqs));
     778           0 :         assert(sock->group == NULL);
     779             : 
     780             :         /* If the socket fails to close, the best choice is to
     781             :          * leak the fd but continue to free the rest of the sock
     782             :          * memory. */
     783           0 :         close(sock->fd);
     784             : 
     785           0 :         spdk_pipe_destroy(sock->recv_pipe);
     786           0 :         free(sock->recv_buf);
     787           0 :         free(sock);
     788             : 
     789           0 :         return 0;
     790           0 : }
     791             : 
     792             : static ssize_t
     793           0 : uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
     794             : {
     795           0 :         struct iovec siov[2];
     796           0 :         int sbytes;
     797           0 :         ssize_t bytes;
     798           0 :         struct spdk_uring_sock_group_impl *group;
     799             : 
     800           0 :         sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
     801           0 :         if (sbytes < 0) {
     802           0 :                 errno = EINVAL;
     803           0 :                 return -1;
     804           0 :         } else if (sbytes == 0) {
     805           0 :                 errno = EAGAIN;
     806           0 :                 return -1;
     807             :         }
     808             : 
     809           0 :         bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
     810             : 
     811           0 :         if (bytes == 0) {
     812             :                 /* The only way this happens is if diov is 0 length */
     813           0 :                 errno = EINVAL;
     814           0 :                 return -1;
     815             :         }
     816             : 
     817           0 :         spdk_pipe_reader_advance(sock->recv_pipe, bytes);
     818             : 
     819             :         /* If we drained the pipe, take it off the level-triggered list */
     820           0 :         if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
     821           0 :                 group = __uring_group_impl(sock->base.group_impl);
     822           0 :                 TAILQ_REMOVE(&group->pending_recv, sock, link);
     823           0 :                 sock->pending_recv = false;
     824           0 :         }
     825             : 
     826           0 :         return bytes;
     827           0 : }
     828             : 
     829             : static inline ssize_t
     830           0 : sock_readv(int fd, struct iovec *iov, int iovcnt)
     831             : {
     832           0 :         struct msghdr msg = {
     833           0 :                 .msg_iov = iov,
     834           0 :                 .msg_iovlen = iovcnt,
     835             :         };
     836             : 
     837           0 :         return recvmsg(fd, &msg, MSG_DONTWAIT);
     838           0 : }
     839             : 
     840             : static inline ssize_t
     841           0 : uring_sock_read(struct spdk_uring_sock *sock)
     842             : {
     843           0 :         struct iovec iov[2];
     844           0 :         int bytes;
     845           0 :         struct spdk_uring_sock_group_impl *group;
     846             : 
     847           0 :         bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
     848             : 
     849           0 :         if (bytes > 0) {
     850           0 :                 bytes = sock_readv(sock->fd, iov, 2);
     851           0 :                 if (bytes > 0) {
     852           0 :                         spdk_pipe_writer_advance(sock->recv_pipe, bytes);
     853           0 :                         if (sock->base.group_impl && !sock->pending_recv) {
     854           0 :                                 group = __uring_group_impl(sock->base.group_impl);
     855           0 :                                 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
     856           0 :                                 sock->pending_recv = true;
     857           0 :                         }
     858           0 :                 }
     859           0 :         }
     860             : 
     861           0 :         return bytes;
     862           0 : }
     863             : 
     864             : static int
     865           0 : uring_sock_recv_next(struct spdk_sock *_sock, void **_buf, void **ctx)
     866             : {
     867           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
     868           0 :         struct spdk_uring_sock_group_impl *group;
     869           0 :         struct spdk_uring_buf_tracker *tr;
     870             : 
     871           0 :         if (sock->connection_status < 0) {
     872           0 :                 errno = -sock->connection_status;
     873           0 :                 return -1;
     874             :         }
     875             : 
     876           0 :         if (sock->recv_pipe != NULL) {
     877           0 :                 errno = ENOTSUP;
     878           0 :                 return -1;
     879             :         }
     880             : 
     881           0 :         group = __uring_group_impl(_sock->group_impl);
     882             : 
     883           0 :         tr = STAILQ_FIRST(&sock->recv_stream);
     884           0 :         if (tr == NULL) {
     885           0 :                 if (sock->group->buf_ring_count > 0) {
     886             :                         /* There are buffers posted, but data hasn't arrived. */
     887           0 :                         errno = EAGAIN;
     888           0 :                 } else {
     889             :                         /* There are no buffers posted, so this won't ever
     890             :                          * make forward progress. */
     891           0 :                         errno = ENOBUFS;
     892             :                 }
     893           0 :                 return -1;
     894             :         }
     895           0 :         assert(sock->pending_recv == true);
     896           0 :         assert(tr->buf != NULL);
     897             : 
     898           0 :         *_buf = tr->buf + sock->recv_offset;
     899           0 :         *ctx = tr->ctx;
     900             : 
     901           0 :         STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
     902           0 :         STAILQ_INSERT_HEAD(&group->free_trackers, tr, link);
     903             : 
     904           0 :         if (STAILQ_EMPTY(&sock->recv_stream)) {
     905           0 :                 sock->pending_recv = false;
     906           0 :                 TAILQ_REMOVE(&group->pending_recv, sock, link);
     907           0 :         }
     908             : 
     909           0 :         return tr->len - sock->recv_offset;
     910           0 : }
     911             : 
     912             : static ssize_t
     913           0 : uring_sock_readv_no_pipe(struct spdk_sock *_sock, struct iovec *iovs, int iovcnt)
     914             : {
     915           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
     916           0 :         struct spdk_uring_buf_tracker *tr;
     917           0 :         struct iovec iov;
     918           0 :         ssize_t total, len;
     919           0 :         int i;
     920             : 
     921           0 :         if (sock->connection_status < 0) {
     922           0 :                 errno = -sock->connection_status;
     923           0 :                 return -1;
     924             :         }
     925             : 
     926           0 :         if (_sock->group_impl == NULL) {
     927             :                 /* If not in a group just read from the socket the regular way. */
     928           0 :                 return sock_readv(sock->fd, iovs, iovcnt);
     929             :         }
     930             : 
     931           0 :         if (STAILQ_EMPTY(&sock->recv_stream)) {
     932           0 :                 if (sock->group->buf_ring_count == 0) {
     933             :                         /* If the user hasn't posted any buffers, read from the socket
     934             :                          * directly. */
     935             : 
     936           0 :                         if (sock->pending_recv) {
     937           0 :                                 sock->pending_recv = false;
     938           0 :                                 TAILQ_REMOVE(&(__uring_group_impl(_sock->group_impl))->pending_recv, sock, link);
     939           0 :                         }
     940             : 
     941           0 :                         return sock_readv(sock->fd, iovs, iovcnt);
     942             :                 }
     943             : 
     944           0 :                 errno = EAGAIN;
     945           0 :                 return -1;
     946             :         }
     947             : 
     948           0 :         total = 0;
     949           0 :         for (i = 0; i < iovcnt; i++) {
     950             :                 /* Copy to stack so we can change it */
     951           0 :                 iov = iovs[i];
     952             : 
     953           0 :                 tr = STAILQ_FIRST(&sock->recv_stream);
     954           0 :                 while (tr != NULL) {
     955           0 :                         len = spdk_min(iov.iov_len, tr->len - sock->recv_offset);
     956           0 :                         memcpy(iov.iov_base, tr->buf + sock->recv_offset, len);
     957             : 
     958           0 :                         total += len;
     959           0 :                         sock->recv_offset += len;
     960           0 :                         iov.iov_base += len;
     961           0 :                         iov.iov_len -= len;
     962             : 
     963           0 :                         if (sock->recv_offset == tr->len) {
     964           0 :                                 sock->recv_offset = 0;
     965           0 :                                 STAILQ_REMOVE_HEAD(&sock->recv_stream, link);
     966           0 :                                 STAILQ_INSERT_HEAD(&sock->group->free_trackers, tr, link);
     967           0 :                                 spdk_sock_group_provide_buf(sock->group->base.group, tr->buf, tr->buflen, tr->ctx);
     968           0 :                                 tr = STAILQ_FIRST(&sock->recv_stream);
     969           0 :                         }
     970             : 
     971           0 :                         if (iov.iov_len == 0) {
     972           0 :                                 break;
     973             :                         }
     974             :                 }
     975           0 :         }
     976             : 
     977           0 :         if (STAILQ_EMPTY(&sock->recv_stream)) {
     978           0 :                 struct spdk_uring_sock_group_impl *group;
     979             : 
     980           0 :                 group = __uring_group_impl(_sock->group_impl);
     981           0 :                 sock->pending_recv = false;
     982           0 :                 TAILQ_REMOVE(&group->pending_recv, sock, link);
     983           0 :         }
     984             : 
     985           0 :         assert(total > 0);
     986           0 :         return total;
     987           0 : }
     988             : 
     989             : static ssize_t
     990           0 : uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
     991             : {
     992           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
     993           0 :         int rc, i;
     994           0 :         size_t len;
     995             : 
     996           0 :         if (sock->connection_status < 0) {
     997           0 :                 errno = -sock->connection_status;
     998           0 :                 return -1;
     999             :         }
    1000             : 
    1001           0 :         if (sock->recv_pipe == NULL) {
    1002           0 :                 return uring_sock_readv_no_pipe(_sock, iov, iovcnt);
    1003             :         }
    1004             : 
    1005           0 :         len = 0;
    1006           0 :         for (i = 0; i < iovcnt; i++) {
    1007           0 :                 len += iov[i].iov_len;
    1008           0 :         }
    1009             : 
    1010           0 :         if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
    1011             :                 /* If the user is receiving a sufficiently large amount of data,
    1012             :                  * receive directly to their buffers. */
    1013           0 :                 if (len >= MIN_SOCK_PIPE_SIZE) {
    1014           0 :                         return sock_readv(sock->fd, iov, iovcnt);
    1015             :                 }
    1016             : 
    1017             :                 /* Otherwise, do a big read into our pipe */
    1018           0 :                 rc = uring_sock_read(sock);
    1019           0 :                 if (rc <= 0) {
    1020           0 :                         return rc;
    1021             :                 }
    1022           0 :         }
    1023             : 
    1024           0 :         return uring_sock_recv_from_pipe(sock, iov, iovcnt);
    1025           0 : }
    1026             : 
    1027             : static ssize_t
    1028           0 : uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
    1029             : {
    1030           0 :         struct iovec iov[1];
    1031             : 
    1032           0 :         iov[0].iov_base = buf;
    1033           0 :         iov[0].iov_len = len;
    1034             : 
    1035           0 :         return uring_sock_readv(sock, iov, 1);
    1036           0 : }
    1037             : 
    1038             : static ssize_t
    1039           0 : uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
    1040             : {
    1041           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1042           0 :         struct msghdr msg = {
    1043           0 :                 .msg_iov = iov,
    1044           0 :                 .msg_iovlen = iovcnt,
    1045             :         };
    1046             : 
    1047           0 :         if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
    1048           0 :                 errno = EAGAIN;
    1049           0 :                 return -1;
    1050             :         }
    1051             : 
    1052           0 :         return sendmsg(sock->fd, &msg, MSG_DONTWAIT);
    1053           0 : }
    1054             : 
    1055             : static ssize_t
    1056          13 : sock_request_advance_offset(struct spdk_sock_request *req, ssize_t rc)
    1057             : {
    1058          13 :         unsigned int offset;
    1059          13 :         size_t len;
    1060          13 :         int i;
    1061             : 
    1062          13 :         offset = req->internal.offset;
    1063          37 :         for (i = 0; i < req->iovcnt; i++) {
    1064             :                 /* Advance by the offset first */
    1065          28 :                 if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
    1066           2 :                         offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
    1067           2 :                         continue;
    1068             :                 }
    1069             : 
    1070             :                 /* Calculate the remaining length of this element */
    1071          26 :                 len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
    1072             : 
    1073          26 :                 if (len > (size_t)rc) {
    1074           4 :                         req->internal.offset += rc;
    1075           4 :                         return -1;
    1076             :                 }
    1077             : 
    1078          22 :                 offset = 0;
    1079          22 :                 req->internal.offset += len;
    1080          22 :                 rc -= len;
    1081          22 :         }
    1082             : 
    1083           9 :         return rc;
    1084          13 : }
    1085             : 
    1086             : static int
    1087          11 : sock_complete_write_reqs(struct spdk_sock *_sock, ssize_t rc, bool is_zcopy)
    1088             : {
    1089          11 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1090          11 :         struct spdk_sock_request *req;
    1091          11 :         int retval;
    1092             : 
    1093          11 :         if (is_zcopy) {
    1094             :                 /* Handling overflow case, because we use psock->sendmsg_idx - 1 for the
    1095             :                  * req->internal.offset, so sendmsg_idx should not be zero */
    1096           0 :                 if (spdk_unlikely(sock->sendmsg_idx == UINT32_MAX)) {
    1097           0 :                         sock->sendmsg_idx = 1;
    1098           0 :                 } else {
    1099           0 :                         sock->sendmsg_idx++;
    1100             :                 }
    1101           0 :         }
    1102             : 
    1103             :         /* Consume the requests that were actually written */
    1104          11 :         req = TAILQ_FIRST(&_sock->queued_reqs);
    1105          13 :         while (req) {
    1106             :                 /* req->internal.is_zcopy is true when the whole req or part of it is sent with zerocopy */
    1107          13 :                 req->internal.is_zcopy = is_zcopy;
    1108             : 
    1109          13 :                 rc = sock_request_advance_offset(req, rc);
    1110          13 :                 if (rc < 0) {
    1111             :                         /* This element was partially sent. */
    1112           4 :                         return 0;
    1113             :                 }
    1114             : 
    1115             :                 /* Handled a full request. */
    1116           9 :                 spdk_sock_request_pend(_sock, req);
    1117             : 
    1118           9 :                 if (!req->internal.is_zcopy && req == TAILQ_FIRST(&_sock->pending_reqs)) {
    1119           9 :                         retval = spdk_sock_request_put(_sock, req, 0);
    1120           9 :                         if (retval) {
    1121           0 :                                 return retval;
    1122             :                         }
    1123           9 :                 } else {
    1124             :                         /* Re-use the offset field to hold the sendmsg call index. The
    1125             :                          * index is 0 based, so subtract one here because we've already
    1126             :                          * incremented above. */
    1127           0 :                         req->internal.offset = sock->sendmsg_idx - 1;
    1128             :                 }
    1129             : 
    1130           9 :                 if (rc == 0) {
    1131           7 :                         break;
    1132             :                 }
    1133             : 
    1134           2 :                 req = TAILQ_FIRST(&_sock->queued_reqs);
    1135             :         }
    1136             : 
    1137           7 :         return 0;
    1138          11 : }
    1139             : 
    1140             : #ifdef SPDK_ZEROCOPY
    1141             : static int
    1142           0 : _sock_check_zcopy(struct spdk_sock *_sock, int status)
    1143             : {
    1144           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1145           0 :         ssize_t rc;
    1146           0 :         struct sock_extended_err *serr;
    1147           0 :         struct cmsghdr *cm;
    1148           0 :         uint32_t idx;
    1149           0 :         struct spdk_sock_request *req, *treq;
    1150           0 :         bool found;
    1151             : 
    1152           0 :         assert(sock->zcopy == true);
    1153           0 :         if (spdk_unlikely(status) < 0) {
    1154           0 :                 if (!TAILQ_EMPTY(&_sock->pending_reqs)) {
    1155           0 :                         SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries, status =%d\n",
    1156             :                                     status);
    1157           0 :                 } else {
    1158           0 :                         SPDK_WARNLOG("Recvmsg yielded an error!\n");
    1159             :                 }
    1160           0 :                 return 0;
    1161             :         }
    1162             : 
    1163           0 :         cm = CMSG_FIRSTHDR(&sock->errqueue_task.msg);
    1164           0 :         if (!((cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) ||
    1165           0 :               (cm->cmsg_level == SOL_IPV6 && cm->cmsg_type == IPV6_RECVERR))) {
    1166           0 :                 SPDK_WARNLOG("Unexpected cmsg level or type!\n");
    1167           0 :                 return 0;
    1168             :         }
    1169             : 
    1170           0 :         serr = (struct sock_extended_err *)CMSG_DATA(cm);
    1171           0 :         if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
    1172           0 :                 SPDK_WARNLOG("Unexpected extended error origin\n");
    1173           0 :                 return 0;
    1174             :         }
    1175             : 
    1176             :         /* Most of the time, the pending_reqs array is in the exact
    1177             :          * order we need such that all of the requests to complete are
    1178             :          * in order, in the front. It is guaranteed that all requests
    1179             :          * belonging to the same sendmsg call are sequential, so once
    1180             :          * we encounter one match we can stop looping as soon as a
    1181             :          * non-match is found.
    1182             :          */
    1183           0 :         for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
    1184           0 :                 found = false;
    1185           0 :                 TAILQ_FOREACH_SAFE(req, &_sock->pending_reqs, internal.link, treq) {
    1186           0 :                         if (!req->internal.is_zcopy) {
    1187             :                                 /* This wasn't a zcopy request. It was just waiting in line to complete */
    1188           0 :                                 rc = spdk_sock_request_put(_sock, req, 0);
    1189           0 :                                 if (rc < 0) {
    1190           0 :                                         return rc;
    1191             :                                 }
    1192           0 :                         } else if (req->internal.offset == idx) {
    1193           0 :                                 found = true;
    1194           0 :                                 rc = spdk_sock_request_put(_sock, req, 0);
    1195           0 :                                 if (rc < 0) {
    1196           0 :                                         return rc;
    1197             :                                 }
    1198           0 :                         } else if (found) {
    1199           0 :                                 break;
    1200             :                         }
    1201           0 :                 }
    1202           0 :         }
    1203             : 
    1204           0 :         return 0;
    1205           0 : }
    1206             : 
    1207             : static void
    1208           0 : _sock_prep_errqueue(struct spdk_sock *_sock)
    1209             : {
    1210           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1211           0 :         struct spdk_uring_task *task = &sock->errqueue_task;
    1212           0 :         struct io_uring_sqe *sqe;
    1213             : 
    1214           0 :         if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
    1215           0 :                 return;
    1216             :         }
    1217             : 
    1218           0 :         if (sock->pending_group_remove) {
    1219           0 :                 return;
    1220             :         }
    1221             : 
    1222           0 :         assert(sock->group != NULL);
    1223           0 :         sock->group->io_queued++;
    1224             : 
    1225           0 :         sqe = io_uring_get_sqe(&sock->group->uring);
    1226           0 :         io_uring_prep_recvmsg(sqe, sock->fd, &task->msg, MSG_ERRQUEUE);
    1227           0 :         io_uring_sqe_set_data(sqe, task);
    1228           0 :         task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
    1229           0 : }
    1230             : 
    1231             : #endif
    1232             : 
    1233             : static void
    1234           0 : _sock_flush(struct spdk_sock *_sock)
    1235             : {
    1236           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1237           0 :         struct spdk_uring_task *task = &sock->write_task;
    1238           0 :         uint32_t iovcnt;
    1239           0 :         struct io_uring_sqe *sqe;
    1240           0 :         int flags;
    1241             : 
    1242           0 :         if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
    1243           0 :                 return;
    1244             :         }
    1245             : 
    1246             : #ifdef SPDK_ZEROCOPY
    1247           0 :         if (sock->zcopy) {
    1248           0 :                 flags = MSG_DONTWAIT | sock->zcopy_send_flags;
    1249           0 :         } else
    1250             : #endif
    1251             :         {
    1252           0 :                 flags = MSG_DONTWAIT;
    1253             :         }
    1254             : 
    1255           0 :         iovcnt = spdk_sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req, &flags);
    1256           0 :         if (!iovcnt) {
    1257           0 :                 return;
    1258             :         }
    1259             : 
    1260           0 :         task->iov_cnt = iovcnt;
    1261           0 :         assert(sock->group != NULL);
    1262           0 :         task->msg.msg_iov = task->iovs;
    1263           0 :         task->msg.msg_iovlen = task->iov_cnt;
    1264             : #ifdef SPDK_ZEROCOPY
    1265           0 :         task->is_zcopy = (flags & MSG_ZEROCOPY) ? true : false;
    1266             : #endif
    1267           0 :         sock->group->io_queued++;
    1268             : 
    1269           0 :         sqe = io_uring_get_sqe(&sock->group->uring);
    1270           0 :         io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, flags);
    1271           0 :         io_uring_sqe_set_data(sqe, task);
    1272           0 :         task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
    1273           0 : }
    1274             : 
    1275             : static void
    1276           0 : _sock_prep_read(struct spdk_sock *_sock)
    1277             : {
    1278           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1279           0 :         struct spdk_uring_task *task = &sock->read_task;
    1280           0 :         struct io_uring_sqe *sqe;
    1281             : 
    1282             :         /* Do not prepare read event */
    1283           0 :         if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
    1284           0 :                 return;
    1285             :         }
    1286             : 
    1287           0 :         if (sock->pending_group_remove) {
    1288           0 :                 return;
    1289             :         }
    1290             : 
    1291           0 :         assert(sock->group != NULL);
    1292           0 :         sock->group->io_queued++;
    1293             : 
    1294           0 :         sqe = io_uring_get_sqe(&sock->group->uring);
    1295           0 :         io_uring_prep_recv(sqe, sock->fd, NULL, URING_MAX_RECV_SIZE, 0);
    1296           0 :         sqe->buf_group = URING_BUF_GROUP_ID;
    1297           0 :         sqe->flags |= IOSQE_BUFFER_SELECT;
    1298           0 :         io_uring_sqe_set_data(sqe, task);
    1299           0 :         task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
    1300           0 : }
    1301             : 
    1302             : static void
    1303           0 : _sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
    1304             : {
    1305           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1306           0 :         struct spdk_uring_task *task = &sock->cancel_task;
    1307           0 :         struct io_uring_sqe *sqe;
    1308             : 
    1309           0 :         if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
    1310           0 :                 return;
    1311             :         }
    1312             : 
    1313           0 :         assert(sock->group != NULL);
    1314           0 :         sock->group->io_queued++;
    1315             : 
    1316           0 :         sqe = io_uring_get_sqe(&sock->group->uring);
    1317           0 :         io_uring_prep_cancel(sqe, user_data, 0);
    1318           0 :         io_uring_sqe_set_data(sqe, task);
    1319           0 :         task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
    1320           0 : }
    1321             : 
    1322             : static void
    1323           0 : uring_sock_fail(struct spdk_uring_sock *sock, int status)
    1324             : {
    1325           0 :         struct spdk_uring_sock_group_impl *group = sock->group;
    1326           0 :         int rc;
    1327             : 
    1328           0 :         sock->connection_status = status;
    1329           0 :         rc = spdk_sock_abort_requests(&sock->base);
    1330             : 
    1331             :         /* The user needs to be notified that this socket is dead. */
    1332           0 :         if (rc == 0 && sock->base.cb_fn != NULL &&
    1333           0 :             sock->pending_recv == false) {
    1334           0 :                 sock->pending_recv = true;
    1335           0 :                 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
    1336           0 :         }
    1337           0 : }
    1338             : 
    1339             : static int
    1340           0 : sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
    1341             :                       struct spdk_sock **socks)
    1342             : {
    1343           0 :         int i, count, ret;
    1344           0 :         struct io_uring_cqe *cqe;
    1345           0 :         struct spdk_uring_sock *sock, *tmp;
    1346           0 :         struct spdk_uring_task *task;
    1347           0 :         int status, bid, flags;
    1348           0 :         bool is_zcopy;
    1349             : 
    1350           0 :         for (i = 0; i < max; i++) {
    1351           0 :                 ret = io_uring_peek_cqe(&group->uring, &cqe);
    1352           0 :                 if (ret != 0) {
    1353           0 :                         break;
    1354             :                 }
    1355             : 
    1356           0 :                 if (cqe == NULL) {
    1357           0 :                         break;
    1358             :                 }
    1359             : 
    1360           0 :                 task = (struct spdk_uring_task *)cqe->user_data;
    1361           0 :                 assert(task != NULL);
    1362           0 :                 sock = task->sock;
    1363           0 :                 assert(sock != NULL);
    1364           0 :                 assert(sock->group != NULL);
    1365           0 :                 assert(sock->group == group);
    1366           0 :                 sock->group->io_inflight--;
    1367           0 :                 sock->group->io_avail++;
    1368           0 :                 status = cqe->res;
    1369           0 :                 flags = cqe->flags;
    1370           0 :                 io_uring_cqe_seen(&group->uring, cqe);
    1371             : 
    1372           0 :                 task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
    1373             : 
    1374           0 :                 switch (task->type) {
    1375             :                 case URING_TASK_READ:
    1376           0 :                         if (status == -EAGAIN || status == -EWOULDBLOCK) {
    1377             :                                 /* This likely shouldn't happen, but would indicate that the
    1378             :                                  * kernel didn't have enough resources to queue a task internally. */
    1379           0 :                                 _sock_prep_read(&sock->base);
    1380           0 :                         } else if (status == -ECANCELED) {
    1381           0 :                                 continue;
    1382           0 :                         } else if (status == -ENOBUFS) {
    1383             :                                 /* There's data in the socket but the user hasn't provided any buffers.
    1384             :                                  * We need to notify the user that the socket has data pending. */
    1385           0 :                                 if (sock->base.cb_fn != NULL &&
    1386           0 :                                     sock->pending_recv == false) {
    1387           0 :                                         sock->pending_recv = true;
    1388           0 :                                         TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
    1389           0 :                                 }
    1390             : 
    1391           0 :                                 _sock_prep_read(&sock->base);
    1392           0 :                         } else if (spdk_unlikely(status <= 0)) {
    1393           0 :                                 uring_sock_fail(sock, status < 0 ? status : -ECONNRESET);
    1394           0 :                         } else {
    1395           0 :                                 struct spdk_uring_buf_tracker *tracker;
    1396             : 
    1397           0 :                                 assert((flags & IORING_CQE_F_BUFFER) != 0);
    1398             : 
    1399           0 :                                 bid = flags >> IORING_CQE_BUFFER_SHIFT;
    1400           0 :                                 tracker = &group->trackers[bid];
    1401             : 
    1402           0 :                                 assert(tracker->buf != NULL);
    1403           0 :                                 assert(tracker->len != 0);
    1404             : 
    1405             :                                 /* Append this data to the stream */
    1406           0 :                                 tracker->len = status;
    1407           0 :                                 STAILQ_INSERT_TAIL(&sock->recv_stream, tracker, link);
    1408           0 :                                 assert(group->buf_ring_count > 0);
    1409           0 :                                 group->buf_ring_count--;
    1410             : 
    1411           0 :                                 if (sock->base.cb_fn != NULL &&
    1412           0 :                                     sock->pending_recv == false) {
    1413           0 :                                         sock->pending_recv = true;
    1414           0 :                                         TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
    1415           0 :                                 }
    1416             : 
    1417           0 :                                 _sock_prep_read(&sock->base);
    1418           0 :                         }
    1419           0 :                         break;
    1420             :                 case URING_TASK_WRITE:
    1421           0 :                         if (status == -EAGAIN || status == -EWOULDBLOCK ||
    1422           0 :                             (status == -ENOBUFS && sock->zcopy) ||
    1423           0 :                             status == -ECANCELED) {
    1424           0 :                                 continue;
    1425           0 :                         } else if (spdk_unlikely(status) < 0) {
    1426           0 :                                 uring_sock_fail(sock, status);
    1427           0 :                         } else {
    1428           0 :                                 task->last_req = NULL;
    1429           0 :                                 task->iov_cnt = 0;
    1430           0 :                                 is_zcopy = task->is_zcopy;
    1431           0 :                                 task->is_zcopy = false;
    1432           0 :                                 sock_complete_write_reqs(&sock->base, status, is_zcopy);
    1433             :                         }
    1434             : 
    1435           0 :                         break;
    1436             : #ifdef SPDK_ZEROCOPY
    1437             :                 case URING_TASK_ERRQUEUE:
    1438           0 :                         if (status == -EAGAIN || status == -EWOULDBLOCK) {
    1439           0 :                                 _sock_prep_errqueue(&sock->base);
    1440           0 :                         } else if (status == -ECANCELED) {
    1441           0 :                                 continue;
    1442           0 :                         } else if (spdk_unlikely(status < 0)) {
    1443           0 :                                 uring_sock_fail(sock, status);
    1444           0 :                         } else {
    1445           0 :                                 _sock_check_zcopy(&sock->base, status);
    1446           0 :                                 _sock_prep_errqueue(&sock->base);
    1447             :                         }
    1448           0 :                         break;
    1449             : #endif
    1450             :                 case URING_TASK_CANCEL:
    1451             :                         /* Do nothing */
    1452           0 :                         break;
    1453             :                 default:
    1454           0 :                         SPDK_UNREACHABLE();
    1455             :                 }
    1456           0 :         }
    1457             : 
    1458           0 :         if (!socks) {
    1459           0 :                 return 0;
    1460             :         }
    1461           0 :         count = 0;
    1462           0 :         TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
    1463           0 :                 if (count == max_read_events) {
    1464           0 :                         break;
    1465             :                 }
    1466             : 
    1467             :                 /* If the socket's cb_fn is NULL, do not add it to socks array */
    1468           0 :                 if (spdk_unlikely(sock->base.cb_fn == NULL)) {
    1469           0 :                         assert(sock->pending_recv == true);
    1470           0 :                         sock->pending_recv = false;
    1471           0 :                         TAILQ_REMOVE(&group->pending_recv, sock, link);
    1472           0 :                         continue;
    1473             :                 }
    1474             : 
    1475           0 :                 socks[count++] = &sock->base;
    1476           0 :         }
    1477             : 
    1478             : 
    1479             :         /* Cycle the pending_recv list so that each time we poll things aren't
    1480             :          * in the same order. Say we have 6 sockets in the list, named as follows:
    1481             :          * A B C D E F
    1482             :          * And all 6 sockets had the poll events, but max_events is only 3. That means
    1483             :          * psock currently points at D. We want to rearrange the list to the following:
    1484             :          * D E F A B C
    1485             :          *
    1486             :          * The variables below are named according to this example to make it easier to
    1487             :          * follow the swaps.
    1488             :          */
    1489           0 :         if (sock != NULL) {
    1490           0 :                 struct spdk_uring_sock *ua, *uc, *ud, *uf;
    1491             : 
    1492             :                 /* Capture pointers to the elements we need */
    1493           0 :                 ud = sock;
    1494             : 
    1495           0 :                 ua = TAILQ_FIRST(&group->pending_recv);
    1496           0 :                 if (ua == ud) {
    1497           0 :                         goto end;
    1498             :                 }
    1499             : 
    1500           0 :                 uf = TAILQ_LAST(&group->pending_recv, pending_recv_list);
    1501           0 :                 if (uf == ud) {
    1502           0 :                         TAILQ_REMOVE(&group->pending_recv, ud, link);
    1503           0 :                         TAILQ_INSERT_HEAD(&group->pending_recv, ud, link);
    1504           0 :                         goto end;
    1505             :                 }
    1506             : 
    1507           0 :                 uc = TAILQ_PREV(ud, pending_recv_list, link);
    1508           0 :                 assert(uc != NULL);
    1509             : 
    1510             :                 /* Break the link between C and D */
    1511           0 :                 uc->link.tqe_next = NULL;
    1512             : 
    1513             :                 /* Connect F to A */
    1514           0 :                 uf->link.tqe_next = ua;
    1515           0 :                 ua->link.tqe_prev = &uf->link.tqe_next;
    1516             : 
    1517             :                 /* Fix up the list first/last pointers */
    1518           0 :                 group->pending_recv.tqh_first = ud;
    1519           0 :                 group->pending_recv.tqh_last = &uc->link.tqe_next;
    1520             : 
    1521             :                 /* D is in front of the list, make tqe prev pointer point to the head of list */
    1522           0 :                 ud->link.tqe_prev = &group->pending_recv.tqh_first;
    1523           0 :         }
    1524             : 
    1525             : end:
    1526           0 :         return count;
    1527           0 : }
    1528             : 
    1529             : static int uring_sock_flush(struct spdk_sock *_sock);
    1530             : 
    1531             : static void
    1532           0 : uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
    1533             : {
    1534           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1535           0 :         int rc;
    1536             : 
    1537           0 :         if (spdk_unlikely(sock->connection_status)) {
    1538           0 :                 req->cb_fn(req->cb_arg, sock->connection_status);
    1539           0 :                 return;
    1540             :         }
    1541             : 
    1542           0 :         spdk_sock_request_queue(_sock, req);
    1543             : 
    1544           0 :         if (!sock->group) {
    1545           0 :                 if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
    1546           0 :                         rc = uring_sock_flush(_sock);
    1547           0 :                         if (rc < 0 && errno != EAGAIN) {
    1548           0 :                                 spdk_sock_abort_requests(_sock);
    1549           0 :                         }
    1550           0 :                 }
    1551           0 :         }
    1552           0 : }
    1553             : 
    1554             : static int
    1555           0 : uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
    1556             : {
    1557           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1558           0 :         int val;
    1559           0 :         int rc;
    1560             : 
    1561           0 :         assert(sock != NULL);
    1562             : 
    1563           0 :         val = nbytes;
    1564           0 :         rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
    1565           0 :         if (rc != 0) {
    1566           0 :                 return -1;
    1567             :         }
    1568           0 :         return 0;
    1569           0 : }
    1570             : 
    1571             : static bool
    1572           0 : uring_sock_is_ipv6(struct spdk_sock *_sock)
    1573             : {
    1574           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1575           0 :         struct sockaddr_storage sa;
    1576           0 :         socklen_t salen;
    1577           0 :         int rc;
    1578             : 
    1579           0 :         assert(sock != NULL);
    1580             : 
    1581           0 :         memset(&sa, 0, sizeof sa);
    1582           0 :         salen = sizeof sa;
    1583           0 :         rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
    1584           0 :         if (rc != 0) {
    1585           0 :                 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
    1586           0 :                 return false;
    1587             :         }
    1588             : 
    1589           0 :         return (sa.ss_family == AF_INET6);
    1590           0 : }
    1591             : 
    1592             : static bool
    1593           0 : uring_sock_is_ipv4(struct spdk_sock *_sock)
    1594             : {
    1595           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1596           0 :         struct sockaddr_storage sa;
    1597           0 :         socklen_t salen;
    1598           0 :         int rc;
    1599             : 
    1600           0 :         assert(sock != NULL);
    1601             : 
    1602           0 :         memset(&sa, 0, sizeof sa);
    1603           0 :         salen = sizeof sa;
    1604           0 :         rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
    1605           0 :         if (rc != 0) {
    1606           0 :                 SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
    1607           0 :                 return false;
    1608             :         }
    1609             : 
    1610           0 :         return (sa.ss_family == AF_INET);
    1611           0 : }
    1612             : 
    1613             : static bool
    1614           0 : uring_sock_is_connected(struct spdk_sock *_sock)
    1615             : {
    1616           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1617           0 :         uint8_t byte;
    1618           0 :         int rc;
    1619             : 
    1620           0 :         rc = recv(sock->fd, &byte, 1, MSG_PEEK | MSG_DONTWAIT);
    1621           0 :         if (rc == 0) {
    1622           0 :                 return false;
    1623             :         }
    1624             : 
    1625           0 :         if (rc < 0) {
    1626           0 :                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
    1627           0 :                         return true;
    1628             :                 }
    1629             : 
    1630           0 :                 return false;
    1631             :         }
    1632             : 
    1633           0 :         return true;
    1634           0 : }
    1635             : 
    1636             : static struct spdk_sock_group_impl *
    1637           0 : uring_sock_group_impl_get_optimal(struct spdk_sock *_sock, struct spdk_sock_group_impl *hint)
    1638             : {
    1639           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1640           0 :         struct spdk_sock_group_impl *group;
    1641             : 
    1642           0 :         if (sock->placement_id != -1) {
    1643           0 :                 spdk_sock_map_lookup(&g_map, sock->placement_id, &group, hint);
    1644           0 :                 return group;
    1645             :         }
    1646             : 
    1647           0 :         return NULL;
    1648           0 : }
    1649             : 
    1650             : static int
    1651           0 : uring_sock_group_impl_buf_pool_free(struct spdk_uring_sock_group_impl *group_impl)
    1652             : {
    1653           0 :         if (group_impl->buf_ring) {
    1654           0 :                 io_uring_unregister_buf_ring(&group_impl->uring, URING_BUF_GROUP_ID);
    1655           0 :                 free(group_impl->buf_ring);
    1656           0 :         }
    1657             : 
    1658           0 :         free(group_impl->trackers);
    1659             : 
    1660           0 :         return 0;
    1661             : }
    1662             : 
    1663             : static int
    1664           1 : uring_sock_group_impl_buf_pool_alloc(struct spdk_uring_sock_group_impl *group_impl)
    1665             : {
    1666           1 :         struct io_uring_buf_reg buf_reg = {};
    1667           1 :         struct io_uring_buf_ring *buf_ring;
    1668           1 :         int i, rc;
    1669             : 
    1670           1 :         rc = posix_memalign((void **)&buf_ring, 0x1000, URING_BUF_POOL_SIZE * sizeof(struct io_uring_buf));
    1671           1 :         if (rc != 0) {
    1672             :                 /* posix_memalign returns positive errno values */
    1673           0 :                 return -rc;
    1674             :         }
    1675             : 
    1676           1 :         buf_reg.ring_addr = (unsigned long long)buf_ring;
    1677           1 :         buf_reg.ring_entries = URING_BUF_POOL_SIZE;
    1678           1 :         buf_reg.bgid = URING_BUF_GROUP_ID;
    1679             : 
    1680           1 :         rc = io_uring_register_buf_ring(&group_impl->uring, &buf_reg, 0);
    1681           1 :         if (rc != 0) {
    1682           1 :                 free(buf_ring);
    1683           1 :                 return rc;
    1684             :         }
    1685             : 
    1686           0 :         group_impl->buf_ring = buf_ring;
    1687           0 :         io_uring_buf_ring_init(group_impl->buf_ring);
    1688           0 :         group_impl->buf_ring_count = 0;
    1689             : 
    1690           0 :         group_impl->trackers = calloc(URING_BUF_POOL_SIZE, sizeof(struct spdk_uring_buf_tracker));
    1691           0 :         if (group_impl->trackers == NULL) {
    1692           0 :                 uring_sock_group_impl_buf_pool_free(group_impl);
    1693           0 :                 return -ENOMEM;
    1694             :         }
    1695             : 
    1696           0 :         STAILQ_INIT(&group_impl->free_trackers);
    1697             : 
    1698           0 :         for (i = 0; i < URING_BUF_POOL_SIZE; i++) {
    1699           0 :                 struct spdk_uring_buf_tracker *tracker = &group_impl->trackers[i];
    1700             : 
    1701           0 :                 tracker->buf = NULL;
    1702           0 :                 tracker->len = 0;
    1703           0 :                 tracker->ctx = NULL;
    1704           0 :                 tracker->id = i;
    1705             : 
    1706           0 :                 STAILQ_INSERT_TAIL(&group_impl->free_trackers, tracker, link);
    1707           0 :         }
    1708             : 
    1709           0 :         return 0;
    1710           1 : }
    1711             : 
    1712             : static struct spdk_sock_group_impl *
    1713           1 : uring_sock_group_impl_create(void)
    1714             : {
    1715           1 :         struct spdk_uring_sock_group_impl *group_impl;
    1716             : 
    1717           1 :         group_impl = calloc(1, sizeof(*group_impl));
    1718           1 :         if (group_impl == NULL) {
    1719           0 :                 SPDK_ERRLOG("group_impl allocation failed\n");
    1720           0 :                 return NULL;
    1721             :         }
    1722             : 
    1723           1 :         group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
    1724             : 
    1725           1 :         if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
    1726           0 :                 SPDK_ERRLOG("uring I/O context setup failure\n");
    1727           0 :                 free(group_impl);
    1728           0 :                 return NULL;
    1729             :         }
    1730             : 
    1731           1 :         TAILQ_INIT(&group_impl->pending_recv);
    1732             : 
    1733           1 :         if (uring_sock_group_impl_buf_pool_alloc(group_impl) < 0) {
    1734           1 :                 SPDK_ERRLOG("Failed to create buffer ring."
    1735             :                             "uring sock implementation is likely not supported on this kernel.\n");
    1736           1 :                 io_uring_queue_exit(&group_impl->uring);
    1737           1 :                 free(group_impl);
    1738           1 :                 return NULL;
    1739             :         }
    1740             : 
    1741           0 :         if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
    1742           0 :                 spdk_sock_map_insert(&g_map, spdk_env_get_current_core(), &group_impl->base);
    1743           0 :         }
    1744             : 
    1745           0 :         return &group_impl->base;
    1746           1 : }
    1747             : 
    1748             : static int
    1749           0 : uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
    1750             :                                struct spdk_sock *_sock)
    1751             : {
    1752           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1753           0 :         struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
    1754           0 :         int rc;
    1755             : 
    1756           0 :         sock->group = group;
    1757           0 :         sock->write_task.sock = sock;
    1758           0 :         sock->write_task.type = URING_TASK_WRITE;
    1759             : 
    1760           0 :         sock->read_task.sock = sock;
    1761           0 :         sock->read_task.type = URING_TASK_READ;
    1762             : 
    1763           0 :         sock->errqueue_task.sock = sock;
    1764           0 :         sock->errqueue_task.type = URING_TASK_ERRQUEUE;
    1765           0 :         sock->errqueue_task.msg.msg_control = sock->buf;
    1766           0 :         sock->errqueue_task.msg.msg_controllen = sizeof(sock->buf);
    1767             : 
    1768           0 :         sock->cancel_task.sock = sock;
    1769           0 :         sock->cancel_task.type = URING_TASK_CANCEL;
    1770             : 
    1771             :         /* switched from another polling group due to scheduling */
    1772           0 :         if (spdk_unlikely(sock->recv_pipe != NULL &&
    1773             :                           (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
    1774           0 :                 assert(sock->pending_recv == false);
    1775           0 :                 sock->pending_recv = true;
    1776           0 :                 TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
    1777           0 :         }
    1778             : 
    1779           0 :         if (sock->placement_id != -1) {
    1780           0 :                 rc = spdk_sock_map_insert(&g_map, sock->placement_id, &group->base);
    1781           0 :                 if (rc != 0) {
    1782           0 :                         SPDK_ERRLOG("Failed to insert sock group into map: %d", rc);
    1783             :                         /* Do not treat this as an error. The system will continue running. */
    1784           0 :                 }
    1785           0 :         }
    1786             : 
    1787             :         /* We get an async read going immediately */
    1788           0 :         _sock_prep_read(&sock->base);
    1789             : #ifdef SPDK_ZEROCOPY
    1790           0 :         if (sock->zcopy) {
    1791           0 :                 _sock_prep_errqueue(_sock);
    1792           0 :         }
    1793             : #endif
    1794             : 
    1795           0 :         return 0;
    1796           0 : }
    1797             : 
    1798             : static void
    1799           0 : uring_sock_group_populate_buf_ring(struct spdk_uring_sock_group_impl *group)
    1800             : {
    1801           0 :         struct spdk_uring_buf_tracker *tracker;
    1802           0 :         int count, mask;
    1803             : 
    1804           0 :         if (g_spdk_uring_sock_impl_opts.enable_recv_pipe) {
    1805             :                 /* If recv_pipe is enabled, we do not post buffers. */
    1806           0 :                 return;
    1807             :         }
    1808             : 
    1809             :         /* Try to re-populate the io_uring's buffer pool using user-provided buffers */
    1810           0 :         tracker = STAILQ_FIRST(&group->free_trackers);
    1811           0 :         count = 0;
    1812           0 :         mask = io_uring_buf_ring_mask(URING_BUF_POOL_SIZE);
    1813           0 :         while (tracker != NULL) {
    1814           0 :                 tracker->buflen = spdk_sock_group_get_buf(group->base.group, &tracker->buf, &tracker->ctx);
    1815           0 :                 if (tracker->buflen == 0) {
    1816           0 :                         break;
    1817             :                 }
    1818             : 
    1819           0 :                 assert(tracker->buf != NULL);
    1820           0 :                 STAILQ_REMOVE_HEAD(&group->free_trackers, link);
    1821           0 :                 assert(STAILQ_FIRST(&group->free_trackers) != tracker);
    1822             : 
    1823           0 :                 io_uring_buf_ring_add(group->buf_ring, tracker->buf, tracker->buflen, tracker->id, mask, count);
    1824           0 :                 count++;
    1825           0 :                 tracker = STAILQ_FIRST(&group->free_trackers);
    1826             :         }
    1827             : 
    1828           0 :         if (count > 0) {
    1829           0 :                 group->buf_ring_count += count;
    1830           0 :                 io_uring_buf_ring_advance(group->buf_ring, count);
    1831           0 :         }
    1832           0 : }
    1833             : 
    1834             : static int
    1835           0 : uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
    1836             :                            struct spdk_sock **socks)
    1837             : {
    1838           0 :         struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
    1839           0 :         int count, ret;
    1840           0 :         int to_complete, to_submit;
    1841           0 :         struct spdk_sock *_sock, *tmp;
    1842           0 :         struct spdk_uring_sock *sock;
    1843             : 
    1844           0 :         if (spdk_likely(socks)) {
    1845           0 :                 TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
    1846           0 :                         sock = __uring_sock(_sock);
    1847           0 :                         if (spdk_unlikely(sock->connection_status)) {
    1848           0 :                                 continue;
    1849             :                         }
    1850           0 :                         _sock_flush(_sock);
    1851           0 :                 }
    1852           0 :         }
    1853             : 
    1854             :         /* Try to re-populate the io_uring's buffer pool using user-provided buffers */
    1855           0 :         uring_sock_group_populate_buf_ring(group);
    1856             : 
    1857           0 :         to_submit = group->io_queued;
    1858             : 
    1859             :         /* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */
    1860           0 :         if (to_submit > 0) {
    1861             :                 /* If there are I/O to submit, use io_uring_submit here.
    1862             :                  * It will automatically call io_uring_enter appropriately. */
    1863           0 :                 ret = io_uring_submit(&group->uring);
    1864           0 :                 if (ret < 0) {
    1865           0 :                         return 1;
    1866             :                 }
    1867           0 :                 group->io_queued = 0;
    1868           0 :                 group->io_inflight += to_submit;
    1869           0 :                 group->io_avail -= to_submit;
    1870           0 :         }
    1871             : 
    1872           0 :         count = 0;
    1873           0 :         to_complete = group->io_inflight;
    1874           0 :         if (to_complete > 0 || !TAILQ_EMPTY(&group->pending_recv)) {
    1875           0 :                 count = sock_uring_group_reap(group, to_complete, max_events, socks);
    1876           0 :         }
    1877             : 
    1878           0 :         return count;
    1879           0 : }
    1880             : 
    1881             : static int
    1882           0 : uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
    1883             :                                   struct spdk_sock *_sock)
    1884             : {
    1885           0 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1886           0 :         struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
    1887             : 
    1888           0 :         sock->pending_group_remove = true;
    1889             : 
    1890           0 :         if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
    1891           0 :                 _sock_prep_cancel_task(_sock, &sock->write_task);
    1892             :                 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so
    1893             :                  * currently can use a while loop here. */
    1894           0 :                 while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
    1895           0 :                        (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
    1896           0 :                         uring_sock_group_impl_poll(_group, 32, NULL);
    1897             :                 }
    1898           0 :         }
    1899             : 
    1900           0 :         if (sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
    1901           0 :                 _sock_prep_cancel_task(_sock, &sock->read_task);
    1902             :                 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so
    1903             :                  * currently can use a while loop here. */
    1904           0 :                 while ((sock->read_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
    1905           0 :                        (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
    1906           0 :                         uring_sock_group_impl_poll(_group, 32, NULL);
    1907             :                 }
    1908           0 :         }
    1909             : 
    1910           0 :         if (sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
    1911           0 :                 _sock_prep_cancel_task(_sock, &sock->errqueue_task);
    1912             :                 /* Since spdk_sock_group_remove_sock is not asynchronous interface, so
    1913             :                  * currently can use a while loop here. */
    1914           0 :                 while ((sock->errqueue_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
    1915           0 :                        (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
    1916           0 :                         uring_sock_group_impl_poll(_group, 32, NULL);
    1917             :                 }
    1918           0 :         }
    1919             : 
    1920             :         /* Make sure the cancelling the tasks above didn't cause sending new requests */
    1921           0 :         assert(sock->write_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
    1922           0 :         assert(sock->read_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
    1923           0 :         assert(sock->errqueue_task.status == SPDK_URING_SOCK_TASK_NOT_IN_USE);
    1924             : 
    1925           0 :         if (sock->pending_recv) {
    1926           0 :                 TAILQ_REMOVE(&group->pending_recv, sock, link);
    1927           0 :                 sock->pending_recv = false;
    1928           0 :         }
    1929           0 :         assert(sock->pending_recv == false);
    1930             : 
    1931             :         /* We have no way to handle this case. We could let the user read this
    1932             :          * buffer, but the buffer came from a group and we have lost the association
    1933             :          * to that so we couldn't release it. */
    1934           0 :         assert(STAILQ_EMPTY(&sock->recv_stream));
    1935             : 
    1936           0 :         if (sock->placement_id != -1) {
    1937           0 :                 spdk_sock_map_release(&g_map, sock->placement_id);
    1938           0 :         }
    1939             : 
    1940           0 :         sock->pending_group_remove = false;
    1941           0 :         sock->group = NULL;
    1942           0 :         return 0;
    1943           0 : }
    1944             : 
    1945             : static int
    1946           0 : uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
    1947             : {
    1948           0 :         struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
    1949             : 
    1950             :         /* try to reap all the active I/O */
    1951           0 :         while (group->io_inflight) {
    1952           0 :                 uring_sock_group_impl_poll(_group, 32, NULL);
    1953             :         }
    1954           0 :         assert(group->io_inflight == 0);
    1955           0 :         assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
    1956             : 
    1957           0 :         uring_sock_group_impl_buf_pool_free(group);
    1958             : 
    1959           0 :         io_uring_queue_exit(&group->uring);
    1960             : 
    1961           0 :         if (g_spdk_uring_sock_impl_opts.enable_placement_id == PLACEMENT_CPU) {
    1962           0 :                 spdk_sock_map_release(&g_map, spdk_env_get_current_core());
    1963           0 :         }
    1964             : 
    1965           0 :         free(group);
    1966           0 :         return 0;
    1967           0 : }
    1968             : 
    1969             : static int
    1970           6 : uring_sock_flush(struct spdk_sock *_sock)
    1971             : {
    1972           6 :         struct spdk_uring_sock *sock = __uring_sock(_sock);
    1973           6 :         struct msghdr msg = {};
    1974           6 :         struct iovec iovs[IOV_BATCH_SIZE];
    1975           6 :         int iovcnt;
    1976           6 :         ssize_t rc;
    1977           6 :         int flags = sock->zcopy_send_flags;
    1978           6 :         int retval;
    1979           6 :         bool is_zcopy = false;
    1980           6 :         struct spdk_uring_task *task = &sock->errqueue_task;
    1981             : 
    1982             :         /* Can't flush from within a callback or we end up with recursive calls */
    1983           6 :         if (_sock->cb_cnt > 0) {
    1984           0 :                 errno = EAGAIN;
    1985           0 :                 return -1;
    1986             :         }
    1987             : 
    1988             :         /* Can't flush while a write is already outstanding */
    1989           6 :         if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
    1990           0 :                 errno = EAGAIN;
    1991           0 :                 return -1;
    1992             :         }
    1993             : 
    1994             :         /* Gather an iov */
    1995           6 :         iovcnt = spdk_sock_prep_reqs(_sock, iovs, 0, NULL, &flags);
    1996           6 :         if (iovcnt == 0) {
    1997             :                 /* Nothing to send */
    1998           0 :                 return 0;
    1999             :         }
    2000             : 
    2001             :         /* Perform the vectored write */
    2002           6 :         msg.msg_iov = iovs;
    2003           6 :         msg.msg_iovlen = iovcnt;
    2004           6 :         rc = sendmsg(sock->fd, &msg, flags | MSG_DONTWAIT);
    2005           6 :         if (rc <= 0) {
    2006           0 :                 if (rc == 0 || errno == EAGAIN || errno == EWOULDBLOCK || (errno == ENOBUFS && sock->zcopy)) {
    2007           0 :                         errno = EAGAIN;
    2008           0 :                 }
    2009           0 :                 return -1;
    2010             :         }
    2011             : 
    2012             : #ifdef SPDK_ZEROCOPY
    2013           6 :         is_zcopy = flags & MSG_ZEROCOPY;
    2014             : #endif
    2015           6 :         retval = sock_complete_write_reqs(_sock, rc, is_zcopy);
    2016           6 :         if (retval < 0) {
    2017             :                 /* if the socket is closed, return to avoid heap-use-after-free error */
    2018           0 :                 errno = ENOTCONN;
    2019           0 :                 return -1;
    2020             :         }
    2021             : 
    2022             : #ifdef SPDK_ZEROCOPY
    2023             :         /* At least do once to check zero copy case */
    2024           6 :         if (sock->zcopy && !TAILQ_EMPTY(&_sock->pending_reqs)) {
    2025           0 :                 retval = recvmsg(sock->fd, &task->msg, MSG_ERRQUEUE);
    2026           0 :                 if (retval < 0) {
    2027           0 :                         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    2028           0 :                                 return rc;
    2029             :                         }
    2030           0 :                 }
    2031           0 :                 _sock_check_zcopy(_sock, retval);;
    2032           0 :         }
    2033             : #endif
    2034             : 
    2035           6 :         return rc;
    2036           6 : }
    2037             : 
    2038             : static int
    2039           0 : uring_sock_group_impl_register_interrupt(struct spdk_sock_group_impl *_group, uint32_t events,
    2040             :                 spdk_interrupt_fn fn, void *arg, const char *name)
    2041             : {
    2042           0 :         SPDK_ERRLOG("Interrupt mode is not supported in the uring sock implementation.");
    2043             : 
    2044           0 :         return -ENOTSUP;
    2045             : }
    2046             : 
    2047             : static void
    2048           0 : uring_sock_group_impl_unregister_interrupt(struct spdk_sock_group_impl *_group)
    2049             : {
    2050           0 : }
    2051             : 
    2052             : static struct spdk_net_impl g_uring_net_impl = {
    2053             :         .name           = "uring",
    2054             :         .getaddr        = uring_sock_getaddr,
    2055             :         .get_interface_name = uring_sock_get_interface_name,
    2056             :         .get_numa_id    = uring_sock_get_numa_id,
    2057             :         .connect        = uring_sock_connect,
    2058             :         .listen         = uring_sock_listen,
    2059             :         .accept         = uring_sock_accept,
    2060             :         .close          = uring_sock_close,
    2061             :         .recv           = uring_sock_recv,
    2062             :         .readv          = uring_sock_readv,
    2063             :         .writev         = uring_sock_writev,
    2064             :         .recv_next      = uring_sock_recv_next,
    2065             :         .writev_async   = uring_sock_writev_async,
    2066             :         .flush          = uring_sock_flush,
    2067             :         .set_recvlowat  = uring_sock_set_recvlowat,
    2068             :         .set_recvbuf    = uring_sock_set_recvbuf,
    2069             :         .set_sendbuf    = uring_sock_set_sendbuf,
    2070             :         .is_ipv6        = uring_sock_is_ipv6,
    2071             :         .is_ipv4        = uring_sock_is_ipv4,
    2072             :         .is_connected   = uring_sock_is_connected,
    2073             :         .group_impl_get_optimal = uring_sock_group_impl_get_optimal,
    2074             :         .group_impl_create      = uring_sock_group_impl_create,
    2075             :         .group_impl_add_sock    = uring_sock_group_impl_add_sock,
    2076             :         .group_impl_remove_sock = uring_sock_group_impl_remove_sock,
    2077             :         .group_impl_poll        = uring_sock_group_impl_poll,
    2078             :         .group_impl_register_interrupt    = uring_sock_group_impl_register_interrupt,
    2079             :         .group_impl_unregister_interrupt  = uring_sock_group_impl_unregister_interrupt,
    2080             :         .group_impl_close       = uring_sock_group_impl_close,
    2081             :         .get_opts               = uring_sock_impl_get_opts,
    2082             :         .set_opts               = uring_sock_impl_set_opts,
    2083             : };
    2084             : 
    2085             : __attribute__((constructor)) static void
    2086           1 : net_impl_register_uring(void)
    2087             : {
    2088           1 :         struct spdk_sock_group_impl *impl;
    2089             : 
    2090             :         /* Check if we can create a uring sock group before we register
    2091             :          * it as a valid impl. */
    2092           1 :         impl = uring_sock_group_impl_create();
    2093           1 :         if (impl) {
    2094           0 :                 uring_sock_group_impl_close(impl);
    2095           0 :                 spdk_net_impl_register(&g_uring_net_impl);
    2096           0 :         }
    2097           1 : }

Generated by: LCOV version 1.15