geom_gate userland utility improvements
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

1238 lines
29 KiB

  1. /*-
  2. * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
  3. *
  4. * Copyright (c) 2004 Pawel Jakub Dawidek <pjd@FreeBSD.org>
  5. * All rights reserved.
  6. * Copyright 2020 John-Mark Gurney <jmg@FreeBSD.org>
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions
  10. * are met:
  11. * 1. Redistributions of source code must retain the above copyright
  12. * notice, this list of conditions and the following disclaimer.
  13. * 2. Redistributions in binary form must reproduce the above copyright
  14. * notice, this list of conditions and the following disclaimer in the
  15. * documentation and/or other materials provided with the distribution.
  16. *
  17. * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
  18. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  19. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  20. * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
  21. * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  22. * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  23. * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  24. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  25. * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  26. * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  27. * SUCH DAMAGE.
  28. *
  29. * $FreeBSD$
  30. */
  #include <sys/param.h>
  #include <sys/bio.h>
  #include <sys/ioctl.h>
  #include <sys/queue.h>
  #include <sys/select.h>
  #include <sys/socket.h>
  #include <sys/syslog.h>

  #include <assert.h>
  #include <err.h>
  #include <errno.h>
  #include <fcntl.h>
  #include <libutil.h>
  #include <limits.h>
  #include <netdb.h>
  #include <paths.h>
  #include <pthread.h>
  #include <pthread_np.h>
  #include <semaphore.h>
  #include <signal.h>
  #include <stdint.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <strings.h>
  #include <unistd.h>

  #include <libssh2.h>
  #include <libssh2_sftp.h>

  #include <geom/gate/g_gate.h>

  #include "ggate.h"
  /* Sub-command selected from the first command-line argument in main(). */
  static enum {
      UNSET,      /* nothing selected (initial value) */
      CREATE,
      DESTROY,
      LIST,
      RESCUE
  } action = UNSET;
  54. struct ggs_connection {
  55. int c_fd;
  56. int *c_didwork; /* allocated memory */
  57. LIBSSH2_SESSION *c_session;
  58. LIBSSH2_SFTP *c_sftp_session;
  59. LIBSSH2_SFTP_HANDLE *c_handle;
  60. };
  61. static struct pidfh *pfh;
  62. static const char *username;
  63. static const char *hostname;
  64. static const char *imgpath;
  65. static const char *identityfile;
  66. static const char *pubkeyfile;
  67. static const char *sshport = "22";
  68. static char *ggatessh_pidfile;
  69. static int unit = G_GATE_UNIT_AUTO;
  70. static unsigned flags = 0;
  71. static int force = 0;
  72. static unsigned queue_size = G_GATE_QUEUE_SIZE;
  73. static off_t mediasize;
  74. static unsigned sectorsize = 4096;
  75. static unsigned timeout = G_GATE_TIMEOUT;
  76. static int pushfd, popfd; /* work semaphore */
  77. static pthread_t reqtd, proctd, mediatd;
  78. static unsigned maxconnections = 32;
  79. static struct ggs_connection start_conn; /* only used once/first */
  80. struct ggs_sess_cache {
  81. LIBSSH2_SESSION *sc_ssh_session;
  82. LIBSSH2_SFTP *sc_session;
  83. LIBSSH2_SFTP_HANDLE *sc_handle;
  84. TAILQ_ENTRY(ggs_sess_cache) sc_next;
  85. };
  86. struct ggs_req {
  87. struct g_gate_ctl_io r_ggio;
  88. #define r_ssh_session r_sesscache->sc_ssh_session
  89. #define r_session r_sesscache->sc_session
  90. #define r_handle r_sesscache->sc_handle
  91. struct ggs_sess_cache *r_sesscache;
  92. size_t r_bufoff;
  93. int r_didseek;
  94. TAILQ_ENTRY(ggs_req) r_next;
  95. };
  96. static TAILQ_HEAD(ggs_reqqueue, ggs_req) procqueue =
  97. TAILQ_HEAD_INITIALIZER(procqueue);
  98. static TAILQ_HEAD(ggs_sessqueue, ggs_sess_cache) session_cache =
  99. TAILQ_HEAD_INITIALIZER(session_cache);
  100. static sem_t nconn_sem;
  101. static pthread_mutex_t procqueue_mtx;
  /*
   * Print the command synopsis to stderr and exit with EXIT_FAILURE.
   * Never returns.
   */
  static void
  usage(void)
  {
      const char *prog = getprogname();

      /* "-i" takes an identity file; the text used to say "identifyfile". */
      fprintf(stderr, "usage: %s create [-v] [-o <ro|wo|rw>] "
          "[-F pidfile] [-i identityfile] "
          "[-l username] [-p port] "
          "[-q queue_size] [-s sectorsize] [-r nrequests] "
          "[-t timeout] [-u unit] <host> <path>\n", prog);
      fprintf(stderr, " %s rescue [-v] [-o <ro|wo|rw>] "
          "[-F pidfile] [-i identityfile] "
          "[-l username] [-p port] "
          "[-r nrequests] <-u unit> <host> <path>\n", prog);
      fprintf(stderr, " %s destroy [-f] <-u unit>\n", prog);
      fprintf(stderr, " %s list [-v] [-u unit]\n", prog);
      exit(EXIT_FAILURE);
  }
  118. static void
  119. libssh2_errorx(LIBSSH2_SESSION *session, const char *info)
  120. {
  121. char *errmsg;
  122. int rc;
  123. rc = libssh2_session_last_error(session, &errmsg, NULL, 0);
  124. g_gate_xlog("%s: %s", info, errmsg);
  125. }
  /*
   * Connect to the service (or port number) on host.
   *
   * host     - name or numeric address to resolve
   * service  - service name or decimal port
   * af       - address family to restrict the lookup to (0 for any)
   *
   * Returns a connected socket, or -1 with errno set.  errno is ENOENT
   * when the name/service lookup fails; otherwise it is the error from
   * the last socket()/connect() attempt.
   *
   * Somewhat copied from freebsd/lib/libfetch/fetch_common.c:fetch_connect.
   */
  static int
  tcp_connect(const char *host, const char *service, int af)
  {
      struct addrinfo hints, *sai, *sai0;
      int saved_errno, sd;

      hints = (struct addrinfo){
          .ai_family = af,
          .ai_socktype = SOCK_STREAM,
          .ai_flags = AI_ADDRCONFIG,
      };

      /*
       * Resolve server address.  getaddrinfo() reports failure with a
       * non-zero EAI_* code (not -1) and does not set errno.
       */
      sai0 = NULL;
      if (getaddrinfo(host, service, &hints, &sai0) != 0 || sai0 == NULL) {
          errno = ENOENT;
          return (-1);
      }

      sd = -1;
      saved_errno = 0;

      /* try each server address in turn */
      for (sai = sai0; sai != NULL; sai = sai->ai_next) {
          sd = socket(sai->ai_family, sai->ai_socktype,
              sai->ai_protocol);
          if (sd == -1) {
              /* this family may be unusable; try the next one */
              saved_errno = errno;
              continue;
          }

          if (connect(sd, sai->ai_addr, sai->ai_addrlen) == 0)
              break;

          /* remember why, then clean up before the next attempt */
          saved_errno = errno;
          (void)close(sd);
          sd = -1;
      }

      freeaddrinfo(sai0);

      /* close()/freeaddrinfo() may have disturbed errno; restore it. */
      if (sd == -1)
          errno = saved_errno;

      return (sd);
  }
  172. static int
  173. get_open_flags()
  174. {
  175. switch (flags) {
  176. case G_GATE_FLAG_READONLY:
  177. return LIBSSH2_FXF_READ;
  178. case G_GATE_FLAG_WRITEONLY:
  179. return LIBSSH2_FXF_WRITE;
  180. default:
  181. return LIBSSH2_FXF_READ|LIBSSH2_FXF_WRITE;
  182. }
  183. }
  184. static struct ggs_connection
  185. make_connection(void)
  186. {
  187. LIBSSH2_SESSION *session;
  188. LIBSSH2_SFTP *sftp_session;
  189. LIBSSH2_SFTP_HANDLE *handle;
  190. char *tmp;
  191. int *didworkp;
  192. int sockfd;
  193. int rc;
  194. sockfd = tcp_connect(hostname, sshport, 0);
  195. if (sockfd == -1) {
  196. if (errno == ENOENT)
  197. g_gate_xlog("tcp_connect: failed to lookup %s",
  198. hostname);
  199. g_gate_xlog("tcp_connect: %s.", strerror(errno));
  200. }
  201. didworkp = malloc(sizeof *didworkp);
  202. if (didworkp == NULL)
  203. g_gate_xlog("malloc failed.");
  204. /* session = libssh2_session_init(); */
  205. session = libssh2_session_init_ex(NULL, NULL, NULL, didworkp);
  206. if (session == NULL)
  207. libssh2_errorx(session, "libssh2_session_init");
  208. if (g_gate_verbose)
  209. libssh2_trace(session, LIBSSH2_TRACE_SOCKET|LIBSSH2_TRACE_KEX|
  210. LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|
  211. LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY);
  212. /* XXX - libssh2_session_flag to enable compression */
  213. rc = libssh2_session_handshake(session, sockfd);
  214. if (rc)
  215. libssh2_errorx(session, "libssh2_session_handshake");
  216. libssh2_session_set_blocking(session, 1);
  217. /* XXX - known hosts handling */
  218. if (identityfile == NULL) {
  219. tmp = NULL;
  220. asprintf(&tmp, "%s/.ssh/id_rsa", getenv("HOME"));
  221. identityfile = tmp;
  222. tmp = NULL;
  223. }
  224. asprintf(&tmp, "%s.pub", identityfile);
  225. pubkeyfile = tmp;
  226. tmp = NULL;
  227. g_gate_log(LOG_DEBUG, "trying identity file: %s", identityfile);
  228. rc = libssh2_userauth_publickey_fromfile(session, username, pubkeyfile,
  229. identityfile, NULL);
  230. //rc = libssh2_userauth_password(session, "freebsd", "freebsd");
  231. if (rc) {
  232. g_gate_log(LOG_ERR, "identity file: %s", identityfile);
  233. libssh2_errorx(session, "libssh2_userauth_publickey_fromfile");
  234. }
  235. /* always need at least one */
  236. sftp_session = libssh2_sftp_init(session);
  237. if (sftp_session == NULL)
  238. g_gate_xlog("libssh2_sftp_init");
  239. handle = libssh2_sftp_open(sftp_session, imgpath, get_open_flags(), 0);
  240. if (handle == NULL) {
  241. g_gate_log(LOG_ERR, "image file: %s", imgpath);
  242. libssh2_errorx(session, "libssh2_sftp_open");
  243. }
  244. return (struct ggs_connection){
  245. .c_fd = sockfd,
  246. .c_didwork = didworkp,
  247. .c_session = session,
  248. .c_sftp_session = sftp_session,
  249. .c_handle = handle,
  250. };
  251. }
  252. /*
  253. * Resize is required to run in a thread because the resize requires
  254. * that I/O is able to complete before it can return.
  255. */
  256. static void *
  257. mediachg(void *arg __unused)
  258. {
  259. struct g_gate_ctl_modify ggiom;
  260. /* update mediasize, it may have changed */
  261. ggiom = (struct g_gate_ctl_modify){
  262. .gctl_version = G_GATE_VERSION,
  263. .gctl_unit = unit,
  264. .gctl_modify = GG_MODIFY_MEDIASIZE,
  265. .gctl_mediasize = mediasize,
  266. };
  267. g_gate_ioctl(G_GATE_CMD_MODIFY, &ggiom);
  268. g_gate_log(LOG_DEBUG, "updated ggate%d mediasize to %zd", unit,
  269. mediasize);
  270. return NULL;
  271. }
  272. static void *
  273. req_thread(void *arg __unused)
  274. {
  275. struct ggs_req *greq;
  276. static char *buf;
  277. int buflen = 1024*1024;
  278. int error;
  279. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  280. greq = NULL;
  281. for (;;) {
  282. if (greq == NULL)
  283. greq = malloc(sizeof *greq);
  284. if (buf == NULL)
  285. buf = malloc(buflen);
  286. if (greq == NULL || buf == NULL) {
  287. /* XXX */
  288. g_gate_log(LOG_ERR, "Unable to allocate memory.");
  289. exit(1);
  290. }
  291. *greq = (struct ggs_req){
  292. .r_ggio = (struct g_gate_ctl_io){
  293. .gctl_version = G_GATE_VERSION,
  294. .gctl_unit = unit,
  295. .gctl_data = buf,
  296. .gctl_length = buflen,
  297. .gctl_error = 0,
  298. },
  299. };
  300. //g_gate_log(LOG_DEBUG, "waiting for ioctl");
  301. g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio);
  302. //g_gate_log(LOG_DEBUG, "got ioctl");
  303. error = greq->r_ggio.gctl_error;
  304. switch (error) {
  305. case 0:
  306. break;
  307. case ECANCELED:
  308. /* Exit gracefully. */
  309. g_gate_close_device();
  310. exit(EXIT_SUCCESS);
  311. case ENXIO:
  312. default:
  313. g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME,
  314. strerror(error));
  315. }
  316. g_gate_log(LOG_DEBUG, "ggio(%p), ver: %u, unit: %d, seq: %llu, "
  317. "cmd: %u, offset: %llu, len: %llu", greq,
  318. greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit,
  319. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd,
  320. greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);
  321. switch (greq->r_ggio.gctl_cmd) {
  322. case BIO_READ:
  323. /* use a correctly sized allocation */
  324. greq->r_ggio.gctl_data =
  325. malloc(greq->r_ggio.gctl_length);
  326. break;
  327. case BIO_WRITE:
  328. /* r_ggio takes ownership of buf now */
  329. buf = NULL;
  330. break;
  331. case BIO_DELETE:
  332. case BIO_FLUSH:
  333. greq->r_ggio.gctl_data = NULL;
  334. break;
  335. default:
  336. greq->r_ggio.gctl_error = EOPNOTSUPP;
  337. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  338. continue; /* return EOPNOTSUPP */
  339. break;
  340. }
  341. //g_gate_log(LOG_DEBUG, "waiting for slot");
  342. sem_wait(&nconn_sem);
  343. #if 0
  344. int semval;
  345. sem_getvalue(&nconn_sem, &semval);
  346. g_gate_log(LOG_DEBUG, "slots: %d", semval);
  347. #endif
  348. error = pthread_mutex_lock(&procqueue_mtx);
  349. assert(error == 0);
  350. TAILQ_INSERT_TAIL(&procqueue, greq, r_next);
  351. error = pthread_mutex_unlock(&procqueue_mtx);
  352. assert(error == 0);
  353. /* notify processing thread a request is waiting */
  354. error = write(pushfd, "T", 1);
  355. if (error != 1)
  356. g_gate_xlog("write pushfd: %d, error: %s.", error,
  357. strerror(error));
  358. /* pass ownership */
  359. greq = NULL;
  360. }
  361. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  362. return (NULL);
  363. }
  /*
   * Return a short description for an SFTP status code as reported by
   * libssh2_sftp_last_error().
   *
   * The table follows the LIBSSH2_FX_* values (0..21).  Any value
   * outside the table yields "invalid errno".  The returned string is
   * static and must not be freed.
   */
  static const char *
  sftperrno_str(int err)
  {
      /* static: built once instead of on every call */
      static const char *const strs[] = {
          [0] = "ok",
          [1] = "eof",
          [2] = "no such file",
          [3] = "permission denied",
          [4] = "failure",
          [5] = "bad message",
          [6] = "no connection",
          [7] = "connection lost",
          [8] = "op unsupported",
          [9] = "invalid handle",
          [10] = "no such path",
          [11] = "file already exists",
          [12] = "write protect",
          [13] = "no media",
          [14] = "no space on filesystem",
          [15] = "quota exceeded",
          [16] = "unknown principal",
          [17] = "lock conflict",
          [18] = "dir not empty",
          [19] = "not a directory",
          [20] = "invalid filename",
          [21] = "link loop",
      };

      if (err < 0 || (size_t)err >= sizeof(strs) / sizeof(strs[0]))
          return ("invalid errno");

      return (strs[err]);
  }
  382. static int
  383. process_pending(struct ggs_reqqueue *req_pending,
  384. struct ggs_sessqueue *sessqueue)
  385. {
  386. struct ggs_req *greq, *greq2;
  387. char *errmsg;
  388. int rc;
  389. int sftperrno;
  390. int didwork;
  391. didwork = 0;
  392. /* Work on each pending request */
  393. TAILQ_FOREACH_SAFE(greq, req_pending, r_next, greq2) {
  394. again:
  395. switch (greq->r_ggio.gctl_cmd) {
  396. case BIO_READ:
  397. g_gate_log(LOG_DEBUG, "sftp_read(%p): %d(%d), rem: %d",
  398. greq, greq->r_ggio.gctl_offset,
  399. greq->r_ggio.gctl_length,
  400. greq->r_ggio.gctl_length - greq->r_bufoff);
  401. if (greq->r_didseek == 0) {
  402. libssh2_sftp_seek64(greq->r_handle,
  403. greq->r_ggio.gctl_offset);
  404. greq->r_didseek = 1;
  405. }
  406. rc = libssh2_sftp_read(greq->r_handle,
  407. (char *)greq->r_ggio.gctl_data + greq->r_bufoff,
  408. greq->r_ggio.gctl_length - greq->r_bufoff);
  409. g_gate_log(LOG_DEBUG, "sftp_read ret: %d", rc);
  410. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  411. g_gate_log(LOG_ERR, "libssh2_sftp_read");
  412. break;
  413. case BIO_WRITE:
  414. g_gate_log(LOG_DEBUG, "sftp_write(%p): %d(%d), rem: %d",
  415. greq, greq->r_ggio.gctl_offset,
  416. greq->r_ggio.gctl_length,
  417. greq->r_ggio.gctl_length - greq->r_bufoff);
  418. if (greq->r_didseek == 0) {
  419. libssh2_sftp_seek64(greq->r_handle,
  420. greq->r_ggio.gctl_offset);
  421. greq->r_didseek = 1;
  422. }
  423. rc = libssh2_sftp_write(greq->r_handle,
  424. (char *)greq->r_ggio.gctl_data + greq->r_bufoff,
  425. greq->r_ggio.gctl_length - greq->r_bufoff);
  426. g_gate_log(LOG_DEBUG, "sftp_write ret: %d", rc);
  427. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  428. libssh2_errorx(greq->r_ssh_session,
  429. "libssh2_sftp_write");
  430. break;
  431. case BIO_FLUSH:
  432. g_gate_log(LOG_DEBUG, "sftp_flush(%p)", greq);
  433. rc = libssh2_sftp_fsync(greq->r_handle);
  434. didwork = 1; /* assume this always does work */
  435. switch (rc) {
  436. case LIBSSH2_ERROR_SFTP_PROTOCOL:
  437. greq->r_ggio.gctl_error = EOPNOTSUPP;
  438. goto completeio;
  439. case LIBSSH2_ERROR_EAGAIN:
  440. continue;
  441. case 0: /* success */
  442. goto completeio;
  443. default:
  444. libssh2_session_last_error(greq->r_ssh_session,
  445. &errmsg, NULL, 0);
  446. g_gate_log(LOG_ERR, "sftp_flush(%p) ret %d: %s",
  447. greq, rc, errmsg);
  448. greq->r_ggio.gctl_error = EIO;
  449. goto completeio;
  450. }
  451. /* NOTREACHABLE */
  452. break;
  453. case BIO_DELETE:
  454. g_gate_log(LOG_DEBUG, "sftp_punchhole(%p)", greq);
  455. rc = libssh2_sftp_punchhole(greq->r_handle,
  456. greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);
  457. didwork = 1; /* assume this always does work */
  458. switch (rc) {
  459. case LIBSSH2_ERROR_SFTP_PROTOCOL:
  460. greq->r_ggio.gctl_error = EOPNOTSUPP;
  461. sftperrno = libssh2_sftp_last_error(
  462. greq->r_session);
  463. g_gate_log(LOG_DEBUG, "sftp_punchhole(%p) errno: %s(%d)", greq,
  464. sftperrno_str(sftperrno), sftperrno);
  465. goto completeio;
  466. case LIBSSH2_ERROR_EAGAIN:
  467. continue;
  468. case 0: /* success */
  469. goto completeio;
  470. default:
  471. libssh2_session_last_error(greq->r_ssh_session,
  472. &errmsg, NULL, 0);
  473. g_gate_log(LOG_ERR, "sftp_punchhole(%p) ret %d: %s",
  474. greq, rc, errmsg);
  475. greq->r_ggio.gctl_error = EIO;
  476. goto completeio;
  477. }
  478. /* NOTREACHABLE */
  479. break;
  480. default:
  481. rc = 0;
  482. g_gate_log(LOG_ERR, "unhandled op: %d",
  483. greq->r_ggio.gctl_cmd);
  484. continue;
  485. }
  486. if (rc > 0) {
  487. didwork = 1;
  488. greq->r_bufoff += rc;
  489. /*
  490. * try again on partial read/write,
  491. * might have more data pending
  492. */
  493. if ((off_t)greq->r_bufoff != greq->r_ggio.gctl_length)
  494. goto again;
  495. }
  496. if ((off_t)greq->r_bufoff == greq->r_ggio.gctl_length) {
  497. /* complete */
  498. completeio:
  499. g_gate_log(LOG_DEBUG, "cmd complete: seq: %d, cmd: %d",
  500. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd);
  501. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  502. TAILQ_REMOVE(req_pending, greq, r_next);
  503. TAILQ_INSERT_HEAD(sessqueue, greq->r_sesscache,
  504. sc_next);
  505. free(greq->r_ggio.gctl_data);
  506. free(greq);
  507. /* release this slot */
  508. sem_post(&nconn_sem);
  509. }
  510. }
  511. return didwork;
  512. }
  513. /*
  514. * XXX - expose this, this is to try to silence the loop that can
  515. * happen when there is data waiting (oob or window info), but no
  516. * outstanding requests are pending.
  517. */
  518. int _libssh2_transport_read(LIBSSH2_SESSION *session);
  519. /*
  520. * libssh2 does not have a good way to handle detection when it's
  521. * truly time to sleep. Writing is an easy case, as if there's space
  522. * to write, and the _BLOCK_OUTBOUND flag is set, select will return
  523. * and the data will get processed.
  524. *
  525. * In the case of reading, it is more complicated, as a later request
  526. * could read data for a previous request, and there is no known way
  527. * to know when this is done. The solution I came up with is to wrap
  528. * the recv function, and continue to iterate through the pending
  529. * requests until no more data is read.
  530. */
  531. static ssize_t
  532. ggatessh_recv_libssh2_hack(libssh2_socket_t fd, void *buf,
  533. size_t len, int recv_flags, void **abstract)
  534. {
  535. int *didworkp = *abstract;
  536. ssize_t ret;
  537. ret = recv(fd, buf, len, recv_flags);
  538. if (ret > 0)
  539. *didworkp = 1;
  540. if (ret == -1 && errno == EAGAIN)
  541. return -EAGAIN;
  542. return ret;
  543. }
  544. /*
  545. * sftp session management is a bit tricky.
  546. * if there is an entry in sessioncache, use that one.
  547. * if we are waiting for a new session (gsc_pend != NULL),
  548. * establish session, then open handle
  549. * when the new session completes, process the work queue
  550. */
  551. static void *
  552. proc_thread(void *arg __unused)
  553. {
  554. char scratch[32];
  555. struct ggs_reqqueue req_pending;
  556. struct timeval to;
  557. struct ggs_sess_cache *gsc, *gsc_pending;
  558. struct ggs_req *greq;
  559. LIBSSH2_SESSION *session;
  560. int *didworkp; /* was any reads done, rescan work */
  561. fd_set fdread;
  562. fd_set fdwrite;
  563. fd_set fdexcep;
  564. int sockfd;
  565. int maxfd;
  566. int error;
  567. int dir;
  568. int rc;
  569. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  570. TAILQ_INIT(&req_pending);
  571. /* make sure we don't block on reading */
  572. fcntl(popfd, F_SETFL, O_NONBLOCK);
  573. sockfd = start_conn.c_fd;
  574. didworkp = start_conn.c_didwork;
  575. session = start_conn.c_session;
  576. gsc = malloc(sizeof *gsc);
  577. gsc->sc_ssh_session = start_conn.c_session;
  578. gsc->sc_session = start_conn.c_sftp_session;
  579. gsc->sc_handle = start_conn.c_handle;
  580. TAILQ_INSERT_HEAD(&session_cache, gsc, sc_next);
  581. gsc = NULL;
  582. gsc_pending = NULL;
  583. *didworkp = 0;
  584. libssh2_session_set_blocking(session, 0);
  585. libssh2_session_callback_set(session, LIBSSH2_CALLBACK_RECV,
  586. ggatessh_recv_libssh2_hack);
  587. for (;;) {
  588. //g_gate_log(LOG_DEBUG, "looping");
  589. if (!*didworkp) {
  590. /* setup polling loop */
  591. maxfd = -1;
  592. FD_ZERO(&fdread);
  593. FD_ZERO(&fdwrite);
  594. FD_ZERO(&fdexcep);
  595. dir = libssh2_session_block_directions(session);
  596. if (dir & LIBSSH2_SESSION_BLOCK_INBOUND ||
  597. gsc_pending != NULL)
  598. FD_SET(sockfd, &fdread);
  599. if (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND)
  600. FD_SET(sockfd, &fdwrite);
  601. /* add in the pop descriptor */
  602. FD_SET(popfd, &fdread);
  603. maxfd = MAX(popfd, sockfd);
  604. g_gate_log(LOG_DEBUG, "selecting: %s %s, " \
  605. "read: sockfd: %d, popfd: %d, write: sockfd: %d",
  606. (dir & LIBSSH2_SESSION_BLOCK_INBOUND) ? "inbound" :
  607. "", (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) ?
  608. "outbound" : "", FD_ISSET(sockfd, &fdread),
  609. FD_ISSET(popfd, &fdread),
  610. FD_ISSET(sockfd, &fdwrite));
  611. to = (struct timeval){ .tv_sec = 1, .tv_usec = 1000 };
  612. (void)to;
  613. rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep,
  614. NULL);
  615. switch (rc) {
  616. case -1:
  617. g_gate_log(LOG_ERR, "%s: select failed: %s",
  618. __func__, strerror(errno));
  619. break;
  620. case 0:
  621. default:
  622. g_gate_log(LOG_DEBUG, "select: %d, " \
  623. "read: sockfd: %d, popfd: %d, " \
  624. "write: sockfd: %d", rc,
  625. FD_ISSET(sockfd, &fdread),
  626. FD_ISSET(popfd, &fdread),
  627. FD_ISSET(sockfd, &fdwrite));
  628. break;
  629. }
  630. }
  631. _libssh2_transport_read(session);
  632. *didworkp = 0;
  633. /* process pending, so any completed can be reused */
  634. process_pending(&req_pending, &session_cache);
  635. if (FD_ISSET(popfd, &fdread)) {
  636. /* read off the tokens */
  637. g_gate_log(LOG_DEBUG, "popping");
  638. read(popfd, scratch, sizeof scratch);
  639. for (;;) {
  640. procreq:
  641. /* get the request */
  642. error = pthread_mutex_lock(&procqueue_mtx);
  643. assert(error == 0);
  644. greq = TAILQ_FIRST(&procqueue);
  645. g_gate_log(LOG_DEBUG, "greq: %p", greq);
  646. if (greq != NULL)
  647. TAILQ_REMOVE(&procqueue, greq, r_next);
  648. error = pthread_mutex_unlock(&procqueue_mtx);
  649. assert(error == 0);
  650. /* no more to process */
  651. if (greq == NULL)
  652. break;
  653. gsc = TAILQ_FIRST(&session_cache);
  654. if (gsc == NULL) {
  655. if (gsc_pending == NULL) {
  656. /* need new session */
  657. g_gate_log(LOG_DEBUG,
  658. "need new session");
  659. gsc_pending =
  660. malloc(sizeof *gsc);
  661. gsc_pending->sc_ssh_session =
  662. session;
  663. gsc_pending->sc_session = NULL;
  664. gsc_pending->sc_handle = NULL;
  665. }
  666. /* put back request */
  667. error =
  668. pthread_mutex_lock(&procqueue_mtx);
  669. assert(error == 0);
  670. TAILQ_INSERT_HEAD(&procqueue, greq,
  671. r_next);
  672. error = pthread_mutex_unlock(
  673. &procqueue_mtx);
  674. assert(error == 0);
  675. break;
  676. } else {
  677. /* process request */
  678. TAILQ_REMOVE(&session_cache, gsc,
  679. sc_next);
  680. greq->r_sesscache = gsc;
  681. gsc = NULL;
  682. greq->r_bufoff = 0;
  683. TAILQ_INSERT_TAIL(&req_pending, greq,
  684. r_next);
  685. greq = NULL;
  686. }
  687. }
  688. }
  689. if (gsc_pending != NULL) {
  690. /* we are creating a new session */
  691. if (gsc_pending->sc_session == NULL) {
  692. //didwork = 1;
  693. gsc_pending->sc_session =
  694. libssh2_sftp_init(session);
  695. }
  696. if (gsc_pending->sc_session != NULL) {
  697. //didwork = 1;
  698. gsc_pending->sc_handle = libssh2_sftp_open(
  699. gsc_pending->sc_session, "fstest/data.img",
  700. get_open_flags(), 0);
  701. }
  702. g_gate_log(LOG_DEBUG,
  703. "pending: session: %p, handle: %p",
  704. gsc_pending->sc_session, gsc_pending->sc_handle);
  705. /* we have a fully initalized entry, use it */
  706. if (gsc_pending->sc_handle != NULL) {
  707. g_gate_log(LOG_DEBUG, "new session created");
  708. TAILQ_INSERT_HEAD(&session_cache, gsc_pending,
  709. sc_next);
  710. gsc_pending = NULL;
  711. //didwork = 1;
  712. goto procreq;
  713. }
  714. }
  715. /* kick of any queued requests from above */
  716. process_pending(&req_pending, &session_cache);
  717. }
  718. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  719. pthread_exit(NULL);
  720. }
  721. static void
  722. ggatessh_makepidfile(void)
  723. {
  724. pid_t otherpid;
  725. if (!g_gate_verbose) {
  726. if (ggatessh_pidfile == NULL) {
  727. asprintf(&ggatessh_pidfile,
  728. _PATH_VARRUN "/ggatessh.ggate%d.pid", unit);
  729. if (ggatessh_pidfile == NULL)
  730. err(EXIT_FAILURE,
  731. "Cannot allocate memory for pidfile");
  732. }
  733. pfh = pidfile_open(ggatessh_pidfile, 0600, &otherpid);
  734. if (pfh == NULL) {
  735. if (errno == EEXIST) {
  736. errx(EXIT_FAILURE,
  737. "Daemon already running, pid: %jd.",
  738. (intmax_t)otherpid);
  739. }
  740. err(EXIT_FAILURE, "Cannot open/create pidfile");
  741. }
  742. }
  743. }
  744. static void
  745. mydaemon(void)
  746. {
  747. if (g_gate_verbose > 0)
  748. return;
  749. if (daemon(0, 0) == 0)
  750. return;
  751. if (action == CREATE)
  752. g_gate_destroy(unit, 1);
  753. err(EXIT_FAILURE, "Cannot daemonize");
  754. }
  755. static int
  756. g_gatessh_connect(void)
  757. {
  758. struct ggs_connection conn;
  759. LIBSSH2_SFTP_ATTRIBUTES attrs;
  760. int rc;
  761. /* get the remote's size */
  762. conn = make_connection();
  763. rc = libssh2_sftp_fstat(conn.c_handle, &attrs);
  764. /* only allow regular and char devices */
  765. if (!(LIBSSH2_SFTP_S_ISREG(attrs.flags) ||
  766. !LIBSSH2_SFTP_S_ISCHR(attrs.flags))) {
  767. g_gate_xlog("remote file not a regular file");
  768. }
  769. mediasize = attrs.filesize;
  770. g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize);
  771. start_conn = conn; /* cache to use later */
  772. return 1;
  773. }
  774. static void
  775. g_gatessh_start(void)
  776. {
  777. int filedes[2];
  778. int error;
  779. pipe(filedes);
  780. pushfd = filedes[1];
  781. popfd = filedes[0];
  782. error = pthread_mutex_init(&procqueue_mtx, NULL);
  783. if (error != 0) {
  784. g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.",
  785. strerror(error));
  786. }
  787. sem_init(&nconn_sem, 0, maxconnections);
  788. error = pthread_create(&proctd, NULL, proc_thread, NULL);
  789. if (error != 0) {
  790. g_gate_destroy(unit, 1); /* XXX - remove */
  791. g_gate_xlog("pthread_create(proc_thread): %s.",
  792. strerror(error));
  793. }
  794. pthread_set_name_np(proctd, "proc");
  795. reqtd = pthread_self();
  796. pthread_set_name_np(reqtd, "req");
  797. req_thread(NULL);
  798. /* Disconnected. */
  799. close(pushfd);
  800. close(popfd);
  801. }
  802. static void
  803. signop(int sig __unused)
  804. {
  805. /* Do nothing. */
  806. }
  807. static void
  808. g_gatessh_loop(void)
  809. {
  810. struct g_gate_ctl_cancel ggioc;
  811. signal(SIGUSR1, signop);
  812. for (;;) {
  813. g_gatessh_start();
  814. g_gate_log(LOG_NOTICE, "Disconnected [%s@%s:%s]. Connecting...",
  815. username, hostname, imgpath);
  816. ggioc.gctl_version = G_GATE_VERSION;
  817. ggioc.gctl_unit = unit;
  818. ggioc.gctl_seq = 0;
  819. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  820. }
  821. }
  822. static void
  823. g_gatessh_create(void)
  824. {
  825. struct g_gate_ctl_create ggioc;
  826. if (!g_gatessh_connect())
  827. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  828. /*
  829. * Ok, got both sockets, time to create provider.
  830. */
  831. memset(&ggioc, 0, sizeof(ggioc));
  832. ggioc.gctl_version = G_GATE_VERSION;
  833. ggioc.gctl_mediasize = mediasize;
  834. ggioc.gctl_sectorsize = sectorsize;
  835. ggioc.gctl_flags = flags;
  836. ggioc.gctl_maxcount = queue_size;
  837. ggioc.gctl_timeout = timeout;
  838. ggioc.gctl_unit = unit;
  839. snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s@%s:%s",
  840. username, hostname, imgpath);
  841. g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc);
  842. if (unit == -1) {
  843. printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit);
  844. fflush(stdout);
  845. }
  846. unit = ggioc.gctl_unit;
  847. ggatessh_makepidfile();
  848. mydaemon();
  849. if (pfh != NULL)
  850. pidfile_write(pfh);
  851. g_gatessh_loop();
  852. }
  853. static void
  854. g_gatessh_rescue(void)
  855. {
  856. struct g_gate_ctl_cancel ggioc;
  857. int error;
  858. g_gate_log(LOG_ERR, "a");
  859. if (!g_gatessh_connect())
  860. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  861. g_gate_log(LOG_ERR, "b");
  862. ggioc = (struct g_gate_ctl_cancel){
  863. .gctl_version = G_GATE_VERSION,
  864. .gctl_unit = unit,
  865. .gctl_seq = 0,
  866. };
  867. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  868. ggatessh_makepidfile();
  869. mydaemon();
  870. pidfile_write(pfh);
  871. error = pthread_create(&mediatd, NULL, mediachg, NULL);
  872. if (error != 0)
  873. g_gate_xlog("unable to create mediasize change thread",
  874. strerror(errno));
  875. g_gatessh_loop();
  876. }
  877. /*
  878. * handle two methods of specifying things.
  879. * if only one arg, in the future split it how ssh does:
  880. * [user[:password]@]<host>:<img>
  881. * and URI:
  882. * sftp://[user[:password]@]<host>[:<port>]/<img>
  883. *
  884. * If both are specified, it's the same as above, but split
  885. * on the :.
  886. */
  887. static void
  888. handle_params(int argc, char *argv[])
  889. {
  890. if (username == NULL) {
  891. username = getenv("USER");
  892. if (username == NULL) {
  893. err(EXIT_FAILURE,
  894. "USER environment variable not present, set, "
  895. "or specify via -l argument.");
  896. }
  897. }
  898. if (argc != 2)
  899. usage();
  900. hostname = argv[0];
  901. imgpath = argv[1];
  902. }
  903. int
  904. main(int argc, char *argv[])
  905. {
  906. int rc;
  907. if (argc < 2)
  908. usage();
  909. if (strcasecmp(argv[1], "create") == 0)
  910. action = CREATE;
  911. else if (strcasecmp(argv[1], "destroy") == 0)
  912. action = DESTROY;
  913. else if (strcasecmp(argv[1], "list") == 0)
  914. action = LIST;
  915. else if (strcasecmp(argv[1], "rescue") == 0)
  916. action = RESCUE;
  917. else
  918. usage();
  919. argc -= 1;
  920. argv += 1;
  921. for (;;) {
  922. int ch;
  923. ch = getopt(argc, argv, "fF:i:l:o:p:q:r:s:t:u:v");
  924. if (ch == -1)
  925. break;
  926. switch (ch) {
  927. case 'f':
  928. if (action != DESTROY)
  929. usage();
  930. force = 1;
  931. break;
  932. case 'F':
  933. ggatessh_pidfile = optarg;
  934. break;
  935. case 'i':
  936. identityfile = optarg;
  937. break;
  938. case 'l':
  939. username = optarg;
  940. break;
  941. case 'o':
  942. if (action != CREATE && action != RESCUE)
  943. usage();
  944. if (strcasecmp("ro", optarg) == 0)
  945. flags = G_GATE_FLAG_READONLY;
  946. else if (strcasecmp("wo", optarg) == 0)
  947. flags = G_GATE_FLAG_WRITEONLY;
  948. else if (strcasecmp("rw", optarg) == 0)
  949. flags = 0;
  950. else {
  951. errx(EXIT_FAILURE,
  952. "Invalid argument for '-o' option.");
  953. }
  954. break;
  955. case 'p':
  956. sshport = optarg;
  957. break;
  958. case 'q':
  959. if (action != CREATE)
  960. usage();
  961. errno = 0;
  962. queue_size = strtoul(optarg, NULL, 10);
  963. if (queue_size == 0 && errno != 0)
  964. errx(EXIT_FAILURE, "Invalid queue_size.");
  965. break;
  966. case 'r':
  967. if (action != CREATE && action != RESCUE)
  968. usage();
  969. errno = 0;
  970. maxconnections = strtoul(optarg, NULL, 10);
  971. if (maxconnections == 0 && errno != 0)
  972. errx(EXIT_FAILURE, "Invalid queue_size.");
  973. break;
  974. case 's':
  975. if (action != CREATE)
  976. usage();
  977. errno = 0;
  978. sectorsize = strtoul(optarg, NULL, 10);
  979. if (sectorsize == 0 && errno != 0)
  980. errx(EXIT_FAILURE, "Invalid sectorsize.");
  981. break;
  982. case 't':
  983. if (action != CREATE)
  984. usage();
  985. errno = 0;
  986. timeout = strtoul(optarg, NULL, 10);
  987. if (timeout == 0 && errno != 0)
  988. errx(EXIT_FAILURE, "Invalid timeout.");
  989. break;
  990. case 'u':
  991. errno = 0;
  992. unit = strtol(optarg, NULL, 10);
  993. if (unit == 0 && errno != 0)
  994. errx(EXIT_FAILURE, "Invalid unit number.");
  995. break;
  996. case 'v':
  997. if (action == DESTROY)
  998. usage();
  999. g_gate_verbose++;
  1000. break;
  1001. default:
  1002. usage();
  1003. }
  1004. }
  1005. argc -= optind;
  1006. argv += optind;
  1007. g_gate_log(LOG_DEBUG, "libssh2_init");
  1008. rc = libssh2_init(0);
  1009. if (rc != 0) {
  1010. fprintf(stderr, "libssh2 initialization failed (%d)\n", rc);
  1011. return 1;
  1012. }
  1013. switch (action) {
  1014. case CREATE:
  1015. if (argc < 1 || argc > 2)
  1016. usage();
  1017. handle_params(argc, argv);
  1018. g_gate_load_module();
  1019. g_gate_open_device();
  1020. g_gatessh_create();
  1021. break;
  1022. case DESTROY:
  1023. if (argc != 0)
  1024. usage();
  1025. if (unit == -1) {
  1026. fprintf(stderr, "Required unit number.\n");
  1027. usage();
  1028. }
  1029. g_gate_verbose = 1;
  1030. g_gate_open_device();
  1031. g_gate_destroy(unit, force);
  1032. break;
  1033. case LIST:
  1034. g_gate_list(unit, g_gate_verbose);
  1035. break;
  1036. case RESCUE:
  1037. if (argc < 1 || argc > 2)
  1038. usage();
  1039. if (unit == -1) {
  1040. fprintf(stderr, "Required unit number.\n");
  1041. usage();
  1042. }
  1043. handle_params(argc, argv);
  1044. g_gate_open_device();
  1045. g_gatessh_rescue();
  1046. break;
  1047. case UNSET:
  1048. default:
  1049. usage();
  1050. }
  1051. g_gate_close_device();
  1052. exit(EXIT_SUCCESS);
  1053. }