geom_gate userland utility improvements
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1188 lines
27 KiB

  1. /*-
  2. * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
  3. *
  4. * Copyright (c) 2004 Pawel Jakub Dawidek <pjd@FreeBSD.org>
  5. * All rights reserved.
  6. * Copyright 2020 John-Mark Gurney <jmg@FreeBSD.org>
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions
  10. * are met:
  11. * 1. Redistributions of source code must retain the above copyright
  12. * notice, this list of conditions and the following disclaimer.
  13. * 2. Redistributions in binary form must reproduce the above copyright
  14. * notice, this list of conditions and the following disclaimer in the
  15. * documentation and/or other materials provided with the distribution.
  16. *
  17. * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
  18. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  19. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  20. * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
  21. * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  22. * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  23. * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  24. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  25. * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  26. * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  27. * SUCH DAMAGE.
  28. *
  29. * $FreeBSD$
  30. */
  31. #include <stdio.h>
  32. #include <stdlib.h>
  33. #include <fcntl.h>
  34. #include <libutil.h>
  35. #include <paths.h>
  36. #include <pthread.h>
  37. #include <pthread_np.h>
  38. #include <err.h>
  39. #include <errno.h>
  40. #include <assert.h>
  41. #include <sys/param.h>
  42. #include <sys/ioctl.h>
  43. #include <sys/queue.h>
  44. #include <sys/socket.h>
  45. #include <sys/syslog.h>
  46. #include <sys/bio.h>
  47. #include <netdb.h>
  48. #include <semaphore.h>
  49. #include <libssh2.h>
  50. #include <libssh2_sftp.h>
  51. #include <geom/gate/g_gate.h>
  52. #include "ggate.h"
  /* Sub-command chosen by the first command-line word; UNSET until parsed. */
  static enum {
      UNSET,
      CREATE,
      DESTROY,
      LIST,
      RESCUE
  } action = UNSET;
  54. struct ggs_connection {
  55. int c_fd;
  56. LIBSSH2_SESSION *c_session;
  57. LIBSSH2_SFTP *c_sftp_session;
  58. LIBSSH2_SFTP_HANDLE *c_handle;
  59. };
  60. static struct pidfh *pfh;
  61. static const char *username;
  62. static const char *hostname;
  63. static const char *imgpath;
  64. static const char *identityfile;
  65. static const char *pubkeyfile;
  66. static const char *sshport = "22";
  67. static char *ggatessh_pidfile;
  68. static int unit = G_GATE_UNIT_AUTO;
  69. static unsigned flags = 0;
  70. static int force = 0;
  71. static unsigned queue_size = G_GATE_QUEUE_SIZE;
  72. static off_t mediasize;
  73. static unsigned sectorsize = 4096;
  74. static unsigned timeout = G_GATE_TIMEOUT;
  75. static int pushfd, popfd; /* work semaphore */
  76. static pthread_t reqtd, proctd, mediatd;
  77. static unsigned maxconnections = 32;
  78. static struct ggs_connection start_conn; /* only used once/first */
  79. struct ggs_sess_cache {
  80. LIBSSH2_SESSION *sc_ssh_session;
  81. LIBSSH2_SFTP *sc_session;
  82. LIBSSH2_SFTP_HANDLE *sc_handle;
  83. TAILQ_ENTRY(ggs_sess_cache) sc_next;
  84. };
  85. struct ggs_req {
  86. struct g_gate_ctl_io r_ggio;
  87. #define r_ssh_session r_sesscache->sc_ssh_session
  88. #define r_session r_sesscache->sc_session
  89. #define r_handle r_sesscache->sc_handle
  90. struct ggs_sess_cache *r_sesscache;
  91. size_t r_bufoff;
  92. int r_didseek;
  93. TAILQ_ENTRY(ggs_req) r_next;
  94. };
  /* Requests handed from req_thread to proc_thread; guarded by procqueue_mtx. */
  static TAILQ_HEAD(ggs_reqqueue, ggs_req) procqueue =
      TAILQ_HEAD_INITIALIZER(procqueue);

  /* Idle SFTP sessions available for new requests. */
  static TAILQ_HEAD(ggs_sessqueue, ggs_sess_cache) session_cache =
      TAILQ_HEAD_INITIALIZER(session_cache);

  /* Counts free request slots; initialised to maxconnections. */
  static sem_t nconn_sem;

  static pthread_mutex_t procqueue_mtx;
  /*
   * Print the command synopsis for every sub-command to stderr and exit
   * with EXIT_FAILURE.  Never returns.
   */
  static void
  usage(void)
  {
      const char *prog = getprogname();

      /* -i takes the SSH identity (private key) file. */
      fprintf(stderr, "usage: %s create [-v] [-o <ro|wo|rw>] "
          "[-F pidfile] [-i identityfile] "
          "[-l username] [-p port] "
          "[-q queue_size] [-s sectorsize] [-r nrequests] "
          "[-t timeout] [-u unit] <host> <path>\n", prog);
      fprintf(stderr, " %s rescue [-v] [-o <ro|wo|rw>] "
          "[-F pidfile] [-i identityfile] "
          "[-l username] [-p port] "
          "[-r nrequests] <-u unit> <host> <path>\n", prog);
      fprintf(stderr, " %s destroy [-f] <-u unit>\n", prog);
      fprintf(stderr, " %s list [-v] [-u unit]\n", prog);
      exit(EXIT_FAILURE);
  }
  117. static void
  118. libssh2_errorx(LIBSSH2_SESSION *session, const char *info)
  119. {
  120. char *errmsg;
  121. int rc;
  122. rc = libssh2_session_last_error(session, &errmsg, NULL, 0);
  123. g_gate_xlog("%s: %s", info, errmsg);
  124. }
  /*
   * Connect to the service (or port number) on host.
   *
   * Somewhat copied from freebsd/lib/libfetch/fetch_common.c:fetch_connect.
   *
   * host    - name or numeric address to resolve
   * service - service name or decimal port
   * af      - address family hint passed to getaddrinfo() (0 for any)
   *
   * Returns a connected stream socket, or -1 with errno set: ENOENT when the
   * name could not be resolved, otherwise the error of the last failed
   * socket(2)/connect(2) attempt.
   */
  static int
  tcp_connect(const char *host, const char *service, int af)
  {
      struct addrinfo hints, *sai, *sai0;
      int saved_errno, sd;

      hints = (struct addrinfo){
          .ai_family = af,
          .ai_socktype = SOCK_STREAM,
          .ai_flags = AI_ADDRCONFIG,
      };

      /*
       * resolve server address; getaddrinfo() reports failure with a
       * non-zero EAI_* code, not with -1/errno.
       */
      sai0 = NULL;
      if (getaddrinfo(host, service, &hints, &sai0) != 0 || sai0 == NULL) {
          errno = ENOENT;
          return -1;
      }

      sd = -1;
      saved_errno = 0;

      /* try each server address in turn */
      for (sai = sai0; sai != NULL; sai = sai->ai_next) {
          sd = socket(sai->ai_family, sai->ai_socktype,
              sai->ai_protocol);
          if (sd == -1) {
              /* this family may be unusable; the next one may work */
              saved_errno = errno;
              continue;
          }

          if (connect(sd, sai->ai_addr, sai->ai_addrlen) == 0)
              break;

          /* remember why it failed before close() can clobber errno */
          saved_errno = errno;
          close(sd);
          sd = -1;
      }

      freeaddrinfo(sai0);

      if (sd == -1)
          errno = saved_errno;
      return sd;
  }
  171. static int
  172. get_open_flags()
  173. {
  174. switch (flags) {
  175. case G_GATE_FLAG_READONLY:
  176. return LIBSSH2_FXF_READ;
  177. case G_GATE_FLAG_WRITEONLY:
  178. return LIBSSH2_FXF_WRITE;
  179. default:
  180. return LIBSSH2_FXF_READ|LIBSSH2_FXF_WRITE;
  181. }
  182. }
  183. static struct ggs_connection
  184. make_connection(void)
  185. {
  186. LIBSSH2_SESSION *session;
  187. LIBSSH2_SFTP *sftp_session;
  188. LIBSSH2_SFTP_HANDLE *handle;
  189. char *tmp;
  190. int sockfd;
  191. int rc;
  192. sockfd = tcp_connect(hostname, sshport, 0);
  193. if (sockfd == -1) {
  194. if (errno == ENOENT)
  195. g_gate_xlog("tcp_connect: failed to lookup %s",
  196. hostname);
  197. g_gate_xlog("tcp_connect: %s.", strerror(errno));
  198. }
  199. session = libssh2_session_init();
  200. if (session == NULL)
  201. libssh2_errorx(session, "libssh2_session_init");
  202. if (g_gate_verbose)
  203. libssh2_trace(session, LIBSSH2_TRACE_SOCKET|LIBSSH2_TRACE_KEX|
  204. LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|
  205. LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY);
  206. /* XXX - libssh2_session_flag to enable compression */
  207. rc = libssh2_session_handshake(session, sockfd);
  208. if (rc)
  209. libssh2_errorx(session, "libssh2_session_handshake");
  210. libssh2_session_set_blocking(session, 1);
  211. /* XXX - known hosts handling */
  212. if (identityfile == NULL) {
  213. tmp = NULL;
  214. asprintf(&tmp, "%s/.ssh/id_rsa", getenv("HOME"));
  215. identityfile = tmp;
  216. tmp = NULL;
  217. }
  218. asprintf(&tmp, "%s.pub", identityfile);
  219. pubkeyfile = tmp;
  220. tmp = NULL;
  221. g_gate_log(LOG_DEBUG, "trying identity file: %s", identityfile);
  222. rc = libssh2_userauth_publickey_fromfile(session, username, pubkeyfile,
  223. identityfile, NULL);
  224. //rc = libssh2_userauth_password(session, "freebsd", "freebsd");
  225. if (rc) {
  226. g_gate_log(LOG_ERR, "identity file: %s", identityfile);
  227. libssh2_errorx(session, "libssh2_userauth_publickey_fromfile");
  228. }
  229. /* always need at least one */
  230. sftp_session = libssh2_sftp_init(session);
  231. if (sftp_session == NULL)
  232. g_gate_xlog("libssh2_sftp_init");
  233. handle = libssh2_sftp_open(sftp_session, imgpath, get_open_flags(), 0);
  234. if (handle == NULL) {
  235. g_gate_log(LOG_ERR, "image file: %s", imgpath);
  236. libssh2_errorx(session, "libssh2_sftp_open");
  237. }
  238. return (struct ggs_connection){
  239. .c_fd = sockfd,
  240. .c_session = session,
  241. .c_sftp_session = sftp_session,
  242. .c_handle = handle,
  243. };
  244. }
  245. /*
  246. * Resize is required to run in a thread because the resize requires
  247. * that I/O is able to complete before it can return.
  248. */
  249. static void *
  250. mediachg(void *arg __unused)
  251. {
  252. struct g_gate_ctl_modify ggiom;
  253. /* update mediasize, it may have changed */
  254. ggiom = (struct g_gate_ctl_modify){
  255. .gctl_version = G_GATE_VERSION,
  256. .gctl_unit = unit,
  257. .gctl_modify = GG_MODIFY_MEDIASIZE,
  258. .gctl_mediasize = mediasize,
  259. };
  260. g_gate_ioctl(G_GATE_CMD_MODIFY, &ggiom);
  261. g_gate_log(LOG_DEBUG, "updated ggate%d mediasize to %zd", unit,
  262. mediasize);
  263. return NULL;
  264. }
  265. static void *
  266. req_thread(void *arg __unused)
  267. {
  268. struct ggs_req *greq;
  269. static char *buf;
  270. int buflen = 1024*1024;
  271. int error;
  272. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  273. greq = NULL;
  274. for (;;) {
  275. if (greq == NULL)
  276. greq = malloc(sizeof *greq);
  277. if (buf == NULL)
  278. buf = malloc(buflen);
  279. if (greq == NULL || buf == NULL) {
  280. /* XXX */
  281. g_gate_log(LOG_ERR, "Unable to allocate memory.");
  282. exit(1);
  283. }
  284. *greq = (struct ggs_req){
  285. .r_ggio = (struct g_gate_ctl_io){
  286. .gctl_version = G_GATE_VERSION,
  287. .gctl_unit = unit,
  288. .gctl_data = buf,
  289. .gctl_length = buflen,
  290. .gctl_error = 0,
  291. },
  292. };
  293. //g_gate_log(LOG_DEBUG, "waiting for ioctl");
  294. g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio);
  295. //g_gate_log(LOG_DEBUG, "got ioctl");
  296. error = greq->r_ggio.gctl_error;
  297. switch (error) {
  298. case 0:
  299. break;
  300. case ECANCELED:
  301. /* Exit gracefully. */
  302. g_gate_close_device();
  303. exit(EXIT_SUCCESS);
  304. case ENXIO:
  305. default:
  306. g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME,
  307. strerror(error));
  308. }
  309. g_gate_log(LOG_DEBUG, "ggio(%p), ver: %u, unit: %d, seq: %llu, "
  310. "cmd: %u, offset: %llu, len: %llu", greq,
  311. greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit,
  312. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd,
  313. greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);
  314. switch (greq->r_ggio.gctl_cmd) {
  315. case BIO_READ:
  316. /* use a correctly sized allocation */
  317. greq->r_ggio.gctl_data =
  318. malloc(greq->r_ggio.gctl_length);
  319. break;
  320. case BIO_WRITE:
  321. /* r_ggio takes ownership of buf now */
  322. buf = NULL;
  323. break;
  324. case BIO_DELETE:
  325. case BIO_FLUSH:
  326. greq->r_ggio.gctl_data = NULL;
  327. break;
  328. default:
  329. greq->r_ggio.gctl_error = EOPNOTSUPP;
  330. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  331. continue; /* return EOPNOTSUPP */
  332. break;
  333. }
  334. //g_gate_log(LOG_DEBUG, "waiting for slot");
  335. sem_wait(&nconn_sem);
  336. #if 0
  337. int semval;
  338. sem_getvalue(&nconn_sem, &semval);
  339. g_gate_log(LOG_DEBUG, "slots: %d", semval);
  340. #endif
  341. error = pthread_mutex_lock(&procqueue_mtx);
  342. assert(error == 0);
  343. TAILQ_INSERT_TAIL(&procqueue, greq, r_next);
  344. error = pthread_mutex_unlock(&procqueue_mtx);
  345. assert(error == 0);
  346. /* notify processing thread a request is waiting */
  347. error = write(pushfd, "T", 1);
  348. if (error != 1)
  349. g_gate_xlog("write pushfd: %d, error: %s.", error,
  350. strerror(error));
  351. /* pass ownership */
  352. greq = NULL;
  353. }
  354. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  355. return (NULL);
  356. }
  /*
   * Return a short description of an SFTP status code as reported by
   * libssh2_sftp_last_error() (the LIBSSH2_FX_* values, 0 through 21).
   * Values outside the table yield "invalid errno".  The returned string
   * is a constant and must not be freed.
   */
  static const char *
  sftperrno_str(int err)
  {
      static const char *const strs[] = {
          [0] = "ok",
          [1] = "eof",
          [2] = "no such file",
          [3] = "permission denied",
          [4] = "failure",
          [5] = "bad message",
          [6] = "no connection",
          [7] = "connection lost",
          [8] = "op unsupported",
          [9] = "invalid handle",
          [10] = "no such path",
          [11] = "file already exists",
          [12] = "write protect",
          [13] = "no media",
          [14] = "no space on filesystem",
          [15] = "quota exceeded",
          [16] = "unknown principal",
          [17] = "lock conflict",
          [18] = "directory not empty",
          [19] = "not a directory",
          [20] = "invalid filename",
          [21] = "link loop",
      };

      if (err < 0 || err >= (int)(sizeof strs / sizeof strs[0]))
          return "invalid errno";

      return strs[err];
  }
  375. static int
  376. process_pending(struct ggs_reqqueue *req_pending,
  377. struct ggs_sessqueue *sessqueue)
  378. {
  379. struct ggs_req *greq, *greq2;
  380. char *errmsg;
  381. int rc;
  382. int sftperrno;
  383. int didwork;
  384. didwork = 0;
  385. /* Work on each pending request */
  386. TAILQ_FOREACH_SAFE(greq, req_pending, r_next, greq2) {
  387. again:
  388. switch (greq->r_ggio.gctl_cmd) {
  389. case BIO_READ:
  390. g_gate_log(LOG_DEBUG, "sftp_read(%p): %d(%d), rem: %d",
  391. greq, greq->r_ggio.gctl_offset,
  392. greq->r_ggio.gctl_length,
  393. greq->r_ggio.gctl_length - greq->r_bufoff);
  394. if (greq->r_didseek == 0) {
  395. libssh2_sftp_seek64(greq->r_handle,
  396. greq->r_ggio.gctl_offset);
  397. greq->r_didseek = 1;
  398. }
  399. rc = libssh2_sftp_read(greq->r_handle,
  400. (char *)greq->r_ggio.gctl_data + greq->r_bufoff,
  401. greq->r_ggio.gctl_length - greq->r_bufoff);
  402. g_gate_log(LOG_DEBUG, "sftp_read ret: %d", rc);
  403. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  404. g_gate_log(LOG_ERR, "libssh2_sftp_read");
  405. break;
  406. case BIO_WRITE:
  407. g_gate_log(LOG_DEBUG, "sftp_write(%p): %d(%d), rem: %d",
  408. greq, greq->r_ggio.gctl_offset,
  409. greq->r_ggio.gctl_length,
  410. greq->r_ggio.gctl_length - greq->r_bufoff);
  411. if (greq->r_didseek == 0) {
  412. libssh2_sftp_seek64(greq->r_handle,
  413. greq->r_ggio.gctl_offset);
  414. greq->r_didseek = 1;
  415. }
  416. rc = libssh2_sftp_write(greq->r_handle,
  417. (char *)greq->r_ggio.gctl_data + greq->r_bufoff,
  418. greq->r_ggio.gctl_length - greq->r_bufoff);
  419. g_gate_log(LOG_DEBUG, "sftp_write ret: %d", rc);
  420. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  421. libssh2_errorx(greq->r_ssh_session,
  422. "libssh2_sftp_write");
  423. break;
  424. case BIO_FLUSH:
  425. g_gate_log(LOG_DEBUG, "sftp_flush(%p)", greq);
  426. rc = libssh2_sftp_fsync(greq->r_handle);
  427. didwork = 1; /* assume this always does work */
  428. switch (rc) {
  429. case LIBSSH2_ERROR_SFTP_PROTOCOL:
  430. greq->r_ggio.gctl_error = EOPNOTSUPP;
  431. goto completeio;
  432. case LIBSSH2_ERROR_EAGAIN:
  433. continue;
  434. case 0: /* success */
  435. goto completeio;
  436. default:
  437. libssh2_session_last_error(greq->r_ssh_session,
  438. &errmsg, NULL, 0);
  439. g_gate_log(LOG_ERR, "sftp_flush(%p) ret %d: %s",
  440. greq, rc, errmsg);
  441. greq->r_ggio.gctl_error = EIO;
  442. goto completeio;
  443. }
  444. /* NOTREACHABLE */
  445. break;
  446. case BIO_DELETE:
  447. g_gate_log(LOG_DEBUG, "sftp_punchhole(%p)", greq);
  448. rc = libssh2_sftp_punchhole(greq->r_handle,
  449. greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);
  450. didwork = 1; /* assume this always does work */
  451. switch (rc) {
  452. case LIBSSH2_ERROR_SFTP_PROTOCOL:
  453. greq->r_ggio.gctl_error = EOPNOTSUPP;
  454. sftperrno = libssh2_sftp_last_error(
  455. greq->r_session);
  456. g_gate_log(LOG_DEBUG, "sftp_punchhole(%p) errno: %s(%d)", greq,
  457. sftperrno_str(sftperrno), sftperrno);
  458. goto completeio;
  459. case LIBSSH2_ERROR_EAGAIN:
  460. continue;
  461. case 0: /* success */
  462. goto completeio;
  463. default:
  464. libssh2_session_last_error(greq->r_ssh_session,
  465. &errmsg, NULL, 0);
  466. g_gate_log(LOG_ERR, "sftp_punchhole(%p) ret %d: %s",
  467. greq, rc, errmsg);
  468. greq->r_ggio.gctl_error = EIO;
  469. goto completeio;
  470. }
  471. /* NOTREACHABLE */
  472. break;
  473. default:
  474. rc = 0;
  475. g_gate_log(LOG_ERR, "unhandled op: %d",
  476. greq->r_ggio.gctl_cmd);
  477. continue;
  478. }
  479. if (rc > 0) {
  480. didwork = 1;
  481. greq->r_bufoff += rc;
  482. /*
  483. * try again on partial read/write,
  484. * might have more data pending
  485. */
  486. if ((off_t)greq->r_bufoff != greq->r_ggio.gctl_length)
  487. goto again;
  488. }
  489. if ((off_t)greq->r_bufoff == greq->r_ggio.gctl_length) {
  490. /* complete */
  491. completeio:
  492. g_gate_log(LOG_DEBUG, "cmd complete: seq: %d, cmd: %d",
  493. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd);
  494. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  495. TAILQ_REMOVE(req_pending, greq, r_next);
  496. TAILQ_INSERT_HEAD(sessqueue, greq->r_sesscache,
  497. sc_next);
  498. free(greq->r_ggio.gctl_data);
  499. free(greq);
  500. /* release this slot */
  501. sem_post(&nconn_sem);
  502. }
  503. }
  504. return didwork;
  505. }
  506. /*
  507. * sftp session management is a bit tricky.
  508. * if there is an entry in sessioncache, use that one.
  509. * if we are waiting for a new session (gsc_pend != NULL),
  510. * establish session, then open handle
  511. * when the new session completes, process the work queue
  512. */
  513. static void *
  514. proc_thread(void *arg __unused)
  515. {
  516. char scratch[32];
  517. struct ggs_reqqueue req_pending;
  518. struct timeval to;
  519. struct ggs_sess_cache *gsc, *gsc_pending;
  520. struct ggs_req *greq;
  521. LIBSSH2_SESSION *session;
  522. fd_set fdread;
  523. fd_set fdwrite;
  524. fd_set fdexcep;
  525. int sockfd;
  526. int maxfd;
  527. int error;
  528. int dir;
  529. int rc;
  530. int didwork; /* did libssh2 do any work? */
  531. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  532. TAILQ_INIT(&req_pending);
  533. /* make sure we don't block on reading */
  534. fcntl(popfd, F_SETFL, O_NONBLOCK);
  535. sockfd = start_conn.c_fd;
  536. session = start_conn.c_session;
  537. gsc = malloc(sizeof *gsc);
  538. gsc->sc_ssh_session = start_conn.c_session;
  539. gsc->sc_session = start_conn.c_sftp_session;
  540. gsc->sc_handle = start_conn.c_handle;
  541. TAILQ_INSERT_HEAD(&session_cache, gsc, sc_next);
  542. gsc = NULL;
  543. gsc_pending = NULL;
  544. didwork = 0;
  545. libssh2_session_set_blocking(session, 0);
  546. for (;;) {
  547. //g_gate_log(LOG_DEBUG, "looping");
  548. if (!didwork) {
  549. /* setup polling loop */
  550. maxfd = -1;
  551. FD_ZERO(&fdread);
  552. FD_ZERO(&fdwrite);
  553. FD_ZERO(&fdexcep);
  554. dir = libssh2_session_block_directions(session);
  555. if (dir & LIBSSH2_SESSION_BLOCK_INBOUND ||
  556. gsc_pending != NULL)
  557. FD_SET(sockfd, &fdread);
  558. if (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND)
  559. FD_SET(sockfd, &fdwrite);
  560. /* add in the pop descriptor */
  561. FD_SET(popfd, &fdread);
  562. maxfd = MAX(popfd, sockfd);
  563. g_gate_log(LOG_DEBUG, "selecting: %s %s, " \
  564. "read: sockfd: %d, popfd: %d, write: sockfd: %d",
  565. (dir & LIBSSH2_SESSION_BLOCK_INBOUND) ? "inbound" :
  566. "", (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) ?
  567. "outbound" : "", FD_ISSET(sockfd, &fdread),
  568. FD_ISSET(popfd, &fdread),
  569. FD_ISSET(sockfd, &fdwrite));
  570. to = (struct timeval){ .tv_sec = 1, .tv_usec = 1000 };
  571. (void)to;
  572. rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep,
  573. NULL);
  574. switch (rc) {
  575. case -1:
  576. g_gate_log(LOG_ERR, "%s: select failed: %s",
  577. __func__, strerror(errno));
  578. break;
  579. case 0:
  580. default:
  581. g_gate_log(LOG_DEBUG, "select: %d, " \
  582. "read: sockfd: %d, popfd: %d, " \
  583. "write: sockfd: %d", rc,
  584. FD_ISSET(sockfd, &fdread),
  585. FD_ISSET(popfd, &fdread),
  586. FD_ISSET(sockfd, &fdwrite));
  587. break;
  588. }
  589. }
  590. didwork = 0;
  591. /* process pending, so any completed can be reused */
  592. didwork |= process_pending(&req_pending, &session_cache);
  593. if (FD_ISSET(popfd, &fdread)) {
  594. /* read off the tokens */
  595. g_gate_log(LOG_DEBUG, "popping");
  596. read(popfd, scratch, sizeof scratch);
  597. for (;;) {
  598. procreq:
  599. /* get the request */
  600. error = pthread_mutex_lock(&procqueue_mtx);
  601. assert(error == 0);
  602. greq = TAILQ_FIRST(&procqueue);
  603. g_gate_log(LOG_DEBUG, "greq: %p", greq);
  604. if (greq != NULL)
  605. TAILQ_REMOVE(&procqueue, greq, r_next);
  606. error = pthread_mutex_unlock(&procqueue_mtx);
  607. assert(error == 0);
  608. /* no more to process */
  609. if (greq == NULL)
  610. break;
  611. gsc = TAILQ_FIRST(&session_cache);
  612. if (gsc == NULL) {
  613. if (gsc_pending == NULL) {
  614. /* need new session */
  615. g_gate_log(LOG_DEBUG,
  616. "need new session");
  617. gsc_pending =
  618. malloc(sizeof *gsc);
  619. gsc_pending->sc_ssh_session =
  620. session;
  621. gsc_pending->sc_session = NULL;
  622. gsc_pending->sc_handle = NULL;
  623. }
  624. /* put back request */
  625. error =
  626. pthread_mutex_lock(&procqueue_mtx);
  627. assert(error == 0);
  628. TAILQ_INSERT_HEAD(&procqueue, greq,
  629. r_next);
  630. error = pthread_mutex_unlock(
  631. &procqueue_mtx);
  632. assert(error == 0);
  633. break;
  634. } else {
  635. /* process request */
  636. TAILQ_REMOVE(&session_cache, gsc,
  637. sc_next);
  638. greq->r_sesscache = gsc;
  639. gsc = NULL;
  640. greq->r_bufoff = 0;
  641. TAILQ_INSERT_TAIL(&req_pending, greq,
  642. r_next);
  643. greq = NULL;
  644. }
  645. }
  646. }
  647. if (gsc_pending != NULL) {
  648. /* we are creating a new session */
  649. if (gsc_pending->sc_session == NULL) {
  650. didwork = 1;
  651. gsc_pending->sc_session =
  652. libssh2_sftp_init(session);
  653. }
  654. if (gsc_pending->sc_session != NULL) {
  655. didwork = 1;
  656. gsc_pending->sc_handle = libssh2_sftp_open(
  657. gsc_pending->sc_session, "fstest/data.img",
  658. get_open_flags(), 0);
  659. }
  660. g_gate_log(LOG_DEBUG,
  661. "pending: session: %p, handle: %p",
  662. gsc_pending->sc_session, gsc_pending->sc_handle);
  663. /* we have a fully initalized entry, use it */
  664. if (gsc_pending->sc_handle != NULL) {
  665. g_gate_log(LOG_DEBUG, "new session created");
  666. TAILQ_INSERT_HEAD(&session_cache, gsc_pending,
  667. sc_next);
  668. gsc_pending = NULL;
  669. didwork = 1;
  670. goto procreq;
  671. }
  672. }
  673. /* kick of any queued requests from above */
  674. didwork |= process_pending(&req_pending, &session_cache);
  675. }
  676. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  677. pthread_exit(NULL);
  678. }
  679. static void
  680. ggatessh_makepidfile(void)
  681. {
  682. pid_t otherpid;
  683. if (!g_gate_verbose) {
  684. if (ggatessh_pidfile == NULL) {
  685. asprintf(&ggatessh_pidfile,
  686. _PATH_VARRUN "/ggatessh.ggate%d.pid", unit);
  687. if (ggatessh_pidfile == NULL)
  688. err(EXIT_FAILURE,
  689. "Cannot allocate memory for pidfile");
  690. }
  691. pfh = pidfile_open(ggatessh_pidfile, 0600, &otherpid);
  692. if (pfh == NULL) {
  693. if (errno == EEXIST) {
  694. errx(EXIT_FAILURE,
  695. "Daemon already running, pid: %jd.",
  696. (intmax_t)otherpid);
  697. }
  698. err(EXIT_FAILURE, "Cannot open/create pidfile");
  699. }
  700. }
  701. }
  702. static void
  703. mydaemon(void)
  704. {
  705. if (g_gate_verbose > 0)
  706. return;
  707. if (daemon(0, 0) == 0)
  708. return;
  709. if (action == CREATE)
  710. g_gate_destroy(unit, 1);
  711. err(EXIT_FAILURE, "Cannot daemonize");
  712. }
  713. static int
  714. g_gatessh_connect(void)
  715. {
  716. struct ggs_connection conn;
  717. LIBSSH2_SFTP_ATTRIBUTES attrs;
  718. int rc;
  719. /* get the remote's size */
  720. conn = make_connection();
  721. rc = libssh2_sftp_fstat(conn.c_handle, &attrs);
  722. /* only allow regular and char devices */
  723. if (!(LIBSSH2_SFTP_S_ISREG(attrs.flags) ||
  724. !LIBSSH2_SFTP_S_ISCHR(attrs.flags))) {
  725. g_gate_xlog("remote file not a regular file");
  726. }
  727. mediasize = attrs.filesize;
  728. g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize);
  729. start_conn = conn; /* cache to use later */
  730. return 1;
  731. }
  732. static void
  733. g_gatessh_start(void)
  734. {
  735. int filedes[2];
  736. int error;
  737. pipe(filedes);
  738. pushfd = filedes[1];
  739. popfd = filedes[0];
  740. error = pthread_mutex_init(&procqueue_mtx, NULL);
  741. if (error != 0) {
  742. g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.",
  743. strerror(error));
  744. }
  745. sem_init(&nconn_sem, 0, maxconnections);
  746. error = pthread_create(&proctd, NULL, proc_thread, NULL);
  747. if (error != 0) {
  748. g_gate_destroy(unit, 1); /* XXX - remove */
  749. g_gate_xlog("pthread_create(proc_thread): %s.",
  750. strerror(error));
  751. }
  752. pthread_set_name_np(proctd, "proc");
  753. reqtd = pthread_self();
  754. pthread_set_name_np(reqtd, "req");
  755. req_thread(NULL);
  756. /* Disconnected. */
  757. close(pushfd);
  758. close(popfd);
  759. }
  760. static void
  761. signop(int sig __unused)
  762. {
  763. /* Do nothing. */
  764. }
  765. static void
  766. g_gatessh_loop(void)
  767. {
  768. struct g_gate_ctl_cancel ggioc;
  769. signal(SIGUSR1, signop);
  770. for (;;) {
  771. g_gatessh_start();
  772. g_gate_log(LOG_NOTICE, "Disconnected [%s@%s:%s]. Connecting...",
  773. username, hostname, imgpath);
  774. ggioc.gctl_version = G_GATE_VERSION;
  775. ggioc.gctl_unit = unit;
  776. ggioc.gctl_seq = 0;
  777. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  778. }
  779. }
  780. static void
  781. g_gatessh_create(void)
  782. {
  783. struct g_gate_ctl_create ggioc;
  784. if (!g_gatessh_connect())
  785. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  786. /*
  787. * Ok, got both sockets, time to create provider.
  788. */
  789. memset(&ggioc, 0, sizeof(ggioc));
  790. ggioc.gctl_version = G_GATE_VERSION;
  791. ggioc.gctl_mediasize = mediasize;
  792. ggioc.gctl_sectorsize = sectorsize;
  793. ggioc.gctl_flags = flags;
  794. ggioc.gctl_maxcount = queue_size;
  795. ggioc.gctl_timeout = timeout;
  796. ggioc.gctl_unit = unit;
  797. snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s@%s:%s",
  798. username, hostname, imgpath);
  799. g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc);
  800. if (unit == -1) {
  801. printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit);
  802. fflush(stdout);
  803. }
  804. unit = ggioc.gctl_unit;
  805. ggatessh_makepidfile();
  806. mydaemon();
  807. if (pfh != NULL)
  808. pidfile_write(pfh);
  809. g_gatessh_loop();
  810. }
  811. static void
  812. g_gatessh_rescue(void)
  813. {
  814. struct g_gate_ctl_cancel ggioc;
  815. int error;
  816. g_gate_log(LOG_ERR, "a");
  817. if (!g_gatessh_connect())
  818. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  819. g_gate_log(LOG_ERR, "b");
  820. ggioc = (struct g_gate_ctl_cancel){
  821. .gctl_version = G_GATE_VERSION,
  822. .gctl_unit = unit,
  823. .gctl_seq = 0,
  824. };
  825. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  826. ggatessh_makepidfile();
  827. mydaemon();
  828. pidfile_write(pfh);
  829. error = pthread_create(&mediatd, NULL, mediachg, NULL);
  830. if (error != 0)
  831. g_gate_xlog("unable to create mediasize change thread",
  832. strerror(errno));
  833. g_gatessh_loop();
  834. }
  835. /*
  836. * handle two methods of specifying things.
  837. * if only one arg, in the future split it how ssh does:
  838. * [user[:password]@]<host>:<img>
  839. * and URI:
  840. * sftp://[user[:password]@]<host>[:<port>]/<img>
  841. *
  842. * If both are specified, it's the same as above, but split
  843. * on the :.
  844. */
  845. static void
  846. handle_params(int argc, char *argv[])
  847. {
  848. if (username == NULL) {
  849. username = getenv("USER");
  850. if (username == NULL) {
  851. err(EXIT_FAILURE,
  852. "USER environment variable not present, set, "
  853. "or specify via -l argument.");
  854. }
  855. }
  856. if (argc != 2)
  857. usage();
  858. hostname = argv[0];
  859. imgpath = argv[1];
  860. }
  861. int
  862. main(int argc, char *argv[])
  863. {
  864. int rc;
  865. if (argc < 2)
  866. usage();
  867. if (strcasecmp(argv[1], "create") == 0)
  868. action = CREATE;
  869. else if (strcasecmp(argv[1], "destroy") == 0)
  870. action = DESTROY;
  871. else if (strcasecmp(argv[1], "list") == 0)
  872. action = LIST;
  873. else if (strcasecmp(argv[1], "rescue") == 0)
  874. action = RESCUE;
  875. else
  876. usage();
  877. argc -= 1;
  878. argv += 1;
  879. for (;;) {
  880. int ch;
  881. ch = getopt(argc, argv, "fF:i:l:o:p:q:r:s:t:u:v");
  882. if (ch == -1)
  883. break;
  884. switch (ch) {
  885. case 'f':
  886. if (action != DESTROY)
  887. usage();
  888. force = 1;
  889. break;
  890. case 'F':
  891. ggatessh_pidfile = optarg;
  892. break;
  893. case 'i':
  894. identityfile = optarg;
  895. break;
  896. case 'l':
  897. username = optarg;
  898. break;
  899. case 'o':
  900. if (action != CREATE && action != RESCUE)
  901. usage();
  902. if (strcasecmp("ro", optarg) == 0)
  903. flags = G_GATE_FLAG_READONLY;
  904. else if (strcasecmp("wo", optarg) == 0)
  905. flags = G_GATE_FLAG_WRITEONLY;
  906. else if (strcasecmp("rw", optarg) == 0)
  907. flags = 0;
  908. else {
  909. errx(EXIT_FAILURE,
  910. "Invalid argument for '-o' option.");
  911. }
  912. break;
  913. case 'p':
  914. sshport = optarg;
  915. break;
  916. case 'q':
  917. if (action != CREATE)
  918. usage();
  919. errno = 0;
  920. queue_size = strtoul(optarg, NULL, 10);
  921. if (queue_size == 0 && errno != 0)
  922. errx(EXIT_FAILURE, "Invalid queue_size.");
  923. break;
  924. case 'r':
  925. if (action != CREATE && action != RESCUE)
  926. usage();
  927. errno = 0;
  928. maxconnections = strtoul(optarg, NULL, 10);
  929. if (maxconnections == 0 && errno != 0)
  930. errx(EXIT_FAILURE, "Invalid queue_size.");
  931. break;
  932. case 's':
  933. if (action != CREATE)
  934. usage();
  935. errno = 0;
  936. sectorsize = strtoul(optarg, NULL, 10);
  937. if (sectorsize == 0 && errno != 0)
  938. errx(EXIT_FAILURE, "Invalid sectorsize.");
  939. break;
  940. case 't':
  941. if (action != CREATE)
  942. usage();
  943. errno = 0;
  944. timeout = strtoul(optarg, NULL, 10);
  945. if (timeout == 0 && errno != 0)
  946. errx(EXIT_FAILURE, "Invalid timeout.");
  947. break;
  948. case 'u':
  949. errno = 0;
  950. unit = strtol(optarg, NULL, 10);
  951. if (unit == 0 && errno != 0)
  952. errx(EXIT_FAILURE, "Invalid unit number.");
  953. break;
  954. case 'v':
  955. if (action == DESTROY)
  956. usage();
  957. g_gate_verbose++;
  958. break;
  959. default:
  960. usage();
  961. }
  962. }
  963. argc -= optind;
  964. argv += optind;
  965. g_gate_log(LOG_DEBUG, "libssh2_init");
  966. rc = libssh2_init(0);
  967. if (rc != 0) {
  968. fprintf(stderr, "libssh2 initialization failed (%d)\n", rc);
  969. return 1;
  970. }
  971. switch (action) {
  972. case CREATE:
  973. if (argc < 1 || argc > 2)
  974. usage();
  975. handle_params(argc, argv);
  976. g_gate_load_module();
  977. g_gate_open_device();
  978. g_gatessh_create();
  979. break;
  980. case DESTROY:
  981. if (argc != 0)
  982. usage();
  983. if (unit == -1) {
  984. fprintf(stderr, "Required unit number.\n");
  985. usage();
  986. }
  987. g_gate_verbose = 1;
  988. g_gate_open_device();
  989. g_gate_destroy(unit, force);
  990. break;
  991. case LIST:
  992. g_gate_list(unit, g_gate_verbose);
  993. break;
  994. case RESCUE:
  995. if (argc < 1 || argc > 2)
  996. usage();
  997. if (unit == -1) {
  998. fprintf(stderr, "Required unit number.\n");
  999. usage();
  1000. }
  1001. handle_params(argc, argv);
  1002. g_gate_open_device();
  1003. g_gatessh_rescue();
  1004. break;
  1005. case UNSET:
  1006. default:
  1007. usage();
  1008. }
  1009. g_gate_close_device();
  1010. exit(EXIT_SUCCESS);
  1011. }