feat(dgw): basic network monitoring (#1446) · Devolutions/devolutions-gateway@d135342
1+use std::collections::{HashMap, HashSet, VecDeque};
2+use std::future::Future;
3+use std::pin::Pin;
4+use std::sync::Arc;
5+use std::time::Duration;
6+use std::{fs, io, mem};
7+8+use anyhow::anyhow;
9+use camino::Utf8PathBuf;
10+use network_scanner_net::runtime::Socket2Runtime;
11+use serde::{Deserialize, Serialize};
12+use thiserror::Error;
13+use time::UtcDateTime;
14+use tokio::select;
15+use tokio_util::sync::CancellationToken;
16+use tracing::warn;
17+18+use network_scanner::ping;
19+20+mod log_queue;
21+mod state;
22+23+pub use crate::state::State;
24+25+#[derive(Error, Debug)]
26+#[error(transparent)]
27+pub enum SetConfigError {
28+Io(#[from] io::Error),
29+Serde(#[from] serde_json::Error),
30+Other(#[from] anyhow::Error),
31+}
32+33+pub async fn set_config(config: MonitorsConfig, state: Arc<State>) -> Result<(), SetConfigError> {
34+let file = fs::OpenOptions::new()
35+.create(true)
36+.write(true)
37+.truncate(true)
38+.open(&state.cache_path)?;
39+40+let mut config_write = state.config.write().map_err(|_| anyhow!("config lock poisoned"))?;
41+42+ serde_json::to_writer_pretty(&file, &config)?;
43+44+let old_config = mem::replace(&mut *config_write, config);
45+46+let new_config_set: HashSet<MonitorDefinition> = config_write.monitors.clone().into_iter().collect();
47+let old_config_set: HashSet<MonitorDefinition> = old_config.monitors.into_iter().collect();
48+49+drop(config_write);
50+51+let added = new_config_set.difference(&old_config_set).cloned();
52+let deleted = old_config_set.difference(&new_config_set);
53+54+let (new_cancellation_tokens, new_monitors): (
55+Vec<(String, CancellationToken)>,
56+Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
57+) = added
58+.map(|definition| {
59+let cancellation_token = CancellationToken::new();
60+let cancellation_monitor = cancellation_token.clone();
61+62+let state = Arc::clone(&state);
63+let definition_id = definition.id.clone();
64+65+let monitor = async move {
66+loop {
67+let start_time = UtcDateTime::now();
68+69+let monitor_result = match &definition.probe {
70+ProbeType::Ping => {
71+let scanner_runtime = match &*state.scanner_runtime {
72+Ok(scanner_runtime) => Arc::clone(scanner_runtime),
73+Err(error) => {
74+warn!(error = %error, monitor_id = definition.id, "scanning runtime failed to start, aborting monitor");
75+break;
76+},
77+};
78+do_ping_monitor(&definition, scanner_runtime).await
79+},
80+ProbeType::TcpOpen => do_tcpopen_monitor(&definition).await,
81+};
82+83+ state.log.write(monitor_result);
84+85+let elapsed = UtcDateTime::now() - start_time;
86+let next_run_in =
87+(definition.interval as f64 - elapsed.as_seconds_f64()).clamp(1.0, f64::INFINITY);
88+select! {
89+ _ = cancellation_monitor.cancelled() => { return }
90+ _ = tokio::time::sleep(Duration::from_secs_f64(next_run_in)) => { }
91+};
92+}
93+};
94+95+(
96+(definition_id, cancellation_token),
97+Box::pin(monitor) as Pin<Box<dyn Future<Output = ()> + Send>>,
98+)
99+})
100+.unzip();
101+102+let mut cancellation_tokens_write = state
103+.cancellation_tokens
104+.lock()
105+.map_err(|_| anyhow!("cancellation token lock poisoned"))?;
106+107+for definition in deleted {
108+ cancellation_tokens_write[&definition.id].cancel();
109+ cancellation_tokens_write.remove(&definition.id);
110+}
111+112+for (monitor_id, cancellation_token) in new_cancellation_tokens {
113+ cancellation_tokens_write.insert(monitor_id, cancellation_token);
114+}
115+116+for monitoring_task in new_monitors {
117+ tokio::spawn(monitoring_task);
118+}
119+120+Ok(())
121+}
122+123+async fn do_ping_monitor(definition: &MonitorDefinition, scanner_runtime: Arc<Socket2Runtime>) -> MonitorResult {
124+let start_time = UtcDateTime::now();
125+126+let ping_result = async || -> anyhow::Result<time::Duration> {
127+ ping::ping_addr(
128+ scanner_runtime,
129+format!("{hostname}:0", hostname = definition.address),
130+Duration::from_secs(definition.timeout),
131+)
132+.await?;
133+// TODO: send more than 1 ping packet
134+135+Ok(UtcDateTime::now() - start_time)
136+}()
137+.await;
138+139+match ping_result {
140+Ok(time) => MonitorResult {
141+monitor_id: definition.id.clone(),
142+request_start_time: start_time,
143+response_success: true,
144+response_messages: None,
145+response_time: time.as_seconds_f64(),
146+},
147+Err(error) => MonitorResult {
148+monitor_id: definition.id.clone(),
149+request_start_time: start_time,
150+response_success: false,
151+response_messages: Some(format!("{error:#}")),
152+response_time: f64::INFINITY,
153+},
154+}
155+}
156+157+async fn do_tcpopen_monitor(definition: &MonitorDefinition) -> MonitorResult {
158+MonitorResult {
159+monitor_id: definition.id.clone(),
160+request_start_time: UtcDateTime::now(),
161+response_success: false,
162+response_messages: Some("not implemented".into()),
163+response_time: f64::INFINITY,
164+}
165+}
166+167+pub fn drain_log(state: Arc<State>) -> VecDeque<MonitorResult> {
168+ state.log.drain()
169+}
170+171+#[derive(Debug, Serialize, Deserialize)]
172+pub struct MonitorsConfig {
173+pub monitors: Vec<MonitorDefinition>,
174+}
175+176+impl MonitorsConfig {
177+fn empty() -> MonitorsConfig {
178+MonitorsConfig { monitors: Vec::new() }
179+}
180+181+#[doc(hidden)]
182+fn mock() -> MonitorsConfig {
183+MonitorsConfig {
184+monitors: vec![MonitorDefinition {
185+ id: "a".to_owned(),
186+ probe: ProbeType::Ping,
187+ address: "c".to_owned(),
188+ interval: 1,
189+ timeout: 2,
190+ port: Some(3),
191+}],
192+}
193+}
194+}
195+196+#[derive(Eq, PartialEq, Hash, Clone, Serialize, Deserialize, Debug)]
197+#[serde(rename_all = "camelCase")]
198+pub enum ProbeType {
199+Ping,
200+TcpOpen,
201+}
202+203+#[derive(Eq, PartialEq, Hash, Clone, Serialize, Deserialize, Debug)]
204+pub struct MonitorDefinition {
205+pub id: String,
206+pub probe: ProbeType,
207+pub address: String,
208+pub interval: u64,
209+pub timeout: u64,
210+pub port: Option<i16>,
211+}
212+213+#[derive(PartialEq, Clone, Debug)]
214+pub struct MonitorResult {
215+pub monitor_id: String,
216+pub request_start_time: UtcDateTime,
217+pub response_success: bool,
218+pub response_messages: Option<String>,
219+pub response_time: f64,
220+}