diff options
| author | Alejandro Soto <alejandro@34project.org> | 2021-12-27 00:44:23 -0600 |
|---|---|---|
| committer | Alejandro Soto <alejandro@34project.org> | 2021-12-28 19:43:44 -0600 |
| commit | a3212a0ba07da7bdae9e17637fbc237e2ae01c08 (patch) | |
| tree | 00be583beba0f321ebeea3af21582ce927943b44 | |
| parent | 311b2a40213aa48131a189f99dc4258d354c0c78 (diff) | |
Redesign the API around a user-provided main loop
This is basically a full library rewrite.
Diffstat (limited to '')
| -rw-r--r-- | Cargo.toml | 6 | ||||
| -rw-r--r-- | examples/ext2.rs | 268 | ||||
| -rw-r--r-- | src/fuse/fs.rs | 78 | ||||
| -rw-r--r-- | src/fuse/io.rs | 97 | ||||
| -rw-r--r-- | src/fuse/mod.rs | 69 | ||||
| -rw-r--r-- | src/fuse/mount.rs | 8 | ||||
| -rw-r--r-- | src/fuse/ops.rs | 332 | ||||
| -rw-r--r-- | src/fuse/session.rs | 705 | ||||
| -rw-r--r-- | src/lib.rs | 11 | ||||
| -rw-r--r-- | src/proto.rs | 329 |
10 files changed, 713 insertions, 1190 deletions
@@ -1,9 +1,10 @@ [package] name = "blown-fuse" -version = "0.1.0" +version = "0.2.0" authors = ["Alejandro Soto <alejandro@34project.org>"] edition = "2021" description = "Filesystem in Userspace" +license = "LGPL-3.0-or-later" [features] default = ["server", "mount"] @@ -12,11 +13,9 @@ mount = ["server"] client = [] [dependencies] -async-trait = "0.1.52" bitflags = "1.3.2" bytemuck = "1.7.3" bytemuck_derive = "1.0.1" -futures-util = "0.3.19" log = "0.4.14" nix = "0.23.1" num_enum = "0.5.5" @@ -27,5 +26,6 @@ tokio = { version = "1.15.0", features = ["rt", "net", "macros", "sync"] } [dev-dependencies] clap = "2.34.0" env_logger = "0.9.0" +futures-util = "0.3.19" tokio = { version = "1.15.0", features = ["rt-multi-thread"] } uuid = "0.8.2" diff --git a/examples/ext2.rs b/examples/ext2.rs index c1f4bce..9bcc2a6 100644 --- a/examples/ext2.rs +++ b/examples/ext2.rs @@ -1,7 +1,7 @@ /* Read-only ext2 (rev 1.0) implementation. * * This is not really async, since the whole backing storage - * is mmap()ed for simplicity, and then treated as a regular + * is mmap()ed for simplicity, and then treated as a static * slice (likely unsound, I don't care). Some yields are * springled in a few places in order to emulate true async * operations. @@ -9,8 +9,6 @@ * Reference: <https://www.nongnu.org/ext2-doc/ext2.html> */ -#![feature(arbitrary_self_types)] - #[cfg(target_endian = "big")] compile_error!("This example assumes a little-endian system"); @@ -32,45 +30,33 @@ use nix::{ }; use blown_fuse::{ - fs::Fuse, - io::{Attrs, Entry, FsInfo}, + io::{Attrs, Entry, FsInfo, Known}, mount::{mount_sync, Options}, - ops::{Init, Lookup, Readdir, Readlink, Statfs}, - Done, Ino, Reply, TimeToLive, + ops::{Getattr, Init, Lookup, Readdir, Readlink, Statfs}, + session::{Dispatch, Start}, + Done, FuseResult, Ino, Op, TimeToLive, }; -use async_trait::async_trait; use bytemuck::{cast_slice, from_bytes, try_from_bytes}; use bytemuck_derive::{Pod, Zeroable}; use clap::{App, Arg}; -use futures_util::stream::{self, Stream, TryStreamExt}; +use futures_util::stream::{self, Stream, StreamExt, TryStreamExt}; use smallvec::SmallVec; use tokio::{self, runtime::Runtime}; use uuid::Uuid; const EXT2_ROOT: Ino = Ino(2); -type Op<'o, O> = blown_fuse::Op<'o, Ext2, O>; - -#[derive(Copy, Clone)] -struct Farc { - ino: Ino, - inode: &'static Inode, -} - -impl std::ops::Deref for Farc { - type Target = Inode; - - fn deref(&self) -> &Self::Target { - self.inode - } -} - struct Ext2 { backing: &'static [u8], superblock: &'static Superblock, } +struct Resolved { + ino: Ino, + inode: &'static Inode, +} + #[derive(Pod, Zeroable, Copy, Clone)] #[repr(C)] struct Superblock { @@ -158,16 +144,16 @@ struct LinkedEntry { impl Ext2 { fn directory_stream( &self, - inode: Farc, + inode: &'static Inode, start: u64, - ) -> impl Stream<Item = Result<Entry<'static, Farc>, Errno>> + '_ { + ) -> impl Stream<Item = Result<Entry<&'static OsStr, Resolved>, Errno>> + '_ { stream::try_unfold(start, move |mut position| async move { loop { if position == inode.i_size as u64 { break Ok(None); // End of stream } - let bytes = self.seek_contiguous(&inode, position)?; + let bytes = self.seek_contiguous(inode, position)?; let (header, bytes) = bytes.split_at(size_of::<LinkedEntry>()); let header: &LinkedEntry = from_bytes(header); @@ -176,9 +162,14 @@ impl Ext2 { continue; // Unused entry } - let inode = self.inode(Ino(header.inode as u64))?; + let ino = Ino(header.inode as u64); let name = OsStr::from_bytes(&bytes[..header.name_len as usize]).into(); + let inode = Resolved { + ino, + inode: self.inode(ino)?, + }; + let entry = Entry { inode, name, @@ -191,7 +182,13 @@ impl Ext2 { }) } - fn inode(&self, Ino(ino): Ino) -> Result<Farc, Errno> { + fn inode(&self, ino: Ino) -> Result<&'static Inode, Errno> { + let Ino(ino) = match ino { + Ino::ROOT => EXT2_ROOT, + EXT2_ROOT => Ino::ROOT, + ino => ino, + }; + if ino == 0 { log::error!("Attempted to access the null (0) inode"); return Err(Errno::EIO); @@ -210,23 +207,21 @@ impl Ext2 { let start = index % inodes_per_block * inode_size; let end = start + size_of::<Inode>(); - Ok(Farc { - ino: Ino(ino), - inode: from_bytes(&self.block(block)?[start..end]), - }) + Ok(from_bytes(&self.block(block)?[start..end])) } - fn seek_contiguous(&self, inode: &Farc, position: u64) -> Result<&'static [u8], Errno> { + fn seek_contiguous( + &self, + inode: &'static Inode, + position: u64, + ) -> Result<&'static [u8], Errno> { let block_size = self.block_size(); let position = position as usize; let (direct, offset) = (position / block_size, position % block_size); - let out_of_bounds = || { - log::error!("Offset {} out of bounds in inode {}", position, inode.ino); - }; - + let out_of_bounds = || log::error!("Offset {} out of bounds", position); let chase = |indices: &[usize]| { - let root: &[u8] = cast_slice(&inode.inode.i_block); + let root: &[u8] = cast_slice(&inode.i_block); indices .iter() .try_fold(root, |ptrs, index| { @@ -304,12 +299,8 @@ impl Ext2 { } } -#[async_trait] -impl Fuse for Ext2 { - type Farc = Farc; - type Inode = Inode; - - async fn init<'o>(&self, reply: Reply<'o, Ext2, Init>) -> Done<'o> { +impl Ext2 { + async fn init<'o>(&self, (_, reply): Op<'o, Init>) -> Done<'o> { let label = &self.superblock.s_volume_name; let label = &label[..=label.iter().position(|byte| *byte == b'\0').unwrap_or(0)]; let label = CStr::from_bytes_with_nul(label) @@ -327,16 +318,11 @@ impl Fuse for Ext2 { log::info!("UUID: {}", Uuid::from_bytes(self.superblock.s_uuid)); log::info!("Label: {}", label.escape_debug()); - if let Ok(root) = self.inode(EXT2_ROOT) { - log::info!("Mounted successfully"); - reply.root(root) - } else { - log::error!("Failed to retrieve the root inode"); - reply.io_error() - } + log::info!("Mounted successfully"); + reply.ok() } - async fn statfs<'o>(&self, (_, reply, _): Op<'o, Statfs>) -> Done<'o> { + async fn statfs<'o>(&self, (_, reply): Op<'o, Statfs>) -> Done<'o> { let total_blocks = self.superblock.s_blocks_count as u64; let free_blocks = self.superblock.s_free_blocks_count as u64; let available_blocks = free_blocks - self.superblock.s_r_blocks_count as u64; @@ -355,65 +341,21 @@ impl Fuse for Ext2 { .filenames(255), ) } -} - -#[async_trait] -impl blown_fuse::fs::Inode for Inode { - type Fuse = Ext2; - fn ino(self: &Farc) -> Ino { - match self.ino { - Ino::ROOT => EXT2_ROOT, - EXT2_ROOT => Ino::ROOT, - ino => ino, - } - } - - fn inode_type(self: &Farc) -> Type { - let inode_type = self.i_mode >> 12; - match inode_type { - 0x01 => Type::Fifo, - 0x02 => Type::CharacterDevice, - 0x04 => Type::Directory, - 0x06 => Type::BlockDevice, - 0x08 => Type::File, - 0x0A => Type::Symlink, - 0x0C => Type::Socket, + async fn getattr<'o>(&self, (request, reply): Op<'o, Getattr>) -> Done<'o> { + let ino = request.ino(); + let (reply, inode) = reply.fallible(self.inode(ino))?; - _ => { - log::error!("Inode {} has invalid type {:x}", self.ino, inode_type); - Type::File - } - } + reply.known(&Resolved { ino, inode }) } - fn attrs(self: &Farc) -> (Attrs, TimeToLive) { - let (access, modify, change) = { - let time = |seconds: u32| (UNIX_EPOCH + Duration::from_secs(seconds.into())).into(); - (time(self.i_atime), time(self.i_mtime), time(self.i_ctime)) - }; - - let attrs = Attrs::default() - .size((self.i_dir_acl as u64) << 32 | self.i_size as u64) - .owner( - Uid::from_raw(self.i_uid.into()), - Gid::from_raw(self.i_gid.into()), - ) - .mode(Mode::from_bits_truncate(self.i_mode.into())) - .blocks(self.i_blocks.into(), 512) - .times(access, modify, change) - .links(self.i_links_count.into()); - - (attrs, TimeToLive::MAX) - } - - async fn lookup<'o>(self: Farc, (request, reply, session): Op<'o, Lookup>) -> Done<'o> { - let fs = session.fs(); + async fn lookup<'o>(&self, (request, reply): Op<'o, Lookup>) -> Done<'o> { let name = request.name(); + let (reply, parent) = reply.fallible(self.inode(request.ino()))?; //TODO: Indexed directories - let lookup = async move { - let stream = fs.directory_stream(self, 0); + let lookup = async { + let stream = self.directory_stream(parent, 0); tokio::pin!(stream); loop { @@ -429,35 +371,38 @@ impl blown_fuse::fs::Inode for Inode { let (reply, inode) = reply.fallible(result)?; if let Some(inode) = inode { - reply.found(&inode, TimeToLive::MAX) + reply.found(inode, TimeToLive::MAX) } else { reply.not_found(TimeToLive::MAX) } } - async fn readlink<'o>(self: Farc, (_, reply, session): Op<'o, Readlink>) -> Done<'o> { - if Inode::inode_type(&self) != Type::Symlink { + async fn readlink<'o>(&self, (request, reply): Op<'o, Readlink>) -> Done<'o> { + let ino = request.ino(); + let (reply, inode) = reply.fallible(self.inode(ino))?; + + let resolved = Resolved { ino, inode }; + if resolved.inode_type() != Type::Symlink { return reply.invalid_argument(); } - let size = self.i_size as usize; + let size = inode.i_size as usize; if size < size_of::<[u32; 15]>() { - return reply.target(OsStr::from_bytes(&cast_slice(&self.i_block)[..size])); + return reply.target(OsStr::from_bytes(&cast_slice(&inode.i_block)[..size])); } - let fs = session.fs(); let segments = async { /* This is unlikely to ever spill, and is guaranteed not to * do so for valid symlinks on any fs where block_size >= 4096. */ - let mut segments = SmallVec::<[&OsStr; 1]>::new(); + let mut segments = SmallVec::<[&[u8]; 1]>::new(); let (mut size, mut offset) = (size, 0); while size > 0 { - let segment = fs.seek_contiguous(&self, offset)?; + let segment = self.seek_contiguous(inode, offset)?; let segment = &segment[..segment.len().min(size)]; - segments.push(OsStr::from_bytes(segment)); + segments.push(segment); size -= segment.len(); offset += segment.len() as u64; @@ -470,9 +415,94 @@ impl blown_fuse::fs::Inode for Inode { reply.gather_target(&segments) } - async fn readdir<'o>(self: Farc, (request, reply, session): Op<'o, Readdir>) -> Done<'o> { - let stream = session.fs().directory_stream(self, request.offset()); - reply.try_stream(stream).await? + async fn readdir<'o>(&self, (request, reply): Op<'o, Readdir>) -> Done<'o> { + let (mut reply, inode) = reply.fallible(self.inode(request.ino()))?; + + let stream = self.directory_stream(inode, request.offset()); + tokio::pin!(stream); + + while let Some(entry) = stream.next().await { + let (next_reply, entry) = reply.fallible(entry)?; + let (next_reply, ()) = next_reply.entry(entry)?; + reply = next_reply; + } + + reply.end() + } +} + +impl Known for Resolved { + fn ino(&self) -> Ino { + self.ino + } + + fn inode_type(&self) -> Type { + let inode_type = self.inode.i_mode >> 12; + match inode_type { + 0x01 => Type::Fifo, + 0x02 => Type::CharacterDevice, + 0x04 => Type::Directory, + 0x06 => Type::BlockDevice, + 0x08 => Type::File, + 0x0A => Type::Symlink, + 0x0C => Type::Socket, + + _ => { + log::error!("Inode {} has invalid type {:x}", self.ino, inode_type); + Type::File + } + } + } + + fn attrs(&self) -> (Attrs, TimeToLive) { + let inode = self.inode; + let (access, modify, create) = { + let time = |seconds: u32| (UNIX_EPOCH + Duration::from_secs(seconds.into())).into(); + let (atime, mtime, ctime) = (inode.i_atime, inode.i_mtime, inode.i_ctime); + + (time(atime), time(mtime), time(ctime)) + }; + + let attrs = Attrs::default() + .size((inode.i_dir_acl as u64) << 32 | inode.i_size as u64) + .owner( + Uid::from_raw(inode.i_uid.into()), + Gid::from_raw(inode.i_gid.into()), + ) + .mode(Mode::from_bits_truncate(inode.i_mode.into())) + .blocks(inode.i_blocks.into(), 512) + .times(access, modify, create) + .links(inode.i_links_count.into()); + + (attrs, TimeToLive::MAX) + } + + fn unveil(self) {} +} + +async fn main_loop(session: Start, fs: Ext2) -> FuseResult<()> { + let session = session.start().await?; + let mut endpoint = session.endpoint(); + + loop { + let result = endpoint.receive(|dispatch| async { + use Dispatch::*; + + match dispatch { + Statfs(statfs) => fs.statfs(statfs.op()?).await, + Getattr(getattr) => fs.getattr(getattr.op()?).await, + Lookup(lookup) => fs.lookup(lookup.op()?).await, + Readlink(readlink) => fs.readlink(readlink.op()?).await, + Readdir(readdir) => fs.readdir(readdir.op()?).await, + + dispatch => { + let (_, reply) = dispatch.op(); + reply.not_implemented() + } + } + }); + + result.await?; } } @@ -553,5 +583,5 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { superblock, }; - Ok(Runtime::new()?.block_on(async { session.start(fs).await?.main_loop().await })?) + Ok(Runtime::new()?.block_on(main_loop(session, fs))?) } diff --git a/src/fuse/fs.rs b/src/fuse/fs.rs deleted file mode 100644 index 0c39ea7..0000000 --- a/src/fuse/fs.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::{Ino, TimeToLive}; -use async_trait::async_trait; -use std::{num::NonZeroUsize, ops::Deref, sync::Arc}; - -use super::{ - io::{Attrs, EntryType}, - ops::*, - Done, Op, Reply, -}; - -#[async_trait] -pub trait Fuse: Sized + Send + Sync + 'static { - type Inode: Inode<Fuse = Self> + ?Sized; - type Farc: Deref<Target = Self::Inode> + Clone + Send + Sync = Arc<Self::Inode>; - - async fn init<'o>(&self, reply: Reply<'o, Self, Init>) -> Done<'o>; - - async fn statfs<'o>(&self, (_, reply, _): Op<'o, Self, Statfs>) -> Done<'o> { - reply.not_implemented() - } - - fn request_buffers(&self) -> NonZeroUsize { - NonZeroUsize::new(16).unwrap() - } - - fn request_buffer_pages(&self) -> NonZeroUsize { - NonZeroUsize::new(4).unwrap() - } -} - -#[async_trait] -pub trait Inode: Send + Sync { - type Fuse: Fuse<Inode = Self>; - - fn ino(self: &FarcTo<Self>) -> Ino; - fn attrs(self: &FarcTo<Self>) -> (Attrs, TimeToLive); - fn inode_type(self: &FarcTo<Self>) -> EntryType; - - fn direct_io(self: &FarcTo<Self>) -> bool { - false - } - - fn access<'o>(self: &FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Access>) -> Done<'o> { - reply.not_implemented() - } - - async fn lookup<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Lookup>) -> Done<'o> { - reply.not_implemented() - } - - async fn readlink<'o>( - self: FarcTo<Self>, - (_, reply, _): Op<'o, Self::Fuse, Readlink>, - ) -> Done<'o> { - reply.not_implemented() - } - - async fn open<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Open>) -> Done<'o> { - // Calling not_implemented() here would ignore direct_io() and similar flags - reply.ok() - } - - async fn opendir<'o>( - self: FarcTo<Self>, - (_, reply, _): Op<'o, Self::Fuse, Opendir>, - ) -> Done<'o> { - reply.not_implemented() - } - - async fn readdir<'o>( - self: FarcTo<Self>, - (_, reply, _): Op<'o, Self::Fuse, Readdir>, - ) -> Done<'o> { - reply.not_implemented() - } -} - -pub type FarcTo<I> = <<I as Inode>::Fuse as Fuse>::Farc; diff --git a/src/fuse/io.rs b/src/fuse/io.rs index 305f0ef..5f85e4d 100644 --- a/src/fuse/io.rs +++ b/src/fuse/io.rs @@ -2,19 +2,14 @@ use bytemuck::Zeroable; use nix::{errno::Errno, sys::stat::SFlag}; use std::{ - borrow::Cow, convert::Infallible, - ffi::OsStr, future::Future, ops::{ControlFlow, FromResidual, Try}, }; -use crate::{proto, Ino, TimeToLive, Timestamp}; +use crate::{proto, FuseResult, Ino, TimeToLive, Timestamp}; -use super::{ - fs::{Fuse, Inode}, - session, Done, Operation, Reply, Request, -}; +use super::{Done, Operation, Reply, Request}; #[doc(no_inline)] pub use nix::{ @@ -23,24 +18,31 @@ pub use nix::{ unistd::{AccessFlags, Gid, Pid, Uid}, }; -pub enum Interruptible<'o, Fs: Fuse, O: Operation<'o, Fs>, T> { - Completed(Reply<'o, Fs, O>, T), +pub enum Interruptible<'o, O: Operation<'o>, T> { + Completed(Reply<'o, O>, T), Interrupted(Done<'o>), } +pub trait Known { + fn ino(&self) -> Ino; + fn inode_type(&self) -> EntryType; + fn attrs(&self) -> (Attrs, TimeToLive); + fn unveil(self); +} + #[derive(Clone)] pub struct Attrs(proto::Attrs); -pub struct Entry<'a, Ref> { +pub struct Entry<N, K> { pub offset: u64, - pub name: Cow<'a, OsStr>, - pub inode: Ref, + pub name: N, + pub inode: K, pub ttl: TimeToLive, } pub struct FsInfo(proto::StatfsOut); -impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> { +impl<'o, O: Operation<'o>> Request<'o, O> { pub fn ino(&self) -> Ino { Ino(self.header.ino) } @@ -62,13 +64,13 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> { } } -impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { - pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, Fs, O, T> +impl<'o, O: Operation<'o>> Reply<'o, O> { + pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, O, T> where F: Future<Output = T>, { tokio::pin!(f); - let mut rx = session::interrupt_rx(self.session); + let mut rx = self.session.interrupt_rx(); use Interruptible::*; loop { @@ -93,11 +95,9 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { } } - pub fn fail(mut self, errno: Errno) -> Done<'o> { - let errno = errno as i32; - O::consume_errno(errno, &mut self.tail); - - Done::from_result(session::fail(self.session, self.unique, errno)) + pub fn fail(self, errno: Errno) -> Done<'o> { + let result = self.session.fail(self.unique, errno as i32); + self.finish(result) } pub fn not_implemented(self) -> Done<'o> { @@ -115,14 +115,18 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { pub fn interrupted(self) -> Done<'o> { self.fail(Errno::EINTR) } + + pub(crate) fn finish(self, result: FuseResult<()>) -> Done<'o> { + if let Err(error) = result { + log::error!("Replying to request {}: {}", self.unique, error); + } + + Done::done() + } } -impl<'o, Fs, O> From<(Reply<'o, Fs, O>, Errno)> for Done<'o> -where - Fs: Fuse, - O: Operation<'o, Fs>, -{ - fn from((reply, errno): (Reply<'o, Fs, O>, Errno)) -> Done<'o> { +impl<'o, O: Operation<'o>> From<(Reply<'o, O>, Errno)> for Done<'o> { + fn from((reply, errno): (Reply<'o, O>, Errno)) -> Done<'o> { reply.fail(errno) } } @@ -142,12 +146,8 @@ impl<'o, T: Into<Done<'o>>> FromResidual<Result<Infallible, T>> for Done<'o> { } } -impl<'o, Fs, O> FromResidual<Interruptible<'o, Fs, O, Infallible>> for Done<'o> -where - Fs: Fuse, - O: Operation<'o, Fs>, -{ - fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self { +impl<'o, O: Operation<'o>> FromResidual<Interruptible<'o, O, Infallible>> for Done<'o> { + fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self { match residual { Interruptible::Completed(_, _) => unreachable!(), Interruptible::Interrupted(done) => done, @@ -168,13 +168,10 @@ impl Try for Done<'_> { } } -impl<'o, Fs, O, T> FromResidual<Interruptible<'o, Fs, O, Infallible>> - for Interruptible<'o, Fs, O, T> -where - Fs: Fuse, - O: Operation<'o, Fs>, +impl<'o, O: Operation<'o>, T> FromResidual<Interruptible<'o, O, Infallible>> + for Interruptible<'o, O, T> { - fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self { + fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self { use Interruptible::*; match residual { @@ -184,13 +181,9 @@ where } } -impl<'o, Fs, O, T> Try for Interruptible<'o, Fs, O, T> -where - Fs: Fuse, - O: Operation<'o, Fs>, -{ - type Output = (Reply<'o, Fs, O>, T); - type Residual = Interruptible<'o, Fs, O, Infallible>; +impl<'o, O: Operation<'o>, T> Try for Interruptible<'o, O, T> { + type Output = (Reply<'o, O>, T); + type Residual = Interruptible<'o, O, Infallible>; fn from_output((reply, t): Self::Output) -> Self { Self::Completed(reply, t) @@ -234,14 +227,14 @@ impl Attrs { }) } - pub fn times(self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self { + pub fn times(self, access: Timestamp, modify: Timestamp, create: Timestamp) -> Self { Attrs(proto::Attrs { atime: access.seconds, mtime: modify.seconds, - ctime: change.seconds, + ctime: create.seconds, atimensec: access.nanoseconds, mtimensec: modify.nanoseconds, - ctimensec: change.nanoseconds, + ctimensec: create.nanoseconds, ..self.0 }) } @@ -253,9 +246,9 @@ impl Attrs { }) } - pub(crate) fn finish<Fs: Fuse>(self, inode: &Fs::Farc) -> proto::Attrs { - let Ino(ino) = <Fs as Fuse>::Inode::ino(inode); - let inode_type = match <Fs as Fuse>::Inode::inode_type(inode) { + pub(crate) fn finish(self, inode: &impl Known) -> proto::Attrs { + let Ino(ino) = inode.ino(); + let inode_type = match inode.inode_type() { EntryType::Fifo => SFlag::S_IFIFO, EntryType::CharacterDevice => SFlag::S_IFCHR, EntryType::Directory => SFlag::S_IFDIR, diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs index 0e39e6b..5fe6f46 100644 --- a/src/fuse/mod.rs +++ b/src/fuse/mod.rs @@ -1,87 +1,46 @@ -use std::{ - collections::HashMap, - marker::PhantomData, - os::unix::io::RawFd, - sync::{Arc, Mutex}, -}; - -use tokio::{ - io::unix::AsyncFd, - sync::{broadcast, Notify, Semaphore}, -}; - -use crate::{proto, util::DumbFd, FuseResult, Ino}; +use crate::proto; +use std::marker::PhantomData; pub mod io; #[doc(cfg(feature = "server"))] -pub mod fs; - -#[doc(cfg(feature = "server"))] pub mod ops; #[doc(cfg(feature = "mount"))] pub mod mount; -mod session; -use fs::Fuse; - -#[doc(cfg(feature = "server"))] -pub struct Session<Fs: Fuse> { - _fusermount_fd: DumbFd, - session_fd: AsyncFd<RawFd>, - proto_minor: u32, - fs: Fs, - input_semaphore: Arc<Semaphore>, - large_buffers: Mutex<Vec<Box<[u8]>>>, - known: Mutex<HashMap<Ino, (Fs::Farc, u64)>>, - destroy: Notify, - interrupt_tx: broadcast::Sender<u64>, -} - -#[doc(cfg(feature = "server"))] -pub struct Start { - fusermount_fd: DumbFd, - session_fd: DumbFd, -} +pub mod session; mod private_trait { - pub trait Operation<'o, Fs: super::Fuse> { - type RequestBody = (); - type ReplyTail = (); - - fn consume_errno(_errno: i32, _tail: &mut Self::ReplyTail) {} + pub trait Operation<'o> { + type RequestBody: crate::proto::Structured<'o>; + type ReplyTail: Default; } } use private_trait::Operation; -#[doc(cfg(feature = "server"))] -pub type Op<'o, Fs, O> = (Request<'o, Fs, O>, Reply<'o, Fs, O>, &'o Arc<Session<Fs>>); +pub type Op<'o, O = ops::Any> = (Request<'o, O>, Reply<'o, O>); #[doc(cfg(feature = "server"))] -pub struct Request<'o, Fs: Fuse, O: Operation<'o, Fs>> { - header: &'o proto::InHeader, +pub struct Request<'o, O: Operation<'o>> { + header: proto::InHeader, body: O::RequestBody, } #[doc(cfg(feature = "server"))] -pub struct Reply<'o, Fs: Fuse, O: Operation<'o, Fs>> { - session: &'o Session<Fs>, +pub struct Reply<'o, O: Operation<'o>> { + session: &'o session::Session, unique: u64, tail: O::ReplyTail, } #[must_use] #[doc(cfg(feature = "server"))] -pub struct Done<'o>(FuseResult<PhantomData<&'o ()>>); +pub struct Done<'o>(PhantomData<*mut &'o ()>); impl Done<'_> { - fn from_result(result: FuseResult<()>) -> Self { - Done(result.map(|()| PhantomData)) - } - - fn into_result(self) -> FuseResult<()> { - self.0.map(|PhantomData| ()) + fn done() -> Self { + Done(PhantomData) } } diff --git a/src/fuse/mount.rs b/src/fuse/mount.rs index 955b28a..6464372 100644 --- a/src/fuse/mount.rs +++ b/src/fuse/mount.rs @@ -17,7 +17,7 @@ use nix::{ use quick_error::quick_error; -use super::Start; +use super::session::Start; use crate::util::DumbFd; quick_error! { @@ -146,10 +146,8 @@ where Ok(session_fd) => { let fusermount_fd = DumbFd(left_side.into_raw_fd()); let session_fd = DumbFd(session_fd); - Ok(Start { - fusermount_fd, - session_fd, - }) + + Ok(Start::new(fusermount_fd, session_fd)) } Err(error) => { diff --git a/src/fuse/ops.rs b/src/fuse/ops.rs index bbd49fe..9bca233 100644 --- a/src/fuse/ops.rs +++ b/src/fuse/ops.rs @@ -1,9 +1,6 @@ use bytemuck::{bytes_of, Pod, Zeroable}; -use futures_util::stream::{Stream, StreamExt, TryStreamExt}; -use nix::sys::stat::SFlag; use std::{ - borrow::Borrow, ffi::{CStr, OsStr}, os::unix::ffi::OsStrExt, }; @@ -11,56 +8,61 @@ use std::{ use crate::{proto, util::OutputChain, Errno, Ino, TimeToLive}; use super::{ - fs::{Fuse, Inode}, - io::{AccessFlags, Entry, EntryType, FsInfo}, - session, Done, Operation, Reply, Request, + io::{AccessFlags, Entry, FsInfo, Interruptible, Known}, + Done, Operation, Reply, Request, }; macro_rules! op { - { $name:ident $operation:tt $(,)+ } => { + { $name:ident $operation:tt } => { pub struct $name(()); - impl<'o, Fs: Fuse> Operation<'o, Fs> for $name $operation + impl<'o> Operation<'o> for $name $operation }; - { $name:ident $operation:tt, Request $request:tt $($next:tt)+ } => { - impl<'o, Fs: Fuse> Request<'o, Fs, $name> $request + { $name:ident $operation:tt impl Request $request:tt $($next:tt)* } => { + impl<'o> Request<'o, $name> $request - op! { $name $operation $($next)+ } + op! { $name $operation $($next)* } }; - { $name:ident $operation:tt, Reply $reply:tt $($next:tt)+ } => { - impl<'o, Fs: Fuse> Reply<'o, Fs, $name> $reply + { $name:ident $operation:tt impl Reply $reply:tt $($next:tt)* } => { + impl<'o> Reply<'o, $name> $reply - op! { $name $operation $($next)+ } + op! { $name $operation $($next)* } }; } op! { + Any { + type RequestBody = (); + type ReplyTail = (); + } +} + +op! { Lookup { - // name() - type RequestBody = &'o CStr; - }, + type RequestBody = &'o CStr; // name() + type ReplyTail = (); + } - Request { + impl Request { /// Returns the name of the entry being looked up in this directory. pub fn name(&self) -> &OsStr { c_to_os(self.body) } - }, - - Reply { - /// The requested entry was found and a `Farc` was successfully determined from it. The - /// FUSE client will become aware of the found inode if it wasn't before. This result may - /// be cached by the client for up to the given TTL. - pub fn found(self, entry: &Fs::Farc, ttl: TimeToLive) -> Done<'o> { - let (attrs, attrs_ttl) = <Fs as Fuse>::Inode::attrs(entry); - session::unveil(self.session, entry); - - self.single(&make_entry( - (<Fs as Fuse>::Inode::ino(entry), ttl), - (attrs.finish::<Fs>(entry), attrs_ttl), - )) + } + + impl Reply { + /// The requested entry was found. The FUSE client will become aware of the found inode if + /// it wasn't before. This result may be cached by the client for up to the given TTL. + pub fn found(self, entry: impl Known, ttl: TimeToLive) -> Done<'o> { + let (attrs, attrs_ttl) = entry.attrs(); + let attrs = attrs.finish(&entry); + + let done = self.single(&make_entry((entry.ino(), ttl), (attrs, attrs_ttl))); + entry.unveil(); + + done } /// The requested entry was not found in this directory. The FUSE clint may include this @@ -75,13 +77,37 @@ op! { pub fn not_found_uncached(self) -> Done<'o> { self.fail(Errno::ENOENT) } - }, + } +} + +op! { + Getattr { + type RequestBody = &'o proto::GetattrIn; + type ReplyTail = (); + } + + impl Reply { + pub fn known(self, inode: &impl Known) -> Done<'o> { + let (attrs, ttl) = inode.attrs(); + let attrs = attrs.finish(inode); + + self.single(&proto::AttrOut { + attr_valid: ttl.seconds, + attr_valid_nsec: ttl.nanoseconds, + dummy: Default::default(), + attr: attrs, + }) + } + } } op! { - Readlink {}, + Readlink { + type RequestBody = (); + type ReplyTail = (); + } - Reply { + impl Reply { /// This inode corresponds to a symbolic link pointing to the given target path. pub fn target(self, target: &OsStr) -> Done<'o> { self.chain(OutputChain::tail(&[target.as_bytes()])) @@ -89,230 +115,129 @@ op! { /// Same as [`Reply::target()`], except that the target path is taken from disjoint /// slices. This involves no additional allocation. - pub fn gather_target(self, target: &[&OsStr]) -> Done<'o> { - //FIXME: Likely UB - self.chain(OutputChain::tail(unsafe { std::mem::transmute(target) })) + pub fn gather_target(self, target: &[&[u8]]) -> Done<'o> { + self.chain(OutputChain::tail(target)) } - }, + } } op! { Open { type RequestBody = &'o proto::OpenIn; - type ReplyTail = (Ino, proto::OpenOutFlags); - }, + type ReplyTail = private::OpenFlags; + } + + impl Reply { + pub fn force_direct_io(&mut self) { + self.tail.0 |= proto::OpenOutFlags::DIRECT_IO; + } - Reply { /// The inode may now be accessed. pub fn ok(self) -> Done<'o> { self.ok_with_handle(0) } fn ok_with_handle(self, handle: u64) -> Done<'o> { - let (_, flags) = self.tail; + let open_flags = self.tail.0.bits(); + self.single(&proto::OpenOut { fh: handle, - open_flags: flags.bits(), + open_flags, padding: Default::default(), }) } - }, + } } -op! { Read {}, } -/*op! { +op! { Read { - type RequestBody = &'o proto::ReadIn; - type ReplyTail = &'o mut OutputBytes<'o>; - }, - - Request { - pub fn offset(&self) -> u64 { - self.body.offset - } - - pub fn size(&self) -> u32 { - self.body.size - } - }, - - Reply { - pub fn remaining(&self) -> u64 { - self.tail.remaining() - } - - pub fn end(self) -> Done<'o> { - if self.tail.ready() { - self.chain(OutputChain::tail(self.tail.segments())) - } else { - // The read() handler will be invoked again with same OutputBytes - self.done() - } - } - - pub fn hole(self, size: u64) -> Result<Self, Done<'o>> { - self.tail - } - - pub fn copy(self, data: &[u8]) -> Result<Self, Done<'o>> { - self.self_or_done(self.tail.copy(data)) - } - - pub fn put(self, data: &'o [u8]) -> Result<Self, Done<'o>> { - self.self_or_done(self.tail.put(data)) - } - - pub fn gather(self, data: &'o [&'o [u8]]) -> Result<Self, Done<'o>> { - self.self_or_done(self.tail.gather(data)) - } - - fn self_or_done(self, capacity: OutputCapacity) -> Result<Self, Done<'o>> { - match capacity { - OutputCapacity::Available => Ok(self), - OutputCapacity::Filled => Err(self.done()), - } - } - }, -}*/ + type RequestBody = (); + type ReplyTail = (); + } +} op! { Write { type RequestBody = &'o proto::WriteIn; - }, + type ReplyTail = (); + } } op! { Init { - type ReplyTail = &'o mut Result<Fs::Farc, i32>; + type RequestBody = &'o proto::InitIn; + type ReplyTail = (); + } - fn consume_errno(errno: i32, tail: &mut Self::ReplyTail) { - **tail = Err(errno); - } - }, - - Reply { - /// Server-side initialization succeeded. The provided `Farc` references the filesystem's - /// root inode. - pub fn root(self, root: Fs::Farc) -> Done<'o> { - *self.tail = Ok(root); - self.done() + impl Reply { + pub fn ok(self) -> Done<'o> { + self.nop() } - }, + } } op! { - Statfs {}, + Statfs { + type RequestBody = (); + type ReplyTail = (); + } - Reply { + impl Reply { /// Replies with filesystem statistics. pub fn info(self, statfs: FsInfo) -> Done<'o> { let statfs: proto::StatfsOut = statfs.into(); self.single(&statfs) } - }, + } } op! { Opendir { type RequestBody = &'o proto::OpendirIn; - }, + type ReplyTail = (); + } } op! { Readdir { type RequestBody = &'o proto::ReaddirIn; - }, + type ReplyTail = (); + } - Request { + impl Request { /// Returns the base offset in the directory stream to read from. pub fn offset(&self) -> u64 { self.body.read_in.offset } - }, + } - Reply { - pub fn try_iter<'a, I, E, Ref>( - self, - mut entries: I, - ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)> + impl Reply { + pub fn entry<N>(self, inode: Entry<N, impl Known>) -> Interruptible<'o, Readdir, ()> where - I: Iterator<Item = Result<Entry<'a, Ref>, E>> + Send, - Ref: Borrow<Fs::Farc>, + N: AsRef<OsStr>, { - //TODO: This is about as shitty as it gets - match entries.next().transpose() { - Ok(Some(entry)) => { - let Entry { - name, - inode, - offset, - .. - } = entry; - - let inode = inode.borrow(); - let Ino(ino) = <Fs as Fuse>::Inode::ino(inode); - - let dirent = proto::Dirent { - ino, - off: offset, - namelen: name.len() as u32, - entry_type: (match <Fs as Fuse>::Inode::inode_type(inode) { - EntryType::Fifo => SFlag::S_IFIFO, - EntryType::CharacterDevice => SFlag::S_IFCHR, - EntryType::Directory => SFlag::S_IFDIR, - EntryType::BlockDevice => SFlag::S_IFBLK, - EntryType::File => SFlag::S_IFREG, - EntryType::Symlink => SFlag::S_IFLNK, - EntryType::Socket => SFlag::S_IFSOCK, - }) - .bits() - >> 12, - }; - - let dirent = bytes_of(&dirent); - let name = name.as_bytes(); - - let padding = [0; 8]; - let padding = &padding[..7 - (dirent.len() + name.len() - 1) % 8]; - - Ok(self.chain(OutputChain::tail(&[dirent, name, padding]))) - } - - Err(error) => Err((self, error)), - - Ok(None) => Ok(self.empty()), - } + todo!() } - // See rust-lang/rust#61949 - pub async fn try_stream<'a, S, E, Ref>( - self, - entries: S, - ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)> - where - S: Stream<Item = Result<Entry<'a, Ref>, E>> + Send, - Ref: Borrow<Fs::Farc> + Send, - E: Send, - { - //TODO: This is about as shitty as it gets - let first = entries.boxed().try_next().await; - self.try_iter(first.transpose().into_iter()) + pub fn end(self) -> Done<'o> { + todo!() } - }, + } } op! { Access { type RequestBody = &'o proto::AccessIn; - }, + type ReplyTail = (); + } - Request { + impl Request { pub fn mask(&self) -> AccessFlags { AccessFlags::from_bits_truncate(self.body.mask as i32) } - }, + } - Reply { + impl Reply { pub fn ok(self) -> Done<'o> { self.empty() } @@ -320,12 +245,32 @@ op! { pub fn permission_denied(self) -> Done<'o> { self.fail(Errno::EACCES) } - }, + } +} + +op! { + Destroy { + type RequestBody = (); + type ReplyTail = (); + } +} + +mod private { + use crate::proto; + + #[derive(Copy, Clone)] + pub struct OpenFlags(pub proto::OpenOutFlags); + + impl Default for OpenFlags { + fn default() -> Self { + OpenFlags(proto::OpenOutFlags::empty()) + } + } } -impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { - fn done(self) -> Done<'o> { - Done::from_result(Ok(())) +impl<'o, O: Operation<'o>> Reply<'o, O> { + fn nop(self) -> Done<'o> { + Done::done() } fn empty(self) -> Done<'o> { @@ -337,7 +282,8 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { } fn chain(self, chain: OutputChain<'_>) -> Done<'o> { - Done::from_result(session::ok(self.session, self.unique, chain)) + let result = self.session.ok(self.unique, chain); + self.finish(result) } } diff --git a/src/fuse/session.rs b/src/fuse/session.rs index 8975c57..5045099 100644 --- a/src/fuse/session.rs +++ b/src/fuse/session.rs @@ -1,148 +1,121 @@ use std::{ - collections::{hash_map, HashMap}, convert::TryInto, + future::Future, io, + marker::PhantomData, os::unix::io::{IntoRawFd, RawFd}, sync::{Arc, Mutex}, }; use nix::{ fcntl::{fcntl, FcntlArg, OFlag}, - sys::uio::{readv, writev, IoVec}, - unistd::{sysconf, SysconfVar}, + sys::uio::{writev, IoVec}, + unistd::{read, sysconf, SysconfVar}, }; use tokio::{ io::unix::AsyncFd, - sync::{broadcast, Notify, OwnedSemaphorePermit, Semaphore}, + sync::{broadcast, OwnedSemaphorePermit, Semaphore}, }; -use bytemuck::{bytes_of, try_from_bytes}; +use bytemuck::bytes_of; use smallvec::SmallVec; use crate::{ - proto::{self, InHeader}, - util::{display_or, OutputChain}, - Errno, FuseError, FuseResult, Ino, + proto::{self, InHeader, Structured}, + util::{DumbFd, OutputChain}, + Errno, FuseError, FuseResult, }; -use super::{ - fs::{Fuse, Inode}, - Reply, Request, Session, Start, -}; +use super::{ops, Done, Op, Operation, Reply, Request}; -pub fn ok<Fs: Fuse>(session: &Session<Fs>, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { - session.send(unique, 0, output) +pub struct Start { + fusermount_fd: DumbFd, + session_fd: DumbFd, } -pub fn fail<Fs: Fuse>(session: &Session<Fs>, unique: u64, mut errno: i32) -> FuseResult<()> { - if errno <= 0 { - log::warn!( - "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", - unique, - errno - ); +pub struct Session { + _fusermount_fd: DumbFd, + session_fd: AsyncFd<RawFd>, + interrupt_tx: broadcast::Sender<u64>, + buffers: Mutex<Vec<Buffer>>, + buffer_semaphore: Arc<Semaphore>, + proto_minor: u32, + buffer_pages: usize, +} - errno = Errno::ENOMSG as i32; - } +pub struct Endpoint<'a> { + session: &'a Arc<Session>, + local_buffer: Buffer, +} - session.send(unique, -errno, OutputChain::empty()) +pub enum Dispatch<'o> { + Lookup(Incoming<'o, ops::Lookup>), + Getattr(Incoming<'o, ops::Getattr>), + Readlink(Incoming<'o, ops::Readlink>), + Read(Incoming<'o, ops::Read>), + Write(Incoming<'o, ops::Write>), + Statfs(Incoming<'o, ops::Statfs>), + Readdir(Incoming<'o, ops::Readdir>), + Access(Incoming<'o, ops::Access>), + Destroy(Incoming<'o, ops::Destroy>), } -pub fn unveil<Fs: Fuse>(session: &Session<Fs>, inode: &Fs::Farc) { - let ino = <Fs as Fuse>::Inode::ino(inode); - let mut known = session.known.lock().unwrap(); +pub struct Incoming<'o, O: Operation<'o>> { + common: IncomingCommon<'o>, + _phantom: PhantomData<O>, +} - use hash_map::Entry::*; - match known.entry(ino) { - Occupied(entry) => { - let (_, count) = entry.into_mut(); - *count += 1; - } +pub struct Owned<O: for<'o> Operation<'o>> { + session: Arc<Session>, + buffer: Buffer, + header: InHeader, + _permit: OwnedSemaphorePermit, + _phantom: PhantomData<O>, +} - Vacant(entry) => { - entry.insert((Fs::Farc::clone(inode), 1)); +impl Session { + pub fn endpoint<'a>(self: &'a Arc<Self>) -> Endpoint<'a> { + Endpoint { + session: self, + local_buffer: Buffer::new(self.buffer_pages), } } -} -pub fn interrupt_rx<Fs: Fuse>(session: &Session<Fs>) -> broadcast::Receiver<u64> { - session.interrupt_tx.subscribe() -} - -impl<Fs: Fuse> Session<Fs> { - pub fn fs(&self) -> &Fs { - &self.fs + pub(crate) fn ok(&self, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { + self.send(unique, 0, output) } - pub async fn main_loop(self: Arc<Self>) -> FuseResult<()> { - let this = Arc::clone(&self); - let main_loop = async move { - loop { - let incoming = this.receive().await; - let this = Arc::clone(&this); - - tokio::spawn(async move { - let (result, header): (FuseResult<()>, Option<InHeader>) = match incoming { - Ok(mut incoming) => match this.dispatch(&mut incoming).await { - Ok(()) => (Ok(()), None), - - Err(error) => { - let data = incoming.buffer.data(); - let data = &data[..std::mem::size_of::<InHeader>().max(data.len())]; - (Err(error), try_from_bytes(data).ok().copied()) - } - }, - - Err(error) => (Err(error.into()), None), - }; - - let header = display_or(header, "(bad)"); - if let Err(error) = result { - log::error!("Handling request {}: {}", header, error); - } - }); - } - }; + pub(crate) fn fail(&self, unique: u64, mut errno: i32) -> FuseResult<()> { + if errno <= 0 { + log::warn!( + "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", + unique, + errno + ); - tokio::select! { - () = main_loop => unreachable!(), - () = self.destroy.notified() => Ok(()), + errno = Errno::ENOMSG as i32; } - } - async fn do_handshake( - &mut self, - pages_per_buffer: usize, - bytes_per_buffer: usize, - ) -> FuseResult<Handshake> { - use FuseError::*; - - let buffer = { - self.session_fd.readable().await?.retain_ready(); - let large_buffer = self.large_buffers.get_mut().unwrap().first_mut().unwrap(); - - let mut data = InputBufferStorage::Sbo(SboStorage([0; SBO_SIZE])); - let sbo = match &mut data { - InputBufferStorage::Sbo(SboStorage(sbo)) => sbo, - _ => unreachable!(), - }; + self.send(unique, -errno, OutputChain::empty()) + } - let mut io_vecs = [ - IoVec::from_mut_slice(sbo), - IoVec::from_mut_slice(large_buffer), - ]; + pub(crate) fn interrupt_rx(&self) -> broadcast::Receiver<u64> { + self.interrupt_tx.subscribe() + } - let bytes = readv(*self.session_fd.get_ref(), &mut io_vecs).map_err(io::Error::from)?; - InputBuffer { bytes, data } - }; + async fn handshake(&mut self, buffer: &mut Buffer) -> FuseResult<Handshake> { + self.session_fd.readable().await?.retain_ready(); + let bytes = read(*self.session_fd.get_ref(), &mut buffer.0).map_err(io::Error::from)?; - let request: proto::Request<'_> = buffer.data().try_into()?; + let (header, opcode) = InHeader::from_bytes(&buffer.0[..bytes])?; + let init = match opcode { + proto::Opcode::Init => <&proto::InitIn>::toplevel_from(&buffer.0[HEADER_END..bytes])?, - let unique = request.header().unique; - let init = match request.body() { - proto::RequestBody::Init(body) => body, - _ => return Err(ProtocolInit), + _ => { + log::error!("First message from kernel is not Init, but {:?}", opcode); + return Err(FuseError::ProtocolInit); + } }; use std::cmp::Ordering; @@ -156,7 +129,7 @@ impl<Fs: Fuse> Session<Fs> { Ordering::Greater => { let tail = [bytes_of(&proto::MAJOR_VERSION)]; - ok(self, unique, OutputChain::tail(&tail))?; + self.ok(header.unique, OutputChain::tail(&tail))?; return Ok(Handshake::Restart); } @@ -175,40 +148,27 @@ impl<Fs: Fuse> Session<Fs> { major = proto::MAJOR_VERSION ); - fail(self, unique, Errno::EPROTONOSUPPORT as i32)?; - return Err(ProtocolInit); + self.fail(header.unique, Errno::EPROTONOSUPPORT as i32)?; + return Err(FuseError::ProtocolInit); } - let root = { - let mut init_result = Err(0); - let reply = Reply { - session: self, - unique, - tail: &mut init_result, - }; + let flags = { + use proto::InitFlags; - self.fs.init(reply).await.into_result()?; + let kernel = InitFlags::from_bits_truncate(init.flags); + let supported = InitFlags::PARALLEL_DIROPS + | InitFlags::ABORT_ERROR + | InitFlags::MAX_PAGES + | InitFlags::CACHE_SYMLINKS; - match init_result { - Ok(root) => root, - Err(errno) => { - log::error!("init() handler failed: {}", Errno::from_i32(errno)); - return Err(FuseError::Io(std::io::Error::from_raw_os_error(errno))); - } - } + kernel & supported }; - self.known.get_mut().unwrap().insert(Ino::ROOT, (root, 1)); + let buffer_size = page_size() * self.buffer_pages; - use proto::InitFlags; - let flags = InitFlags::from_bits_truncate(init.flags); - let supported = InitFlags::PARALLEL_DIROPS - | InitFlags::ABORT_ERROR - | InitFlags::MAX_PAGES - | InitFlags::CACHE_SYMLINKS; + // See fs/fuse/dev.c in the kernel source tree for details about max_write + let max_write = buffer_size - std::mem::size_of::<(InHeader, proto::WriteIn)>(); - let flags = flags & supported; - let max_write = bytes_per_buffer - std::mem::size_of::<(InHeader, proto::WriteIn)>(); let init_out = proto::InitOut { major: proto::MAJOR_VERSION, minor: proto::TARGET_MINOR_VERSION, @@ -218,230 +178,17 @@ impl<Fs: Fuse> Session<Fs> { congestion_threshold: 0, //TODO max_write: max_write.try_into().unwrap(), time_gran: 1, //TODO - max_pages: pages_per_buffer.try_into().unwrap(), + max_pages: self.buffer_pages.try_into().unwrap(), padding: Default::default(), unused: Default::default(), }; let tail = [bytes_of(&init_out)]; - ok(self, unique, OutputChain::tail(&tail))?; + self.ok(header.unique, OutputChain::tail(&tail))?; Ok(Handshake::Done) } - async fn dispatch(self: &Arc<Self>, request: &mut Incoming) -> FuseResult<()> { - let request: proto::Request<'_> = request.buffer.data().try_into()?; - let header = request.header(); - let InHeader { unique, ino, .. } = *header; - let ino = Ino(ino); - - use proto::RequestBody::*; - - macro_rules! op { - () => { - op!(()) - }; - - ($body:expr) => { - op!($body, ()) - }; - - ($body:expr, $tail:expr) => {{ - let request = Request { - header, - body: $body, - }; - let reply = Reply { - session: &self, - unique, - tail: $tail, - }; - - (request, reply, self) - }}; - } - - // These operations don't involve locking and searching self.known - match request.body() { - Forget(body) => { - self.forget(std::iter::once((ino, body.nlookup))).await; - return Ok(()); - } - - Statfs => return self.fs.statfs(op!()).await.into_result(), - - Interrupt(body) => { - //TODO: Don't reply with EAGAIN if the interrupt is successful - let _ = self.interrupt_tx.send(body.unique); - return fail(self, unique, Errno::EAGAIN as i32); - } - - Destroy => { - self.destroy.notify_one(); - return Ok(()); - } - - BatchForget { forgets, .. } => { - let forgets = forgets - .iter() - .map(|target| (Ino(target.ino), target.nlookup)); - - self.forget(forgets).await; - return Ok(()); - } - - _ => (), - } - - // Some operations are handled while self.known is locked - let inode = { - let known = self.known.lock().unwrap(); - let inode = match known.get(&ino) { - Some((farc, _)) => farc, - None => { - log::error!( - "Lookup count for ino {} reached zero while still \ - known to the kernel, this is a bug", - ino - ); - - return fail(self, unique, Errno::ENOANO as i32); - } - }; - - match request.body() { - Getattr(_) => { - //TODO: Getattr flags - let (attrs, ttl) = <Fs as Fuse>::Inode::attrs(inode); - let attrs = attrs.finish::<Fs>(inode); - drop(known); - - let out = proto::AttrOut { - attr_valid: ttl.seconds, - attr_valid_nsec: ttl.nanoseconds, - dummy: Default::default(), - attr: attrs, - }; - - return ok(self, unique, OutputChain::tail(&[bytes_of(&out)])); - } - - Access(body) => { - return <Fs as Fuse>::Inode::access(inode, op!(*body)).into_result() - } - - _ => inode.clone(), - } - }; - - macro_rules! inode_op { - ($op:ident, $($exprs:expr),+) => { - <Fs as Fuse>::Inode::$op(inode, op!($($exprs),+)).await - }; - } - - // These operations involve a Farc cloned from self.known - let done = match request.body() { - Lookup { name } => inode_op!(lookup, *name), - Readlink => inode_op!(readlink, ()), - Open(body) => { - let mut flags = proto::OpenOutFlags::empty(); - if <Fs as Fuse>::Inode::direct_io(&inode) { - flags |= proto::OpenOutFlags::DIRECT_IO; - } - - inode_op!(open, *body, (ino, flags)) - } - Opendir(body) => inode_op!(opendir, *body), - Readdir(body) => inode_op!(readdir, *body), - - _ => return fail(self, unique, Errno::ENOSYS as i32), - }; - - done.into_result() - } - - async fn forget<I>(&self, targets: I) - where - I: Iterator<Item = (Ino, u64)>, - { - let mut known = self.known.lock().unwrap(); - - for (ino, subtracted) in targets { - use hash_map::Entry::*; - - match known.entry(ino) { - Occupied(mut entry) => { - let (_, count) = entry.get_mut(); - - *count = count.saturating_sub(subtracted); - if *count > 0 { - continue; - } - - entry.remove(); - } - - Vacant(_) => { - log::warn!("Kernel attempted to forget {:?} (bad refcount?)", ino); - continue; - } - } - } - } - - async fn receive(self: &Arc<Self>) -> std::io::Result<Incoming> { - use InputBufferStorage::*; - - let permit = Arc::clone(&self.input_semaphore) - .acquire_owned() - .await - .unwrap(); - - let mut incoming = Incoming { - buffer: InputBuffer { - bytes: 0, - data: Sbo(SboStorage([0; SBO_SIZE])), - }, - }; - - let sbo = match &mut incoming.buffer.data { - Sbo(SboStorage(sbo)) => sbo, - _ => unreachable!(), - }; - - loop { - let mut readable = self.session_fd.readable().await?; - - let mut large_buffers = self.large_buffers.lock().unwrap(); - let large_buffer = large_buffers.last_mut().unwrap(); - - let mut io_vecs = [ - IoVec::from_mut_slice(sbo), - IoVec::from_mut_slice(&mut large_buffer[SBO_SIZE..]), - ]; - - let mut read = |fd: &AsyncFd<RawFd>| readv(*fd.get_ref(), &mut io_vecs); - match readable.try_io(|fd| read(fd).map_err(io::Error::from)) { - Ok(Ok(bytes)) => { - if bytes > SBO_SIZE { - (&mut large_buffer[..SBO_SIZE]).copy_from_slice(sbo); - incoming.buffer.data = Spilled(large_buffers.pop().unwrap(), permit); - } - - incoming.buffer.bytes = bytes; - return Ok(incoming); - } - - // Interrupted - Ok(Err(error)) if error.kind() == std::io::ErrorKind::NotFound => continue, - - Ok(Err(error)) => return Err(error), - Err(_) => continue, - } - } - } - fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> { let after_header: usize = output .iter() @@ -479,81 +226,261 @@ impl<Fs: Fuse> Session<Fs> { } } +impl<'o> Dispatch<'o> { + pub fn op(self) -> Op<'o> { + use Dispatch::*; + + let common = match self { + Lookup(incoming) => incoming.common, + Getattr(incoming) => incoming.common, + Readlink(incoming) => incoming.common, + Read(incoming) => incoming.common, + Write(incoming) => incoming.common, + Statfs(incoming) => incoming.common, + Readdir(incoming) => incoming.common, + Access(incoming) => incoming.common, + Destroy(incoming) => incoming.common, + }; + + common.into_generic_op() + } +} + +impl Endpoint<'_> { + pub async fn receive<'a, F, Fut>(&'a mut self, dispatcher: F) -> FuseResult<()> + where + F: FnOnce(Dispatch<'a>) -> Fut, + Fut: Future<Output = Done<'a>>, + { + let buffer = &mut self.local_buffer.0; + let bytes = loop { + let mut readable = self.session.session_fd.readable().await?; + let mut read = |fd: &AsyncFd<RawFd>| read(*fd.get_ref(), buffer); + + let result = match readable.try_io(|fd| read(fd).map_err(io::Error::from)) { + Ok(result) => result, + Err(_) => continue, + }; + + match result { + // Interrupted + Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue, + + result => break result, + } + }; + + let (header, opcode) = proto::InHeader::from_bytes(&buffer[..bytes?])?; + let common = IncomingCommon { + session: self.session, + buffer: &mut self.local_buffer, + header, + }; + + let dispatch = { + use proto::Opcode::*; + + macro_rules! dispatch { + ($op:ident) => { + Dispatch::$op(Incoming { + common, + _phantom: PhantomData, + }) + }; + } + + match opcode { + Lookup => dispatch!(Lookup), + Getattr => dispatch!(Getattr), + Readlink => dispatch!(Readlink), + Read => dispatch!(Read), + Write => dispatch!(Write), + Statfs => dispatch!(Statfs), + Readdir => dispatch!(Readdir), + Access => dispatch!(Access), + Destroy => dispatch!(Destroy), + + _ => { + log::warn!("Not implemented: {}", common.header); + + let (_request, reply) = common.into_generic_op(); + let _ = reply.not_implemented(); + + return Ok(()); + } + } + }; + + let _ = dispatcher(dispatch).await; + Ok(()) + } +} + impl Start { - pub async fn start<Fs: Fuse>(self, fs: Fs) -> FuseResult<Arc<Session<Fs>>> { + pub async fn start(self) -> FuseResult<Arc<Session>> { let session_fd = self.session_fd.into_raw_fd(); let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE; fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap(); - let page_size = sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize; - let pages_per_buffer = fs.request_buffer_pages().get(); - let bytes_per_buffer = pages_per_buffer.checked_mul(page_size).unwrap(); - assert!(bytes_per_buffer >= proto::MIN_READ_SIZE); + let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); - let mut large_buffers = Vec::with_capacity(fs.request_buffers().get()); - for _ in 0..large_buffers.capacity() { - large_buffers.push(vec![0; bytes_per_buffer].into_boxed_slice()); - } + let buffer_pages = proto::MIN_READ_SIZE / page_size(); //TODO + let buffer_count = SHARED_BUFFERS; //TODO + let buffers = std::iter::repeat_with(|| Buffer::new(buffer_pages)) + .take(buffer_count) + .collect(); - let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); let mut session = Session { _fusermount_fd: self.fusermount_fd, session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?, - proto_minor: 0, // Set by Session::do_handshake() - fs, - input_semaphore: Arc::new(Semaphore::new(large_buffers.len())), - large_buffers: Mutex::new(large_buffers), - known: Mutex::new(HashMap::new()), - destroy: Notify::new(), interrupt_tx, + buffers: Mutex::new(buffers), + buffer_semaphore: Arc::new(Semaphore::new(buffer_count)), + proto_minor: 0, // Set by Session::do_handshake() + buffer_pages, }; - loop { - let state = session - .do_handshake(pages_per_buffer, bytes_per_buffer) - .await?; + let mut init_buffer = session.buffers.get_mut().unwrap().pop().unwrap(); - if let Handshake::Done = state { + loop { + if let Handshake::Done = session.handshake(&mut init_buffer).await? { + session.buffers.get_mut().unwrap().push(init_buffer); break Ok(Arc::new(session)); } } } -} -enum Handshake { - Done, - Restart, + pub(crate) fn new(fusermount_fd: DumbFd, session_fd: DumbFd) -> Self { + Start { + fusermount_fd, + session_fd, + } + } } -struct Incoming { - buffer: InputBuffer, +impl<'o, O: Operation<'o>> Incoming<'o, O> { + pub fn op(self) -> Result<Op<'o, O>, Done<'o>> { + try_op( + self.common.session, + &self.common.buffer.0, + self.common.header, + ) + } } -struct InputBuffer { - pub bytes: usize, - pub data: InputBufferStorage, +impl<O: for<'o> Operation<'o>> Incoming<'_, O> { + pub async fn owned(self) -> Owned<O> { + let session = self.common.session; + + let (buffer, permit) = { + let semaphore = Arc::clone(&session.buffer_semaphore); + let permit = semaphore + .acquire_owned() + .await + .expect("Buffer semaphore error"); + + let mut buffers = session.buffers.lock().unwrap(); + let mut buffer = buffers.pop().expect("Buffer semaphore out of sync"); + + std::mem::swap(&mut buffer, self.common.buffer); + (buffer, permit) + }; + + Owned { + session: Arc::clone(session), + buffer, + header: self.common.header, + _permit: permit, + _phantom: PhantomData, + } + } } -enum InputBufferStorage { - Sbo(SboStorage), - Spilled(Box<[u8]>, OwnedSemaphorePermit), +impl<O: for<'o> Operation<'o>> Owned<O> { + pub fn op(&self) -> Result<Op<'_, O>, Done<'_>> { + try_op(&self.session, &self.buffer.0, self.header) + } } -#[repr(align(8))] -struct SboStorage(pub [u8; 4 * std::mem::size_of::<InHeader>()]); +impl<O: for<'o> Operation<'o>> Drop for Owned<O> { + fn drop(&mut self) { + if let Ok(mut buffers) = self.session.buffers.lock() { + let empty = Buffer(Vec::new().into_boxed_slice()); + buffers.push(std::mem::replace(&mut self.buffer, empty)); + } + } +} -const SBO_SIZE: usize = std::mem::size_of::<SboStorage>(); const INTERRUPT_BROADCAST_CAPACITY: usize = 32; +const SHARED_BUFFERS: usize = 32; +const HEADER_END: usize = std::mem::size_of::<InHeader>(); -impl InputBuffer { - fn data(&self) -> &[u8] { - use InputBufferStorage::*; - let storage = match &self.data { - Sbo(sbo) => &sbo.0, - Spilled(buffer, _) => &buffer[..], +struct IncomingCommon<'o> { + session: &'o Arc<Session>, + buffer: &'o mut Buffer, + header: proto::InHeader, +} + +enum Handshake { + Done, + Restart, +} + +struct Buffer(Box<[u8]>); + +impl<'o> IncomingCommon<'o> { + fn into_generic_op(self) -> Op<'o> { + let request = Request { + header: self.header, + body: (), }; - &storage[..self.bytes] + let reply = Reply { + session: self.session, + unique: self.header.unique, + tail: (), + }; + + (request, reply) + } +} + +impl Buffer { + fn new(pages: usize) -> Self { + Buffer(vec![0; pages * page_size()].into_boxed_slice()) } } + +fn try_op<'o, O: Operation<'o>>( + session: &'o Session, + bytes: &'o [u8], + header: proto::InHeader, +) -> Result<Op<'o, O>, Done<'o>> { + let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize]) { + Ok(body) => body, + Err(error) => { + log::error!("Parsing request {}: {}", header, error); + let reply = Reply::<ops::Any> { + session, + unique: header.unique, + tail: (), + }; + + return Err(reply.io_error()); + } + }; + + let request = Request { header, body }; + let reply = Reply { + session, + unique: header.unique, + tail: Default::default(), + }; + + Ok((request, reply)) +} + +fn page_size() -> usize { + sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize +} @@ -2,15 +2,8 @@ //! //! `blown-fuse` -#![feature( - concat_idents, - arbitrary_self_types, - associated_type_bounds, - associated_type_defaults, - trait_alias, - try_trait_v2, - doc_cfg -)] +#![forbid(unsafe_code)] +#![feature(try_trait_v2, doc_cfg)] #[cfg(not(target_os = "linux"))] compile_error!("Unsupported OS"); diff --git a/src/proto.rs b/src/proto.rs index 53c2123..f15aaff 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -4,7 +4,7 @@ use bitflags::bitflags; use bytemuck::{self, Pod}; use bytemuck_derive::{Pod, Zeroable}; use num_enum::TryFromPrimitive; -use std::{convert::TryFrom, ffi::CStr, fmt, mem}; +use std::{convert::TryFrom, ffi::CStr, fmt}; use crate::{util::display_or, FuseError, FuseResult}; @@ -14,9 +14,15 @@ pub const MAJOR_VERSION: u32 = 7; pub const TARGET_MINOR_VERSION: u32 = 32; pub const REQUIRED_MINOR_VERSION: u32 = 31; -pub struct Request<'a> { - header: &'a InHeader, - body: RequestBody<'a>, +pub trait Structured<'o>: Sized { + fn split_from(bytes: &'o [u8], last: bool) -> FuseResult<(Self, &'o [u8])>; + + fn toplevel_from(bytes: &'o [u8]) -> FuseResult<Self> { + match Self::split_from(bytes, true)? { + (ok, end) if end.is_empty() => Ok(ok), + _ => Err(FuseError::BadLength), + } + } } #[derive(Pod, Zeroable, Copy, Clone)] @@ -40,98 +46,6 @@ pub struct OutHeader { pub unique: u64, } -pub enum RequestBody<'a> { - Lookup { - name: &'a CStr, - }, - Forget(&'a ForgetIn), - Getattr(&'a GetattrIn), - Setattr(&'a SetattrIn), - Readlink, - Symlink { - name: &'a CStr, - target: &'a CStr, - }, - Mknod { - prefix: &'a MknodIn, - name: &'a CStr, - }, - Mkdir { - prefix: &'a MkdirIn, - target: &'a CStr, - }, - Unlink { - name: &'a CStr, - }, - Rmdir { - name: &'a CStr, - }, - Rename { - prefix: &'a RenameIn, - old: &'a CStr, - new: &'a CStr, - }, - Link(&'a LinkIn), - Open(&'a OpenIn), - Read(&'a ReadIn), - Write { - prefix: &'a WriteIn, - data: &'a [u8], - }, - Statfs, - Release(&'a ReleaseIn), - Fsync(&'a FsyncIn), - Setxattr { - prefix: &'a SetxattrIn, - name: &'a CStr, - value: &'a CStr, - }, - Getxattr { - prefix: &'a GetxattrIn, - name: &'a CStr, - }, - Listxattr(&'a ListxattrIn), - Removexattr { - name: &'a CStr, - }, - Flush(&'a FlushIn), - Init(&'a InitIn), - Opendir(&'a OpendirIn), - Readdir(&'a ReaddirIn), - Releasedir(&'a ReleasedirIn), - Fsyncdir(&'a FsyncdirIn), - Getlk(&'a GetlkIn), - Setlk(&'a SetlkIn), - Setlkw(&'a SetlkwIn), - Access(&'a AccessIn), - Create { - prefix: &'a CreateIn, - name: &'a CStr, - }, - Interrupt(&'a InterruptIn), - Bmap(&'a BmapIn), - Destroy, - Ioctl { - prefix: &'a IoctlIn, - data: &'a [u8], - }, - Poll(&'a PollIn), - NotifyReply, - BatchForget { - prefix: &'a BatchForgetIn, - forgets: &'a [ForgetOne], - }, - Fallocate(&'a FallocateIn), - ReaddirPlus(&'a ReaddirPlusIn), - Rename2 { - prefix: &'a Rename2In, - old: &'a CStr, - new: &'a CStr, - }, - Lseek(&'a LseekIn), - CopyFileRange(&'a CopyFileRangeIn), -} - #[derive(TryFromPrimitive, Copy, Clone, Debug)] #[repr(u32)] pub enum Opcode { @@ -650,211 +564,52 @@ pub struct CopyFileRangeIn { pub flags: u64, } -impl Request<'_> { - pub fn header(&self) -> &InHeader { - self.header - } - - pub fn body(&self) -> &RequestBody<'_> { - &self.body +impl<'o> Structured<'o> for () { + fn split_from(bytes: &'o [u8], _last: bool) -> FuseResult<(Self, &'o [u8])> { + Ok(((), bytes)) } } -impl<'a> TryFrom<&'a [u8]> for Request<'a> { - type Error = FuseError; - - fn try_from(bytes: &'a [u8]) -> FuseResult<Self> { - use FuseError::*; - - fn split_from_bytes<T: Pod>(bytes: &[u8]) -> FuseResult<(&T, &[u8])> { - let (bytes, next_bytes) = bytes.split_at(bytes.len().min(std::mem::size_of::<T>())); - match bytemuck::try_from_bytes(bytes) { - Ok(t) => Ok((t, next_bytes)), - Err(_) => Err(Truncated), +impl<'o> Structured<'o> for &'o CStr { + fn split_from(bytes: &'o [u8], last: bool) -> FuseResult<(Self, &'o [u8])> { + let (cstr, after_cstr): (&[u8], &[u8]) = if last { + (bytes, &[]) + } else { + match bytes.iter().position(|byte| *byte == b'\0') { + Some(nul) => bytes.split_at(nul + 1), + None => return Err(FuseError::Truncated), } - } - - let full_bytes = bytes; - let (header, mut bytes) = split_from_bytes::<InHeader>(full_bytes)?; - - if header.len as usize != full_bytes.len() { - return Err(BadLength); - } - - let opcode = match Opcode::try_from(header.opcode) { - Ok(opcode) => opcode, - Err(_) => return Err(BadOpcode), }; - macro_rules! prefix { - ($op:ident, $ident:ident, $is_last:expr) => { - prefix!($op, $ident); - }; - - ($op:ident, $ident:ident) => { - let ($ident, after_prefix) = split_from_bytes::<concat_idents!($op, In)>(bytes)?; - bytes = after_prefix; - }; - } - - fn cstr_from_bytes(bytes: &[u8], is_last: bool) -> FuseResult<(&CStr, &[u8])> { - let (cstr, after_cstr): (&[u8], &[u8]) = if is_last { - (bytes, &[]) - } else { - match bytes.iter().position(|byte| *byte == b'\0') { - Some(nul) => bytes.split_at(nul + 1), - None => return Err(Truncated), - } - }; - - let cstr = CStr::from_bytes_with_nul(cstr).map_err(|_| BadLength)?; - Ok((cstr, after_cstr)) - } - - macro_rules! cstr { - ($op:ident, $ident:ident, $is_last:expr) => { - let ($ident, next_bytes) = cstr_from_bytes(bytes, $is_last)?; - bytes = next_bytes; - }; - } - - macro_rules! name { - ($op:ident, $ident:ident, $is_last:expr) => { - cstr!($op, $ident, $is_last); - }; - } - - macro_rules! value { - ($op:ident, $ident:ident, $is_last:expr) => { - cstr!($op, $ident, $is_last); - }; - } - - macro_rules! target { - ($op:ident, $ident:ident, $is_last:expr) => { - cstr!($op, $ident, $is_last); - }; - } - - macro_rules! old { - ($op:ident, $ident:ident, $is_last:expr) => { - cstr!($op, $ident, $is_last); - }; - } + let cstr = CStr::from_bytes_with_nul(cstr).map_err(|_| FuseError::BadLength)?; + Ok((cstr, after_cstr)) + } +} - macro_rules! new { - ($op:ident, $ident:ident, $is_last:expr) => { - cstr!($op, $ident, $is_last); - }; +impl<'o, T: Pod> Structured<'o> for &'o T { + fn split_from(bytes: &'o [u8], _last: bool) -> FuseResult<(Self, &'o [u8])> { + let (bytes, next_bytes) = bytes.split_at(bytes.len().min(std::mem::size_of::<T>())); + match bytemuck::try_from_bytes(bytes) { + Ok(t) => Ok((t, next_bytes)), + Err(_) => Err(FuseError::Truncated), } + } +} - macro_rules! build_body { - ($op:ident, $last:ident) => { - $last!($op, $last, true); - }; - - ($op:ident, $field:ident, $($next:ident),+) => { - $field!($op, $field, false); - build_body!($op, $($next),+); - }; - } +impl InHeader { + pub fn from_bytes(bytes: &[u8]) -> FuseResult<(Self, Opcode)> { + let (header, _) = <&InHeader>::split_from(bytes, false)?; - macro_rules! body { - ($op:ident) => { - RequestBody::$op - }; - - ($op:ident, prefix) => { - { - prefix!($op, prefix); - RequestBody::$op(prefix) - } - }; - - ($op:ident, prefix, data where len == $size_field:ident) => { - { - prefix!($op, prefix); - if prefix.$size_field as usize != bytes.len() { - return Err(BadLength); - } - - RequestBody::$op { prefix, data: mem::take(&mut bytes) } - } - }; - - ($op:ident, $($fields:ident),+) => { - { - build_body!($op, $($fields),+); - RequestBody::$op { $($fields),+ } - } - }; + if header.len as usize != bytes.len() { + return Err(FuseError::BadLength); } - use Opcode::*; - let body = match opcode { - Lookup => body!(Lookup, name), - Forget => body!(Forget, prefix), - Getattr => body!(Getattr, prefix), - Setattr => body!(Setattr, prefix), - Readlink => body!(Readlink), - Symlink => body!(Symlink, name, target), - Mknod => body!(Mknod, prefix, name), - Mkdir => body!(Mkdir, prefix, target), - Unlink => body!(Unlink, name), - Rmdir => body!(Rmdir, name), - Rename => body!(Rename, prefix, old, new), - Link => body!(Link, prefix), - Open => body!(Open, prefix), - Read => body!(Read, prefix), - Write => body!(Write, prefix, data where len == size), - Statfs => body!(Statfs), - Release => body!(Release, prefix), - Fsync => body!(Fsync, prefix), - Setxattr => body!(Setxattr, prefix, name, value), - Getxattr => body!(Getxattr, prefix, name), - Listxattr => body!(Listxattr, prefix), - Removexattr => body!(Removexattr, name), - Flush => body!(Flush, prefix), - Init => body!(Init, prefix), - Opendir => body!(Opendir, prefix), - Readdir => body!(Readdir, prefix), - Releasedir => body!(Releasedir, prefix), - Fsyncdir => body!(Fsyncdir, prefix), - Getlk => body!(Getlk, prefix), - Setlk => body!(Setlk, prefix), - Setlkw => body!(Setlkw, prefix), - Access => body!(Access, prefix), - Create => body!(Create, prefix, name), - Interrupt => body!(Interrupt, prefix), - Bmap => body!(Bmap, prefix), - Destroy => body!(Destroy), - Ioctl => body!(Ioctl, prefix, data where len == in_size), - Poll => body!(Poll, prefix), - NotifyReply => body!(NotifyReply), - BatchForget => { - prefix!(BatchForget, prefix); - - let forgets = mem::take(&mut bytes); - let forgets = bytemuck::try_cast_slice(forgets).map_err(|_| Truncated)?; - - if prefix.count as usize != forgets.len() { - return Err(BadLength); - } - - RequestBody::BatchForget { prefix, forgets } - } - Fallocate => body!(Fallocate, prefix), - ReaddirPlus => body!(ReaddirPlus, prefix), - Rename2 => body!(Rename2, prefix, old, new), - Lseek => body!(Lseek, prefix), - CopyFileRange => body!(CopyFileRange, prefix), + let opcode = match Opcode::try_from(header.opcode) { + Ok(opcode) => opcode, + Err(_) => return Err(FuseError::BadOpcode), }; - if bytes.is_empty() { - Ok(Request { header, body }) - } else { - Err(BadLength) - } + Ok((*header, opcode)) } } |
