diff --git a/src/callmanager.rs b/src/callmanager.rs index 4aaf95a..b44b3df 100644 --- a/src/callmanager.rs +++ b/src/callmanager.rs @@ -1,28 +1,26 @@ use crossbeam_channel::{Receiver, Sender}; -use matrix_sdk::{ - self, - identifiers::RoomId, - Client, - api::r0::voip, -}; -use tracing::{error}; +use matrix_sdk::{self, api::r0::voip, identifiers::RoomId, Client}; +use tracing::error; -use crate::{comm_types::*}; // get the functions i need from my gstream module +use crate::comm_types::*; // get the functions i need from my gstream module #[derive(Debug)] + struct GstChannel { sender: Sender, receiver: Receiver, } #[derive(Debug)] + struct MatrixChannel { sender: Sender, receiver: Receiver, } #[derive(Debug)] + pub struct CallManager { botname: String, client: Client, @@ -64,10 +62,10 @@ impl CallManager { } pub async fn get_turn_server(&self) -> Result<(), anyhow::Error> { + let client = &self.client; - let request = voip::get_turn_server_info::Request { - }; + let request = voip::get_turn_server_info::Request {}; let resp = client.send(request).await?; @@ -95,4 +93,6 @@ impl CallManager { Ok(()) } + + pub async fn handle_calls(&self) -> Result<(), anyhow::Error> { loop {} } } diff --git a/src/comm_types.rs b/src/comm_types.rs index 4068bd8..7fa1a37 100644 --- a/src/comm_types.rs +++ b/src/comm_types.rs @@ -1,21 +1,21 @@ -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TurnAuth { - uris: Vec, - ttl: i32, - username: String, - password: String, + pub uris: Vec, + pub ttl: i32, + pub username: String, + pub password: String, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct CallDescription { - sdp: String, - turn_server: Option, - stun_server: Option, + pub sdp: String, + pub turn_server: Option, + pub stun_server: Option, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum MatrixToCallManager { StartActive(String), @@ -25,13 +25,13 @@ pub enum MatrixToCallManager { Exit(), } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum CallManagerToMatrix { Nop(), // TBD, does this channel need to exist? } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum CallManagerToGst { StartActive(CallDescription), @@ -39,7 +39,7 @@ pub enum CallManagerToGst { CloseActive(), } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum GstToCallManager { IceCandidate(String), diff --git a/src/config.rs b/src/config.rs index 88c47d2..b072f48 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,7 +4,7 @@ use serde_derive::Deserialize; use std::{error::Error, fmt, fs::File, io::Read}; use toml; -#[derive(Deserialize, Clone)] +#[derive(Deserialize, Clone, Debug)] pub struct Config { pub gstreamer: GstreamerConfig, @@ -13,16 +13,17 @@ pub struct Config { pub security: Option, } -#[derive(Deserialize, Clone)] +#[derive(Deserialize, Clone, Debug)] pub struct GstreamerConfig { pub active_pipeline: String, pub passive_pipeline: Option, - pub incoming_pipline: Option, + pub incoming_video_pipeline: Option, + pub incoming_audio_pipeline: Option, pub webrtcbin_name: String, } -#[derive(Deserialize, Clone)] +#[derive(Deserialize, Clone, Debug)] pub struct MatrixConfig { pub homeserver_url: String, @@ -32,7 +33,7 @@ pub struct MatrixConfig { pub password: String, } -#[derive(Deserialize, Clone)] +#[derive(Deserialize, Clone, Debug)] pub struct SecurityConfig { pub whitelist_users: Option>, @@ -41,7 +42,7 @@ pub struct SecurityConfig { pub whitelist_status_servers: Option>, } -#[derive(Deserialize, Clone)] +#[derive(Deserialize, Clone, Debug)] pub struct TargetConfig { pub username: String, diff --git a/src/gstream.rs b/src/gstream.rs index 29e6ce6..1e0a182 100644 --- a/src/gstream.rs +++ b/src/gstream.rs @@ -1,15 +1,17 @@ extern crate gstreamer as gst; extern crate gstreamer_webrtc as gst_webrtc; -use gst::{ prelude::*}; //gst_element_error, +use url::Url; -use crate::{ - comm_types::*, - config, -}; +use gst::{prelude::*, Bin, Element}; //gst_element_error, + +use crate::{comm_types::*, config}; use anyhow; use crossbeam_channel::{self, Receiver, Sender}; +use std::sync::{Arc, Mutex, Weak}; + +#[derive(Debug)] struct GstreamerPipeline { pub description: String, // TBD what other stuff goes in here? @@ -19,7 +21,13 @@ struct GstreamerPipeline { // I should use a channel and an async task to work with it since it can be a little weird // and I want it to never be able to block the matrix side of things + +// Weak reference to our application state +#[derive(Debug)] +struct GstreamerWeak(Weak); + // Better name TBD +#[derive(Debug)] pub struct GstreamerModel { current_pipeline: Option, incoming_pipeline: Option, @@ -27,10 +35,18 @@ pub struct GstreamerModel { gstream_sender: Sender, config: config::GstreamerConfig, call_active: bool, + + running_pipeline: Option, + webrtcbin: Option, } impl GstreamerModel { - pub fn new(config: config::Config, gstream_receiver: Receiver, gstream_sender: Sender) -> Self { + pub fn new( + config: config::Config, + gstream_receiver: Receiver, + gstream_sender: Sender, + ) -> Self + { let gst_config = config.gstreamer; @@ -46,6 +62,8 @@ impl GstreamerModel { gstream_receiver, gstream_sender, config: gst_config, + running_pipeline: None, + webrtcbin: None, } } @@ -95,12 +113,16 @@ impl GstreamerModel { /// open_active_pipeline turns off the passive pipeline, and then sets up the active /// pipeline. It also then connects the incoming sdp to the incoming pipeline - pub fn create_active_pipeline(&self, incoming_sdp: Option) -> Result<(), anyhow::Error> { - - if let Some(sdp) = incoming_sdp { + pub fn create_active_pipeline( + &mut self, + incoming_sdp: Option, + turn_server: Option, + ) -> Result<(), anyhow::Error> + { + /* if let Some(sdp) = incoming_sdp { &self.create_incoming_pipeline(sdp)?; - } + }*/ let gst_parsed = gst::parse_launch(&self.config.active_pipeline)?; @@ -109,21 +131,41 @@ impl GstreamerModel { if let Ok(pipeline) = pipeline_maybe { - let webrtcbin = pipeline.get_by_name("webrtcbin").expect("can't find webrtcbin"); + self.webrtcbin = pipeline.get_by_name("webrtcbin"); - //TODO turn stuff, this will fail if we aren't on the same network + let webrtcbin = self.webrtcbin.as_ref().unwrap(); + + // calculate each turn server url, with the username and password + if let Some(turn_server) = turn_server { + + // I can only use the first turn-server from matrix, this is because I don't know how to call the + // proper gstreamer api for this + + // for base_url in turn_server.uris { + let base_url = &turn_server.uris[0]; + + let mut turn_server_url = Url::parse(&base_url)?; + + turn_server_url.set_username(&turn_server.username).unwrap(); + + turn_server_url.set_password(Some(&turn_server.password)).unwrap(); + + webrtcbin.set_property_from_str("turn-server", &turn_server_url.to_string()); + + // } + + //webrtcbin.set_property_from_str("stun-server", STUN_SERVER); + } - //webrtcbin.set_property_from_str("stun-server", STUN_SERVER); - //webrtcbin.set_property_from_str("turn-server", TURN_SERVER); webrtcbin.set_property_from_str("bundle-policy", "max-bundle"); + + webrtcbin.connect_pad_added(move |_webrtc, pad| {}); } else { // TBD better handling panic!("Failed to parse pipeline") } - //.expect("not a pipeline"); - // Get access to the webrtcbin by name Ok(()) diff --git a/src/main.rs b/src/main.rs index fb55ab6..ef023ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -77,7 +77,9 @@ async fn main() -> Result<(), anyhow::Error> { }); let cm_future = tokio::spawn(async move { + let botname = &botname_ref; + //let config = config::load_config(&botname).unwrap(); let obj = callmanager::CallManager::new( @@ -89,10 +91,11 @@ async fn main() -> Result<(), anyhow::Error> { matrix_cm_receiver, ); - //obj.handle_calls().await? + obj.handle_calls().await.unwrap(); }); let gst_fut = tokio::spawn(async move { + let config = config::load_config(&botname).unwrap(); let gstmodel = gstream::GstreamerModel::new(config, cm_gst_receiver, gst_cm_sender); diff --git a/src/matrixbot.rs b/src/matrixbot.rs index 3a0b788..3c588f0 100644 --- a/src/matrixbot.rs +++ b/src/matrixbot.rs @@ -20,7 +20,7 @@ use matrix_sdk::{ AnySyncRoomEvent::Message, AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent, }, - identifiers::{RoomId}, + identifiers::RoomId, locks::RwLock, Client, ClientConfig, EventEmitter, JsonStore, Room, Sas, SyncRoom, SyncSettings, }; @@ -31,7 +31,7 @@ use url::Url; use crate::{ comm_types::{MatrixToCallManager::*, *}, - config, + config, }; // get the functions i need from my gstream module #[derive(Debug)] @@ -240,7 +240,9 @@ pub async fn login(botname: &String) -> Result { let commandbox = Box::new(commandbot); - client.login(username.clone(), password, None, Some(botname.to_string())).await?; + client + .login(username.clone(), password, None, Some(botname.to_string())) + .await?; println!("logged in as {}", username);