From 86cfef51cf89d43bbeefa9a06ba1f80ef8e8ab15 Mon Sep 17 00:00:00 2001 From: Alejandro Soto Date: Thu, 30 Dec 2021 10:14:45 -0600 Subject: Fix use semantics of Incoming<'o, O>::owned() Lifetime requirements involving Done<'o> previously prevented any proper use of Owned. --- src/fuse/session.rs | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) (limited to 'src/fuse/session.rs') diff --git a/src/fuse/session.rs b/src/fuse/session.rs index 7441230..d54549c 100644 --- a/src/fuse/session.rs +++ b/src/fuse/session.rs @@ -73,7 +73,7 @@ pub struct Incoming<'o, O: Operation<'o>> { _phantom: PhantomData, } -pub struct Owned Operation<'o>> { +pub struct Owned { session: Arc, buffer: Buffer, header: InHeader, @@ -174,7 +174,7 @@ impl Session { }, }; - let _ = init((request, reply)); + init((request, reply)).consume(); Ok(Handshake::Done) } @@ -240,10 +240,10 @@ impl<'o> Dispatch<'o> { } impl Endpoint<'_> { - pub async fn receive<'a, F, Fut>(&'a mut self, dispatcher: F) -> FuseResult> + pub async fn receive<'o, F, Fut>(&'o mut self, dispatcher: F) -> FuseResult> where - F: FnOnce(Dispatch<'a>) -> Fut, - Fut: Future>, + F: FnOnce(Dispatch<'o>) -> Fut, + Fut: Future>, { let buffer = &mut self.local_buffer.0; let bytes = loop { @@ -309,14 +309,14 @@ impl Endpoint<'_> { log::warn!("Not implemented: {}", common.header); let (_request, reply) = common.into_generic_op(); - let _ = reply.not_implemented(); + reply.not_implemented().consume(); return Ok(ControlFlow::Continue(())); } } }; - let _ = dispatcher(dispatch).await; + dispatcher(dispatch).await.consume(); Ok(ControlFlow::Continue(())) } } @@ -381,10 +381,8 @@ where self.common.header, ) } -} -impl Operation<'o>> Incoming<'_, O> { - pub async fn owned(self) -> Owned { + pub async fn owned(self) -> (Done<'o>, Owned) { let session = self.common.session; let (buffer, permit) = { @@ -395,19 +393,21 @@ impl Operation<'o>> Incoming<'_, O> { .expect("Buffer semaphore error"); let mut buffers = session.buffers.lock().unwrap(); - let mut buffer = buffers.pop().expect("Buffer semaphore out of sync"); + let buffer = buffers.pop().expect("Buffer semaphore out of sync"); + let buffer = std::mem::replace(self.common.buffer, buffer); - std::mem::swap(&mut buffer, self.common.buffer); (buffer, permit) }; - Owned { + let owned = Owned { session: Arc::clone(session), buffer, header: self.common.header, _permit: permit, _phantom: PhantomData, - } + }; + + (Done::done(), owned) } } @@ -415,12 +415,19 @@ impl Operation<'o>> Owned where for<'o> >::ReplyTail: FromRequest<'o, O>, { - pub fn op(&self) -> Result, Done<'_>> { - try_op(&self.session, &self.buffer.0, self.header) + pub async fn op<'o, F, Fut>(&'o self, handler: F) + where + F: FnOnce(Op<'o, O>) -> Fut, + Fut: Future>, + { + match try_op(&self.session, &self.buffer.0, self.header) { + Ok(op) => handler(op).await.consume(), + Err(done) => done.consume(), + } } } -impl Operation<'o>> Drop for Owned { +impl Drop for Owned { fn drop(&mut self) { if let Ok(mut buffers) = self.session.buffers.lock() { let empty = Buffer(Vec::new().into_boxed_slice()); -- cgit v1.2.3