Builds again, and now uses thread-safe mutexes and interior mutability to handle the pipelines

This commit is contained in:
Ryan Voots 2020-08-21 13:05:23 -07:00
parent 25a7458b51
commit f6e422accb
7 changed files with 119 additions and 97 deletions

View file

@ -1,5 +1,5 @@
unstable_features = true
blank_lines_lower_bound = 1
blank_lines_lower_bound = 0
blank_lines_upper_bound = 3
#chain_width = 120
wrap_comments = true

View file

@ -42,7 +42,6 @@ impl CallManager {
matrix_receiver: Receiver<MatrixToCallManager>,
) -> Self
{
Self {
botname: botname.to_string(),
client,
@ -62,7 +61,6 @@ impl CallManager {
}
pub async fn get_turn_server(&self) -> Result<(), anyhow::Error> {
let client = &self.client;
let request = voip::get_turn_server_info::Request {};
@ -75,9 +73,7 @@ impl CallManager {
pub fn can_start_call(&self) -> bool { !&self.call_in_progress }
pub async fn start_incoming_call(&self, offer: String, room_id: RoomId) -> Result<(), anyhow::Error> {
if self.call_in_progress {
error!("Existing call in progress already");
}
@ -85,9 +81,7 @@ impl CallManager {
}
pub async fn start_outgoing_call(&self, room_id: RoomId) -> Result<(), anyhow::Error> {
if self.call_in_progress {
error!("Existing call in progress already");
}

View file

@ -11,14 +11,12 @@ pub struct Channels<L: Send + Clone, R: Send + Clone> {
impl<L: Send + Clone, R: Send + Clone> Channels<L, R> {
pub fn get_pair_left(&self) -> (Sender<L>, Receiver<L>) {
let (sender, receiver) = &self.left;
(sender.clone(), receiver.clone())
}
pub fn get_pair_right(&self) -> (Sender<R>, Receiver<R>) {
let (sender, receiver) = &self.right;
(sender.clone(), receiver.clone())
@ -26,7 +24,6 @@ impl<L: Send + Clone, R: Send + Clone> Channels<L, R> {
}
pub fn create_pair<L: Send + Clone, R: Send + Clone>() -> Channels<L, R> {
let (left_sender, left_receiver) = unbounded::<L>();
let (right_sender, right_receiver) = unbounded::<R>();

View file

@ -56,7 +56,6 @@ pub struct ConfigError {
impl ConfigError {
fn new(msg: &str) -> ConfigError {
ConfigError {
details: msg.to_string(),
}
@ -72,26 +71,22 @@ impl Error for ConfigError {
}
pub fn get_config_directory(botname: &str) -> Result<std::path::PathBuf, ConfigError> {
let home_maybe = dirs::home_dir();
match home_maybe {
Some(mut home) => {
// TODO make this fit LFS setup
home.push(botname);
return Ok(home);
}
None => {
return Err(ConfigError::new("Unable to find home directory"));
}
}
}
pub fn load_config(botname: &str) -> Result<Config, anyhow::Error> {
let mut config_path = get_config_directory(botname)?;
config_path.push("config.toml");
@ -102,7 +97,6 @@ pub fn load_config(botname: &str) -> Result<Config, anyhow::Error> {
match config_filemaybe {
Ok(mut config_file) => {
let mut config_raw = String::new();
config_file.read_to_string(&mut config_raw)?;
@ -112,7 +106,6 @@ pub fn load_config(botname: &str) -> Result<Config, anyhow::Error> {
match config_maybe {
Ok(config) => return Ok(config),
Err(e) => {
let error_string = format!("Failed parsing config file {}", config_filename);
return Err(anyhow::Error::new(e).context(error_string));
@ -120,7 +113,6 @@ pub fn load_config(botname: &str) -> Result<Config, anyhow::Error> {
}
}
Err(e) => {
// I'm sure there's a more idiomatic way to do this
let error_string = format!("Couldn't load config file {}", config_filename);

View file

@ -10,10 +10,32 @@ use anyhow;
use crossbeam_channel::{self, Receiver, Sender};
use std::sync::{Arc, Mutex, Weak};
use gst::gst_element_error;
use std::ops::{Deref, DerefMut};
use std::cell::{RefCell, Ref};
// This whole module borrows heavily from https://github.com/centricular/gstwebrtc-demos/blob/master/sendrecv/gst-rust/src/main.rs
// I've changed it quite a bit still but it's got a lot in common still. TBD licensing
// upgrade weak reference or return
#[macro_export]
macro_rules! upgrade_weak {
($x:ident, $r:expr) => {{
match $x.upgrade() {
Some(o) => o,
None => return $r,
}
}};
($x:ident) => {
upgrade_weak!($x, ())
};
}
#[derive(Debug)]
struct GstreamerPipeline {
pub description: String,
pipeline: gst::Pipeline,
bin: gst::Element,
// TBD what other stuff goes in here?
}
@ -24,51 +46,61 @@ struct GstreamerPipeline {
// Weak reference to our application state
#[derive(Debug)]
struct GstreamerWeak(Weak<GstreamerModel>);
struct GstreamerWeak(Weak<GstreamerInner>);
// Better name TBD
#[derive(Debug)]
pub struct GstreamerModel {
current_pipeline: Option<GstreamerPipeline>,
incoming_pipeline: Option<GstreamerPipeline>,
pub struct GstreamerInner {
current_pipeline: Mutex<RefCell<Option<GstreamerPipeline>>>,
incoming_pipeline: Mutex<RefCell<Option<GstreamerPipeline>>>,
gstream_receiver: Receiver<CallManagerToGst>,
gstream_sender: Sender<GstToCallManager>,
config: config::GstreamerConfig,
call_active: bool,
running_pipeline: Option<gst::Element>,
webrtcbin: Option<gst::Element>,
}
impl GstreamerModel {
#[derive(Debug)]
pub struct Gstreamer(Arc<GstreamerInner>);
impl GstreamerWeak {
// Try upgrading a weak reference to a strong one
fn upgrade(&self) -> Option<Gstreamer> {
self.0.upgrade().map(Gstreamer)
}
}
// To be able to access the App's fields directly
impl std::ops::Deref for Gstreamer {
type Target = GstreamerInner;
fn deref(&self) -> &GstreamerInner { &self.0 }
}
impl Gstreamer {
fn downgrade(&self) -> GstreamerWeak { GstreamerWeak(Arc::downgrade(&self.0)) }
pub fn new(
config: config::Config,
gstream_receiver: Receiver<CallManagerToGst>,
gstream_sender: Sender<GstToCallManager>,
) -> Self
{
let gst_config = config.gstreamer;
// TODO initialize the passive pipeline if it exists
// TODO make this only ever happen once somehow, not a big deal at the moment
gst::init().unwrap();
GstreamerModel {
Gstreamer(Arc::new(GstreamerInner {
call_active: false,
current_pipeline: None,
incoming_pipeline: None,
current_pipeline: Mutex::new(RefCell::new(None)),
incoming_pipeline: Mutex::new(RefCell::new(None)),
gstream_receiver,
gstream_sender,
config: gst_config,
running_pipeline: None,
webrtcbin: None,
}
}))
}
pub fn _foo() -> Result<i32, anyhow::Error> {
// Dummy webrtc pipeline, it sends test sources to the webrtc type
// The video is encoded with vp8 with deadline encoding, and opus for audio
// This is then put into an rtp payload for vp8 and opus (no idea what pt=96/97 are)
@ -87,29 +119,24 @@ impl GstreamerModel {
fn setup_pipeline(&self, pipeline_desc: String) -> Result<(), anyhow::Error> { Ok(()) }
/// get the incoming pipeline from the config, should this do the full setup?
/// return a null pipeline if not found
fn get_incoming_pipeline(&self) -> Result<String, anyhow::Error> { Ok("foo".to_string()) }
/// get the passive pipeline from the config, if it exists
/// if it doesn't exist we don't return anything
fn get_passive_pipeline(&self) -> Result<Option<String>, anyhow::Error> { Ok(None) }
fn create_incoming_pipeline(&self, sdp: String) -> Result<(), anyhow::Error> {
// We have an existing incoming pipeline, kill it
if let Some(_) = &self.incoming_pipeline {
&self.delete_incoming_pipeline()?;
}
// if let Some(_) = &self.incoming_pipeline {
// &self.delete_incoming_pipeline()?;
// }
Ok(())
}
fn delete_incoming_pipeline(&self) -> Result<(), anyhow::Error> { Ok(()) }
/// we got a new ICE candidate, send it along
pub fn on_ice_candidate(&self, index: u32, sdp: String) -> Result<(), anyhow::Error> {
Ok(())
}
/// open_active_pipeline turns off the passive pipeline, and then sets up the active
/// pipeline. It also then connects the incoming sdp to the incoming pipeline
@ -119,7 +146,6 @@ impl GstreamerModel {
turn_server: Option<TurnAuth>,
) -> Result<(), anyhow::Error>
{
/* if let Some(sdp) = incoming_sdp {
&self.create_incoming_pipeline(sdp)?;
}*/
@ -130,14 +156,23 @@ impl GstreamerModel {
let pipeline_maybe = gst_parsed.downcast::<gst::Pipeline>();
if let Ok(pipeline) = pipeline_maybe {
let bin = pipeline.get_by_name("webrtcbin").unwrap();
let pipeline_desc = Some(GstreamerPipeline {description: self.config.active_pipeline.clone(), pipeline, bin});
self.current_pipeline.lock().unwrap().replace(pipeline_desc);
self.webrtcbin = pipeline.get_by_name("webrtcbin");
let webrtcbin = self.webrtcbin.as_ref().unwrap();
// We have to reborrow this because of the move from above
// TODO rewrite this bit once I figure out what's going on
// I'm 100% positive that there's a better way to do this
let tempval = &self.current_pipeline;
let tempval2 = tempval.lock();
let tempval3 = tempval2.unwrap();
let tempval5 = tempval3.borrow();
let tempval4 = tempval5.as_ref().unwrap();
let webrtcbin = &tempval4.bin;
// calculate each turn server url, with the username and password
if let Some(turn_server) = turn_server {
// I can only use the first turn-server from matrix, this is because I don't know how to call the
// proper gstreamer api for this
@ -147,9 +182,7 @@ impl GstreamerModel {
let mut turn_server_url = Url::parse(&base_url)?;
turn_server_url.set_username(&turn_server.username).unwrap();
turn_server_url.set_password(Some(&turn_server.password)).unwrap();
webrtcbin.set_property_from_str("turn-server", &turn_server_url.to_string());
// }
@ -159,9 +192,32 @@ impl GstreamerModel {
webrtcbin.set_property_from_str("bundle-policy", "max-bundle");
// Whenever there is a new ICE candidate, send it to the peer
let app_clone = self.downgrade();
webrtcbin
.connect("on-ice-candidate", false, move |values| {
let _webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
let mlineindex = values[1].get_some::<u32>().expect("Invalid argument");
let candidate = values[2].get::<String>().expect("Invalid argument").unwrap();
let app = upgrade_weak!(app_clone, None);
if let Err(err) = app.on_ice_candidate(mlineindex, candidate) {
gst_element_error!(
app.current_pipeline.lock().unwrap().borrow().as_ref().unwrap().bin,
gst::LibraryError::Failed,
("Failed to send ICE candidate: {:?}", err)
);
}
None
})
.unwrap();
webrtcbin.connect_pad_added(move |_webrtc, pad| {});
} else {
// TBD better handling
panic!("Failed to parse pipeline")
}
@ -175,3 +231,23 @@ impl GstreamerModel {
pub async fn loop_once(&self) {}
}
// Make sure to shut down the pipeline when it goes out of scope
// to release any system resources
impl Drop for GstreamerInner {
fn drop(&mut self) {
let current_cell = &self.current_pipeline.lock().unwrap();
let incoming_cell = &self.incoming_pipeline.lock().unwrap();
let old_current = current_cell.replace(None);
let old_incoming = incoming_cell.replace(None);
if let Some(pipe_desc) = old_current {
let _ = pipe_desc.bin.set_state(gst::State::Null);
}
if let Some(pipe_desc) = old_incoming {
let _ = pipe_desc.bin.set_state(gst::State::Null);
}
}
}

View file

@ -21,7 +21,6 @@ use tracing_subscriber;
#[tokio::main(core_threads = 4)]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
//let (event_tx, event_rx) = channel::<ClientEventThingy>(100);// setup a channel with 100
@ -34,7 +33,6 @@ async fn main() -> Result<(), anyhow::Error> {
let botname = match env::args().nth(1) {
Some(a) => a,
_ => {
println!(
"Usage: {} <botname>
@ -70,14 +68,12 @@ async fn main() -> Result<(), anyhow::Error> {
let cm_client = matrix_client.clone();
let matrix_fut = tokio::spawn(async move {
matrixbot::sync_and_loop(looping_client, matrix_cm_sender, cm_matrix_receiver)
.await
.unwrap();
});
let cm_future = tokio::spawn(async move {
let botname = &botname_ref;
//let config = config::load_config(&botname).unwrap();
@ -95,13 +91,11 @@ async fn main() -> Result<(), anyhow::Error> {
});
let gst_fut = tokio::spawn(async move {
let config = config::load_config(&botname).unwrap();
let gstmodel = gstream::GstreamerModel::new(config, cm_gst_receiver, gst_cm_sender);
let gstmodel = gstream::Gstreamer::new(config, cm_gst_receiver, gst_cm_sender);
loop {
gstmodel.loop_once().await;
// this should really be doing stuff for gstreamer

View file

@ -46,17 +46,14 @@ impl CommandBot {
pub fn new(client: Client) -> Self { Self { client } }
pub fn say_hello(&self) {
println!("Testing");
}
// TODO figure out a better way to refer to this type
async fn handle_command(&self, room: Arc<RwLock<Room>>, sender: String, message: String) {
println!("<{}> {} ", sender, message);
if message.contains("!party") {
let content = MessageEventContent::Text(TextMessageEventContent {
body: "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(),
formatted: None,
@ -82,7 +79,6 @@ impl CommandBot {
}
async fn wait_for_confirmation(sas: Sas) {
println!("Emoji: {:?}", sas.emoji());
// TODO make this verify things somehow, I can't do it interactively so I think send them in a
@ -91,7 +87,6 @@ async fn wait_for_confirmation(sas: Sas) {
}
fn print_result(sas: Sas) {
let device = sas.other_device();
println!(
@ -111,7 +106,6 @@ async fn handle_call_invite(
_offer: SessionDescription,
)
{
let myself = client.user_id().await.unwrap();
println!("Incoming call from {}", sender);
@ -125,7 +119,6 @@ async fn handle_call_invite(
//client.share_group_session(room_id);
let encrypted = {
let room = client.get_joined_room(room_id).await;
match room {
@ -147,9 +140,7 @@ async fn create_call_invite(client: &Client, room_id: &RoomId) {
impl EventEmitter for CommandBot {
async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent<MessageEventContent>) {
if let SyncRoom::Joined(room) = room {
println!("Syncroom joined: {:?}", room);
if let SyncMessageEvent {
@ -158,7 +149,6 @@ impl EventEmitter for CommandBot {
..
} = event
{
let user_id = sender.localpart();
let user_server = sender.server_name();
@ -166,12 +156,9 @@ impl EventEmitter for CommandBot {
let myself = self.client.user_id().await.unwrap();
if (myself.localpart() == user_id && myself.server_name() == user_server) {
println!("Saw my own message, ignoring it");
} else {
if (user_server == "matrix.voots.org") {
// TODO make this configurable on which servers
&self
.handle_command(room, user_id.to_string(), msg_body.to_string())
@ -189,16 +176,13 @@ impl EventEmitter for CommandBot {
event: Option<MemberEventContent>,
)
{
println!("STRIPPED: {:?}", event);
if room_member.state_key != self.client.user_id().await.unwrap() {
return;
}
if let SyncRoom::Invited(room) = room {
let room = room.read().await;
println!("Autojoining room {}", room.display_name());
@ -213,7 +197,6 @@ impl EventEmitter for CommandBot {
}
pub async fn login(botname: &String) -> Result<Client, anyhow::Error> {
// the location for `JsonStore` to save files to
// homeserver_url: String, username: String, password: String
//let {homeserver_url: String, username: String, password: String} = config.matrix;
@ -265,7 +248,6 @@ pub async fn sync_and_loop(
receiver: Receiver<CallManagerToMatrix>,
) -> Result<(), anyhow::Error>
{
// since we called sync before we `sync_forever` we must pass that sync token to
// `sync_forever`
let settings = SyncSettings::default().token(client.sync_token().await.unwrap());
@ -274,16 +256,13 @@ pub async fn sync_and_loop(
client
.sync_forever(settings, async move |response| {
let client = &client_ref;
for event in &response.to_device.events {
let e_maybe = event.deserialize();
match e_maybe {
Ok(AnyToDeviceEvent::KeyVerificationStart(e)) => {
let sas = client
.get_verification(&e.content.transaction_id)
.await
@ -293,7 +272,6 @@ pub async fn sync_and_loop(
}
Ok(AnyToDeviceEvent::KeyVerificationKey(e)) => {
let sas = client
.get_verification(&e.content.transaction_id)
.await
@ -303,38 +281,31 @@ pub async fn sync_and_loop(
}
Ok(AnyToDeviceEvent::KeyVerificationMac(e)) => {
let sas = client
.get_verification(&e.content.transaction_id)
.await
.expect("Sas object wasn't created");
if sas.is_done() {
print_result(sas);
}
}
Ok(_) => (), // Unknown or unhandled event
Err(error) => {
eprintln!("Couldn't deserialize to to-device event: {:?} from {:?}", error, event);
}
}
}
for (room_id, room) in response.rooms.join {
for rawevent in &room.timeline.events {
let event = rawevent.deserialize().unwrap();
// Check if it's a message type
if let Message(msg) = event {
match msg {
CallCandidates(_e) => {}
CallInvite(e) => {
println!("incoming debug: {:?}", e);
let SyncMessageEvent {
@ -362,12 +333,10 @@ pub async fn sync_and_loop(
CallHangup(_e) => {}
CallAnswer(_e) => {}
e => {
println!("unhandled debug: {:?}", e);
}
};
} else {
println!("Got unknown event type here: {:?}", event);
}
}