diff options
| author | Alejandro Soto <alejandro@34project.org> | 2021-12-29 04:56:41 -0600 |
|---|---|---|
| committer | Alejandro Soto <alejandro@34project.org> | 2021-12-29 05:20:49 -0600 |
| commit | 48efcd0ce8c8a9ac51dc80c6ec49c63f6694b031 (patch) | |
| tree | bdb330a1bb59958ca914be40c906112d25b1eb39 | |
| parent | 606846f23726c3472e6e12b17447b102ad6158cc (diff) | |
Implement buffered readdir
| -rw-r--r-- | Cargo.toml | 1 | ||||
| -rw-r--r-- | examples/ext2.rs | 5 | ||||
| -rw-r--r-- | src/fuse/io.rs | 5 | ||||
| -rw-r--r-- | src/fuse/mod.rs | 2 | ||||
| -rw-r--r-- | src/fuse/ops.rs | 204 | ||||
| -rw-r--r-- | src/fuse/session.rs | 13 | ||||
| -rw-r--r-- | src/lib.rs | 5 | ||||
| -rw-r--r-- | src/proto.rs | 4 |
8 files changed, 200 insertions, 39 deletions
@@ -16,6 +16,7 @@ client = [] bitflags = "1.3.2" bytemuck = "1.7.3" bytemuck_derive = "1.0.1" +bytes = "1.1.0" log = "0.4.14" nix = "0.23.1" num_enum = "0.5.5" diff --git a/examples/ext2.rs b/examples/ext2.rs index 4d074e9..3a68eee 100644 --- a/examples/ext2.rs +++ b/examples/ext2.rs @@ -147,7 +147,7 @@ impl Ext2 { &self, inode: &'static Inode, start: u64, - ) -> impl Stream<Item = Result<Entry<&'static OsStr, Resolved>, Errno>> + '_ { + ) -> impl Stream<Item = Result<Entry<'static, Resolved>, Errno>> + '_ { stream::try_unfold(start, move |mut position| async move { loop { if position == inode.i_size as u64 { @@ -417,7 +417,8 @@ impl Ext2 { } async fn readdir<'o>(&self, (request, reply): Op<'o, Readdir>) -> Done<'o> { - let (mut reply, inode) = reply.fallible(self.inode(request.ino()))?; + let (reply, inode) = reply.fallible(self.inode(request.ino()))?; + let mut reply = reply.buffered(Vec::new()); let stream = self.directory_stream(inode, request.offset()); tokio::pin!(stream); diff --git a/src/fuse/io.rs b/src/fuse/io.rs index 47df045..7ba2944 100644 --- a/src/fuse/io.rs +++ b/src/fuse/io.rs @@ -3,6 +3,7 @@ use nix::{errno::Errno, sys::stat::SFlag}; use std::{ convert::Infallible, + ffi::OsStr, future::Future, ops::{ControlFlow, FromResidual, Try}, }; @@ -33,9 +34,9 @@ pub trait Known { #[derive(Clone)] pub struct Attrs(proto::Attrs); -pub struct Entry<N, K> { +pub struct Entry<'a, K> { pub offset: u64, - pub name: N, + pub name: &'a OsStr, pub inode: K, pub ttl: Ttl, } diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs index 5893d79..5a1bda6 100644 --- a/src/fuse/mod.rs +++ b/src/fuse/mod.rs @@ -15,7 +15,7 @@ mod private_trait { pub trait Sealed {} } -pub trait Operation<'o>: private_trait::Sealed { +pub trait Operation<'o>: private_trait::Sealed + Sized { type RequestBody: crate::proto::Structured<'o>; type ReplyTail; } diff --git a/src/fuse/ops.rs b/src/fuse/ops.rs index 66708ad..21458c2 100644 --- a/src/fuse/ops.rs +++ b/src/fuse/ops.rs @@ -1,7 +1,7 @@ -use bytemuck::{bytes_of, Pod, Zeroable}; - use std::{ + convert::Infallible, ffi::{CStr, OsStr}, + marker::PhantomData, os::unix::ffi::OsStrExt, }; @@ -12,15 +12,25 @@ use crate::{ }; use super::{ - io::{AccessFlags, Entry, FsInfo, Interruptible, Known}, + io::{AccessFlags, Entry, EntryType, FsInfo, Interruptible, Known}, + private_trait::Sealed, Done, Operation, Reply, Request, }; +use bytemuck::{bytes_of, Pod, Zeroable}; +use bytes::BufMut; +use nix::sys::stat::SFlag; + +pub trait FromRequest<'o, O: Operation<'o>> { + //TODO: Shouldn't be public + fn from_request(request: &Request<'o, O>) -> Self; +} + macro_rules! op { { $name:ident $operation:tt } => { - pub struct $name(std::convert::Infallible); + pub struct $name(Infallible); - impl super::private_trait::Sealed for $name {} + impl Sealed for $name {} impl<'o> Operation<'o> for $name $operation }; @@ -53,7 +63,7 @@ op! { impl Request { /// Returns the name of the entry being looked up in this directory. pub fn name(&self) -> &OsStr { - c_to_os(self.body) + OsStr::from_bytes(self.body.to_bytes()) } } @@ -167,12 +177,12 @@ op! { op! { Open { type RequestBody = &'o proto::OpenIn; - type ReplyTail = state::OpenFlags; + type ReplyTail = proto::OpenOutFlags; } impl Reply { pub fn force_direct_io(&mut self) { - self.tail.0 |= proto::OpenOutFlags::DIRECT_IO; + self.tail |= proto::OpenOutFlags::DIRECT_IO; } /// The inode may now be accessed. @@ -181,7 +191,7 @@ op! { } fn ok_with_handle(self, handle: u64) -> Done<'o> { - let open_flags = self.tail.0.bits(); + let open_flags = self.tail.bits(); self.single(&proto::OpenOut { fh: handle, @@ -314,36 +324,147 @@ op! { op! { Readdir { - type RequestBody = &'o proto::ReaddirIn; - type ReplyTail = (); + type RequestBody = proto::OpcodeSelect< + &'o proto::ReaddirPlusIn, + &'o proto::ReaddirIn, + { proto::Opcode::ReaddirPlus as u32 }, + >; + + type ReplyTail = state::Readdir<()>; } impl Request { pub fn handle(&self) -> u64 { - self.body.read_in.fh + self.read_in().fh } /// Returns the base offset in the directory stream to read from. pub fn offset(&self) -> u64 { - self.body.read_in.offset + self.read_in().offset } pub fn size(&self) -> u32 { - self.body.read_in.size + self.read_in().size + } + + fn read_in(&self) -> &proto::ReadIn { + use proto::OpcodeSelect::*; + + match &self.body { + Match(readdir_plus) => &readdir_plus.read_in, + Alt(readdir) => &readdir.read_in, + } } } impl Reply { - pub fn entry<N>(self, inode: Entry<N, impl Known>) -> Interruptible<'o, Readdir, ()> + pub fn buffered<B>(self, buffer: B) -> Reply<'o, BufferedReaddir<B>> where - N: AsRef<OsStr>, + B: BufMut + AsRef<[u8]>, { - todo!() + assert!(buffer.as_ref().is_empty()); + + let state::Readdir { max_read, is_plus, buffer: (), } = self.tail; + + Reply { + session: self.session, + unique: self.unique, + tail: state::Readdir { max_read, is_plus, buffer, }, + } } + } +} + +pub struct BufferedReaddir<B>(Infallible, PhantomData<B>); + +impl<B> Sealed for BufferedReaddir<B> {} + +impl<'o, B> Operation<'o> for BufferedReaddir<B> { + type RequestBody = (); // Never actually created + type ReplyTail = state::Readdir<B>; +} - pub fn end(self) -> Done<'o> { - todo!() +impl<'o, B: BufMut + AsRef<[u8]>> Reply<'o, BufferedReaddir<B>> { + pub fn entry(mut self, entry: Entry<impl Known>) -> Interruptible<'o, BufferedReaddir<B>, ()> { + let entry_header_len = if self.tail.is_plus { + std::mem::size_of::<proto::DirentPlus>() + } else { + std::mem::size_of::<proto::Dirent>() + }; + + let name = entry.name.as_bytes(); + let padding_len = dirent_pad_bytes(entry_header_len + name.len()); + + let buffer = &mut self.tail.buffer; + let remaining = buffer + .remaining_mut() + .min(self.tail.max_read - buffer.as_ref().len()); + + let record_len = entry_header_len + name.len() + padding_len; + if remaining < record_len { + if buffer.as_ref().is_empty() { + log::error!("Buffer for readdir req #{} is too small", self.unique); + return Interruptible::Interrupted(self.fail(Errno::ENOBUFS)); + } + + return Interruptible::Interrupted(self.end()); } + + let entry_type = match entry.inode.inode_type() { + EntryType::Fifo => SFlag::S_IFIFO, + EntryType::CharacterDevice => SFlag::S_IFCHR, + EntryType::Directory => SFlag::S_IFDIR, + EntryType::BlockDevice => SFlag::S_IFBLK, + EntryType::File => SFlag::S_IFREG, + EntryType::Symlink => SFlag::S_IFLNK, + EntryType::Socket => SFlag::S_IFSOCK, + }; + + let ino = entry.inode.ino(); + let dirent = proto::Dirent { + ino: ino.as_raw(), + off: entry.offset, + namelen: name.len().try_into().unwrap(), + entry_type: entry_type.bits() >> 12, + }; + + enum Ent { + Dirent(proto::Dirent), + DirentPlus(proto::DirentPlus), + } + + let ent = if self.tail.is_plus { + let (attrs, attrs_ttl) = entry.inode.attrs(); + let attrs = attrs.finish(&entry.inode); + let entry_out = make_entry((ino, entry.ttl), (attrs, attrs_ttl)); + + if name != ".".as_bytes() && name != "..".as_bytes() { + entry.inode.unveil(); + } + + Ent::DirentPlus(proto::DirentPlus { entry_out, dirent }) + } else { + Ent::Dirent(dirent) + }; + + let entry_header = match &ent { + Ent::Dirent(dirent) => bytes_of(dirent), + Ent::DirentPlus(dirent_plus) => bytes_of(dirent_plus), + }; + + buffer.put_slice(entry_header); + buffer.put_slice(name); + buffer.put_slice(&[0; 7][..padding_len]); + + if remaining - record_len >= entry_header.len() + (1 << proto::DIRENT_ALIGNMENT_BITS) { + Interruptible::Completed(self, ()) + } else { + Interruptible::Interrupted(self.end()) + } + } + + pub fn end(self) -> Done<'o> { + self.inner(|this| this.tail.buffer.as_ref()) } } @@ -385,12 +506,31 @@ pub(crate) mod state { pub buffer_pages: usize, } - #[derive(Copy, Clone)] - pub struct OpenFlags(pub proto::OpenOutFlags); + pub struct Readdir<B> { + pub max_read: usize, + pub is_plus: bool, + pub buffer: B, + } +} + +impl<'o, O: Operation<'o>> FromRequest<'o, O> for () { + fn from_request(_request: &Request<'o, O>) -> Self { + () + } +} - impl Default for OpenFlags { - fn default() -> Self { - OpenFlags(proto::OpenOutFlags::empty()) +impl<'o> FromRequest<'o, Open> for proto::OpenOutFlags { + fn from_request(_request: &Request<'o, Open>) -> Self { + proto::OpenOutFlags::empty() + } +} + +impl<'o> FromRequest<'o, Readdir> for state::Readdir<()> { + fn from_request(request: &Request<'o, Readdir>) -> Self { + state::Readdir { + max_read: request.size() as usize, + is_plus: matches!(request.body, proto::OpcodeSelect::Match(_)), + buffer: (), } } } @@ -404,16 +544,19 @@ impl<'o, O: Operation<'o>> Reply<'o, O> { self.chain(OutputChain::tail(&[bytes_of(single)])) } + fn inner(self, deref: impl FnOnce(&Self) -> &[u8]) -> Done<'o> { + let result = self + .session + .ok(self.unique, OutputChain::tail(&[deref(&self)])); + self.finish(result) + } + fn chain(self, chain: OutputChain<'_>) -> Done<'o> { let result = self.session.ok(self.unique, chain); self.finish(result) } } -fn c_to_os(string: &CStr) -> &OsStr { - OsStr::from_bytes(string.to_bytes()) -} - fn make_entry( (Ino(ino), entry_ttl): (Ino, Ttl), (attrs, attr_ttl): (proto::Attrs, Ttl), @@ -428,3 +571,8 @@ fn make_entry( attr: attrs, } } + +fn dirent_pad_bytes(entry_len: usize) -> usize { + const ALIGN_MASK: usize = (1 << proto::DIRENT_ALIGNMENT_BITS) - 1; + ((entry_len + ALIGN_MASK) & !ALIGN_MASK) - entry_len +} diff --git a/src/fuse/session.rs b/src/fuse/session.rs index 82db758..f25264f 100644 --- a/src/fuse/session.rs +++ b/src/fuse/session.rs @@ -27,7 +27,10 @@ use crate::{ Errno, FuseError, FuseResult, }; -use super::{ops, Done, Op, Operation, Reply, Request}; +use super::{ + ops::{self, FromRequest}, + Done, Op, Operation, Reply, Request, +}; pub struct Start { fusermount_fd: DumbFd, @@ -357,7 +360,7 @@ impl Start { impl<'o, O: Operation<'o>> Incoming<'o, O> where - O::ReplyTail: Default, + O::ReplyTail: FromRequest<'o, O>, { pub fn op(self) -> Result<Op<'o, O>, Done<'o>> { try_op( @@ -398,7 +401,7 @@ impl<O: for<'o> Operation<'o>> Incoming<'_, O> { impl<O: for<'o> Operation<'o>> Owned<O> where - for<'o> <O as Operation<'o>>::ReplyTail: Default, + for<'o> <O as Operation<'o>>::ReplyTail: FromRequest<'o, O>, { pub fn op(&self) -> Result<Op<'_, O>, Done<'_>> { try_op(&self.session, &self.buffer.0, self.header) @@ -460,7 +463,7 @@ fn try_op<'o, O: Operation<'o>>( header: InHeader, ) -> Result<Op<'o, O>, Done<'o>> where - O::ReplyTail: Default, + O::ReplyTail: FromRequest<'o, O>, { let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize], &header) { Ok(body) => body, @@ -480,7 +483,7 @@ where let reply = Reply { session, unique: header.unique, - tail: Default::default(), + tail: FromRequest::from_request(&request), }; Ok((request, reply)) @@ -64,6 +64,11 @@ impl Ino { /// number, while replies involving the root inode will always report `Ino::ROOT` to the FUSE /// client. Therefore, filesystems do not have to be aware of `Ino::ROOT` in most cases. pub const ROOT: Self = Ino(proto::ROOT_ID); + + /// Extracts the raw inode number. + pub fn as_raw(self) -> u64 { + self.0 + } } impl std::fmt::Display for Ino { diff --git a/src/proto.rs b/src/proto.rs index 7ef3415..1ef33a5 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -9,11 +9,13 @@ use std::{convert::TryFrom, ffi::CStr, fmt}; use crate::{util::display_or, FuseError, FuseResult}; pub const ROOT_ID: u64 = 1; -pub const MIN_READ_SIZE: usize = 8192; pub const MAJOR_VERSION: u32 = 7; pub const TARGET_MINOR_VERSION: u32 = 32; pub const REQUIRED_MINOR_VERSION: u32 = 31; +pub const MIN_READ_SIZE: usize = 8192; +pub const DIRENT_ALIGNMENT_BITS: usize = 3; + pub trait Structured<'o>: Sized { fn split_from(bytes: &'o [u8], header: &InHeader, last: bool) -> FuseResult<(Self, &'o [u8])>; |
