diff --git a/Cargo.toml b/Cargo.toml index a9c3722..ad931a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,17 +16,18 @@ mio = { version = "0.8", optional = true } thiserror = "1" strum = "0.24" strum_macros = "0.24" -tokio = { version = "^1.21.2", features = ["io-util"] } +tokio = { version = "^1.21.2", features = ["io-util", "rt", "macros", "net"] } +async-std = { version = "1.12.0", default_features = true } [features] async-mio = ["mio/net", "mio/os-poll"] -tokio = ["tokio/io-util"] +tokio = ["tokio/io-util", "tokio/rt", "tokio/macros"] async-std = ["async-std/default"] [dev-dependencies] mio = { version = "0.8", features = ["os-poll", "os-ext"] } -tokio = { version = "^1.21.2", features = ["io-util"] } -async-std = { version = "1.12.0" } +tokio = { version = "^1.21.2", features = ["io-util", "rt"] } +async-std = { version = "1.12.0", default_features = true } lazy_static = "1" popol = "1" tempfile = "3" diff --git a/examples/async_tokio.rs b/examples/async_tokio.rs new file mode 100644 index 0000000..9d351fc --- /dev/null +++ b/examples/async_tokio.rs @@ -0,0 +1,31 @@ +extern crate tokio; +use ssip_client::{fifo, ClientName, ClientResult}; + +#[cfg(all(unix, feature = "tokio"))] +#[tokio::main(flavor = "current_thread")] +async fn main() -> ClientResult<()> { + let mut client = fifo::Builder::new().build()?; + client + .set_client_name(ClientName::new("joe", "hello"))? + .check_client_name_set()?; + let msg_id = client + .speak()? + .check_receiving_data()? + .send_line("Lorem ipsum dollar mit amit dor BIG CHEESE! Hi 123 hi 123 hi 123 hi 123.")? + .receive_message_id()?; + println!("message: {}", msg_id); + let volume = client.get_volume()?.receive_u8()?; + println!("volume: {}", volume); + client.quit()?; + Ok(()) +} + +#[cfg(all(unix, not(feature = "tokio")))] +fn main() { + println!("see hello.rs for an example of a synchronous client."); +} + +#[cfg(not(unix))] +fn main() { + println!("example only available on unix."); +} diff --git a/src/async_std.rs b/src/async_std.rs index 609d8eb..658716c 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -16,7 +16,7 @@ use crate::protocol::{ }; use crate::types::*; -use async_std::io::{AsyncBufRead, AsyncWrite}; +use async_std::io::{BufRead as AsyncBufRead, Write as AsyncWrite}; /// Convert boolean to ON or OFF fn on_off(value: bool) -> &'static str { @@ -190,7 +190,7 @@ impl AsyncClient { /// Send lines of text (terminated by a single dot). pub async fn send_lines(&mut self, lines: &[String]) -> ClientResult<&mut Self> { const END_OF_DATA: [&str; 1] = ["."]; - write_lines_tokio( + write_lines_async_std( &mut self.output, lines .iter() @@ -198,12 +198,12 @@ impl AsyncClient { .collect::>() .as_slice(), ).await?; - flush_lines_tokio(&mut self.output, &END_OF_DATA).await?; + flush_lines_async_std(&mut self.output, &END_OF_DATA).await?; Ok(self) } /// Receive answer from server async fn receive_answer(&mut self, lines: &mut Vec) -> ClientStatus { - crate::protocol::receive_answer_tokio(&mut self.input, Some(lines)).await + crate::protocol::receive_answer_async_std(&mut self.input, Some(lines)).await } /// Receive one response. pub async fn receive(&mut self) -> ClientResult { diff --git a/src/fifo.rs b/src/fifo.rs index 5e37fa3..492e72e 100644 --- a/src/fifo.rs +++ b/src/fifo.rs @@ -155,6 +155,51 @@ mod asynchronous { } } } +#[cfg(feature = "tokio")] +mod asynchronous_tokio { + pub use tokio::net::UnixStream; + use tokio::io::{self, BufReader, BufWriter}; + use std::path::Path; + + use crate::tokio::AsyncClient; + + use super::FifoPath; + + pub struct Builder { + path: FifoPath, + } + + impl Builder { + pub fn new() -> Self { + Self { + path: FifoPath::new(), + } + } + + pub fn path

(&mut self, socket_path: P) -> &mut Self + where + P: AsRef, + { + self.path.set(socket_path); + self + } + + fn non_blocking(socket: UnixStream) -> io::Result { + socket.set_nonblocking(true)?; + Ok(socket) + } + + pub fn build(&self) -> io::Result> { + let stream = UnixStream::connect(self.path.get()?)?; + Ok(AsyncClient::new( + BufReader::new(UnixStream::from_std(Self::non_blocking( + stream.try_clone()?, + )?)), + BufWriter::new(UnixStream::from_std(Self::non_blocking(stream)?)), + )) + } + } +} #[cfg(feature = "async-mio")] pub use asynchronous::{Builder, UnixStream}; diff --git a/src/protocol.rs b/src/protocol.rs index 457d80a..154b8b9 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -17,8 +17,14 @@ use tokio::io::{ }; #[cfg(feature = "async-std")] use async_std::io::{ - AsyncWrite, AsyncRead, + Read as AsyncReadStd, + BufRead as AsyncBufReadStd, + Write as AsyncWriteStd, + ReadExt, + WriteExt, + prelude::BufReadExt, }; + use std::str::FromStr; use crate::types::{ClientError, ClientResult, ClientStatus, EventId, StatusLine}; @@ -95,7 +101,7 @@ pub(crate) async fn write_lines_tokio(output: &mut (dyn AsyncWrite + Unpin), lin } /// Write lines (asyncronously) separated by CRLF. #[cfg(feature = "async-std")] -pub(crate) async fn write_lines_tokio(output: &mut (dyn AsyncWrite + Unpin), lines: &[&str]) -> ClientResult<()> { +pub(crate) async fn write_lines_async_std(output: &mut (dyn AsyncWriteStd + Unpin), lines: &[&str]) -> ClientResult<()> { for line in lines.iter() { debug!("SSIP(out): {}", line); output.write_all(line.as_bytes()).await?; @@ -117,6 +123,13 @@ pub(crate) async fn flush_lines_tokio(output: &mut (dyn AsyncWrite + Unpin), lin output.flush().await?; Ok(()) } +/// Write lines separated by CRLF and flush the output asyncronously. +#[cfg(feature = "async-std")] +pub(crate) async fn flush_lines_async_std(output: &mut (dyn AsyncWriteStd + Unpin), lines: &[&str]) -> ClientResult<()> { + write_lines_async_std(output, lines).await?; + output.flush().await?; + Ok(()) +} /// Strip prefix if found fn strip_prefix(line: &str, prefix: &str) -> String { @@ -165,6 +178,35 @@ pub(crate) async fn receive_answer_tokio( } } } +/// Read lines from server until a status line is found. +#[cfg(feature = "async-std")] +pub(crate) async fn receive_answer_async_std( + input: &mut (dyn AsyncBufReadStd + Unpin), + mut lines: Option<&mut Vec>, +) -> ClientStatus { + loop { + let mut line = String::new(); + input.read_line(&mut line).await.map_err(ClientError::Io)?; + debug!("SSIP(in): {}", line.trim_end()); + match line.chars().nth(3) { + Some(ch) => match ch { + ' ' => match line[0..3].parse::() { + Ok(code) => return parse_status_line(code, line[4..].trim_end()), + Err(err) => return Err(invalid_input!(err.to_string())), + }, + '-' => match lines { + Some(ref mut lines) => lines.push(line[4..].trim_end().to_string()), + None => return Err(invalid_input!("unexpected line: {}", line)), + }, + ch => { + return Err(invalid_input!("expecting space or dash, got {}.", ch)); + } + }, + None if line.is_empty() => return Err(invalid_input!("empty line")), + None => return Err(invalid_input!("line too short: {}", line)), + } + } +} /// Read lines from server until a status line is found asyncronously. pub(crate) fn receive_answer( diff --git a/src/tokio.rs b/src/tokio.rs index f16bb82..ec3ba1f 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -9,11 +9,10 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. -use std::io::{self, Read, Write}; use crate::constants::*; use crate::protocol::{ - flush_lines, parse_event_id, parse_single_integer, parse_single_value, parse_typed_lines, + parse_event_id, parse_single_integer, parse_single_value, parse_typed_lines, flush_lines_tokio, write_lines_tokio, }; use crate::types::*; @@ -147,35 +146,6 @@ pub enum Response { EventResumed(EventId), // 705 } -macro_rules! send_one_line { - ($self:expr, $fmt:expr, $( $arg:expr ),+) => { - flush_lines(&mut $self.output, &[format!($fmt, $( $arg ),+).as_str()]) - }; - ($self:expr, $fmt:expr) => { - flush_lines(&mut $self.output, &[$fmt]) - } -} - -macro_rules! send_toggle { - ($output:expr, $fmt:expr, $val:expr) => { - send_one_line!($output, $fmt, on_off($val)) - }; - ($output:expr, $fmt:expr, $arg:expr, $val:expr) => { - send_one_line!($output, $fmt, $arg, on_off($val)) - }; -} - -macro_rules! send_range { - ($output:expr, $fmt:expr, $scope:expr, $val:expr) => { - send_one_line!( - $output, - $fmt, - $scope, - std::cmp::max(-100, std::cmp::min(100, $val)) - ) - }; -} - /// SSIP client on generic async stream /// /// There are two ways to send requests and receive responses: