Rust Learning from Zero (20) —— Sobani Tracker in Rust

A few weeks ago, Ayaka and I started a project named Sobani, a simple cross-platform, UDP-based P2P audio sharing application for couples, friends, coworkers, etc. Sobani also has a public tracker server that stores the connection info of all its peers.

Well, since it has been quite a while since I last wrote any Rust code, I decided to rewrite the Sobani tracker in Rust. The Rust version of the Sobani tracker is at BlueCocoa/sobani-tracker-rust.

In brief, this time I'd like to write the project with async-std, and the tracker service will be encapsulated in a SobaniTracker struct. A SobaniTracker is initialised with an IP:Port string, which is the listening endpoint for its UDP socket. Let's also add pretty_env_logger for beautiful logging.

Without further ado, the main.rs is shown below, which should be rather straightforward.

extern crate pretty_env_logger;
#[macro_use] extern crate log;

mod server;
use server::SobaniTracker;

#[async_std::main]
async fn main() -> std::io::Result<()> {
    pretty_env_logger::init();
    let mut server = SobaniTracker::config("0.0.0.0:3000");
    server.start().await?;
    Ok(())
}

As for the SobaniTracker (in server.rs), it should hold its listening endpoint, e.g. 0.0.0.0:3000, the internal UDP socket and the connection info of all clients. Because the server serves many clients and its handlers only take &self, I wrapped clients in a Mutex so that the connection data stays consistent.

use async_std::net::UdpSocket;
use serde_json::json;
use std::net::SocketAddr;
use std::collections::HashMap;
use std::sync::Mutex;
use sha2::{Sha256, Digest};

/// Sobani Tracker
///
/// This struct can be created by the [`server::SobaniTracker::config`] function. See its
/// documentation for more.
pub struct SobaniTracker {
    /// listening endpoint, `IP:Port` format
    bind: String,

    /// internal UDP socket
    socket: Option<UdpSocket>,

    /// bi-directional client mapping
    /// - maps from `IP:Port` to `ShareID`
    /// - maps `ShareID` to `IP:Port`
    clients: Mutex<HashMap<String, String>>
}
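
To make the bi-directional mapping a bit more concrete, here is a minimal std-only sketch of the same idea: one HashMap behind a Mutex that stores both directions. The `ClientTable` type and its `register` and `lookup` methods are hypothetical names used only for illustration; they are not part of the actual tracker.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for the `clients` field of `SobaniTracker`.
pub struct ClientTable {
    inner: Mutex<HashMap<String, String>>,
}

impl ClientTable {
    pub fn new() -> Self {
        ClientTable { inner: Mutex::new(HashMap::new()) }
    }

    /// Records both directions under a single lock acquisition,
    /// so no reader can ever observe only one half of the pair.
    pub fn register(&self, addr: &str, share_id: &str) {
        let mut map = self.inner.lock().unwrap();
        // `IP:Port` -> `shareId`
        map.insert(addr.to_string(), share_id.to_string());
        // `shareId` -> `IP:Port`
        map.insert(share_id.to_string(), addr.to_string());
    }

    /// Looks up either direction and returns an owned copy,
    /// so the lock is released as soon as this function returns.
    pub fn lookup(&self, key: &str) -> Option<String> {
        let map = self.inner.lock().unwrap();
        map.get(key).cloned()
    }
}
```

One consequence of sharing a single map is that addresses and share IDs live in the same key space, so a lookup cannot tell by itself which kind of key it was given.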

The API of SobaniTracker is outlined below.

impl SobaniTracker {
    /// Constructs a new `SobaniTracker` with listening endpoint at `bind`.
    ///
    /// # Example
    ///
    /// ```
    /// use server::SobaniTracker;
    ///
    /// let mut server = SobaniTracker::config("0.0.0.0:3000");
    /// ```
    pub fn config(bind: &str) -> SobaniTracker { /* */ }
    
    /// Starts the `SobaniTracker` server
    /// This function won't return
    ///
    /// # Example
    /// 
    /// ```
    /// use server::SobaniTracker;
    ///
    /// let mut server = SobaniTracker::config("0.0.0.0:3000");
    /// server.start().await?;
    /// ```
    pub async fn start(&mut self) -> std::io::Result<()> { /* */ }
    
    /// Handles announce message from remote client
    /// This function will generate a `shareId` for the remote client
    async fn announce_handler(&self, remote: SocketAddr) -> Option<serde_json::Value> { /* */ }
    
    /// Handles push message from remote client
    /// The push message will be sent if a client (the requestor) wants to connect to
    /// another client (requestee) by requestee's shareId
    /// This function looks up the requested peer by the `shareId` sent in the push message
    /// And if both requestor and requestee have announced themselves on this tracker
    /// A pushed message containing the requestee's connection info will be sent back to the requestor
    /// An income message containing the requestor's connection info will be sent to the requestee
    async fn push_handler(&self, msg: serde_json::Value, remote: SocketAddr) -> Option<serde_json::Value> { /* */ }
}

The SobaniTracker::config function will simply create the struct,

pub fn config(bind: &str) -> SobaniTracker {
    SobaniTracker {
        bind: String::from(bind),
        socket: None,
        clients: Mutex::new(HashMap::new())
    }
}

As for the SobaniTracker::start function, it starts the service, receives UDP messages from remote clients in a loop and dispatches each message to the corresponding handler.

pub async fn start(&mut self) -> std::io::Result<()> {
    // try to bind UDP socket at given IP address and port
    self.socket = Some(UdpSocket::bind(&self.bind).await?);
    // get an unwrapped reference of the UDP socket for simplicity
    let unwrapped_socket = self.socket.as_ref().unwrap();
    info!("Sobani tracker listening on {}", unwrapped_socket.local_addr()?);

    // the maximum UDP payload over IPv4 is 65507 bytes
    // (65535 minus the 20-byte IP header and the 8-byte UDP header),
    // and the buffer is one byte larger than that
    let mut buf = vec![0u8; 65508];
    loop {
        // keep trying to receive UDP message from remote client
        let (recv, peer) = unwrapped_socket.recv_from(&mut buf).await?;
        let msg = Vec::from(&buf[..recv]);

        // try to parse the received message as JSON string
        match serde_json::from_slice::<serde_json::Value>(&msg) {
            // if error, warn and ignore
            Err(e) => warn!("Cannot parse msg from {}: {}", peer, e),
            // otherwise we will get the parsed JSON object
            Ok(parsed) => {
                // our protocol requires that every message should have an `action` field
                // so let's check that requirement
                match parsed.get("action") {
                    // if `action` does not exist,
                    // warn and ignore
                    None => warn!("Cannot understand msg from {}", peer),
                    // otherwise
                    Some(action) => {
                        // we should also check whether the value of the `action` field is a string
                        if action.is_string() {
                            // if it is a string, we should check what exact action it is
                            let action = action.as_str().unwrap();
                            match action {
                                // announce message
                                "announce" => {
                                    // announce message only contains `action` field
                                    // so we just pass the `peer` connection info to its handler
                                    // if the handler returns some response
                                    if let Some(resp) = self.announce_handler(peer).await {
                                        // then send the response back to `peer`
                                        debug!("[{}] {}", peer, resp.to_string());
                                        let _ = unwrapped_socket.send_to(resp.to_string().as_bytes(), peer).await;
                                    }
                                },
                                // push message
                                "push" => {
                                    // push message contains more fields
                                    // but I decided to check the integrity in corresponding handler
                                    // if the handler returns some response
                                    if let Some(resp) = self.push_handler(parsed, peer).await {
                                        // then send the response back to `peer`
                                        debug!("[{}] {}", peer, resp.to_string());
                                        let _ = unwrapped_socket.send_to(resp.to_string().as_bytes(), peer).await;
                                    }
                                },
                                // alive message
                                // the purpose of alive message is to keep the connection alive
                                "alive" => (),
                                // unknown message
                                // for any other action, warn and ignore
                                _ => {
                                    warn!("unsupported action: {}", action);
                                }
                            };
                        }
                    }
                }
            }
        };
    }
    #[allow(unreachable_code)]
    Ok(())
}
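
The nested checks above (does `action` exist, is it a string, which action is it) can also be flattened into a single classification step. The following is only a sketch of that alternative, not the code used in the tracker: the `Action` enum and the `classify` function are hypothetical names, and `raw` stands for the value you would get from `parsed.get("action").and_then(|v| v.as_str())`.

```rust
/// Actions the tracker understands, as described in this post.
#[derive(Debug, PartialEq)]
pub enum Action<'a> {
    Announce,
    Push,
    Alive,
    /// Any other string value of the `action` field.
    Unknown(&'a str),
    /// The `action` field is absent or is not a string.
    Missing,
}

/// Maps the raw `action` value onto one of the cases above.
pub fn classify(raw: Option<&str>) -> Action<'_> {
    match raw {
        Some("announce") => Action::Announce,
        Some("push") => Action::Push,
        Some("alive") => Action::Alive,
        Some(other) => Action::Unknown(other),
        None => Action::Missing,
    }
}
```

With this shape, the receive loop would need only one `match` on the result of `classify`, and the case where `action` is present but not a string gets an explicit branch instead of being skipped silently.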

Now we can write the handler for announce message. It should generate a shareId for the remote client, and then write the bi-directional mapping, i.e., IP:Port to shareId and shareId to IP:Port, into self.clients. Finally, the answering JSON message will be returned to SobaniTracker::start and sent back to the remote client.

async fn announce_handler(&self, remote: SocketAddr) -> Option<serde_json::Value> {
    info!("[announce] from {}", remote);
    // convert remote SocketAddr to string
    // e.g. "1.2.3.4:23333"
    let remote = remote.to_string();

    // compute SHA256 of "1.2.3.4:23333"
    let mut hasher = Sha256::new();
    hasher.input(remote.as_bytes());
    let digest = hasher.result();
    // and take the first 4 bytes (32 bits) of the digest,
    // hex-encoded, as this remote client's `shareId`
    let share_id = hex::encode(&digest[..4]);

    {
        // lock `self.clients` for data consistency
        let mut locked_clients = self.clients.lock().unwrap();
        // write mapping from `IP:Port` to `shareId`
        locked_clients.insert(remote.clone(), share_id.clone());
        // write mapping from `shareId` to `IP:Port`
        locked_clients.insert(share_id.clone(), remote.clone());
    }

    // returns announced message
    Some(json!({
        "action": "announced",
        "data": {
            "shareId": share_id
        }
    }))
}
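
The step that turns a digest into a shareId is small enough to show on its own. Below is a std-only sketch, assuming the digest is already available as a byte slice; `share_id_from_digest` is a hypothetical helper that does the same job as `hex::encode(&digest[..4])` above, and the SHA-256 computation itself is left out.

```rust
/// Hex-encodes the first four bytes of a digest,
/// which yields 8 lowercase hex characters.
/// Returns `None` if the digest is shorter than four bytes.
pub fn share_id_from_digest(digest: &[u8]) -> Option<String> {
    // `get(..4)` avoids a panic on short input, unlike `&digest[..4]`
    let prefix = digest.get(..4)?;
    Some(prefix.iter().map(|b| format!("{:02x}", b)).collect())
}
```

Two things follow from this scheme. The shareId is deterministic, so the same IP:Port always maps to the same ID. And since only 32 bits are kept, two different peers can end up with the same shareId, which becomes likely once tens of thousands of peers are announced at the same time.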

And finally, the push message handler. A push message is sent when a client (the requestor) wants to connect to another client (the requestee) using the requestee's shareId. The handler looks up the requested peer by the shareId sent in the push message.

If both the requestor and the requestee have announced themselves on this tracker, a pushed message containing the requestee's connection info is sent back to the requestor, and an income message containing the requestor's connection info is sent to the requestee, so that they can begin the UDP hole punching process.

async fn push_handler(&self, msg: serde_json::Value, remote: SocketAddr) -> Option<serde_json::Value> {
    info!("[push] from {}", remote);
    // convert remote SocketAddr to string
    // e.g. "1.2.3.4:23333"
    let remote_addr_string = remote.to_string();
    let requestor_addr = &remote_addr_string[..];

    // the push message shall contain a field called `shareId`
    match msg.get("shareId") {
        // if it does appear
        Some(requestee_share_id) => {
            // then check whether its value is a string
            if requestee_share_id.is_string() {
                // if yes, take the value of `shareId`
                let requestee_share_id = requestee_share_id.as_str().unwrap();
                // lock `self.clients`
                let locked_clients = self.clients.lock().unwrap();
                // check whether the requestor has announced itself on this tracker
                if let Some(requestor_share_id) = locked_clients.get(requestor_addr) {
                    // check whether the `shareId` appears in `self.clients`,
                    // i.e., whether the requested peer (requestee) is on this tracker
                    if let Some(requestee_addr) = locked_clients.get(requestee_share_id) {
                        // if both requestor and requestee are on this tracker
                        // send pushed message containing the requestee's info to the requestor
                        let pushed_message = json!({
                            "action": "pushed",
                            "data": {
                                "peeraddr": requestee_addr,
                                "peerShareId": requestee_share_id
                            }
                        }).to_string();
                        let _ = self.socket.as_ref().unwrap().send_to(pushed_message.as_bytes(), remote).await;

                        // and send income message containing the requestor's info to the requestee
                        let income_addr: SocketAddr = requestee_addr.parse().unwrap();
                        let income_message = json!({
                            "action": "income",
                            "data": {
                                "peeraddr": requestor_addr,
                                "peerShareId": requestor_share_id
                            }
                        }).to_string();
                        let _ = self.socket.as_ref().unwrap().send_to(income_message.as_bytes(), income_addr).await;
                    } else {
                        warn!("Cannot find requestee {}", requestee_share_id);
                    }
                } else {
                    warn!("Requestor not registered yet")
                }
            } else {
                warn!("`shareId` field in request from {} is not a string", remote);
            }
        },
        None => warn!("Cannot find `shareId` field for request from {}", remote),
    };
    None
}
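
The lookup part of the push handler can be separated from the sending part. Here is a std-only sketch of that split, under the assumption that `clients` is the same `Mutex<HashMap<String, String>>` as above; `resolve_push`, `Match` and `LookupError` are hypothetical names for illustration. It copies what it needs out of the map and lets the lock guard drop on return, so nothing is held while the messages are being sent.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Mutex;

/// Owned copies of what is needed to build the `pushed` and `income` messages.
#[derive(Debug, PartialEq)]
pub struct Match {
    pub requestor_share_id: String,
    pub requestee_addr: SocketAddr,
}

#[derive(Debug, PartialEq)]
pub enum LookupError {
    /// The requestor's `IP:Port` is not in the map.
    RequestorNotAnnounced,
    /// The requested `shareId` is not in the map.
    RequesteeNotFound,
    /// The value stored under the requested key is not an `IP:Port`.
    BadStoredAddress,
}

pub fn resolve_push(
    clients: &Mutex<HashMap<String, String>>,
    requestor_addr: &str,
    requestee_share_id: &str,
) -> Result<Match, LookupError> {
    // the guard lives only until this function returns
    let locked = clients.lock().unwrap();
    let requestor_share_id = locked
        .get(requestor_addr)
        .ok_or(LookupError::RequestorNotAnnounced)?
        .clone();
    let requestee_addr = locked
        .get(requestee_share_id)
        .ok_or(LookupError::RequesteeNotFound)?
        .parse::<SocketAddr>()
        // report a parse failure instead of calling `unwrap()`
        .map_err(|_| LookupError::BadStoredAddress)?;
    Ok(Match { requestor_share_id, requestee_addr })
}
```

The `BadStoredAddress` case matters because addresses and share IDs share one map: if a client sends an announced IP:Port in the `shareId` field, the lookup succeeds but returns a share ID, which does not parse as a `SocketAddr`.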

Build and run, all tests passed~

Notice: this article is an original work by Cocoa. Please credit the source when reposting, meow~
