From 543378cb526744ef32781dc9795912d079fea1a7 Mon Sep 17 00:00:00 2001 From: Ritesh Chitlangi Date: Tue, 20 Jul 2021 00:52:02 +0800 Subject: [PATCH] fixed message length bug --- Cargo.toml | 1 + src/connection.rs | 71 +++++++++++++++++++++++++++++++++++----------- src/epp/request.rs | 2 +- src/main.rs | 11 +++---- 4 files changed, 62 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 201a4e4..2641d51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bytes = "1" confy = "0.4" futures = "0.3" lazy_static = "1.4" diff --git a/src/connection.rs b/src/connection.rs index b3006b6..ba14d29 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,20 +1,27 @@ use std::sync::Arc; use std::sync::mpsc; use std::{str, u32}; +use bytes::BytesMut; use std::convert::TryInto; use futures::executor::block_on; use std::{error::Error, net::ToSocketAddrs, io as stdio}; use tokio_rustls::{TlsConnector, rustls::ClientConfig, webpki::DNSNameRef, client::TlsStream}; -use tokio::{net::TcpStream, io::AsyncWriteExt, io::AsyncReadExt}; +use tokio::{net::TcpStream, io::AsyncWriteExt, io::AsyncReadExt, io::split, io::ReadHalf, io::WriteHalf}; use crate::config::{CONFIG, EppClientConnection}; +use tokio::time::{sleep, Duration}; use crate::error; use crate::epp::request::EppRequest; +struct ConnectionStream { + reader: ReadHalf>, + writer: WriteHalf>, +} + struct EppConnection { registry: String, credentials: (String, String), - stream: TlsStream, + stream: ConnectionStream, pub greeting: String, } @@ -26,9 +33,9 @@ impl EppConnection { pub async fn new( registry: String, credentials: (String, String), - mut stream: TlsStream) -> Result> { + mut stream: ConnectionStream) -> Result> { let mut buf = vec![0u8; 4096]; - stream.read(&mut buf).await?; + stream.reader.read(&mut buf).await?; let greeting = str::from_utf8(&buf)?.to_string(); Ok(EppConnection { @@ -40,7 +47,10 @@ impl EppConnection { } async fn write(&mut self, buf: &Vec) -> Result<(), Box> { - self.stream.write_all(buf).await?; + let wrote = self.stream.writer.write(buf).await?; + + println!("Wrote {} bytes", wrote); + Ok(()) } @@ -50,37 +60,60 @@ impl EppConnection { let buf_size = len + 4; let mut buf: Vec = vec![0u8; buf_size]; + let len = len + 4; let len_u32: [u8; 4] = u32::to_be_bytes(len.try_into()?); buf[..4].clone_from_slice(&len_u32); buf[4..].clone_from_slice(&content.as_bytes()); + let conv = str::from_utf8(&buf[4..])?; + println!("reconverted: {}", conv); + self.write(&buf).await } async fn read(&mut self) -> Result, Box> { let mut buf = vec![0u8; 4096]; - self.stream.read(&mut buf).await?; + self.stream.reader.read(&mut buf).await?; Ok(buf) } async fn read_epp_response(&mut self) -> Result, Box> { let mut buf = [0u8; 4]; - self.stream.read_exact(&mut buf).await?; + self.stream.reader.read_exact(&mut buf).await?; let buf_size :usize = u32::from_be_bytes(buf).try_into()?; - println!("Response buffer size: {}", buf_size); + let message_size = buf_size - 4; + println!("Message buffer size: {}", message_size); - let mut buf = vec![0u8; buf_size - 4]; + let mut buf = BytesMut::with_capacity(4096); + let mut read_buf = vec![0u8; 4096]; - self.stream.read(&mut buf).await?; + let mut read_size :usize = 0; - Ok(buf) + loop { + let read = self.stream.reader.read(&mut read_buf).await?; + println!("Read: {} bytes", read); + buf.extend_from_slice(&read_buf[0..read]); + + read_size = read_size + read; + println!("Total read: {} bytes", read_size); + + if read == 0 { + panic!("Unexpected eof") + } else if read_size >= message_size { + break; + } + } + + let data = buf.to_vec(); + + Ok(data) } pub async fn get_epp_response(&mut self) -> Result> { - let contents = self.read().await?; + let contents = self.read_epp_response().await?; let response = str::from_utf8(&contents)?.to_string(); @@ -88,16 +121,15 @@ impl EppConnection { } pub async fn transact(&mut self, content: &str) -> Result> { - let content = format!("{}\r\n\r\n", content); - self.send_epp_request(&content).await?; + self.get_epp_response().await } async fn close(&mut self) -> Result<(), Box> { println!("Closing ..."); - self.stream.shutdown().await?; + self.stream.writer.shutdown().await?; Ok(()) } } @@ -129,7 +161,7 @@ impl EppClient { } } -async fn epp_connect(registry_creds: &EppClientConnection) -> Result, error::Error> { +async fn epp_connect(registry_creds: &EppClientConnection) -> Result { let (host, port) = registry_creds.connection_details(); println!("{}: {}", host, port); @@ -153,7 +185,12 @@ async fn epp_connect(registry_creds: &EppClientConnection) -> Result Result> { diff --git a/src/epp/request.rs b/src/epp/request.rs index 3e4dbcc..d2aadd5 100644 --- a/src/epp/request.rs +++ b/src/epp/request.rs @@ -35,7 +35,7 @@ pub enum RequestType { Hello, Command { login: Login, - #[serde(rename = "clTRIDv")] + #[serde(rename = "clTRID")] client_tr_id: StringValue, }, } diff --git a/src/main.rs b/src/main.rs index 1133fa1..3bca744 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ pub mod epp; pub mod error; use std::time::SystemTime; +use tokio::time::{sleep, Duration}; use crate::{epp::request}; #[tokio::main] @@ -16,18 +17,18 @@ async fn main() { Err(e) => panic!("Error: {}", e) }; - let epp_hello = request::Hello::new(); - - client.transact(&epp_hello).await.unwrap(); + // sleep(Duration::from_millis(100000)).await; let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); let cl_trid = format!("eppdev:{}", timestamp.as_secs()); let epp_login = request::Login::new("eppdev", "sh48sja#27*A", &cl_trid); - // let response = epp_login.to_epp_xml().unwrap(); - client.transact(&epp_login).await.unwrap(); + let epp_hello = request::Hello::new(); + + client.transact(&epp_hello).await.unwrap(); + //let response = client.transact(&epp_hello).await.unwrap(); //println!("{}", response);