builds again, not fully plumbed

This commit is contained in:
Ryan Voots 2020-08-19 20:30:47 -07:00
parent d5c68258eb
commit f995412c95
5 changed files with 116 additions and 60 deletions

View file

@ -1,47 +1,30 @@
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use async_trait::async_trait;
use tokio::time::{Duration, Instant};
use tracing::{error, warn};
use matrix_sdk::{ use matrix_sdk::{
self, self,
events::{ identifiers::RoomId,
call::{invite::InviteEventContent, SessionDescription}, Client,
room::{ api::r0::voip,
member::MemberEventContent,
message::{MessageEventContent, TextMessageEventContent},
},
AnySyncMessageEvent::{CallAnswer, CallCandidates, CallHangup, CallInvite},
AnySyncRoomEvent::Message,
AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent,
},
identifiers::{DeviceId, RoomId, UserId},
locks::RwLock,
Client, ClientConfig, EventEmitter, JsonStore, Room, Sas, SyncRoom, SyncSettings,
}; };
use tracing::{error};
use std::sync::Arc; use crate::{comm_types::*}; // get the functions i need from my gstream module
use url::Url;
use futures::join;
use crate::{config, gstream, comm_types::*}; // get the functions i need from my gstream module #[derive(Debug)]
#[derive(Clone)]
struct GstChannel { struct GstChannel {
sender: Sender<CallManagerToGst>, sender: Sender<CallManagerToGst>,
receiver: Receiver<GstToCallManager> receiver: Receiver<GstToCallManager>,
}
#[derive(Clone)]
struct MatrixChannel {
sender: Sender<CallManagerToMatrix>,
receiver: Receiver<MatrixToCallManager>
} }
#[derive(Debug)] #[derive(Debug)]
struct CallManager { struct MatrixChannel {
sender: Sender<CallManagerToMatrix>,
receiver: Receiver<MatrixToCallManager>,
}
#[derive(Debug)]
pub struct CallManager {
botname: String,
client: Client, client: Client,
call_in_progress: bool, call_in_progress: bool,
prescence_state: String, prescence_state: String,
@ -52,14 +35,51 @@ struct CallManager {
} }
impl CallManager { impl CallManager {
pub fn new(client: Client) -> Self { Self {client, call_in_progress: false, prescence_state: "".to_string(), call_id: None, room_id: None}} pub fn new(
botname: &String,
client: Client,
gst_sender: Sender<CallManagerToGst>,
gst_receiver: Receiver<GstToCallManager>,
matrix_sender: Sender<CallManagerToMatrix>,
matrix_receiver: Receiver<MatrixToCallManager>,
) -> Self
{
pub fn can_start_call(&self) -> bool { Self {
!&self.call_in_progress botname: botname.to_string(),
client,
call_in_progress: false,
prescence_state: "".to_string(),
call_id: None,
room_id: None,
gst_channel: GstChannel {
sender: gst_sender,
receiver: gst_receiver,
},
matrix_channel: MatrixChannel {
sender: matrix_sender,
receiver: matrix_receiver,
},
}
} }
pub async fn get_turn_server(&self) -> Result<(), anyhow::Error> {
let client = &self.client;
let request = voip::get_turn_server_info::Request {
};
let resp = client.send(request).await?;
Ok(())
}
pub fn can_start_call(&self) -> bool { !&self.call_in_progress }
pub async fn start_incoming_call(&self, offer: String, room_id: RoomId) -> Result<(), anyhow::Error> { pub async fn start_incoming_call(&self, offer: String, room_id: RoomId) -> Result<(), anyhow::Error> {
if self.call_in_progress { if self.call_in_progress {
error!("Existing call in progress already"); error!("Existing call in progress already");
} }
@ -67,11 +87,12 @@ impl CallManager {
} }
pub async fn start_outgoing_call(&self, room_id: RoomId) -> Result<(), anyhow::Error> { pub async fn start_outgoing_call(&self, room_id: RoomId) -> Result<(), anyhow::Error> {
if self.call_in_progress { if self.call_in_progress {
error!("Existing call in progress already"); error!("Existing call in progress already");
} }
Ok(()) Ok(())
} }
} }

View file

@ -1,12 +1,14 @@
#[derive(Clone)] #[derive(Clone)]
pub struct TurnAuth { pub struct TurnAuth {
uris: Vec<String>, uris: Vec<String>,
ttl: i32, ttl: i32,
username: String, username: String,
password: String password: String,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct CallDescription { pub struct CallDescription {
sdp: String, sdp: String,
turn_server: Option<TurnAuth>, turn_server: Option<TurnAuth>,
@ -14,6 +16,7 @@ pub struct CallDescription {
} }
#[derive(Clone)] #[derive(Clone)]
pub enum MatrixToCallManager { pub enum MatrixToCallManager {
StartActive(String), StartActive(String),
IceCandidate(String), IceCandidate(String),
@ -23,11 +26,13 @@ pub enum MatrixToCallManager {
} }
#[derive(Clone)] #[derive(Clone)]
pub enum CallManagerToMatrix { pub enum CallManagerToMatrix {
Nop(), // TBD, does this channel need to exist? Nop(), // TBD, does this channel need to exist?
} }
#[derive(Clone)] #[derive(Clone)]
pub enum CallManagerToGst { pub enum CallManagerToGst {
StartActive(CallDescription), StartActive(CallDescription),
NewIceCandidate(String), NewIceCandidate(String),
@ -35,7 +40,8 @@ pub enum CallManagerToGst {
} }
#[derive(Clone)] #[derive(Clone)]
pub enum GstToCallManager { pub enum GstToCallManager {
IceCandidate(String), IceCandidate(String),
CallError(String), // Forces a hangup CallError(String), // Forces a hangup
} }

View file

@ -1,10 +1,10 @@
extern crate gstreamer as gst; extern crate gstreamer as gst;
extern crate gstreamer_webrtc as gst_webrtc; extern crate gstreamer_webrtc as gst_webrtc;
use gst::{gst_element_error, prelude::*}; use gst::{ prelude::*}; //gst_element_error,
use crate::{ use crate::{
comm_types::{GstReq, GstResp}, comm_types::*,
config, config,
}; };
use anyhow; use anyhow;
@ -23,14 +23,14 @@ struct GstreamerPipeline {
pub struct GstreamerModel { pub struct GstreamerModel {
current_pipeline: Option<GstreamerPipeline>, current_pipeline: Option<GstreamerPipeline>,
incoming_pipeline: Option<GstreamerPipeline>, incoming_pipeline: Option<GstreamerPipeline>,
gstream_receiver: Receiver<GstReq>, gstream_receiver: Receiver<CallManagerToGst>,
gstream_sender: Sender<GstResp>, gstream_sender: Sender<GstToCallManager>,
config: config::GstreamerConfig, config: config::GstreamerConfig,
call_active: bool, call_active: bool,
} }
impl GstreamerModel { impl GstreamerModel {
pub fn new(config: config::Config, gstream_receiver: Receiver<GstReq>, gstream_sender: Sender<GstResp>) -> Self { pub fn new(config: config::Config, gstream_receiver: Receiver<CallManagerToGst>, gstream_sender: Sender<GstToCallManager>) -> Self {
let gst_config = config.gstreamer; let gst_config = config.gstreamer;

View file

@ -4,12 +4,12 @@
use std::{env, process::exit}; use std::{env, process::exit};
// let the 'crate' publically publish these modules // let the 'crate' publically publish these modules
pub mod callmanager;
pub mod chans; pub mod chans;
pub mod comm_types; pub mod comm_types;
pub mod config; pub mod config;
pub mod gstream; pub mod gstream;
pub mod matrixbot; pub mod matrixbot;
pub mod callmanager;
use crate::comm_types::*; use crate::comm_types::*;
@ -48,33 +48,54 @@ async fn main() -> Result<(), anyhow::Error> {
} }
}; };
let botname_ref = botname.clone();
let cm_gst_channels = chans::create_pair::<CallManagerToGst, GstToCallManager>(); let cm_gst_channels = chans::create_pair::<CallManagerToGst, GstToCallManager>();
let cm_matrix_channels = chans::create_pair::<CallManagerToMatrix, MatrixToCallManager>(); let cm_matrix_channels = chans::create_pair::<CallManagerToMatrix, MatrixToCallManager>();
let (cm_gst_sender, cm_gst_receiver) = cm_gst_channels.get_pair_left(); let (cm_gst_sender, cm_gst_receiver) = cm_gst_channels.get_pair_left();
let (gst_cm_sender, gst_cm_receiver) = cm_gst_channels.get_pair_right(); let (gst_cm_sender, gst_cm_receiver) = cm_gst_channels.get_pair_right();
let (cm_matrix_sender, cm_matrix_receiver) = cm_matrix_channels.get_pair_left(); let (cm_matrix_sender, cm_matrix_receiver) = cm_matrix_channels.get_pair_left();
let (matrix_cm_sender, matrix_cm_receiver) = cm_matrix_channels.get_pair_right(); let (matrix_cm_sender, matrix_cm_receiver) = cm_matrix_channels.get_pair_right();
let matrix_client = matrixbot::login(&botname).await.unwrap(); let matrix_client = matrixbot::login(&botname).await.unwrap();
let looping_client = matrix_client.clone(); let looping_client = matrix_client.clone();
let cm_client = matrix_client.clone(); let cm_client = matrix_client.clone();
let matrix_fut = tokio::spawn(async move { let matrix_fut = tokio::spawn(async move {
matrixbot::sync_and_loop(looping_client, matrix_cm_sender, cm_matrix_reciver).await.unwrap();
matrixbot::sync_and_loop(looping_client, matrix_cm_sender, cm_matrix_receiver)
.await
.unwrap();
}); });
let cm_future = tokio::spawn(async move { let cm_future = tokio::spawn(async move {
let config = config::load_config(&botname).unwrap(); let botname = &botname_ref;
//let config = config::load_config(&botname).unwrap();
let obj = callmanager::new(config, cm_client, cm_gst_sender, gst_cm_receiver, cm_matrix_sender, matrix_cm_receiver); let obj = callmanager::CallManager::new(
obj.handle_calls().await? botname,
cm_client,
cm_gst_sender,
gst_cm_receiver,
cm_matrix_sender,
matrix_cm_receiver,
);
//obj.handle_calls().await?
}); });
let gst_fut = tokio::spawn(async move { let gst_fut = tokio::spawn(async move {
let config = config::load_config(&botname).unwrap(); let config = config::load_config(&botname).unwrap();
let gstmodel = gstream::GstreamerModel::new(config, gst_cm_sender, cm_gst_receiver); let gstmodel = gstream::GstreamerModel::new(config, cm_gst_receiver, gst_cm_sender);
loop { loop {

View file

@ -5,8 +5,8 @@ use crossbeam_channel::{Receiver, Sender};
use async_trait::async_trait; use async_trait::async_trait;
use tokio::time::{Duration, Instant}; //use tokio::time::{Duration, Instant};
use tracing::{error, warn}; //use tracing::{error, warn};
use matrix_sdk::{ use matrix_sdk::{
self, self,
@ -20,18 +20,22 @@ use matrix_sdk::{
AnySyncRoomEvent::Message, AnySyncRoomEvent::Message,
AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent, AnyToDeviceEvent, StrippedStateEvent, SyncMessageEvent,
}, },
identifiers::{DeviceId, RoomId, UserId}, identifiers::{RoomId},
locks::RwLock, locks::RwLock,
Client, ClientConfig, EventEmitter, JsonStore, Room, Sas, SyncRoom, SyncSettings, Client, ClientConfig, EventEmitter, JsonStore, Room, Sas, SyncRoom, SyncSettings,
}; };
//use futures::join;
use std::sync::Arc; use std::sync::Arc;
use url::Url; use url::Url;
use futures::join;
use crate::{config, gstream, comm_types::*, comm_types::MatrixToCallManager::*}; // get the functions i need from my gstream module use crate::{
comm_types::{MatrixToCallManager::*, *},
config,
}; // get the functions i need from my gstream module
#[derive(Debug)] #[derive(Debug)]
struct CommandBot { struct CommandBot {
/// This clone of the `Client` will send requests to the server, /// This clone of the `Client` will send requests to the server,
/// while the other keeps us in sync with the server using `sync_forever`. /// while the other keeps us in sync with the server using `sync_forever`.
@ -42,6 +46,7 @@ impl CommandBot {
pub fn new(client: Client) -> Self { Self { client } } pub fn new(client: Client) -> Self { Self { client } }
pub fn say_hello(&self) { pub fn say_hello(&self) {
println!("Testing"); println!("Testing");
} }
@ -208,7 +213,8 @@ impl EventEmitter for CommandBot {
} }
pub async fn login(botname: &String) -> Result<Client, anyhow::Error> { pub async fn login(botname: &String) -> Result<Client, anyhow::Error> {
// the location for `JsonStore` to save files to
// the location for `JsonStore` to save files to
// homeserver_url: String, username: String, password: String // homeserver_url: String, username: String, password: String
//let {homeserver_url: String, username: String, password: String} = config.matrix; //let {homeserver_url: String, username: String, password: String} = config.matrix;
let config: config::Config = config::load_config(&botname)?; let config: config::Config = config::load_config(&botname)?;
@ -231,11 +237,12 @@ pub async fn login(botname: &String) -> Result<Client, anyhow::Error> {
let mut client = Client::new_with_config(homeserver_url, client_config).unwrap(); let mut client = Client::new_with_config(homeserver_url, client_config).unwrap();
let commandbot = CommandBot::new(client.clone()); let commandbot = CommandBot::new(client.clone());
let commandbox = Box::new(commandbot); let commandbox = Box::new(commandbot);
client.login(username.clone(), password, None, Some(botname)).await?; client.login(username.clone(), password, None, Some(botname.to_string())).await?;
println!("logged in as {}", username); println!("logged in as {}", username);
// An initial sync to set up state and so our bot doesn't respond to old messages. // An initial sync to set up state and so our bot doesn't respond to old messages.
@ -256,6 +263,7 @@ pub async fn sync_and_loop(
receiver: Receiver<CallManagerToMatrix>, receiver: Receiver<CallManagerToMatrix>,
) -> Result<(), anyhow::Error> ) -> Result<(), anyhow::Error>
{ {
// since we called sync before we `sync_forever` we must pass that sync token to // since we called sync before we `sync_forever` we must pass that sync token to
// `sync_forever` // `sync_forever`
let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); let settings = SyncSettings::default().token(client.sync_token().await.unwrap());
@ -366,4 +374,4 @@ pub async fn sync_and_loop(
.await; .await;
Ok(()) Ok(())
} }