summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/ext2.rs62
-rw-r--r--src/fuse/io.rs38
2 files changed, 64 insertions, 36 deletions
diff --git a/examples/ext2.rs b/examples/ext2.rs
index fc39567..b085960 100644
--- a/examples/ext2.rs
+++ b/examples/ext2.rs
@@ -345,32 +345,30 @@ impl Ext2 {
async fn getattr<'o>(&self, (request, reply): Op<'o, Getattr>) -> Done<'o> {
let ino = request.ino();
- let (reply, inode) = reply.fallible(self.inode(ino))?;
+ let (reply, inode) = reply.and_then(self.inode(ino))?;
reply.known(&Resolved { ino, inode })
}
async fn lookup<'o>(&self, (request, reply): Op<'o, Lookup>) -> Done<'o> {
let name = request.name();
- let (reply, parent) = reply.fallible(self.inode(request.ino()))?;
+ let (mut reply, parent) = reply.and_then(self.inode(request.ino()))?;
//TODO: Indexed directories
- let lookup = async {
- let stream = self.directory_stream(parent, 0);
- tokio::pin!(stream);
+ let stream = self.directory_stream(parent, 0);
+ tokio::pin!(stream);
- loop {
- match stream.try_next().await? {
- Some(entry) if entry.name == name => break Ok(Some(entry.inode)),
- Some(_) => continue,
- None => break Ok(None),
- }
+ let inode = loop {
+ let (next_reply, entry) = reply.and_then(stream.try_next().await)?;
+ reply = next_reply;
+
+ match entry {
+ Some(entry) if entry.name == name => break Some(entry.inode),
+ Some(_) => continue,
+ None => break None,
}
};
- let (reply, result) = reply.interruptible(lookup).await?;
- let (reply, inode) = reply.fallible(result)?;
-
if let Some(inode) = inode {
reply.found(inode, Ttl::MAX)
} else {
@@ -380,7 +378,7 @@ impl Ext2 {
async fn readlink<'o>(&self, (request, reply): Op<'o, Readlink>) -> Done<'o> {
let ino = request.ino();
- let (reply, inode) = reply.fallible(self.inode(ino))?;
+ let (mut reply, inode) = reply.and_then(self.inode(ino))?;
let resolved = Resolved { ino, inode };
if resolved.inode_type() != Type::Symlink {
@@ -392,39 +390,35 @@ impl Ext2 {
return reply.target(OsStr::from_bytes(&cast_slice(&inode.i_block)[..size]));
}
- let segments = async {
- /* This is unlikely to ever spill, and is guaranteed not to
- * do so for valid symlinks on any fs where block_size >= 4096.
- */
- let mut segments = SmallVec::<[&[u8]; 1]>::new();
- let (mut size, mut offset) = (size, 0);
+ /* This is unlikely to ever spill, and is guaranteed not to
+ * do so for valid symlinks on any fs where block_size >= 4096.
+ */
+ let mut segments = SmallVec::<[&[u8]; 1]>::new();
+ let (mut size, mut offset) = (size, 0);
- while size > 0 {
- let segment = self.seek_contiguous(inode, offset)?;
- let segment = &segment[..segment.len().min(size)];
-
- segments.push(segment);
+ while size > 0 {
+ let (next_reply, segment) = reply.and_then(self.seek_contiguous(inode, offset))?;
+ reply = next_reply;
- size -= segment.len();
- offset += segment.len() as u64;
- }
+ let segment = &segment[..segment.len().min(size)];
+ segments.push(segment);
- Ok(segments)
- };
+ size -= segment.len();
+ offset += segment.len() as u64;
+ }
- let (reply, segments) = reply.fallible(segments.await)?;
reply.gather_target(&segments)
}
async fn readdir<'o>(&self, (request, reply): Op<'o, Readdir>) -> Done<'o> {
- let (reply, inode) = reply.fallible(self.inode(request.ino()))?;
+ let (reply, inode) = reply.and_then(self.inode(request.ino()))?;
let mut reply = reply.buffered(Vec::new());
let stream = self.directory_stream(inode, request.offset());
tokio::pin!(stream);
while let Some(entry) = stream.next().await {
- let (next_reply, entry) = reply.fallible(entry)?;
+ let (next_reply, entry) = reply.and_then(entry)?;
let (next_reply, ()) = next_reply.entry(entry)?;
reply = next_reply;
}
diff --git a/src/fuse/io.rs b/src/fuse/io.rs
index c0356ff..f9fa908 100644
--- a/src/fuse/io.rs
+++ b/src/fuse/io.rs
@@ -38,6 +38,12 @@ pub trait Known {
fn unveil(self);
}
+pub struct Failed<'o, E>(pub Done<'o>, pub E);
+
+pub trait Finish<'o, O: Operation<'o>> {
+ fn finish(&self, reply: Reply<'o, O>) -> Done<'o>;
+}
+
#[derive(Clone)]
pub struct Attrs(proto::Attrs);
@@ -51,6 +57,28 @@ pub struct Entry<'a, K> {
#[derive(Copy, Clone)]
pub struct FsInfo(proto::StatfsOut);
+impl<'o, E> From<Failed<'o, E>> for Done<'o> {
+ fn from(failed: Failed<'o, E>) -> Done<'o> {
+ failed.0
+ }
+}
+
+impl<'o, O: Operation<'o>> Finish<'o, O> for Errno {
+ fn finish(&self, reply: Reply<'o, O>) -> Done<'o> {
+ reply.fail(*self)
+ }
+}
+
+impl<'o, O: Operation<'o>> Finish<'o, O> for std::io::Error {
+ fn finish(&self, reply: Reply<'o, O>) -> Done<'o> {
+ reply.fail(
+ self.raw_os_error()
+ .map(Errno::from_i32)
+ .unwrap_or(Errno::EIO),
+ )
+ }
+}
+
impl<'o, O: Operation<'o>> Request<'o, O> {
pub fn ino(&self) -> Ino {
Ino(self.header.ino)
@@ -97,10 +125,16 @@ impl<'o, O: Operation<'o>> Reply<'o, O> {
}
}
- pub fn fallible<T>(self, result: Result<T, Errno>) -> Result<(Self, T), Done<'o>> {
+ pub fn and_then<T, E>(self, result: Result<T, E>) -> Result<(Self, T), Failed<'o, E>>
+ where
+ E: Finish<'o, O>,
+ {
match result {
Ok(t) => Ok((self, t)),
- Err(errno) => Err(self.fail(errno)),
+ Err(error) => {
+ let done = error.finish(self);
+ Err(Failed(done, error))
+ }
}
}