diff --git a/.gitignore b/.gitignore index 5bef084..8c6858f 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,6 @@ *.8.gz *.full *.debug -ggatehttp/ggatehttp +ggatessh/ggatessh tests/Kyuafile -tests/ggatehttp_test +tests/ggatessh_test diff --git a/Makefile b/Makefile index 9580d61..80f2d57 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ # $FreeBSD$ -SUBDIR= ${_ggatehttp} \ +SUBDIR= ${_ggatessh} \ tests -_ggatehttp= ggatehttp +_ggatessh= ggatessh .PHONY: devtest diff --git a/README.md b/README.md index 97a4a41..5752719 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,16 @@ -https ggate working tree -======================== +ggatessh working tree +===================== This is a working tree for ggate work. -This is a variant of ggatec using http(s) GET/PUT instead of talking -to ggated. +This is a variant of ggatec using sftp instead of talking to ggated. -Note that when I started on this project, that this would be completely -standards complaint. After running into an issue w/ the wsgidav server -not supporting partial PUTs, I did some research, and came across this -[post](https://blog.sphere.chronosempire.org.uk/2012/11/21/webdav-and-the-http-patch-nightmare) -that talks about how the IETF intentionally broke partial PUTs, despite -having the same problems w/ other parts of their spec. +It uses the libssh2 library with a few modifications. The build system +will be updated to compile and use libssh2. There is at least one minor +modification needed to turn off read-ahead for sftp support. -Servers known to work: -- apache 2.2.x: can truncate file under some conditions - -Services known to not work: -- wsgidav (Python): Does not implement partial PUT +By default, libssh2 assumes that you'll read a whole file sequentially +in blocking mode, and if it does not do this, there will be pipeline +stalls. In our case, this would result in lots of wasted bandwith as +we will be seeking around the file to read and write, and so this +feature needs to be disabled. diff --git a/ggatehttp/Makefile b/ggatehttp/Makefile deleted file mode 100644 index d6a8a79..0000000 --- a/ggatehttp/Makefile +++ /dev/null @@ -1,19 +0,0 @@ -# $FreeBSD$ - -.PATH: ${.CURDIR:H}/shared - -PROG= ggatehttp -MAN= ggatehttp.8 -SRCS= ggatehttp.c ggate.c - -CFLAGS+= -DMAX_SEND_SIZE=32768 -CFLAGS+= -DLIBGEOM -CFLAGS+= -I${.CURDIR:H}/shared -CFLAGS+= -I/usr/local/include -#CFLAGS+= -O0 -#CFLAGS+= -fprofile-instr-generate - -LDFLAGS+= -L/usr/local/lib -LDADD= -lgeom -lutil -lpthread -lcurl - -.include diff --git a/ggatehttp/ggatehttp.c b/ggatehttp/ggatehttp.c deleted file mode 100644 index 4da6443..0000000 --- a/ggatehttp/ggatehttp.c +++ /dev/null @@ -1,770 +0,0 @@ -/*- - * SPDX-License-Identifier: BSD-2-Clause-FreeBSD - * - * Copyright (c) 2004 Pawel Jakub Dawidek - * All rights reserved. - * Copyright 2020 John-Mark Gurney - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS - * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - * - * $FreeBSD$ - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include -#include - -#include -#include "ggate.h" - -static enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET; - -static const char *url = NULL; -static int unit = G_GATE_UNIT_AUTO; -static unsigned flags = 0; -static int force = 0; -static unsigned queue_size = G_GATE_QUEUE_SIZE; -static off_t mediasize; -static unsigned sectorsize = 4096; -static unsigned timeout = G_GATE_TIMEOUT; -static int pushfd, popfd; /* work semaphore */ -static pthread_t reqtd, proctd; -static unsigned maxconnections = 32; - -struct ggh_req { - struct g_gate_ctl_io r_ggio; - CURL *r_chandle; - size_t r_bufoff; - TAILQ_ENTRY(ggh_req) r_next; -}; - -static TAILQ_HEAD(, ggh_req) procqueue = TAILQ_HEAD_INITIALIZER(procqueue); -static sem_t nconn_sem; -static pthread_mutex_t procqueue_mtx; - -static void -usage(void) -{ - - fprintf(stderr, "usage: %s create [-v] [-o ] " - "[-q queue_size] [-s sectorsize] [-r nrequests] " - "[-t timeout] [-u unit] \n", getprogname()); - fprintf(stderr, " %s rescue [-v] [-o ] " - "[-r nrequests] <-u unit> \n", getprogname()); - fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname()); - fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname()); - exit(EXIT_FAILURE); -} - -static void * -req_thread(void *arg __unused) -{ - struct ggh_req *greq; - static char *buf; - int buflen = 1024*1024; - int error; - - g_gate_log(LOG_NOTICE, "%s: started!", __func__); - - greq = NULL; - - for (;;) { - if (greq == NULL) - greq = malloc(sizeof *greq); - - if (buf == NULL) - buf = malloc(buflen); - - if (greq == NULL || buf == NULL) { - /* XXX */ - g_gate_log(LOG_ERR, "Unable to allocate memory."); - exit(1); - } - - greq->r_ggio.gctl_version = G_GATE_VERSION; - greq->r_ggio.gctl_unit = unit; - greq->r_ggio.gctl_data = buf; - greq->r_ggio.gctl_length = buflen; - greq->r_ggio.gctl_error = 0; - - //g_gate_log(LOG_DEBUG, "waiting for ioctl"); - g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio); - //g_gate_log(LOG_DEBUG, "got ioctl"); - - error = greq->r_ggio.gctl_error; - switch (error) { - case 0: - break; - case ECANCELED: - /* Exit gracefully. */ - g_gate_close_device(); - exit(EXIT_SUCCESS); - case ENXIO: - default: - g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME, - strerror(error)); - } - - g_gate_log(LOG_DEBUG, "ggio, ver: %u, unit: %d, seq: %llu, " - "cmd: %u, offset: %llu, len: %llu", - greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit, - greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd, - greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length); - - switch (greq->r_ggio.gctl_cmd) { - case BIO_READ: - /* use a correctly sized allocation */ - greq->r_ggio.gctl_data = - malloc(greq->r_ggio.gctl_length); - break; - case BIO_WRITE: - /* r_ggio takes ownership of buf now */ - buf = NULL; - break; - - case BIO_DELETE: - case BIO_FLUSH: - default: - greq->r_ggio.gctl_error = EOPNOTSUPP; - g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio); - continue; /* return EOPNOTSUPP */ - break; - } - - //g_gate_log(LOG_DEBUG, "waiting for slot"); - sem_wait(&nconn_sem); -#if 0 - int semval; - sem_getvalue(&nconn_sem, &semval); - g_gate_log(LOG_DEBUG, "slots: %d", semval); -#endif - - error = pthread_mutex_lock(&procqueue_mtx); - assert(error == 0); - TAILQ_INSERT_TAIL(&procqueue, greq, r_next); - error = pthread_mutex_unlock(&procqueue_mtx); - assert(error == 0); - - /* notify processing thread a request is waiting */ - error = write(pushfd, "T", 1); - if (error != 1) - g_gate_xlog("write pushfd: %d, error: %s.", error, - strerror(error)); - - /* pass ownership */ - greq = NULL; - } - g_gate_log(LOG_DEBUG, "%s: Died.", __func__); - return (NULL); -} - -/* - * To support any auth: - * https://curl.haxx.se/libcurl/c/anyauthput.html - */ -static curlioerr -curl_ioctl(CURL *hndl, curliocmd cmd, void *userdata) -{ - struct ggh_req *greq; - - (void)hndl; - - greq = (struct ggh_req *)userdata; - - switch (cmd) { - case CURLIOCMD_RESTARTREAD: - greq->r_bufoff = 0; - break; - - default: - return CURLIOE_UNKNOWNCMD; - } - - return CURLIOE_OK; -} - -/* - * file the curl buffer with data to send to remote server. - */ -static size_t -curl_readfun(char *buffer, size_t size, size_t nitems, void *userdata) -{ - struct ggh_req *greq; - size_t cnt; - - greq = (struct ggh_req *)userdata; - - cnt = MIN(size * nitems, greq->r_ggio.gctl_length - greq->r_bufoff); - - //g_gate_log(LOG_DEBUG, "sending %zd bytes on %p", cnt, greq); - - memcpy(buffer, (char *)greq->r_ggio.gctl_data + greq->r_bufoff, cnt); - - greq->r_bufoff += cnt; - - return cnt; -} - -static size_t -curl_writefun(char *buffer, size_t size, size_t nitems, void *userdata) -{ - struct ggh_req *greq; - size_t cnt; - - greq = (struct ggh_req *)userdata; - - cnt = size * nitems; - - assert((off_t)(greq->r_bufoff + cnt) <= greq->r_ggio.gctl_length); - - memcpy((char *)greq->r_ggio.gctl_data + greq->r_bufoff, buffer, cnt); - - greq->r_bufoff += cnt; - - return cnt; -} - -static void -process_greq(CURLM *cmulti, struct ggh_req *greq) -{ - char range_header[256]; - off_t start, length, end; - - /* start processing */ - greq->r_chandle = curl_easy_init(); - - curl_easy_setopt(greq->r_chandle, CURLOPT_URL, url); - curl_easy_setopt(greq->r_chandle, CURLOPT_PRIVATE, (char *)greq); - //curl_easy_setopt(greq->r_chandle, CURLOPT_VERBOSE, (long)1); - - start = greq->r_ggio.gctl_offset; - length = greq->r_ggio.gctl_length; - end = start + length; - - greq->r_bufoff = 0; - switch (greq->r_ggio.gctl_cmd) { - case BIO_READ: - curl_easy_setopt(greq->r_chandle, CURLOPT_WRITEFUNCTION, - curl_writefun); - curl_easy_setopt(greq->r_chandle, CURLOPT_WRITEDATA, greq); - - sprintf(range_header, "%zd-%zd", start, end - 1); - g_gate_log(LOG_DEBUG, "read range: %s", range_header); - curl_easy_setopt(greq->r_chandle, CURLOPT_RANGE, range_header); - curl_multi_add_handle(cmulti, greq->r_chandle); - break; - - case BIO_WRITE: - curl_easy_setopt(greq->r_chandle, CURLOPT_IOCTLFUNCTION, - curl_ioctl); - curl_easy_setopt(greq->r_chandle, CURLOPT_IOCTLDATA, greq); - curl_easy_setopt(greq->r_chandle, CURLOPT_READFUNCTION, - curl_readfun); - curl_easy_setopt(greq->r_chandle, CURLOPT_READDATA, greq); - curl_easy_setopt(greq->r_chandle, CURLOPT_UPLOAD, (long)1); - /* XXX - support more than basic */ - //curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPAUTH, (long)CURLAUTH_ANY); - curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPAUTH, - (long)CURLAUTH_BASIC); - - //curl_easy_setopt(greq->r_chandle, CURLOPT_VERBOSE, (long)1); - - /* https://curl.haxx.se/mail/lib-2019-05/0012.html */ - curl_easy_setopt(greq->r_chandle, CURLOPT_INFILESIZE_LARGE, - (curl_off_t)length); - /* we don't need resume from as we don't seek */ - //curl_easy_setopt(greq->r_chandle, CURLOPT_RESUME_FROM_LARGE, (curl_off_t)start); - sprintf(range_header, "Content-Range: bytes %zd-%zd/%zd", - start, end - 1, mediasize); - g_gate_log(LOG_DEBUG, "write range: %s", range_header); - - struct curl_slist *header_list; - header_list = curl_slist_append(NULL, range_header); - curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPHEADER, - header_list); - -#if 1 - curl_multi_add_handle(cmulti, greq->r_chandle); -#else - CURLcode res; - res = curl_easy_perform(greq->r_chandle); - curl_easy_getinfo(greq->r_chandle, CURLINFO_RESPONSE_CODE, - &code); - if (code != 200) { - g_gate_log(LOG_ERR, - "Got invalid response, HTTP code %03d.", code); - } -#endif - break; - } - - /* start processing */ - //curl_multi_add_handle(cmulti, greq->r_chandle); -} - -static void * -proc_thread(void *arg __unused) -{ - char scratch[32]; - struct timeval to; - CURLMsg *m; - CURLM *cmulti; - struct ggh_req *greq; - fd_set fdread; - fd_set fdwrite; - fd_set fdexcep; - CURLMcode mc; - long curl_timeo; - long code; - int rc; - int maxfd; - int error; - int still_running; - - g_gate_log(LOG_NOTICE, "%s: started!", __func__); - - /* make sure we don't block on reading */ - fcntl(popfd, F_SETFL, O_NONBLOCK); - - cmulti = curl_multi_init(); - //mc = curl_multi_setopt(cmulti, CURLOPT_VERBOSE, (long)1); - for (;;) { - //g_gate_log(LOG_DEBUG, "looping"); - - /* setup polling loop */ - maxfd = -1; - FD_ZERO(&fdread); - FD_ZERO(&fdwrite); - FD_ZERO(&fdexcep); - to = (struct timeval){ .tv_sec = 1 }; - curl_timeo = -1; - curl_multi_timeout(cmulti, &curl_timeo); - if (curl_timeo >= 0) { - to.tv_sec = curl_timeo / 1000; - if (to.tv_sec > 1) - to.tv_sec = 1; - else - to.tv_usec = (curl_timeo % 1000) * 1000; - } - mc = curl_multi_fdset(cmulti, &fdread, &fdwrite, &fdexcep, &maxfd); - if (mc != CURLM_OK) { - g_gate_log(LOG_ERR, "%s: fdset failed.", __func__); - break; - } - - /* add in the pop descriptor */ - FD_SET(popfd, &fdread); - maxfd = MAX(popfd, maxfd); - - rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &to); - switch (rc) { - case -1: - g_gate_log(LOG_ERR, "%s: select failed: %s", __func__, - strerror(errno)); - break; - case 0: - default: - curl_multi_perform(cmulti, &still_running); - break; - } - - /* Check for completed requests */ - do { - int msgq = 0; - m = curl_multi_info_read(cmulti, &msgq); - if (m != NULL && m->msg == CURLMSG_DONE) { - CURL *e = m->easy_handle; - - curl_easy_getinfo(e, CURLINFO_PRIVATE, - (char *)&greq); - curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, - &code); - g_gate_log(LOG_DEBUG, "request code: %d", code); - if (code != 206 && code != 204) { - g_gate_log(LOG_ERR, - "request failed: %d", code); - greq->r_ggio.gctl_error = EIO; - } - - g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio); - - //g_gate_log(LOG_DEBUG, "releasing slot"); - sem_post(&nconn_sem); - - curl_multi_remove_handle(cmulti, e); - curl_easy_cleanup(e); - - free(greq->r_ggio.gctl_data); - free(greq); - } else if (m != NULL) { - g_gate_log(LOG_ERR, "unknown curl msg: %d", - m->msg); - } - } while (m != NULL); - - if (FD_ISSET(popfd, &fdread)) { - /* read off the tokens */ - read(popfd, scratch, sizeof scratch); - - do { - /* get the request */ - error = pthread_mutex_lock(&procqueue_mtx); - assert(error == 0); - greq = TAILQ_FIRST(&procqueue); - if (greq != NULL) - TAILQ_REMOVE(&procqueue, greq, r_next); - error = pthread_mutex_unlock(&procqueue_mtx); - assert(error == 0); - - /* no more to process */ - if (greq == NULL) - break; - - process_greq(cmulti, greq); - } while (greq != NULL); - } - } - - curl_multi_cleanup(cmulti); - g_gate_log(LOG_DEBUG, "%s: Died.", __func__); - pthread_exit(NULL); -} - -static void -mydaemon(void) -{ - - if (g_gate_verbose > 0) - return; - if (daemon(0, 0) == 0) - return; - if (action == CREATE) - g_gate_destroy(unit, 1); - err(EXIT_FAILURE, "Cannot daemonize"); -} - -static int -g_gatehttp_connect(void) -{ - CURL *hndl; - CURLcode cc; - long code; - curl_off_t cl; - - /* get the remote's size */ - hndl = curl_easy_init(); - curl_easy_setopt(hndl, CURLOPT_URL, url); - curl_easy_setopt(hndl, CURLOPT_NOBODY, (long)1); - //curl_easy_setopt(hndl, CURLOPT_VERBOSE, (long)1); - - cc = curl_easy_perform(hndl); - - if (cc != CURLE_OK) { - g_gate_log(LOG_ERR, "curl request failed."); - return 0; - } - - curl_easy_getinfo(hndl, CURLINFO_RESPONSE_CODE, &code); - if (code != 200) { - g_gate_log(LOG_ERR, "Got invalid response, HTTP code %03d.", code); - return 0; - } - - curl_easy_getinfo(hndl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &cl); - mediasize = cl; - g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize); - - curl_easy_cleanup(hndl); - - return 1; -} - -static void -g_gatehttp_start(void) -{ - int filedes[2]; - int error; - - pipe(filedes); - pushfd = filedes[1]; - popfd = filedes[0]; - - error = pthread_mutex_init(&procqueue_mtx, NULL); - if (error != 0) { - g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.", - strerror(error)); - } - - sem_init(&nconn_sem, 0, maxconnections); - - error = pthread_create(&proctd, NULL, proc_thread, NULL); - if (error != 0) { - g_gate_destroy(unit, 1); - g_gate_xlog("pthread_create(proc_thread): %s.", - strerror(error)); - } - pthread_set_name_np(proctd, "proc"); - - reqtd = pthread_self(); - pthread_set_name_np(reqtd, "req"); - req_thread(NULL); - - /* Disconnected. */ - close(pushfd); - close(popfd); -} - -static void -signop(int sig __unused) -{ - - /* Do nothing. */ -} - -static void -g_gatehttp_loop(void) -{ - struct g_gate_ctl_cancel ggioc; - - signal(SIGUSR1, signop); - for (;;) { - g_gatehttp_start(); - g_gate_log(LOG_NOTICE, "Disconnected [%s]. Connecting...", - url); - - ggioc.gctl_version = G_GATE_VERSION; - ggioc.gctl_unit = unit; - ggioc.gctl_seq = 0; - g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); - } -} - -static void -g_gatehttp_create(void) -{ - struct g_gate_ctl_create ggioc; - - if (!g_gatehttp_connect()) - g_gate_xlog("Cannot connect: %s.", strerror(errno)); - - /* - * Ok, got both sockets, time to create provider. - */ - memset(&ggioc, 0, sizeof(ggioc)); - ggioc.gctl_version = G_GATE_VERSION; - ggioc.gctl_mediasize = mediasize; - ggioc.gctl_sectorsize = sectorsize; - ggioc.gctl_flags = flags; - ggioc.gctl_maxcount = queue_size; - ggioc.gctl_timeout = timeout; - ggioc.gctl_unit = unit; - snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s", url); - g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc); - if (unit == -1) { - printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit); - fflush(stdout); - } - unit = ggioc.gctl_unit; - - mydaemon(); - g_gatehttp_loop(); -} - -static void -g_gatehttp_rescue(void) -{ - struct g_gate_ctl_cancel ggioc; - - if (!g_gatehttp_connect()) - g_gate_xlog("Cannot connect: %s.", strerror(errno)); - - ggioc.gctl_version = G_GATE_VERSION; - ggioc.gctl_unit = unit; - ggioc.gctl_seq = 0; - g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); - - mydaemon(); - g_gatehttp_loop(); -} - -int -main(int argc, char *argv[]) -{ - - if (argc < 2) - usage(); - if (strcasecmp(argv[1], "create") == 0) - action = CREATE; - else if (strcasecmp(argv[1], "destroy") == 0) - action = DESTROY; - else if (strcasecmp(argv[1], "list") == 0) - action = LIST; - else if (strcasecmp(argv[1], "rescue") == 0) - action = RESCUE; - else - usage(); - argc -= 1; - argv += 1; - for (;;) { - int ch; - - ch = getopt(argc, argv, "fo:q:r:s:t:u:v"); - if (ch == -1) - break; - switch (ch) { - case 'f': - if (action != DESTROY) - usage(); - force = 1; - break; - case 'o': - if (action != CREATE && action != RESCUE) - usage(); - if (strcasecmp("ro", optarg) == 0) - flags = G_GATE_FLAG_READONLY; - else if (strcasecmp("wo", optarg) == 0) - flags = G_GATE_FLAG_WRITEONLY; - else if (strcasecmp("rw", optarg) == 0) - flags = 0; - else { - errx(EXIT_FAILURE, - "Invalid argument for '-o' option."); - } - break; - case 'q': - if (action != CREATE) - usage(); - errno = 0; - queue_size = strtoul(optarg, NULL, 10); - if (queue_size == 0 && errno != 0) - errx(EXIT_FAILURE, "Invalid queue_size."); - break; - case 'r': - if (action != CREATE && action != RESCUE) - usage(); - errno = 0; - maxconnections = strtoul(optarg, NULL, 10); - if (maxconnections == 0 && errno != 0) - errx(EXIT_FAILURE, "Invalid queue_size."); - break; - case 's': - if (action != CREATE) - usage(); - errno = 0; - sectorsize = strtoul(optarg, NULL, 10); - if (sectorsize == 0 && errno != 0) - errx(EXIT_FAILURE, "Invalid sectorsize."); - break; - case 't': - if (action != CREATE) - usage(); - errno = 0; - timeout = strtoul(optarg, NULL, 10); - if (timeout == 0 && errno != 0) - errx(EXIT_FAILURE, "Invalid timeout."); - break; - case 'u': - errno = 0; - unit = strtol(optarg, NULL, 10); - if (unit == 0 && errno != 0) - errx(EXIT_FAILURE, "Invalid unit number."); - break; - case 'v': - if (action == DESTROY) - usage(); - g_gate_verbose++; - break; - default: - usage(); - } - } - argc -= optind; - argv += optind; - - switch (action) { - case CREATE: - if (argc != 1) - usage(); - g_gate_load_module(); - g_gate_open_device(); - url = argv[0]; - g_gatehttp_create(); - break; - case DESTROY: - if (unit == -1) { - fprintf(stderr, "Required unit number.\n"); - usage(); - } - g_gate_verbose = 1; - g_gate_open_device(); - g_gate_destroy(unit, force); - break; - case LIST: - g_gate_list(unit, g_gate_verbose); - break; - case RESCUE: - if (argc != 1) - usage(); - if (unit == -1) { - fprintf(stderr, "Required unit number.\n"); - usage(); - } - g_gate_open_device(); - url = argv[0]; - g_gatehttp_rescue(); - break; - case UNSET: - default: - usage(); - } - g_gate_close_device(); - exit(EXIT_SUCCESS); -} diff --git a/ggatessh/Makefile b/ggatessh/Makefile new file mode 100644 index 0000000..20db200 --- /dev/null +++ b/ggatessh/Makefile @@ -0,0 +1,19 @@ +# $FreeBSD$ + +.PATH: ${.CURDIR:H}/shared + +PROG= ggatessh +MAN= ggatessh.8 +SRCS= ggatessh.c ggate.c + +CFLAGS+= -DMAX_SEND_SIZE=32768 +CFLAGS+= -DLIBGEOM +CFLAGS+= -I${.CURDIR:H}/shared +CFLAGS+= -I/home/freebsd/libssh2/include +#CFLAGS+= -O0 -g +#CFLAGS+= -fprofile-instr-generate + +LDFLAGS+= -L/home/freebsd/libssh2/src/.libs +LDADD= -lgeom -lutil -lpthread -lssh2 + +.include diff --git a/ggatehttp/Makefile.depend b/ggatessh/Makefile.depend similarity index 100% rename from ggatehttp/Makefile.depend rename to ggatessh/Makefile.depend diff --git a/ggatehttp/ggatehttp.8 b/ggatessh/ggatessh.8 similarity index 72% rename from ggatehttp/ggatehttp.8 rename to ggatessh/ggatessh.8 index 98302d9..9a9050e 100644 --- a/ggatehttp/ggatehttp.8 +++ b/ggatessh/ggatessh.8 @@ -24,35 +24,37 @@ .\" .\" $FreeBSD$ .\" -.Dd September 8, 2016 -.Dt GGATEC 8 +.Dd October 21, 2020 +.Dt GGATESSH 8 .Os .Sh NAME -.Nm ggatec -.Nd "GEOM Gate network client and control utility" +.Nm ggatessh +.Nd "GEOM Gate SSH/SFTP client and control utility" .Sh SYNOPSIS .Nm .Cm create -.Op Fl n .Op Fl v .Op Fl o Cm ro | wo | rw +.Op Fl F Ar pidfile +.Op Fl i Ar identifyfile +.Op Fl l Ar username .Op Fl p Ar port .Op Fl q Ar queue_size -.Op Fl R Ar rcvbuf -.Op Fl S Ar sndbuf .Op Fl s Ar sectorsize +.Op Fl r Ar nrequests .Op Fl t Ar timeout .Op Fl u Ar unit .Ar host .Ar path .Nm .Cm rescue -.Op Fl n .Op Fl v .Op Fl o Cm ro | wo | rw +.Op Fl F Ar pidfile +.Op Fl i Ar identifyfile +.Op Fl l Ar username .Op Fl p Ar port -.Op Fl R Ar rcvbuf -.Op Fl S Ar sndbuf +.Op Fl r Ar nrequests .Fl u Ar unit .Ar host .Ar path @@ -70,26 +72,28 @@ The utility is a network client for the GEOM Gate class. It is responsible for the creation of .Nm ggate -devices and forwarding I/O requests between the -.Nm GEOM Gate -kernel subsystem and the -.Xr ggated 8 -network daemon. +devices and forwarding I/O requests between the GEOM Gate +kernel subsystem and an +.Xr sftp-server 8 +over SSH. +.Pp Available commands: .Bl -tag -width ".Cm destroy" .It Cm create -Connect to a -.Xr ggated 8 -daemon on the specified host and create a +Connect to an +.Xr sftp-server 8 +via SSH on the specified host and create a .Nm ggate provider for the specified remote file or device. .It Cm rescue Create a new connection after the .Nm -process has died or been killed. +process has died or has been killed. The new connection to the -.Xr ggated 8 -daemon handles pending and future requests. +.Xr sftp-server 8 +handles pending and future requests. +If the remote file has changed in size, the ggate device +will be resized to match the new remote size. .It Cm destroy Destroy the given .Nm ggate @@ -102,14 +106,28 @@ providers. .Pp Available options: .Bl -tag -width ".Fl s Cm ro | wo | rw" +.It Fl F Ar pidfile +Write out the daemon's pid to +.Ar pidfile . +The default is +.Pa /var/run/ggatessh.ggate.pid . .It Fl f Forcibly destroy .Nm ggate provider (cancels all pending requests). -.It Fl n -Do not use -.Dv TCP_NODELAY -option on TCP sockets. +.It Fl i Ar identityfile +The path to the identity file to use for ssh public key authentication. +If the +.Ar identityfile +is not specified, the default of +.Pa $HOME/.ssh/id_rsa +will be used. +.It Fl l Ar username +The user name to authentice to the server with. +If +.Ar username +is not specified, the default will be +.Ev $USER . .It Fl o Cm ro | wo | rw Specify permissions to use when opening the file or device: read-only .Pq Cm ro , @@ -121,17 +139,17 @@ Default is .Cm rw . .It Fl p Ar port Port to connect to on the remote host. -Default is 3080. +Default is 22. .It Fl q Ar queue_size Number of pending I/O requests that can be queued before they will start to be canceled. Default is 1024. -.It Fl R Ar rcvbuf -Size of receive buffer to use. -When not specified, the system default is used. -.It Fl S Ar sndbuf -Size of send buffer to use. -When not specified, the system default is used. +.It Fl r Ar nrequests +Specifies how many requests may be outstanding at a single time. +This determines that maximum number of connections to the +.Xr sftp-server 8 +that will be established at a time. +If unspecified, the default is 32. .It Fl s Ar sectorsize Sector size for .Nm ggate @@ -146,9 +164,9 @@ Unit number to use. Do not fork, run in foreground and print debug information on standard output. .It Ar host -Remote host to connect to. +Remote SSH server to connect to. .It Ar path -Path to a regular file or device. +Path to a regular file or device on the remote host. .El .Sh EXIT STATUS Exit status is 0 on success, or 1 if the command fails. diff --git a/ggatessh/ggatessh.c b/ggatessh/ggatessh.c new file mode 100644 index 0000000..2b1f27c --- /dev/null +++ b/ggatessh/ggatessh.c @@ -0,0 +1,1064 @@ +/*- + * SPDX-License-Identifier: BSD-2-Clause-FreeBSD + * + * Copyright (c) 2004 Pawel Jakub Dawidek + * All rights reserved. + * Copyright 2020 John-Mark Gurney + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * $FreeBSD$ + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include +#include "ggate.h" + +static enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET; + +struct ggs_connection { + int c_fd; + LIBSSH2_SESSION *c_session; + LIBSSH2_SFTP *c_sftp_session; + LIBSSH2_SFTP_HANDLE *c_handle; +}; + +static struct pidfh *pfh; + +static const char *username; +static const char *hostname; +static const char *imgpath; +static const char *identityfile; +static const char *pubkeyfile; +static const char *sshport = "22"; +static char *ggatessh_pidfile; +static int unit = G_GATE_UNIT_AUTO; +static unsigned flags = 0; +static int force = 0; +static unsigned queue_size = G_GATE_QUEUE_SIZE; +static off_t mediasize; +static unsigned sectorsize = 4096; +static unsigned timeout = G_GATE_TIMEOUT; +static int pushfd, popfd; /* work semaphore */ +static pthread_t reqtd, proctd, mediatd; +static unsigned maxconnections = 32; +static struct ggs_connection start_conn; /* only used once/first */ + +struct ggs_sess_cache { + LIBSSH2_SESSION *sc_ssh_session; + LIBSSH2_SFTP *sc_session; + LIBSSH2_SFTP_HANDLE *sc_handle; + TAILQ_ENTRY(ggs_sess_cache) sc_next; +}; + +struct ggs_req { + struct g_gate_ctl_io r_ggio; +#define r_ssh_session r_sesscache->sc_ssh_session +#define r_session r_sesscache->sc_session +#define r_handle r_sesscache->sc_handle + struct ggs_sess_cache *r_sesscache; + size_t r_bufoff; + int r_didseek; + TAILQ_ENTRY(ggs_req) r_next; +}; + +static TAILQ_HEAD(ggs_reqqueue, ggs_req) procqueue = TAILQ_HEAD_INITIALIZER(procqueue); +static TAILQ_HEAD(ggs_sessqueue, ggs_sess_cache) session_cache = TAILQ_HEAD_INITIALIZER(session_cache); +static sem_t nconn_sem; +static pthread_mutex_t procqueue_mtx; + +static void +usage(void) +{ + + fprintf(stderr, "usage: %s create [-v] [-o ] " + "[-F pidfile] [-i identifyfile] " + "[-l username] [-p port] " + "[-q queue_size] [-s sectorsize] [-r nrequests] " + "[-t timeout] [-u unit] \n", getprogname()); + fprintf(stderr, " %s rescue [-v] [-o ] " + "[-F pidfile] [-i identifyfile] " + "[-l username] [-p port] " + "[-r nrequests] <-u unit> \n", getprogname()); + fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname()); + fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname()); + exit(EXIT_FAILURE); +} + +static void +libssh2_errorx(LIBSSH2_SESSION *session, const char *info) +{ + char *errmsg; + int rc; + + rc = libssh2_session_last_error(session, &errmsg, NULL, 0); + g_gate_xlog("%s: %s", info, errmsg); +} + +/* + * Connect to the service (or port number) on host. + * + * Somewhat copied from freebsd/lib/libfetch/fetch_common.c:fetch_connect. + */ +static int +tcp_connect(const char *host, const char *service, int af) +{ + struct addrinfo hints, *sai, *sai0; + int err, sd; + + hints = (struct addrinfo){ + .ai_family = af, + .ai_socktype = SOCK_STREAM, + .ai_flags = AI_ADDRCONFIG, + }; + + /* resolve server address */ + if (getaddrinfo(host, service, &hints, &sai0) == -1) + return -1; + + if (sai0 == NULL) { + errno = ENOENT; + return -1; + } + + sd = -1; + err = -1; + /* try each server address in turn */ + for (sai = sai0; sai != NULL; sai = sai->ai_next) { + /* open socket */ + if ((sd = socket(sai->ai_family, sai->ai_socktype, + sai->ai_protocol)) == -1) + break; + + /* attempt to connect to server address */ + if ((err = connect(sd, sai->ai_addr, sai->ai_addrlen)) == 0) + break; + + /* clean up before next attempt */ + close(sd); + sd = -1; + } + + err = errno; + + fflush(stdout); + + freeaddrinfo(sai0); + + /* Fully close if it was opened; otherwise just don't leak the fd. */ + if (err == -1 && sd >= 0) + close(sd); + + errno = err; + + return sd; +} + +static struct ggs_connection +make_connection(void) +{ + LIBSSH2_SESSION *session; + LIBSSH2_SFTP *sftp_session; + LIBSSH2_SFTP_HANDLE *handle; + char *tmp; + int sockfd; + int rc; + + sockfd = tcp_connect(hostname, sshport, 0); + if (sockfd == -1) { + if (errno == ENOENT) + g_gate_xlog("tcp_connect: failed to lookup %s", hostname); + g_gate_xlog("tcp_connect: %s.", strerror(errno)); + } + + + session = libssh2_session_init(); + if (session == NULL) + libssh2_errorx(session, "libssh2_session_init"); + + if (g_gate_verbose) { + //libssh2_trace(session, LIBSSH2_TRACE_SOCKET|LIBSSH2_TRACE_TRANS|LIBSSH2_TRACE_KEX|LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY); + libssh2_trace(session, LIBSSH2_TRACE_SOCKET|LIBSSH2_TRACE_KEX|LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY); + //libssh2_trace(session, LIBSSH2_TRACE_KEX|LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY); + } + + /* XXX - libssh2_session_flag to enable compression */ + + rc = libssh2_session_handshake(session, sockfd); + if (rc) + libssh2_errorx(session, "libssh2_session_handshake"); + + libssh2_session_set_blocking(session, 1); + + /* XXX - known hosts handling */ + + if (identityfile == NULL) { + tmp = NULL; + asprintf(&tmp, "%s/.ssh/id_rsa", getenv("HOME")); + identityfile = tmp; + tmp = NULL; + } + + asprintf(&tmp, "%s.pub", identityfile); + pubkeyfile = tmp; + tmp = NULL; + + g_gate_log(LOG_DEBUG, "trying identity file: %s", identityfile); + + rc = libssh2_userauth_publickey_fromfile(session, username, pubkeyfile, identityfile, NULL); + //rc = libssh2_userauth_password(session, "freebsd", "freebsd"); + if (rc) { + g_gate_log(LOG_ERR, "identity file: %s", identityfile); + libssh2_errorx(session, "libssh2_userauth_publickey_fromfile"); + } + + /* always need at least one */ + sftp_session = libssh2_sftp_init(session); + + if (sftp_session == NULL) + g_gate_xlog("libssh2_sftp_init"); + + handle = libssh2_sftp_open(sftp_session, imgpath, LIBSSH2_FXF_READ|LIBSSH2_FXF_WRITE, 0); + if (handle == NULL) { + g_gate_log(LOG_ERR, "image file: %s", imgpath); + libssh2_errorx(session, "libssh2_sftp_open"); + } + + return (struct ggs_connection){ + .c_fd = sockfd, + .c_session = session, + .c_sftp_session = sftp_session, + .c_handle = handle, + }; +} + +/* + * Resize is required to run in a thread because the resize requires + * that I/O is able to complete before it can return. + */ +static void * +mediachg(void *arg __unused) +{ + struct g_gate_ctl_modify ggiom; + + /* update mediasize, it may have changed */ + ggiom = (struct g_gate_ctl_modify){ + .gctl_version = G_GATE_VERSION, + .gctl_unit = unit, + .gctl_modify = GG_MODIFY_MEDIASIZE, + .gctl_mediasize = mediasize, + }; + g_gate_ioctl(G_GATE_CMD_MODIFY, &ggiom); + g_gate_log(LOG_DEBUG, "updated ggate%d mediasize to %zd", unit, mediasize); + + return NULL; +} + +static void * +req_thread(void *arg __unused) +{ + struct ggs_req *greq; + static char *buf; + int buflen = 1024*1024; + int error; + + g_gate_log(LOG_NOTICE, "%s: started!", __func__); + + greq = NULL; + + for (;;) { + if (greq == NULL) + greq = malloc(sizeof *greq); + + if (buf == NULL) + buf = malloc(buflen); + + if (greq == NULL || buf == NULL) { + /* XXX */ + g_gate_log(LOG_ERR, "Unable to allocate memory."); + exit(1); + } + + *greq = (struct ggs_req){ + .r_ggio = (struct g_gate_ctl_io){ + .gctl_version = G_GATE_VERSION, + .gctl_unit = unit, + .gctl_data = buf, + .gctl_length = buflen, + .gctl_error = 0, + }, + }; + + //g_gate_log(LOG_DEBUG, "waiting for ioctl"); + g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio); + //g_gate_log(LOG_DEBUG, "got ioctl"); + + error = greq->r_ggio.gctl_error; + switch (error) { + case 0: + break; + case ECANCELED: + /* Exit gracefully. */ + g_gate_close_device(); + exit(EXIT_SUCCESS); + case ENXIO: + default: + g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME, + strerror(error)); + } + + g_gate_log(LOG_DEBUG, "ggio(%p), ver: %u, unit: %d, seq: %llu, " + "cmd: %u, offset: %llu, len: %llu", greq, + greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit, + greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd, + greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length); + + switch (greq->r_ggio.gctl_cmd) { + case BIO_READ: + /* use a correctly sized allocation */ + greq->r_ggio.gctl_data = + malloc(greq->r_ggio.gctl_length); + break; + + case BIO_WRITE: + /* r_ggio takes ownership of buf now */ + buf = NULL; + break; + + case BIO_FLUSH: + greq->r_ggio.gctl_data = NULL; + break; + + case BIO_DELETE: + default: + greq->r_ggio.gctl_error = EOPNOTSUPP; + g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio); + continue; /* return EOPNOTSUPP */ + break; + } + + //g_gate_log(LOG_DEBUG, "waiting for slot"); + sem_wait(&nconn_sem); +#if 0 + int semval; + sem_getvalue(&nconn_sem, &semval); + g_gate_log(LOG_DEBUG, "slots: %d", semval); +#endif + + error = pthread_mutex_lock(&procqueue_mtx); + assert(error == 0); + TAILQ_INSERT_TAIL(&procqueue, greq, r_next); + error = pthread_mutex_unlock(&procqueue_mtx); + assert(error == 0); + + /* notify processing thread a request is waiting */ + error = write(pushfd, "T", 1); + if (error != 1) + g_gate_xlog("write pushfd: %d, error: %s.", error, + strerror(error)); + + /* pass ownership */ + greq = NULL; + } + g_gate_log(LOG_DEBUG, "%s: Died.", __func__); + return (NULL); +} + +static int +process_pending(struct ggs_reqqueue *req_pending, struct ggs_sessqueue *sessqueue) +{ + struct ggs_req *greq, *greq2; + char *errmsg; + int rc; + int didwork; + + didwork = 0; + + /* Work on each pending request */ + TAILQ_FOREACH_SAFE(greq, req_pending, r_next, greq2) { +again: + switch (greq->r_ggio.gctl_cmd) { + case BIO_READ: + g_gate_log(LOG_DEBUG, "sftp_read(%p): %d(%d), rem: %d", greq, greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length, greq->r_ggio.gctl_length - greq->r_bufoff); + if (greq->r_didseek == 0) { + libssh2_sftp_seek64(greq->r_handle, greq->r_ggio.gctl_offset); + greq->r_didseek = 1; + } + rc = libssh2_sftp_read(greq->r_handle, (char *)greq->r_ggio.gctl_data + greq->r_bufoff, greq->r_ggio.gctl_length - greq->r_bufoff); + g_gate_log(LOG_DEBUG, "sftp_read ret: %d", rc); + if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) + g_gate_log(LOG_ERR, "libssh2_sftp_read"); + break; + + case BIO_WRITE: + g_gate_log(LOG_DEBUG, "sftp_write(%p): %d(%d), rem: %d", greq, greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length, greq->r_ggio.gctl_length - greq->r_bufoff); + if (greq->r_didseek == 0) { + libssh2_sftp_seek64(greq->r_handle, greq->r_ggio.gctl_offset); + greq->r_didseek = 1; + } + rc = libssh2_sftp_write(greq->r_handle, (char *)greq->r_ggio.gctl_data + greq->r_bufoff, greq->r_ggio.gctl_length - greq->r_bufoff); + g_gate_log(LOG_DEBUG, "sftp_write ret: %d", rc); + if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) + libssh2_errorx(greq->r_ssh_session, "libssh2_sftp_write"); + break; + + case BIO_FLUSH: + g_gate_log(LOG_DEBUG, "sftp_flush(%p)", greq); + rc = libssh2_sftp_fsync(greq->r_handle); + + didwork = 1; /* assume this always does work */ + switch (rc) { + case LIBSSH2_ERROR_SFTP_PROTOCOL: + greq->r_ggio.gctl_error = EOPNOTSUPP; + goto completeio; + + case LIBSSH2_ERROR_EAGAIN: + continue; + + case 0: /* success */ + goto completeio; + + default: + libssh2_session_last_error(greq->r_ssh_session, &errmsg, NULL, 0); + g_gate_log(LOG_ERR, "sftp_flush(%p) ret %d: %s", greq, rc, errmsg); + greq->r_ggio.gctl_error = EIO; + goto completeio; + } + /* NOTREACHABLE */ + break; + + default: + rc = 0; + g_gate_log(LOG_ERR, "unhandled op: %d", greq->r_ggio.gctl_cmd); + continue; + } + + if (rc > 0) { + didwork = 1; + greq->r_bufoff += rc; + + /* try again on partial read/write, might have more data pending */ + if ((off_t)greq->r_bufoff != greq->r_ggio.gctl_length) + goto again; + } + + if ((off_t)greq->r_bufoff == greq->r_ggio.gctl_length) { + /* complete */ +completeio: + g_gate_log(LOG_DEBUG, "cmd complete: seq: %d, cmd: %d", greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd); + g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio); + TAILQ_REMOVE(req_pending, greq, r_next); + TAILQ_INSERT_HEAD(sessqueue, greq->r_sesscache, sc_next); + free(greq->r_ggio.gctl_data); + free(greq); + + /* release this slot */ + sem_post(&nconn_sem); + } + } + + return didwork; +} + +/* + * sftp session management is a bit tricky. + * if there is an entry in sessioncache, use that one. + * if we are waiting for a new session (gsc_pend != NULL), + * establish session, then open handle + * when the new session completes, process the work queue + */ +static void * +proc_thread(void *arg __unused) +{ + char scratch[32]; + struct ggs_reqqueue req_pending; + struct timeval to; + struct ggs_sess_cache *gsc, *gsc_pending; + struct ggs_req *greq; + LIBSSH2_SESSION *session; + fd_set fdread; + fd_set fdwrite; + fd_set fdexcep; + int sockfd; + int maxfd; + int error; + int dir; + int rc; + int didwork; /* did libssh2 do any work? */ + + g_gate_log(LOG_NOTICE, "%s: started!", __func__); + + TAILQ_INIT(&req_pending); + + /* make sure we don't block on reading */ + fcntl(popfd, F_SETFL, O_NONBLOCK); + + sockfd = start_conn.c_fd; + session = start_conn.c_session; + + gsc = malloc(sizeof *gsc); + gsc->sc_ssh_session = start_conn.c_session; + gsc->sc_session = start_conn.c_sftp_session; + gsc->sc_handle = start_conn.c_handle; + + TAILQ_INSERT_HEAD(&session_cache, gsc, sc_next); + gsc = NULL; + gsc_pending = NULL; + + didwork = 0; + + libssh2_session_set_blocking(session, 0); + + for (;;) { + //g_gate_log(LOG_DEBUG, "looping"); + + if (!didwork) { + /* setup polling loop */ + maxfd = -1; + FD_ZERO(&fdread); + FD_ZERO(&fdwrite); + FD_ZERO(&fdexcep); + + dir = libssh2_session_block_directions(session); + if (dir & LIBSSH2_SESSION_BLOCK_INBOUND || gsc_pending != NULL) + FD_SET(sockfd, &fdread); + if (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) + FD_SET(sockfd, &fdwrite); + + /* add in the pop descriptor */ + FD_SET(popfd, &fdread); + maxfd = MAX(popfd, sockfd); + + #if 0 + /* we need to be kj */ + if (gsc_pending != NULL) + FD_SET(sockfd, &fdread); + #endif + + g_gate_log(LOG_DEBUG, "selecting: %s %s, read: sockfd: %d, popfd: %d, write: sockfd: %d", (dir & LIBSSH2_SESSION_BLOCK_INBOUND) ? "inbound" : "", (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) ? "outbound" : "", FD_ISSET(sockfd, &fdread), FD_ISSET(popfd, &fdread), FD_ISSET(sockfd, &fdwrite)); + to = (struct timeval){ .tv_sec = 1, .tv_usec = 1000 }; + (void)to; + rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, NULL); + switch (rc) { + case -1: + g_gate_log(LOG_ERR, "%s: select failed: %s", __func__, + strerror(errno)); + break; + case 0: + default: + g_gate_log(LOG_DEBUG, "select: %d, read: sockfd: %d, popfd: %d, write: sockfd: %d", rc, FD_ISSET(sockfd, &fdread), FD_ISSET(popfd, &fdread), FD_ISSET(sockfd, &fdwrite)); + break; + } + } + + didwork = 0; + + /* process pending, so any completed can be reused */ + didwork |= process_pending(&req_pending, &session_cache); + + if (FD_ISSET(popfd, &fdread)) { + /* read off the tokens */ + g_gate_log(LOG_DEBUG, "popping"); + read(popfd, scratch, sizeof scratch); + + for (;;) { +procreq: + /* get the request */ + error = pthread_mutex_lock(&procqueue_mtx); + assert(error == 0); + greq = TAILQ_FIRST(&procqueue); + g_gate_log(LOG_DEBUG, "greq: %p", greq); + if (greq != NULL) + TAILQ_REMOVE(&procqueue, greq, r_next); + error = pthread_mutex_unlock(&procqueue_mtx); + assert(error == 0); + + /* no more to process */ + if (greq == NULL) + break; + + gsc = TAILQ_FIRST(&session_cache); + if (gsc == NULL) { + if (gsc_pending == NULL) { + /* need new session */ + g_gate_log(LOG_DEBUG, "need new session"); + gsc_pending = malloc(sizeof *gsc); + gsc_pending->sc_ssh_session = session; + gsc_pending->sc_session = NULL; + gsc_pending->sc_handle = NULL; + } + + /* put back request */ + error = pthread_mutex_lock(&procqueue_mtx); + assert(error == 0); + TAILQ_INSERT_HEAD(&procqueue, greq, r_next); + error = pthread_mutex_unlock(&procqueue_mtx); + assert(error == 0); + + break; + } else { + /* process request */ + TAILQ_REMOVE(&session_cache, gsc, sc_next); + greq->r_sesscache = gsc; + gsc = NULL; + + greq->r_bufoff = 0; + + TAILQ_INSERT_TAIL(&req_pending, greq, r_next); + + greq = NULL; + } + } + } + + if (gsc_pending != NULL) { + /* we are creating a new session */ + if (gsc_pending->sc_session == NULL) { + didwork = 1; + gsc_pending->sc_session = libssh2_sftp_init(session); + } + + if (gsc_pending->sc_session != NULL) { + didwork = 1; + gsc_pending->sc_handle = libssh2_sftp_open(gsc_pending->sc_session, "fstest/data.img", LIBSSH2_FXF_READ|LIBSSH2_FXF_WRITE, 0); + } + + g_gate_log(LOG_DEBUG, "pending: session: %p, handle: %p", gsc_pending->sc_session, gsc_pending->sc_handle); + + /* we have a fully initalized entry, use it */ + if (gsc_pending->sc_handle != NULL) { + g_gate_log(LOG_DEBUG, "new session created"); + TAILQ_INSERT_HEAD(&session_cache, gsc_pending, sc_next); + gsc_pending = NULL; + didwork = 1; + goto procreq; + } + } + + /* kick of any queued requests from above */ + didwork |= process_pending(&req_pending, &session_cache); + } + + g_gate_log(LOG_DEBUG, "%s: Died.", __func__); + pthread_exit(NULL); +} + +static void +mydaemon(void) +{ + + if (g_gate_verbose > 0) + return; + if (daemon(0, 0) == 0) + return; + if (action == CREATE) + g_gate_destroy(unit, 1); + err(EXIT_FAILURE, "Cannot daemonize"); +} + +static int +g_gatessh_connect(void) +{ + struct ggs_connection conn; + LIBSSH2_SFTP_ATTRIBUTES attrs; + int rc; + + /* get the remote's size */ + conn = make_connection(); + + rc = libssh2_sftp_fstat(conn.c_handle, &attrs); + + /* only allow regular and char devices */ + if (!(LIBSSH2_SFTP_S_ISREG(attrs.flags) || + !LIBSSH2_SFTP_S_ISCHR(attrs.flags))) { + g_gate_xlog("remote file not a regular file"); + } + + mediasize = attrs.filesize; + g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize); + + start_conn = conn; /* cache to use later */ + + return 1; +} + +static void +g_gatessh_start(void) +{ + int filedes[2]; + int error; + + pipe(filedes); + pushfd = filedes[1]; + popfd = filedes[0]; + + error = pthread_mutex_init(&procqueue_mtx, NULL); + if (error != 0) { + g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.", + strerror(error)); + } + + sem_init(&nconn_sem, 0, maxconnections); + + error = pthread_create(&proctd, NULL, proc_thread, NULL); + if (error != 0) { + g_gate_destroy(unit, 1); /* XXX - remove */ + g_gate_xlog("pthread_create(proc_thread): %s.", + strerror(error)); + } + pthread_set_name_np(proctd, "proc"); + + reqtd = pthread_self(); + pthread_set_name_np(reqtd, "req"); + req_thread(NULL); + + /* Disconnected. */ + close(pushfd); + close(popfd); +} + +static void +signop(int sig __unused) +{ + + /* Do nothing. */ +} + +static void +g_gatessh_loop(void) +{ + struct g_gate_ctl_cancel ggioc; + + signal(SIGUSR1, signop); + for (;;) { + g_gatessh_start(); + g_gate_log(LOG_NOTICE, "Disconnected [%s@%s:%s]. Connecting...", + username, hostname, imgpath); + + ggioc.gctl_version = G_GATE_VERSION; + ggioc.gctl_unit = unit; + ggioc.gctl_seq = 0; + g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); + } +} + +static void +g_gatessh_create(void) +{ + struct g_gate_ctl_create ggioc; + + if (!g_gatessh_connect()) + g_gate_xlog("Cannot connect: %s.", strerror(errno)); + + /* + * Ok, got both sockets, time to create provider. + */ + memset(&ggioc, 0, sizeof(ggioc)); + ggioc.gctl_version = G_GATE_VERSION; + ggioc.gctl_mediasize = mediasize; + ggioc.gctl_sectorsize = sectorsize; + ggioc.gctl_flags = flags; + ggioc.gctl_maxcount = queue_size; + ggioc.gctl_timeout = timeout; + ggioc.gctl_unit = unit; + snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s@%s:%s", username, hostname, imgpath); + g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc); + if (unit == -1) { + printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit); + fflush(stdout); + } + unit = ggioc.gctl_unit; + + ggatessh_makepidfile(); + + mydaemon(); + if (pfh != NULL) + pidfile_write(pfh); + g_gatessh_loop(); +} + +static void +g_gatessh_rescue(void) +{ + struct g_gate_ctl_cancel ggioc; + int error; + + g_gate_log(LOG_ERR, "a"); + if (!g_gatessh_connect()) + g_gate_xlog("Cannot connect: %s.", strerror(errno)); + + g_gate_log(LOG_ERR, "b"); + ggioc = (struct g_gate_ctl_cancel){ + .gctl_version = G_GATE_VERSION, + .gctl_unit = unit, + .gctl_seq = 0, + }; + g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); + + ggatessh_makepidfile(); + + mydaemon(); + pidfile_write(pfh); + + error = pthread_create(&mediatd, NULL, mediachg, NULL); + if (error != 0) + g_gate_xlog("unable to create mediasize change thread", strerror(errno)); + + g_gatessh_loop(); +} + +/* + * handle two methods of specifying things. + * if only one arg, in the future split it how ssh does: + * [user[:password]@]: + * and URI: + * sftp://[user[:password]@][:]/ + * + * If both are specified, it's the same as above, but split + * on the :. + */ +static void +handle_params(int argc, char *argv[]) +{ + + if (username == NULL) { + username = getenv("USER"); + if (username == NULL) { + err(EXIT_FAILURE, + "USER environment variable not present, set, " + "or specify via -l argument."); + } + } + + if (argc != 2) + usage(); + + hostname = argv[0]; + imgpath = argv[1]; +} + +void +ggatessh_makepidfile(void) +{ + + if (!g_gate_verbose) { + if (ggatessh_pidfile == NULL) { + asprintf(&ggatessh_pidfile, _PATH_VARRUN "/ggatessh.ggate%d.pid", unit); + err(EXIT_FAILURE, "Cannot allocate memory for pidfile"); + } + pfh = pidfile_open(ggatessh_pidfile, 0600, &otherpid); + if (pfh == NULL) { + if (errno == EEXIST) { + errx(EXIT_FAILURE, "Daemon already running, pid: %jd.", + (intmax_t)otherpid); + } + err(EXIT_FAILURE, "Cannot open/create pidfile"); + } + } +} + +int +main(int argc, char *argv[]) +{ + pid_t otherpid; + int rc; + + if (argc < 2) + usage(); + if (strcasecmp(argv[1], "create") == 0) + action = CREATE; + else if (strcasecmp(argv[1], "destroy") == 0) + action = DESTROY; + else if (strcasecmp(argv[1], "list") == 0) + action = LIST; + else if (strcasecmp(argv[1], "rescue") == 0) + action = RESCUE; + else + usage(); + + argc -= 1; + argv += 1; + for (;;) { + int ch; + + ch = getopt(argc, argv, "fF:i:l:o:p:q:r:s:t:u:v"); + if (ch == -1) + break; + switch (ch) { + case 'f': + if (action != DESTROY) + usage(); + force = 1; + break; + + case 'F': + ggatessh_pidfile = optarg; + break; + + case 'i': + identityfile = optarg; + break; + + case 'l': + username = optarg; + break; + + case 'o': + if (action != CREATE && action != RESCUE) + usage(); + if (strcasecmp("ro", optarg) == 0) + flags = G_GATE_FLAG_READONLY; + else if (strcasecmp("wo", optarg) == 0) + flags = G_GATE_FLAG_WRITEONLY; + else if (strcasecmp("rw", optarg) == 0) + flags = 0; + else { + errx(EXIT_FAILURE, + "Invalid argument for '-o' option."); + } + break; + + case 'p': + sshport = optarg; + break; + + case 'q': + if (action != CREATE) + usage(); + errno = 0; + queue_size = strtoul(optarg, NULL, 10); + if (queue_size == 0 && errno != 0) + errx(EXIT_FAILURE, "Invalid queue_size."); + break; + case 'r': + if (action != CREATE && action != RESCUE) + usage(); + errno = 0; + maxconnections = strtoul(optarg, NULL, 10); + if (maxconnections == 0 && errno != 0) + errx(EXIT_FAILURE, "Invalid queue_size."); + break; + case 's': + if (action != CREATE) + usage(); + errno = 0; + sectorsize = strtoul(optarg, NULL, 10); + if (sectorsize == 0 && errno != 0) + errx(EXIT_FAILURE, "Invalid sectorsize."); + break; + case 't': + if (action != CREATE) + usage(); + errno = 0; + timeout = strtoul(optarg, NULL, 10); + if (timeout == 0 && errno != 0) + errx(EXIT_FAILURE, "Invalid timeout."); + break; + case 'u': + errno = 0; + unit = strtol(optarg, NULL, 10); + if (unit == 0 && errno != 0) + errx(EXIT_FAILURE, "Invalid unit number."); + break; + case 'v': + if (action == DESTROY) + usage(); + g_gate_verbose++; + break; + default: + usage(); + } + } + argc -= optind; + argv += optind; + + g_gate_log(LOG_DEBUG, "libssh2_init"); + rc = libssh2_init(0); + if (rc != 0) { + fprintf(stderr, "libssh2 initialization failed (%d)\n", rc); + return 1; + } + + switch (action) { + case CREATE: + if (argc < 1 || argc > 2) + usage(); + handle_params(argc, argv); + g_gate_load_module(); + g_gate_open_device(); + g_gatessh_create(); + break; + case DESTROY: + if (argc != 0) + usage(); + if (unit == -1) { + fprintf(stderr, "Required unit number.\n"); + usage(); + } + g_gate_verbose = 1; + g_gate_open_device(); + g_gate_destroy(unit, force); + break; + case LIST: + g_gate_list(unit, g_gate_verbose); + break; + case RESCUE: + if (argc < 1 || argc > 2) + usage(); + if (unit == -1) { + fprintf(stderr, "Required unit number.\n"); + usage(); + } + handle_params(argc, argv); + g_gate_open_device(); + g_gatessh_rescue(); + break; + case UNSET: + default: + usage(); + } + g_gate_close_device(); + exit(EXIT_SUCCESS); +} diff --git a/tests/Makefile b/tests/Makefile index 2506729..0fa31a1 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -4,6 +4,6 @@ PACKAGE= tests TESTSDIR= ${TESTSBASE}/sys/geom/class/gate -ATF_TESTS_SH+= ggatehttp_test +ATF_TESTS_SH+= ggatessh_test .include diff --git a/tests/ggatehttp_test.sh b/tests/ggatehttp_test.sh deleted file mode 100644 index 645dba0..0000000 --- a/tests/ggatehttp_test.sh +++ /dev/null @@ -1,151 +0,0 @@ -# $FreeBSD$ - -PIDFILE=ggated.pid -TESTURL="$GGATEHTTP_URL" -TEMPFILE="random.data" - -atf_test_case ggatehttp cleanup -ggatehttp_head() -{ - atf_set "descr" "ggatehttp can proxy to http" - atf_set "require.progs" "ggatehttp" - atf_set "require.user" "root" - atf_set "timeout" 10 -} - -ggatehttp_body() -{ - load_ggate - us=$(alloc_ggate_dev) - src=$(alloc_md) - - n1mchunks=10 - - atf_check -e ignore -o ignore \ - dd if=/dev/random of="$TEMPFILE" bs=1m count=$n1mchunks conv=notrunc - - atf_check ggatehttp create -u $us "$TESTURL" - - ggate_dev=/dev/ggate${us} - - wait_for_ggate_device ${ggate_dev} - - # Test writing - atf_check -e ignore -o ignore \ - dd if="$TEMPFILE" of=${ggate_dev} bs=1m count=$n1mchunks conv=notrunc - - # Test reading - atf_check -e ignore -o ignore \ - dd of="$TEMPFILE"2 if=${ggate_dev} bs=1m count=$n1mchunks conv=notrunc - - ls -l "$TEMPFILE" "$TEMPFILE"2 - - # Verify that we read what we wrote - atf_check cmp "$TEMPFILE" "$TEMPFILE"2 - - rm "$TEMPFILE" "$TEMPFILE"2 -} - -ggatehttp_cleanup() -{ - common_cleanup -} - -atf_init_test_cases() -{ - atf_add_test_case ggatehttp -} - -alloc_ggate_dev() -{ - local us - - us=0 - while [ -c /dev/ggate${us} ]; do - : $(( us += 1 )) - done - echo ${us} > ggate.devs - echo ${us} -} - -alloc_md() -{ - local md - - md=$(mdconfig -a -t malloc -s 1M) || \ - atf_fail "failed to allocate md device" - echo ${md} >> md.devs - echo ${md} -} - -checksum() -{ - local src work - src=$1 - work=$2 - - src_checksum=$(md5 -q $src) - work_checksum=$(md5 -q $work) - - if [ "$work_checksum" != "$src_checksum" ]; then - atf_fail "work md5 checksum didn't match" - fi - - ggate_checksum=$(md5 -q /dev/ggate${us}) - if [ "$ggate_checksum" != "$src_checksum" ]; then - atf_fail "ggate md5 checksum didn't match" - fi -} - -common_cleanup() -{ - if [ -f "ggate.devs" ]; then - while read test_ggate; do - ggatec destroy -f -u $test_ggate >/dev/null - done < ggate.devs - rm ggate.devs - fi - - if [ -f "$PIDFILE" ]; then - pkill -F "$PIDFILE" - rm $PIDFILE - fi - - if [ -f "PLAINFILES" ]; then - while read f; do - rm -f ${f} - done < ${PLAINFILES} - rm ${PLAINFILES} - fi - - if [ -f "md.devs" ]; then - while read test_md; do - mdconfig -d -u $test_md 2>/dev/null - done < md.devs - rm md.devs - fi - true -} - -load_ggate() -{ - local class=gate - - # If the geom class isn't already loaded, try loading it. - if ! kldstat -q -m g_${class}; then - if ! geom ${class} load; then - atf_skip "could not load module for geom class=${class}" - fi - fi -} - -# Bug 204616: ggatel(8) creates /dev/ggate* asynchronously if `ggatel create` -# isn't called with `-v`. -wait_for_ggate_device() -{ - ggate_device=$1 - - while [ ! -c $ggate_device ]; do - sleep 0.5 - done -} diff --git a/tests/ggatessh_test.sh b/tests/ggatessh_test.sh new file mode 100644 index 0000000..75dcb9e --- /dev/null +++ b/tests/ggatessh_test.sh @@ -0,0 +1,288 @@ +# $FreeBSD$ + +PIDFILE=ggatessh.pid +TESTIMG="test.img" +TEMPFILE="random.data" +SFTPSERVER="/usr/libexec/sftp-server" +PORT=2222 + +atf_test_case ggatessh cleanup +ggatessh_head() +{ + atf_set "descr" "ggatessh can proxy to sftp" + atf_set "require.progs" "ggatessh" + atf_set "require.user" "root" + atf_set "timeout" 20 +} + +ggatessh_body() +{ + load_ggate + us=$(alloc_ggate_dev) + + n1mchunks=3 + secsize=4096 + + atf_check -e ignore -o ignore \ + dd if=/dev/random of="$TEMPFILE" bs=1m count=$n1mchunks conv=notrunc + + startup_sshd + + truncate -s ${n1mchunks}m "$TESTIMG" + # sshd authenticates and switches to USER + chown "$USER" "$TESTIMG" + + echo 'WARNING: ggatessh error messages goes to syslog (aka /var/log/messages)' + + atf_check ggatessh create -i "$(pwd)/id_rsa" -p "$PORT" -F "$PIDFILE" -u $us -l "$USER" 127.0.0.1 "$(pwd)/$TESTIMG" + + ggate_dev=/dev/ggate${us} + + wait_for_ggate_device ${ggate_dev} + + # make sure it has correct size and sector sizekj + read _dev _secsize _size _nsecs _stripesize _stripeoff < ggate.devs + echo ${us} +} + +alloc_md() +{ + local md + + md=$(mdconfig -a -t malloc -s 1M) || \ + atf_fail "failed to allocate md device" + echo ${md} >> md.devs + echo ${md} +} + +# slightly modified from: +# https://serverfault.com/questions/344295/is-it-possible-to-run-sshd-as-a-normal-user +startup_sshd() +{ + # Host keys + ssh-keygen -f ssh_host_rsa_key -N '' -t rsa > /dev/null + + # user key + ssh-keygen -f id_rsa -N '' -t rsa > /dev/null + + (echo -n 'command="/usr/libexec/sftp-server" '; cat id_rsa.pub) > authorized_keys + + cat > sshd_config </dev/null + done < ggate.devs + rm ggate.devs + fi + + if [ -f "sshd.pid" ]; then + pkill -F sshd.pid + # clean up after startup_sshd + rm ssh_host_rsa_key + rm id_rsa id_rsa.pub + rm authorized_keys + fi + + if [ -f "$PIDFILE" ]; then + pkill -F "$PIDFILE" + rm $PIDFILE + fi + + if [ -f "PLAINFILES" ]; then + while read f; do + rm -f ${f} + done < ${PLAINFILES} + rm ${PLAINFILES} + fi + + if [ -f "md.devs" ]; then + while read test_md; do + mdconfig -d -u $test_md 2>/dev/null + done < md.devs + rm md.devs + fi + true +} + +load_ggate() +{ + local class=gate + + # If the geom class isn't already loaded, try loading it. + if ! kldstat -q -m g_${class}; then + if ! geom ${class} load; then + atf_skip "could not load module for geom class=${class}" + fi + fi +} + +# Bug 204616: ggatel(8) creates /dev/ggate* asynchronously if `ggatel create` +# isn't called with `-v`. +wait_for_ggate_device() +{ + ggate_device=$1 + + while [ ! -c $ggate_device ]; do + sleep 0.5 + done +}