diff --git a/Cargo.lock b/Cargo.lock index b667076..f3eb227 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -250,7 +250,7 @@ checksum = "2c3d816ce6f0e2909a96830d6911c2aff044370b1ef92d7f267b43bae5addedd" dependencies = [ "atk-sys", "bitflags 1.3.2", - "glib 0.15.12", + "glib", "libc", ] @@ -260,8 +260,8 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58aeb089fb698e06db8089971c7ee317ab9644bade33383f63631437b03aafb6" dependencies = [ - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "glib-sys", + "gobject-sys", "libc", "system-deps 6.2.2", ] @@ -272,12 +272,6 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" -[[package]] -name = "atomic_refcell" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c" - [[package]] name = "autocfg" version = "1.3.0" @@ -485,7 +479,7 @@ checksum = "c76ee391b03d35510d9fa917357c7f1855bd9a6659c95a1b392e33f49b3369bc" dependencies = [ "bitflags 1.3.2", "cairo-sys-rs", - "glib 0.15.12", + "glib", "libc", "thiserror", ] @@ -496,7 +490,7 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c55d429bef56ac9172d25fecb85dc8068307d17acd74b377866b7a1ef25d3c8" dependencies = [ - "glib-sys 0.15.10", + "glib-sys", "libc", "system-deps 6.2.2", ] @@ -1484,7 +1478,7 @@ dependencies = [ "gdk-pixbuf", "gdk-sys", "gio", - "glib 0.15.12", + "glib", "libc", "pango", ] @@ -1498,7 +1492,7 @@ dependencies = [ "bitflags 1.3.2", "gdk-pixbuf-sys", "gio", - "glib 0.15.12", + "glib", "libc", ] @@ -1508,9 +1502,9 @@ version = "0.15.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "140b2f5378256527150350a8346dbdb08fadc13453a7a2d73aecd5fab3c402a7" dependencies = [ - "gio-sys 0.15.10", - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "gio-sys", + "glib-sys", + "gobject-sys", "libc", "system-deps 6.2.2", ] @@ -1523,9 +1517,9 @@ checksum = "32e7a08c1e8f06f4177fb7e51a777b8c1689f743a7bc11ea91d44d2226073a88" dependencies = [ "cairo-sys-rs", "gdk-pixbuf-sys", - "gio-sys 0.15.10", - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "gio-sys", + "glib-sys", + "gobject-sys", "libc", "pango-sys", "pkg-config", @@ -1539,8 +1533,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cca49a59ad8cfdf36ef7330fe7bdfbe1d34323220cc16a0de2679ee773aee2c2" dependencies = [ "gdk-sys", - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "glib-sys", + "gobject-sys", "libc", "pkg-config", "system-deps 6.2.2", @@ -1553,7 +1547,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4b7f8c7a84b407aa9b143877e267e848ff34106578b64d1e0a24bf550716178" dependencies = [ "gdk-sys", - "glib-sys 0.15.10", + "glib-sys", "libc", "system-deps 6.2.2", "x11", @@ -1665,8 +1659,8 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", - "gio-sys 0.15.10", - "glib 0.15.12", + "gio-sys", + "glib", "libc", "once_cell", "thiserror", @@ -1678,26 +1672,13 @@ version = "0.15.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32157a475271e2c4a023382e9cab31c4584ee30a97da41d3c4e9fdd605abcf8d" dependencies = [ - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "glib-sys", + "gobject-sys", "libc", "system-deps 6.2.2", "winapi", ] -[[package]] -name = "gio-sys" -version = "0.19.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cd743ba4714d671ad6b6234e8ab2a13b42304d0e13ab7eba1dcdd78a7d6d4ef" -dependencies = [ - "glib-sys 0.19.8", - "gobject-sys 0.19.8", - "libc", - "system-deps 6.2.2", - "windows-sys 0.52.0", -] - [[package]] name = "glib" version = "0.15.12" @@ -1709,37 +1690,15 @@ dependencies = [ "futures-core", "futures-executor", "futures-task", - "glib-macros 0.15.13", - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "glib-macros", + "glib-sys", + "gobject-sys", "libc", "once_cell", "smallvec", "thiserror", ] -[[package]] -name = "glib" -version = "0.19.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39650279f135469465018daae0ba53357942a5212137515777d5fdca74984a44" -dependencies = [ - "bitflags 2.6.0", - "futures-channel", - "futures-core", - "futures-executor", - "futures-task", - "futures-util", - "gio-sys 0.19.8", - "glib-macros 0.19.9", - "glib-sys 0.19.8", - "gobject-sys 0.19.8", - "libc", - "memchr", - "smallvec", - "thiserror", -] - [[package]] name = "glib-macros" version = "0.15.13" @@ -1748,26 +1707,13 @@ checksum = "10c6ae9f6fa26f4fb2ac16b528d138d971ead56141de489f8111e259b9df3c4a" dependencies = [ "anyhow", "heck 0.4.1", - "proc-macro-crate 1.3.1", + "proc-macro-crate", "proc-macro-error", "proc-macro2", "quote", "syn 1.0.109", ] -[[package]] -name = "glib-macros" -version = "0.19.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4429b0277a14ae9751350ad9b658b1be0abb5b54faa5bcdf6e74a3372582fad7" -dependencies = [ - "heck 0.5.0", - "proc-macro-crate 3.1.0", - "proc-macro2", - "quote", - "syn 2.0.76", -] - [[package]] name = "glib-sys" version = "0.15.10" @@ -1778,16 +1724,6 @@ dependencies = [ "system-deps 6.2.2", ] -[[package]] -name = "glib-sys" -version = "0.19.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c2dc18d3a82b0006d470b13304fbbb3e0a9bd4884cf985a60a7ed733ac2c4a5" -dependencies = [ - "libc", - "system-deps 6.2.2", -] - [[package]] name = "glob" version = "0.3.1" @@ -1813,18 +1749,7 @@ version = "0.15.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d57ce44246becd17153bd035ab4d32cfee096a657fc01f2231c9278378d1e0a" dependencies = [ - "glib-sys 0.15.10", - "libc", - "system-deps 6.2.2", -] - -[[package]] -name = "gobject-sys" -version = "0.19.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e697e252d6e0416fd1d9e169bda51c0f1c926026c39ca21fbe8b1bb5c3b8b9e" -dependencies = [ - "glib-sys 0.19.8", + "glib-sys", "libc", "system-deps 6.2.2", ] @@ -1840,98 +1765,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "gstreamer" -version = "0.22.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca0b90646bb67fccf80d228f5333f2a0745526818ccefbf5a97326c76d30e4d" -dependencies = [ - "cfg-if", - "futures-channel", - "futures-core", - "futures-util", - "glib 0.19.9", - "gstreamer-sys", - "itertools 0.13.0", - "libc", - "muldiv", - "num-integer", - "num-rational", - "once_cell", - "option-operations", - "paste", - "pin-project-lite", - "smallvec", - "thiserror", -] - -[[package]] -name = "gstreamer-app" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1363313eb1931d66ac0b82c9b477fdd066af9dc118ea844966f85b6d99f261fd" -dependencies = [ - "futures-core", - "futures-sink", - "glib 0.19.9", - "gstreamer", - "gstreamer-app-sys", - "gstreamer-base", - "libc", -] - -[[package]] -name = "gstreamer-app-sys" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed667453517b47754b9f9d28c096074e5d565f1cc48c6fa2483b1ea10d7688d3" -dependencies = [ - "glib-sys 0.19.8", - "gstreamer-base-sys", - "gstreamer-sys", - "libc", - "system-deps 6.2.2", -] - -[[package]] -name = "gstreamer-base" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39d55668b23fc69f1843daa42b43d289c00fe38e9586c5453b134783d2dd75a3" -dependencies = [ - "atomic_refcell", - "cfg-if", - "glib 0.19.9", - "gstreamer", - "gstreamer-base-sys", - "libc", -] - -[[package]] -name = "gstreamer-base-sys" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5448abb00c197e3ad306710293bf757303cbeab4036b5ccad21c7642b8bf00c9" -dependencies = [ - "glib-sys 0.19.8", - "gobject-sys 0.19.8", - "gstreamer-sys", - "libc", - "system-deps 6.2.2", -] - -[[package]] -name = "gstreamer-sys" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71f147e7c6bc9313d5569eb15da61f6f64026ec69791922749de230583a07286" -dependencies = [ - "glib-sys 0.19.8", - "gobject-sys 0.19.8", - "libc", - "system-deps 6.2.2", -] - [[package]] name = "gtk" version = "0.15.5" @@ -1946,7 +1779,7 @@ dependencies = [ "gdk", "gdk-pixbuf", "gio", - "glib 0.15.12", + "glib", "gtk-sys", "gtk3-macros", "libc", @@ -1965,9 +1798,9 @@ dependencies = [ "cairo-sys-rs", "gdk-pixbuf-sys", "gdk-sys", - "gio-sys 0.15.10", - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "gio-sys", + "glib-sys", + "gobject-sys", "libc", "pango-sys", "system-deps 6.2.2", @@ -1980,7 +1813,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "684c0456c086e8e7e9af73ec5b84e35938df394712054550e81558d21c44ab0d" dependencies = [ "anyhow", - "proc-macro-crate 1.3.1", + "proc-macro-crate", "proc-macro-error", "proc-macro2", "quote", @@ -2387,15 +2220,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "0.4.8" @@ -2415,7 +2239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf053e7843f2812ff03ef5afe34bb9c06ffee120385caad4f6b9967fcd37d41c" dependencies = [ "bitflags 1.3.2", - "glib 0.15.12", + "glib", "javascriptcore-rs-sys", ] @@ -2425,8 +2249,8 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "905fbb87419c5cde6e3269537e4ea7d46431f3008c5d057e915ef3f115e7793c" dependencies = [ - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "glib-sys", + "gobject-sys", "libc", "system-deps 5.0.0", ] @@ -2719,12 +2543,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "muldiv" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0" - [[package]] name = "ndk" version = "0.6.0" @@ -2835,16 +2653,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-rational" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" -dependencies = [ - "num-integer", - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.19" @@ -2869,7 +2677,7 @@ version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" dependencies = [ - "proc-macro-crate 1.3.1", + "proc-macro-crate", "proc-macro2", "quote", "syn 1.0.109", @@ -2933,15 +2741,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "option-operations" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c26d27bb1aeab65138e4bf7666045169d1717febcc9ff870166be8348b223d0" -dependencies = [ - "paste", -] - [[package]] name = "ordered-multimap" version = "0.6.0" @@ -2989,7 +2788,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22e4045548659aee5313bde6c582b0d83a627b7904dd20dc2d9ef0895d414e4f" dependencies = [ "bitflags 1.3.2", - "glib 0.15.12", + "glib", "libc", "once_cell", "pango-sys", @@ -3001,8 +2800,8 @@ version = "0.15.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2a00081cde4661982ed91d80ef437c20eacaf6aa1a5962c0279ae194662c3aa" dependencies = [ - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "glib-sys", + "gobject-sys", "libc", "system-deps 6.2.2", ] @@ -3036,12 +2835,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "paste" -version = "1.0.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" - [[package]] name = "pathdiff" version = "0.2.1" @@ -3384,15 +3177,6 @@ dependencies = [ "toml_edit 0.19.15", ] -[[package]] -name = "proc-macro-crate" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" -dependencies = [ - "toml_edit 0.21.1", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3449,7 +3233,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools", "proc-macro2", "quote", "syn 2.0.76", @@ -4112,7 +3896,7 @@ checksum = "b2b4d76501d8ba387cf0fefbe055c3e0a59891d09f0f995ae4e4b16f6b60f3c0" dependencies = [ "bitflags 1.3.2", "gio", - "glib 0.15.12", + "glib", "libc", "once_cell", "soup2-sys", @@ -4125,9 +3909,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "009ef427103fcb17f802871647a7fa6c60cbb654b4c4e4c0ac60a31c5f6dc9cf" dependencies = [ "bitflags 1.3.2", - "gio-sys 0.15.10", - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "gio-sys", + "glib-sys", + "gobject-sys", "libc", "system-deps 5.0.0", ] @@ -4326,8 +4110,8 @@ dependencies = [ "gdkwayland-sys", "gdkx11-sys", "gio", - "glib 0.15.12", - "glib-sys 0.15.10", + "glib", + "glib-sys", "gtk", "image", "instant", @@ -4396,7 +4180,7 @@ dependencies = [ "flate2", "futures-util", "getrandom 0.2.15", - "glib 0.15.12", + "glib", "glob", "gtk", "heck 0.5.0", @@ -4824,17 +4608,6 @@ dependencies = [ "winnow 0.5.40", ] -[[package]] -name = "toml_edit" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" -dependencies = [ - "indexmap 2.4.0", - "toml_datetime", - "winnow 0.5.40", -] - [[package]] name = "toml_edit" version = "0.22.20" @@ -5163,11 +4936,10 @@ dependencies = [ "bincode", "config", "console-subscriber", + "futures", "futures-core", "futures-util", "gilrs", - "gstreamer", - "gstreamer-app", "lazy_static", "log", "serde", @@ -5344,10 +5116,10 @@ dependencies = [ "gdk", "gdk-sys", "gio", - "gio-sys 0.15.10", - "glib 0.15.12", - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "gio-sys", + "glib", + "glib-sys", + "gobject-sys", "gtk", "gtk-sys", "javascriptcore-rs", @@ -5368,9 +5140,9 @@ dependencies = [ "cairo-sys-rs", "gdk-pixbuf-sys", "gdk-sys", - "gio-sys 0.15.10", - "glib-sys 0.15.10", - "gobject-sys 0.15.10", + "gio-sys", + "glib-sys", + "gobject-sys", "gtk-sys", "javascriptcore-rs-sys", "libc", @@ -6015,7 +5787,7 @@ dependencies = [ "dunce", "gdk", "gio", - "glib 0.15.12", + "glib", "gtk", "html5ever", "http 0.2.12", diff --git a/Cargo.toml b/Cargo.toml index 8c2fafd..c4bb6b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,45 +1,44 @@ -[package] -name = "vcs-controller" -version = "2.0.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[build-dependencies] -tauri-build = { version = "1.5.1", features = [] } - - -[features] -tracker-state-debug = [] -tokio-logging = [] -# this feature is used for production builds or when `devPath` points to the filesystem and the built-in dev server is disabled. -# If you use cargo directly instead of tauri's cli you can use this feature flag to switch between tauri's `dev` and `build` modes. -# DO NOT REMOVE!! -custom-protocol = [ "tauri/custom-protocol" ] - -[dependencies] -async-channel = "2.2.0" -async-recursion = "1.1.1" -config = "0.14.0" -futures-core = "0.3.30" -futures-util = { version = "0.3.30" } -gilrs = "0.10.6" -gstreamer = { version = "0.22.4", features = ["v1_22"] } -gstreamer-app = { version = "0.22.0", features = ["v1_22"] } -log = "0.4.21" -serde = { version = "1.0.197", features = ["derive"] } -serde_json = "1.0" -tokio = { version = "1.37.0", features = ["rt-multi-thread", "time", "sync"] } -tokio-tungstenite = "0.21.0" -toml = "0.8.12" -tracing = "0.1.40" -tracing-subscriber = { version = "0.3.18", features = ["tracing-log"] } -tracing-appender = "0.2.3" -snafu = "0.8.2" -console-subscriber = "0.3.0" -tauri = { version = "1.6.1", features = [] } -lazy_static = "1.5.0" - -vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" } -bincode = "1.3.3" - - +[package] +name = "vcs-controller" +version = "2.0.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[build-dependencies] +tauri-build = { version = "1.5.1", features = [] } + + +[features] +tracker-state-debug = [] +tokio-logging = [] +# this feature is used for production builds or when `devPath` points to the filesystem and the built-in dev server is disabled. +# If you use cargo directly instead of tauri's cli you can use this feature flag to switch between tauri's `dev` and `build` modes. +# DO NOT REMOVE!! +custom-protocol = [ "tauri/custom-protocol" ] + +[dependencies] +async-channel = "2.2.0" +async-recursion = "1.1.1" +config = "0.14.0" +futures-core = "0.3.30" +futures-util = { version = "0.3.30" } +gilrs = "0.10.6" +log = "0.4.21" +serde = { version = "1.0.197", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1.37.0", features = ["rt-multi-thread", "time", "sync"] } +tokio-tungstenite = "0.21.0" +toml = "0.8.12" +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["tracing-log"] } +tracing-appender = "0.2.3" +snafu = "0.8.2" +console-subscriber = "0.3.0" +tauri = { version = "1.6.1", features = [] } +lazy_static = "1.5.0" + +vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" } +bincode = "1.3.3" +futures = "0.3.30" + + diff --git a/flake.nix b/flake.nix index 6519e3f..62874ce 100644 --- a/flake.nix +++ b/flake.nix @@ -39,12 +39,6 @@ Some utility commands: tailwindcss openssl systemd - gtk4 - gst_all_1.gstreamer - gst_all_1.gst-plugins-base - gst_all_1.gst-plugins-good - gst_all_1.gst-plugins-bad - gst_all_1.gst-plugins-rs ]; cargoHash = nixpkgs.lib.fakeHash; }; @@ -147,3 +141,4 @@ Some utility commands: }; } + diff --git a/src/config.rs b/src/config.rs index 1c1ab97..ca9c356 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,10 +5,20 @@ use std::fs::File; use std::io::Write; use tracing::{info, instrument}; +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ConnectionString { + pub ip: String, + pub port: u32, +} +impl ConnectionString { + pub fn build_conn_string(&self) -> String { + format!("ws://{}:{}", self.ip, self.port) + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AppConfig { - pub camera_ip: String, - pub camera_port: u32, + pub cameras: Vec, pub tracker_ip: String, pub tracker_port: u32, @@ -19,8 +29,10 @@ pub struct AppConfig { impl Default for AppConfig { fn default() -> Self { AppConfig { - camera_ip: "10.0.0.33".to_string(), - camera_port: 8765, + cameras: vec![ConnectionString { + ip: "10.0.0.29".to_owned(), + port: 8765, + }], tracker_ip: "10.0.0.210".to_string(), tracker_port: 6543, diff --git a/src/coordinator/coord_state.rs b/src/coordinator/coord_state.rs deleted file mode 100644 index 0b17489..0000000 --- a/src/coordinator/coord_state.rs +++ /dev/null @@ -1,212 +0,0 @@ -use std::pin::Pin; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; -use std::time::Instant; - -use async_channel::{Receiver, Sender}; -use futures_util::{stream::SplitSink, SinkExt, StreamExt}; -use tokio::net::TcpStream; -use tokio::runtime::Handle; -use tokio::sync::RwLock; -use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; -use tracing::{debug, error, info, instrument}; - -use crate::config::AppConfig; -use crate::coordinator::socket_listen; -use crate::coordinator::tracker_state::TrackerState; -use crate::gstreamer_pipeline; -use crate::sources::joystick_source::joystick_loop; - -use super::perf_state::TrackerMetrics; -use super::remote_video_processor::remote_video_loop; -use super::{ApplicationEvent, ConnectionType}; - -#[derive(Debug)] -pub struct SocketState { - pub is_connected: AtomicBool, - pub stay_connected: AtomicBool, -} - -#[derive(Debug)] -pub struct CoordState<'a> { - pub settings: Arc>, - pub tracker_metrics: TrackerMetrics, - pub runtime: Handle, - - pub sck_outbound: Option>, Message>>, - pub stay_alive_sck_recvr: Arc, - pub joystick_loop_alive: Arc, - - pub current_priority: ConnectionType, - pub last_update_of_priority: Instant, - - pub mec: Pin<&'a mut Receiver>, - pub to_mec: Sender, - - pub pipeline: gstreamer_pipeline::WebcamPipeline, - - pub tracker_state: TrackerState, - pub tracker_connection_state: Arc, -} - -impl<'a> CoordState<'a> { - pub fn new( - mec: Pin<&'a mut Receiver>, - to_mec: Sender, - runtime: Handle, - settings: Arc>, - jpeg_quality: i32, - ) -> Self { - CoordState { - settings, - tracker_metrics: TrackerMetrics::new(), - runtime, - - sck_outbound: None, - stay_alive_sck_recvr: Arc::new(AtomicBool::new(false)), - joystick_loop_alive: Arc::new(AtomicBool::new(false)), - - current_priority: ConnectionType::Local, - last_update_of_priority: Instant::now(), - - mec, - to_mec, - - pipeline: gstreamer_pipeline::WebcamPipeline::new(jpeg_quality).unwrap(), - - tracker_state: TrackerState{ - tracking_id: 0, - highlighted_id: None, - last_detect: Instant::now(), - enabled: true, - - identity_boxes: vec![], - update_ids: false, - }, - tracker_connection_state: Arc::new(SocketState { - stay_connected: AtomicBool::new(false), - is_connected: AtomicBool::new(false), - }), - } - } - - #[instrument(skip(self))] - pub async fn socket_send(&mut self, message: Message) { - if let Some(mut socket) = self.sck_outbound.take() { - if let Err(e) = socket.send(message).await { - error!("There was an error sending to the socket: {:#?}", e); - } else { - self.sck_outbound = Some(socket); - } - } - } - - pub fn socket_connected(&self) -> bool { - self.sck_outbound.is_some() - } - - pub async fn socket_start(&mut self) { - self.stay_alive_sck_recvr.store(true, Ordering::SeqCst); - - let conn_string: String = { - let read_settings = self.settings.read().await; - - format!( - "ws://{}:{}", - read_settings.camera_ip, - read_settings.camera_port - ) - }; - - match connect_async(conn_string).await { - Ok((val, _)) => { - info!("Socket connection to camera made successfully"); - - let (outbound, inbound) = val.split(); - let _socket_task = self.runtime.spawn(socket_listen( - self.to_mec.clone(), - self.stay_alive_sck_recvr.clone(), - inbound, - )); - self.sck_outbound = Some(outbound); - - } - Err(_) => { - error!("Couldn't connect to URL!"); - } - } - } - - pub async fn socket_close(&mut self) { - debug!("Cleaning up camera socket state"); - - if let Some(mut socket) = self.sck_outbound.take() { - if let Err(e) = socket.close().await { - error!("Couldn't close socket during shutdown: {e}"); - } - } - - self.stay_alive_sck_recvr.store(false, Ordering::SeqCst); - } - - pub async fn start_video_loop(&mut self) { - let conn_string: String = { - let read_settings = self.settings.read().await; - - format!( - "ws://{}:{}", - read_settings.tracker_ip, - read_settings.tracker_port - ) - }; - - let _remote_loop = self.runtime.spawn(remote_video_loop( - conn_string, - self.pipeline.sink_frame.clone(), - self.to_mec.clone(), - self.tracker_connection_state.clone(), - )); - } - - pub async fn check_states(&mut self) { - // This one needs to always be alive, and restart after a crash - if !self.joystick_loop_alive.load(Ordering::SeqCst) { - info!("Restarting joystick loop"); - let _joystick_future = self.runtime.spawn(joystick_loop( - self.to_mec.clone(), - self.joystick_loop_alive.clone(), - )); - } - - // If tracker state is not connected, and it should be - if !self - .tracker_connection_state - .is_connected - .load(Ordering::SeqCst) - && self - .tracker_connection_state - .stay_connected - .load(Ordering::SeqCst) - { - self.start_video_loop().await; - } - - // if stay alive is false, and there is a connection, kill it - if !self.stay_alive_sck_recvr.load(Ordering::SeqCst) && self.sck_outbound.is_some() { - self.socket_close().await; - } - } - - pub async fn close(&mut self) { - info!("closing coord state"); - self.tracker_connection_state - .stay_connected - .store(false, Ordering::SeqCst); - self.socket_close().await; - - self.joystick_loop_alive.store(false, Ordering::SeqCst); - self.mec.close(); - } -} diff --git a/src/coordinator/mod.rs b/src/coordinator/mod.rs index d6643e4..09df574 100644 --- a/src/coordinator/mod.rs +++ b/src/coordinator/mod.rs @@ -1,229 +1,160 @@ -use std::pin::pin; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; -use std::time::{Duration, Instant}; +use std::sync::{atomic::AtomicBool, Arc}; +use std::time::Duration; -use async_channel::{Receiver, Sender}; -use futures_util::{stream::SplitStream, StreamExt}; -use gstreamer::prelude::ElementExt; -use gstreamer::State; +use async_channel::{Receiver, Sender, TryRecvError}; use tokio::net::TcpStream; use tokio::runtime::Handle; -use tokio::sync::RwLock; -use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; -use tracing::{debug, error, info, instrument}; +use tokio::sync::oneshot; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tracing::error; -mod coord_state; -mod process_box_string; -mod remote_video_processor; +use crate::config::{AppConfig, ConnectionString}; +use crate::sources::joystick_source::joystick_loop; -use crate::states::perf_state; -use crate::states::tracker_state; -use crate::states::box_coords::NormalizedBoxCoords; -use crate::config::AppConfig; -pub use coord_state::{CoordState, SocketState}; +pub enum ApplicationEvent { + WebRTCMessage, + Move(Point), + Close, +} -const PRIORITY_TIMEOUT: Duration = Duration::from_secs(2); - -#[derive(Clone)] -pub struct MoveEvent { +pub struct Point { pub x: i32, pub y: i32, } -#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)] -pub enum ConnectionType { - Local, - // Remote, - Automated, +pub struct SatelliteConnection { + connection: ConnectionString, + socket: Option>>, + socket_in_progress: Option< + oneshot::Receiver< + Result< + WebSocketStream>, + tokio_tungstenite::tungstenite::Error, + >, + >, + >, } -pub enum TrackerUpdate { - Clear, - Fail, - Update(TrackerUpdatePackage), -} - -#[derive(Clone)] -pub struct TrackerUpdatePackage { - pub boxes: Vec, - pub time: Instant, - pub request_duration: Duration, -} - -pub enum ApplicationEvent { - CameraConnectionPress, - SocketMessage(Message), - MoveEvent(MoveEvent, ConnectionType), - TrackerUpdate(TrackerUpdate), - WebRTCMessage(String), - Close, -} - -#[instrument(skip_all)] -pub async fn start_coordinator( - // Main_Event_Channel - mec: Receiver, +pub struct AppState { to_mec: Sender, - runtime: Handle, - settings: Arc>, -) { - info!("Starting coordinator!"); + mec: Receiver, + pub runtime: Handle, - let mec = pin!(mec); + config: Arc>, - let jpeg_quality = settings.read().await.tracker_jpeg_quality; + pub camera_satellites: Vec, + pub endpoint_satellites: Vec, - let mut state = CoordState::new( - mec, - to_mec, - runtime, - settings, - jpeg_quality, - ); + pub joystick_task_is_alive: Arc, +} - // state - // .pipeline - // .pipeline - // .set_state(State::Playing) - // .expect("Could not set pipeline state to playing"); - // +impl AppState { + pub async fn new( + mec: Receiver, + to_mec: Sender, + config: Arc>, + rt: Handle, + ) -> Self { + let camera_satellites = config + .read() + .await + .cameras + .iter() + .map(|x| SatelliteConnection { + connection: x.clone(), + socket: None, + socket_in_progress: None, + }) + .collect::>(); - state.check_states().await; + AppState { + to_mec, + mec, + runtime: rt, - while let Some(msg) = state.mec.next().await { - state.check_states().await; + config, + + camera_satellites, + endpoint_satellites: vec![], + + joystick_task_is_alive: Arc::new(AtomicBool::new(false)), + } + } + + pub fn check_alive_things(&mut self) { + if !self + .joystick_task_is_alive + .load(std::sync::atomic::Ordering::SeqCst) + { + self.runtime.spawn(joystick_loop( + self.to_mec.clone(), + Arc::clone(&self.joystick_task_is_alive), + )); + } + + for connection in self.camera_satellites.iter_mut() { + if connection.socket_in_progress.is_some() { + if connection.socket_in_progress.map(|x| x.try_recv()) { - match msg { - ApplicationEvent::CameraConnectionPress => { - println!("connecting to camera!"); - if state.socket_connected() { - state.socket_close().await; - } else { - state.socket_start().await; } } - ApplicationEvent::Close => { - break; - } - ApplicationEvent::SocketMessage(socket_message) => { - state.socket_send(socket_message).await; - } - ApplicationEvent::MoveEvent(coord, priority) => { - // If Automatic control, but local event happens, override the automatice events for 2 seconds - if priority <= state.current_priority - || Instant::now() > state.last_update_of_priority + PRIORITY_TIMEOUT - { - state.last_update_of_priority = Instant::now(); - state.current_priority = priority; - if state.socket_connected() { - let message = format!( - "{}{}:{}{}", - if coord.y > 0 { "D" } else { "U" }, - coord.y.abs(), - if coord.x > 0 { "R" } else { "L" }, - coord.x.abs() - ); - - state.socket_send(Message::Text(message)).await; - } - } - } - ApplicationEvent::WebRTCMessage(msg) => { - info!("Pew pew! WebRTCMessage! {msg}"); - } - ApplicationEvent::TrackerUpdate(update) => match update { - TrackerUpdate::Clear => { - state.tracker_state.clear(); - state.tracker_metrics.clear_times(); - } - TrackerUpdate::Fail => { - let fail_count: usize = state.tracker_metrics.fail_count + 1; - state.tracker_metrics.starting_connection(Some(fail_count)); - } - TrackerUpdate::Update(update) => { - let mut x_adj: i32 = 0; - let mut y_adj: i32 = 0; - - state.tracker_state.update_from_boxes(update.boxes); - state.tracker_state.last_detect = update.time; - - match state.tracker_state.calculate_tracking() { - Ok((x, y, _tracker_enabled)) => { - x_adj = x; - y_adj = y; + if connection.socket.is_none() && connection.socket_in_progress.is_none() { + let (send, recv) = oneshot::channel::< + Result< + WebSocketStream>, + tokio_tungstenite::tungstenite::Error, + >, + >(); + connection.socket_in_progress = Some(recv); + let conn_string = connection.connection.build_conn_string(); + tokio::spawn(async move { + let res = connect_async(conn_string).await; + match res { + Ok((res, _)) => { + let _ = send.send(Ok(res)); // can't even close the socket if send + // returns an error } Err(e) => { - if state.tracker_state.tracking_id > 0 { - info!("Could not calculate the tracking!: {e}"); - } + let _ = send.send(Err(e)); } } - - let me = MoveEvent { x: x_adj, y: y_adj }; - if let Err(e) = state - .to_mec - .send(ApplicationEvent::MoveEvent( - me.clone(), - ConnectionType::Automated, - )) - .await - { - error!("Could not send to MEC... even though in the MEC?! {e}"); - } - state.tracker_metrics.insert_time(update.request_duration); - } - }, - } - } - - state - .pipeline - .pipeline - .set_state(State::Null) - .expect("Could not set pipeline state to playing"); - - state.close().await; - - info!("Stopping Coordinator"); -} - -async fn socket_listen( - mec: Sender, - stay_alive_sck_recvr: Arc, - mut reader: SplitStream>>, -) { - if stay_alive_sck_recvr.load(std::sync::atomic::Ordering::SeqCst) { - while let Some(msg) = reader.next().await { - match msg { - Ok(val) => { - match val { - Message::Ping(_) => { - // Do nothing because pings are handled on reads and write by tungestenite - } - _ => { - info!("Received message from the camera websocket? {:#?}", val); - } - } - } - Err(e) => { - error!("Websocket error: {:#?}", e); - } + }); } } - - // setting this will call the internal state.socket_close next check states - stay_alive_sck_recvr.store(false, Ordering::SeqCst); } - - // If the mec is closed or full, then this socket should be closing anyways - // as there was most likely an unrecoverable error - let _ = mec - .send(ApplicationEvent::SocketMessage(Message::Close(None))) - .await; - - debug!("Closed socket reading thread"); +} + +pub async fn run_main_event_loop( + mec: Receiver, + to_mec: Sender, + config: Arc>, + rt: Handle, +) { + let mut state = AppState::new(mec, to_mec, config, rt).await; + loop { + state.check_alive_things(); + + match state.mec.try_recv() { + Err(TryRecvError::Empty) => tokio::time::sleep(Duration::from_millis(50)).await, + Err(TryRecvError::Closed) => { + let close_handles: Vec<_> = state + .camera_satellites + .iter_mut() + .filter(|x| x.socket.is_some()) + .map(|x| { + let skt = x.socket.take().unwrap(); + close_socket(skt) + }) + .collect(); + futures::future::join_all(close_handles).await; + break; + } + Ok(msg) => {} + } + } +} + +async fn close_socket(mut skt: WebSocketStream>) { + let _ = skt.close(None).await; } diff --git a/src/coordinator/process_box_string.rs b/src/coordinator/process_box_string.rs deleted file mode 100644 index 83b6465..0000000 --- a/src/coordinator/process_box_string.rs +++ /dev/null @@ -1,43 +0,0 @@ -use crate::states::box_coords::NormalizedBoxCoords; - -pub fn process_incoming_string(message: String) -> Result, String> { - let mut boxes: Vec = Vec::new(); - - for line in message.lines() { - let parts: Vec<&str> = line.split(' ').collect(); - - let id = parts[0] - .replace(['[', ']'], "") - .parse() - .map_err(|_| "Invalid ID")?; - - if parts.len() != 3 { - return Err("Invalid socket input format: number of parts".to_string()); - } - - let coords: Vec<&str> = parts[1].split(':').collect(); - if coords.len() != 2 { - return Err("Invalid socket input format: coords 1".to_string()); - } - let x1: u32 = coords[0].parse().map_err(|_| "Invalid x coordinate")?; - let y1: u32 = coords[1].parse().map_err(|_| "Invalid y coordinate")?; - - let coords2: Vec<&str> = parts[2].split(':').collect(); - if coords2.len() != 2 { - return Err("Invalid socket input format: coords 2".to_string()); - } - - let x2: u32 = coords2[0].parse().map_err(|_| "Invalid width")?; - let y2: u32 = coords2[1].parse().map_err(|_| "Invalid width")?; - - boxes.push(NormalizedBoxCoords { - id, - x1: (x1 as f32 / 1000.0), - x2: (x2 as f32 / 1000.0), - y1: (y1 as f32 / 1000.0), - y2: (y2 as f32 / 1000.0), - }); - } - - Ok(boxes) -} diff --git a/src/coordinator/remote_video_processor.rs b/src/coordinator/remote_video_processor.rs deleted file mode 100644 index ad6986f..0000000 --- a/src/coordinator/remote_video_processor.rs +++ /dev/null @@ -1,196 +0,0 @@ -use std::{ - sync::{atomic::Ordering, Arc}, - time::{Duration, Instant}, -}; - -use async_recursion::async_recursion; - -use async_channel::Sender; -use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryStreamExt}; -use gstreamer_app::AppSink; -use tokio::{net::TcpStream, sync::Mutex, time::sleep_until}; -use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; -use tracing::{error, info, instrument, warn}; - -use super::{ - process_box_string::process_incoming_string, ApplicationEvent, SocketState, TrackerUpdate, - TrackerUpdatePackage, -}; - -#[instrument(skip_all)] -pub async fn remote_video_loop( - conn_string: String, - appsink: Arc>, - to_mec: Sender, - socket_state: Arc, -) { - info!( - "Starting remote tracker processing connection to: {}", - conn_string - ); - - socket_state.is_connected.store(true, Ordering::SeqCst); - - match connect_async(&conn_string).await { - Err(e) => { - warn!("Could not connect to remote computer: {e}"); - if let Err(e) = to_mec - .send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Fail)) - .await - { - error!("Could not send message to MEC! {e}"); - } - } - Ok((connection, _)) => { - let (mut sender, mut recvr) = connection.split(); - - let mut last_iter: Instant; - - loop { - last_iter = Instant::now(); - // Do this in an encloser to not keep a lock on the appsink - let image_message = { - let res = { - let appsnk = appsink.lock().await; - - get_video_frame(&appsnk) - }; - match res { - Ok(e) => e, - Err(e) => { - error!("Could not get video frame! {e}"); - if let Err(e) = sender.close().await { - error!("Could not close socket to remote computer: {e}") - } - socket_state.is_connected.store(false, Ordering::SeqCst); - break; - } - } - }; - - if let Err(e) = sender.send(image_message).await { - error!("There was an error sending the video frame to the server: {e}"); - if let Err(e) = sender.close().await { - error!("Could not close socket to remote computer: {e}") - } - socket_state.is_connected.store(false, Ordering::SeqCst); - socket_state.stay_connected.store(false, Ordering::SeqCst); - break; - } - - let do_not_break = handle_message(&mut recvr, &to_mec, last_iter).await; - - if !do_not_break { break; } - - - if !socket_state.stay_connected.load(Ordering::SeqCst) { - info!("Shutting down remote video loop"); - break; - } - - // rate limit updates - sleep_until(tokio::time::Instant::now() + Duration::from_millis(10)).await; - } - } - } - info!("Shutting down remote video loop"); - - if let Err(e) = to_mec - .send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Clear)) - .await - { - error!("Could not send message to MEC! {e}"); - } - - { - // This message forces a redraw after clearing the queue - if let Err(e) = to_mec - .send(ApplicationEvent::MoveEvent( - crate::coordinator::MoveEvent { x: 0, y: 0 }, - crate::coordinator::ConnectionType::Automated, - )) - .await - { - error!( - "Error sending message to MEC during shutdown of tracker thread: {}", - e - ); - } - } - socket_state.is_connected.store(false, Ordering::SeqCst); -} - -fn get_video_frame(appsink: &AppSink) -> Result { - let sample = appsink - .pull_sample() - .map_err(|e| format!("Could not get sample: {e}"))?; - let buffer = sample.buffer().ok_or("Could not get buffer, was None")?; - let map = buffer - .map_readable() - .map_err(|e| format!("Could not get readable map: {e}"))?; - Ok(Message::binary(map.to_vec())) -} - -#[async_recursion] -async fn handle_message( - recvr: &mut SplitStream>>, - // sender: &mut SplitSink>, Message>, - to_mec: &Sender, - last_iter: Instant, -) -> bool { - match recvr.try_next().await { - Ok(Some(message)) => { - match message { - Message::Close(_) => { - info!("Close packet received from remote computer: {:?}", message); - return false; - } - Message::Pong(_) | Message::Frame(_) | Message::Text(_) => { - warn!("There was an unhandled message type from the camera: {}\n{}", message, message.to_string()); - // this was not the expected response, recursion! - return handle_message(recvr, to_mec, last_iter).await; - } - Message::Ping(_) => { - // Ping/Pongs are handled by tokio tungstenite on reads and writes - // this was not the expected response, recursion! - return handle_message(recvr, to_mec, last_iter).await; - } - Message::Binary(bin) => { - let message = std::str::from_utf8(&bin); - if let Err(e) = message { - error!("Could not decode binary message! Assuming corrupted response! {e}"); - } - - match process_incoming_string(message.unwrap().to_string()) { - Ok(v) => { - if let Err(e) = to_mec - .send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Update( - TrackerUpdatePackage { - boxes: v, - time: Instant::now(), - request_duration: Instant::now() - last_iter, - }, - ))) - .await - { - error!("Could not send to MEC! {e}"); - return false; - } - } - Err(e) => { - error!("Could not parse incoming string! {}\n{}", e, message.unwrap()); - } - }; - } - } - } - Ok(None) => { - info!("Recieved an empty message from the remote computer: Aborting"); - return false; - } - Err(e) => { - error!("Got an error on while recieving from remote computer: {e}"); - } - } - return true; -} diff --git a/src/gstreamer_pipeline.rs b/src/gstreamer_pipeline.rs deleted file mode 100644 index 2c35414..0000000 --- a/src/gstreamer_pipeline.rs +++ /dev/null @@ -1,233 +0,0 @@ -use gstreamer::{prelude::*, PadLinkError}; -use gstreamer::{Element, ElementFactory, Pipeline}; -use gstreamer_app::AppSink; -use gstreamer::glib::BoolError; -use snafu::prelude::*; -use std::str::FromStr; -use std::sync::Arc; -use tokio::sync::Mutex; - -#[derive(Debug)] -pub struct WebcamPipeline { - pub pipeline: Pipeline, - - pub sink_frame: Arc>, -} - -impl WebcamPipeline { - pub fn new(jpeg_quality: i32) -> Result { - let pipeline = Pipeline::with_name("webcam_pipeline"); - - // All of the following errors are unrecoverable - - let mut video_src = ""; - if cfg!(windows) { - video_src = "mfvideosrc"; - } else if cfg!(unix) { - video_src = "v4l2src"; - } - let source = ElementFactory::make(video_src) - .build() - .context(BuildSnafu { - element: "mfvideosrc", - })?; - let convert = ElementFactory::make("videoconvert") - .build() - .context(BuildSnafu { - element: "videoconvert", - })?; - let rate = ElementFactory::make("videorate") - .build() - .context(BuildSnafu { - element: "videorate", - })?; - - let tee = ElementFactory::make("tee") - .build() - .context(BuildSnafu { element: "tee" })?; - - let queue_app = ElementFactory::make("queue") - .property("max-size-time", 1u64) - .property("max-size-buffers", 0u32) - .property("max-size-bytes", 0u32) - .build() - .context(BuildSnafu { - element: "paintable queue", - })?; - - let webrtc_sink = ElementFactory::make("webrtcsink") - // .property("meta", "meta,name=gst-stream") - .name("web rtc sink") - .build() - .context(BuildSnafu { - element: "webrtcsink" - })?; - - // queue.connect_closure("overrun", false, glib::closure!(|queue: Element| { - // println!("The queue is full!"); - // })); - - let appsink_queue = ElementFactory::make("queue") - .property("max-size-time", 1u64) - .property("max-size-buffers", 0u32) - .property("max-size-bytes", 0u32) - .build() - .context(BuildSnafu { - element: "appsink queue", - })?; - - let resize = ElementFactory::make("videoscale") - .build() - .context(BuildSnafu { - element: "videoscale", - })?; - - let jpeg_enc = ElementFactory::make("jpegenc") - .property("quality", jpeg_quality) - .build() - .context(BuildSnafu { element: "jpegenc" })?; - - let caps_string = "image/jpeg,width=640,height=640"; - let appsrc_caps = gstreamer::Caps::from_str(caps_string).context(BuildSnafu { - element: "appsink caps", - })?; - - let sink_frame = AppSink::builder() - .name("frame_appsink") - .sync(false) - .max_buffers(1u32) - .drop(true) - .caps(&appsrc_caps) - .build(); - - sink_frame.set_property("caps", &appsrc_caps.to_value()); - - pipeline - .add_many([ - &source, - &convert, - &rate, - &tee, - &queue_app, - &webrtc_sink, - &appsink_queue, - &resize, - &jpeg_enc, - &sink_frame.upcast_ref(), - ]) - .context(LinkSnafu { - from: "all", - to: "pipeline", - })?; - - Element::link_many([&source, &convert, &rate]).context(LinkSnafu { - from: "source et. al.", - to: "rate", - })?; - - // -- BEGIN PAINTABLE SINK PIPELINE - let tee_caps = - // gstreamer::caps::Caps::from_str("video/x-raw,framerate=15/1").context(BuildSnafu { - gstreamer::caps::Caps::from_str("video/x-raw").context(BuildSnafu { - element: "tee caps", - })?; - - rate.link_filtered(&tee, &tee_caps).context(LinkSnafu { - from: "videorate", - to: "tee", - })?; - - let tee_src_1 = tee - .request_pad_simple("src_%u") - .ok_or(PipelineError::PadRequest { - element: "tee pad 1".to_string(), - })?; - let paintable_queue_sinkpad = - queue_app - .static_pad("sink") - .ok_or(PipelineError::PadRequest { - element: "gtk4 sink".to_string(), - })?; - - tee_src_1 - .link(&paintable_queue_sinkpad) - .context(PadLinkSnafu { - from: "tee src pad", - to: "gtk4 paintable queue", - })?; - - queue_app.link(&webrtc_sink).context(LinkSnafu { - from: "gtk4 paintable queue", - to: "gtk4 paintable", - })?; - - // -- END PAINTABLE SINK PIPELINE - - // -- BEGIN APPSINK PIPELINE - let tee_src_2 = tee - .request_pad_simple("src_%u") - .ok_or(PipelineError::PadRequest { - element: "tee pad 2".to_string(), - })?; - let appsink_queue_sinkpad = - appsink_queue - .static_pad("sink") - .ok_or(PipelineError::PadRequest { - element: "appsink queue".to_string(), - })?; - tee_src_2 - .link(&appsink_queue_sinkpad) - .context(PadLinkSnafu { - from: "tee src pad 2", - to: "appsink queue sinkpad", - })?; - - appsink_queue.link(&resize).context(LinkSnafu { - from: "appsink_queue", - to: "resize", - })?; - - let resize_caps = - gstreamer::caps::Caps::from_str("video/x-raw,format=RGB,width=640,height=640") - .context(BuildSnafu { - element: "resize_caps", - })?; - - resize - .link_filtered(&jpeg_enc, &resize_caps) - .context(LinkSnafu { - from: "jpeg_enc", - to: "resize_caps", - })?; - - Element::link_many([&jpeg_enc, &sink_frame.upcast_ref()]).context(LinkSnafu { - from: "jpeg_enc", - to: "appsink", - })?; - - Ok(WebcamPipeline { - pipeline, - sink_frame: Arc::new(Mutex::new(sink_frame)), - }) - } -} - -#[derive(Debug, Snafu)] -pub enum PipelineError { - #[snafu(display("Error during element linking"))] - Link { - source: BoolError, - from: String, - to: String, - }, - #[snafu(display("Error linking pads"))] - PadLink { - source: PadLinkError, - from: String, - to: String, - }, - #[snafu(display("Error creating element"))] - Build { source: BoolError, element: String }, - #[snafu(display("Error getting pad from element"))] - PadRequest { element: String }, -} diff --git a/src/main.rs b/src/main.rs index e5cd16b..d60a638 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,187 +1,216 @@ -// Prevents additional console window on Windows in release, DO NOT REMOVE!! -#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] - -use std::{sync::{Arc, Mutex}, time::Duration}; -use async_channel::Sender; -use tokio::{runtime, sync:: RwLock}; -use tracing::{self, debug, info, error}; -use tauri::{AppHandle, Manager, Window}; -use lazy_static::lazy_static; - -#[cfg(not(debug_assertions))] -use tracing_subscriber; -use vcs_common::ApplicationMessage; -use webrtc_remote::start_listener; - -use crate::config::{load_config, AppConfig}; - -mod config; -mod coordinator; -mod gstreamer_pipeline; -mod sources; -mod states; -mod tauri_functions; -mod webrtc_remote; - -use coordinator::{start_coordinator, ApplicationEvent}; - -lazy_static! { - static ref TO_MEC_REF: Mutex>> = Mutex::new(None); - static ref TO_WEBRTC: Mutex>> = Mutex::new(None); - static ref APP_HANDLE: Mutex> = Mutex::new(None); -} - -fn main() { - - #[cfg(not(debug_assertions))] - { - let file_appender = tracing_appender::rolling::daily(".\\logs", "camera-controller"); - let (non_blocking, _gaurd) = tracing_appender::non_blocking(file_appender); - tracing_subscriber::fmt() - .with_writer(non_blocking) - .with_max_level(tracing::Level::DEBUG) - .with_ansi(false) - .init(); - } - #[cfg(all(not(feature = "tokio-debug"), debug_assertions))] - { - let _sub = tracing_subscriber::fmt() - .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) - .init(); - } - #[cfg(feature = "tokio-debug")] - { - console_subscriber::init(); - } - - let (to_mec, mec) = async_channel::bounded::(10); - - info!("Logging intialized"); - - let config: Arc> = Arc::new(RwLock::new(load_config())); - - gstreamer::init().expect("Unable to start gstreamer"); - - let rt = runtime::Runtime::new().expect("Could not start tokio runtime"); - let handle = rt.handle().clone(); - - let _coordinator = rt.handle().spawn(start_coordinator( - mec, - to_mec.clone(), - handle, - config - )); - - *TO_MEC_REF.lock().unwrap() = Some(to_mec.clone()); - - let (to_webrtc_send, to_webrtc_recv) = async_channel::bounded::(10); - let (from_webrtc_send, from_webrtc_recv) = async_channel::bounded::(10); - - rt.handle().spawn(start_listener(to_webrtc_recv, from_webrtc_send)); - - *TO_WEBRTC.lock().unwrap() = Some(to_webrtc_send.clone()); - - rt.handle().spawn(async move { - while let Ok(msg) = from_webrtc_recv.recv().await { - let mut do_sleep = false; - { - if let Ok(mut e) = APP_HANDLE.lock() { - if e.is_none() { - do_sleep = true; - } else { - let handle = e.take().unwrap(); - - match msg { - vcs_common::ApplicationMessage::WebRTCPacket(msg) => { - debug!("Got a message from the webrtc connection! {:?}", msg); - handle.emit_all("frontend_message", serde_json::to_string(&msg).unwrap()).unwrap(); - } - vcs_common::ApplicationMessage::WebRTCIceCandidateInit(msg) => { - debug!("Got an ICE init candidate from the webrtc connection! {:?}", msg); - handle.emit_all("frontend_message", serde_json::to_string(&msg).unwrap()).unwrap(); - } - vcs_common::ApplicationMessage::WebRTCIceCandidate(msg) => { - - debug!("Got an ICE candidate from the webrtc connection! {:?}", msg); - handle.emit_all("frontend_message", serde_json::to_string(&msg).unwrap()).unwrap(); - } - } - - *e = Some(handle); - } - } - - } - if do_sleep { - tokio::time::sleep(Duration::from_millis(500)).await; - } - - } - }); - - tauri::Builder::default() - .manage(tauri_functions::TauriState { to_mec: to_mec.clone() }) - .invoke_handler(tauri::generate_handler![tauri_functions::connect_to_camera]) - .setup(|app| { - *APP_HANDLE.lock().unwrap() = Some(app.handle()); - - let _id2 = app.listen_global("webrtc-message", |event| { - debug!("Got webrtc-message event from Tauri client! {:#?}", event); - match event.payload() { - Some(payload) => { - if let Ok(e) = TO_WEBRTC.lock() { - match e.as_ref() { - Some(to_webrtc) => { - debug!("Sending message to the webrtc connection"); - let message: Option = match payload { - s if s.starts_with("{\"type") => Some(ApplicationMessage::WebRTCPacket(serde_json::from_str(payload).expect("Could not decode the browser's sdp"))), - s if s.starts_with("{\"candidate") => Some(ApplicationMessage::WebRTCIceCandidateInit(serde_json::from_str(payload).expect("Could not decode the browser's Ice Candidate"))), - _ => None - }; - - if message.is_some() { - if let Err(e) = to_webrtc.send_blocking(message.unwrap()) { - error!("Could not send to mec! {e}"); - } - } - }, - None => { - error!("TO_MEC_REF was none!"); - } - } - } - } - None => { - info!("There was an empty payload!"); - } - } - }); - - // let _id = app.listen_global("webrtc-event", |event| { - // match event.payload() { - // Some(payload) => { - // if let Ok(e) = TO_MEC_REF.lock() { - // match e.as_ref() { - // Some(to_mec) => { - // if let Err(e) = to_mec.send_blocking(ApplicationEvent::WebRTCMessage(payload.to_string())) { - // error!("Could not send to mec! {e}"); - // } - // }, - // None => { - // error!("TO_MEC_REF was none!"); - // } - // } - // } - // } - // None => { - // info!("There was an empty payload!"); - // } - // } - // }); - Ok(()) - }) - .run(tauri::generate_context!()) - .expect("error while running tauri application"); - - let _ = to_mec.send_blocking(ApplicationEvent::Close); -} +// Prevents additional console window on Windows in release, DO NOT REMOVE!! +#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] + +use async_channel::Sender; +use lazy_static::lazy_static; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; +use tauri::{AppHandle, Manager}; +use tokio::{runtime, sync::RwLock}; +use tracing::{self, debug, error, info}; + +#[cfg(not(debug_assertions))] +use tracing_subscriber; +use vcs_common::ApplicationMessage; +use webrtc_remote::start_listener; + +use crate::config::{load_config, AppConfig}; + +mod config; +mod coordinator; +mod sources; +mod tauri_functions; +mod webrtc_remote; + +use coordinator::{run_main_event_loop, ApplicationEvent}; + +lazy_static! { + static ref TO_MEC_REF: Mutex>> = Mutex::new(None); + static ref TO_WEBRTC: Mutex>> = Mutex::new(None); + static ref APP_HANDLE: Mutex> = Mutex::new(None); +} + +fn main() { + #[cfg(not(debug_assertions))] + { + let file_appender = tracing_appender::rolling::daily(".\\logs", "camera-controller"); + let (non_blocking, _gaurd) = tracing_appender::non_blocking(file_appender); + tracing_subscriber::fmt() + .with_writer(non_blocking) + .with_max_level(tracing::Level::DEBUG) + .with_ansi(false) + .init(); + } + #[cfg(all(not(feature = "tokio-debug"), debug_assertions))] + { + let _sub = tracing_subscriber::fmt() + .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) + .init(); + } + #[cfg(feature = "tokio-debug")] + { + console_subscriber::init(); + } + + let (to_mec, mec) = async_channel::bounded::(10); + + info!("Logging intialized"); + + let config: Arc> = Arc::new(RwLock::new(load_config())); + + let rt = runtime::Runtime::new().expect("Could not start tokio runtime"); + let handle = rt.handle().clone(); + + let _coordinator = rt + .handle() + .spawn(run_main_event_loop(mec, to_mec.clone(), config, handle)); + + *TO_MEC_REF.lock().unwrap() = Some(to_mec.clone()); + + let (to_webrtc_send, to_webrtc_recv) = + async_channel::bounded::(10); + let (from_webrtc_send, from_webrtc_recv) = + async_channel::bounded::(10); + + rt.handle() + .spawn(start_listener(to_webrtc_recv, from_webrtc_send)); + + *TO_WEBRTC.lock().unwrap() = Some(to_webrtc_send.clone()); + + rt.handle().spawn(async move { + while let Ok(msg) = from_webrtc_recv.recv().await { + let mut do_sleep = false; + { + if let Ok(mut e) = APP_HANDLE.lock() { + if e.is_none() { + do_sleep = true; + } else { + let handle = e.take().unwrap(); + + match msg { + vcs_common::ApplicationMessage::WebRTCPacket(msg) => { + debug!("Got a message from the webrtc connection! {:?}", msg); + handle + .emit_all( + "frontend_message", + serde_json::to_string(&msg).unwrap(), + ) + .unwrap(); + } + vcs_common::ApplicationMessage::WebRTCIceCandidateInit(msg) => { + debug!( + "Got an ICE init candidate from the webrtc connection! {:?}", + msg + ); + handle + .emit_all( + "frontend_message", + serde_json::to_string(&msg).unwrap(), + ) + .unwrap(); + } + vcs_common::ApplicationMessage::WebRTCIceCandidate(msg) => { + debug!( + "Got an ICE candidate from the webrtc connection! {:?}", + msg + ); + handle + .emit_all( + "frontend_message", + serde_json::to_string(&msg).unwrap(), + ) + .unwrap(); + } + } + + *e = Some(handle); + } + } + } + if do_sleep { + tokio::time::sleep(Duration::from_millis(500)).await; + } + } + }); + + tauri::Builder::default() + .manage(tauri_functions::TauriState { + to_mec: to_mec.clone(), + }) + .invoke_handler(tauri::generate_handler![tauri_functions::connect_to_camera]) + .setup(|app| { + *APP_HANDLE.lock().unwrap() = Some(app.handle()); + + let _id2 = app.listen_global("webrtc-message", |event| { + debug!("Got webrtc-message event from Tauri client! {:#?}", event); + match event.payload() { + Some(payload) => { + if let Ok(e) = TO_WEBRTC.lock() { + match e.as_ref() { + Some(to_webrtc) => { + debug!("Sending message to the webrtc connection"); + let message: Option = match payload { + s if s.starts_with("{\"type") => { + Some(ApplicationMessage::WebRTCPacket( + serde_json::from_str(payload) + .expect("Could not decode the browser's sdp"), + )) + } + s if s.starts_with("{\"candidate") => { + Some(ApplicationMessage::WebRTCIceCandidateInit( + serde_json::from_str(payload).expect( + "Could not decode the browser's Ice Candidate", + ), + )) + } + _ => None, + }; + + if message.is_some() { + if let Err(e) = to_webrtc.send_blocking(message.unwrap()) { + error!("Could not send to mec! {e}"); + } + } + } + None => { + error!("TO_MEC_REF was none!"); + } + } + } + } + None => { + info!("There was an empty payload!"); + } + } + }); + + // let _id = app.listen_global("webrtc-event", |event| { + // match event.payload() { + // Some(payload) => { + // if let Ok(e) = TO_MEC_REF.lock() { + // match e.as_ref() { + // Some(to_mec) => { + // if let Err(e) = to_mec.send_blocking(ApplicationEvent::WebRTCMessage(payload.to_string())) { + // error!("Could not send to mec! {e}"); + // } + // }, + // None => { + // error!("TO_MEC_REF was none!"); + // } + // } + // } + // } + // None => { + // info!("There was an empty payload!"); + // } + // } + // }); + Ok(()) + }) + .run(tauri::generate_context!()) + .expect("error while running tauri application"); + + let _ = to_mec.send_blocking(ApplicationEvent::Close); +} diff --git a/src/sources/joystick_source.rs b/src/sources/joystick_source.rs index ed221fa..3efb917 100644 --- a/src/sources/joystick_source.rs +++ b/src/sources/joystick_source.rs @@ -1,4 +1,4 @@ -use crate::coordinator::{ApplicationEvent, MoveEvent}; +use crate::coordinator::{ApplicationEvent, Point}; use async_channel::Sender; use gilrs::{ev::filter::FilterFn, Axis, Button, Event, EventType, Filter, Gilrs, GilrsBuilder}; @@ -86,13 +86,10 @@ pub async fn joystick_loop(tx: Sender, is_alive: Arc {} Err(async_channel::TrySendError::Closed(_)) => { info!("MEC is closed, stopping Joystick loop"); diff --git a/src/sources/mod.rs b/src/sources/mod.rs index bad116a..9b7a9ad 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -1,2 +1 @@ - pub mod joystick_source; diff --git a/src/states/box_coords.rs b/src/states/box_coords.rs deleted file mode 100644 index 00f66a7..0000000 --- a/src/states/box_coords.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::fmt::Display; - -#[derive(Debug, Clone, Copy)] -pub struct BoxCoords { - pub id: u32, - pub x1: u32, - pub y1: u32, - pub x2: u32, - pub y2: u32, -} - -impl Display for BoxCoords { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Absolute Box {}, x1: {}, y1: {}, x2: {}, y2: {}", - self.id, self.x1, self.y1, self.x2, self.y2 - ) - } -} - -#[derive(Debug, Clone, Copy)] -pub struct NormalizedBoxCoords { - pub id: u32, - pub x1: f32, - pub y1: f32, - pub x2: f32, - pub y2: f32, -} - -impl NormalizedBoxCoords { - fn absolute_coords(&self, width: i32, height: i32) -> BoxCoords { - BoxCoords { - id: self.id, - x1: (self.x1 * width as f32) as u32, - y1: (self.y1 * height as f32) as u32, - x2: (self.x2 * width as f32) as u32, - y2: (self.y2 * height as f32) as u32, - } - } - - fn area(&self) -> f32 { - (self.x2 - self.x1) * (self.y2 - self.y1) - } -} - -impl Display for NormalizedBoxCoords { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Normalized Box {}, x1: {}, y1: {}, x2: {}, y2: {}", - self.id, self.x1, self.y1, self.x2, self.y2 - ) - } -} - diff --git a/src/states/mod.rs b/src/states/mod.rs deleted file mode 100644 index 4300b31..0000000 --- a/src/states/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ - -pub mod perf_state; -pub mod tracker_state; -pub mod box_coords; - diff --git a/src/states/perf_state.rs b/src/states/perf_state.rs deleted file mode 100644 index fe693fc..0000000 --- a/src/states/perf_state.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::{collections::VecDeque, time::Duration}; - -use tracing::info; - -const MAX_RECORDED_TIMES: usize = 10; -const DEGRADED_TRACKER_TIME: u128 = 150; - -#[derive(Debug)] -pub struct TrackerMetrics { - pub header_text: String, - pub fail_count: usize, - tracker_times: VecDeque, -} - -impl TrackerMetrics { - pub fn new() -> Self { - let mut ret = TrackerMetrics { - header_text: String::from(""), - fail_count: 0, - - tracker_times: VecDeque::with_capacity(MAX_RECORDED_TIMES), - }; - ret.clear_times(); - ret - } - - fn update_gui(&mut self) { - info!("Trying to update the gui"); - // todo!("No gui channel sent yet"); - } - - pub fn starting_connection(&mut self, fail_count: Option) { - self.clear_times(); - - self.header_text.clear(); - match fail_count { - None => self.header_text.push_str("Status: Connecting ..."), - Some(v) => self.header_text.push_str(&format!("Status: Attempt {}/5", v)), - } - - self.update_gui(); - } - - pub fn clear_times(&mut self) { - for _ in 0..10 { - self.tracker_times.pop_front(); - } - self.header_text = "Status: Disconnected".to_string(); - - self.update_gui(); - } - - pub fn insert_time(&mut self, new_measurement: Duration) { - if self.tracker_times.len() > MAX_RECORDED_TIMES { - let _ = self.tracker_times.pop_front(); - } - self.tracker_times.push_back(new_measurement.as_millis()); - - let avg_time = self.tracker_times.iter().sum::() / self.tracker_times.len() as u128; - - if avg_time == 0 { - self.header_text = format!( - "Status: Failed Avg Response: {} ms", - avg_time - ); - } - - if avg_time > DEGRADED_TRACKER_TIME { - self.header_text = format!( - "Status: Degraded Avg Response: {} ms", - avg_time - ); - } else { - self.header_text = format!( - "Status: Nominal Avg Response: {} ms", - avg_time - ); - } - - self.update_gui(); - } -} diff --git a/src/states/tracker_state.rs b/src/states/tracker_state.rs deleted file mode 100644 index a77e710..0000000 --- a/src/states/tracker_state.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::{ - cmp::{max, min}, - time::Instant, -}; - -use crate::states::box_coords::NormalizedBoxCoords; - -#[derive(Debug)] -pub struct TrackerState { - pub tracking_id: u32, - pub highlighted_id: Option, - pub last_detect: Instant, - pub enabled: bool, - - pub update_ids: bool, - - pub identity_boxes: Vec, -} - -impl TrackerState { - pub fn clear(&mut self) { - self.tracking_id = 0; - self.highlighted_id = None; - self.last_detect = Instant::now(); - self.enabled = false; - self.update_ids = false; - self.identity_boxes.clear(); - } - - pub fn update_from_boxes(&mut self, new_boxes: Vec) { - let mut old_ids: Vec = self.identity_boxes.iter().map(|x| x.id).collect(); - old_ids.sort(); - let mut new_ids: Vec = new_boxes.iter().map(|x| x.id).collect(); - new_ids.sort(); - - self.update_ids = new_ids == old_ids; - - self.identity_boxes = new_boxes; - } - - // TODO: have a return type that says "highlighted ID not found" and "no highlighted id selected" - // It's not really worth logging... - pub fn calculate_tracking(&mut self) -> core::result::Result<(i32, i32, bool), String> { - if let Some(target_box) = self - .identity_boxes - .iter() - .find(|e| e.id == self.tracking_id) - { - let x_adjust = calc_x_adjust(target_box.x1, target_box.x2); - let y_adjust = calc_y_adjust(target_box.y1); - self.last_detect = std::time::Instant::now(); - Ok((x_adjust, y_adjust, self.enabled)) - } else { - Err("Couldn't find target in results".to_string()) - } - } -} - -fn calc_x_adjust(x1: f32, x2: f32) -> i32 { - let dist_from_center = ((x1 + x2) / 2.0) - 0.5; - let mut x_adjust = ((dist_from_center / 0.5 * 2.0) * 100.0) as i32; - if x_adjust < 15 && x_adjust > -15 { - x_adjust = 0; - } - min(max(x_adjust, -100), 100) -} - -fn calc_y_adjust(y1: f32) -> i32 { - // All values are normalized, then multiplied by 1000. 500 == 50% of the screen - let mut y_adjust = ((y1 - 0.1) * 250.0) as i32; - if y_adjust < 0 { - y_adjust -= 20; - } else if y_adjust < 30 { - y_adjust = 0; - } else { - y_adjust = (y_adjust as f32 * 0.75) as i32; - } - min(max(y_adjust, -100), 100) -} diff --git a/src/tauri_functions.rs b/src/tauri_functions.rs index 309520d..39c6920 100644 --- a/src/tauri_functions.rs +++ b/src/tauri_functions.rs @@ -1,14 +1,17 @@ - -use async_channel::Sender; -use tauri::State; - -use crate::coordinator::ApplicationEvent; - -pub struct TauriState { - pub to_mec: Sender, -} - -#[tauri::command] -pub fn connect_to_camera(state: State<'_, TauriState>) { - let _ = state.to_mec.send_blocking(ApplicationEvent::CameraConnectionPress); -} +use async_channel::Sender; +use tauri::State; +use tracing::info; + +use crate::coordinator::ApplicationEvent; + +pub struct TauriState { + pub to_mec: Sender, +} + +#[tauri::command] +pub fn connect_to_camera(state: State<'_, TauriState>) { + // let _ = state + // .to_mec + // .send_blocking(ApplicationEvent::CameraConnectionPress); + info!("Connect to Camera button press event"); +} diff --git a/src/webrtc_remote.rs b/src/webrtc_remote.rs index e4bbc1f..628d506 100644 --- a/src/webrtc_remote.rs +++ b/src/webrtc_remote.rs @@ -1,132 +1,120 @@ -use std::{cmp::Ordering, sync::{atomic::AtomicBool, Arc}}; - -use futures_util::{StreamExt, SinkExt}; -use tokio::{net::TcpListener, sync::Mutex}; -use tokio_tungstenite::{accept_async, tungstenite::Message}; -use tracing::{error, info, instrument, debug}; - -use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; - - - -#[instrument(skip_all)] -pub async fn start_listener( - from_app: AppReceiver, - to_ui: AppSender, -) { - - info!("starting tcplistener"); - let listener = TcpListener::bind("localhost:7891").await.unwrap(); - - info!("Listening for webrtc connections on localhost:7891"); - - if let Ok((stream, _)) = listener.accept().await { - match accept_async(stream).await { - Err(e) => error!("Could not convert incoming stream to websocket: {e}"), - Ok(ws_stream) => { - let (mut ws_sender, mut ws_receiver) = ws_stream.split(); - - tokio::spawn(async move { - while let Ok(msg) = from_app.recv().await { - #[cfg(debug_assertions)] - { - // serialized message - match serde_json::to_string(&msg) { - Err(e) => error!("Could not serialize ApplicationMessage to JSON! {e}"), - Ok(msg) => { - if let Err(e) = ws_sender.send(Message::text(msg)).await { - error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}"); - break; - } - } - } - - } - - #[cfg(not(debug_assertions))] - { - match bincode::serialize(&msg) { - Err(e) => error!("Could not serialize ApplicationMessage into binary! {e}"), - Ok(e) => { - if let Err(e) = sender.send(Message::binary(msg)).await { - error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}"); - break; - } - } - } - } - } - }); - - while let Some(msg) = ws_receiver.next().await { - match msg { - Err(e) => { - error!("There was an error getting a message from the remote! {e}"); - } - Ok(msg) => match msg { - Message::Ping(_) | Message::Pong(_) => {} - Message::Close(_) => { - info!("Received WebSocket close message! Closing the websocket"); - break; - } - Message::Frame(_) => { - info!("Received a Frame websocket message?"); - } - Message::Text(text) => { - debug!("Recieved text from websocket: {text}"); - #[cfg(debug_assertions)] - { - match serde_json::from_str(&text) { - Ok(msg) => { - if let Err(e) = to_ui.send(msg).await { - error!("Could not send message from ws to application! Closing and exiting\n{e}"); - break; - } - } - Err(e) => { - error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}"); - } - } - - } - #[cfg(not(debug_assertions))] - { - warn!("Recieved a `Text` message from the remote while running in release mode! " + - "Was the other endpoint running release mode?\n msg: {text}"); - } - } - Message::Binary(msg) => { - #[cfg(debug_assertions)] - { - match bincode::deserialize::(&msg) { - Ok(m) => { - if let Err(e) = to_ui.send(m).await { - error!("Could not send message to application! Closing and exiting\n{e}"); - break; - } - } - Err(e) => { - error!("Received a malformed binary message from the websocket!\n{e}"); - } - - } - } - - #[cfg(not(debug_assertions))] - { - warn!("Recieved a `Binary` message from the remote while running in debug mode! " + - "Was the other endpoing running debug mode?"); - } - - } - } - } - - } - - - } - } - - } -} +use futures_util::{SinkExt, StreamExt}; +use tokio::net::TcpListener; +use tokio_tungstenite::{accept_async, tungstenite::Message}; +use tracing::{debug, error, info, instrument}; + +use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; + +#[instrument(skip_all)] +pub async fn start_listener(from_app: AppReceiver, to_ui: AppSender) { + info!("starting tcplistener"); + let listener = TcpListener::bind("localhost:7891").await.unwrap(); + + info!("Listening for webrtc connections on localhost:7891"); + + if let Ok((stream, _)) = listener.accept().await { + match accept_async(stream).await { + Err(e) => error!("Could not convert incoming stream to websocket: {e}"), + Ok(ws_stream) => { + let (mut ws_sender, mut ws_receiver) = ws_stream.split(); + + tokio::spawn(async move { + while let Ok(msg) = from_app.recv().await { + #[cfg(debug_assertions)] + { + // serialized message + match serde_json::to_string(&msg) { + Err(e) => { + error!("Could not serialize ApplicationMessage to JSON! {e}") + } + Ok(msg) => { + if let Err(e) = ws_sender.send(Message::text(msg)).await { + error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}"); + break; + } + } + } + } + + #[cfg(not(debug_assertions))] + { + match bincode::serialize(&msg) { + Err(e) => error!( + "Could not serialize ApplicationMessage into binary! {e}" + ), + Ok(e) => { + if let Err(e) = sender.send(Message::binary(msg)).await { + error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}"); + break; + } + } + } + } + } + }); + + while let Some(msg) = ws_receiver.next().await { + match msg { + Err(e) => { + error!("There was an error getting a message from the remote! {e}"); + } + Ok(msg) => match msg { + Message::Ping(_) | Message::Pong(_) => {} + Message::Close(_) => { + info!("Received WebSocket close message! Closing the websocket"); + break; + } + Message::Frame(_) => { + info!("Received a Frame websocket message?"); + } + Message::Text(text) => { + debug!("Recieved text from websocket: {text}"); + #[cfg(debug_assertions)] + { + match serde_json::from_str(&text) { + Ok(msg) => { + if let Err(e) = to_ui.send(msg).await { + error!("Could not send message from ws to application! Closing and exiting\n{e}"); + break; + } + } + Err(e) => { + error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}"); + } + } + } + #[cfg(not(debug_assertions))] + { + warn!("Recieved a `Text` message from the remote while running in release mode! " + + "Was the other endpoint running release mode?\n msg: {text}"); + } + } + Message::Binary(msg) => { + #[cfg(debug_assertions)] + { + match bincode::deserialize::(&msg) { + Ok(m) => { + if let Err(e) = to_ui.send(m).await { + error!("Could not send message to application! Closing and exiting\n{e}"); + break; + } + } + Err(e) => { + error!("Received a malformed binary message from the websocket!\n{e}"); + } + } + } + + #[cfg(not(debug_assertions))] + { + warn!("Recieved a `Binary` message from the remote while running in debug mode! " + + "Was the other endpoing running debug mode?"); + } + } + }, + } + } + } + } + } +}