diff --git a/src/gst.rs b/src/gst.rs index 5e6fd03..21252f4 100644 --- a/src/gst.rs +++ b/src/gst.rs @@ -1,6 +1,8 @@ use gstreamer::{ self as gst, - prelude::{Cast, ElementExtManual, GstBinExtManual}, + prelude::{ + Cast, ElementExt, ElementExtManual, GObjectExtManualGst, GstBinExt, GstBinExtManual, PadExt, + }, ElementFactory, }; use gstreamer_app as gst_app; @@ -14,16 +16,27 @@ pub struct Pipeline { } const HEIGHT: usize = 480; +const SOCKET_PATH: &str = "/tmp/VCS-streamer-shm"; pub fn new_pipeline(config: &AppConfig) -> Pipeline { let pipeline = gst::Pipeline::builder() .name("camera_to_rtp_pipeine") .build(); - // let source = ElementFactory::make("mfvideosrc") - let source = ElementFactory::make("v4l2src") - .build() - .expect("Could not make mfvideosrc element!"); + let source = { + #[cfg(target_os = "windows")] + { + ElementFactory::make("mfvideosrc") + .build() + .expect("Could not make mfvideosrc element!") + } + #[cfg(not(target_os = "windows"))] + { + ElementFactory::make("v4l2src") + .build() + .expect("Could not make mfvideosrc element!") + } + }; let video_convert = ElementFactory::make("videoconvert") .build() @@ -33,6 +46,10 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline { .build() .expect("Could not make videoscale gst element!"); + let tee = ElementFactory::make("tee") + .build() + .expect("Could not make Tee gst element!"); + let video_scale = ElementFactory::make("videoscale") .property("add-borders", true) .build() @@ -61,6 +78,7 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline { &source, &video_convert, &video_rate, + &tee, &video_scale, &vp8enc, &rtp, @@ -68,8 +86,31 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline { ]) .expect("Could not add all the stuff to the pipeline"); - gst::Element::link_many(&[&source, &video_convert, &video_rate, &video_scale]) - .expect("Could not link source through video scale!"); + source + .link(&video_convert) + .expect("Could not link source to video convert"); + + let tee_caps = gstreamer::Caps::builder("video/x-raw").build(); + + video_convert + .link_filtered(&tee, &tee_caps) + .expect("Could not link video convert and tee!"); + + let tee_src_1 = tee + .request_pad_simple("src_%u") + .expect("Could not get src pad 1 from tee"); + + tee_src_1 + .link( + &video_rate + .static_pad("sink") + .expect("Could not get static sink pad from video_rate"), + ) + .expect("Could not link tee pad and video rate"); + + video_rate + .link(&video_scale) + .expect("Could not link video rate to video scale"); video_scale .link_filtered(&vp8enc, &video_scale_caps) @@ -78,6 +119,33 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline { gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()]) .expect("Could not gst link vp8enc through appsink!"); + #[cfg(not(target_os = "windows"))] + { + let hailo_sink = ElementFactory::make("shmsink") + .build() + .expect("Could not create the shared memory sink!"); + + hailo_sink.set_property_from_str("socket-path", SOCKET_PATH); + hailo_sink.set_property_from_str("sync", "true"); + hailo_sink.set_property_from_str("wait-for-connection", "false"); + + pipeline + .add(&hailo_sink) + .expect("Could not add hailo shm to pipeline bin!"); + + let tee_src_2 = tee + .request_pad_simple("src_%u") + .expect("Could not create hailo tee src pad"); + + tee_src_2 + .link( + &hailo_sink + .static_pad("sink") + .expect("Could not get hailo static sink pad"), + ) + .expect("Could not link tee src pad to hailo sink pad"); + } + Pipeline { pipeline, sink: app_sink, diff --git a/src/main.rs b/src/main.rs index 275d59b..71da611 100644 --- a/src/main.rs +++ b/src/main.rs @@ -166,34 +166,37 @@ async fn main() { } let mut do_webrtc = false; - match app_receiver.recv().await { - Ok(ApplicationMessage::NameRequest(None)) => { - if let Err(e) = app_sender - .send(ApplicationMessage::NameRequest(Some( - SATELLITE_NAME.to_owned(), - ))) - .await - { - error!("Could not let the remote know my name! {e}"); + loop { + match app_receiver.recv().await { + Ok(ApplicationMessage::NameRequest(None)) => { + if let Err(e) = app_sender + .send(ApplicationMessage::NameRequest(Some( + SATELLITE_NAME.to_owned(), + ))) + .await + { + error!("Could not let the remote know my name! {e}"); + } } - } - Ok(ApplicationMessage::ConnectionSupportsWebRTCRequest) => { - if let Err(e) = app_sender - .send(ApplicationMessage::ConnectionSupportsWebRTC(true)) - .await - { - error!("Coudl not post request webrtc support=true to async channel, dropping connection: {e}"); + Ok(ApplicationMessage::ConnectionSupportsWebRTCRequest) => { + if let Err(e) = app_sender + .send(ApplicationMessage::ConnectionSupportsWebRTC(true)) + .await + { + error!("Coudl not post request webrtc support=true to async channel, dropping connection: {e}"); + break; + } + } + Ok(ApplicationMessage::ConnectionSupportsWebRTC(state)) => { + do_webrtc = state; break; } - } - Ok(ApplicationMessage::ConnectionSupportsWebRTC(state)) => { - do_webrtc = state; - } - Err(e) => { - error!("Could not receive incoming message from remote during webrtc status check!: {e}"); - } - _ => { - info!("Received unhandled message from remote"); + Err(e) => { + error!("Could not receive incoming message from remote during webrtc status check!: {e}"); + } + _ => { + info!("Received unhandled message from remote"); + } } } @@ -340,6 +343,7 @@ async fn main() { error!("I got a tracking boxes message?"); } ApplicationMessage::ManualMovementOverride((x, y)) => { + debug!("Received a manual override {}, {}", x, y); if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await { error!("Could not send manual override to state machine! {e}"); break;