Change AsyncClient to QueuedClient to use with different polling mechanism.

* When compiled without mio, QueuedClient returns the raw fd.
* When compiled with mio, there is a register method.
main
Laurent Pelecq 2 years ago
parent b5ec9e37ad
commit de5ce128cb

@ -1,6 +1,6 @@
[package] [package]
name = "ssip-client" name = "ssip-client"
version = "0.5.1" version = "0.6.0"
authors = ["Laurent Pelecq <lpelecq+rust@circoise.eu>"] authors = ["Laurent Pelecq <lpelecq+rust@circoise.eu>"]
edition = "2018" edition = "2018"
description = "Client API for Speech Dispatcher" description = "Client API for Speech Dispatcher"
@ -23,5 +23,6 @@ async-mio = ["mio/net", "mio/os-poll"]
[dev-dependencies] [dev-dependencies]
libc = "0" libc = "0"
mio = { version = "0.8", features = ["os-poll", "os-ext"] } mio = { version = "0.8", features = ["os-poll", "os-ext"] }
popol = "0.5"
tempfile = "3" tempfile = "3"

@ -8,7 +8,7 @@ use std::{
}; };
#[cfg(feature = "async-mio")] #[cfg(feature = "async-mio")]
use ssip_client::{fifo, AsyncClient, ClientError, ClientName, ClientResult, Request, Response}; use ssip_client::{fifo, QueuedClient, ClientError, ClientName, ClientResult, Request, Response};
#[cfg(feature = "async-mio")] #[cfg(feature = "async-mio")]
fn main() -> ClientResult<()> { fn main() -> ClientResult<()> {
@ -25,7 +25,7 @@ fn main() -> ClientResult<()> {
.register(&mut source_fd, stdin_token, Interest::READABLE)?; .register(&mut source_fd, stdin_token, Interest::READABLE)?;
// Register the SSIP client // Register the SSIP client
let mut ssip_client = AsyncClient::new(fifo::Builder::new().build()?); let mut ssip_client = QueuedClient::new(fifo::Builder::new().build()?);
let speech_input_token = Token(1); let speech_input_token = Token(1);
let speech_output_token = Token(2); let speech_output_token = Token(2);
ssip_client.register(&poll, speech_input_token, speech_output_token)?; ssip_client.register(&poll, speech_input_token, speech_output_token)?;
@ -91,7 +91,7 @@ fn main() -> ClientResult<()> {
Err(ClientError::NotReady) => speech_writable = false, Err(ClientError::NotReady) => speech_writable = false,
Err(ClientError::Io(err)) => return Err(ClientError::from(err)), Err(ClientError::Io(err)) => return Err(ClientError::from(err)),
Err(_) => panic!("internal error"), Err(_) => panic!("internal error"),
Ok(()) => (), Ok(_) => (),
} }
} }
} }

@ -0,0 +1,119 @@
#[cfg(not(feature = "async-mio"))]
use std::{
collections::VecDeque,
io::{self, Write},
};
#[cfg(not(feature = "async-mio"))]
use ssip_client::{fifo, ClientError, ClientName, ClientResult, QueuedClient, Request, Response};
#[cfg(not(feature = "async-mio"))]
fn main() -> ClientResult<()> {
#[derive(Clone, Eq, PartialEq)]
enum SourceKey {
Stdin,
SpeechIn,
SpeechOut,
}
let mut sources = popol::Sources::with_capacity(2);
let mut events = popol::Events::with_capacity(4);
let stdin = io::stdin();
let mut ssip_client = QueuedClient::new(fifo::Builder::new().nonblocking().build()?);
sources.register(SourceKey::Stdin, &stdin, popol::interest::READ);
sources.register(
SourceKey::SpeechIn,
ssip_client.input_source(),
popol::interest::READ,
);
sources.register(
SourceKey::SpeechOut,
ssip_client.output_source(),
popol::interest::WRITE,
);
// Loop for events
let mut speech_writable = false;
let mut send_requests = VecDeque::with_capacity(4);
ssip_client.push(Request::SetName(ClientName::new("joe", "async")));
fn prompt() -> io::Result<()> {
let mut stdout = io::stdout();
write!(stdout, "> ")?;
stdout.flush()
}
println!("Enter an empty line to quit.");
prompt()?;
loop {
sources.wait(&mut events)?;
for (key, _event) in events.iter() {
match key {
SourceKey::Stdin => {
let mut text = String::new();
stdin.read_line(&mut text)?;
text = text.trim_end().to_string();
match text.len() {
0 => return Ok(()),
1 => {
if let Some(ch) = text.chars().next() {
println!("sending char: {}", ch);
ssip_client.push(Request::SpeakChar(ch))
}
}
_ => {
println!("sending line: {}", text);
send_requests.push_back(Request::SendLine(text.to_owned()));
ssip_client.push(Request::Speak);
}
}
prompt()?;
}
SourceKey::SpeechIn => match ssip_client.receive_next() {
Err(ClientError::Io(err)) => return Err(ClientError::from(err)),
Err(ClientError::Ssip(err)) => eprintln!("SSIP error: {:?}", err),
Err(_) => panic!("internal error"),
Ok(result) => match result {
Response::MessageQueued | Response::ClientNameSet => (),
Response::ReceivingData => {
ssip_client.push(send_requests.pop_front().unwrap())
}
_ => panic!("Unexpected response: {:?}", result),
},
},
SourceKey::SpeechOut => {
speech_writable = true;
}
}
}
if speech_writable {
if sources.len() >= 3 {
sources.unregister(&SourceKey::SpeechOut);
}
loop {
match ssip_client.send_next() {
Err(ClientError::NotReady) => speech_writable = false,
Err(ClientError::Io(err)) => return Err(ClientError::from(err)),
Err(_) => panic!("internal error"),
Ok(true) => (),
Ok(false) => break,
}
}
}
if !speech_writable && ssip_client.has_next() {
sources.register(
SourceKey::SpeechOut,
ssip_client.output_source(),
popol::interest::WRITE,
);
}
}
}
#[cfg(feature = "async-mio")]
fn main() {
println!("see async_mio_loop for an example of asynchronous client.");
}

@ -16,8 +16,8 @@ use crate::protocol::{
use crate::types::*; use crate::types::*;
// Trick to have common implementation for std and mio streams.. // Trick to have common implementation for std and mio streams..
#[cfg(not(feature = "async-mio"))] #[cfg(all(not(feature = "async-mio"), unix))]
pub(crate) use std::fmt::Debug as Source; pub(crate) use std::os::unix::io::AsRawFd as Source;
#[cfg(feature = "async-mio")] #[cfg(feature = "async-mio")]
pub(crate) use mio::event::Source; pub(crate) use mio::event::Source;
@ -179,6 +179,18 @@ impl<S: Read + Write + Source> Client<S> {
Self { input, output } Self { input, output }
} }
#[cfg(all(not(feature = "async-mio"), unix))]
/// Input source for asynchronous API based on `poll`.
pub(crate) fn input_source(&self) -> &S {
self.input.get_ref()
}
#[cfg(all(not(feature = "async-mio"), unix))]
/// Output source for asynchronous API based on `poll`.
pub(crate) fn output_source(&self) -> &S {
self.output.get_ref()
}
/// Send lines of text (terminated by a single dot). /// Send lines of text (terminated by a single dot).
pub fn send_lines(&mut self, lines: &[String]) -> ClientResult<&mut Self> { pub fn send_lines(&mut self, lines: &[String]) -> ClientResult<&mut Self> {
const END_OF_DATA: [&str; 1] = ["."]; const END_OF_DATA: [&str; 1] = ["."];

@ -61,16 +61,22 @@ mod synchronous {
use super::FifoPath; use super::FifoPath;
enum FifoMode {
Blocking,
NonBlocking,
TimeOut(Duration),
}
pub struct Builder { pub struct Builder {
path: FifoPath, path: FifoPath,
read_timeout: Option<Duration>, mode: FifoMode,
} }
impl Builder { impl Builder {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
path: FifoPath::new(), path: FifoPath::new(),
read_timeout: None, mode: FifoMode::Blocking,
} }
} }
@ -83,13 +89,22 @@ mod synchronous {
} }
pub fn timeout(&mut self, read_timeout: Duration) -> &mut Self { pub fn timeout(&mut self, read_timeout: Duration) -> &mut Self {
self.read_timeout = Some(read_timeout); self.mode = FifoMode::TimeOut(read_timeout);
self
}
pub fn nonblocking(&mut self) -> &mut Self {
self.mode = FifoMode::NonBlocking;
self self
} }
pub fn build(&self) -> io::Result<Client<UnixStream>> { pub fn build(&self) -> io::Result<Client<UnixStream>> {
let input = UnixStream::connect(self.path.get()?)?; let input = UnixStream::connect(self.path.get()?)?;
input.set_read_timeout(self.read_timeout)?; match self.mode {
FifoMode::Blocking => input.set_nonblocking(false)?,
FifoMode::NonBlocking => input.set_nonblocking(true)?,
FifoMode::TimeOut(timeout) => input.set_read_timeout(Some(timeout))?,
}
let output = input.try_clone()?; let output = input.try_clone()?;
Ok(Client::new(BufReader::new(input), BufWriter::new(output))) Ok(Client::new(BufReader::new(input), BufWriter::new(output)))
} }

@ -12,7 +12,7 @@
//! `ssip-client` implements a Speech Dispatcher SSIP client library in //! `ssip-client` implements a Speech Dispatcher SSIP client library in
//! pure rust. //! pure rust.
//! //!
//! See [`client::Client`] for the synchronous API and [`async_mio::AsyncClient`] for the asynchronous API based on [mio](https://crates.io/crates/mio). //! See [`client::Client`] for the synchronous API and [`poll::QueuedClient`] for the asynchronous API.
//! //!
//! Example //! Example
//! ```no_run //! ```no_run
@ -29,6 +29,7 @@
#[macro_use] #[macro_use]
mod protocol; mod protocol;
mod poll;
mod types; mod types;
pub mod client; pub mod client;
@ -40,10 +41,5 @@ pub use client::Client;
pub use client::{Request, Response}; pub use client::{Request, Response};
pub use constants::*; pub use constants::*;
pub use poll::QueuedClient;
pub use types::*; pub use types::*;
#[cfg(any(feature = "async-mio", doc))]
mod async_mio;
#[cfg(any(feature = "async-mio", doc))]
pub use async_mio::AsyncClient;

@ -8,7 +8,7 @@
// modified, or distributed except according to those terms. // modified, or distributed except according to those terms.
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{self, Read, Write}; use std::io::{Read, Write};
use crate::{ use crate::{
client::{Client, Request, Response, Source}, client::{Client, Request, Response, Source},
@ -31,15 +31,18 @@ mod mio {
const INITIAL_REQUEST_QUEUE_CAPACITY: usize = 4; const INITIAL_REQUEST_QUEUE_CAPACITY: usize = 4;
/// Asynchronous client based on `mio`. /// Client with a queue of requests.
/// ///
/// The client can be used with crates like [popol](https://crates.io/crates/popol) or
/// with [mio](https://crates.io/crates/mio) if feature `async-mio` is enabled.
/// ///
pub struct AsyncClient<S: Read + Write + Source> { /// When the output is ready, a next event can be sent.
pub struct QueuedClient<S: Read + Write + Source> {
client: Client<S>, client: Client<S>,
requests: VecDeque<Request>, requests: VecDeque<Request>,
} }
impl<S: Read + Write + Source> AsyncClient<S> { impl<S: Read + Write + Source> QueuedClient<S> {
/// New asynchronous client build on top of a synchronous client. /// New asynchronous client build on top of a synchronous client.
pub fn new(client: Client<S>) -> Self { pub fn new(client: Client<S>) -> Self {
Self { Self {
@ -48,13 +51,26 @@ impl<S: Read + Write + Source> AsyncClient<S> {
} }
} }
#[cfg(all(not(feature = "async-mio"), unix))]
/// Input source.
pub fn input_source(&self) -> &S {
self.client.input_source()
}
#[cfg(all(not(feature = "async-mio"), unix))]
/// Output source.
pub fn output_source(&self) -> &S {
self.client.output_source()
}
#[cfg(any(feature = "async-mio", doc))]
/// Register client /// Register client
pub fn register( pub fn register(
&mut self, &mut self,
poll: &mio::Poll, poll: &mio::Poll,
input_token: mio::Token, input_token: mio::Token,
output_token: mio::Token, output_token: mio::Token,
) -> io::Result<()> { ) -> std::io::Result<()> {
self.client.register(poll, input_token, output_token) self.client.register(poll, input_token, output_token)
} }
@ -82,11 +98,13 @@ impl<S: Read + Write + Source> AsyncClient<S> {
/// ///
/// Instance of `mio::Poll` generates a writable event only once until the socket returns `WouldBlock`. /// Instance of `mio::Poll` generates a writable event only once until the socket returns `WouldBlock`.
/// This error is mapped to `ClientError::NotReady`. /// This error is mapped to `ClientError::NotReady`.
pub fn send_next(&mut self) -> ClientResult<()> { pub fn send_next(&mut self) -> ClientResult<bool> {
if let Some(request) = self.requests.pop_front() { if let Some(request) = self.requests.pop_front() {
self.client.send(request)?; self.client.send(request)?;
Ok(true)
} else {
Ok(false)
} }
Ok(())
} }
/// Receive one response. /// Receive one response.

@ -94,7 +94,7 @@ fn basic_async_communication() -> ClientResult<()> {
let handle = Server::run(&socket_path, &COMMUNICATION); let handle = Server::run(&socket_path, &COMMUNICATION);
let mut poll = Poll::new()?; let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128); let mut events = Events::with_capacity(128);
let mut client = AsyncClient::new(fifo::Builder::new().path(&socket_path).build()?); let mut client = QueuedClient::new(fifo::Builder::new().path(&socket_path).build()?);
let input_token = Token(0); let input_token = Token(0);
let output_token = Token(1); let output_token = Token(1);
let timeout = Duration::new(0, 500 * 1000 * 1000 /* 500 ms */); let timeout = Duration::new(0, 500 * 1000 * 1000 /* 500 ms */);
@ -128,9 +128,9 @@ fn basic_async_communication() -> ClientResult<()> {
} }
if state.must_send() { if state.must_send() {
match client.send_next() { match client.send_next() {
Ok(()) => (), Ok(_) => (),
Err(ClientError::NotReady) => state.writable = false, Err(ClientError::NotReady) => state.writable = false,
err => return err, Err(err) => return Err(err),
} }
} }
} }

Loading…
Cancel
Save