More work here

This commit is contained in:
Ryan Voots 2020-08-19 22:28:16 -07:00
parent f995412c95
commit 25a7458b51
6 changed files with 97 additions and 49 deletions

View file

@ -1,28 +1,26 @@
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use matrix_sdk::{ use matrix_sdk::{self, api::r0::voip, identifiers::RoomId, Client};
self, use tracing::error;
identifiers::RoomId,
Client,
api::r0::voip,
};
use tracing::{error};
use crate::{comm_types::*}; // get the functions i need from my gstream module use crate::comm_types::*; // get the functions i need from my gstream module
#[derive(Debug)] #[derive(Debug)]
struct GstChannel { struct GstChannel {
sender: Sender<CallManagerToGst>, sender: Sender<CallManagerToGst>,
receiver: Receiver<GstToCallManager>, receiver: Receiver<GstToCallManager>,
} }
#[derive(Debug)] #[derive(Debug)]
struct MatrixChannel { struct MatrixChannel {
sender: Sender<CallManagerToMatrix>, sender: Sender<CallManagerToMatrix>,
receiver: Receiver<MatrixToCallManager>, receiver: Receiver<MatrixToCallManager>,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct CallManager { pub struct CallManager {
botname: String, botname: String,
client: Client, client: Client,
@ -64,10 +62,10 @@ impl CallManager {
} }
pub async fn get_turn_server(&self) -> Result<(), anyhow::Error> { pub async fn get_turn_server(&self) -> Result<(), anyhow::Error> {
let client = &self.client; let client = &self.client;
let request = voip::get_turn_server_info::Request { let request = voip::get_turn_server_info::Request {};
};
let resp = client.send(request).await?; let resp = client.send(request).await?;
@ -95,4 +93,6 @@ impl CallManager {
Ok(()) Ok(())
} }
pub async fn handle_calls(&self) -> Result<(), anyhow::Error> { loop {} }
} }

View file

@ -1,21 +1,21 @@
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct TurnAuth { pub struct TurnAuth {
uris: Vec<String>, pub uris: Vec<String>,
ttl: i32, pub ttl: i32,
username: String, pub username: String,
password: String, pub password: String,
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct CallDescription { pub struct CallDescription {
sdp: String, pub sdp: String,
turn_server: Option<TurnAuth>, pub turn_server: Option<TurnAuth>,
stun_server: Option<String>, pub stun_server: Option<String>,
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub enum MatrixToCallManager { pub enum MatrixToCallManager {
StartActive(String), StartActive(String),
@ -25,13 +25,13 @@ pub enum MatrixToCallManager {
Exit(), Exit(),
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub enum CallManagerToMatrix { pub enum CallManagerToMatrix {
Nop(), // TBD, does this channel need to exist? Nop(), // TBD, does this channel need to exist?
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub enum CallManagerToGst { pub enum CallManagerToGst {
StartActive(CallDescription), StartActive(CallDescription),
@ -39,7 +39,7 @@ pub enum CallManagerToGst {
CloseActive(), CloseActive(),
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub enum GstToCallManager { pub enum GstToCallManager {
IceCandidate(String), IceCandidate(String),

View file

@ -4,7 +4,7 @@ use serde_derive::Deserialize;
use std::{error::Error, fmt, fs::File, io::Read}; use std::{error::Error, fmt, fs::File, io::Read};
use toml; use toml;
#[derive(Deserialize, Clone)] #[derive(Deserialize, Clone, Debug)]
pub struct Config { pub struct Config {
pub gstreamer: GstreamerConfig, pub gstreamer: GstreamerConfig,
@ -13,16 +13,17 @@ pub struct Config {
pub security: Option<SecurityConfig>, pub security: Option<SecurityConfig>,
} }
#[derive(Deserialize, Clone)] #[derive(Deserialize, Clone, Debug)]
pub struct GstreamerConfig { pub struct GstreamerConfig {
pub active_pipeline: String, pub active_pipeline: String,
pub passive_pipeline: Option<String>, pub passive_pipeline: Option<String>,
pub incoming_pipline: Option<String>, pub incoming_video_pipeline: Option<String>,
pub incoming_audio_pipeline: Option<String>,
pub webrtcbin_name: String, pub webrtcbin_name: String,
} }
#[derive(Deserialize, Clone)] #[derive(Deserialize, Clone, Debug)]
pub struct MatrixConfig { pub struct MatrixConfig {
pub homeserver_url: String, pub homeserver_url: String,
@ -32,7 +33,7 @@ pub struct MatrixConfig {
pub password: String, pub password: String,
} }
#[derive(Deserialize, Clone)] #[derive(Deserialize, Clone, Debug)]
pub struct SecurityConfig { pub struct SecurityConfig {
pub whitelist_users: Option<Vec<String>>, pub whitelist_users: Option<Vec<String>>,
@ -41,7 +42,7 @@ pub struct SecurityConfig {
pub whitelist_status_servers: Option<Vec<String>>, pub whitelist_status_servers: Option<Vec<String>>,
} }
#[derive(Deserialize, Clone)] #[derive(Deserialize, Clone, Debug)]
pub struct TargetConfig { pub struct TargetConfig {
pub username: String, pub username: String,

View file

@ -1,15 +1,17 @@
extern crate gstreamer as gst; extern crate gstreamer as gst;
extern crate gstreamer_webrtc as gst_webrtc; extern crate gstreamer_webrtc as gst_webrtc;
use gst::{ prelude::*}; //gst_element_error, use url::Url;
use crate::{ use gst::{prelude::*, Bin, Element}; //gst_element_error,
comm_types::*,
config, use crate::{comm_types::*, config};
};
use anyhow; use anyhow;
use crossbeam_channel::{self, Receiver, Sender}; use crossbeam_channel::{self, Receiver, Sender};
use std::sync::{Arc, Mutex, Weak};
#[derive(Debug)]
struct GstreamerPipeline { struct GstreamerPipeline {
pub description: String, pub description: String,
// TBD what other stuff goes in here? // TBD what other stuff goes in here?
@ -19,7 +21,13 @@ struct GstreamerPipeline {
// I should use a channel and an async task to work with it since it can be a little weird // I should use a channel and an async task to work with it since it can be a little weird
// and I want it to never be able to block the matrix side of things // and I want it to never be able to block the matrix side of things
// Weak reference to our application state
#[derive(Debug)]
struct GstreamerWeak(Weak<GstreamerModel>);
// Better name TBD // Better name TBD
#[derive(Debug)]
pub struct GstreamerModel { pub struct GstreamerModel {
current_pipeline: Option<GstreamerPipeline>, current_pipeline: Option<GstreamerPipeline>,
incoming_pipeline: Option<GstreamerPipeline>, incoming_pipeline: Option<GstreamerPipeline>,
@ -27,10 +35,18 @@ pub struct GstreamerModel {
gstream_sender: Sender<GstToCallManager>, gstream_sender: Sender<GstToCallManager>,
config: config::GstreamerConfig, config: config::GstreamerConfig,
call_active: bool, call_active: bool,
running_pipeline: Option<gst::Element>,
webrtcbin: Option<gst::Element>,
} }
impl GstreamerModel { impl GstreamerModel {
pub fn new(config: config::Config, gstream_receiver: Receiver<CallManagerToGst>, gstream_sender: Sender<GstToCallManager>) -> Self { pub fn new(
config: config::Config,
gstream_receiver: Receiver<CallManagerToGst>,
gstream_sender: Sender<GstToCallManager>,
) -> Self
{
let gst_config = config.gstreamer; let gst_config = config.gstreamer;
@ -46,6 +62,8 @@ impl GstreamerModel {
gstream_receiver, gstream_receiver,
gstream_sender, gstream_sender,
config: gst_config, config: gst_config,
running_pipeline: None,
webrtcbin: None,
} }
} }
@ -95,12 +113,16 @@ impl GstreamerModel {
/// open_active_pipeline turns off the passive pipeline, and then sets up the active /// open_active_pipeline turns off the passive pipeline, and then sets up the active
/// pipeline. It also then connects the incoming sdp to the incoming pipeline /// pipeline. It also then connects the incoming sdp to the incoming pipeline
pub fn create_active_pipeline(&self, incoming_sdp: Option<String>) -> Result<(), anyhow::Error> { pub fn create_active_pipeline(
&mut self,
if let Some(sdp) = incoming_sdp { incoming_sdp: Option<String>,
turn_server: Option<TurnAuth>,
) -> Result<(), anyhow::Error>
{
/* if let Some(sdp) = incoming_sdp {
&self.create_incoming_pipeline(sdp)?; &self.create_incoming_pipeline(sdp)?;
} }*/
let gst_parsed = gst::parse_launch(&self.config.active_pipeline)?; let gst_parsed = gst::parse_launch(&self.config.active_pipeline)?;
@ -109,21 +131,41 @@ impl GstreamerModel {
if let Ok(pipeline) = pipeline_maybe { if let Ok(pipeline) = pipeline_maybe {
let webrtcbin = pipeline.get_by_name("webrtcbin").expect("can't find webrtcbin"); self.webrtcbin = pipeline.get_by_name("webrtcbin");
//TODO turn stuff, this will fail if we aren't on the same network let webrtcbin = self.webrtcbin.as_ref().unwrap();
// calculate each turn server url, with the username and password
if let Some(turn_server) = turn_server {
// I can only use the first turn-server from matrix, this is because I don't know how to call the
// proper gstreamer api for this
// for base_url in turn_server.uris {
let base_url = &turn_server.uris[0];
let mut turn_server_url = Url::parse(&base_url)?;
turn_server_url.set_username(&turn_server.username).unwrap();
turn_server_url.set_password(Some(&turn_server.password)).unwrap();
webrtcbin.set_property_from_str("turn-server", &turn_server_url.to_string());
// }
//webrtcbin.set_property_from_str("stun-server", STUN_SERVER);
}
//webrtcbin.set_property_from_str("stun-server", STUN_SERVER);
//webrtcbin.set_property_from_str("turn-server", TURN_SERVER);
webrtcbin.set_property_from_str("bundle-policy", "max-bundle"); webrtcbin.set_property_from_str("bundle-policy", "max-bundle");
webrtcbin.connect_pad_added(move |_webrtc, pad| {});
} else { } else {
// TBD better handling // TBD better handling
panic!("Failed to parse pipeline") panic!("Failed to parse pipeline")
} }
//.expect("not a pipeline");
// Get access to the webrtcbin by name // Get access to the webrtcbin by name
Ok(()) Ok(())

View file

@ -77,7 +77,9 @@ async fn main() -> Result<(), anyhow::Error> {
}); });
let cm_future = tokio::spawn(async move { let cm_future = tokio::spawn(async move {
let botname = &botname_ref; let botname = &botname_ref;
//let config = config::load_config(&botname).unwrap(); //let config = config::load_config(&botname).unwrap();
let obj = callmanager::CallManager::new( let obj = callmanager::CallManager::new(
@ -89,10 +91,11 @@ async fn main() -> Result<(), anyhow::Error> {
matrix_cm_receiver, matrix_cm_receiver,
); );
//obj.handle_calls().await? obj.handle_calls().await.unwrap();
}); });
let gst_fut = tokio::spawn(async move { let gst_fut = tokio::spawn(async move {
let config = config::load_config(&botname).unwrap(); let config = config::load_config(&botname).unwrap();
let gstmodel = gstream::GstreamerModel::new(config, cm_gst_receiver, gst_cm_sender); let gstmodel = gstream::GstreamerModel::new(config, cm_gst_receiver, gst_cm_sender);

View file

@ -20,7 +20,7 @@ use matrix_sdk::{
AnySyncRoomEvent::Message, AnySyncRoomEvent::Message,
AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent, AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent,
}, },
identifiers::{RoomId}, identifiers::RoomId,
locks::RwLock, locks::RwLock,
Client, ClientConfig, EventEmitter, JsonStore, Room, Sas, SyncRoom, SyncSettings, Client, ClientConfig, EventEmitter, JsonStore, Room, Sas, SyncRoom, SyncSettings,
}; };
@ -240,7 +240,9 @@ pub async fn login(botname: &String) -> Result<Client, anyhow::Error> {
let commandbox = Box::new(commandbot); let commandbox = Box::new(commandbot);
client.login(username.clone(), password, None, Some(botname.to_string())).await?; client
.login(username.clone(), password, None, Some(botname.to_string()))
.await?;
println!("logged in as {}", username); println!("logged in as {}", username);