From cb50318a5f00a7019f0ad1eaf278e0654783201b Mon Sep 17 00:00:00 2001 From: TheLie0 Date: Mon, 20 Apr 2020 15:29:24 +0200 Subject: [PATCH] Got closer to completion and split up the files. --- rust/Cargo.lock | 27 ++++--- rust/Cargo.toml | 3 +- rust/src/lib.rs | 135 +++++++++------------------------ rust/src/netman.rs | 185 +++++++++++++++++++++++++++++++++++++++++++++ rust/src/socks.rs | 11 ++- 5 files changed, 247 insertions(+), 114 deletions(-) create mode 100644 rust/src/netman.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 886980a..ff03834 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -17,24 +17,30 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.67" +version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb147597cdf94ed43ab7a9038716637d2d1bf2bc571da995d0028dec06bd3018" +checksum = "dea0c0405123bba743ee3f91f49b1c7cfb684eef0da0a50110f758ccf24cdff0" [[package]] name = "proc-macro2" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c09721c6781493a2a492a96b5a5bf19b65917fe6728884e7c44dd0c60ca3435" +checksum = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3" dependencies = [ "unicode-xid", ] [[package]] -name = "quote" -version = "1.0.2" +name = "protobuf" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe" +checksum = "12b869b619141004a542c9e585b40c1432e7b0d67a826f68cb5fa4c1ee9aff56" + +[[package]] +name = "quote" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bdc6c187c65bca4260c9011c9e3132efe4909da44726bad24cf7572ae338d7f" dependencies = [ "proc-macro2", ] @@ -69,9 +75,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "syn" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "123bd9499cfb380418d509322d7a6d52e5315f064fe4b3ad18a53d6b92c07859" +checksum = "0df0eb663f387145cab623dea85b09c2c5b4b0aef44e945d928e682fce71bb03" dependencies = [ "proc-macro2", "quote", @@ -79,10 +85,11 @@ dependencies = [ ] [[package]] -name = "tf2p" +name = "tf2p-network" version = "0.1.0" dependencies = [ "flume", + "protobuf", "sm-ext", ] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index a1a25ac..a596456 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "tf2p" +name = "tf2p-network" version = "0.1.0" authors = ["TheLie0 "] edition = "2018" @@ -10,3 +10,4 @@ crate-type = ["cdylib"] [dependencies] sm-ext = "0.3.0" flume = "0.5.1" +protobuf = "2.12.0" diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 46c0b0c..9ff2bcb 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -1,19 +1,23 @@ -use sm_ext::{cell_t, register_natives, IExtension, IExtensionInterface, IShareSys, SMExtension, TryIntoPlugin, HandleError, HandleId, HandleType}; +use sm_ext::{cell_t, native, register_natives, IExtension, IExtensionInterface, IShareSys, SMExtension, TryIntoPlugin, HandleError, IHandleSys, HandleType, SMInterfaceApi, IPluginContext, forwards, IForwardManager, ExecType}; use std::thread; use std::net::{SocketAddr, UdpSocket}; -use std::error::Error; -use std::collections::VecDeque; use std::cell::RefCell; -// use std::rc::Rc; Using Rc like it's in the example threw an Error pub mod test; -pub mod socks; -use sm_ext::{native, IPluginContext}; +mod socks; +mod netman; + use std::ffi::CStr; use flume; +#[forwards] +struct Tf2pNetworkingForwards { + #[global_forward("OnConnectionRequest", ExecType::Single)] + on_con_req: fn(ip: &CStr) -> i32, +} + /// The network plugin entrypoint. #[native] fn start_manager(_ctx: &IPluginContext, ip: &CStr, port1: i32) -> NMSender { @@ -48,103 +52,14 @@ fn start_manager(_ctx: &IPluginContext, ip: &CStr, port1: i32) -> NMSender { // Spawn manager thread thread::spawn(move || { - manager(rx_c, rx_s2m, tx_m2s, serv).unwrap(); + netman::manager(rx_s2m, tx_m2s, serv).unwrap(); }); NMSender::new(tx_c) } -/// The network manager function. -/// It handles the incoming requests and sends the corresponding answers -fn manager<'a> (controller: flume::Receiver, input: flume::Receiver<(Vec, SocketAddr)>, output: flume::Sender<(Vec, SocketAddr)>, serv_addr: SocketAddr) -> Result<(), Box> { - - let mut cl_ip: Option = None; - let mut addr_queue = VecDeque::new(); - - let conns: [Option; 3]; - - // This is the String "Source Engine Query" in binary form - let cliet_query = vec!(0xff, 0xff, 0xff, 0xff, 0x54, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x20, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x20, 0x51, 0x75, 0x65, 0x72, 0x79, 0x00); - - 'main: loop { - - // Get game info for cache - output.send((cliet_query.clone(), serv_addr)).unwrap(); - let game_info = match input.recv() { - Ok((buf, src)) => { - let search = serv_addr.port().to_le_bytes(); - let replace = (27419 as u16).to_le_bytes(); - - if src == serv_addr { - let mut buf = buf; - for i in 0..buf.len()-1 { - if buf[i..i+2] == search { - buf[i] = replace[0]; - buf[i+1] = replace[1]; - } - } - buf - } else { - panic!("Expected query answer from server, got message from {:?}", src); - } - } - Err(e) => panic!(e), - }; - - // Send cached game info on query and pass any other request to server - 'inner1: loop { - - //check for controller input - //TODO: rewrite this so it doesn't block - match controller.recv() { - Ok(comm) => { - match comm { - NMCommand::Kill => break 'main, - _ => (), - } - } - Err(e) => panic!(e), - } - - match input.recv() { - Ok((buf, source)) => { - if source == serv_addr { - match addr_queue.pop_front() { - Some(ip) => { - output.send((buf.clone(), ip)).unwrap(); - } - None => println!("No clients in queue."), - } - }else { - if buf[0 .. cliet_query.len()] == cliet_query[..] { - output.send((game_info.clone(), source)).unwrap(); - }else { - output.send((buf.clone(), serv_addr)).unwrap(); - addr_queue.push_back(source); - } - } - } - Err(e) => panic!(e), - } - } - - //TODO: Get game info again - - //TODO: Actual p2p communication - } - - Ok(()) - -} - -enum NMCommand{ - Connect(u8, SocketAddr), - Kill -} - - -struct NMSender(flume::Sender); +struct NMSender(flume::Sender); impl<'ctx> TryIntoPlugin<'ctx> for NMSender { type Error = HandleError; @@ -159,7 +74,7 @@ impl<'ctx> TryIntoPlugin<'ctx> for NMSender { impl NMSender { - fn new(s: flume::Sender) -> Self { + fn new(s: flume::Sender) -> Self { Self(s) } @@ -174,7 +89,12 @@ pub struct Tf2pNetworking{ impl Tf2pNetworking { fn get() -> &'static Self { - EXTENSION_GLOBAL.with(|ext| unsafe { &(*ext.borrow().unwrap()).delegate }) + EXTENSION_GLOBAL.with(|ext| unsafe { + &(*(match *ext.borrow(){ + Some(e) => e, + None => panic!("Couldn't find myself"), + })).delegate + }) } fn handle_type() -> &'static HandleType> { @@ -185,7 +105,17 @@ impl Tf2pNetworking { impl IExtensionInterface for Tf2pNetworking { fn on_extension_load(&mut self, myself: IExtension, sys: IShareSys, late: bool) -> Result<(), Box> { - println!(">>> TF2P loaded! me = {:?}, sys = {:?}, late = {:?}", myself, sys, late); + println!(">>> TF2P loaded! me = {:?}, sys = {:?}, late = {:?}", myself, sys, late); + + let handlesys: IHandleSys = sys.request_interface(&myself)?; + println!(">>> Got interface: {:?} v{:?}", handlesys.get_interface_name(), handlesys.get_interface_version()); + + self.handle_type = Some(handlesys.create_type("NMSender", myself.get_identity())?); + + let forward_manager: IForwardManager = sys.request_interface(&myself)?; + println!(">>> Got interface: {:?} v{:?}", forward_manager.get_interface_name(), forward_manager.get_interface_version()); + + Tf2pNetworkingForwards::register(&forward_manager)?; register_natives!( &sys, @@ -197,4 +127,9 @@ impl IExtensionInterface for Tf2pNetworking { Ok(()) } + + fn on_extension_unload(&mut self) { + Tf2pNetworkingForwards::unregister(); + } + } diff --git a/rust/src/netman.rs b/rust/src/netman.rs new file mode 100644 index 0000000..ed655aa --- /dev/null +++ b/rust/src/netman.rs @@ -0,0 +1,185 @@ +use std::net::SocketAddr; +use std::error::Error; +use std::collections::VecDeque; + +use std::ffi::CStr; + +use flume; + +pub enum NMMessage{ + Connect(u8, SocketAddr), + UDPMessage(Vec, SocketAddr), + Kill +} + + +fn check_if_in_peer_ips (cl_ip: SocketAddr, peer_ips: &[Option]) -> bool { + let mut answer = false; + for ip_opt in peer_ips { + match ip_opt { + Some(ip) => { + if ip.ip() == cl_ip.ip() { + answer == true; + } + + } + _ => () + } + } + return answer; +} + + +/// The network manager function. +/// It handles the incoming requests and sends the corresponding answers +pub fn manager<'a> (input: flume::Receiver, output: flume::Sender<(Vec, SocketAddr)>, serv_addr: SocketAddr) -> Result<(), Box> { + + let mut cl_ip: Option = None; + let mut addr_queue = VecDeque::new(); + + let mut conns = [None, None, None]; + + // This is the String "Source Engine Query" in binary form + let cliet_query = vec!(0xff, 0xff, 0xff, 0xff, 0x54, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x20, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x20, 0x51, 0x75, 0x65, 0x72, 0x79, 0x00); + + 'main: loop { + + // Get game info for cache + let mut game_info = None; + let search = serv_addr.port().to_le_bytes(); + let replace = (27419 as u16).to_le_bytes(); + output.send((cliet_query.clone(), serv_addr)).unwrap(); + while game_info == None { + match input.recv() { + Ok(m) => { + match m { + NMMessage::UDPMessage(buf, src) => { + if src == serv_addr { + let buf = &mut *buf.clone(); + for i in 0..buf.len()-1{ + if buf[i..i+2] == search { + println!("Port replaced."); + buf[i] = replace[0]; + buf[i+1] = replace[1]; + } + } + game_info = Some(Vec::from(buf)); + } else { + panic!("Expected query answer from server, got message from {:?}", src); + } + } + NMMessage::Kill => break 'main, + _ => () + } + } + Err(e) => panic!(e), + } + }; + + // Send cached game info on query and pass any other request to server + // Break when player data has been received + let mut player_data = [None, None]; + 'inner1: loop { + + match input.recv() { + Ok(m) => { + match m { + NMMessage::UDPMessage(buf, src) => { + if src == serv_addr { + match addr_queue.pop_front() { + Some(ip) => { + output.send((buf.clone(), ip)).unwrap(); + } + None => println!("No clients in queue."), + } + }else { + if buf.len() <= cliet_query.len() && buf[..] == cliet_query[..buf.len()] || + buf.len() > cliet_query.len() && buf[..cliet_query.len()] == cliet_query[..] { + output.send((game_info.clone().unwrap(), src)).unwrap(); + }else if buf.len() >= 256 && buf[0..6] == [0xff, 0xff, 0xff, 0xff, 0x6b, 0x18] { + cl_ip = Some(src); + player_data[0] = Some(buf); + output.send((player_data[0].clone().unwrap(), serv_addr)).unwrap(); + println!("Got player Data pt 1."); + + 'even_more_inner: loop { + match input.recv() { + Ok(m) => { + match m { + NMMessage::UDPMessage(buf, src) => { + if src == serv_addr { + output.send((buf.clone(), cl_ip.unwrap())).unwrap(); + } else if src == cl_ip.unwrap() { + if buf.len() >= 256 { + player_data[1] = Some(buf); + output.send((player_data[1].clone().unwrap(), serv_addr)).unwrap(); + println!("Got player Data pt 2."); + break 'even_more_inner; + } + } + }, + NMMessage::Connect(i, addr) => (), + NMMessage::Kill => break 'main, + } + }, + Err(e) => panic!(e), + } + } + break 'inner1; + }else { + output.send((buf.clone(), serv_addr)).unwrap(); + addr_queue.push_back(src); + } + } + }, + NMMessage::Connect(i, addr) => (), + NMMessage::Kill => break 'main, + } + }, + Err(e) => panic!(e), + } + } + + 'inner2: loop { + + match input.recv() { + Ok(m) => { + match m { + NMMessage::UDPMessage(buf, src) => { + if src == serv_addr { // from server? + output.send((buf, cl_ip.unwrap())); // send to client + } else if check_if_in_peer_ips(src, &conns) { // from peer? + // handle request + // /send to cliet port (callback) + } else { // else + if buf == "tf2p?".as_bytes() {// handshake + output.send((Vec::from("Hey Ho!".as_bytes()), src)); + } + else if buf == "Connect?".as_bytes() { + match super::Tf2pNetworkingForwards::on_con_req(|fwd| fwd.execute(CStr::from_bytes_with_nul(src.ip()?.to_string()?[..])?))? { // Callback? + Ok(1) => { + // Add to List + // Send Ack and Ports + }, + _ => (), + } + } + } + }, + NMMessage::Connect(i, addr) => { + // ask for handshawe + // do it + // add to list + }, + NMMessage::Kill => break 'main, + } + } + Err(e) => panic!(e), + } + + } + } + + Ok(()) + +} diff --git a/rust/src/socks.rs b/rust/src/socks.rs index 3f111fe..7fe2f86 100644 --- a/rust/src/socks.rs +++ b/rust/src/socks.rs @@ -1,6 +1,8 @@ use std::net::{UdpSocket, SocketAddr}; use std::error::Error; +use crate::NMMessage; + use flume; /// Sends buffer messages from a mpsc channel to an udp socket. @@ -16,7 +18,7 @@ pub fn chan_to_usock(input: flume::Receiver<(Vec, SocketAddr)>, output: UdpS } /// Sends buffer messages from an udp socket to a mpsc channel. -pub fn usock_to_chan(input: UdpSocket, output: flume::Sender<(Vec, SocketAddr)>) -> Result<(), Box> { +pub fn usock_to_chan(input: UdpSocket, output: flume::Sender) -> Result<(), Box> { let mut buf = [0; 2048]; let mut tup; @@ -25,9 +27,12 @@ pub fn usock_to_chan(input: UdpSocket, output: flume::Sender<(Vec, SocketAdd { tup = input.recv_from(&mut buf).unwrap(); } - match output.send((buf[0..tup.0 + 1].to_vec(), tup.1)) { + match output.send(NMMessage::UDPMessage(buf[0..tup.0 + 1].to_vec(), tup.1)) { Ok(s) => s, - Err(e) => panic!(e) + Err(e) => { + println!("Whoops"); + panic!(e) + } } } }