diff options
| author | Alejandro Soto <alejandro@34project.org> | 2021-12-27 00:44:23 -0600 |
|---|---|---|
| committer | Alejandro Soto <alejandro@34project.org> | 2021-12-28 19:43:44 -0600 |
| commit | a3212a0ba07da7bdae9e17637fbc237e2ae01c08 (patch) | |
| tree | 00be583beba0f321ebeea3af21582ce927943b44 /src/fuse/session.rs | |
| parent | 311b2a40213aa48131a189f99dc4258d354c0c78 (diff) | |
Redesign the API around a user-provided main loop
This is basically a full library rewrite.
Diffstat (limited to '')
| -rw-r--r-- | src/fuse/session.rs | 705 |
1 files changed, 316 insertions, 389 deletions
diff --git a/src/fuse/session.rs b/src/fuse/session.rs index 8975c57..5045099 100644 --- a/src/fuse/session.rs +++ b/src/fuse/session.rs @@ -1,148 +1,121 @@ use std::{ - collections::{hash_map, HashMap}, convert::TryInto, + future::Future, io, + marker::PhantomData, os::unix::io::{IntoRawFd, RawFd}, sync::{Arc, Mutex}, }; use nix::{ fcntl::{fcntl, FcntlArg, OFlag}, - sys::uio::{readv, writev, IoVec}, - unistd::{sysconf, SysconfVar}, + sys::uio::{writev, IoVec}, + unistd::{read, sysconf, SysconfVar}, }; use tokio::{ io::unix::AsyncFd, - sync::{broadcast, Notify, OwnedSemaphorePermit, Semaphore}, + sync::{broadcast, OwnedSemaphorePermit, Semaphore}, }; -use bytemuck::{bytes_of, try_from_bytes}; +use bytemuck::bytes_of; use smallvec::SmallVec; use crate::{ - proto::{self, InHeader}, - util::{display_or, OutputChain}, - Errno, FuseError, FuseResult, Ino, + proto::{self, InHeader, Structured}, + util::{DumbFd, OutputChain}, + Errno, FuseError, FuseResult, }; -use super::{ - fs::{Fuse, Inode}, - Reply, Request, Session, Start, -}; +use super::{ops, Done, Op, Operation, Reply, Request}; -pub fn ok<Fs: Fuse>(session: &Session<Fs>, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { - session.send(unique, 0, output) +pub struct Start { + fusermount_fd: DumbFd, + session_fd: DumbFd, } -pub fn fail<Fs: Fuse>(session: &Session<Fs>, unique: u64, mut errno: i32) -> FuseResult<()> { - if errno <= 0 { - log::warn!( - "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", - unique, - errno - ); +pub struct Session { + _fusermount_fd: DumbFd, + session_fd: AsyncFd<RawFd>, + interrupt_tx: broadcast::Sender<u64>, + buffers: Mutex<Vec<Buffer>>, + buffer_semaphore: Arc<Semaphore>, + proto_minor: u32, + buffer_pages: usize, +} - errno = Errno::ENOMSG as i32; - } +pub struct Endpoint<'a> { + session: &'a Arc<Session>, + local_buffer: Buffer, +} - session.send(unique, -errno, OutputChain::empty()) +pub enum Dispatch<'o> { + Lookup(Incoming<'o, ops::Lookup>), + Getattr(Incoming<'o, ops::Getattr>), + Readlink(Incoming<'o, ops::Readlink>), + Read(Incoming<'o, ops::Read>), + Write(Incoming<'o, ops::Write>), + Statfs(Incoming<'o, ops::Statfs>), + Readdir(Incoming<'o, ops::Readdir>), + Access(Incoming<'o, ops::Access>), + Destroy(Incoming<'o, ops::Destroy>), } -pub fn unveil<Fs: Fuse>(session: &Session<Fs>, inode: &Fs::Farc) { - let ino = <Fs as Fuse>::Inode::ino(inode); - let mut known = session.known.lock().unwrap(); +pub struct Incoming<'o, O: Operation<'o>> { + common: IncomingCommon<'o>, + _phantom: PhantomData<O>, +} - use hash_map::Entry::*; - match known.entry(ino) { - Occupied(entry) => { - let (_, count) = entry.into_mut(); - *count += 1; - } +pub struct Owned<O: for<'o> Operation<'o>> { + session: Arc<Session>, + buffer: Buffer, + header: InHeader, + _permit: OwnedSemaphorePermit, + _phantom: PhantomData<O>, +} - Vacant(entry) => { - entry.insert((Fs::Farc::clone(inode), 1)); +impl Session { + pub fn endpoint<'a>(self: &'a Arc<Self>) -> Endpoint<'a> { + Endpoint { + session: self, + local_buffer: Buffer::new(self.buffer_pages), } } -} -pub fn interrupt_rx<Fs: Fuse>(session: &Session<Fs>) -> broadcast::Receiver<u64> { - session.interrupt_tx.subscribe() -} - -impl<Fs: Fuse> Session<Fs> { - pub fn fs(&self) -> &Fs { - &self.fs + pub(crate) fn ok(&self, unique: u64, output: OutputChain<'_>) -> FuseResult<()> { + self.send(unique, 0, output) } - pub async fn main_loop(self: Arc<Self>) -> FuseResult<()> { - let this = Arc::clone(&self); - let main_loop = async move { - loop { - let incoming = this.receive().await; - let this = Arc::clone(&this); - - tokio::spawn(async move { - let (result, header): (FuseResult<()>, Option<InHeader>) = match incoming { - Ok(mut incoming) => match this.dispatch(&mut incoming).await { - Ok(()) => (Ok(()), None), - - Err(error) => { - let data = incoming.buffer.data(); - let data = &data[..std::mem::size_of::<InHeader>().max(data.len())]; - (Err(error), try_from_bytes(data).ok().copied()) - } - }, - - Err(error) => (Err(error.into()), None), - }; - - let header = display_or(header, "(bad)"); - if let Err(error) = result { - log::error!("Handling request {}: {}", header, error); - } - }); - } - }; + pub(crate) fn fail(&self, unique: u64, mut errno: i32) -> FuseResult<()> { + if errno <= 0 { + log::warn!( + "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG", + unique, + errno + ); - tokio::select! { - () = main_loop => unreachable!(), - () = self.destroy.notified() => Ok(()), + errno = Errno::ENOMSG as i32; } - } - async fn do_handshake( - &mut self, - pages_per_buffer: usize, - bytes_per_buffer: usize, - ) -> FuseResult<Handshake> { - use FuseError::*; - - let buffer = { - self.session_fd.readable().await?.retain_ready(); - let large_buffer = self.large_buffers.get_mut().unwrap().first_mut().unwrap(); - - let mut data = InputBufferStorage::Sbo(SboStorage([0; SBO_SIZE])); - let sbo = match &mut data { - InputBufferStorage::Sbo(SboStorage(sbo)) => sbo, - _ => unreachable!(), - }; + self.send(unique, -errno, OutputChain::empty()) + } - let mut io_vecs = [ - IoVec::from_mut_slice(sbo), - IoVec::from_mut_slice(large_buffer), - ]; + pub(crate) fn interrupt_rx(&self) -> broadcast::Receiver<u64> { + self.interrupt_tx.subscribe() + } - let bytes = readv(*self.session_fd.get_ref(), &mut io_vecs).map_err(io::Error::from)?; - InputBuffer { bytes, data } - }; + async fn handshake(&mut self, buffer: &mut Buffer) -> FuseResult<Handshake> { + self.session_fd.readable().await?.retain_ready(); + let bytes = read(*self.session_fd.get_ref(), &mut buffer.0).map_err(io::Error::from)?; - let request: proto::Request<'_> = buffer.data().try_into()?; + let (header, opcode) = InHeader::from_bytes(&buffer.0[..bytes])?; + let init = match opcode { + proto::Opcode::Init => <&proto::InitIn>::toplevel_from(&buffer.0[HEADER_END..bytes])?, - let unique = request.header().unique; - let init = match request.body() { - proto::RequestBody::Init(body) => body, - _ => return Err(ProtocolInit), + _ => { + log::error!("First message from kernel is not Init, but {:?}", opcode); + return Err(FuseError::ProtocolInit); + } }; use std::cmp::Ordering; @@ -156,7 +129,7 @@ impl<Fs: Fuse> Session<Fs> { Ordering::Greater => { let tail = [bytes_of(&proto::MAJOR_VERSION)]; - ok(self, unique, OutputChain::tail(&tail))?; + self.ok(header.unique, OutputChain::tail(&tail))?; return Ok(Handshake::Restart); } @@ -175,40 +148,27 @@ impl<Fs: Fuse> Session<Fs> { major = proto::MAJOR_VERSION ); - fail(self, unique, Errno::EPROTONOSUPPORT as i32)?; - return Err(ProtocolInit); + self.fail(header.unique, Errno::EPROTONOSUPPORT as i32)?; + return Err(FuseError::ProtocolInit); } - let root = { - let mut init_result = Err(0); - let reply = Reply { - session: self, - unique, - tail: &mut init_result, - }; + let flags = { + use proto::InitFlags; - self.fs.init(reply).await.into_result()?; + let kernel = InitFlags::from_bits_truncate(init.flags); + let supported = InitFlags::PARALLEL_DIROPS + | InitFlags::ABORT_ERROR + | InitFlags::MAX_PAGES + | InitFlags::CACHE_SYMLINKS; - match init_result { - Ok(root) => root, - Err(errno) => { - log::error!("init() handler failed: {}", Errno::from_i32(errno)); - return Err(FuseError::Io(std::io::Error::from_raw_os_error(errno))); - } - } + kernel & supported }; - self.known.get_mut().unwrap().insert(Ino::ROOT, (root, 1)); + let buffer_size = page_size() * self.buffer_pages; - use proto::InitFlags; - let flags = InitFlags::from_bits_truncate(init.flags); - let supported = InitFlags::PARALLEL_DIROPS - | InitFlags::ABORT_ERROR - | InitFlags::MAX_PAGES - | InitFlags::CACHE_SYMLINKS; + // See fs/fuse/dev.c in the kernel source tree for details about max_write + let max_write = buffer_size - std::mem::size_of::<(InHeader, proto::WriteIn)>(); - let flags = flags & supported; - let max_write = bytes_per_buffer - std::mem::size_of::<(InHeader, proto::WriteIn)>(); let init_out = proto::InitOut { major: proto::MAJOR_VERSION, minor: proto::TARGET_MINOR_VERSION, @@ -218,230 +178,17 @@ impl<Fs: Fuse> Session<Fs> { congestion_threshold: 0, //TODO max_write: max_write.try_into().unwrap(), time_gran: 1, //TODO - max_pages: pages_per_buffer.try_into().unwrap(), + max_pages: self.buffer_pages.try_into().unwrap(), padding: Default::default(), unused: Default::default(), }; let tail = [bytes_of(&init_out)]; - ok(self, unique, OutputChain::tail(&tail))?; + self.ok(header.unique, OutputChain::tail(&tail))?; Ok(Handshake::Done) } - async fn dispatch(self: &Arc<Self>, request: &mut Incoming) -> FuseResult<()> { - let request: proto::Request<'_> = request.buffer.data().try_into()?; - let header = request.header(); - let InHeader { unique, ino, .. } = *header; - let ino = Ino(ino); - - use proto::RequestBody::*; - - macro_rules! op { - () => { - op!(()) - }; - - ($body:expr) => { - op!($body, ()) - }; - - ($body:expr, $tail:expr) => {{ - let request = Request { - header, - body: $body, - }; - let reply = Reply { - session: &self, - unique, - tail: $tail, - }; - - (request, reply, self) - }}; - } - - // These operations don't involve locking and searching self.known - match request.body() { - Forget(body) => { - self.forget(std::iter::once((ino, body.nlookup))).await; - return Ok(()); - } - - Statfs => return self.fs.statfs(op!()).await.into_result(), - - Interrupt(body) => { - //TODO: Don't reply with EAGAIN if the interrupt is successful - let _ = self.interrupt_tx.send(body.unique); - return fail(self, unique, Errno::EAGAIN as i32); - } - - Destroy => { - self.destroy.notify_one(); - return Ok(()); - } - - BatchForget { forgets, .. } => { - let forgets = forgets - .iter() - .map(|target| (Ino(target.ino), target.nlookup)); - - self.forget(forgets).await; - return Ok(()); - } - - _ => (), - } - - // Some operations are handled while self.known is locked - let inode = { - let known = self.known.lock().unwrap(); - let inode = match known.get(&ino) { - Some((farc, _)) => farc, - None => { - log::error!( - "Lookup count for ino {} reached zero while still \ - known to the kernel, this is a bug", - ino - ); - - return fail(self, unique, Errno::ENOANO as i32); - } - }; - - match request.body() { - Getattr(_) => { - //TODO: Getattr flags - let (attrs, ttl) = <Fs as Fuse>::Inode::attrs(inode); - let attrs = attrs.finish::<Fs>(inode); - drop(known); - - let out = proto::AttrOut { - attr_valid: ttl.seconds, - attr_valid_nsec: ttl.nanoseconds, - dummy: Default::default(), - attr: attrs, - }; - - return ok(self, unique, OutputChain::tail(&[bytes_of(&out)])); - } - - Access(body) => { - return <Fs as Fuse>::Inode::access(inode, op!(*body)).into_result() - } - - _ => inode.clone(), - } - }; - - macro_rules! inode_op { - ($op:ident, $($exprs:expr),+) => { - <Fs as Fuse>::Inode::$op(inode, op!($($exprs),+)).await - }; - } - - // These operations involve a Farc cloned from self.known - let done = match request.body() { - Lookup { name } => inode_op!(lookup, *name), - Readlink => inode_op!(readlink, ()), - Open(body) => { - let mut flags = proto::OpenOutFlags::empty(); - if <Fs as Fuse>::Inode::direct_io(&inode) { - flags |= proto::OpenOutFlags::DIRECT_IO; - } - - inode_op!(open, *body, (ino, flags)) - } - Opendir(body) => inode_op!(opendir, *body), - Readdir(body) => inode_op!(readdir, *body), - - _ => return fail(self, unique, Errno::ENOSYS as i32), - }; - - done.into_result() - } - - async fn forget<I>(&self, targets: I) - where - I: Iterator<Item = (Ino, u64)>, - { - let mut known = self.known.lock().unwrap(); - - for (ino, subtracted) in targets { - use hash_map::Entry::*; - - match known.entry(ino) { - Occupied(mut entry) => { - let (_, count) = entry.get_mut(); - - *count = count.saturating_sub(subtracted); - if *count > 0 { - continue; - } - - entry.remove(); - } - - Vacant(_) => { - log::warn!("Kernel attempted to forget {:?} (bad refcount?)", ino); - continue; - } - } - } - } - - async fn receive(self: &Arc<Self>) -> std::io::Result<Incoming> { - use InputBufferStorage::*; - - let permit = Arc::clone(&self.input_semaphore) - .acquire_owned() - .await - .unwrap(); - - let mut incoming = Incoming { - buffer: InputBuffer { - bytes: 0, - data: Sbo(SboStorage([0; SBO_SIZE])), - }, - }; - - let sbo = match &mut incoming.buffer.data { - Sbo(SboStorage(sbo)) => sbo, - _ => unreachable!(), - }; - - loop { - let mut readable = self.session_fd.readable().await?; - - let mut large_buffers = self.large_buffers.lock().unwrap(); - let large_buffer = large_buffers.last_mut().unwrap(); - - let mut io_vecs = [ - IoVec::from_mut_slice(sbo), - IoVec::from_mut_slice(&mut large_buffer[SBO_SIZE..]), - ]; - - let mut read = |fd: &AsyncFd<RawFd>| readv(*fd.get_ref(), &mut io_vecs); - match readable.try_io(|fd| read(fd).map_err(io::Error::from)) { - Ok(Ok(bytes)) => { - if bytes > SBO_SIZE { - (&mut large_buffer[..SBO_SIZE]).copy_from_slice(sbo); - incoming.buffer.data = Spilled(large_buffers.pop().unwrap(), permit); - } - - incoming.buffer.bytes = bytes; - return Ok(incoming); - } - - // Interrupted - Ok(Err(error)) if error.kind() == std::io::ErrorKind::NotFound => continue, - - Ok(Err(error)) => return Err(error), - Err(_) => continue, - } - } - } - fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> { let after_header: usize = output .iter() @@ -479,81 +226,261 @@ impl<Fs: Fuse> Session<Fs> { } } +impl<'o> Dispatch<'o> { + pub fn op(self) -> Op<'o> { + use Dispatch::*; + + let common = match self { + Lookup(incoming) => incoming.common, + Getattr(incoming) => incoming.common, + Readlink(incoming) => incoming.common, + Read(incoming) => incoming.common, + Write(incoming) => incoming.common, + Statfs(incoming) => incoming.common, + Readdir(incoming) => incoming.common, + Access(incoming) => incoming.common, + Destroy(incoming) => incoming.common, + }; + + common.into_generic_op() + } +} + +impl Endpoint<'_> { + pub async fn receive<'a, F, Fut>(&'a mut self, dispatcher: F) -> FuseResult<()> + where + F: FnOnce(Dispatch<'a>) -> Fut, + Fut: Future<Output = Done<'a>>, + { + let buffer = &mut self.local_buffer.0; + let bytes = loop { + let mut readable = self.session.session_fd.readable().await?; + let mut read = |fd: &AsyncFd<RawFd>| read(*fd.get_ref(), buffer); + + let result = match readable.try_io(|fd| read(fd).map_err(io::Error::from)) { + Ok(result) => result, + Err(_) => continue, + }; + + match result { + // Interrupted + Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue, + + result => break result, + } + }; + + let (header, opcode) = proto::InHeader::from_bytes(&buffer[..bytes?])?; + let common = IncomingCommon { + session: self.session, + buffer: &mut self.local_buffer, + header, + }; + + let dispatch = { + use proto::Opcode::*; + + macro_rules! dispatch { + ($op:ident) => { + Dispatch::$op(Incoming { + common, + _phantom: PhantomData, + }) + }; + } + + match opcode { + Lookup => dispatch!(Lookup), + Getattr => dispatch!(Getattr), + Readlink => dispatch!(Readlink), + Read => dispatch!(Read), + Write => dispatch!(Write), + Statfs => dispatch!(Statfs), + Readdir => dispatch!(Readdir), + Access => dispatch!(Access), + Destroy => dispatch!(Destroy), + + _ => { + log::warn!("Not implemented: {}", common.header); + + let (_request, reply) = common.into_generic_op(); + let _ = reply.not_implemented(); + + return Ok(()); + } + } + }; + + let _ = dispatcher(dispatch).await; + Ok(()) + } +} + impl Start { - pub async fn start<Fs: Fuse>(self, fs: Fs) -> FuseResult<Arc<Session<Fs>>> { + pub async fn start(self) -> FuseResult<Arc<Session>> { let session_fd = self.session_fd.into_raw_fd(); let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE; fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap(); - let page_size = sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize; - let pages_per_buffer = fs.request_buffer_pages().get(); - let bytes_per_buffer = pages_per_buffer.checked_mul(page_size).unwrap(); - assert!(bytes_per_buffer >= proto::MIN_READ_SIZE); + let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); - let mut large_buffers = Vec::with_capacity(fs.request_buffers().get()); - for _ in 0..large_buffers.capacity() { - large_buffers.push(vec![0; bytes_per_buffer].into_boxed_slice()); - } + let buffer_pages = proto::MIN_READ_SIZE / page_size(); //TODO + let buffer_count = SHARED_BUFFERS; //TODO + let buffers = std::iter::repeat_with(|| Buffer::new(buffer_pages)) + .take(buffer_count) + .collect(); - let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY); let mut session = Session { _fusermount_fd: self.fusermount_fd, session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?, - proto_minor: 0, // Set by Session::do_handshake() - fs, - input_semaphore: Arc::new(Semaphore::new(large_buffers.len())), - large_buffers: Mutex::new(large_buffers), - known: Mutex::new(HashMap::new()), - destroy: Notify::new(), interrupt_tx, + buffers: Mutex::new(buffers), + buffer_semaphore: Arc::new(Semaphore::new(buffer_count)), + proto_minor: 0, // Set by Session::do_handshake() + buffer_pages, }; - loop { - let state = session - .do_handshake(pages_per_buffer, bytes_per_buffer) - .await?; + let mut init_buffer = session.buffers.get_mut().unwrap().pop().unwrap(); - if let Handshake::Done = state { + loop { + if let Handshake::Done = session.handshake(&mut init_buffer).await? { + session.buffers.get_mut().unwrap().push(init_buffer); break Ok(Arc::new(session)); } } } -} -enum Handshake { - Done, - Restart, + pub(crate) fn new(fusermount_fd: DumbFd, session_fd: DumbFd) -> Self { + Start { + fusermount_fd, + session_fd, + } + } } -struct Incoming { - buffer: InputBuffer, +impl<'o, O: Operation<'o>> Incoming<'o, O> { + pub fn op(self) -> Result<Op<'o, O>, Done<'o>> { + try_op( + self.common.session, + &self.common.buffer.0, + self.common.header, + ) + } } -struct InputBuffer { - pub bytes: usize, - pub data: InputBufferStorage, +impl<O: for<'o> Operation<'o>> Incoming<'_, O> { + pub async fn owned(self) -> Owned<O> { + let session = self.common.session; + + let (buffer, permit) = { + let semaphore = Arc::clone(&session.buffer_semaphore); + let permit = semaphore + .acquire_owned() + .await + .expect("Buffer semaphore error"); + + let mut buffers = session.buffers.lock().unwrap(); + let mut buffer = buffers.pop().expect("Buffer semaphore out of sync"); + + std::mem::swap(&mut buffer, self.common.buffer); + (buffer, permit) + }; + + Owned { + session: Arc::clone(session), + buffer, + header: self.common.header, + _permit: permit, + _phantom: PhantomData, + } + } } -enum InputBufferStorage { - Sbo(SboStorage), - Spilled(Box<[u8]>, OwnedSemaphorePermit), +impl<O: for<'o> Operation<'o>> Owned<O> { + pub fn op(&self) -> Result<Op<'_, O>, Done<'_>> { + try_op(&self.session, &self.buffer.0, self.header) + } } -#[repr(align(8))] -struct SboStorage(pub [u8; 4 * std::mem::size_of::<InHeader>()]); +impl<O: for<'o> Operation<'o>> Drop for Owned<O> { + fn drop(&mut self) { + if let Ok(mut buffers) = self.session.buffers.lock() { + let empty = Buffer(Vec::new().into_boxed_slice()); + buffers.push(std::mem::replace(&mut self.buffer, empty)); + } + } +} -const SBO_SIZE: usize = std::mem::size_of::<SboStorage>(); const INTERRUPT_BROADCAST_CAPACITY: usize = 32; +const SHARED_BUFFERS: usize = 32; +const HEADER_END: usize = std::mem::size_of::<InHeader>(); -impl InputBuffer { - fn data(&self) -> &[u8] { - use InputBufferStorage::*; - let storage = match &self.data { - Sbo(sbo) => &sbo.0, - Spilled(buffer, _) => &buffer[..], +struct IncomingCommon<'o> { + session: &'o Arc<Session>, + buffer: &'o mut Buffer, + header: proto::InHeader, +} + +enum Handshake { + Done, + Restart, +} + +struct Buffer(Box<[u8]>); + +impl<'o> IncomingCommon<'o> { + fn into_generic_op(self) -> Op<'o> { + let request = Request { + header: self.header, + body: (), }; - &storage[..self.bytes] + let reply = Reply { + session: self.session, + unique: self.header.unique, + tail: (), + }; + + (request, reply) + } +} + +impl Buffer { + fn new(pages: usize) -> Self { + Buffer(vec![0; pages * page_size()].into_boxed_slice()) } } + +fn try_op<'o, O: Operation<'o>>( + session: &'o Session, + bytes: &'o [u8], + header: proto::InHeader, +) -> Result<Op<'o, O>, Done<'o>> { + let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize]) { + Ok(body) => body, + Err(error) => { + log::error!("Parsing request {}: {}", header, error); + let reply = Reply::<ops::Any> { + session, + unique: header.unique, + tail: (), + }; + + return Err(reply.io_error()); + } + }; + + let request = Request { header, body }; + let reply = Reply { + session, + unique: header.unique, + tail: Default::default(), + }; + + Ok((request, reply)) +} + +fn page_size() -> usize { + sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize +} |
