From 70baa472b2bee69f205cc1aada304d597b858005 Mon Sep 17 00:00:00 2001 From: Alejandro Soto Date: Tue, 4 Jan 2022 06:49:48 -0600 Subject: Move crate::fuse::* to the top-level --- src/fuse/io.rs | 379 --------------------------------- src/fuse/mod.rs | 43 ---- src/fuse/mount.rs | 168 --------------- src/fuse/ops/dir.rs | 253 ---------------------- src/fuse/ops/entry.rs | 80 ------- src/fuse/ops/global.rs | 88 -------- src/fuse/ops/mod.rs | 68 ------ src/fuse/ops/open.rs | 131 ------------ src/fuse/ops/rw.rs | 123 ----------- src/fuse/ops/xattr.rs | 139 ------------ src/fuse/session.rs | 559 ------------------------------------------------- src/io.rs | 379 +++++++++++++++++++++++++++++++++ src/lib.rs | 60 +++++- src/mod.rs | 43 ++++ src/mount.rs | 168 +++++++++++++++ src/ops/dir.rs | 253 ++++++++++++++++++++++ src/ops/entry.rs | 79 +++++++ src/ops/global.rs | 87 ++++++++ src/ops/mod.rs | 67 ++++++ src/ops/open.rs | 130 ++++++++++++ src/ops/rw.rs | 122 +++++++++++ src/ops/xattr.rs | 141 +++++++++++++ src/session.rs | 559 +++++++++++++++++++++++++++++++++++++++++++++++++ 23 files changed, 2079 insertions(+), 2040 deletions(-) delete mode 100644 src/fuse/io.rs delete mode 100644 src/fuse/mod.rs delete mode 100644 src/fuse/mount.rs delete mode 100644 src/fuse/ops/dir.rs delete mode 100644 src/fuse/ops/entry.rs delete mode 100644 src/fuse/ops/global.rs delete mode 100644 src/fuse/ops/mod.rs delete mode 100644 src/fuse/ops/open.rs delete mode 100644 src/fuse/ops/rw.rs delete mode 100644 src/fuse/ops/xattr.rs delete mode 100644 src/fuse/session.rs create mode 100644 src/io.rs create mode 100644 src/mod.rs create mode 100644 src/mount.rs create mode 100644 src/ops/dir.rs create mode 100644 src/ops/entry.rs create mode 100644 src/ops/global.rs create mode 100644 src/ops/mod.rs create mode 100644 src/ops/open.rs create mode 100644 src/ops/rw.rs create mode 100644 src/ops/xattr.rs create mode 100644 src/session.rs diff --git a/src/fuse/io.rs b/src/fuse/io.rs deleted file mode 100644 index 7124fda..0000000 --- a/src/fuse/io.rs +++ /dev/null @@ -1,379 +0,0 @@ -use bytemuck::Zeroable; -use nix::{errno::Errno, sys::stat::SFlag}; - -use std::{ - convert::Infallible, - ffi::OsStr, - future::Future, - ops::{ControlFlow, FromResidual, Try}, -}; - -use super::{Done, Operation, Reply, Request}; -use crate::{proto, FuseResult, Ino, Timestamp, Ttl}; - -#[doc(no_inline)] -pub use nix::{ - dir::Type as EntryType, - fcntl::OFlag as OpenFlags, - sys::stat::Mode, - unistd::{AccessFlags, Gid, Pid, Uid}, -}; - -pub enum Interruptible<'o, O: Operation<'o>, T> { - Completed(Reply<'o, O>, T), - Interrupted(Done<'o>), -} - -pub trait Stat { - fn ino(&self) -> Ino; - fn inode_type(&self) -> EntryType; - fn attrs(&self) -> (Attrs, Ttl); -} - -pub trait Known { - type Inode: Stat; - - fn inode(&self) -> &Self::Inode; - fn unveil(self); -} - -pub struct Failed<'o, E>(pub Done<'o>, pub E); - -pub trait Finish<'o, O: Operation<'o>> { - fn finish(&self, reply: Reply<'o, O>) -> Done<'o>; -} - -#[derive(Clone)] -pub struct Attrs(proto::Attrs); - -pub struct Entry<'a, K> { - pub offset: u64, - pub name: &'a OsStr, - pub inode: K, - pub ttl: Ttl, -} - -#[derive(Copy, Clone)] -pub struct FsInfo(proto::StatfsOut); - -impl<'o, E> From> for Done<'o> { - fn from(failed: Failed<'o, E>) -> Done<'o> { - failed.0 - } -} - -impl<'o, O: Operation<'o>> Finish<'o, O> for Errno { - fn finish(&self, reply: Reply<'o, O>) -> Done<'o> { - reply.fail(*self) - } -} - -impl<'o, O: Operation<'o>> Finish<'o, O> for std::io::Error { - fn finish(&self, reply: Reply<'o, O>) -> Done<'o> { - reply.fail( - self.raw_os_error() - .map(Errno::from_i32) - .unwrap_or(Errno::EIO), - ) - } -} - -impl<'o, O: Operation<'o>> Request<'o, O> { - pub fn ino(&self) -> Ino { - Ino(self.header.ino) - } - - pub fn generation(&self) -> u64 { - 0 - } - - pub fn uid(&self) -> Uid { - Uid::from_raw(self.header.uid) - } - - pub fn gid(&self) -> Gid { - Gid::from_raw(self.header.gid) - } - - pub fn pid(&self) -> Pid { - Pid::from_raw(self.header.pid as i32) - } -} - -impl<'o, O: Operation<'o>> Reply<'o, O> { - pub async fn interruptible(self, f: F) -> Interruptible<'o, O, T> - where - F: Future, - { - tokio::pin!(f); - let mut rx = self.session.interrupt_rx(); - - use Interruptible::*; - loop { - tokio::select! { - output = &mut f => break Completed(self, output), - - result = rx.recv() => match result { - Ok(unique) if unique == self.unique => { - break Interrupted(self.interrupted()); - } - - _ => continue, - } - } - } - } - - pub fn and_then(self, result: Result) -> Result<(Self, T), Failed<'o, E>> - where - E: Finish<'o, O>, - { - match result { - Ok(t) => Ok((self, t)), - Err(error) => { - let done = error.finish(self); - Err(Failed(done, error)) - } - } - } - - pub fn fail(self, errno: Errno) -> Done<'o> { - let result = self.session.fail(self.unique, errno as i32); - self.finish(result) - } - - pub fn not_implemented(self) -> Done<'o> { - self.fail(Errno::ENOSYS) - } - - pub fn not_permitted(self) -> Done<'o> { - self.fail(Errno::EPERM) - } - - pub fn io_error(self) -> Done<'o> { - self.fail(Errno::EIO) - } - - pub fn invalid_argument(self) -> Done<'o> { - self.fail(Errno::EINVAL) - } - - pub fn interrupted(self) -> Done<'o> { - self.fail(Errno::EINTR) - } - - pub(crate) fn finish(self, result: FuseResult<()>) -> Done<'o> { - if let Err(error) = result { - log::error!("Replying to request {}: {}", self.unique, error); - } - - Done::new() - } -} - -impl<'o, O: Operation<'o>> From<(Reply<'o, O>, Errno)> for Done<'o> { - fn from((reply, errno): (Reply<'o, O>, Errno)) -> Done<'o> { - reply.fail(errno) - } -} - -impl<'o> FromResidual> for Done<'o> { - fn from_residual(residual: Done<'o>) -> Self { - residual - } -} - -impl<'o, T: Into>> FromResidual> for Done<'o> { - fn from_residual(residual: Result) -> Self { - match residual { - Ok(_) => unreachable!(), - Err(t) => t.into(), - } - } -} - -impl<'o, O: Operation<'o>> FromResidual> for Done<'o> { - fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self { - match residual { - Interruptible::Completed(_, _) => unreachable!(), - Interruptible::Interrupted(done) => done, - } - } -} - -impl Try for Done<'_> { - type Output = Self; - type Residual = Self; - - fn from_output(output: Self::Output) -> Self { - output - } - - fn branch(self) -> ControlFlow { - ControlFlow::Break(self) - } -} - -impl<'o, O: Operation<'o>, T> FromResidual> - for Interruptible<'o, O, T> -{ - fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self { - use Interruptible::*; - - match residual { - Completed(_, _) => unreachable!(), - Interrupted(done) => Interrupted(done), - } - } -} - -impl<'o, O: Operation<'o>, T> Try for Interruptible<'o, O, T> { - type Output = (Reply<'o, O>, T); - type Residual = Interruptible<'o, O, Infallible>; - - fn from_output((reply, t): Self::Output) -> Self { - Self::Completed(reply, t) - } - - fn branch(self) -> ControlFlow { - use Interruptible::*; - - match self { - Completed(reply, t) => ControlFlow::Continue((reply, t)), - Interrupted(done) => ControlFlow::Break(Interrupted(done)), - } - } -} - -impl Attrs { - #[must_use] - pub fn size(self, size: u64) -> Self { - Attrs(proto::Attrs { size, ..self.0 }) - } - - #[must_use] - pub fn owner(self, uid: Uid, gid: Gid) -> Self { - Attrs(proto::Attrs { - uid: uid.as_raw(), - gid: gid.as_raw(), - ..self.0 - }) - } - - #[must_use] - pub fn mode(self, mode: Mode) -> Self { - Attrs(proto::Attrs { - mode: mode.bits(), - ..self.0 - }) - } - - #[must_use] - pub fn blocks(self, blocks: u64) -> Self { - Attrs(proto::Attrs { blocks, ..self.0 }) - } - - #[must_use] - pub fn block_size(self, block_size: u32) -> Self { - Attrs(proto::Attrs { - blksize: block_size, - ..self.0 - }) - } - - #[must_use] - pub fn device(self, device: u32) -> Self { - Attrs(proto::Attrs { - rdev: device, - ..self.0 - }) - } - - #[must_use] - pub fn times(self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self { - Attrs(proto::Attrs { - atime: access.seconds as _, - mtime: modify.seconds as _, - ctime: change.seconds as _, - atimensec: access.nanoseconds, - mtimensec: modify.nanoseconds, - ctimensec: change.nanoseconds, - ..self.0 - }) - } - - #[must_use] - pub fn links(self, links: u32) -> Self { - Attrs(proto::Attrs { - nlink: links, - ..self.0 - }) - } - - pub(crate) fn finish(self, inode: &impl Stat) -> proto::Attrs { - let Ino(ino) = inode.ino(); - let inode_type = match inode.inode_type() { - EntryType::Fifo => SFlag::S_IFIFO, - EntryType::CharacterDevice => SFlag::S_IFCHR, - EntryType::Directory => SFlag::S_IFDIR, - EntryType::BlockDevice => SFlag::S_IFBLK, - EntryType::File => SFlag::S_IFREG, - EntryType::Symlink => SFlag::S_IFLNK, - EntryType::Socket => SFlag::S_IFSOCK, - }; - - proto::Attrs { - ino, - mode: self.0.mode | inode_type.bits(), - ..self.0 - } - } -} - -impl Default for Attrs { - fn default() -> Self { - Attrs(Zeroable::zeroed()).links(1) - } -} - -impl FsInfo { - #[must_use] - pub fn blocks(self, size: u32, total: u64, free: u64, available: u64) -> Self { - FsInfo(proto::StatfsOut { - bsize: size, - blocks: total, - bfree: free, - bavail: available, - ..self.0 - }) - } - - #[must_use] - pub fn inodes(self, total: u64, free: u64) -> Self { - FsInfo(proto::StatfsOut { - files: total, - ffree: free, - ..self.0 - }) - } - - #[must_use] - pub fn max_filename(self, max: u32) -> Self { - FsInfo(proto::StatfsOut { - namelen: max, - ..self.0 - }) - } -} - -impl Default for FsInfo { - fn default() -> Self { - FsInfo(Zeroable::zeroed()) - } -} - -impl From for proto::StatfsOut { - fn from(FsInfo(statfs): FsInfo) -> proto::StatfsOut { - statfs - } -} diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs deleted file mode 100644 index 84c6878..0000000 --- a/src/fuse/mod.rs +++ /dev/null @@ -1,43 +0,0 @@ -use crate::proto; -use std::marker::PhantomData; - -pub mod io; -pub mod mount; -pub mod ops; -pub mod session; - -mod private_trait { - pub trait Sealed {} -} - -pub trait Operation<'o>: private_trait::Sealed + Sized { - type RequestBody: crate::proto::Structured<'o>; - type ReplyTail; -} - -pub type Op<'o, O = ops::Any> = (Request<'o, O>, Reply<'o, O>); - -pub struct Request<'o, O: Operation<'o>> { - header: proto::InHeader, - body: O::RequestBody, -} - -#[must_use] -pub struct Reply<'o, O: Operation<'o>> { - session: &'o session::Session, - unique: u64, - tail: O::ReplyTail, -} - -#[must_use] -pub struct Done<'o>(PhantomData<&'o mut &'o ()>); - -impl Done<'_> { - fn new() -> Self { - Done(PhantomData) - } - - fn consume(self) { - drop(self); - } -} diff --git a/src/fuse/mount.rs b/src/fuse/mount.rs deleted file mode 100644 index c924a9a..0000000 --- a/src/fuse/mount.rs +++ /dev/null @@ -1,168 +0,0 @@ -use std::{ - ffi::{OsStr, OsString}, - io, - os::unix::{ - ffi::OsStrExt, - io::{AsRawFd, RawFd}, - net::UnixStream, - }, - path::{Path, PathBuf}, - process::Command, -}; - -use nix::{ - self, cmsg_space, - fcntl::{fcntl, FcntlArg, FdFlag}, - sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}, -}; - -use quick_error::quick_error; - -use super::session::Start; -use crate::util::DumbFd; - -quick_error! { - #[derive(Debug)] - pub enum MountError { - Io(err: std::io::Error) { from() } - Fusermount { display("fusermount failed") } - } -} - -#[derive(Default)] -pub struct Options(OsString); - -impl Options { - pub fn fs_name>(&mut self, fs_name: O) -> &mut Self { - self.push_key_value("fsname", fs_name) - } - - pub fn read_only(&mut self) -> &mut Self { - self.push("ro") - } - - pub fn push>(&mut self, option: O) -> &mut Self { - self.push_parts(&[option.as_ref()]) - } - - pub fn push_key_value(&mut self, key: K, value: V) -> &mut Self - where - K: AsRef, - V: AsRef, - { - let (key, value) = (key.as_ref(), value.as_ref()); - - let assert_valid = |part: &OsStr| { - let bytes = part.as_bytes(); - assert!( - !bytes.is_empty() && bytes.iter().all(|b| !matches!(*b, b',' | b'=')), - "invalid key or value: {}", - part.to_string_lossy() - ); - }; - - assert_valid(key); - assert_valid(value); - - self.push_parts(&[key, OsStr::new("="), value]) - } - - fn push_parts(&mut self, segment: &[&OsStr]) -> &mut Self { - if !self.0.is_empty() { - self.0.push(","); - } - - let start = self.0.as_bytes().len(); - segment.iter().for_each(|part| self.0.push(part)); - - let bytes = self.0.as_bytes(); - let last = bytes.len() - 1; - - assert!( - last >= start && bytes[start] != b',' && bytes[last] != b',', - "invalid option string: {}", - OsStr::from_bytes(&bytes[start..]).to_string_lossy() - ); - - self - } -} - -impl> Extend for Options { - fn extend>(&mut self, iter: I) { - iter.into_iter().for_each(|option| { - self.push(option); - }); - } -} - -pub fn mount_sync(mountpoint: M, options: &Options) -> Result -where - M: AsRef + Into, -{ - let (left_side, right_side) = UnixStream::pair()?; - - // The fusermount protocol requires us to preserve right_fd across execve() - let right_fd = right_side.as_raw_fd(); - fcntl( - right_fd, - FcntlArg::F_SETFD( - FdFlag::from_bits(fcntl(right_fd, FcntlArg::F_GETFD).unwrap()).unwrap() - & !FdFlag::FD_CLOEXEC, - ), - ) - .unwrap(); - - let mut command = Command::new(FUSERMOUNT_CMD); - if !options.0.is_empty() { - command.args(&[OsStr::new("-o"), &options.0]); - } - - command.args(&[OsStr::new("--"), mountpoint.as_ref().as_ref()]); - let mut fusermount = command.env("_FUSE_COMMFD", right_fd.to_string()).spawn()?; - - // recvmsg() should fail if fusermount exits (last open fd is closed) - drop(right_side); - - let session_fd = { - let mut buffer = cmsg_space!(RawFd); - let message = recvmsg( - left_side.as_raw_fd(), - &[], - Some(&mut buffer), - MsgFlags::empty(), - ) - .map_err(io::Error::from)?; - - let session_fd = match message.cmsgs().next() { - Some(ControlMessageOwned::ScmRights(fds)) => fds.into_iter().next(), - _ => None, - }; - - session_fd.ok_or(MountError::Fusermount) - }; - - match session_fd { - Ok(session_fd) => Ok(Start::new(DumbFd(session_fd), mountpoint.into())), - - Err(error) => { - drop(left_side); - fusermount.wait()?; - Err(error) - } - } -} - -pub(crate) fn unmount_sync>(mountpoint: M) -> Result<(), MountError> { - let status = Command::new(FUSERMOUNT_CMD) - .args(&[OsStr::new("-zuq"), OsStr::new("--"), mountpoint.as_ref()]) - .status()?; - - if status.success() { - Ok(()) - } else { - Err(MountError::Fusermount) - } -} - -const FUSERMOUNT_CMD: &str = "fusermount3"; diff --git a/src/fuse/ops/dir.rs b/src/fuse/ops/dir.rs deleted file mode 100644 index 76267ec..0000000 --- a/src/fuse/ops/dir.rs +++ /dev/null @@ -1,253 +0,0 @@ -use std::{ - convert::Infallible, - ffi::{CStr, OsStr}, - marker::PhantomData, - os::unix::ffi::OsStrExt, -}; - -use crate::fuse::{ - io::{Entry, EntryType, Interruptible, Known, Stat}, - private_trait::Sealed, - Done, Operation, Reply, Request, -}; - -use super::{c_to_os, FromRequest}; -use crate::{proto, Errno, Ino, Ttl}; -use bytemuck::{bytes_of, Zeroable}; -use bytes::BufMut; -use nix::sys::stat::SFlag; - -pub enum Lookup {} -pub enum Readdir {} -pub struct BufferedReaddir(Infallible, PhantomData); - -pub struct ReaddirState { - max_read: usize, - is_plus: bool, - buffer: B, -} - -impl Sealed for Lookup {} -impl Sealed for Readdir {} -impl Sealed for BufferedReaddir {} - -impl<'o> Operation<'o> for Lookup { - type RequestBody = &'o CStr; // name() - type ReplyTail = (); -} - -impl<'o> Operation<'o> for Readdir { - type RequestBody = proto::OpcodeSelect< - &'o proto::ReaddirPlusIn, - &'o proto::ReaddirIn, - { proto::Opcode::ReaddirPlus as u32 }, - >; - - type ReplyTail = ReaddirState<()>; -} - -impl<'o, B> Operation<'o> for BufferedReaddir { - type RequestBody = (); // Never actually created - type ReplyTail = ReaddirState; -} - -impl<'o> Request<'o, Lookup> { - /// Returns the name of the entry being looked up in this directory. - pub fn name(&self) -> &OsStr { - c_to_os(self.body) - } -} - -impl<'o> Reply<'o, Lookup> { - /// The requested entry was found. The FUSE client will become aware of the found inode if - /// it wasn't before. This result may be cached by the client for up to the given TTL. - pub fn found(self, entry: impl Known, ttl: Ttl) -> Done<'o> { - let (attrs, attrs_ttl) = entry.inode().attrs(); - let attrs = attrs.finish(entry.inode()); - - let done = self.single(&make_entry((entry.inode().ino(), ttl), (attrs, attrs_ttl))); - entry.unveil(); - - done - } - - /// The requested entry was not found in this directory. The FUSE clint may include this - /// response in negative cache for up to the given TTL. - pub fn not_found(self, ttl: Ttl) -> Done<'o> { - self.single(&make_entry( - (Ino::NULL, ttl), - (Zeroable::zeroed(), Ttl::NULL), - )) - } - - /// The requested entry was not found in this directory, but unlike [`Reply::not_found()`] - /// this does not report back a TTL to the FUSE client. The client should not cache the - /// response. - pub fn not_found_uncached(self) -> Done<'o> { - self.fail(Errno::ENOENT) - } -} - -impl<'o> Request<'o, Readdir> { - pub fn handle(&self) -> u64 { - self.read_in().fh - } - - /// Returns the base offset in the directory stream to read from. - pub fn offset(&self) -> u64 { - self.read_in().offset - } - - pub fn size(&self) -> u32 { - self.read_in().size - } - - fn read_in(&self) -> &proto::ReadIn { - use proto::OpcodeSelect::*; - - match &self.body { - Match(readdir_plus) => &readdir_plus.read_in, - Alt(readdir) => &readdir.read_in, - } - } -} - -impl<'o> Reply<'o, Readdir> { - pub fn buffered(self, buffer: B) -> Reply<'o, BufferedReaddir> - where - B: BufMut + AsRef<[u8]>, - { - assert!(buffer.as_ref().is_empty()); - - let ReaddirState { - max_read, - is_plus, - buffer: (), - } = self.tail; - - Reply { - session: self.session, - unique: self.unique, - tail: ReaddirState { - max_read, - is_plus, - buffer, - }, - } - } -} - -impl<'o, B: BufMut + AsRef<[u8]>> Reply<'o, BufferedReaddir> { - pub fn entry(mut self, entry: Entry) -> Interruptible<'o, BufferedReaddir, ()> { - let entry_header_len = if self.tail.is_plus { - std::mem::size_of::() - } else { - std::mem::size_of::() - }; - - let name = entry.name.as_bytes(); - let padding_len = dirent_pad_bytes(entry_header_len + name.len()); - - let buffer = &mut self.tail.buffer; - let remaining = buffer - .remaining_mut() - .min(self.tail.max_read - buffer.as_ref().len()); - - let record_len = entry_header_len + name.len() + padding_len; - if remaining < record_len { - if buffer.as_ref().is_empty() { - log::error!("Buffer for readdir req #{} is too small", self.unique); - return Interruptible::Interrupted(self.fail(Errno::ENOBUFS)); - } - - return Interruptible::Interrupted(self.end()); - } - - let inode = entry.inode.inode(); - let entry_type = match inode.inode_type() { - EntryType::Fifo => SFlag::S_IFIFO, - EntryType::CharacterDevice => SFlag::S_IFCHR, - EntryType::Directory => SFlag::S_IFDIR, - EntryType::BlockDevice => SFlag::S_IFBLK, - EntryType::File => SFlag::S_IFREG, - EntryType::Symlink => SFlag::S_IFLNK, - EntryType::Socket => SFlag::S_IFSOCK, - }; - - let ino = inode.ino(); - let dirent = proto::Dirent { - ino: ino.as_raw(), - off: entry.offset, - namelen: name.len().try_into().unwrap(), - entry_type: entry_type.bits() >> 12, - }; - - enum Ent { - Dirent(proto::Dirent), - DirentPlus(proto::DirentPlus), - } - - let ent = if self.tail.is_plus { - let (attrs, attrs_ttl) = inode.attrs(); - let attrs = attrs.finish(inode); - let entry_out = make_entry((ino, entry.ttl), (attrs, attrs_ttl)); - - if name != ".".as_bytes() && name != "..".as_bytes() { - entry.inode.unveil(); - } - - Ent::DirentPlus(proto::DirentPlus { entry_out, dirent }) - } else { - Ent::Dirent(dirent) - }; - - let entry_header = match &ent { - Ent::Dirent(dirent) => bytes_of(dirent), - Ent::DirentPlus(dirent_plus) => bytes_of(dirent_plus), - }; - - buffer.put_slice(entry_header); - buffer.put_slice(name); - buffer.put_slice(&[0; 7][..padding_len]); - - if remaining - record_len >= entry_header.len() + (1 << proto::DIRENT_ALIGNMENT_BITS) { - Interruptible::Completed(self, ()) - } else { - Interruptible::Interrupted(self.end()) - } - } - - pub fn end(self) -> Done<'o> { - self.inner(|this| this.tail.buffer.as_ref()) - } -} - -impl<'o> FromRequest<'o, Readdir> for ReaddirState<()> { - fn from_request(request: &Request<'o, Readdir>) -> Self { - ReaddirState { - max_read: request.size() as usize, - is_plus: matches!(request.body, proto::OpcodeSelect::Match(_)), - buffer: (), - } - } -} - -fn make_entry( - (Ino(ino), entry_ttl): (Ino, Ttl), - (attrs, attr_ttl): (proto::Attrs, Ttl), -) -> proto::EntryOut { - proto::EntryOut { - nodeid: ino, - generation: 0, //TODO - entry_valid: entry_ttl.seconds, - attr_valid: attr_ttl.seconds, - entry_valid_nsec: entry_ttl.nanoseconds, - attr_valid_nsec: attr_ttl.nanoseconds, - attr: attrs, - } -} - -fn dirent_pad_bytes(entry_len: usize) -> usize { - const ALIGN_MASK: usize = (1 << proto::DIRENT_ALIGNMENT_BITS) - 1; - ((entry_len + ALIGN_MASK) & !ALIGN_MASK) - entry_len -} diff --git a/src/fuse/ops/entry.rs b/src/fuse/ops/entry.rs deleted file mode 100644 index 9475a61..0000000 --- a/src/fuse/ops/entry.rs +++ /dev/null @@ -1,80 +0,0 @@ -use crate::{proto, Ino}; -use crate::fuse::{io::Stat, private_trait::Sealed, Done, Operation, Reply, Request}; - -pub enum Forget {} -pub enum Getattr {} - -impl Sealed for Forget {} -impl Sealed for Getattr {} - -impl<'o> Operation<'o> for Forget { - type RequestBody = proto::OpcodeSelect< - (&'o proto::BatchForgetIn, &'o [proto::ForgetOne]), - &'o proto::ForgetIn, - { proto::Opcode::BatchForget as u32 }, - >; - - type ReplyTail = (); -} - -impl<'o> Operation<'o> for Getattr { - type RequestBody = &'o proto::GetattrIn; - type ReplyTail = (); -} - -impl<'o> Request<'o, Forget> { - pub fn forget_list(&self) -> impl '_ + Iterator { - use proto::OpcodeSelect::*; - - enum List<'a> { - Single(Option<(Ino, u64)>), - Batch(std::slice::Iter<'a, proto::ForgetOne>), - } - - impl Iterator for List<'_> { - type Item = (Ino, u64); - - fn next(&mut self) -> Option { - match self { - List::Single(single) => single.take(), - List::Batch(batch) => { - let forget = batch.next()?; - Some((Ino(forget.ino), forget.nlookup)) - } - } - } - } - - match self.body { - Match((_, slice)) => List::Batch(slice.iter()), - Alt(single) => List::Single(Some((self.ino(), single.nlookup))), - } - } -} - -impl<'o> Reply<'o, Forget> { - pub fn ok(self) -> Done<'o> { - // No reply for forget requests - Done::new() - } -} - -impl<'o> Request<'o, Getattr> { - pub fn handle(&self) -> u64 { - self.body.fh - } -} - -impl<'o> Reply<'o, Getattr> { - pub fn known(self, inode: &impl Stat) -> Done<'o> { - let (attrs, ttl) = inode.attrs(); - let attrs = attrs.finish(inode); - - self.single(&proto::AttrOut { - attr_valid: ttl.seconds, - attr_valid_nsec: ttl.nanoseconds, - dummy: Default::default(), - attr: attrs, - }) - } -} diff --git a/src/fuse/ops/global.rs b/src/fuse/ops/global.rs deleted file mode 100644 index e2ecd7b..0000000 --- a/src/fuse/ops/global.rs +++ /dev/null @@ -1,88 +0,0 @@ -use crate::{proto, util::page_size}; -use crate::fuse::{io::FsInfo, private_trait::Sealed, Done, Operation, Reply}; - -pub enum Init {} -pub enum Statfs {} - -pub struct InitState { - pub(crate) kernel_flags: proto::InitFlags, - pub(crate) buffer_pages: usize, -} - -impl Sealed for Init {} -impl Sealed for Statfs {} - -impl<'o> Operation<'o> for Init { - type RequestBody = &'o proto::InitIn; - type ReplyTail = InitState; -} - -impl<'o> Operation<'o> for Statfs { - type RequestBody = (); - type ReplyTail = (); -} - -impl<'o> Reply<'o, Init> { - pub fn ok(self) -> Done<'o> { - let InitState { - kernel_flags, - buffer_pages, - } = self.tail; - - let flags = { - use proto::InitFlags; - - //TODO: Conditions for these feature flags - // - Locks - // - ASYNC_DIO - // - WRITEBACK_CACHE - // - NO_OPEN_SUPPORT - // - HANDLE_KILLPRIV - // - POSIX_ACL - // - NO_OPENDIR_SUPPORT - // - EXPLICIT_INVAL_DATA - - let supported = InitFlags::ASYNC_READ - | InitFlags::FILE_OPS - | InitFlags::ATOMIC_O_TRUNC - | InitFlags::EXPORT_SUPPORT - | InitFlags::BIG_WRITES - | InitFlags::HAS_IOCTL_DIR - | InitFlags::AUTO_INVAL_DATA - | InitFlags::DO_READDIRPLUS - | InitFlags::READDIRPLUS_AUTO - | InitFlags::PARALLEL_DIROPS - | InitFlags::ABORT_ERROR - | InitFlags::MAX_PAGES - | InitFlags::CACHE_SYMLINKS; - - kernel_flags & supported - }; - - let buffer_size = page_size() * buffer_pages; - - // See fs/fuse/dev.c in the kernel source tree for details about max_write - let max_write = buffer_size - std::mem::size_of::<(proto::InHeader, proto::WriteIn)>(); - - self.single(&proto::InitOut { - major: proto::MAJOR_VERSION, - minor: proto::TARGET_MINOR_VERSION, - max_readahead: 0, //TODO - flags: flags.bits(), - max_background: 0, //TODO - congestion_threshold: 0, //TODO - max_write: max_write.try_into().unwrap(), - time_gran: 1, //TODO - max_pages: buffer_pages.try_into().unwrap(), - padding: Default::default(), - unused: Default::default(), - }) - } -} - -impl<'o> Reply<'o, Statfs> { - /// Replies with filesystem statistics. - pub fn info(self, statfs: &FsInfo) -> Done<'o> { - self.single(&proto::StatfsOut::from(*statfs)) - } -} diff --git a/src/fuse/ops/mod.rs b/src/fuse/ops/mod.rs deleted file mode 100644 index 39a4ef0..0000000 --- a/src/fuse/ops/mod.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::{ - ffi::{CStr, OsStr}, - os::unix::ffi::OsStrExt, -}; - -use crate::util::OutputChain; -use super::{private_trait::Sealed, Done, Operation, Reply, Request}; -use bytemuck::{bytes_of, Pod}; - -mod dir; -mod entry; -mod global; -mod open; -mod rw; -mod xattr; - -pub use dir::{BufferedReaddir, Lookup, Readdir}; -pub use entry::{Forget, Getattr}; -pub use global::{Init, Statfs}; -pub use open::{Access, Open, Opendir, Release, Releasedir}; -pub use rw::{Flush, Read, Readlink, Write}; -pub use xattr::{Getxattr, Listxattr, Removexattr, Setxattr}; - -pub(crate) use global::InitState; - -pub trait FromRequest<'o, O: Operation<'o>> { - //TODO: Shouldn't be public - fn from_request(request: &Request<'o, O>) -> Self; -} - -pub enum Any {} - -impl Sealed for Any {} - -impl<'o> Operation<'o> for Any { - type RequestBody = (); - type ReplyTail = (); -} - -impl<'o, O: Operation<'o>> FromRequest<'o, O> for () { - fn from_request(_request: &Request<'o, O>) -> Self {} -} - -impl<'o, O: Operation<'o>> Reply<'o, O> { - fn empty(self) -> Done<'o> { - self.chain(OutputChain::empty()) - } - - fn single(self, single: &P) -> Done<'o> { - self.chain(OutputChain::tail(&[bytes_of(single)])) - } - - fn inner(self, deref: impl FnOnce(&Self) -> &[u8]) -> Done<'o> { - let result = self - .session - .ok(self.unique, OutputChain::tail(&[deref(&self)])); - self.finish(result) - } - - fn chain(self, chain: OutputChain<'_>) -> Done<'o> { - let result = self.session.ok(self.unique, chain); - self.finish(result) - } -} - -fn c_to_os(c_str: &CStr) -> &OsStr { - OsStr::from_bytes(c_str.to_bytes()) -} diff --git a/src/fuse/ops/open.rs b/src/fuse/ops/open.rs deleted file mode 100644 index ff41d05..0000000 --- a/src/fuse/ops/open.rs +++ /dev/null @@ -1,131 +0,0 @@ -use crate::fuse::{ - io::{AccessFlags, OpenFlags}, - private_trait::Sealed, - Done, Operation, Reply, Request, -}; - -use crate::{proto, Errno}; -use super::FromRequest; - -pub enum Open {} -pub enum Release {} -pub enum Opendir {} -pub enum Releasedir {} -pub enum Access {} - -impl Sealed for Open {} -impl Sealed for Release {} -impl Sealed for Opendir {} -impl Sealed for Releasedir {} -impl Sealed for Access {} - -impl<'o> Operation<'o> for Open { - type RequestBody = &'o proto::OpenIn; - type ReplyTail = proto::OpenOutFlags; -} - -impl<'o> Operation<'o> for Release { - type RequestBody = &'o proto::ReleaseIn; - type ReplyTail = (); -} - -impl<'o> Operation<'o> for Opendir { - type RequestBody = &'o proto::OpendirIn; - type ReplyTail = (); -} - -impl<'o> Operation<'o> for Releasedir { - type RequestBody = &'o proto::ReleasedirIn; - type ReplyTail = (); -} - -impl<'o> Operation<'o> for Access { - type RequestBody = &'o proto::AccessIn; - type ReplyTail = (); -} - -impl<'o> Request<'o, Open> { - pub fn flags(&self) -> OpenFlags { - OpenFlags::from_bits_truncate(self.body.flags.try_into().unwrap_or_default()) - } -} - -impl<'o> Reply<'o, Open> { - pub fn force_direct_io(&mut self) { - self.tail |= proto::OpenOutFlags::DIRECT_IO; - } - - pub fn ok(self) -> Done<'o> { - self.ok_with_handle(0) - } - - pub fn ok_with_handle(self, handle: u64) -> Done<'o> { - let open_flags = self.tail.bits(); - - self.single(&proto::OpenOut { - fh: handle, - open_flags, - padding: Default::default(), - }) - } -} - -impl<'o> Request<'o, Release> { - pub fn handle(&self) -> u64 { - self.body.fh - } -} - -impl<'o> Reply<'o, Release> { - pub fn ok(self) -> Done<'o> { - self.empty() - } -} - -impl<'o> Reply<'o, Opendir> { - pub fn ok(self) -> Done<'o> { - self.ok_with_handle(0) - } - - pub fn ok_with_handle(self, handle: u64) -> Done<'o> { - self.single(&proto::OpenOut { - fh: handle, - open_flags: 0, - padding: Default::default(), - }) - } -} - -impl<'o> Request<'o, Releasedir> { - pub fn handle(&self) -> u64 { - self.body.release_in.fh - } -} - -impl<'o> Reply<'o, Releasedir> { - pub fn ok(self) -> Done<'o> { - self.empty() - } -} - -impl<'o> Request<'o, Access> { - pub fn mask(&self) -> AccessFlags { - AccessFlags::from_bits_truncate(self.body.mask as i32) - } -} - -impl<'o> Reply<'o, Access> { - pub fn ok(self) -> Done<'o> { - self.empty() - } - - pub fn permission_denied(self) -> Done<'o> { - self.fail(Errno::EACCES) - } -} - -impl<'o> FromRequest<'o, Open> for proto::OpenOutFlags { - fn from_request(_request: &Request<'o, Open>) -> Self { - proto::OpenOutFlags::empty() - } -} diff --git a/src/fuse/ops/rw.rs b/src/fuse/ops/rw.rs deleted file mode 100644 index 726143b..0000000 --- a/src/fuse/ops/rw.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::{ffi::OsStr, os::unix::ffi::OsStrExt}; -use crate::{proto, util::OutputChain}; -use crate::fuse::{private_trait::Sealed, Done, Operation, Reply, Request}; -use super::FromRequest; - -pub enum Readlink {} -pub enum Read {} -pub enum Write {} -pub enum Flush {} - -pub struct WriteState { - size: u32, -} - -impl Sealed for Readlink {} -impl Sealed for Read {} -impl Sealed for Write {} -impl Sealed for Flush {} - -impl<'o> Operation<'o> for Readlink { - type RequestBody = (); - type ReplyTail = (); -} - -impl<'o> Operation<'o> for Read { - type RequestBody = &'o proto::ReadIn; - type ReplyTail = (); -} - -impl<'o> Operation<'o> for Write { - type RequestBody = (&'o proto::WriteIn, &'o [u8]); - type ReplyTail = WriteState; -} - -impl<'o> Operation<'o> for Flush { - type RequestBody = &'o proto::FlushIn; - type ReplyTail = (); -} - -impl<'o> Reply<'o, Readlink> { - /// This inode corresponds to a symbolic link pointing to the given target path. - pub fn target>(self, target: T) -> Done<'o> { - self.chain(OutputChain::tail(&[target.as_ref().as_bytes()])) - } - - /// Same as [`Reply::target()`], except that the target path is taken from disjoint - /// slices. This involves no additional allocation. - pub fn gather_target(self, target: &[&[u8]]) -> Done<'o> { - self.chain(OutputChain::tail(target)) - } -} - -impl<'o> Request<'o, Read> { - pub fn handle(&self) -> u64 { - self.body.fh - } - - pub fn offset(&self) -> u64 { - self.body.offset - } - - pub fn size(&self) -> u32 { - self.body.size - } -} - -impl<'o> Reply<'o, Read> { - pub fn slice(self, data: &[u8]) -> Done<'o> { - self.chain(OutputChain::tail(&[data])) - } -} - -impl<'o> Request<'o, Write> { - pub fn handle(&self) -> u64 { - self.body.0.fh - } - - pub fn offset(&self) -> u64 { - self.body.0.offset - } - - pub fn data(&self) -> &[u8] { - self.body.1 - } -} - -impl<'o> Reply<'o, Write> { - pub fn all(self) -> Done<'o> { - let size = self.tail.size; - self.single(&proto::WriteOut { - size, - padding: Default::default(), - }) - } -} - -impl<'o> Request<'o, Flush> { - pub fn handle(&self) -> u64 { - self.body.fh - } -} - -impl<'o> Reply<'o, Flush> { - pub fn ok(self) -> Done<'o> { - self.empty() - } -} - -impl<'o> FromRequest<'o, Write> for WriteState { - fn from_request(request: &Request<'o, Write>) -> Self { - let (body, data) = request.body; - - if body.size as usize != data.len() { - log::warn!( - "Write size={} differs from data.len={}", - body.size, - data.len() - ); - } - - WriteState { size: body.size } - } -} diff --git a/src/fuse/ops/xattr.rs b/src/fuse/ops/xattr.rs deleted file mode 100644 index 48d10bf..0000000 --- a/src/fuse/ops/xattr.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::ffi::{CStr, OsStr}; -use crate::{proto, util::OutputChain, Errno}; -use crate::fuse::{private_trait::Sealed, Done, Operation, Reply, Request}; -use super::c_to_os; - -pub enum Setxattr {} -pub enum Getxattr {} -pub enum Listxattr {} -pub enum Removexattr {} - -pub struct XattrReadState { - size: u32, -} - -impl Sealed for Setxattr {} -impl Sealed for Getxattr {} -impl Sealed for Listxattr {} -impl Sealed for Removexattr {} - -impl<'o> Operation<'o> for Setxattr { - // header, name, value - type RequestBody = (&'o proto::SetxattrIn, &'o CStr, &'o [u8]); - type ReplyTail = (); -} - -impl<'o> Operation<'o> for Getxattr { - type RequestBody = (&'o proto::GetxattrIn, &'o CStr); - type ReplyTail = XattrReadState; -} - -impl<'o> Operation<'o> for Listxattr { - type RequestBody = &'o proto::ListxattrIn; - type ReplyTail = XattrReadState; -} - -impl<'o> Operation<'o> for Removexattr { - type RequestBody = &'o CStr; - type ReplyTail = (); -} - -//TODO: flags -impl<'o> Request<'o, Setxattr> { - pub fn name(&self) -> &OsStr { - let (_header, name, _value) = self.body; - c_to_os(name) - } - - pub fn value(&self) -> &[u8] { - let (_header, _name, value) = self.body; - value - } -} - -impl<'o> Reply<'o, Setxattr> { - pub fn ok(self) -> Done<'o> { - self.empty() - } - - pub fn not_found(self) -> Done<'o> { - self.fail(Errno::ENODATA) - } -} - -impl<'o> Request<'o, Getxattr> { - pub fn size(&self) -> u32 { - self.body.0.size - } - - pub fn name(&self) -> &OsStr { - c_to_os(self.body.1) - } -} - -impl<'o> Reply<'o, Getxattr> { - pub fn slice(self, value: &[u8]) -> Done<'o> { - let size = value.len().try_into().expect("Extremely large xattr"); - if self.tail.size == 0 { - return self.value_size(size); - } else if self.tail.size < size { - return self.buffer_too_small(); - } - - self.chain(OutputChain::tail(&[value])) - } - - pub fn value_size(self, size: u32) -> Done<'o> { - assert_eq!(self.tail.size, 0); - - self.single(&proto::GetxattrOut { - size, - padding: Default::default(), - }) - } - - pub fn buffer_too_small(self) -> Done<'o> { - self.fail(Errno::ERANGE) - } - - pub fn not_found(self) -> Done<'o> { - self.fail(Errno::ENODATA) - } -} - -impl<'o> Request<'o, Listxattr> { - pub fn size(&self) -> u32 { - self.body.getxattr_in.size - } -} - -impl<'o> Reply<'o, Listxattr> { - //TODO: buffered(), gather() - - pub fn value_size(self, size: u32) -> Done<'o> { - assert_eq!(self.tail.size, 0); - - self.single(&proto::ListxattrOut { - getxattr_out: proto::GetxattrOut { - size, - padding: Default::default(), - }, - }) - } - - pub fn buffer_too_small(self) -> Done<'o> { - self.fail(Errno::ERANGE) - } -} - -impl<'o> Request<'o, Removexattr> { - pub fn name(&self) -> &OsStr { - c_to_os(self.body) - } -} - -impl<'o> Reply<'o, Removexattr> { - pub fn ok(self) -> Done<'o> { - self.empty() - } -} diff --git a/src/fuse/session.rs b/src/fuse/session.rs deleted file mode 100644 index e83a8d4..0000000 --- a/src/fuse/session.rs +++ /dev/null @@ -1,559 +0,0 @@ -use std::{ - future::Future, - io, - marker::PhantomData, - ops::ControlFlow, - os::unix::io::{IntoRawFd, RawFd}, - path::PathBuf, - sync::{Arc, Mutex}, -}; - -use nix::{ - fcntl::{fcntl, FcntlArg, OFlag}, - sys::uio::{writev, IoVec}, - unistd::read, -}; - -use tokio::{ - io::unix::AsyncFd, - sync::{broadcast, OwnedSemaphorePermit, Semaphore}, -}; - -use bytemuck::bytes_of; -use smallvec::SmallVec; - -use crate::{ - mount::{unmount_sync, MountError}, - proto::{self, InHeader, Structured}, - util::{page_size, DumbFd, OutputChain}, - Errno, FuseError, FuseResult, -}; - -use super::{ - ops::{self, FromRequest}, - Done, Op, Operation, Reply, Request, -}; - -pub struct Start { - session_fd: DumbFd, - mountpoint: PathBuf, -} - -pub struct Session { - session_fd: AsyncFd, - interrupt_tx: broadcast::Sender, - buffers: Mutex>, - buffer_semaphore: Arc, - buffer_pages: usize, - mountpoint: Mutex>, -} - -pub struct Endpoint<'a> { - session: &'a Arc, - local_buffer: Buffer, -} - -pub enum Dispatch<'o> { - Lookup(Incoming<'o, ops::Lookup>), - Forget(Incoming<'o, ops::Forget>), - Getattr(Incoming<'o, ops::Getattr>), - Readlink(Incoming<'o, ops::Readlink>), - Open(Incoming<'o, ops::Open>), - Read(Incoming<'o, ops::Read>), - Write(Incoming<'o, ops::Write>), - Statfs(Incoming<'o, ops::Statfs>), - Release(Incoming<'o, ops::Release>), - Setxattr(Incoming<'o, ops::Setxattr>), - Getxattr(Incoming<'o, ops::Getxattr>), - Listxattr(Incoming<'o, ops::Listxattr>), - Removexattr(Incoming<'o, ops::Removexattr>), - Flush(Incoming<'o, ops::Flush>), - Opendir(Incoming<'o, ops::Opendir>), - Readdir(Incoming<'o, ops::Readdir>), - Releasedir(Incoming<'o, ops::Releasedir>), - Access(Incoming<'o, ops::Access>), -} - -pub struct Incoming<'o, O: Operation<'o>> { - common: IncomingCommon<'o>, - _phantom: PhantomData, -} - -pub struct Owned { - session: Arc, - buffer: Buffer, - header: InHeader, - _permit: OwnedSemaphorePermit, - _phantom: PhantomData, -} - -impl Session { - // Does not seem like 'a can be elided here - #[allow(clippy::needless_lifetimes)] - pub fn endpoint<'a>(self: &'a Arc) -> Endpoint<'a> { - Endpoint { - session: self, - local_buffer: Buffer::new(self.buffer_pages), - } - } - - pub fn unmount_sync(&self) -> Result<(), MountError> { - let mountpoint = self.mountpoint.lock().unwrap().take(); - if let Some(mountpoint) = &mountpoint { - unmount_sync(mountpoint)?; - } - - Ok(()) - } - - pub(crate) fn ok(&self, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { - self.send(unique, 0, output) - } - - pub(crate) fn fail(&self, unique: u64, mut errno: i32) -> FuseResult<()> { - if errno <= 0 { - log::warn!( - "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", - unique, - errno - ); - - errno = Errno::ENOMSG as i32; - } - - self.send(unique, -errno, OutputChain::empty()) - } - - pub(crate) fn interrupt_rx(&self) -> broadcast::Receiver { - self.interrupt_tx.subscribe() - } - - async fn handshake(&mut self, buffer: &mut Buffer, init: F) -> FuseResult> - where - F: FnOnce(Op<'_, ops::Init>) -> Done<'_>, - { - self.session_fd.readable().await?.retain_ready(); - let bytes = read(*self.session_fd.get_ref(), &mut buffer.0).map_err(io::Error::from)?; - - let (header, opcode) = InHeader::from_bytes(&buffer.0[..bytes])?; - let body = match opcode { - proto::Opcode::Init => { - <&proto::InitIn>::toplevel_from(&buffer.0[HEADER_END..bytes], &header)? - } - - _ => { - log::error!("First message from kernel is not Init, but {:?}", opcode); - return Err(FuseError::ProtocolInit); - } - }; - - use std::cmp::Ordering; - let supported = match body.major.cmp(&proto::MAJOR_VERSION) { - Ordering::Less => false, - Ordering::Equal => body.minor >= proto::REQUIRED_MINOR_VERSION, - Ordering::Greater => { - let tail = [bytes_of(&proto::MAJOR_VERSION)]; - self.ok(header.unique, OutputChain::tail(&tail))?; - - return Ok(Handshake::Restart(init)); - } - }; - - //TODO: fake some decency by supporting a few older minor versions - if !supported { - log::error!( - "Unsupported protocol {}.{}; this build requires \ - {major}.{}..={major}.{} (or a greater version \ - through compatibility)", - body.major, - body.minor, - proto::REQUIRED_MINOR_VERSION, - proto::TARGET_MINOR_VERSION, - major = proto::MAJOR_VERSION - ); - - self.fail(header.unique, Errno::EPROTONOSUPPORT as i32)?; - return Err(FuseError::ProtocolInit); - } - - let request = Request { header, body }; - let reply = Reply { - session: self, - unique: header.unique, - tail: ops::InitState { - kernel_flags: proto::InitFlags::from_bits_truncate(body.flags), - buffer_pages: self.buffer_pages, - }, - }; - - init((request, reply)).consume(); - Ok(Handshake::Done) - } - - fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> { - let after_header: usize = output - .iter() - .flat_map(<[_]>::iter) - .copied() - .map(<[_]>::len) - .sum(); - - let length = (std::mem::size_of::() + after_header) as _; - let header = proto::OutHeader { - len: length, - error, - unique, - }; - - //TODO: Full const generics any time now? Fs::EXPECTED_REQUEST_SEGMENTS - let header = [bytes_of(&header)]; - let output = output.preceded(&header); - let buffers: SmallVec<[_; 8]> = output - .iter() - .flat_map(<[_]>::iter) - .copied() - .filter(|slice| !slice.is_empty()) - .map(IoVec::from_slice) - .collect(); - - let written = writev(*self.session_fd.get_ref(), &buffers).map_err(io::Error::from)?; - if written == length as usize { - Ok(()) - } else { - Err(FuseError::ShortWrite) - } - } -} - -impl Drop for Start { - fn drop(&mut self) { - if !self.mountpoint.as_os_str().is_empty() { - let _ = unmount_sync(&self.mountpoint); - } - } -} - -impl Drop for Session { - fn drop(&mut self) { - if let Some(mountpoint) = self.mountpoint.get_mut().unwrap().take() { - let _ = unmount_sync(&mountpoint); - } - - drop(DumbFd(*self.session_fd.get_ref())); // Close - } -} - -impl<'o> Dispatch<'o> { - pub fn op(self) -> Op<'o> { - use Dispatch::*; - - let common = match self { - Lookup(incoming) => incoming.common, - Forget(incoming) => incoming.common, - Getattr(incoming) => incoming.common, - Readlink(incoming) => incoming.common, - Open(incoming) => incoming.common, - Read(incoming) => incoming.common, - Write(incoming) => incoming.common, - Statfs(incoming) => incoming.common, - Release(incoming) => incoming.common, - Setxattr(incoming) => incoming.common, - Getxattr(incoming) => incoming.common, - Listxattr(incoming) => incoming.common, - Removexattr(incoming) => incoming.common, - Flush(incoming) => incoming.common, - Opendir(incoming) => incoming.common, - Readdir(incoming) => incoming.common, - Releasedir(incoming) => incoming.common, - Access(incoming) => incoming.common, - }; - - common.into_generic_op() - } -} - -impl Endpoint<'_> { - pub async fn receive<'o, F, Fut>(&'o mut self, dispatcher: F) -> FuseResult> - where - F: FnOnce(Dispatch<'o>) -> Fut, - Fut: Future>, - { - let buffer = &mut self.local_buffer.0; - let bytes = loop { - let session_fd = &self.session.session_fd; - - let mut readable = tokio::select! { - readable = session_fd.readable() => readable?, - - _ = session_fd.writable() => { - self.session.mountpoint.lock().unwrap().take(); - return Ok(ControlFlow::Break(())); - } - }; - - let mut read = |fd: &AsyncFd| read(*fd.get_ref(), buffer); - let result = match readable.try_io(|fd| read(fd).map_err(io::Error::from)) { - Ok(result) => result, - Err(_) => continue, - }; - - match result { - // Interrupted - //TODO: libfuse docs say that this has some side effects - Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue, - - result => break result, - } - }; - - let (header, opcode) = InHeader::from_bytes(&buffer[..bytes?])?; - let common = IncomingCommon { - session: self.session, - buffer: &mut self.local_buffer, - header, - }; - - let dispatch = { - use proto::Opcode::*; - - macro_rules! dispatch { - ($op:ident) => { - Dispatch::$op(Incoming { - common, - _phantom: PhantomData, - }) - }; - } - - match opcode { - Destroy => return Ok(ControlFlow::Break(())), - - Lookup => dispatch!(Lookup), - Forget => dispatch!(Forget), - Getattr => dispatch!(Getattr), - Readlink => dispatch!(Readlink), - Open => dispatch!(Open), - Read => dispatch!(Read), - Write => dispatch!(Write), - Statfs => dispatch!(Statfs), - Release => dispatch!(Release), - Setxattr => dispatch!(Setxattr), - Getxattr => dispatch!(Getxattr), - Listxattr => dispatch!(Listxattr), - Removexattr => dispatch!(Removexattr), - Flush => dispatch!(Flush), - Opendir => dispatch!(Opendir), - Readdir => dispatch!(Readdir), - Releasedir => dispatch!(Releasedir), - Access => dispatch!(Access), - BatchForget => dispatch!(Forget), - ReaddirPlus => dispatch!(Readdir), - - _ => { - log::warn!("Not implemented: {}", common.header); - - let (_request, reply) = common.into_generic_op(); - reply.not_implemented().consume(); - - return Ok(ControlFlow::Continue(())); - } - } - }; - - dispatcher(dispatch).await.consume(); - Ok(ControlFlow::Continue(())) - } -} - -impl Start { - pub async fn start(mut self, mut init: F) -> FuseResult> - where - F: FnOnce(Op<'_, ops::Init>) -> Done<'_>, - { - let mountpoint = std::mem::take(&mut self.mountpoint); - let session_fd = self.session_fd.take().into_raw_fd(); - - let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE; - fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap(); - - let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); - - let buffer_pages = proto::MIN_READ_SIZE / page_size(); //TODO - let buffer_count = SHARED_BUFFERS; //TODO - let buffers = std::iter::repeat_with(|| Buffer::new(buffer_pages)) - .take(buffer_count) - .collect(); - - let mut session = Session { - session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?, - interrupt_tx, - buffers: Mutex::new(buffers), - buffer_semaphore: Arc::new(Semaphore::new(buffer_count)), - buffer_pages, - mountpoint: Mutex::new(Some(mountpoint)), - }; - - let mut init_buffer = session.buffers.get_mut().unwrap().pop().unwrap(); - - loop { - init = match session.handshake(&mut init_buffer, init).await? { - Handshake::Restart(init) => init, - Handshake::Done => { - session.buffers.get_mut().unwrap().push(init_buffer); - break Ok(Arc::new(session)); - } - }; - } - } - - pub fn unmount_sync(mut self) -> Result<(), MountError> { - // This prevents Start::drop() from unmounting a second time - let mountpoint = std::mem::take(&mut self.mountpoint); - unmount_sync(&mountpoint) - } - - pub(crate) fn new(session_fd: DumbFd, mountpoint: PathBuf) -> Self { - Start { - session_fd, - mountpoint, - } - } -} - -impl<'o, O: Operation<'o>> Incoming<'o, O> -where - O::ReplyTail: FromRequest<'o, O>, -{ - pub fn op(self) -> Result, Done<'o>> { - try_op( - self.common.session, - &self.common.buffer.0, - self.common.header, - ) - } - - pub async fn owned(self) -> (Done<'o>, Owned) { - let session = self.common.session; - - let (buffer, permit) = { - let semaphore = Arc::clone(&session.buffer_semaphore); - let permit = semaphore - .acquire_owned() - .await - .expect("Buffer semaphore error"); - - let mut buffers = session.buffers.lock().unwrap(); - let buffer = buffers.pop().expect("Buffer semaphore out of sync"); - let buffer = std::mem::replace(self.common.buffer, buffer); - - (buffer, permit) - }; - - let owned = Owned { - session: Arc::clone(session), - buffer, - header: self.common.header, - _permit: permit, - _phantom: PhantomData, - }; - - (Done::new(), owned) - } -} - -impl Operation<'o>> Owned -where - for<'o> >::ReplyTail: FromRequest<'o, O>, -{ - pub async fn op<'o, F, Fut>(&'o self, handler: F) - where - F: FnOnce(Op<'o, O>) -> Fut, - Fut: Future>, - { - match try_op(&self.session, &self.buffer.0, self.header) { - Ok(op) => handler(op).await.consume(), - Err(done) => done.consume(), - } - } -} - -impl Drop for Owned { - fn drop(&mut self) { - if let Ok(mut buffers) = self.session.buffers.lock() { - let empty = Buffer(Vec::new().into_boxed_slice()); - buffers.push(std::mem::replace(&mut self.buffer, empty)); - } - } -} - -const INTERRUPT_BROADCAST_CAPACITY: usize = 32; -const SHARED_BUFFERS: usize = 32; -const HEADER_END: usize = std::mem::size_of::(); - -struct IncomingCommon<'o> { - session: &'o Arc, - buffer: &'o mut Buffer, - header: InHeader, -} - -enum Handshake { - Done, - Restart(F), -} - -struct Buffer(Box<[u8]>); - -impl<'o> IncomingCommon<'o> { - fn into_generic_op(self) -> Op<'o> { - let request = Request { - header: self.header, - body: (), - }; - - let reply = Reply { - session: self.session, - unique: self.header.unique, - tail: (), - }; - - (request, reply) - } -} - -impl Buffer { - fn new(pages: usize) -> Self { - Buffer(vec![0; pages * page_size()].into_boxed_slice()) - } -} - -fn try_op<'o, O: Operation<'o>>( - session: &'o Session, - bytes: &'o [u8], - header: InHeader, -) -> Result, Done<'o>> -where - O::ReplyTail: FromRequest<'o, O>, -{ - let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize], &header) { - Ok(body) => body, - Err(error) => { - log::error!("Parsing request {}: {}", header, error); - let reply = Reply:: { - session, - unique: header.unique, - tail: (), - }; - - return Err(reply.io_error()); - } - }; - - let request = Request { header, body }; - let reply = Reply { - session, - unique: header.unique, - tail: FromRequest::from_request(&request), - }; - - Ok((request, reply)) -} diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..7124fda --- /dev/null +++ b/src/io.rs @@ -0,0 +1,379 @@ +use bytemuck::Zeroable; +use nix::{errno::Errno, sys::stat::SFlag}; + +use std::{ + convert::Infallible, + ffi::OsStr, + future::Future, + ops::{ControlFlow, FromResidual, Try}, +}; + +use super::{Done, Operation, Reply, Request}; +use crate::{proto, FuseResult, Ino, Timestamp, Ttl}; + +#[doc(no_inline)] +pub use nix::{ + dir::Type as EntryType, + fcntl::OFlag as OpenFlags, + sys::stat::Mode, + unistd::{AccessFlags, Gid, Pid, Uid}, +}; + +pub enum Interruptible<'o, O: Operation<'o>, T> { + Completed(Reply<'o, O>, T), + Interrupted(Done<'o>), +} + +pub trait Stat { + fn ino(&self) -> Ino; + fn inode_type(&self) -> EntryType; + fn attrs(&self) -> (Attrs, Ttl); +} + +pub trait Known { + type Inode: Stat; + + fn inode(&self) -> &Self::Inode; + fn unveil(self); +} + +pub struct Failed<'o, E>(pub Done<'o>, pub E); + +pub trait Finish<'o, O: Operation<'o>> { + fn finish(&self, reply: Reply<'o, O>) -> Done<'o>; +} + +#[derive(Clone)] +pub struct Attrs(proto::Attrs); + +pub struct Entry<'a, K> { + pub offset: u64, + pub name: &'a OsStr, + pub inode: K, + pub ttl: Ttl, +} + +#[derive(Copy, Clone)] +pub struct FsInfo(proto::StatfsOut); + +impl<'o, E> From> for Done<'o> { + fn from(failed: Failed<'o, E>) -> Done<'o> { + failed.0 + } +} + +impl<'o, O: Operation<'o>> Finish<'o, O> for Errno { + fn finish(&self, reply: Reply<'o, O>) -> Done<'o> { + reply.fail(*self) + } +} + +impl<'o, O: Operation<'o>> Finish<'o, O> for std::io::Error { + fn finish(&self, reply: Reply<'o, O>) -> Done<'o> { + reply.fail( + self.raw_os_error() + .map(Errno::from_i32) + .unwrap_or(Errno::EIO), + ) + } +} + +impl<'o, O: Operation<'o>> Request<'o, O> { + pub fn ino(&self) -> Ino { + Ino(self.header.ino) + } + + pub fn generation(&self) -> u64 { + 0 + } + + pub fn uid(&self) -> Uid { + Uid::from_raw(self.header.uid) + } + + pub fn gid(&self) -> Gid { + Gid::from_raw(self.header.gid) + } + + pub fn pid(&self) -> Pid { + Pid::from_raw(self.header.pid as i32) + } +} + +impl<'o, O: Operation<'o>> Reply<'o, O> { + pub async fn interruptible(self, f: F) -> Interruptible<'o, O, T> + where + F: Future, + { + tokio::pin!(f); + let mut rx = self.session.interrupt_rx(); + + use Interruptible::*; + loop { + tokio::select! { + output = &mut f => break Completed(self, output), + + result = rx.recv() => match result { + Ok(unique) if unique == self.unique => { + break Interrupted(self.interrupted()); + } + + _ => continue, + } + } + } + } + + pub fn and_then(self, result: Result) -> Result<(Self, T), Failed<'o, E>> + where + E: Finish<'o, O>, + { + match result { + Ok(t) => Ok((self, t)), + Err(error) => { + let done = error.finish(self); + Err(Failed(done, error)) + } + } + } + + pub fn fail(self, errno: Errno) -> Done<'o> { + let result = self.session.fail(self.unique, errno as i32); + self.finish(result) + } + + pub fn not_implemented(self) -> Done<'o> { + self.fail(Errno::ENOSYS) + } + + pub fn not_permitted(self) -> Done<'o> { + self.fail(Errno::EPERM) + } + + pub fn io_error(self) -> Done<'o> { + self.fail(Errno::EIO) + } + + pub fn invalid_argument(self) -> Done<'o> { + self.fail(Errno::EINVAL) + } + + pub fn interrupted(self) -> Done<'o> { + self.fail(Errno::EINTR) + } + + pub(crate) fn finish(self, result: FuseResult<()>) -> Done<'o> { + if let Err(error) = result { + log::error!("Replying to request {}: {}", self.unique, error); + } + + Done::new() + } +} + +impl<'o, O: Operation<'o>> From<(Reply<'o, O>, Errno)> for Done<'o> { + fn from((reply, errno): (Reply<'o, O>, Errno)) -> Done<'o> { + reply.fail(errno) + } +} + +impl<'o> FromResidual> for Done<'o> { + fn from_residual(residual: Done<'o>) -> Self { + residual + } +} + +impl<'o, T: Into>> FromResidual> for Done<'o> { + fn from_residual(residual: Result) -> Self { + match residual { + Ok(_) => unreachable!(), + Err(t) => t.into(), + } + } +} + +impl<'o, O: Operation<'o>> FromResidual> for Done<'o> { + fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self { + match residual { + Interruptible::Completed(_, _) => unreachable!(), + Interruptible::Interrupted(done) => done, + } + } +} + +impl Try for Done<'_> { + type Output = Self; + type Residual = Self; + + fn from_output(output: Self::Output) -> Self { + output + } + + fn branch(self) -> ControlFlow { + ControlFlow::Break(self) + } +} + +impl<'o, O: Operation<'o>, T> FromResidual> + for Interruptible<'o, O, T> +{ + fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self { + use Interruptible::*; + + match residual { + Completed(_, _) => unreachable!(), + Interrupted(done) => Interrupted(done), + } + } +} + +impl<'o, O: Operation<'o>, T> Try for Interruptible<'o, O, T> { + type Output = (Reply<'o, O>, T); + type Residual = Interruptible<'o, O, Infallible>; + + fn from_output((reply, t): Self::Output) -> Self { + Self::Completed(reply, t) + } + + fn branch(self) -> ControlFlow { + use Interruptible::*; + + match self { + Completed(reply, t) => ControlFlow::Continue((reply, t)), + Interrupted(done) => ControlFlow::Break(Interrupted(done)), + } + } +} + +impl Attrs { + #[must_use] + pub fn size(self, size: u64) -> Self { + Attrs(proto::Attrs { size, ..self.0 }) + } + + #[must_use] + pub fn owner(self, uid: Uid, gid: Gid) -> Self { + Attrs(proto::Attrs { + uid: uid.as_raw(), + gid: gid.as_raw(), + ..self.0 + }) + } + + #[must_use] + pub fn mode(self, mode: Mode) -> Self { + Attrs(proto::Attrs { + mode: mode.bits(), + ..self.0 + }) + } + + #[must_use] + pub fn blocks(self, blocks: u64) -> Self { + Attrs(proto::Attrs { blocks, ..self.0 }) + } + + #[must_use] + pub fn block_size(self, block_size: u32) -> Self { + Attrs(proto::Attrs { + blksize: block_size, + ..self.0 + }) + } + + #[must_use] + pub fn device(self, device: u32) -> Self { + Attrs(proto::Attrs { + rdev: device, + ..self.0 + }) + } + + #[must_use] + pub fn times(self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self { + Attrs(proto::Attrs { + atime: access.seconds as _, + mtime: modify.seconds as _, + ctime: change.seconds as _, + atimensec: access.nanoseconds, + mtimensec: modify.nanoseconds, + ctimensec: change.nanoseconds, + ..self.0 + }) + } + + #[must_use] + pub fn links(self, links: u32) -> Self { + Attrs(proto::Attrs { + nlink: links, + ..self.0 + }) + } + + pub(crate) fn finish(self, inode: &impl Stat) -> proto::Attrs { + let Ino(ino) = inode.ino(); + let inode_type = match inode.inode_type() { + EntryType::Fifo => SFlag::S_IFIFO, + EntryType::CharacterDevice => SFlag::S_IFCHR, + EntryType::Directory => SFlag::S_IFDIR, + EntryType::BlockDevice => SFlag::S_IFBLK, + EntryType::File => SFlag::S_IFREG, + EntryType::Symlink => SFlag::S_IFLNK, + EntryType::Socket => SFlag::S_IFSOCK, + }; + + proto::Attrs { + ino, + mode: self.0.mode | inode_type.bits(), + ..self.0 + } + } +} + +impl Default for Attrs { + fn default() -> Self { + Attrs(Zeroable::zeroed()).links(1) + } +} + +impl FsInfo { + #[must_use] + pub fn blocks(self, size: u32, total: u64, free: u64, available: u64) -> Self { + FsInfo(proto::StatfsOut { + bsize: size, + blocks: total, + bfree: free, + bavail: available, + ..self.0 + }) + } + + #[must_use] + pub fn inodes(self, total: u64, free: u64) -> Self { + FsInfo(proto::StatfsOut { + files: total, + ffree: free, + ..self.0 + }) + } + + #[must_use] + pub fn max_filename(self, max: u32) -> Self { + FsInfo(proto::StatfsOut { + namelen: max, + ..self.0 + }) + } +} + +impl Default for FsInfo { + fn default() -> Self { + FsInfo(Zeroable::zeroed()) + } +} + +impl From for proto::StatfsOut { + fn from(FsInfo(statfs): FsInfo) -> proto::StatfsOut { + statfs + } +} diff --git a/src/lib.rs b/src/lib.rs index 891c022..f811ba1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,16 +8,51 @@ #[cfg(not(target_os = "linux"))] compile_error!("Unsupported OS"); -use std::time::{SystemTime, UNIX_EPOCH}; +use std::{ + marker::PhantomData, + time::{SystemTime, UNIX_EPOCH}, +}; pub use nix; -pub use crate::fuse::*; pub use nix::errno::Errno; pub use util::{FuseError, FuseResult}; +pub mod io; +pub mod mount; +pub mod ops; +pub mod session; + mod proto; mod util; -mod fuse; + +pub trait Operation<'o>: private_trait::Sealed + Sized { + type RequestBody: crate::proto::Structured<'o>; + type ReplyTail; +} + +pub type Op<'o, O = ops::Any> = (Request<'o, O>, Reply<'o, O>); + +pub struct Request<'o, O: Operation<'o>> { + header: proto::InHeader, + body: O::RequestBody, +} + +#[must_use] +pub struct Reply<'o, O: Operation<'o>> { + session: &'o session::Session, + unique: u64, + tail: O::ReplyTail, +} + +/// Inode number. +/// +/// This is a public newtype. Users are expected to inspect the underlying `u64` and construct +/// arbitrary `Ino` objects. +#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] +pub struct Ino(pub u64); + +#[must_use] +pub struct Done<'o>(PhantomData<&'o mut &'o ()>); #[derive(Copy, Clone, Eq, PartialEq)] pub struct Ttl { @@ -31,12 +66,15 @@ pub struct Timestamp { nanoseconds: u32, } -/// Inode number. -/// -/// This is a public newtype. Users are expected to inspect the underlying `u64` and construct -/// arbitrary `Ino` objects. -#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] -pub struct Ino(pub u64); +impl Done<'_> { + fn new() -> Self { + Done(PhantomData) + } + + fn consume(self) { + drop(self); + } +} impl Ino { /// The invalid inode number, mostly useful for internal aspects of the FUSE protocol. @@ -122,3 +160,7 @@ impl From for Timestamp { } } } + +mod private_trait { + pub trait Sealed {} +} diff --git a/src/mod.rs b/src/mod.rs new file mode 100644 index 0000000..84c6878 --- /dev/null +++ b/src/mod.rs @@ -0,0 +1,43 @@ +use crate::proto; +use std::marker::PhantomData; + +pub mod io; +pub mod mount; +pub mod ops; +pub mod session; + +mod private_trait { + pub trait Sealed {} +} + +pub trait Operation<'o>: private_trait::Sealed + Sized { + type RequestBody: crate::proto::Structured<'o>; + type ReplyTail; +} + +pub type Op<'o, O = ops::Any> = (Request<'o, O>, Reply<'o, O>); + +pub struct Request<'o, O: Operation<'o>> { + header: proto::InHeader, + body: O::RequestBody, +} + +#[must_use] +pub struct Reply<'o, O: Operation<'o>> { + session: &'o session::Session, + unique: u64, + tail: O::ReplyTail, +} + +#[must_use] +pub struct Done<'o>(PhantomData<&'o mut &'o ()>); + +impl Done<'_> { + fn new() -> Self { + Done(PhantomData) + } + + fn consume(self) { + drop(self); + } +} diff --git a/src/mount.rs b/src/mount.rs new file mode 100644 index 0000000..c924a9a --- /dev/null +++ b/src/mount.rs @@ -0,0 +1,168 @@ +use std::{ + ffi::{OsStr, OsString}, + io, + os::unix::{ + ffi::OsStrExt, + io::{AsRawFd, RawFd}, + net::UnixStream, + }, + path::{Path, PathBuf}, + process::Command, +}; + +use nix::{ + self, cmsg_space, + fcntl::{fcntl, FcntlArg, FdFlag}, + sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}, +}; + +use quick_error::quick_error; + +use super::session::Start; +use crate::util::DumbFd; + +quick_error! { + #[derive(Debug)] + pub enum MountError { + Io(err: std::io::Error) { from() } + Fusermount { display("fusermount failed") } + } +} + +#[derive(Default)] +pub struct Options(OsString); + +impl Options { + pub fn fs_name>(&mut self, fs_name: O) -> &mut Self { + self.push_key_value("fsname", fs_name) + } + + pub fn read_only(&mut self) -> &mut Self { + self.push("ro") + } + + pub fn push>(&mut self, option: O) -> &mut Self { + self.push_parts(&[option.as_ref()]) + } + + pub fn push_key_value(&mut self, key: K, value: V) -> &mut Self + where + K: AsRef, + V: AsRef, + { + let (key, value) = (key.as_ref(), value.as_ref()); + + let assert_valid = |part: &OsStr| { + let bytes = part.as_bytes(); + assert!( + !bytes.is_empty() && bytes.iter().all(|b| !matches!(*b, b',' | b'=')), + "invalid key or value: {}", + part.to_string_lossy() + ); + }; + + assert_valid(key); + assert_valid(value); + + self.push_parts(&[key, OsStr::new("="), value]) + } + + fn push_parts(&mut self, segment: &[&OsStr]) -> &mut Self { + if !self.0.is_empty() { + self.0.push(","); + } + + let start = self.0.as_bytes().len(); + segment.iter().for_each(|part| self.0.push(part)); + + let bytes = self.0.as_bytes(); + let last = bytes.len() - 1; + + assert!( + last >= start && bytes[start] != b',' && bytes[last] != b',', + "invalid option string: {}", + OsStr::from_bytes(&bytes[start..]).to_string_lossy() + ); + + self + } +} + +impl> Extend for Options { + fn extend>(&mut self, iter: I) { + iter.into_iter().for_each(|option| { + self.push(option); + }); + } +} + +pub fn mount_sync(mountpoint: M, options: &Options) -> Result +where + M: AsRef + Into, +{ + let (left_side, right_side) = UnixStream::pair()?; + + // The fusermount protocol requires us to preserve right_fd across execve() + let right_fd = right_side.as_raw_fd(); + fcntl( + right_fd, + FcntlArg::F_SETFD( + FdFlag::from_bits(fcntl(right_fd, FcntlArg::F_GETFD).unwrap()).unwrap() + & !FdFlag::FD_CLOEXEC, + ), + ) + .unwrap(); + + let mut command = Command::new(FUSERMOUNT_CMD); + if !options.0.is_empty() { + command.args(&[OsStr::new("-o"), &options.0]); + } + + command.args(&[OsStr::new("--"), mountpoint.as_ref().as_ref()]); + let mut fusermount = command.env("_FUSE_COMMFD", right_fd.to_string()).spawn()?; + + // recvmsg() should fail if fusermount exits (last open fd is closed) + drop(right_side); + + let session_fd = { + let mut buffer = cmsg_space!(RawFd); + let message = recvmsg( + left_side.as_raw_fd(), + &[], + Some(&mut buffer), + MsgFlags::empty(), + ) + .map_err(io::Error::from)?; + + let session_fd = match message.cmsgs().next() { + Some(ControlMessageOwned::ScmRights(fds)) => fds.into_iter().next(), + _ => None, + }; + + session_fd.ok_or(MountError::Fusermount) + }; + + match session_fd { + Ok(session_fd) => Ok(Start::new(DumbFd(session_fd), mountpoint.into())), + + Err(error) => { + drop(left_side); + fusermount.wait()?; + Err(error) + } + } +} + +pub(crate) fn unmount_sync>(mountpoint: M) -> Result<(), MountError> { + let status = Command::new(FUSERMOUNT_CMD) + .args(&[OsStr::new("-zuq"), OsStr::new("--"), mountpoint.as_ref()]) + .status()?; + + if status.success() { + Ok(()) + } else { + Err(MountError::Fusermount) + } +} + +const FUSERMOUNT_CMD: &str = "fusermount3"; diff --git a/src/ops/dir.rs b/src/ops/dir.rs new file mode 100644 index 0000000..cb3a4f7 --- /dev/null +++ b/src/ops/dir.rs @@ -0,0 +1,253 @@ +use std::{ + convert::Infallible, + ffi::{CStr, OsStr}, + marker::PhantomData, + os::unix::ffi::OsStrExt, +}; + +use crate::{ + io::{Entry, EntryType, Interruptible, Known, Stat}, + private_trait::Sealed, + Done, Operation, Reply, Request, +}; + +use super::{c_to_os, FromRequest}; +use crate::{proto, Errno, Ino, Ttl}; +use bytemuck::{bytes_of, Zeroable}; +use bytes::BufMut; +use nix::sys::stat::SFlag; + +pub enum Lookup {} +pub enum Readdir {} +pub struct BufferedReaddir(Infallible, PhantomData); + +pub struct ReaddirState { + max_read: usize, + is_plus: bool, + buffer: B, +} + +impl Sealed for Lookup {} +impl Sealed for Readdir {} +impl Sealed for BufferedReaddir {} + +impl<'o> Operation<'o> for Lookup { + type RequestBody = &'o CStr; // name() + type ReplyTail = (); +} + +impl<'o> Operation<'o> for Readdir { + type RequestBody = proto::OpcodeSelect< + &'o proto::ReaddirPlusIn, + &'o proto::ReaddirIn, + { proto::Opcode::ReaddirPlus as u32 }, + >; + + type ReplyTail = ReaddirState<()>; +} + +impl<'o, B> Operation<'o> for BufferedReaddir { + type RequestBody = (); // Never actually created + type ReplyTail = ReaddirState; +} + +impl<'o> Request<'o, Lookup> { + /// Returns the name of the entry being looked up in this directory. + pub fn name(&self) -> &OsStr { + c_to_os(self.body) + } +} + +impl<'o> Reply<'o, Lookup> { + /// The requested entry was found. The FUSE client will become aware of the found inode if + /// it wasn't before. This result may be cached by the client for up to the given TTL. + pub fn found(self, entry: impl Known, ttl: Ttl) -> Done<'o> { + let (attrs, attrs_ttl) = entry.inode().attrs(); + let attrs = attrs.finish(entry.inode()); + + let done = self.single(&make_entry((entry.inode().ino(), ttl), (attrs, attrs_ttl))); + entry.unveil(); + + done + } + + /// The requested entry was not found in this directory. The FUSE clint may include this + /// response in negative cache for up to the given TTL. + pub fn not_found(self, ttl: Ttl) -> Done<'o> { + self.single(&make_entry( + (Ino::NULL, ttl), + (Zeroable::zeroed(), Ttl::NULL), + )) + } + + /// The requested entry was not found in this directory, but unlike [`Reply::not_found()`] + /// this does not report back a TTL to the FUSE client. The client should not cache the + /// response. + pub fn not_found_uncached(self) -> Done<'o> { + self.fail(Errno::ENOENT) + } +} + +impl<'o> Request<'o, Readdir> { + pub fn handle(&self) -> u64 { + self.read_in().fh + } + + /// Returns the base offset in the directory stream to read from. + pub fn offset(&self) -> u64 { + self.read_in().offset + } + + pub fn size(&self) -> u32 { + self.read_in().size + } + + fn read_in(&self) -> &proto::ReadIn { + use proto::OpcodeSelect::*; + + match &self.body { + Match(readdir_plus) => &readdir_plus.read_in, + Alt(readdir) => &readdir.read_in, + } + } +} + +impl<'o> Reply<'o, Readdir> { + pub fn buffered(self, buffer: B) -> Reply<'o, BufferedReaddir> + where + B: BufMut + AsRef<[u8]>, + { + assert!(buffer.as_ref().is_empty()); + + let ReaddirState { + max_read, + is_plus, + buffer: (), + } = self.tail; + + Reply { + session: self.session, + unique: self.unique, + tail: ReaddirState { + max_read, + is_plus, + buffer, + }, + } + } +} + +impl<'o, B: BufMut + AsRef<[u8]>> Reply<'o, BufferedReaddir> { + pub fn entry(mut self, entry: Entry) -> Interruptible<'o, BufferedReaddir, ()> { + let entry_header_len = if self.tail.is_plus { + std::mem::size_of::() + } else { + std::mem::size_of::() + }; + + let name = entry.name.as_bytes(); + let padding_len = dirent_pad_bytes(entry_header_len + name.len()); + + let buffer = &mut self.tail.buffer; + let remaining = buffer + .remaining_mut() + .min(self.tail.max_read - buffer.as_ref().len()); + + let record_len = entry_header_len + name.len() + padding_len; + if remaining < record_len { + if buffer.as_ref().is_empty() { + log::error!("Buffer for readdir req #{} is too small", self.unique); + return Interruptible::Interrupted(self.fail(Errno::ENOBUFS)); + } + + return Interruptible::Interrupted(self.end()); + } + + let inode = entry.inode.inode(); + let entry_type = match inode.inode_type() { + EntryType::Fifo => SFlag::S_IFIFO, + EntryType::CharacterDevice => SFlag::S_IFCHR, + EntryType::Directory => SFlag::S_IFDIR, + EntryType::BlockDevice => SFlag::S_IFBLK, + EntryType::File => SFlag::S_IFREG, + EntryType::Symlink => SFlag::S_IFLNK, + EntryType::Socket => SFlag::S_IFSOCK, + }; + + let ino = inode.ino(); + let dirent = proto::Dirent { + ino: ino.as_raw(), + off: entry.offset, + namelen: name.len().try_into().unwrap(), + entry_type: entry_type.bits() >> 12, + }; + + enum Ent { + Dirent(proto::Dirent), + DirentPlus(proto::DirentPlus), + } + + let ent = if self.tail.is_plus { + let (attrs, attrs_ttl) = inode.attrs(); + let attrs = attrs.finish(inode); + let entry_out = make_entry((ino, entry.ttl), (attrs, attrs_ttl)); + + if name != ".".as_bytes() && name != "..".as_bytes() { + entry.inode.unveil(); + } + + Ent::DirentPlus(proto::DirentPlus { entry_out, dirent }) + } else { + Ent::Dirent(dirent) + }; + + let entry_header = match &ent { + Ent::Dirent(dirent) => bytes_of(dirent), + Ent::DirentPlus(dirent_plus) => bytes_of(dirent_plus), + }; + + buffer.put_slice(entry_header); + buffer.put_slice(name); + buffer.put_slice(&[0; 7][..padding_len]); + + if remaining - record_len >= entry_header.len() + (1 << proto::DIRENT_ALIGNMENT_BITS) { + Interruptible::Completed(self, ()) + } else { + Interruptible::Interrupted(self.end()) + } + } + + pub fn end(self) -> Done<'o> { + self.inner(|this| this.tail.buffer.as_ref()) + } +} + +impl<'o> FromRequest<'o, Readdir> for ReaddirState<()> { + fn from_request(request: &Request<'o, Readdir>) -> Self { + ReaddirState { + max_read: request.size() as usize, + is_plus: matches!(request.body, proto::OpcodeSelect::Match(_)), + buffer: (), + } + } +} + +fn make_entry( + (Ino(ino), entry_ttl): (Ino, Ttl), + (attrs, attr_ttl): (proto::Attrs, Ttl), +) -> proto::EntryOut { + proto::EntryOut { + nodeid: ino, + generation: 0, //TODO + entry_valid: entry_ttl.seconds, + attr_valid: attr_ttl.seconds, + entry_valid_nsec: entry_ttl.nanoseconds, + attr_valid_nsec: attr_ttl.nanoseconds, + attr: attrs, + } +} + +fn dirent_pad_bytes(entry_len: usize) -> usize { + const ALIGN_MASK: usize = (1 << proto::DIRENT_ALIGNMENT_BITS) - 1; + ((entry_len + ALIGN_MASK) & !ALIGN_MASK) - entry_len +} diff --git a/src/ops/entry.rs b/src/ops/entry.rs new file mode 100644 index 0000000..d3e2b17 --- /dev/null +++ b/src/ops/entry.rs @@ -0,0 +1,79 @@ +use crate::{io::Stat, private_trait::Sealed, proto, Done, Ino, Operation, Reply, Request}; + +pub enum Forget {} +pub enum Getattr {} + +impl Sealed for Forget {} +impl Sealed for Getattr {} + +impl<'o> Operation<'o> for Forget { + type RequestBody = proto::OpcodeSelect< + (&'o proto::BatchForgetIn, &'o [proto::ForgetOne]), + &'o proto::ForgetIn, + { proto::Opcode::BatchForget as u32 }, + >; + + type ReplyTail = (); +} + +impl<'o> Operation<'o> for Getattr { + type RequestBody = &'o proto::GetattrIn; + type ReplyTail = (); +} + +impl<'o> Request<'o, Forget> { + pub fn forget_list(&self) -> impl '_ + Iterator { + use proto::OpcodeSelect::*; + + enum List<'a> { + Single(Option<(Ino, u64)>), + Batch(std::slice::Iter<'a, proto::ForgetOne>), + } + + impl Iterator for List<'_> { + type Item = (Ino, u64); + + fn next(&mut self) -> Option { + match self { + List::Single(single) => single.take(), + List::Batch(batch) => { + let forget = batch.next()?; + Some((Ino(forget.ino), forget.nlookup)) + } + } + } + } + + match self.body { + Match((_, slice)) => List::Batch(slice.iter()), + Alt(single) => List::Single(Some((self.ino(), single.nlookup))), + } + } +} + +impl<'o> Reply<'o, Forget> { + pub fn ok(self) -> Done<'o> { + // No reply for forget requests + Done::new() + } +} + +impl<'o> Request<'o, Getattr> { + pub fn handle(&self) -> u64 { + self.body.fh + } +} + +impl<'o> Reply<'o, Getattr> { + pub fn known(self, inode: &impl Stat) -> Done<'o> { + let (attrs, ttl) = inode.attrs(); + let attrs = attrs.finish(inode); + + self.single(&proto::AttrOut { + attr_valid: ttl.seconds, + attr_valid_nsec: ttl.nanoseconds, + dummy: Default::default(), + attr: attrs, + }) + } +} diff --git a/src/ops/global.rs b/src/ops/global.rs new file mode 100644 index 0000000..cd6b260 --- /dev/null +++ b/src/ops/global.rs @@ -0,0 +1,87 @@ +use crate::{io::FsInfo, private_trait::Sealed, proto, util::page_size, Done, Operation, Reply}; + +pub enum Init {} +pub enum Statfs {} + +pub struct InitState { + pub(crate) kernel_flags: proto::InitFlags, + pub(crate) buffer_pages: usize, +} + +impl Sealed for Init {} +impl Sealed for Statfs {} + +impl<'o> Operation<'o> for Init { + type RequestBody = &'o proto::InitIn; + type ReplyTail = InitState; +} + +impl<'o> Operation<'o> for Statfs { + type RequestBody = (); + type ReplyTail = (); +} + +impl<'o> Reply<'o, Init> { + pub fn ok(self) -> Done<'o> { + let InitState { + kernel_flags, + buffer_pages, + } = self.tail; + + let flags = { + use proto::InitFlags; + + //TODO: Conditions for these feature flags + // - Locks + // - ASYNC_DIO + // - WRITEBACK_CACHE + // - NO_OPEN_SUPPORT + // - HANDLE_KILLPRIV + // - POSIX_ACL + // - NO_OPENDIR_SUPPORT + // - EXPLICIT_INVAL_DATA + + let supported = InitFlags::ASYNC_READ + | InitFlags::FILE_OPS + | InitFlags::ATOMIC_O_TRUNC + | InitFlags::EXPORT_SUPPORT + | InitFlags::BIG_WRITES + | InitFlags::HAS_IOCTL_DIR + | InitFlags::AUTO_INVAL_DATA + | InitFlags::DO_READDIRPLUS + | InitFlags::READDIRPLUS_AUTO + | InitFlags::PARALLEL_DIROPS + | InitFlags::ABORT_ERROR + | InitFlags::MAX_PAGES + | InitFlags::CACHE_SYMLINKS; + + kernel_flags & supported + }; + + let buffer_size = page_size() * buffer_pages; + + // See fs/fuse/dev.c in the kernel source tree for details about max_write + let max_write = buffer_size - std::mem::size_of::<(proto::InHeader, proto::WriteIn)>(); + + self.single(&proto::InitOut { + major: proto::MAJOR_VERSION, + minor: proto::TARGET_MINOR_VERSION, + max_readahead: 0, //TODO + flags: flags.bits(), + max_background: 0, //TODO + congestion_threshold: 0, //TODO + max_write: max_write.try_into().unwrap(), + time_gran: 1, //TODO + max_pages: buffer_pages.try_into().unwrap(), + padding: Default::default(), + unused: Default::default(), + }) + } +} + +impl<'o> Reply<'o, Statfs> { + /// Replies with filesystem statistics. + pub fn info(self, statfs: &FsInfo) -> Done<'o> { + self.single(&proto::StatfsOut::from(*statfs)) + } +} diff --git a/src/ops/mod.rs b/src/ops/mod.rs new file mode 100644 index 0000000..13d146d --- /dev/null +++ b/src/ops/mod.rs @@ -0,0 +1,67 @@ +use std::{ + ffi::{CStr, OsStr}, + os::unix::ffi::OsStrExt, +}; + +use crate::{private_trait::Sealed, util::OutputChain, Done, Operation, Reply, Request}; +use bytemuck::{bytes_of, Pod}; + +mod dir; +mod entry; +mod global; +mod open; +mod rw; +mod xattr; + +pub use dir::{BufferedReaddir, Lookup, Readdir}; +pub use entry::{Forget, Getattr}; +pub use global::{Init, Statfs}; +pub use open::{Access, Open, Opendir, Release, Releasedir}; +pub use rw::{Flush, Read, Readlink, Write}; +pub use xattr::{Getxattr, Listxattr, Removexattr, Setxattr}; + +pub(crate) use global::InitState; + +pub trait FromRequest<'o, O: Operation<'o>> { + //TODO: Shouldn't be public + fn from_request(request: &Request<'o, O>) -> Self; +} + +pub enum Any {} + +impl Sealed for Any {} + +impl<'o> Operation<'o> for Any { + type RequestBody = (); + type ReplyTail = (); +} + +impl<'o, O: Operation<'o>> FromRequest<'o, O> for () { + fn from_request(_request: &Request<'o, O>) -> Self {} +} + +impl<'o, O: Operation<'o>> Reply<'o, O> { + fn empty(self) -> Done<'o> { + self.chain(OutputChain::empty()) + } + + fn single(self, single: &P) -> Done<'o> { + self.chain(OutputChain::tail(&[bytes_of(single)])) + } + + fn inner(self, deref: impl FnOnce(&Self) -> &[u8]) -> Done<'o> { + let result = self + .session + .ok(self.unique, OutputChain::tail(&[deref(&self)])); + self.finish(result) + } + + fn chain(self, chain: OutputChain<'_>) -> Done<'o> { + let result = self.session.ok(self.unique, chain); + self.finish(result) + } +} + +fn c_to_os(c_str: &CStr) -> &OsStr { + OsStr::from_bytes(c_str.to_bytes()) +} diff --git a/src/ops/open.rs b/src/ops/open.rs new file mode 100644 index 0000000..9123421 --- /dev/null +++ b/src/ops/open.rs @@ -0,0 +1,130 @@ +use crate::{ + io::{AccessFlags, OpenFlags}, + private_trait::Sealed, + proto, Done, Errno, Operation, Reply, Request, +}; + +use super::FromRequest; + +pub enum Open {} +pub enum Release {} +pub enum Opendir {} +pub enum Releasedir {} +pub enum Access {} + +impl Sealed for Open {} +impl Sealed for Release {} +impl Sealed for Opendir {} +impl Sealed for Releasedir {} +impl Sealed for Access {} + +impl<'o> Operation<'o> for Open { + type RequestBody = &'o proto::OpenIn; + type ReplyTail = proto::OpenOutFlags; +} + +impl<'o> Operation<'o> for Release { + type RequestBody = &'o proto::ReleaseIn; + type ReplyTail = (); +} + +impl<'o> Operation<'o> for Opendir { + type RequestBody = &'o proto::OpendirIn; + type ReplyTail = (); +} + +impl<'o> Operation<'o> for Releasedir { + type RequestBody = &'o proto::ReleasedirIn; + type ReplyTail = (); +} + +impl<'o> Operation<'o> for Access { + type RequestBody = &'o proto::AccessIn; + type ReplyTail = (); +} + +impl<'o> Request<'o, Open> { + pub fn flags(&self) -> OpenFlags { + OpenFlags::from_bits_truncate(self.body.flags.try_into().unwrap_or_default()) + } +} + +impl<'o> Reply<'o, Open> { + pub fn force_direct_io(&mut self) { + self.tail |= proto::OpenOutFlags::DIRECT_IO; + } + + pub fn ok(self) -> Done<'o> { + self.ok_with_handle(0) + } + + pub fn ok_with_handle(self, handle: u64) -> Done<'o> { + let open_flags = self.tail.bits(); + + self.single(&proto::OpenOut { + fh: handle, + open_flags, + padding: Default::default(), + }) + } +} + +impl<'o> Request<'o, Release> { + pub fn handle(&self) -> u64 { + self.body.fh + } +} + +impl<'o> Reply<'o, Release> { + pub fn ok(self) -> Done<'o> { + self.empty() + } +} + +impl<'o> Reply<'o, Opendir> { + pub fn ok(self) -> Done<'o> { + self.ok_with_handle(0) + } + + pub fn ok_with_handle(self, handle: u64) -> Done<'o> { + self.single(&proto::OpenOut { + fh: handle, + open_flags: 0, + padding: Default::default(), + }) + } +} + +impl<'o> Request<'o, Releasedir> { + pub fn handle(&self) -> u64 { + self.body.release_in.fh + } +} + +impl<'o> Reply<'o, Releasedir> { + pub fn ok(self) -> Done<'o> { + self.empty() + } +} + +impl<'o> Request<'o, Access> { + pub fn mask(&self) -> AccessFlags { + AccessFlags::from_bits_truncate(self.body.mask as i32) + } +} + +impl<'o> Reply<'o, Access> { + pub fn ok(self) -> Done<'o> { + self.empty() + } + + pub fn permission_denied(self) -> Done<'o> { + self.fail(Errno::EACCES) + } +} + +impl<'o> FromRequest<'o, Open> for proto::OpenOutFlags { + fn from_request(_request: &Request<'o, Open>) -> Self { + proto::OpenOutFlags::empty() + } +} diff --git a/src/ops/rw.rs b/src/ops/rw.rs new file mode 100644 index 0000000..b1c184b --- /dev/null +++ b/src/ops/rw.rs @@ -0,0 +1,122 @@ +use super::FromRequest; +use crate::{private_trait::Sealed, proto, util::OutputChain, Done, Operation, Reply, Request}; +use std::{ffi::OsStr, os::unix::ffi::OsStrExt}; + +pub enum Readlink {} +pub enum Read {} +pub enum Write {} +pub enum Flush {} + +pub struct WriteState { + size: u32, +} + +impl Sealed for Readlink {} +impl Sealed for Read {} +impl Sealed for Write {} +impl Sealed for Flush {} + +impl<'o> Operation<'o> for Readlink { + type RequestBody = (); + type ReplyTail = (); +} + +impl<'o> Operation<'o> for Read { + type RequestBody = &'o proto::ReadIn; + type ReplyTail = (); +} + +impl<'o> Operation<'o> for Write { + type RequestBody = (&'o proto::WriteIn, &'o [u8]); + type ReplyTail = WriteState; +} + +impl<'o> Operation<'o> for Flush { + type RequestBody = &'o proto::FlushIn; + type ReplyTail = (); +} + +impl<'o> Reply<'o, Readlink> { + /// This inode corresponds to a symbolic link pointing to the given target path. + pub fn target>(self, target: T) -> Done<'o> { + self.chain(OutputChain::tail(&[target.as_ref().as_bytes()])) + } + + /// Same as [`Reply::target()`], except that the target path is taken from disjoint + /// slices. This involves no additional allocation. + pub fn gather_target(self, target: &[&[u8]]) -> Done<'o> { + self.chain(OutputChain::tail(target)) + } +} + +impl<'o> Request<'o, Read> { + pub fn handle(&self) -> u64 { + self.body.fh + } + + pub fn offset(&self) -> u64 { + self.body.offset + } + + pub fn size(&self) -> u32 { + self.body.size + } +} + +impl<'o> Reply<'o, Read> { + pub fn slice(self, data: &[u8]) -> Done<'o> { + self.chain(OutputChain::tail(&[data])) + } +} + +impl<'o> Request<'o, Write> { + pub fn handle(&self) -> u64 { + self.body.0.fh + } + + pub fn offset(&self) -> u64 { + self.body.0.offset + } + + pub fn data(&self) -> &[u8] { + self.body.1 + } +} + +impl<'o> Reply<'o, Write> { + pub fn all(self) -> Done<'o> { + let size = self.tail.size; + self.single(&proto::WriteOut { + size, + padding: Default::default(), + }) + } +} + +impl<'o> Request<'o, Flush> { + pub fn handle(&self) -> u64 { + self.body.fh + } +} + +impl<'o> Reply<'o, Flush> { + pub fn ok(self) -> Done<'o> { + self.empty() + } +} + +impl<'o> FromRequest<'o, Write> for WriteState { + fn from_request(request: &Request<'o, Write>) -> Self { + let (body, data) = request.body; + + if body.size as usize != data.len() { + log::warn!( + "Write size={} differs from data.len={}", + body.size, + data.len() + ); + } + + WriteState { size: body.size } + } +} diff --git a/src/ops/xattr.rs b/src/ops/xattr.rs new file mode 100644 index 0000000..886b290 --- /dev/null +++ b/src/ops/xattr.rs @@ -0,0 +1,141 @@ +use crate::{ + private_trait::Sealed, proto, util::OutputChain, Done, Errno, Operation, Reply, Request, +}; + +use super::c_to_os; +use std::ffi::{CStr, OsStr}; + +pub enum Setxattr {} +pub enum Getxattr {} +pub enum Listxattr {} +pub enum Removexattr {} + +pub struct XattrReadState { + size: u32, +} + +impl Sealed for Setxattr {} +impl Sealed for Getxattr {} +impl Sealed for Listxattr {} +impl Sealed for Removexattr {} + +impl<'o> Operation<'o> for Setxattr { + // header, name, value + type RequestBody = (&'o proto::SetxattrIn, &'o CStr, &'o [u8]); + type ReplyTail = (); +} + +impl<'o> Operation<'o> for Getxattr { + type RequestBody = (&'o proto::GetxattrIn, &'o CStr); + type ReplyTail = XattrReadState; +} + +impl<'o> Operation<'o> for Listxattr { + type RequestBody = &'o proto::ListxattrIn; + type ReplyTail = XattrReadState; +} + +impl<'o> Operation<'o> for Removexattr { + type RequestBody = &'o CStr; + type ReplyTail = (); +} + +//TODO: flags +impl<'o> Request<'o, Setxattr> { + pub fn name(&self) -> &OsStr { + let (_header, name, _value) = self.body; + c_to_os(name) + } + + pub fn value(&self) -> &[u8] { + let (_header, _name, value) = self.body; + value + } +} + +impl<'o> Reply<'o, Setxattr> { + pub fn ok(self) -> Done<'o> { + self.empty() + } + + pub fn not_found(self) -> Done<'o> { + self.fail(Errno::ENODATA) + } +} + +impl<'o> Request<'o, Getxattr> { + pub fn size(&self) -> u32 { + self.body.0.size + } + + pub fn name(&self) -> &OsStr { + c_to_os(self.body.1) + } +} + +impl<'o> Reply<'o, Getxattr> { + pub fn slice(self, value: &[u8]) -> Done<'o> { + let size = value.len().try_into().expect("Extremely large xattr"); + if self.tail.size == 0 { + return self.value_size(size); + } else if self.tail.size < size { + return self.buffer_too_small(); + } + + self.chain(OutputChain::tail(&[value])) + } + + pub fn value_size(self, size: u32) -> Done<'o> { + assert_eq!(self.tail.size, 0); + + self.single(&proto::GetxattrOut { + size, + padding: Default::default(), + }) + } + + pub fn buffer_too_small(self) -> Done<'o> { + self.fail(Errno::ERANGE) + } + + pub fn not_found(self) -> Done<'o> { + self.fail(Errno::ENODATA) + } +} + +impl<'o> Request<'o, Listxattr> { + pub fn size(&self) -> u32 { + self.body.getxattr_in.size + } +} + +impl<'o> Reply<'o, Listxattr> { + //TODO: buffered(), gather() + + pub fn value_size(self, size: u32) -> Done<'o> { + assert_eq!(self.tail.size, 0); + + self.single(&proto::ListxattrOut { + getxattr_out: proto::GetxattrOut { + size, + padding: Default::default(), + }, + }) + } + + pub fn buffer_too_small(self) -> Done<'o> { + self.fail(Errno::ERANGE) + } +} + +impl<'o> Request<'o, Removexattr> { + pub fn name(&self) -> &OsStr { + c_to_os(self.body) + } +} + +impl<'o> Reply<'o, Removexattr> { + pub fn ok(self) -> Done<'o> { + self.empty() + } +} diff --git a/src/session.rs b/src/session.rs new file mode 100644 index 0000000..e83a8d4 --- /dev/null +++ b/src/session.rs @@ -0,0 +1,559 @@ +use std::{ + future::Future, + io, + marker::PhantomData, + ops::ControlFlow, + os::unix::io::{IntoRawFd, RawFd}, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +use nix::{ + fcntl::{fcntl, FcntlArg, OFlag}, + sys::uio::{writev, IoVec}, + unistd::read, +}; + +use tokio::{ + io::unix::AsyncFd, + sync::{broadcast, OwnedSemaphorePermit, Semaphore}, +}; + +use bytemuck::bytes_of; +use smallvec::SmallVec; + +use crate::{ + mount::{unmount_sync, MountError}, + proto::{self, InHeader, Structured}, + util::{page_size, DumbFd, OutputChain}, + Errno, FuseError, FuseResult, +}; + +use super::{ + ops::{self, FromRequest}, + Done, Op, Operation, Reply, Request, +}; + +pub struct Start { + session_fd: DumbFd, + mountpoint: PathBuf, +} + +pub struct Session { + session_fd: AsyncFd, + interrupt_tx: broadcast::Sender, + buffers: Mutex>, + buffer_semaphore: Arc, + buffer_pages: usize, + mountpoint: Mutex>, +} + +pub struct Endpoint<'a> { + session: &'a Arc, + local_buffer: Buffer, +} + +pub enum Dispatch<'o> { + Lookup(Incoming<'o, ops::Lookup>), + Forget(Incoming<'o, ops::Forget>), + Getattr(Incoming<'o, ops::Getattr>), + Readlink(Incoming<'o, ops::Readlink>), + Open(Incoming<'o, ops::Open>), + Read(Incoming<'o, ops::Read>), + Write(Incoming<'o, ops::Write>), + Statfs(Incoming<'o, ops::Statfs>), + Release(Incoming<'o, ops::Release>), + Setxattr(Incoming<'o, ops::Setxattr>), + Getxattr(Incoming<'o, ops::Getxattr>), + Listxattr(Incoming<'o, ops::Listxattr>), + Removexattr(Incoming<'o, ops::Removexattr>), + Flush(Incoming<'o, ops::Flush>), + Opendir(Incoming<'o, ops::Opendir>), + Readdir(Incoming<'o, ops::Readdir>), + Releasedir(Incoming<'o, ops::Releasedir>), + Access(Incoming<'o, ops::Access>), +} + +pub struct Incoming<'o, O: Operation<'o>> { + common: IncomingCommon<'o>, + _phantom: PhantomData, +} + +pub struct Owned { + session: Arc, + buffer: Buffer, + header: InHeader, + _permit: OwnedSemaphorePermit, + _phantom: PhantomData, +} + +impl Session { + // Does not seem like 'a can be elided here + #[allow(clippy::needless_lifetimes)] + pub fn endpoint<'a>(self: &'a Arc) -> Endpoint<'a> { + Endpoint { + session: self, + local_buffer: Buffer::new(self.buffer_pages), + } + } + + pub fn unmount_sync(&self) -> Result<(), MountError> { + let mountpoint = self.mountpoint.lock().unwrap().take(); + if let Some(mountpoint) = &mountpoint { + unmount_sync(mountpoint)?; + } + + Ok(()) + } + + pub(crate) fn ok(&self, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { + self.send(unique, 0, output) + } + + pub(crate) fn fail(&self, unique: u64, mut errno: i32) -> FuseResult<()> { + if errno <= 0 { + log::warn!( + "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", + unique, + errno + ); + + errno = Errno::ENOMSG as i32; + } + + self.send(unique, -errno, OutputChain::empty()) + } + + pub(crate) fn interrupt_rx(&self) -> broadcast::Receiver { + self.interrupt_tx.subscribe() + } + + async fn handshake(&mut self, buffer: &mut Buffer, init: F) -> FuseResult> + where + F: FnOnce(Op<'_, ops::Init>) -> Done<'_>, + { + self.session_fd.readable().await?.retain_ready(); + let bytes = read(*self.session_fd.get_ref(), &mut buffer.0).map_err(io::Error::from)?; + + let (header, opcode) = InHeader::from_bytes(&buffer.0[..bytes])?; + let body = match opcode { + proto::Opcode::Init => { + <&proto::InitIn>::toplevel_from(&buffer.0[HEADER_END..bytes], &header)? + } + + _ => { + log::error!("First message from kernel is not Init, but {:?}", opcode); + return Err(FuseError::ProtocolInit); + } + }; + + use std::cmp::Ordering; + let supported = match body.major.cmp(&proto::MAJOR_VERSION) { + Ordering::Less => false, + Ordering::Equal => body.minor >= proto::REQUIRED_MINOR_VERSION, + Ordering::Greater => { + let tail = [bytes_of(&proto::MAJOR_VERSION)]; + self.ok(header.unique, OutputChain::tail(&tail))?; + + return Ok(Handshake::Restart(init)); + } + }; + + //TODO: fake some decency by supporting a few older minor versions + if !supported { + log::error!( + "Unsupported protocol {}.{}; this build requires \ + {major}.{}..={major}.{} (or a greater version \ + through compatibility)", + body.major, + body.minor, + proto::REQUIRED_MINOR_VERSION, + proto::TARGET_MINOR_VERSION, + major = proto::MAJOR_VERSION + ); + + self.fail(header.unique, Errno::EPROTONOSUPPORT as i32)?; + return Err(FuseError::ProtocolInit); + } + + let request = Request { header, body }; + let reply = Reply { + session: self, + unique: header.unique, + tail: ops::InitState { + kernel_flags: proto::InitFlags::from_bits_truncate(body.flags), + buffer_pages: self.buffer_pages, + }, + }; + + init((request, reply)).consume(); + Ok(Handshake::Done) + } + + fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> { + let after_header: usize = output + .iter() + .flat_map(<[_]>::iter) + .copied() + .map(<[_]>::len) + .sum(); + + let length = (std::mem::size_of::() + after_header) as _; + let header = proto::OutHeader { + len: length, + error, + unique, + }; + + //TODO: Full const generics any time now? Fs::EXPECTED_REQUEST_SEGMENTS + let header = [bytes_of(&header)]; + let output = output.preceded(&header); + let buffers: SmallVec<[_; 8]> = output + .iter() + .flat_map(<[_]>::iter) + .copied() + .filter(|slice| !slice.is_empty()) + .map(IoVec::from_slice) + .collect(); + + let written = writev(*self.session_fd.get_ref(), &buffers).map_err(io::Error::from)?; + if written == length as usize { + Ok(()) + } else { + Err(FuseError::ShortWrite) + } + } +} + +impl Drop for Start { + fn drop(&mut self) { + if !self.mountpoint.as_os_str().is_empty() { + let _ = unmount_sync(&self.mountpoint); + } + } +} + +impl Drop for Session { + fn drop(&mut self) { + if let Some(mountpoint) = self.mountpoint.get_mut().unwrap().take() { + let _ = unmount_sync(&mountpoint); + } + + drop(DumbFd(*self.session_fd.get_ref())); // Close + } +} + +impl<'o> Dispatch<'o> { + pub fn op(self) -> Op<'o> { + use Dispatch::*; + + let common = match self { + Lookup(incoming) => incoming.common, + Forget(incoming) => incoming.common, + Getattr(incoming) => incoming.common, + Readlink(incoming) => incoming.common, + Open(incoming) => incoming.common, + Read(incoming) => incoming.common, + Write(incoming) => incoming.common, + Statfs(incoming) => incoming.common, + Release(incoming) => incoming.common, + Setxattr(incoming) => incoming.common, + Getxattr(incoming) => incoming.common, + Listxattr(incoming) => incoming.common, + Removexattr(incoming) => incoming.common, + Flush(incoming) => incoming.common, + Opendir(incoming) => incoming.common, + Readdir(incoming) => incoming.common, + Releasedir(incoming) => incoming.common, + Access(incoming) => incoming.common, + }; + + common.into_generic_op() + } +} + +impl Endpoint<'_> { + pub async fn receive<'o, F, Fut>(&'o mut self, dispatcher: F) -> FuseResult> + where + F: FnOnce(Dispatch<'o>) -> Fut, + Fut: Future>, + { + let buffer = &mut self.local_buffer.0; + let bytes = loop { + let session_fd = &self.session.session_fd; + + let mut readable = tokio::select! { + readable = session_fd.readable() => readable?, + + _ = session_fd.writable() => { + self.session.mountpoint.lock().unwrap().take(); + return Ok(ControlFlow::Break(())); + } + }; + + let mut read = |fd: &AsyncFd| read(*fd.get_ref(), buffer); + let result = match readable.try_io(|fd| read(fd).map_err(io::Error::from)) { + Ok(result) => result, + Err(_) => continue, + }; + + match result { + // Interrupted + //TODO: libfuse docs say that this has some side effects + Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue, + + result => break result, + } + }; + + let (header, opcode) = InHeader::from_bytes(&buffer[..bytes?])?; + let common = IncomingCommon { + session: self.session, + buffer: &mut self.local_buffer, + header, + }; + + let dispatch = { + use proto::Opcode::*; + + macro_rules! dispatch { + ($op:ident) => { + Dispatch::$op(Incoming { + common, + _phantom: PhantomData, + }) + }; + } + + match opcode { + Destroy => return Ok(ControlFlow::Break(())), + + Lookup => dispatch!(Lookup), + Forget => dispatch!(Forget), + Getattr => dispatch!(Getattr), + Readlink => dispatch!(Readlink), + Open => dispatch!(Open), + Read => dispatch!(Read), + Write => dispatch!(Write), + Statfs => dispatch!(Statfs), + Release => dispatch!(Release), + Setxattr => dispatch!(Setxattr), + Getxattr => dispatch!(Getxattr), + Listxattr => dispatch!(Listxattr), + Removexattr => dispatch!(Removexattr), + Flush => dispatch!(Flush), + Opendir => dispatch!(Opendir), + Readdir => dispatch!(Readdir), + Releasedir => dispatch!(Releasedir), + Access => dispatch!(Access), + BatchForget => dispatch!(Forget), + ReaddirPlus => dispatch!(Readdir), + + _ => { + log::warn!("Not implemented: {}", common.header); + + let (_request, reply) = common.into_generic_op(); + reply.not_implemented().consume(); + + return Ok(ControlFlow::Continue(())); + } + } + }; + + dispatcher(dispatch).await.consume(); + Ok(ControlFlow::Continue(())) + } +} + +impl Start { + pub async fn start(mut self, mut init: F) -> FuseResult> + where + F: FnOnce(Op<'_, ops::Init>) -> Done<'_>, + { + let mountpoint = std::mem::take(&mut self.mountpoint); + let session_fd = self.session_fd.take().into_raw_fd(); + + let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE; + fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap(); + + let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); + + let buffer_pages = proto::MIN_READ_SIZE / page_size(); //TODO + let buffer_count = SHARED_BUFFERS; //TODO + let buffers = std::iter::repeat_with(|| Buffer::new(buffer_pages)) + .take(buffer_count) + .collect(); + + let mut session = Session { + session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?, + interrupt_tx, + buffers: Mutex::new(buffers), + buffer_semaphore: Arc::new(Semaphore::new(buffer_count)), + buffer_pages, + mountpoint: Mutex::new(Some(mountpoint)), + }; + + let mut init_buffer = session.buffers.get_mut().unwrap().pop().unwrap(); + + loop { + init = match session.handshake(&mut init_buffer, init).await? { + Handshake::Restart(init) => init, + Handshake::Done => { + session.buffers.get_mut().unwrap().push(init_buffer); + break Ok(Arc::new(session)); + } + }; + } + } + + pub fn unmount_sync(mut self) -> Result<(), MountError> { + // This prevents Start::drop() from unmounting a second time + let mountpoint = std::mem::take(&mut self.mountpoint); + unmount_sync(&mountpoint) + } + + pub(crate) fn new(session_fd: DumbFd, mountpoint: PathBuf) -> Self { + Start { + session_fd, + mountpoint, + } + } +} + +impl<'o, O: Operation<'o>> Incoming<'o, O> +where + O::ReplyTail: FromRequest<'o, O>, +{ + pub fn op(self) -> Result, Done<'o>> { + try_op( + self.common.session, + &self.common.buffer.0, + self.common.header, + ) + } + + pub async fn owned(self) -> (Done<'o>, Owned) { + let session = self.common.session; + + let (buffer, permit) = { + let semaphore = Arc::clone(&session.buffer_semaphore); + let permit = semaphore + .acquire_owned() + .await + .expect("Buffer semaphore error"); + + let mut buffers = session.buffers.lock().unwrap(); + let buffer = buffers.pop().expect("Buffer semaphore out of sync"); + let buffer = std::mem::replace(self.common.buffer, buffer); + + (buffer, permit) + }; + + let owned = Owned { + session: Arc::clone(session), + buffer, + header: self.common.header, + _permit: permit, + _phantom: PhantomData, + }; + + (Done::new(), owned) + } +} + +impl Operation<'o>> Owned +where + for<'o> >::ReplyTail: FromRequest<'o, O>, +{ + pub async fn op<'o, F, Fut>(&'o self, handler: F) + where + F: FnOnce(Op<'o, O>) -> Fut, + Fut: Future>, + { + match try_op(&self.session, &self.buffer.0, self.header) { + Ok(op) => handler(op).await.consume(), + Err(done) => done.consume(), + } + } +} + +impl Drop for Owned { + fn drop(&mut self) { + if let Ok(mut buffers) = self.session.buffers.lock() { + let empty = Buffer(Vec::new().into_boxed_slice()); + buffers.push(std::mem::replace(&mut self.buffer, empty)); + } + } +} + +const INTERRUPT_BROADCAST_CAPACITY: usize = 32; +const SHARED_BUFFERS: usize = 32; +const HEADER_END: usize = std::mem::size_of::(); + +struct IncomingCommon<'o> { + session: &'o Arc, + buffer: &'o mut Buffer, + header: InHeader, +} + +enum Handshake { + Done, + Restart(F), +} + +struct Buffer(Box<[u8]>); + +impl<'o> IncomingCommon<'o> { + fn into_generic_op(self) -> Op<'o> { + let request = Request { + header: self.header, + body: (), + }; + + let reply = Reply { + session: self.session, + unique: self.header.unique, + tail: (), + }; + + (request, reply) + } +} + +impl Buffer { + fn new(pages: usize) -> Self { + Buffer(vec![0; pages * page_size()].into_boxed_slice()) + } +} + +fn try_op<'o, O: Operation<'o>>( + session: &'o Session, + bytes: &'o [u8], + header: InHeader, +) -> Result, Done<'o>> +where + O::ReplyTail: FromRequest<'o, O>, +{ + let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize], &header) { + Ok(body) => body, + Err(error) => { + log::error!("Parsing request {}: {}", header, error); + let reply = Reply:: { + session, + unique: header.unique, + tail: (), + }; + + return Err(reply.io_error()); + } + }; + + let request = Request { header, body }; + let reply = Reply { + session, + unique: header.unique, + tail: FromRequest::from_request(&request), + }; + + Ok((request, reply)) +} -- cgit v1.2.3