4 Commits

Author SHA1 Message Date
Eric Zhang
b0bfd52707 Bump version to 0.2.1 2022-04-09 02:55:52 -04:00
Eric Zhang
526d02d789 Add integration tests for auth and end-to-end proxying (#4)
* Add authentication handshake tests

* Add basic proxy test

* Add mismatched secret failure test

* Add a failure test for invalid addresses
2022-04-09 02:54:52 -04:00
Orhun Parmaksız
23db4047ff Add LICENSE (#3) 2022-04-08 16:42:58 -04:00
Eric Zhang
2d0dcf9889 Improve stability by exiting immediately on common errors (#2)
* Kill connections immediately on missing or close

* Add timeout to initial protocol messages

* Add low-level tracing for JSON messages

* Add timeout to initial TCP connections
2022-04-08 15:55:54 -04:00
10 changed files with 245 additions and 33 deletions

32
Cargo.lock generated
View File

@@ -84,13 +84,15 @@ dependencies = [
[[package]]
name = "bore-cli"
version = "0.2.0"
version = "0.2.1"
dependencies = [
"anyhow",
"clap",
"dashmap",
"hex",
"hmac",
"lazy_static",
"rstest",
"serde",
"serde_json",
"sha2",
@@ -461,12 +463,34 @@ dependencies = [
"bitflags",
]
[[package]]
name = "rstest"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d912f35156a3f99a66ee3e11ac2e0b3f34ac85a07e05263d05a7e2c8810d616f"
dependencies = [
"cfg-if",
"proc-macro2",
"quote",
"rustc_version",
"syn",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.9"
@@ -479,6 +503,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "semver"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4"
[[package]]
name = "serde"
version = "1.0.136"

View File

@@ -1,6 +1,6 @@
[package]
name = "bore-cli"
version = "0.2.0"
version = "0.2.1"
authors = ["Eric Zhang <ekzhang1@gmail.com>"]
license = "MIT"
description = "A modern, simple TCP tunnel in Rust that exposes local ports to a remote server, bypassing standard NAT connection firewalls."
@@ -28,3 +28,7 @@ tokio = { version = "1.17.0", features = ["full"] }
tracing = "0.1.32"
tracing-subscriber = "0.3.10"
uuid = { version = "0.8.2", features = ["serde", "v4"] }
[dev-dependencies]
lazy_static = "1.4.0"
rstest = "0.12.0"

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 Eric Zhang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -38,7 +38,7 @@ You can optionally pass in a `--port` option to pick a specific port on the remo
The full options are shown below.
```shell
bore-local 0.2.0
bore-local 0.2.1
Starts a local proxy to the remote server
USAGE:
@@ -68,7 +68,7 @@ That's all it takes! After the server starts running at a given address, you can
The full options for the `bore server` command are shown below.
```shell
bore-server 0.2.0
bore-server 0.2.1
Runs the remote proxy server
USAGE:

View File

@@ -6,7 +6,7 @@ use sha2::{Digest, Sha256};
use tokio::io::{AsyncBufRead, AsyncWrite};
use uuid::Uuid;
use crate::shared::{recv_json, send_json, ClientMessage, ServerMessage};
use crate::shared::{recv_json_timeout, send_json, ClientMessage, ServerMessage};
/// Wrapper around a MAC used for authenticating clients that have a secret.
pub struct Authenticator(Hmac<Sha256>);
@@ -54,7 +54,7 @@ impl Authenticator {
) -> Result<()> {
let challenge = Uuid::new_v4();
send_json(stream, ServerMessage::Challenge(challenge)).await?;
match recv_json(stream, &mut Vec::new()).await? {
match recv_json_timeout(stream).await? {
Some(ClientMessage::Authenticate(tag)) => {
ensure!(self.validate(&challenge, &tag), "invalid secret");
Ok(())
@@ -68,7 +68,7 @@ impl Authenticator {
&self,
stream: &mut (impl AsyncBufRead + AsyncWrite + Unpin),
) -> Result<()> {
let challenge = match recv_json(stream, &mut Vec::new()).await? {
let challenge = match recv_json_timeout(stream).await? {
Some(ServerMessage::Challenge(challenge)) => challenge,
_ => bail!("expected authentication challenge, but no secret was required"),
};

View File

@@ -3,12 +3,15 @@
use std::sync::Arc;
use anyhow::{bail, Context, Result};
use tokio::{io::BufReader, net::TcpStream};
use tokio::{io::BufReader, net::TcpStream, time::timeout};
use tracing::{error, info, info_span, warn, Instrument};
use uuid::Uuid;
use crate::auth::Authenticator;
use crate::shared::{proxy, recv_json, send_json, ClientMessage, ServerMessage, CONTROL_PORT};
use crate::shared::{
proxy, recv_json, recv_json_timeout, send_json, ClientMessage, ServerMessage, CONTROL_PORT,
NETWORK_TIMEOUT,
};
/// State structure for the client.
pub struct Client {
@@ -31,10 +34,7 @@ pub struct Client {
impl Client {
/// Create a new client.
pub async fn new(local_port: u16, to: &str, port: u16, secret: Option<&str>) -> Result<Self> {
let stream = TcpStream::connect((to, CONTROL_PORT))
.await
.with_context(|| format!("could not connect to {to}:{CONTROL_PORT}"))?;
let mut stream = BufReader::new(stream);
let mut stream = BufReader::new(connect_with_timeout(to, CONTROL_PORT).await?);
let auth = secret.map(Authenticator::new);
if let Some(auth) = &auth {
@@ -42,7 +42,7 @@ impl Client {
}
send_json(&mut stream, ClientMessage::Hello(port)).await?;
let remote_port = match recv_json(&mut stream, &mut Vec::new()).await? {
let remote_port = match recv_json_timeout(&mut stream).await? {
Some(ServerMessage::Hello(remote_port)) => remote_port,
Some(ServerMessage::Error(message)) => bail!("server error: {message}"),
Some(ServerMessage::Challenge(_)) => {
@@ -99,21 +99,23 @@ impl Client {
}
async fn handle_connection(&self, id: Uuid) -> Result<()> {
let local_conn = TcpStream::connect(("localhost", self.local_port))
.await
.context("failed TCP connection to local port")?;
let mut remote_conn = BufReader::new(
TcpStream::connect((&self.to[..], CONTROL_PORT))
.await
.context("failed TCP connection to remote port")?,
);
let mut remote_conn =
BufReader::new(connect_with_timeout(&self.to[..], CONTROL_PORT).await?);
if let Some(auth) = &self.auth {
auth.client_handshake(&mut remote_conn).await?;
}
send_json(&mut remote_conn, ClientMessage::Accept(id)).await?;
let local_conn = connect_with_timeout("localhost", self.local_port).await?;
proxy(local_conn, remote_conn).await?;
Ok(())
}
}
async fn connect_with_timeout(to: &str, port: u16) -> Result<TcpStream> {
match timeout(NETWORK_TIMEOUT, TcpStream::connect((to, port))).await {
Ok(res) => res,
Err(err) => Err(err.into()),
}
.with_context(|| format!("could not connect to {to}:{port}"))
}

View File

@@ -13,7 +13,9 @@ use tracing::{info, info_span, warn, Instrument};
use uuid::Uuid;
use crate::auth::Authenticator;
use crate::shared::{proxy, recv_json, send_json, ClientMessage, ServerMessage, CONTROL_PORT};
use crate::shared::{
proxy, recv_json_timeout, send_json, ClientMessage, ServerMessage, CONTROL_PORT,
};
/// State structure for the server.
pub struct Server {
@@ -71,10 +73,7 @@ impl Server {
}
}
let mut buf = Vec::new();
let msg = recv_json(&mut stream, &mut buf).await?;
match msg {
match recv_json_timeout(&mut stream).await? {
Some(ClientMessage::Authenticate(_)) => {
warn!("unexpected authenticate");
Ok(())

View File

@@ -1,14 +1,21 @@
//! Shared data structures, utilities, and protocol definitions.
use std::time::Duration;
use anyhow::{Context, Result};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::time::timeout;
use tracing::trace;
use uuid::Uuid;
/// TCP port used for control connections with the server.
pub const CONTROL_PORT: u16 = 7835;
/// Timeout for network connections and initial protocol messages.
pub const NETWORK_TIMEOUT: Duration = Duration::from_secs(3);
/// A message from the client on the control connection.
#[derive(Debug, Serialize, Deserialize)]
pub enum ClientMessage {
@@ -49,10 +56,10 @@ where
{
let (mut s1_read, mut s1_write) = io::split(stream1);
let (mut s2_read, mut s2_write) = io::split(stream2);
tokio::try_join!(
io::copy(&mut s1_read, &mut s2_write),
io::copy(&mut s2_read, &mut s1_write),
)?;
tokio::select! {
res = io::copy(&mut s1_read, &mut s2_write) => res,
res = io::copy(&mut s2_read, &mut s1_write) => res,
}?;
Ok(())
}
@@ -61,6 +68,7 @@ pub async fn recv_json<T: DeserializeOwned>(
reader: &mut (impl AsyncBufRead + Unpin),
buf: &mut Vec<u8>,
) -> Result<Option<T>> {
trace!("waiting to receive json message");
buf.clear();
reader.read_until(0, buf).await?;
if buf.is_empty() {
@@ -72,8 +80,21 @@ pub async fn recv_json<T: DeserializeOwned>(
Ok(serde_json::from_slice(buf).context("failed to parse JSON")?)
}
/// Read the next null-delimited JSON instruction, with a default timeout.
///
/// This is useful for parsing the initial message of a stream for handshake or
/// other protocol purposes, where we do not want to wait indefinitely.
pub async fn recv_json_timeout<T: DeserializeOwned>(
reader: &mut (impl AsyncBufRead + Unpin),
) -> Result<Option<T>> {
timeout(NETWORK_TIMEOUT, recv_json(reader, &mut Vec::new()))
.await
.context("timed out waiting for initial message")?
}
/// Send a null-terminated JSON instruction on a stream.
pub async fn send_json<T: Serialize>(writer: &mut (impl AsyncWrite + Unpin), msg: T) -> Result<()> {
trace!("sending json message");
let msg = serde_json::to_vec(&msg)?;
writer.write_all(&msg).await?;
writer.write_all(&[0]).await?;

35
tests/auth_test.rs Normal file
View File

@@ -0,0 +1,35 @@
use anyhow::Result;
use bore_cli::auth::Authenticator;
use tokio::io::{self, BufReader};
#[tokio::test]
async fn auth_handshake() -> Result<()> {
let auth = Authenticator::new("some secret string");
let (client, server) = io::duplex(8); // Ensure correctness with limited capacity.
let mut client = BufReader::new(client);
let mut server = BufReader::new(server);
tokio::try_join!(
auth.client_handshake(&mut client),
auth.server_handshake(&mut server),
)?;
Ok(())
}
#[tokio::test]
async fn auth_handshake_fail() {
let auth = Authenticator::new("client secret");
let auth2 = Authenticator::new("different server secret");
let (client, server) = io::duplex(8); // Ensure correctness with limited capacity.
let mut client = BufReader::new(client);
let mut server = BufReader::new(server);
let result = tokio::try_join!(
auth.client_handshake(&mut client),
auth2.server_handshake(&mut server),
);
assert!(result.is_err());
}

100
tests/e2e_test.rs Normal file
View File

@@ -0,0 +1,100 @@
use std::net::SocketAddr;
use std::time::Duration;
use anyhow::{anyhow, Result};
use bore_cli::{client::Client, server::Server};
use lazy_static::lazy_static;
use rstest::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tokio::time;
lazy_static! {
/// Guard to make sure that tests are run serially, not concurrently.
static ref SERIAL_GUARD: Mutex<()> = Mutex::new(());
}
/// Spawn the server, giving some time for the control port TcpListener to start.
async fn spawn_server(secret: Option<&str>) {
tokio::spawn(Server::new(1024, secret).listen());
time::sleep(Duration::from_millis(50)).await;
}
/// Spawns a client with randomly assigned ports, returning the listener and remote address.
async fn spawn_client(secret: Option<&str>) -> Result<(TcpListener, SocketAddr)> {
let listener = TcpListener::bind("localhost:0").await?;
let client = Client::new(listener.local_addr()?.port(), "localhost", 0, secret).await?;
let remote_addr = ([0, 0, 0, 0], client.remote_port()).into();
tokio::spawn(client.listen());
Ok((listener, remote_addr))
}
#[rstest]
#[tokio::test]
async fn basic_proxy(#[values(None, Some(""), Some("abc"))] secret: Option<&str>) -> Result<()> {
let _guard = SERIAL_GUARD.lock().await;
spawn_server(secret).await;
let (listener, addr) = spawn_client(secret).await?;
tokio::spawn(async move {
let (mut stream, _) = listener.accept().await?;
let mut buf = [0u8; 11];
stream.read_exact(&mut buf).await?;
assert_eq!(&buf, b"hello world");
stream.write_all(b"I can send a message too!").await?;
anyhow::Ok(())
});
let mut stream = TcpStream::connect(addr).await?;
stream.write_all(b"hello world").await?;
let mut buf = [0u8; 25];
stream.read_exact(&mut buf).await?;
assert_eq!(&buf, b"I can send a message too!");
// Ensure that the client end of the stream is closed now.
assert_eq!(stream.read(&mut buf).await?, 0);
// Also ensure that additional connections do not produce any data.
let mut stream = TcpStream::connect(addr).await?;
assert_eq!(stream.read(&mut buf).await?, 0);
Ok(())
}
#[rstest]
#[case(None, Some("my secret"))]
#[case(Some("my secret"), None)]
#[tokio::test]
async fn mismatched_secret(
#[case] server_secret: Option<&str>,
#[case] client_secret: Option<&str>,
) {
let _guard = SERIAL_GUARD.lock().await;
spawn_server(server_secret).await;
assert!(spawn_client(client_secret).await.is_err());
}
#[tokio::test]
async fn invalid_address() -> Result<()> {
// We don't need the serial guard for this test because it doesn't create a server.
async fn check_address(to: &str, use_secret: bool) -> Result<()> {
match Client::new(5000, to, 0, use_secret.then(|| "a secret")).await {
Ok(_) => Err(anyhow!("expected error for {to}, use_secret={use_secret}")),
Err(_) => Ok(()),
}
}
tokio::try_join!(
check_address("google.com", false),
check_address("google.com", true),
check_address("nonexistent.domain.for.demonstration", false),
check_address("nonexistent.domain.for.demonstration", true),
check_address("malformed !$uri$%", false),
check_address("malformed !$uri$%", true),
)?;
Ok(())
}