summaryrefslogtreecommitdiff
path: root/src/fuse
diff options
context:
space:
mode:
authorAlejandro Soto <alejandro@34project.org>2022-01-04 06:49:48 -0600
committerAlejandro Soto <alejandro@34project.org>2022-01-04 06:49:52 -0600
commit70baa472b2bee69f205cc1aada304d597b858005 (patch)
tree7a0b1a0381b68fe0e091b87d00634ff13568bf6d /src/fuse
parent1955ec118a32d3fa174496abe5442c82c609273a (diff)
Move crate::fuse::* to the top-level
Diffstat (limited to 'src/fuse')
-rw-r--r--src/fuse/io.rs379
-rw-r--r--src/fuse/mod.rs43
-rw-r--r--src/fuse/mount.rs168
-rw-r--r--src/fuse/ops/dir.rs253
-rw-r--r--src/fuse/ops/entry.rs80
-rw-r--r--src/fuse/ops/global.rs88
-rw-r--r--src/fuse/ops/mod.rs68
-rw-r--r--src/fuse/ops/open.rs131
-rw-r--r--src/fuse/ops/rw.rs123
-rw-r--r--src/fuse/ops/xattr.rs139
-rw-r--r--src/fuse/session.rs559
11 files changed, 0 insertions, 2031 deletions
diff --git a/src/fuse/io.rs b/src/fuse/io.rs
deleted file mode 100644
index 7124fda..0000000
--- a/src/fuse/io.rs
+++ /dev/null
@@ -1,379 +0,0 @@
-use bytemuck::Zeroable;
-use nix::{errno::Errno, sys::stat::SFlag};
-
-use std::{
- convert::Infallible,
- ffi::OsStr,
- future::Future,
- ops::{ControlFlow, FromResidual, Try},
-};
-
-use super::{Done, Operation, Reply, Request};
-use crate::{proto, FuseResult, Ino, Timestamp, Ttl};
-
-#[doc(no_inline)]
-pub use nix::{
- dir::Type as EntryType,
- fcntl::OFlag as OpenFlags,
- sys::stat::Mode,
- unistd::{AccessFlags, Gid, Pid, Uid},
-};
-
-pub enum Interruptible<'o, O: Operation<'o>, T> {
- Completed(Reply<'o, O>, T),
- Interrupted(Done<'o>),
-}
-
-pub trait Stat {
- fn ino(&self) -> Ino;
- fn inode_type(&self) -> EntryType;
- fn attrs(&self) -> (Attrs, Ttl);
-}
-
-pub trait Known {
- type Inode: Stat;
-
- fn inode(&self) -> &Self::Inode;
- fn unveil(self);
-}
-
-pub struct Failed<'o, E>(pub Done<'o>, pub E);
-
-pub trait Finish<'o, O: Operation<'o>> {
- fn finish(&self, reply: Reply<'o, O>) -> Done<'o>;
-}
-
-#[derive(Clone)]
-pub struct Attrs(proto::Attrs);
-
-pub struct Entry<'a, K> {
- pub offset: u64,
- pub name: &'a OsStr,
- pub inode: K,
- pub ttl: Ttl,
-}
-
-#[derive(Copy, Clone)]
-pub struct FsInfo(proto::StatfsOut);
-
-impl<'o, E> From<Failed<'o, E>> for Done<'o> {
- fn from(failed: Failed<'o, E>) -> Done<'o> {
- failed.0
- }
-}
-
-impl<'o, O: Operation<'o>> Finish<'o, O> for Errno {
- fn finish(&self, reply: Reply<'o, O>) -> Done<'o> {
- reply.fail(*self)
- }
-}
-
-impl<'o, O: Operation<'o>> Finish<'o, O> for std::io::Error {
- fn finish(&self, reply: Reply<'o, O>) -> Done<'o> {
- reply.fail(
- self.raw_os_error()
- .map(Errno::from_i32)
- .unwrap_or(Errno::EIO),
- )
- }
-}
-
-impl<'o, O: Operation<'o>> Request<'o, O> {
- pub fn ino(&self) -> Ino {
- Ino(self.header.ino)
- }
-
- pub fn generation(&self) -> u64 {
- 0
- }
-
- pub fn uid(&self) -> Uid {
- Uid::from_raw(self.header.uid)
- }
-
- pub fn gid(&self) -> Gid {
- Gid::from_raw(self.header.gid)
- }
-
- pub fn pid(&self) -> Pid {
- Pid::from_raw(self.header.pid as i32)
- }
-}
-
-impl<'o, O: Operation<'o>> Reply<'o, O> {
- pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, O, T>
- where
- F: Future<Output = T>,
- {
- tokio::pin!(f);
- let mut rx = self.session.interrupt_rx();
-
- use Interruptible::*;
- loop {
- tokio::select! {
- output = &mut f => break Completed(self, output),
-
- result = rx.recv() => match result {
- Ok(unique) if unique == self.unique => {
- break Interrupted(self.interrupted());
- }
-
- _ => continue,
- }
- }
- }
- }
-
- pub fn and_then<T, E>(self, result: Result<T, E>) -> Result<(Self, T), Failed<'o, E>>
- where
- E: Finish<'o, O>,
- {
- match result {
- Ok(t) => Ok((self, t)),
- Err(error) => {
- let done = error.finish(self);
- Err(Failed(done, error))
- }
- }
- }
-
- pub fn fail(self, errno: Errno) -> Done<'o> {
- let result = self.session.fail(self.unique, errno as i32);
- self.finish(result)
- }
-
- pub fn not_implemented(self) -> Done<'o> {
- self.fail(Errno::ENOSYS)
- }
-
- pub fn not_permitted(self) -> Done<'o> {
- self.fail(Errno::EPERM)
- }
-
- pub fn io_error(self) -> Done<'o> {
- self.fail(Errno::EIO)
- }
-
- pub fn invalid_argument(self) -> Done<'o> {
- self.fail(Errno::EINVAL)
- }
-
- pub fn interrupted(self) -> Done<'o> {
- self.fail(Errno::EINTR)
- }
-
- pub(crate) fn finish(self, result: FuseResult<()>) -> Done<'o> {
- if let Err(error) = result {
- log::error!("Replying to request {}: {}", self.unique, error);
- }
-
- Done::new()
- }
-}
-
-impl<'o, O: Operation<'o>> From<(Reply<'o, O>, Errno)> for Done<'o> {
- fn from((reply, errno): (Reply<'o, O>, Errno)) -> Done<'o> {
- reply.fail(errno)
- }
-}
-
-impl<'o> FromResidual<Done<'o>> for Done<'o> {
- fn from_residual(residual: Done<'o>) -> Self {
- residual
- }
-}
-
-impl<'o, T: Into<Done<'o>>> FromResidual<Result<Infallible, T>> for Done<'o> {
- fn from_residual(residual: Result<Infallible, T>) -> Self {
- match residual {
- Ok(_) => unreachable!(),
- Err(t) => t.into(),
- }
- }
-}
-
-impl<'o, O: Operation<'o>> FromResidual<Interruptible<'o, O, Infallible>> for Done<'o> {
- fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self {
- match residual {
- Interruptible::Completed(_, _) => unreachable!(),
- Interruptible::Interrupted(done) => done,
- }
- }
-}
-
-impl Try for Done<'_> {
- type Output = Self;
- type Residual = Self;
-
- fn from_output(output: Self::Output) -> Self {
- output
- }
-
- fn branch(self) -> ControlFlow<Self::Residual, Self::Output> {
- ControlFlow::Break(self)
- }
-}
-
-impl<'o, O: Operation<'o>, T> FromResidual<Interruptible<'o, O, Infallible>>
- for Interruptible<'o, O, T>
-{
- fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self {
- use Interruptible::*;
-
- match residual {
- Completed(_, _) => unreachable!(),
- Interrupted(done) => Interrupted(done),
- }
- }
-}
-
-impl<'o, O: Operation<'o>, T> Try for Interruptible<'o, O, T> {
- type Output = (Reply<'o, O>, T);
- type Residual = Interruptible<'o, O, Infallible>;
-
- fn from_output((reply, t): Self::Output) -> Self {
- Self::Completed(reply, t)
- }
-
- fn branch(self) -> ControlFlow<Self::Residual, Self::Output> {
- use Interruptible::*;
-
- match self {
- Completed(reply, t) => ControlFlow::Continue((reply, t)),
- Interrupted(done) => ControlFlow::Break(Interrupted(done)),
- }
- }
-}
-
-impl Attrs {
- #[must_use]
- pub fn size(self, size: u64) -> Self {
- Attrs(proto::Attrs { size, ..self.0 })
- }
-
- #[must_use]
- pub fn owner(self, uid: Uid, gid: Gid) -> Self {
- Attrs(proto::Attrs {
- uid: uid.as_raw(),
- gid: gid.as_raw(),
- ..self.0
- })
- }
-
- #[must_use]
- pub fn mode(self, mode: Mode) -> Self {
- Attrs(proto::Attrs {
- mode: mode.bits(),
- ..self.0
- })
- }
-
- #[must_use]
- pub fn blocks(self, blocks: u64) -> Self {
- Attrs(proto::Attrs { blocks, ..self.0 })
- }
-
- #[must_use]
- pub fn block_size(self, block_size: u32) -> Self {
- Attrs(proto::Attrs {
- blksize: block_size,
- ..self.0
- })
- }
-
- #[must_use]
- pub fn device(self, device: u32) -> Self {
- Attrs(proto::Attrs {
- rdev: device,
- ..self.0
- })
- }
-
- #[must_use]
- pub fn times(self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self {
- Attrs(proto::Attrs {
- atime: access.seconds as _,
- mtime: modify.seconds as _,
- ctime: change.seconds as _,
- atimensec: access.nanoseconds,
- mtimensec: modify.nanoseconds,
- ctimensec: change.nanoseconds,
- ..self.0
- })
- }
-
- #[must_use]
- pub fn links(self, links: u32) -> Self {
- Attrs(proto::Attrs {
- nlink: links,
- ..self.0
- })
- }
-
- pub(crate) fn finish(self, inode: &impl Stat) -> proto::Attrs {
- let Ino(ino) = inode.ino();
- let inode_type = match inode.inode_type() {
- EntryType::Fifo => SFlag::S_IFIFO,
- EntryType::CharacterDevice => SFlag::S_IFCHR,
- EntryType::Directory => SFlag::S_IFDIR,
- EntryType::BlockDevice => SFlag::S_IFBLK,
- EntryType::File => SFlag::S_IFREG,
- EntryType::Symlink => SFlag::S_IFLNK,
- EntryType::Socket => SFlag::S_IFSOCK,
- };
-
- proto::Attrs {
- ino,
- mode: self.0.mode | inode_type.bits(),
- ..self.0
- }
- }
-}
-
-impl Default for Attrs {
- fn default() -> Self {
- Attrs(Zeroable::zeroed()).links(1)
- }
-}
-
-impl FsInfo {
- #[must_use]
- pub fn blocks(self, size: u32, total: u64, free: u64, available: u64) -> Self {
- FsInfo(proto::StatfsOut {
- bsize: size,
- blocks: total,
- bfree: free,
- bavail: available,
- ..self.0
- })
- }
-
- #[must_use]
- pub fn inodes(self, total: u64, free: u64) -> Self {
- FsInfo(proto::StatfsOut {
- files: total,
- ffree: free,
- ..self.0
- })
- }
-
- #[must_use]
- pub fn max_filename(self, max: u32) -> Self {
- FsInfo(proto::StatfsOut {
- namelen: max,
- ..self.0
- })
- }
-}
-
-impl Default for FsInfo {
- fn default() -> Self {
- FsInfo(Zeroable::zeroed())
- }
-}
-
-impl From<FsInfo> for proto::StatfsOut {
- fn from(FsInfo(statfs): FsInfo) -> proto::StatfsOut {
- statfs
- }
-}
diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs
deleted file mode 100644
index 84c6878..0000000
--- a/src/fuse/mod.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use crate::proto;
-use std::marker::PhantomData;
-
-pub mod io;
-pub mod mount;
-pub mod ops;
-pub mod session;
-
-mod private_trait {
- pub trait Sealed {}
-}
-
-pub trait Operation<'o>: private_trait::Sealed + Sized {
- type RequestBody: crate::proto::Structured<'o>;
- type ReplyTail;
-}
-
-pub type Op<'o, O = ops::Any> = (Request<'o, O>, Reply<'o, O>);
-
-pub struct Request<'o, O: Operation<'o>> {
- header: proto::InHeader,
- body: O::RequestBody,
-}
-
-#[must_use]
-pub struct Reply<'o, O: Operation<'o>> {
- session: &'o session::Session,
- unique: u64,
- tail: O::ReplyTail,
-}
-
-#[must_use]
-pub struct Done<'o>(PhantomData<&'o mut &'o ()>);
-
-impl Done<'_> {
- fn new() -> Self {
- Done(PhantomData)
- }
-
- fn consume(self) {
- drop(self);
- }
-}
diff --git a/src/fuse/mount.rs b/src/fuse/mount.rs
deleted file mode 100644
index c924a9a..0000000
--- a/src/fuse/mount.rs
+++ /dev/null
@@ -1,168 +0,0 @@
-use std::{
- ffi::{OsStr, OsString},
- io,
- os::unix::{
- ffi::OsStrExt,
- io::{AsRawFd, RawFd},
- net::UnixStream,
- },
- path::{Path, PathBuf},
- process::Command,
-};
-
-use nix::{
- self, cmsg_space,
- fcntl::{fcntl, FcntlArg, FdFlag},
- sys::socket::{recvmsg, ControlMessageOwned, MsgFlags},
-};
-
-use quick_error::quick_error;
-
-use super::session::Start;
-use crate::util::DumbFd;
-
-quick_error! {
- #[derive(Debug)]
- pub enum MountError {
- Io(err: std::io::Error) { from() }
- Fusermount { display("fusermount failed") }
- }
-}
-
-#[derive(Default)]
-pub struct Options(OsString);
-
-impl Options {
- pub fn fs_name<O: AsRef<OsStr>>(&mut self, fs_name: O) -> &mut Self {
- self.push_key_value("fsname", fs_name)
- }
-
- pub fn read_only(&mut self) -> &mut Self {
- self.push("ro")
- }
-
- pub fn push<O: AsRef<OsStr>>(&mut self, option: O) -> &mut Self {
- self.push_parts(&[option.as_ref()])
- }
-
- pub fn push_key_value<K, V>(&mut self, key: K, value: V) -> &mut Self
- where
- K: AsRef<OsStr>,
- V: AsRef<OsStr>,
- {
- let (key, value) = (key.as_ref(), value.as_ref());
-
- let assert_valid = |part: &OsStr| {
- let bytes = part.as_bytes();
- assert!(
- !bytes.is_empty() && bytes.iter().all(|b| !matches!(*b, b',' | b'=')),
- "invalid key or value: {}",
- part.to_string_lossy()
- );
- };
-
- assert_valid(key);
- assert_valid(value);
-
- self.push_parts(&[key, OsStr::new("="), value])
- }
-
- fn push_parts(&mut self, segment: &[&OsStr]) -> &mut Self {
- if !self.0.is_empty() {
- self.0.push(",");
- }
-
- let start = self.0.as_bytes().len();
- segment.iter().for_each(|part| self.0.push(part));
-
- let bytes = self.0.as_bytes();
- let last = bytes.len() - 1;
-
- assert!(
- last >= start && bytes[start] != b',' && bytes[last] != b',',
- "invalid option string: {}",
- OsStr::from_bytes(&bytes[start..]).to_string_lossy()
- );
-
- self
- }
-}
-
-impl<O: AsRef<OsStr>> Extend<O> for Options {
- fn extend<I: IntoIterator<Item = O>>(&mut self, iter: I) {
- iter.into_iter().for_each(|option| {
- self.push(option);
- });
- }
-}
-
-pub fn mount_sync<M>(mountpoint: M, options: &Options) -> Result<Start, MountError>
-where
- M: AsRef<Path> + Into<PathBuf>,
-{
- let (left_side, right_side) = UnixStream::pair()?;
-
- // The fusermount protocol requires us to preserve right_fd across execve()
- let right_fd = right_side.as_raw_fd();
- fcntl(
- right_fd,
- FcntlArg::F_SETFD(
- FdFlag::from_bits(fcntl(right_fd, FcntlArg::F_GETFD).unwrap()).unwrap()
- & !FdFlag::FD_CLOEXEC,
- ),
- )
- .unwrap();
-
- let mut command = Command::new(FUSERMOUNT_CMD);
- if !options.0.is_empty() {
- command.args(&[OsStr::new("-o"), &options.0]);
- }
-
- command.args(&[OsStr::new("--"), mountpoint.as_ref().as_ref()]);
- let mut fusermount = command.env("_FUSE_COMMFD", right_fd.to_string()).spawn()?;
-
- // recvmsg() should fail if fusermount exits (last open fd is closed)
- drop(right_side);
-
- let session_fd = {
- let mut buffer = cmsg_space!(RawFd);
- let message = recvmsg(
- left_side.as_raw_fd(),
- &[],
- Some(&mut buffer),
- MsgFlags::empty(),
- )
- .map_err(io::Error::from)?;
-
- let session_fd = match message.cmsgs().next() {
- Some(ControlMessageOwned::ScmRights(fds)) => fds.into_iter().next(),
- _ => None,
- };
-
- session_fd.ok_or(MountError::Fusermount)
- };
-
- match session_fd {
- Ok(session_fd) => Ok(Start::new(DumbFd(session_fd), mountpoint.into())),
-
- Err(error) => {
- drop(left_side);
- fusermount.wait()?;
- Err(error)
- }
- }
-}
-
-pub(crate) fn unmount_sync<M: AsRef<OsStr>>(mountpoint: M) -> Result<(), MountError> {
- let status = Command::new(FUSERMOUNT_CMD)
- .args(&[OsStr::new("-zuq"), OsStr::new("--"), mountpoint.as_ref()])
- .status()?;
-
- if status.success() {
- Ok(())
- } else {
- Err(MountError::Fusermount)
- }
-}
-
-const FUSERMOUNT_CMD: &str = "fusermount3";
diff --git a/src/fuse/ops/dir.rs b/src/fuse/ops/dir.rs
deleted file mode 100644
index 76267ec..0000000
--- a/src/fuse/ops/dir.rs
+++ /dev/null
@@ -1,253 +0,0 @@
-use std::{
- convert::Infallible,
- ffi::{CStr, OsStr},
- marker::PhantomData,
- os::unix::ffi::OsStrExt,
-};
-
-use crate::fuse::{
- io::{Entry, EntryType, Interruptible, Known, Stat},
- private_trait::Sealed,
- Done, Operation, Reply, Request,
-};
-
-use super::{c_to_os, FromRequest};
-use crate::{proto, Errno, Ino, Ttl};
-use bytemuck::{bytes_of, Zeroable};
-use bytes::BufMut;
-use nix::sys::stat::SFlag;
-
-pub enum Lookup {}
-pub enum Readdir {}
-pub struct BufferedReaddir<B>(Infallible, PhantomData<B>);
-
-pub struct ReaddirState<B> {
- max_read: usize,
- is_plus: bool,
- buffer: B,
-}
-
-impl Sealed for Lookup {}
-impl Sealed for Readdir {}
-impl<B> Sealed for BufferedReaddir<B> {}
-
-impl<'o> Operation<'o> for Lookup {
- type RequestBody = &'o CStr; // name()
- type ReplyTail = ();
-}
-
-impl<'o> Operation<'o> for Readdir {
- type RequestBody = proto::OpcodeSelect<
- &'o proto::ReaddirPlusIn,
- &'o proto::ReaddirIn,
- { proto::Opcode::ReaddirPlus as u32 },
- >;
-
- type ReplyTail = ReaddirState<()>;
-}
-
-impl<'o, B> Operation<'o> for BufferedReaddir<B> {
- type RequestBody = (); // Never actually created
- type ReplyTail = ReaddirState<B>;
-}
-
-impl<'o> Request<'o, Lookup> {
- /// Returns the name of the entry being looked up in this directory.
- pub fn name(&self) -> &OsStr {
- c_to_os(self.body)
- }
-}
-
-impl<'o> Reply<'o, Lookup> {
- /// The requested entry was found. The FUSE client will become aware of the found inode if
- /// it wasn't before. This result may be cached by the client for up to the given TTL.
- pub fn found(self, entry: impl Known, ttl: Ttl) -> Done<'o> {
- let (attrs, attrs_ttl) = entry.inode().attrs();
- let attrs = attrs.finish(entry.inode());
-
- let done = self.single(&make_entry((entry.inode().ino(), ttl), (attrs, attrs_ttl)));
- entry.unveil();
-
- done
- }
-
- /// The requested entry was not found in this directory. The FUSE clint may include this
- /// response in negative cache for up to the given TTL.
- pub fn not_found(self, ttl: Ttl) -> Done<'o> {
- self.single(&make_entry(
- (Ino::NULL, ttl),
- (Zeroable::zeroed(), Ttl::NULL),
- ))
- }
-
- /// The requested entry was not found in this directory, but unlike [`Reply::not_found()`]
- /// this does not report back a TTL to the FUSE client. The client should not cache the
- /// response.
- pub fn not_found_uncached(self) -> Done<'o> {
- self.fail(Errno::ENOENT)
- }
-}
-
-impl<'o> Request<'o, Readdir> {
- pub fn handle(&self) -> u64 {
- self.read_in().fh
- }
-
- /// Returns the base offset in the directory stream to read from.
- pub fn offset(&self) -> u64 {
- self.read_in().offset
- }
-
- pub fn size(&self) -> u32 {
- self.read_in().size
- }
-
- fn read_in(&self) -> &proto::ReadIn {
- use proto::OpcodeSelect::*;
-
- match &self.body {
- Match(readdir_plus) => &readdir_plus.read_in,
- Alt(readdir) => &readdir.read_in,
- }
- }
-}
-
-impl<'o> Reply<'o, Readdir> {
- pub fn buffered<B>(self, buffer: B) -> Reply<'o, BufferedReaddir<B>>
- where
- B: BufMut + AsRef<[u8]>,
- {
- assert!(buffer.as_ref().is_empty());
-
- let ReaddirState {
- max_read,
- is_plus,
- buffer: (),
- } = self.tail;
-
- Reply {
- session: self.session,
- unique: self.unique,
- tail: ReaddirState {
- max_read,
- is_plus,
- buffer,
- },
- }
- }
-}
-
-impl<'o, B: BufMut + AsRef<[u8]>> Reply<'o, BufferedReaddir<B>> {
- pub fn entry(mut self, entry: Entry<impl Known>) -> Interruptible<'o, BufferedReaddir<B>, ()> {
- let entry_header_len = if self.tail.is_plus {
- std::mem::size_of::<proto::DirentPlus>()
- } else {
- std::mem::size_of::<proto::Dirent>()
- };
-
- let name = entry.name.as_bytes();
- let padding_len = dirent_pad_bytes(entry_header_len + name.len());
-
- let buffer = &mut self.tail.buffer;
- let remaining = buffer
- .remaining_mut()
- .min(self.tail.max_read - buffer.as_ref().len());
-
- let record_len = entry_header_len + name.len() + padding_len;
- if remaining < record_len {
- if buffer.as_ref().is_empty() {
- log::error!("Buffer for readdir req #{} is too small", self.unique);
- return Interruptible::Interrupted(self.fail(Errno::ENOBUFS));
- }
-
- return Interruptible::Interrupted(self.end());
- }
-
- let inode = entry.inode.inode();
- let entry_type = match inode.inode_type() {
- EntryType::Fifo => SFlag::S_IFIFO,
- EntryType::CharacterDevice => SFlag::S_IFCHR,
- EntryType::Directory => SFlag::S_IFDIR,
- EntryType::BlockDevice => SFlag::S_IFBLK,
- EntryType::File => SFlag::S_IFREG,
- EntryType::Symlink => SFlag::S_IFLNK,
- EntryType::Socket => SFlag::S_IFSOCK,
- };
-
- let ino = inode.ino();
- let dirent = proto::Dirent {
- ino: ino.as_raw(),
- off: entry.offset,
- namelen: name.len().try_into().unwrap(),
- entry_type: entry_type.bits() >> 12,
- };
-
- enum Ent {
- Dirent(proto::Dirent),
- DirentPlus(proto::DirentPlus),
- }
-
- let ent = if self.tail.is_plus {
- let (attrs, attrs_ttl) = inode.attrs();
- let attrs = attrs.finish(inode);
- let entry_out = make_entry((ino, entry.ttl), (attrs, attrs_ttl));
-
- if name != ".".as_bytes() && name != "..".as_bytes() {
- entry.inode.unveil();
- }
-
- Ent::DirentPlus(proto::DirentPlus { entry_out, dirent })
- } else {
- Ent::Dirent(dirent)
- };
-
- let entry_header = match &ent {
- Ent::Dirent(dirent) => bytes_of(dirent),
- Ent::DirentPlus(dirent_plus) => bytes_of(dirent_plus),
- };
-
- buffer.put_slice(entry_header);
- buffer.put_slice(name);
- buffer.put_slice(&[0; 7][..padding_len]);
-
- if remaining - record_len >= entry_header.len() + (1 << proto::DIRENT_ALIGNMENT_BITS) {
- Interruptible::Completed(self, ())
- } else {
- Interruptible::Interrupted(self.end())
- }
- }
-
- pub fn end(self) -> Done<'o> {
- self.inner(|this| this.tail.buffer.as_ref())
- }
-}
-
-impl<'o> FromRequest<'o, Readdir> for ReaddirState<()> {
- fn from_request(request: &Request<'o, Readdir>) -> Self {
- ReaddirState {
- max_read: request.size() as usize,
- is_plus: matches!(request.body, proto::OpcodeSelect::Match(_)),
- buffer: (),
- }
- }
-}
-
-fn make_entry(
- (Ino(ino), entry_ttl): (Ino, Ttl),
- (attrs, attr_ttl): (proto::Attrs, Ttl),
-) -> proto::EntryOut {
- proto::EntryOut {
- nodeid: ino,
- generation: 0, //TODO
- entry_valid: entry_ttl.seconds,
- attr_valid: attr_ttl.seconds,
- entry_valid_nsec: entry_ttl.nanoseconds,
- attr_valid_nsec: attr_ttl.nanoseconds,
- attr: attrs,
- }
-}
-
-fn dirent_pad_bytes(entry_len: usize) -> usize {
- const ALIGN_MASK: usize = (1 << proto::DIRENT_ALIGNMENT_BITS) - 1;
- ((entry_len + ALIGN_MASK) & !ALIGN_MASK) - entry_len
-}
diff --git a/src/fuse/ops/entry.rs b/src/fuse/ops/entry.rs
deleted file mode 100644
index 9475a61..0000000
--- a/src/fuse/ops/entry.rs
+++ /dev/null
@@ -1,80 +0,0 @@
-use crate::{proto, Ino};
-use crate::fuse::{io::Stat, private_trait::Sealed, Done, Operation, Reply, Request};
-
-pub enum Forget {}
-pub enum Getattr {}
-
-impl Sealed for Forget {}
-impl Sealed for Getattr {}
-
-impl<'o> Operation<'o> for Forget {
- type RequestBody = proto::OpcodeSelect<
- (&'o proto::BatchForgetIn, &'o [proto::ForgetOne]),
- &'o proto::ForgetIn,
- { proto::Opcode::BatchForget as u32 },
- >;
-
- type ReplyTail = ();
-}
-
-impl<'o> Operation<'o> for Getattr {
- type RequestBody = &'o proto::GetattrIn;
- type ReplyTail = ();
-}
-
-impl<'o> Request<'o, Forget> {
- pub fn forget_list(&self) -> impl '_ + Iterator<Item = (Ino, u64)> {
- use proto::OpcodeSelect::*;
-
- enum List<'a> {
- Single(Option<(Ino, u64)>),
- Batch(std::slice::Iter<'a, proto::ForgetOne>),
- }
-
- impl Iterator for List<'_> {
- type Item = (Ino, u64);
-
- fn next(&mut self) -> Option<Self::Item> {
- match self {
- List::Single(single) => single.take(),
- List::Batch(batch) => {
- let forget = batch.next()?;
- Some((Ino(forget.ino), forget.nlookup))
- }
- }
- }
- }
-
- match self.body {
- Match((_, slice)) => List::Batch(slice.iter()),
- Alt(single) => List::Single(Some((self.ino(), single.nlookup))),
- }
- }
-}
-
-impl<'o> Reply<'o, Forget> {
- pub fn ok(self) -> Done<'o> {
- // No reply for forget requests
- Done::new()
- }
-}
-
-impl<'o> Request<'o, Getattr> {
- pub fn handle(&self) -> u64 {
- self.body.fh
- }
-}
-
-impl<'o> Reply<'o, Getattr> {
- pub fn known(self, inode: &impl Stat) -> Done<'o> {
- let (attrs, ttl) = inode.attrs();
- let attrs = attrs.finish(inode);
-
- self.single(&proto::AttrOut {
- attr_valid: ttl.seconds,
- attr_valid_nsec: ttl.nanoseconds,
- dummy: Default::default(),
- attr: attrs,
- })
- }
-}
diff --git a/src/fuse/ops/global.rs b/src/fuse/ops/global.rs
deleted file mode 100644
index e2ecd7b..0000000
--- a/src/fuse/ops/global.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-use crate::{proto, util::page_size};
-use crate::fuse::{io::FsInfo, private_trait::Sealed, Done, Operation, Reply};
-
-pub enum Init {}
-pub enum Statfs {}
-
-pub struct InitState {
- pub(crate) kernel_flags: proto::InitFlags,
- pub(crate) buffer_pages: usize,
-}
-
-impl Sealed for Init {}
-impl Sealed for Statfs {}
-
-impl<'o> Operation<'o> for Init {
- type RequestBody = &'o proto::InitIn;
- type ReplyTail = InitState;
-}
-
-impl<'o> Operation<'o> for Statfs {
- type RequestBody = ();
- type ReplyTail = ();
-}
-
-impl<'o> Reply<'o, Init> {
- pub fn ok(self) -> Done<'o> {
- let InitState {
- kernel_flags,
- buffer_pages,
- } = self.tail;
-
- let flags = {
- use proto::InitFlags;
-
- //TODO: Conditions for these feature flags
- // - Locks
- // - ASYNC_DIO
- // - WRITEBACK_CACHE
- // - NO_OPEN_SUPPORT
- // - HANDLE_KILLPRIV
- // - POSIX_ACL
- // - NO_OPENDIR_SUPPORT
- // - EXPLICIT_INVAL_DATA
-
- let supported = InitFlags::ASYNC_READ
- | InitFlags::FILE_OPS
- | InitFlags::ATOMIC_O_TRUNC
- | InitFlags::EXPORT_SUPPORT
- | InitFlags::BIG_WRITES
- | InitFlags::HAS_IOCTL_DIR
- | InitFlags::AUTO_INVAL_DATA
- | InitFlags::DO_READDIRPLUS
- | InitFlags::READDIRPLUS_AUTO
- | InitFlags::PARALLEL_DIROPS
- | InitFlags::ABORT_ERROR
- | InitFlags::MAX_PAGES
- | InitFlags::CACHE_SYMLINKS;
-
- kernel_flags & supported
- };
-
- let buffer_size = page_size() * buffer_pages;
-
- // See fs/fuse/dev.c in the kernel source tree for details about max_write
- let max_write = buffer_size - std::mem::size_of::<(proto::InHeader, proto::WriteIn)>();
-
- self.single(&proto::InitOut {
- major: proto::MAJOR_VERSION,
- minor: proto::TARGET_MINOR_VERSION,
- max_readahead: 0, //TODO
- flags: flags.bits(),
- max_background: 0, //TODO
- congestion_threshold: 0, //TODO
- max_write: max_write.try_into().unwrap(),
- time_gran: 1, //TODO
- max_pages: buffer_pages.try_into().unwrap(),
- padding: Default::default(),
- unused: Default::default(),
- })
- }
-}
-
-impl<'o> Reply<'o, Statfs> {
- /// Replies with filesystem statistics.
- pub fn info(self, statfs: &FsInfo) -> Done<'o> {
- self.single(&proto::StatfsOut::from(*statfs))
- }
-}
diff --git a/src/fuse/ops/mod.rs b/src/fuse/ops/mod.rs
deleted file mode 100644
index 39a4ef0..0000000
--- a/src/fuse/ops/mod.rs
+++ /dev/null
@@ -1,68 +0,0 @@
-use std::{
- ffi::{CStr, OsStr},
- os::unix::ffi::OsStrExt,
-};
-
-use crate::util::OutputChain;
-use super::{private_trait::Sealed, Done, Operation, Reply, Request};
-use bytemuck::{bytes_of, Pod};
-
-mod dir;
-mod entry;
-mod global;
-mod open;
-mod rw;
-mod xattr;
-
-pub use dir::{BufferedReaddir, Lookup, Readdir};
-pub use entry::{Forget, Getattr};
-pub use global::{Init, Statfs};
-pub use open::{Access, Open, Opendir, Release, Releasedir};
-pub use rw::{Flush, Read, Readlink, Write};
-pub use xattr::{Getxattr, Listxattr, Removexattr, Setxattr};
-
-pub(crate) use global::InitState;
-
-pub trait FromRequest<'o, O: Operation<'o>> {
- //TODO: Shouldn't be public
- fn from_request(request: &Request<'o, O>) -> Self;
-}
-
-pub enum Any {}
-
-impl Sealed for Any {}
-
-impl<'o> Operation<'o> for Any {
- type RequestBody = ();
- type ReplyTail = ();
-}
-
-impl<'o, O: Operation<'o>> FromRequest<'o, O> for () {
- fn from_request(_request: &Request<'o, O>) -> Self {}
-}
-
-impl<'o, O: Operation<'o>> Reply<'o, O> {
- fn empty(self) -> Done<'o> {
- self.chain(OutputChain::empty())
- }
-
- fn single<P: Pod>(self, single: &P) -> Done<'o> {
- self.chain(OutputChain::tail(&[bytes_of(single)]))
- }
-
- fn inner(self, deref: impl FnOnce(&Self) -> &[u8]) -> Done<'o> {
- let result = self
- .session
- .ok(self.unique, OutputChain::tail(&[deref(&self)]));
- self.finish(result)
- }
-
- fn chain(self, chain: OutputChain<'_>) -> Done<'o> {
- let result = self.session.ok(self.unique, chain);
- self.finish(result)
- }
-}
-
-fn c_to_os(c_str: &CStr) -> &OsStr {
- OsStr::from_bytes(c_str.to_bytes())
-}
diff --git a/src/fuse/ops/open.rs b/src/fuse/ops/open.rs
deleted file mode 100644
index ff41d05..0000000
--- a/src/fuse/ops/open.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-use crate::fuse::{
- io::{AccessFlags, OpenFlags},
- private_trait::Sealed,
- Done, Operation, Reply, Request,
-};
-
-use crate::{proto, Errno};
-use super::FromRequest;
-
-pub enum Open {}
-pub enum Release {}
-pub enum Opendir {}
-pub enum Releasedir {}
-pub enum Access {}
-
-impl Sealed for Open {}
-impl Sealed for Release {}
-impl Sealed for Opendir {}
-impl Sealed for Releasedir {}
-impl Sealed for Access {}
-
-impl<'o> Operation<'o> for Open {
- type RequestBody = &'o proto::OpenIn;
- type ReplyTail = proto::OpenOutFlags;
-}
-
-impl<'o> Operation<'o> for Release {
- type RequestBody = &'o proto::ReleaseIn;
- type ReplyTail = ();
-}
-
-impl<'o> Operation<'o> for Opendir {
- type RequestBody = &'o proto::OpendirIn;
- type ReplyTail = ();
-}
-
-impl<'o> Operation<'o> for Releasedir {
- type RequestBody = &'o proto::ReleasedirIn;
- type ReplyTail = ();
-}
-
-impl<'o> Operation<'o> for Access {
- type RequestBody = &'o proto::AccessIn;
- type ReplyTail = ();
-}
-
-impl<'o> Request<'o, Open> {
- pub fn flags(&self) -> OpenFlags {
- OpenFlags::from_bits_truncate(self.body.flags.try_into().unwrap_or_default())
- }
-}
-
-impl<'o> Reply<'o, Open> {
- pub fn force_direct_io(&mut self) {
- self.tail |= proto::OpenOutFlags::DIRECT_IO;
- }
-
- pub fn ok(self) -> Done<'o> {
- self.ok_with_handle(0)
- }
-
- pub fn ok_with_handle(self, handle: u64) -> Done<'o> {
- let open_flags = self.tail.bits();
-
- self.single(&proto::OpenOut {
- fh: handle,
- open_flags,
- padding: Default::default(),
- })
- }
-}
-
-impl<'o> Request<'o, Release> {
- pub fn handle(&self) -> u64 {
- self.body.fh
- }
-}
-
-impl<'o> Reply<'o, Release> {
- pub fn ok(self) -> Done<'o> {
- self.empty()
- }
-}
-
-impl<'o> Reply<'o, Opendir> {
- pub fn ok(self) -> Done<'o> {
- self.ok_with_handle(0)
- }
-
- pub fn ok_with_handle(self, handle: u64) -> Done<'o> {
- self.single(&proto::OpenOut {
- fh: handle,
- open_flags: 0,
- padding: Default::default(),
- })
- }
-}
-
-impl<'o> Request<'o, Releasedir> {
- pub fn handle(&self) -> u64 {
- self.body.release_in.fh
- }
-}
-
-impl<'o> Reply<'o, Releasedir> {
- pub fn ok(self) -> Done<'o> {
- self.empty()
- }
-}
-
-impl<'o> Request<'o, Access> {
- pub fn mask(&self) -> AccessFlags {
- AccessFlags::from_bits_truncate(self.body.mask as i32)
- }
-}
-
-impl<'o> Reply<'o, Access> {
- pub fn ok(self) -> Done<'o> {
- self.empty()
- }
-
- pub fn permission_denied(self) -> Done<'o> {
- self.fail(Errno::EACCES)
- }
-}
-
-impl<'o> FromRequest<'o, Open> for proto::OpenOutFlags {
- fn from_request(_request: &Request<'o, Open>) -> Self {
- proto::OpenOutFlags::empty()
- }
-}
diff --git a/src/fuse/ops/rw.rs b/src/fuse/ops/rw.rs
deleted file mode 100644
index 726143b..0000000
--- a/src/fuse/ops/rw.rs
+++ /dev/null
@@ -1,123 +0,0 @@
-use std::{ffi::OsStr, os::unix::ffi::OsStrExt};
-use crate::{proto, util::OutputChain};
-use crate::fuse::{private_trait::Sealed, Done, Operation, Reply, Request};
-use super::FromRequest;
-
-pub enum Readlink {}
-pub enum Read {}
-pub enum Write {}
-pub enum Flush {}
-
-pub struct WriteState {
- size: u32,
-}
-
-impl Sealed for Readlink {}
-impl Sealed for Read {}
-impl Sealed for Write {}
-impl Sealed for Flush {}
-
-impl<'o> Operation<'o> for Readlink {
- type RequestBody = ();
- type ReplyTail = ();
-}
-
-impl<'o> Operation<'o> for Read {
- type RequestBody = &'o proto::ReadIn;
- type ReplyTail = ();
-}
-
-impl<'o> Operation<'o> for Write {
- type RequestBody = (&'o proto::WriteIn, &'o [u8]);
- type ReplyTail = WriteState;
-}
-
-impl<'o> Operation<'o> for Flush {
- type RequestBody = &'o proto::FlushIn;
- type ReplyTail = ();
-}
-
-impl<'o> Reply<'o, Readlink> {
- /// This inode corresponds to a symbolic link pointing to the given target path.
- pub fn target<T: AsRef<OsStr>>(self, target: T) -> Done<'o> {
- self.chain(OutputChain::tail(&[target.as_ref().as_bytes()]))
- }
-
- /// Same as [`Reply::target()`], except that the target path is taken from disjoint
- /// slices. This involves no additional allocation.
- pub fn gather_target(self, target: &[&[u8]]) -> Done<'o> {
- self.chain(OutputChain::tail(target))
- }
-}
-
-impl<'o> Request<'o, Read> {
- pub fn handle(&self) -> u64 {
- self.body.fh
- }
-
- pub fn offset(&self) -> u64 {
- self.body.offset
- }
-
- pub fn size(&self) -> u32 {
- self.body.size
- }
-}
-
-impl<'o> Reply<'o, Read> {
- pub fn slice(self, data: &[u8]) -> Done<'o> {
- self.chain(OutputChain::tail(&[data]))
- }
-}
-
-impl<'o> Request<'o, Write> {
- pub fn handle(&self) -> u64 {
- self.body.0.fh
- }
-
- pub fn offset(&self) -> u64 {
- self.body.0.offset
- }
-
- pub fn data(&self) -> &[u8] {
- self.body.1
- }
-}
-
-impl<'o> Reply<'o, Write> {
- pub fn all(self) -> Done<'o> {
- let size = self.tail.size;
- self.single(&proto::WriteOut {
- size,
- padding: Default::default(),
- })
- }
-}
-
-impl<'o> Request<'o, Flush> {
- pub fn handle(&self) -> u64 {
- self.body.fh
- }
-}
-
-impl<'o> Reply<'o, Flush> {
- pub fn ok(self) -> Done<'o> {
- self.empty()
- }
-}
-
-impl<'o> FromRequest<'o, Write> for WriteState {
- fn from_request(request: &Request<'o, Write>) -> Self {
- let (body, data) = request.body;
-
- if body.size as usize != data.len() {
- log::warn!(
- "Write size={} differs from data.len={}",
- body.size,
- data.len()
- );
- }
-
- WriteState { size: body.size }
- }
-}
diff --git a/src/fuse/ops/xattr.rs b/src/fuse/ops/xattr.rs
deleted file mode 100644
index 48d10bf..0000000
--- a/src/fuse/ops/xattr.rs
+++ /dev/null
@@ -1,139 +0,0 @@
-use std::ffi::{CStr, OsStr};
-use crate::{proto, util::OutputChain, Errno};
-use crate::fuse::{private_trait::Sealed, Done, Operation, Reply, Request};
-use super::c_to_os;
-
-pub enum Setxattr {}
-pub enum Getxattr {}
-pub enum Listxattr {}
-pub enum Removexattr {}
-
-pub struct XattrReadState {
- size: u32,
-}
-
-impl Sealed for Setxattr {}
-impl Sealed for Getxattr {}
-impl Sealed for Listxattr {}
-impl Sealed for Removexattr {}
-
-impl<'o> Operation<'o> for Setxattr {
- // header, name, value
- type RequestBody = (&'o proto::SetxattrIn, &'o CStr, &'o [u8]);
- type ReplyTail = ();
-}
-
-impl<'o> Operation<'o> for Getxattr {
- type RequestBody = (&'o proto::GetxattrIn, &'o CStr);
- type ReplyTail = XattrReadState;
-}
-
-impl<'o> Operation<'o> for Listxattr {
- type RequestBody = &'o proto::ListxattrIn;
- type ReplyTail = XattrReadState;
-}
-
-impl<'o> Operation<'o> for Removexattr {
- type RequestBody = &'o CStr;
- type ReplyTail = ();
-}
-
-//TODO: flags
-impl<'o> Request<'o, Setxattr> {
- pub fn name(&self) -> &OsStr {
- let (_header, name, _value) = self.body;
- c_to_os(name)
- }
-
- pub fn value(&self) -> &[u8] {
- let (_header, _name, value) = self.body;
- value
- }
-}
-
-impl<'o> Reply<'o, Setxattr> {
- pub fn ok(self) -> Done<'o> {
- self.empty()
- }
-
- pub fn not_found(self) -> Done<'o> {
- self.fail(Errno::ENODATA)
- }
-}
-
-impl<'o> Request<'o, Getxattr> {
- pub fn size(&self) -> u32 {
- self.body.0.size
- }
-
- pub fn name(&self) -> &OsStr {
- c_to_os(self.body.1)
- }
-}
-
-impl<'o> Reply<'o, Getxattr> {
- pub fn slice(self, value: &[u8]) -> Done<'o> {
- let size = value.len().try_into().expect("Extremely large xattr");
- if self.tail.size == 0 {
- return self.value_size(size);
- } else if self.tail.size < size {
- return self.buffer_too_small();
- }
-
- self.chain(OutputChain::tail(&[value]))
- }
-
- pub fn value_size(self, size: u32) -> Done<'o> {
- assert_eq!(self.tail.size, 0);
-
- self.single(&proto::GetxattrOut {
- size,
- padding: Default::default(),
- })
- }
-
- pub fn buffer_too_small(self) -> Done<'o> {
- self.fail(Errno::ERANGE)
- }
-
- pub fn not_found(self) -> Done<'o> {
- self.fail(Errno::ENODATA)
- }
-}
-
-impl<'o> Request<'o, Listxattr> {
- pub fn size(&self) -> u32 {
- self.body.getxattr_in.size
- }
-}
-
-impl<'o> Reply<'o, Listxattr> {
- //TODO: buffered(), gather()
-
- pub fn value_size(self, size: u32) -> Done<'o> {
- assert_eq!(self.tail.size, 0);
-
- self.single(&proto::ListxattrOut {
- getxattr_out: proto::GetxattrOut {
- size,
- padding: Default::default(),
- },
- })
- }
-
- pub fn buffer_too_small(self) -> Done<'o> {
- self.fail(Errno::ERANGE)
- }
-}
-
-impl<'o> Request<'o, Removexattr> {
- pub fn name(&self) -> &OsStr {
- c_to_os(self.body)
- }
-}
-
-impl<'o> Reply<'o, Removexattr> {
- pub fn ok(self) -> Done<'o> {
- self.empty()
- }
-}
diff --git a/src/fuse/session.rs b/src/fuse/session.rs
deleted file mode 100644
index e83a8d4..0000000
--- a/src/fuse/session.rs
+++ /dev/null
@@ -1,559 +0,0 @@
-use std::{
- future::Future,
- io,
- marker::PhantomData,
- ops::ControlFlow,
- os::unix::io::{IntoRawFd, RawFd},
- path::PathBuf,
- sync::{Arc, Mutex},
-};
-
-use nix::{
- fcntl::{fcntl, FcntlArg, OFlag},
- sys::uio::{writev, IoVec},
- unistd::read,
-};
-
-use tokio::{
- io::unix::AsyncFd,
- sync::{broadcast, OwnedSemaphorePermit, Semaphore},
-};
-
-use bytemuck::bytes_of;
-use smallvec::SmallVec;
-
-use crate::{
- mount::{unmount_sync, MountError},
- proto::{self, InHeader, Structured},
- util::{page_size, DumbFd, OutputChain},
- Errno, FuseError, FuseResult,
-};
-
-use super::{
- ops::{self, FromRequest},
- Done, Op, Operation, Reply, Request,
-};
-
-pub struct Start {
- session_fd: DumbFd,
- mountpoint: PathBuf,
-}
-
-pub struct Session {
- session_fd: AsyncFd<RawFd>,
- interrupt_tx: broadcast::Sender<u64>,
- buffers: Mutex<Vec<Buffer>>,
- buffer_semaphore: Arc<Semaphore>,
- buffer_pages: usize,
- mountpoint: Mutex<Option<PathBuf>>,
-}
-
-pub struct Endpoint<'a> {
- session: &'a Arc<Session>,
- local_buffer: Buffer,
-}
-
-pub enum Dispatch<'o> {
- Lookup(Incoming<'o, ops::Lookup>),
- Forget(Incoming<'o, ops::Forget>),
- Getattr(Incoming<'o, ops::Getattr>),
- Readlink(Incoming<'o, ops::Readlink>),
- Open(Incoming<'o, ops::Open>),
- Read(Incoming<'o, ops::Read>),
- Write(Incoming<'o, ops::Write>),
- Statfs(Incoming<'o, ops::Statfs>),
- Release(Incoming<'o, ops::Release>),
- Setxattr(Incoming<'o, ops::Setxattr>),
- Getxattr(Incoming<'o, ops::Getxattr>),
- Listxattr(Incoming<'o, ops::Listxattr>),
- Removexattr(Incoming<'o, ops::Removexattr>),
- Flush(Incoming<'o, ops::Flush>),
- Opendir(Incoming<'o, ops::Opendir>),
- Readdir(Incoming<'o, ops::Readdir>),
- Releasedir(Incoming<'o, ops::Releasedir>),
- Access(Incoming<'o, ops::Access>),
-}
-
-pub struct Incoming<'o, O: Operation<'o>> {
- common: IncomingCommon<'o>,
- _phantom: PhantomData<O>,
-}
-
-pub struct Owned<O> {
- session: Arc<Session>,
- buffer: Buffer,
- header: InHeader,
- _permit: OwnedSemaphorePermit,
- _phantom: PhantomData<O>,
-}
-
-impl Session {
- // Does not seem like 'a can be elided here
- #[allow(clippy::needless_lifetimes)]
- pub fn endpoint<'a>(self: &'a Arc<Self>) -> Endpoint<'a> {
- Endpoint {
- session: self,
- local_buffer: Buffer::new(self.buffer_pages),
- }
- }
-
- pub fn unmount_sync(&self) -> Result<(), MountError> {
- let mountpoint = self.mountpoint.lock().unwrap().take();
- if let Some(mountpoint) = &mountpoint {
- unmount_sync(mountpoint)?;
- }
-
- Ok(())
- }
-
- pub(crate) fn ok(&self, unique: u64, output: OutputChain<'_>) -> FuseResult<()> {
- self.send(unique, 0, output)
- }
-
- pub(crate) fn fail(&self, unique: u64, mut errno: i32) -> FuseResult<()> {
- if errno <= 0 {
- log::warn!(
- "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG",
- unique,
- errno
- );
-
- errno = Errno::ENOMSG as i32;
- }
-
- self.send(unique, -errno, OutputChain::empty())
- }
-
- pub(crate) fn interrupt_rx(&self) -> broadcast::Receiver<u64> {
- self.interrupt_tx.subscribe()
- }
-
- async fn handshake<F>(&mut self, buffer: &mut Buffer, init: F) -> FuseResult<Handshake<F>>
- where
- F: FnOnce(Op<'_, ops::Init>) -> Done<'_>,
- {
- self.session_fd.readable().await?.retain_ready();
- let bytes = read(*self.session_fd.get_ref(), &mut buffer.0).map_err(io::Error::from)?;
-
- let (header, opcode) = InHeader::from_bytes(&buffer.0[..bytes])?;
- let body = match opcode {
- proto::Opcode::Init => {
- <&proto::InitIn>::toplevel_from(&buffer.0[HEADER_END..bytes], &header)?
- }
-
- _ => {
- log::error!("First message from kernel is not Init, but {:?}", opcode);
- return Err(FuseError::ProtocolInit);
- }
- };
-
- use std::cmp::Ordering;
- let supported = match body.major.cmp(&proto::MAJOR_VERSION) {
- Ordering::Less => false,
- Ordering::Equal => body.minor >= proto::REQUIRED_MINOR_VERSION,
- Ordering::Greater => {
- let tail = [bytes_of(&proto::MAJOR_VERSION)];
- self.ok(header.unique, OutputChain::tail(&tail))?;
-
- return Ok(Handshake::Restart(init));
- }
- };
-
- //TODO: fake some decency by supporting a few older minor versions
- if !supported {
- log::error!(
- "Unsupported protocol {}.{}; this build requires \
- {major}.{}..={major}.{} (or a greater version \
- through compatibility)",
- body.major,
- body.minor,
- proto::REQUIRED_MINOR_VERSION,
- proto::TARGET_MINOR_VERSION,
- major = proto::MAJOR_VERSION
- );
-
- self.fail(header.unique, Errno::EPROTONOSUPPORT as i32)?;
- return Err(FuseError::ProtocolInit);
- }
-
- let request = Request { header, body };
- let reply = Reply {
- session: self,
- unique: header.unique,
- tail: ops::InitState {
- kernel_flags: proto::InitFlags::from_bits_truncate(body.flags),
- buffer_pages: self.buffer_pages,
- },
- };
-
- init((request, reply)).consume();
- Ok(Handshake::Done)
- }
-
- fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> {
- let after_header: usize = output
- .iter()
- .flat_map(<[_]>::iter)
- .copied()
- .map(<[_]>::len)
- .sum();
-
- let length = (std::mem::size_of::<proto::OutHeader>() + after_header) as _;
- let header = proto::OutHeader {
- len: length,
- error,
- unique,
- };
-
- //TODO: Full const generics any time now? Fs::EXPECTED_REQUEST_SEGMENTS
- let header = [bytes_of(&header)];
- let output = output.preceded(&header);
- let buffers: SmallVec<[_; 8]> = output
- .iter()
- .flat_map(<[_]>::iter)
- .copied()
- .filter(|slice| !slice.is_empty())
- .map(IoVec::from_slice)
- .collect();
-
- let written = writev(*self.session_fd.get_ref(), &buffers).map_err(io::Error::from)?;
- if written == length as usize {
- Ok(())
- } else {
- Err(FuseError::ShortWrite)
- }
- }
-}
-
-impl Drop for Start {
- fn drop(&mut self) {
- if !self.mountpoint.as_os_str().is_empty() {
- let _ = unmount_sync(&self.mountpoint);
- }
- }
-}
-
-impl Drop for Session {
- fn drop(&mut self) {
- if let Some(mountpoint) = self.mountpoint.get_mut().unwrap().take() {
- let _ = unmount_sync(&mountpoint);
- }
-
- drop(DumbFd(*self.session_fd.get_ref())); // Close
- }
-}
-
-impl<'o> Dispatch<'o> {
- pub fn op(self) -> Op<'o> {
- use Dispatch::*;
-
- let common = match self {
- Lookup(incoming) => incoming.common,
- Forget(incoming) => incoming.common,
- Getattr(incoming) => incoming.common,
- Readlink(incoming) => incoming.common,
- Open(incoming) => incoming.common,
- Read(incoming) => incoming.common,
- Write(incoming) => incoming.common,
- Statfs(incoming) => incoming.common,
- Release(incoming) => incoming.common,
- Setxattr(incoming) => incoming.common,
- Getxattr(incoming) => incoming.common,
- Listxattr(incoming) => incoming.common,
- Removexattr(incoming) => incoming.common,
- Flush(incoming) => incoming.common,
- Opendir(incoming) => incoming.common,
- Readdir(incoming) => incoming.common,
- Releasedir(incoming) => incoming.common,
- Access(incoming) => incoming.common,
- };
-
- common.into_generic_op()
- }
-}
-
-impl Endpoint<'_> {
- pub async fn receive<'o, F, Fut>(&'o mut self, dispatcher: F) -> FuseResult<ControlFlow<()>>
- where
- F: FnOnce(Dispatch<'o>) -> Fut,
- Fut: Future<Output = Done<'o>>,
- {
- let buffer = &mut self.local_buffer.0;
- let bytes = loop {
- let session_fd = &self.session.session_fd;
-
- let mut readable = tokio::select! {
- readable = session_fd.readable() => readable?,
-
- _ = session_fd.writable() => {
- self.session.mountpoint.lock().unwrap().take();
- return Ok(ControlFlow::Break(()));
- }
- };
-
- let mut read = |fd: &AsyncFd<RawFd>| read(*fd.get_ref(), buffer);
- let result = match readable.try_io(|fd| read(fd).map_err(io::Error::from)) {
- Ok(result) => result,
- Err(_) => continue,
- };
-
- match result {
- // Interrupted
- //TODO: libfuse docs say that this has some side effects
- Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue,
-
- result => break result,
- }
- };
-
- let (header, opcode) = InHeader::from_bytes(&buffer[..bytes?])?;
- let common = IncomingCommon {
- session: self.session,
- buffer: &mut self.local_buffer,
- header,
- };
-
- let dispatch = {
- use proto::Opcode::*;
-
- macro_rules! dispatch {
- ($op:ident) => {
- Dispatch::$op(Incoming {
- common,
- _phantom: PhantomData,
- })
- };
- }
-
- match opcode {
- Destroy => return Ok(ControlFlow::Break(())),
-
- Lookup => dispatch!(Lookup),
- Forget => dispatch!(Forget),
- Getattr => dispatch!(Getattr),
- Readlink => dispatch!(Readlink),
- Open => dispatch!(Open),
- Read => dispatch!(Read),
- Write => dispatch!(Write),
- Statfs => dispatch!(Statfs),
- Release => dispatch!(Release),
- Setxattr => dispatch!(Setxattr),
- Getxattr => dispatch!(Getxattr),
- Listxattr => dispatch!(Listxattr),
- Removexattr => dispatch!(Removexattr),
- Flush => dispatch!(Flush),
- Opendir => dispatch!(Opendir),
- Readdir => dispatch!(Readdir),
- Releasedir => dispatch!(Releasedir),
- Access => dispatch!(Access),
- BatchForget => dispatch!(Forget),
- ReaddirPlus => dispatch!(Readdir),
-
- _ => {
- log::warn!("Not implemented: {}", common.header);
-
- let (_request, reply) = common.into_generic_op();
- reply.not_implemented().consume();
-
- return Ok(ControlFlow::Continue(()));
- }
- }
- };
-
- dispatcher(dispatch).await.consume();
- Ok(ControlFlow::Continue(()))
- }
-}
-
-impl Start {
- pub async fn start<F>(mut self, mut init: F) -> FuseResult<Arc<Session>>
- where
- F: FnOnce(Op<'_, ops::Init>) -> Done<'_>,
- {
- let mountpoint = std::mem::take(&mut self.mountpoint);
- let session_fd = self.session_fd.take().into_raw_fd();
-
- let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE;
- fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap();
-
- let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY);
-
- let buffer_pages = proto::MIN_READ_SIZE / page_size(); //TODO
- let buffer_count = SHARED_BUFFERS; //TODO
- let buffers = std::iter::repeat_with(|| Buffer::new(buffer_pages))
- .take(buffer_count)
- .collect();
-
- let mut session = Session {
- session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?,
- interrupt_tx,
- buffers: Mutex::new(buffers),
- buffer_semaphore: Arc::new(Semaphore::new(buffer_count)),
- buffer_pages,
- mountpoint: Mutex::new(Some(mountpoint)),
- };
-
- let mut init_buffer = session.buffers.get_mut().unwrap().pop().unwrap();
-
- loop {
- init = match session.handshake(&mut init_buffer, init).await? {
- Handshake::Restart(init) => init,
- Handshake::Done => {
- session.buffers.get_mut().unwrap().push(init_buffer);
- break Ok(Arc::new(session));
- }
- };
- }
- }
-
- pub fn unmount_sync(mut self) -> Result<(), MountError> {
- // This prevents Start::drop() from unmounting a second time
- let mountpoint = std::mem::take(&mut self.mountpoint);
- unmount_sync(&mountpoint)
- }
-
- pub(crate) fn new(session_fd: DumbFd, mountpoint: PathBuf) -> Self {
- Start {
- session_fd,
- mountpoint,
- }
- }
-}
-
-impl<'o, O: Operation<'o>> Incoming<'o, O>
-where
- O::ReplyTail: FromRequest<'o, O>,
-{
- pub fn op(self) -> Result<Op<'o, O>, Done<'o>> {
- try_op(
- self.common.session,
- &self.common.buffer.0,
- self.common.header,
- )
- }
-
- pub async fn owned(self) -> (Done<'o>, Owned<O>) {
- let session = self.common.session;
-
- let (buffer, permit) = {
- let semaphore = Arc::clone(&session.buffer_semaphore);
- let permit = semaphore
- .acquire_owned()
- .await
- .expect("Buffer semaphore error");
-
- let mut buffers = session.buffers.lock().unwrap();
- let buffer = buffers.pop().expect("Buffer semaphore out of sync");
- let buffer = std::mem::replace(self.common.buffer, buffer);
-
- (buffer, permit)
- };
-
- let owned = Owned {
- session: Arc::clone(session),
- buffer,
- header: self.common.header,
- _permit: permit,
- _phantom: PhantomData,
- };
-
- (Done::new(), owned)
- }
-}
-
-impl<O: for<'o> Operation<'o>> Owned<O>
-where
- for<'o> <O as Operation<'o>>::ReplyTail: FromRequest<'o, O>,
-{
- pub async fn op<'o, F, Fut>(&'o self, handler: F)
- where
- F: FnOnce(Op<'o, O>) -> Fut,
- Fut: Future<Output = Done<'o>>,
- {
- match try_op(&self.session, &self.buffer.0, self.header) {
- Ok(op) => handler(op).await.consume(),
- Err(done) => done.consume(),
- }
- }
-}
-
-impl<O> Drop for Owned<O> {
- fn drop(&mut self) {
- if let Ok(mut buffers) = self.session.buffers.lock() {
- let empty = Buffer(Vec::new().into_boxed_slice());
- buffers.push(std::mem::replace(&mut self.buffer, empty));
- }
- }
-}
-
-const INTERRUPT_BROADCAST_CAPACITY: usize = 32;
-const SHARED_BUFFERS: usize = 32;
-const HEADER_END: usize = std::mem::size_of::<InHeader>();
-
-struct IncomingCommon<'o> {
- session: &'o Arc<Session>,
- buffer: &'o mut Buffer,
- header: InHeader,
-}
-
-enum Handshake<F> {
- Done,
- Restart(F),
-}
-
-struct Buffer(Box<[u8]>);
-
-impl<'o> IncomingCommon<'o> {
- fn into_generic_op(self) -> Op<'o> {
- let request = Request {
- header: self.header,
- body: (),
- };
-
- let reply = Reply {
- session: self.session,
- unique: self.header.unique,
- tail: (),
- };
-
- (request, reply)
- }
-}
-
-impl Buffer {
- fn new(pages: usize) -> Self {
- Buffer(vec![0; pages * page_size()].into_boxed_slice())
- }
-}
-
-fn try_op<'o, O: Operation<'o>>(
- session: &'o Session,
- bytes: &'o [u8],
- header: InHeader,
-) -> Result<Op<'o, O>, Done<'o>>
-where
- O::ReplyTail: FromRequest<'o, O>,
-{
- let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize], &header) {
- Ok(body) => body,
- Err(error) => {
- log::error!("Parsing request {}: {}", header, error);
- let reply = Reply::<ops::Any> {
- session,
- unique: header.unique,
- tail: (),
- };
-
- return Err(reply.io_error());
- }
- };
-
- let request = Request { header, body };
- let reply = Reply {
- session,
- unique: header.unique,
- tail: FromRequest::from_request(&request),
- };
-
- Ok((request, reply))
-}