diff options
Diffstat (limited to '')
| -rw-r--r-- | src/fuse/fs.rs | 127 | ||||
| -rw-r--r-- | src/fuse/io.rs | 319 | ||||
| -rw-r--r-- | src/fuse/mod.rs | 87 | ||||
| -rw-r--r-- | src/fuse/mount.rs | 160 | ||||
| -rw-r--r-- | src/fuse/ops.rs | 366 | ||||
| -rw-r--r-- | src/fuse/session.rs | 569 |
6 files changed, 1628 insertions, 0 deletions
diff --git a/src/fuse/fs.rs b/src/fuse/fs.rs new file mode 100644 index 0000000..4cf3282 --- /dev/null +++ b/src/fuse/fs.rs @@ -0,0 +1,127 @@ +use async_trait::async_trait; +use nix::errno::Errno; + +use std::{ + num::NonZeroUsize, + ops::{Deref, DerefMut}, + sync::Arc, +}; + +use crate::{Ino, TimeToLive}; + +use super::{ + io::{Attrs, EntryType}, + ops::*, + Done, Op, Reply, +}; + +#[async_trait] +pub trait Fuse: Sized + Send + Sync + 'static { + type Inode: Inode<Fuse = Self> + ?Sized; + type Farc: Deref<Target = Self::Inode> + Clone + Send + Sync = Arc<Self::Inode>; + + async fn init<'o>(&self, reply: Reply<'o, Self, Init>) -> Done<'o>; + + async fn statfs<'o>(&self, (_, reply, _): Op<'o, Self, Statfs>) -> Done<'o> { + reply.not_implemented() + } + + fn request_buffers(&self) -> NonZeroUsize { + NonZeroUsize::new(16).unwrap() + } + + fn request_buffer_pages(&self) -> NonZeroUsize { + NonZeroUsize::new(4).unwrap() + } +} + +#[async_trait] +pub trait Inode: Send + Sync { + type Fuse: Fuse<Inode = Self>; + + fn ino(self: &FarcTo<Self>) -> Ino; + fn attrs(self: &FarcTo<Self>) -> (Attrs, TimeToLive); + fn inode_type(self: &FarcTo<Self>) -> EntryType; + + fn direct_io<'o>(self: &FarcTo<Self>) -> bool { + false + } + + fn access<'o>(self: &FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Access>) -> Done<'o> { + reply.not_implemented() + } + + async fn lookup<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Lookup>) -> Done<'o> { + reply.not_implemented() + } + + async fn readlink<'o>( + self: FarcTo<Self>, + (_, reply, _): Op<'o, Self::Fuse, Readlink>, + ) -> Done<'o> { + reply.not_implemented() + } + + async fn open<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Open>) -> Done<'o> { + // Calling not_implemented() here would ignore direct_io() and similar flags + reply.ok() + } + + async fn opendir<'o>( + self: FarcTo<Self>, + (_, reply, _): Op<'o, Self::Fuse, Opendir>, + ) -> Done<'o> { + reply.not_implemented() + } + + async fn readdir<'o>( + self: FarcTo<Self>, + (_, reply, _): Op<'o, Self::Fuse, Readdir>, + ) -> Done<'o> { + reply.not_implemented() + } +} + +#[async_trait] +pub trait Tape: Send + Sync { + type Fuse: Fuse; + + async fn seek(self: &mut Head<Self>, offset: u64) -> Result<(), Errno>; + + async fn rewind(self: &mut Head<Self>) -> Result<(), Errno> { + self.seek(0).await + } + + async fn read<'o>(self: &mut Head<Self>, (_, reply, _): Op<'o, Self::Fuse, Read>) -> Done<'o> { + reply.not_implemented() + } + + async fn write<'o>( + self: &mut Head<Self>, + (_, reply, _): Op<'o, Self::Fuse, Write>, + ) -> Done<'o> { + reply.not_implemented() + } +} + +pub type FarcTo<I> = <<I as Inode>::Fuse as Fuse>::Farc; + +pub struct Head<T: Tape + ?Sized> { + offset: u64, + inode: <T::Fuse as Fuse>::Farc, + tape: T, +} + +impl<T: Tape + ?Sized> Deref for Head<T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.tape + } +} + +impl<T: Tape + ?Sized> DerefMut for Head<T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.tape + } +} diff --git a/src/fuse/io.rs b/src/fuse/io.rs new file mode 100644 index 0000000..450d9f7 --- /dev/null +++ b/src/fuse/io.rs @@ -0,0 +1,319 @@ +use bytemuck::Zeroable; +use nix::{errno::Errno, sys::stat::SFlag}; + +use std::{ + borrow::Cow, + convert::Infallible, + ffi::OsStr, + future::Future, + ops::{ControlFlow, FromResidual, Try}, +}; + +use crate::{proto, Ino, TimeToLive, Timestamp}; + +use super::{ + fs::{Fuse, Inode}, + session, Done, Operation, Reply, Request, +}; + +#[doc(no_inline)] +pub use nix::{ + dir::Type as EntryType, + sys::stat::Mode, + unistd::{AccessFlags, Gid, Pid, Uid}, +}; + +pub enum Interruptible<'o, Fs: Fuse, O: Operation<'o, Fs>, T> { + Completed(Reply<'o, Fs, O>, T), + Interrupted(Done<'o>), +} + +#[derive(Clone)] +pub struct Attrs(proto::Attrs); + +pub struct Entry<'a, Ref> { + pub offset: u64, + pub name: Cow<'a, OsStr>, + pub inode: Ref, + pub ttl: TimeToLive, +} + +pub struct FsInfo(proto::StatfsOut); + +impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> { + pub fn ino(&self) -> Ino { + Ino(self.header.ino) + } + + pub fn generation(&self) -> u64 { + 0 + } + + pub fn uid(&self) -> Uid { + Uid::from_raw(self.header.uid) + } + + pub fn gid(&self) -> Gid { + Gid::from_raw(self.header.gid) + } + + pub fn pid(&self) -> Pid { + Pid::from_raw(self.header.pid as i32) + } +} + +impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { + pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, Fs, O, T> + where + F: Future<Output = T>, + { + tokio::pin!(f); + let mut rx = session::interrupt_rx(&self.session); + + use Interruptible::*; + loop { + tokio::select! { + output = &mut f => break Completed(self, output), + + result = rx.recv() => match result { + Ok(unique) if unique == self.unique => { + break Interrupted(self.interrupted()); + } + + _ => continue, + } + } + } + } + + pub fn fallible<T>(self, result: Result<T, Errno>) -> Result<(Self, T), Done<'o>> { + match result { + Ok(t) => Ok((self, t)), + Err(errno) => Err(self.fail(errno)), + } + } + + pub fn fail(mut self, errno: Errno) -> Done<'o> { + let errno = errno as i32; + O::consume_errno(errno, &mut self.tail); + + Done::from_result(session::fail(&self.session, self.unique, errno)) + } + + pub fn not_implemented(self) -> Done<'o> { + self.fail(Errno::ENOSYS) + } + + pub fn io_error(self) -> Done<'o> { + self.fail(Errno::EIO) + } + + pub fn invalid_argument(self) -> Done<'o> { + self.fail(Errno::EINVAL) + } + + pub fn interrupted(self) -> Done<'o> { + self.fail(Errno::EINTR) + } +} + +impl<'o, Fs, O> From<(Reply<'o, Fs, O>, Errno)> for Done<'o> +where + Fs: Fuse, + O: Operation<'o, Fs>, +{ + fn from((reply, errno): (Reply<'o, Fs, O>, Errno)) -> Done<'o> { + reply.fail(errno) + } +} + +impl<'o> FromResidual<Done<'o>> for Done<'o> { + fn from_residual(residual: Done<'o>) -> Self { + residual + } +} + +impl<'o, T: Into<Done<'o>>> FromResidual<Result<Infallible, T>> for Done<'o> { + fn from_residual(residual: Result<Infallible, T>) -> Self { + match residual { + Ok(_) => unreachable!(), + Err(t) => t.into(), + } + } +} + +impl<'o, Fs, O> FromResidual<Interruptible<'o, Fs, O, Infallible>> for Done<'o> +where + Fs: Fuse, + O: Operation<'o, Fs>, +{ + fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self { + match residual { + Interruptible::Completed(_, _) => unreachable!(), + Interruptible::Interrupted(done) => done, + } + } +} + +impl Try for Done<'_> { + type Output = Self; + type Residual = Self; + + fn from_output(output: Self::Output) -> Self { + output + } + + fn branch(self) -> ControlFlow<Self::Residual, Self::Output> { + ControlFlow::Break(self) + } +} + +impl<'o, Fs, O, T> FromResidual<Interruptible<'o, Fs, O, Infallible>> + for Interruptible<'o, Fs, O, T> +where + Fs: Fuse, + O: Operation<'o, Fs>, +{ + fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self { + use Interruptible::*; + + match residual { + Completed(_, _) => unreachable!(), + Interrupted(done) => Interrupted(done), + } + } +} + +impl<'o, Fs, O, T> Try for Interruptible<'o, Fs, O, T> +where + Fs: Fuse, + O: Operation<'o, Fs>, +{ + type Output = (Reply<'o, Fs, O>, T); + type Residual = Interruptible<'o, Fs, O, Infallible>; + + fn from_output((reply, t): Self::Output) -> Self { + Self::Completed(reply, t) + } + + fn branch(self) -> ControlFlow<Self::Residual, Self::Output> { + use Interruptible::*; + + match self { + Completed(reply, t) => ControlFlow::Continue((reply, t)), + Interrupted(done) => ControlFlow::Break(Interrupted(done)), + } + } +} + +impl Attrs { + pub fn size(self, size: u64) -> Self { + Attrs(proto::Attrs { size, ..self.0 }) + } + + pub fn owner(self, uid: Uid, gid: Gid) -> Self { + Attrs(proto::Attrs { + uid: uid.as_raw(), + gid: gid.as_raw(), + ..self.0 + }) + } + + pub fn mode(self, mode: Mode) -> Self { + Attrs(proto::Attrs { + mode: mode.bits(), + ..self.0 + }) + } + + pub fn blocks(self, blocks: u64, block_size: u32) -> Self { + Attrs(proto::Attrs { + blocks, + blksize: block_size, + ..self.0 + }) + } + + pub fn times(self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self { + Attrs(proto::Attrs { + atime: access.seconds, + mtime: modify.seconds, + ctime: change.seconds, + atimensec: access.nanoseconds, + mtimensec: modify.nanoseconds, + ctimensec: change.nanoseconds, + ..self.0 + }) + } + + pub fn links(self, links: u32) -> Self { + Attrs(proto::Attrs { + nlink: links, + ..self.0 + }) + } + + pub(crate) fn finish<Fs: Fuse>(self, inode: &Fs::Farc) -> proto::Attrs { + let Ino(ino) = <Fs as Fuse>::Inode::ino(inode); + let inode_type = match <Fs as Fuse>::Inode::inode_type(inode) { + EntryType::Fifo => SFlag::S_IFIFO, + EntryType::CharacterDevice => SFlag::S_IFCHR, + EntryType::Directory => SFlag::S_IFDIR, + EntryType::BlockDevice => SFlag::S_IFBLK, + EntryType::File => SFlag::S_IFREG, + EntryType::Symlink => SFlag::S_IFLNK, + EntryType::Socket => SFlag::S_IFSOCK, + }; + + proto::Attrs { + ino, + mode: self.0.mode | inode_type.bits(), + ..self.0 + } + } +} + +impl Default for Attrs { + fn default() -> Self { + Attrs(Zeroable::zeroed()).links(1) + } +} + +impl FsInfo { + pub fn blocks(self, size: u32, total: u64, free: u64, available: u64) -> Self { + FsInfo(proto::StatfsOut { + bsize: size, + blocks: total, + bfree: free, + bavail: available, + ..self.0 + }) + } + + pub fn inodes(self, total: u64, free: u64) -> Self { + FsInfo(proto::StatfsOut { + files: total, + ffree: free, + ..self.0 + }) + } + + pub fn filenames(self, max: u32) -> Self { + FsInfo(proto::StatfsOut { + namelen: max, + ..self.0 + }) + } +} + +impl Default for FsInfo { + fn default() -> Self { + FsInfo(Zeroable::zeroed()) + } +} + +impl From<FsInfo> for proto::StatfsOut { + fn from(FsInfo(statfs): FsInfo) -> proto::StatfsOut { + statfs + } +} diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs new file mode 100644 index 0000000..0e39e6b --- /dev/null +++ b/src/fuse/mod.rs @@ -0,0 +1,87 @@ +use std::{ + collections::HashMap, + marker::PhantomData, + os::unix::io::RawFd, + sync::{Arc, Mutex}, +}; + +use tokio::{ + io::unix::AsyncFd, + sync::{broadcast, Notify, Semaphore}, +}; + +use crate::{proto, util::DumbFd, FuseResult, Ino}; + +pub mod io; + +#[doc(cfg(feature = "server"))] +pub mod fs; + +#[doc(cfg(feature = "server"))] +pub mod ops; + +#[doc(cfg(feature = "mount"))] +pub mod mount; + +mod session; +use fs::Fuse; + +#[doc(cfg(feature = "server"))] +pub struct Session<Fs: Fuse> { + _fusermount_fd: DumbFd, + session_fd: AsyncFd<RawFd>, + proto_minor: u32, + fs: Fs, + input_semaphore: Arc<Semaphore>, + large_buffers: Mutex<Vec<Box<[u8]>>>, + known: Mutex<HashMap<Ino, (Fs::Farc, u64)>>, + destroy: Notify, + interrupt_tx: broadcast::Sender<u64>, +} + +#[doc(cfg(feature = "server"))] +pub struct Start { + fusermount_fd: DumbFd, + session_fd: DumbFd, +} + +mod private_trait { + pub trait Operation<'o, Fs: super::Fuse> { + type RequestBody = (); + type ReplyTail = (); + + fn consume_errno(_errno: i32, _tail: &mut Self::ReplyTail) {} + } +} + +use private_trait::Operation; + +#[doc(cfg(feature = "server"))] +pub type Op<'o, Fs, O> = (Request<'o, Fs, O>, Reply<'o, Fs, O>, &'o Arc<Session<Fs>>); + +#[doc(cfg(feature = "server"))] +pub struct Request<'o, Fs: Fuse, O: Operation<'o, Fs>> { + header: &'o proto::InHeader, + body: O::RequestBody, +} + +#[doc(cfg(feature = "server"))] +pub struct Reply<'o, Fs: Fuse, O: Operation<'o, Fs>> { + session: &'o Session<Fs>, + unique: u64, + tail: O::ReplyTail, +} + +#[must_use] +#[doc(cfg(feature = "server"))] +pub struct Done<'o>(FuseResult<PhantomData<&'o ()>>); + +impl Done<'_> { + fn from_result(result: FuseResult<()>) -> Self { + Done(result.map(|()| PhantomData)) + } + + fn into_result(self) -> FuseResult<()> { + self.0.map(|PhantomData| ()) + } +} diff --git a/src/fuse/mount.rs b/src/fuse/mount.rs new file mode 100644 index 0000000..ebf7e5d --- /dev/null +++ b/src/fuse/mount.rs @@ -0,0 +1,160 @@ +use std::{ + ffi::{OsStr, OsString}, + os::unix::{ + ffi::OsStrExt, + io::{AsRawFd, IntoRawFd, RawFd}, + net::UnixStream, + }, + process::Command, +}; + +use nix::{ + self, cmsg_space, + fcntl::{fcntl, FcntlArg, FdFlag}, + sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}, +}; + +use quick_error::quick_error; + +use super::Start; +use crate::util::{from_nix_error, DumbFd}; + +quick_error! { + #[derive(Debug)] + pub enum MountError { + Io(err: std::io::Error) { from() } + Fusermount { display("fusermount failed") } + } +} + +#[derive(Default)] +pub struct Options(OsString); + +impl Options { + pub fn fs_name<O: AsRef<OsStr>>(&mut self, fs_name: O) -> &mut Self { + self.push_key_value("fsname", fs_name) + } + + pub fn read_only(&mut self) -> &mut Self { + self.push("ro") + } + + pub fn push<O: AsRef<OsStr>>(&mut self, option: O) -> &mut Self { + self.push_parts(&[option.as_ref()]) + } + + pub fn push_key_value<K, V>(&mut self, key: K, value: V) -> &mut Self + where + K: AsRef<OsStr>, + V: AsRef<OsStr>, + { + let (key, value) = (key.as_ref(), value.as_ref()); + + let assert_valid = |part: &OsStr| { + let bytes = part.as_bytes(); + assert!( + !bytes.is_empty() && bytes.iter().all(|b| !matches!(*b, b',' | b'=')), + "invalid key or value: {}", + part.to_string_lossy() + ); + }; + + assert_valid(key); + assert_valid(value); + + self.push_parts(&[key, OsStr::new("="), value]) + } + + fn push_parts(&mut self, segment: &[&OsStr]) -> &mut Self { + if !self.0.is_empty() { + self.0.push(","); + } + + let start = self.0.as_bytes().len(); + segment.iter().for_each(|part| self.0.push(part)); + + let bytes = self.0.as_bytes(); + let last = bytes.len() - 1; + + assert!( + last >= start && bytes[start] != b',' && bytes[last] != b',', + "invalid option string: {}", + OsStr::from_bytes(&bytes[start..]).to_string_lossy() + ); + + self + } +} + +impl<O: AsRef<OsStr>> Extend<O> for Options { + fn extend<I: IntoIterator<Item = O>>(&mut self, iter: I) { + iter.into_iter().for_each(|option| { + self.push(option); + }); + } +} + +pub fn mount_sync<M>(mountpoint: M, options: &Options) -> Result<Start, MountError> +where + M: AsRef<OsStr>, +{ + let (left_side, right_side) = UnixStream::pair()?; + + // The fusermount protocol requires us to preserve right_fd across execve() + let right_fd = right_side.as_raw_fd(); + fcntl( + right_fd, + FcntlArg::F_SETFD( + FdFlag::from_bits(fcntl(right_fd, FcntlArg::F_GETFD).unwrap()).unwrap() + & !FdFlag::FD_CLOEXEC, + ), + ) + .unwrap(); + + let mut command = Command::new("fusermount3"); + if !options.0.is_empty() { + command.args(&[OsStr::new("-o"), &options.0, mountpoint.as_ref()]); + } else { + command.arg(mountpoint); + }; + + let mut fusermount = command.env("_FUSE_COMMFD", right_fd.to_string()).spawn()?; + + // recvmsg() should fail if fusermount exits (last open fd is closed) + drop(right_side); + + let session_fd = (|| { + let mut buffer = cmsg_space!(RawFd); + let message = recvmsg( + left_side.as_raw_fd(), + &[], + Some(&mut buffer), + MsgFlags::empty(), + ) + .map_err(from_nix_error)?; + + let session_fd = match message.cmsgs().next() { + Some(ControlMessageOwned::ScmRights(fds)) => fds.into_iter().next(), + _ => None, + }; + + session_fd.ok_or(MountError::Fusermount) + })(); + + match session_fd { + Ok(session_fd) => { + let fusermount_fd = DumbFd(left_side.into_raw_fd()); + let session_fd = DumbFd(session_fd); + Ok(Start { + fusermount_fd, + session_fd, + }) + } + + Err(error) => { + drop(left_side); + fusermount.wait()?; + Err(error) + } + } +} diff --git a/src/fuse/ops.rs b/src/fuse/ops.rs new file mode 100644 index 0000000..7a0c18c --- /dev/null +++ b/src/fuse/ops.rs @@ -0,0 +1,366 @@ +use bytemuck::{bytes_of, Pod, Zeroable}; +use futures_util::stream::{Stream, StreamExt, TryStreamExt}; +use nix::sys::stat::SFlag; + +use std::{ + borrow::Borrow, + ffi::{CStr, OsStr}, + os::unix::ffi::OsStrExt, +}; + +use crate::{proto, util::OutputChain, Errno, Ino, TimeToLive}; + +use super::{ + fs::{Fuse, Inode, Tape}, + io::{AccessFlags, Entry, EntryType, FsInfo}, + session, Done, Operation, Reply, Request, +}; + +macro_rules! op { + { $name:ident $operation:tt $(,)+ } => { + pub struct $name(()); + + impl<'o, Fs: Fuse> Operation<'o, Fs> for $name $operation + }; + + { $name:ident $operation:tt, Request $request:tt $($next:tt)+ } => { + impl<'o, Fs: Fuse> Request<'o, Fs, $name> $request + + op! { $name $operation $($next)+ } + }; + + { $name:ident $operation:tt, Reply $reply:tt $($next:tt)+ } => { + impl<'o, Fs: Fuse> Reply<'o, Fs, $name> $reply + + op! { $name $operation $($next)+ } + }; +} + +op! { + Lookup { + // name() + type RequestBody = &'o CStr; + }, + + Request { + /// Returns the name of the entry being looked up in this directory. + pub fn name(&self) -> &OsStr { + c_to_os(self.body) + } + }, + + Reply { + /// The requested entry was found and a `Farc` was successfully determined from it. The + /// FUSE client will become aware of the found inode if it wasn't before. This result may + /// be cached by the client for up to the given TTL. + pub fn found(self, entry: &Fs::Farc, ttl: TimeToLive) -> Done<'o> { + let (attrs, attrs_ttl) = <Fs as Fuse>::Inode::attrs(entry); + session::unveil(&self.session, entry); + + self.single(&make_entry( + (<Fs as Fuse>::Inode::ino(entry), ttl), + (attrs.finish::<Fs>(entry), attrs_ttl), + )) + } + + /// The requested entry was not found in this directory. The FUSE clint may include this + /// response in negative cache for up to the given TTL. + pub fn not_found(self, ttl: TimeToLive) -> Done<'o> { + self.single(&make_entry((Ino::NULL, ttl), (Zeroable::zeroed(), Default::default()))) + } + + /// The requested entry was not found in this directory, but unlike [`Reply::not_found()`] + /// this does not report back a TTL to the FUSE client. The client should not cache the + /// response. + pub fn not_found_uncached(self) -> Done<'o> { + self.fail(Errno::ENOENT) + } + }, +} + +op! { + Readlink {}, + + Reply { + /// This inode corresponds to a symbolic link pointing to the given target path. + pub fn target(self, target: &OsStr) -> Done<'o> { + self.chain(OutputChain::tail(&[target.as_bytes()])) + } + + /// Same as [`Reply::target()`], except that the target path is taken from disjoint + /// slices. This involves no additional allocation. + pub fn gather_target(self, target: &[&OsStr]) -> Done<'o> { + //FIXME: Likely UB + self.chain(OutputChain::tail(unsafe { std::mem::transmute(target) })) + } + }, +} + +op! { + Open { + type RequestBody = &'o proto::OpenIn; + type ReplyTail = (Ino, proto::OpenOutFlags); + }, + + Reply { + /// The iinode may now be accessed. + pub fn ok(self) -> Done<'o> { + self.ok_with_handle(0) + } + + /*pub fn tape<R: Tape<Fuse = Fs>>(self, reel: R) -> Done<'o> { + let (ino, _) = self.tail; + self.ok_with_handle(session::allocate_handle(&self.session, ino, reel)) + }*/ + + fn ok_with_handle(self, handle: u64) -> Done<'o> { + let (_, flags) = self.tail; + self.single(&proto::OpenOut { + fh: handle, + open_flags: flags.bits(), + padding: Default::default(), + }) + } + }, +} + +op! { Read {}, } +/*op! { + Read { + type RequestBody = &'o proto::ReadIn; + type ReplyTail = &'o mut OutputBytes<'o>; + }, + + Request { + pub fn offset(&self) -> u64 { + self.body.offset + } + + pub fn size(&self) -> u32 { + self.body.size + } + }, + + Reply { + pub fn remaining(&self) -> u64 { + self.tail.remaining() + } + + pub fn end(self) -> Done<'o> { + if self.tail.ready() { + self.chain(OutputChain::tail(self.tail.segments())) + } else { + // The read() handler will be invoked again with same OutputBytes + self.done() + } + } + + pub fn hole(self, size: u64) -> Result<Self, Done<'o>> { + self.tail + } + + pub fn copy(self, data: &[u8]) -> Result<Self, Done<'o>> { + self.self_or_done(self.tail.copy(data)) + } + + pub fn put(self, data: &'o [u8]) -> Result<Self, Done<'o>> { + self.self_or_done(self.tail.put(data)) + } + + pub fn gather(self, data: &'o [&'o [u8]]) -> Result<Self, Done<'o>> { + self.self_or_done(self.tail.gather(data)) + } + + fn self_or_done(self, capacity: OutputCapacity) -> Result<Self, Done<'o>> { + match capacity { + OutputCapacity::Available => Ok(self), + OutputCapacity::Filled => Err(self.done()), + } + } + }, +}*/ + +op! { + Write { + type RequestBody = &'o proto::WriteIn; + }, +} + +op! { + Init { + type ReplyTail = &'o mut Result<Fs::Farc, i32>; + + fn consume_errno(errno: i32, tail: &mut Self::ReplyTail) { + **tail = Err(errno); + } + }, + + Reply { + /// Server-side initialization succeeded. The provided `Farc` references the filesystem's + /// root inode. + pub fn root(self, root: Fs::Farc) -> Done<'o> { + *self.tail = Ok(root); + self.done() + } + }, +} + +op! { + Statfs {}, + + Reply { + /// Replies with filesystem statistics. + pub fn info(self, statfs: FsInfo) -> Done<'o> { + let statfs: proto::StatfsOut = statfs.into(); + self.single(&statfs) + } + }, +} + +op! { + Opendir { + type RequestBody = &'o proto::OpendirIn; + }, +} + +op! { + Readdir { + type RequestBody = &'o proto::ReaddirIn; + }, + + Request { + /// Returns the base offset in the directory stream to read from. + pub fn offset(&self) -> u64 { + self.body.read_in.offset + } + }, + + Reply { + pub fn try_iter<'a, I, E, Ref>( + self, + mut entries: I, + ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)> + where + I: Iterator<Item = Result<Entry<'a, Ref>, E>> + Send, + Ref: Borrow<Fs::Farc>, + { + //TODO: This is about as shitty as it gets + match entries.next().transpose() { + Ok(Some(entry)) => { + let Entry { + name, + inode, + offset, + .. + } = entry; + + let inode = inode.borrow(); + let Ino(ino) = <Fs as Fuse>::Inode::ino(inode); + + let dirent = proto::Dirent { + ino, + off: offset, + namelen: name.len() as u32, + entry_type: (match <Fs as Fuse>::Inode::inode_type(inode) { + EntryType::Fifo => SFlag::S_IFIFO, + EntryType::CharacterDevice => SFlag::S_IFCHR, + EntryType::Directory => SFlag::S_IFDIR, + EntryType::BlockDevice => SFlag::S_IFBLK, + EntryType::File => SFlag::S_IFREG, + EntryType::Symlink => SFlag::S_IFLNK, + EntryType::Socket => SFlag::S_IFSOCK, + }) + .bits() + >> 12, + }; + + let dirent = bytes_of(&dirent); + let name = name.as_bytes(); + + let padding = [0; 8]; + let padding = &padding[..7 - (dirent.len() + name.len() - 1) % 8]; + + Ok(self.chain(OutputChain::tail(&[dirent, name, padding]))) + } + + Err(error) => Err((self, error)), + + Ok(None) => Ok(self.empty()), + } + } + + // See rust-lang/rust#61949 + pub async fn try_stream<'a, S, E, Ref>( + self, + entries: S, + ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)> + where + S: Stream<Item = Result<Entry<'a, Ref>, E>> + Send, + Ref: Borrow<Fs::Farc> + Send, + E: Send, + { + //TODO: This is about as shitty as it gets + let first = entries.boxed().try_next().await; + self.try_iter(first.transpose().into_iter()) + } + }, +} + +op! { + Access { + type RequestBody = &'o proto::AccessIn; + }, + + Request { + pub fn mask(&self) -> AccessFlags { + AccessFlags::from_bits_truncate(self.body.mask as i32) + } + }, + + Reply { + pub fn ok(self) -> Done<'o> { + self.empty() + } + + pub fn permission_denied(self) -> Done<'o> { + self.fail(Errno::EACCES) + } + }, +} + +impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { + fn done(self) -> Done<'o> { + Done::from_result(Ok(())) + } + + fn empty(self) -> Done<'o> { + self.chain(OutputChain::empty()) + } + + fn single<P: Pod>(self, single: &P) -> Done<'o> { + self.chain(OutputChain::tail(&[bytes_of(single)])) + } + + fn chain(self, chain: OutputChain<'_>) -> Done<'o> { + Done::from_result(session::ok(&self.session, self.unique, chain)) + } +} + +fn c_to_os(string: &CStr) -> &OsStr { + OsStr::from_bytes(string.to_bytes()) +} + +fn make_entry( + (Ino(ino), entry_ttl): (Ino, TimeToLive), + (attrs, attr_ttl): (proto::Attrs, TimeToLive), +) -> proto::EntryOut { + proto::EntryOut { + nodeid: ino, + generation: 0, //TODO + entry_valid: entry_ttl.seconds, + attr_valid: attr_ttl.seconds, + entry_valid_nsec: entry_ttl.nanoseconds, + attr_valid_nsec: attr_ttl.nanoseconds, + attr: attrs, + } +} diff --git a/src/fuse/session.rs b/src/fuse/session.rs new file mode 100644 index 0000000..35ebb69 --- /dev/null +++ b/src/fuse/session.rs @@ -0,0 +1,569 @@ +use std::{ + collections::{hash_map, HashMap}, + convert::TryInto, + os::unix::io::IntoRawFd, + sync::{Arc, Mutex}, +}; + +use nix::{ + fcntl::{fcntl, FcntlArg, OFlag}, + sys::uio::{readv, writev, IoVec}, + unistd::{sysconf, SysconfVar}, +}; + +use tokio::{ + io::unix::AsyncFd, + sync::{broadcast, Notify, OwnedSemaphorePermit, Semaphore}, +}; + +use bytemuck::{bytes_of, try_from_bytes}; +use smallvec::SmallVec; + +use crate::{ + proto::{self, InHeader}, + util::{display_or, from_nix_error, OutputChain}, + Errno, FuseError, FuseResult, Ino, +}; + +use super::{ + fs::{Fuse, Inode}, + Reply, Request, Session, Start, +}; + +pub fn ok<Fs: Fuse>(session: &Session<Fs>, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { + session.send(unique, 0, output) +} + +pub fn notify<Fs: Fuse>( + session: &Session<Fs>, + op: proto::NotifyCode, + output: OutputChain<'_>, +) -> FuseResult<()> { + session.send(0, op as i32, output) +} + +pub fn fail<Fs: Fuse>(session: &Session<Fs>, unique: u64, mut errno: i32) -> FuseResult<()> { + if errno <= 0 { + log::warn!( + "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", + unique, + errno + ); + + errno = Errno::ENOMSG as i32; + } + + session.send(unique, -errno, OutputChain::empty()) +} + +pub fn unveil<Fs: Fuse>(session: &Session<Fs>, inode: &Fs::Farc) { + let ino = <Fs as Fuse>::Inode::ino(inode); + let mut known = session.known.lock().unwrap(); + + use hash_map::Entry::*; + match known.entry(ino) { + Occupied(entry) => { + let (_, count) = entry.into_mut(); + *count += 1; + } + + Vacant(entry) => { + entry.insert((Fs::Farc::clone(inode), 1)); + } + } +} + +pub fn interrupt_rx<Fs: Fuse>(session: &Session<Fs>) -> broadcast::Receiver<u64> { + session.interrupt_tx.subscribe() +} + +impl<Fs: Fuse> Session<Fs> { + pub fn fs(&self) -> &Fs { + &self.fs + } + + pub async fn main_loop(self: Arc<Self>) -> FuseResult<()> { + let this = Arc::clone(&self); + let main_loop = async move { + loop { + let incoming = this.receive().await; + let this = Arc::clone(&this); + + tokio::spawn(async move { + let (result, header): (FuseResult<()>, Option<InHeader>) = match incoming { + Ok(mut incoming) => match this.dispatch(&mut incoming).await { + Ok(()) => (Ok(()), None), + + Err(error) => { + let data = incoming.buffer.data(); + let data = &data[..std::mem::size_of::<InHeader>().max(data.len())]; + (Err(error), try_from_bytes(data).ok().copied()) + } + }, + + Err(error) => (Err(error.into()), None), + }; + + let header = display_or(header, "(bad)"); + if let Err(error) = result { + log::error!("Handling request {}: {}", header, error); + } + }); + } + }; + + tokio::select! { + () = main_loop => unreachable!(), + () = self.destroy.notified() => Ok(()), + } + } + + async fn do_handshake( + &mut self, + pages_per_buffer: usize, + bytes_per_buffer: usize, + ) -> FuseResult<Handshake> { + use FuseError::*; + + let buffer = { + self.session_fd.readable().await?.retain_ready(); + let large_buffer = self.large_buffers.get_mut().unwrap().first_mut().unwrap(); + + let mut data = InputBufferStorage::Sbo(SboStorage([0; SBO_SIZE])); + let sbo = match &mut data { + InputBufferStorage::Sbo(SboStorage(sbo)) => sbo, + _ => unreachable!(), + }; + + let mut io_vecs = [ + IoVec::from_mut_slice(sbo), + IoVec::from_mut_slice(large_buffer), + ]; + + let bytes = readv(*self.session_fd.get_ref(), &mut io_vecs).map_err(from_nix_error)?; + InputBuffer { bytes, data } + }; + + let request: proto::Request<'_> = buffer.data().try_into()?; + + let unique = request.header().unique; + let init = match request.body() { + proto::RequestBody::Init(body) => body, + _ => return Err(ProtocolInit), + }; + + use std::cmp::Ordering; + let supported = match init.major.cmp(&proto::MAJOR_VERSION) { + Ordering::Less => false, + + Ordering::Equal => { + self.proto_minor = init.minor; + self.proto_minor >= proto::REQUIRED_MINOR_VERSION + } + + Ordering::Greater => { + let tail = [bytes_of(&proto::MAJOR_VERSION)]; + ok(&self, unique, OutputChain::tail(&tail))?; + + return Ok(Handshake::Restart); + } + }; + + //TODO: fake some decency by supporting a few older minor versions + if !supported { + log::error!( + "Unsupported protocol {}.{}; this build requires \ + {major}.{}..={major}.{} (or a greater version \ + through compatibility)", + init.major, + init.minor, + proto::REQUIRED_MINOR_VERSION, + proto::TARGET_MINOR_VERSION, + major = proto::MAJOR_VERSION + ); + + fail(&self, unique, Errno::EPROTONOSUPPORT as i32)?; + return Err(ProtocolInit); + } + + let root = { + let mut init_result = Err(0); + let reply = Reply { + session: &self, + unique, + tail: &mut init_result, + }; + + self.fs.init(reply).await.into_result()?; + + match init_result { + Ok(root) => root, + Err(errno) => { + log::error!("init() handler failed: {}", Errno::from_i32(errno)); + return Err(FuseError::Io(std::io::Error::from_raw_os_error(errno))); + } + } + }; + + self.known.get_mut().unwrap().insert(Ino::ROOT, (root, 1)); + + use proto::InitFlags; + let flags = InitFlags::from_bits_truncate(init.flags); + let supported = InitFlags::PARALLEL_DIROPS + | InitFlags::ABORT_ERROR + | InitFlags::MAX_PAGES + | InitFlags::CACHE_SYMLINKS; + + let flags = flags & supported; + let max_write = bytes_per_buffer - std::mem::size_of::<(InHeader, proto::WriteIn)>(); + let init_out = proto::InitOut { + major: proto::MAJOR_VERSION, + minor: proto::TARGET_MINOR_VERSION, + max_readahead: 0, //TODO + flags: flags.bits(), + max_background: 0, //TODO + congestion_threshold: 0, //TODO + max_write: max_write.try_into().unwrap(), + time_gran: 1, //TODO + max_pages: pages_per_buffer.try_into().unwrap(), + padding: Default::default(), + unused: Default::default(), + }; + + let tail = [bytes_of(&init_out)]; + ok(&self, unique, OutputChain::tail(&tail))?; + + Ok(Handshake::Done) + } + + async fn dispatch(self: &Arc<Self>, request: &mut Incoming<Fs>) -> FuseResult<()> { + let request: proto::Request<'_> = request.buffer.data().try_into()?; + let header = request.header(); + let InHeader { unique, ino, .. } = *header; + let ino = Ino(ino); + + use proto::RequestBody::*; + + macro_rules! op { + () => { + op!(()) + }; + + ($body:expr) => { + op!($body, ()) + }; + + ($body:expr, $tail:expr) => {{ + let request = Request { + header, + body: $body, + }; + let reply = Reply { + session: &self, + unique, + tail: $tail, + }; + + (request, reply, self) + }}; + } + + // These operations don't involve locking and searching self.known + match request.body() { + Forget(body) => { + self.forget(std::iter::once((ino, body.nlookup))).await; + return Ok(()); + } + + Statfs => return self.fs.statfs(op!()).await.into_result(), + + Interrupt(body) => { + //TODO: Don't reply with EAGAIN if the interrupt is successful + let _ = self.interrupt_tx.send(body.unique); + return fail(&self, unique, Errno::EAGAIN as i32); + } + + Destroy => { + self.destroy.notify_one(); + return Ok(()); + } + + BatchForget { forgets, .. } => { + let forgets = forgets + .iter() + .map(|target| (Ino(target.ino), target.nlookup)); + + self.forget(forgets).await; + return Ok(()); + } + + _ => (), + } + + // Some operations are handled while self.known is locked + let inode = { + let known = self.known.lock().unwrap(); + let inode = match known.get(&ino) { + Some((farc, _)) => farc, + None => { + log::error!( + "Lookup count for ino {} reached zero while still \ + known to the kernel, this is a bug", + ino + ); + + return fail(&self, unique, Errno::ENOANO as i32); + } + }; + + match request.body() { + Getattr(_) => { + //TODO: Getattr flags + let (attrs, ttl) = <Fs as Fuse>::Inode::attrs(inode); + let attrs = attrs.finish::<Fs>(inode); + drop(known); + + let out = proto::AttrOut { + attr_valid: ttl.seconds, + attr_valid_nsec: ttl.nanoseconds, + dummy: Default::default(), + attr: attrs, + }; + + return ok(&self, unique, OutputChain::tail(&[bytes_of(&out)])); + } + + Access(body) => { + return <Fs as Fuse>::Inode::access(inode, op!(*body)).into_result() + } + + _ => inode.clone(), + } + }; + + macro_rules! inode_op { + ($op:ident, $($exprs:expr),+) => { + <Fs as Fuse>::Inode::$op(inode, op!($($exprs),+)).await + }; + } + + // These operations involve a Farc cloned from self.known + let done = match request.body() { + Lookup { name } => inode_op!(lookup, *name), + Readlink => inode_op!(readlink, ()), + Open(body) => { + let mut flags = proto::OpenOutFlags::empty(); + if <Fs as Fuse>::Inode::direct_io(&inode) { + flags |= proto::OpenOutFlags::DIRECT_IO; + } + + inode_op!(open, *body, (ino, flags)) + } + Opendir(body) => inode_op!(opendir, *body), + Readdir(body) => inode_op!(readdir, *body), + + _ => return fail(&self, unique, Errno::ENOSYS as i32), + }; + + done.into_result() + } + + async fn forget<I>(&self, targets: I) + where + I: Iterator<Item = (Ino, u64)>, + { + let mut known = self.known.lock().unwrap(); + + for (ino, subtracted) in targets { + use hash_map::Entry::*; + + match known.entry(ino) { + Occupied(mut entry) => { + let (_, count) = entry.get_mut(); + + *count = count.saturating_sub(subtracted); + if *count > 0 { + continue; + } + + entry.remove(); + } + + Vacant(_) => { + log::warn!("Kernel attempted to forget {:?} (bad refcount?)", ino); + continue; + } + } + } + } + + async fn receive(self: &Arc<Self>) -> std::io::Result<Incoming<Fs>> { + use InputBufferStorage::*; + + let permit = Arc::clone(&self.input_semaphore) + .acquire_owned() + .await + .unwrap(); + + let mut incoming = Incoming { + session: Arc::clone(self), + buffer: InputBuffer { + bytes: 0, + data: Sbo(SboStorage([0; SBO_SIZE])), + }, + }; + + let sbo = match &mut incoming.buffer.data { + Sbo(SboStorage(sbo)) => sbo, + _ => unreachable!(), + }; + + loop { + let mut readable = self.session_fd.readable().await?; + + let mut large_buffers = self.large_buffers.lock().unwrap(); + let large_buffer = large_buffers.last_mut().unwrap(); + + let mut io_vecs = [ + IoVec::from_mut_slice(sbo), + IoVec::from_mut_slice(&mut large_buffer[SBO_SIZE..]), + ]; + + match readable.try_io(|fd| readv(*fd.get_ref(), &mut io_vecs).map_err(from_nix_error)) { + Ok(Ok(bytes)) => { + if bytes > SBO_SIZE { + (&mut large_buffer[..SBO_SIZE]).copy_from_slice(sbo); + incoming.buffer.data = Spilled(large_buffers.pop().unwrap(), permit); + } + + incoming.buffer.bytes = bytes; + return Ok(incoming); + } + + // Interrupted + Ok(Err(error)) if error.kind() == std::io::ErrorKind::NotFound => continue, + + Ok(Err(error)) => return Err(error), + Err(_) => continue, + } + } + } + + fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> { + let length = std::mem::size_of::<proto::OutHeader>(); + let length = length + + output + .iter() + .map(<[_]>::iter) + .flatten() + .copied() + .map(<[_]>::len) + .sum::<usize>(); + + let length = length.try_into().unwrap(); + let header = proto::OutHeader { + len: length, + error, + unique, + }; + + //TODO: Full const generics any time now? Fs::EXPECTED_REQUEST_SEGMENTS + let header = [bytes_of(&header)]; + let output = output.preceded(&header); + let buffers: SmallVec<[_; 8]> = output + .iter() + .map(<[_]>::iter) + .flatten() + .copied() + .filter(|slice| !slice.is_empty()) + .map(IoVec::from_slice) + .collect(); + + let written = writev(*self.session_fd.get_ref(), &buffers).map_err(from_nix_error)?; + if written == length as usize { + Ok(()) + } else { + Err(FuseError::ShortWrite) + } + } +} + +impl Start { + pub async fn start<Fs: Fuse>(self, fs: Fs) -> FuseResult<Arc<Session<Fs>>> { + let session_fd = self.session_fd.into_raw_fd(); + + let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE; + fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap(); + + let page_size = sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize; + let pages_per_buffer = fs.request_buffer_pages().get(); + let bytes_per_buffer = pages_per_buffer.checked_mul(page_size).unwrap(); + assert!(bytes_per_buffer >= proto::MIN_READ_SIZE); + + let mut large_buffers = Vec::with_capacity(fs.request_buffers().get()); + for _ in 0..large_buffers.capacity() { + large_buffers.push(vec![0; bytes_per_buffer].into_boxed_slice()); + } + + let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); + let mut session = Session { + _fusermount_fd: self.fusermount_fd, + session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?, + proto_minor: 0, // Set by Session::do_handshake() + fs, + input_semaphore: Arc::new(Semaphore::new(large_buffers.len())), + large_buffers: Mutex::new(large_buffers), + known: Mutex::new(HashMap::new()), + destroy: Notify::new(), + interrupt_tx, + }; + + loop { + let state = session + .do_handshake(pages_per_buffer, bytes_per_buffer) + .await?; + + if let Handshake::Done = state { + break Ok(Arc::new(session)); + } + } + } +} + +enum Handshake { + Done, + Restart, +} + +struct Incoming<Fs: Fuse> { + session: Arc<Session<Fs>>, + buffer: InputBuffer, +} + +struct InputBuffer { + pub bytes: usize, + pub data: InputBufferStorage, +} + +enum InputBufferStorage { + Sbo(SboStorage), + Spilled(Box<[u8]>, OwnedSemaphorePermit), +} + +#[repr(align(8))] +struct SboStorage(pub [u8; 4 * std::mem::size_of::<InHeader>()]); + +const SBO_SIZE: usize = std::mem::size_of::<SboStorage>(); +const INTERRUPT_BROADCAST_CAPACITY: usize = 32; + +impl InputBuffer { + fn data(&self) -> &[u8] { + use InputBufferStorage::*; + let storage = match &self.data { + Sbo(sbo) => &sbo.0, + Spilled(buffer, _) => &buffer[..], + }; + + &storage[..self.bytes] + } +} |
