From de5ce128cb76ee6bb46f22a44cb6c8d85aa847f6 Mon Sep 17 00:00:00 2001 From: Laurent Pelecq Date: Fri, 25 Mar 2022 18:42:04 +0100 Subject: [PATCH] Change AsyncClient to QueuedClient to use with different polling mechanism. * When compiled without mio, QueuedClient returns the raw fd. * When compiled with mio, there is a register method. --- Cargo.toml | 3 +- examples/async_mio_loop.rs | 6 +- examples/async_popol_loop.rs | 119 ++++++++++++++++++++++++++++++++++ src/client.rs | 16 ++++- src/fifo.rs | 23 +++++-- src/lib.rs | 10 +-- src/{async_mio.rs => poll.rs} | 32 +++++++-- tests/fifo_async_tests.rs | 6 +- 8 files changed, 188 insertions(+), 27 deletions(-) create mode 100644 examples/async_popol_loop.rs rename src/{async_mio.rs => poll.rs} (74%) diff --git a/Cargo.toml b/Cargo.toml index e31da07..ba4cb97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ssip-client" -version = "0.5.1" +version = "0.6.0" authors = ["Laurent Pelecq "] edition = "2018" description = "Client API for Speech Dispatcher" @@ -23,5 +23,6 @@ async-mio = ["mio/net", "mio/os-poll"] [dev-dependencies] libc = "0" mio = { version = "0.8", features = ["os-poll", "os-ext"] } +popol = "0.5" tempfile = "3" diff --git a/examples/async_mio_loop.rs b/examples/async_mio_loop.rs index 9945f0e..1a58a91 100644 --- a/examples/async_mio_loop.rs +++ b/examples/async_mio_loop.rs @@ -8,7 +8,7 @@ use std::{ }; #[cfg(feature = "async-mio")] -use ssip_client::{fifo, AsyncClient, ClientError, ClientName, ClientResult, Request, Response}; +use ssip_client::{fifo, QueuedClient, ClientError, ClientName, ClientResult, Request, Response}; #[cfg(feature = "async-mio")] fn main() -> ClientResult<()> { @@ -25,7 +25,7 @@ fn main() -> ClientResult<()> { .register(&mut source_fd, stdin_token, Interest::READABLE)?; // Register the SSIP client - let mut ssip_client = AsyncClient::new(fifo::Builder::new().build()?); + let mut ssip_client = QueuedClient::new(fifo::Builder::new().build()?); let speech_input_token = Token(1); let speech_output_token = Token(2); ssip_client.register(&poll, speech_input_token, speech_output_token)?; @@ -91,7 +91,7 @@ fn main() -> ClientResult<()> { Err(ClientError::NotReady) => speech_writable = false, Err(ClientError::Io(err)) => return Err(ClientError::from(err)), Err(_) => panic!("internal error"), - Ok(()) => (), + Ok(_) => (), } } } diff --git a/examples/async_popol_loop.rs b/examples/async_popol_loop.rs new file mode 100644 index 0000000..699b050 --- /dev/null +++ b/examples/async_popol_loop.rs @@ -0,0 +1,119 @@ +#[cfg(not(feature = "async-mio"))] +use std::{ + collections::VecDeque, + io::{self, Write}, +}; + +#[cfg(not(feature = "async-mio"))] +use ssip_client::{fifo, ClientError, ClientName, ClientResult, QueuedClient, Request, Response}; + +#[cfg(not(feature = "async-mio"))] +fn main() -> ClientResult<()> { + #[derive(Clone, Eq, PartialEq)] + enum SourceKey { + Stdin, + SpeechIn, + SpeechOut, + } + + let mut sources = popol::Sources::with_capacity(2); + let mut events = popol::Events::with_capacity(4); + + let stdin = io::stdin(); + let mut ssip_client = QueuedClient::new(fifo::Builder::new().nonblocking().build()?); + + sources.register(SourceKey::Stdin, &stdin, popol::interest::READ); + sources.register( + SourceKey::SpeechIn, + ssip_client.input_source(), + popol::interest::READ, + ); + sources.register( + SourceKey::SpeechOut, + ssip_client.output_source(), + popol::interest::WRITE, + ); + + // Loop for events + let mut speech_writable = false; + let mut send_requests = VecDeque::with_capacity(4); + ssip_client.push(Request::SetName(ClientName::new("joe", "async"))); + + fn prompt() -> io::Result<()> { + let mut stdout = io::stdout(); + write!(stdout, "> ")?; + stdout.flush() + } + + println!("Enter an empty line to quit."); + prompt()?; + loop { + sources.wait(&mut events)?; + for (key, _event) in events.iter() { + match key { + SourceKey::Stdin => { + let mut text = String::new(); + stdin.read_line(&mut text)?; + text = text.trim_end().to_string(); + match text.len() { + 0 => return Ok(()), + 1 => { + if let Some(ch) = text.chars().next() { + println!("sending char: {}", ch); + ssip_client.push(Request::SpeakChar(ch)) + } + } + _ => { + println!("sending line: {}", text); + send_requests.push_back(Request::SendLine(text.to_owned())); + ssip_client.push(Request::Speak); + } + } + prompt()?; + } + SourceKey::SpeechIn => match ssip_client.receive_next() { + Err(ClientError::Io(err)) => return Err(ClientError::from(err)), + Err(ClientError::Ssip(err)) => eprintln!("SSIP error: {:?}", err), + Err(_) => panic!("internal error"), + Ok(result) => match result { + Response::MessageQueued | Response::ClientNameSet => (), + Response::ReceivingData => { + ssip_client.push(send_requests.pop_front().unwrap()) + } + _ => panic!("Unexpected response: {:?}", result), + }, + }, + SourceKey::SpeechOut => { + speech_writable = true; + } + } + } + if speech_writable { + if sources.len() >= 3 { + sources.unregister(&SourceKey::SpeechOut); + } + + loop { + match ssip_client.send_next() { + Err(ClientError::NotReady) => speech_writable = false, + Err(ClientError::Io(err)) => return Err(ClientError::from(err)), + Err(_) => panic!("internal error"), + Ok(true) => (), + Ok(false) => break, + } + } + } + if !speech_writable && ssip_client.has_next() { + sources.register( + SourceKey::SpeechOut, + ssip_client.output_source(), + popol::interest::WRITE, + ); + } + } +} + +#[cfg(feature = "async-mio")] +fn main() { + println!("see async_mio_loop for an example of asynchronous client."); +} diff --git a/src/client.rs b/src/client.rs index b5f3de3..e51f45b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -16,8 +16,8 @@ use crate::protocol::{ use crate::types::*; // Trick to have common implementation for std and mio streams.. -#[cfg(not(feature = "async-mio"))] -pub(crate) use std::fmt::Debug as Source; +#[cfg(all(not(feature = "async-mio"), unix))] +pub(crate) use std::os::unix::io::AsRawFd as Source; #[cfg(feature = "async-mio")] pub(crate) use mio::event::Source; @@ -179,6 +179,18 @@ impl Client { Self { input, output } } + #[cfg(all(not(feature = "async-mio"), unix))] + /// Input source for asynchronous API based on `poll`. + pub(crate) fn input_source(&self) -> &S { + self.input.get_ref() + } + + #[cfg(all(not(feature = "async-mio"), unix))] + /// Output source for asynchronous API based on `poll`. + pub(crate) fn output_source(&self) -> &S { + self.output.get_ref() + } + /// Send lines of text (terminated by a single dot). pub fn send_lines(&mut self, lines: &[String]) -> ClientResult<&mut Self> { const END_OF_DATA: [&str; 1] = ["."]; diff --git a/src/fifo.rs b/src/fifo.rs index 308846f..a15bc08 100644 --- a/src/fifo.rs +++ b/src/fifo.rs @@ -61,16 +61,22 @@ mod synchronous { use super::FifoPath; + enum FifoMode { + Blocking, + NonBlocking, + TimeOut(Duration), + } + pub struct Builder { path: FifoPath, - read_timeout: Option, + mode: FifoMode, } impl Builder { pub fn new() -> Self { Self { path: FifoPath::new(), - read_timeout: None, + mode: FifoMode::Blocking, } } @@ -83,13 +89,22 @@ mod synchronous { } pub fn timeout(&mut self, read_timeout: Duration) -> &mut Self { - self.read_timeout = Some(read_timeout); + self.mode = FifoMode::TimeOut(read_timeout); + self + } + + pub fn nonblocking(&mut self) -> &mut Self { + self.mode = FifoMode::NonBlocking; self } pub fn build(&self) -> io::Result> { let input = UnixStream::connect(self.path.get()?)?; - input.set_read_timeout(self.read_timeout)?; + match self.mode { + FifoMode::Blocking => input.set_nonblocking(false)?, + FifoMode::NonBlocking => input.set_nonblocking(true)?, + FifoMode::TimeOut(timeout) => input.set_read_timeout(Some(timeout))?, + } let output = input.try_clone()?; Ok(Client::new(BufReader::new(input), BufWriter::new(output))) } diff --git a/src/lib.rs b/src/lib.rs index 9436be7..4f457a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ //! `ssip-client` implements a Speech Dispatcher SSIP client library in //! pure rust. //! -//! See [`client::Client`] for the synchronous API and [`async_mio::AsyncClient`] for the asynchronous API based on [mio](https://crates.io/crates/mio). +//! See [`client::Client`] for the synchronous API and [`poll::QueuedClient`] for the asynchronous API. //! //! Example //! ```no_run @@ -29,6 +29,7 @@ #[macro_use] mod protocol; +mod poll; mod types; pub mod client; @@ -40,10 +41,5 @@ pub use client::Client; pub use client::{Request, Response}; pub use constants::*; +pub use poll::QueuedClient; pub use types::*; - -#[cfg(any(feature = "async-mio", doc))] -mod async_mio; - -#[cfg(any(feature = "async-mio", doc))] -pub use async_mio::AsyncClient; diff --git a/src/async_mio.rs b/src/poll.rs similarity index 74% rename from src/async_mio.rs rename to src/poll.rs index 9e32c25..a4568f5 100644 --- a/src/async_mio.rs +++ b/src/poll.rs @@ -8,7 +8,7 @@ // modified, or distributed except according to those terms. use std::collections::VecDeque; -use std::io::{self, Read, Write}; +use std::io::{Read, Write}; use crate::{ client::{Client, Request, Response, Source}, @@ -31,15 +31,18 @@ mod mio { const INITIAL_REQUEST_QUEUE_CAPACITY: usize = 4; -/// Asynchronous client based on `mio`. +/// Client with a queue of requests. /// +/// The client can be used with crates like [popol](https://crates.io/crates/popol) or +/// with [mio](https://crates.io/crates/mio) if feature `async-mio` is enabled. /// -pub struct AsyncClient { +/// When the output is ready, a next event can be sent. +pub struct QueuedClient { client: Client, requests: VecDeque, } -impl AsyncClient { +impl QueuedClient { /// New asynchronous client build on top of a synchronous client. pub fn new(client: Client) -> Self { Self { @@ -48,13 +51,26 @@ impl AsyncClient { } } + #[cfg(all(not(feature = "async-mio"), unix))] + /// Input source. + pub fn input_source(&self) -> &S { + self.client.input_source() + } + + #[cfg(all(not(feature = "async-mio"), unix))] + /// Output source. + pub fn output_source(&self) -> &S { + self.client.output_source() + } + + #[cfg(any(feature = "async-mio", doc))] /// Register client pub fn register( &mut self, poll: &mio::Poll, input_token: mio::Token, output_token: mio::Token, - ) -> io::Result<()> { + ) -> std::io::Result<()> { self.client.register(poll, input_token, output_token) } @@ -82,11 +98,13 @@ impl AsyncClient { /// /// Instance of `mio::Poll` generates a writable event only once until the socket returns `WouldBlock`. /// This error is mapped to `ClientError::NotReady`. - pub fn send_next(&mut self) -> ClientResult<()> { + pub fn send_next(&mut self) -> ClientResult { if let Some(request) = self.requests.pop_front() { self.client.send(request)?; + Ok(true) + } else { + Ok(false) } - Ok(()) } /// Receive one response. diff --git a/tests/fifo_async_tests.rs b/tests/fifo_async_tests.rs index 3c90f48..29af144 100644 --- a/tests/fifo_async_tests.rs +++ b/tests/fifo_async_tests.rs @@ -94,7 +94,7 @@ fn basic_async_communication() -> ClientResult<()> { let handle = Server::run(&socket_path, &COMMUNICATION); let mut poll = Poll::new()?; let mut events = Events::with_capacity(128); - let mut client = AsyncClient::new(fifo::Builder::new().path(&socket_path).build()?); + let mut client = QueuedClient::new(fifo::Builder::new().path(&socket_path).build()?); let input_token = Token(0); let output_token = Token(1); let timeout = Duration::new(0, 500 * 1000 * 1000 /* 500 ms */); @@ -128,9 +128,9 @@ fn basic_async_communication() -> ClientResult<()> { } if state.must_send() { match client.send_next() { - Ok(()) => (), + Ok(_) => (), Err(ClientError::NotReady) => state.writable = false, - err => return err, + Err(err) => return Err(err), } } }