From f995412c952f5695d726abe36874b715fbda600c Mon Sep 17 00:00:00 2001 From: Ryan Voots Date: Wed, 19 Aug 2020 20:30:47 -0700 Subject: [PATCH] builds again, not fully plumbed --- src/callmanager.rs | 93 ++++++++++++++++++++++++++++------------------ src/comm_types.rs | 10 ++++- src/gstream.rs | 10 ++--- src/main.rs | 37 ++++++++++++++---- src/matrixbot.rs | 26 ++++++++----- 5 files changed, 116 insertions(+), 60 deletions(-) diff --git a/src/callmanager.rs b/src/callmanager.rs index 3f6ae5c..4aaf95a 100644 --- a/src/callmanager.rs +++ b/src/callmanager.rs @@ -1,47 +1,30 @@ use crossbeam_channel::{Receiver, Sender}; -use async_trait::async_trait; - -use tokio::time::{Duration, Instant}; -use tracing::{error, warn}; - use matrix_sdk::{ self, - events::{ - call::{invite::InviteEventContent, SessionDescription}, - room::{ - member::MemberEventContent, - message::{MessageEventContent, TextMessageEventContent}, - }, - AnySyncMessageEvent::{CallAnswer, CallCandidates, CallHangup, CallInvite}, - AnySyncRoomEvent::Message, - AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent, - }, - identifiers::{DeviceId, RoomId, UserId}, - locks::RwLock, - Client, ClientConfig, EventEmitter, JsonStore, Room, Sas, SyncRoom, SyncSettings, + identifiers::RoomId, + Client, + api::r0::voip, }; +use tracing::{error}; -use std::sync::Arc; -use url::Url; -use futures::join; +use crate::{comm_types::*}; // get the functions i need from my gstream module -use crate::{config, gstream, comm_types::*}; // get the functions i need from my gstream module - -#[derive(Clone)] +#[derive(Debug)] struct GstChannel { sender: Sender, - receiver: Receiver -} - -#[derive(Clone)] -struct MatrixChannel { - sender: Sender, - receiver: Receiver + receiver: Receiver, } #[derive(Debug)] -struct CallManager { +struct MatrixChannel { + sender: Sender, + receiver: Receiver, +} + +#[derive(Debug)] +pub struct CallManager { + botname: String, client: Client, call_in_progress: bool, prescence_state: String, @@ -52,14 +35,51 @@ struct CallManager { } impl CallManager { - pub fn new(client: Client) -> Self { Self {client, call_in_progress: false, prescence_state: "".to_string(), call_id: None, room_id: None}} + pub fn new( + botname: &String, + client: Client, + gst_sender: Sender, + gst_receiver: Receiver, + matrix_sender: Sender, + matrix_receiver: Receiver, + ) -> Self + { - pub fn can_start_call(&self) -> bool { - !&self.call_in_progress + Self { + botname: botname.to_string(), + client, + call_in_progress: false, + prescence_state: "".to_string(), + call_id: None, + room_id: None, + gst_channel: GstChannel { + sender: gst_sender, + receiver: gst_receiver, + }, + matrix_channel: MatrixChannel { + sender: matrix_sender, + receiver: matrix_receiver, + }, + } } + pub async fn get_turn_server(&self) -> Result<(), anyhow::Error> { + let client = &self.client; + + let request = voip::get_turn_server_info::Request { + }; + + let resp = client.send(request).await?; + + Ok(()) + } + + pub fn can_start_call(&self) -> bool { !&self.call_in_progress } + pub async fn start_incoming_call(&self, offer: String, room_id: RoomId) -> Result<(), anyhow::Error> { + if self.call_in_progress { + error!("Existing call in progress already"); } @@ -67,11 +87,12 @@ impl CallManager { } pub async fn start_outgoing_call(&self, room_id: RoomId) -> Result<(), anyhow::Error> { + if self.call_in_progress { + error!("Existing call in progress already"); } Ok(()) } - } diff --git a/src/comm_types.rs b/src/comm_types.rs index d1b2915..4068bd8 100644 --- a/src/comm_types.rs +++ b/src/comm_types.rs @@ -1,12 +1,14 @@ #[derive(Clone)] + pub struct TurnAuth { uris: Vec, ttl: i32, username: String, - password: String + password: String, } #[derive(Clone)] + pub struct CallDescription { sdp: String, turn_server: Option, @@ -14,6 +16,7 @@ pub struct CallDescription { } #[derive(Clone)] + pub enum MatrixToCallManager { StartActive(String), IceCandidate(String), @@ -23,11 +26,13 @@ pub enum MatrixToCallManager { } #[derive(Clone)] + pub enum CallManagerToMatrix { Nop(), // TBD, does this channel need to exist? } #[derive(Clone)] + pub enum CallManagerToGst { StartActive(CallDescription), NewIceCandidate(String), @@ -35,7 +40,8 @@ pub enum CallManagerToGst { } #[derive(Clone)] + pub enum GstToCallManager { IceCandidate(String), CallError(String), // Forces a hangup -} \ No newline at end of file +} diff --git a/src/gstream.rs b/src/gstream.rs index daeed4e..29e6ce6 100644 --- a/src/gstream.rs +++ b/src/gstream.rs @@ -1,10 +1,10 @@ extern crate gstreamer as gst; extern crate gstreamer_webrtc as gst_webrtc; -use gst::{gst_element_error, prelude::*}; +use gst::{ prelude::*}; //gst_element_error, use crate::{ - comm_types::{GstReq, GstResp}, + comm_types::*, config, }; use anyhow; @@ -23,14 +23,14 @@ struct GstreamerPipeline { pub struct GstreamerModel { current_pipeline: Option, incoming_pipeline: Option, - gstream_receiver: Receiver, - gstream_sender: Sender, + gstream_receiver: Receiver, + gstream_sender: Sender, config: config::GstreamerConfig, call_active: bool, } impl GstreamerModel { - pub fn new(config: config::Config, gstream_receiver: Receiver, gstream_sender: Sender) -> Self { + pub fn new(config: config::Config, gstream_receiver: Receiver, gstream_sender: Sender) -> Self { let gst_config = config.gstreamer; diff --git a/src/main.rs b/src/main.rs index 1bbe7d3..fb55ab6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,12 +4,12 @@ use std::{env, process::exit}; // let the 'crate' publically publish these modules +pub mod callmanager; pub mod chans; pub mod comm_types; pub mod config; pub mod gstream; pub mod matrixbot; -pub mod callmanager; use crate::comm_types::*; @@ -48,33 +48,54 @@ async fn main() -> Result<(), anyhow::Error> { } }; + let botname_ref = botname.clone(); + + let cm_gst_channels = chans::create_pair::(); + let cm_matrix_channels = chans::create_pair::(); - + let (cm_gst_sender, cm_gst_receiver) = cm_gst_channels.get_pair_left(); + let (gst_cm_sender, gst_cm_receiver) = cm_gst_channels.get_pair_right(); + let (cm_matrix_sender, cm_matrix_receiver) = cm_matrix_channels.get_pair_left(); + let (matrix_cm_sender, matrix_cm_receiver) = cm_matrix_channels.get_pair_right(); - + let matrix_client = matrixbot::login(&botname).await.unwrap(); + let looping_client = matrix_client.clone(); + let cm_client = matrix_client.clone(); let matrix_fut = tokio::spawn(async move { - matrixbot::sync_and_loop(looping_client, matrix_cm_sender, cm_matrix_reciver).await.unwrap(); + + matrixbot::sync_and_loop(looping_client, matrix_cm_sender, cm_matrix_receiver) + .await + .unwrap(); }); let cm_future = tokio::spawn(async move { - let config = config::load_config(&botname).unwrap(); + let botname = &botname_ref; + //let config = config::load_config(&botname).unwrap(); - let obj = callmanager::new(config, cm_client, cm_gst_sender, gst_cm_receiver, cm_matrix_sender, matrix_cm_receiver); - obj.handle_calls().await? + let obj = callmanager::CallManager::new( + botname, + cm_client, + cm_gst_sender, + gst_cm_receiver, + cm_matrix_sender, + matrix_cm_receiver, + ); + + //obj.handle_calls().await? }); let gst_fut = tokio::spawn(async move { let config = config::load_config(&botname).unwrap(); - let gstmodel = gstream::GstreamerModel::new(config, gst_cm_sender, cm_gst_receiver); + let gstmodel = gstream::GstreamerModel::new(config, cm_gst_receiver, gst_cm_sender); loop { diff --git a/src/matrixbot.rs b/src/matrixbot.rs index 85cfdef..3a0b788 100644 --- a/src/matrixbot.rs +++ b/src/matrixbot.rs @@ -5,8 +5,8 @@ use crossbeam_channel::{Receiver, Sender}; use async_trait::async_trait; -use tokio::time::{Duration, Instant}; -use tracing::{error, warn}; +//use tokio::time::{Duration, Instant}; +//use tracing::{error, warn}; use matrix_sdk::{ self, @@ -20,18 +20,22 @@ use matrix_sdk::{ AnySyncRoomEvent::Message, AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent, }, - identifiers::{DeviceId, RoomId, UserId}, + identifiers::{RoomId}, locks::RwLock, Client, ClientConfig, EventEmitter, JsonStore, Room, Sas, SyncRoom, SyncSettings, }; +//use futures::join; use std::sync::Arc; use url::Url; -use futures::join; -use crate::{config, gstream, comm_types::*, comm_types::MatrixToCallManager::*}; // get the functions i need from my gstream module +use crate::{ + comm_types::{MatrixToCallManager::*, *}, + config, +}; // get the functions i need from my gstream module #[derive(Debug)] + struct CommandBot { /// This clone of the `Client` will send requests to the server, /// while the other keeps us in sync with the server using `sync_forever`. @@ -42,6 +46,7 @@ impl CommandBot { pub fn new(client: Client) -> Self { Self { client } } pub fn say_hello(&self) { + println!("Testing"); } @@ -208,7 +213,8 @@ impl EventEmitter for CommandBot { } pub async fn login(botname: &String) -> Result { -// the location for `JsonStore` to save files to + + // the location for `JsonStore` to save files to // homeserver_url: String, username: String, password: String //let {homeserver_url: String, username: String, password: String} = config.matrix; let config: config::Config = config::load_config(&botname)?; @@ -231,11 +237,12 @@ pub async fn login(botname: &String) -> Result { let mut client = Client::new_with_config(homeserver_url, client_config).unwrap(); let commandbot = CommandBot::new(client.clone()); + let commandbox = Box::new(commandbot); - client.login(username.clone(), password, None, Some(botname)).await?; + client.login(username.clone(), password, None, Some(botname.to_string())).await?; + - println!("logged in as {}", username); // An initial sync to set up state and so our bot doesn't respond to old messages. @@ -256,6 +263,7 @@ pub async fn sync_and_loop( receiver: Receiver, ) -> Result<(), anyhow::Error> { + // since we called sync before we `sync_forever` we must pass that sync token to // `sync_forever` let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); @@ -366,4 +374,4 @@ pub async fn sync_and_loop( .await; Ok(()) -} \ No newline at end of file +}