diff --git a/Cargo.toml b/Cargo.toml index 454d4be..95d9344 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://gitlab.com/lp-accessibility/ssip-client" [dependencies] dirs = "4" -mio = { version = "0", optional = true } +mio = { version = "0.8", optional = true } thiserror = "1" strum = "0.24" strum_macros = "0.24" @@ -20,4 +20,4 @@ strum_macros = "0.24" libc = "0" [features] -metal-io = ["mio/net", "mio/os-poll"] +async-mio = ["mio/net", "mio/os-poll"] diff --git a/README.md b/README.md index 5b0bf1e..b928714 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ Rust SSIP Client Speech Dispatcher [SSIP client library](http://htmlpreview.github.io/?https://github.com/brailcom/speechd/blob/master/doc/ssip.html) in pure rust. +The API is synchronous by default. An asynchronous API based on [Mio](https://github.com/tokio-rs/mio) is available with a feature. + - [x] Unix socket. - [ ] TCP socket. - [x] Stop, cancel, pause and resume. @@ -16,16 +18,31 @@ Speech Dispatcher [SSIP client library](http://htmlpreview.github.io/?https://gi - [x] Notifications. - [ ] Message history. +Getting Started +--------------- + +To use the synchronous API, use: + +```toml +[dependencies] +ssip-client = "0.3" +``` + +For the asynchronous API, use: +```toml +[dependencies] +ssip-client = { version = "0.3", features = ["async-mio"] } +``` + Example ------- ```rust -use ssip_client::{new_default_fifo_client, ClientName, OK_CLIENT_NAME_SET}; - -let mut client = new_default_fifo_client(None)?; +use ssip_client::{FifoBuilder, ClientName}; +let mut client = FifoBuilder::new().build()?; client - .open(ClientName::new("joe", "hello"))? - .check_status(OK_CLIENT_NAME_SET)?; + .set_client_name(ClientName::new("joe", "hello"))? + .check_client_name_set()?; let msg_id = client.speak()?.send_line("hello")?.receive_message_id()?; client.quit()?; ``` diff --git a/examples/hello.rs b/examples/hello.rs index b73ae8e..25151e1 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -4,7 +4,7 @@ use ssip_client::{ClientName, ClientResult, FifoBuilder}; // Synchronous implementation // ============================== -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn main() -> ClientResult<()> { let mut client = FifoBuilder::new().build()?; client @@ -16,6 +16,8 @@ fn main() -> ClientResult<()> { .send_line("hello")? .receive_message_id()?; println!("message: {}", msg_id); + let volume = client.get_volume()?.receive_u8()?; + println!("volume: {}", volume); client.quit()?; Ok(()) } @@ -24,66 +26,155 @@ fn main() -> ClientResult<()> { // Asynchronous implementation // ============================== -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] use mio::{Events, Poll, Token}; -#[cfg(feature = "metal-io")] -use ssip_client::ClientError; +#[cfg(feature = "async-mio")] +mod control { -#[cfg(feature = "metal-io")] -fn increment(result: ClientResult) -> ClientResult { - match result { - Ok(_) => Ok(1), - Err(ClientError::NotReady) => Ok(0), - Err(err) => Err(err), + use ssip_client::{ClientError, ClientResult}; + + /// Controller to follow the sequence of actions and keep the socket state. + pub struct Controller { + step: u16, + done: bool, + writable: bool, + next_is_read: bool, } -} -#[cfg(feature = "metal-io")] -fn get_value(result: ClientResult) -> ClientResult> { - match result { - Ok(value) => Ok(Some(value)), - Err(ClientError::NotReady) => Ok(None), - Err(err) => Err(err), + impl Controller { + pub fn new() -> Controller { + Controller { + step: 0, + done: false, + writable: false, + next_is_read: false, + } + } + + /// Current step to execute. + pub fn step(&self) -> u16 { + self.step + } + + /// Return true when done. + pub fn done(&self) -> bool { + self.done + } + + /// If the next action is a read or the socket is not writable. + pub fn must_poll(&self) -> bool { + self.next_is_read || !self.writable + } + + /// Record that the socket is writable. + pub fn set_writable(&mut self) { + self.writable = true; + } + + /// Stop. + pub fn stop(&mut self) { + self.done = true; + } + + /// Interpret the result of the action and move to the next step if necessary. + /// + /// When the socket is set to writable, no other writable event will be generated until + /// the I/O returns error WouldBlock which is mapped to client error NotReady. + pub fn next(&mut self, next_is_read: bool, result: ClientResult) -> ClientResult<()> { + match result { + Ok(_) => { + self.step += 1; + self.next_is_read = next_is_read; + Ok(()) + } + Err(ClientError::NotReady) => { + if !self.next_is_read { + // let's wait for the socket to become writable + self.writable = false; + } + Ok(()) + } + Err(err) => Err(err), + } + } + + /// Return the value returned by last read and move to next step. + pub fn get_value(&mut self, result: ClientResult) -> ClientResult> { + match result { + Ok(value) => { + self.step += 1; + self.next_is_read = false; + Ok(Some(value)) + } + Err(ClientError::NotReady) => Ok(None), + Err(err) => Err(err), + } + } } } -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] fn main() -> ClientResult<()> { + enum Action { + None, + Read, + Write, + } + + // Create the poll object, the client and register the socket. let mut poll = Poll::new()?; - let mut events = Events::with_capacity(128); + let mut events = Events::with_capacity(16); let mut client = FifoBuilder::new().build()?; - let token = Token(0); - client.register(&poll, token)?; - let mut step: u16 = 0; - while step < 7 { - poll.poll(&mut events, None)?; - for event in &events { - if event.token() == token { - if event.is_writable() { - match step { - 0 => { - step += - increment(client.set_client_name(ClientName::new("test", "test")))? - } - 2 => step += increment(client.speak())?, - 4 => step += increment(client.send_line("hello"))?, - 6 => step += increment(client.quit())?, - _ => (), + let input_token = Token(0); + let output_token = Token(1); + client.register(&poll, input_token, output_token)?; + + let mut ctl = control::Controller::new(); + while !ctl.done() { + if ctl.must_poll() { + // Poll is only necessary to read or if the last write failed. + poll.poll(&mut events, None)?; + } + let mut event_iter = events.iter(); + loop { + let event = event_iter.next(); + let action = match event { + Some(event) if event.token() == output_token && event.is_writable() => { + ctl.set_writable(); + Action::Write + } + Some(event) if event.token() == input_token && event.is_readable() => Action::Read, + Some(_) => panic!("unexpected event"), + None if ctl.must_poll() => Action::None, + None => Action::Write, // Next action is write and socket is writable + }; + match action { + Action::Write => match ctl.step() { + 0 => ctl.next( + true, + client.set_client_name(ClientName::new("test", "test")), + )?, + 2 => ctl.next(true, client.speak())?, + 4 => ctl.next(true, client.send_line("hello"))?, + 6 => { + ctl.next(true, client.quit())?; + ctl.stop(); + break; } - } else if event.is_readable() { - match step { - 1 => step += increment(client.check_client_name_set())?, - 3 => step += increment(client.check_receiving_data())?, - 5 => { - if let Some(msgid) = get_value(client.receive_message_id())? { - println!("Message identifier: {}", msgid); - step += 1; - } + _ => (), + }, + Action::Read => match ctl.step() { + 1 => ctl.next(false, client.check_client_name_set())?, + 3 => ctl.next(false, client.check_receiving_data())?, + 5 => { + if let Some(msgid) = ctl.get_value(client.receive_message_id())? { + println!("Message identifier: {}", msgid); } - _ => (), } - } + _ => (), + }, + Action::None => break, } } } diff --git a/examples/list.rs b/examples/list.rs index a30c10b..9222720 100644 --- a/examples/list.rs +++ b/examples/list.rs @@ -1,10 +1,10 @@ -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] use ssip_client::{ ClientName, ClientResult, FifoBuilder, SynthesisVoice, OK_OUTPUT_MODULES_LIST_SENT, OK_VOICES_LIST_SENT, }; -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn main() -> ClientResult<()> { fn voice_to_string(voice: &SynthesisVoice) -> String { match &voice.language { @@ -54,7 +54,7 @@ fn main() -> ClientResult<()> { Ok(()) } -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] fn main() { println!("asynchronous client not implemented"); } diff --git a/examples/notifications.rs b/examples/notifications.rs index 9c9e5f4..aee7068 100644 --- a/examples/notifications.rs +++ b/examples/notifications.rs @@ -1,16 +1,27 @@ -#[cfg(not(feature = "metal-io"))] -use ssip_client::{ClientName, ClientResult, EventType, FifoBuilder, NotificationType}; +#[cfg(not(feature = "async-mio"))] +use ssip_client::{ + ClientName, ClientResult, EventType, FifoBuilder, NotificationType, OK_NOTIFICATION_SET, +}; -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn main() -> ClientResult<()> { let mut client = FifoBuilder::new().build()?; client .set_client_name(ClientName::new("joe", "notifications"))? .check_client_name_set()?; - client.enable_notification(NotificationType::All).unwrap(); - let msg_id = client.speak()?.send_line("hello")?.receive_message_id()?; - println!("message: {}", msg_id); + // Enabling notifications + client + .enable_notification(NotificationType::All)? + .check_status(OK_NOTIFICATION_SET)?; + // Sending message + let msg_id = client + .speak()? + .check_receiving_data()? + .send_line("hello")? + .receive_message_id()?; + println!("message identifier: {}", msg_id); loop { + // Waiting for event match client.receive_event() { Ok(event) => { println!( @@ -27,11 +38,12 @@ fn main() -> ClientResult<()> { } } } + println!("exiting..."); client.quit()?; Ok(()) } -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] fn main() { println!("asynchronous client not implemented"); } diff --git a/src/client.rs b/src/client.rs index 93d1621..5beb1db 100644 --- a/src/client.rs +++ b/src/client.rs @@ -213,7 +213,8 @@ impl Client { /// Send a line pub fn send_line(&mut self, line: &str) -> ClientResult<&mut Client> { const END_OF_DATA: &str = "."; - self.send_lines(&[line, END_OF_DATA]) + send_lines(&mut self.output, &[line, END_OF_DATA])?; + Ok(self) } /// Send a char @@ -470,8 +471,8 @@ impl Client { } /// Receive integer - pub fn receive_u8(&mut self, expected_code: ReturnCode) -> ClientResult { - self.receive_string(expected_code) + pub fn receive_u8(&mut self) -> ClientResult { + self.receive_string(OK_GET) .and_then(|s| s.parse().map_err(|_| ClientError::InvalidType)) } @@ -533,12 +534,17 @@ impl Client { } /// Register the socket for polling. - #[cfg(feature = "metal-io")] - pub fn register(&mut self, poll: &mio::Poll, token: mio::Token) -> ClientResult<()> { + #[cfg(feature = "async-mio")] + pub fn register( + &mut self, + poll: &mio::Poll, + input_token: mio::Token, + output_token: mio::Token, + ) -> ClientResult<()> { poll.registry() - .register(self.output.get_mut(), token, mio::Interest::WRITABLE)?; + .register(self.input.get_mut(), input_token, mio::Interest::READABLE)?; poll.registry() - .register(self.input.get_mut(), token, mio::Interest::READABLE)?; + .register(self.output.get_mut(), output_token, mio::Interest::WRITABLE)?; Ok(()) } } @@ -546,10 +552,10 @@ impl Client { #[cfg(test)] mod tests { - #[cfg(not(feature = "metal-io"))] + #[cfg(not(feature = "async-mio"))] use std::net::TcpStream; - #[cfg(feature = "metal-io")] + #[cfg(feature = "async-mio")] use mio::net::TcpStream; use super::{Client, ClientError}; diff --git a/src/fifo.rs b/src/fifo.rs index 51a02f3..9b62fa1 100644 --- a/src/fifo.rs +++ b/src/fifo.rs @@ -50,7 +50,7 @@ impl FifoPath { } } -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] mod synchronous { use std::io::{self, BufReader, BufWriter}; pub use std::os::unix::net::UnixStream; @@ -96,10 +96,10 @@ mod synchronous { } } -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] pub use synchronous::{FifoBuilder, UnixStream}; -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] mod asynchronous { pub use mio::net::UnixStream; use std::io::{self, BufReader, BufWriter}; @@ -146,7 +146,7 @@ mod asynchronous { } } -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] pub use asynchronous::{FifoBuilder, UnixStream}; #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index 214fb09..936d583 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ //! //! Example //! ```no_run -//! use ssip_client::{FifoBuilder, ClientName, OK_CLIENT_NAME_SET}; +//! use ssip_client::{FifoBuilder, ClientName}; //! let mut client = FifoBuilder::new().build()?; //! client //! .set_client_name(ClientName::new("joe", "hello"))? diff --git a/src/protocol.rs b/src/protocol.rs index 306e268..b6e42ea 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -21,14 +21,17 @@ macro_rules! invalid_input { }; } +/// Write lines separated by CRLF. pub(crate) fn write_lines(output: &mut dyn Write, lines: &[&str]) -> ClientResult<()> { for line in lines.iter() { + // Uncomment to debug: dbg!(line.to_owned()); output.write_all(line.as_bytes())?; output.write_all(b"\r\n")?; } Ok(()) } +/// Write lines separated by CRLF and flush the output. pub(crate) fn send_lines(output: &mut dyn Write, lines: &[&str]) -> ClientResult<()> { write_lines(output, lines)?; output.flush()?; @@ -53,6 +56,7 @@ fn parse_status_line(code: u16, line: &str) -> ClientStatus { } } +/// Read lines from server until a status line is found. pub(crate) fn receive_answer( input: &mut dyn BufRead, mut lines: Option<&mut Vec>, @@ -60,6 +64,7 @@ pub(crate) fn receive_answer( loop { let mut line = String::new(); input.read_line(&mut line).map_err(ClientError::Io)?; + // Uncomment to debug: dbg!(line.to_owned()); match line.chars().nth(3) { Some(ch) => match ch { ' ' => match line[0..3].parse::() { diff --git a/src/types.rs b/src/types.rs index d3a4237..344f454 100644 --- a/src/types.rs +++ b/src/types.rs @@ -361,10 +361,10 @@ impl fmt::Display for StatusLine { } } -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] pub use std::fmt::Debug as Source; // Trick to have common implementation for sync and async. -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] pub use mio::event::Source; #[cfg(test)] diff --git a/tests/fifo_async_tests.rs b/tests/fifo_async_tests.rs index c981a70..a4e5949 100644 --- a/tests/fifo_async_tests.rs +++ b/tests/fifo_async_tests.rs @@ -6,18 +6,18 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] use mio::{Events, Poll, Token}; -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] use ssip_client::*; -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] mod server; -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] use server::Server; -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] mod utils { use ssip_client::*; @@ -58,11 +58,11 @@ mod utils { } } -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] use utils::Controler; #[test] -#[cfg(feature = "metal-io")] +#[cfg(feature = "async-mio")] fn basic_async_communication() -> std::io::Result<()> { const COMMUNICATION: [(&str, &str); 1] = [( "SET self CLIENT_NAME test:test:main\r\n", @@ -77,30 +77,21 @@ fn basic_async_communication() -> std::io::Result<()> { let mut poll = Poll::new()?; let mut events = Events::with_capacity(128); let mut client = FifoBuilder::new().path(&server_path).build().unwrap(); - let token = Token(0); - client.register(&poll, token).unwrap(); + let input_token = Token(0); + let output_token = Token(1); + client.register(&poll, input_token, output_token).unwrap(); let mut controler = Controler::new(); - use std::io::Write; - let mut log_file = std::fs::File::create("/home/laurent/tmp/test_client.log")?; while controler.step() < 2 { poll.poll(&mut events, None)?; - log_file.write_all(format!("Step: {}\n", controler.step()).as_bytes())?; for event in &events { - if event.token() == token { - if event.is_writable() { - log_file.write_all(b"Event writable\n")?; - if controler.step() == 0 { - log_file.write_all(b"Send: set client name\n")?; - controler.check_result( - client.set_client_name(ClientName::new("test", "test")), - ); - } - } else if event.is_readable() { - log_file.write_all(b"Event readable\n")?; - if controler.step() == 1 { - log_file.write_all(b"Receive: client name set\n")?; - controler.check_result(client.check_client_name_set()); - } + if event.token() == output_token && event.is_writable() { + if controler.step() == 0 { + controler + .check_result(client.set_client_name(ClientName::new("test", "test"))); + } + } else if event.token() == input_token && event.is_readable() { + if controler.step() == 1 { + controler.check_result(client.check_client_name_set()); } } } diff --git a/tests/fifo_sync_tests.rs b/tests/fifo_sync_tests.rs index 95f5050..6b3fada 100644 --- a/tests/fifo_sync_tests.rs +++ b/tests/fifo_sync_tests.rs @@ -6,21 +6,21 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] use ssip_client::*; -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] use std::{io, os::unix::net::UnixStream}; -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] mod server; -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] use server::Server; /// Create a server and run the client /// /// The communication is an array of (["question", ...], "response") -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn test_client( communication: &'static [(&'static str, &'static str)], process: F, @@ -51,14 +51,14 @@ where Ok(()) } -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] const SET_CLIENT_COMMUNICATION: (&str, &str) = ( "SET self CLIENT_NAME test:test:main\r\n", "208 OK CLIENT NAME SET\r\n", ); #[test] -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn connect_and_quit() -> io::Result<()> { test_client( &[ @@ -73,7 +73,7 @@ fn connect_and_quit() -> io::Result<()> { } #[test] -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn say_one_line() -> io::Result<()> { test_client( &[ @@ -105,7 +105,7 @@ fn say_one_line() -> io::Result<()> { macro_rules! test_setter { ($setter:ident, $question:expr, $answer:expr, $code:expr, $($arg:tt)*) => { #[test] - #[cfg(not(feature = "metal-io"))] + #[cfg(not(feature = "async-mio"))] fn $setter() -> io::Result<()> { test_client( &[SET_CLIENT_COMMUNICATION, ($question, $answer)], @@ -119,14 +119,14 @@ macro_rules! test_setter { } macro_rules! test_getter { - ($getter:ident, $receive:ident, $question:expr, $answer:expr, $value:expr) => { + ($getter:ident, $receive:ident, $arg:tt, $question:expr, $answer:expr, $value:expr) => { #[test] - #[cfg(not(feature = "metal-io"))] + #[cfg(not(feature = "async-mio"))] fn $getter() -> io::Result<()> { test_client( &[SET_CLIENT_COMMUNICATION, ($question, $answer)], |client| { - let value = client.$getter().unwrap().$receive(251).unwrap(); + let value = client.$getter().unwrap().$receive $arg.unwrap(); assert_eq!($value, value); Ok(()) }, @@ -134,14 +134,14 @@ macro_rules! test_getter { } }; ($getter:ident, $question:expr, $answer:expr, $value:expr) => { - test_getter!($getter, receive_string, $question, $answer, $value); + test_getter!($getter, receive_string, (OK_GET), $question, $answer, $value); }; } macro_rules! test_list { ($getter:ident, $question:expr, $answer:expr, $code:expr, $values:expr) => { #[test] - #[cfg(not(feature = "metal-io"))] + #[cfg(not(feature = "async-mio"))] fn $getter() -> io::Result<()> { test_client( &[SET_CLIENT_COMMUNICATION, ($question, $answer)], @@ -164,7 +164,7 @@ test_setter!( ); #[test] -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn set_debug() -> io::Result<()> { test_client( &[ @@ -238,6 +238,7 @@ test_setter!( test_getter!( get_rate, receive_u8, + (), "GET RATE\r\n", "251-0\r\n251 OK GET RETURNED\r\n", 0 @@ -255,6 +256,7 @@ test_setter!( test_getter!( get_volume, receive_u8, + (), "GET VOLUME\r\n", "251-100\r\n251 OK GET RETURNED\r\n", 100 @@ -263,6 +265,7 @@ test_getter!( test_getter!( get_pitch, receive_u8, + (), "GET PITCH\r\n", "251-0\r\n251 OK GET RETURNED\r\n", 0 @@ -337,7 +340,7 @@ test_list!( ); #[test] -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn list_synthesis_voices() -> io::Result<()> { test_client( &[ @@ -363,7 +366,7 @@ fn list_synthesis_voices() -> io::Result<()> { } #[test] -#[cfg(not(feature = "metal-io"))] +#[cfg(not(feature = "async-mio"))] fn receive_notification() -> io::Result<()> { test_client( &[ diff --git a/tests/server.rs b/tests/server.rs index b22e310..788175c 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -49,16 +49,10 @@ impl Server { let (stream, _) = self.listener.accept()?; let mut input = BufReader::new(stream.try_clone()?); let mut output = BufWriter::new(stream); - let mut log_file = std::fs::File::create("/home/laurent/tmp/test_server.log")?; for (questions, answer) in self.communication.iter() { - log_file.write_all(format!("Next answer: {}", answer).as_bytes())?; for question in Server::split_lines(questions).iter() { - log_file.write_all(format!("Expecting: {}", question).as_bytes())?; - log_file.flush()?; let mut line = String::new(); input.read_line(&mut line)?; - log_file.write_all(format!("Read: {}", line).as_bytes())?; - log_file.flush()?; if line != *question { return Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -66,12 +60,8 @@ impl Server { )); } } - log_file.write_all(format!("Write: {}", answer).as_bytes())?; - log_file.flush()?; output.write_all(answer.as_bytes())?; output.flush()?; - log_file.write_all(b"Flushed\r\n")?; - log_file.flush()?; } Ok(()) }