summaryrefslogtreecommitdiff
path: root/src/fuse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/fuse/fs.rs127
-rw-r--r--src/fuse/io.rs319
-rw-r--r--src/fuse/mod.rs87
-rw-r--r--src/fuse/mount.rs160
-rw-r--r--src/fuse/ops.rs366
-rw-r--r--src/fuse/session.rs569
6 files changed, 1628 insertions, 0 deletions
diff --git a/src/fuse/fs.rs b/src/fuse/fs.rs
new file mode 100644
index 0000000..4cf3282
--- /dev/null
+++ b/src/fuse/fs.rs
@@ -0,0 +1,127 @@
+use async_trait::async_trait;
+use nix::errno::Errno;
+
+use std::{
+ num::NonZeroUsize,
+ ops::{Deref, DerefMut},
+ sync::Arc,
+};
+
+use crate::{Ino, TimeToLive};
+
+use super::{
+ io::{Attrs, EntryType},
+ ops::*,
+ Done, Op, Reply,
+};
+
+/// Top-level filesystem implementation served by a FUSE session.
+///
+/// `Inode` is the per-inode handler type; `Farc` is the cloneable, dereferenceable handle used
+/// to share inodes and defaults to `Arc<Self::Inode>`.
+#[async_trait]
+pub trait Fuse: Sized + Send + Sync + 'static {
+ type Inode: Inode<Fuse = Self> + ?Sized;
+ type Farc: Deref<Target = Self::Inode> + Clone + Send + Sync = Arc<Self::Inode>;
+
+ /// Handles session initialization. This is the only method without a default body.
+ async fn init<'o>(&self, reply: Reply<'o, Self, Init>) -> Done<'o>;
+
+ /// Handles a statfs request. The default replies "not implemented".
+ async fn statfs<'o>(&self, (_, reply, _): Op<'o, Self, Statfs>) -> Done<'o> {
+ reply.not_implemented()
+ }
+
+ /// Defaults to 16.
+ // NOTE(review): presumably the number of request buffers the session keeps - confirm at the
+ // call site, which is not visible here.
+ fn request_buffers(&self) -> NonZeroUsize {
+ NonZeroUsize::new(16).unwrap()
+ }
+
+ /// Defaults to 4.
+ // NOTE(review): presumably the size of each request buffer in pages - confirm at the call site.
+ fn request_buffer_pages(&self) -> NonZeroUsize {
+ NonZeroUsize::new(4).unwrap()
+ }
+}
+
+/// Per-inode request handlers for a [`Fuse`] filesystem.
+///
+/// Every method takes the inode through the filesystem's `Farc` handle (see [`FarcTo`]). All
+/// handlers with a default body reply "not implemented", except `open`, which replies `ok()`,
+/// and `direct_io`, which returns `false`.
+#[async_trait]
+pub trait Inode: Send + Sync {
+ type Fuse: Fuse<Inode = Self>;
+
+ /// Returns this inode's number.
+ fn ino(self: &FarcTo<Self>) -> Ino;
+ /// Returns this inode's attributes together with a time-to-live for them.
+ fn attrs(self: &FarcTo<Self>) -> (Attrs, TimeToLive);
+ /// Returns the kind of entry (file, directory, symlink, ...) this inode is.
+ fn inode_type(self: &FarcTo<Self>) -> EntryType;
+
+ /// Whether opens of this inode should carry the direct-I/O flag. Defaults to `false`.
+ fn direct_io<'o>(self: &FarcTo<Self>) -> bool {
+ false
+ }
+
+ /// Handles an access check. Unlike the other handlers this one is synchronous.
+ fn access<'o>(self: &FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Access>) -> Done<'o> {
+ reply.not_implemented()
+ }
+
+ /// Handles a lookup request addressed to this inode.
+ async fn lookup<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Lookup>) -> Done<'o> {
+ reply.not_implemented()
+ }
+
+ /// Handles a readlink request for this inode.
+ async fn readlink<'o>(
+ self: FarcTo<Self>,
+ (_, reply, _): Op<'o, Self::Fuse, Readlink>,
+ ) -> Done<'o> {
+ reply.not_implemented()
+ }
+
+ /// Handles an open request. The default accepts the open.
+ async fn open<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Open>) -> Done<'o> {
+ // Calling not_implemented() here would ignore direct_io() and similar flags
+ reply.ok()
+ }
+
+ /// Handles an opendir request for this inode.
+ async fn opendir<'o>(
+ self: FarcTo<Self>,
+ (_, reply, _): Op<'o, Self::Fuse, Opendir>,
+ ) -> Done<'o> {
+ reply.not_implemented()
+ }
+
+ /// Handles a readdir request for this inode.
+ async fn readdir<'o>(
+ self: FarcTo<Self>,
+ (_, reply, _): Op<'o, Self::Fuse, Readdir>,
+ ) -> Done<'o> {
+ reply.not_implemented()
+ }
+}
+
+/// Seekable I/O handlers that operate through a [`Head`] wrapper.
+// NOTE(review): nothing visible in this chunk constructs a `Head`; the intended wiring to open
+// file handles is an assumption to confirm.
+#[async_trait]
+pub trait Tape: Send + Sync {
+ type Fuse: Fuse;
+
+ /// Moves to `offset`, or fails with an errno.
+ async fn seek(self: &mut Head<Self>, offset: u64) -> Result<(), Errno>;
+
+ /// Seeks back to offset zero. The default delegates to `seek(0)`.
+ async fn rewind(self: &mut Head<Self>) -> Result<(), Errno> {
+ self.seek(0).await
+ }
+
+ /// Handles a read request. The default replies "not implemented".
+ async fn read<'o>(self: &mut Head<Self>, (_, reply, _): Op<'o, Self::Fuse, Read>) -> Done<'o> {
+ reply.not_implemented()
+ }
+
+ /// Handles a write request. The default replies "not implemented".
+ async fn write<'o>(
+ self: &mut Head<Self>,
+ (_, reply, _): Op<'o, Self::Fuse, Write>,
+ ) -> Done<'o> {
+ reply.not_implemented()
+ }
+}
+
+/// Shorthand for the `Farc` handle type of the filesystem that inode type `I` belongs to.
+pub type FarcTo<I> = <<I as Inode>::Fuse as Fuse>::Farc;
+
+/// Wrapper pairing a [`Tape`] with an offset and the inode handle it belongs to.
+///
+/// Dereferences to the wrapped tape.
+pub struct Head<T: Tape + ?Sized> {
+ // NOTE(review): presumably the current position of the tape - no visible code reads it yet.
+ offset: u64,
+ // Handle of the inode associated with this tape.
+ inode: <T::Fuse as Fuse>::Farc,
+ // The wrapped tape; kept as the last field because `T` may be unsized.
+ tape: T,
+}
+
+/// Lets a `Head<T>` be used wherever a shared reference to its tape is expected.
+impl<T: Tape + ?Sized> Deref for Head<T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        let tape: &T = &self.tape;
+        tape
+    }
+}
+
+/// Lets a `Head<T>` be used wherever an exclusive reference to its tape is expected.
+impl<T: Tape + ?Sized> DerefMut for Head<T> {
+    fn deref_mut(&mut self) -> &mut T {
+        let tape: &mut T = &mut self.tape;
+        tape
+    }
+}
diff --git a/src/fuse/io.rs b/src/fuse/io.rs
new file mode 100644
index 0000000..450d9f7
--- /dev/null
+++ b/src/fuse/io.rs
@@ -0,0 +1,319 @@
+use bytemuck::Zeroable;
+use nix::{errno::Errno, sys::stat::SFlag};
+
+use std::{
+ borrow::Cow,
+ convert::Infallible,
+ ffi::OsStr,
+ future::Future,
+ ops::{ControlFlow, FromResidual, Try},
+};
+
+use crate::{proto, Ino, TimeToLive, Timestamp};
+
+use super::{
+ fs::{Fuse, Inode},
+ session, Done, Operation, Reply, Request,
+};
+
+#[doc(no_inline)]
+pub use nix::{
+ dir::Type as EntryType,
+ sys::stat::Mode,
+ unistd::{AccessFlags, Gid, Pid, Uid},
+};
+
+/// Outcome of racing a future against an interrupt for the same request
+/// (see `Reply::interruptible()`).
+pub enum Interruptible<'o, Fs: Fuse, O: Operation<'o, Fs>, T> {
+ /// The future finished first; the reply is handed back together with the future's output.
+ Completed(Reply<'o, Fs, O>, T),
+ /// The request was interrupted; the reply has already been consumed.
+ Interrupted(Done<'o>),
+}
+
+/// Builder-style wrapper around the wire-level `proto::Attrs` record.
+#[derive(Clone)]
+pub struct Attrs(proto::Attrs);
+
+/// One directory entry as supplied to the readdir reply helpers.
+pub struct Entry<'a, Ref> {
+ /// Value written to the `off` field of the emitted dirent.
+ pub offset: u64,
+ /// Entry name, borrowed or owned.
+ pub name: Cow<'a, OsStr>,
+ /// Something that borrows as the filesystem's `Farc` handle for the entry's inode.
+ pub inode: Ref,
+ /// Time-to-live for the entry (currently ignored by `Reply::try_iter()`).
+ pub ttl: TimeToLive,
+}
+
+/// Builder-style wrapper around the wire-level `proto::StatfsOut` record.
+pub struct FsInfo(proto::StatfsOut);
+
+/// Accessors for the fields of the incoming request header.
+impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> {
+    /// Inode number the request is addressed to.
+    pub fn ino(&self) -> Ino {
+        let raw = self.header.ino;
+        Ino(raw)
+    }
+
+    /// Inode generation; always zero for now.
+    pub fn generation(&self) -> u64 {
+        0
+    }
+
+    /// User id recorded in the request header.
+    pub fn uid(&self) -> Uid {
+        let raw = self.header.uid;
+        Uid::from_raw(raw)
+    }
+
+    /// Group id recorded in the request header.
+    pub fn gid(&self) -> Gid {
+        let raw = self.header.gid;
+        Gid::from_raw(raw)
+    }
+
+    /// Process id recorded in the request header, reinterpreted as a signed value.
+    pub fn pid(&self) -> Pid {
+        let raw = self.header.pid as i32;
+        Pid::from_raw(raw)
+    }
+}
+
+/// Operation-independent ways to finish (or conditionally finish) a reply.
+impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> {
+ /// Drives `f` to completion unless an interrupt for this request arrives first.
+ ///
+ /// Returns `Completed(reply, output)` when `f` finishes, or `Interrupted(done)` after
+ /// failing the request with `EINTR`.
+ pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, Fs, O, T>
+ where
+ F: Future<Output = T>,
+ {
+ tokio::pin!(f);
+ // Subscribing here means only interrupts broadcast from now on are observed.
+ let mut rx = session::interrupt_rx(&self.session);
+
+ use Interruptible::*;
+ loop {
+ tokio::select! {
+ output = &mut f => break Completed(self, output),
+
+ result = rx.recv() => match result {
+ Ok(unique) if unique == self.unique => {
+ break Interrupted(self.interrupted());
+ }
+
+ // Interrupts for other requests and receive errors are ignored.
+ _ => continue,
+ }
+ }
+ }
+ }
+
+ /// Turns an errno-carrying `Result` into either `(reply, value)` or a failed reply.
+ pub fn fallible<T>(self, result: Result<T, Errno>) -> Result<(Self, T), Done<'o>> {
+ match result {
+ Ok(t) => Ok((self, t)),
+ Err(errno) => Err(self.fail(errno)),
+ }
+ }
+
+ /// Fails the request with `errno`, first letting the operation observe it through
+ /// `Operation::consume_errno`.
+ pub fn fail(mut self, errno: Errno) -> Done<'o> {
+ let errno = errno as i32;
+ O::consume_errno(errno, &mut self.tail);
+
+ Done::from_result(session::fail(&self.session, self.unique, errno))
+ }
+
+ /// Fails the request with `ENOSYS`.
+ pub fn not_implemented(self) -> Done<'o> {
+ self.fail(Errno::ENOSYS)
+ }
+
+ /// Fails the request with `EIO`.
+ pub fn io_error(self) -> Done<'o> {
+ self.fail(Errno::EIO)
+ }
+
+ /// Fails the request with `EINVAL`.
+ pub fn invalid_argument(self) -> Done<'o> {
+ self.fail(Errno::EINVAL)
+ }
+
+ /// Fails the request with `EINTR`.
+ pub fn interrupted(self) -> Done<'o> {
+ self.fail(Errno::EINTR)
+ }
+}
+
+/// Converting a `(reply, errno)` pair fails the reply with that errno.
+impl<'o, Fs, O> From<(Reply<'o, Fs, O>, Errno)> for Done<'o>
+where
+    Fs: Fuse,
+    O: Operation<'o, Fs>,
+{
+    fn from(pair: (Reply<'o, Fs, O>, Errno)) -> Done<'o> {
+        let (reply, errno) = pair;
+        reply.fail(errno)
+    }
+}
+
+/// `?` applied to a `Done` inside a function returning `Done` passes it through unchanged.
+impl<'o> FromResidual<Done<'o>> for Done<'o> {
+    fn from_residual(done: Done<'o>) -> Self {
+        done
+    }
+}
+
+/// `?` applied to a `Result` whose error converts into `Done` returns that converted error.
+impl<'o, T: Into<Done<'o>>> FromResidual<Result<Infallible, T>> for Done<'o> {
+    fn from_residual(residual: Result<Infallible, T>) -> Self {
+        match residual {
+            Err(error) => error.into(),
+            // `Infallible` has no values, so this arm can never be taken.
+            Ok(never) => match never {},
+        }
+    }
+}
+
+/// `?` applied to an interrupted `Interruptible` inside a function returning `Done` yields the
+/// `Done` produced by the interruption.
+impl<'o, Fs, O> FromResidual<Interruptible<'o, Fs, O, Infallible>> for Done<'o>
+where
+    Fs: Fuse,
+    O: Operation<'o, Fs>,
+{
+    fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self {
+        match residual {
+            Interruptible::Interrupted(done) => done,
+            // `Infallible` has no values, so this arm can never be taken.
+            Interruptible::Completed(_, never) => match never {},
+        }
+    }
+}
+
+/// A `Done` always short-circuits: `branch()` never continues.
+impl Try for Done<'_> {
+    type Output = Self;
+    type Residual = Self;
+
+    fn from_output(done: Self) -> Self {
+        done
+    }
+
+    fn branch(self) -> ControlFlow<Self, Self> {
+        ControlFlow::Break(self)
+    }
+}
+
+/// `?` applied to an interrupted `Interruptible` inside a function returning `Interruptible`
+/// carries the interruption over, whatever the output type.
+impl<'o, Fs, O, T> FromResidual<Interruptible<'o, Fs, O, Infallible>>
+    for Interruptible<'o, Fs, O, T>
+where
+    Fs: Fuse,
+    O: Operation<'o, Fs>,
+{
+    fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self {
+        match residual {
+            Interruptible::Interrupted(done) => Interruptible::Interrupted(done),
+            // `Infallible` has no values, so this arm can never be taken.
+            Interruptible::Completed(_, never) => match never {},
+        }
+    }
+}
+
+/// `?` on an `Interruptible` yields `(reply, output)` when completed and short-circuits when
+/// interrupted.
+impl<'o, Fs, O, T> Try for Interruptible<'o, Fs, O, T>
+where
+    Fs: Fuse,
+    O: Operation<'o, Fs>,
+{
+    type Output = (Reply<'o, Fs, O>, T);
+    type Residual = Interruptible<'o, Fs, O, Infallible>;
+
+    fn from_output(output: Self::Output) -> Self {
+        let (reply, value) = output;
+        Interruptible::Completed(reply, value)
+    }
+
+    fn branch(self) -> ControlFlow<Self::Residual, Self::Output> {
+        match self {
+            Interruptible::Interrupted(done) => {
+                ControlFlow::Break(Interruptible::Interrupted(done))
+            }
+            Interruptible::Completed(reply, value) => ControlFlow::Continue((reply, value)),
+        }
+    }
+}
+
+/// Chainable setters; each consumes the builder and returns it with the given fields replaced.
+impl Attrs {
+    /// Sets the `size` field.
+    pub fn size(mut self, size: u64) -> Self {
+        self.0.size = size;
+        self
+    }
+
+    /// Sets the owning user and group ids.
+    pub fn owner(mut self, uid: Uid, gid: Gid) -> Self {
+        self.0.uid = uid.as_raw();
+        self.0.gid = gid.as_raw();
+        self
+    }
+
+    /// Sets the mode bits; the file-type bits are added later by `finish()`.
+    pub fn mode(mut self, mode: Mode) -> Self {
+        self.0.mode = mode.bits();
+        self
+    }
+
+    /// Sets the block count and the block size.
+    pub fn blocks(mut self, blocks: u64, block_size: u32) -> Self {
+        self.0.blocks = blocks;
+        self.0.blksize = block_size;
+        self
+    }
+
+    /// Sets the access, modification and change timestamps.
+    pub fn times(mut self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self {
+        let raw = &mut self.0;
+
+        raw.atime = access.seconds;
+        raw.atimensec = access.nanoseconds;
+
+        raw.mtime = modify.seconds;
+        raw.mtimensec = modify.nanoseconds;
+
+        raw.ctime = change.seconds;
+        raw.ctimensec = change.nanoseconds;
+
+        self
+    }
+
+    /// Sets the hard-link count.
+    pub fn links(mut self, links: u32) -> Self {
+        self.0.nlink = links;
+        self
+    }
+
+    /// Produces the wire record: fills in the inode number and ORs the file-type bits derived
+    /// from the inode's `inode_type()` into the mode.
+    pub(crate) fn finish<Fs: Fuse>(self, inode: &Fs::Farc) -> proto::Attrs {
+        let Ino(ino) = <Fs as Fuse>::Inode::ino(inode);
+        let type_flag = match <Fs as Fuse>::Inode::inode_type(inode) {
+            EntryType::Fifo => SFlag::S_IFIFO,
+            EntryType::CharacterDevice => SFlag::S_IFCHR,
+            EntryType::Directory => SFlag::S_IFDIR,
+            EntryType::BlockDevice => SFlag::S_IFBLK,
+            EntryType::File => SFlag::S_IFREG,
+            EntryType::Symlink => SFlag::S_IFLNK,
+            EntryType::Socket => SFlag::S_IFSOCK,
+        };
+
+        let Attrs(mut raw) = self;
+        raw.ino = ino;
+        raw.mode |= type_flag.bits();
+        raw
+    }
+}
+
+/// The default attributes are all-zero except for a link count of one.
+impl Default for Attrs {
+    fn default() -> Self {
+        let zeroed = Attrs(Zeroable::zeroed());
+        zeroed.links(1)
+    }
+}
+
+/// Chainable setters; each consumes the builder and returns it with the given fields replaced.
+impl FsInfo {
+    /// Sets the block size and the total, free and available block counts.
+    pub fn blocks(mut self, size: u32, total: u64, free: u64, available: u64) -> Self {
+        let raw = &mut self.0;
+        raw.bsize = size;
+        raw.blocks = total;
+        raw.bfree = free;
+        raw.bavail = available;
+        self
+    }
+
+    /// Sets the total and free inode counts.
+    pub fn inodes(mut self, total: u64, free: u64) -> Self {
+        self.0.files = total;
+        self.0.ffree = free;
+        self
+    }
+
+    /// Sets the maximum filename length.
+    pub fn filenames(mut self, max: u32) -> Self {
+        self.0.namelen = max;
+        self
+    }
+}
+
+/// The default filesystem information is an all-zero record.
+impl Default for FsInfo {
+    fn default() -> Self {
+        let raw: proto::StatfsOut = Zeroable::zeroed();
+        FsInfo(raw)
+    }
+}
+
+/// Unwraps the builder into the wire-level record.
+impl From<FsInfo> for proto::StatfsOut {
+    fn from(info: FsInfo) -> proto::StatfsOut {
+        let FsInfo(raw) = info;
+        raw
+    }
+}
diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs
new file mode 100644
index 0000000..0e39e6b
--- /dev/null
+++ b/src/fuse/mod.rs
@@ -0,0 +1,87 @@
+use std::{
+ collections::HashMap,
+ marker::PhantomData,
+ os::unix::io::RawFd,
+ sync::{Arc, Mutex},
+};
+
+use tokio::{
+ io::unix::AsyncFd,
+ sync::{broadcast, Notify, Semaphore},
+};
+
+use crate::{proto, util::DumbFd, FuseResult, Ino};
+
+pub mod io;
+
+#[doc(cfg(feature = "server"))]
+pub mod fs;
+
+#[doc(cfg(feature = "server"))]
+pub mod ops;
+
+#[doc(cfg(feature = "mount"))]
+pub mod mount;
+
+mod session;
+use fs::Fuse;
+
+/// State of one mounted FUSE session serving the filesystem `Fs`.
+#[doc(cfg(feature = "server"))]
+pub struct Session<Fs: Fuse> {
+ // Never read (note the underscore); presumably held only so the descriptor stays open for
+ // the session's lifetime - confirm against `DumbFd`.
+ _fusermount_fd: DumbFd,
+ // Descriptor requests are read from, registered for async readiness.
+ session_fd: AsyncFd<RawFd>,
+ // Minor protocol version reported by the kernel during the handshake.
+ proto_minor: u32,
+ // The user's filesystem implementation.
+ fs: Fs,
+ // NOTE(review): acquired before each receive; presumably bounds in-flight request buffers.
+ input_semaphore: Arc<Semaphore>,
+ // Pool of heap buffers; the handshake reads into the first one.
+ large_buffers: Mutex<Vec<Box<[u8]>>>,
+ // Inodes known to the kernel, each with its handle and lookup count.
+ known: Mutex<HashMap<Ino, (Fs::Farc, u64)>>,
+ // Notified on a destroy request; ends `main_loop()`.
+ destroy: Notify,
+ // Broadcasts the `unique` id of every interrupted request.
+ interrupt_tx: broadcast::Sender<u64>,
+}
+
+/// Descriptors obtained from a successful mount, from which a session can be started.
+#[doc(cfg(feature = "server"))]
+pub struct Start {
+ // Our end of the socket pair shared with the fusermount helper.
+ fusermount_fd: DumbFd,
+ // Descriptor received from the fusermount helper.
+ session_fd: DumbFd,
+}
+
+// Kept in a private module; the trait is only brought into this module's scope by the `use`
+// below.
+mod private_trait {
+ /// Describes one FUSE operation: the type of its parsed request body and the extra state
+ /// carried by its reply. Both default to `()`.
+ pub trait Operation<'o, Fs: super::Fuse> {
+ type RequestBody = ();
+ type ReplyTail = ();
+
+ /// Hook invoked by `Reply::fail()` before the error is sent. Does nothing by default.
+ fn consume_errno(_errno: i32, _tail: &mut Self::ReplyTail) {}
+ }
+}
+
+use private_trait::Operation;
+
+/// Everything a handler receives for one operation: the request, its reply, and the session.
+#[doc(cfg(feature = "server"))]
+pub type Op<'o, Fs, O> = (Request<'o, Fs, O>, Reply<'o, Fs, O>, &'o Arc<Session<Fs>>);
+
+/// An incoming request for operation `O`: the common header plus the operation-specific body.
+#[doc(cfg(feature = "server"))]
+pub struct Request<'o, Fs: Fuse, O: Operation<'o, Fs>> {
+ header: &'o proto::InHeader,
+ body: O::RequestBody,
+}
+
+/// The pending reply to a request for operation `O`. Replying consumes it and yields a `Done`.
+#[doc(cfg(feature = "server"))]
+pub struct Reply<'o, Fs: Fuse, O: Operation<'o, Fs>> {
+ session: &'o Session<Fs>,
+ // Identifier of the request being answered.
+ unique: u64,
+ // Operation-specific reply state (see `Operation::ReplyTail`).
+ tail: O::ReplyTail,
+}
+
+/// Token returned by every reply method; wraps the result of sending the reply.
+#[must_use]
+#[doc(cfg(feature = "server"))]
+pub struct Done<'o>(FuseResult<PhantomData<&'o ()>>);
+
+/// Conversions between `Done` and the plain send result it wraps.
+impl Done<'_> {
+    /// Wraps the result of sending a reply.
+    fn from_result(result: FuseResult<()>) -> Self {
+        match result {
+            Ok(()) => Done(Ok(PhantomData)),
+            Err(error) => Done(Err(error)),
+        }
+    }
+
+    /// Recovers the result of sending the reply.
+    fn into_result(self) -> FuseResult<()> {
+        let Done(inner) = self;
+        inner.map(|_marker| ())
+    }
+}
diff --git a/src/fuse/mount.rs b/src/fuse/mount.rs
new file mode 100644
index 0000000..ebf7e5d
--- /dev/null
+++ b/src/fuse/mount.rs
@@ -0,0 +1,160 @@
+use std::{
+ ffi::{OsStr, OsString},
+ os::unix::{
+ ffi::OsStrExt,
+ io::{AsRawFd, IntoRawFd, RawFd},
+ net::UnixStream,
+ },
+ process::Command,
+};
+
+use nix::{
+ self, cmsg_space,
+ fcntl::{fcntl, FcntlArg, FdFlag},
+ sys::socket::{recvmsg, ControlMessageOwned, MsgFlags},
+};
+
+use quick_error::quick_error;
+
+use super::Start;
+use crate::util::{from_nix_error, DumbFd};
+
+// Errors returned by `mount_sync()`.
+quick_error! {
+ #[derive(Debug)]
+ pub enum MountError {
+ // An I/O error; `from()` lets `?` convert `std::io::Error` into this variant.
+ Io(err: std::io::Error) { from() }
+ // No session descriptor was received from the fusermount helper.
+ Fusermount { display("fusermount failed") }
+ }
+}
+
+/// Comma-separated mount option string passed to the fusermount helper via `-o`.
+#[derive(Default)]
+pub struct Options(OsString);
+
+impl Options {
+    /// Appends `fsname=<fs_name>`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `fs_name` is empty or contains `,` or `=`.
+    pub fn fs_name<O: AsRef<OsStr>>(&mut self, fs_name: O) -> &mut Self {
+        self.push_key_value("fsname", fs_name)
+    }
+
+    /// Appends the `ro` option.
+    pub fn read_only(&mut self) -> &mut Self {
+        self.push("ro")
+    }
+
+    /// Appends a raw option.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `option` is empty or begins or ends with a comma.
+    pub fn push<O: AsRef<OsStr>>(&mut self, option: O) -> &mut Self {
+        self.push_parts(&[option.as_ref()])
+    }
+
+    /// Appends `key=value`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if either part is empty or contains `,` or `=`.
+    pub fn push_key_value<K, V>(&mut self, key: K, value: V) -> &mut Self
+    where
+        K: AsRef<OsStr>,
+        V: AsRef<OsStr>,
+    {
+        let (key, value) = (key.as_ref(), value.as_ref());
+
+        let assert_valid = |part: &OsStr| {
+            let bytes = part.as_bytes();
+            assert!(
+                !bytes.is_empty() && bytes.iter().all(|b| !matches!(*b, b',' | b'=')),
+                "invalid key or value: {}",
+                part.to_string_lossy()
+            );
+        };
+
+        assert_valid(key);
+        assert_valid(value);
+
+        self.push_parts(&[key, OsStr::new("="), value])
+    }
+
+    /// Appends the concatenation of `segment` as one option, preceded by a comma when the
+    /// string is not empty.
+    fn push_parts(&mut self, segment: &[&OsStr]) -> &mut Self {
+        if !self.0.is_empty() {
+            self.0.push(",");
+        }
+
+        let start = self.0.as_bytes().len();
+        for part in segment {
+            self.0.push(part);
+        }
+
+        // Inspect only what was just appended. Going through `first()`/`last()` keeps an
+        // empty option on the assertion path instead of underflowing `len() - 1`.
+        let added = &self.0.as_bytes()[start..];
+        assert!(
+            !added.is_empty() && added.first() != Some(&b',') && added.last() != Some(&b','),
+            "invalid option string: {}",
+            OsStr::from_bytes(added).to_string_lossy()
+        );
+
+        self
+    }
+}
+
+/// Extending pushes every item as a raw option, exactly like calling `push()` for each.
+impl<O: AsRef<OsStr>> Extend<O> for Options {
+    fn extend<I: IntoIterator<Item = O>>(&mut self, iter: I) {
+        for option in iter {
+            self.push(option);
+        }
+    }
+}
+
+/// Mounts a FUSE filesystem at `mountpoint` by running the `fusermount3` helper, blocking until
+/// the helper hands over the session descriptor.
+///
+/// A Unix socket pair is created and one end is passed to the helper through the
+/// `_FUSE_COMMFD` environment variable; the session descriptor arrives over the other end as
+/// an `SCM_RIGHTS` control message.
+///
+/// # Errors
+///
+/// Returns `MountError::Io` when creating the socket pair, adjusting descriptor flags,
+/// spawning the helper or receiving its message fails, and `MountError::Fusermount` when the
+/// helper sends no descriptor.
+pub fn mount_sync<M>(mountpoint: M, options: &Options) -> Result<Start, MountError>
+where
+    M: AsRef<OsStr>,
+{
+    let (left_side, right_side) = UnixStream::pair()?;
+
+    // The fusermount protocol requires us to preserve right_fd across execve()
+    let right_fd = right_side.as_raw_fd();
+    let fd_flags = fcntl(right_fd, FcntlArg::F_GETFD).map_err(from_nix_error)?;
+    let fd_flags = FdFlag::from_bits_truncate(fd_flags) & !FdFlag::FD_CLOEXEC;
+    fcntl(right_fd, FcntlArg::F_SETFD(fd_flags)).map_err(from_nix_error)?;
+
+    let mut command = Command::new("fusermount3");
+    if !options.0.is_empty() {
+        command.args(&[OsStr::new("-o"), &options.0, mountpoint.as_ref()]);
+    } else {
+        command.arg(mountpoint);
+    };
+
+    let mut fusermount = command.env("_FUSE_COMMFD", right_fd.to_string()).spawn()?;
+
+    // recvmsg() should fail if fusermount exits (last open fd is closed)
+    drop(right_side);
+
+    let session_fd = (|| {
+        let mut buffer = cmsg_space!(RawFd);
+        let message = recvmsg(
+            left_side.as_raw_fd(),
+            &[],
+            Some(&mut buffer),
+            MsgFlags::empty(),
+        )
+        .map_err(from_nix_error)?;
+
+        // Only the first descriptor of the first control message is of interest.
+        let session_fd = match message.cmsgs().next() {
+            Some(ControlMessageOwned::ScmRights(fds)) => fds.into_iter().next(),
+            _ => None,
+        };
+
+        session_fd.ok_or(MountError::Fusermount)
+    })();
+
+    match session_fd {
+        Ok(session_fd) => {
+            let fusermount_fd = DumbFd(left_side.into_raw_fd());
+            let session_fd = DumbFd(session_fd);
+            Ok(Start {
+                fusermount_fd,
+                session_fd,
+            })
+        }
+
+        Err(error) => {
+            // Close our end first, then reap the helper.
+            drop(left_side);
+            fusermount.wait()?;
+            Err(error)
+        }
+    }
+}
diff --git a/src/fuse/ops.rs b/src/fuse/ops.rs
new file mode 100644
index 0000000..7a0c18c
--- /dev/null
+++ b/src/fuse/ops.rs
@@ -0,0 +1,366 @@
+use bytemuck::{bytes_of, Pod, Zeroable};
+use futures_util::stream::{Stream, StreamExt, TryStreamExt};
+use nix::sys::stat::SFlag;
+
+use std::{
+ borrow::Borrow,
+ ffi::{CStr, OsStr},
+ os::unix::ffi::OsStrExt,
+};
+
+use crate::{proto, util::OutputChain, Errno, Ino, TimeToLive};
+
+use super::{
+ fs::{Fuse, Inode, Tape},
+ io::{AccessFlags, Entry, EntryType, FsInfo},
+ session, Done, Operation, Reply, Request,
+};
+
+// Declares one FUSE operation. The input is the operation name followed by its `Operation`
+// impl body, then any number of optional `Request { .. }` / `Reply { .. }` sections whose
+// bodies become inherent impls on `Request<'o, Fs, Name>` / `Reply<'o, Fs, Name>`.
+macro_rules! op {
+ // Base case: only the `Operation` body is left. Emits the marker struct and the impl.
+ { $name:ident $operation:tt $(,)+ } => {
+ pub struct $name(());
+
+ impl<'o, Fs: Fuse> Operation<'o, Fs> for $name $operation
+ };
+
+ // Peels off a `Request` section, then recurses on the remainder.
+ { $name:ident $operation:tt, Request $request:tt $($next:tt)+ } => {
+ impl<'o, Fs: Fuse> Request<'o, Fs, $name> $request
+
+ op! { $name $operation $($next)+ }
+ };
+
+ // Peels off a `Reply` section, then recurses on the remainder.
+ { $name:ident $operation:tt, Reply $reply:tt $($next:tt)+ } => {
+ impl<'o, Fs: Fuse> Reply<'o, Fs, $name> $reply
+
+ op! { $name $operation $($next)+ }
+ };
+}
+
+// Lookup of a name inside a directory inode.
+op! {
+ Lookup {
+ // name()
+ type RequestBody = &'o CStr;
+ },
+
+ Request {
+ /// Returns the name of the entry being looked up in this directory.
+ pub fn name(&self) -> &OsStr {
+ c_to_os(self.body)
+ }
+ },
+
+ Reply {
+ /// The requested entry was found and a `Farc` was successfully determined from it. The
+ /// FUSE client will become aware of the found inode if it wasn't before. This result may
+ /// be cached by the client for up to the given TTL.
+ pub fn found(self, entry: &Fs::Farc, ttl: TimeToLive) -> Done<'o> {
+ let (attrs, attrs_ttl) = <Fs as Fuse>::Inode::attrs(entry);
+ // Registers the inode (or bumps its lookup count) before the reply is sent.
+ session::unveil(&self.session, entry);
+
+ self.single(&make_entry(
+ (<Fs as Fuse>::Inode::ino(entry), ttl),
+ (attrs.finish::<Fs>(entry), attrs_ttl),
+ ))
+ }
+
+ /// The requested entry was not found in this directory. The FUSE client may include this
+ /// response in negative cache for up to the given TTL.
+ pub fn not_found(self, ttl: TimeToLive) -> Done<'o> {
+ self.single(&make_entry((Ino::NULL, ttl), (Zeroable::zeroed(), Default::default())))
+ }
+
+ /// The requested entry was not found in this directory, but unlike [`Reply::not_found()`]
+ /// this does not report back a TTL to the FUSE client. The client should not cache the
+ /// response.
+ pub fn not_found_uncached(self) -> Done<'o> {
+ self.fail(Errno::ENOENT)
+ }
+ },
+}
+
+// Reading the target of a symbolic link.
+op! {
+    Readlink {},
+
+    Reply {
+        /// This inode corresponds to a symbolic link pointing to the given target path.
+        pub fn target(self, target: &OsStr) -> Done<'o> {
+            self.chain(OutputChain::tail(&[target.as_bytes()]))
+        }
+
+        /// Same as [`Reply::target()`], except that the target path is taken from disjoint
+        /// slices. The path bytes themselves are not copied; only a temporary list of slice
+        /// references is built.
+        pub fn gather_target(self, target: &[&OsStr]) -> Done<'o> {
+            // Convert each part explicitly rather than transmuting the slice of references:
+            // the layout of `&OsStr` relative to `&[u8]` is not guaranteed.
+            let segments: Vec<&[u8]> = target.iter().map(|part| part.as_bytes()).collect();
+            self.chain(OutputChain::tail(&segments))
+        }
+    },
+}
+
+// Opening an inode. The reply tail carries the inode number and the open flags computed by
+// the dispatcher.
+op! {
+ Open {
+ type RequestBody = &'o proto::OpenIn;
+ type ReplyTail = (Ino, proto::OpenOutFlags);
+ },
+
+ Reply {
+ /// The inode may now be accessed.
+ pub fn ok(self) -> Done<'o> {
+ self.ok_with_handle(0)
+ }
+
+ /*pub fn tape<R: Tape<Fuse = Fs>>(self, reel: R) -> Done<'o> {
+ let (ino, _) = self.tail;
+ self.ok_with_handle(session::allocate_handle(&self.session, ino, reel))
+ }*/
+
+ // Sends an `OpenOut` with the given file handle and the flags stored in the reply tail.
+ fn ok_with_handle(self, handle: u64) -> Done<'o> {
+ let (_, flags) = self.tail;
+ self.single(&proto::OpenOut {
+ fh: handle,
+ open_flags: flags.bits(),
+ padding: Default::default(),
+ })
+ }
+ },
+}
+
+// Placeholder declaration of the read operation: no request body, no reply methods yet.
+op! { Read {}, }
+/*op! {
+ Read {
+ type RequestBody = &'o proto::ReadIn;
+ type ReplyTail = &'o mut OutputBytes<'o>;
+ },
+
+ Request {
+ pub fn offset(&self) -> u64 {
+ self.body.offset
+ }
+
+ pub fn size(&self) -> u32 {
+ self.body.size
+ }
+ },
+
+ Reply {
+ pub fn remaining(&self) -> u64 {
+ self.tail.remaining()
+ }
+
+ pub fn end(self) -> Done<'o> {
+ if self.tail.ready() {
+ self.chain(OutputChain::tail(self.tail.segments()))
+ } else {
+ // The read() handler will be invoked again with same OutputBytes
+ self.done()
+ }
+ }
+
+ pub fn hole(self, size: u64) -> Result<Self, Done<'o>> {
+ self.tail
+ }
+
+ pub fn copy(self, data: &[u8]) -> Result<Self, Done<'o>> {
+ self.self_or_done(self.tail.copy(data))
+ }
+
+ pub fn put(self, data: &'o [u8]) -> Result<Self, Done<'o>> {
+ self.self_or_done(self.tail.put(data))
+ }
+
+ pub fn gather(self, data: &'o [&'o [u8]]) -> Result<Self, Done<'o>> {
+ self.self_or_done(self.tail.gather(data))
+ }
+
+ fn self_or_done(self, capacity: OutputCapacity) -> Result<Self, Done<'o>> {
+ match capacity {
+ OutputCapacity::Available => Ok(self),
+ OutputCapacity::Filled => Err(self.done()),
+ }
+ }
+ },
+}*/
+
+// Write operation; only the request body type is declared so far.
+op! {
+ Write {
+ type RequestBody = &'o proto::WriteIn;
+ },
+}
+
+// Session initialization. The reply tail points at a slot that receives either the root inode
+// or the errno the handler failed with.
+op! {
+ Init {
+ type ReplyTail = &'o mut Result<Fs::Farc, i32>;
+
+ // Records the failure in the slot so the handshake code can see it.
+ fn consume_errno(errno: i32, tail: &mut Self::ReplyTail) {
+ **tail = Err(errno);
+ }
+ },
+
+ Reply {
+ /// Server-side initialization succeeded. The provided `Farc` references the filesystem's
+ /// root inode.
+ pub fn root(self, root: Fs::Farc) -> Done<'o> {
+ *self.tail = Ok(root);
+ // Nothing is sent here; the handshake code sends the init reply itself.
+ self.done()
+ }
+ },
+}
+
+// Filesystem statistics query.
+op! {
+    Statfs {},
+
+    Reply {
+        /// Sends the given filesystem statistics as the reply.
+        pub fn info(self, statfs: FsInfo) -> Done<'o> {
+            let raw = proto::StatfsOut::from(statfs);
+            self.single(&raw)
+        }
+    },
+}
+
+// Opendir operation; only the request body type is declared so far.
+op! {
+ Opendir {
+ type RequestBody = &'o proto::OpendirIn;
+ },
+}
+
+// Reading entries from a directory stream.
+op! {
+ Readdir {
+ type RequestBody = &'o proto::ReaddirIn;
+ },
+
+ Request {
+ /// Returns the base offset in the directory stream to read from.
+ pub fn offset(&self) -> u64 {
+ self.body.read_in.offset
+ }
+ },
+
+ Reply {
+ /// Replies with the first entry produced by `entries`, or with an empty reply when the
+ /// iterator is exhausted. Only one entry is consumed per call.
+ ///
+ /// If the first item is an error, the reply is handed back together with that error so
+ /// the caller can decide how to fail the request.
+ pub fn try_iter<'a, I, E, Ref>(
+ self,
+ mut entries: I,
+ ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)>
+ where
+ I: Iterator<Item = Result<Entry<'a, Ref>, E>> + Send,
+ Ref: Borrow<Fs::Farc>,
+ {
+ //TODO: Only a single entry is emitted per reply; this should batch entries
+ match entries.next().transpose() {
+ Ok(Some(entry)) => {
+ let Entry {
+ name,
+ inode,
+ offset,
+ ..
+ } = entry;
+
+ let inode = inode.borrow();
+ let Ino(ino) = <Fs as Fuse>::Inode::ino(inode);
+
+ let dirent = proto::Dirent {
+ ino,
+ off: offset,
+ namelen: name.len() as u32,
+ // The entry type is the file-type flag shifted right by 12 bits.
+ entry_type: (match <Fs as Fuse>::Inode::inode_type(inode) {
+ EntryType::Fifo => SFlag::S_IFIFO,
+ EntryType::CharacterDevice => SFlag::S_IFCHR,
+ EntryType::Directory => SFlag::S_IFDIR,
+ EntryType::BlockDevice => SFlag::S_IFBLK,
+ EntryType::File => SFlag::S_IFREG,
+ EntryType::Symlink => SFlag::S_IFLNK,
+ EntryType::Socket => SFlag::S_IFSOCK,
+ })
+ .bits()
+ >> 12,
+ };
+
+ let dirent = bytes_of(&dirent);
+ let name = name.as_bytes();
+
+ // Zero padding that rounds header + name up to the next multiple of 8 bytes
+ // (0..=7 bytes; none when already aligned).
+ let padding = [0; 8];
+ let padding = &padding[..7 - (dirent.len() + name.len() - 1) % 8];
+
+ Ok(self.chain(OutputChain::tail(&[dirent, name, padding])))
+ }
+
+ Err(error) => Err((self, error)),
+
+ Ok(None) => Ok(self.empty()),
+ }
+ }
+
+ /// Stream counterpart of [`Reply::try_iter()`]: awaits the first item of `entries` and
+ /// replies with it.
+ // See rust-lang/rust#61949
+ pub async fn try_stream<'a, S, E, Ref>(
+ self,
+ entries: S,
+ ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)>
+ where
+ S: Stream<Item = Result<Entry<'a, Ref>, E>> + Send,
+ Ref: Borrow<Fs::Farc> + Send,
+ E: Send,
+ {
+ //TODO: Only the first item of the stream is used; this should batch entries
+ let first = entries.boxed().try_next().await;
+ self.try_iter(first.transpose().into_iter())
+ }
+ },
+}
+
+// Access permission check.
+op! {
+ Access {
+ type RequestBody = &'o proto::AccessIn;
+ },
+
+ Request {
+ /// Returns the requested access mask; bits not known to `AccessFlags` are dropped.
+ pub fn mask(&self) -> AccessFlags {
+ AccessFlags::from_bits_truncate(self.body.mask as i32)
+ }
+ },
+
+ Reply {
+ /// Access is granted; sends an empty success reply.
+ pub fn ok(self) -> Done<'o> {
+ self.empty()
+ }
+
+ /// Access is refused; fails the request with `EACCES`.
+ pub fn permission_denied(self) -> Done<'o> {
+ self.fail(Errno::EACCES)
+ }
+ },
+}
+
+/// Private building blocks shared by the operation-specific reply methods.
+impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> {
+    /// Finishes without sending anything.
+    fn done(self) -> Done<'o> {
+        let nothing_sent: crate::FuseResult<()> = Ok(());
+        Done::from_result(nothing_sent)
+    }
+
+    /// Sends a success reply with no payload.
+    fn empty(self) -> Done<'o> {
+        let payload = OutputChain::empty();
+        self.chain(payload)
+    }
+
+    /// Sends a success reply whose payload is the raw bytes of one plain-old-data value.
+    fn single<P: Pod>(self, single: &P) -> Done<'o> {
+        let segments = [bytes_of(single)];
+        self.chain(OutputChain::tail(&segments))
+    }
+
+    /// Sends a success reply carrying the given output chain.
+    fn chain(self, chain: OutputChain<'_>) -> Done<'o> {
+        let sent = session::ok(&self.session, self.unique, chain);
+        Done::from_result(sent)
+    }
+}
+
+/// Views a C string as an `OsStr`, excluding the trailing NUL byte.
+fn c_to_os(string: &CStr) -> &OsStr {
+    let bytes = string.to_bytes();
+    OsStr::from_bytes(bytes)
+}
+
+/// Assembles the wire-level entry record from an inode number, its attributes, and the
+/// time-to-live of each. The generation is always zero for now.
+fn make_entry(
+    entry: (Ino, TimeToLive),
+    attributes: (proto::Attrs, TimeToLive),
+) -> proto::EntryOut {
+    let (Ino(nodeid), entry_ttl) = entry;
+    let (attr, attr_ttl) = attributes;
+
+    proto::EntryOut {
+        nodeid,
+        generation: 0, //TODO
+        entry_valid: entry_ttl.seconds,
+        entry_valid_nsec: entry_ttl.nanoseconds,
+        attr_valid: attr_ttl.seconds,
+        attr_valid_nsec: attr_ttl.nanoseconds,
+        attr,
+    }
+}
diff --git a/src/fuse/session.rs b/src/fuse/session.rs
new file mode 100644
index 0000000..35ebb69
--- /dev/null
+++ b/src/fuse/session.rs
@@ -0,0 +1,569 @@
+use std::{
+ collections::{hash_map, HashMap},
+ convert::TryInto,
+ os::unix::io::IntoRawFd,
+ sync::{Arc, Mutex},
+};
+
+use nix::{
+ fcntl::{fcntl, FcntlArg, OFlag},
+ sys::uio::{readv, writev, IoVec},
+ unistd::{sysconf, SysconfVar},
+};
+
+use tokio::{
+ io::unix::AsyncFd,
+ sync::{broadcast, Notify, OwnedSemaphorePermit, Semaphore},
+};
+
+use bytemuck::{bytes_of, try_from_bytes};
+use smallvec::SmallVec;
+
+use crate::{
+ proto::{self, InHeader},
+ util::{display_or, from_nix_error, OutputChain},
+ Errno, FuseError, FuseResult, Ino,
+};
+
+use super::{
+ fs::{Fuse, Inode},
+ Reply, Request, Session, Start,
+};
+
+/// Sends a success reply (error code zero) for request `unique` with the given payload.
+pub fn ok<Fs: Fuse>(session: &Session<Fs>, unique: u64, output: OutputChain<'_>) -> FuseResult<()> {
+    let error_code = 0;
+    session.send(unique, error_code, output)
+}
+
+/// Sends a notification: a message with request id zero whose code field carries the
+/// notification code.
+pub fn notify<Fs: Fuse>(
+    session: &Session<Fs>,
+    op: proto::NotifyCode,
+    output: OutputChain<'_>,
+) -> FuseResult<()> {
+    let code = op as i32;
+    session.send(0, code, output)
+}
+
+/// Sends an error reply for request `unique`.
+///
+/// `errno` must be positive; anything else is logged and replaced with `ENOMSG`. The value is
+/// negated before being sent.
+pub fn fail<Fs: Fuse>(session: &Session<Fs>, unique: u64, errno: i32) -> FuseResult<()> {
+    let errno = if errno > 0 {
+        errno
+    } else {
+        log::warn!(
+            "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG",
+            unique,
+            errno
+        );
+
+        Errno::ENOMSG as i32
+    };
+
+    session.send(unique, -errno, OutputChain::empty())
+}
+
+/// Records that the kernel has learned about `inode`: bumps its lookup count, or registers a
+/// clone of its handle with a count of one when it was not known yet.
+pub fn unveil<Fs: Fuse>(session: &Session<Fs>, inode: &Fs::Farc) {
+    let ino = <Fs as Fuse>::Inode::ino(inode);
+    let mut known = session.known.lock().unwrap();
+
+    known
+        .entry(ino)
+        .and_modify(|(_, count)| *count += 1)
+        .or_insert_with(|| (Fs::Farc::clone(inode), 1));
+}
+
+/// Creates a new receiver for the session's interrupt broadcast.
+pub fn interrupt_rx<Fs: Fuse>(session: &Session<Fs>) -> broadcast::Receiver<u64> {
+    let sender = &session.interrupt_tx;
+    sender.subscribe()
+}
+
+impl<Fs: Fuse> Session<Fs> {
+    /// Borrows the filesystem implementation this session serves.
+    pub fn fs(&self) -> &Fs {
+        let filesystem: &Fs = &self.fs;
+        filesystem
+    }
+
+    /// Serves requests until the kernel asks for the session to be destroyed.
+    ///
+    /// Each received request is dispatched on its own spawned task. Receive and dispatch
+    /// failures are logged and do not stop the loop. Returns `Ok(())` once `destroy` is
+    /// notified.
+    pub async fn main_loop(self: Arc<Self>) -> FuseResult<()> {
+        let this = Arc::clone(&self);
+        let main_loop = async move {
+            loop {
+                let incoming = this.receive().await;
+                let this = Arc::clone(&this);
+
+                tokio::spawn(async move {
+                    let (result, header): (FuseResult<()>, Option<InHeader>) = match incoming {
+                        Ok(mut incoming) => match this.dispatch(&mut incoming).await {
+                            Ok(()) => (Ok(()), None),
+
+                            Err(error) => {
+                                // Keep at most the header-sized prefix for the log message.
+                                // `min` clamps to what was actually received: a short buffer
+                                // no longer panics on the slice, and a full one is cut to
+                                // the exact size `try_from_bytes` requires.
+                                let data = incoming.buffer.data();
+                                let data = &data[..std::mem::size_of::<InHeader>().min(data.len())];
+                                (Err(error), try_from_bytes(data).ok().copied())
+                            }
+                        },
+
+                        Err(error) => (Err(error.into()), None),
+                    };
+
+                    let header = display_or(header, "(bad)");
+                    if let Err(error) = result {
+                        log::error!("Handling request {}: {}", header, error);
+                    }
+                });
+            }
+        };
+
+        tokio::select! {
+            () = main_loop => unreachable!(),
+            () = self.destroy.notified() => Ok(()),
+        }
+    }
+
+ /// Performs the initial protocol handshake on a freshly opened session.
+ ///
+ /// Reads one request, requires it to be an `Init` request, negotiates the protocol version,
+ /// asks the filesystem's `init()` handler for the root inode, registers it under
+ /// `Ino::ROOT`, and sends the init reply.
+ ///
+ /// Returns `Handshake::Restart` when the kernel offers a greater major version (after
+ /// replying with our own major version), and `Handshake::Done` on success. Fails with
+ /// `ProtocolInit` when the first request is not `Init` or the version is unsupported, and
+ /// with an I/O error built from the errno when the `init()` handler fails.
+ async fn do_handshake(
+ &mut self,
+ pages_per_buffer: usize,
+ bytes_per_buffer: usize,
+ ) -> FuseResult<Handshake> {
+ use FuseError::*;
+
+ let buffer = {
+ self.session_fd.readable().await?.retain_ready();
+ let large_buffer = self.large_buffers.get_mut().unwrap().first_mut().unwrap();
+
+ let mut data = InputBufferStorage::Sbo(SboStorage([0; SBO_SIZE]));
+ let sbo = match &mut data {
+ InputBufferStorage::Sbo(SboStorage(sbo)) => sbo,
+ _ => unreachable!(),
+ };
+
+ // Scatter read: the small inline buffer is filled first, then the large buffer.
+ let mut io_vecs = [
+ IoVec::from_mut_slice(sbo),
+ IoVec::from_mut_slice(large_buffer),
+ ];
+
+ let bytes = readv(*self.session_fd.get_ref(), &mut io_vecs).map_err(from_nix_error)?;
+ InputBuffer { bytes, data }
+ };
+
+ let request: proto::Request<'_> = buffer.data().try_into()?;
+
+ let unique = request.header().unique;
+ let init = match request.body() {
+ proto::RequestBody::Init(body) => body,
+ _ => return Err(ProtocolInit),
+ };
+
+ use std::cmp::Ordering;
+ let supported = match init.major.cmp(&proto::MAJOR_VERSION) {
+ Ordering::Less => false,
+
+ Ordering::Equal => {
+ self.proto_minor = init.minor;
+ self.proto_minor >= proto::REQUIRED_MINOR_VERSION
+ }
+
+ Ordering::Greater => {
+ // Reply with only our major version and let the caller restart the handshake.
+ let tail = [bytes_of(&proto::MAJOR_VERSION)];
+ ok(&self, unique, OutputChain::tail(&tail))?;
+
+ return Ok(Handshake::Restart);
+ }
+ };
+
+ //TODO: fake some decency by supporting a few older minor versions
+ if !supported {
+ log::error!(
+ "Unsupported protocol {}.{}; this build requires \
+ {major}.{}..={major}.{} (or a greater version \
+ through compatibility)",
+ init.major,
+ init.minor,
+ proto::REQUIRED_MINOR_VERSION,
+ proto::TARGET_MINOR_VERSION,
+ major = proto::MAJOR_VERSION
+ );
+
+ fail(&self, unique, Errno::EPROTONOSUPPORT as i32)?;
+ return Err(ProtocolInit);
+ }
+
+ let root = {
+ // Slot written through the reply tail: `Reply::root()` stores `Ok(root)`, a failed
+ // reply stores `Err(errno)`.
+ let mut init_result = Err(0);
+ let reply = Reply {
+ session: &self,
+ unique,
+ tail: &mut init_result,
+ };
+
+ self.fs.init(reply).await.into_result()?;
+
+ match init_result {
+ Ok(root) => root,
+ Err(errno) => {
+ log::error!("init() handler failed: {}", Errno::from_i32(errno));
+ return Err(FuseError::Io(std::io::Error::from_raw_os_error(errno)));
+ }
+ }
+ };
+
+ self.known.get_mut().unwrap().insert(Ino::ROOT, (root, 1));
+
+ use proto::InitFlags;
+ let flags = InitFlags::from_bits_truncate(init.flags);
+ let supported = InitFlags::PARALLEL_DIROPS
+ | InitFlags::ABORT_ERROR
+ | InitFlags::MAX_PAGES
+ | InitFlags::CACHE_SYMLINKS;
+
+ // Only echo back flags that the kernel offered and that this build supports.
+ let flags = flags & supported;
+ // A write request must fit in one buffer together with its two headers.
+ let max_write = bytes_per_buffer - std::mem::size_of::<(InHeader, proto::WriteIn)>();
+ let init_out = proto::InitOut {
+ major: proto::MAJOR_VERSION,
+ minor: proto::TARGET_MINOR_VERSION,
+ max_readahead: 0, //TODO
+ flags: flags.bits(),
+ max_background: 0, //TODO
+ congestion_threshold: 0, //TODO
+ max_write: max_write.try_into().unwrap(),
+ time_gran: 1, //TODO
+ max_pages: pages_per_buffer.try_into().unwrap(),
+ padding: Default::default(),
+ unused: Default::default(),
+ };
+
+ let tail = [bytes_of(&init_out)];
+ ok(&self, unique, OutputChain::tail(&tail))?;
+
+ Ok(Handshake::Done)
+ }
+
+ /// Parses one incoming request and routes it to the matching handler.
+ ///
+ /// Routing happens in three stages: requests that need no inode lookup, requests answered
+ /// while `self.known` is locked, and requests handled with a handle cloned out of
+ /// `self.known`. Any other request is failed with `ENOSYS`.
+ async fn dispatch(self: &Arc<Self>, request: &mut Incoming<Fs>) -> FuseResult<()> {
+ let request: proto::Request<'_> = request.buffer.data().try_into()?;
+ let header = request.header();
+ let InHeader { unique, ino, .. } = *header;
+ let ino = Ino(ino);
+
+ use proto::RequestBody::*;
+
+ // Builds the `(request, reply, session)` triple handed to handlers. The request body and
+ // the reply tail both default to `()`.
+ macro_rules! op {
+ () => {
+ op!(())
+ };
+
+ ($body:expr) => {
+ op!($body, ())
+ };
+
+ ($body:expr, $tail:expr) => {{
+ let request = Request {
+ header,
+ body: $body,
+ };
+ let reply = Reply {
+ session: &self,
+ unique,
+ tail: $tail,
+ };
+
+ (request, reply, self)
+ }};
+ }
+
+ // These operations don't involve locking and searching self.known
+ match request.body() {
+ Forget(body) => {
+ self.forget(std::iter::once((ino, body.nlookup))).await;
+ return Ok(());
+ }
+
+ Statfs => return self.fs.statfs(op!()).await.into_result(),
+
+ Interrupt(body) => {
+ //TODO: Don't reply with EAGAIN if the interrupt is successful
+ // A send error (no receiver is currently subscribed) is deliberately ignored.
+ let _ = self.interrupt_tx.send(body.unique);
+ return fail(&self, unique, Errno::EAGAIN as i32);
+ }
+
+ Destroy => {
+ // Wakes `main_loop()`, which then returns.
+ self.destroy.notify_one();
+ return Ok(());
+ }
+
+ BatchForget { forgets, .. } => {
+ let forgets = forgets
+ .iter()
+ .map(|target| (Ino(target.ino), target.nlookup));
+
+ self.forget(forgets).await;
+ return Ok(());
+ }
+
+ _ => (),
+ }
+
+ // Some operations are handled while self.known is locked
+ let inode = {
+ let known = self.known.lock().unwrap();
+ let inode = match known.get(&ino) {
+ Some((farc, _)) => farc,
+ None => {
+ log::error!(
+ "Lookup count for ino {} reached zero while still \
+ known to the kernel, this is a bug",
+ ino
+ );
+
+ return fail(&self, unique, Errno::ENOANO as i32);
+ }
+ };
+
+ match request.body() {
+ Getattr(_) => {
+ //TODO: Getattr flags
+ let (attrs, ttl) = <Fs as Fuse>::Inode::attrs(inode);
+ let attrs = attrs.finish::<Fs>(inode);
+ // Release the lock before the reply is sent.
+ drop(known);
+
+ let out = proto::AttrOut {
+ attr_valid: ttl.seconds,
+ attr_valid_nsec: ttl.nanoseconds,
+ dummy: Default::default(),
+ attr: attrs,
+ };
+
+ return ok(&self, unique, OutputChain::tail(&[bytes_of(&out)]));
+ }
+
+ // The access handler is synchronous and runs with the lock still held.
+ Access(body) => {
+ return <Fs as Fuse>::Inode::access(inode, op!(*body)).into_result()
+ }
+
+ // Everything else gets its own handle so the lock can be released.
+ _ => inode.clone(),
+ }
+ };
+
+ // Calls the named async inode handler, passing it the cloned handle by value.
+ macro_rules! inode_op {
+ ($op:ident, $($exprs:expr),+) => {
+ <Fs as Fuse>::Inode::$op(inode, op!($($exprs),+)).await
+ };
+ }
+
+ // These operations involve a Farc cloned from self.known
+ let done = match request.body() {
+ Lookup { name } => inode_op!(lookup, *name),
+ Readlink => inode_op!(readlink, ()),
+ Open(body) => {
+ // The flags are computed up front and carried in the reply tail.
+ let mut flags = proto::OpenOutFlags::empty();
+ if <Fs as Fuse>::Inode::direct_io(&inode) {
+ flags |= proto::OpenOutFlags::DIRECT_IO;
+ }
+
+ inode_op!(open, *body, (ino, flags))
+ }
+ Opendir(body) => inode_op!(opendir, *body),
+ Readdir(body) => inode_op!(readdir, *body),
+
+ _ => return fail(&self, unique, Errno::ENOSYS as i32),
+ };
+
+ done.into_result()
+ }
+
+ /// Subtracts lookup counts from known inodes, dropping exhausted entries.
+ ///
+ /// Each item of `targets` is an inode number paired with the amount to
+ /// subtract from its lookup count. The subtraction saturates at zero, and
+ /// an entry whose count reaches zero is removed from `self.known`. A
+ /// target that is not in `self.known` is only logged as a warning.
+ ///
+ /// `self.known` stays locked for the whole batch.
+ async fn forget<I>(&self, targets: I)
+ where
+     I: Iterator<Item = (Ino, u64)>,
+ {
+     use hash_map::Entry;
+
+     let mut known = self.known.lock().unwrap();
+
+     targets.for_each(|(ino, subtracted)| match known.entry(ino) {
+         Entry::Vacant(_) => {
+             log::warn!("Kernel attempted to forget {:?} (bad refcount?)", ino);
+         }
+
+         Entry::Occupied(mut slot) => {
+             let remaining = slot.get().1.saturating_sub(subtracted);
+             if remaining == 0 {
+                 slot.remove();
+             } else {
+                 slot.get_mut().1 = remaining;
+             }
+         }
+     });
+ }
+
+ /// Reads the next request from the session file descriptor.
+ ///
+ /// The read scatters into a small inline buffer followed by one of the
+ /// pooled large buffers. A request that fits in the inline buffer is
+ /// returned there; a longer one is returned in the large buffer, which
+ /// then leaves the pool together with the semaphore permit.
+ ///
+ /// Returns any I/O error from waiting for readiness or from the read,
+ /// except for the cases that are retried inside the loop.
+ async fn receive(self: &Arc<Self>) -> std::io::Result<Incoming<Fs>> {
+ use InputBufferStorage::*;
+
+ // Wait for a permit before touching the buffer pool.
+ // NOTE(review): presumably the permit count tracks the number of
+ // pooled buffers, which is what makes the `last_mut().unwrap()` below
+ // safe - confirm against the code that returns buffers to the pool.
+ let permit = Arc::clone(&self.input_semaphore)
+ .acquire_owned()
+ .await
+ .unwrap();
+
+ // Start out with the inline buffer; it is swapped for a large buffer
+ // only if the request turns out to be longer than SBO_SIZE.
+ let mut incoming = Incoming {
+ session: Arc::clone(self),
+ buffer: InputBuffer {
+ bytes: 0,
+ data: Sbo(SboStorage([0; SBO_SIZE])),
+ },
+ };
+
+ // The storage was just built as `Sbo`, so the other arm cannot happen.
+ let sbo = match &mut incoming.buffer.data {
+ Sbo(SboStorage(sbo)) => sbo,
+ _ => unreachable!(),
+ };
+
+ loop {
+ let mut readable = self.session_fd.readable().await?;
+
+ // The guard is declared inside the loop body, so it is released
+ // before the next `.await`.
+ let mut large_buffers = self.large_buffers.lock().unwrap();
+ let large_buffer = large_buffers.last_mut().unwrap();
+
+ // The first SBO_SIZE bytes land in the inline buffer; the rest go
+ // into the large buffer at the offset they will finally occupy.
+ let mut io_vecs = [
+ IoVec::from_mut_slice(sbo),
+ IoVec::from_mut_slice(&mut large_buffer[SBO_SIZE..]),
+ ];
+
+ match readable.try_io(|fd| readv(*fd.get_ref(), &mut io_vecs).map_err(from_nix_error)) {
+ Ok(Ok(bytes)) => {
+ if bytes > SBO_SIZE {
+ // Complete the large buffer with the inline prefix, then
+ // take it out of the pool along with the permit.
+ (&mut large_buffer[..SBO_SIZE]).copy_from_slice(sbo);
+ incoming.buffer.data = Spilled(large_buffers.pop().unwrap(), permit);
+ }
+
+ // On the inline path `permit` is not stored anywhere and is
+ // dropped when this function returns.
+ incoming.buffer.bytes = bytes;
+ return Ok(incoming);
+ }
+
+ // Interrupted
+ // NOTE(review): presumably this is ENOENT from the FUSE device,
+ // reported when a request was interrupted before it could be
+ // read - confirm how from_nix_error maps errno values.
+ Ok(Err(error)) if error.kind() == std::io::ErrorKind::NotFound => continue,
+
+ Ok(Err(error)) => return Err(error),
+ // NOTE(review): presumably the would-block case of `try_io`, so
+ // the loop waits for readiness again - confirm against tokio docs.
+ Err(_) => continue,
+ }
+ }
+ }
+
+ /// Writes one reply to the session file descriptor in a single `writev`.
+ ///
+ /// The reply consists of a `proto::OutHeader` carrying `unique`, `error`
+ /// and the total length, followed by every non-empty segment of `output`.
+ ///
+ /// Returns `FuseError::ShortWrite` if fewer bytes than the total length
+ /// were written, or the converted error if `writev` itself fails.
+ ///
+ /// # Panics
+ ///
+ /// Panics if the total reply length does not fit the header's `len` field.
+ fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> {
+     let payload_len: usize = output
+         .iter()
+         .flat_map(<[_]>::iter)
+         .copied()
+         .map(<[_]>::len)
+         .sum();
+     let total_len = std::mem::size_of::<proto::OutHeader>() + payload_len;
+
+     let out_header = proto::OutHeader {
+         len: total_len.try_into().unwrap(),
+         error,
+         unique,
+     };
+
+     //TODO: Full const generics any time now? Fs::EXPECTED_REQUEST_SEGMENTS
+     let head = [bytes_of(&out_header)];
+     let chain = output.preceded(&head);
+     let io_vecs: SmallVec<[_; 8]> = chain
+         .iter()
+         .flat_map(<[_]>::iter)
+         .copied()
+         .filter(|segment| !segment.is_empty())
+         .map(IoVec::from_slice)
+         .collect();
+
+     let written = writev(*self.session_fd.get_ref(), &io_vecs).map_err(from_nix_error)?;
+     if written != total_len {
+         return Err(FuseError::ShortWrite);
+     }
+
+     Ok(())
+ }
+}
+
+impl Start {
+    /// Configures the session file descriptor, allocates the request
+    /// buffers and performs the handshake, returning the ready session.
+    ///
+    /// The number of large buffers and their size in pages are taken from
+    /// `fs.request_buffers()` and `fs.request_buffer_pages()`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the descriptor cannot be switched to
+    /// non-blocking mode, if it cannot be registered as an `AsyncFd`, or if
+    /// the handshake fails.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the page size cannot be queried, if the buffer size
+    /// overflows `usize`, or if it is below `proto::MIN_READ_SIZE`.
+    pub async fn start<Fs: Fuse>(self, fs: Fs) -> FuseResult<Arc<Session<Fs>>> {
+        let session_fd = self.session_fd.into_raw_fd();
+
+        // A failing fcntl() is reported to the caller like every other I/O
+        // error in this function instead of aborting the process.
+        let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE;
+        fcntl(session_fd, FcntlArg::F_SETFL(flags)).map_err(from_nix_error)?;
+
+        let page_size = sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize;
+        let pages_per_buffer = fs.request_buffer_pages().get();
+        let bytes_per_buffer = pages_per_buffer.checked_mul(page_size).unwrap();
+        assert!(bytes_per_buffer >= proto::MIN_READ_SIZE);
+
+        // Allocate exactly the requested number of buffers. A reserved
+        // capacity is only a lower bound, so it must not be used as the count.
+        let buffer_count = fs.request_buffers().get();
+        let large_buffers: Vec<Box<[u8]>> = (0..buffer_count)
+            .map(|_| vec![0; bytes_per_buffer].into_boxed_slice())
+            .collect();
+
+        let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY);
+        let mut session = Session {
+            _fusermount_fd: self.fusermount_fd,
+            session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?,
+            proto_minor: 0, // Set by Session::do_handshake()
+            fs,
+            // One permit per pooled buffer.
+            input_semaphore: Arc::new(Semaphore::new(large_buffers.len())),
+            large_buffers: Mutex::new(large_buffers),
+            known: Mutex::new(HashMap::new()),
+            destroy: Notify::new(),
+            interrupt_tx,
+        };
+
+        // Repeat the handshake until it reports completion.
+        loop {
+            let state = session
+                .do_handshake(pages_per_buffer, bytes_per_buffer)
+                .await?;
+
+            if let Handshake::Done = state {
+                break Ok(Arc::new(session));
+            }
+        }
+    }
+}
+
+/// Outcome of one handshake attempt.
+enum Handshake {
+ /// The handshake finished; `Start::start` stops looping on this value.
+ Done,
+ /// The handshake must be attempted again; `Start::start` keeps looping
+ /// on any value other than `Done`.
+ Restart,
+}
+
+/// A request read from the session, paired with the session it came from.
+struct Incoming<Fs: Fuse> {
+ // Shared handle to the owning session.
+ session: Arc<Session<Fs>>,
+ // Raw bytes of the request.
+ buffer: InputBuffer,
+}
+
+/// Storage for one request plus the number of bytes that are valid in it.
+struct InputBuffer {
+ // Number of bytes actually read into `data`.
+ pub bytes: usize,
+ // Backing storage, either inline or a pooled large buffer.
+ pub data: InputBufferStorage,
+}
+
+/// Where the bytes of a request live.
+enum InputBufferStorage {
+ /// The request fits in the inline buffer.
+ Sbo(SboStorage),
+ /// The request was longer than the inline buffer: a large buffer taken
+ /// from the pool, held together with its semaphore permit.
+ Spilled(Box<[u8]>, OwnedSemaphorePermit),
+}
+
+/// Inline request buffer: an 8-byte aligned array four `InHeader`s long.
+#[repr(align(8))]
+struct SboStorage(pub [u8; 4 * std::mem::size_of::<InHeader>()]);
+
+// Size in bytes of the inline buffer; longer requests spill to a large buffer.
+const SBO_SIZE: usize = std::mem::size_of::<SboStorage>();
+// Capacity of the broadcast channel that carries interrupt notifications.
+const INTERRUPT_BROADCAST_CAPACITY: usize = 32;
+
+impl InputBuffer {
+    /// Returns the valid part of the buffer, i.e. its first `bytes` bytes,
+    /// regardless of which storage variant holds them.
+    fn data(&self) -> &[u8] {
+        let backing: &[u8] = match &self.data {
+            InputBufferStorage::Sbo(SboStorage(inline)) => &inline[..],
+            InputBufferStorage::Spilled(heap, _) => &heap[..],
+        };
+
+        &backing[..self.bytes]
+    }
+}