Dual synchronous and asynchronous API.

* Calls have been separate in a "send" and a "receive" to have the same methods in both modes.
* Asynchrone API is enabled with feature "async-mio".
main
Laurent Pelecq 2 years ago
parent 58b1d5513f
commit a186c456db

@ -11,7 +11,7 @@ repository = "https://gitlab.com/lp-accessibility/ssip-client"
[dependencies]
dirs = "4"
mio = { version = "0", optional = true }
mio = { version = "0.8", optional = true }
thiserror = "1"
strum = "0.24"
strum_macros = "0.24"
@ -20,4 +20,4 @@ strum_macros = "0.24"
libc = "0"
[features]
metal-io = ["mio/net", "mio/os-poll"]
async-mio = ["mio/net", "mio/os-poll"]

@ -8,6 +8,8 @@ Rust SSIP Client
Speech Dispatcher [SSIP client library](http://htmlpreview.github.io/?https://github.com/brailcom/speechd/blob/master/doc/ssip.html) in pure rust.
The API is synchronous by default. An asynchronous API based on [Mio](https://github.com/tokio-rs/mio) is available with a feature.
- [x] Unix socket.
- [ ] TCP socket.
- [x] Stop, cancel, pause and resume.
@ -16,16 +18,31 @@ Speech Dispatcher [SSIP client library](http://htmlpreview.github.io/?https://gi
- [x] Notifications.
- [ ] Message history.
Getting Started
---------------
To use the synchronous API, use:
```toml
[dependencies]
ssip-client = "0.3"
```
For the asynchronous API, use:
```toml
[dependencies]
ssip-client = { version = "0.3", features = ["async-mio"] }
```
Example
-------
```rust
use ssip_client::{new_default_fifo_client, ClientName, OK_CLIENT_NAME_SET};
let mut client = new_default_fifo_client(None)?;
use ssip_client::{FifoBuilder, ClientName};
let mut client = FifoBuilder::new().build()?;
client
.open(ClientName::new("joe", "hello"))?
.check_status(OK_CLIENT_NAME_SET)?;
.set_client_name(ClientName::new("joe", "hello"))?
.check_client_name_set()?;
let msg_id = client.speak()?.send_line("hello")?.receive_message_id()?;
client.quit()?;
```

@ -4,7 +4,7 @@ use ssip_client::{ClientName, ClientResult, FifoBuilder};
// Synchronous implementation
// ==============================
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn main() -> ClientResult<()> {
let mut client = FifoBuilder::new().build()?;
client
@ -16,6 +16,8 @@ fn main() -> ClientResult<()> {
.send_line("hello")?
.receive_message_id()?;
println!("message: {}", msg_id);
let volume = client.get_volume()?.receive_u8()?;
println!("volume: {}", volume);
client.quit()?;
Ok(())
}
@ -24,66 +26,155 @@ fn main() -> ClientResult<()> {
// Asynchronous implementation
// ==============================
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
use mio::{Events, Poll, Token};
#[cfg(feature = "metal-io")]
use ssip_client::ClientError;
#[cfg(feature = "async-mio")]
mod control {
#[cfg(feature = "metal-io")]
fn increment<V>(result: ClientResult<V>) -> ClientResult<u16> {
match result {
Ok(_) => Ok(1),
Err(ClientError::NotReady) => Ok(0),
Err(err) => Err(err),
use ssip_client::{ClientError, ClientResult};
/// Controller to follow the sequence of actions and keep the socket state.
pub struct Controller {
step: u16,
done: bool,
writable: bool,
next_is_read: bool,
}
}
#[cfg(feature = "metal-io")]
fn get_value<V>(result: ClientResult<V>) -> ClientResult<Option<V>> {
match result {
Ok(value) => Ok(Some(value)),
Err(ClientError::NotReady) => Ok(None),
Err(err) => Err(err),
impl Controller {
pub fn new() -> Controller {
Controller {
step: 0,
done: false,
writable: false,
next_is_read: false,
}
}
/// Current step to execute.
pub fn step(&self) -> u16 {
self.step
}
/// Return true when done.
pub fn done(&self) -> bool {
self.done
}
/// If the next action is a read or the socket is not writable.
pub fn must_poll(&self) -> bool {
self.next_is_read || !self.writable
}
/// Record that the socket is writable.
pub fn set_writable(&mut self) {
self.writable = true;
}
/// Stop.
pub fn stop(&mut self) {
self.done = true;
}
/// Interpret the result of the action and move to the next step if necessary.
///
/// When the socket is set to writable, no other writable event will be generated until
/// the I/O returns error WouldBlock which is mapped to client error NotReady.
pub fn next<V>(&mut self, next_is_read: bool, result: ClientResult<V>) -> ClientResult<()> {
match result {
Ok(_) => {
self.step += 1;
self.next_is_read = next_is_read;
Ok(())
}
Err(ClientError::NotReady) => {
if !self.next_is_read {
// let's wait for the socket to become writable
self.writable = false;
}
Ok(())
}
Err(err) => Err(err),
}
}
/// Return the value returned by last read and move to next step.
pub fn get_value<V>(&mut self, result: ClientResult<V>) -> ClientResult<Option<V>> {
match result {
Ok(value) => {
self.step += 1;
self.next_is_read = false;
Ok(Some(value))
}
Err(ClientError::NotReady) => Ok(None),
Err(err) => Err(err),
}
}
}
}
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
fn main() -> ClientResult<()> {
enum Action {
None,
Read,
Write,
}
// Create the poll object, the client and register the socket.
let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);
let mut events = Events::with_capacity(16);
let mut client = FifoBuilder::new().build()?;
let token = Token(0);
client.register(&poll, token)?;
let mut step: u16 = 0;
while step < 7 {
poll.poll(&mut events, None)?;
for event in &events {
if event.token() == token {
if event.is_writable() {
match step {
0 => {
step +=
increment(client.set_client_name(ClientName::new("test", "test")))?
}
2 => step += increment(client.speak())?,
4 => step += increment(client.send_line("hello"))?,
6 => step += increment(client.quit())?,
_ => (),
let input_token = Token(0);
let output_token = Token(1);
client.register(&poll, input_token, output_token)?;
let mut ctl = control::Controller::new();
while !ctl.done() {
if ctl.must_poll() {
// Poll is only necessary to read or if the last write failed.
poll.poll(&mut events, None)?;
}
let mut event_iter = events.iter();
loop {
let event = event_iter.next();
let action = match event {
Some(event) if event.token() == output_token && event.is_writable() => {
ctl.set_writable();
Action::Write
}
Some(event) if event.token() == input_token && event.is_readable() => Action::Read,
Some(_) => panic!("unexpected event"),
None if ctl.must_poll() => Action::None,
None => Action::Write, // Next action is write and socket is writable
};
match action {
Action::Write => match ctl.step() {
0 => ctl.next(
true,
client.set_client_name(ClientName::new("test", "test")),
)?,
2 => ctl.next(true, client.speak())?,
4 => ctl.next(true, client.send_line("hello"))?,
6 => {
ctl.next(true, client.quit())?;
ctl.stop();
break;
}
} else if event.is_readable() {
match step {
1 => step += increment(client.check_client_name_set())?,
3 => step += increment(client.check_receiving_data())?,
5 => {
if let Some(msgid) = get_value(client.receive_message_id())? {
println!("Message identifier: {}", msgid);
step += 1;
}
_ => (),
},
Action::Read => match ctl.step() {
1 => ctl.next(false, client.check_client_name_set())?,
3 => ctl.next(false, client.check_receiving_data())?,
5 => {
if let Some(msgid) = ctl.get_value(client.receive_message_id())? {
println!("Message identifier: {}", msgid);
}
_ => (),
}
}
_ => (),
},
Action::None => break,
}
}
}

@ -1,10 +1,10 @@
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
use ssip_client::{
ClientName, ClientResult, FifoBuilder, SynthesisVoice, OK_OUTPUT_MODULES_LIST_SENT,
OK_VOICES_LIST_SENT,
};
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn main() -> ClientResult<()> {
fn voice_to_string(voice: &SynthesisVoice) -> String {
match &voice.language {
@ -54,7 +54,7 @@ fn main() -> ClientResult<()> {
Ok(())
}
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
fn main() {
println!("asynchronous client not implemented");
}

@ -1,16 +1,27 @@
#[cfg(not(feature = "metal-io"))]
use ssip_client::{ClientName, ClientResult, EventType, FifoBuilder, NotificationType};
#[cfg(not(feature = "async-mio"))]
use ssip_client::{
ClientName, ClientResult, EventType, FifoBuilder, NotificationType, OK_NOTIFICATION_SET,
};
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn main() -> ClientResult<()> {
let mut client = FifoBuilder::new().build()?;
client
.set_client_name(ClientName::new("joe", "notifications"))?
.check_client_name_set()?;
client.enable_notification(NotificationType::All).unwrap();
let msg_id = client.speak()?.send_line("hello")?.receive_message_id()?;
println!("message: {}", msg_id);
// Enabling notifications
client
.enable_notification(NotificationType::All)?
.check_status(OK_NOTIFICATION_SET)?;
// Sending message
let msg_id = client
.speak()?
.check_receiving_data()?
.send_line("hello")?
.receive_message_id()?;
println!("message identifier: {}", msg_id);
loop {
// Waiting for event
match client.receive_event() {
Ok(event) => {
println!(
@ -27,11 +38,12 @@ fn main() -> ClientResult<()> {
}
}
}
println!("exiting...");
client.quit()?;
Ok(())
}
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
fn main() {
println!("asynchronous client not implemented");
}

@ -213,7 +213,8 @@ impl<S: Read + Write + Source> Client<S> {
/// Send a line
pub fn send_line(&mut self, line: &str) -> ClientResult<&mut Client<S>> {
const END_OF_DATA: &str = ".";
self.send_lines(&[line, END_OF_DATA])
send_lines(&mut self.output, &[line, END_OF_DATA])?;
Ok(self)
}
/// Send a char
@ -470,8 +471,8 @@ impl<S: Read + Write + Source> Client<S> {
}
/// Receive integer
pub fn receive_u8(&mut self, expected_code: ReturnCode) -> ClientResult<u8> {
self.receive_string(expected_code)
pub fn receive_u8(&mut self) -> ClientResult<u8> {
self.receive_string(OK_GET)
.and_then(|s| s.parse().map_err(|_| ClientError::InvalidType))
}
@ -533,12 +534,17 @@ impl<S: Read + Write + Source> Client<S> {
}
/// Register the socket for polling.
#[cfg(feature = "metal-io")]
pub fn register(&mut self, poll: &mio::Poll, token: mio::Token) -> ClientResult<()> {
#[cfg(feature = "async-mio")]
pub fn register(
&mut self,
poll: &mio::Poll,
input_token: mio::Token,
output_token: mio::Token,
) -> ClientResult<()> {
poll.registry()
.register(self.output.get_mut(), token, mio::Interest::WRITABLE)?;
.register(self.input.get_mut(), input_token, mio::Interest::READABLE)?;
poll.registry()
.register(self.input.get_mut(), token, mio::Interest::READABLE)?;
.register(self.output.get_mut(), output_token, mio::Interest::WRITABLE)?;
Ok(())
}
}
@ -546,10 +552,10 @@ impl<S: Read + Write + Source> Client<S> {
#[cfg(test)]
mod tests {
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
use std::net::TcpStream;
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
use mio::net::TcpStream;
use super::{Client, ClientError};

@ -50,7 +50,7 @@ impl FifoPath {
}
}
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
mod synchronous {
use std::io::{self, BufReader, BufWriter};
pub use std::os::unix::net::UnixStream;
@ -96,10 +96,10 @@ mod synchronous {
}
}
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
pub use synchronous::{FifoBuilder, UnixStream};
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
mod asynchronous {
pub use mio::net::UnixStream;
use std::io::{self, BufReader, BufWriter};
@ -146,7 +146,7 @@ mod asynchronous {
}
}
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
pub use asynchronous::{FifoBuilder, UnixStream};
#[cfg(test)]

@ -16,7 +16,7 @@
//!
//! Example
//! ```no_run
//! use ssip_client::{FifoBuilder, ClientName, OK_CLIENT_NAME_SET};
//! use ssip_client::{FifoBuilder, ClientName};
//! let mut client = FifoBuilder::new().build()?;
//! client
//! .set_client_name(ClientName::new("joe", "hello"))?

@ -21,14 +21,17 @@ macro_rules! invalid_input {
};
}
/// Write lines separated by CRLF.
pub(crate) fn write_lines(output: &mut dyn Write, lines: &[&str]) -> ClientResult<()> {
for line in lines.iter() {
// Uncomment to debug: dbg!(line.to_owned());
output.write_all(line.as_bytes())?;
output.write_all(b"\r\n")?;
}
Ok(())
}
/// Write lines separated by CRLF and flush the output.
pub(crate) fn send_lines(output: &mut dyn Write, lines: &[&str]) -> ClientResult<()> {
write_lines(output, lines)?;
output.flush()?;
@ -53,6 +56,7 @@ fn parse_status_line(code: u16, line: &str) -> ClientStatus {
}
}
/// Read lines from server until a status line is found.
pub(crate) fn receive_answer(
input: &mut dyn BufRead,
mut lines: Option<&mut Vec<String>>,
@ -60,6 +64,7 @@ pub(crate) fn receive_answer(
loop {
let mut line = String::new();
input.read_line(&mut line).map_err(ClientError::Io)?;
// Uncomment to debug: dbg!(line.to_owned());
match line.chars().nth(3) {
Some(ch) => match ch {
' ' => match line[0..3].parse::<u16>() {

@ -361,10 +361,10 @@ impl fmt::Display for StatusLine {
}
}
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
pub use std::fmt::Debug as Source; // Trick to have common implementation for sync and async.
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
pub use mio::event::Source;
#[cfg(test)]

@ -6,18 +6,18 @@
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
use mio::{Events, Poll, Token};
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
use ssip_client::*;
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
mod server;
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
use server::Server;
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
mod utils {
use ssip_client::*;
@ -58,11 +58,11 @@ mod utils {
}
}
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
use utils::Controler;
#[test]
#[cfg(feature = "metal-io")]
#[cfg(feature = "async-mio")]
fn basic_async_communication() -> std::io::Result<()> {
const COMMUNICATION: [(&str, &str); 1] = [(
"SET self CLIENT_NAME test:test:main\r\n",
@ -77,30 +77,21 @@ fn basic_async_communication() -> std::io::Result<()> {
let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);
let mut client = FifoBuilder::new().path(&server_path).build().unwrap();
let token = Token(0);
client.register(&poll, token).unwrap();
let input_token = Token(0);
let output_token = Token(1);
client.register(&poll, input_token, output_token).unwrap();
let mut controler = Controler::new();
use std::io::Write;
let mut log_file = std::fs::File::create("/home/laurent/tmp/test_client.log")?;
while controler.step() < 2 {
poll.poll(&mut events, None)?;
log_file.write_all(format!("Step: {}\n", controler.step()).as_bytes())?;
for event in &events {
if event.token() == token {
if event.is_writable() {
log_file.write_all(b"Event writable\n")?;
if controler.step() == 0 {
log_file.write_all(b"Send: set client name\n")?;
controler.check_result(
client.set_client_name(ClientName::new("test", "test")),
);
}
} else if event.is_readable() {
log_file.write_all(b"Event readable\n")?;
if controler.step() == 1 {
log_file.write_all(b"Receive: client name set\n")?;
controler.check_result(client.check_client_name_set());
}
if event.token() == output_token && event.is_writable() {
if controler.step() == 0 {
controler
.check_result(client.set_client_name(ClientName::new("test", "test")));
}
} else if event.token() == input_token && event.is_readable() {
if controler.step() == 1 {
controler.check_result(client.check_client_name_set());
}
}
}

@ -6,21 +6,21 @@
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
use ssip_client::*;
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
use std::{io, os::unix::net::UnixStream};
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
mod server;
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
use server::Server;
/// Create a server and run the client
///
/// The communication is an array of (["question", ...], "response")
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn test_client<F>(
communication: &'static [(&'static str, &'static str)],
process: F,
@ -51,14 +51,14 @@ where
Ok(())
}
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
const SET_CLIENT_COMMUNICATION: (&str, &str) = (
"SET self CLIENT_NAME test:test:main\r\n",
"208 OK CLIENT NAME SET\r\n",
);
#[test]
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn connect_and_quit() -> io::Result<()> {
test_client(
&[
@ -73,7 +73,7 @@ fn connect_and_quit() -> io::Result<()> {
}
#[test]
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn say_one_line() -> io::Result<()> {
test_client(
&[
@ -105,7 +105,7 @@ fn say_one_line() -> io::Result<()> {
macro_rules! test_setter {
($setter:ident, $question:expr, $answer:expr, $code:expr, $($arg:tt)*) => {
#[test]
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn $setter() -> io::Result<()> {
test_client(
&[SET_CLIENT_COMMUNICATION, ($question, $answer)],
@ -119,14 +119,14 @@ macro_rules! test_setter {
}
macro_rules! test_getter {
($getter:ident, $receive:ident, $question:expr, $answer:expr, $value:expr) => {
($getter:ident, $receive:ident, $arg:tt, $question:expr, $answer:expr, $value:expr) => {
#[test]
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn $getter() -> io::Result<()> {
test_client(
&[SET_CLIENT_COMMUNICATION, ($question, $answer)],
|client| {
let value = client.$getter().unwrap().$receive(251).unwrap();
let value = client.$getter().unwrap().$receive $arg.unwrap();
assert_eq!($value, value);
Ok(())
},
@ -134,14 +134,14 @@ macro_rules! test_getter {
}
};
($getter:ident, $question:expr, $answer:expr, $value:expr) => {
test_getter!($getter, receive_string, $question, $answer, $value);
test_getter!($getter, receive_string, (OK_GET), $question, $answer, $value);
};
}
macro_rules! test_list {
($getter:ident, $question:expr, $answer:expr, $code:expr, $values:expr) => {
#[test]
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn $getter() -> io::Result<()> {
test_client(
&[SET_CLIENT_COMMUNICATION, ($question, $answer)],
@ -164,7 +164,7 @@ test_setter!(
);
#[test]
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn set_debug() -> io::Result<()> {
test_client(
&[
@ -238,6 +238,7 @@ test_setter!(
test_getter!(
get_rate,
receive_u8,
(),
"GET RATE\r\n",
"251-0\r\n251 OK GET RETURNED\r\n",
0
@ -255,6 +256,7 @@ test_setter!(
test_getter!(
get_volume,
receive_u8,
(),
"GET VOLUME\r\n",
"251-100\r\n251 OK GET RETURNED\r\n",
100
@ -263,6 +265,7 @@ test_getter!(
test_getter!(
get_pitch,
receive_u8,
(),
"GET PITCH\r\n",
"251-0\r\n251 OK GET RETURNED\r\n",
0
@ -337,7 +340,7 @@ test_list!(
);
#[test]
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn list_synthesis_voices() -> io::Result<()> {
test_client(
&[
@ -363,7 +366,7 @@ fn list_synthesis_voices() -> io::Result<()> {
}
#[test]
#[cfg(not(feature = "metal-io"))]
#[cfg(not(feature = "async-mio"))]
fn receive_notification() -> io::Result<()> {
test_client(
&[

@ -49,16 +49,10 @@ impl Server {
let (stream, _) = self.listener.accept()?;
let mut input = BufReader::new(stream.try_clone()?);
let mut output = BufWriter::new(stream);
let mut log_file = std::fs::File::create("/home/laurent/tmp/test_server.log")?;
for (questions, answer) in self.communication.iter() {
log_file.write_all(format!("Next answer: {}", answer).as_bytes())?;
for question in Server::split_lines(questions).iter() {
log_file.write_all(format!("Expecting: {}", question).as_bytes())?;
log_file.flush()?;
let mut line = String::new();
input.read_line(&mut line)?;
log_file.write_all(format!("Read: {}", line).as_bytes())?;
log_file.flush()?;
if line != *question {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
@ -66,12 +60,8 @@ impl Server {
));
}
}
log_file.write_all(format!("Write: {}", answer).as_bytes())?;
log_file.flush()?;
output.write_all(answer.as_bytes())?;
output.flush()?;
log_file.write_all(b"Flushed\r\n")?;
log_file.flush()?;
}
Ok(())
}

Loading…
Cancel
Save