diff options
Diffstat (limited to 'src/fuse')
| -rw-r--r-- | src/fuse/fs.rs | 78 | ||||
| -rw-r--r-- | src/fuse/io.rs | 97 | ||||
| -rw-r--r-- | src/fuse/mod.rs | 69 | ||||
| -rw-r--r-- | src/fuse/mount.rs | 8 | ||||
| -rw-r--r-- | src/fuse/ops.rs | 332 | ||||
| -rw-r--r-- | src/fuse/session.rs | 705 |
6 files changed, 517 insertions, 772 deletions
diff --git a/src/fuse/fs.rs b/src/fuse/fs.rs deleted file mode 100644 index 0c39ea7..0000000 --- a/src/fuse/fs.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::{Ino, TimeToLive}; -use async_trait::async_trait; -use std::{num::NonZeroUsize, ops::Deref, sync::Arc}; - -use super::{ - io::{Attrs, EntryType}, - ops::*, - Done, Op, Reply, -}; - -#[async_trait] -pub trait Fuse: Sized + Send + Sync + 'static { - type Inode: Inode<Fuse = Self> + ?Sized; - type Farc: Deref<Target = Self::Inode> + Clone + Send + Sync = Arc<Self::Inode>; - - async fn init<'o>(&self, reply: Reply<'o, Self, Init>) -> Done<'o>; - - async fn statfs<'o>(&self, (_, reply, _): Op<'o, Self, Statfs>) -> Done<'o> { - reply.not_implemented() - } - - fn request_buffers(&self) -> NonZeroUsize { - NonZeroUsize::new(16).unwrap() - } - - fn request_buffer_pages(&self) -> NonZeroUsize { - NonZeroUsize::new(4).unwrap() - } -} - -#[async_trait] -pub trait Inode: Send + Sync { - type Fuse: Fuse<Inode = Self>; - - fn ino(self: &FarcTo<Self>) -> Ino; - fn attrs(self: &FarcTo<Self>) -> (Attrs, TimeToLive); - fn inode_type(self: &FarcTo<Self>) -> EntryType; - - fn direct_io(self: &FarcTo<Self>) -> bool { - false - } - - fn access<'o>(self: &FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Access>) -> Done<'o> { - reply.not_implemented() - } - - async fn lookup<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Lookup>) -> Done<'o> { - reply.not_implemented() - } - - async fn readlink<'o>( - self: FarcTo<Self>, - (_, reply, _): Op<'o, Self::Fuse, Readlink>, - ) -> Done<'o> { - reply.not_implemented() - } - - async fn open<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Open>) -> Done<'o> { - // Calling not_implemented() here would ignore direct_io() and similar flags - reply.ok() - } - - async fn opendir<'o>( - self: FarcTo<Self>, - (_, reply, _): Op<'o, Self::Fuse, Opendir>, - ) -> Done<'o> { - reply.not_implemented() - } - - async fn readdir<'o>( - self: FarcTo<Self>, - (_, reply, _): Op<'o, Self::Fuse, Readdir>, - ) -> Done<'o> { - reply.not_implemented() - } -} - -pub type FarcTo<I> = <<I as Inode>::Fuse as Fuse>::Farc; diff --git a/src/fuse/io.rs b/src/fuse/io.rs index 305f0ef..5f85e4d 100644 --- a/src/fuse/io.rs +++ b/src/fuse/io.rs @@ -2,19 +2,14 @@ use bytemuck::Zeroable; use nix::{errno::Errno, sys::stat::SFlag}; use std::{ - borrow::Cow, convert::Infallible, - ffi::OsStr, future::Future, ops::{ControlFlow, FromResidual, Try}, }; -use crate::{proto, Ino, TimeToLive, Timestamp}; +use crate::{proto, FuseResult, Ino, TimeToLive, Timestamp}; -use super::{ - fs::{Fuse, Inode}, - session, Done, Operation, Reply, Request, -}; +use super::{Done, Operation, Reply, Request}; #[doc(no_inline)] pub use nix::{ @@ -23,24 +18,31 @@ pub use nix::{ unistd::{AccessFlags, Gid, Pid, Uid}, }; -pub enum Interruptible<'o, Fs: Fuse, O: Operation<'o, Fs>, T> { - Completed(Reply<'o, Fs, O>, T), +pub enum Interruptible<'o, O: Operation<'o>, T> { + Completed(Reply<'o, O>, T), Interrupted(Done<'o>), } +pub trait Known { + fn ino(&self) -> Ino; + fn inode_type(&self) -> EntryType; + fn attrs(&self) -> (Attrs, TimeToLive); + fn unveil(self); +} + #[derive(Clone)] pub struct Attrs(proto::Attrs); -pub struct Entry<'a, Ref> { +pub struct Entry<N, K> { pub offset: u64, - pub name: Cow<'a, OsStr>, - pub inode: Ref, + pub name: N, + pub inode: K, pub ttl: TimeToLive, } pub struct FsInfo(proto::StatfsOut); -impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> { +impl<'o, O: Operation<'o>> Request<'o, O> { pub fn ino(&self) -> Ino { Ino(self.header.ino) } @@ -62,13 +64,13 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> { } } -impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { - pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, Fs, O, T> +impl<'o, O: Operation<'o>> Reply<'o, O> { + pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, O, T> where F: Future<Output = T>, { tokio::pin!(f); - let mut rx = session::interrupt_rx(self.session); + let mut rx = self.session.interrupt_rx(); use Interruptible::*; loop { @@ -93,11 +95,9 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { } } - pub fn fail(mut self, errno: Errno) -> Done<'o> { - let errno = errno as i32; - O::consume_errno(errno, &mut self.tail); - - Done::from_result(session::fail(self.session, self.unique, errno)) + pub fn fail(self, errno: Errno) -> Done<'o> { + let result = self.session.fail(self.unique, errno as i32); + self.finish(result) } pub fn not_implemented(self) -> Done<'o> { @@ -115,14 +115,18 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { pub fn interrupted(self) -> Done<'o> { self.fail(Errno::EINTR) } + + pub(crate) fn finish(self, result: FuseResult<()>) -> Done<'o> { + if let Err(error) = result { + log::error!("Replying to request {}: {}", self.unique, error); + } + + Done::done() + } } -impl<'o, Fs, O> From<(Reply<'o, Fs, O>, Errno)> for Done<'o> -where - Fs: Fuse, - O: Operation<'o, Fs>, -{ - fn from((reply, errno): (Reply<'o, Fs, O>, Errno)) -> Done<'o> { +impl<'o, O: Operation<'o>> From<(Reply<'o, O>, Errno)> for Done<'o> { + fn from((reply, errno): (Reply<'o, O>, Errno)) -> Done<'o> { reply.fail(errno) } } @@ -142,12 +146,8 @@ impl<'o, T: Into<Done<'o>>> FromResidual<Result<Infallible, T>> for Done<'o> { } } -impl<'o, Fs, O> FromResidual<Interruptible<'o, Fs, O, Infallible>> for Done<'o> -where - Fs: Fuse, - O: Operation<'o, Fs>, -{ - fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self { +impl<'o, O: Operation<'o>> FromResidual<Interruptible<'o, O, Infallible>> for Done<'o> { + fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self { match residual { Interruptible::Completed(_, _) => unreachable!(), Interruptible::Interrupted(done) => done, @@ -168,13 +168,10 @@ impl Try for Done<'_> { } } -impl<'o, Fs, O, T> FromResidual<Interruptible<'o, Fs, O, Infallible>> - for Interruptible<'o, Fs, O, T> -where - Fs: Fuse, - O: Operation<'o, Fs>, +impl<'o, O: Operation<'o>, T> FromResidual<Interruptible<'o, O, Infallible>> + for Interruptible<'o, O, T> { - fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self { + fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self { use Interruptible::*; match residual { @@ -184,13 +181,9 @@ where } } -impl<'o, Fs, O, T> Try for Interruptible<'o, Fs, O, T> -where - Fs: Fuse, - O: Operation<'o, Fs>, -{ - type Output = (Reply<'o, Fs, O>, T); - type Residual = Interruptible<'o, Fs, O, Infallible>; +impl<'o, O: Operation<'o>, T> Try for Interruptible<'o, O, T> { + type Output = (Reply<'o, O>, T); + type Residual = Interruptible<'o, O, Infallible>; fn from_output((reply, t): Self::Output) -> Self { Self::Completed(reply, t) @@ -234,14 +227,14 @@ impl Attrs { }) } - pub fn times(self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self { + pub fn times(self, access: Timestamp, modify: Timestamp, create: Timestamp) -> Self { Attrs(proto::Attrs { atime: access.seconds, mtime: modify.seconds, - ctime: change.seconds, + ctime: create.seconds, atimensec: access.nanoseconds, mtimensec: modify.nanoseconds, - ctimensec: change.nanoseconds, + ctimensec: create.nanoseconds, ..self.0 }) } @@ -253,9 +246,9 @@ impl Attrs { }) } - pub(crate) fn finish<Fs: Fuse>(self, inode: &Fs::Farc) -> proto::Attrs { - let Ino(ino) = <Fs as Fuse>::Inode::ino(inode); - let inode_type = match <Fs as Fuse>::Inode::inode_type(inode) { + pub(crate) fn finish(self, inode: &impl Known) -> proto::Attrs { + let Ino(ino) = inode.ino(); + let inode_type = match inode.inode_type() { EntryType::Fifo => SFlag::S_IFIFO, EntryType::CharacterDevice => SFlag::S_IFCHR, EntryType::Directory => SFlag::S_IFDIR, diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs index 0e39e6b..5fe6f46 100644 --- a/src/fuse/mod.rs +++ b/src/fuse/mod.rs @@ -1,87 +1,46 @@ -use std::{ - collections::HashMap, - marker::PhantomData, - os::unix::io::RawFd, - sync::{Arc, Mutex}, -}; - -use tokio::{ - io::unix::AsyncFd, - sync::{broadcast, Notify, Semaphore}, -}; - -use crate::{proto, util::DumbFd, FuseResult, Ino}; +use crate::proto; +use std::marker::PhantomData; pub mod io; #[doc(cfg(feature = "server"))] -pub mod fs; - -#[doc(cfg(feature = "server"))] pub mod ops; #[doc(cfg(feature = "mount"))] pub mod mount; -mod session; -use fs::Fuse; - -#[doc(cfg(feature = "server"))] -pub struct Session<Fs: Fuse> { - _fusermount_fd: DumbFd, - session_fd: AsyncFd<RawFd>, - proto_minor: u32, - fs: Fs, - input_semaphore: Arc<Semaphore>, - large_buffers: Mutex<Vec<Box<[u8]>>>, - known: Mutex<HashMap<Ino, (Fs::Farc, u64)>>, - destroy: Notify, - interrupt_tx: broadcast::Sender<u64>, -} - -#[doc(cfg(feature = "server"))] -pub struct Start { - fusermount_fd: DumbFd, - session_fd: DumbFd, -} +pub mod session; mod private_trait { - pub trait Operation<'o, Fs: super::Fuse> { - type RequestBody = (); - type ReplyTail = (); - - fn consume_errno(_errno: i32, _tail: &mut Self::ReplyTail) {} + pub trait Operation<'o> { + type RequestBody: crate::proto::Structured<'o>; + type ReplyTail: Default; } } use private_trait::Operation; -#[doc(cfg(feature = "server"))] -pub type Op<'o, Fs, O> = (Request<'o, Fs, O>, Reply<'o, Fs, O>, &'o Arc<Session<Fs>>); +pub type Op<'o, O = ops::Any> = (Request<'o, O>, Reply<'o, O>); #[doc(cfg(feature = "server"))] -pub struct Request<'o, Fs: Fuse, O: Operation<'o, Fs>> { - header: &'o proto::InHeader, +pub struct Request<'o, O: Operation<'o>> { + header: proto::InHeader, body: O::RequestBody, } #[doc(cfg(feature = "server"))] -pub struct Reply<'o, Fs: Fuse, O: Operation<'o, Fs>> { - session: &'o Session<Fs>, +pub struct Reply<'o, O: Operation<'o>> { + session: &'o session::Session, unique: u64, tail: O::ReplyTail, } #[must_use] #[doc(cfg(feature = "server"))] -pub struct Done<'o>(FuseResult<PhantomData<&'o ()>>); +pub struct Done<'o>(PhantomData<*mut &'o ()>); impl Done<'_> { - fn from_result(result: FuseResult<()>) -> Self { - Done(result.map(|()| PhantomData)) - } - - fn into_result(self) -> FuseResult<()> { - self.0.map(|PhantomData| ()) + fn done() -> Self { + Done(PhantomData) } } diff --git a/src/fuse/mount.rs b/src/fuse/mount.rs index 955b28a..6464372 100644 --- a/src/fuse/mount.rs +++ b/src/fuse/mount.rs @@ -17,7 +17,7 @@ use nix::{ use quick_error::quick_error; -use super::Start; +use super::session::Start; use crate::util::DumbFd; quick_error! { @@ -146,10 +146,8 @@ where Ok(session_fd) => { let fusermount_fd = DumbFd(left_side.into_raw_fd()); let session_fd = DumbFd(session_fd); - Ok(Start { - fusermount_fd, - session_fd, - }) + + Ok(Start::new(fusermount_fd, session_fd)) } Err(error) => { diff --git a/src/fuse/ops.rs b/src/fuse/ops.rs index bbd49fe..9bca233 100644 --- a/src/fuse/ops.rs +++ b/src/fuse/ops.rs @@ -1,9 +1,6 @@ use bytemuck::{bytes_of, Pod, Zeroable}; -use futures_util::stream::{Stream, StreamExt, TryStreamExt}; -use nix::sys::stat::SFlag; use std::{ - borrow::Borrow, ffi::{CStr, OsStr}, os::unix::ffi::OsStrExt, }; @@ -11,56 +8,61 @@ use std::{ use crate::{proto, util::OutputChain, Errno, Ino, TimeToLive}; use super::{ - fs::{Fuse, Inode}, - io::{AccessFlags, Entry, EntryType, FsInfo}, - session, Done, Operation, Reply, Request, + io::{AccessFlags, Entry, FsInfo, Interruptible, Known}, + Done, Operation, Reply, Request, }; macro_rules! op { - { $name:ident $operation:tt $(,)+ } => { + { $name:ident $operation:tt } => { pub struct $name(()); - impl<'o, Fs: Fuse> Operation<'o, Fs> for $name $operation + impl<'o> Operation<'o> for $name $operation }; - { $name:ident $operation:tt, Request $request:tt $($next:tt)+ } => { - impl<'o, Fs: Fuse> Request<'o, Fs, $name> $request + { $name:ident $operation:tt impl Request $request:tt $($next:tt)* } => { + impl<'o> Request<'o, $name> $request - op! { $name $operation $($next)+ } + op! { $name $operation $($next)* } }; - { $name:ident $operation:tt, Reply $reply:tt $($next:tt)+ } => { - impl<'o, Fs: Fuse> Reply<'o, Fs, $name> $reply + { $name:ident $operation:tt impl Reply $reply:tt $($next:tt)* } => { + impl<'o> Reply<'o, $name> $reply - op! { $name $operation $($next)+ } + op! { $name $operation $($next)* } }; } op! { + Any { + type RequestBody = (); + type ReplyTail = (); + } +} + +op! { Lookup { - // name() - type RequestBody = &'o CStr; - }, + type RequestBody = &'o CStr; // name() + type ReplyTail = (); + } - Request { + impl Request { /// Returns the name of the entry being looked up in this directory. pub fn name(&self) -> &OsStr { c_to_os(self.body) } - }, - - Reply { - /// The requested entry was found and a `Farc` was successfully determined from it. The - /// FUSE client will become aware of the found inode if it wasn't before. This result may - /// be cached by the client for up to the given TTL. - pub fn found(self, entry: &Fs::Farc, ttl: TimeToLive) -> Done<'o> { - let (attrs, attrs_ttl) = <Fs as Fuse>::Inode::attrs(entry); - session::unveil(self.session, entry); - - self.single(&make_entry( - (<Fs as Fuse>::Inode::ino(entry), ttl), - (attrs.finish::<Fs>(entry), attrs_ttl), - )) + } + + impl Reply { + /// The requested entry was found. The FUSE client will become aware of the found inode if + /// it wasn't before. This result may be cached by the client for up to the given TTL. + pub fn found(self, entry: impl Known, ttl: TimeToLive) -> Done<'o> { + let (attrs, attrs_ttl) = entry.attrs(); + let attrs = attrs.finish(&entry); + + let done = self.single(&make_entry((entry.ino(), ttl), (attrs, attrs_ttl))); + entry.unveil(); + + done } /// The requested entry was not found in this directory. The FUSE clint may include this @@ -75,13 +77,37 @@ op! { pub fn not_found_uncached(self) -> Done<'o> { self.fail(Errno::ENOENT) } - }, + } +} + +op! { + Getattr { + type RequestBody = &'o proto::GetattrIn; + type ReplyTail = (); + } + + impl Reply { + pub fn known(self, inode: &impl Known) -> Done<'o> { + let (attrs, ttl) = inode.attrs(); + let attrs = attrs.finish(inode); + + self.single(&proto::AttrOut { + attr_valid: ttl.seconds, + attr_valid_nsec: ttl.nanoseconds, + dummy: Default::default(), + attr: attrs, + }) + } + } } op! { - Readlink {}, + Readlink { + type RequestBody = (); + type ReplyTail = (); + } - Reply { + impl Reply { /// This inode corresponds to a symbolic link pointing to the given target path. pub fn target(self, target: &OsStr) -> Done<'o> { self.chain(OutputChain::tail(&[target.as_bytes()])) @@ -89,230 +115,129 @@ op! { /// Same as [`Reply::target()`], except that the target path is taken from disjoint /// slices. This involves no additional allocation. - pub fn gather_target(self, target: &[&OsStr]) -> Done<'o> { - //FIXME: Likely UB - self.chain(OutputChain::tail(unsafe { std::mem::transmute(target) })) + pub fn gather_target(self, target: &[&[u8]]) -> Done<'o> { + self.chain(OutputChain::tail(target)) } - }, + } } op! { Open { type RequestBody = &'o proto::OpenIn; - type ReplyTail = (Ino, proto::OpenOutFlags); - }, + type ReplyTail = private::OpenFlags; + } + + impl Reply { + pub fn force_direct_io(&mut self) { + self.tail.0 |= proto::OpenOutFlags::DIRECT_IO; + } - Reply { /// The inode may now be accessed. pub fn ok(self) -> Done<'o> { self.ok_with_handle(0) } fn ok_with_handle(self, handle: u64) -> Done<'o> { - let (_, flags) = self.tail; + let open_flags = self.tail.0.bits(); + self.single(&proto::OpenOut { fh: handle, - open_flags: flags.bits(), + open_flags, padding: Default::default(), }) } - }, + } } -op! { Read {}, } -/*op! { +op! { Read { - type RequestBody = &'o proto::ReadIn; - type ReplyTail = &'o mut OutputBytes<'o>; - }, - - Request { - pub fn offset(&self) -> u64 { - self.body.offset - } - - pub fn size(&self) -> u32 { - self.body.size - } - }, - - Reply { - pub fn remaining(&self) -> u64 { - self.tail.remaining() - } - - pub fn end(self) -> Done<'o> { - if self.tail.ready() { - self.chain(OutputChain::tail(self.tail.segments())) - } else { - // The read() handler will be invoked again with same OutputBytes - self.done() - } - } - - pub fn hole(self, size: u64) -> Result<Self, Done<'o>> { - self.tail - } - - pub fn copy(self, data: &[u8]) -> Result<Self, Done<'o>> { - self.self_or_done(self.tail.copy(data)) - } - - pub fn put(self, data: &'o [u8]) -> Result<Self, Done<'o>> { - self.self_or_done(self.tail.put(data)) - } - - pub fn gather(self, data: &'o [&'o [u8]]) -> Result<Self, Done<'o>> { - self.self_or_done(self.tail.gather(data)) - } - - fn self_or_done(self, capacity: OutputCapacity) -> Result<Self, Done<'o>> { - match capacity { - OutputCapacity::Available => Ok(self), - OutputCapacity::Filled => Err(self.done()), - } - } - }, -}*/ + type RequestBody = (); + type ReplyTail = (); + } +} op! { Write { type RequestBody = &'o proto::WriteIn; - }, + type ReplyTail = (); + } } op! { Init { - type ReplyTail = &'o mut Result<Fs::Farc, i32>; + type RequestBody = &'o proto::InitIn; + type ReplyTail = (); + } - fn consume_errno(errno: i32, tail: &mut Self::ReplyTail) { - **tail = Err(errno); - } - }, - - Reply { - /// Server-side initialization succeeded. The provided `Farc` references the filesystem's - /// root inode. - pub fn root(self, root: Fs::Farc) -> Done<'o> { - *self.tail = Ok(root); - self.done() + impl Reply { + pub fn ok(self) -> Done<'o> { + self.nop() } - }, + } } op! { - Statfs {}, + Statfs { + type RequestBody = (); + type ReplyTail = (); + } - Reply { + impl Reply { /// Replies with filesystem statistics. pub fn info(self, statfs: FsInfo) -> Done<'o> { let statfs: proto::StatfsOut = statfs.into(); self.single(&statfs) } - }, + } } op! { Opendir { type RequestBody = &'o proto::OpendirIn; - }, + type ReplyTail = (); + } } op! { Readdir { type RequestBody = &'o proto::ReaddirIn; - }, + type ReplyTail = (); + } - Request { + impl Request { /// Returns the base offset in the directory stream to read from. pub fn offset(&self) -> u64 { self.body.read_in.offset } - }, + } - Reply { - pub fn try_iter<'a, I, E, Ref>( - self, - mut entries: I, - ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)> + impl Reply { + pub fn entry<N>(self, inode: Entry<N, impl Known>) -> Interruptible<'o, Readdir, ()> where - I: Iterator<Item = Result<Entry<'a, Ref>, E>> + Send, - Ref: Borrow<Fs::Farc>, + N: AsRef<OsStr>, { - //TODO: This is about as shitty as it gets - match entries.next().transpose() { - Ok(Some(entry)) => { - let Entry { - name, - inode, - offset, - .. - } = entry; - - let inode = inode.borrow(); - let Ino(ino) = <Fs as Fuse>::Inode::ino(inode); - - let dirent = proto::Dirent { - ino, - off: offset, - namelen: name.len() as u32, - entry_type: (match <Fs as Fuse>::Inode::inode_type(inode) { - EntryType::Fifo => SFlag::S_IFIFO, - EntryType::CharacterDevice => SFlag::S_IFCHR, - EntryType::Directory => SFlag::S_IFDIR, - EntryType::BlockDevice => SFlag::S_IFBLK, - EntryType::File => SFlag::S_IFREG, - EntryType::Symlink => SFlag::S_IFLNK, - EntryType::Socket => SFlag::S_IFSOCK, - }) - .bits() - >> 12, - }; - - let dirent = bytes_of(&dirent); - let name = name.as_bytes(); - - let padding = [0; 8]; - let padding = &padding[..7 - (dirent.len() + name.len() - 1) % 8]; - - Ok(self.chain(OutputChain::tail(&[dirent, name, padding]))) - } - - Err(error) => Err((self, error)), - - Ok(None) => Ok(self.empty()), - } + todo!() } - // See rust-lang/rust#61949 - pub async fn try_stream<'a, S, E, Ref>( - self, - entries: S, - ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)> - where - S: Stream<Item = Result<Entry<'a, Ref>, E>> + Send, - Ref: Borrow<Fs::Farc> + Send, - E: Send, - { - //TODO: This is about as shitty as it gets - let first = entries.boxed().try_next().await; - self.try_iter(first.transpose().into_iter()) + pub fn end(self) -> Done<'o> { + todo!() } - }, + } } op! { Access { type RequestBody = &'o proto::AccessIn; - }, + type ReplyTail = (); + } - Request { + impl Request { pub fn mask(&self) -> AccessFlags { AccessFlags::from_bits_truncate(self.body.mask as i32) } - }, + } - Reply { + impl Reply { pub fn ok(self) -> Done<'o> { self.empty() } @@ -320,12 +245,32 @@ op! { pub fn permission_denied(self) -> Done<'o> { self.fail(Errno::EACCES) } - }, + } +} + +op! { + Destroy { + type RequestBody = (); + type ReplyTail = (); + } +} + +mod private { + use crate::proto; + + #[derive(Copy, Clone)] + pub struct OpenFlags(pub proto::OpenOutFlags); + + impl Default for OpenFlags { + fn default() -> Self { + OpenFlags(proto::OpenOutFlags::empty()) + } + } } -impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { - fn done(self) -> Done<'o> { - Done::from_result(Ok(())) +impl<'o, O: Operation<'o>> Reply<'o, O> { + fn nop(self) -> Done<'o> { + Done::done() } fn empty(self) -> Done<'o> { @@ -337,7 +282,8 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { } fn chain(self, chain: OutputChain<'_>) -> Done<'o> { - Done::from_result(session::ok(self.session, self.unique, chain)) + let result = self.session.ok(self.unique, chain); + self.finish(result) } } diff --git a/src/fuse/session.rs b/src/fuse/session.rs index 8975c57..5045099 100644 --- a/src/fuse/session.rs +++ b/src/fuse/session.rs @@ -1,148 +1,121 @@ use std::{ - collections::{hash_map, HashMap}, convert::TryInto, + future::Future, io, + marker::PhantomData, os::unix::io::{IntoRawFd, RawFd}, sync::{Arc, Mutex}, }; use nix::{ fcntl::{fcntl, FcntlArg, OFlag}, - sys::uio::{readv, writev, IoVec}, - unistd::{sysconf, SysconfVar}, + sys::uio::{writev, IoVec}, + unistd::{read, sysconf, SysconfVar}, }; use tokio::{ io::unix::AsyncFd, - sync::{broadcast, Notify, OwnedSemaphorePermit, Semaphore}, + sync::{broadcast, OwnedSemaphorePermit, Semaphore}, }; -use bytemuck::{bytes_of, try_from_bytes}; +use bytemuck::bytes_of; use smallvec::SmallVec; use crate::{ - proto::{self, InHeader}, - util::{display_or, OutputChain}, - Errno, FuseError, FuseResult, Ino, + proto::{self, InHeader, Structured}, + util::{DumbFd, OutputChain}, + Errno, FuseError, FuseResult, }; -use super::{ - fs::{Fuse, Inode}, - Reply, Request, Session, Start, -}; +use super::{ops, Done, Op, Operation, Reply, Request}; -pub fn ok<Fs: Fuse>(session: &Session<Fs>, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { - session.send(unique, 0, output) +pub struct Start { + fusermount_fd: DumbFd, + session_fd: DumbFd, } -pub fn fail<Fs: Fuse>(session: &Session<Fs>, unique: u64, mut errno: i32) -> FuseResult<()> { - if errno <= 0 { - log::warn!( - "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", - unique, - errno - ); +pub struct Session { + _fusermount_fd: DumbFd, + session_fd: AsyncFd<RawFd>, + interrupt_tx: broadcast::Sender<u64>, + buffers: Mutex<Vec<Buffer>>, + buffer_semaphore: Arc<Semaphore>, + proto_minor: u32, + buffer_pages: usize, +} - errno = Errno::ENOMSG as i32; - } +pub struct Endpoint<'a> { + session: &'a Arc<Session>, + local_buffer: Buffer, +} - session.send(unique, -errno, OutputChain::empty()) +pub enum Dispatch<'o> { + Lookup(Incoming<'o, ops::Lookup>), + Getattr(Incoming<'o, ops::Getattr>), + Readlink(Incoming<'o, ops::Readlink>), + Read(Incoming<'o, ops::Read>), + Write(Incoming<'o, ops::Write>), + Statfs(Incoming<'o, ops::Statfs>), + Readdir(Incoming<'o, ops::Readdir>), + Access(Incoming<'o, ops::Access>), + Destroy(Incoming<'o, ops::Destroy>), } -pub fn unveil<Fs: Fuse>(session: &Session<Fs>, inode: &Fs::Farc) { - let ino = <Fs as Fuse>::Inode::ino(inode); - let mut known = session.known.lock().unwrap(); +pub struct Incoming<'o, O: Operation<'o>> { + common: IncomingCommon<'o>, + _phantom: PhantomData<O>, +} - use hash_map::Entry::*; - match known.entry(ino) { - Occupied(entry) => { - let (_, count) = entry.into_mut(); - *count += 1; - } +pub struct Owned<O: for<'o> Operation<'o>> { + session: Arc<Session>, + buffer: Buffer, + header: InHeader, + _permit: OwnedSemaphorePermit, + _phantom: PhantomData<O>, +} - Vacant(entry) => { - entry.insert((Fs::Farc::clone(inode), 1)); +impl Session { + pub fn endpoint<'a>(self: &'a Arc<Self>) -> Endpoint<'a> { + Endpoint { + session: self, + local_buffer: Buffer::new(self.buffer_pages), } } -} -pub fn interrupt_rx<Fs: Fuse>(session: &Session<Fs>) -> broadcast::Receiver<u64> { - session.interrupt_tx.subscribe() -} - -impl<Fs: Fuse> Session<Fs> { - pub fn fs(&self) -> &Fs { - &self.fs + pub(crate) fn ok(&self, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { + self.send(unique, 0, output) } - pub async fn main_loop(self: Arc<Self>) -> FuseResult<()> { - let this = Arc::clone(&self); - let main_loop = async move { - loop { - let incoming = this.receive().await; - let this = Arc::clone(&this); - - tokio::spawn(async move { - let (result, header): (FuseResult<()>, Option<InHeader>) = match incoming { - Ok(mut incoming) => match this.dispatch(&mut incoming).await { - Ok(()) => (Ok(()), None), - - Err(error) => { - let data = incoming.buffer.data(); - let data = &data[..std::mem::size_of::<InHeader>().max(data.len())]; - (Err(error), try_from_bytes(data).ok().copied()) - } - }, - - Err(error) => (Err(error.into()), None), - }; - - let header = display_or(header, "(bad)"); - if let Err(error) = result { - log::error!("Handling request {}: {}", header, error); - } - }); - } - }; + pub(crate) fn fail(&self, unique: u64, mut errno: i32) -> FuseResult<()> { + if errno <= 0 { + log::warn!( + "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", + unique, + errno + ); - tokio::select! { - () = main_loop => unreachable!(), - () = self.destroy.notified() => Ok(()), + errno = Errno::ENOMSG as i32; } - } - async fn do_handshake( - &mut self, - pages_per_buffer: usize, - bytes_per_buffer: usize, - ) -> FuseResult<Handshake> { - use FuseError::*; - - let buffer = { - self.session_fd.readable().await?.retain_ready(); - let large_buffer = self.large_buffers.get_mut().unwrap().first_mut().unwrap(); - - let mut data = InputBufferStorage::Sbo(SboStorage([0; SBO_SIZE])); - let sbo = match &mut data { - InputBufferStorage::Sbo(SboStorage(sbo)) => sbo, - _ => unreachable!(), - }; + self.send(unique, -errno, OutputChain::empty()) + } - let mut io_vecs = [ - IoVec::from_mut_slice(sbo), - IoVec::from_mut_slice(large_buffer), - ]; + pub(crate) fn interrupt_rx(&self) -> broadcast::Receiver<u64> { + self.interrupt_tx.subscribe() + } - let bytes = readv(*self.session_fd.get_ref(), &mut io_vecs).map_err(io::Error::from)?; - InputBuffer { bytes, data } - }; + async fn handshake(&mut self, buffer: &mut Buffer) -> FuseResult<Handshake> { + self.session_fd.readable().await?.retain_ready(); + let bytes = read(*self.session_fd.get_ref(), &mut buffer.0).map_err(io::Error::from)?; - let request: proto::Request<'_> = buffer.data().try_into()?; + let (header, opcode) = InHeader::from_bytes(&buffer.0[..bytes])?; + let init = match opcode { + proto::Opcode::Init => <&proto::InitIn>::toplevel_from(&buffer.0[HEADER_END..bytes])?, - let unique = request.header().unique; - let init = match request.body() { - proto::RequestBody::Init(body) => body, - _ => return Err(ProtocolInit), + _ => { + log::error!("First message from kernel is not Init, but {:?}", opcode); + return Err(FuseError::ProtocolInit); + } }; use std::cmp::Ordering; @@ -156,7 +129,7 @@ impl<Fs: Fuse> Session<Fs> { Ordering::Greater => { let tail = [bytes_of(&proto::MAJOR_VERSION)]; - ok(self, unique, OutputChain::tail(&tail))?; + self.ok(header.unique, OutputChain::tail(&tail))?; return Ok(Handshake::Restart); } @@ -175,40 +148,27 @@ impl<Fs: Fuse> Session<Fs> { major = proto::MAJOR_VERSION ); - fail(self, unique, Errno::EPROTONOSUPPORT as i32)?; - return Err(ProtocolInit); + self.fail(header.unique, Errno::EPROTONOSUPPORT as i32)?; + return Err(FuseError::ProtocolInit); } - let root = { - let mut init_result = Err(0); - let reply = Reply { - session: self, - unique, - tail: &mut init_result, - }; + let flags = { + use proto::InitFlags; - self.fs.init(reply).await.into_result()?; + let kernel = InitFlags::from_bits_truncate(init.flags); + let supported = InitFlags::PARALLEL_DIROPS + | InitFlags::ABORT_ERROR + | InitFlags::MAX_PAGES + | InitFlags::CACHE_SYMLINKS; - match init_result { - Ok(root) => root, - Err(errno) => { - log::error!("init() handler failed: {}", Errno::from_i32(errno)); - return Err(FuseError::Io(std::io::Error::from_raw_os_error(errno))); - } - } + kernel & supported }; - self.known.get_mut().unwrap().insert(Ino::ROOT, (root, 1)); + let buffer_size = page_size() * self.buffer_pages; - use proto::InitFlags; - let flags = InitFlags::from_bits_truncate(init.flags); - let supported = InitFlags::PARALLEL_DIROPS - | InitFlags::ABORT_ERROR - | InitFlags::MAX_PAGES - | InitFlags::CACHE_SYMLINKS; + // See fs/fuse/dev.c in the kernel source tree for details about max_write + let max_write = buffer_size - std::mem::size_of::<(InHeader, proto::WriteIn)>(); - let flags = flags & supported; - let max_write = bytes_per_buffer - std::mem::size_of::<(InHeader, proto::WriteIn)>(); let init_out = proto::InitOut { major: proto::MAJOR_VERSION, minor: proto::TARGET_MINOR_VERSION, @@ -218,230 +178,17 @@ impl<Fs: Fuse> Session<Fs> { congestion_threshold: 0, //TODO max_write: max_write.try_into().unwrap(), time_gran: 1, //TODO - max_pages: pages_per_buffer.try_into().unwrap(), + max_pages: self.buffer_pages.try_into().unwrap(), padding: Default::default(), unused: Default::default(), }; let tail = [bytes_of(&init_out)]; - ok(self, unique, OutputChain::tail(&tail))?; + self.ok(header.unique, OutputChain::tail(&tail))?; Ok(Handshake::Done) } - async fn dispatch(self: &Arc<Self>, request: &mut Incoming) -> FuseResult<()> { - let request: proto::Request<'_> = request.buffer.data().try_into()?; - let header = request.header(); - let InHeader { unique, ino, .. } = *header; - let ino = Ino(ino); - - use proto::RequestBody::*; - - macro_rules! op { - () => { - op!(()) - }; - - ($body:expr) => { - op!($body, ()) - }; - - ($body:expr, $tail:expr) => {{ - let request = Request { - header, - body: $body, - }; - let reply = Reply { - session: &self, - unique, - tail: $tail, - }; - - (request, reply, self) - }}; - } - - // These operations don't involve locking and searching self.known - match request.body() { - Forget(body) => { - self.forget(std::iter::once((ino, body.nlookup))).await; - return Ok(()); - } - - Statfs => return self.fs.statfs(op!()).await.into_result(), - - Interrupt(body) => { - //TODO: Don't reply with EAGAIN if the interrupt is successful - let _ = self.interrupt_tx.send(body.unique); - return fail(self, unique, Errno::EAGAIN as i32); - } - - Destroy => { - self.destroy.notify_one(); - return Ok(()); - } - - BatchForget { forgets, .. } => { - let forgets = forgets - .iter() - .map(|target| (Ino(target.ino), target.nlookup)); - - self.forget(forgets).await; - return Ok(()); - } - - _ => (), - } - - // Some operations are handled while self.known is locked - let inode = { - let known = self.known.lock().unwrap(); - let inode = match known.get(&ino) { - Some((farc, _)) => farc, - None => { - log::error!( - "Lookup count for ino {} reached zero while still \ - known to the kernel, this is a bug", - ino - ); - - return fail(self, unique, Errno::ENOANO as i32); - } - }; - - match request.body() { - Getattr(_) => { - //TODO: Getattr flags - let (attrs, ttl) = <Fs as Fuse>::Inode::attrs(inode); - let attrs = attrs.finish::<Fs>(inode); - drop(known); - - let out = proto::AttrOut { - attr_valid: ttl.seconds, - attr_valid_nsec: ttl.nanoseconds, - dummy: Default::default(), - attr: attrs, - }; - - return ok(self, unique, OutputChain::tail(&[bytes_of(&out)])); - } - - Access(body) => { - return <Fs as Fuse>::Inode::access(inode, op!(*body)).into_result() - } - - _ => inode.clone(), - } - }; - - macro_rules! inode_op { - ($op:ident, $($exprs:expr),+) => { - <Fs as Fuse>::Inode::$op(inode, op!($($exprs),+)).await - }; - } - - // These operations involve a Farc cloned from self.known - let done = match request.body() { - Lookup { name } => inode_op!(lookup, *name), - Readlink => inode_op!(readlink, ()), - Open(body) => { - let mut flags = proto::OpenOutFlags::empty(); - if <Fs as Fuse>::Inode::direct_io(&inode) { - flags |= proto::OpenOutFlags::DIRECT_IO; - } - - inode_op!(open, *body, (ino, flags)) - } - Opendir(body) => inode_op!(opendir, *body), - Readdir(body) => inode_op!(readdir, *body), - - _ => return fail(self, unique, Errno::ENOSYS as i32), - }; - - done.into_result() - } - - async fn forget<I>(&self, targets: I) - where - I: Iterator<Item = (Ino, u64)>, - { - let mut known = self.known.lock().unwrap(); - - for (ino, subtracted) in targets { - use hash_map::Entry::*; - - match known.entry(ino) { - Occupied(mut entry) => { - let (_, count) = entry.get_mut(); - - *count = count.saturating_sub(subtracted); - if *count > 0 { - continue; - } - - entry.remove(); - } - - Vacant(_) => { - log::warn!("Kernel attempted to forget {:?} (bad refcount?)", ino); - continue; - } - } - } - } - - async fn receive(self: &Arc<Self>) -> std::io::Result<Incoming> { - use InputBufferStorage::*; - - let permit = Arc::clone(&self.input_semaphore) - .acquire_owned() - .await - .unwrap(); - - let mut incoming = Incoming { - buffer: InputBuffer { - bytes: 0, - data: Sbo(SboStorage([0; SBO_SIZE])), - }, - }; - - let sbo = match &mut incoming.buffer.data { - Sbo(SboStorage(sbo)) => sbo, - _ => unreachable!(), - }; - - loop { - let mut readable = self.session_fd.readable().await?; - - let mut large_buffers = self.large_buffers.lock().unwrap(); - let large_buffer = large_buffers.last_mut().unwrap(); - - let mut io_vecs = [ - IoVec::from_mut_slice(sbo), - IoVec::from_mut_slice(&mut large_buffer[SBO_SIZE..]), - ]; - - let mut read = |fd: &AsyncFd<RawFd>| readv(*fd.get_ref(), &mut io_vecs); - match readable.try_io(|fd| read(fd).map_err(io::Error::from)) { - Ok(Ok(bytes)) => { - if bytes > SBO_SIZE { - (&mut large_buffer[..SBO_SIZE]).copy_from_slice(sbo); - incoming.buffer.data = Spilled(large_buffers.pop().unwrap(), permit); - } - - incoming.buffer.bytes = bytes; - return Ok(incoming); - } - - // Interrupted - Ok(Err(error)) if error.kind() == std::io::ErrorKind::NotFound => continue, - - Ok(Err(error)) => return Err(error), - Err(_) => continue, - } - } - } - fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> { let after_header: usize = output .iter() @@ -479,81 +226,261 @@ impl<Fs: Fuse> Session<Fs> { } } +impl<'o> Dispatch<'o> { + pub fn op(self) -> Op<'o> { + use Dispatch::*; + + let common = match self { + Lookup(incoming) => incoming.common, + Getattr(incoming) => incoming.common, + Readlink(incoming) => incoming.common, + Read(incoming) => incoming.common, + Write(incoming) => incoming.common, + Statfs(incoming) => incoming.common, + Readdir(incoming) => incoming.common, + Access(incoming) => incoming.common, + Destroy(incoming) => incoming.common, + }; + + common.into_generic_op() + } +} + +impl Endpoint<'_> { + pub async fn receive<'a, F, Fut>(&'a mut self, dispatcher: F) -> FuseResult<()> + where + F: FnOnce(Dispatch<'a>) -> Fut, + Fut: Future<Output = Done<'a>>, + { + let buffer = &mut self.local_buffer.0; + let bytes = loop { + let mut readable = self.session.session_fd.readable().await?; + let mut read = |fd: &AsyncFd<RawFd>| read(*fd.get_ref(), buffer); + + let result = match readable.try_io(|fd| read(fd).map_err(io::Error::from)) { + Ok(result) => result, + Err(_) => continue, + }; + + match result { + // Interrupted + Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue, + + result => break result, + } + }; + + let (header, opcode) = proto::InHeader::from_bytes(&buffer[..bytes?])?; + let common = IncomingCommon { + session: self.session, + buffer: &mut self.local_buffer, + header, + }; + + let dispatch = { + use proto::Opcode::*; + + macro_rules! dispatch { + ($op:ident) => { + Dispatch::$op(Incoming { + common, + _phantom: PhantomData, + }) + }; + } + + match opcode { + Lookup => dispatch!(Lookup), + Getattr => dispatch!(Getattr), + Readlink => dispatch!(Readlink), + Read => dispatch!(Read), + Write => dispatch!(Write), + Statfs => dispatch!(Statfs), + Readdir => dispatch!(Readdir), + Access => dispatch!(Access), + Destroy => dispatch!(Destroy), + + _ => { + log::warn!("Not implemented: {}", common.header); + + let (_request, reply) = common.into_generic_op(); + let _ = reply.not_implemented(); + + return Ok(()); + } + } + }; + + let _ = dispatcher(dispatch).await; + Ok(()) + } +} + impl Start { - pub async fn start<Fs: Fuse>(self, fs: Fs) -> FuseResult<Arc<Session<Fs>>> { + pub async fn start(self) -> FuseResult<Arc<Session>> { let session_fd = self.session_fd.into_raw_fd(); let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE; fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap(); - let page_size = sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize; - let pages_per_buffer = fs.request_buffer_pages().get(); - let bytes_per_buffer = pages_per_buffer.checked_mul(page_size).unwrap(); - assert!(bytes_per_buffer >= proto::MIN_READ_SIZE); + let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); - let mut large_buffers = Vec::with_capacity(fs.request_buffers().get()); - for _ in 0..large_buffers.capacity() { - large_buffers.push(vec![0; bytes_per_buffer].into_boxed_slice()); - } + let buffer_pages = proto::MIN_READ_SIZE / page_size(); //TODO + let buffer_count = SHARED_BUFFERS; //TODO + let buffers = std::iter::repeat_with(|| Buffer::new(buffer_pages)) + .take(buffer_count) + .collect(); - let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); let mut session = Session { _fusermount_fd: self.fusermount_fd, session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?, - proto_minor: 0, // Set by Session::do_handshake() - fs, - input_semaphore: Arc::new(Semaphore::new(large_buffers.len())), - large_buffers: Mutex::new(large_buffers), - known: Mutex::new(HashMap::new()), - destroy: Notify::new(), interrupt_tx, + buffers: Mutex::new(buffers), + buffer_semaphore: Arc::new(Semaphore::new(buffer_count)), + proto_minor: 0, // Set by Session::do_handshake() + buffer_pages, }; - loop { - let state = session - .do_handshake(pages_per_buffer, bytes_per_buffer) - .await?; + let mut init_buffer = session.buffers.get_mut().unwrap().pop().unwrap(); - if let Handshake::Done = state { + loop { + if let Handshake::Done = session.handshake(&mut init_buffer).await? { + session.buffers.get_mut().unwrap().push(init_buffer); break Ok(Arc::new(session)); } } } -} -enum Handshake { - Done, - Restart, + pub(crate) fn new(fusermount_fd: DumbFd, session_fd: DumbFd) -> Self { + Start { + fusermount_fd, + session_fd, + } + } } -struct Incoming { - buffer: InputBuffer, +impl<'o, O: Operation<'o>> Incoming<'o, O> { + pub fn op(self) -> Result<Op<'o, O>, Done<'o>> { + try_op( + self.common.session, + &self.common.buffer.0, + self.common.header, + ) + } } -struct InputBuffer { - pub bytes: usize, - pub data: InputBufferStorage, +impl<O: for<'o> Operation<'o>> Incoming<'_, O> { + pub async fn owned(self) -> Owned<O> { + let session = self.common.session; + + let (buffer, permit) = { + let semaphore = Arc::clone(&session.buffer_semaphore); + let permit = semaphore + .acquire_owned() + .await + .expect("Buffer semaphore error"); + + let mut buffers = session.buffers.lock().unwrap(); + let mut buffer = buffers.pop().expect("Buffer semaphore out of sync"); + + std::mem::swap(&mut buffer, self.common.buffer); + (buffer, permit) + }; + + Owned { + session: Arc::clone(session), + buffer, + header: self.common.header, + _permit: permit, + _phantom: PhantomData, + } + } } -enum InputBufferStorage { - Sbo(SboStorage), - Spilled(Box<[u8]>, OwnedSemaphorePermit), +impl<O: for<'o> Operation<'o>> Owned<O> { + pub fn op(&self) -> Result<Op<'_, O>, Done<'_>> { + try_op(&self.session, &self.buffer.0, self.header) + } } -#[repr(align(8))] -struct SboStorage(pub [u8; 4 * std::mem::size_of::<InHeader>()]); +impl<O: for<'o> Operation<'o>> Drop for Owned<O> { + fn drop(&mut self) { + if let Ok(mut buffers) = self.session.buffers.lock() { + let empty = Buffer(Vec::new().into_boxed_slice()); + buffers.push(std::mem::replace(&mut self.buffer, empty)); + } + } +} -const SBO_SIZE: usize = std::mem::size_of::<SboStorage>(); const INTERRUPT_BROADCAST_CAPACITY: usize = 32; +const SHARED_BUFFERS: usize = 32; +const HEADER_END: usize = std::mem::size_of::<InHeader>(); -impl InputBuffer { - fn data(&self) -> &[u8] { - use InputBufferStorage::*; - let storage = match &self.data { - Sbo(sbo) => &sbo.0, - Spilled(buffer, _) => &buffer[..], +struct IncomingCommon<'o> { + session: &'o Arc<Session>, + buffer: &'o mut Buffer, + header: proto::InHeader, +} + +enum Handshake { + Done, + Restart, +} + +struct Buffer(Box<[u8]>); + +impl<'o> IncomingCommon<'o> { + fn into_generic_op(self) -> Op<'o> { + let request = Request { + header: self.header, + body: (), }; - &storage[..self.bytes] + let reply = Reply { + session: self.session, + unique: self.header.unique, + tail: (), + }; + + (request, reply) + } +} + +impl Buffer { + fn new(pages: usize) -> Self { + Buffer(vec![0; pages * page_size()].into_boxed_slice()) } } + +fn try_op<'o, O: Operation<'o>>( + session: &'o Session, + bytes: &'o [u8], + header: proto::InHeader, +) -> Result<Op<'o, O>, Done<'o>> { + let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize]) { + Ok(body) => body, + Err(error) => { + log::error!("Parsing request {}: {}", header, error); + let reply = Reply::<ops::Any> { + session, + unique: header.unique, + tail: (), + }; + + return Err(reply.io_error()); + } + }; + + let request = Request { header, body }; + let reply = Reply { + session, + unique: header.unique, + tail: Default::default(), + }; + + Ok((request, reply)) +} + +fn page_size() -> usize { + sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize +} |
