Line data Source code
1 : /* SPDX-License-Identifier: BSD-3-Clause
2 : * Copyright (C) 2016 Intel Corporation.
3 : * All rights reserved.
4 : */
5 :
6 : #include "jsonrpc_internal.h"
7 : #include "spdk/string.h"
8 : #include "spdk/util.h"
9 :
10 : struct spdk_jsonrpc_server *
11 0 : spdk_jsonrpc_server_listen(int domain, int protocol,
12 : struct sockaddr *listen_addr, socklen_t addrlen,
13 : spdk_jsonrpc_handle_request_fn handle_request)
14 : {
15 : struct spdk_jsonrpc_server *server;
16 : int rc, val, i;
17 :
18 0 : server = calloc(1, sizeof(struct spdk_jsonrpc_server));
19 0 : if (server == NULL) {
20 0 : return NULL;
21 : }
22 :
23 0 : TAILQ_INIT(&server->free_conns);
24 0 : TAILQ_INIT(&server->conns);
25 :
26 0 : for (i = 0; i < SPDK_JSONRPC_MAX_CONNS; i++) {
27 0 : TAILQ_INSERT_TAIL(&server->free_conns, &server->conns_array[i], link);
28 : }
29 :
30 0 : server->handle_request = handle_request;
31 :
32 0 : server->sockfd = socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
33 0 : if (server->sockfd < 0) {
34 0 : SPDK_ERRLOG("socket() failed\n");
35 0 : free(server);
36 0 : return NULL;
37 : }
38 :
39 0 : val = 1;
40 0 : rc = setsockopt(server->sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
41 0 : if (rc != 0) {
42 0 : SPDK_ERRLOG("could not set SO_REUSEADDR sock option: %s\n", spdk_strerror(errno));
43 0 : close(server->sockfd);
44 0 : free(server);
45 0 : return NULL;
46 : }
47 :
48 0 : rc = bind(server->sockfd, listen_addr, addrlen);
49 0 : if (rc != 0) {
50 0 : SPDK_ERRLOG("could not bind JSON-RPC server: %s\n", spdk_strerror(errno));
51 0 : close(server->sockfd);
52 0 : free(server);
53 0 : return NULL;
54 : }
55 :
56 0 : rc = listen(server->sockfd, 512);
57 0 : if (rc != 0) {
58 0 : SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
59 0 : close(server->sockfd);
60 0 : free(server);
61 0 : return NULL;
62 : }
63 :
64 0 : return server;
65 : }
66 :
67 : static struct spdk_jsonrpc_request *
68 0 : jsonrpc_server_dequeue_request(struct spdk_jsonrpc_server_conn *conn)
69 : {
70 0 : struct spdk_jsonrpc_request *request = NULL;
71 :
72 0 : pthread_spin_lock(&conn->queue_lock);
73 0 : request = STAILQ_FIRST(&conn->send_queue);
74 0 : if (request) {
75 0 : STAILQ_REMOVE_HEAD(&conn->send_queue, link);
76 : }
77 0 : pthread_spin_unlock(&conn->queue_lock);
78 0 : return request;
79 : }
80 :
81 : static void
82 0 : jsonrpc_server_free_conn_request(struct spdk_jsonrpc_server_conn *conn)
83 : {
84 : struct spdk_jsonrpc_request *request;
85 :
86 0 : jsonrpc_free_request(conn->send_request);
87 0 : conn->send_request = NULL ;
88 :
89 0 : pthread_spin_lock(&conn->queue_lock);
90 : /* There might still be some requests being processed.
91 : * We need to tell them that this connection is closed. */
92 0 : STAILQ_FOREACH(request, &conn->outstanding_queue, link) {
93 0 : request->conn = NULL;
94 : }
95 0 : pthread_spin_unlock(&conn->queue_lock);
96 :
97 0 : while ((request = jsonrpc_server_dequeue_request(conn)) != NULL) {
98 0 : jsonrpc_free_request(request);
99 : }
100 0 : }
101 :
102 : static void
103 0 : jsonrpc_server_conn_close(struct spdk_jsonrpc_server_conn *conn)
104 : {
105 0 : conn->closed = true;
106 :
107 0 : if (conn->sockfd >= 0) {
108 0 : jsonrpc_server_free_conn_request(conn);
109 0 : close(conn->sockfd);
110 0 : conn->sockfd = -1;
111 :
112 0 : if (conn->close_cb) {
113 0 : conn->close_cb(conn, conn->close_cb_ctx);
114 : }
115 : }
116 0 : }
117 :
118 : void
119 0 : spdk_jsonrpc_server_shutdown(struct spdk_jsonrpc_server *server)
120 : {
121 : struct spdk_jsonrpc_server_conn *conn;
122 :
123 0 : close(server->sockfd);
124 :
125 0 : TAILQ_FOREACH(conn, &server->conns, link) {
126 0 : jsonrpc_server_conn_close(conn);
127 : }
128 :
129 0 : free(server);
130 0 : }
131 :
132 : static void
133 0 : jsonrpc_server_conn_remove(struct spdk_jsonrpc_server_conn *conn)
134 : {
135 0 : struct spdk_jsonrpc_server *server = conn->server;
136 :
137 0 : jsonrpc_server_conn_close(conn);
138 :
139 0 : pthread_spin_destroy(&conn->queue_lock);
140 0 : assert(STAILQ_EMPTY(&conn->send_queue));
141 :
142 0 : TAILQ_REMOVE(&server->conns, conn, link);
143 0 : TAILQ_INSERT_HEAD(&server->free_conns, conn, link);
144 0 : }
145 :
146 : int
147 0 : spdk_jsonrpc_conn_add_close_cb(struct spdk_jsonrpc_server_conn *conn,
148 : spdk_jsonrpc_conn_closed_fn cb, void *ctx)
149 : {
150 0 : int rc = 0;
151 :
152 0 : pthread_spin_lock(&conn->queue_lock);
153 0 : if (conn->close_cb == NULL) {
154 0 : conn->close_cb = cb;
155 0 : conn->close_cb_ctx = ctx;
156 : } else {
157 0 : rc = conn->close_cb == cb && conn->close_cb_ctx == ctx ? -EEXIST : -ENOSPC;
158 : }
159 0 : pthread_spin_unlock(&conn->queue_lock);
160 :
161 0 : return rc;
162 : }
163 :
164 : int
165 0 : spdk_jsonrpc_conn_del_close_cb(struct spdk_jsonrpc_server_conn *conn,
166 : spdk_jsonrpc_conn_closed_fn cb, void *ctx)
167 : {
168 0 : int rc = 0;
169 :
170 0 : pthread_spin_lock(&conn->queue_lock);
171 0 : if (conn->close_cb == NULL || conn->close_cb != cb || conn->close_cb_ctx != ctx) {
172 0 : rc = -ENOENT;
173 : } else {
174 0 : conn->close_cb = NULL;
175 : }
176 0 : pthread_spin_unlock(&conn->queue_lock);
177 :
178 0 : return rc;
179 : }
180 :
181 : static int
182 0 : jsonrpc_server_accept(struct spdk_jsonrpc_server *server)
183 : {
184 : struct spdk_jsonrpc_server_conn *conn;
185 : int rc, flag;
186 :
187 0 : rc = accept(server->sockfd, NULL, NULL);
188 0 : if (rc >= 0) {
189 0 : conn = TAILQ_FIRST(&server->free_conns);
190 0 : assert(conn != NULL);
191 :
192 0 : conn->server = server;
193 0 : conn->sockfd = rc;
194 0 : conn->closed = false;
195 0 : conn->recv_len = 0;
196 0 : conn->outstanding_requests = 0;
197 0 : STAILQ_INIT(&conn->send_queue);
198 0 : STAILQ_INIT(&conn->outstanding_queue);
199 0 : conn->send_request = NULL;
200 :
201 0 : if (pthread_spin_init(&conn->queue_lock, PTHREAD_PROCESS_PRIVATE)) {
202 0 : SPDK_ERRLOG("Unable to create queue lock for socket: %d", conn->sockfd);
203 0 : close(conn->sockfd);
204 0 : return -1;
205 : }
206 :
207 0 : flag = fcntl(conn->sockfd, F_GETFL);
208 0 : if (fcntl(conn->sockfd, F_SETFL, flag | O_NONBLOCK) < 0) {
209 0 : SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
210 : conn->sockfd, spdk_strerror(errno));
211 0 : close(conn->sockfd);
212 0 : pthread_spin_destroy(&conn->queue_lock);
213 0 : return -1;
214 : }
215 :
216 0 : TAILQ_REMOVE(&server->free_conns, conn, link);
217 0 : TAILQ_INSERT_TAIL(&server->conns, conn, link);
218 0 : return 0;
219 : }
220 :
221 0 : if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
222 0 : return 0;
223 : }
224 :
225 0 : return -1;
226 : }
227 :
228 : void
229 0 : jsonrpc_server_handle_request(struct spdk_jsonrpc_request *request,
230 : const struct spdk_json_val *method, const struct spdk_json_val *params)
231 : {
232 0 : request->conn->server->handle_request(request, method, params);
233 0 : }
234 :
235 : void
236 0 : jsonrpc_server_handle_error(struct spdk_jsonrpc_request *request, int error)
237 : {
238 : const char *msg;
239 :
240 0 : switch (error) {
241 0 : case SPDK_JSONRPC_ERROR_PARSE_ERROR:
242 0 : msg = "Parse error";
243 0 : break;
244 :
245 0 : case SPDK_JSONRPC_ERROR_INVALID_REQUEST:
246 0 : msg = "Invalid request";
247 0 : break;
248 :
249 0 : case SPDK_JSONRPC_ERROR_METHOD_NOT_FOUND:
250 0 : msg = "Method not found";
251 0 : break;
252 :
253 0 : case SPDK_JSONRPC_ERROR_INVALID_PARAMS:
254 0 : msg = "Invalid parameters";
255 0 : break;
256 :
257 0 : case SPDK_JSONRPC_ERROR_INTERNAL_ERROR:
258 0 : msg = "Internal error";
259 0 : break;
260 :
261 0 : default:
262 0 : msg = "Error";
263 0 : break;
264 : }
265 :
266 0 : spdk_jsonrpc_send_error_response(request, error, msg);
267 0 : }
268 :
269 : static int
270 0 : jsonrpc_server_conn_recv(struct spdk_jsonrpc_server_conn *conn)
271 : {
272 : ssize_t rc, offset;
273 0 : size_t recv_avail = SPDK_JSONRPC_RECV_BUF_SIZE - conn->recv_len;
274 :
275 0 : rc = recv(conn->sockfd, conn->recv_buf + conn->recv_len, recv_avail, 0);
276 0 : if (rc == -1) {
277 0 : if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
278 0 : return 0;
279 : }
280 0 : SPDK_DEBUGLOG(rpc, "recv() failed: %s\n", spdk_strerror(errno));
281 0 : return -1;
282 : }
283 :
284 0 : if (rc == 0) {
285 0 : SPDK_DEBUGLOG(rpc, "remote closed connection\n");
286 0 : conn->closed = true;
287 0 : return 0;
288 : }
289 :
290 0 : conn->recv_len += rc;
291 :
292 0 : offset = 0;
293 : do {
294 0 : rc = jsonrpc_parse_request(conn, conn->recv_buf + offset, conn->recv_len - offset);
295 0 : if (rc < 0) {
296 0 : SPDK_ERRLOG("jsonrpc parse request failed\n");
297 0 : return -1;
298 : }
299 :
300 0 : offset += rc;
301 0 : } while (rc > 0);
302 :
303 0 : if (offset > 0) {
304 : /*
305 : * Successfully parsed a requests - move any data past the end of the
306 : * parsed requests down to the beginning.
307 : */
308 0 : assert((size_t)offset <= conn->recv_len);
309 0 : memmove(conn->recv_buf, conn->recv_buf + offset, conn->recv_len - offset);
310 0 : conn->recv_len -= offset;
311 : }
312 :
313 0 : return 0;
314 : }
315 :
316 : void
317 0 : jsonrpc_server_send_response(struct spdk_jsonrpc_request *request)
318 : {
319 0 : struct spdk_jsonrpc_server_conn *conn = request->conn;
320 :
321 0 : if (conn == NULL) {
322 : /* We cannot respond to the request, because the connection is closed. */
323 0 : SPDK_WARNLOG("Unable to send response: connection closed.\n");
324 0 : jsonrpc_free_request(request);
325 0 : return;
326 : }
327 :
328 : /* Queue the response to be sent */
329 0 : pthread_spin_lock(&conn->queue_lock);
330 0 : STAILQ_REMOVE(&conn->outstanding_queue, request, spdk_jsonrpc_request, link);
331 0 : STAILQ_INSERT_TAIL(&conn->send_queue, request, link);
332 0 : pthread_spin_unlock(&conn->queue_lock);
333 : }
334 :
335 :
336 : static int
337 0 : jsonrpc_server_conn_send(struct spdk_jsonrpc_server_conn *conn)
338 : {
339 : struct spdk_jsonrpc_request *request;
340 : ssize_t rc;
341 :
342 0 : more:
343 0 : if (conn->outstanding_requests == 0) {
344 0 : return 0;
345 : }
346 :
347 0 : if (conn->send_request == NULL) {
348 0 : conn->send_request = jsonrpc_server_dequeue_request(conn);
349 : }
350 :
351 0 : request = conn->send_request;
352 0 : if (request == NULL) {
353 : /* Nothing to send right now */
354 0 : return 0;
355 : }
356 :
357 0 : if (request->send_offset == 0) {
358 : /* A byte for the null terminator is included in the send buffer. */
359 0 : request->send_buf[request->send_len] = '\0';
360 : }
361 :
362 0 : if (request->send_len > 0) {
363 0 : rc = send(conn->sockfd, request->send_buf + request->send_offset,
364 : request->send_len, 0);
365 0 : if (rc < 0) {
366 0 : if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
367 0 : return 0;
368 : }
369 :
370 0 : SPDK_DEBUGLOG(rpc, "send() failed: %s\n", spdk_strerror(errno));
371 0 : return -1;
372 : }
373 :
374 0 : request->send_offset += rc;
375 0 : request->send_len -= rc;
376 : }
377 :
378 0 : if (request->send_len == 0) {
379 : /*
380 : * Full response has been sent.
381 : * Free it and set send_request to NULL to move on to the next queued response.
382 : */
383 0 : conn->send_request = NULL;
384 0 : jsonrpc_complete_request(request);
385 0 : goto more;
386 : }
387 :
388 0 : return 0;
389 : }
390 :
391 : int
392 0 : spdk_jsonrpc_server_poll(struct spdk_jsonrpc_server *server)
393 : {
394 : int rc;
395 : struct spdk_jsonrpc_server_conn *conn, *conn_tmp;
396 :
397 0 : TAILQ_FOREACH_SAFE(conn, &server->conns, link, conn_tmp) {
398 : /* If we can't receive and there are no outstanding requests close the connection. */
399 0 : if (conn->closed == true && conn->outstanding_requests == 0) {
400 0 : jsonrpc_server_conn_close(conn);
401 : }
402 :
403 0 : if (conn->sockfd == -1 && conn->outstanding_requests == 0) {
404 0 : jsonrpc_server_conn_remove(conn);
405 : }
406 : }
407 :
408 : /* Check listen socket */
409 0 : if (!TAILQ_EMPTY(&server->free_conns)) {
410 0 : jsonrpc_server_accept(server);
411 : }
412 :
413 0 : TAILQ_FOREACH(conn, &server->conns, link) {
414 0 : if (conn->sockfd == -1) {
415 0 : continue;
416 : }
417 :
418 0 : rc = jsonrpc_server_conn_send(conn);
419 0 : if (rc != 0) {
420 0 : jsonrpc_server_conn_close(conn);
421 0 : continue;
422 : }
423 :
424 0 : if (!conn->closed) {
425 0 : rc = jsonrpc_server_conn_recv(conn);
426 0 : if (rc != 0) {
427 0 : jsonrpc_server_conn_close(conn);
428 : }
429 : }
430 : }
431 :
432 0 : return 0;
433 : }
|