fixed message length bug

This commit is contained in:
Ritesh Chitlangi 2021-07-20 00:52:02 +08:00
parent f503132d85
commit 543378cb52
4 changed files with 62 additions and 23 deletions

View File

@ -6,6 +6,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
bytes = "1"
confy = "0.4" confy = "0.4"
futures = "0.3" futures = "0.3"
lazy_static = "1.4" lazy_static = "1.4"

View File

@ -1,20 +1,27 @@
use std::sync::Arc; use std::sync::Arc;
use std::sync::mpsc; use std::sync::mpsc;
use std::{str, u32}; use std::{str, u32};
use bytes::BytesMut;
use std::convert::TryInto; use std::convert::TryInto;
use futures::executor::block_on; use futures::executor::block_on;
use std::{error::Error, net::ToSocketAddrs, io as stdio}; use std::{error::Error, net::ToSocketAddrs, io as stdio};
use tokio_rustls::{TlsConnector, rustls::ClientConfig, webpki::DNSNameRef, client::TlsStream}; use tokio_rustls::{TlsConnector, rustls::ClientConfig, webpki::DNSNameRef, client::TlsStream};
use tokio::{net::TcpStream, io::AsyncWriteExt, io::AsyncReadExt}; use tokio::{net::TcpStream, io::AsyncWriteExt, io::AsyncReadExt, io::split, io::ReadHalf, io::WriteHalf};
use crate::config::{CONFIG, EppClientConnection}; use crate::config::{CONFIG, EppClientConnection};
use tokio::time::{sleep, Duration};
use crate::error; use crate::error;
use crate::epp::request::EppRequest; use crate::epp::request::EppRequest;
struct ConnectionStream {
reader: ReadHalf<TlsStream<TcpStream>>,
writer: WriteHalf<TlsStream<TcpStream>>,
}
struct EppConnection { struct EppConnection {
registry: String, registry: String,
credentials: (String, String), credentials: (String, String),
stream: TlsStream<TcpStream>, stream: ConnectionStream,
pub greeting: String, pub greeting: String,
} }
@ -26,9 +33,9 @@ impl EppConnection {
pub async fn new( pub async fn new(
registry: String, registry: String,
credentials: (String, String), credentials: (String, String),
mut stream: TlsStream<TcpStream>) -> Result<EppConnection, Box<dyn Error>> { mut stream: ConnectionStream) -> Result<EppConnection, Box<dyn Error>> {
let mut buf = vec![0u8; 4096]; let mut buf = vec![0u8; 4096];
stream.read(&mut buf).await?; stream.reader.read(&mut buf).await?;
let greeting = str::from_utf8(&buf)?.to_string(); let greeting = str::from_utf8(&buf)?.to_string();
Ok(EppConnection { Ok(EppConnection {
@ -40,7 +47,10 @@ impl EppConnection {
} }
async fn write(&mut self, buf: &Vec<u8>) -> Result<(), Box<dyn Error>> { async fn write(&mut self, buf: &Vec<u8>) -> Result<(), Box<dyn Error>> {
self.stream.write_all(buf).await?; let wrote = self.stream.writer.write(buf).await?;
println!("Wrote {} bytes", wrote);
Ok(()) Ok(())
} }
@ -50,37 +60,60 @@ impl EppConnection {
let buf_size = len + 4; let buf_size = len + 4;
let mut buf: Vec<u8> = vec![0u8; buf_size]; let mut buf: Vec<u8> = vec![0u8; buf_size];
let len = len + 4;
let len_u32: [u8; 4] = u32::to_be_bytes(len.try_into()?); let len_u32: [u8; 4] = u32::to_be_bytes(len.try_into()?);
buf[..4].clone_from_slice(&len_u32); buf[..4].clone_from_slice(&len_u32);
buf[4..].clone_from_slice(&content.as_bytes()); buf[4..].clone_from_slice(&content.as_bytes());
let conv = str::from_utf8(&buf[4..])?;
println!("reconverted: {}", conv);
self.write(&buf).await self.write(&buf).await
} }
async fn read(&mut self) -> Result<Vec<u8>, Box<dyn Error>> { async fn read(&mut self) -> Result<Vec<u8>, Box<dyn Error>> {
let mut buf = vec![0u8; 4096]; let mut buf = vec![0u8; 4096];
self.stream.read(&mut buf).await?; self.stream.reader.read(&mut buf).await?;
Ok(buf) Ok(buf)
} }
async fn read_epp_response(&mut self) -> Result<Vec<u8>, Box<dyn Error>> { async fn read_epp_response(&mut self) -> Result<Vec<u8>, Box<dyn Error>> {
let mut buf = [0u8; 4]; let mut buf = [0u8; 4];
self.stream.read_exact(&mut buf).await?; self.stream.reader.read_exact(&mut buf).await?;
let buf_size :usize = u32::from_be_bytes(buf).try_into()?; let buf_size :usize = u32::from_be_bytes(buf).try_into()?;
println!("Response buffer size: {}", buf_size); let message_size = buf_size - 4;
println!("Message buffer size: {}", message_size);
let mut buf = vec![0u8; buf_size - 4]; let mut buf = BytesMut::with_capacity(4096);
let mut read_buf = vec![0u8; 4096];
self.stream.read(&mut buf).await?; let mut read_size :usize = 0;
Ok(buf) loop {
let read = self.stream.reader.read(&mut read_buf).await?;
println!("Read: {} bytes", read);
buf.extend_from_slice(&read_buf[0..read]);
read_size = read_size + read;
println!("Total read: {} bytes", read_size);
if read == 0 {
panic!("Unexpected eof")
} else if read_size >= message_size {
break;
}
}
let data = buf.to_vec();
Ok(data)
} }
pub async fn get_epp_response(&mut self) -> Result<String, Box<dyn Error>> { pub async fn get_epp_response(&mut self) -> Result<String, Box<dyn Error>> {
let contents = self.read().await?; let contents = self.read_epp_response().await?;
let response = str::from_utf8(&contents)?.to_string(); let response = str::from_utf8(&contents)?.to_string();
@ -88,16 +121,15 @@ impl EppConnection {
} }
pub async fn transact(&mut self, content: &str) -> Result<String, Box<dyn Error>> { pub async fn transact(&mut self, content: &str) -> Result<String, Box<dyn Error>> {
let content = format!("{}\r\n\r\n", content);
self.send_epp_request(&content).await?; self.send_epp_request(&content).await?;
self.get_epp_response().await self.get_epp_response().await
} }
async fn close(&mut self) -> Result<(), Box<dyn Error>> { async fn close(&mut self) -> Result<(), Box<dyn Error>> {
println!("Closing ..."); println!("Closing ...");
self.stream.shutdown().await?; self.stream.writer.shutdown().await?;
Ok(()) Ok(())
} }
} }
@ -129,7 +161,7 @@ impl EppClient {
} }
} }
async fn epp_connect(registry_creds: &EppClientConnection) -> Result<TlsStream<TcpStream>, error::Error> { async fn epp_connect(registry_creds: &EppClientConnection) -> Result<ConnectionStream, error::Error> {
let (host, port) = registry_creds.connection_details(); let (host, port) = registry_creds.connection_details();
println!("{}: {}", host, port); println!("{}: {}", host, port);
@ -153,7 +185,12 @@ async fn epp_connect(registry_creds: &EppClientConnection) -> Result<TlsStream<T
let stream = connector.connect(domain, stream).await?; let stream = connector.connect(domain, stream).await?;
Ok(stream) let (reader, writer) = split(stream);
Ok(ConnectionStream {
reader: reader,
writer: writer,
})
} }
pub async fn connect(registry: &'static str) -> Result<EppClient, Box<dyn Error>> { pub async fn connect(registry: &'static str) -> Result<EppClient, Box<dyn Error>> {

View File

@ -35,7 +35,7 @@ pub enum RequestType {
Hello, Hello,
Command { Command {
login: Login, login: Login,
#[serde(rename = "clTRIDv")] #[serde(rename = "clTRID")]
client_tr_id: StringValue, client_tr_id: StringValue,
}, },
} }

View File

@ -4,6 +4,7 @@ pub mod epp;
pub mod error; pub mod error;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::time::{sleep, Duration};
use crate::{epp::request}; use crate::{epp::request};
#[tokio::main] #[tokio::main]
@ -16,18 +17,18 @@ async fn main() {
Err(e) => panic!("Error: {}", e) Err(e) => panic!("Error: {}", e)
}; };
let epp_hello = request::Hello::new(); // sleep(Duration::from_millis(100000)).await;
client.transact(&epp_hello).await.unwrap();
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let cl_trid = format!("eppdev:{}", timestamp.as_secs()); let cl_trid = format!("eppdev:{}", timestamp.as_secs());
let epp_login = request::Login::new("eppdev", "sh48sja#27*A", &cl_trid); let epp_login = request::Login::new("eppdev", "sh48sja#27*A", &cl_trid);
// let response = epp_login.to_epp_xml().unwrap();
client.transact(&epp_login).await.unwrap(); client.transact(&epp_login).await.unwrap();
let epp_hello = request::Hello::new();
client.transact(&epp_hello).await.unwrap();
//let response = client.transact(&epp_hello).await.unwrap(); //let response = client.transact(&epp_hello).await.unwrap();
//println!("{}", response); //println!("{}", response);