moved pipeline to separate file

Nickiel12 2024-04-24 21:29:59 -07:00
parent 0cea5e5c11
commit dedae02afc
5 changed files with 207 additions and 42 deletions

Cargo.lock (generated): 26 lines changed

@@ -1094,6 +1094,19 @@ dependencies = [
"libc",
]
[[package]]
name = "interprocess"
version = "2.0.0"
source = "git+https://github.com/kotauskas/interprocess.git#5023981cac9cc3dcc184a461b97061ec359f54d4"
dependencies = [
"futures-core",
"libc",
"recvmsg",
"tokio",
"widestring",
"windows-sys 0.52.0",
]
[[package]]
name = "io-kit-sys"
version = "0.4.1"
@@ -1141,6 +1154,7 @@ dependencies = [
"gst-plugin-gtk4",
"gstreamer",
"gtk4",
"interprocess",
"log",
"serde",
"simplelog",
@@ -1573,6 +1587,12 @@ dependencies = [
"getrandom",
]
[[package]]
name = "recvmsg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175"
[[package]]
name = "ron"
version = "0.8.1"
@@ -2124,6 +2144,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "widestring"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311"
[[package]]
name = "winapi"
version = "0.3.9"

Cargo.toml

@@ -21,3 +21,5 @@ simplelog = "0.12.2"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "time"] }
tokio-tungstenite = "0.21.0"
toml = "0.8.12"
# interprocess = { version = "1.2.1", features = ["tokio_support"] }
interprocess = { version = "2.0.0", git = "https://github.com/kotauskas/interprocess.git", features = ["tokio"] }
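# note: the 2.x git checkout (pinned to a specific revision in Cargo.lock) renames the
# Tokio integration feature from 1.x's "tokio_support" to "tokio"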

gstreamer_pipeline.rs (new file)

@@ -0,0 +1,113 @@
use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExtManual, PadExt};
use gstreamer::{Element, ElementFactory, Pipeline, State};
pub struct WebcamPipeline {
pub pipeline: Pipeline,
pub src: Element,
pub converter: Element,
pub tee: Element,
pub sink_paintable: Element,
pub resize: Element,
pub decode: Element,
pub sink_frame: Element,
}
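// Intended element graph (only partially wired up in `new` below, see the todo!):
//   src -> converter -> tee -> sink_paintable                  (GTK4 preview)
//                       tee -> resize -> decode -> sink_frame  (frames for the IPC pipe)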
impl WebcamPipeline {
pub fn new() -> WebcamPipeline {
let pipeline = Pipeline::with_name("webcam_pipeline");
let source = ElementFactory::make("mfvideosrc")
.build()
.expect("Could not build video source for GStreamer");
let convert = ElementFactory::make("videoconvert")
.build()
.expect("Could not build video convert for GStreamer");
let tee = ElementFactory::make("tee")
.build()
.expect("Could not create tee element");
let sink_paintable = ElementFactory::make("gtk4paintablesink")
.build()
.expect("Could not build gtk sink for GStreamer");
let resize = ElementFactory::make("videoscale")
.build()
.expect("Could not build videoscale for GStreamer");
todo!("Video XRaw is not actually working yet");
let decode = ElementFactory::make("video/x-raw,width=650,height=480")
.build()
.expect("Could not create decoder");
let sink_frame = ElementFactory::make("appsink name=frame_sink")
.build()
.expect("Could not build appsrc for GStreamer");
pipeline.add_many(&[
&source,
&convert,
&tee,
&sink_paintable,
&resize,
&sink_frame,
]).expect("Could not add the elements to the pipeline");
source
.link(&convert)
.expect("Could not link video source to converter");
convert.link(&tee).expect("Could not link converter to tee");
let tee_pad_template = tee
.pad_template("src_%u")
.expect("Could not get pad templates");
let tee_src_1 = tee
.request_pad(&tee_pad_template, Some("paintable_src_pad"), None)
.expect("Could not create src pad 1");
let sink_paintable_sinkpad = sink_paintable
.static_pad("sink")
.expect("Could not get sink pad for paintablesink");
tee_src_1
.link(&sink_paintable_sinkpad)
.expect("Could not link tee srcpad 1 to paintablesink pad");
let tee_src_2 = tee
.request_pad(&tee_pad_template, Some("output_src_pad"), None)
.expect("Could not create src pad 2");
let sink_frameoutput_sinkpad = sink_frame
.static_pad("sink")
.expect("Could not get sink pad for frameoutput sink");
tee_src_2
.link(&sink_frameoutput_sinkpad)
.expect("Could not link tee srcpad 2 to frame output sink pad");
resize
.link(&decode)
.expect("Could not bind resize to decoder");
decode
.link(&sink_frame)
.expect("Could not bind decoder to webcam frame output");
WebcamPipeline {
pipeline,
src: source,
converter: convert,
tee,
sink_paintable,
resize,
decode,
sink_frame,
}
}
}
impl Drop for WebcamPipeline {
fn drop(&mut self) {
self.pipeline
.set_state(State::Null)
.expect("Could not close pipeline during window deconstruction");
}
}

UI module source (defines build_ui)

@@ -1,7 +1,6 @@
use std::sync::{Arc, Mutex};
use gstreamer::prelude::{ElementExt, ElementExtManual, GstBinExt};
use gstreamer::{ElementFactory, Pipeline, State};
use gstreamer::{prelude::ElementExt, State};
use gtk::cairo::Context;
use gtk::gdk::Paintable;
use gtk::{glib, prelude::*, Box, Entry, Label, ListBox};
@@ -11,6 +10,8 @@ use serde::{Deserialize, Serialize};
use tokio::runtime::Handle;
use tokio_tungstenite::tungstenite::Message;
mod gstreamer_pipeline;
mod shared_video_pipe;
use crate::config::{load_config, save_config};
use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent};
@@ -123,20 +124,24 @@ pub fn build_ui(app: &Application, runtime: Handle) {
let overlay_box = gtk::Overlay::new();
main_box.append(&overlay_box);
let (pipeline, paintable) = create_pipeline();
let pipeline = gstreamer_pipeline::WebcamPipeline::new();
pipeline
.pipeline
.set_state(State::Playing)
.expect("Could not set pipeline state to playing");
webcam_picture.set_paintable(Some(&paintable));
let pipeline2 = pipeline.pipeline.clone();
window.connect_close_request(move |_| {
pipeline
pipeline2
.set_state(State::Null)
.expect("Could not close pipeline during window deconstruction");
glib::Propagation::Proceed
});
let paintable = pipeline.sink_paintable.property::<Paintable>("paintable");
webcam_picture.set_paintable(Some(&paintable));
let drawable = gtk::DrawingArea::builder()
.content_height(480)
.content_width(640)
@@ -148,6 +153,12 @@ pub fn build_ui(app: &Application, runtime: Handle) {
overlay_box.set_child(Some(&webcam_picture));
overlay_box.add_overlay(&drawable);
let pipeline3 = pipeline.pipeline.clone();
runtime.spawn(shared_video_pipe::create_outbound_pipe(
pipeline3,
to_mec.clone(),
));
// Connect to "clicked" signal of `button`
button.connect_clicked(glib::clone!(@weak ip_entry, @weak port_entry, @strong to_mec => move |_button| {
let ip_text = ip_entry.text();
@@ -245,39 +256,3 @@ fn draw_boxes(ctx: &Context, boxes: Arc<Mutex<Vec<BoxCoords>>>) {
error!("Could not get lock on boxes for drawing on the draw area!");
}
}
// Function to create GStreamer pipeline
fn create_pipeline() -> (Pipeline, Paintable) {
// Create pipeline
let pipeline = Pipeline::with_name("webcam_pipeline");
// Create elements
let source = ElementFactory::make("mfvideosrc")
.build()
.expect("Could not build video source for GStreamer");
let convert = ElementFactory::make("videoconvert")
.build()
.expect("Could not build video convert for GStreamer");
let sink = ElementFactory::make("gtk4paintablesink")
.build()
.expect("Could not build gtk sink for GStreamer");
let paintable = sink.property::<Paintable>("paintable");
// Add elements to the pipeline
pipeline
.add(&source)
.expect("Could not add GStreamer webcam source to pipeline");
pipeline
.add(&convert)
.expect("Could not add GStreamer webcam convert to pipeline");
pipeline
.add(&sink)
.expect("Could not add GStreamer gtksink to GStreamer pipeline");
// Link elements
source.link(&convert).unwrap();
convert.link(&sink).unwrap();
(pipeline, paintable)
}

shared_video_pipe.rs (new file)

@@ -0,0 +1,49 @@
use async_channel::Sender;
use gstreamer::{prelude::ElementExt, Pipeline};
use interprocess::os::windows::named_pipe::{
pipe_mode,
tokio::{DuplexPipeStream, PipeStream},
};
use log::error;
use tokio::{io::AsyncWriteExt, runtime::Handle, sync::broadcast};
use crate::coordinator::ApplicationEvent;
pub async fn create_outbound_pipe(pipeline: Pipeline, to_mec: Sender<ApplicationEvent>) {
if let Ok(pipe) =
DuplexPipeStream::<pipe_mode::Bytes>::connect_by_path(r"\\.\pipe\example_pipe").await
{
// send pipeline images as bytes
// let bus = pipeline.get_bus().expect("Pipeline has no bus");
// let mut frame_count = 0;
if let Err(e) = send_to_pipe(pipe).await {
error!("Error in sending to the pipe: {e}");
}
// while let Some(msg) = bus.timed_pop(gstreamer::ClockTime::from_seconds(1)) {
// use gstreamer::MessageView;
// match msg.view() {
// MessageView::Element(element) => {
// if let Some(buffer) = element.structure().get::<gstreamer::Buffer>("buffer") {
// frame_count += 1;
// println!("Writing frame {}", frame_count);
// let data = buffer.map_readable().expect("Failed to map buffer");
// // Send the data to the broadcast channel
// tx.send(data.as_slice()).expect("Failed to send data");
// }
// }
// _ => (),
// }
// }
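// A more conventional way to get frame data out (sketch; assumes the `gstreamer-app`
// crate and the appsink named "frame_sink" from WebcamPipeline, and that the blocking
// pull_sample() call is moved off the async executor, e.g. via spawn_blocking):
//
//     use gstreamer::prelude::*;
//     use gstreamer_app::AppSink;
//     let appsink = pipeline
//         .by_name("frame_sink")
//         .expect("no frame_sink in pipeline")
//         .downcast::<AppSink>()
//         .expect("frame_sink is not an appsink");
//     while let Ok(sample) = appsink.pull_sample() {
//         let buffer = sample.buffer().expect("sample without buffer");
//         let map = buffer.map_readable().expect("Failed to map buffer");
//         pipe.write_all(map.as_slice()).await.expect("pipe write failed");
//     }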
}
}
async fn send_to_pipe(
mut pipe: PipeStream<pipe_mode::Bytes, pipe_mode::Bytes>,
) -> Result<(), Box<dyn std::error::Error>> {
pipe.write_all(b"Hello World").await?;
pipe.shutdown().await?;
Ok(())
}