Add tcp communication

main
Laurent Pelecq 2 years ago
parent 14e372e2e8
commit 1139ff7080

@ -1,6 +1,6 @@
[package]
name = "ssip-client"
version = "0.7.0"
version = "0.8.0"
authors = ["Laurent Pelecq <lpelecq+rust@circoise.eu>"]
edition = "2018"
description = "Client API for Speech Dispatcher"

@ -14,7 +14,7 @@ A non-blocking API can be used with a low-level polling mechanism based on `poll
with [mio](https://github.com/tokio-rs/mio).
- [x] Unix socket.
- [ ] TCP socket.
- [x] TCP socket.
- [x] Stop, cancel, pause and resume.
- [x] List, set voices.
- [x] Set rate, pitch, volume.
@ -28,13 +28,13 @@ To use the synchronous API or an asynchronous API compatible with low-level crat
```toml
[dependencies]
ssip-client = "0.7"
ssip-client = "0.8"
```
For the asynchronous API, use:
```toml
[dependencies]
ssip-client = { version = "0.7", features = ["async-mio"] }
ssip-client = { version = "0.8", features = ["async-mio"] }
```
Example

@ -18,10 +18,10 @@ use crate::types::*;
// Trick to have common implementation for std and mio streams..
#[cfg(all(not(feature = "async-mio"), unix))]
pub(crate) use std::os::unix::io::AsRawFd as Source;
pub use std::os::unix::io::AsRawFd as Source;
#[cfg(feature = "async-mio")]
pub(crate) use mio::event::Source;
pub use mio::event::Source;
/// Convert boolean to ON or OFF
fn on_off(value: bool) -> &'static str {

@ -1,5 +1,5 @@
// ssip-client -- Speech Dispatcher client in Rust
// Copyright (c) 2021 Laurent Pelecq
// Copyright (c) 2021-2022 Laurent Pelecq
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
@ -58,25 +58,20 @@ mod synchronous {
use std::time::Duration;
use crate::client::Client;
use crate::net::StreamMode;
use super::FifoPath;
enum FifoMode {
Blocking,
NonBlocking,
TimeOut(Duration),
}
pub struct Builder {
path: FifoPath,
mode: FifoMode,
mode: StreamMode,
}
impl Builder {
pub fn new() -> Self {
Self {
path: FifoPath::new(),
mode: FifoMode::Blocking,
mode: StreamMode::Blocking,
}
}
@ -89,21 +84,21 @@ mod synchronous {
}
pub fn timeout(&mut self, read_timeout: Duration) -> &mut Self {
self.mode = FifoMode::TimeOut(read_timeout);
self.mode = StreamMode::TimeOut(read_timeout);
self
}
pub fn nonblocking(&mut self) -> &mut Self {
self.mode = FifoMode::NonBlocking;
self.mode = StreamMode::NonBlocking;
self
}
pub fn build(&self) -> io::Result<Client<UnixStream>> {
let input = UnixStream::connect(self.path.get()?)?;
match self.mode {
FifoMode::Blocking => input.set_nonblocking(false)?,
FifoMode::NonBlocking => input.set_nonblocking(true)?,
FifoMode::TimeOut(timeout) => input.set_read_timeout(Some(timeout))?,
StreamMode::Blocking => input.set_nonblocking(false)?,
StreamMode::NonBlocking => input.set_nonblocking(true)?,
StreamMode::TimeOut(timeout) => input.set_read_timeout(Some(timeout))?,
}
let output = input.try_clone()?;
Ok(Client::new(BufReader::new(input), BufWriter::new(output)))

@ -35,6 +35,8 @@ mod types;
pub mod client;
pub mod constants;
pub mod fifo;
pub mod net;
pub mod tcp;
#[cfg(any(not(feature = "async-mio"), doc))]
pub use client::Client;

@ -0,0 +1,18 @@
// ssip-client -- Speech Dispatcher client in Rust
// Copyright (c) 2022 Laurent Pelecq
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
#[cfg(not(feature = "async-mio"))]
pub(crate) enum StreamMode {
Blocking,
NonBlocking,
TimeOut(std::time::Duration),
}
#[cfg(test)]
mod tests {}

@ -0,0 +1,101 @@
// ssip-client -- Speech Dispatcher client in Rust
// Copyright (c) 2022 Laurent Pelecq
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
#[cfg(not(feature = "async-mio"))]
mod synchronous {
use std::io::{self, BufReader, BufWriter};
pub use std::net::TcpStream;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use std::vec;
use crate::client::Client;
use crate::net::StreamMode;
struct Addresses(Vec<SocketAddr>);
impl ToSocketAddrs for Addresses {
type Iter = vec::IntoIter<SocketAddr>;
fn to_socket_addrs(&self) -> io::Result<Self::Iter> {
Ok(self.0.clone().into_iter())
}
}
pub struct Builder {
addrs: Addresses,
mode: StreamMode,
}
impl Builder {
pub fn new<A: ToSocketAddrs>(addrs: A) -> io::Result<Self> {
Ok(Self {
addrs: Addresses(addrs.to_socket_addrs()?.collect::<Vec<SocketAddr>>()),
mode: StreamMode::Blocking,
})
}
pub fn timeout(&mut self, read_timeout: Duration) -> &mut Self {
self.mode = StreamMode::TimeOut(read_timeout);
self
}
pub fn nonblocking(&mut self) -> &mut Self {
self.mode = StreamMode::NonBlocking;
self
}
pub fn build(&self) -> io::Result<Client<TcpStream>> {
let input = TcpStream::connect(&self.addrs)?;
match self.mode {
StreamMode::Blocking => input.set_nonblocking(false)?,
StreamMode::NonBlocking => input.set_nonblocking(true)?,
StreamMode::TimeOut(timeout) => input.set_read_timeout(Some(timeout))?,
}
let output = input.try_clone()?;
Ok(Client::new(BufReader::new(input), BufWriter::new(output)))
}
}
}
#[cfg(not(feature = "async-mio"))]
pub use synchronous::{Builder, TcpStream};
#[cfg(feature = "async-mio")]
mod asynchronous {
pub use mio::net::TcpStream;
use std::io::{self, BufReader, BufWriter};
use std::net::SocketAddr;
use std::net::TcpStream as StdTcpStream;
use crate::client::Client;
pub struct Builder {
addr: SocketAddr,
}
impl Builder {
pub fn new(addr: SocketAddr) -> Self {
Self { addr }
}
pub fn build(&self) -> io::Result<Client<TcpStream>> {
let stream = StdTcpStream::connect(self.addr)?;
Ok(Client::new(
BufReader::new(TcpStream::from_std(stream.try_clone()?)),
BufWriter::new(TcpStream::from_std(stream)),
))
}
}
}
#[cfg(feature = "async-mio")]
pub use asynchronous::{Builder, TcpStream};
#[cfg(test)]
mod tests {}

@ -9,10 +9,14 @@
#[cfg(feature = "async-mio")]
use mio::{Events, Poll, Token};
#[cfg(feature = "async-mio")]
use std::{slice::Iter, time::Duration};
use std::{
io::{Read, Write},
slice::Iter,
time::Duration,
};
#[cfg(feature = "async-mio")]
use ssip_client::*;
use ssip_client::{client::Source, *};
#[cfg(feature = "async-mio")]
mod server;
@ -64,34 +68,16 @@ impl<'a, 'b> State<'a, 'b> {
}
}
#[test]
#[cfg(feature = "async-mio")]
fn basic_async_communication() -> ClientResult<()> {
const COMMUNICATION: [(&str, &str); 5] = [
(
"SET self CLIENT_NAME test:test:main\r\n",
"208 OK CLIENT NAME SET\r\n",
),
("SET self LANGUAGE en\r\n", "201 OK LANGUAGE SET\r\n"),
("STOP self\r\n", "210 OK STOPPED\r\n"),
(
"GET OUTPUT_MODULE\r\n",
"251-espeak\r\n251 OK GET RETURNED\r\n",
),
("GET RATE\r\n", "251-10\r\n251 OK GET RETURNED\r\n"),
];
fn basic_async_client_communication<S: Read + Write + Source>(
client: &mut QueuedClient<S>,
) -> ClientResult<usize> {
let get_requests = vec![Request::GetOutputModule, Request::GetRate];
let get_answers = vec!["espeak", "10"];
let mut state = State::new(get_requests.iter(), get_answers.iter());
let socket_dir = tempfile::tempdir()?;
let socket_path = socket_dir.path().join("basic_async_communication.socket");
assert!(!socket_path.exists());
let handle = server::run_unix(&socket_path, &COMMUNICATION)?;
let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);
let mut client = QueuedClient::new(fifo::Builder::new().path(&socket_path).build()?);
let input_token = Token(0);
let output_token = Token(1);
let timeout = Duration::new(0, 500 * 1000 * 1000 /* 500 ms */);
@ -131,8 +117,47 @@ fn basic_async_communication() -> ClientResult<()> {
}
}
}
Ok(state.countdown)
}
#[cfg(feature = "async-mio")]
const BASIC_COMMUNICATION: [(&str, &str); 5] = [
(
"SET self CLIENT_NAME test:test:main\r\n",
"208 OK CLIENT NAME SET\r\n",
),
("SET self LANGUAGE en\r\n", "201 OK LANGUAGE SET\r\n"),
("STOP self\r\n", "210 OK STOPPED\r\n"),
(
"GET OUTPUT_MODULE\r\n",
"251-espeak\r\n251 OK GET RETURNED\r\n",
),
("GET RATE\r\n", "251-10\r\n251 OK GET RETURNED\r\n"),
];
#[test]
#[cfg(feature = "async-mio")]
fn basic_async_unix_communication() -> ClientResult<()> {
let socket_dir = tempfile::tempdir()?;
let socket_path = socket_dir.path().join("basic_async_communication.socket");
assert!(!socket_path.exists());
let handle = server::run_unix(&socket_path, &BASIC_COMMUNICATION)?;
let mut client = QueuedClient::new(fifo::Builder::new().path(&socket_path).build()?);
let countdown = basic_async_client_communication(&mut client)?;
handle.join().unwrap().unwrap();
assert!(state.countdown > 0);
socket_dir.close()?;
assert!(countdown > 0);
Ok(())
}
#[test]
#[cfg(feature = "async-mio")]
fn basic_async_tcp_communication() -> ClientResult<()> {
let addr = "127.0.0.1:9999";
let handle = server::run_tcp(addr, &BASIC_COMMUNICATION)?;
let mut client = QueuedClient::new(tcp::Builder::new(addr.parse().unwrap()).build()?);
let countdown = basic_async_client_communication(&mut client)?;
handle.join().unwrap().unwrap();
assert!(countdown > 0);
Ok(())
}

@ -10,6 +10,7 @@ use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
use std::path::Path;
use std::thread;
use std::net::{TcpListener, ToSocketAddrs};
use std::os::unix::net::UnixListener;
/// Split lines on CRLF
@ -51,13 +52,12 @@ fn serve_streams(
/// Server traits
pub trait Server {
fn serve(&mut self) -> io::Result<()>;
fn serve(&mut self, communication: &[(&'static str, &'static str)]) -> io::Result<()>;
}
/// Server on a named socket.
pub struct UnixServer {
listener: UnixListener,
communication: Vec<(&'static str, &'static str)>,
}
impl UnixServer {
@ -65,32 +65,52 @@ impl UnixServer {
///
/// Argument `communication` is an array of pairs. The first item is a list of strings
/// the server will receive and the second item is the answer.
pub fn new<P>(
socket_path: P,
communication: &[(&'static str, &'static str)],
) -> io::Result<Self>
pub fn new<P>(socket_path: P) -> io::Result<Self>
where
P: AsRef<Path>,
{
let listener = UnixListener::bind(socket_path.as_ref())?;
Ok(Self {
listener,
communication: communication.to_vec(),
})
Ok(Self { listener })
}
}
impl Server for UnixServer {
fn serve(&mut self) -> io::Result<()> {
fn serve(&mut self, communication: &[(&'static str, &'static str)]) -> io::Result<()> {
let (mut stream, _) = self.listener.accept()?;
serve_streams(&mut stream.try_clone()?, &mut stream, &self.communication)
serve_streams(&mut stream.try_clone()?, &mut stream, communication)
}
}
/// Server on a named socket.
pub struct TcpServer {
listener: TcpListener,
}
impl TcpServer {
/// Create a new server on a named socket.
///
/// Argument `communication` is an array of pairs. The first item is a list of strings
/// the server will receive and the second item is the answer.
pub fn new<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
let listener = TcpListener::bind(addr)?;
Ok(Self { listener })
}
}
impl Server for TcpServer {
fn serve(&mut self, communication: &[(&'static str, &'static str)]) -> io::Result<()> {
let (mut stream, _) = self.listener.accept()?;
serve_streams(&mut stream.try_clone()?, &mut stream, communication)
}
}
/// Run the server in a thread
pub fn run_server(mut server: Box<dyn Server + Send>) -> thread::JoinHandle<io::Result<()>> {
pub fn run_server(
mut server: Box<dyn Server + Send>,
communication: &'static [(&'static str, &'static str)],
) -> thread::JoinHandle<io::Result<()>> {
thread::spawn(move || -> io::Result<()> {
server.serve()?;
server.serve(&communication)?;
Ok(())
})
}
@ -102,10 +122,17 @@ pub fn run_unix<P>(
where
P: AsRef<Path>,
{
Ok(run_server(Box::new(UnixServer::new(
&socket_path,
Ok(run_server(
Box::new(UnixServer::new(&socket_path)?),
communication,
)?)))
))
}
pub fn run_tcp<A: ToSocketAddrs>(
addr: A,
communication: &'static [(&'static str, &'static str)],
) -> io::Result<thread::JoinHandle<io::Result<()>>> {
Ok(run_server(Box::new(TcpServer::new(addr)?), communication))
}
#[cfg(test)]

@ -7,14 +7,18 @@
// modified, or distributed except according to those terms.
#[cfg(not(feature = "async-mio"))]
use ssip_client::*;
use ssip_client::{client::Source, *};
#[cfg(not(feature = "async-mio"))]
use std::{io, os::unix::net::UnixStream};
use std::{
io::{self, Read, Write},
net::TcpStream,
os::unix::net::UnixStream,
};
#[cfg(not(feature = "async-mio"))]
mod server;
/// Create a server and run the client
/// Create a server on a Unix socket and run the client
///
/// The communication is an array of (["question", ...], "response")
#[cfg(not(feature = "async-mio"))]
@ -42,6 +46,29 @@ where
Ok(())
}
/// Create a server on a inet socket and run the client
///
/// The communication is an array of (["question", ...], "response")
#[cfg(not(feature = "async-mio"))]
fn test_tcp_client<F>(
communication: &'static [(&'static str, &'static str)],
process: F,
) -> ClientResult<()>
where
F: FnMut(&mut Client<TcpStream>) -> io::Result<()>,
{
let mut process_wrapper = std::panic::AssertUnwindSafe(process);
let addr = "127.0.0.1:9999";
let handle = server::run_tcp(addr, communication)?;
let mut client = ssip_client::tcp::Builder::new(addr)?.build()?;
client
.set_client_name(ClientName::new("test", "test"))?
.check_client_name_set()?;
process_wrapper(&mut client)?;
handle.join().unwrap().unwrap();
Ok(())
}
#[cfg(not(feature = "async-mio"))]
const SET_CLIENT_COMMUNICATION: (&str, &str) = (
"SET self CLIENT_NAME test:test:main\r\n",
@ -51,16 +78,17 @@ const SET_CLIENT_COMMUNICATION: (&str, &str) = (
#[test]
#[cfg(not(feature = "async-mio"))]
fn connect_and_quit() -> ClientResult<()> {
test_unix_client(
&[
SET_CLIENT_COMMUNICATION,
("QUIT\r\n", "231 HAPPY HACKING\r\n"),
],
|client| {
client.quit().unwrap().check_status(OK_BYE).unwrap();
Ok(())
},
)
const COMMUNICATION: [(&'static str, &'static str); 2] = [
SET_CLIENT_COMMUNICATION,
("QUIT\r\n", "231 HAPPY HACKING\r\n"),
];
fn process<S: Read + Write + Source>(client: &mut Client<S>) -> io::Result<()> {
client.quit().unwrap().check_status(OK_BYE).unwrap();
Ok(())
}
test_unix_client(&COMMUNICATION, process)?;
test_tcp_client(&COMMUNICATION, process)?;
Ok(())
}
#[test]
Loading…
Cancel
Save