From 1139ff7080b5fab81207d7ded44d0c446f9baa49 Mon Sep 17 00:00:00 2001 From: Laurent Pelecq Date: Sat, 16 Apr 2022 20:22:47 +0200 Subject: [PATCH] Add tcp communication --- Cargo.toml | 2 +- README.md | 6 +- src/client.rs | 4 +- src/fifo.rs | 23 ++-- src/lib.rs | 2 + src/net.rs | 18 ++++ src/tcp.rs | 101 ++++++++++++++++++ tests/{fifo_async_tests.rs => mio_tests.rs} | 73 ++++++++----- tests/server.rs | 61 ++++++++--- ...ifo_sync_tests.rs => synchronous_tests.rs} | 54 +++++++--- 10 files changed, 270 insertions(+), 74 deletions(-) create mode 100644 src/net.rs create mode 100644 src/tcp.rs rename tests/{fifo_async_tests.rs => mio_tests.rs} (74%) rename tests/{fifo_sync_tests.rs => synchronous_tests.rs} (89%) diff --git a/Cargo.toml b/Cargo.toml index 15c4893..6a43dab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ssip-client" -version = "0.7.0" +version = "0.8.0" authors = ["Laurent Pelecq "] edition = "2018" description = "Client API for Speech Dispatcher" diff --git a/README.md b/README.md index 5471197..91a14a6 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ A non-blocking API can be used with a low-level polling mechanism based on `poll with [mio](https://github.com/tokio-rs/mio). - [x] Unix socket. -- [ ] TCP socket. +- [x] TCP socket. - [x] Stop, cancel, pause and resume. - [x] List, set voices. - [x] Set rate, pitch, volume. @@ -28,13 +28,13 @@ To use the synchronous API or an asynchronous API compatible with low-level crat ```toml [dependencies] -ssip-client = "0.7" +ssip-client = "0.8" ``` For the asynchronous API, use: ```toml [dependencies] -ssip-client = { version = "0.7", features = ["async-mio"] } +ssip-client = { version = "0.8", features = ["async-mio"] } ``` Example diff --git a/src/client.rs b/src/client.rs index 428036a..f98ed57 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,10 +18,10 @@ use crate::types::*; // Trick to have common implementation for std and mio streams.. #[cfg(all(not(feature = "async-mio"), unix))] -pub(crate) use std::os::unix::io::AsRawFd as Source; +pub use std::os::unix::io::AsRawFd as Source; #[cfg(feature = "async-mio")] -pub(crate) use mio::event::Source; +pub use mio::event::Source; /// Convert boolean to ON or OFF fn on_off(value: bool) -> &'static str { diff --git a/src/fifo.rs b/src/fifo.rs index a15bc08..5e37fa3 100644 --- a/src/fifo.rs +++ b/src/fifo.rs @@ -1,5 +1,5 @@ // ssip-client -- Speech Dispatcher client in Rust -// Copyright (c) 2021 Laurent Pelecq +// Copyright (c) 2021-2022 Laurent Pelecq // // Licensed under the Apache License, Version 2.0 // or the MIT @@ -58,25 +58,20 @@ mod synchronous { use std::time::Duration; use crate::client::Client; + use crate::net::StreamMode; use super::FifoPath; - enum FifoMode { - Blocking, - NonBlocking, - TimeOut(Duration), - } - pub struct Builder { path: FifoPath, - mode: FifoMode, + mode: StreamMode, } impl Builder { pub fn new() -> Self { Self { path: FifoPath::new(), - mode: FifoMode::Blocking, + mode: StreamMode::Blocking, } } @@ -89,21 +84,21 @@ mod synchronous { } pub fn timeout(&mut self, read_timeout: Duration) -> &mut Self { - self.mode = FifoMode::TimeOut(read_timeout); + self.mode = StreamMode::TimeOut(read_timeout); self } pub fn nonblocking(&mut self) -> &mut Self { - self.mode = FifoMode::NonBlocking; + self.mode = StreamMode::NonBlocking; self } pub fn build(&self) -> io::Result> { let input = UnixStream::connect(self.path.get()?)?; match self.mode { - FifoMode::Blocking => input.set_nonblocking(false)?, - FifoMode::NonBlocking => input.set_nonblocking(true)?, - FifoMode::TimeOut(timeout) => input.set_read_timeout(Some(timeout))?, + StreamMode::Blocking => input.set_nonblocking(false)?, + StreamMode::NonBlocking => input.set_nonblocking(true)?, + StreamMode::TimeOut(timeout) => input.set_read_timeout(Some(timeout))?, } let output = input.try_clone()?; Ok(Client::new(BufReader::new(input), BufWriter::new(output))) diff --git a/src/lib.rs b/src/lib.rs index 4f457a3..8b3c0a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,8 @@ mod types; pub mod client; pub mod constants; pub mod fifo; +pub mod net; +pub mod tcp; #[cfg(any(not(feature = "async-mio"), doc))] pub use client::Client; diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 0000000..147cb8a --- /dev/null +++ b/src/net.rs @@ -0,0 +1,18 @@ +// ssip-client -- Speech Dispatcher client in Rust +// Copyright (c) 2022 Laurent Pelecq +// +// Licensed under the Apache License, Version 2.0 +// or the MIT +// license , at your +// option. All files in the project carrying such notice may not be copied, +// modified, or distributed except according to those terms. + +#[cfg(not(feature = "async-mio"))] +pub(crate) enum StreamMode { + Blocking, + NonBlocking, + TimeOut(std::time::Duration), +} + +#[cfg(test)] +mod tests {} diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..7e07d50 --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,101 @@ +// ssip-client -- Speech Dispatcher client in Rust +// Copyright (c) 2022 Laurent Pelecq +// +// Licensed under the Apache License, Version 2.0 +// or the MIT +// license , at your +// option. All files in the project carrying such notice may not be copied, +// modified, or distributed except according to those terms. + +#[cfg(not(feature = "async-mio"))] +mod synchronous { + use std::io::{self, BufReader, BufWriter}; + pub use std::net::TcpStream; + use std::net::{SocketAddr, ToSocketAddrs}; + use std::time::Duration; + use std::vec; + + use crate::client::Client; + use crate::net::StreamMode; + + struct Addresses(Vec); + + impl ToSocketAddrs for Addresses { + type Iter = vec::IntoIter; + fn to_socket_addrs(&self) -> io::Result { + Ok(self.0.clone().into_iter()) + } + } + + pub struct Builder { + addrs: Addresses, + mode: StreamMode, + } + + impl Builder { + pub fn new(addrs: A) -> io::Result { + Ok(Self { + addrs: Addresses(addrs.to_socket_addrs()?.collect::>()), + mode: StreamMode::Blocking, + }) + } + + pub fn timeout(&mut self, read_timeout: Duration) -> &mut Self { + self.mode = StreamMode::TimeOut(read_timeout); + self + } + + pub fn nonblocking(&mut self) -> &mut Self { + self.mode = StreamMode::NonBlocking; + self + } + + pub fn build(&self) -> io::Result> { + let input = TcpStream::connect(&self.addrs)?; + match self.mode { + StreamMode::Blocking => input.set_nonblocking(false)?, + StreamMode::NonBlocking => input.set_nonblocking(true)?, + StreamMode::TimeOut(timeout) => input.set_read_timeout(Some(timeout))?, + } + let output = input.try_clone()?; + Ok(Client::new(BufReader::new(input), BufWriter::new(output))) + } + } +} + +#[cfg(not(feature = "async-mio"))] +pub use synchronous::{Builder, TcpStream}; + +#[cfg(feature = "async-mio")] +mod asynchronous { + pub use mio::net::TcpStream; + use std::io::{self, BufReader, BufWriter}; + use std::net::SocketAddr; + use std::net::TcpStream as StdTcpStream; + + use crate::client::Client; + + pub struct Builder { + addr: SocketAddr, + } + + impl Builder { + pub fn new(addr: SocketAddr) -> Self { + Self { addr } + } + + pub fn build(&self) -> io::Result> { + let stream = StdTcpStream::connect(self.addr)?; + Ok(Client::new( + BufReader::new(TcpStream::from_std(stream.try_clone()?)), + BufWriter::new(TcpStream::from_std(stream)), + )) + } + } +} + +#[cfg(feature = "async-mio")] +pub use asynchronous::{Builder, TcpStream}; + +#[cfg(test)] +mod tests {} diff --git a/tests/fifo_async_tests.rs b/tests/mio_tests.rs similarity index 74% rename from tests/fifo_async_tests.rs rename to tests/mio_tests.rs index e84ee3f..1dfc378 100644 --- a/tests/fifo_async_tests.rs +++ b/tests/mio_tests.rs @@ -9,10 +9,14 @@ #[cfg(feature = "async-mio")] use mio::{Events, Poll, Token}; #[cfg(feature = "async-mio")] -use std::{slice::Iter, time::Duration}; +use std::{ + io::{Read, Write}, + slice::Iter, + time::Duration, +}; #[cfg(feature = "async-mio")] -use ssip_client::*; +use ssip_client::{client::Source, *}; #[cfg(feature = "async-mio")] mod server; @@ -64,34 +68,16 @@ impl<'a, 'b> State<'a, 'b> { } } -#[test] #[cfg(feature = "async-mio")] -fn basic_async_communication() -> ClientResult<()> { - const COMMUNICATION: [(&str, &str); 5] = [ - ( - "SET self CLIENT_NAME test:test:main\r\n", - "208 OK CLIENT NAME SET\r\n", - ), - ("SET self LANGUAGE en\r\n", "201 OK LANGUAGE SET\r\n"), - ("STOP self\r\n", "210 OK STOPPED\r\n"), - ( - "GET OUTPUT_MODULE\r\n", - "251-espeak\r\n251 OK GET RETURNED\r\n", - ), - ("GET RATE\r\n", "251-10\r\n251 OK GET RETURNED\r\n"), - ]; - +fn basic_async_client_communication( + client: &mut QueuedClient, +) -> ClientResult { let get_requests = vec![Request::GetOutputModule, Request::GetRate]; let get_answers = vec!["espeak", "10"]; let mut state = State::new(get_requests.iter(), get_answers.iter()); - let socket_dir = tempfile::tempdir()?; - let socket_path = socket_dir.path().join("basic_async_communication.socket"); - assert!(!socket_path.exists()); - let handle = server::run_unix(&socket_path, &COMMUNICATION)?; let mut poll = Poll::new()?; let mut events = Events::with_capacity(128); - let mut client = QueuedClient::new(fifo::Builder::new().path(&socket_path).build()?); let input_token = Token(0); let output_token = Token(1); let timeout = Duration::new(0, 500 * 1000 * 1000 /* 500 ms */); @@ -131,8 +117,47 @@ fn basic_async_communication() -> ClientResult<()> { } } } + Ok(state.countdown) +} + +#[cfg(feature = "async-mio")] +const BASIC_COMMUNICATION: [(&str, &str); 5] = [ + ( + "SET self CLIENT_NAME test:test:main\r\n", + "208 OK CLIENT NAME SET\r\n", + ), + ("SET self LANGUAGE en\r\n", "201 OK LANGUAGE SET\r\n"), + ("STOP self\r\n", "210 OK STOPPED\r\n"), + ( + "GET OUTPUT_MODULE\r\n", + "251-espeak\r\n251 OK GET RETURNED\r\n", + ), + ("GET RATE\r\n", "251-10\r\n251 OK GET RETURNED\r\n"), +]; + +#[test] +#[cfg(feature = "async-mio")] +fn basic_async_unix_communication() -> ClientResult<()> { + let socket_dir = tempfile::tempdir()?; + let socket_path = socket_dir.path().join("basic_async_communication.socket"); + assert!(!socket_path.exists()); + let handle = server::run_unix(&socket_path, &BASIC_COMMUNICATION)?; + let mut client = QueuedClient::new(fifo::Builder::new().path(&socket_path).build()?); + let countdown = basic_async_client_communication(&mut client)?; handle.join().unwrap().unwrap(); - assert!(state.countdown > 0); socket_dir.close()?; + assert!(countdown > 0); + Ok(()) +} + +#[test] +#[cfg(feature = "async-mio")] +fn basic_async_tcp_communication() -> ClientResult<()> { + let addr = "127.0.0.1:9999"; + let handle = server::run_tcp(addr, &BASIC_COMMUNICATION)?; + let mut client = QueuedClient::new(tcp::Builder::new(addr.parse().unwrap()).build()?); + let countdown = basic_async_client_communication(&mut client)?; + handle.join().unwrap().unwrap(); + assert!(countdown > 0); Ok(()) } diff --git a/tests/server.rs b/tests/server.rs index 632e136..bd99f22 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -10,6 +10,7 @@ use std::io::{self, BufRead, BufReader, BufWriter, Read, Write}; use std::path::Path; use std::thread; +use std::net::{TcpListener, ToSocketAddrs}; use std::os::unix::net::UnixListener; /// Split lines on CRLF @@ -51,13 +52,12 @@ fn serve_streams( /// Server traits pub trait Server { - fn serve(&mut self) -> io::Result<()>; + fn serve(&mut self, communication: &[(&'static str, &'static str)]) -> io::Result<()>; } /// Server on a named socket. pub struct UnixServer { listener: UnixListener, - communication: Vec<(&'static str, &'static str)>, } impl UnixServer { @@ -65,32 +65,52 @@ impl UnixServer { /// /// Argument `communication` is an array of pairs. The first item is a list of strings /// the server will receive and the second item is the answer. - pub fn new

( - socket_path: P, - communication: &[(&'static str, &'static str)], - ) -> io::Result + pub fn new

(socket_path: P) -> io::Result where P: AsRef, { let listener = UnixListener::bind(socket_path.as_ref())?; - Ok(Self { - listener, - communication: communication.to_vec(), - }) + Ok(Self { listener }) } } impl Server for UnixServer { - fn serve(&mut self) -> io::Result<()> { + fn serve(&mut self, communication: &[(&'static str, &'static str)]) -> io::Result<()> { let (mut stream, _) = self.listener.accept()?; - serve_streams(&mut stream.try_clone()?, &mut stream, &self.communication) + serve_streams(&mut stream.try_clone()?, &mut stream, communication) + } +} + +/// Server on a named socket. +pub struct TcpServer { + listener: TcpListener, +} + +impl TcpServer { + /// Create a new server on a named socket. + /// + /// Argument `communication` is an array of pairs. The first item is a list of strings + /// the server will receive and the second item is the answer. + pub fn new(addr: A) -> io::Result { + let listener = TcpListener::bind(addr)?; + Ok(Self { listener }) + } +} + +impl Server for TcpServer { + fn serve(&mut self, communication: &[(&'static str, &'static str)]) -> io::Result<()> { + let (mut stream, _) = self.listener.accept()?; + serve_streams(&mut stream.try_clone()?, &mut stream, communication) } } /// Run the server in a thread -pub fn run_server(mut server: Box) -> thread::JoinHandle> { +pub fn run_server( + mut server: Box, + communication: &'static [(&'static str, &'static str)], +) -> thread::JoinHandle> { thread::spawn(move || -> io::Result<()> { - server.serve()?; + server.serve(&communication)?; Ok(()) }) } @@ -102,10 +122,17 @@ pub fn run_unix

( where P: AsRef, { - Ok(run_server(Box::new(UnixServer::new( - &socket_path, + Ok(run_server( + Box::new(UnixServer::new(&socket_path)?), communication, - )?))) + )) +} + +pub fn run_tcp( + addr: A, + communication: &'static [(&'static str, &'static str)], +) -> io::Result>> { + Ok(run_server(Box::new(TcpServer::new(addr)?), communication)) } #[cfg(test)] diff --git a/tests/fifo_sync_tests.rs b/tests/synchronous_tests.rs similarity index 89% rename from tests/fifo_sync_tests.rs rename to tests/synchronous_tests.rs index 3aadf26..afc4f5f 100644 --- a/tests/fifo_sync_tests.rs +++ b/tests/synchronous_tests.rs @@ -7,14 +7,18 @@ // modified, or distributed except according to those terms. #[cfg(not(feature = "async-mio"))] -use ssip_client::*; +use ssip_client::{client::Source, *}; #[cfg(not(feature = "async-mio"))] -use std::{io, os::unix::net::UnixStream}; +use std::{ + io::{self, Read, Write}, + net::TcpStream, + os::unix::net::UnixStream, +}; #[cfg(not(feature = "async-mio"))] mod server; -/// Create a server and run the client +/// Create a server on a Unix socket and run the client /// /// The communication is an array of (["question", ...], "response") #[cfg(not(feature = "async-mio"))] @@ -42,6 +46,29 @@ where Ok(()) } +/// Create a server on a inet socket and run the client +/// +/// The communication is an array of (["question", ...], "response") +#[cfg(not(feature = "async-mio"))] +fn test_tcp_client( + communication: &'static [(&'static str, &'static str)], + process: F, +) -> ClientResult<()> +where + F: FnMut(&mut Client) -> io::Result<()>, +{ + let mut process_wrapper = std::panic::AssertUnwindSafe(process); + let addr = "127.0.0.1:9999"; + let handle = server::run_tcp(addr, communication)?; + let mut client = ssip_client::tcp::Builder::new(addr)?.build()?; + client + .set_client_name(ClientName::new("test", "test"))? + .check_client_name_set()?; + process_wrapper(&mut client)?; + handle.join().unwrap().unwrap(); + Ok(()) +} + #[cfg(not(feature = "async-mio"))] const SET_CLIENT_COMMUNICATION: (&str, &str) = ( "SET self CLIENT_NAME test:test:main\r\n", @@ -51,16 +78,17 @@ const SET_CLIENT_COMMUNICATION: (&str, &str) = ( #[test] #[cfg(not(feature = "async-mio"))] fn connect_and_quit() -> ClientResult<()> { - test_unix_client( - &[ - SET_CLIENT_COMMUNICATION, - ("QUIT\r\n", "231 HAPPY HACKING\r\n"), - ], - |client| { - client.quit().unwrap().check_status(OK_BYE).unwrap(); - Ok(()) - }, - ) + const COMMUNICATION: [(&'static str, &'static str); 2] = [ + SET_CLIENT_COMMUNICATION, + ("QUIT\r\n", "231 HAPPY HACKING\r\n"), + ]; + fn process(client: &mut Client) -> io::Result<()> { + client.quit().unwrap().check_status(OK_BYE).unwrap(); + Ok(()) + } + test_unix_client(&COMMUNICATION, process)?; + test_tcp_client(&COMMUNICATION, process)?; + Ok(()) } #[test]