Two threads!

This commit is contained in:
Ryan Voots 2020-08-16 20:00:37 -07:00
parent 1d946a0fd3
commit 4df0637a7b
7 changed files with 132 additions and 15 deletions

View file

@ -18,7 +18,7 @@ dirs = "3.0.1"
async-trait = "0.1.37"
tracing-subscriber = "0.2.11"
tokio-async-std = {version ="1.5.3", features = ["unstable"]}
futures = "*"
futures-core = "0.3.5"
futures-util = "0.3.5"
http = "0.2.1"
@ -30,10 +30,11 @@ serde_urlencoded = "0.6.1"
serde_derive="*"
url = "2.1.1"
anyhow = "1.0.31"
tokio = { version = "0.2.22", features = ["macros"] }
tokio = { version = "*", features = ["macros", "rt-threaded", "time", "sync", "blocking", "process", "io-std", "io-util", "rt-util"] }
gstreamer-webrtc = "*"
gstreamer="*"
toml="*"
crossbeam-channel = "*"
[dependencies.ruma]

View file

@ -1,13 +1,28 @@
[gstreamer]
# These pipelines can do anything you want, but MUST have a webrtcbin to provide the webrtc side of things
# These pipelines can do anything you want, but the active one MUST have a webrtcbin to provide the webrtc side of things
webrtcbin_name = "webrtcpad"
active_pipeline = "..."
passive_pipeline = "..."
# The active pipeline is the one used during a call
active_pipeline = """
videotestsrc pattern=ball is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! webrtcpad.
audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=97 ! webrtcpad.
webrtcbin name=webrtcpad
"""
# The passive pipeline is used when there's no active call going on
# This can be used to put a feed to an rtmp server, display to a screen,
# or otherwise play with the video. If it's present it will be used
# passive_pipeline = "..."
# How to handle the incoming webrtc stream. Defaults to a null sink that does nothing with it.
# provided example just does playback via pulseaudio and dumps the video stream
# incoming_pipeline = "..."
[matrix]
homeserver_url = "https://matrix.org"
username = "example_user"
password = "eXaMpLe PaSsWoRd!123"
homeserver_url = "..."
username = "..."
password = "..."
### proxy sets the http proxy to use for connecting to the system
# proxy = "http://localhost:8080/"
@ -18,14 +33,14 @@ password = "eXaMpLe PaSsWoRd!123"
###
# TBD, force TURN/STUN servers, in gstreamer or in matrix? default to asking the matrix server
[target]
# When the trigger happens (GPIO button on raspberry pi, rest endpoint, whatever)
# This is who we try to call
# TBD handle verification of users differently? maybe allow it to be required
[target]
username = "@user:matrix.org"
# TODO use this for video messages too, maybe a way to map buttons to specific users?
leave_mail = false
# This section is optional, but recommended
# It's used to control who can directly call in to the intercom

34
src/chans.rs Normal file
View file

@ -0,0 +1,34 @@
use crossbeam_channel::{unbounded, Sender, Receiver};
// I'm 100% positive that there's a better way to do this
// That's far more idiomatic, but I'm doing this the hard way right
// now to learn
pub struct Channels<L: Send + Clone, R: Send + Clone> {
left: (Sender<L>, Receiver<L>),
right: (Sender<R>, Receiver<R>),
}
impl<L: Send + Clone, R: Send + Clone> Channels<L, R> {
pub fn get_pair_left(&self) -> (Sender<L>, Receiver<L>) {
let (sender, receiver) = &self.left;
(sender.clone(), receiver.clone())
}
pub fn get_pair_right(&self) -> (Sender<R>, Receiver<R>) {
let (sender, receiver) = &self.right;
(sender.clone(), receiver.clone())
}
}
pub fn create_pair<L: Send + Clone, R: Send + Clone>() -> Channels<L, R> {
let (left_sender, left_receiver) = unbounded::<L>();
let (right_sender, right_receiver) = unbounded::<R>();
Channels::<L,R> {
left: (left_sender, left_receiver),
right: (right_sender, right_receiver),
}
}

View file

@ -19,6 +19,7 @@ pub struct Config {
pub struct GstreamerConfig {
pub active_pipeline: String,
pub passive_pipeline: Option<String>,
pub incoming_pipline: Option<String>,
pub webrtcbin_name: String,
}

View file

@ -7,6 +7,11 @@ use gst::prelude::*;
use anyhow;
use crate::config;
struct GstreamerPipeline {
pub description: String,
// TBD what other stuff goes in here?
}
// TBD, how to manage the gstreamer instance/mode/whatever. Throw it in an Arc?
// I should use a channel and an async task to work with it since it can be a little weird
// and I want it to never be able to block the matrix side of things
@ -23,3 +28,32 @@ pub fn foo() -> Result<i32, anyhow::Error> {
return Ok(1);
}
/// setup_pipeline gets the gstreamer objects from the pipeline description
/// TBD: return type
fn setup_pipeline(pipeline_desc: String) -> Result<(), anyhow::Error> {
Ok(())
}
/// get the incoming pipeline from the config, should this do the full setup?
/// return a null pipeline if not found
fn get_incoming_pipeline() -> Result<String, anyhow::Error> {
Ok("foo".to_string())
}
/// get the passive pipeline from the config, if it exists
/// if it doesn't exist we don't return anything
fn get_passive_pipeline() -> Result<Option<String>, anyhow::Error> {
Ok(None)
}
/// open_active_pipeline turns off the passive pipeline, and then sets up the active
/// pipeline. It also then connects the incoming sdp to the incoming pipeline
pub fn open_active_pipeline(incoming_sdp: String) -> Result<String, anyhow::Error> {
Ok("foo".to_string())
}
pub fn close_active_pipeline() -> Result<(), anyhow::Error> {
Ok(())
}

View file

@ -7,12 +7,14 @@ use std::{env, process::exit};
pub mod matrixbot;
pub mod gstream;
pub mod config;
pub mod chans;
use matrix_sdk;
use tracing_subscriber;
use anyhow;
use tokio;
use futures::join;
#[tokio::main]
#[tokio::main(core_threads=4)]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
//let (event_tx, event_rx) = channel::<ClientEventThingy>(100);// setup a channel with 100 messages
@ -37,6 +39,34 @@ async fn main() -> Result<(), anyhow::Error> {
}
};
matrixbot::login_and_sync(botname).await?;
// In the future I'll have to design some proper enums/types for this.
// for now just use Strings for basic stuff
let matrix_stream_channels = chans::create_pair::<String, String>();
let (matrix_sender, gstreamer_receiver) = matrix_stream_channels.get_pair_left();
let (gstreamer_sender, matrix_receiver) = matrix_stream_channels.get_pair_right();
let matrix_fut = tokio::spawn(async move {
println!("{:?} {:?}", matrix_sender, matrix_receiver);
matrixbot::login_and_sync(botname, matrix_sender, matrix_receiver).await.unwrap();
});
let gst_fut = tokio::spawn(async move {
println!("{:?} {:?}", gstreamer_sender, gstreamer_receiver);
loop {
// this should really be doing stuff for gstreamer
tokio::time::delay_for(tokio::time::Duration::from_millis(1000)).await;
println!("Waited");
}
});
//
join!(gst_fut, matrix_fut).unwrap();
//gstream::get_active_sdp(botname);
Ok(())
}

View file

@ -1,6 +1,8 @@
extern crate gstreamer_webrtc as gst_webrtc;
extern crate gstreamer as gst;
use crossbeam_channel::{Sender, Receiver};
use async_trait::async_trait;
use matrix_sdk::{
@ -163,7 +165,7 @@ impl EventEmitter for CommandBot {
}
}
pub async fn login_and_sync(botname: String) -> Result<(), anyhow::Error> {
pub async fn login_and_sync(botname: String, _sender: Sender<String>, _receiver: Receiver<String>) -> Result<(), anyhow::Error> {
// the location for `JsonStore` to save files to
// homeserver_url: String, username: String, password: String
//let {homeserver_url: String, username: String, password: String} = config.matrix;