diff --git a/src/pfs_core/CMakeLists.txt b/src/pfs_core/CMakeLists.txt index e7dd221..294f2d9 100644 --- a/src/pfs_core/CMakeLists.txt +++ b/src/pfs_core/CMakeLists.txt @@ -47,6 +47,7 @@ set(SRC_LIST pfs_version.cc pfs_chunk.cc pfs_avl.cc + pfs_fstream.cc ) # lib pfs diff --git a/src/pfs_core/pfs_api.cc b/src/pfs_core/pfs_api.cc index 1e72bf7..c128be0 100644 --- a/src/pfs_core/pfs_api.cc +++ b/src/pfs_core/pfs_api.cc @@ -25,6 +25,7 @@ #include #include "pfs_api.h" +#include "pfs_fstream.h" #include "pfs_mount.h" #include "pfs_file.h" #include "pfs_dir.h" @@ -126,6 +127,12 @@ pthread_mutex_t rename_mtx; #define PATH_ARG(path) \ (path) ? (path) : "NULL" +#define PFS_STRM_MAKE(stream) \ + (FILE *)((uint64_t)(stream) | (uint64_t)(0x03)) + +#define PFS_STRM_RAW(stream) \ + (FILE *)((uint64_t)(stream) & ~(uint64_t)(0x03)) + #define API_ENTER(level, fmt, ...) do { \ if (err != 0 && err != -EAGAIN) { \ pfs_etrace("%s invalid args(" fmt ")\n", \ @@ -167,6 +174,7 @@ static int error_number[] = { EROFS, EBUSY, ERANGE, + ENOTSUP, 0, /* sentinel */ }; @@ -596,6 +604,15 @@ _pfs_setxattr(const char *pbdpath, const char *name, const void *value, return err; } +static int +_pfs_mkstemp(char *tmpl) +{ + int fd; + + fd = gen_tempname(tmpl, 0, 0, PFS_INODET_FILE); + return fd; +} + static int _pfs_fmap(int fd, fmap_entry_t *fmapv, int count) { @@ -1276,6 +1293,27 @@ pfs_setxattr(const char *pbdpath, const char *name, const void *value, return 0; } +int +pfs_mkstemp(char *tmpl) +{ + int err = -EAGAIN; + int fd = -1; + + if (!tmpl) + err = -EINVAL; + API_ENTER(DEBUG, "%s", PATH_ARG(tmpl)); + + while (err == -EAGAIN) { + fd = _pfs_mkstemp(tmpl); + err = fd < 0 ? fd : 0; + } + + API_EXIT(err); + if (err < 0) + return -1; + return fd; +} + int pfs_fmap(int fd, fmap_entry_t *fmapv, int count) { @@ -1605,3 +1643,384 @@ pfs_du(const char *pbdpath, int all, int depth, pfs_printer_t *printer) return -1; return 0; } + +static int +_pfs_fopen(const char *pbdpath, const char *mode, FILE **streamp) +{ + pfs_fstrm_t **fstrmp = (pfs_fstrm_t **)streamp; + int err; + + err = pfs_fstrm_open(pbdpath, mode, fstrmp); + return err; +} + +static int +_pfs_fclose(FILE *stream) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + int err; + + err = pfs_fstrm_xclose(fstrm); + return err; +} + +static int +_pfs_fgetc(FILE *stream, char *c) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + size_t nitem; + int err; + + err = pfs_fstrm_xread(fstrm, c, 1, 1, &nitem); + if (err == 0 && nitem == 0) + err = EOF; + PFS_ASSERT(err < 0 || nitem == 1); + return err; +} + +static int +_pfs_fread(FILE *stream, void *buf, size_t size, size_t nmemb, size_t *nitemp) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + int err; + + err = pfs_fstrm_xread(fstrm, buf, size, nmemb, nitemp); + return err; +} + +static int +_pfs_fwrite(FILE *stream, const void *buf, size_t size, size_t nmemb, size_t *nitemp) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + int err; + + err = pfs_fstrm_xwrite(fstrm, buf, size, nmemb, nitemp); + return err; +} + +static int +_pfs_fflush(FILE *stream) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + int err; + + err = pfs_fstrm_xflush(fstrm); + return err; +} + +static int +_pfs_rewind(FILE *stream) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + off_t newoff; + int err; + + newoff = pfs_fstrm_xseekoff(fstrm, 0, SEEK_SET, true, true); + err = newoff < 0 ? newoff : 0; + return err; +} + +static int +_pfs_fseek(FILE *stream, off_t offset, int whence) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + off_t newoff; + int err; + + newoff = pfs_fstrm_xseekoff(fstrm, offset, whence, true, false); + err = newoff < 0 ? newoff : 0; + return err; +} + +static int +_pfs_ftell(FILE *stream, off_t *offset) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + off_t newoff; + int err; + + newoff = pfs_fstrm_xseekoff(fstrm, 0, SEEK_CUR, false, false); + err = newoff < 0 ? newoff : 0; + if (err < 0) + return err; + *offset = newoff; + return 0; +} + +static int +_pfs_feof(FILE *stream, bool *iseof) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + int err; + + err = pfs_fstrm_xeof(fstrm, iseof); + return err; +} + +static int +_pfs_fileno(FILE *stream, int *fileno) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + int err; + + err = pfs_fstrm_xfileno(fstrm, fileno); + return err; +} + +static int +_pfs_ferror(FILE *stream, bool *haserr) +{ + pfs_fstrm_t *fstrm = (pfs_fstrm_t *)stream; + int err; + + err = pfs_fstrm_xerror(fstrm, haserr); + return err; +} + +FILE * +pfs_fopen(const char *pbdpath, const char *mode) +{ + int err = -EAGAIN; + FILE *stream = NULL; + + if (!pbdpath || !mode) + err = -EINVAL; + API_ENTER(INFO, "%s, %s", PATH_ARG(pbdpath), PATH_ARG(mode)); + + while (err == -EAGAIN) { + err = _pfs_fopen(pbdpath, mode, &stream); + } + + API_EXIT(err); + if (err < 0) + return NULL; + + stream = PFS_STRM_MAKE(stream); + return stream; +} + +int +pfs_fclose(FILE *stream) +{ + int err = -EAGAIN; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p", stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_fclose(stream); + } + + API_EXIT(err); + if (err < 0) + return EOF; + return 0; +} + +int +pfs_fgetc(FILE *stream) +{ + int err = -EAGAIN; + char rchar; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p", stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_fgetc(stream, &rchar); + } + + API_EXIT(err); + if (err < 0) + return err; + return (int)rchar; +} + +size_t +pfs_fread(void *buf, size_t size, size_t nmemb, FILE *stream) +{ + int err = -EAGAIN; + size_t nitem = 0; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + else if (!buf) + err = -EINVAL; + API_ENTER(DEBUG, "%p, %lu, %lu, %p", buf, size, nmemb, stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_fread(stream, buf, size, nmemb, &nitem); + } + + API_EXIT(err); + return nitem; +} + +size_t +pfs_fwrite(const void *buf, size_t size, size_t nmemb, FILE *stream) +{ + int err = -EAGAIN; + size_t nitem = 0; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + else if (!buf) + err = -EINVAL; + API_ENTER(DEBUG, "%p, %lu, %lu, %p", buf, size, nmemb, stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_fwrite(stream, buf, size, nmemb, &nitem); + } + + API_EXIT(err); + return nitem; +} + +int +pfs_fflush(FILE *stream) +{ + int err = -EAGAIN; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p", stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_fflush(stream); + } + + API_EXIT(err); + if (err < 0) + return EOF; + return 0; +} + +void +pfs_rewind(FILE *stream) +{ + int err = -EAGAIN; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p", stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_rewind(stream); + } + + API_EXIT(err); +} + +int +pfs_fseek(FILE *stream, off_t offset, int whence) +{ + int err = -EAGAIN; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p, %lu, %lu", stream, offset, whence); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_fseek(stream, offset, whence); + } + + API_EXIT(err); + if (err < 0) + return -1; + return 0; +} + +off_t +pfs_ftell(FILE *stream) +{ + int err = -EAGAIN; + off_t offset = -1; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p", stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_ftell(stream, &offset); + } + + API_EXIT(err); + if (err < 0) + return -1; + return offset; +} + +int +pfs_feof(FILE *stream) +{ + int err = -EAGAIN; + bool iseof = false; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p", stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_feof(stream, &iseof); + } + + API_EXIT(err); + if (iseof) + return 1; + return 0; +} + +int +pfs_fileno(FILE *stream) +{ + int err = -EAGAIN; + int fileno = -1; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p", stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_fileno(stream, &fileno); + } + + API_EXIT(err); + if (err < 0) + return -1; + return fileno; +} + +int +pfs_ferror(FILE *stream) +{ + int err = -EAGAIN; + bool haserr = true; + + if (!PFS_STRM_ISVALID(stream)) + err = -EBADF; + API_ENTER(DEBUG, "%p", stream); + + stream = PFS_STRM_RAW(stream); + while (err == -EAGAIN) { + err = _pfs_ferror(stream, &haserr); + } + + API_EXIT(err); + if (err < 0) + return -1; + if (haserr) + return 1; + return 0; +} diff --git a/src/pfs_core/pfs_api.h b/src/pfs_core/pfs_api.h index 47357fe..486ca0a 100644 --- a/src/pfs_core/pfs_api.h +++ b/src/pfs_core/pfs_api.h @@ -80,6 +80,7 @@ int pfs_fallocate(int fd, int mode, off_t offset, off_t len); off_t pfs_lseek(int fd, off_t offset, int whence); int pfs_setxattr(const char *pbdpath, const char *name, const void *value, size_t size, int flags); +int pfs_mkstemp(char *tmpl); /* directory */ int pfs_mkdir(const char *pbdpath, mode_t mode); @@ -109,6 +110,20 @@ struct direntplus { }; struct direntplus *pfs_readdirplus(DIR *dir); +/* file stream */ +FILE *pfs_fopen(const char *pbdpath, const char *mode); +int pfs_fclose(FILE *stream); +int pfs_fgetc(FILE *stream); +size_t pfs_fread(void *buf, size_t size, size_t nmemb, FILE *stream); +size_t pfs_fwrite(const void *buf, size_t size, size_t nmemb, FILE *stream); +int pfs_fflush(FILE *stream); +void pfs_rewind(FILE *stream); +int pfs_fseek(FILE *stream, off_t offset, int whence); +off_t pfs_ftell(FILE *stream); +int pfs_feof(FILE *stream); +int pfs_fileno(FILE *stream); +int pfs_ferror(FILE *stream); + #ifdef __cplusplus } #endif @@ -137,11 +152,19 @@ struct direntplus *pfs_readdirplus(DIR *dir); #define PFS_DIR_ISVALID(dir) \ ( (dir) && ( (intptr_t)(dir) & 0x01 ) ) +#define PFS_STRM_ISVALID(stream) \ + ( (stream) && ( (intptr_t)(stream) & 0x03 ) ) + #define MYSQL_CALL(type, func, arg1, ...) \ ( PFS_##type##_ISVALID((arg1)) \ ? pfs_##func(arg1, ##__VA_ARGS__) \ : func(arg1, ##__VA_ARGS__)) +#define MYSQL_CALL_2(type, func, arg1, ...) \ + ( PFS_##type##_ISVALID((arg1)) \ + ? pfs_##func(__VA_ARGS__, arg1) \ + : func(__VA_ARGS__, arg1)) + #define MYSQLAPI_CREAT(path, mode) \ MYSQL_CALL(PATH, creat, path, mode) @@ -190,6 +213,9 @@ struct direntplus *pfs_readdirplus(DIR *dir); #define MYSQLAPI_SETXATTR(path, name, value, size, flags) \ MYSQL_CALL(PATH, setxattr, path, name, value, size, flags) +#define MYSQLAPI_MKSTEMP(tmpl) \ + pfs_mkstemp(tmpl) + #define MYSQLAPI_MKDIR(path, mode) \ MYSQL_CALL(PATH, mkdir, path, mode) @@ -241,6 +267,41 @@ struct direntplus *pfs_readdirplus(DIR *dir); #define MYSQLAPI_CHOWN(path, owner, group) \ MYSQL_CALL(PATH, chown, path, owner, group) +#define MYSQLAPI_FOPEN(path, mode) \ + MYSQL_CALL(PATH, fopen, path, mode) + +#define MYSQLAPI_FCLOSE(stream) \ + MYSQL_CALL(STRM, fclose, stream) + +#define MYSQLAPI_FGETC(stream) \ + MYSQL_CALL(STRM, fgetc, stream) + +#define MYSQLAPI_FREAD(buf, size, nmemb, stream) \ + MYSQL_CALL_2(STRM, fread, stream, buf, size, nmemb) + +#define MYSQLAPI_FWRITE(buf, size, nmemb, stream) \ + MYSQL_CALL_2(STRM, fwrite, stream, buf, size, nmemb) + +#define MYSQLAPI_FFLUSH(stream) \ + MYSQL_CALL(STRM, fflush, stream) + +#define MYSQLAPI_REWIND(stream) \ + MYSQL_CALL(STRM, rewind, stream) + +#define MYSQLAPI_FSEEK(stream, offset, whence) \ + MYSQL_CALL(STRM, fseek, stream, offset, whence) + +#define MYSQLAPI_FTELL(stream) \ + MYSQL_CALL(STRM, ftell, stream) + +#define MYSQLAPI_FEOF(stream) \ + MYSQL_CALL(STRM, feof, stream) + +#define MYSQLAPI_FILENO(stream) \ + MYSQL_CALL(STRM, fileno, stream) + +#define MYSQLAPI_FERROR(stream) \ + MYSQL_CALL(STRM, ferror, stream) // Errno enum PFSErrCode { diff --git a/src/pfs_core/pfs_fstream.cc b/src/pfs_core/pfs_fstream.cc new file mode 100644 index 0000000..34f317c --- /dev/null +++ b/src/pfs_core/pfs_fstream.cc @@ -0,0 +1,812 @@ +/* + * Copyright (c) 2017-2021, Alibaba Group Holding Limited + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "pfs_fstream.h" +#include "pfs_api.h" +#include "pfs_impl.h" +#include "pfs_memory.h" +#include "pfs_option.h" + +#define FSTRM_BUFSIZE (4096) + +#define CHECK_FILE(fp) \ + if (pfs_fstrmf_check(fp, FSTRMF_MAGIC)) \ + ERR_RETVAL(EINVAL) + +enum { + FSTRMF_MAGIC = 0xFFFF0000, + /* buffer type */ + FSTRMF_UNBUFFERED = 0x1, + FSTRMF_LINE_BUF = 0x2, + /* status bit */ + FSTRMF_EOF_SEEN = 0x4, + FSTRMF_ERR_SEEN = 0x8, + /* rwa mode */ + FSTRMF_READABLE = 0x10, + FSTRMF_WRITABLE = 0x20, + FSTRMF_IS_APPENDING = 0x40, // unused + /* link in chain */ + FSTRMF_LINKED = 0x80, + /* unimplemented */ + /* + FSTRMF_USER_BUF + FSTRMF_DELETE_DONT_CLOSE + FSTRMF_IN_BACKUP + FSTRMF_TIED_PUT_GET + FSTRMF_CURRENTLY_PUTTING + FSTRMF_IS_FILEBUF + FSTRMF_BAD_SEEN + FSTRMF_USER_LOCK + */ +}; + +enum { + FSTRM_IO_SYNC = 0, + FSTRM_IO_READING = 1, + FSTRM_IO_WRITING = 2, +}; + +static pfs_fstrm_t * fstrm_chain = NULL; +static pthread_mutex_t fstrm_chain_lock = PTHREAD_MUTEX_INITIALIZER; + +static inline void +pfs_fstrmf_set(pfs_fstrm_t *fp, uint64_t flags, uint64_t mask) +{ + /* set bits exclusively in subgroup */ + fp->f_flags = ((fp->f_flags & ~(mask)) | (flags & mask)); +} + +static inline bool +pfs_fstrmf_check(pfs_fstrm_t *fp, uint64_t flags) +{ + return ((fp->f_flags & flags) == flags); +} + +static void +pfs_fstrmf_set_buftype(pfs_fstrm_t *fp, int type) +{ + int mask; + uint64_t flags; + + mask = FSTRMF_UNBUFFERED|FSTRMF_LINE_BUF; + switch (type) { + case _IONBF: + flags = FSTRMF_UNBUFFERED; + break; + + case _IOLBF: + flags = FSTRMF_LINE_BUF; + break; + + case _IOFBF: + flags = 0; + break; + + default: + pfs_etrace("invalid buf type %d\n", type); + PFS_ASSERT("unreachable" == NULL); + } + + pfs_fstrmf_set(fp, flags, mask); +} + +static inline void +pfs_fstrmf_set_eof(pfs_fstrm_t *fp) +{ + pfs_fstrmf_set(fp, FSTRMF_EOF_SEEN, FSTRMF_EOF_SEEN); +} + +static inline void +pfs_fstrmf_unset_eof(pfs_fstrm_t *fp) +{ + pfs_fstrmf_set(fp, 0, FSTRMF_EOF_SEEN); +} + +/* + * error bit is set on the spot + */ +static inline void +pfs_fstrmf_set_err(pfs_fstrm_t *fp) +{ + pfs_fstrmf_set(fp, FSTRMF_ERR_SEEN, FSTRMF_ERR_SEEN); +} + +static inline void +pfs_fstrmf_unset_err(pfs_fstrm_t *fp) +{ + pfs_fstrmf_set(fp, 0, FSTRMF_EOF_SEEN); +} + +static void +pfs_fstrmf_set_rwmode(pfs_fstrm_t *fp, int accmode) +{ + uint64_t flags; + int mask; + + flags = 0; + mask = FSTRMF_READABLE|FSTRMF_WRITABLE|FSTRMF_IS_APPENDING; + switch (accmode) { + case O_RDONLY: + flags = FSTRMF_READABLE; + break; + + case O_WRONLY: + flags = FSTRMF_WRITABLE; + break; + + case O_RDWR: + flags = FSTRMF_READABLE|FSTRMF_WRITABLE; + break; + + case O_WRONLY|O_APPEND: + flags = FSTRMF_WRITABLE|FSTRMF_IS_APPENDING; + break; + + case O_RDWR|O_APPEND: + flags = FSTRMF_READABLE|FSTRMF_WRITABLE|FSTRMF_IS_APPENDING; + break; + + default: + pfs_etrace("invalid accmode: %#x\n", accmode); + PFS_ASSERT("unreachable" == NULL); + } + + pfs_fstrmf_set(fp, flags, mask); +} + +static void +pfs_fstrm_link_chain(pfs_fstrm_t *fp) +{ + PFS_ASSERT(!(fp->f_flags & FSTRMF_LINKED)); + mutex_lock(&fstrm_chain_lock); + + fp->f_next = fstrm_chain; + fstrm_chain = fp; + pfs_fstrmf_set(fp, FSTRMF_LINKED, FSTRMF_LINKED); + + mutex_unlock(&fstrm_chain_lock); +} + +static void +pfs_fstrm_unlink_chain(pfs_fstrm_t *fp) +{ + pfs_fstrm_t *prev, *cur; + PFS_ASSERT(fp->f_flags & FSTRMF_LINKED); + mutex_lock(&fstrm_chain_lock); + + prev = NULL; + cur = fstrm_chain; + while (cur != fp) { + prev = cur; + cur = cur->f_next; + } + PFS_ASSERT(cur == fp); + if (prev != NULL) + prev->f_next = fp->f_next; + pfs_fstrmf_set(fp, 0, FSTRMF_LINKED); + + mutex_unlock(&fstrm_chain_lock); +} + +static inline void +pfs_fstrm_lock(pfs_fstrm_t *fp) +{ + mutex_lock(&fp->f_mtx); +} + +static inline void +pfs_fstrm_unlock(pfs_fstrm_t *fp) +{ + mutex_unlock(&fp->f_mtx); +} + +static inline int +pfs_api_open(pfs_fstrm_t *fp, const char *pbdpath, int flags, int mode) +{ + int fd; + + fd = pfs_open(pbdpath, flags, mode); + if (fd < 0) { + pfs_fstrmf_set_err(fp); + return -errno; + } + return fd; +} + +static inline int +pfs_api_close(pfs_fstrm_t *fp) +{ + int err; + + err = pfs_close(fp->f_fileno); + if (err < 0) { + pfs_fstrmf_set_err(fp); + return -errno; + } + return 0; +} + +static off_t +pfs_api_lseek(pfs_fstrm_t *fp, off_t off, int whence) +{ + off_t newoff; + + newoff = pfs_lseek(fp->f_fileno, off, whence); + if (newoff < 0) { + pfs_fstrmf_set_err(fp); + return -errno; + } + return newoff; +} + +static ssize_t +pfs_api_read(pfs_fstrm_t *fp, char *buf, size_t len) +{ + ssize_t rlen; + + rlen = pfs_read(fp->f_fileno, buf, len); + if (rlen < 0) { + pfs_fstrmf_set_err(fp); + return -errno; + } + if (rlen == 0) + pfs_fstrmf_set_eof(fp); + + return rlen; +} + +static ssize_t +pfs_api_write(pfs_fstrm_t *fp, const char *buf, size_t len) +{ + ssize_t wlen; + + wlen = pfs_write(fp->f_fileno, buf, len); + PFS_ASSERT(wlen < 0 || (size_t)wlen == len); // guaranteed by pfs_write() + if (wlen < 0) { + pfs_fstrmf_set_err(fp); + return -errno; + } + + return wlen; +} + +static pfs_fstrm_t * +pfs_fstrm_create() +{ + size_t bufsz = FSTRM_BUFSIZE; + pfs_fstrm_t *fp; + + fp = (pfs_fstrm_t *)pfs_mem_malloc(sizeof(*fp) + bufsz, M_FSTRM); + if (fp == NULL) { + pfs_etrace("failed to create fstream: no memory\n"); + return NULL; + } + memset(fp, 0, sizeof(*fp) + bufsz); + + fp->f_fileno = -1; + fp->f_flags = 0; + fp->f_next = NULL; + mutex_init(&fp->f_mtx); + fp->f_base = (char *)(fp + 1); + fp->f_bufsz = bufsz; + fp->f_rw = FSTRM_IO_SYNC; + fp->f_cur = fp->f_end = fp->f_base; + return fp; +} + +static void +pfs_fstrm_destroy(pfs_fstrm_t *fp) +{ + mutex_destroy(&fp->f_mtx); + pfs_mem_free(fp, M_FSTRM); +} + +static int +pfs_fstrm_open_mode(const char *mode, int *omodep, int *oflagsp) +{ + int oflags = 0, omode; + + switch (*mode) { + case 'r': + omode = O_RDONLY; + break; + + case 'w': + omode = O_WRONLY; + oflags = O_CREAT|O_TRUNC; + break; + + case 'a': + omode = O_WRONLY; + oflags = O_CREAT|O_APPEND; + break; + + default: + ERR_RETVAL(EINVAL); + } + + while (*++mode != '\0') { + switch (*mode) { + case '+': + omode = O_RDWR; + continue; + + /* mode extensions are unsupported now */ + case 'x': + case 'b': + case 'm': + case 'c': + case 'e': + break; + default: + ERR_RETVAL(EINVAL); + } + } + + *omodep = omode; + *oflagsp = oflags; + return 0; +} + +static int +pfs_fstrm_open_fd(pfs_fstrm_t *fp, const char *pbdpath, int flags, int mode) +{ + int fd; + + fd = pfs_api_open(fp, pbdpath, flags, mode); + if (fd < 0) + return fd; + + fp->f_fileno = fd; + return 0; +} + +static int +pfs_fstrm_close_fd(pfs_fstrm_t *fp) +{ + int err; + + err = pfs_api_close(fp); + if (err < 0) + return err; + + fp->f_fileno = -1; + return 0; +} + +static off_t +pfs_fstrm_seekoff(pfs_fstrm_t *fp, off_t offset, int whence) +{ + PFS_ASSERT(fp->f_rw == FSTRM_IO_SYNC); + return pfs_api_lseek(fp, offset, whence); +} + +static ssize_t +pfs_fstrmb_read_prepare(pfs_fstrm_t *fp) +{ + ssize_t rlen; + + PFS_ASSERT(fp->f_rw == FSTRM_IO_SYNC); + rlen = pfs_api_read(fp, fp->f_base, fp->f_bufsz); + if (rlen <= 0) + return rlen; + PFS_ASSERT(rlen <= (ssize_t)fp->f_bufsz); + + /* INVARIANT: f_end indicates fd's offset */ + fp->f_rw = FSTRM_IO_READING; + fp->f_cur = fp->f_base; + fp->f_end = fp->f_base + rlen; + return rlen; +} + +static void +pfs_fstrmb_read_consume(pfs_fstrm_t *fp, char *buf, size_t len) +{ + PFS_ASSERT(fp->f_rw == FSTRM_IO_READING); + PFS_ASSERT(len <= (size_t)(fp->f_end - fp->f_cur)); + memcpy(buf, fp->f_cur, len); + fp->f_cur += len; + if (fp->f_cur == fp->f_end) { + fp->f_rw = FSTRM_IO_SYNC; + fp->f_cur = fp->f_end = fp->f_base; + } +} + +static int +pfs_fstrmb_read_discard(pfs_fstrm_t *fp) +{ + off_t off; + + PFS_ASSERT(fp->f_rw == FSTRM_IO_READING); + off = pfs_api_lseek(fp, fp->f_cur - fp->f_end, SEEK_CUR); + if (off < 0) + return off; + + fp->f_rw = FSTRM_IO_SYNC; + fp->f_cur = fp->f_end = fp->f_base; + return 0; +} + +static void +pfs_fstrmb_write_prepare(pfs_fstrm_t *fp) +{ + PFS_ASSERT(fp->f_rw == FSTRM_IO_SYNC); + fp->f_rw = FSTRM_IO_WRITING; + fp->f_cur = fp->f_base; + fp->f_end = fp->f_base + fp->f_bufsz; +} + +static void +pfs_fstrmb_write_fill(pfs_fstrm_t *fp, const char *buf, size_t len) +{ + PFS_ASSERT(fp->f_rw == FSTRM_IO_WRITING); + PFS_ASSERT(len <= (size_t)(fp->f_end - fp->f_cur)); + memcpy(fp->f_cur, buf, len); + fp->f_cur += len; +} + +static ssize_t +pfs_fstrmb_write_flush(pfs_fstrm_t *fp) +{ + ssize_t wlen, reqlen; + + PFS_ASSERT(fp->f_rw == FSTRM_IO_WRITING); + reqlen = (ssize_t)(fp->f_cur - fp->f_base); + wlen = pfs_api_write(fp, fp->f_base, reqlen); + if (wlen < 0) + return wlen; + + /* INVARIANT: f_base indicates fd's offset */ + fp->f_rw = FSTRM_IO_SYNC; + fp->f_cur = fp->f_end = fp->f_base; + return wlen; +} + +/* + * @donesz is guaranteed to be set + */ +static int +pfs_fstrm_getn(pfs_fstrm_t *fp, char *buf, size_t reqsz, size_t *donesz) +{ + char *rptr = buf; + size_t buffered, todo; + ssize_t rlen, iolen; + int err; + + *donesz = 0; + if (!pfs_fstrmf_check(fp, FSTRMF_READABLE)) + ERR_RETVAL(EBADF); + + if (fp->f_rw == FSTRM_IO_WRITING) { + iolen = pfs_fstrmb_write_flush(fp); + if (iolen < 0) + return iolen; + } + if (fp->f_rw == FSTRM_IO_SYNC) { + iolen = pfs_fstrmb_read_prepare(fp); + if (iolen <= 0) + return iolen; + } + PFS_ASSERT(fp->f_rw == FSTRM_IO_READING); + + err = 0; + for (todo = reqsz; todo > 0; todo -= rlen, rptr += rlen) { + // ignore EOF and ERR bit + + /* consume buffer if not empty */ + buffered = fp->f_end - fp->f_cur; + rlen = MIN(todo, buffered); + if (rlen > 0) { + pfs_fstrmb_read_consume(fp, rptr, rlen); + continue; + } + + PFS_ASSERT(fp->f_cur == fp->f_end); + /* buffer consumed, need to be refilled for last part */ + if (todo < fp->f_bufsz) { + iolen = pfs_fstrmb_read_prepare(fp); + err = iolen < 0 ? iolen : 0; + if (err < 0 || iolen == 0) + break; + rlen = 0; + continue; + } + + iolen = todo & ~(fp->f_bufsz - 1); + rlen = pfs_api_read(fp, rptr, iolen); + err = rlen < 0 ? rlen : 0; + if (err < 0 || rlen == 0) + break; + } + + *donesz = reqsz - todo; + PFS_ASSERT(*donesz == (size_t)(rptr - buf)); + return err; +} + +/* + * @donesz is guaranteed to be set + */ +static int +pfs_fstrm_putn(pfs_fstrm_t *fp, const char *buf, size_t reqsz, size_t *donesz) +{ + const char *wptr = buf; + size_t todo, space; + ssize_t wlen, iolen; + int err; + + *donesz = 0; + if (!pfs_fstrmf_check(fp, FSTRMF_WRITABLE)) + ERR_RETVAL(EBADF); + + if (fp->f_rw == FSTRM_IO_READING) { + err = pfs_fstrmb_read_discard(fp); + if (err < 0) + return err; + } + if (fp->f_rw == FSTRM_IO_SYNC) + pfs_fstrmb_write_prepare(fp); + PFS_ASSERT(fp->f_rw == FSTRM_IO_WRITING); + + for (todo = reqsz; todo > 0; todo -= wlen, wptr += wlen) { + // ignore EOF and ERR bit + + /* fill when buffer is not full */ + space = fp->f_end - fp->f_cur; + wlen = MIN(todo, space); + if (wlen > 0) { + pfs_fstrmb_write_fill(fp, wptr, wlen); + continue; + } + + PFS_ASSERT(fp->f_cur == fp->f_end); + /* buffer is full, need flush before writing more data */ + iolen = pfs_fstrmb_write_flush(fp); + err = iolen < 0 ? iolen : 0; + if (err < 0) + break; + + pfs_fstrmb_write_prepare(fp); + + /* last part can be buffered */ + if (todo < fp->f_bufsz) { + wlen = todo; + pfs_fstrmb_write_fill(fp, wptr, wlen); + continue; + } + + iolen = todo & ~(fp->f_bufsz - 1); + wlen = pfs_api_write(fp, wptr, iolen); + err = wlen < 0 ? wlen : 0; + if (err < 0) + break; + } + + *donesz = reqsz - todo; + PFS_ASSERT(*donesz == (size_t)(wptr - buf)); + return err; +} + +static int +pfs_fstrm_sync(pfs_fstrm_t *fp) +{ + int err; + ssize_t wlen; + + if (fp->f_rw == FSTRM_IO_SYNC) { + PFS_ASSERT((fp->f_cur == fp->f_base) && (fp->f_end == fp->f_base)); + return 0; + } + + if (fp->f_rw == FSTRM_IO_READING) { + err = pfs_fstrmb_read_discard(fp); + } else { + PFS_ASSERT(fp->f_rw == FSTRM_IO_WRITING); + wlen = pfs_fstrmb_write_flush(fp); + err = wlen < 0 ? wlen : 0; + } + + if (err < 0) + return err; + + PFS_ASSERT(fp->f_rw == FSTRM_IO_SYNC); + PFS_ASSERT((fp->f_cur == fp->f_base) && (fp->f_end == fp->f_base)); + return 0; +} + +int +pfs_fstrm_open(const char *pbdpath, const char *mode, pfs_fstrm_t **fpp) +{ + pfs_fstrm_t *fp; + int err, omode, oflags; + + err = pfs_fstrm_open_mode(mode, &omode, &oflags); + if (err < 0) + return err; + + fp = pfs_fstrm_create(); + if (fp == NULL) + ERR_RETVAL(ENOMEM); + + err = pfs_fstrm_open_fd(fp, pbdpath, omode|oflags, 0666); + if (err < 0) { + pfs_fstrm_destroy(fp); + return err; + } +#define FSTRMF_MASK (O_RDONLY|O_WRONLY|O_RDWR|O_APPEND) + pfs_fstrmf_set_rwmode(fp, (omode|oflags) & FSTRMF_MASK); + pfs_fstrmf_set_buftype(fp, _IOFBF); + pfs_fstrm_link_chain(fp); + + *fpp = fp; + return 0; +} + +int +pfs_fstrm_xclose(pfs_fstrm_t *fp) +{ + int err, serr; + + CHECK_FILE(fp); + + pfs_fstrm_lock(fp); + pfs_fstrm_unlink_chain(fp); + serr = pfs_fstrm_sync(fp); + err = pfs_fstrm_close_fd(fp); + pfs_fstrm_unlock(fp); + + pfs_fstrm_destroy(fp); + return err ? err : serr; +} + +int +pfs_fstrm_xread(pfs_fstrm_t *fp, void *buf, size_t size, size_t nmemb, + size_t *nitem) +{ + size_t reqsz, donesz; + int err; + + CHECK_FILE(fp); + reqsz = size * nmemb; + if (reqsz == 0) { + *nitem = 0; + return 0; + } + + pfs_fstrm_lock(fp); + err = pfs_fstrm_getn(fp, (char *)buf, reqsz, &donesz); + pfs_fstrm_unlock(fp); + + *nitem = (reqsz == donesz) ? nmemb : donesz / size; + return err; +} + +int +pfs_fstrm_xwrite(pfs_fstrm_t *fp, const void *buf, size_t size, size_t nmemb, + size_t *nitem) +{ + size_t reqsz, donesz; + int err; + + CHECK_FILE(fp); + reqsz = size * nmemb; + if (reqsz == 0) { + *nitem = 0; + return 0; + } + + pfs_fstrm_lock(fp); + err = pfs_fstrm_putn(fp, (const char *)buf, reqsz, &donesz); + pfs_fstrm_unlock(fp); + + *nitem = (reqsz == donesz) ? nmemb : donesz / size; + return err; +} + +int +pfs_fstrm_xflush(pfs_fstrm_t *fp) +{ + int err; + + CHECK_FILE(fp); + + pfs_fstrm_lock(fp); + err = pfs_fstrm_sync(fp); + pfs_fstrm_unlock(fp); + + return err; +} + +off_t +pfs_fstrm_xseekoff(pfs_fstrm_t *fp, off_t offset, int whence, bool reseteof, bool reseterr) +{ + off_t off; + int err; + + CHECK_FILE(fp); + + pfs_fstrm_lock(fp); + err = pfs_fstrm_sync(fp); + if (err < 0) + goto out; + + off = pfs_fstrm_seekoff(fp, offset, whence); + err = off < 0 ? off : 0; + if (err < 0) + goto out; + + if (reseteof) + pfs_fstrmf_unset_eof(fp); + if (reseterr) + pfs_fstrmf_unset_err(fp); + + pfs_fstrm_unlock(fp); + return off; + +out: + pfs_fstrm_unlock(fp); + return err; +} + +int +pfs_fstrm_xfileno(pfs_fstrm_t *fp, int *fileno) +{ + CHECK_FILE(fp); + + if (fp->f_fileno < 0) + ERR_RETVAL(EBADF); + pfs_fstrm_lock(fp); + *fileno = fp->f_fileno; + pfs_fstrm_unlock(fp); + return 0; +} + +int +pfs_fstrm_xeof(pfs_fstrm_t *fp, bool *iseof) +{ + CHECK_FILE(fp); + + pfs_fstrm_lock(fp); + *iseof = pfs_fstrmf_check(fp, FSTRMF_EOF_SEEN); + pfs_fstrm_unlock(fp); + return 0; +} + +int +pfs_fstrm_xerror(pfs_fstrm_t *fp, bool *haserr) +{ + CHECK_FILE(fp); + + pfs_fstrm_lock(fp); + *haserr = pfs_fstrmf_check(fp, FSTRMF_ERR_SEEN); + pfs_fstrm_unlock(fp); + return 0; +} + +int +pfs_fstrm_xclearerr(pfs_fstrm_t *fp) +{ + CHECK_FILE(fp); + + pfs_fstrm_lock(fp); + pfs_fstrmf_unset_eof(fp); + pfs_fstrmf_unset_err(fp); + pfs_fstrm_unlock(fp); + return 0; +} diff --git a/src/pfs_core/pfs_fstream.h b/src/pfs_core/pfs_fstream.h new file mode 100644 index 0000000..866dd96 --- /dev/null +++ b/src/pfs_core/pfs_fstream.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017-2021, Alibaba Group Holding Limited + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _PFS_FSTREAM_H_ +#define _PFS_FSTREAM_H_ + +#include +#include + +typedef struct pfs_mount pfs_mount_t; + +typedef struct pfs_fstrm { + int f_fileno; + int f_flags; + + pthread_mutex_t f_mtx; + struct pfs_fstrm *f_next; + + int f_rw; /* currently reading or writing */ + char *f_base; + size_t f_bufsz; + char *f_cur; + char *f_end; /* end of valid data */ +} pfs_fstrm_t; + +int pfs_fstrm_open(const char *pbdpath, const char *mode, + pfs_fstrm_t **fpp); +int pfs_fstrm_xclose(pfs_fstrm_t *fp); +int pfs_fstrm_xread(pfs_fstrm_t *fp, void *buf, size_t size, size_t nmemb, + size_t *nitem); +int pfs_fstrm_xwrite(pfs_fstrm_t *fp, const void *buf, size_t size, size_t nmemb, + size_t *nitem); +int pfs_fstrm_xflush(pfs_fstrm_t *fp); +off_t pfs_fstrm_xseekoff(pfs_fstrm_t *fp, off_t offset, int whence, bool reseterr, bool reseteof); +int pfs_fstrm_xfileno(pfs_fstrm_t *fp, int *fileno); +int pfs_fstrm_xeof(pfs_fstrm_t *fp, bool *iseof); +int pfs_fstrm_xerror(pfs_fstrm_t *fp, bool *haserr); +int pfs_fstrm_xclearerr(pfs_fstrm_t *fp); + +#endif diff --git a/src/pfs_core/pfs_memory.cc b/src/pfs_core/pfs_memory.cc index 65960ed..f385c7b 100644 --- a/src/pfs_core/pfs_memory.cc +++ b/src/pfs_core/pfs_memory.cc @@ -86,6 +86,7 @@ static pfs_memtype_t pfs_mem_type[M_NTYPE] = { MEMTYPE_ENTRY(M_CONFIG_KV), MEMTYPE_ENTRY(M_FDTBL_PTR), MEMTYPE_ENTRY(M_INODE_BLK_TABLE), + MEMTYPE_ENTRY(M_FSTRM), }; static inline const char * diff --git a/src/pfs_core/pfs_memory.h b/src/pfs_core/pfs_memory.h index a8c92a8..f08cb8e 100644 --- a/src/pfs_core/pfs_memory.h +++ b/src/pfs_core/pfs_memory.h @@ -71,6 +71,7 @@ enum { M_CONFIG_KV, M_FDTBL_PTR, M_INODE_BLK_TABLE, + M_FSTRM, M_NTYPE }; diff --git a/src/pfs_core/pfs_util.cc b/src/pfs_core/pfs_util.cc index 2c677d7..d3af5c3 100644 --- a/src/pfs_core/pfs_util.cc +++ b/src/pfs_core/pfs_util.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -25,6 +26,9 @@ #include "pfs_impl.h" #include "pfs_util.h" +#include "pfs_api.h" +#include "pfs_impl.h" +#include "pfs_inode.h" #include "pfs_memory.h" #include "pfs_trace.h" @@ -238,6 +242,65 @@ oidvect_fini(oidvect_t *ov) ov->ov_size = 0; } +static const char letters[] = +"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; +#define NLETTER 62 + +int +gen_tempname(char *tmpl, int suffixlen, int flags, int type) +{ + int save_errno = errno; + int fd = -1; + int len; + char *XXXXXX; + uint64_t random, retry, nretry; + + len = strlen(tmpl); + if (len < 6 + suffixlen || memcmp(&tmpl[len - 6 - suffixlen], "XXXXXX", 6)) + ERR_RETVAL(EINVAL); + XXXXXX = &tmpl[len - 6 - suffixlen]; + + random = (uint64_t)time(NULL) ^ (uint64_t)pthread_self(); + nretry = TMP_MAX; + for (retry = 0; retry < nretry; random += 7777, ++retry) { + + for (int i = 0; i < 6; i++) { + XXXXXX[i] = letters[random % NLETTER]; + random /= NLETTER; + } + + switch (type) { + case PFS_INODET_FILE: + fd = pfs_open(tmpl, + (flags & ~O_ACCMODE) | O_RDWR | O_CREAT | O_EXCL, + S_IRUSR|S_IWUSR); + break; + + case PFS_INODET_DIR: + ERR_RETVAL(ENOTSUP); + + default: + pfs_etrace("invalid type %d to generate temp file\n", type); + PFS_ASSERT("invalid argument" == NULL); + } + + if (fd < 0 && errno != EEXIST) + ERR_RETVAL(errno); + + if (fd >= 0) + break; + + /* file exist, retry */ + } + + PFS_ASSERT(fd >= 0 || retry >= nretry); + if (retry >= nretry) + ERR_RETVAL(EEXIST); + + errno = save_errno; + return fd; +} + int pfs_printf(pfs_printer_t *pr, const char *fmt, ...) { diff --git a/src/pfs_core/pfs_util.h b/src/pfs_core/pfs_util.h index 25066b5..7915891 100644 --- a/src/pfs_core/pfs_util.h +++ b/src/pfs_core/pfs_util.h @@ -24,6 +24,7 @@ uint64_t roundup_power2(uint64_t val); int strncpy_safe(char *dst, const char *src, size_t n); uint32_t crc32c_compute(const void *buf, size_t size, size_t offset); uint64_t gettimeofday_us(); +int gen_tempname(char *tmpl, int suffixlen, int flags, int type); #define DATA_SET_ATTR(set) \ __attribute__((used)) \