summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlejandro Soto <alejandro@34project.org>2021-12-27 00:44:23 -0600
committerAlejandro Soto <alejandro@34project.org>2021-12-28 19:43:44 -0600
commita3212a0ba07da7bdae9e17637fbc237e2ae01c08 (patch)
tree00be583beba0f321ebeea3af21582ce927943b44
parent311b2a40213aa48131a189f99dc4258d354c0c78 (diff)
Redesign the API around a user-provided main loop
This is basically a full library rewrite.
Diffstat (limited to '')
-rw-r--r--Cargo.toml6
-rw-r--r--examples/ext2.rs268
-rw-r--r--src/fuse/fs.rs78
-rw-r--r--src/fuse/io.rs97
-rw-r--r--src/fuse/mod.rs69
-rw-r--r--src/fuse/mount.rs8
-rw-r--r--src/fuse/ops.rs332
-rw-r--r--src/fuse/session.rs705
-rw-r--r--src/lib.rs11
-rw-r--r--src/proto.rs329
10 files changed, 713 insertions, 1190 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 63972a5..96c0041 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,9 +1,10 @@
[package]
name = "blown-fuse"
-version = "0.1.0"
+version = "0.2.0"
authors = ["Alejandro Soto <alejandro@34project.org>"]
edition = "2021"
description = "Filesystem in Userspace"
+license = "LGPL-3.0-or-later"
[features]
default = ["server", "mount"]
@@ -12,11 +13,9 @@ mount = ["server"]
client = []
[dependencies]
-async-trait = "0.1.52"
bitflags = "1.3.2"
bytemuck = "1.7.3"
bytemuck_derive = "1.0.1"
-futures-util = "0.3.19"
log = "0.4.14"
nix = "0.23.1"
num_enum = "0.5.5"
@@ -27,5 +26,6 @@ tokio = { version = "1.15.0", features = ["rt", "net", "macros", "sync"] }
[dev-dependencies]
clap = "2.34.0"
env_logger = "0.9.0"
+futures-util = "0.3.19"
tokio = { version = "1.15.0", features = ["rt-multi-thread"] }
uuid = "0.8.2"
diff --git a/examples/ext2.rs b/examples/ext2.rs
index c1f4bce..9bcc2a6 100644
--- a/examples/ext2.rs
+++ b/examples/ext2.rs
@@ -1,7 +1,7 @@
/* Read-only ext2 (rev 1.0) implementation.
*
* This is not really async, since the whole backing storage
- * is mmap()ed for simplicity, and then treated as a regular
+ * is mmap()ed for simplicity, and then treated as a static
* slice (likely unsound, I don't care). Some yields are
* springled in a few places in order to emulate true async
* operations.
@@ -9,8 +9,6 @@
* Reference: <https://www.nongnu.org/ext2-doc/ext2.html>
*/
-#![feature(arbitrary_self_types)]
-
#[cfg(target_endian = "big")]
compile_error!("This example assumes a little-endian system");
@@ -32,45 +30,33 @@ use nix::{
};
use blown_fuse::{
- fs::Fuse,
- io::{Attrs, Entry, FsInfo},
+ io::{Attrs, Entry, FsInfo, Known},
mount::{mount_sync, Options},
- ops::{Init, Lookup, Readdir, Readlink, Statfs},
- Done, Ino, Reply, TimeToLive,
+ ops::{Getattr, Init, Lookup, Readdir, Readlink, Statfs},
+ session::{Dispatch, Start},
+ Done, FuseResult, Ino, Op, TimeToLive,
};
-use async_trait::async_trait;
use bytemuck::{cast_slice, from_bytes, try_from_bytes};
use bytemuck_derive::{Pod, Zeroable};
use clap::{App, Arg};
-use futures_util::stream::{self, Stream, TryStreamExt};
+use futures_util::stream::{self, Stream, StreamExt, TryStreamExt};
use smallvec::SmallVec;
use tokio::{self, runtime::Runtime};
use uuid::Uuid;
const EXT2_ROOT: Ino = Ino(2);
-type Op<'o, O> = blown_fuse::Op<'o, Ext2, O>;
-
-#[derive(Copy, Clone)]
-struct Farc {
- ino: Ino,
- inode: &'static Inode,
-}
-
-impl std::ops::Deref for Farc {
- type Target = Inode;
-
- fn deref(&self) -> &Self::Target {
- self.inode
- }
-}
-
struct Ext2 {
backing: &'static [u8],
superblock: &'static Superblock,
}
+struct Resolved {
+ ino: Ino,
+ inode: &'static Inode,
+}
+
#[derive(Pod, Zeroable, Copy, Clone)]
#[repr(C)]
struct Superblock {
@@ -158,16 +144,16 @@ struct LinkedEntry {
impl Ext2 {
fn directory_stream(
&self,
- inode: Farc,
+ inode: &'static Inode,
start: u64,
- ) -> impl Stream<Item = Result<Entry<'static, Farc>, Errno>> + '_ {
+ ) -> impl Stream<Item = Result<Entry<&'static OsStr, Resolved>, Errno>> + '_ {
stream::try_unfold(start, move |mut position| async move {
loop {
if position == inode.i_size as u64 {
break Ok(None); // End of stream
}
- let bytes = self.seek_contiguous(&inode, position)?;
+ let bytes = self.seek_contiguous(inode, position)?;
let (header, bytes) = bytes.split_at(size_of::<LinkedEntry>());
let header: &LinkedEntry = from_bytes(header);
@@ -176,9 +162,14 @@ impl Ext2 {
continue; // Unused entry
}
- let inode = self.inode(Ino(header.inode as u64))?;
+ let ino = Ino(header.inode as u64);
let name = OsStr::from_bytes(&bytes[..header.name_len as usize]).into();
+ let inode = Resolved {
+ ino,
+ inode: self.inode(ino)?,
+ };
+
let entry = Entry {
inode,
name,
@@ -191,7 +182,13 @@ impl Ext2 {
})
}
- fn inode(&self, Ino(ino): Ino) -> Result<Farc, Errno> {
+ fn inode(&self, ino: Ino) -> Result<&'static Inode, Errno> {
+ let Ino(ino) = match ino {
+ Ino::ROOT => EXT2_ROOT,
+ EXT2_ROOT => Ino::ROOT,
+ ino => ino,
+ };
+
if ino == 0 {
log::error!("Attempted to access the null (0) inode");
return Err(Errno::EIO);
@@ -210,23 +207,21 @@ impl Ext2 {
let start = index % inodes_per_block * inode_size;
let end = start + size_of::<Inode>();
- Ok(Farc {
- ino: Ino(ino),
- inode: from_bytes(&self.block(block)?[start..end]),
- })
+ Ok(from_bytes(&self.block(block)?[start..end]))
}
- fn seek_contiguous(&self, inode: &Farc, position: u64) -> Result<&'static [u8], Errno> {
+ fn seek_contiguous(
+ &self,
+ inode: &'static Inode,
+ position: u64,
+ ) -> Result<&'static [u8], Errno> {
let block_size = self.block_size();
let position = position as usize;
let (direct, offset) = (position / block_size, position % block_size);
- let out_of_bounds = || {
- log::error!("Offset {} out of bounds in inode {}", position, inode.ino);
- };
-
+ let out_of_bounds = || log::error!("Offset {} out of bounds", position);
let chase = |indices: &[usize]| {
- let root: &[u8] = cast_slice(&inode.inode.i_block);
+ let root: &[u8] = cast_slice(&inode.i_block);
indices
.iter()
.try_fold(root, |ptrs, index| {
@@ -304,12 +299,8 @@ impl Ext2 {
}
}
-#[async_trait]
-impl Fuse for Ext2 {
- type Farc = Farc;
- type Inode = Inode;
-
- async fn init<'o>(&self, reply: Reply<'o, Ext2, Init>) -> Done<'o> {
+impl Ext2 {
+ async fn init<'o>(&self, (_, reply): Op<'o, Init>) -> Done<'o> {
let label = &self.superblock.s_volume_name;
let label = &label[..=label.iter().position(|byte| *byte == b'\0').unwrap_or(0)];
let label = CStr::from_bytes_with_nul(label)
@@ -327,16 +318,11 @@ impl Fuse for Ext2 {
log::info!("UUID: {}", Uuid::from_bytes(self.superblock.s_uuid));
log::info!("Label: {}", label.escape_debug());
- if let Ok(root) = self.inode(EXT2_ROOT) {
- log::info!("Mounted successfully");
- reply.root(root)
- } else {
- log::error!("Failed to retrieve the root inode");
- reply.io_error()
- }
+ log::info!("Mounted successfully");
+ reply.ok()
}
- async fn statfs<'o>(&self, (_, reply, _): Op<'o, Statfs>) -> Done<'o> {
+ async fn statfs<'o>(&self, (_, reply): Op<'o, Statfs>) -> Done<'o> {
let total_blocks = self.superblock.s_blocks_count as u64;
let free_blocks = self.superblock.s_free_blocks_count as u64;
let available_blocks = free_blocks - self.superblock.s_r_blocks_count as u64;
@@ -355,65 +341,21 @@ impl Fuse for Ext2 {
.filenames(255),
)
}
-}
-
-#[async_trait]
-impl blown_fuse::fs::Inode for Inode {
- type Fuse = Ext2;
- fn ino(self: &Farc) -> Ino {
- match self.ino {
- Ino::ROOT => EXT2_ROOT,
- EXT2_ROOT => Ino::ROOT,
- ino => ino,
- }
- }
-
- fn inode_type(self: &Farc) -> Type {
- let inode_type = self.i_mode >> 12;
- match inode_type {
- 0x01 => Type::Fifo,
- 0x02 => Type::CharacterDevice,
- 0x04 => Type::Directory,
- 0x06 => Type::BlockDevice,
- 0x08 => Type::File,
- 0x0A => Type::Symlink,
- 0x0C => Type::Socket,
+ async fn getattr<'o>(&self, (request, reply): Op<'o, Getattr>) -> Done<'o> {
+ let ino = request.ino();
+ let (reply, inode) = reply.fallible(self.inode(ino))?;
- _ => {
- log::error!("Inode {} has invalid type {:x}", self.ino, inode_type);
- Type::File
- }
- }
+ reply.known(&Resolved { ino, inode })
}
- fn attrs(self: &Farc) -> (Attrs, TimeToLive) {
- let (access, modify, change) = {
- let time = |seconds: u32| (UNIX_EPOCH + Duration::from_secs(seconds.into())).into();
- (time(self.i_atime), time(self.i_mtime), time(self.i_ctime))
- };
-
- let attrs = Attrs::default()
- .size((self.i_dir_acl as u64) << 32 | self.i_size as u64)
- .owner(
- Uid::from_raw(self.i_uid.into()),
- Gid::from_raw(self.i_gid.into()),
- )
- .mode(Mode::from_bits_truncate(self.i_mode.into()))
- .blocks(self.i_blocks.into(), 512)
- .times(access, modify, change)
- .links(self.i_links_count.into());
-
- (attrs, TimeToLive::MAX)
- }
-
- async fn lookup<'o>(self: Farc, (request, reply, session): Op<'o, Lookup>) -> Done<'o> {
- let fs = session.fs();
+ async fn lookup<'o>(&self, (request, reply): Op<'o, Lookup>) -> Done<'o> {
let name = request.name();
+ let (reply, parent) = reply.fallible(self.inode(request.ino()))?;
//TODO: Indexed directories
- let lookup = async move {
- let stream = fs.directory_stream(self, 0);
+ let lookup = async {
+ let stream = self.directory_stream(parent, 0);
tokio::pin!(stream);
loop {
@@ -429,35 +371,38 @@ impl blown_fuse::fs::Inode for Inode {
let (reply, inode) = reply.fallible(result)?;
if let Some(inode) = inode {
- reply.found(&inode, TimeToLive::MAX)
+ reply.found(inode, TimeToLive::MAX)
} else {
reply.not_found(TimeToLive::MAX)
}
}
- async fn readlink<'o>(self: Farc, (_, reply, session): Op<'o, Readlink>) -> Done<'o> {
- if Inode::inode_type(&self) != Type::Symlink {
+ async fn readlink<'o>(&self, (request, reply): Op<'o, Readlink>) -> Done<'o> {
+ let ino = request.ino();
+ let (reply, inode) = reply.fallible(self.inode(ino))?;
+
+ let resolved = Resolved { ino, inode };
+ if resolved.inode_type() != Type::Symlink {
return reply.invalid_argument();
}
- let size = self.i_size as usize;
+ let size = inode.i_size as usize;
if size < size_of::<[u32; 15]>() {
- return reply.target(OsStr::from_bytes(&cast_slice(&self.i_block)[..size]));
+ return reply.target(OsStr::from_bytes(&cast_slice(&inode.i_block)[..size]));
}
- let fs = session.fs();
let segments = async {
/* This is unlikely to ever spill, and is guaranteed not to
* do so for valid symlinks on any fs where block_size >= 4096.
*/
- let mut segments = SmallVec::<[&OsStr; 1]>::new();
+ let mut segments = SmallVec::<[&[u8]; 1]>::new();
let (mut size, mut offset) = (size, 0);
while size > 0 {
- let segment = fs.seek_contiguous(&self, offset)?;
+ let segment = self.seek_contiguous(inode, offset)?;
let segment = &segment[..segment.len().min(size)];
- segments.push(OsStr::from_bytes(segment));
+ segments.push(segment);
size -= segment.len();
offset += segment.len() as u64;
@@ -470,9 +415,94 @@ impl blown_fuse::fs::Inode for Inode {
reply.gather_target(&segments)
}
- async fn readdir<'o>(self: Farc, (request, reply, session): Op<'o, Readdir>) -> Done<'o> {
- let stream = session.fs().directory_stream(self, request.offset());
- reply.try_stream(stream).await?
+ async fn readdir<'o>(&self, (request, reply): Op<'o, Readdir>) -> Done<'o> {
+ let (mut reply, inode) = reply.fallible(self.inode(request.ino()))?;
+
+ let stream = self.directory_stream(inode, request.offset());
+ tokio::pin!(stream);
+
+ while let Some(entry) = stream.next().await {
+ let (next_reply, entry) = reply.fallible(entry)?;
+ let (next_reply, ()) = next_reply.entry(entry)?;
+ reply = next_reply;
+ }
+
+ reply.end()
+ }
+}
+
+impl Known for Resolved {
+ fn ino(&self) -> Ino {
+ self.ino
+ }
+
+ fn inode_type(&self) -> Type {
+ let inode_type = self.inode.i_mode >> 12;
+ match inode_type {
+ 0x01 => Type::Fifo,
+ 0x02 => Type::CharacterDevice,
+ 0x04 => Type::Directory,
+ 0x06 => Type::BlockDevice,
+ 0x08 => Type::File,
+ 0x0A => Type::Symlink,
+ 0x0C => Type::Socket,
+
+ _ => {
+ log::error!("Inode {} has invalid type {:x}", self.ino, inode_type);
+ Type::File
+ }
+ }
+ }
+
+ fn attrs(&self) -> (Attrs, TimeToLive) {
+ let inode = self.inode;
+ let (access, modify, create) = {
+ let time = |seconds: u32| (UNIX_EPOCH + Duration::from_secs(seconds.into())).into();
+ let (atime, mtime, ctime) = (inode.i_atime, inode.i_mtime, inode.i_ctime);
+
+ (time(atime), time(mtime), time(ctime))
+ };
+
+ let attrs = Attrs::default()
+ .size((inode.i_dir_acl as u64) << 32 | inode.i_size as u64)
+ .owner(
+ Uid::from_raw(inode.i_uid.into()),
+ Gid::from_raw(inode.i_gid.into()),
+ )
+ .mode(Mode::from_bits_truncate(inode.i_mode.into()))
+ .blocks(inode.i_blocks.into(), 512)
+ .times(access, modify, create)
+ .links(inode.i_links_count.into());
+
+ (attrs, TimeToLive::MAX)
+ }
+
+ fn unveil(self) {}
+}
+
+async fn main_loop(session: Start, fs: Ext2) -> FuseResult<()> {
+ let session = session.start().await?;
+ let mut endpoint = session.endpoint();
+
+ loop {
+ let result = endpoint.receive(|dispatch| async {
+ use Dispatch::*;
+
+ match dispatch {
+ Statfs(statfs) => fs.statfs(statfs.op()?).await,
+ Getattr(getattr) => fs.getattr(getattr.op()?).await,
+ Lookup(lookup) => fs.lookup(lookup.op()?).await,
+ Readlink(readlink) => fs.readlink(readlink.op()?).await,
+ Readdir(readdir) => fs.readdir(readdir.op()?).await,
+
+ dispatch => {
+ let (_, reply) = dispatch.op();
+ reply.not_implemented()
+ }
+ }
+ });
+
+ result.await?;
}
}
@@ -553,5 +583,5 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
superblock,
};
- Ok(Runtime::new()?.block_on(async { session.start(fs).await?.main_loop().await })?)
+ Ok(Runtime::new()?.block_on(main_loop(session, fs))?)
}
diff --git a/src/fuse/fs.rs b/src/fuse/fs.rs
deleted file mode 100644
index 0c39ea7..0000000
--- a/src/fuse/fs.rs
+++ /dev/null
@@ -1,78 +0,0 @@
-use crate::{Ino, TimeToLive};
-use async_trait::async_trait;
-use std::{num::NonZeroUsize, ops::Deref, sync::Arc};
-
-use super::{
- io::{Attrs, EntryType},
- ops::*,
- Done, Op, Reply,
-};
-
-#[async_trait]
-pub trait Fuse: Sized + Send + Sync + 'static {
- type Inode: Inode<Fuse = Self> + ?Sized;
- type Farc: Deref<Target = Self::Inode> + Clone + Send + Sync = Arc<Self::Inode>;
-
- async fn init<'o>(&self, reply: Reply<'o, Self, Init>) -> Done<'o>;
-
- async fn statfs<'o>(&self, (_, reply, _): Op<'o, Self, Statfs>) -> Done<'o> {
- reply.not_implemented()
- }
-
- fn request_buffers(&self) -> NonZeroUsize {
- NonZeroUsize::new(16).unwrap()
- }
-
- fn request_buffer_pages(&self) -> NonZeroUsize {
- NonZeroUsize::new(4).unwrap()
- }
-}
-
-#[async_trait]
-pub trait Inode: Send + Sync {
- type Fuse: Fuse<Inode = Self>;
-
- fn ino(self: &FarcTo<Self>) -> Ino;
- fn attrs(self: &FarcTo<Self>) -> (Attrs, TimeToLive);
- fn inode_type(self: &FarcTo<Self>) -> EntryType;
-
- fn direct_io(self: &FarcTo<Self>) -> bool {
- false
- }
-
- fn access<'o>(self: &FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Access>) -> Done<'o> {
- reply.not_implemented()
- }
-
- async fn lookup<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Lookup>) -> Done<'o> {
- reply.not_implemented()
- }
-
- async fn readlink<'o>(
- self: FarcTo<Self>,
- (_, reply, _): Op<'o, Self::Fuse, Readlink>,
- ) -> Done<'o> {
- reply.not_implemented()
- }
-
- async fn open<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Open>) -> Done<'o> {
- // Calling not_implemented() here would ignore direct_io() and similar flags
- reply.ok()
- }
-
- async fn opendir<'o>(
- self: FarcTo<Self>,
- (_, reply, _): Op<'o, Self::Fuse, Opendir>,
- ) -> Done<'o> {
- reply.not_implemented()
- }
-
- async fn readdir<'o>(
- self: FarcTo<Self>,
- (_, reply, _): Op<'o, Self::Fuse, Readdir>,
- ) -> Done<'o> {
- reply.not_implemented()
- }
-}
-
-pub type FarcTo<I> = <<I as Inode>::Fuse as Fuse>::Farc;
diff --git a/src/fuse/io.rs b/src/fuse/io.rs
index 305f0ef..5f85e4d 100644
--- a/src/fuse/io.rs
+++ b/src/fuse/io.rs
@@ -2,19 +2,14 @@ use bytemuck::Zeroable;
use nix::{errno::Errno, sys::stat::SFlag};
use std::{
- borrow::Cow,
convert::Infallible,
- ffi::OsStr,
future::Future,
ops::{ControlFlow, FromResidual, Try},
};
-use crate::{proto, Ino, TimeToLive, Timestamp};
+use crate::{proto, FuseResult, Ino, TimeToLive, Timestamp};
-use super::{
- fs::{Fuse, Inode},
- session, Done, Operation, Reply, Request,
-};
+use super::{Done, Operation, Reply, Request};
#[doc(no_inline)]
pub use nix::{
@@ -23,24 +18,31 @@ pub use nix::{
unistd::{AccessFlags, Gid, Pid, Uid},
};
-pub enum Interruptible<'o, Fs: Fuse, O: Operation<'o, Fs>, T> {
- Completed(Reply<'o, Fs, O>, T),
+pub enum Interruptible<'o, O: Operation<'o>, T> {
+ Completed(Reply<'o, O>, T),
Interrupted(Done<'o>),
}
+pub trait Known {
+ fn ino(&self) -> Ino;
+ fn inode_type(&self) -> EntryType;
+ fn attrs(&self) -> (Attrs, TimeToLive);
+ fn unveil(self);
+}
+
#[derive(Clone)]
pub struct Attrs(proto::Attrs);
-pub struct Entry<'a, Ref> {
+pub struct Entry<N, K> {
pub offset: u64,
- pub name: Cow<'a, OsStr>,
- pub inode: Ref,
+ pub name: N,
+ pub inode: K,
pub ttl: TimeToLive,
}
pub struct FsInfo(proto::StatfsOut);
-impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> {
+impl<'o, O: Operation<'o>> Request<'o, O> {
pub fn ino(&self) -> Ino {
Ino(self.header.ino)
}
@@ -62,13 +64,13 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> {
}
}
-impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> {
- pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, Fs, O, T>
+impl<'o, O: Operation<'o>> Reply<'o, O> {
+ pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, O, T>
where
F: Future<Output = T>,
{
tokio::pin!(f);
- let mut rx = session::interrupt_rx(self.session);
+ let mut rx = self.session.interrupt_rx();
use Interruptible::*;
loop {
@@ -93,11 +95,9 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> {
}
}
- pub fn fail(mut self, errno: Errno) -> Done<'o> {
- let errno = errno as i32;
- O::consume_errno(errno, &mut self.tail);
-
- Done::from_result(session::fail(self.session, self.unique, errno))
+ pub fn fail(self, errno: Errno) -> Done<'o> {
+ let result = self.session.fail(self.unique, errno as i32);
+ self.finish(result)
}
pub fn not_implemented(self) -> Done<'o> {
@@ -115,14 +115,18 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> {
pub fn interrupted(self) -> Done<'o> {
self.fail(Errno::EINTR)
}
+
+ pub(crate) fn finish(self, result: FuseResult<()>) -> Done<'o> {
+ if let Err(error) = result {
+ log::error!("Replying to request {}: {}", self.unique, error);
+ }
+
+ Done::done()
+ }
}
-impl<'o, Fs, O> From<(Reply<'o, Fs, O>, Errno)> for Done<'o>
-where
- Fs: Fuse,
- O: Operation<'o, Fs>,
-{
- fn from((reply, errno): (Reply<'o, Fs, O>, Errno)) -> Done<'o> {
+impl<'o, O: Operation<'o>> From<(Reply<'o, O>, Errno)> for Done<'o> {
+ fn from((reply, errno): (Reply<'o, O>, Errno)) -> Done<'o> {
reply.fail(errno)
}
}
@@ -142,12 +146,8 @@ impl<'o, T: Into<Done<'o>>> FromResidual<Result<Infallible, T>> for Done<'o> {
}
}
-impl<'o, Fs, O> FromResidual<Interruptible<'o, Fs, O, Infallible>> for Done<'o>
-where
- Fs: Fuse,
- O: Operation<'o, Fs>,
-{
- fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self {
+impl<'o, O: Operation<'o>> FromResidual<Interruptible<'o, O, Infallible>> for Done<'o> {
+ fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self {
match residual {
Interruptible::Completed(_, _) => unreachable!(),
Interruptible::Interrupted(done) => done,
@@ -168,13 +168,10 @@ impl Try for Done<'_> {
}
}
-impl<'o, Fs, O, T> FromResidual<Interruptible<'o, Fs, O, Infallible>>
- for Interruptible<'o, Fs, O, T>
-where
- Fs: Fuse,
- O: Operation<'o, Fs>,
+impl<'o, O: Operation<'o>, T> FromResidual<Interruptible<'o, O, Infallible>>
+ for Interruptible<'o, O, T>
{
- fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self {
+ fn from_residual(residual: Interruptible<'o, O, Infallible>) -> Self {
use Interruptible::*;
match residual {
@@ -184,13 +181,9 @@ where
}
}
-impl<'o, Fs, O, T> Try for Interruptible<'o, Fs, O, T>
-where
- Fs: Fuse,
- O: Operation<'o, Fs>,
-{
- type Output = (Reply<'o, Fs, O>, T);
- type Residual = Interruptible<'o, Fs, O, Infallible>;
+impl<'o, O: Operation<'o>, T> Try for Interruptible<'o, O, T> {
+ type Output = (Reply<'o, O>, T);
+ type Residual = Interruptible<'o, O, Infallible>;
fn from_output((reply, t): Self::Output) -> Self {
Self::Completed(reply, t)
@@ -234,14 +227,14 @@ impl Attrs {
})
}
- pub fn times(self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self {
+ pub fn times(self, access: Timestamp, modify: Timestamp, create: Timestamp) -> Self {
Attrs(proto::Attrs {
atime: access.seconds,
mtime: modify.seconds,
- ctime: change.seconds,
+ ctime: create.seconds,
atimensec: access.nanoseconds,
mtimensec: modify.nanoseconds,
- ctimensec: change.nanoseconds,
+ ctimensec: create.nanoseconds,
..self.0
})
}
@@ -253,9 +246,9 @@ impl Attrs {
})
}
- pub(crate) fn finish<Fs: Fuse>(self, inode: &Fs::Farc) -> proto::Attrs {
- let Ino(ino) = <Fs as Fuse>::Inode::ino(inode);
- let inode_type = match <Fs as Fuse>::Inode::inode_type(inode) {
+ pub(crate) fn finish(self, inode: &impl Known) -> proto::Attrs {
+ let Ino(ino) = inode.ino();
+ let inode_type = match inode.inode_type() {
EntryType::Fifo => SFlag::S_IFIFO,
EntryType::CharacterDevice => SFlag::S_IFCHR,
EntryType::Directory => SFlag::S_IFDIR,
diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs
index 0e39e6b..5fe6f46 100644
--- a/src/fuse/mod.rs
+++ b/src/fuse/mod.rs
@@ -1,87 +1,46 @@
-use std::{
- collections::HashMap,
- marker::PhantomData,
- os::unix::io::RawFd,
- sync::{Arc, Mutex},
-};
-
-use tokio::{
- io::unix::AsyncFd,
- sync::{broadcast, Notify, Semaphore},
-};
-
-use crate::{proto, util::DumbFd, FuseResult, Ino};
+use crate::proto;
+use std::marker::PhantomData;
pub mod io;
#[doc(cfg(feature = "server"))]
-pub mod fs;
-
-#[doc(cfg(feature = "server"))]
pub mod ops;
#[doc(cfg(feature = "mount"))]
pub mod mount;
-mod session;
-use fs::Fuse;
-
-#[doc(cfg(feature = "server"))]
-pub struct Session<Fs: Fuse> {
- _fusermount_fd: DumbFd,
- session_fd: AsyncFd<RawFd>,
- proto_minor: u32,
- fs: Fs,
- input_semaphore: Arc<Semaphore>,
- large_buffers: Mutex<Vec<Box<[u8]>>>,
- known: Mutex<HashMap<Ino, (Fs::Farc, u64)>>,
- destroy: Notify,
- interrupt_tx: broadcast::Sender<u64>,
-}
-
-#[doc(cfg(feature = "server"))]
-pub struct Start {
- fusermount_fd: DumbFd,
- session_fd: DumbFd,
-}
+pub mod session;
mod private_trait {
- pub trait Operation<'o, Fs: super::Fuse> {
- type RequestBody = ();
- type ReplyTail = ();
-
- fn consume_errno(_errno: i32, _tail: &mut Self::ReplyTail) {}
+ pub trait Operation<'o> {
+ type RequestBody: crate::proto::Structured<'o>;
+ type ReplyTail: Default;
}
}
use private_trait::Operation;
-#[doc(cfg(feature = "server"))]
-pub type Op<'o, Fs, O> = (Request<'o, Fs, O>, Reply<'o, Fs, O>, &'o Arc<Session<Fs>>);
+pub type Op<'o, O = ops::Any> = (Request<'o, O>, Reply<'o, O>);
#[doc(cfg(feature = "server"))]
-pub struct Request<'o, Fs: Fuse, O: Operation<'o, Fs>> {
- header: &'o proto::InHeader,
+pub struct Request<'o, O: Operation<'o>> {
+ header: proto::InHeader,
body: O::RequestBody,
}
#[doc(cfg(feature = "server"))]
-pub struct Reply<'o, Fs: Fuse, O: Operation<'o, Fs>> {
- session: &'o Session<Fs>,
+pub struct Reply<'o, O: Operation<'o>> {
+ session: &'o session::Session,
unique: u64,
tail: O::ReplyTail,
}
#[must_use]
#[doc(cfg(feature = "server"))]
-pub struct Done<'o>(FuseResult<PhantomData<&'o ()>>);
+pub struct Done<'o>(PhantomData<*mut &'o ()>);
impl Done<'_> {
- fn from_result(result: FuseResult<()>) -> Self {
- Done(result.map(|()| PhantomData))
- }
-
- fn into_result(self) -> FuseResult<()> {
- self.0.map(|PhantomData| ())
+ fn done() -> Self {
+ Done(PhantomData)
}
}
diff --git a/src/fuse/mount.rs b/src/fuse/mount.rs
index 955b28a..6464372 100644
--- a/src/fuse/mount.rs
+++ b/src/fuse/mount.rs
@@ -17,7 +17,7 @@ use nix::{
use quick_error::quick_error;
-use super::Start;
+use super::session::Start;
use crate::util::DumbFd;
quick_error! {
@@ -146,10 +146,8 @@ where
Ok(session_fd) => {
let fusermount_fd = DumbFd(left_side.into_raw_fd());
let session_fd = DumbFd(session_fd);
- Ok(Start {
- fusermount_fd,
- session_fd,
- })
+
+ Ok(Start::new(fusermount_fd, session_fd))
}
Err(error) => {
diff --git a/src/fuse/ops.rs b/src/fuse/ops.rs
index bbd49fe..9bca233 100644
--- a/src/fuse/ops.rs
+++ b/src/fuse/ops.rs
@@ -1,9 +1,6 @@
use bytemuck::{bytes_of, Pod, Zeroable};
-use futures_util::stream::{Stream, StreamExt, TryStreamExt};
-use nix::sys::stat::SFlag;
use std::{
- borrow::Borrow,
ffi::{CStr, OsStr},
os::unix::ffi::OsStrExt,
};
@@ -11,56 +8,61 @@ use std::{
use crate::{proto, util::OutputChain, Errno, Ino, TimeToLive};
use super::{
- fs::{Fuse, Inode},
- io::{AccessFlags, Entry, EntryType, FsInfo},
- session, Done, Operation, Reply, Request,
+ io::{AccessFlags, Entry, FsInfo, Interruptible, Known},
+ Done, Operation, Reply, Request,
};
macro_rules! op {
- { $name:ident $operation:tt $(,)+ } => {
+ { $name:ident $operation:tt } => {
pub struct $name(());
- impl<'o, Fs: Fuse> Operation<'o, Fs> for $name $operation
+ impl<'o> Operation<'o> for $name $operation
};
- { $name:ident $operation:tt, Request $request:tt $($next:tt)+ } => {
- impl<'o, Fs: Fuse> Request<'o, Fs, $name> $request
+ { $name:ident $operation:tt impl Request $request:tt $($next:tt)* } => {
+ impl<'o> Request<'o, $name> $request
- op! { $name $operation $($next)+ }
+ op! { $name $operation $($next)* }
};
- { $name:ident $operation:tt, Reply $reply:tt $($next:tt)+ } => {
- impl<'o, Fs: Fuse> Reply<'o, Fs, $name> $reply
+ { $name:ident $operation:tt impl Reply $reply:tt $($next:tt)* } => {
+ impl<'o> Reply<'o, $name> $reply
- op! { $name $operation $($next)+ }
+ op! { $name $operation $($next)* }
};
}
op! {
+ Any {
+ type RequestBody = ();
+ type ReplyTail = ();
+ }
+}
+
+op! {
Lookup {
- // name()
- type RequestBody = &'o CStr;
- },
+ type RequestBody = &'o CStr; // name()
+ type ReplyTail = ();
+ }
- Request {
+ impl Request {
/// Returns the name of the entry being looked up in this directory.
pub fn name(&self) -> &OsStr {
c_to_os(self.body)
}
- },
-
- Reply {
- /// The requested entry was found and a `Farc` was successfully determined from it. The
- /// FUSE client will become aware of the found inode if it wasn't before. This result may
- /// be cached by the client for up to the given TTL.
- pub fn found(self, entry: &Fs::Farc, ttl: TimeToLive) -> Done<'o> {
- let (attrs, attrs_ttl) = <Fs as Fuse>::Inode::attrs(entry);
- session::unveil(self.session, entry);
-
- self.single(&make_entry(
- (<Fs as Fuse>::Inode::ino(entry), ttl),
- (attrs.finish::<Fs>(entry), attrs_ttl),
- ))
+ }
+
+ impl Reply {
+ /// The requested entry was found. The FUSE client will become aware of the found inode if
+ /// it wasn't before. This result may be cached by the client for up to the given TTL.
+ pub fn found(self, entry: impl Known, ttl: TimeToLive) -> Done<'o> {
+ let (attrs, attrs_ttl) = entry.attrs();
+ let attrs = attrs.finish(&entry);
+
+ let done = self.single(&make_entry((entry.ino(), ttl), (attrs, attrs_ttl)));
+ entry.unveil();
+
+ done
}
/// The requested entry was not found in this directory. The FUSE clint may include this
@@ -75,13 +77,37 @@ op! {
pub fn not_found_uncached(self) -> Done<'o> {
self.fail(Errno::ENOENT)
}
- },
+ }
+}
+
+op! {
+ Getattr {
+ type RequestBody = &'o proto::GetattrIn;
+ type ReplyTail = ();
+ }
+
+ impl Reply {
+ pub fn known(self, inode: &impl Known) -> Done<'o> {
+ let (attrs, ttl) = inode.attrs();
+ let attrs = attrs.finish(inode);
+
+ self.single(&proto::AttrOut {
+ attr_valid: ttl.seconds,
+ attr_valid_nsec: ttl.nanoseconds,
+ dummy: Default::default(),
+ attr: attrs,
+ })
+ }
+ }
}
op! {
- Readlink {},
+ Readlink {
+ type RequestBody = ();
+ type ReplyTail = ();
+ }
- Reply {
+ impl Reply {
/// This inode corresponds to a symbolic link pointing to the given target path.
pub fn target(self, target: &OsStr) -> Done<'o> {
self.chain(OutputChain::tail(&[target.as_bytes()]))
@@ -89,230 +115,129 @@ op! {
/// Same as [`Reply::target()`], except that the target path is taken from disjoint
/// slices. This involves no additional allocation.
- pub fn gather_target(self, target: &[&OsStr]) -> Done<'o> {
- //FIXME: Likely UB
- self.chain(OutputChain::tail(unsafe { std::mem::transmute(target) }))
+ pub fn gather_target(self, target: &[&[u8]]) -> Done<'o> {
+ self.chain(OutputChain::tail(target))
}
- },
+ }
}
op! {
Open {
type RequestBody = &'o proto::OpenIn;
- type ReplyTail = (Ino, proto::OpenOutFlags);
- },
+ type ReplyTail = private::OpenFlags;
+ }
+
+ impl Reply {
+ pub fn force_direct_io(&mut self) {
+ self.tail.0 |= proto::OpenOutFlags::DIRECT_IO;
+ }
- Reply {
/// The inode may now be accessed.
pub fn ok(self) -> Done<'o> {
self.ok_with_handle(0)
}
fn ok_with_handle(self, handle: u64) -> Done<'o> {
- let (_, flags) = self.tail;
+ let open_flags = self.tail.0.bits();
+
self.single(&proto::OpenOut {
fh: handle,
- open_flags: flags.bits(),
+ open_flags,
padding: Default::default(),
})
}
- },
+ }
}
-op! { Read {}, }
-/*op! {
+op! {
Read {
- type RequestBody = &'o proto::ReadIn;
- type ReplyTail = &'o mut OutputBytes<'o>;
- },
-
- Request {
- pub fn offset(&self) -> u64 {
- self.body.offset
- }
-
- pub fn size(&self) -> u32 {
- self.body.size
- }
- },
-
- Reply {
- pub fn remaining(&self) -> u64 {
- self.tail.remaining()
- }
-
- pub fn end(self) -> Done<'o> {
- if self.tail.ready() {
- self.chain(OutputChain::tail(self.tail.segments()))
- } else {
- // The read() handler will be invoked again with same OutputBytes
- self.done()
- }
- }
-
- pub fn hole(self, size: u64) -> Result<Self, Done<'o>> {
- self.tail
- }
-
- pub fn copy(self, data: &[u8]) -> Result<Self, Done<'o>> {
- self.self_or_done(self.tail.copy(data))
- }
-
- pub fn put(self, data: &'o [u8]) -> Result<Self, Done<'o>> {
- self.self_or_done(self.tail.put(data))
- }
-
- pub fn gather(self, data: &'o [&'o [u8]]) -> Result<Self, Done<'o>> {
- self.self_or_done(self.tail.gather(data))
- }
-
- fn self_or_done(self, capacity: OutputCapacity) -> Result<Self, Done<'o>> {
- match capacity {
- OutputCapacity::Available => Ok(self),
- OutputCapacity::Filled => Err(self.done()),
- }
- }
- },
-}*/
+ type RequestBody = ();
+ type ReplyTail = ();
+ }
+}
op! {
Write {
type RequestBody = &'o proto::WriteIn;
- },
+ type ReplyTail = ();
+ }
}
op! {
Init {
- type ReplyTail = &'o mut Result<Fs::Farc, i32>;
+ type RequestBody = &'o proto::InitIn;
+ type ReplyTail = ();
+ }
- fn consume_errno(errno: i32, tail: &mut Self::ReplyTail) {
- **tail = Err(errno);
- }
- },
-
- Reply {
- /// Server-side initialization succeeded. The provided `Farc` references the filesystem's
- /// root inode.
- pub fn root(self, root: Fs::Farc) -> Done<'o> {
- *self.tail = Ok(root);
- self.done()
+ impl Reply {
+ pub fn ok(self) -> Done<'o> {
+ self.nop()
}
- },
+ }
}
op! {
- Statfs {},
+ Statfs {
+ type RequestBody = ();
+ type ReplyTail = ();
+ }
- Reply {
+ impl Reply {
/// Replies with filesystem statistics.
pub fn info(self, statfs: FsInfo) -> Done<'o> {
let statfs: proto::StatfsOut = statfs.into();
self.single(&statfs)
}
- },
+ }
}
op! {
Opendir {
type RequestBody = &'o proto::OpendirIn;
- },
+ type ReplyTail = ();
+ }
}
op! {
Readdir {
type RequestBody = &'o proto::ReaddirIn;
- },
+ type ReplyTail = ();
+ }
- Request {
+ impl Request {
/// Returns the base offset in the directory stream to read from.
pub fn offset(&self) -> u64 {
self.body.read_in.offset
}
- },
+ }
- Reply {
- pub fn try_iter<'a, I, E, Ref>(
- self,
- mut entries: I,
- ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)>
+ impl Reply {
+ pub fn entry<N>(self, inode: Entry<N, impl Known>) -> Interruptible<'o, Readdir, ()>
where
- I: Iterator<Item = Result<Entry<'a, Ref>, E>> + Send,
- Ref: Borrow<Fs::Farc>,
+ N: AsRef<OsStr>,
{
- //TODO: This is about as shitty as it gets
- match entries.next().transpose() {
- Ok(Some(entry)) => {
- let Entry {
- name,
- inode,
- offset,
- ..
- } = entry;
-
- let inode = inode.borrow();
- let Ino(ino) = <Fs as Fuse>::Inode::ino(inode);
-
- let dirent = proto::Dirent {
- ino,
- off: offset,
- namelen: name.len() as u32,
- entry_type: (match <Fs as Fuse>::Inode::inode_type(inode) {
- EntryType::Fifo => SFlag::S_IFIFO,
- EntryType::CharacterDevice => SFlag::S_IFCHR,
- EntryType::Directory => SFlag::S_IFDIR,
- EntryType::BlockDevice => SFlag::S_IFBLK,
- EntryType::File => SFlag::S_IFREG,
- EntryType::Symlink => SFlag::S_IFLNK,
- EntryType::Socket => SFlag::S_IFSOCK,
- })
- .bits()
- >> 12,
- };
-
- let dirent = bytes_of(&dirent);
- let name = name.as_bytes();
-
- let padding = [0; 8];
- let padding = &padding[..7 - (dirent.len() + name.len() - 1) % 8];
-
- Ok(self.chain(OutputChain::tail(&[dirent, name, padding])))
- }
-
- Err(error) => Err((self, error)),
-
- Ok(None) => Ok(self.empty()),
- }
+ todo!()
}
- // See rust-lang/rust#61949
- pub async fn try_stream<'a, S, E, Ref>(
- self,
- entries: S,
- ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)>
- where
- S: Stream<Item = Result<Entry<'a, Ref>, E>> + Send,
- Ref: Borrow<Fs::Farc> + Send,
- E: Send,
- {
- //TODO: This is about as shitty as it gets
- let first = entries.boxed().try_next().await;
- self.try_iter(first.transpose().into_iter())
+ pub fn end(self) -> Done<'o> {
+ todo!()
}
- },
+ }
}
op! {
Access {
type RequestBody = &'o proto::AccessIn;
- },
+ type ReplyTail = ();
+ }
- Request {
+ impl Request {
pub fn mask(&self) -> AccessFlags {
AccessFlags::from_bits_truncate(self.body.mask as i32)
}
- },
+ }
- Reply {
+ impl Reply {
pub fn ok(self) -> Done<'o> {
self.empty()
}
@@ -320,12 +245,32 @@ op! {
pub fn permission_denied(self) -> Done<'o> {
self.fail(Errno::EACCES)
}
- },
+ }
+}
+
+op! {
+ Destroy {
+ type RequestBody = ();
+ type ReplyTail = ();
+ }
+}
+
+mod private {
+ use crate::proto;
+
+ #[derive(Copy, Clone)]
+ pub struct OpenFlags(pub proto::OpenOutFlags);
+
+ impl Default for OpenFlags {
+ fn default() -> Self {
+ OpenFlags(proto::OpenOutFlags::empty())
+ }
+ }
}
-impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> {
- fn done(self) -> Done<'o> {
- Done::from_result(Ok(()))
+impl<'o, O: Operation<'o>> Reply<'o, O> {
+ fn nop(self) -> Done<'o> {
+ Done::done()
}
fn empty(self) -> Done<'o> {
@@ -337,7 +282,8 @@ impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> {
}
fn chain(self, chain: OutputChain<'_>) -> Done<'o> {
- Done::from_result(session::ok(self.session, self.unique, chain))
+ let result = self.session.ok(self.unique, chain);
+ self.finish(result)
}
}
diff --git a/src/fuse/session.rs b/src/fuse/session.rs
index 8975c57..5045099 100644
--- a/src/fuse/session.rs
+++ b/src/fuse/session.rs
@@ -1,148 +1,121 @@
use std::{
- collections::{hash_map, HashMap},
convert::TryInto,
+ future::Future,
io,
+ marker::PhantomData,
os::unix::io::{IntoRawFd, RawFd},
sync::{Arc, Mutex},
};
use nix::{
fcntl::{fcntl, FcntlArg, OFlag},
- sys::uio::{readv, writev, IoVec},
- unistd::{sysconf, SysconfVar},
+ sys::uio::{writev, IoVec},
+ unistd::{read, sysconf, SysconfVar},
};
use tokio::{
io::unix::AsyncFd,
- sync::{broadcast, Notify, OwnedSemaphorePermit, Semaphore},
+ sync::{broadcast, OwnedSemaphorePermit, Semaphore},
};
-use bytemuck::{bytes_of, try_from_bytes};
+use bytemuck::bytes_of;
use smallvec::SmallVec;
use crate::{
- proto::{self, InHeader},
- util::{display_or, OutputChain},
- Errno, FuseError, FuseResult, Ino,
+ proto::{self, InHeader, Structured},
+ util::{DumbFd, OutputChain},
+ Errno, FuseError, FuseResult,
};
-use super::{
- fs::{Fuse, Inode},
- Reply, Request, Session, Start,
-};
+use super::{ops, Done, Op, Operation, Reply, Request};
-pub fn ok<Fs: Fuse>(session: &Session<Fs>, unique: u64, output: OutputChain<'_>) -> FuseResult<()> {
- session.send(unique, 0, output)
+pub struct Start {
+ fusermount_fd: DumbFd,
+ session_fd: DumbFd,
}
-pub fn fail<Fs: Fuse>(session: &Session<Fs>, unique: u64, mut errno: i32) -> FuseResult<()> {
- if errno <= 0 {
- log::warn!(
- "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG",
- unique,
- errno
- );
+pub struct Session {
+ _fusermount_fd: DumbFd,
+ session_fd: AsyncFd<RawFd>,
+ interrupt_tx: broadcast::Sender<u64>,
+ buffers: Mutex<Vec<Buffer>>,
+ buffer_semaphore: Arc<Semaphore>,
+ proto_minor: u32,
+ buffer_pages: usize,
+}
- errno = Errno::ENOMSG as i32;
- }
+pub struct Endpoint<'a> {
+ session: &'a Arc<Session>,
+ local_buffer: Buffer,
+}
- session.send(unique, -errno, OutputChain::empty())
+pub enum Dispatch<'o> {
+ Lookup(Incoming<'o, ops::Lookup>),
+ Getattr(Incoming<'o, ops::Getattr>),
+ Readlink(Incoming<'o, ops::Readlink>),
+ Read(Incoming<'o, ops::Read>),
+ Write(Incoming<'o, ops::Write>),
+ Statfs(Incoming<'o, ops::Statfs>),
+ Readdir(Incoming<'o, ops::Readdir>),
+ Access(Incoming<'o, ops::Access>),
+ Destroy(Incoming<'o, ops::Destroy>),
}
-pub fn unveil<Fs: Fuse>(session: &Session<Fs>, inode: &Fs::Farc) {
- let ino = <Fs as Fuse>::Inode::ino(inode);
- let mut known = session.known.lock().unwrap();
+pub struct Incoming<'o, O: Operation<'o>> {
+ common: IncomingCommon<'o>,
+ _phantom: PhantomData<O>,
+}
- use hash_map::Entry::*;
- match known.entry(ino) {
- Occupied(entry) => {
- let (_, count) = entry.into_mut();
- *count += 1;
- }
+pub struct Owned<O: for<'o> Operation<'o>> {
+ session: Arc<Session>,
+ buffer: Buffer,
+ header: InHeader,
+ _permit: OwnedSemaphorePermit,
+ _phantom: PhantomData<O>,
+}
- Vacant(entry) => {
- entry.insert((Fs::Farc::clone(inode), 1));
+impl Session {
+ pub fn endpoint<'a>(self: &'a Arc<Self>) -> Endpoint<'a> {
+ Endpoint {
+ session: self,
+ local_buffer: Buffer::new(self.buffer_pages),
}
}
-}
-pub fn interrupt_rx<Fs: Fuse>(session: &Session<Fs>) -> broadcast::Receiver<u64> {
- session.interrupt_tx.subscribe()
-}
-
-impl<Fs: Fuse> Session<Fs> {
- pub fn fs(&self) -> &Fs {
- &self.fs
+ pub(crate) fn ok(&self, unique: u64, output: OutputChain<'_>) -> FuseResult<()> {
+ self.send(unique, 0, output)
}
- pub async fn main_loop(self: Arc<Self>) -> FuseResult<()> {
- let this = Arc::clone(&self);
- let main_loop = async move {
- loop {
- let incoming = this.receive().await;
- let this = Arc::clone(&this);
-
- tokio::spawn(async move {
- let (result, header): (FuseResult<()>, Option<InHeader>) = match incoming {
- Ok(mut incoming) => match this.dispatch(&mut incoming).await {
- Ok(()) => (Ok(()), None),
-
- Err(error) => {
- let data = incoming.buffer.data();
- let data = &data[..std::mem::size_of::<InHeader>().max(data.len())];
- (Err(error), try_from_bytes(data).ok().copied())
- }
- },
-
- Err(error) => (Err(error.into()), None),
- };
-
- let header = display_or(header, "(bad)");
- if let Err(error) = result {
- log::error!("Handling request {}: {}", header, error);
- }
- });
- }
- };
+ pub(crate) fn fail(&self, unique: u64, mut errno: i32) -> FuseResult<()> {
+ if errno <= 0 {
+ log::warn!(
+ "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG",
+ unique,
+ errno
+ );
- tokio::select! {
- () = main_loop => unreachable!(),
- () = self.destroy.notified() => Ok(()),
+ errno = Errno::ENOMSG as i32;
}
- }
- async fn do_handshake(
- &mut self,
- pages_per_buffer: usize,
- bytes_per_buffer: usize,
- ) -> FuseResult<Handshake> {
- use FuseError::*;
-
- let buffer = {
- self.session_fd.readable().await?.retain_ready();
- let large_buffer = self.large_buffers.get_mut().unwrap().first_mut().unwrap();
-
- let mut data = InputBufferStorage::Sbo(SboStorage([0; SBO_SIZE]));
- let sbo = match &mut data {
- InputBufferStorage::Sbo(SboStorage(sbo)) => sbo,
- _ => unreachable!(),
- };
+ self.send(unique, -errno, OutputChain::empty())
+ }
- let mut io_vecs = [
- IoVec::from_mut_slice(sbo),
- IoVec::from_mut_slice(large_buffer),
- ];
+ pub(crate) fn interrupt_rx(&self) -> broadcast::Receiver<u64> {
+ self.interrupt_tx.subscribe()
+ }
- let bytes = readv(*self.session_fd.get_ref(), &mut io_vecs).map_err(io::Error::from)?;
- InputBuffer { bytes, data }
- };
+ async fn handshake(&mut self, buffer: &mut Buffer) -> FuseResult<Handshake> {
+ self.session_fd.readable().await?.retain_ready();
+ let bytes = read(*self.session_fd.get_ref(), &mut buffer.0).map_err(io::Error::from)?;
- let request: proto::Request<'_> = buffer.data().try_into()?;
+ let (header, opcode) = InHeader::from_bytes(&buffer.0[..bytes])?;
+ let init = match opcode {
+ proto::Opcode::Init => <&proto::InitIn>::toplevel_from(&buffer.0[HEADER_END..bytes])?,
- let unique = request.header().unique;
- let init = match request.body() {
- proto::RequestBody::Init(body) => body,
- _ => return Err(ProtocolInit),
+ _ => {
+ log::error!("First message from kernel is not Init, but {:?}", opcode);
+ return Err(FuseError::ProtocolInit);
+ }
};
use std::cmp::Ordering;
@@ -156,7 +129,7 @@ impl<Fs: Fuse> Session<Fs> {
Ordering::Greater => {
let tail = [bytes_of(&proto::MAJOR_VERSION)];
- ok(self, unique, OutputChain::tail(&tail))?;
+ self.ok(header.unique, OutputChain::tail(&tail))?;
return Ok(Handshake::Restart);
}
@@ -175,40 +148,27 @@ impl<Fs: Fuse> Session<Fs> {
major = proto::MAJOR_VERSION
);
- fail(self, unique, Errno::EPROTONOSUPPORT as i32)?;
- return Err(ProtocolInit);
+ self.fail(header.unique, Errno::EPROTONOSUPPORT as i32)?;
+ return Err(FuseError::ProtocolInit);
}
- let root = {
- let mut init_result = Err(0);
- let reply = Reply {
- session: self,
- unique,
- tail: &mut init_result,
- };
+ let flags = {
+ use proto::InitFlags;
- self.fs.init(reply).await.into_result()?;
+ let kernel = InitFlags::from_bits_truncate(init.flags);
+ let supported = InitFlags::PARALLEL_DIROPS
+ | InitFlags::ABORT_ERROR
+ | InitFlags::MAX_PAGES
+ | InitFlags::CACHE_SYMLINKS;
- match init_result {
- Ok(root) => root,
- Err(errno) => {
- log::error!("init() handler failed: {}", Errno::from_i32(errno));
- return Err(FuseError::Io(std::io::Error::from_raw_os_error(errno)));
- }
- }
+ kernel & supported
};
- self.known.get_mut().unwrap().insert(Ino::ROOT, (root, 1));
+ let buffer_size = page_size() * self.buffer_pages;
- use proto::InitFlags;
- let flags = InitFlags::from_bits_truncate(init.flags);
- let supported = InitFlags::PARALLEL_DIROPS
- | InitFlags::ABORT_ERROR
- | InitFlags::MAX_PAGES
- | InitFlags::CACHE_SYMLINKS;
+ // See fs/fuse/dev.c in the kernel source tree for details about max_write
+ let max_write = buffer_size - std::mem::size_of::<(InHeader, proto::WriteIn)>();
- let flags = flags & supported;
- let max_write = bytes_per_buffer - std::mem::size_of::<(InHeader, proto::WriteIn)>();
let init_out = proto::InitOut {
major: proto::MAJOR_VERSION,
minor: proto::TARGET_MINOR_VERSION,
@@ -218,230 +178,17 @@ impl<Fs: Fuse> Session<Fs> {
congestion_threshold: 0, //TODO
max_write: max_write.try_into().unwrap(),
time_gran: 1, //TODO
- max_pages: pages_per_buffer.try_into().unwrap(),
+ max_pages: self.buffer_pages.try_into().unwrap(),
padding: Default::default(),
unused: Default::default(),
};
let tail = [bytes_of(&init_out)];
- ok(self, unique, OutputChain::tail(&tail))?;
+ self.ok(header.unique, OutputChain::tail(&tail))?;
Ok(Handshake::Done)
}
- async fn dispatch(self: &Arc<Self>, request: &mut Incoming) -> FuseResult<()> {
- let request: proto::Request<'_> = request.buffer.data().try_into()?;
- let header = request.header();
- let InHeader { unique, ino, .. } = *header;
- let ino = Ino(ino);
-
- use proto::RequestBody::*;
-
- macro_rules! op {
- () => {
- op!(())
- };
-
- ($body:expr) => {
- op!($body, ())
- };
-
- ($body:expr, $tail:expr) => {{
- let request = Request {
- header,
- body: $body,
- };
- let reply = Reply {
- session: &self,
- unique,
- tail: $tail,
- };
-
- (request, reply, self)
- }};
- }
-
- // These operations don't involve locking and searching self.known
- match request.body() {
- Forget(body) => {
- self.forget(std::iter::once((ino, body.nlookup))).await;
- return Ok(());
- }
-
- Statfs => return self.fs.statfs(op!()).await.into_result(),
-
- Interrupt(body) => {
- //TODO: Don't reply with EAGAIN if the interrupt is successful
- let _ = self.interrupt_tx.send(body.unique);
- return fail(self, unique, Errno::EAGAIN as i32);
- }
-
- Destroy => {
- self.destroy.notify_one();
- return Ok(());
- }
-
- BatchForget { forgets, .. } => {
- let forgets = forgets
- .iter()
- .map(|target| (Ino(target.ino), target.nlookup));
-
- self.forget(forgets).await;
- return Ok(());
- }
-
- _ => (),
- }
-
- // Some operations are handled while self.known is locked
- let inode = {
- let known = self.known.lock().unwrap();
- let inode = match known.get(&ino) {
- Some((farc, _)) => farc,
- None => {
- log::error!(
- "Lookup count for ino {} reached zero while still \
- known to the kernel, this is a bug",
- ino
- );
-
- return fail(self, unique, Errno::ENOANO as i32);
- }
- };
-
- match request.body() {
- Getattr(_) => {
- //TODO: Getattr flags
- let (attrs, ttl) = <Fs as Fuse>::Inode::attrs(inode);
- let attrs = attrs.finish::<Fs>(inode);
- drop(known);
-
- let out = proto::AttrOut {
- attr_valid: ttl.seconds,
- attr_valid_nsec: ttl.nanoseconds,
- dummy: Default::default(),
- attr: attrs,
- };
-
- return ok(self, unique, OutputChain::tail(&[bytes_of(&out)]));
- }
-
- Access(body) => {
- return <Fs as Fuse>::Inode::access(inode, op!(*body)).into_result()
- }
-
- _ => inode.clone(),
- }
- };
-
- macro_rules! inode_op {
- ($op:ident, $($exprs:expr),+) => {
- <Fs as Fuse>::Inode::$op(inode, op!($($exprs),+)).await
- };
- }
-
- // These operations involve a Farc cloned from self.known
- let done = match request.body() {
- Lookup { name } => inode_op!(lookup, *name),
- Readlink => inode_op!(readlink, ()),
- Open(body) => {
- let mut flags = proto::OpenOutFlags::empty();
- if <Fs as Fuse>::Inode::direct_io(&inode) {
- flags |= proto::OpenOutFlags::DIRECT_IO;
- }
-
- inode_op!(open, *body, (ino, flags))
- }
- Opendir(body) => inode_op!(opendir, *body),
- Readdir(body) => inode_op!(readdir, *body),
-
- _ => return fail(self, unique, Errno::ENOSYS as i32),
- };
-
- done.into_result()
- }
-
- async fn forget<I>(&self, targets: I)
- where
- I: Iterator<Item = (Ino, u64)>,
- {
- let mut known = self.known.lock().unwrap();
-
- for (ino, subtracted) in targets {
- use hash_map::Entry::*;
-
- match known.entry(ino) {
- Occupied(mut entry) => {
- let (_, count) = entry.get_mut();
-
- *count = count.saturating_sub(subtracted);
- if *count > 0 {
- continue;
- }
-
- entry.remove();
- }
-
- Vacant(_) => {
- log::warn!("Kernel attempted to forget {:?} (bad refcount?)", ino);
- continue;
- }
- }
- }
- }
-
- async fn receive(self: &Arc<Self>) -> std::io::Result<Incoming> {
- use InputBufferStorage::*;
-
- let permit = Arc::clone(&self.input_semaphore)
- .acquire_owned()
- .await
- .unwrap();
-
- let mut incoming = Incoming {
- buffer: InputBuffer {
- bytes: 0,
- data: Sbo(SboStorage([0; SBO_SIZE])),
- },
- };
-
- let sbo = match &mut incoming.buffer.data {
- Sbo(SboStorage(sbo)) => sbo,
- _ => unreachable!(),
- };
-
- loop {
- let mut readable = self.session_fd.readable().await?;
-
- let mut large_buffers = self.large_buffers.lock().unwrap();
- let large_buffer = large_buffers.last_mut().unwrap();
-
- let mut io_vecs = [
- IoVec::from_mut_slice(sbo),
- IoVec::from_mut_slice(&mut large_buffer[SBO_SIZE..]),
- ];
-
- let mut read = |fd: &AsyncFd<RawFd>| readv(*fd.get_ref(), &mut io_vecs);
- match readable.try_io(|fd| read(fd).map_err(io::Error::from)) {
- Ok(Ok(bytes)) => {
- if bytes > SBO_SIZE {
- (&mut large_buffer[..SBO_SIZE]).copy_from_slice(sbo);
- incoming.buffer.data = Spilled(large_buffers.pop().unwrap(), permit);
- }
-
- incoming.buffer.bytes = bytes;
- return Ok(incoming);
- }
-
- // Interrupted
- Ok(Err(error)) if error.kind() == std::io::ErrorKind::NotFound => continue,
-
- Ok(Err(error)) => return Err(error),
- Err(_) => continue,
- }
- }
- }
-
fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> {
let after_header: usize = output
.iter()
@@ -479,81 +226,261 @@ impl<Fs: Fuse> Session<Fs> {
}
}
+impl<'o> Dispatch<'o> {
+ pub fn op(self) -> Op<'o> {
+ use Dispatch::*;
+
+ let common = match self {
+ Lookup(incoming) => incoming.common,
+ Getattr(incoming) => incoming.common,
+ Readlink(incoming) => incoming.common,
+ Read(incoming) => incoming.common,
+ Write(incoming) => incoming.common,
+ Statfs(incoming) => incoming.common,
+ Readdir(incoming) => incoming.common,
+ Access(incoming) => incoming.common,
+ Destroy(incoming) => incoming.common,
+ };
+
+ common.into_generic_op()
+ }
+}
+
+impl Endpoint<'_> {
+ pub async fn receive<'a, F, Fut>(&'a mut self, dispatcher: F) -> FuseResult<()>
+ where
+ F: FnOnce(Dispatch<'a>) -> Fut,
+ Fut: Future<Output = Done<'a>>,
+ {
+ let buffer = &mut self.local_buffer.0;
+ let bytes = loop {
+ let mut readable = self.session.session_fd.readable().await?;
+ let mut read = |fd: &AsyncFd<RawFd>| read(*fd.get_ref(), buffer);
+
+ let result = match readable.try_io(|fd| read(fd).map_err(io::Error::from)) {
+ Ok(result) => result,
+ Err(_) => continue,
+ };
+
+ match result {
+ // Interrupted
+ Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue,
+
+ result => break result,
+ }
+ };
+
+ let (header, opcode) = proto::InHeader::from_bytes(&buffer[..bytes?])?;
+ let common = IncomingCommon {
+ session: self.session,
+ buffer: &mut self.local_buffer,
+ header,
+ };
+
+ let dispatch = {
+ use proto::Opcode::*;
+
+ macro_rules! dispatch {
+ ($op:ident) => {
+ Dispatch::$op(Incoming {
+ common,
+ _phantom: PhantomData,
+ })
+ };
+ }
+
+ match opcode {
+ Lookup => dispatch!(Lookup),
+ Getattr => dispatch!(Getattr),
+ Readlink => dispatch!(Readlink),
+ Read => dispatch!(Read),
+ Write => dispatch!(Write),
+ Statfs => dispatch!(Statfs),
+ Readdir => dispatch!(Readdir),
+ Access => dispatch!(Access),
+ Destroy => dispatch!(Destroy),
+
+ _ => {
+ log::warn!("Not implemented: {}", common.header);
+
+ let (_request, reply) = common.into_generic_op();
+ let _ = reply.not_implemented();
+
+ return Ok(());
+ }
+ }
+ };
+
+ let _ = dispatcher(dispatch).await;
+ Ok(())
+ }
+}
+
impl Start {
- pub async fn start<Fs: Fuse>(self, fs: Fs) -> FuseResult<Arc<Session<Fs>>> {
+ pub async fn start(self) -> FuseResult<Arc<Session>> {
let session_fd = self.session_fd.into_raw_fd();
let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE;
fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap();
- let page_size = sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize;
- let pages_per_buffer = fs.request_buffer_pages().get();
- let bytes_per_buffer = pages_per_buffer.checked_mul(page_size).unwrap();
- assert!(bytes_per_buffer >= proto::MIN_READ_SIZE);
+ let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY);
- let mut large_buffers = Vec::with_capacity(fs.request_buffers().get());
- for _ in 0..large_buffers.capacity() {
- large_buffers.push(vec![0; bytes_per_buffer].into_boxed_slice());
- }
+ let buffer_pages = proto::MIN_READ_SIZE / page_size(); //TODO
+ let buffer_count = SHARED_BUFFERS; //TODO
+ let buffers = std::iter::repeat_with(|| Buffer::new(buffer_pages))
+ .take(buffer_count)
+ .collect();
- let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY);
let mut session = Session {
_fusermount_fd: self.fusermount_fd,
session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?,
- proto_minor: 0, // Set by Session::do_handshake()
- fs,
- input_semaphore: Arc::new(Semaphore::new(large_buffers.len())),
- large_buffers: Mutex::new(large_buffers),
- known: Mutex::new(HashMap::new()),
- destroy: Notify::new(),
interrupt_tx,
+ buffers: Mutex::new(buffers),
+ buffer_semaphore: Arc::new(Semaphore::new(buffer_count)),
+ proto_minor: 0, // Set by Session::do_handshake()
+ buffer_pages,
};
- loop {
- let state = session
- .do_handshake(pages_per_buffer, bytes_per_buffer)
- .await?;
+ let mut init_buffer = session.buffers.get_mut().unwrap().pop().unwrap();
- if let Handshake::Done = state {
+ loop {
+ if let Handshake::Done = session.handshake(&mut init_buffer).await? {
+ session.buffers.get_mut().unwrap().push(init_buffer);
break Ok(Arc::new(session));
}
}
}
-}
-enum Handshake {
- Done,
- Restart,
+ pub(crate) fn new(fusermount_fd: DumbFd, session_fd: DumbFd) -> Self {
+ Start {
+ fusermount_fd,
+ session_fd,
+ }
+ }
}
-struct Incoming {
- buffer: InputBuffer,
+impl<'o, O: Operation<'o>> Incoming<'o, O> {
+ pub fn op(self) -> Result<Op<'o, O>, Done<'o>> {
+ try_op(
+ self.common.session,
+ &self.common.buffer.0,
+ self.common.header,
+ )
+ }
}
-struct InputBuffer {
- pub bytes: usize,
- pub data: InputBufferStorage,
+impl<O: for<'o> Operation<'o>> Incoming<'_, O> {
+ pub async fn owned(self) -> Owned<O> {
+ let session = self.common.session;
+
+ let (buffer, permit) = {
+ let semaphore = Arc::clone(&session.buffer_semaphore);
+ let permit = semaphore
+ .acquire_owned()
+ .await
+ .expect("Buffer semaphore error");
+
+ let mut buffers = session.buffers.lock().unwrap();
+ let mut buffer = buffers.pop().expect("Buffer semaphore out of sync");
+
+ std::mem::swap(&mut buffer, self.common.buffer);
+ (buffer, permit)
+ };
+
+ Owned {
+ session: Arc::clone(session),
+ buffer,
+ header: self.common.header,
+ _permit: permit,
+ _phantom: PhantomData,
+ }
+ }
}
-enum InputBufferStorage {
- Sbo(SboStorage),
- Spilled(Box<[u8]>, OwnedSemaphorePermit),
+impl<O: for<'o> Operation<'o>> Owned<O> {
+ pub fn op(&self) -> Result<Op<'_, O>, Done<'_>> {
+ try_op(&self.session, &self.buffer.0, self.header)
+ }
}
-#[repr(align(8))]
-struct SboStorage(pub [u8; 4 * std::mem::size_of::<InHeader>()]);
+impl<O: for<'o> Operation<'o>> Drop for Owned<O> {
+ fn drop(&mut self) {
+ if let Ok(mut buffers) = self.session.buffers.lock() {
+ let empty = Buffer(Vec::new().into_boxed_slice());
+ buffers.push(std::mem::replace(&mut self.buffer, empty));
+ }
+ }
+}
-const SBO_SIZE: usize = std::mem::size_of::<SboStorage>();
const INTERRUPT_BROADCAST_CAPACITY: usize = 32;
+const SHARED_BUFFERS: usize = 32;
+const HEADER_END: usize = std::mem::size_of::<InHeader>();
-impl InputBuffer {
- fn data(&self) -> &[u8] {
- use InputBufferStorage::*;
- let storage = match &self.data {
- Sbo(sbo) => &sbo.0,
- Spilled(buffer, _) => &buffer[..],
+struct IncomingCommon<'o> {
+ session: &'o Arc<Session>,
+ buffer: &'o mut Buffer,
+ header: proto::InHeader,
+}
+
+enum Handshake {
+ Done,
+ Restart,
+}
+
+struct Buffer(Box<[u8]>);
+
+impl<'o> IncomingCommon<'o> {
+ fn into_generic_op(self) -> Op<'o> {
+ let request = Request {
+ header: self.header,
+ body: (),
};
- &storage[..self.bytes]
+ let reply = Reply {
+ session: self.session,
+ unique: self.header.unique,
+ tail: (),
+ };
+
+ (request, reply)
+ }
+}
+
+impl Buffer {
+ fn new(pages: usize) -> Self {
+ Buffer(vec![0; pages * page_size()].into_boxed_slice())
}
}
+
+fn try_op<'o, O: Operation<'o>>(
+ session: &'o Session,
+ bytes: &'o [u8],
+ header: proto::InHeader,
+) -> Result<Op<'o, O>, Done<'o>> {
+ let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize]) {
+ Ok(body) => body,
+ Err(error) => {
+ log::error!("Parsing request {}: {}", header, error);
+ let reply = Reply::<ops::Any> {
+ session,
+ unique: header.unique,
+ tail: (),
+ };
+
+ return Err(reply.io_error());
+ }
+ };
+
+ let request = Request { header, body };
+ let reply = Reply {
+ session,
+ unique: header.unique,
+ tail: Default::default(),
+ };
+
+ Ok((request, reply))
+}
+
+fn page_size() -> usize {
+ sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize
+}
diff --git a/src/lib.rs b/src/lib.rs
index d45da43..8e39749 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,15 +2,8 @@
//!
//! `blown-fuse`
-#![feature(
- concat_idents,
- arbitrary_self_types,
- associated_type_bounds,
- associated_type_defaults,
- trait_alias,
- try_trait_v2,
- doc_cfg
-)]
+#![forbid(unsafe_code)]
+#![feature(try_trait_v2, doc_cfg)]
#[cfg(not(target_os = "linux"))]
compile_error!("Unsupported OS");
diff --git a/src/proto.rs b/src/proto.rs
index 53c2123..f15aaff 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -4,7 +4,7 @@ use bitflags::bitflags;
use bytemuck::{self, Pod};
use bytemuck_derive::{Pod, Zeroable};
use num_enum::TryFromPrimitive;
-use std::{convert::TryFrom, ffi::CStr, fmt, mem};
+use std::{convert::TryFrom, ffi::CStr, fmt};
use crate::{util::display_or, FuseError, FuseResult};
@@ -14,9 +14,15 @@ pub const MAJOR_VERSION: u32 = 7;
pub const TARGET_MINOR_VERSION: u32 = 32;
pub const REQUIRED_MINOR_VERSION: u32 = 31;
-pub struct Request<'a> {
- header: &'a InHeader,
- body: RequestBody<'a>,
+pub trait Structured<'o>: Sized {
+ fn split_from(bytes: &'o [u8], last: bool) -> FuseResult<(Self, &'o [u8])>;
+
+ fn toplevel_from(bytes: &'o [u8]) -> FuseResult<Self> {
+ match Self::split_from(bytes, true)? {
+ (ok, end) if end.is_empty() => Ok(ok),
+ _ => Err(FuseError::BadLength),
+ }
+ }
}
#[derive(Pod, Zeroable, Copy, Clone)]
@@ -40,98 +46,6 @@ pub struct OutHeader {
pub unique: u64,
}
-pub enum RequestBody<'a> {
- Lookup {
- name: &'a CStr,
- },
- Forget(&'a ForgetIn),
- Getattr(&'a GetattrIn),
- Setattr(&'a SetattrIn),
- Readlink,
- Symlink {
- name: &'a CStr,
- target: &'a CStr,
- },
- Mknod {
- prefix: &'a MknodIn,
- name: &'a CStr,
- },
- Mkdir {
- prefix: &'a MkdirIn,
- target: &'a CStr,
- },
- Unlink {
- name: &'a CStr,
- },
- Rmdir {
- name: &'a CStr,
- },
- Rename {
- prefix: &'a RenameIn,
- old: &'a CStr,
- new: &'a CStr,
- },
- Link(&'a LinkIn),
- Open(&'a OpenIn),
- Read(&'a ReadIn),
- Write {
- prefix: &'a WriteIn,
- data: &'a [u8],
- },
- Statfs,
- Release(&'a ReleaseIn),
- Fsync(&'a FsyncIn),
- Setxattr {
- prefix: &'a SetxattrIn,
- name: &'a CStr,
- value: &'a CStr,
- },
- Getxattr {
- prefix: &'a GetxattrIn,
- name: &'a CStr,
- },
- Listxattr(&'a ListxattrIn),
- Removexattr {
- name: &'a CStr,
- },
- Flush(&'a FlushIn),
- Init(&'a InitIn),
- Opendir(&'a OpendirIn),
- Readdir(&'a ReaddirIn),
- Releasedir(&'a ReleasedirIn),
- Fsyncdir(&'a FsyncdirIn),
- Getlk(&'a GetlkIn),
- Setlk(&'a SetlkIn),
- Setlkw(&'a SetlkwIn),
- Access(&'a AccessIn),
- Create {
- prefix: &'a CreateIn,
- name: &'a CStr,
- },
- Interrupt(&'a InterruptIn),
- Bmap(&'a BmapIn),
- Destroy,
- Ioctl {
- prefix: &'a IoctlIn,
- data: &'a [u8],
- },
- Poll(&'a PollIn),
- NotifyReply,
- BatchForget {
- prefix: &'a BatchForgetIn,
- forgets: &'a [ForgetOne],
- },
- Fallocate(&'a FallocateIn),
- ReaddirPlus(&'a ReaddirPlusIn),
- Rename2 {
- prefix: &'a Rename2In,
- old: &'a CStr,
- new: &'a CStr,
- },
- Lseek(&'a LseekIn),
- CopyFileRange(&'a CopyFileRangeIn),
-}
-
#[derive(TryFromPrimitive, Copy, Clone, Debug)]
#[repr(u32)]
pub enum Opcode {
@@ -650,211 +564,52 @@ pub struct CopyFileRangeIn {
pub flags: u64,
}
-impl Request<'_> {
- pub fn header(&self) -> &InHeader {
- self.header
- }
-
- pub fn body(&self) -> &RequestBody<'_> {
- &self.body
+impl<'o> Structured<'o> for () {
+ fn split_from(bytes: &'o [u8], _last: bool) -> FuseResult<(Self, &'o [u8])> {
+ Ok(((), bytes))
}
}
-impl<'a> TryFrom<&'a [u8]> for Request<'a> {
- type Error = FuseError;
-
- fn try_from(bytes: &'a [u8]) -> FuseResult<Self> {
- use FuseError::*;
-
- fn split_from_bytes<T: Pod>(bytes: &[u8]) -> FuseResult<(&T, &[u8])> {
- let (bytes, next_bytes) = bytes.split_at(bytes.len().min(std::mem::size_of::<T>()));
- match bytemuck::try_from_bytes(bytes) {
- Ok(t) => Ok((t, next_bytes)),
- Err(_) => Err(Truncated),
+impl<'o> Structured<'o> for &'o CStr {
+ fn split_from(bytes: &'o [u8], last: bool) -> FuseResult<(Self, &'o [u8])> {
+ let (cstr, after_cstr): (&[u8], &[u8]) = if last {
+ (bytes, &[])
+ } else {
+ match bytes.iter().position(|byte| *byte == b'\0') {
+ Some(nul) => bytes.split_at(nul + 1),
+ None => return Err(FuseError::Truncated),
}
- }
-
- let full_bytes = bytes;
- let (header, mut bytes) = split_from_bytes::<InHeader>(full_bytes)?;
-
- if header.len as usize != full_bytes.len() {
- return Err(BadLength);
- }
-
- let opcode = match Opcode::try_from(header.opcode) {
- Ok(opcode) => opcode,
- Err(_) => return Err(BadOpcode),
};
- macro_rules! prefix {
- ($op:ident, $ident:ident, $is_last:expr) => {
- prefix!($op, $ident);
- };
-
- ($op:ident, $ident:ident) => {
- let ($ident, after_prefix) = split_from_bytes::<concat_idents!($op, In)>(bytes)?;
- bytes = after_prefix;
- };
- }
-
- fn cstr_from_bytes(bytes: &[u8], is_last: bool) -> FuseResult<(&CStr, &[u8])> {
- let (cstr, after_cstr): (&[u8], &[u8]) = if is_last {
- (bytes, &[])
- } else {
- match bytes.iter().position(|byte| *byte == b'\0') {
- Some(nul) => bytes.split_at(nul + 1),
- None => return Err(Truncated),
- }
- };
-
- let cstr = CStr::from_bytes_with_nul(cstr).map_err(|_| BadLength)?;
- Ok((cstr, after_cstr))
- }
-
- macro_rules! cstr {
- ($op:ident, $ident:ident, $is_last:expr) => {
- let ($ident, next_bytes) = cstr_from_bytes(bytes, $is_last)?;
- bytes = next_bytes;
- };
- }
-
- macro_rules! name {
- ($op:ident, $ident:ident, $is_last:expr) => {
- cstr!($op, $ident, $is_last);
- };
- }
-
- macro_rules! value {
- ($op:ident, $ident:ident, $is_last:expr) => {
- cstr!($op, $ident, $is_last);
- };
- }
-
- macro_rules! target {
- ($op:ident, $ident:ident, $is_last:expr) => {
- cstr!($op, $ident, $is_last);
- };
- }
-
- macro_rules! old {
- ($op:ident, $ident:ident, $is_last:expr) => {
- cstr!($op, $ident, $is_last);
- };
- }
+ let cstr = CStr::from_bytes_with_nul(cstr).map_err(|_| FuseError::BadLength)?;
+ Ok((cstr, after_cstr))
+ }
+}
- macro_rules! new {
- ($op:ident, $ident:ident, $is_last:expr) => {
- cstr!($op, $ident, $is_last);
- };
+impl<'o, T: Pod> Structured<'o> for &'o T {
+ fn split_from(bytes: &'o [u8], _last: bool) -> FuseResult<(Self, &'o [u8])> {
+ let (bytes, next_bytes) = bytes.split_at(bytes.len().min(std::mem::size_of::<T>()));
+ match bytemuck::try_from_bytes(bytes) {
+ Ok(t) => Ok((t, next_bytes)),
+ Err(_) => Err(FuseError::Truncated),
}
+ }
+}
- macro_rules! build_body {
- ($op:ident, $last:ident) => {
- $last!($op, $last, true);
- };
-
- ($op:ident, $field:ident, $($next:ident),+) => {
- $field!($op, $field, false);
- build_body!($op, $($next),+);
- };
- }
+impl InHeader {
+ pub fn from_bytes(bytes: &[u8]) -> FuseResult<(Self, Opcode)> {
+ let (header, _) = <&InHeader>::split_from(bytes, false)?;
- macro_rules! body {
- ($op:ident) => {
- RequestBody::$op
- };
-
- ($op:ident, prefix) => {
- {
- prefix!($op, prefix);
- RequestBody::$op(prefix)
- }
- };
-
- ($op:ident, prefix, data where len == $size_field:ident) => {
- {
- prefix!($op, prefix);
- if prefix.$size_field as usize != bytes.len() {
- return Err(BadLength);
- }
-
- RequestBody::$op { prefix, data: mem::take(&mut bytes) }
- }
- };
-
- ($op:ident, $($fields:ident),+) => {
- {
- build_body!($op, $($fields),+);
- RequestBody::$op { $($fields),+ }
- }
- };
+ if header.len as usize != bytes.len() {
+ return Err(FuseError::BadLength);
}
- use Opcode::*;
- let body = match opcode {
- Lookup => body!(Lookup, name),
- Forget => body!(Forget, prefix),
- Getattr => body!(Getattr, prefix),
- Setattr => body!(Setattr, prefix),
- Readlink => body!(Readlink),
- Symlink => body!(Symlink, name, target),
- Mknod => body!(Mknod, prefix, name),
- Mkdir => body!(Mkdir, prefix, target),
- Unlink => body!(Unlink, name),
- Rmdir => body!(Rmdir, name),
- Rename => body!(Rename, prefix, old, new),
- Link => body!(Link, prefix),
- Open => body!(Open, prefix),
- Read => body!(Read, prefix),
- Write => body!(Write, prefix, data where len == size),
- Statfs => body!(Statfs),
- Release => body!(Release, prefix),
- Fsync => body!(Fsync, prefix),
- Setxattr => body!(Setxattr, prefix, name, value),
- Getxattr => body!(Getxattr, prefix, name),
- Listxattr => body!(Listxattr, prefix),
- Removexattr => body!(Removexattr, name),
- Flush => body!(Flush, prefix),
- Init => body!(Init, prefix),
- Opendir => body!(Opendir, prefix),
- Readdir => body!(Readdir, prefix),
- Releasedir => body!(Releasedir, prefix),
- Fsyncdir => body!(Fsyncdir, prefix),
- Getlk => body!(Getlk, prefix),
- Setlk => body!(Setlk, prefix),
- Setlkw => body!(Setlkw, prefix),
- Access => body!(Access, prefix),
- Create => body!(Create, prefix, name),
- Interrupt => body!(Interrupt, prefix),
- Bmap => body!(Bmap, prefix),
- Destroy => body!(Destroy),
- Ioctl => body!(Ioctl, prefix, data where len == in_size),
- Poll => body!(Poll, prefix),
- NotifyReply => body!(NotifyReply),
- BatchForget => {
- prefix!(BatchForget, prefix);
-
- let forgets = mem::take(&mut bytes);
- let forgets = bytemuck::try_cast_slice(forgets).map_err(|_| Truncated)?;
-
- if prefix.count as usize != forgets.len() {
- return Err(BadLength);
- }
-
- RequestBody::BatchForget { prefix, forgets }
- }
- Fallocate => body!(Fallocate, prefix),
- ReaddirPlus => body!(ReaddirPlus, prefix),
- Rename2 => body!(Rename2, prefix, old, new),
- Lseek => body!(Lseek, prefix),
- CopyFileRange => body!(CopyFileRange, prefix),
+ let opcode = match Opcode::try_from(header.opcode) {
+ Ok(opcode) => opcode,
+ Err(_) => return Err(FuseError::BadOpcode),
};
- if bytes.is_empty() {
- Ok(Request { header, body })
- } else {
- Err(BadLength)
- }
+ Ok((*header, opcode))
}
}