Additional progress on tokio and async_std versions

main
Tait Hoyem 1 year ago
parent 0187c049cf
commit cbcd244e46

@ -16,17 +16,18 @@ mio = { version = "0.8", optional = true }
thiserror = "1"
strum = "0.24"
strum_macros = "0.24"
tokio = { version = "^1.21.2", features = ["io-util"] }
tokio = { version = "^1.21.2", features = ["io-util", "rt", "macros", "net"] }
async-std = { version = "1.12.0", default_features = true }
[features]
async-mio = ["mio/net", "mio/os-poll"]
tokio = ["tokio/io-util"]
tokio = ["tokio/io-util", "tokio/rt", "tokio/macros"]
async-std = ["async-std/default"]
[dev-dependencies]
mio = { version = "0.8", features = ["os-poll", "os-ext"] }
tokio = { version = "^1.21.2", features = ["io-util"] }
async-std = { version = "1.12.0" }
tokio = { version = "^1.21.2", features = ["io-util", "rt"] }
async-std = { version = "1.12.0", default_features = true }
lazy_static = "1"
popol = "1"
tempfile = "3"

@ -0,0 +1,31 @@
extern crate tokio;
use ssip_client::{fifo, ClientName, ClientResult};
#[cfg(all(unix, feature = "tokio"))]
#[tokio::main(flavor = "current_thread")]
async fn main() -> ClientResult<()> {
let mut client = fifo::Builder::new().build()?;
client
.set_client_name(ClientName::new("joe", "hello"))?
.check_client_name_set()?;
let msg_id = client
.speak()?
.check_receiving_data()?
.send_line("Lorem ipsum dollar mit amit dor BIG CHEESE! Hi 123 hi 123 hi 123 hi 123.")?
.receive_message_id()?;
println!("message: {}", msg_id);
let volume = client.get_volume()?.receive_u8()?;
println!("volume: {}", volume);
client.quit()?;
Ok(())
}
#[cfg(all(unix, not(feature = "tokio")))]
fn main() {
println!("see hello.rs for an example of a synchronous client.");
}
#[cfg(not(unix))]
fn main() {
println!("example only available on unix.");
}

@ -16,7 +16,7 @@ use crate::protocol::{
};
use crate::types::*;
use async_std::io::{AsyncBufRead, AsyncWrite};
use async_std::io::{BufRead as AsyncBufRead, Write as AsyncWrite};
/// Convert boolean to ON or OFF
fn on_off(value: bool) -> &'static str {
@ -190,7 +190,7 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> AsyncClient<R, W> {
/// Send lines of text (terminated by a single dot).
pub async fn send_lines(&mut self, lines: &[String]) -> ClientResult<&mut Self> {
const END_OF_DATA: [&str; 1] = ["."];
write_lines_tokio(
write_lines_async_std(
&mut self.output,
lines
.iter()
@ -198,12 +198,12 @@ impl<R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin> AsyncClient<R, W> {
.collect::<Vec<&str>>()
.as_slice(),
).await?;
flush_lines_tokio(&mut self.output, &END_OF_DATA).await?;
flush_lines_async_std(&mut self.output, &END_OF_DATA).await?;
Ok(self)
}
/// Receive answer from server
async fn receive_answer(&mut self, lines: &mut Vec<String>) -> ClientStatus {
crate::protocol::receive_answer_tokio(&mut self.input, Some(lines)).await
crate::protocol::receive_answer_async_std(&mut self.input, Some(lines)).await
}
/// Receive one response.
pub async fn receive(&mut self) -> ClientResult<Response> {

@ -155,6 +155,51 @@ mod asynchronous {
}
}
}
#[cfg(feature = "tokio")]
mod asynchronous_tokio {
pub use tokio::net::UnixStream;
use tokio::io::{self, BufReader, BufWriter};
use std::path::Path;
use crate::tokio::AsyncClient;
use super::FifoPath;
pub struct Builder {
path: FifoPath,
}
impl Builder {
pub fn new() -> Self {
Self {
path: FifoPath::new(),
}
}
pub fn path<P>(&mut self, socket_path: P) -> &mut Self
where
P: AsRef<Path>,
{
self.path.set(socket_path);
self
}
fn non_blocking(socket: UnixStream) -> io::Result<UnixStream> {
socket.set_nonblocking(true)?;
Ok(socket)
}
pub fn build(&self) -> io::Result<AsyncClient<UnixStream, UnixStream>> {
let stream = UnixStream::connect(self.path.get()?)?;
Ok(AsyncClient::new(
BufReader::new(UnixStream::from_std(Self::non_blocking(
stream.try_clone()?,
)?)),
BufWriter::new(UnixStream::from_std(Self::non_blocking(stream)?)),
))
}
}
}
#[cfg(feature = "async-mio")]
pub use asynchronous::{Builder, UnixStream};

@ -17,8 +17,14 @@ use tokio::io::{
};
#[cfg(feature = "async-std")]
use async_std::io::{
AsyncWrite, AsyncRead,
Read as AsyncReadStd,
BufRead as AsyncBufReadStd,
Write as AsyncWriteStd,
ReadExt,
WriteExt,
prelude::BufReadExt,
};
use std::str::FromStr;
use crate::types::{ClientError, ClientResult, ClientStatus, EventId, StatusLine};
@ -95,7 +101,7 @@ pub(crate) async fn write_lines_tokio(output: &mut (dyn AsyncWrite + Unpin), lin
}
/// Write lines (asyncronously) separated by CRLF.
#[cfg(feature = "async-std")]
pub(crate) async fn write_lines_tokio(output: &mut (dyn AsyncWrite + Unpin), lines: &[&str]) -> ClientResult<()> {
pub(crate) async fn write_lines_async_std(output: &mut (dyn AsyncWriteStd + Unpin), lines: &[&str]) -> ClientResult<()> {
for line in lines.iter() {
debug!("SSIP(out): {}", line);
output.write_all(line.as_bytes()).await?;
@ -117,6 +123,13 @@ pub(crate) async fn flush_lines_tokio(output: &mut (dyn AsyncWrite + Unpin), lin
output.flush().await?;
Ok(())
}
/// Write lines separated by CRLF and flush the output asyncronously.
#[cfg(feature = "async-std")]
pub(crate) async fn flush_lines_async_std(output: &mut (dyn AsyncWriteStd + Unpin), lines: &[&str]) -> ClientResult<()> {
write_lines_async_std(output, lines).await?;
output.flush().await?;
Ok(())
}
/// Strip prefix if found
fn strip_prefix(line: &str, prefix: &str) -> String {
@ -165,6 +178,35 @@ pub(crate) async fn receive_answer_tokio(
}
}
}
/// Read lines from server until a status line is found.
#[cfg(feature = "async-std")]
pub(crate) async fn receive_answer_async_std(
input: &mut (dyn AsyncBufReadStd + Unpin),
mut lines: Option<&mut Vec<String>>,
) -> ClientStatus {
loop {
let mut line = String::new();
input.read_line(&mut line).await.map_err(ClientError::Io)?;
debug!("SSIP(in): {}", line.trim_end());
match line.chars().nth(3) {
Some(ch) => match ch {
' ' => match line[0..3].parse::<u16>() {
Ok(code) => return parse_status_line(code, line[4..].trim_end()),
Err(err) => return Err(invalid_input!(err.to_string())),
},
'-' => match lines {
Some(ref mut lines) => lines.push(line[4..].trim_end().to_string()),
None => return Err(invalid_input!("unexpected line: {}", line)),
},
ch => {
return Err(invalid_input!("expecting space or dash, got {}.", ch));
}
},
None if line.is_empty() => return Err(invalid_input!("empty line")),
None => return Err(invalid_input!("line too short: {}", line)),
}
}
}
/// Read lines from server until a status line is found asyncronously.
pub(crate) fn receive_answer(

@ -9,11 +9,10 @@
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.
use std::io::{self, Read, Write};
use crate::constants::*;
use crate::protocol::{
flush_lines, parse_event_id, parse_single_integer, parse_single_value, parse_typed_lines,
parse_event_id, parse_single_integer, parse_single_value, parse_typed_lines,
flush_lines_tokio, write_lines_tokio,
};
use crate::types::*;
@ -147,35 +146,6 @@ pub enum Response {
EventResumed(EventId), // 705
}
macro_rules! send_one_line {
($self:expr, $fmt:expr, $( $arg:expr ),+) => {
flush_lines(&mut $self.output, &[format!($fmt, $( $arg ),+).as_str()])
};
($self:expr, $fmt:expr) => {
flush_lines(&mut $self.output, &[$fmt])
}
}
macro_rules! send_toggle {
($output:expr, $fmt:expr, $val:expr) => {
send_one_line!($output, $fmt, on_off($val))
};
($output:expr, $fmt:expr, $arg:expr, $val:expr) => {
send_one_line!($output, $fmt, $arg, on_off($val))
};
}
macro_rules! send_range {
($output:expr, $fmt:expr, $scope:expr, $val:expr) => {
send_one_line!(
$output,
$fmt,
$scope,
std::cmp::max(-100, std::cmp::min(100, $val))
)
};
}
/// SSIP client on generic async stream
///
/// There are two ways to send requests and receive responses:

Loading…
Cancel
Save