feat(dgw): basic network monitoring (#1446) · Devolutions/devolutions-gateway@d135342

1+

use std::collections::{HashMap, HashSet, VecDeque};

2+

use std::future::Future;

3+

use std::pin::Pin;

4+

use std::sync::Arc;

5+

use std::time::Duration;

6+

use std::{fs, io, mem};

7+8+

use anyhow::anyhow;

9+

use camino::Utf8PathBuf;

10+

use network_scanner_net::runtime::Socket2Runtime;

11+

use serde::{Deserialize, Serialize};

12+

use thiserror::Error;

13+

use time::UtcDateTime;

14+

use tokio::select;

15+

use tokio_util::sync::CancellationToken;

16+

use tracing::warn;

17+18+

use network_scanner::ping;

19+20+

mod log_queue;

21+

mod state;

22+23+

pub use crate::state::State;

24+25+

#[derive(Error, Debug)]

26+

#[error(transparent)]

27+

pub enum SetConfigError {

28+

Io(#[from] io::Error),

29+

Serde(#[from] serde_json::Error),

30+

Other(#[from] anyhow::Error),

31+

}

32+33+

pub async fn set_config(config: MonitorsConfig, state: Arc<State>) -> Result<(), SetConfigError> {

34+

let file = fs::OpenOptions::new()

35+

.create(true)

36+

.write(true)

37+

.truncate(true)

38+

.open(&state.cache_path)?;

39+40+

let mut config_write = state.config.write().map_err(|_| anyhow!("config lock poisoned"))?;

41+42+

serde_json::to_writer_pretty(&file, &config)?;

43+44+

let old_config = mem::replace(&mut *config_write, config);

45+46+

let new_config_set: HashSet<MonitorDefinition> = config_write.monitors.clone().into_iter().collect();

47+

let old_config_set: HashSet<MonitorDefinition> = old_config.monitors.into_iter().collect();

48+49+

drop(config_write);

50+51+

let added = new_config_set.difference(&old_config_set).cloned();

52+

let deleted = old_config_set.difference(&new_config_set);

53+54+

let (new_cancellation_tokens, new_monitors): (

55+

Vec<(String, CancellationToken)>,

56+

Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,

57+

) = added

58+

.map(|definition| {

59+

let cancellation_token = CancellationToken::new();

60+

let cancellation_monitor = cancellation_token.clone();

61+62+

let state = Arc::clone(&state);

63+

let definition_id = definition.id.clone();

64+65+

let monitor = async move {

66+

loop {

67+

let start_time = UtcDateTime::now();

68+69+

let monitor_result = match &definition.probe {

70+

ProbeType::Ping => {

71+

let scanner_runtime = match &*state.scanner_runtime {

72+

Ok(scanner_runtime) => Arc::clone(scanner_runtime),

73+

Err(error) => {

74+

warn!(error = %error, monitor_id = definition.id, "scanning runtime failed to start, aborting monitor");

75+

break;

76+

},

77+

};

78+

do_ping_monitor(&definition, scanner_runtime).await

79+

},

80+

ProbeType::TcpOpen => do_tcpopen_monitor(&definition).await,

81+

};

82+83+

state.log.write(monitor_result);

84+85+

let elapsed = UtcDateTime::now() - start_time;

86+

let next_run_in =

87+

(definition.interval as f64 - elapsed.as_seconds_f64()).clamp(1.0, f64::INFINITY);

88+

select! {

89+

_ = cancellation_monitor.cancelled() => { return }

90+

_ = tokio::time::sleep(Duration::from_secs_f64(next_run_in)) => { }

91+

};

92+

}

93+

};

94+95+

(

96+

(definition_id, cancellation_token),

97+

Box::pin(monitor) as Pin<Box<dyn Future<Output = ()> + Send>>,

98+

)

99+

})

100+

.unzip();

101+102+

let mut cancellation_tokens_write = state

103+

.cancellation_tokens

104+

.lock()

105+

.map_err(|_| anyhow!("cancellation token lock poisoned"))?;

106+107+

for definition in deleted {

108+

cancellation_tokens_write[&definition.id].cancel();

109+

cancellation_tokens_write.remove(&definition.id);

110+

}

111+112+

for (monitor_id, cancellation_token) in new_cancellation_tokens {

113+

cancellation_tokens_write.insert(monitor_id, cancellation_token);

114+

}

115+116+

for monitoring_task in new_monitors {

117+

tokio::spawn(monitoring_task);

118+

}

119+120+

Ok(())

121+

}

122+123+

async fn do_ping_monitor(definition: &MonitorDefinition, scanner_runtime: Arc<Socket2Runtime>) -> MonitorResult {

124+

let start_time = UtcDateTime::now();

125+126+

let ping_result = async || -> anyhow::Result<time::Duration> {

127+

ping::ping_addr(

128+

scanner_runtime,

129+

format!("{hostname}:0", hostname = definition.address),

130+

Duration::from_secs(definition.timeout),

131+

)

132+

.await?;

133+

// TODO: send more than 1 ping packet

134+135+

Ok(UtcDateTime::now() - start_time)

136+

}()

137+

.await;

138+139+

match ping_result {

140+

Ok(time) => MonitorResult {

141+

monitor_id: definition.id.clone(),

142+

request_start_time: start_time,

143+

response_success: true,

144+

response_messages: None,

145+

response_time: time.as_seconds_f64(),

146+

},

147+

Err(error) => MonitorResult {

148+

monitor_id: definition.id.clone(),

149+

request_start_time: start_time,

150+

response_success: false,

151+

response_messages: Some(format!("{error:#}")),

152+

response_time: f64::INFINITY,

153+

},

154+

}

155+

}

156+157+

async fn do_tcpopen_monitor(definition: &MonitorDefinition) -> MonitorResult {

158+

MonitorResult {

159+

monitor_id: definition.id.clone(),

160+

request_start_time: UtcDateTime::now(),

161+

response_success: false,

162+

response_messages: Some("not implemented".into()),

163+

response_time: f64::INFINITY,

164+

}

165+

}

166+167+

/// Drains the monitoring log held by `state` and returns the drained results.
pub fn drain_log(state: Arc<State>) -> VecDeque<MonitorResult> {
    let log = &state.log;
    log.drain()
}

170+171+

#[derive(Debug, Serialize, Deserialize)]

172+

pub struct MonitorsConfig {

173+

pub monitors: Vec<MonitorDefinition>,

174+

}

175+176+

impl MonitorsConfig {

177+

fn empty() -> MonitorsConfig {

178+

MonitorsConfig { monitors: Vec::new() }

179+

}

180+181+

#[doc(hidden)]

182+

fn mock() -> MonitorsConfig {

183+

MonitorsConfig {

184+

monitors: vec![MonitorDefinition {

185+

id: "a".to_owned(),

186+

probe: ProbeType::Ping,

187+

address: "c".to_owned(),

188+

interval: 1,

189+

timeout: 2,

190+

port: Some(3),

191+

}],

192+

}

193+

}

194+

}

195+196+

#[derive(Eq, PartialEq, Hash, Clone, Serialize, Deserialize, Debug)]

197+

#[serde(rename_all = "camelCase")]

198+

pub enum ProbeType {

199+

Ping,

200+

TcpOpen,

201+

}

202+203+

#[derive(Eq, PartialEq, Hash, Clone, Serialize, Deserialize, Debug)]

204+

pub struct MonitorDefinition {

205+

pub id: String,

206+

pub probe: ProbeType,

207+

pub address: String,

208+

pub interval: u64,

209+

pub timeout: u64,

210+

pub port: Option<i16>,

211+

}

212+213+

#[derive(PartialEq, Clone, Debug)]

214+

pub struct MonitorResult {

215+

pub monitor_id: String,

216+

pub request_start_time: UtcDateTime,

217+

pub response_success: bool,

218+

pub response_messages: Option<String>,

219+

pub response_time: f64,

220+

}