sort of there, debugging gstreamer

This commit is contained in:
Ryan Voots 2020-08-26 18:56:04 -07:00
parent 5966fc0826
commit 088ad95f31
4 changed files with 97 additions and 28 deletions

View file

@ -5,10 +5,10 @@ authors = ["Ryan Voots <simcop2387@simcop2387.info>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk", features = ["encryption"]} matrix-sdk = { git = "https://github.com/simcop2387/matrix-rust-sdk", features = ["encryption"]}
matrix-sdk-base = { git = "https://github.com/matrix-org/matrix-rust-sdk"} matrix-sdk-base = { git = "https://github.com/simcop2387/matrix-rust-sdk"}
matrix-sdk-common-macros = { git = "https://github.com/matrix-org/matrix-rust-sdk"} matrix-sdk-common-macros = { git = "https://github.com/simcop2387/matrix-rust-sdk"}
matrix-sdk-common = { git = "https://github.com/matrix-org/matrix-rust-sdk"} matrix-sdk-common = { git = "https://github.com/simcop2387/matrix-rust-sdk"}
dirs = "3.0.1" dirs = "3.0.1"
async-trait = "0.1.37" async-trait = "0.1.37"
tracing-subscriber = "0.2.11" tracing-subscriber = "0.2.11"
@ -29,7 +29,7 @@ tokio = { version = "*", features = ["macros", "rt-threaded", "time", "sync", "b
gstreamer-webrtc = "*" gstreamer-webrtc = "*"
gstreamer="*" gstreamer="*"
gstreamer-sdp="*" gstreamer-sdp="*"
js_int="*" js_int={version = "*", features = ["lax_deserialize"]}
toml="*" toml="*"
crossbeam-channel = "*" crossbeam-channel = "*"
tracing = "*" tracing = "*"
@ -46,3 +46,8 @@ version = "0.9.0"
#git = "https://github.com/ruma/ruma" #git = "https://github.com/ruma/ruma"
#rev = "9cf552f36186eedff44ebe0c6a32d598315f5860" #rev = "9cf552f36186eedff44ebe0c6a32d598315f5860"
#features = ["client-api", "unstable-pre-spec"] #features = ["client-api", "unstable-pre-spec"]
[patch.'https://github.com/matrix-org/matrix-rust-sdk']
ruma = {git = "https://github.com/ruma/ruma", rev = "409fbcc9d745fb7290327cb7f5defc714229ab30", features = ["client-api", "unstable-pre-spec"]}

View file

@ -93,9 +93,9 @@ impl CallManager {
let resp_result = client.send(request).await; let resp_result = client.send(request).await;
match resp_result { match resp_result {
Ok(resp) => { Ok(resp) => {
println!("Got turn severs: {:?}", resp);
if !resp.uris.is_empty() { if !resp.uris.is_empty() {
let turn_auth = TurnAuth { let turn_auth = TurnAuth {
username: resp.username, username: resp.username,
@ -108,14 +108,12 @@ impl CallManager {
} else { } else {
Ok(None) Ok(None)
} }
}, }
Err(e) => { Err(e) => {
eprintln!("Got an error getting the turn server {:?}", e); eprintln!("Got an error getting the turn server {:?}", e);
Ok(None) Ok(None)
} }
} }
} }
pub fn can_start_call(&self) -> bool { !&self.call_in_progress } pub fn can_start_call(&self) -> bool { !&self.call_in_progress }

View file

@ -39,6 +39,7 @@ struct GstreamerPipeline {
pub description: String, pub description: String,
pipeline: gst::Pipeline, pipeline: gst::Pipeline,
bin: gst::Element, bin: gst::Element,
bus_pipeline: gst::bus::BusStream,
// TBD what other stuff goes in here? // TBD what other stuff goes in here?
} }
@ -110,11 +111,20 @@ impl Gstreamer {
let tempval2 = tempval.lock(); let tempval2 = tempval.lock();
let tempval3 = tempval2.unwrap(); let tempval3 = tempval2.unwrap();
let tempval5 = tempval3.borrow(); let tempval5 = tempval3.borrow();
let tempval4 = tempval5.as_ref().unwrap(); let tempval4 = tempval5.as_ref();
let webrtcbin = &tempval4.bin;
match tempval4 {
Some(value) => {
let webrtcbin = &value.bin;
webrtcbin.emit("add-ice-candidate", &[&index, &sdp])?; webrtcbin.emit("add-ice-candidate", &[&index, &sdp])?;
},
None => {
println!("Got an ICE candidate before we're ready, ignoring: {}", sdp);
}
};
Ok(()) Ok(())
} }
@ -156,10 +166,15 @@ impl Gstreamer {
if let Ok(pipeline) = pipeline_maybe { if let Ok(pipeline) = pipeline_maybe {
let bin = pipeline.get_by_name(&pad_name).unwrap(); let bin = pipeline.get_by_name(&pad_name).unwrap();
// Create a stream for handling the GStreamer message asynchronously
let bus = pipeline.get_bus().unwrap();
let send_gst_msg_rx = bus.stream();
let pipeline_desc = Some(GstreamerPipeline { let pipeline_desc = Some(GstreamerPipeline {
description: self.config.active_pipeline.clone(), description: self.config.active_pipeline.clone(),
pipeline, pipeline,
bin, bin,
bus_pipeline: send_gst_msg_rx,
}); });
self.current_pipeline.lock().unwrap().replace(pipeline_desc); self.current_pipeline.lock().unwrap().replace(pipeline_desc);
@ -183,10 +198,15 @@ impl Gstreamer {
// for base_url in turn_server.uris { // for base_url in turn_server.uris {
let base_url = &turn_server.uris[0]; let base_url = &turn_server.uris[0];
let mut turn_server_url = Url::parse(&base_url)?; let real_base_url = &base_url.replacen(":", "://", 1);
let mut turn_server_url = Url::parse(&real_base_url)?;
turn_server_url.set_username(&turn_server.username).unwrap(); let result=turn_server_url.set_username(&turn_server.username);
turn_server_url.set_password(Some(&turn_server.password)).unwrap(); println!("debug error {:?}", result);
let result=turn_server_url.set_password(Some(&turn_server.password));
println!("debug error {:?}", result);
println!("Set turnserver to: {}", &turn_server_url.to_string());
webrtcbin.set_property_from_str("turn-server", &turn_server_url.to_string()); webrtcbin.set_property_from_str("turn-server", &turn_server_url.to_string());
// } // }
@ -258,13 +278,22 @@ impl Gstreamer {
let promise = gst::Promise::with_change_func(move |reply| { let promise = gst::Promise::with_change_func(move |reply| {
let app = upgrade_weak!(app_clone); let app = upgrade_weak!(app_clone);
if let Err(err) = app.on_answer_created(Ok(reply.unwrap().unwrap())) { println!("Debug: {:?}", reply);
if let Ok(value) = reply {
if let Some(value2) = value {
if let Err(err) = app.on_answer_created(Ok(value2)) {
gst_element_error!( gst_element_error!(
app.current_pipeline.lock().unwrap().borrow().as_ref().unwrap().pipeline, app.current_pipeline.lock().unwrap().borrow().as_ref().unwrap().pipeline,
gst::LibraryError::Failed, gst::LibraryError::Failed,
("Failed to send SDP answer: {:?}", err) ("Failed to send SDP answer: {:?}", err)
); );
} }
}
}
}); });
webrtcbin webrtcbin
@ -293,6 +322,13 @@ impl Gstreamer {
}) })
.unwrap(); .unwrap();
} }
// Asynchronously set the pipeline to Playing
webrtcbin.call_async(|pipeline| {
pipeline
.set_state(gst::State::Playing)
.expect("Couldn't set pipeline to Playing");
});
} else { } else {
// Any passive stuff involved? // Any passive stuff involved?
} }
@ -360,7 +396,7 @@ impl Gstreamer {
.borrow() .borrow()
.as_ref() .as_ref()
.unwrap() .unwrap()
.pipeline .bin
.emit("set-local-description", &[&offer, &None::<gst::Promise>]) .emit("set-local-description", &[&offer, &None::<gst::Promise>])
.unwrap(); .unwrap();
@ -497,7 +533,7 @@ impl Gstreamer {
.borrow() .borrow()
.as_ref() .as_ref()
.unwrap() .unwrap()
.pipeline .bin
.emit("set-local-description", &[&answer, &None::<gst::Promise>]) .emit("set-local-description", &[&answer, &None::<gst::Promise>])
.unwrap(); .unwrap();
@ -531,6 +567,29 @@ impl Gstreamer {
Ok(()) Ok(())
} }
// Handle GStreamer messages coming from the pipeline
fn handle_pipeline_message(&self, message: &gst::Message) -> Result<(), anyhow::Error> {
use gst::message::MessageView;
match message.view() {
MessageView::Error(err) => bail!(
"Error from element {}: {} ({})",
err
.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
err.get_error(),
err.get_debug().unwrap_or_else(|| String::from("None")),
),
MessageView::Warning(warning) => {
println!("Warning: \"{}\"", warning.get_debug().unwrap());
}
_ => (),
}
Ok(())
}
pub async fn loop_once(&self) -> Result<(), anyhow::Error> { pub async fn loop_once(&self) -> Result<(), anyhow::Error> {
select! { select! {
recv(self.gstream_receiver) -> msg => match msg.unwrap() { recv(self.gstream_receiver) -> msg => match msg.unwrap() {
@ -560,6 +619,15 @@ impl Gstreamer {
default(Duration::from_secs(1)) => println!("gst recv heartbeat"), default(Duration::from_secs(1)) => println!("gst recv heartbeat"),
} }
/* if let Some(pipeline) = self.current_pipeline.lock().unwrap().borrow().as_ref() {
let gst_bus = pipeline.bus_pipeline;
let gst_msg = gst_bus.poll_next();
if let Some(gst_msg) = gst_msg {
self.handle_pipeline_message(&gst_msg)?;
}
}*/
Ok(()) Ok(())
} }
} }

View file

@ -91,9 +91,7 @@ impl CommandBot {
.await .await
.unwrap(); .unwrap();
self.callmanager.send( self.callmanager.send(TriggerCall(None)).unwrap();
TriggerCall(None)
).unwrap();
println!("message sent"); println!("message sent");
} }