Line data Source code
1 : /* SPDX-License-Identifier: BSD-3-Clause
2 : * Copyright (C) 2018 Intel Corporation. All rights reserved.
3 : * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
4 : */
5 :
6 : /** \file
7 : * TCP network implementation abstraction layer
8 : */
9 :
10 : #ifndef SPDK_INTERNAL_SOCK_H
11 : #define SPDK_INTERNAL_SOCK_H
12 :
13 : #include "spdk/stdinc.h"
14 : #include "spdk/sock.h"
15 : #include "spdk/queue.h"
16 : #include "spdk/likely.h"
17 : #include "spdk/log.h"
18 : #include "spdk/trace.h"
19 : #include "spdk_internal/trace_defs.h"
20 :
21 : #ifdef __cplusplus
22 : extern "C" {
23 : #endif
24 :
25 : #define MAX_EVENTS_PER_POLL 32
26 : #define DEFAULT_SOCK_PRIORITY 0
27 : #define MIN_SOCK_PIPE_SIZE 1024
28 : #define DEFAULT_SO_RCVBUF_SIZE (2 * 1024 * 1024)
29 : #define DEFAULT_SO_SNDBUF_SIZE (2 * 1024 * 1024)
30 : #define MIN_SO_RCVBUF_SIZE (4 * 1024)
31 : #define MIN_SO_SNDBUF_SIZE (4 * 1024)
32 : #define IOV_BATCH_SIZE 64
33 :
34 : struct spdk_sock {
35 : struct spdk_net_impl *net_impl;
36 : struct spdk_sock_opts opts;
37 : struct spdk_sock_group_impl *group_impl;
38 : TAILQ_ENTRY(spdk_sock) link;
39 :
40 : TAILQ_HEAD(, spdk_sock_request) queued_reqs;
41 : TAILQ_HEAD(, spdk_sock_request) pending_reqs;
42 : struct spdk_sock_request *read_req;
43 : int queued_iovcnt;
44 : int cb_cnt;
45 : spdk_sock_cb cb_fn;
46 : void *cb_arg;
47 : struct {
48 : uint8_t closed : 1;
49 : uint8_t reserved : 7;
50 : } flags;
51 : struct spdk_sock_impl_opts impl_opts;
52 : };
53 :
54 : struct spdk_sock_group_provided_buf {
55 : size_t len;
56 : void *ctx;
57 : STAILQ_ENTRY(spdk_sock_group_provided_buf) link;
58 : };
59 :
60 : struct spdk_sock_group {
61 : STAILQ_HEAD(, spdk_sock_group_impl) group_impls;
62 : STAILQ_HEAD(, spdk_sock_group_provided_buf) pool;
63 : void *ctx;
64 : };
65 :
66 : struct spdk_sock_group_impl {
67 : struct spdk_net_impl *net_impl;
68 : struct spdk_sock_group *group;
69 : TAILQ_HEAD(, spdk_sock) socks;
70 : STAILQ_ENTRY(spdk_sock_group_impl) link;
71 : };
72 :
73 : struct spdk_sock_map {
74 : STAILQ_HEAD(, spdk_sock_placement_id_entry) entries;
75 : pthread_mutex_t mtx;
76 : };
77 :
78 : struct spdk_net_impl {
79 : const char *name;
80 :
81 : int (*getaddr)(struct spdk_sock *sock, char *saddr, int slen, uint16_t *sport, char *caddr,
82 : int clen, uint16_t *cport);
83 : const char *(*get_interface_name)(struct spdk_sock *sock);
84 : int32_t (*get_numa_id)(struct spdk_sock *sock);
85 : struct spdk_sock *(*connect)(const char *ip, int port, struct spdk_sock_opts *opts);
86 : struct spdk_sock *(*listen)(const char *ip, int port, struct spdk_sock_opts *opts);
87 : struct spdk_sock *(*accept)(struct spdk_sock *sock);
88 : int (*close)(struct spdk_sock *sock);
89 : ssize_t (*recv)(struct spdk_sock *sock, void *buf, size_t len);
90 : ssize_t (*readv)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
91 : ssize_t (*writev)(struct spdk_sock *sock, struct iovec *iov, int iovcnt);
92 :
93 : int (*recv_next)(struct spdk_sock *sock, void **buf, void **ctx);
94 : void (*writev_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
95 : void (*readv_async)(struct spdk_sock *sock, struct spdk_sock_request *req);
96 : int (*flush)(struct spdk_sock *sock);
97 :
98 : int (*set_recvlowat)(struct spdk_sock *sock, int nbytes);
99 : int (*set_recvbuf)(struct spdk_sock *sock, int sz);
100 : int (*set_sendbuf)(struct spdk_sock *sock, int sz);
101 :
102 : bool (*is_ipv6)(struct spdk_sock *sock);
103 : bool (*is_ipv4)(struct spdk_sock *sock);
104 : bool (*is_connected)(struct spdk_sock *sock);
105 :
106 : struct spdk_sock_group_impl *(*group_impl_get_optimal)(struct spdk_sock *sock,
107 : struct spdk_sock_group_impl *hint);
108 : struct spdk_sock_group_impl *(*group_impl_create)(void);
109 : int (*group_impl_add_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
110 : int (*group_impl_remove_sock)(struct spdk_sock_group_impl *group, struct spdk_sock *sock);
111 : int (*group_impl_poll)(struct spdk_sock_group_impl *group, int max_events,
112 : struct spdk_sock **socks);
113 : int (*group_impl_register_interrupt)(struct spdk_sock_group_impl *group, uint32_t events,
114 : spdk_interrupt_fn fn, void *arg, const char *name);
115 : void (*group_impl_unregister_interrupt)(struct spdk_sock_group_impl *group);
116 : int (*group_impl_close)(struct spdk_sock_group_impl *group);
117 :
118 : int (*get_opts)(struct spdk_sock_impl_opts *opts, size_t *len);
119 : int (*set_opts)(const struct spdk_sock_impl_opts *opts, size_t len);
120 :
121 : STAILQ_ENTRY(spdk_net_impl) link;
122 : };
123 :
124 : void spdk_net_impl_register(struct spdk_net_impl *impl);
125 :
126 : #define SPDK_NET_IMPL_REGISTER(name, impl) \
127 : static void __attribute__((constructor)) net_impl_register_##name(void) \
128 : { \
129 : spdk_net_impl_register(impl); \
130 : }
131 :
132 : #define SPDK_NET_IMPL_REGISTER_DEFAULT(name, impl) \
133 : static void __attribute__((constructor)) net_impl_register_default_##name(void) \
134 : { \
135 : spdk_net_impl_register(impl); \
136 : spdk_sock_set_default_impl(SPDK_STRINGIFY(name)); \
137 : }
138 :
139 : size_t spdk_sock_group_get_buf(struct spdk_sock_group *group, void **buf, void **ctx);
140 :
141 : static inline void
142 18 : spdk_sock_request_queue(struct spdk_sock *sock, struct spdk_sock_request *req)
143 : {
144 18 : assert(req->internal.curr_list == NULL);
145 18 : if (spdk_trace_tpoint_enabled(TRACE_SOCK_REQ_QUEUE)) {
146 0 : uint64_t len = 0;
147 : int i;
148 :
149 0 : for (i = 0; i < req->iovcnt; i++) {
150 0 : len += SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
151 : }
152 0 : spdk_trace_record(TRACE_SOCK_REQ_QUEUE, 0, len, (uintptr_t)req, (uintptr_t)req->cb_arg);
153 : }
154 18 : TAILQ_INSERT_TAIL(&sock->queued_reqs, req, internal.link);
155 : #ifdef DEBUG
156 18 : req->internal.curr_list = &sock->queued_reqs;
157 : #endif
158 18 : sock->queued_iovcnt += req->iovcnt;
159 18 : }
160 :
161 : static inline void
162 15 : spdk_sock_request_pend(struct spdk_sock *sock, struct spdk_sock_request *req)
163 : {
164 15 : assert(req->internal.curr_list == &sock->queued_reqs);
165 15 : spdk_trace_record(TRACE_SOCK_REQ_PEND, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
166 15 : TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
167 15 : assert(sock->queued_iovcnt >= req->iovcnt);
168 15 : sock->queued_iovcnt -= req->iovcnt;
169 15 : TAILQ_INSERT_TAIL(&sock->pending_reqs, req, internal.link);
170 : #ifdef DEBUG
171 15 : req->internal.curr_list = &sock->pending_reqs;
172 : #endif
173 15 : }
174 :
175 : static inline int
176 15 : spdk_sock_request_complete(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
177 : {
178 : bool closed;
179 15 : int rc = 0;
180 :
181 15 : spdk_trace_record(TRACE_SOCK_REQ_COMPLETE, 0, 0, (uintptr_t)req, (uintptr_t)req->cb_arg);
182 15 : req->internal.offset = 0;
183 15 : req->internal.is_zcopy = 0;
184 :
185 15 : closed = sock->flags.closed;
186 15 : sock->cb_cnt++;
187 15 : req->cb_fn(req->cb_arg, err);
188 15 : assert(sock->cb_cnt > 0);
189 15 : sock->cb_cnt--;
190 :
191 15 : if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
192 : /* The user closed the socket in response to a callback above. */
193 1 : rc = -1;
194 1 : spdk_sock_close(&sock);
195 : }
196 :
197 15 : return rc;
198 : }
199 :
200 : static inline int
201 15 : spdk_sock_request_put(struct spdk_sock *sock, struct spdk_sock_request *req, int err)
202 : {
203 15 : assert(req->internal.curr_list == &sock->pending_reqs);
204 15 : TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
205 : #ifdef DEBUG
206 15 : req->internal.curr_list = NULL;
207 : #endif
208 15 : return spdk_sock_request_complete(sock, req, err);
209 : }
210 :
211 : static inline int
212 35 : spdk_sock_abort_requests(struct spdk_sock *sock)
213 : {
214 : struct spdk_sock_request *req;
215 : bool closed;
216 35 : int rc = 0;
217 :
218 35 : closed = sock->flags.closed;
219 35 : sock->cb_cnt++;
220 :
221 35 : req = TAILQ_FIRST(&sock->pending_reqs);
222 35 : while (req) {
223 0 : assert(req->internal.curr_list == &sock->pending_reqs);
224 0 : TAILQ_REMOVE(&sock->pending_reqs, req, internal.link);
225 : #ifdef DEBUG
226 0 : req->internal.curr_list = NULL;
227 : #endif
228 :
229 0 : req->cb_fn(req->cb_arg, -ECANCELED);
230 :
231 0 : req = TAILQ_FIRST(&sock->pending_reqs);
232 : }
233 :
234 35 : req = TAILQ_FIRST(&sock->queued_reqs);
235 36 : while (req) {
236 1 : assert(req->internal.curr_list == &sock->queued_reqs);
237 1 : TAILQ_REMOVE(&sock->queued_reqs, req, internal.link);
238 : #ifdef DEBUG
239 1 : req->internal.curr_list = NULL;
240 : #endif
241 :
242 1 : assert(sock->queued_iovcnt >= req->iovcnt);
243 1 : sock->queued_iovcnt -= req->iovcnt;
244 :
245 1 : req->cb_fn(req->cb_arg, -ECANCELED);
246 :
247 1 : req = TAILQ_FIRST(&sock->queued_reqs);
248 : }
249 :
250 35 : req = sock->read_req;
251 35 : if (req != NULL) {
252 0 : sock->read_req = NULL;
253 0 : req->cb_fn(req->cb_arg, -ECANCELED);
254 : }
255 35 : assert(sock->cb_cnt > 0);
256 35 : sock->cb_cnt--;
257 :
258 35 : assert(TAILQ_EMPTY(&sock->queued_reqs));
259 35 : assert(TAILQ_EMPTY(&sock->pending_reqs));
260 :
261 35 : if (sock->cb_cnt == 0 && !closed && sock->flags.closed) {
262 : /* The user closed the socket in response to a callback above. */
263 0 : rc = -1;
264 0 : spdk_sock_close(&sock);
265 : }
266 :
267 35 : return rc;
268 : }
269 :
270 : static inline int
271 22 : spdk_sock_prep_req(struct spdk_sock_request *req, struct iovec *iovs, int index,
272 : uint64_t *num_bytes)
273 : {
274 : unsigned int offset;
275 : int iovcnt, i;
276 :
277 22 : assert(index < IOV_BATCH_SIZE);
278 22 : offset = req->internal.offset;
279 22 : iovcnt = index;
280 :
281 70 : for (i = 0; i < req->iovcnt; i++) {
282 : /* Consume any offset first */
283 48 : if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
284 1 : offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
285 1 : continue;
286 : }
287 :
288 47 : iovs[iovcnt].iov_base = (uint8_t *)SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
289 47 : iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
290 47 : if (num_bytes != NULL) {
291 47 : *num_bytes += iovs[iovcnt].iov_len;
292 : }
293 :
294 47 : iovcnt++;
295 47 : offset = 0;
296 :
297 47 : if (iovcnt >= IOV_BATCH_SIZE) {
298 0 : break;
299 : }
300 : }
301 :
302 22 : return iovcnt;
303 : }
304 :
305 : static inline int
306 38 : spdk_sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
307 : struct spdk_sock_request **last_req, int *flags)
308 : {
309 : int iovcnt;
310 : struct spdk_sock_request *req;
311 38 : uint64_t total = 0;
312 :
313 : /* Gather an iov */
314 38 : iovcnt = index;
315 38 : if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
316 0 : goto end;
317 : }
318 :
319 38 : if (last_req != NULL && *last_req != NULL) {
320 0 : req = TAILQ_NEXT(*last_req, internal.link);
321 : } else {
322 38 : req = TAILQ_FIRST(&_sock->queued_reqs);
323 : }
324 :
325 60 : while (req) {
326 22 : iovcnt = spdk_sock_prep_req(req, iovs, iovcnt, &total);
327 22 : if (iovcnt >= IOV_BATCH_SIZE) {
328 0 : break;
329 : }
330 :
331 22 : if (last_req != NULL) {
332 0 : *last_req = req;
333 : }
334 22 : req = TAILQ_NEXT(req, internal.link);
335 : }
336 :
337 38 : end:
338 :
339 : #if defined(MSG_ZEROCOPY)
340 : /* if data size < zerocopy_threshold, remove MSG_ZEROCOPY flag */
341 38 : if (total < _sock->impl_opts.zerocopy_threshold && flags != NULL) {
342 0 : *flags = *flags & (~MSG_ZEROCOPY);
343 : }
344 : #endif
345 :
346 38 : return iovcnt;
347 : }
348 :
349 : static inline void
350 24 : spdk_sock_get_placement_id(int fd, enum spdk_placement_mode mode, int *placement_id)
351 : {
352 24 : *placement_id = -1;
353 :
354 24 : switch (mode) {
355 24 : case PLACEMENT_NONE:
356 24 : break;
357 0 : case PLACEMENT_MARK:
358 : case PLACEMENT_NAPI: {
359 : #if defined(SO_INCOMING_NAPI_ID)
360 0 : socklen_t len = sizeof(int);
361 :
362 0 : int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &len);
363 0 : if (rc == -1) {
364 0 : SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
365 0 : assert(false);
366 : }
367 : #endif
368 0 : break;
369 : }
370 0 : case PLACEMENT_CPU: {
371 : #if defined(SO_INCOMING_CPU)
372 0 : socklen_t len = sizeof(int);
373 :
374 0 : int rc = getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, placement_id, &len);
375 0 : if (rc == -1) {
376 0 : SPDK_ERRLOG("getsockopt() failed: %s\n", strerror(errno));
377 0 : assert(false);
378 : }
379 : #endif
380 0 : break;
381 : }
382 0 : default:
383 0 : break;
384 : }
385 24 : }
386 :
387 : /**
388 : * Insert a group into the placement map.
389 : * If the group is already in the map, take a reference.
390 : */
391 : int spdk_sock_map_insert(struct spdk_sock_map *map, int placement_id,
392 : struct spdk_sock_group_impl *group_impl);
393 :
394 : /**
395 : * Release a reference for the given placement_id. If the reference count goes to 0, the
396 : * entry will no longer be associated with a group.
397 : */
398 : void spdk_sock_map_release(struct spdk_sock_map *map, int placement_id);
399 :
400 : /**
401 : * Look up the group for the given placement_id.
402 : */
403 : int spdk_sock_map_lookup(struct spdk_sock_map *map, int placement_id,
404 : struct spdk_sock_group_impl **group_impl, struct spdk_sock_group_impl *hint);
405 :
406 : /**
407 : * Find a placement id with no associated group
408 : */
409 : int spdk_sock_map_find_free(struct spdk_sock_map *map);
410 :
411 : /**
412 : * Clean up all memory associated with the given map
413 : */
414 : void spdk_sock_map_cleanup(struct spdk_sock_map *map);
415 :
416 : #ifdef __cplusplus
417 : }
418 : #endif
419 :
420 : #endif /* SPDK_INTERNAL_SOCK_H */
|