rewriting the event loop

This commit is contained in:
Nickiel12 2024-08-31 21:22:10 +00:00
parent af87f9166f
commit 1507bc6bac
18 changed files with 602 additions and 1783 deletions

336
Cargo.lock generated
View file

@ -250,7 +250,7 @@ checksum = "2c3d816ce6f0e2909a96830d6911c2aff044370b1ef92d7f267b43bae5addedd"
dependencies = [
"atk-sys",
"bitflags 1.3.2",
"glib 0.15.12",
"glib",
"libc",
]
@ -260,8 +260,8 @@ version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58aeb089fb698e06db8089971c7ee317ab9644bade33383f63631437b03aafb6"
dependencies = [
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"glib-sys",
"gobject-sys",
"libc",
"system-deps 6.2.2",
]
@ -272,12 +272,6 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atomic_refcell"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c"
[[package]]
name = "autocfg"
version = "1.3.0"
@ -485,7 +479,7 @@ checksum = "c76ee391b03d35510d9fa917357c7f1855bd9a6659c95a1b392e33f49b3369bc"
dependencies = [
"bitflags 1.3.2",
"cairo-sys-rs",
"glib 0.15.12",
"glib",
"libc",
"thiserror",
]
@ -496,7 +490,7 @@ version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c55d429bef56ac9172d25fecb85dc8068307d17acd74b377866b7a1ef25d3c8"
dependencies = [
"glib-sys 0.15.10",
"glib-sys",
"libc",
"system-deps 6.2.2",
]
@ -1484,7 +1478,7 @@ dependencies = [
"gdk-pixbuf",
"gdk-sys",
"gio",
"glib 0.15.12",
"glib",
"libc",
"pango",
]
@ -1498,7 +1492,7 @@ dependencies = [
"bitflags 1.3.2",
"gdk-pixbuf-sys",
"gio",
"glib 0.15.12",
"glib",
"libc",
]
@ -1508,9 +1502,9 @@ version = "0.15.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "140b2f5378256527150350a8346dbdb08fadc13453a7a2d73aecd5fab3c402a7"
dependencies = [
"gio-sys 0.15.10",
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"gio-sys",
"glib-sys",
"gobject-sys",
"libc",
"system-deps 6.2.2",
]
@ -1523,9 +1517,9 @@ checksum = "32e7a08c1e8f06f4177fb7e51a777b8c1689f743a7bc11ea91d44d2226073a88"
dependencies = [
"cairo-sys-rs",
"gdk-pixbuf-sys",
"gio-sys 0.15.10",
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"gio-sys",
"glib-sys",
"gobject-sys",
"libc",
"pango-sys",
"pkg-config",
@ -1539,8 +1533,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cca49a59ad8cfdf36ef7330fe7bdfbe1d34323220cc16a0de2679ee773aee2c2"
dependencies = [
"gdk-sys",
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"glib-sys",
"gobject-sys",
"libc",
"pkg-config",
"system-deps 6.2.2",
@ -1553,7 +1547,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4b7f8c7a84b407aa9b143877e267e848ff34106578b64d1e0a24bf550716178"
dependencies = [
"gdk-sys",
"glib-sys 0.15.10",
"glib-sys",
"libc",
"system-deps 6.2.2",
"x11",
@ -1665,8 +1659,8 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"gio-sys 0.15.10",
"glib 0.15.12",
"gio-sys",
"glib",
"libc",
"once_cell",
"thiserror",
@ -1678,26 +1672,13 @@ version = "0.15.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32157a475271e2c4a023382e9cab31c4584ee30a97da41d3c4e9fdd605abcf8d"
dependencies = [
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"glib-sys",
"gobject-sys",
"libc",
"system-deps 6.2.2",
"winapi",
]
[[package]]
name = "gio-sys"
version = "0.19.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cd743ba4714d671ad6b6234e8ab2a13b42304d0e13ab7eba1dcdd78a7d6d4ef"
dependencies = [
"glib-sys 0.19.8",
"gobject-sys 0.19.8",
"libc",
"system-deps 6.2.2",
"windows-sys 0.52.0",
]
[[package]]
name = "glib"
version = "0.15.12"
@ -1709,37 +1690,15 @@ dependencies = [
"futures-core",
"futures-executor",
"futures-task",
"glib-macros 0.15.13",
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"glib-macros",
"glib-sys",
"gobject-sys",
"libc",
"once_cell",
"smallvec",
"thiserror",
]
[[package]]
name = "glib"
version = "0.19.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39650279f135469465018daae0ba53357942a5212137515777d5fdca74984a44"
dependencies = [
"bitflags 2.6.0",
"futures-channel",
"futures-core",
"futures-executor",
"futures-task",
"futures-util",
"gio-sys 0.19.8",
"glib-macros 0.19.9",
"glib-sys 0.19.8",
"gobject-sys 0.19.8",
"libc",
"memchr",
"smallvec",
"thiserror",
]
[[package]]
name = "glib-macros"
version = "0.15.13"
@ -1748,26 +1707,13 @@ checksum = "10c6ae9f6fa26f4fb2ac16b528d138d971ead56141de489f8111e259b9df3c4a"
dependencies = [
"anyhow",
"heck 0.4.1",
"proc-macro-crate 1.3.1",
"proc-macro-crate",
"proc-macro-error",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "glib-macros"
version = "0.19.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4429b0277a14ae9751350ad9b658b1be0abb5b54faa5bcdf6e74a3372582fad7"
dependencies = [
"heck 0.5.0",
"proc-macro-crate 3.1.0",
"proc-macro2",
"quote",
"syn 2.0.76",
]
[[package]]
name = "glib-sys"
version = "0.15.10"
@ -1778,16 +1724,6 @@ dependencies = [
"system-deps 6.2.2",
]
[[package]]
name = "glib-sys"
version = "0.19.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c2dc18d3a82b0006d470b13304fbbb3e0a9bd4884cf985a60a7ed733ac2c4a5"
dependencies = [
"libc",
"system-deps 6.2.2",
]
[[package]]
name = "glob"
version = "0.3.1"
@ -1813,18 +1749,7 @@ version = "0.15.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d57ce44246becd17153bd035ab4d32cfee096a657fc01f2231c9278378d1e0a"
dependencies = [
"glib-sys 0.15.10",
"libc",
"system-deps 6.2.2",
]
[[package]]
name = "gobject-sys"
version = "0.19.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e697e252d6e0416fd1d9e169bda51c0f1c926026c39ca21fbe8b1bb5c3b8b9e"
dependencies = [
"glib-sys 0.19.8",
"glib-sys",
"libc",
"system-deps 6.2.2",
]
@ -1840,98 +1765,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "gstreamer"
version = "0.22.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca0b90646bb67fccf80d228f5333f2a0745526818ccefbf5a97326c76d30e4d"
dependencies = [
"cfg-if",
"futures-channel",
"futures-core",
"futures-util",
"glib 0.19.9",
"gstreamer-sys",
"itertools 0.13.0",
"libc",
"muldiv",
"num-integer",
"num-rational",
"once_cell",
"option-operations",
"paste",
"pin-project-lite",
"smallvec",
"thiserror",
]
[[package]]
name = "gstreamer-app"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1363313eb1931d66ac0b82c9b477fdd066af9dc118ea844966f85b6d99f261fd"
dependencies = [
"futures-core",
"futures-sink",
"glib 0.19.9",
"gstreamer",
"gstreamer-app-sys",
"gstreamer-base",
"libc",
]
[[package]]
name = "gstreamer-app-sys"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed667453517b47754b9f9d28c096074e5d565f1cc48c6fa2483b1ea10d7688d3"
dependencies = [
"glib-sys 0.19.8",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps 6.2.2",
]
[[package]]
name = "gstreamer-base"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39d55668b23fc69f1843daa42b43d289c00fe38e9586c5453b134783d2dd75a3"
dependencies = [
"atomic_refcell",
"cfg-if",
"glib 0.19.9",
"gstreamer",
"gstreamer-base-sys",
"libc",
]
[[package]]
name = "gstreamer-base-sys"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5448abb00c197e3ad306710293bf757303cbeab4036b5ccad21c7642b8bf00c9"
dependencies = [
"glib-sys 0.19.8",
"gobject-sys 0.19.8",
"gstreamer-sys",
"libc",
"system-deps 6.2.2",
]
[[package]]
name = "gstreamer-sys"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71f147e7c6bc9313d5569eb15da61f6f64026ec69791922749de230583a07286"
dependencies = [
"glib-sys 0.19.8",
"gobject-sys 0.19.8",
"libc",
"system-deps 6.2.2",
]
[[package]]
name = "gtk"
version = "0.15.5"
@ -1946,7 +1779,7 @@ dependencies = [
"gdk",
"gdk-pixbuf",
"gio",
"glib 0.15.12",
"glib",
"gtk-sys",
"gtk3-macros",
"libc",
@ -1965,9 +1798,9 @@ dependencies = [
"cairo-sys-rs",
"gdk-pixbuf-sys",
"gdk-sys",
"gio-sys 0.15.10",
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"gio-sys",
"glib-sys",
"gobject-sys",
"libc",
"pango-sys",
"system-deps 6.2.2",
@ -1980,7 +1813,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "684c0456c086e8e7e9af73ec5b84e35938df394712054550e81558d21c44ab0d"
dependencies = [
"anyhow",
"proc-macro-crate 1.3.1",
"proc-macro-crate",
"proc-macro-error",
"proc-macro2",
"quote",
@ -2387,15 +2220,6 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "0.4.8"
@ -2415,7 +2239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf053e7843f2812ff03ef5afe34bb9c06ffee120385caad4f6b9967fcd37d41c"
dependencies = [
"bitflags 1.3.2",
"glib 0.15.12",
"glib",
"javascriptcore-rs-sys",
]
@ -2425,8 +2249,8 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "905fbb87419c5cde6e3269537e4ea7d46431f3008c5d057e915ef3f115e7793c"
dependencies = [
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"glib-sys",
"gobject-sys",
"libc",
"system-deps 5.0.0",
]
@ -2719,12 +2543,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "muldiv"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0"
[[package]]
name = "ndk"
version = "0.6.0"
@ -2835,16 +2653,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
dependencies = [
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.19"
@ -2869,7 +2677,7 @@ version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 1.0.109",
@ -2933,15 +2741,6 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "option-operations"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c26d27bb1aeab65138e4bf7666045169d1717febcc9ff870166be8348b223d0"
dependencies = [
"paste",
]
[[package]]
name = "ordered-multimap"
version = "0.6.0"
@ -2989,7 +2788,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e4045548659aee5313bde6c582b0d83a627b7904dd20dc2d9ef0895d414e4f"
dependencies = [
"bitflags 1.3.2",
"glib 0.15.12",
"glib",
"libc",
"once_cell",
"pango-sys",
@ -3001,8 +2800,8 @@ version = "0.15.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2a00081cde4661982ed91d80ef437c20eacaf6aa1a5962c0279ae194662c3aa"
dependencies = [
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"glib-sys",
"gobject-sys",
"libc",
"system-deps 6.2.2",
]
@ -3036,12 +2835,6 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "paste"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "pathdiff"
version = "0.2.1"
@ -3384,15 +3177,6 @@ dependencies = [
"toml_edit 0.19.15",
]
[[package]]
name = "proc-macro-crate"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
dependencies = [
"toml_edit 0.21.1",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@ -3449,7 +3233,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.12.1",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.76",
@ -4112,7 +3896,7 @@ checksum = "b2b4d76501d8ba387cf0fefbe055c3e0a59891d09f0f995ae4e4b16f6b60f3c0"
dependencies = [
"bitflags 1.3.2",
"gio",
"glib 0.15.12",
"glib",
"libc",
"once_cell",
"soup2-sys",
@ -4125,9 +3909,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "009ef427103fcb17f802871647a7fa6c60cbb654b4c4e4c0ac60a31c5f6dc9cf"
dependencies = [
"bitflags 1.3.2",
"gio-sys 0.15.10",
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"gio-sys",
"glib-sys",
"gobject-sys",
"libc",
"system-deps 5.0.0",
]
@ -4326,8 +4110,8 @@ dependencies = [
"gdkwayland-sys",
"gdkx11-sys",
"gio",
"glib 0.15.12",
"glib-sys 0.15.10",
"glib",
"glib-sys",
"gtk",
"image",
"instant",
@ -4396,7 +4180,7 @@ dependencies = [
"flate2",
"futures-util",
"getrandom 0.2.15",
"glib 0.15.12",
"glib",
"glob",
"gtk",
"heck 0.5.0",
@ -4824,17 +4608,6 @@ dependencies = [
"winnow 0.5.40",
]
[[package]]
name = "toml_edit"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
dependencies = [
"indexmap 2.4.0",
"toml_datetime",
"winnow 0.5.40",
]
[[package]]
name = "toml_edit"
version = "0.22.20"
@ -5163,11 +4936,10 @@ dependencies = [
"bincode",
"config",
"console-subscriber",
"futures",
"futures-core",
"futures-util",
"gilrs",
"gstreamer",
"gstreamer-app",
"lazy_static",
"log",
"serde",
@ -5344,10 +5116,10 @@ dependencies = [
"gdk",
"gdk-sys",
"gio",
"gio-sys 0.15.10",
"glib 0.15.12",
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"gio-sys",
"glib",
"glib-sys",
"gobject-sys",
"gtk",
"gtk-sys",
"javascriptcore-rs",
@ -5368,9 +5140,9 @@ dependencies = [
"cairo-sys-rs",
"gdk-pixbuf-sys",
"gdk-sys",
"gio-sys 0.15.10",
"glib-sys 0.15.10",
"gobject-sys 0.15.10",
"gio-sys",
"glib-sys",
"gobject-sys",
"gtk-sys",
"javascriptcore-rs-sys",
"libc",
@ -6015,7 +5787,7 @@ dependencies = [
"dunce",
"gdk",
"gio",
"glib 0.15.12",
"glib",
"gtk",
"html5ever",
"http 0.2.12",

View file

@ -1,45 +1,44 @@
[package]
name = "vcs-controller"
version = "2.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[build-dependencies]
tauri-build = { version = "1.5.1", features = [] }
[features]
tracker-state-debug = []
tokio-logging = []
# this feature is used for production builds or when `devPath` points to the filesystem and the built-in dev server is disabled.
# If you use cargo directly instead of tauri's cli you can use this feature flag to switch between tauri's `dev` and `build` modes.
# DO NOT REMOVE!!
custom-protocol = [ "tauri/custom-protocol" ]
[dependencies]
async-channel = "2.2.0"
async-recursion = "1.1.1"
config = "0.14.0"
futures-core = "0.3.30"
futures-util = { version = "0.3.30" }
gilrs = "0.10.6"
gstreamer = { version = "0.22.4", features = ["v1_22"] }
gstreamer-app = { version = "0.22.0", features = ["v1_22"] }
log = "0.4.21"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "time", "sync"] }
tokio-tungstenite = "0.21.0"
toml = "0.8.12"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["tracing-log"] }
tracing-appender = "0.2.3"
snafu = "0.8.2"
console-subscriber = "0.3.0"
tauri = { version = "1.6.1", features = [] }
lazy_static = "1.5.0"
vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" }
bincode = "1.3.3"
[package]
name = "vcs-controller"
version = "2.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[build-dependencies]
tauri-build = { version = "1.5.1", features = [] }
[features]
tracker-state-debug = []
tokio-logging = []
# this feature is used for production builds or when `devPath` points to the filesystem and the built-in dev server is disabled.
# If you use cargo directly instead of tauri's cli you can use this feature flag to switch between tauri's `dev` and `build` modes.
# DO NOT REMOVE!!
custom-protocol = [ "tauri/custom-protocol" ]
[dependencies]
async-channel = "2.2.0"
async-recursion = "1.1.1"
config = "0.14.0"
futures-core = "0.3.30"
futures-util = { version = "0.3.30" }
gilrs = "0.10.6"
log = "0.4.21"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "time", "sync"] }
tokio-tungstenite = "0.21.0"
toml = "0.8.12"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["tracing-log"] }
tracing-appender = "0.2.3"
snafu = "0.8.2"
console-subscriber = "0.3.0"
tauri = { version = "1.6.1", features = [] }
lazy_static = "1.5.0"
vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" }
bincode = "1.3.3"
futures = "0.3.30"

View file

@ -39,12 +39,6 @@ Some utility commands:
tailwindcss
openssl
systemd
gtk4
gst_all_1.gstreamer
gst_all_1.gst-plugins-base
gst_all_1.gst-plugins-good
gst_all_1.gst-plugins-bad
gst_all_1.gst-plugins-rs
];
cargoHash = nixpkgs.lib.fakeHash;
};
@ -147,3 +141,4 @@ Some utility commands:
};
}

View file

@ -5,10 +5,20 @@ use std::fs::File;
use std::io::Write;
use tracing::{info, instrument};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ConnectionString {
pub ip: String,
pub port: u32,
}
impl ConnectionString {
pub fn build_conn_string(&self) -> String {
format!("ws://{}:{}", self.ip, self.port)
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AppConfig {
pub camera_ip: String,
pub camera_port: u32,
pub cameras: Vec<ConnectionString>,
pub tracker_ip: String,
pub tracker_port: u32,
@ -19,8 +29,10 @@ pub struct AppConfig {
impl Default for AppConfig {
fn default() -> Self {
AppConfig {
camera_ip: "10.0.0.33".to_string(),
camera_port: 8765,
cameras: vec![ConnectionString {
ip: "10.0.0.29".to_owned(),
port: 8765,
}],
tracker_ip: "10.0.0.210".to_string(),
tracker_port: 6543,

View file

@ -1,212 +0,0 @@
use std::pin::Pin;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::time::Instant;
use async_channel::{Receiver, Sender};
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio::runtime::Handle;
use tokio::sync::RwLock;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info, instrument};
use crate::config::AppConfig;
use crate::coordinator::socket_listen;
use crate::coordinator::tracker_state::TrackerState;
use crate::gstreamer_pipeline;
use crate::sources::joystick_source::joystick_loop;
use super::perf_state::TrackerMetrics;
use super::remote_video_processor::remote_video_loop;
use super::{ApplicationEvent, ConnectionType};
#[derive(Debug)]
pub struct SocketState {
pub is_connected: AtomicBool,
pub stay_connected: AtomicBool,
}
#[derive(Debug)]
pub struct CoordState<'a> {
pub settings: Arc<RwLock<AppConfig>>,
pub tracker_metrics: TrackerMetrics,
pub runtime: Handle,
pub sck_outbound: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
pub stay_alive_sck_recvr: Arc<AtomicBool>,
pub joystick_loop_alive: Arc<AtomicBool>,
pub current_priority: ConnectionType,
pub last_update_of_priority: Instant,
pub mec: Pin<&'a mut Receiver<ApplicationEvent>>,
pub to_mec: Sender<ApplicationEvent>,
pub pipeline: gstreamer_pipeline::WebcamPipeline,
pub tracker_state: TrackerState,
pub tracker_connection_state: Arc<SocketState>,
}
impl<'a> CoordState<'a> {
pub fn new(
mec: Pin<&'a mut Receiver<ApplicationEvent>>,
to_mec: Sender<ApplicationEvent>,
runtime: Handle,
settings: Arc<RwLock<AppConfig>>,
jpeg_quality: i32,
) -> Self {
CoordState {
settings,
tracker_metrics: TrackerMetrics::new(),
runtime,
sck_outbound: None,
stay_alive_sck_recvr: Arc::new(AtomicBool::new(false)),
joystick_loop_alive: Arc::new(AtomicBool::new(false)),
current_priority: ConnectionType::Local,
last_update_of_priority: Instant::now(),
mec,
to_mec,
pipeline: gstreamer_pipeline::WebcamPipeline::new(jpeg_quality).unwrap(),
tracker_state: TrackerState{
tracking_id: 0,
highlighted_id: None,
last_detect: Instant::now(),
enabled: true,
identity_boxes: vec![],
update_ids: false,
},
tracker_connection_state: Arc::new(SocketState {
stay_connected: AtomicBool::new(false),
is_connected: AtomicBool::new(false),
}),
}
}
#[instrument(skip(self))]
pub async fn socket_send(&mut self, message: Message) {
if let Some(mut socket) = self.sck_outbound.take() {
if let Err(e) = socket.send(message).await {
error!("There was an error sending to the socket: {:#?}", e);
} else {
self.sck_outbound = Some(socket);
}
}
}
pub fn socket_connected(&self) -> bool {
self.sck_outbound.is_some()
}
pub async fn socket_start(&mut self) {
self.stay_alive_sck_recvr.store(true, Ordering::SeqCst);
let conn_string: String = {
let read_settings = self.settings.read().await;
format!(
"ws://{}:{}",
read_settings.camera_ip,
read_settings.camera_port
)
};
match connect_async(conn_string).await {
Ok((val, _)) => {
info!("Socket connection to camera made successfully");
let (outbound, inbound) = val.split();
let _socket_task = self.runtime.spawn(socket_listen(
self.to_mec.clone(),
self.stay_alive_sck_recvr.clone(),
inbound,
));
self.sck_outbound = Some(outbound);
}
Err(_) => {
error!("Couldn't connect to URL!");
}
}
}
pub async fn socket_close(&mut self) {
debug!("Cleaning up camera socket state");
if let Some(mut socket) = self.sck_outbound.take() {
if let Err(e) = socket.close().await {
error!("Couldn't close socket during shutdown: {e}");
}
}
self.stay_alive_sck_recvr.store(false, Ordering::SeqCst);
}
pub async fn start_video_loop(&mut self) {
let conn_string: String = {
let read_settings = self.settings.read().await;
format!(
"ws://{}:{}",
read_settings.tracker_ip,
read_settings.tracker_port
)
};
let _remote_loop = self.runtime.spawn(remote_video_loop(
conn_string,
self.pipeline.sink_frame.clone(),
self.to_mec.clone(),
self.tracker_connection_state.clone(),
));
}
pub async fn check_states(&mut self) {
// This one needs to always be alive, and restart after a crash
if !self.joystick_loop_alive.load(Ordering::SeqCst) {
info!("Restarting joystick loop");
let _joystick_future = self.runtime.spawn(joystick_loop(
self.to_mec.clone(),
self.joystick_loop_alive.clone(),
));
}
// If tracker state is not connected, and it should be
if !self
.tracker_connection_state
.is_connected
.load(Ordering::SeqCst)
&& self
.tracker_connection_state
.stay_connected
.load(Ordering::SeqCst)
{
self.start_video_loop().await;
}
// if stay alive is false, and there is a connection, kill it
if !self.stay_alive_sck_recvr.load(Ordering::SeqCst) && self.sck_outbound.is_some() {
self.socket_close().await;
}
}
pub async fn close(&mut self) {
info!("closing coord state");
self.tracker_connection_state
.stay_connected
.store(false, Ordering::SeqCst);
self.socket_close().await;
self.joystick_loop_alive.store(false, Ordering::SeqCst);
self.mec.close();
}
}

View file

@ -1,229 +1,160 @@
use std::pin::pin;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use async_channel::{Receiver, Sender};
use futures_util::{stream::SplitStream, StreamExt};
use gstreamer::prelude::ElementExt;
use gstreamer::State;
use async_channel::{Receiver, Sender, TryRecvError};
use tokio::net::TcpStream;
use tokio::runtime::Handle;
use tokio::sync::RwLock;
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info, instrument};
use tokio::sync::oneshot;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::error;
mod coord_state;
mod process_box_string;
mod remote_video_processor;
use crate::config::{AppConfig, ConnectionString};
use crate::sources::joystick_source::joystick_loop;
use crate::states::perf_state;
use crate::states::tracker_state;
use crate::states::box_coords::NormalizedBoxCoords;
use crate::config::AppConfig;
pub use coord_state::{CoordState, SocketState};
pub enum ApplicationEvent {
WebRTCMessage,
Move(Point),
Close,
}
const PRIORITY_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Clone)]
pub struct MoveEvent {
pub struct Point {
pub x: i32,
pub y: i32,
}
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
pub enum ConnectionType {
Local,
// Remote,
Automated,
pub struct SatelliteConnection {
connection: ConnectionString,
socket: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
socket_in_progress: Option<
oneshot::Receiver<
Result<
WebSocketStream<MaybeTlsStream<TcpStream>>,
tokio_tungstenite::tungstenite::Error,
>,
>,
>,
}
pub enum TrackerUpdate {
Clear,
Fail,
Update(TrackerUpdatePackage),
}
#[derive(Clone)]
pub struct TrackerUpdatePackage {
pub boxes: Vec<NormalizedBoxCoords>,
pub time: Instant,
pub request_duration: Duration,
}
pub enum ApplicationEvent {
CameraConnectionPress,
SocketMessage(Message),
MoveEvent(MoveEvent, ConnectionType),
TrackerUpdate(TrackerUpdate),
WebRTCMessage(String),
Close,
}
#[instrument(skip_all)]
pub async fn start_coordinator(
// Main_Event_Channel
mec: Receiver<ApplicationEvent>,
pub struct AppState {
to_mec: Sender<ApplicationEvent>,
runtime: Handle,
settings: Arc<RwLock<AppConfig>>,
) {
info!("Starting coordinator!");
mec: Receiver<ApplicationEvent>,
pub runtime: Handle,
let mec = pin!(mec);
config: Arc<tokio::sync::RwLock<AppConfig>>,
let jpeg_quality = settings.read().await.tracker_jpeg_quality;
pub camera_satellites: Vec<SatelliteConnection>,
pub endpoint_satellites: Vec<SatelliteConnection>,
let mut state = CoordState::new(
mec,
to_mec,
runtime,
settings,
jpeg_quality,
);
pub joystick_task_is_alive: Arc<AtomicBool>,
}
// state
// .pipeline
// .pipeline
// .set_state(State::Playing)
// .expect("Could not set pipeline state to playing");
//
impl AppState {
pub async fn new(
mec: Receiver<ApplicationEvent>,
to_mec: Sender<ApplicationEvent>,
config: Arc<tokio::sync::RwLock<AppConfig>>,
rt: Handle,
) -> Self {
let camera_satellites = config
.read()
.await
.cameras
.iter()
.map(|x| SatelliteConnection {
connection: x.clone(),
socket: None,
socket_in_progress: None,
})
.collect::<Vec<SatelliteConnection>>();
state.check_states().await;
AppState {
to_mec,
mec,
runtime: rt,
while let Some(msg) = state.mec.next().await {
state.check_states().await;
config,
camera_satellites,
endpoint_satellites: vec![],
joystick_task_is_alive: Arc::new(AtomicBool::new(false)),
}
}
pub fn check_alive_things(&mut self) {
if !self
.joystick_task_is_alive
.load(std::sync::atomic::Ordering::SeqCst)
{
self.runtime.spawn(joystick_loop(
self.to_mec.clone(),
Arc::clone(&self.joystick_task_is_alive),
));
}
for connection in self.camera_satellites.iter_mut() {
if connection.socket_in_progress.is_some() {
if connection.socket_in_progress.map(|x| x.try_recv()) {
match msg {
ApplicationEvent::CameraConnectionPress => {
println!("connecting to camera!");
if state.socket_connected() {
state.socket_close().await;
} else {
state.socket_start().await;
}
}
ApplicationEvent::Close => {
break;
}
ApplicationEvent::SocketMessage(socket_message) => {
state.socket_send(socket_message).await;
}
ApplicationEvent::MoveEvent(coord, priority) => {
// If Automatic control, but local event happens, override the automatice events for 2 seconds
if priority <= state.current_priority
|| Instant::now() > state.last_update_of_priority + PRIORITY_TIMEOUT
{
state.last_update_of_priority = Instant::now();
state.current_priority = priority;
if state.socket_connected() {
let message = format!(
"{}{}:{}{}",
if coord.y > 0 { "D" } else { "U" },
coord.y.abs(),
if coord.x > 0 { "R" } else { "L" },
coord.x.abs()
);
state.socket_send(Message::Text(message)).await;
}
}
}
ApplicationEvent::WebRTCMessage(msg) => {
info!("Pew pew! WebRTCMessage! {msg}");
}
ApplicationEvent::TrackerUpdate(update) => match update {
TrackerUpdate::Clear => {
state.tracker_state.clear();
state.tracker_metrics.clear_times();
}
TrackerUpdate::Fail => {
let fail_count: usize = state.tracker_metrics.fail_count + 1;
state.tracker_metrics.starting_connection(Some(fail_count));
}
TrackerUpdate::Update(update) => {
let mut x_adj: i32 = 0;
let mut y_adj: i32 = 0;
state.tracker_state.update_from_boxes(update.boxes);
state.tracker_state.last_detect = update.time;
match state.tracker_state.calculate_tracking() {
Ok((x, y, _tracker_enabled)) => {
x_adj = x;
y_adj = y;
if connection.socket.is_none() && connection.socket_in_progress.is_none() {
let (send, recv) = oneshot::channel::<
Result<
WebSocketStream<MaybeTlsStream<TcpStream>>,
tokio_tungstenite::tungstenite::Error,
>,
>();
connection.socket_in_progress = Some(recv);
let conn_string = connection.connection.build_conn_string();
tokio::spawn(async move {
let res = connect_async(conn_string).await;
match res {
Ok((res, _)) => {
let _ = send.send(Ok(res)); // can't even close the socket if send
// returns an error
}
Err(e) => {
if state.tracker_state.tracking_id > 0 {
info!("Could not calculate the tracking!: {e}");
}
let _ = send.send(Err(e));
}
}
let me = MoveEvent { x: x_adj, y: y_adj };
if let Err(e) = state
.to_mec
.send(ApplicationEvent::MoveEvent(
me.clone(),
ConnectionType::Automated,
))
.await
{
error!("Could not send to MEC... even though in the MEC?! {e}");
}
state.tracker_metrics.insert_time(update.request_duration);
}
},
}
}
state
.pipeline
.pipeline
.set_state(State::Null)
.expect("Could not set pipeline state to playing");
state.close().await;
info!("Stopping Coordinator");
}
async fn socket_listen(
mec: Sender<ApplicationEvent>,
stay_alive_sck_recvr: Arc<AtomicBool>,
mut reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) {
if stay_alive_sck_recvr.load(std::sync::atomic::Ordering::SeqCst) {
while let Some(msg) = reader.next().await {
match msg {
Ok(val) => {
match val {
Message::Ping(_) => {
// Do nothing because pings are handled on reads and write by tungestenite
}
_ => {
info!("Received message from the camera websocket? {:#?}", val);
}
}
}
Err(e) => {
error!("Websocket error: {:#?}", e);
}
});
}
}
// setting this will call the internal state.socket_close next check states
stay_alive_sck_recvr.store(false, Ordering::SeqCst);
}
// If the mec is closed or full, then this socket should be closing anyways
// as there was most likely an unrecoverable error
let _ = mec
.send(ApplicationEvent::SocketMessage(Message::Close(None)))
.await;
debug!("Closed socket reading thread");
}
pub async fn run_main_event_loop(
mec: Receiver<ApplicationEvent>,
to_mec: Sender<ApplicationEvent>,
config: Arc<tokio::sync::RwLock<AppConfig>>,
rt: Handle,
) {
let mut state = AppState::new(mec, to_mec, config, rt).await;
loop {
state.check_alive_things();
match state.mec.try_recv() {
Err(TryRecvError::Empty) => tokio::time::sleep(Duration::from_millis(50)).await,
Err(TryRecvError::Closed) => {
let close_handles: Vec<_> = state
.camera_satellites
.iter_mut()
.filter(|x| x.socket.is_some())
.map(|x| {
let skt = x.socket.take().unwrap();
close_socket(skt)
})
.collect();
futures::future::join_all(close_handles).await;
break;
}
Ok(msg) => {}
}
}
}
async fn close_socket(mut skt: WebSocketStream<MaybeTlsStream<TcpStream>>) {
let _ = skt.close(None).await;
}

View file

@ -1,43 +0,0 @@
use crate::states::box_coords::NormalizedBoxCoords;
pub fn process_incoming_string(message: String) -> Result<Vec<NormalizedBoxCoords>, String> {
let mut boxes: Vec<NormalizedBoxCoords> = Vec::new();
for line in message.lines() {
let parts: Vec<&str> = line.split(' ').collect();
let id = parts[0]
.replace(['[', ']'], "")
.parse()
.map_err(|_| "Invalid ID")?;
if parts.len() != 3 {
return Err("Invalid socket input format: number of parts".to_string());
}
let coords: Vec<&str> = parts[1].split(':').collect();
if coords.len() != 2 {
return Err("Invalid socket input format: coords 1".to_string());
}
let x1: u32 = coords[0].parse().map_err(|_| "Invalid x coordinate")?;
let y1: u32 = coords[1].parse().map_err(|_| "Invalid y coordinate")?;
let coords2: Vec<&str> = parts[2].split(':').collect();
if coords2.len() != 2 {
return Err("Invalid socket input format: coords 2".to_string());
}
let x2: u32 = coords2[0].parse().map_err(|_| "Invalid width")?;
let y2: u32 = coords2[1].parse().map_err(|_| "Invalid width")?;
boxes.push(NormalizedBoxCoords {
id,
x1: (x1 as f32 / 1000.0),
x2: (x2 as f32 / 1000.0),
y1: (y1 as f32 / 1000.0),
y2: (y2 as f32 / 1000.0),
});
}
Ok(boxes)
}

View file

@ -1,196 +0,0 @@
use std::{
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};
use async_recursion::async_recursion;
use async_channel::Sender;
use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryStreamExt};
use gstreamer_app::AppSink;
use tokio::{net::TcpStream, sync::Mutex, time::sleep_until};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{error, info, instrument, warn};
use super::{
process_box_string::process_incoming_string, ApplicationEvent, SocketState, TrackerUpdate,
TrackerUpdatePackage,
};
#[instrument(skip_all)]
pub async fn remote_video_loop(
conn_string: String,
appsink: Arc<Mutex<AppSink>>,
to_mec: Sender<ApplicationEvent>,
socket_state: Arc<SocketState>,
) {
info!(
"Starting remote tracker processing connection to: {}",
conn_string
);
socket_state.is_connected.store(true, Ordering::SeqCst);
match connect_async(&conn_string).await {
Err(e) => {
warn!("Could not connect to remote computer: {e}");
if let Err(e) = to_mec
.send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Fail))
.await
{
error!("Could not send message to MEC! {e}");
}
}
Ok((connection, _)) => {
let (mut sender, mut recvr) = connection.split();
let mut last_iter: Instant;
loop {
last_iter = Instant::now();
// Do this in an encloser to not keep a lock on the appsink
let image_message = {
let res = {
let appsnk = appsink.lock().await;
get_video_frame(&appsnk)
};
match res {
Ok(e) => e,
Err(e) => {
error!("Could not get video frame! {e}");
if let Err(e) = sender.close().await {
error!("Could not close socket to remote computer: {e}")
}
socket_state.is_connected.store(false, Ordering::SeqCst);
break;
}
}
};
if let Err(e) = sender.send(image_message).await {
error!("There was an error sending the video frame to the server: {e}");
if let Err(e) = sender.close().await {
error!("Could not close socket to remote computer: {e}")
}
socket_state.is_connected.store(false, Ordering::SeqCst);
socket_state.stay_connected.store(false, Ordering::SeqCst);
break;
}
let do_not_break = handle_message(&mut recvr, &to_mec, last_iter).await;
if !do_not_break { break; }
if !socket_state.stay_connected.load(Ordering::SeqCst) {
info!("Shutting down remote video loop");
break;
}
// rate limit updates
sleep_until(tokio::time::Instant::now() + Duration::from_millis(10)).await;
}
}
}
info!("Shutting down remote video loop");
if let Err(e) = to_mec
.send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Clear))
.await
{
error!("Could not send message to MEC! {e}");
}
{
// This message forces a redraw after clearing the queue
if let Err(e) = to_mec
.send(ApplicationEvent::MoveEvent(
crate::coordinator::MoveEvent { x: 0, y: 0 },
crate::coordinator::ConnectionType::Automated,
))
.await
{
error!(
"Error sending message to MEC during shutdown of tracker thread: {}",
e
);
}
}
socket_state.is_connected.store(false, Ordering::SeqCst);
}
fn get_video_frame(appsink: &AppSink) -> Result<Message, String> {
let sample = appsink
.pull_sample()
.map_err(|e| format!("Could not get sample: {e}"))?;
let buffer = sample.buffer().ok_or("Could not get buffer, was None")?;
let map = buffer
.map_readable()
.map_err(|e| format!("Could not get readable map: {e}"))?;
Ok(Message::binary(map.to_vec()))
}
#[async_recursion]
async fn handle_message(
recvr: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
// sender: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
to_mec: &Sender<ApplicationEvent>,
last_iter: Instant,
) -> bool {
match recvr.try_next().await {
Ok(Some(message)) => {
match message {
Message::Close(_) => {
info!("Close packet received from remote computer: {:?}", message);
return false;
}
Message::Pong(_) | Message::Frame(_) | Message::Text(_) => {
warn!("There was an unhandled message type from the camera: {}\n{}", message, message.to_string());
// this was not the expected response, recursion!
return handle_message(recvr, to_mec, last_iter).await;
}
Message::Ping(_) => {
// Ping/Pongs are handled by tokio tungstenite on reads and writes
// this was not the expected response, recursion!
return handle_message(recvr, to_mec, last_iter).await;
}
Message::Binary(bin) => {
let message = std::str::from_utf8(&bin);
if let Err(e) = message {
error!("Could not decode binary message! Assuming corrupted response! {e}");
}
match process_incoming_string(message.unwrap().to_string()) {
Ok(v) => {
if let Err(e) = to_mec
.send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Update(
TrackerUpdatePackage {
boxes: v,
time: Instant::now(),
request_duration: Instant::now() - last_iter,
},
)))
.await
{
error!("Could not send to MEC! {e}");
return false;
}
}
Err(e) => {
error!("Could not parse incoming string! {}\n{}", e, message.unwrap());
}
};
}
}
}
Ok(None) => {
info!("Recieved an empty message from the remote computer: Aborting");
return false;
}
Err(e) => {
error!("Got an error on while recieving from remote computer: {e}");
}
}
return true;
}

View file

@ -1,233 +0,0 @@
use gstreamer::{prelude::*, PadLinkError};
use gstreamer::{Element, ElementFactory, Pipeline};
use gstreamer_app::AppSink;
use gstreamer::glib::BoolError;
use snafu::prelude::*;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Debug)]
pub struct WebcamPipeline {
pub pipeline: Pipeline,
pub sink_frame: Arc<Mutex<AppSink>>,
}
impl WebcamPipeline {
pub fn new(jpeg_quality: i32) -> Result<WebcamPipeline, PipelineError> {
let pipeline = Pipeline::with_name("webcam_pipeline");
// All of the following errors are unrecoverable
let mut video_src = "";
if cfg!(windows) {
video_src = "mfvideosrc";
} else if cfg!(unix) {
video_src = "v4l2src";
}
let source = ElementFactory::make(video_src)
.build()
.context(BuildSnafu {
element: "mfvideosrc",
})?;
let convert = ElementFactory::make("videoconvert")
.build()
.context(BuildSnafu {
element: "videoconvert",
})?;
let rate = ElementFactory::make("videorate")
.build()
.context(BuildSnafu {
element: "videorate",
})?;
let tee = ElementFactory::make("tee")
.build()
.context(BuildSnafu { element: "tee" })?;
let queue_app = ElementFactory::make("queue")
.property("max-size-time", 1u64)
.property("max-size-buffers", 0u32)
.property("max-size-bytes", 0u32)
.build()
.context(BuildSnafu {
element: "paintable queue",
})?;
let webrtc_sink = ElementFactory::make("webrtcsink")
// .property("meta", "meta,name=gst-stream")
.name("web rtc sink")
.build()
.context(BuildSnafu {
element: "webrtcsink"
})?;
// queue.connect_closure("overrun", false, glib::closure!(|queue: Element| {
// println!("The queue is full!");
// }));
let appsink_queue = ElementFactory::make("queue")
.property("max-size-time", 1u64)
.property("max-size-buffers", 0u32)
.property("max-size-bytes", 0u32)
.build()
.context(BuildSnafu {
element: "appsink queue",
})?;
let resize = ElementFactory::make("videoscale")
.build()
.context(BuildSnafu {
element: "videoscale",
})?;
let jpeg_enc = ElementFactory::make("jpegenc")
.property("quality", jpeg_quality)
.build()
.context(BuildSnafu { element: "jpegenc" })?;
let caps_string = "image/jpeg,width=640,height=640";
let appsrc_caps = gstreamer::Caps::from_str(caps_string).context(BuildSnafu {
element: "appsink caps",
})?;
let sink_frame = AppSink::builder()
.name("frame_appsink")
.sync(false)
.max_buffers(1u32)
.drop(true)
.caps(&appsrc_caps)
.build();
sink_frame.set_property("caps", &appsrc_caps.to_value());
pipeline
.add_many([
&source,
&convert,
&rate,
&tee,
&queue_app,
&webrtc_sink,
&appsink_queue,
&resize,
&jpeg_enc,
&sink_frame.upcast_ref(),
])
.context(LinkSnafu {
from: "all",
to: "pipeline",
})?;
Element::link_many([&source, &convert, &rate]).context(LinkSnafu {
from: "source et. al.",
to: "rate",
})?;
// -- BEGIN PAINTABLE SINK PIPELINE
let tee_caps =
// gstreamer::caps::Caps::from_str("video/x-raw,framerate=15/1").context(BuildSnafu {
gstreamer::caps::Caps::from_str("video/x-raw").context(BuildSnafu {
element: "tee caps",
})?;
rate.link_filtered(&tee, &tee_caps).context(LinkSnafu {
from: "videorate",
to: "tee",
})?;
let tee_src_1 = tee
.request_pad_simple("src_%u")
.ok_or(PipelineError::PadRequest {
element: "tee pad 1".to_string(),
})?;
let paintable_queue_sinkpad =
queue_app
.static_pad("sink")
.ok_or(PipelineError::PadRequest {
element: "gtk4 sink".to_string(),
})?;
tee_src_1
.link(&paintable_queue_sinkpad)
.context(PadLinkSnafu {
from: "tee src pad",
to: "gtk4 paintable queue",
})?;
queue_app.link(&webrtc_sink).context(LinkSnafu {
from: "gtk4 paintable queue",
to: "gtk4 paintable",
})?;
// -- END PAINTABLE SINK PIPELINE
// -- BEGIN APPSINK PIPELINE
let tee_src_2 = tee
.request_pad_simple("src_%u")
.ok_or(PipelineError::PadRequest {
element: "tee pad 2".to_string(),
})?;
let appsink_queue_sinkpad =
appsink_queue
.static_pad("sink")
.ok_or(PipelineError::PadRequest {
element: "appsink queue".to_string(),
})?;
tee_src_2
.link(&appsink_queue_sinkpad)
.context(PadLinkSnafu {
from: "tee src pad 2",
to: "appsink queue sinkpad",
})?;
appsink_queue.link(&resize).context(LinkSnafu {
from: "appsink_queue",
to: "resize",
})?;
let resize_caps =
gstreamer::caps::Caps::from_str("video/x-raw,format=RGB,width=640,height=640")
.context(BuildSnafu {
element: "resize_caps",
})?;
resize
.link_filtered(&jpeg_enc, &resize_caps)
.context(LinkSnafu {
from: "jpeg_enc",
to: "resize_caps",
})?;
Element::link_many([&jpeg_enc, &sink_frame.upcast_ref()]).context(LinkSnafu {
from: "jpeg_enc",
to: "appsink",
})?;
Ok(WebcamPipeline {
pipeline,
sink_frame: Arc::new(Mutex::new(sink_frame)),
})
}
}
#[derive(Debug, Snafu)]
pub enum PipelineError {
#[snafu(display("Error during element linking"))]
Link {
source: BoolError,
from: String,
to: String,
},
#[snafu(display("Error linking pads"))]
PadLink {
source: PadLinkError,
from: String,
to: String,
},
#[snafu(display("Error creating element"))]
Build { source: BoolError, element: String },
#[snafu(display("Error getting pad from element"))]
PadRequest { element: String },
}

View file

@ -1,187 +1,216 @@
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
use std::{sync::{Arc, Mutex}, time::Duration};
use async_channel::Sender;
use tokio::{runtime, sync:: RwLock};
use tracing::{self, debug, info, error};
use tauri::{AppHandle, Manager, Window};
use lazy_static::lazy_static;
#[cfg(not(debug_assertions))]
use tracing_subscriber;
use vcs_common::ApplicationMessage;
use webrtc_remote::start_listener;
use crate::config::{load_config, AppConfig};
mod config;
mod coordinator;
mod gstreamer_pipeline;
mod sources;
mod states;
mod tauri_functions;
mod webrtc_remote;
use coordinator::{start_coordinator, ApplicationEvent};
lazy_static! {
static ref TO_MEC_REF: Mutex<Option<Sender<ApplicationEvent>>> = Mutex::new(None);
static ref TO_WEBRTC: Mutex<Option<Sender<ApplicationMessage>>> = Mutex::new(None);
static ref APP_HANDLE: Mutex<Option<AppHandle>> = Mutex::new(None);
}
fn main() {
#[cfg(not(debug_assertions))]
{
let file_appender = tracing_appender::rolling::daily(".\\logs", "camera-controller");
let (non_blocking, _gaurd) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_max_level(tracing::Level::DEBUG)
.with_ansi(false)
.init();
}
#[cfg(all(not(feature = "tokio-debug"), debug_assertions))]
{
let _sub = tracing_subscriber::fmt()
.with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG)
.init();
}
#[cfg(feature = "tokio-debug")]
{
console_subscriber::init();
}
let (to_mec, mec) = async_channel::bounded::<ApplicationEvent>(10);
info!("Logging intialized");
let config: Arc<RwLock<AppConfig>> = Arc::new(RwLock::new(load_config()));
gstreamer::init().expect("Unable to start gstreamer");
let rt = runtime::Runtime::new().expect("Could not start tokio runtime");
let handle = rt.handle().clone();
let _coordinator = rt.handle().spawn(start_coordinator(
mec,
to_mec.clone(),
handle,
config
));
*TO_MEC_REF.lock().unwrap() = Some(to_mec.clone());
let (to_webrtc_send, to_webrtc_recv) = async_channel::bounded::<vcs_common::ApplicationMessage>(10);
let (from_webrtc_send, from_webrtc_recv) = async_channel::bounded::<vcs_common::ApplicationMessage>(10);
rt.handle().spawn(start_listener(to_webrtc_recv, from_webrtc_send));
*TO_WEBRTC.lock().unwrap() = Some(to_webrtc_send.clone());
rt.handle().spawn(async move {
while let Ok(msg) = from_webrtc_recv.recv().await {
let mut do_sleep = false;
{
if let Ok(mut e) = APP_HANDLE.lock() {
if e.is_none() {
do_sleep = true;
} else {
let handle = e.take().unwrap();
match msg {
vcs_common::ApplicationMessage::WebRTCPacket(msg) => {
debug!("Got a message from the webrtc connection! {:?}", msg);
handle.emit_all("frontend_message", serde_json::to_string(&msg).unwrap()).unwrap();
}
vcs_common::ApplicationMessage::WebRTCIceCandidateInit(msg) => {
debug!("Got an ICE init candidate from the webrtc connection! {:?}", msg);
handle.emit_all("frontend_message", serde_json::to_string(&msg).unwrap()).unwrap();
}
vcs_common::ApplicationMessage::WebRTCIceCandidate(msg) => {
debug!("Got an ICE candidate from the webrtc connection! {:?}", msg);
handle.emit_all("frontend_message", serde_json::to_string(&msg).unwrap()).unwrap();
}
}
*e = Some(handle);
}
}
}
if do_sleep {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
});
tauri::Builder::default()
.manage(tauri_functions::TauriState { to_mec: to_mec.clone() })
.invoke_handler(tauri::generate_handler![tauri_functions::connect_to_camera])
.setup(|app| {
*APP_HANDLE.lock().unwrap() = Some(app.handle());
let _id2 = app.listen_global("webrtc-message", |event| {
debug!("Got webrtc-message event from Tauri client! {:#?}", event);
match event.payload() {
Some(payload) => {
if let Ok(e) = TO_WEBRTC.lock() {
match e.as_ref() {
Some(to_webrtc) => {
debug!("Sending message to the webrtc connection");
let message: Option<ApplicationMessage> = match payload {
s if s.starts_with("{\"type") => Some(ApplicationMessage::WebRTCPacket(serde_json::from_str(payload).expect("Could not decode the browser's sdp"))),
s if s.starts_with("{\"candidate") => Some(ApplicationMessage::WebRTCIceCandidateInit(serde_json::from_str(payload).expect("Could not decode the browser's Ice Candidate"))),
_ => None
};
if message.is_some() {
if let Err(e) = to_webrtc.send_blocking(message.unwrap()) {
error!("Could not send to mec! {e}");
}
}
},
None => {
error!("TO_MEC_REF was none!");
}
}
}
}
None => {
info!("There was an empty payload!");
}
}
});
// let _id = app.listen_global("webrtc-event", |event| {
// match event.payload() {
// Some(payload) => {
// if let Ok(e) = TO_MEC_REF.lock() {
// match e.as_ref() {
// Some(to_mec) => {
// if let Err(e) = to_mec.send_blocking(ApplicationEvent::WebRTCMessage(payload.to_string())) {
// error!("Could not send to mec! {e}");
// }
// },
// None => {
// error!("TO_MEC_REF was none!");
// }
// }
// }
// }
// None => {
// info!("There was an empty payload!");
// }
// }
// });
Ok(())
})
.run(tauri::generate_context!())
.expect("error while running tauri application");
let _ = to_mec.send_blocking(ApplicationEvent::Close);
}
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
use async_channel::Sender;
use lazy_static::lazy_static;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tauri::{AppHandle, Manager};
use tokio::{runtime, sync::RwLock};
use tracing::{self, debug, error, info};
#[cfg(not(debug_assertions))]
use tracing_subscriber;
use vcs_common::ApplicationMessage;
use webrtc_remote::start_listener;
use crate::config::{load_config, AppConfig};
mod config;
mod coordinator;
mod sources;
mod tauri_functions;
mod webrtc_remote;
use coordinator::{run_main_event_loop, ApplicationEvent};
lazy_static! {
static ref TO_MEC_REF: Mutex<Option<Sender<ApplicationEvent>>> = Mutex::new(None);
static ref TO_WEBRTC: Mutex<Option<Sender<ApplicationMessage>>> = Mutex::new(None);
static ref APP_HANDLE: Mutex<Option<AppHandle>> = Mutex::new(None);
}
fn main() {
#[cfg(not(debug_assertions))]
{
let file_appender = tracing_appender::rolling::daily(".\\logs", "camera-controller");
let (non_blocking, _gaurd) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_max_level(tracing::Level::DEBUG)
.with_ansi(false)
.init();
}
#[cfg(all(not(feature = "tokio-debug"), debug_assertions))]
{
let _sub = tracing_subscriber::fmt()
.with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG)
.init();
}
#[cfg(feature = "tokio-debug")]
{
console_subscriber::init();
}
let (to_mec, mec) = async_channel::bounded::<ApplicationEvent>(10);
info!("Logging intialized");
let config: Arc<RwLock<AppConfig>> = Arc::new(RwLock::new(load_config()));
let rt = runtime::Runtime::new().expect("Could not start tokio runtime");
let handle = rt.handle().clone();
let _coordinator = rt
.handle()
.spawn(run_main_event_loop(mec, to_mec.clone(), config, handle));
*TO_MEC_REF.lock().unwrap() = Some(to_mec.clone());
let (to_webrtc_send, to_webrtc_recv) =
async_channel::bounded::<vcs_common::ApplicationMessage>(10);
let (from_webrtc_send, from_webrtc_recv) =
async_channel::bounded::<vcs_common::ApplicationMessage>(10);
rt.handle()
.spawn(start_listener(to_webrtc_recv, from_webrtc_send));
*TO_WEBRTC.lock().unwrap() = Some(to_webrtc_send.clone());
rt.handle().spawn(async move {
while let Ok(msg) = from_webrtc_recv.recv().await {
let mut do_sleep = false;
{
if let Ok(mut e) = APP_HANDLE.lock() {
if e.is_none() {
do_sleep = true;
} else {
let handle = e.take().unwrap();
match msg {
vcs_common::ApplicationMessage::WebRTCPacket(msg) => {
debug!("Got a message from the webrtc connection! {:?}", msg);
handle
.emit_all(
"frontend_message",
serde_json::to_string(&msg).unwrap(),
)
.unwrap();
}
vcs_common::ApplicationMessage::WebRTCIceCandidateInit(msg) => {
debug!(
"Got an ICE init candidate from the webrtc connection! {:?}",
msg
);
handle
.emit_all(
"frontend_message",
serde_json::to_string(&msg).unwrap(),
)
.unwrap();
}
vcs_common::ApplicationMessage::WebRTCIceCandidate(msg) => {
debug!(
"Got an ICE candidate from the webrtc connection! {:?}",
msg
);
handle
.emit_all(
"frontend_message",
serde_json::to_string(&msg).unwrap(),
)
.unwrap();
}
}
*e = Some(handle);
}
}
}
if do_sleep {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
});
tauri::Builder::default()
.manage(tauri_functions::TauriState {
to_mec: to_mec.clone(),
})
.invoke_handler(tauri::generate_handler![tauri_functions::connect_to_camera])
.setup(|app| {
*APP_HANDLE.lock().unwrap() = Some(app.handle());
let _id2 = app.listen_global("webrtc-message", |event| {
debug!("Got webrtc-message event from Tauri client! {:#?}", event);
match event.payload() {
Some(payload) => {
if let Ok(e) = TO_WEBRTC.lock() {
match e.as_ref() {
Some(to_webrtc) => {
debug!("Sending message to the webrtc connection");
let message: Option<ApplicationMessage> = match payload {
s if s.starts_with("{\"type") => {
Some(ApplicationMessage::WebRTCPacket(
serde_json::from_str(payload)
.expect("Could not decode the browser's sdp"),
))
}
s if s.starts_with("{\"candidate") => {
Some(ApplicationMessage::WebRTCIceCandidateInit(
serde_json::from_str(payload).expect(
"Could not decode the browser's Ice Candidate",
),
))
}
_ => None,
};
if message.is_some() {
if let Err(e) = to_webrtc.send_blocking(message.unwrap()) {
error!("Could not send to mec! {e}");
}
}
}
None => {
error!("TO_MEC_REF was none!");
}
}
}
}
None => {
info!("There was an empty payload!");
}
}
});
// let _id = app.listen_global("webrtc-event", |event| {
// match event.payload() {
// Some(payload) => {
// if let Ok(e) = TO_MEC_REF.lock() {
// match e.as_ref() {
// Some(to_mec) => {
// if let Err(e) = to_mec.send_blocking(ApplicationEvent::WebRTCMessage(payload.to_string())) {
// error!("Could not send to mec! {e}");
// }
// },
// None => {
// error!("TO_MEC_REF was none!");
// }
// }
// }
// }
// None => {
// info!("There was an empty payload!");
// }
// }
// });
Ok(())
})
.run(tauri::generate_context!())
.expect("error while running tauri application");
let _ = to_mec.send_blocking(ApplicationEvent::Close);
}

View file

@ -1,4 +1,4 @@
use crate::coordinator::{ApplicationEvent, MoveEvent};
use crate::coordinator::{ApplicationEvent, Point};
use async_channel::Sender;
use gilrs::{ev::filter::FilterFn, Axis, Button, Event, EventType, Filter, Gilrs, GilrsBuilder};
@ -86,13 +86,10 @@ pub async fn joystick_loop(tx: Sender<ApplicationEvent>, is_alive: Arc<AtomicBoo
count_zeros = 0;
}
match tx.try_send(ApplicationEvent::MoveEvent(
MoveEvent {
x: curr_x,
y: curr_y,
},
crate::coordinator::ConnectionType::Local,
)) {
match tx.try_send(ApplicationEvent::Move(Point {
x: curr_x,
y: curr_y,
})) {
Ok(_) => {}
Err(async_channel::TrySendError::Closed(_)) => {
info!("MEC is closed, stopping Joystick loop");

View file

@ -1,2 +1 @@
pub mod joystick_source;

View file

@ -1,56 +0,0 @@
use std::fmt::Display;
#[derive(Debug, Clone, Copy)]
pub struct BoxCoords {
pub id: u32,
pub x1: u32,
pub y1: u32,
pub x2: u32,
pub y2: u32,
}
impl Display for BoxCoords {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Absolute Box {}, x1: {}, y1: {}, x2: {}, y2: {}",
self.id, self.x1, self.y1, self.x2, self.y2
)
}
}
#[derive(Debug, Clone, Copy)]
pub struct NormalizedBoxCoords {
pub id: u32,
pub x1: f32,
pub y1: f32,
pub x2: f32,
pub y2: f32,
}
impl NormalizedBoxCoords {
fn absolute_coords(&self, width: i32, height: i32) -> BoxCoords {
BoxCoords {
id: self.id,
x1: (self.x1 * width as f32) as u32,
y1: (self.y1 * height as f32) as u32,
x2: (self.x2 * width as f32) as u32,
y2: (self.y2 * height as f32) as u32,
}
}
fn area(&self) -> f32 {
(self.x2 - self.x1) * (self.y2 - self.y1)
}
}
impl Display for NormalizedBoxCoords {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Normalized Box {}, x1: {}, y1: {}, x2: {}, y2: {}",
self.id, self.x1, self.y1, self.x2, self.y2
)
}
}

View file

@ -1,5 +0,0 @@
pub mod perf_state;
pub mod tracker_state;
pub mod box_coords;

View file

@ -1,82 +0,0 @@
use std::{collections::VecDeque, time::Duration};
use tracing::info;
const MAX_RECORDED_TIMES: usize = 10;
const DEGRADED_TRACKER_TIME: u128 = 150;
#[derive(Debug)]
pub struct TrackerMetrics {
pub header_text: String,
pub fail_count: usize,
tracker_times: VecDeque<u128>,
}
impl TrackerMetrics {
pub fn new() -> Self {
let mut ret = TrackerMetrics {
header_text: String::from(""),
fail_count: 0,
tracker_times: VecDeque::with_capacity(MAX_RECORDED_TIMES),
};
ret.clear_times();
ret
}
fn update_gui(&mut self) {
info!("Trying to update the gui");
// todo!("No gui channel sent yet");
}
pub fn starting_connection(&mut self, fail_count: Option<usize>) {
self.clear_times();
self.header_text.clear();
match fail_count {
None => self.header_text.push_str("Status: Connecting ..."),
Some(v) => self.header_text.push_str(&format!("Status: Attempt {}/5", v)),
}
self.update_gui();
}
pub fn clear_times(&mut self) {
for _ in 0..10 {
self.tracker_times.pop_front();
}
self.header_text = "Status: Disconnected".to_string();
self.update_gui();
}
pub fn insert_time(&mut self, new_measurement: Duration) {
if self.tracker_times.len() > MAX_RECORDED_TIMES {
let _ = self.tracker_times.pop_front();
}
self.tracker_times.push_back(new_measurement.as_millis());
let avg_time = self.tracker_times.iter().sum::<u128>() / self.tracker_times.len() as u128;
if avg_time == 0 {
self.header_text = format!(
"Status: Failed Avg Response: {} ms",
avg_time
);
}
if avg_time > DEGRADED_TRACKER_TIME {
self.header_text = format!(
"Status: Degraded Avg Response: {} ms",
avg_time
);
} else {
self.header_text = format!(
"Status: Nominal Avg Response: {} ms",
avg_time
);
}
self.update_gui();
}
}

View file

@ -1,79 +0,0 @@
use std::{
cmp::{max, min},
time::Instant,
};
use crate::states::box_coords::NormalizedBoxCoords;
#[derive(Debug)]
pub struct TrackerState {
pub tracking_id: u32,
pub highlighted_id: Option<u32>,
pub last_detect: Instant,
pub enabled: bool,
pub update_ids: bool,
pub identity_boxes: Vec<NormalizedBoxCoords>,
}
impl TrackerState {
pub fn clear(&mut self) {
self.tracking_id = 0;
self.highlighted_id = None;
self.last_detect = Instant::now();
self.enabled = false;
self.update_ids = false;
self.identity_boxes.clear();
}
pub fn update_from_boxes(&mut self, new_boxes: Vec<NormalizedBoxCoords>) {
let mut old_ids: Vec<u32> = self.identity_boxes.iter().map(|x| x.id).collect();
old_ids.sort();
let mut new_ids: Vec<u32> = new_boxes.iter().map(|x| x.id).collect();
new_ids.sort();
self.update_ids = new_ids == old_ids;
self.identity_boxes = new_boxes;
}
// TODO: have a return type that says "highlighted ID not found" and "no highlighted id selected"
// It's not really worth logging...
pub fn calculate_tracking(&mut self) -> core::result::Result<(i32, i32, bool), String> {
if let Some(target_box) = self
.identity_boxes
.iter()
.find(|e| e.id == self.tracking_id)
{
let x_adjust = calc_x_adjust(target_box.x1, target_box.x2);
let y_adjust = calc_y_adjust(target_box.y1);
self.last_detect = std::time::Instant::now();
Ok((x_adjust, y_adjust, self.enabled))
} else {
Err("Couldn't find target in results".to_string())
}
}
}
fn calc_x_adjust(x1: f32, x2: f32) -> i32 {
let dist_from_center = ((x1 + x2) / 2.0) - 0.5;
let mut x_adjust = ((dist_from_center / 0.5 * 2.0) * 100.0) as i32;
if x_adjust < 15 && x_adjust > -15 {
x_adjust = 0;
}
min(max(x_adjust, -100), 100)
}
fn calc_y_adjust(y1: f32) -> i32 {
// All values are normalized, then multiplied by 1000. 500 == 50% of the screen
let mut y_adjust = ((y1 - 0.1) * 250.0) as i32;
if y_adjust < 0 {
y_adjust -= 20;
} else if y_adjust < 30 {
y_adjust = 0;
} else {
y_adjust = (y_adjust as f32 * 0.75) as i32;
}
min(max(y_adjust, -100), 100)
}

View file

@ -1,14 +1,17 @@
use async_channel::Sender;
use tauri::State;
use crate::coordinator::ApplicationEvent;
pub struct TauriState {
pub to_mec: Sender<ApplicationEvent>,
}
#[tauri::command]
pub fn connect_to_camera(state: State<'_, TauriState>) {
let _ = state.to_mec.send_blocking(ApplicationEvent::CameraConnectionPress);
}
use async_channel::Sender;
use tauri::State;
use tracing::info;
use crate::coordinator::ApplicationEvent;
pub struct TauriState {
pub to_mec: Sender<ApplicationEvent>,
}
#[tauri::command]
pub fn connect_to_camera(state: State<'_, TauriState>) {
// let _ = state
// .to_mec
// .send_blocking(ApplicationEvent::CameraConnectionPress);
info!("Connect to Camera button press event");
}

View file

@ -1,132 +1,120 @@
use std::{cmp::Ordering, sync::{atomic::AtomicBool, Arc}};
use futures_util::{StreamExt, SinkExt};
use tokio::{net::TcpListener, sync::Mutex};
use tokio_tungstenite::{accept_async, tungstenite::Message};
use tracing::{error, info, instrument, debug};
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
#[instrument(skip_all)]
pub async fn start_listener(
from_app: AppReceiver,
to_ui: AppSender,
) {
info!("starting tcplistener");
let listener = TcpListener::bind("localhost:7891").await.unwrap();
info!("Listening for webrtc connections on localhost:7891");
if let Ok((stream, _)) = listener.accept().await {
match accept_async(stream).await {
Err(e) => error!("Could not convert incoming stream to websocket: {e}"),
Ok(ws_stream) => {
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
tokio::spawn(async move {
while let Ok(msg) = from_app.recv().await {
#[cfg(debug_assertions)]
{
// serialized message
match serde_json::to_string(&msg) {
Err(e) => error!("Could not serialize ApplicationMessage to JSON! {e}"),
Ok(msg) => {
if let Err(e) = ws_sender.send(Message::text(msg)).await {
error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
#[cfg(not(debug_assertions))]
{
match bincode::serialize(&msg) {
Err(e) => error!("Could not serialize ApplicationMessage into binary! {e}"),
Ok(e) => {
if let Err(e) = sender.send(Message::binary(msg)).await {
error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
}
});
while let Some(msg) = ws_receiver.next().await {
match msg {
Err(e) => {
error!("There was an error getting a message from the remote! {e}");
}
Ok(msg) => match msg {
Message::Ping(_) | Message::Pong(_) => {}
Message::Close(_) => {
info!("Received WebSocket close message! Closing the websocket");
break;
}
Message::Frame(_) => {
info!("Received a Frame websocket message?");
}
Message::Text(text) => {
debug!("Recieved text from websocket: {text}");
#[cfg(debug_assertions)]
{
match serde_json::from_str(&text) {
Ok(msg) => {
if let Err(e) = to_ui.send(msg).await {
error!("Could not send message from ws to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Text` message from the remote while running in release mode! " +
"Was the other endpoint running release mode?\n msg: {text}");
}
}
Message::Binary(msg) => {
#[cfg(debug_assertions)]
{
match bincode::deserialize::<ApplicationMessage>(&msg) {
Ok(m) => {
if let Err(e) = to_ui.send(m).await {
error!("Could not send message to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed binary message from the websocket!\n{e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
"Was the other endpoing running debug mode?");
}
}
}
}
}
}
}
}
}
use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio_tungstenite::{accept_async, tungstenite::Message};
use tracing::{debug, error, info, instrument};
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
#[instrument(skip_all)]
pub async fn start_listener(from_app: AppReceiver, to_ui: AppSender) {
info!("starting tcplistener");
let listener = TcpListener::bind("localhost:7891").await.unwrap();
info!("Listening for webrtc connections on localhost:7891");
if let Ok((stream, _)) = listener.accept().await {
match accept_async(stream).await {
Err(e) => error!("Could not convert incoming stream to websocket: {e}"),
Ok(ws_stream) => {
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
tokio::spawn(async move {
while let Ok(msg) = from_app.recv().await {
#[cfg(debug_assertions)]
{
// serialized message
match serde_json::to_string(&msg) {
Err(e) => {
error!("Could not serialize ApplicationMessage to JSON! {e}")
}
Ok(msg) => {
if let Err(e) = ws_sender.send(Message::text(msg)).await {
error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
#[cfg(not(debug_assertions))]
{
match bincode::serialize(&msg) {
Err(e) => error!(
"Could not serialize ApplicationMessage into binary! {e}"
),
Ok(e) => {
if let Err(e) = sender.send(Message::binary(msg)).await {
error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
}
});
while let Some(msg) = ws_receiver.next().await {
match msg {
Err(e) => {
error!("There was an error getting a message from the remote! {e}");
}
Ok(msg) => match msg {
Message::Ping(_) | Message::Pong(_) => {}
Message::Close(_) => {
info!("Received WebSocket close message! Closing the websocket");
break;
}
Message::Frame(_) => {
info!("Received a Frame websocket message?");
}
Message::Text(text) => {
debug!("Recieved text from websocket: {text}");
#[cfg(debug_assertions)]
{
match serde_json::from_str(&text) {
Ok(msg) => {
if let Err(e) = to_ui.send(msg).await {
error!("Could not send message from ws to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Text` message from the remote while running in release mode! " +
"Was the other endpoint running release mode?\n msg: {text}");
}
}
Message::Binary(msg) => {
#[cfg(debug_assertions)]
{
match bincode::deserialize::<ApplicationMessage>(&msg) {
Ok(m) => {
if let Err(e) = to_ui.send(m).await {
error!("Could not send message to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed binary message from the websocket!\n{e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
"Was the other endpoing running debug mode?");
}
}
},
}
}
}
}
}
}