stuff... and pain

This commit is contained in:
Nickiel12 2024-08-10 19:10:32 +00:00
parent 823f1313af
commit 712a4a01e3
6 changed files with 269 additions and 37 deletions

40
Cargo.lock generated
View file

@ -2676,6 +2676,27 @@ version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "snafu"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b835cb902660db3415a672d862905e791e54d306c6e8189168c7f3d9ae1c79d"
dependencies = [
"snafu-derive",
]
[[package]]
name = "snafu-derive"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5"
dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "socket2"
version = "0.5.7"
@ -2938,18 +2959,6 @@ dependencies = [
"tungstenite 0.21.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.23.0",
]
[[package]]
name = "tokio-util"
version = "0.7.11"
@ -3253,13 +3262,16 @@ dependencies = [
"ctrlc",
"gst-plugin-webrtc",
"gstreamer",
"gstreamer-webrtc",
"lazy_static",
"log",
"serde",
"serde_json",
"tokio",
"tokio-tungstenite 0.23.1",
"snafu",
"toml",
"tracing",
"tracing-subscriber",
"tungstenite 0.23.0",
]
[[package]]

View file

@ -11,10 +11,15 @@ config = "0.14.0"
ctrlc = "3.4.4"
gst-plugin-webrtc = { version = "0.13.0", features = ["v1_22"] }
gstreamer = { version = "0.23.0", features = ["v1_22"] }
gstreamer-webrtc = { version = "0.23.0", features = ["v1_22"] }
lazy_static = "1.5.0"
log = "0.4.22"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.122"
tokio = "1.39.2"
tokio-tungstenite = "0.23.1"
snafu = "0.8.4"
# tokio = "1.39.2"
# tokio-tungstenite = "0.23.1"
toml = "0.8.19"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tungstenite = "0.23.0"

53
src/config.rs Normal file
View file

@ -0,0 +1,53 @@
use config::{Config, FileFormat};
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use std::fs::File;
use std::io::Write;
use tracing::{info, instrument};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AppConfig {
pub destination_ip: String,
pub destination_port: u32,
}
impl Default for AppConfig {
fn default() -> Self {
AppConfig {
destination_ip: "localhost".to_string(),
destination_port: 7891,
}
}
}
pub fn load_config() -> AppConfig {
Config::builder()
.add_source(config::File::new("./settings.toml", FileFormat::Toml))
.build()
.and_then(|val| val.try_deserialize())
.unwrap_or_default()
}
#[derive(Debug, Snafu)]
pub enum SaveConfigError {
#[snafu(display("Could not serialize app state: {source}"))]
SerdeError { source: toml::ser::Error },
#[snafu(display("Could not write app state to file: {path}"))]
IoError {
source: std::io::Error,
path: String,
},
}
#[instrument]
pub fn save_config(config: &AppConfig) -> Result<(), SaveConfigError> {
let toml_str = toml::to_string(&config).context(SerdeSnafu)?;
let mut file = File::create("./settings.toml").context(IoSnafu {
path: "./settings.toml",
})?;
file.write_all(toml_str.as_bytes()).context(IoSnafu {
path: "./settings.toml",
})?;
info!("Config file saved successfully");
Ok(())
}

View file

@ -2,17 +2,27 @@ use std::sync::{atomic::AtomicBool, Arc};
use anyhow::Error;
use gstreamer::{prelude::*, Element, ElementFactory, Pipeline, State};
use gstrswebrtc::{signaller::Signallable, webrtcsink::WhipWebRTCSink};
use gstrswebrtc::{signaller as signaller_interface, webrtcsink};
mod config;
mod signaller;
use signaller::{SignallerEvent, SENDER_CHANNEL};
fn main() -> Result<(), Error> {
// TRACING SETUP
let sub = tracing_subscriber::FmtSubscriber::new();
if let Err(e) = tracing::subscriber::set_global_default(sub) {
panic!("Could not set tracing global: {e}");
}
let config = config::load_config();
let (sender, receiver) = std::sync::mpsc::channel::<SignallerEvent>();
*SENDER_CHANNEL.lock().unwrap() = Some(sender.clone());
// EXIT HANDLER
let to_quit = Arc::new(AtomicBool::new(false));
let to_quit_2 = to_quit.clone();
@ -20,44 +30,65 @@ fn main() -> Result<(), Error> {
to_quit_2.store(true, std::sync::atomic::Ordering::SeqCst);
})?;
// GSTREAMER SETUP
gstreamer::init()?;
gstrswebrtc::plugin_register_static()?;
// PIPELINE INSTANTIATION
let custom_signaller = signaller::MyCustomSignaller::new();
let webrtcsink = webrtcsink::BaseWebRTCSink::with_signaller(
signaller_interface::Signallable::from(custom_signaller),
);
let pipeline = Pipeline::with_name("rstp-pipeline");
// let source = ElementFactory::make("videotestsrc")
let source = ElementFactory::make("videotestsrc")
.build().unwrap();
let source = ElementFactory::make("videotestsrc").build().unwrap();
let convert = ElementFactory::make("x264enc")
.build().unwrap();
let convert = ElementFactory::make("x264enc").build().unwrap();
let whipsink = gstreamer::ElementFactory::make("whipclientsink")
.name("whip-sink_123059")
.build()?;
pipeline
.add_many([&source, &convert, webrtcsink.upcast_ref()])
.unwrap();
if let Some(whipsink) = whipsink.dynamic_cast_ref::<gstrswebrtc::webrtcsink::WhipWebRTCSink>() {
let signaller = whipsink.property::<Signallable>("signaller");
signaller.set_property_from_str(
"whip-endpoint",
&format!("http://localhost:{}/{}/whip", 8889, "mystream"),
);
Element::link_many([&source, &convert, webrtcsink.upcast_ref::<Element>()]).unwrap();
// PIPELINE START
// pipeline.set_state(State::Playing).unwrap();
// BUT WHEN WILL IT END?
loop {
if let Ok((ws, resp)) = tungstenite::connect(format!("ws://{}:{}", config.destination_ip, config.destination_port)) {
while !to_quit.load(std::sync::atmoic::Ordering::SeqCst) {
match ws.read() {
Err(e) => {
error!("There was an error from the websocket: {e}");
break;
},
Ok(msg) => {
match msg {
Message::Text(text) => {
info!("There was a text message! {text}");
match text {
"StartStream" => pipeline.set_state(State::Playing),
"StopStream" => pipeline.set_state(State::Paused),
_ =>
}
}
}
}
}
}
pipeline.add_many([&source, &convert, &whipsink]).unwrap();
}
Element::link_many([&source, &convert, &whipsink]).unwrap();
pipeline.set_state(State::Playing).unwrap();
loop {
if to_quit.load(std::sync::atomic::Ordering::SeqCst) {
println!("Recieved Ctrl+C, stopping");
pipeline.set_state(State::Null).unwrap();
break;
}
std::thread::sleep(std::time::Duration::from_millis(200));
}

110
src/signaller/imp.rs Normal file
View file

@ -0,0 +1,110 @@
use std::sync::Mutex;
use gstreamer::glib;
use gstreamer::subclass::prelude::*;
use gstreamer_webrtc::WebRTCSessionDescription;
use gstrswebrtc::signaller::{Signallable, SignallableImpl};
use lazy_static::lazy_static;
use std::sync::mpsc::Sender;
use tracing::error;
lazy_static! {
pub static ref SENDER_CHANNEL: Mutex<Option<Sender<SignallerEvent>>> = Mutex::new(None);
}
pub enum SignallerEvent {
Start,
Stop,
SendSDP(SdpMessage),
AddICE(ICE),
End(String),
}
pub struct ICE {
pub session_id: String,
pub candidate: String,
pub sdp_m_line_index: u32,
}
pub enum SdpMessage {
Offer(String),
Answer(String),
}
#[derive(Default)]
pub struct Signaller {}
impl Signaller {}
impl SignallableImpl for Signaller {
fn start(&self) {
SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| {
if let Err(e) = msg_bus.send(SignallerEvent::Start) {
error!("Could not send message from the signaller to the message bus! {e}");
}
});
}
fn stop(&self) {
SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| {
if let Err(e) = msg_bus.send(SignallerEvent::Stop) {
error!("Could not send message from the signaller to the message bus! {e}");
}
});
}
fn send_sdp(&self, _session_id: &str, sdp: &WebRTCSessionDescription) {
let message = {
if sdp.type_() == gstreamer_webrtc::WebRTCSDPType::Offer {
SdpMessage::Offer(sdp.sdp().as_text().unwrap())
} else {
SdpMessage::Answer(sdp.sdp().as_text().unwrap())
}
};
SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| {
if let Err(e) = msg_bus.send(SignallerEvent::SendSDP(message)) {
error!("Could not send message from the signaller to the message bus! {e}");
}
});
}
fn add_ice(
&self,
session_id: &str,
candidate: &str,
sdp_m_line_index: u32,
_sdp_mid: Option<String>,
) {
let ice_msg = ICE {
session_id: session_id.to_string(),
candidate: candidate.to_string(),
sdp_m_line_index,
};
SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| {
if let Err(e) = msg_bus.send(SignallerEvent::AddICE(ice_msg)) {
error!("Could not send message from the signaller to the message bus! {e}");
}
});
}
fn end_session(&self, sess_id: &str) {
let session_id = sess_id.to_string();
SENDER_CHANNEL.lock().unwrap().as_ref().map(|msg_bus| {
if let Err(e) = msg_bus.send(SignallerEvent::End(session_id)) {
error!("Could not send message from the signaller to the message bus! {e}");
}
});
}
}
#[glib::object_subclass]
impl ObjectSubclass for Signaller {
const NAME: &'static str = "MyCustomWebRTCSinkSignaller";
type Type = super::MyCustomSignaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl ObjectImpl for Signaller {}

21
src/signaller/mod.rs Normal file
View file

@ -0,0 +1,21 @@
use gstreamer::glib;
use gstrswebrtc::signaller::Signallable;
mod imp;
pub use imp::{SdpMessage, SignallerEvent, ICE, SENDER_CHANNEL};
glib::wrapper! {
pub struct MyCustomSignaller(ObjectSubclass<imp::Signaller>) @implements Signallable;
}
impl MyCustomSignaller {
pub fn new() -> Self {
glib::Object::new()
}
}
impl Default for MyCustomSignaller {
fn default() -> Self {
MyCustomSignaller::new()
}
}