From 712a4a01e358c557b00c1a2d89335ff04ef7a5f4 Mon Sep 17 00:00:00 2001 From: Nickiel12 Date: Sat, 10 Aug 2024 19:10:32 +0000 Subject: [PATCH] stuff... and pain --- Cargo.lock | 40 ++++++++++------ Cargo.toml | 9 +++- src/config.rs | 53 +++++++++++++++++++++ src/main.rs | 73 +++++++++++++++++++--------- src/signaller/imp.rs | 110 +++++++++++++++++++++++++++++++++++++++++++ src/signaller/mod.rs | 21 +++++++++ 6 files changed, 269 insertions(+), 37 deletions(-) create mode 100644 src/config.rs create mode 100644 src/signaller/imp.rs create mode 100644 src/signaller/mod.rs diff --git a/Cargo.lock b/Cargo.lock index f62a7b9..4cde573 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2676,6 +2676,27 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "snafu" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b835cb902660db3415a672d862905e791e54d306c6e8189168c7f3d9ae1c79d" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "socket2" version = "0.5.7" @@ -2938,18 +2959,6 @@ dependencies = [ "tungstenite 0.21.0", ] -[[package]] -name = "tokio-tungstenite" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite 0.23.0", -] - [[package]] name = "tokio-util" version = "0.7.11" @@ -3253,13 +3262,16 @@ dependencies = [ "ctrlc", "gst-plugin-webrtc", "gstreamer", + "gstreamer-webrtc", + "lazy_static", "log", "serde", "serde_json", - "tokio", - "tokio-tungstenite 0.23.1", + "snafu", + "toml", "tracing", "tracing-subscriber", + "tungstenite 0.23.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 046b62f..e206a98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,10 +11,15 @@ config = "0.14.0" ctrlc = "3.4.4" gst-plugin-webrtc = { version = "0.13.0", features = ["v1_22"] } gstreamer = { version = "0.23.0", features = ["v1_22"] } +gstreamer-webrtc = { version = "0.23.0", features = ["v1_22"] } +lazy_static = "1.5.0" log = "0.4.22" serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.122" -tokio = "1.39.2" -tokio-tungstenite = "0.23.1" +snafu = "0.8.4" +# tokio = "1.39.2" +# tokio-tungstenite = "0.23.1" +toml = "0.8.19" tracing = "0.1.40" tracing-subscriber = "0.3.18" +tungstenite = "0.23.0" diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..fb0eb81 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,53 @@ +use config::{Config, FileFormat}; +use serde::{Deserialize, Serialize}; +use snafu::prelude::*; +use std::fs::File; +use std::io::Write; +use tracing::{info, instrument}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct AppConfig { + pub destination_ip: String, + pub destination_port: u32, +} + +impl Default for AppConfig { + fn default() -> Self { + AppConfig { + destination_ip: "localhost".to_string(), + destination_port: 7891, + } + } +} + +pub fn load_config() -> AppConfig { + Config::builder() + .add_source(config::File::new("./settings.toml", FileFormat::Toml)) + .build() + .and_then(|val| val.try_deserialize()) + .unwrap_or_default() +} + +#[derive(Debug, Snafu)] +pub enum SaveConfigError { + #[snafu(display("Could not serialize app state: {source}"))] + SerdeError { source: toml::ser::Error }, + #[snafu(display("Could not write app state to file: {path}"))] + IoError { + source: std::io::Error, + path: String, + }, +} + +#[instrument] +pub fn save_config(config: &AppConfig) -> Result<(), SaveConfigError> { + let toml_str = toml::to_string(&config).context(SerdeSnafu)?; + let mut file = File::create("./settings.toml").context(IoSnafu { + path: "./settings.toml", + })?; + file.write_all(toml_str.as_bytes()).context(IoSnafu { + path: "./settings.toml", + })?; + info!("Config file saved successfully"); + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 24fc879..544d3c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,17 +2,27 @@ use std::sync::{atomic::AtomicBool, Arc}; use anyhow::Error; use gstreamer::{prelude::*, Element, ElementFactory, Pipeline, State}; -use gstrswebrtc::{signaller::Signallable, webrtcsink::WhipWebRTCSink}; +use gstrswebrtc::{signaller as signaller_interface, webrtcsink}; +mod config; +mod signaller; + +use signaller::{SignallerEvent, SENDER_CHANNEL}; fn main() -> Result<(), Error> { - + // TRACING SETUP let sub = tracing_subscriber::FmtSubscriber::new(); if let Err(e) = tracing::subscriber::set_global_default(sub) { panic!("Could not set tracing global: {e}"); } + let config = config::load_config(); + let (sender, receiver) = std::sync::mpsc::channel::(); + + *SENDER_CHANNEL.lock().unwrap() = Some(sender.clone()); + + // EXIT HANDLER let to_quit = Arc::new(AtomicBool::new(false)); let to_quit_2 = to_quit.clone(); @@ -20,44 +30,65 @@ fn main() -> Result<(), Error> { to_quit_2.store(true, std::sync::atomic::Ordering::SeqCst); })?; + // GSTREAMER SETUP gstreamer::init()?; gstrswebrtc::plugin_register_static()?; + // PIPELINE INSTANTIATION + let custom_signaller = signaller::MyCustomSignaller::new(); + let webrtcsink = webrtcsink::BaseWebRTCSink::with_signaller( + signaller_interface::Signallable::from(custom_signaller), + ); + let pipeline = Pipeline::with_name("rstp-pipeline"); // let source = ElementFactory::make("videotestsrc") - let source = ElementFactory::make("videotestsrc") - .build().unwrap(); + let source = ElementFactory::make("videotestsrc").build().unwrap(); - let convert = ElementFactory::make("x264enc") - .build().unwrap(); + let convert = ElementFactory::make("x264enc").build().unwrap(); - let whipsink = gstreamer::ElementFactory::make("whipclientsink") - .name("whip-sink_123059") - .build()?; + pipeline + .add_many([&source, &convert, webrtcsink.upcast_ref()]) + .unwrap(); - if let Some(whipsink) = whipsink.dynamic_cast_ref::() { - let signaller = whipsink.property::("signaller"); - signaller.set_property_from_str( - "whip-endpoint", - &format!("http://localhost:{}/{}/whip", 8889, "mystream"), - ); - } + Element::link_many([&source, &convert, webrtcsink.upcast_ref::()]).unwrap(); - pipeline.add_many([&source, &convert, &whipsink]).unwrap(); + // PIPELINE START + // pipeline.set_state(State::Playing).unwrap(); - Element::link_many([&source, &convert, &whipsink]).unwrap(); - - pipeline.set_state(State::Playing).unwrap(); + // BUT WHEN WILL IT END? loop { + if let Ok((ws, resp)) = tungstenite::connect(format!("ws://{}:{}", config.destination_ip, config.destination_port)) { + while !to_quit.load(std::sync::atmoic::Ordering::SeqCst) { + match ws.read() { + Err(e) => { + error!("There was an error from the websocket: {e}"); + break; + }, + Ok(msg) => { + match msg { + Message::Text(text) => { + info!("There was a text message! {text}"); + match text { + "StartStream" => pipeline.set_state(State::Playing), + "StopStream" => pipeline.set_state(State::Paused), + _ => + } + } + } + } + } + } + + } + if to_quit.load(std::sync::atomic::Ordering::SeqCst) { println!("Recieved Ctrl+C, stopping"); pipeline.set_state(State::Null).unwrap(); break; } - std::thread::sleep(std::time::Duration::from_millis(200)); } diff --git a/src/signaller/imp.rs b/src/signaller/imp.rs new file mode 100644 index 0000000..48a70b6 --- /dev/null +++ b/src/signaller/imp.rs @@ -0,0 +1,110 @@ +use std::sync::Mutex; + +use gstreamer::glib; +use gstreamer::subclass::prelude::*; +use gstreamer_webrtc::WebRTCSessionDescription; +use gstrswebrtc::signaller::{Signallable, SignallableImpl}; +use lazy_static::lazy_static; +use std::sync::mpsc::Sender; +use tracing::error; + +lazy_static! { + pub static ref SENDER_CHANNEL: Mutex>> = Mutex::new(None); +} + +pub enum SignallerEvent { + Start, + Stop, + SendSDP(SdpMessage), + AddICE(ICE), + End(String), +} + +pub struct ICE { + pub session_id: String, + pub candidate: String, + pub sdp_m_line_index: u32, +} + +pub enum SdpMessage { + Offer(String), + Answer(String), +} + +#[derive(Default)] +pub struct Signaller {} + +impl Signaller {} + +impl SignallableImpl for Signaller { + fn start(&self) { + SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| { + if let Err(e) = msg_bus.send(SignallerEvent::Start) { + error!("Could not send message from the signaller to the message bus! {e}"); + } + }); + } + + fn stop(&self) { + SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| { + if let Err(e) = msg_bus.send(SignallerEvent::Stop) { + error!("Could not send message from the signaller to the message bus! {e}"); + } + }); + } + + fn send_sdp(&self, _session_id: &str, sdp: &WebRTCSessionDescription) { + let message = { + if sdp.type_() == gstreamer_webrtc::WebRTCSDPType::Offer { + SdpMessage::Offer(sdp.sdp().as_text().unwrap()) + } else { + SdpMessage::Answer(sdp.sdp().as_text().unwrap()) + } + }; + + SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| { + if let Err(e) = msg_bus.send(SignallerEvent::SendSDP(message)) { + error!("Could not send message from the signaller to the message bus! {e}"); + } + }); + } + + fn add_ice( + &self, + session_id: &str, + candidate: &str, + sdp_m_line_index: u32, + _sdp_mid: Option, + ) { + let ice_msg = ICE { + session_id: session_id.to_string(), + candidate: candidate.to_string(), + sdp_m_line_index, + }; + + SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| { + if let Err(e) = msg_bus.send(SignallerEvent::AddICE(ice_msg)) { + error!("Could not send message from the signaller to the message bus! {e}"); + } + }); + } + + fn end_session(&self, sess_id: &str) { + let session_id = sess_id.to_string(); + SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| { + if let Err(e) = msg_bus.send(SignallerEvent::End(session_id)) { + error!("Could not send message from the signaller to the message bus! {e}"); + } + }); + } +} + +#[glib::object_subclass] +impl ObjectSubclass for Signaller { + const NAME: &'static str = "MyCustomWebRTCSinkSignaller"; + type Type = super::MyCustomSignaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +impl ObjectImpl for Signaller {} diff --git a/src/signaller/mod.rs b/src/signaller/mod.rs new file mode 100644 index 0000000..2040a6b --- /dev/null +++ b/src/signaller/mod.rs @@ -0,0 +1,21 @@ +use gstreamer::glib; +use gstrswebrtc::signaller::Signallable; + +mod imp; +pub use imp::{SdpMessage, SignallerEvent, ICE, SENDER_CHANNEL}; + +glib::wrapper! { + pub struct MyCustomSignaller(ObjectSubclass) @implements Signallable; +} + +impl MyCustomSignaller { + pub fn new() -> Self { + glib::Object::new() + } +} + +impl Default for MyCustomSignaller { + fn default() -> Self { + MyCustomSignaller::new() + } +}