diff options
Diffstat (limited to '')
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | Cargo.toml | 31 | ||||
| -rw-r--r-- | examples/ext2.rs | 559 | ||||
| -rw-r--r-- | src/buffers.rs | 0 | ||||
| -rw-r--r-- | src/client.rs | 52 | ||||
| -rw-r--r-- | src/fuse/fs.rs | 127 | ||||
| -rw-r--r-- | src/fuse/io.rs | 319 | ||||
| -rw-r--r-- | src/fuse/mod.rs | 87 | ||||
| -rw-r--r-- | src/fuse/mount.rs | 160 | ||||
| -rw-r--r-- | src/fuse/ops.rs | 366 | ||||
| -rw-r--r-- | src/fuse/session.rs | 569 | ||||
| -rw-r--r-- | src/lib.rs | 113 | ||||
| -rw-r--r-- | src/proto.rs | 884 | ||||
| -rw-r--r-- | src/util.rs | 112 |
14 files changed, 3381 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..6d97cea --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "blown-fuse" +version = "0.1.0" +authors = ["Alejandro Soto <alejandro@34project.org>"] +edition = "2018" +description = "Filesystem in Userspace" + +[features] +default = ["server", "mount"] +server = [] +mount = ["server"] +client = [] + +[dependencies] +async-trait = "0.1.42" +bitflags = "1.2.1" +bytemuck = "1.5.0" +bytemuck_derive = "1.0.1" +futures-util = "0.3.12" +log = "0.4.14" +nix = "0.19.1" +num_enum = "0.5.1" +quick-error = "2.0.0" +smallvec = "1.6.1" +tokio = { version = "1.2.0", features = ["rt", "net", "macros", "sync"] } + +[dev-dependencies] +clap = "2.33.3" +env_logger = "0.8.2" +tokio = { version = "1.2.0", features = ["rt-multi-thread"] } +uuid = "0.8.2" diff --git a/examples/ext2.rs b/examples/ext2.rs new file mode 100644 index 0000000..01bbf4b --- /dev/null +++ b/examples/ext2.rs @@ -0,0 +1,559 @@ +/* Read-only ext2 (rev 1.0) implementation. + * + * This is not really async, since the whole backing storage + * is mmap()ed for simplicity, and then treated as a regular + * slice (likely unsound, I don't care). Some yields are + * springled in a few places in order to emulate true async + * operations. + * + * Reference: <https://www.nongnu.org/ext2-doc/ext2.html> + */ + +#![feature(arbitrary_self_types)] + +#[cfg(target_endian = "big")] +compile_error!("This example assumes a little-endian system"); + +use std::{ + ffi::{CStr, OsStr}, + fs::File, + mem::size_of, + os::unix::{ffi::OsStrExt, io::AsRawFd}, + path::{Path, PathBuf}, + time::{Duration, UNIX_EPOCH}, +}; + +use nix::{ + dir::Type, + errno::Errno, + sys::mman::{mmap, MapFlags, ProtFlags}, + sys::stat::Mode, + unistd::{Gid, Uid}, +}; + +use blown_fuse::{ + fs::Fuse, + io::{Attrs, Entry, FsInfo}, + mount::{mount_sync, Options}, + ops::{Init, Lookup, Readdir, Readlink, Statfs}, + Done, Ino, Reply, TimeToLive, +}; + +use async_trait::async_trait; +use bytemuck::{cast_slice, from_bytes, try_from_bytes}; +use bytemuck_derive::{Pod, Zeroable}; +use clap::{App, Arg}; +use futures_util::stream::{self, Stream, TryStreamExt}; +use smallvec::SmallVec; +use tokio::{self, runtime::Runtime}; +use uuid::Uuid; + +const EXT2_ROOT: Ino = Ino(2); + +type Op<'o, O> = blown_fuse::Op<'o, Ext2, O>; + +#[derive(Copy, Clone)] +struct Farc { + ino: Ino, + inode: &'static Inode, +} + +impl std::ops::Deref for Farc { + type Target = Inode; + + fn deref(&self) -> &Self::Target { + self.inode + } +} + +struct Ext2 { + backing: &'static [u8], + superblock: &'static Superblock, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +struct Superblock { + s_inodes_count: u32, + s_blocks_count: u32, + s_r_blocks_count: u32, + s_free_blocks_count: u32, + s_free_inodes_count: u32, + s_first_data_block: u32, + s_log_block_size: u32, + s_log_frag_size: i32, + s_blocks_per_group: u32, + s_frags_per_group: u32, + s_inodes_per_group: u32, + s_mtime: u32, + s_wtime: u32, + s_mnt_count: u16, + s_max_mnt_count: u16, + s_magic: u16, + s_state: u16, + s_errors: u16, + s_minor_rev_level: u16, + s_lastcheck: u32, + s_checkinterval: u32, + s_creator_os: u32, + s_rev_level: u32, + s_def_resuid: u16, + s_def_resgid: u16, + s_first_ino: u32, + s_inode_size: u16, + s_block_group_nr: u16, + s_feature_compat: u32, + s_feature_incompat: u32, + s_feature_ro_compat: u32, + s_uuid: [u8; 16], + s_volume_name: [u8; 16], + s_last_mounted: [u8; 64], +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +struct GroupDescriptor { + bg_block_bitmap: u32, + bg_inode_bitmap: u32, + bg_inode_table: u32, + bg_free_blocks_count: u16, + bg_free_inodes_count: u16, + bg_used_dirs_count: u16, + bg_pad: u16, + bg_reserved: [u32; 3], +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +struct Inode { + i_mode: u16, + i_uid: u16, + i_size: u32, + i_atime: u32, + i_ctime: u32, + i_mtime: u32, + i_dtime: u32, + i_gid: u16, + i_links_count: u16, + i_blocks: u32, + i_flags: u32, + i_osd1: u32, + i_block: [u32; 15], + i_generation: u32, + i_file_acl: u32, + i_dir_acl: u32, + i_faddr: u32, + i_osd2: [u32; 3], +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +struct LinkedEntry { + inode: u32, + rec_len: u16, + name_len: u8, + file_type: u8, +} + +impl Ext2 { + fn directory_stream( + &self, + inode: Farc, + start: u64, + ) -> impl Stream<Item = Result<Entry<'static, Farc>, Errno>> + '_ { + stream::try_unfold(start, move |mut position| async move { + loop { + if position == inode.i_size as u64 { + break Ok(None); // End of stream + } + + let bytes = self.seek_contiguous(&inode, position)?; + let (header, bytes) = bytes.split_at(size_of::<LinkedEntry>()); + let header: &LinkedEntry = from_bytes(header); + + position += header.rec_len as u64; + if header.inode == 0 { + continue; // Unused entry + } + + let inode = self.inode(Ino(header.inode as u64))?; + let name = OsStr::from_bytes(&bytes[..header.name_len as usize]).into(); + + let entry = Entry { + inode, + name, + offset: position, + ttl: TimeToLive::MAX, + }; + + break Ok(Some((entry, position))); + } + }) + } + + fn inode(&self, Ino(ino): Ino) -> Result<Farc, Errno> { + if ino == 0 { + log::error!("Attempted to access the null (0) inode"); + return Err(Errno::EIO); + } + + let index = (ino - 1) as usize; + let inodes_per_group = self.superblock.s_inodes_per_group as usize; + let (block, index) = (index / inodes_per_group, index % inodes_per_group); + + let table_base = self.group_descriptors()?[block].bg_inode_table as usize; + let inode_size = self.superblock.s_inode_size as usize; + + let inodes_per_block = self.block_size() / inode_size; + let block = table_base + index / inodes_per_block; + + let start = index % inodes_per_block * inode_size; + let end = start + size_of::<Inode>(); + + Ok(Farc { + ino: Ino(ino), + inode: from_bytes(&self.block(block)?[start..end]), + }) + } + + fn seek_contiguous(&self, inode: &Farc, position: u64) -> Result<&'static [u8], Errno> { + let block_size = self.block_size(); + let position = position as usize; + let (direct, offset) = (position / block_size, position % block_size); + + let out_of_bounds = || { + log::error!("Offset {} out of bounds in inode {}", position, inode.ino); + }; + + let chase = |indices: &[usize]| { + let root: &[u8] = cast_slice(&inode.inode.i_block); + indices + .iter() + .try_fold(root, |ptrs, index| { + let ptrs: &[u32] = cast_slice(ptrs); + let block = ptrs[*index]; + + if block > 0 { + self.block(ptrs[*index] as usize) + } else { + out_of_bounds(); + Err(Errno::EIO) + } + }) + .map(|block| &block[offset..]) + }; + + const DIRECT_PTRS: usize = 12; + + if direct < DIRECT_PTRS { + return chase(&[direct]); + } + + let ptrs_per_block = block_size / size_of::<u32>(); + let (level1, level1_index) = { + let indirect = direct - DIRECT_PTRS; + (indirect / ptrs_per_block, indirect % ptrs_per_block) + }; + + if level1 == 0 { + return chase(&[DIRECT_PTRS, level1_index]); + } + + let (level2, level2_index) = (level1 / ptrs_per_block, level1 % ptrs_per_block); + if level2 == 0 { + return chase(&[DIRECT_PTRS + 1, level2_index, level1_index]); + } + + let (level3, level3_index) = (level2 / ptrs_per_block, level2 % ptrs_per_block); + if level3 == 0 { + chase(&[DIRECT_PTRS + 2, level3_index, level2_index, level1_index]) + } else { + out_of_bounds(); + Err(Errno::EIO) + } + } + + fn group_descriptors(&self) -> Result<&'static [GroupDescriptor], Errno> { + let start = (self.superblock.s_first_data_block + 1) as usize; + let groups = (self.superblock.s_blocks_count / self.superblock.s_blocks_per_group) as usize; + let descriptors_per_block = self.block_size() / size_of::<GroupDescriptor>(); + let table_blocks = (groups + descriptors_per_block - 1) / descriptors_per_block; + + self.blocks(start..start + table_blocks) + .map(|blocks| &cast_slice(blocks)[..groups]) + } + + fn block(&self, n: usize) -> Result<&'static [u8], Errno> { + self.blocks(n..n + 1) + } + + fn blocks(&self, range: std::ops::Range<usize>) -> Result<&'static [u8], Errno> { + let block_size = self.block_size(); + let (start, end) = (range.start * block_size, range.end * block_size); + + if self.backing.len() >= end { + Ok(&self.backing[start..end]) + } else { + log::error!("Bad block range: ({}..{})", range.start, range.end); + Err(Errno::EIO) + } + } + + fn block_size(&self) -> usize { + 1024usize << self.superblock.s_log_block_size + } +} + +#[async_trait] +impl Fuse for Ext2 { + type Farc = Farc; + type Inode = Inode; + + async fn init<'o>(&self, reply: Reply<'o, Ext2, Init>) -> Done<'o> { + let label = &self.superblock.s_volume_name; + let label = &label[..=label.iter().position(|byte| *byte == b'\0').unwrap_or(0)]; + let label = CStr::from_bytes_with_nul(label) + .ok() + .map(|label| { + let label = label.to_string_lossy(); + if !label.is_empty() { + label + } else { + "(empty)".into() + } + }) + .unwrap_or("(bad)".into()); + + log::info!("UUID: {}", Uuid::from_bytes(self.superblock.s_uuid)); + log::info!("Label: {}", label.escape_debug()); + + if let Ok(root) = self.inode(EXT2_ROOT) { + log::info!("Mounted successfully"); + reply.root(root) + } else { + log::error!("Failed to retrieve the root inode"); + reply.io_error() + } + } + + async fn statfs<'o>(&self, (_, reply, _): Op<'o, Statfs>) -> Done<'o> { + let total_blocks = self.superblock.s_blocks_count as u64; + let free_blocks = self.superblock.s_free_blocks_count as u64; + let available_blocks = free_blocks - self.superblock.s_r_blocks_count as u64; + let total_inodes = self.superblock.s_inodes_count as u64; + let free_inodes = self.superblock.s_free_inodes_count as u64; + + reply.info( + FsInfo::default() + .blocks( + self.block_size() as u32, + total_blocks, + free_blocks, + available_blocks, + ) + .inodes(total_inodes, free_inodes) + .filenames(255), + ) + } +} + +#[async_trait] +impl blown_fuse::fs::Inode for Inode { + type Fuse = Ext2; + + fn ino(self: &Farc) -> Ino { + match self.ino { + Ino::ROOT => EXT2_ROOT, + EXT2_ROOT => Ino::ROOT, + ino => ino, + } + } + + fn inode_type(self: &Farc) -> Type { + let inode_type = self.i_mode >> 12; + match inode_type { + 0x01 => Type::Fifo, + 0x02 => Type::CharacterDevice, + 0x04 => Type::Directory, + 0x06 => Type::BlockDevice, + 0x08 => Type::File, + 0x0A => Type::Symlink, + 0x0C => Type::Socket, + + _ => { + log::error!("Inode {} has invalid type {:x}", self.ino, inode_type); + Type::File + } + } + } + + fn attrs(self: &Farc) -> (Attrs, TimeToLive) { + let (access, modify, change) = { + let time = |seconds: u32| (UNIX_EPOCH + Duration::from_secs(seconds.into())).into(); + (time(self.i_atime), time(self.i_mtime), time(self.i_ctime)) + }; + + let attrs = Attrs::default() + .size((self.i_dir_acl as u64) << 32 | self.i_size as u64) + .owner( + Uid::from_raw(self.i_uid.into()), + Gid::from_raw(self.i_gid.into()), + ) + .mode(Mode::from_bits_truncate(self.i_mode.into())) + .blocks(self.i_blocks.into(), 512) + .times(access, modify, change) + .links(self.i_links_count.into()); + + (attrs, TimeToLive::MAX) + } + + async fn lookup<'o>(self: Farc, (request, reply, session): Op<'o, Lookup>) -> Done<'o> { + let fs = session.fs(); + let name = request.name(); + + //TODO: Indexed directories + let lookup = async move { + let stream = fs.directory_stream(self, 0); + tokio::pin!(stream); + + loop { + match stream.try_next().await? { + Some(entry) if entry.name == name => break Ok(Some(entry.inode)), + Some(_) => continue, + None => break Ok(None), + } + } + }; + + let (reply, result) = reply.interruptible(lookup).await?; + let (reply, inode) = reply.fallible(result)?; + + if let Some(inode) = inode { + reply.found(&inode, TimeToLive::MAX) + } else { + reply.not_found(TimeToLive::MAX) + } + } + + async fn readlink<'o>(self: Farc, (_, reply, session): Op<'o, Readlink>) -> Done<'o> { + if Inode::inode_type(&self) != Type::Symlink { + return reply.invalid_argument(); + } + + let size = self.i_size as usize; + if size < size_of::<[u32; 15]>() { + return reply.target(OsStr::from_bytes(&cast_slice(&self.i_block)[..size])); + } + + let fs = session.fs(); + let segments = async { + /* This is unlikely to ever spill, and is guaranteed not to + * do so for valid symlinks on any fs where block_size >= 4096. + */ + let mut segments = SmallVec::<[&OsStr; 1]>::new(); + let (mut size, mut offset) = (size, 0); + + while size > 0 { + let segment = fs.seek_contiguous(&self, offset)?; + let segment = &segment[..segment.len().min(size)]; + + segments.push(OsStr::from_bytes(segment)); + + size -= segment.len(); + offset += segment.len() as u64; + } + + Ok(segments) + }; + + let (reply, segments) = reply.fallible(segments.await)?; + reply.gather_target(&segments) + } + + async fn readdir<'o>(self: Farc, (request, reply, session): Op<'o, Readdir>) -> Done<'o> { + let stream = session.fs().directory_stream(self, request.offset()); + reply.try_stream(stream).await? + } +} + +fn early_error<T, E>(_: ()) -> Result<T, E> +where + nix::Error: Into<E>, +{ + Err(nix::Error::Sys(Errno::EINVAL).into()) +} + +fn main() -> Result<(), Box<dyn std::error::Error>> { + let matches = App::new("ext2") + .about("read-only ext2 FUSE driver") + .arg(Arg::from_usage("[mount_options] -o <options>... 'See fuse(8)'").number_of_values(1)) + .arg(Arg::from_usage("<image> 'Filesystem image file'")) + .arg(Arg::from_usage("<mountpoint> 'Filesystem mountpoint'")) + .get_matches(); + + env_logger::builder() + .filter_level(log::LevelFilter::Info) + .init(); + + let (image, session) = { + let (image, mountpoint) = { + let required_path = |key| Path::new(matches.value_of(key).unwrap()); + (required_path("image"), required_path("mountpoint")) + }; + + let canonical = image.canonicalize(); + let canonical = canonical.as_ref().map(PathBuf::as_path).unwrap_or(image); + + let mut options = Options::default(); + options + .fs_name(canonical) + .read_only() + .extend(matches.values_of_os("mount_options").into_iter().flatten()); + + (image, mount_sync(mountpoint, &options)?) + }; + + let file = File::open(image)?; + let backing = unsafe { + let length = file.metadata().unwrap().len() as usize; + + let base = mmap( + std::ptr::null_mut(), + length, + ProtFlags::PROT_READ, + MapFlags::MAP_PRIVATE, + file.as_raw_fd(), + 0, + ); + + std::slice::from_raw_parts(base.unwrap() as *const u8, length) + }; + + let superblock = if backing.len() >= 1024 + size_of::<Superblock>() { + Some(&backing[1024..1024 + size_of::<Superblock>()]) + } else { + None + }; + + let superblock = superblock.and_then(|superblock| try_from_bytes(superblock).ok()); + let superblock: &'static Superblock = match superblock { + Some(superblock) => superblock, + None => return early_error(log::error!("Bad superblock")), + }; + + if superblock.s_magic != 0xef53 { + return early_error(log::error!("Bad magic")); + } + + let (major, minor) = (superblock.s_rev_level, superblock.s_minor_rev_level); + if (major, minor) != (1, 0) { + return early_error(log::error!("Unsupported revision: {}.{}", major, minor)); + } + + let fs = Ext2 { + backing, + superblock, + }; + + Ok(Runtime::new()?.block_on(async { session.start(fs).await?.main_loop().await })?) +} diff --git a/src/buffers.rs b/src/buffers.rs new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/buffers.rs diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..9f036c3 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,52 @@ +//! FUSE client. +//! +//! Usually, a kernel module or other OS component takes the role of the FUSE client. This module +//! is a client-wise counterpart to the rest of `blown-fuse` API. So far, this only serves the +//! purpose of having agnostic tests, but wrappers might be written in the future with it. + +/*use crate::{proto, FuseResult}; + +#[cfg(feature = "server")] +use crate::session; + +struct Client {} + +struct RequestContext<'a> { + client: &'a, + uid: Uid, + gid: Gid, + pid: Pid, +} + +impl Client { + pub fn context(&self, uid: Uid, gid: Gid, pid: Pid) -> RequestContext<'_> { + RequestContext { + client: &self, + uid, + gid, + pid, + } + } +} + +impl RequestContext<'_> { + pub async fn lookup(&self) -> FuseResult<> { + self.tail() + } +} + +struct Start {} + +impl Start { + pub async fn start(self) -> FuseResult<Session> { + + } +} + +#[cfg(feature = "server")] +pub fn channel() -> std::io::Result<(session::Start, self::Start)> { + let client_start = ; + let server_start = ; + + (client_stasrt, server_start) +}*/ diff --git a/src/fuse/fs.rs b/src/fuse/fs.rs new file mode 100644 index 0000000..4cf3282 --- /dev/null +++ b/src/fuse/fs.rs @@ -0,0 +1,127 @@ +use async_trait::async_trait; +use nix::errno::Errno; + +use std::{ + num::NonZeroUsize, + ops::{Deref, DerefMut}, + sync::Arc, +}; + +use crate::{Ino, TimeToLive}; + +use super::{ + io::{Attrs, EntryType}, + ops::*, + Done, Op, Reply, +}; + +#[async_trait] +pub trait Fuse: Sized + Send + Sync + 'static { + type Inode: Inode<Fuse = Self> + ?Sized; + type Farc: Deref<Target = Self::Inode> + Clone + Send + Sync = Arc<Self::Inode>; + + async fn init<'o>(&self, reply: Reply<'o, Self, Init>) -> Done<'o>; + + async fn statfs<'o>(&self, (_, reply, _): Op<'o, Self, Statfs>) -> Done<'o> { + reply.not_implemented() + } + + fn request_buffers(&self) -> NonZeroUsize { + NonZeroUsize::new(16).unwrap() + } + + fn request_buffer_pages(&self) -> NonZeroUsize { + NonZeroUsize::new(4).unwrap() + } +} + +#[async_trait] +pub trait Inode: Send + Sync { + type Fuse: Fuse<Inode = Self>; + + fn ino(self: &FarcTo<Self>) -> Ino; + fn attrs(self: &FarcTo<Self>) -> (Attrs, TimeToLive); + fn inode_type(self: &FarcTo<Self>) -> EntryType; + + fn direct_io<'o>(self: &FarcTo<Self>) -> bool { + false + } + + fn access<'o>(self: &FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Access>) -> Done<'o> { + reply.not_implemented() + } + + async fn lookup<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Lookup>) -> Done<'o> { + reply.not_implemented() + } + + async fn readlink<'o>( + self: FarcTo<Self>, + (_, reply, _): Op<'o, Self::Fuse, Readlink>, + ) -> Done<'o> { + reply.not_implemented() + } + + async fn open<'o>(self: FarcTo<Self>, (_, reply, _): Op<'o, Self::Fuse, Open>) -> Done<'o> { + // Calling not_implemented() here would ignore direct_io() and similar flags + reply.ok() + } + + async fn opendir<'o>( + self: FarcTo<Self>, + (_, reply, _): Op<'o, Self::Fuse, Opendir>, + ) -> Done<'o> { + reply.not_implemented() + } + + async fn readdir<'o>( + self: FarcTo<Self>, + (_, reply, _): Op<'o, Self::Fuse, Readdir>, + ) -> Done<'o> { + reply.not_implemented() + } +} + +#[async_trait] +pub trait Tape: Send + Sync { + type Fuse: Fuse; + + async fn seek(self: &mut Head<Self>, offset: u64) -> Result<(), Errno>; + + async fn rewind(self: &mut Head<Self>) -> Result<(), Errno> { + self.seek(0).await + } + + async fn read<'o>(self: &mut Head<Self>, (_, reply, _): Op<'o, Self::Fuse, Read>) -> Done<'o> { + reply.not_implemented() + } + + async fn write<'o>( + self: &mut Head<Self>, + (_, reply, _): Op<'o, Self::Fuse, Write>, + ) -> Done<'o> { + reply.not_implemented() + } +} + +pub type FarcTo<I> = <<I as Inode>::Fuse as Fuse>::Farc; + +pub struct Head<T: Tape + ?Sized> { + offset: u64, + inode: <T::Fuse as Fuse>::Farc, + tape: T, +} + +impl<T: Tape + ?Sized> Deref for Head<T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.tape + } +} + +impl<T: Tape + ?Sized> DerefMut for Head<T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.tape + } +} diff --git a/src/fuse/io.rs b/src/fuse/io.rs new file mode 100644 index 0000000..450d9f7 --- /dev/null +++ b/src/fuse/io.rs @@ -0,0 +1,319 @@ +use bytemuck::Zeroable; +use nix::{errno::Errno, sys::stat::SFlag}; + +use std::{ + borrow::Cow, + convert::Infallible, + ffi::OsStr, + future::Future, + ops::{ControlFlow, FromResidual, Try}, +}; + +use crate::{proto, Ino, TimeToLive, Timestamp}; + +use super::{ + fs::{Fuse, Inode}, + session, Done, Operation, Reply, Request, +}; + +#[doc(no_inline)] +pub use nix::{ + dir::Type as EntryType, + sys::stat::Mode, + unistd::{AccessFlags, Gid, Pid, Uid}, +}; + +pub enum Interruptible<'o, Fs: Fuse, O: Operation<'o, Fs>, T> { + Completed(Reply<'o, Fs, O>, T), + Interrupted(Done<'o>), +} + +#[derive(Clone)] +pub struct Attrs(proto::Attrs); + +pub struct Entry<'a, Ref> { + pub offset: u64, + pub name: Cow<'a, OsStr>, + pub inode: Ref, + pub ttl: TimeToLive, +} + +pub struct FsInfo(proto::StatfsOut); + +impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Request<'o, Fs, O> { + pub fn ino(&self) -> Ino { + Ino(self.header.ino) + } + + pub fn generation(&self) -> u64 { + 0 + } + + pub fn uid(&self) -> Uid { + Uid::from_raw(self.header.uid) + } + + pub fn gid(&self) -> Gid { + Gid::from_raw(self.header.gid) + } + + pub fn pid(&self) -> Pid { + Pid::from_raw(self.header.pid as i32) + } +} + +impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { + pub async fn interruptible<F, T>(self, f: F) -> Interruptible<'o, Fs, O, T> + where + F: Future<Output = T>, + { + tokio::pin!(f); + let mut rx = session::interrupt_rx(&self.session); + + use Interruptible::*; + loop { + tokio::select! { + output = &mut f => break Completed(self, output), + + result = rx.recv() => match result { + Ok(unique) if unique == self.unique => { + break Interrupted(self.interrupted()); + } + + _ => continue, + } + } + } + } + + pub fn fallible<T>(self, result: Result<T, Errno>) -> Result<(Self, T), Done<'o>> { + match result { + Ok(t) => Ok((self, t)), + Err(errno) => Err(self.fail(errno)), + } + } + + pub fn fail(mut self, errno: Errno) -> Done<'o> { + let errno = errno as i32; + O::consume_errno(errno, &mut self.tail); + + Done::from_result(session::fail(&self.session, self.unique, errno)) + } + + pub fn not_implemented(self) -> Done<'o> { + self.fail(Errno::ENOSYS) + } + + pub fn io_error(self) -> Done<'o> { + self.fail(Errno::EIO) + } + + pub fn invalid_argument(self) -> Done<'o> { + self.fail(Errno::EINVAL) + } + + pub fn interrupted(self) -> Done<'o> { + self.fail(Errno::EINTR) + } +} + +impl<'o, Fs, O> From<(Reply<'o, Fs, O>, Errno)> for Done<'o> +where + Fs: Fuse, + O: Operation<'o, Fs>, +{ + fn from((reply, errno): (Reply<'o, Fs, O>, Errno)) -> Done<'o> { + reply.fail(errno) + } +} + +impl<'o> FromResidual<Done<'o>> for Done<'o> { + fn from_residual(residual: Done<'o>) -> Self { + residual + } +} + +impl<'o, T: Into<Done<'o>>> FromResidual<Result<Infallible, T>> for Done<'o> { + fn from_residual(residual: Result<Infallible, T>) -> Self { + match residual { + Ok(_) => unreachable!(), + Err(t) => t.into(), + } + } +} + +impl<'o, Fs, O> FromResidual<Interruptible<'o, Fs, O, Infallible>> for Done<'o> +where + Fs: Fuse, + O: Operation<'o, Fs>, +{ + fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self { + match residual { + Interruptible::Completed(_, _) => unreachable!(), + Interruptible::Interrupted(done) => done, + } + } +} + +impl Try for Done<'_> { + type Output = Self; + type Residual = Self; + + fn from_output(output: Self::Output) -> Self { + output + } + + fn branch(self) -> ControlFlow<Self::Residual, Self::Output> { + ControlFlow::Break(self) + } +} + +impl<'o, Fs, O, T> FromResidual<Interruptible<'o, Fs, O, Infallible>> + for Interruptible<'o, Fs, O, T> +where + Fs: Fuse, + O: Operation<'o, Fs>, +{ + fn from_residual(residual: Interruptible<'o, Fs, O, Infallible>) -> Self { + use Interruptible::*; + + match residual { + Completed(_, _) => unreachable!(), + Interrupted(done) => Interrupted(done), + } + } +} + +impl<'o, Fs, O, T> Try for Interruptible<'o, Fs, O, T> +where + Fs: Fuse, + O: Operation<'o, Fs>, +{ + type Output = (Reply<'o, Fs, O>, T); + type Residual = Interruptible<'o, Fs, O, Infallible>; + + fn from_output((reply, t): Self::Output) -> Self { + Self::Completed(reply, t) + } + + fn branch(self) -> ControlFlow<Self::Residual, Self::Output> { + use Interruptible::*; + + match self { + Completed(reply, t) => ControlFlow::Continue((reply, t)), + Interrupted(done) => ControlFlow::Break(Interrupted(done)), + } + } +} + +impl Attrs { + pub fn size(self, size: u64) -> Self { + Attrs(proto::Attrs { size, ..self.0 }) + } + + pub fn owner(self, uid: Uid, gid: Gid) -> Self { + Attrs(proto::Attrs { + uid: uid.as_raw(), + gid: gid.as_raw(), + ..self.0 + }) + } + + pub fn mode(self, mode: Mode) -> Self { + Attrs(proto::Attrs { + mode: mode.bits(), + ..self.0 + }) + } + + pub fn blocks(self, blocks: u64, block_size: u32) -> Self { + Attrs(proto::Attrs { + blocks, + blksize: block_size, + ..self.0 + }) + } + + pub fn times(self, access: Timestamp, modify: Timestamp, change: Timestamp) -> Self { + Attrs(proto::Attrs { + atime: access.seconds, + mtime: modify.seconds, + ctime: change.seconds, + atimensec: access.nanoseconds, + mtimensec: modify.nanoseconds, + ctimensec: change.nanoseconds, + ..self.0 + }) + } + + pub fn links(self, links: u32) -> Self { + Attrs(proto::Attrs { + nlink: links, + ..self.0 + }) + } + + pub(crate) fn finish<Fs: Fuse>(self, inode: &Fs::Farc) -> proto::Attrs { + let Ino(ino) = <Fs as Fuse>::Inode::ino(inode); + let inode_type = match <Fs as Fuse>::Inode::inode_type(inode) { + EntryType::Fifo => SFlag::S_IFIFO, + EntryType::CharacterDevice => SFlag::S_IFCHR, + EntryType::Directory => SFlag::S_IFDIR, + EntryType::BlockDevice => SFlag::S_IFBLK, + EntryType::File => SFlag::S_IFREG, + EntryType::Symlink => SFlag::S_IFLNK, + EntryType::Socket => SFlag::S_IFSOCK, + }; + + proto::Attrs { + ino, + mode: self.0.mode | inode_type.bits(), + ..self.0 + } + } +} + +impl Default for Attrs { + fn default() -> Self { + Attrs(Zeroable::zeroed()).links(1) + } +} + +impl FsInfo { + pub fn blocks(self, size: u32, total: u64, free: u64, available: u64) -> Self { + FsInfo(proto::StatfsOut { + bsize: size, + blocks: total, + bfree: free, + bavail: available, + ..self.0 + }) + } + + pub fn inodes(self, total: u64, free: u64) -> Self { + FsInfo(proto::StatfsOut { + files: total, + ffree: free, + ..self.0 + }) + } + + pub fn filenames(self, max: u32) -> Self { + FsInfo(proto::StatfsOut { + namelen: max, + ..self.0 + }) + } +} + +impl Default for FsInfo { + fn default() -> Self { + FsInfo(Zeroable::zeroed()) + } +} + +impl From<FsInfo> for proto::StatfsOut { + fn from(FsInfo(statfs): FsInfo) -> proto::StatfsOut { + statfs + } +} diff --git a/src/fuse/mod.rs b/src/fuse/mod.rs new file mode 100644 index 0000000..0e39e6b --- /dev/null +++ b/src/fuse/mod.rs @@ -0,0 +1,87 @@ +use std::{ + collections::HashMap, + marker::PhantomData, + os::unix::io::RawFd, + sync::{Arc, Mutex}, +}; + +use tokio::{ + io::unix::AsyncFd, + sync::{broadcast, Notify, Semaphore}, +}; + +use crate::{proto, util::DumbFd, FuseResult, Ino}; + +pub mod io; + +#[doc(cfg(feature = "server"))] +pub mod fs; + +#[doc(cfg(feature = "server"))] +pub mod ops; + +#[doc(cfg(feature = "mount"))] +pub mod mount; + +mod session; +use fs::Fuse; + +#[doc(cfg(feature = "server"))] +pub struct Session<Fs: Fuse> { + _fusermount_fd: DumbFd, + session_fd: AsyncFd<RawFd>, + proto_minor: u32, + fs: Fs, + input_semaphore: Arc<Semaphore>, + large_buffers: Mutex<Vec<Box<[u8]>>>, + known: Mutex<HashMap<Ino, (Fs::Farc, u64)>>, + destroy: Notify, + interrupt_tx: broadcast::Sender<u64>, +} + +#[doc(cfg(feature = "server"))] +pub struct Start { + fusermount_fd: DumbFd, + session_fd: DumbFd, +} + +mod private_trait { + pub trait Operation<'o, Fs: super::Fuse> { + type RequestBody = (); + type ReplyTail = (); + + fn consume_errno(_errno: i32, _tail: &mut Self::ReplyTail) {} + } +} + +use private_trait::Operation; + +#[doc(cfg(feature = "server"))] +pub type Op<'o, Fs, O> = (Request<'o, Fs, O>, Reply<'o, Fs, O>, &'o Arc<Session<Fs>>); + +#[doc(cfg(feature = "server"))] +pub struct Request<'o, Fs: Fuse, O: Operation<'o, Fs>> { + header: &'o proto::InHeader, + body: O::RequestBody, +} + +#[doc(cfg(feature = "server"))] +pub struct Reply<'o, Fs: Fuse, O: Operation<'o, Fs>> { + session: &'o Session<Fs>, + unique: u64, + tail: O::ReplyTail, +} + +#[must_use] +#[doc(cfg(feature = "server"))] +pub struct Done<'o>(FuseResult<PhantomData<&'o ()>>); + +impl Done<'_> { + fn from_result(result: FuseResult<()>) -> Self { + Done(result.map(|()| PhantomData)) + } + + fn into_result(self) -> FuseResult<()> { + self.0.map(|PhantomData| ()) + } +} diff --git a/src/fuse/mount.rs b/src/fuse/mount.rs new file mode 100644 index 0000000..ebf7e5d --- /dev/null +++ b/src/fuse/mount.rs @@ -0,0 +1,160 @@ +use std::{ + ffi::{OsStr, OsString}, + os::unix::{ + ffi::OsStrExt, + io::{AsRawFd, IntoRawFd, RawFd}, + net::UnixStream, + }, + process::Command, +}; + +use nix::{ + self, cmsg_space, + fcntl::{fcntl, FcntlArg, FdFlag}, + sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}, +}; + +use quick_error::quick_error; + +use super::Start; +use crate::util::{from_nix_error, DumbFd}; + +quick_error! { + #[derive(Debug)] + pub enum MountError { + Io(err: std::io::Error) { from() } + Fusermount { display("fusermount failed") } + } +} + +#[derive(Default)] +pub struct Options(OsString); + +impl Options { + pub fn fs_name<O: AsRef<OsStr>>(&mut self, fs_name: O) -> &mut Self { + self.push_key_value("fsname", fs_name) + } + + pub fn read_only(&mut self) -> &mut Self { + self.push("ro") + } + + pub fn push<O: AsRef<OsStr>>(&mut self, option: O) -> &mut Self { + self.push_parts(&[option.as_ref()]) + } + + pub fn push_key_value<K, V>(&mut self, key: K, value: V) -> &mut Self + where + K: AsRef<OsStr>, + V: AsRef<OsStr>, + { + let (key, value) = (key.as_ref(), value.as_ref()); + + let assert_valid = |part: &OsStr| { + let bytes = part.as_bytes(); + assert!( + !bytes.is_empty() && bytes.iter().all(|b| !matches!(*b, b',' | b'=')), + "invalid key or value: {}", + part.to_string_lossy() + ); + }; + + assert_valid(key); + assert_valid(value); + + self.push_parts(&[key, OsStr::new("="), value]) + } + + fn push_parts(&mut self, segment: &[&OsStr]) -> &mut Self { + if !self.0.is_empty() { + self.0.push(","); + } + + let start = self.0.as_bytes().len(); + segment.iter().for_each(|part| self.0.push(part)); + + let bytes = self.0.as_bytes(); + let last = bytes.len() - 1; + + assert!( + last >= start && bytes[start] != b',' && bytes[last] != b',', + "invalid option string: {}", + OsStr::from_bytes(&bytes[start..]).to_string_lossy() + ); + + self + } +} + +impl<O: AsRef<OsStr>> Extend<O> for Options { + fn extend<I: IntoIterator<Item = O>>(&mut self, iter: I) { + iter.into_iter().for_each(|option| { + self.push(option); + }); + } +} + +pub fn mount_sync<M>(mountpoint: M, options: &Options) -> Result<Start, MountError> +where + M: AsRef<OsStr>, +{ + let (left_side, right_side) = UnixStream::pair()?; + + // The fusermount protocol requires us to preserve right_fd across execve() + let right_fd = right_side.as_raw_fd(); + fcntl( + right_fd, + FcntlArg::F_SETFD( + FdFlag::from_bits(fcntl(right_fd, FcntlArg::F_GETFD).unwrap()).unwrap() + & !FdFlag::FD_CLOEXEC, + ), + ) + .unwrap(); + + let mut command = Command::new("fusermount3"); + if !options.0.is_empty() { + command.args(&[OsStr::new("-o"), &options.0, mountpoint.as_ref()]); + } else { + command.arg(mountpoint); + }; + + let mut fusermount = command.env("_FUSE_COMMFD", right_fd.to_string()).spawn()?; + + // recvmsg() should fail if fusermount exits (last open fd is closed) + drop(right_side); + + let session_fd = (|| { + let mut buffer = cmsg_space!(RawFd); + let message = recvmsg( + left_side.as_raw_fd(), + &[], + Some(&mut buffer), + MsgFlags::empty(), + ) + .map_err(from_nix_error)?; + + let session_fd = match message.cmsgs().next() { + Some(ControlMessageOwned::ScmRights(fds)) => fds.into_iter().next(), + _ => None, + }; + + session_fd.ok_or(MountError::Fusermount) + })(); + + match session_fd { + Ok(session_fd) => { + let fusermount_fd = DumbFd(left_side.into_raw_fd()); + let session_fd = DumbFd(session_fd); + Ok(Start { + fusermount_fd, + session_fd, + }) + } + + Err(error) => { + drop(left_side); + fusermount.wait()?; + Err(error) + } + } +} diff --git a/src/fuse/ops.rs b/src/fuse/ops.rs new file mode 100644 index 0000000..7a0c18c --- /dev/null +++ b/src/fuse/ops.rs @@ -0,0 +1,366 @@ +use bytemuck::{bytes_of, Pod, Zeroable}; +use futures_util::stream::{Stream, StreamExt, TryStreamExt}; +use nix::sys::stat::SFlag; + +use std::{ + borrow::Borrow, + ffi::{CStr, OsStr}, + os::unix::ffi::OsStrExt, +}; + +use crate::{proto, util::OutputChain, Errno, Ino, TimeToLive}; + +use super::{ + fs::{Fuse, Inode, Tape}, + io::{AccessFlags, Entry, EntryType, FsInfo}, + session, Done, Operation, Reply, Request, +}; + +macro_rules! op { + { $name:ident $operation:tt $(,)+ } => { + pub struct $name(()); + + impl<'o, Fs: Fuse> Operation<'o, Fs> for $name $operation + }; + + { $name:ident $operation:tt, Request $request:tt $($next:tt)+ } => { + impl<'o, Fs: Fuse> Request<'o, Fs, $name> $request + + op! { $name $operation $($next)+ } + }; + + { $name:ident $operation:tt, Reply $reply:tt $($next:tt)+ } => { + impl<'o, Fs: Fuse> Reply<'o, Fs, $name> $reply + + op! { $name $operation $($next)+ } + }; +} + +op! { + Lookup { + // name() + type RequestBody = &'o CStr; + }, + + Request { + /// Returns the name of the entry being looked up in this directory. + pub fn name(&self) -> &OsStr { + c_to_os(self.body) + } + }, + + Reply { + /// The requested entry was found and a `Farc` was successfully determined from it. The + /// FUSE client will become aware of the found inode if it wasn't before. This result may + /// be cached by the client for up to the given TTL. + pub fn found(self, entry: &Fs::Farc, ttl: TimeToLive) -> Done<'o> { + let (attrs, attrs_ttl) = <Fs as Fuse>::Inode::attrs(entry); + session::unveil(&self.session, entry); + + self.single(&make_entry( + (<Fs as Fuse>::Inode::ino(entry), ttl), + (attrs.finish::<Fs>(entry), attrs_ttl), + )) + } + + /// The requested entry was not found in this directory. The FUSE clint may include this + /// response in negative cache for up to the given TTL. + pub fn not_found(self, ttl: TimeToLive) -> Done<'o> { + self.single(&make_entry((Ino::NULL, ttl), (Zeroable::zeroed(), Default::default()))) + } + + /// The requested entry was not found in this directory, but unlike [`Reply::not_found()`] + /// this does not report back a TTL to the FUSE client. The client should not cache the + /// response. + pub fn not_found_uncached(self) -> Done<'o> { + self.fail(Errno::ENOENT) + } + }, +} + +op! { + Readlink {}, + + Reply { + /// This inode corresponds to a symbolic link pointing to the given target path. + pub fn target(self, target: &OsStr) -> Done<'o> { + self.chain(OutputChain::tail(&[target.as_bytes()])) + } + + /// Same as [`Reply::target()`], except that the target path is taken from disjoint + /// slices. This involves no additional allocation. + pub fn gather_target(self, target: &[&OsStr]) -> Done<'o> { + //FIXME: Likely UB + self.chain(OutputChain::tail(unsafe { std::mem::transmute(target) })) + } + }, +} + +op! { + Open { + type RequestBody = &'o proto::OpenIn; + type ReplyTail = (Ino, proto::OpenOutFlags); + }, + + Reply { + /// The iinode may now be accessed. + pub fn ok(self) -> Done<'o> { + self.ok_with_handle(0) + } + + /*pub fn tape<R: Tape<Fuse = Fs>>(self, reel: R) -> Done<'o> { + let (ino, _) = self.tail; + self.ok_with_handle(session::allocate_handle(&self.session, ino, reel)) + }*/ + + fn ok_with_handle(self, handle: u64) -> Done<'o> { + let (_, flags) = self.tail; + self.single(&proto::OpenOut { + fh: handle, + open_flags: flags.bits(), + padding: Default::default(), + }) + } + }, +} + +op! { Read {}, } +/*op! { + Read { + type RequestBody = &'o proto::ReadIn; + type ReplyTail = &'o mut OutputBytes<'o>; + }, + + Request { + pub fn offset(&self) -> u64 { + self.body.offset + } + + pub fn size(&self) -> u32 { + self.body.size + } + }, + + Reply { + pub fn remaining(&self) -> u64 { + self.tail.remaining() + } + + pub fn end(self) -> Done<'o> { + if self.tail.ready() { + self.chain(OutputChain::tail(self.tail.segments())) + } else { + // The read() handler will be invoked again with same OutputBytes + self.done() + } + } + + pub fn hole(self, size: u64) -> Result<Self, Done<'o>> { + self.tail + } + + pub fn copy(self, data: &[u8]) -> Result<Self, Done<'o>> { + self.self_or_done(self.tail.copy(data)) + } + + pub fn put(self, data: &'o [u8]) -> Result<Self, Done<'o>> { + self.self_or_done(self.tail.put(data)) + } + + pub fn gather(self, data: &'o [&'o [u8]]) -> Result<Self, Done<'o>> { + self.self_or_done(self.tail.gather(data)) + } + + fn self_or_done(self, capacity: OutputCapacity) -> Result<Self, Done<'o>> { + match capacity { + OutputCapacity::Available => Ok(self), + OutputCapacity::Filled => Err(self.done()), + } + } + }, +}*/ + +op! { + Write { + type RequestBody = &'o proto::WriteIn; + }, +} + +op! { + Init { + type ReplyTail = &'o mut Result<Fs::Farc, i32>; + + fn consume_errno(errno: i32, tail: &mut Self::ReplyTail) { + **tail = Err(errno); + } + }, + + Reply { + /// Server-side initialization succeeded. The provided `Farc` references the filesystem's + /// root inode. + pub fn root(self, root: Fs::Farc) -> Done<'o> { + *self.tail = Ok(root); + self.done() + } + }, +} + +op! { + Statfs {}, + + Reply { + /// Replies with filesystem statistics. + pub fn info(self, statfs: FsInfo) -> Done<'o> { + let statfs: proto::StatfsOut = statfs.into(); + self.single(&statfs) + } + }, +} + +op! { + Opendir { + type RequestBody = &'o proto::OpendirIn; + }, +} + +op! { + Readdir { + type RequestBody = &'o proto::ReaddirIn; + }, + + Request { + /// Returns the base offset in the directory stream to read from. + pub fn offset(&self) -> u64 { + self.body.read_in.offset + } + }, + + Reply { + pub fn try_iter<'a, I, E, Ref>( + self, + mut entries: I, + ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)> + where + I: Iterator<Item = Result<Entry<'a, Ref>, E>> + Send, + Ref: Borrow<Fs::Farc>, + { + //TODO: This is about as shitty as it gets + match entries.next().transpose() { + Ok(Some(entry)) => { + let Entry { + name, + inode, + offset, + .. + } = entry; + + let inode = inode.borrow(); + let Ino(ino) = <Fs as Fuse>::Inode::ino(inode); + + let dirent = proto::Dirent { + ino, + off: offset, + namelen: name.len() as u32, + entry_type: (match <Fs as Fuse>::Inode::inode_type(inode) { + EntryType::Fifo => SFlag::S_IFIFO, + EntryType::CharacterDevice => SFlag::S_IFCHR, + EntryType::Directory => SFlag::S_IFDIR, + EntryType::BlockDevice => SFlag::S_IFBLK, + EntryType::File => SFlag::S_IFREG, + EntryType::Symlink => SFlag::S_IFLNK, + EntryType::Socket => SFlag::S_IFSOCK, + }) + .bits() + >> 12, + }; + + let dirent = bytes_of(&dirent); + let name = name.as_bytes(); + + let padding = [0; 8]; + let padding = &padding[..7 - (dirent.len() + name.len() - 1) % 8]; + + Ok(self.chain(OutputChain::tail(&[dirent, name, padding]))) + } + + Err(error) => Err((self, error)), + + Ok(None) => Ok(self.empty()), + } + } + + // See rust-lang/rust#61949 + pub async fn try_stream<'a, S, E, Ref>( + self, + entries: S, + ) -> Result<Done<'o>, (Reply<'o, Fs, Readdir>, E)> + where + S: Stream<Item = Result<Entry<'a, Ref>, E>> + Send, + Ref: Borrow<Fs::Farc> + Send, + E: Send, + { + //TODO: This is about as shitty as it gets + let first = entries.boxed().try_next().await; + self.try_iter(first.transpose().into_iter()) + } + }, +} + +op! { + Access { + type RequestBody = &'o proto::AccessIn; + }, + + Request { + pub fn mask(&self) -> AccessFlags { + AccessFlags::from_bits_truncate(self.body.mask as i32) + } + }, + + Reply { + pub fn ok(self) -> Done<'o> { + self.empty() + } + + pub fn permission_denied(self) -> Done<'o> { + self.fail(Errno::EACCES) + } + }, +} + +impl<'o, Fs: Fuse, O: Operation<'o, Fs>> Reply<'o, Fs, O> { + fn done(self) -> Done<'o> { + Done::from_result(Ok(())) + } + + fn empty(self) -> Done<'o> { + self.chain(OutputChain::empty()) + } + + fn single<P: Pod>(self, single: &P) -> Done<'o> { + self.chain(OutputChain::tail(&[bytes_of(single)])) + } + + fn chain(self, chain: OutputChain<'_>) -> Done<'o> { + Done::from_result(session::ok(&self.session, self.unique, chain)) + } +} + +fn c_to_os(string: &CStr) -> &OsStr { + OsStr::from_bytes(string.to_bytes()) +} + +fn make_entry( + (Ino(ino), entry_ttl): (Ino, TimeToLive), + (attrs, attr_ttl): (proto::Attrs, TimeToLive), +) -> proto::EntryOut { + proto::EntryOut { + nodeid: ino, + generation: 0, //TODO + entry_valid: entry_ttl.seconds, + attr_valid: attr_ttl.seconds, + entry_valid_nsec: entry_ttl.nanoseconds, + attr_valid_nsec: attr_ttl.nanoseconds, + attr: attrs, + } +} diff --git a/src/fuse/session.rs b/src/fuse/session.rs new file mode 100644 index 0000000..35ebb69 --- /dev/null +++ b/src/fuse/session.rs @@ -0,0 +1,569 @@ +use std::{ + collections::{hash_map, HashMap}, + convert::TryInto, + os::unix::io::IntoRawFd, + sync::{Arc, Mutex}, +}; + +use nix::{ + fcntl::{fcntl, FcntlArg, OFlag}, + sys::uio::{readv, writev, IoVec}, + unistd::{sysconf, SysconfVar}, +}; + +use tokio::{ + io::unix::AsyncFd, + sync::{broadcast, Notify, OwnedSemaphorePermit, Semaphore}, +}; + +use bytemuck::{bytes_of, try_from_bytes}; +use smallvec::SmallVec; + +use crate::{ + proto::{self, InHeader}, + util::{display_or, from_nix_error, OutputChain}, + Errno, FuseError, FuseResult, Ino, +}; + +use super::{ + fs::{Fuse, Inode}, + Reply, Request, Session, Start, +}; + +pub fn ok<Fs: Fuse>(session: &Session<Fs>, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { + session.send(unique, 0, output) +} + +pub fn notify<Fs: Fuse>( + session: &Session<Fs>, + op: proto::NotifyCode, + output: OutputChain<'_>, +) -> FuseResult<()> { + session.send(0, op as i32, output) +} + +pub fn fail<Fs: Fuse>(session: &Session<Fs>, unique: u64, mut errno: i32) -> FuseResult<()> { + if errno <= 0 { + log::warn!( + "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", + unique, + errno + ); + + errno = Errno::ENOMSG as i32; + } + + session.send(unique, -errno, OutputChain::empty()) +} + +pub fn unveil<Fs: Fuse>(session: &Session<Fs>, inode: &Fs::Farc) { + let ino = <Fs as Fuse>::Inode::ino(inode); + let mut known = session.known.lock().unwrap(); + + use hash_map::Entry::*; + match known.entry(ino) { + Occupied(entry) => { + let (_, count) = entry.into_mut(); + *count += 1; + } + + Vacant(entry) => { + entry.insert((Fs::Farc::clone(inode), 1)); + } + } +} + +pub fn interrupt_rx<Fs: Fuse>(session: &Session<Fs>) -> broadcast::Receiver<u64> { + session.interrupt_tx.subscribe() +} + +impl<Fs: Fuse> Session<Fs> { + pub fn fs(&self) -> &Fs { + &self.fs + } + + pub async fn main_loop(self: Arc<Self>) -> FuseResult<()> { + let this = Arc::clone(&self); + let main_loop = async move { + loop { + let incoming = this.receive().await; + let this = Arc::clone(&this); + + tokio::spawn(async move { + let (result, header): (FuseResult<()>, Option<InHeader>) = match incoming { + Ok(mut incoming) => match this.dispatch(&mut incoming).await { + Ok(()) => (Ok(()), None), + + Err(error) => { + let data = incoming.buffer.data(); + let data = &data[..std::mem::size_of::<InHeader>().max(data.len())]; + (Err(error), try_from_bytes(data).ok().copied()) + } + }, + + Err(error) => (Err(error.into()), None), + }; + + let header = display_or(header, "(bad)"); + if let Err(error) = result { + log::error!("Handling request {}: {}", header, error); + } + }); + } + }; + + tokio::select! { + () = main_loop => unreachable!(), + () = self.destroy.notified() => Ok(()), + } + } + + async fn do_handshake( + &mut self, + pages_per_buffer: usize, + bytes_per_buffer: usize, + ) -> FuseResult<Handshake> { + use FuseError::*; + + let buffer = { + self.session_fd.readable().await?.retain_ready(); + let large_buffer = self.large_buffers.get_mut().unwrap().first_mut().unwrap(); + + let mut data = InputBufferStorage::Sbo(SboStorage([0; SBO_SIZE])); + let sbo = match &mut data { + InputBufferStorage::Sbo(SboStorage(sbo)) => sbo, + _ => unreachable!(), + }; + + let mut io_vecs = [ + IoVec::from_mut_slice(sbo), + IoVec::from_mut_slice(large_buffer), + ]; + + let bytes = readv(*self.session_fd.get_ref(), &mut io_vecs).map_err(from_nix_error)?; + InputBuffer { bytes, data } + }; + + let request: proto::Request<'_> = buffer.data().try_into()?; + + let unique = request.header().unique; + let init = match request.body() { + proto::RequestBody::Init(body) => body, + _ => return Err(ProtocolInit), + }; + + use std::cmp::Ordering; + let supported = match init.major.cmp(&proto::MAJOR_VERSION) { + Ordering::Less => false, + + Ordering::Equal => { + self.proto_minor = init.minor; + self.proto_minor >= proto::REQUIRED_MINOR_VERSION + } + + Ordering::Greater => { + let tail = [bytes_of(&proto::MAJOR_VERSION)]; + ok(&self, unique, OutputChain::tail(&tail))?; + + return Ok(Handshake::Restart); + } + }; + + //TODO: fake some decency by supporting a few older minor versions + if !supported { + log::error!( + "Unsupported protocol {}.{}; this build requires \ + {major}.{}..={major}.{} (or a greater version \ + through compatibility)", + init.major, + init.minor, + proto::REQUIRED_MINOR_VERSION, + proto::TARGET_MINOR_VERSION, + major = proto::MAJOR_VERSION + ); + + fail(&self, unique, Errno::EPROTONOSUPPORT as i32)?; + return Err(ProtocolInit); + } + + let root = { + let mut init_result = Err(0); + let reply = Reply { + session: &self, + unique, + tail: &mut init_result, + }; + + self.fs.init(reply).await.into_result()?; + + match init_result { + Ok(root) => root, + Err(errno) => { + log::error!("init() handler failed: {}", Errno::from_i32(errno)); + return Err(FuseError::Io(std::io::Error::from_raw_os_error(errno))); + } + } + }; + + self.known.get_mut().unwrap().insert(Ino::ROOT, (root, 1)); + + use proto::InitFlags; + let flags = InitFlags::from_bits_truncate(init.flags); + let supported = InitFlags::PARALLEL_DIROPS + | InitFlags::ABORT_ERROR + | InitFlags::MAX_PAGES + | InitFlags::CACHE_SYMLINKS; + + let flags = flags & supported; + let max_write = bytes_per_buffer - std::mem::size_of::<(InHeader, proto::WriteIn)>(); + let init_out = proto::InitOut { + major: proto::MAJOR_VERSION, + minor: proto::TARGET_MINOR_VERSION, + max_readahead: 0, //TODO + flags: flags.bits(), + max_background: 0, //TODO + congestion_threshold: 0, //TODO + max_write: max_write.try_into().unwrap(), + time_gran: 1, //TODO + max_pages: pages_per_buffer.try_into().unwrap(), + padding: Default::default(), + unused: Default::default(), + }; + + let tail = [bytes_of(&init_out)]; + ok(&self, unique, OutputChain::tail(&tail))?; + + Ok(Handshake::Done) + } + + async fn dispatch(self: &Arc<Self>, request: &mut Incoming<Fs>) -> FuseResult<()> { + let request: proto::Request<'_> = request.buffer.data().try_into()?; + let header = request.header(); + let InHeader { unique, ino, .. } = *header; + let ino = Ino(ino); + + use proto::RequestBody::*; + + macro_rules! op { + () => { + op!(()) + }; + + ($body:expr) => { + op!($body, ()) + }; + + ($body:expr, $tail:expr) => {{ + let request = Request { + header, + body: $body, + }; + let reply = Reply { + session: &self, + unique, + tail: $tail, + }; + + (request, reply, self) + }}; + } + + // These operations don't involve locking and searching self.known + match request.body() { + Forget(body) => { + self.forget(std::iter::once((ino, body.nlookup))).await; + return Ok(()); + } + + Statfs => return self.fs.statfs(op!()).await.into_result(), + + Interrupt(body) => { + //TODO: Don't reply with EAGAIN if the interrupt is successful + let _ = self.interrupt_tx.send(body.unique); + return fail(&self, unique, Errno::EAGAIN as i32); + } + + Destroy => { + self.destroy.notify_one(); + return Ok(()); + } + + BatchForget { forgets, .. } => { + let forgets = forgets + .iter() + .map(|target| (Ino(target.ino), target.nlookup)); + + self.forget(forgets).await; + return Ok(()); + } + + _ => (), + } + + // Some operations are handled while self.known is locked + let inode = { + let known = self.known.lock().unwrap(); + let inode = match known.get(&ino) { + Some((farc, _)) => farc, + None => { + log::error!( + "Lookup count for ino {} reached zero while still \ + known to the kernel, this is a bug", + ino + ); + + return fail(&self, unique, Errno::ENOANO as i32); + } + }; + + match request.body() { + Getattr(_) => { + //TODO: Getattr flags + let (attrs, ttl) = <Fs as Fuse>::Inode::attrs(inode); + let attrs = attrs.finish::<Fs>(inode); + drop(known); + + let out = proto::AttrOut { + attr_valid: ttl.seconds, + attr_valid_nsec: ttl.nanoseconds, + dummy: Default::default(), + attr: attrs, + }; + + return ok(&self, unique, OutputChain::tail(&[bytes_of(&out)])); + } + + Access(body) => { + return <Fs as Fuse>::Inode::access(inode, op!(*body)).into_result() + } + + _ => inode.clone(), + } + }; + + macro_rules! inode_op { + ($op:ident, $($exprs:expr),+) => { + <Fs as Fuse>::Inode::$op(inode, op!($($exprs),+)).await + }; + } + + // These operations involve a Farc cloned from self.known + let done = match request.body() { + Lookup { name } => inode_op!(lookup, *name), + Readlink => inode_op!(readlink, ()), + Open(body) => { + let mut flags = proto::OpenOutFlags::empty(); + if <Fs as Fuse>::Inode::direct_io(&inode) { + flags |= proto::OpenOutFlags::DIRECT_IO; + } + + inode_op!(open, *body, (ino, flags)) + } + Opendir(body) => inode_op!(opendir, *body), + Readdir(body) => inode_op!(readdir, *body), + + _ => return fail(&self, unique, Errno::ENOSYS as i32), + }; + + done.into_result() + } + + async fn forget<I>(&self, targets: I) + where + I: Iterator<Item = (Ino, u64)>, + { + let mut known = self.known.lock().unwrap(); + + for (ino, subtracted) in targets { + use hash_map::Entry::*; + + match known.entry(ino) { + Occupied(mut entry) => { + let (_, count) = entry.get_mut(); + + *count = count.saturating_sub(subtracted); + if *count > 0 { + continue; + } + + entry.remove(); + } + + Vacant(_) => { + log::warn!("Kernel attempted to forget {:?} (bad refcount?)", ino); + continue; + } + } + } + } + + async fn receive(self: &Arc<Self>) -> std::io::Result<Incoming<Fs>> { + use InputBufferStorage::*; + + let permit = Arc::clone(&self.input_semaphore) + .acquire_owned() + .await + .unwrap(); + + let mut incoming = Incoming { + session: Arc::clone(self), + buffer: InputBuffer { + bytes: 0, + data: Sbo(SboStorage([0; SBO_SIZE])), + }, + }; + + let sbo = match &mut incoming.buffer.data { + Sbo(SboStorage(sbo)) => sbo, + _ => unreachable!(), + }; + + loop { + let mut readable = self.session_fd.readable().await?; + + let mut large_buffers = self.large_buffers.lock().unwrap(); + let large_buffer = large_buffers.last_mut().unwrap(); + + let mut io_vecs = [ + IoVec::from_mut_slice(sbo), + IoVec::from_mut_slice(&mut large_buffer[SBO_SIZE..]), + ]; + + match readable.try_io(|fd| readv(*fd.get_ref(), &mut io_vecs).map_err(from_nix_error)) { + Ok(Ok(bytes)) => { + if bytes > SBO_SIZE { + (&mut large_buffer[..SBO_SIZE]).copy_from_slice(sbo); + incoming.buffer.data = Spilled(large_buffers.pop().unwrap(), permit); + } + + incoming.buffer.bytes = bytes; + return Ok(incoming); + } + + // Interrupted + Ok(Err(error)) if error.kind() == std::io::ErrorKind::NotFound => continue, + + Ok(Err(error)) => return Err(error), + Err(_) => continue, + } + } + } + + fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> { + let length = std::mem::size_of::<proto::OutHeader>(); + let length = length + + output + .iter() + .map(<[_]>::iter) + .flatten() + .copied() + .map(<[_]>::len) + .sum::<usize>(); + + let length = length.try_into().unwrap(); + let header = proto::OutHeader { + len: length, + error, + unique, + }; + + //TODO: Full const generics any time now? Fs::EXPECTED_REQUEST_SEGMENTS + let header = [bytes_of(&header)]; + let output = output.preceded(&header); + let buffers: SmallVec<[_; 8]> = output + .iter() + .map(<[_]>::iter) + .flatten() + .copied() + .filter(|slice| !slice.is_empty()) + .map(IoVec::from_slice) + .collect(); + + let written = writev(*self.session_fd.get_ref(), &buffers).map_err(from_nix_error)?; + if written == length as usize { + Ok(()) + } else { + Err(FuseError::ShortWrite) + } + } +} + +impl Start { + pub async fn start<Fs: Fuse>(self, fs: Fs) -> FuseResult<Arc<Session<Fs>>> { + let session_fd = self.session_fd.into_raw_fd(); + + let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE; + fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap(); + + let page_size = sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize; + let pages_per_buffer = fs.request_buffer_pages().get(); + let bytes_per_buffer = pages_per_buffer.checked_mul(page_size).unwrap(); + assert!(bytes_per_buffer >= proto::MIN_READ_SIZE); + + let mut large_buffers = Vec::with_capacity(fs.request_buffers().get()); + for _ in 0..large_buffers.capacity() { + large_buffers.push(vec![0; bytes_per_buffer].into_boxed_slice()); + } + + let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); + let mut session = Session { + _fusermount_fd: self.fusermount_fd, + session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?, + proto_minor: 0, // Set by Session::do_handshake() + fs, + input_semaphore: Arc::new(Semaphore::new(large_buffers.len())), + large_buffers: Mutex::new(large_buffers), + known: Mutex::new(HashMap::new()), + destroy: Notify::new(), + interrupt_tx, + }; + + loop { + let state = session + .do_handshake(pages_per_buffer, bytes_per_buffer) + .await?; + + if let Handshake::Done = state { + break Ok(Arc::new(session)); + } + } + } +} + +enum Handshake { + Done, + Restart, +} + +struct Incoming<Fs: Fuse> { + session: Arc<Session<Fs>>, + buffer: InputBuffer, +} + +struct InputBuffer { + pub bytes: usize, + pub data: InputBufferStorage, +} + +enum InputBufferStorage { + Sbo(SboStorage), + Spilled(Box<[u8]>, OwnedSemaphorePermit), +} + +#[repr(align(8))] +struct SboStorage(pub [u8; 4 * std::mem::size_of::<InHeader>()]); + +const SBO_SIZE: usize = std::mem::size_of::<SboStorage>(); +const INTERRUPT_BROADCAST_CAPACITY: usize = 32; + +impl InputBuffer { + fn data(&self) -> &[u8] { + use InputBufferStorage::*; + let storage = match &self.data { + Sbo(sbo) => &sbo.0, + Spilled(buffer, _) => &buffer[..], + }; + + &storage[..self.bytes] + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..d45da43 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,113 @@ +//! An asynchronous and high-level implementation of the Filesystem in Userspace protocol. +//! +//! `blown-fuse` + +#![feature( + concat_idents, + arbitrary_self_types, + associated_type_bounds, + associated_type_defaults, + trait_alias, + try_trait_v2, + doc_cfg +)] + +#[cfg(not(target_os = "linux"))] +compile_error!("Unsupported OS"); + +use std::time::{SystemTime, UNIX_EPOCH}; + +pub use nix; + +#[cfg(any(feature = "server", doc))] +#[doc(cfg(feature = "server"))] +pub use crate::fuse::*; + +#[cfg(any(feature = "client", doc, test))] +#[doc(cfg(feature = "client"))] +pub mod client; + +mod proto; +mod util; + +#[cfg(any(feature = "server", doc))] +#[doc(cfg(feature = "server"))] +mod fuse; + +#[doc(no_inline)] +pub use nix::errno::Errno; + +pub use util::{FuseError, FuseResult}; + +#[derive(Copy, Clone, Default, Eq, PartialEq)] +pub struct TimeToLive { + seconds: u64, + nanoseconds: u32, +} + +#[derive(Copy, Clone, Default, Eq, PartialEq)] +pub struct Timestamp { + seconds: u64, + nanoseconds: u32, +} + +/// Inode number. +/// +/// This is a public newtype. Users are expected to inspect the underlying `u64` and construct +/// arbitrary `Ino` objects. +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +pub struct Ino(pub u64); + +impl Ino { + /// The invalid inode number, mostly useful for internal aspects of the FUSE protocol. + pub const NULL: Self = Ino(0); + + /// The inode number of the root inode as observed by a FUSE client. Other libraries refer to + /// this as `FUSE_ROOT_ID`. + /// + /// Note that a mounted session will remember the inode number given by `Inode::ino()` for the + /// root inode at initialization and transparently swap between it and `Ino::ROOT`. During + /// dispatch, requests targeted at `Ino::ROOT` will have this value replaced by the stored root + /// number, while replies involving the root inode will always report `Ino::ROOT` to the FUSE + /// client. Therefore, filesystems do not have to be aware of `Ino::ROOT` in most cases. + pub const ROOT: Self = Ino(proto::ROOT_ID); +} + +impl std::fmt::Display for Ino { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "{}", self.0) + } +} + +impl TimeToLive { + pub const MAX: Self = TimeToLive { + seconds: u64::MAX, + nanoseconds: u32::MAX, + }; + + pub fn new(seconds: u64, nanoseconds: u32) -> TimeToLive { + assert!(nanoseconds < 1_000_000_000); + TimeToLive { + seconds, + nanoseconds, + } + } + + pub fn seconds(&self) -> u64 { + self.seconds + } + + pub fn nanoseconds(&self) -> u32 { + self.nanoseconds + } +} + +impl From<SystemTime> for Timestamp { + fn from(time: SystemTime) -> Self { + let duration = time.duration_since(UNIX_EPOCH).unwrap(); + Timestamp { + seconds: duration.as_secs(), + nanoseconds: duration.subsec_nanos(), + } + } +} diff --git a/src/proto.rs b/src/proto.rs new file mode 100644 index 0000000..c6b9925 --- /dev/null +++ b/src/proto.rs @@ -0,0 +1,884 @@ +// Based on libfuse/include/fuse_kernel.h + +use bitflags::bitflags; +use bytemuck::{self, Pod}; +use bytemuck_derive::{Pod, Zeroable}; +use num_enum::TryFromPrimitive; +use std::{convert::TryFrom, ffi::CStr, fmt, mem::replace}; + +use crate::{util::display_or, FuseError, FuseResult}; + +pub const ROOT_ID: u64 = 1; +pub const MIN_READ_SIZE: usize = 8192; +pub const MAJOR_VERSION: u32 = 7; +pub const TARGET_MINOR_VERSION: u32 = 32; +pub const REQUIRED_MINOR_VERSION: u32 = 31; + +pub struct Request<'a> { + header: &'a InHeader, + body: RequestBody<'a>, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct InHeader { + pub len: u32, + pub opcode: u32, + pub unique: u64, + pub ino: u64, + pub uid: u32, + pub gid: u32, + pub pid: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct OutHeader { + pub len: u32, + pub error: i32, + pub unique: u64, +} + +pub enum RequestBody<'a> { + Lookup { + name: &'a CStr, + }, + Forget(&'a ForgetIn), + Getattr(&'a GetattrIn), + Setattr(&'a SetattrIn), + Readlink, + Symlink { + name: &'a CStr, + target: &'a CStr, + }, + Mknod { + prefix: &'a MknodIn, + name: &'a CStr, + }, + Mkdir { + prefix: &'a MkdirIn, + target: &'a CStr, + }, + Unlink { + name: &'a CStr, + }, + Rmdir { + name: &'a CStr, + }, + Rename { + prefix: &'a RenameIn, + old: &'a CStr, + new: &'a CStr, + }, + Link(&'a LinkIn), + Open(&'a OpenIn), + Read(&'a ReadIn), + Write { + prefix: &'a WriteIn, + data: &'a [u8], + }, + Statfs, + Release(&'a ReleaseIn), + Fsync(&'a FsyncIn), + Setxattr { + prefix: &'a SetxattrIn, + name: &'a CStr, + value: &'a CStr, + }, + Getxattr { + prefix: &'a GetxattrIn, + name: &'a CStr, + }, + Listxattr(&'a ListxattrIn), + Removexattr { + name: &'a CStr, + }, + Flush(&'a FlushIn), + Init(&'a InitIn), + Opendir(&'a OpendirIn), + Readdir(&'a ReaddirIn), + Releasedir(&'a ReleasedirIn), + Fsyncdir(&'a FsyncdirIn), + Getlk(&'a GetlkIn), + Setlk(&'a SetlkIn), + Setlkw(&'a SetlkwIn), + Access(&'a AccessIn), + Create { + prefix: &'a CreateIn, + name: &'a CStr, + }, + Interrupt(&'a InterruptIn), + Bmap(&'a BmapIn), + Destroy, + Ioctl { + prefix: &'a IoctlIn, + data: &'a [u8], + }, + Poll(&'a PollIn), + NotifyReply, + BatchForget { + prefix: &'a BatchForgetIn, + forgets: &'a [ForgetOne], + }, + Fallocate(&'a FallocateIn), + ReaddirPlus(&'a ReaddirPlusIn), + Rename2 { + prefix: &'a Rename2In, + old: &'a CStr, + new: &'a CStr, + }, + Lseek(&'a LseekIn), + CopyFileRange(&'a CopyFileRangeIn), +} + +#[derive(TryFromPrimitive, Copy, Clone, Debug)] +#[repr(u32)] +pub enum Opcode { + Lookup = 1, + Forget = 2, + Getattr = 3, + Setattr = 4, + Readlink = 5, + Symlink = 6, + Mknod = 8, + Mkdir = 9, + Unlink = 10, + Rmdir = 11, + Rename = 12, + Link = 13, + Open = 14, + Read = 15, + Write = 16, + Statfs = 17, + Release = 18, + Fsync = 20, + Setxattr = 21, + Getxattr = 22, + Listxattr = 23, + Removexattr = 24, + Flush = 25, + Init = 26, + Opendir = 27, + Readdir = 28, + Releasedir = 29, + Fsyncdir = 30, + Getlk = 31, + Setlk = 32, + Setlkw = 33, + Access = 34, + Create = 35, + Interrupt = 36, + Bmap = 37, + Destroy = 38, + Ioctl = 39, + Poll = 40, + NotifyReply = 41, + BatchForget = 42, + Fallocate = 43, + ReaddirPlus = 44, + Rename2 = 45, + Lseek = 46, + CopyFileRange = 47, +} + +#[derive(TryFromPrimitive, Copy, Clone)] +#[repr(i32)] +pub enum NotifyCode { + Poll = 1, + InvalInode = 2, + InvalEntry = 3, + Store = 4, + Retrieve = 5, + Delete = 6, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct Attrs { + pub ino: u64, + pub size: u64, + pub blocks: u64, + pub atime: u64, + pub mtime: u64, + pub ctime: u64, + pub atimensec: u32, + pub mtimensec: u32, + pub ctimensec: u32, + pub mode: u32, + pub nlink: u32, + pub uid: u32, + pub gid: u32, + pub rdev: u32, + pub blksize: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct FileLock { + pub start: u64, + pub end: u64, + pub lock_type: u32, + pub pid: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct EntryOut { + pub nodeid: u64, + pub generation: u64, + pub entry_valid: u64, + pub attr_valid: u64, + pub entry_valid_nsec: u32, + pub attr_valid_nsec: u32, + pub attr: Attrs, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct Dirent { + pub ino: u64, + pub off: u64, + pub namelen: u32, + pub entry_type: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct DirentPlus { + pub entry_out: EntryOut, + pub dirent: Dirent, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct ForgetIn { + pub nlookup: u64, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct GetattrIn { + pub flags: u32, + pub dummy: u32, + pub fh: u64, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct AttrOut { + pub attr_valid: u64, + pub attr_valid_nsec: u32, + pub dummy: u32, + pub attr: Attrs, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct SetattrIn { + pub valid: u32, + pub padding: u32, + pub fh: u64, + pub size: u64, + pub lock_owner: u64, + pub atime: u64, + pub mtime: u64, + pub ctime: u64, + pub atimensec: u32, + pub mtimensec: u32, + pub ctimensec: u32, + pub mode: u32, + pub unused: u32, + pub uid: u32, + pub gid: u32, + pub unused2: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct MknodIn { + pub mode: u32, + pub device: u32, + pub umask: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct MkdirIn { + pub mode: u32, + pub umask: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct RenameIn { + pub new_dir: u64, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct LinkIn { + pub old_ino: u64, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct OpenIn { + pub flags: u32, + pub unused: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct OpenOut { + pub fh: u64, + pub open_flags: u32, + pub padding: u32, +} + +bitflags! { + pub struct OpenOutFlags: u32 { + const DIRECT_IO = 1 << 0; + const KEEP_CACHE = 1 << 1; + const NONSEEKABLE = 1 << 2; + const CACHE_DIR = 1 << 3; + const STREAM = 1 << 4; + } +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct ReadIn { + pub fh: u64, + pub offset: u64, + pub size: u32, + pub read_flags: u32, + pub lock_owner: u64, + pub flags: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct WriteIn { + pub fh: u64, + pub offset: u64, + pub size: u32, + pub write_flags: u32, + pub lock_owner: u64, + pub flags: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct StatfsOut { + pub blocks: u64, + pub bfree: u64, + pub bavail: u64, + pub files: u64, + pub ffree: u64, + pub bsize: u32, + pub namelen: u32, + pub frsize: u32, + pub padding: u32, + pub spare: [u32; 6], +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct ReleaseIn { + pub fh: u64, + pub flags: u32, + pub release_flags: u32, + pub lock_owner: u64, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct FsyncIn { + pub fh: u64, + pub fsync_flags: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct SetxattrIn { + pub size: u32, + pub flags: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct GetxattrIn { + pub size: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct ListxattrIn { + pub getxattr_in: GetxattrIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct FlushIn { + pub fh: u64, + pub unused: u32, + pub padding: u32, + pub lock_owner: u64, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct InitIn { + pub major: u32, + pub minor: u32, + pub max_readahead: u32, + pub flags: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct InitOut { + pub major: u32, + pub minor: u32, + pub max_readahead: u32, + pub flags: u32, + pub max_background: u16, + pub congestion_threshold: u16, + pub max_write: u32, + pub time_gran: u32, + pub max_pages: u16, + pub padding: u16, + pub unused: [u32; 8], +} + +bitflags! { + pub struct InitFlags: u32 { + const ASYNC_READ = 1 << 0; + const POSIX_LOCKS = 1 << 1; + const FILE_OPS = 1 << 2; + const ATOMIC_O_TRUNC = 1 << 3; + const EXPORT_SUPPORT = 1 << 4; + const BIG_WRITES = 1 << 5; + const DONT_MASK = 1 << 6; + const SPLICE_WRITE = 1 << 7; + const SPLICE_MOVE = 1 << 8; + const SPLICE_READ = 1 << 9; + const FLOCK_LOCKS = 1 << 10; + const HAS_IOCTL_DIR = 1 << 11; + const AUTO_INVAL_DATA = 1 << 12; + const DO_READDIRPLUS = 1 << 13; + const READDIRPLUS_AUTO = 1 << 14; + const ASYNC_DIO = 1 << 15; + const WRITEBACK_CACHE = 1 << 16; + const NO_OPEN_SUPPOR = 1 << 17; + const PARALLEL_DIROPS = 1 << 18; + const HANDLE_KILLPRIV = 1 << 19; + const POSIX_ACL = 1 << 20; + const ABORT_ERROR = 1 << 21; + const MAX_PAGES = 1 << 22; + const CACHE_SYMLINKS = 1 << 23; + const NO_OPENDIR_SUPPORT = 1 << 24; + const EXPLICIT_INVAL_DATA = 1 << 25; + } +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct OpendirIn { + pub open_in: OpenIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct ReaddirIn { + pub read_in: ReadIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct ReleasedirIn { + pub release_in: ReleaseIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct FsyncdirIn { + pub fsync_in: FsyncIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct LkIn { + pub fh: u64, + pub owner: u64, + pub lock: FileLock, + pub lock_flags: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct GetlkIn { + pub lk_in: LkIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct SetlkIn { + pub lk_in: LkIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct SetlkwIn { + pub lk_in: LkIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct AccessIn { + pub mask: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct CreateIn { + pub flags: u32, + pub mode: u32, + pub umask: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct InterruptIn { + pub unique: u64, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct BmapIn { + pub block: u64, + pub block_size: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct IoctlIn { + pub fh: u64, + pub flags: u32, + pub cmd: u32, + pub arg: u64, + pub in_size: u32, + pub out_size: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct PollIn { + pub fh: u64, + pub kh: u64, + pub flags: u32, + pub events: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct ForgetOne { + pub ino: u64, + pub nlookup: u64, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct BatchForgetIn { + pub count: u32, + pub dummy: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct FallocateIn { + pub fh: u64, + pub offset: u64, + pub length: u64, + pub mode: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct ReaddirPlusIn { + pub read_in: ReadIn, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct Rename2In { + pub new_dir: u64, + pub flags: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct LseekIn { + pub fh: u64, + pub offset: u64, + pub whence: u32, + pub padding: u32, +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +pub struct CopyFileRangeIn { + pub fh_in: u64, + pub off_in: u64, + pub nodeid_out: u64, + pub fh_out: u64, + pub off_out: u64, + pub len: u64, + pub flags: u64, +} + +impl Request<'_> { + pub fn header(&self) -> &InHeader { + self.header + } + + pub fn body(&self) -> &RequestBody { + &self.body + } +} + +impl<'a> TryFrom<&'a [u8]> for Request<'a> { + type Error = FuseError; + + fn try_from(bytes: &'a [u8]) -> FuseResult<Self> { + use FuseError::*; + + fn split_from_bytes<T: Pod>(bytes: &[u8]) -> FuseResult<(&T, &[u8])> { + let (bytes, next_bytes) = bytes.split_at(bytes.len().min(std::mem::size_of::<T>())); + match bytemuck::try_from_bytes(bytes) { + Ok(t) => Ok((t, next_bytes)), + Err(_) => Err(Truncated), + } + } + + let full_bytes = bytes; + let (header, mut bytes) = split_from_bytes::<InHeader>(full_bytes)?; + + if header.len as usize != full_bytes.len() { + return Err(BadLength); + } + + let opcode = match Opcode::try_from(header.opcode) { + Ok(opcode) => opcode, + Err(_) => return Err(BadOpcode), + }; + + macro_rules! prefix { + ($op:ident, $ident:ident, $is_last:expr) => { + prefix!($op, $ident); + }; + + ($op:ident, $ident:ident) => { + let ($ident, after_prefix) = split_from_bytes::<concat_idents!($op, In)>(bytes)?; + bytes = after_prefix; + }; + } + + fn cstr_from_bytes(bytes: &[u8], is_last: bool) -> FuseResult<(&CStr, &[u8])> { + let (cstr, after_cstr): (&[u8], &[u8]) = if is_last { + (bytes, &[]) + } else { + match bytes.iter().position(|byte| *byte == b'\0') { + Some(nul) => bytes.split_at(nul + 1), + None => return Err(Truncated), + } + }; + + let cstr = CStr::from_bytes_with_nul(cstr).map_err(|_| BadLength)?; + Ok((cstr, after_cstr)) + } + + macro_rules! cstr { + ($op:ident, $ident:ident, $is_last:expr) => { + let ($ident, next_bytes) = cstr_from_bytes(bytes, $is_last)?; + bytes = next_bytes; + }; + } + + macro_rules! name { + ($op:ident, $ident:ident, $is_last:expr) => { + cstr!($op, $ident, $is_last); + }; + } + + macro_rules! value { + ($op:ident, $ident:ident, $is_last:expr) => { + cstr!($op, $ident, $is_last); + }; + } + + macro_rules! target { + ($op:ident, $ident:ident, $is_last:expr) => { + cstr!($op, $ident, $is_last); + }; + } + + macro_rules! old { + ($op:ident, $ident:ident, $is_last:expr) => { + cstr!($op, $ident, $is_last); + }; + } + + macro_rules! new { + ($op:ident, $ident:ident, $is_last:expr) => { + cstr!($op, $ident, $is_last); + }; + } + + macro_rules! build_body { + ($op:ident, $last:ident) => { + $last!($op, $last, true); + }; + + ($op:ident, $field:ident, $($next:ident),+) => { + $field!($op, $field, false); + build_body!($op, $($next),+); + }; + } + + macro_rules! body { + ($op:ident) => { + RequestBody::$op + }; + + ($op:ident, prefix) => { + { + prefix!($op, prefix); + RequestBody::$op(prefix) + } + }; + + ($op:ident, prefix, data where len == $size_field:ident) => { + { + prefix!($op, prefix); + if prefix.$size_field as usize != bytes.len() { + return Err(BadLength); + } + + RequestBody::$op { prefix, data: replace(&mut bytes, &[]) } + } + }; + + /*($op:ident, $($field:ident),+) => { + { + $($field!($op, $field));+; + RequestBody::$op { $($field),+ } + } + };*/ + + ($op:ident, $($fields:ident),+) => { + { + build_body!($op, $($fields),+); + RequestBody::$op { $($fields),+ } + } + }; + } + + use Opcode::*; + let body = match opcode { + Lookup => body!(Lookup, name), + Forget => body!(Forget, prefix), + Getattr => body!(Getattr, prefix), + Setattr => body!(Setattr, prefix), + Readlink => body!(Readlink), + Symlink => body!(Symlink, name, target), + Mknod => body!(Mknod, prefix, name), + Mkdir => body!(Mkdir, prefix, target), + Unlink => body!(Unlink, name), + Rmdir => body!(Rmdir, name), + Rename => body!(Rename, prefix, old, new), + Link => body!(Link, prefix), + Open => body!(Open, prefix), + Read => body!(Read, prefix), + Write => body!(Write, prefix, data where len == size), + Statfs => body!(Statfs), + Release => body!(Release, prefix), + Fsync => body!(Fsync, prefix), + Setxattr => body!(Setxattr, prefix, name, value), + Getxattr => body!(Getxattr, prefix, name), + Listxattr => body!(Listxattr, prefix), + Removexattr => body!(Removexattr, name), + Flush => body!(Flush, prefix), + Init => body!(Init, prefix), + Opendir => body!(Opendir, prefix), + Readdir => body!(Readdir, prefix), + Releasedir => body!(Releasedir, prefix), + Fsyncdir => body!(Fsyncdir, prefix), + Getlk => body!(Getlk, prefix), + Setlk => body!(Setlk, prefix), + Setlkw => body!(Setlkw, prefix), + Access => body!(Access, prefix), + Create => body!(Create, prefix, name), + Interrupt => body!(Interrupt, prefix), + Bmap => body!(Bmap, prefix), + Destroy => body!(Destroy), + Ioctl => body!(Ioctl, prefix, data where len == in_size), + Poll => body!(Poll, prefix), + NotifyReply => body!(NotifyReply), + BatchForget => { + prefix!(BatchForget, prefix); + + let forgets = replace(&mut bytes, &[]); + let forgets = bytemuck::try_cast_slice(forgets).map_err(|_| Truncated)?; + + if prefix.count as usize != forgets.len() { + return Err(BadLength); + } + + RequestBody::BatchForget { prefix, forgets } + } + Fallocate => body!(Fallocate, prefix), + ReaddirPlus => body!(ReaddirPlus, prefix), + Rename2 => body!(Rename2, prefix, old, new), + Lseek => body!(Lseek, prefix), + CopyFileRange => body!(CopyFileRange, prefix), + }; + + if bytes.is_empty() { + Ok(Request { header, body }) + } else { + Err(BadLength) + } + } +} + +impl fmt::Display for InHeader { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let opcode = display_or(Opcode::try_from(self.opcode).ok(), "bad opcode"); + + write!( + fmt, + "<{}> #{} len={} ino={} uid={} gid={} pid={}", + opcode, self.unique, self.len, self.ino, self.uid, self.gid, self.pid + ) + } +} + +impl fmt::Display for Opcode { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "{:?} ({})", self, *self as u32) + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..dba954b --- /dev/null +++ b/src/util.rs @@ -0,0 +1,112 @@ +use std::{ + fmt, + os::unix::io::{IntoRawFd, RawFd}, +}; + +use nix::{self, errno::Errno, unistd::close}; +use quick_error::quick_error; + +quick_error! { + #[derive(Debug)] + pub enum FuseError { + Io(err: std::io::Error) { from() } + ProtocolInit { display("fuse handshake failed (ancient kernel?)") } + Truncated { display("fuse request truncated") } + BadOpcode { display("unknown fuse operation") } + BadLength { display("bad length in fuse request") } + ShortWrite { display("fuse reply was trimmed on write()") } + } +} + +pub type FuseResult<T> = Result<T, FuseError>; + +pub struct DumbFd(pub RawFd); + +pub struct OutputChain<'a> { + segments: &'a [&'a [u8]], + then: Option<&'a OutputChain<'a>>, +} + +pub struct OutputChainIter<'a>(Option<&'a OutputChain<'a>>); + +impl IntoRawFd for DumbFd { + fn into_raw_fd(self) -> RawFd { + let fd = self.0; + std::mem::forget(self); + fd + } +} + +impl Drop for DumbFd { + fn drop(&mut self) { + let _ = close(self.0); + } +} + +impl<'a> OutputChain<'a> { + pub fn empty() -> Self { + OutputChain { + segments: &[], + then: None, + } + } + + pub fn tail(segments: &'a [&'a [u8]]) -> Self { + OutputChain { + segments, + then: None, + } + } + + pub fn preceded(&'a self, segments: &'a [&'a [u8]]) -> Self { + OutputChain { + segments, + then: Some(&self), + } + } + + pub fn iter(&self) -> OutputChainIter { + OutputChainIter(Some(&self)) + } +} + +impl<'a> Iterator for OutputChainIter<'a> { + type Item = &'a [&'a [u8]]; + + fn next(&mut self) -> Option<Self::Item> { + let next = self.0.and_then(|chain| chain.then); + std::mem::replace(&mut self.0, next).map(|chain| chain.segments) + } +} + +pub fn from_nix_error(error: nix::Error) -> std::io::Error { + use nix::Error::*; + let from_raw = |code| std::io::Error::from_raw_os_error(code as i32); + + match error { + Sys(errno) => errno.into(), + InvalidPath => from_raw(Errno::ENAMETOOLONG), + InvalidUtf8 => from_raw(Errno::EILSEQ), + UnsupportedOperation => from_raw(Errno::EOPNOTSUPP), + } +} + +pub fn display_or<'a, T: fmt::Display + 'a>( + maybe: Option<T>, + default: &'a str, +) -> impl fmt::Display + 'a { + struct Params<'a, T: fmt::Display>(Option<T>, &'a str); + + impl<T: fmt::Display> fmt::Display for Params<'_, T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let Params(maybe, placeholder) = &self; + if let Some(t) = maybe { + write!(fmt, "{}", t) + } else { + fmt.write_str(placeholder) + } + } + } + + Params(maybe, default) +} |
