summaryrefslogtreecommitdiff
path: root/src/fuse/session.rs
diff options
context:
space:
mode:
authorAlejandro Soto <alejandro@34project.org>2021-12-27 00:44:23 -0600
committerAlejandro Soto <alejandro@34project.org>2021-12-28 19:43:44 -0600
commita3212a0ba07da7bdae9e17637fbc237e2ae01c08 (patch)
tree00be583beba0f321ebeea3af21582ce927943b44 /src/fuse/session.rs
parent311b2a40213aa48131a189f99dc4258d354c0c78 (diff)
Redesign the API around a user-provided main loop
This is basically a full library rewrite.
Diffstat (limited to '')
-rw-r--r--src/fuse/session.rs705
1 files changed, 316 insertions, 389 deletions
diff --git a/src/fuse/session.rs b/src/fuse/session.rs
index 8975c57..5045099 100644
--- a/src/fuse/session.rs
+++ b/src/fuse/session.rs
@@ -1,148 +1,121 @@
use std::{
- collections::{hash_map, HashMap},
convert::TryInto,
+ future::Future,
io,
+ marker::PhantomData,
os::unix::io::{IntoRawFd, RawFd},
sync::{Arc, Mutex},
};
use nix::{
fcntl::{fcntl, FcntlArg, OFlag},
- sys::uio::{readv, writev, IoVec},
- unistd::{sysconf, SysconfVar},
+ sys::uio::{writev, IoVec},
+ unistd::{read, sysconf, SysconfVar},
};
use tokio::{
io::unix::AsyncFd,
- sync::{broadcast, Notify, OwnedSemaphorePermit, Semaphore},
+ sync::{broadcast, OwnedSemaphorePermit, Semaphore},
};
-use bytemuck::{bytes_of, try_from_bytes};
+use bytemuck::bytes_of;
use smallvec::SmallVec;
use crate::{
- proto::{self, InHeader},
- util::{display_or, OutputChain},
- Errno, FuseError, FuseResult, Ino,
+ proto::{self, InHeader, Structured},
+ util::{DumbFd, OutputChain},
+ Errno, FuseError, FuseResult,
};
-use super::{
- fs::{Fuse, Inode},
- Reply, Request, Session, Start,
-};
+use super::{ops, Done, Op, Operation, Reply, Request};
-pub fn ok<Fs: Fuse>(session: &Session<Fs>, unique: u64, output: OutputChain<'_>) -> FuseResult<()> {
- session.send(unique, 0, output)
+pub struct Start {
+ fusermount_fd: DumbFd,
+ session_fd: DumbFd,
}
-pub fn fail<Fs: Fuse>(session: &Session<Fs>, unique: u64, mut errno: i32) -> FuseResult<()> {
- if errno <= 0 {
- log::warn!(
- "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG",
- unique,
- errno
- );
+pub struct Session {
+ _fusermount_fd: DumbFd,
+ session_fd: AsyncFd<RawFd>,
+ interrupt_tx: broadcast::Sender<u64>,
+ buffers: Mutex<Vec<Buffer>>,
+ buffer_semaphore: Arc<Semaphore>,
+ proto_minor: u32,
+ buffer_pages: usize,
+}
- errno = Errno::ENOMSG as i32;
- }
+pub struct Endpoint<'a> {
+ session: &'a Arc<Session>,
+ local_buffer: Buffer,
+}
- session.send(unique, -errno, OutputChain::empty())
+pub enum Dispatch<'o> {
+ Lookup(Incoming<'o, ops::Lookup>),
+ Getattr(Incoming<'o, ops::Getattr>),
+ Readlink(Incoming<'o, ops::Readlink>),
+ Read(Incoming<'o, ops::Read>),
+ Write(Incoming<'o, ops::Write>),
+ Statfs(Incoming<'o, ops::Statfs>),
+ Readdir(Incoming<'o, ops::Readdir>),
+ Access(Incoming<'o, ops::Access>),
+ Destroy(Incoming<'o, ops::Destroy>),
}
-pub fn unveil<Fs: Fuse>(session: &Session<Fs>, inode: &Fs::Farc) {
- let ino = <Fs as Fuse>::Inode::ino(inode);
- let mut known = session.known.lock().unwrap();
+pub struct Incoming<'o, O: Operation<'o>> {
+ common: IncomingCommon<'o>,
+ _phantom: PhantomData<O>,
+}
- use hash_map::Entry::*;
- match known.entry(ino) {
- Occupied(entry) => {
- let (_, count) = entry.into_mut();
- *count += 1;
- }
+pub struct Owned<O: for<'o> Operation<'o>> {
+ session: Arc<Session>,
+ buffer: Buffer,
+ header: InHeader,
+ _permit: OwnedSemaphorePermit,
+ _phantom: PhantomData<O>,
+}
- Vacant(entry) => {
- entry.insert((Fs::Farc::clone(inode), 1));
+impl Session {
+ pub fn endpoint<'a>(self: &'a Arc<Self>) -> Endpoint<'a> {
+ Endpoint {
+ session: self,
+ local_buffer: Buffer::new(self.buffer_pages),
}
}
-}
-pub fn interrupt_rx<Fs: Fuse>(session: &Session<Fs>) -> broadcast::Receiver<u64> {
- session.interrupt_tx.subscribe()
-}
-
-impl<Fs: Fuse> Session<Fs> {
- pub fn fs(&self) -> &Fs {
- &self.fs
+ pub(crate) fn ok(&self, unique: u64, output: OutputChain<'_>) -> FuseResult<()> {
+ self.send(unique, 0, output)
}
- pub async fn main_loop(self: Arc<Self>) -> FuseResult<()> {
- let this = Arc::clone(&self);
- let main_loop = async move {
- loop {
- let incoming = this.receive().await;
- let this = Arc::clone(&this);
-
- tokio::spawn(async move {
- let (result, header): (FuseResult<()>, Option<InHeader>) = match incoming {
- Ok(mut incoming) => match this.dispatch(&mut incoming).await {
- Ok(()) => (Ok(()), None),
-
- Err(error) => {
- let data = incoming.buffer.data();
- let data = &data[..std::mem::size_of::<InHeader>().max(data.len())];
- (Err(error), try_from_bytes(data).ok().copied())
- }
- },
-
- Err(error) => (Err(error.into()), None),
- };
-
- let header = display_or(header, "(bad)");
- if let Err(error) = result {
- log::error!("Handling request {}: {}", header, error);
- }
- });
- }
- };
+ pub(crate) fn fail(&self, unique: u64, mut errno: i32) -> FuseResult<()> {
+ if errno <= 0 {
+ log::warn!(
+ "Attempted to fail req#{} with errno {} <= 0, coercing to ENOMSG",
+ unique,
+ errno
+ );
- tokio::select! {
- () = main_loop => unreachable!(),
- () = self.destroy.notified() => Ok(()),
+ errno = Errno::ENOMSG as i32;
}
- }
- async fn do_handshake(
- &mut self,
- pages_per_buffer: usize,
- bytes_per_buffer: usize,
- ) -> FuseResult<Handshake> {
- use FuseError::*;
-
- let buffer = {
- self.session_fd.readable().await?.retain_ready();
- let large_buffer = self.large_buffers.get_mut().unwrap().first_mut().unwrap();
-
- let mut data = InputBufferStorage::Sbo(SboStorage([0; SBO_SIZE]));
- let sbo = match &mut data {
- InputBufferStorage::Sbo(SboStorage(sbo)) => sbo,
- _ => unreachable!(),
- };
+ self.send(unique, -errno, OutputChain::empty())
+ }
- let mut io_vecs = [
- IoVec::from_mut_slice(sbo),
- IoVec::from_mut_slice(large_buffer),
- ];
+ pub(crate) fn interrupt_rx(&self) -> broadcast::Receiver<u64> {
+ self.interrupt_tx.subscribe()
+ }
- let bytes = readv(*self.session_fd.get_ref(), &mut io_vecs).map_err(io::Error::from)?;
- InputBuffer { bytes, data }
- };
+ async fn handshake(&mut self, buffer: &mut Buffer) -> FuseResult<Handshake> {
+ self.session_fd.readable().await?.retain_ready();
+ let bytes = read(*self.session_fd.get_ref(), &mut buffer.0).map_err(io::Error::from)?;
- let request: proto::Request<'_> = buffer.data().try_into()?;
+ let (header, opcode) = InHeader::from_bytes(&buffer.0[..bytes])?;
+ let init = match opcode {
+ proto::Opcode::Init => <&proto::InitIn>::toplevel_from(&buffer.0[HEADER_END..bytes])?,
- let unique = request.header().unique;
- let init = match request.body() {
- proto::RequestBody::Init(body) => body,
- _ => return Err(ProtocolInit),
+ _ => {
+ log::error!("First message from kernel is not Init, but {:?}", opcode);
+ return Err(FuseError::ProtocolInit);
+ }
};
use std::cmp::Ordering;
@@ -156,7 +129,7 @@ impl<Fs: Fuse> Session<Fs> {
Ordering::Greater => {
let tail = [bytes_of(&proto::MAJOR_VERSION)];
- ok(self, unique, OutputChain::tail(&tail))?;
+ self.ok(header.unique, OutputChain::tail(&tail))?;
return Ok(Handshake::Restart);
}
@@ -175,40 +148,27 @@ impl<Fs: Fuse> Session<Fs> {
major = proto::MAJOR_VERSION
);
- fail(self, unique, Errno::EPROTONOSUPPORT as i32)?;
- return Err(ProtocolInit);
+ self.fail(header.unique, Errno::EPROTONOSUPPORT as i32)?;
+ return Err(FuseError::ProtocolInit);
}
- let root = {
- let mut init_result = Err(0);
- let reply = Reply {
- session: self,
- unique,
- tail: &mut init_result,
- };
+ let flags = {
+ use proto::InitFlags;
- self.fs.init(reply).await.into_result()?;
+ let kernel = InitFlags::from_bits_truncate(init.flags);
+ let supported = InitFlags::PARALLEL_DIROPS
+ | InitFlags::ABORT_ERROR
+ | InitFlags::MAX_PAGES
+ | InitFlags::CACHE_SYMLINKS;
- match init_result {
- Ok(root) => root,
- Err(errno) => {
- log::error!("init() handler failed: {}", Errno::from_i32(errno));
- return Err(FuseError::Io(std::io::Error::from_raw_os_error(errno)));
- }
- }
+ kernel & supported
};
- self.known.get_mut().unwrap().insert(Ino::ROOT, (root, 1));
+ let buffer_size = page_size() * self.buffer_pages;
- use proto::InitFlags;
- let flags = InitFlags::from_bits_truncate(init.flags);
- let supported = InitFlags::PARALLEL_DIROPS
- | InitFlags::ABORT_ERROR
- | InitFlags::MAX_PAGES
- | InitFlags::CACHE_SYMLINKS;
+ // See fs/fuse/dev.c in the kernel source tree for details about max_write
+ let max_write = buffer_size - std::mem::size_of::<(InHeader, proto::WriteIn)>();
- let flags = flags & supported;
- let max_write = bytes_per_buffer - std::mem::size_of::<(InHeader, proto::WriteIn)>();
let init_out = proto::InitOut {
major: proto::MAJOR_VERSION,
minor: proto::TARGET_MINOR_VERSION,
@@ -218,230 +178,17 @@ impl<Fs: Fuse> Session<Fs> {
congestion_threshold: 0, //TODO
max_write: max_write.try_into().unwrap(),
time_gran: 1, //TODO
- max_pages: pages_per_buffer.try_into().unwrap(),
+ max_pages: self.buffer_pages.try_into().unwrap(),
padding: Default::default(),
unused: Default::default(),
};
let tail = [bytes_of(&init_out)];
- ok(self, unique, OutputChain::tail(&tail))?;
+ self.ok(header.unique, OutputChain::tail(&tail))?;
Ok(Handshake::Done)
}
- async fn dispatch(self: &Arc<Self>, request: &mut Incoming) -> FuseResult<()> {
- let request: proto::Request<'_> = request.buffer.data().try_into()?;
- let header = request.header();
- let InHeader { unique, ino, .. } = *header;
- let ino = Ino(ino);
-
- use proto::RequestBody::*;
-
- macro_rules! op {
- () => {
- op!(())
- };
-
- ($body:expr) => {
- op!($body, ())
- };
-
- ($body:expr, $tail:expr) => {{
- let request = Request {
- header,
- body: $body,
- };
- let reply = Reply {
- session: &self,
- unique,
- tail: $tail,
- };
-
- (request, reply, self)
- }};
- }
-
- // These operations don't involve locking and searching self.known
- match request.body() {
- Forget(body) => {
- self.forget(std::iter::once((ino, body.nlookup))).await;
- return Ok(());
- }
-
- Statfs => return self.fs.statfs(op!()).await.into_result(),
-
- Interrupt(body) => {
- //TODO: Don't reply with EAGAIN if the interrupt is successful
- let _ = self.interrupt_tx.send(body.unique);
- return fail(self, unique, Errno::EAGAIN as i32);
- }
-
- Destroy => {
- self.destroy.notify_one();
- return Ok(());
- }
-
- BatchForget { forgets, .. } => {
- let forgets = forgets
- .iter()
- .map(|target| (Ino(target.ino), target.nlookup));
-
- self.forget(forgets).await;
- return Ok(());
- }
-
- _ => (),
- }
-
- // Some operations are handled while self.known is locked
- let inode = {
- let known = self.known.lock().unwrap();
- let inode = match known.get(&ino) {
- Some((farc, _)) => farc,
- None => {
- log::error!(
- "Lookup count for ino {} reached zero while still \
- known to the kernel, this is a bug",
- ino
- );
-
- return fail(self, unique, Errno::ENOANO as i32);
- }
- };
-
- match request.body() {
- Getattr(_) => {
- //TODO: Getattr flags
- let (attrs, ttl) = <Fs as Fuse>::Inode::attrs(inode);
- let attrs = attrs.finish::<Fs>(inode);
- drop(known);
-
- let out = proto::AttrOut {
- attr_valid: ttl.seconds,
- attr_valid_nsec: ttl.nanoseconds,
- dummy: Default::default(),
- attr: attrs,
- };
-
- return ok(self, unique, OutputChain::tail(&[bytes_of(&out)]));
- }
-
- Access(body) => {
- return <Fs as Fuse>::Inode::access(inode, op!(*body)).into_result()
- }
-
- _ => inode.clone(),
- }
- };
-
- macro_rules! inode_op {
- ($op:ident, $($exprs:expr),+) => {
- <Fs as Fuse>::Inode::$op(inode, op!($($exprs),+)).await
- };
- }
-
- // These operations involve a Farc cloned from self.known
- let done = match request.body() {
- Lookup { name } => inode_op!(lookup, *name),
- Readlink => inode_op!(readlink, ()),
- Open(body) => {
- let mut flags = proto::OpenOutFlags::empty();
- if <Fs as Fuse>::Inode::direct_io(&inode) {
- flags |= proto::OpenOutFlags::DIRECT_IO;
- }
-
- inode_op!(open, *body, (ino, flags))
- }
- Opendir(body) => inode_op!(opendir, *body),
- Readdir(body) => inode_op!(readdir, *body),
-
- _ => return fail(self, unique, Errno::ENOSYS as i32),
- };
-
- done.into_result()
- }
-
- async fn forget<I>(&self, targets: I)
- where
- I: Iterator<Item = (Ino, u64)>,
- {
- let mut known = self.known.lock().unwrap();
-
- for (ino, subtracted) in targets {
- use hash_map::Entry::*;
-
- match known.entry(ino) {
- Occupied(mut entry) => {
- let (_, count) = entry.get_mut();
-
- *count = count.saturating_sub(subtracted);
- if *count > 0 {
- continue;
- }
-
- entry.remove();
- }
-
- Vacant(_) => {
- log::warn!("Kernel attempted to forget {:?} (bad refcount?)", ino);
- continue;
- }
- }
- }
- }
-
- async fn receive(self: &Arc<Self>) -> std::io::Result<Incoming> {
- use InputBufferStorage::*;
-
- let permit = Arc::clone(&self.input_semaphore)
- .acquire_owned()
- .await
- .unwrap();
-
- let mut incoming = Incoming {
- buffer: InputBuffer {
- bytes: 0,
- data: Sbo(SboStorage([0; SBO_SIZE])),
- },
- };
-
- let sbo = match &mut incoming.buffer.data {
- Sbo(SboStorage(sbo)) => sbo,
- _ => unreachable!(),
- };
-
- loop {
- let mut readable = self.session_fd.readable().await?;
-
- let mut large_buffers = self.large_buffers.lock().unwrap();
- let large_buffer = large_buffers.last_mut().unwrap();
-
- let mut io_vecs = [
- IoVec::from_mut_slice(sbo),
- IoVec::from_mut_slice(&mut large_buffer[SBO_SIZE..]),
- ];
-
- let mut read = |fd: &AsyncFd<RawFd>| readv(*fd.get_ref(), &mut io_vecs);
- match readable.try_io(|fd| read(fd).map_err(io::Error::from)) {
- Ok(Ok(bytes)) => {
- if bytes > SBO_SIZE {
- (&mut large_buffer[..SBO_SIZE]).copy_from_slice(sbo);
- incoming.buffer.data = Spilled(large_buffers.pop().unwrap(), permit);
- }
-
- incoming.buffer.bytes = bytes;
- return Ok(incoming);
- }
-
- // Interrupted
- Ok(Err(error)) if error.kind() == std::io::ErrorKind::NotFound => continue,
-
- Ok(Err(error)) => return Err(error),
- Err(_) => continue,
- }
- }
- }
-
fn send(&self, unique: u64, error: i32, output: OutputChain<'_>) -> FuseResult<()> {
let after_header: usize = output
.iter()
@@ -479,81 +226,261 @@ impl<Fs: Fuse> Session<Fs> {
}
}
+impl<'o> Dispatch<'o> {
+ pub fn op(self) -> Op<'o> {
+ use Dispatch::*;
+
+ let common = match self {
+ Lookup(incoming) => incoming.common,
+ Getattr(incoming) => incoming.common,
+ Readlink(incoming) => incoming.common,
+ Read(incoming) => incoming.common,
+ Write(incoming) => incoming.common,
+ Statfs(incoming) => incoming.common,
+ Readdir(incoming) => incoming.common,
+ Access(incoming) => incoming.common,
+ Destroy(incoming) => incoming.common,
+ };
+
+ common.into_generic_op()
+ }
+}
+
+impl Endpoint<'_> {
+ pub async fn receive<'a, F, Fut>(&'a mut self, dispatcher: F) -> FuseResult<()>
+ where
+ F: FnOnce(Dispatch<'a>) -> Fut,
+ Fut: Future<Output = Done<'a>>,
+ {
+ let buffer = &mut self.local_buffer.0;
+ let bytes = loop {
+ let mut readable = self.session.session_fd.readable().await?;
+ let mut read = |fd: &AsyncFd<RawFd>| read(*fd.get_ref(), buffer);
+
+ let result = match readable.try_io(|fd| read(fd).map_err(io::Error::from)) {
+ Ok(result) => result,
+ Err(_) => continue,
+ };
+
+ match result {
+ // Interrupted
+ Err(error) if error.kind() == std::io::ErrorKind::NotFound => continue,
+
+ result => break result,
+ }
+ };
+
+ let (header, opcode) = proto::InHeader::from_bytes(&buffer[..bytes?])?;
+ let common = IncomingCommon {
+ session: self.session,
+ buffer: &mut self.local_buffer,
+ header,
+ };
+
+ let dispatch = {
+ use proto::Opcode::*;
+
+ macro_rules! dispatch {
+ ($op:ident) => {
+ Dispatch::$op(Incoming {
+ common,
+ _phantom: PhantomData,
+ })
+ };
+ }
+
+ match opcode {
+ Lookup => dispatch!(Lookup),
+ Getattr => dispatch!(Getattr),
+ Readlink => dispatch!(Readlink),
+ Read => dispatch!(Read),
+ Write => dispatch!(Write),
+ Statfs => dispatch!(Statfs),
+ Readdir => dispatch!(Readdir),
+ Access => dispatch!(Access),
+ Destroy => dispatch!(Destroy),
+
+ _ => {
+ log::warn!("Not implemented: {}", common.header);
+
+ let (_request, reply) = common.into_generic_op();
+ let _ = reply.not_implemented();
+
+ return Ok(());
+ }
+ }
+ };
+
+ let _ = dispatcher(dispatch).await;
+ Ok(())
+ }
+}
+
impl Start {
- pub async fn start<Fs: Fuse>(self, fs: Fs) -> FuseResult<Arc<Session<Fs>>> {
+ pub async fn start(self) -> FuseResult<Arc<Session>> {
let session_fd = self.session_fd.into_raw_fd();
let flags = OFlag::O_NONBLOCK | OFlag::O_LARGEFILE;
fcntl(session_fd, FcntlArg::F_SETFL(flags)).unwrap();
- let page_size = sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize;
- let pages_per_buffer = fs.request_buffer_pages().get();
- let bytes_per_buffer = pages_per_buffer.checked_mul(page_size).unwrap();
- assert!(bytes_per_buffer >= proto::MIN_READ_SIZE);
+ let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY);
- let mut large_buffers = Vec::with_capacity(fs.request_buffers().get());
- for _ in 0..large_buffers.capacity() {
- large_buffers.push(vec![0; bytes_per_buffer].into_boxed_slice());
- }
+ let buffer_pages = proto::MIN_READ_SIZE / page_size(); //TODO
+ let buffer_count = SHARED_BUFFERS; //TODO
+ let buffers = std::iter::repeat_with(|| Buffer::new(buffer_pages))
+ .take(buffer_count)
+ .collect();
- let (interrupt_tx, _) = broadcast::channel(INTERRUPT_BROADCAST_CAPACITY);
let mut session = Session {
_fusermount_fd: self.fusermount_fd,
session_fd: AsyncFd::with_interest(session_fd, tokio::io::Interest::READABLE)?,
- proto_minor: 0, // Set by Session::do_handshake()
- fs,
- input_semaphore: Arc::new(Semaphore::new(large_buffers.len())),
- large_buffers: Mutex::new(large_buffers),
- known: Mutex::new(HashMap::new()),
- destroy: Notify::new(),
interrupt_tx,
+ buffers: Mutex::new(buffers),
+ buffer_semaphore: Arc::new(Semaphore::new(buffer_count)),
+ proto_minor: 0, // Set by Session::do_handshake()
+ buffer_pages,
};
- loop {
- let state = session
- .do_handshake(pages_per_buffer, bytes_per_buffer)
- .await?;
+ let mut init_buffer = session.buffers.get_mut().unwrap().pop().unwrap();
- if let Handshake::Done = state {
+ loop {
+ if let Handshake::Done = session.handshake(&mut init_buffer).await? {
+ session.buffers.get_mut().unwrap().push(init_buffer);
break Ok(Arc::new(session));
}
}
}
-}
-enum Handshake {
- Done,
- Restart,
+ pub(crate) fn new(fusermount_fd: DumbFd, session_fd: DumbFd) -> Self {
+ Start {
+ fusermount_fd,
+ session_fd,
+ }
+ }
}
-struct Incoming {
- buffer: InputBuffer,
+impl<'o, O: Operation<'o>> Incoming<'o, O> {
+ pub fn op(self) -> Result<Op<'o, O>, Done<'o>> {
+ try_op(
+ self.common.session,
+ &self.common.buffer.0,
+ self.common.header,
+ )
+ }
}
-struct InputBuffer {
- pub bytes: usize,
- pub data: InputBufferStorage,
+impl<O: for<'o> Operation<'o>> Incoming<'_, O> {
+ pub async fn owned(self) -> Owned<O> {
+ let session = self.common.session;
+
+ let (buffer, permit) = {
+ let semaphore = Arc::clone(&session.buffer_semaphore);
+ let permit = semaphore
+ .acquire_owned()
+ .await
+ .expect("Buffer semaphore error");
+
+ let mut buffers = session.buffers.lock().unwrap();
+ let mut buffer = buffers.pop().expect("Buffer semaphore out of sync");
+
+ std::mem::swap(&mut buffer, self.common.buffer);
+ (buffer, permit)
+ };
+
+ Owned {
+ session: Arc::clone(session),
+ buffer,
+ header: self.common.header,
+ _permit: permit,
+ _phantom: PhantomData,
+ }
+ }
}
-enum InputBufferStorage {
- Sbo(SboStorage),
- Spilled(Box<[u8]>, OwnedSemaphorePermit),
+impl<O: for<'o> Operation<'o>> Owned<O> {
+ pub fn op(&self) -> Result<Op<'_, O>, Done<'_>> {
+ try_op(&self.session, &self.buffer.0, self.header)
+ }
}
-#[repr(align(8))]
-struct SboStorage(pub [u8; 4 * std::mem::size_of::<InHeader>()]);
+impl<O: for<'o> Operation<'o>> Drop for Owned<O> {
+ fn drop(&mut self) {
+ if let Ok(mut buffers) = self.session.buffers.lock() {
+ let empty = Buffer(Vec::new().into_boxed_slice());
+ buffers.push(std::mem::replace(&mut self.buffer, empty));
+ }
+ }
+}
-const SBO_SIZE: usize = std::mem::size_of::<SboStorage>();
const INTERRUPT_BROADCAST_CAPACITY: usize = 32;
+const SHARED_BUFFERS: usize = 32;
+const HEADER_END: usize = std::mem::size_of::<InHeader>();
-impl InputBuffer {
- fn data(&self) -> &[u8] {
- use InputBufferStorage::*;
- let storage = match &self.data {
- Sbo(sbo) => &sbo.0,
- Spilled(buffer, _) => &buffer[..],
+struct IncomingCommon<'o> {
+ session: &'o Arc<Session>,
+ buffer: &'o mut Buffer,
+ header: proto::InHeader,
+}
+
+enum Handshake {
+ Done,
+ Restart,
+}
+
+struct Buffer(Box<[u8]>);
+
+impl<'o> IncomingCommon<'o> {
+ fn into_generic_op(self) -> Op<'o> {
+ let request = Request {
+ header: self.header,
+ body: (),
};
- &storage[..self.bytes]
+ let reply = Reply {
+ session: self.session,
+ unique: self.header.unique,
+ tail: (),
+ };
+
+ (request, reply)
+ }
+}
+
+impl Buffer {
+ fn new(pages: usize) -> Self {
+ Buffer(vec![0; pages * page_size()].into_boxed_slice())
}
}
+
+fn try_op<'o, O: Operation<'o>>(
+ session: &'o Session,
+ bytes: &'o [u8],
+ header: proto::InHeader,
+) -> Result<Op<'o, O>, Done<'o>> {
+ let body = match Structured::toplevel_from(&bytes[HEADER_END..header.len as usize]) {
+ Ok(body) => body,
+ Err(error) => {
+ log::error!("Parsing request {}: {}", header, error);
+ let reply = Reply::<ops::Any> {
+ session,
+ unique: header.unique,
+ tail: (),
+ };
+
+ return Err(reply.io_error());
+ }
+ };
+
+ let request = Request { header, body };
+ let reply = Reply {
+ session,
+ unique: header.unique,
+ tail: Default::default(),
+ };
+
+ Ok((request, reply))
+}
+
+fn page_size() -> usize {
+ sysconf(SysconfVar::PAGE_SIZE).unwrap().unwrap() as usize
+}