add support for observability pipeline by jchrostek-dd · Pull Request #826 · DataDog/datadog-lambda-extension

@jchrostek-dd

Task

https://datadoghq.atlassian.net/jira/software/c/projects/SVLS/boards/5420?quickFilter=7573&selectedIssue=SVLS-7525

Overview

  • Add support for sending logs to an Observability Pipeline instead of directly to Datadog.
  • To enable this, customers must set DD_ENABLE_OBSERVABILITY_PIPELINE_FORWARDING to true and DD_LOGS_CONFIG_LOGS_DD_URL to their Observability Pipeline endpoint. A fast-follow change will update the docs to reflect this.
  • Initially, I set up the Observability Pipeline with 'Datadog Agent' as the source, which required us to send log messages in a specific format. After chatting with the Observability Pipeline team, they recommend that we use 'Http Server' as the source for our pipeline setup instead, since it accepts any JSON.
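A minimal sketch of how the two settings above could be read, for illustration only. The `ForwardingSettings` struct and the `read_forwarding_settings` function are hypothetical names, not the extension's actual config code, and treating only a case-insensitive "true" as enabled is an assumption. The environment lookup is passed in as a closure so the logic can be exercised without touching the process environment.

```rust
use std::env;

/// Hypothetical settings struct; not the extension's actual config type.
#[derive(Debug, PartialEq)]
struct ForwardingSettings {
    enabled: bool,
    logs_url: String,
}

/// Reads the two variables named in the PR description through `lookup`,
/// so the same logic works with `std::env::var` or a test double.
fn read_forwarding_settings<F>(lookup: F) -> ForwardingSettings
where
    F: Fn(&str) -> Option<String>,
{
    // Assumption: only a case-insensitive "true" enables forwarding.
    let enabled = lookup("DD_ENABLE_OBSERVABILITY_PIPELINE_FORWARDING")
        .map(|v| v.trim().eq_ignore_ascii_case("true"))
        .unwrap_or(false);
    // An unset URL becomes an empty string here; callers decide what to do with it.
    let logs_url = lookup("DD_LOGS_CONFIG_LOGS_DD_URL")
        .map(|v| v.trim().to_string())
        .unwrap_or_default();
    ForwardingSettings { enabled, logs_url }
}

fn main() {
    let settings = read_forwarding_settings(|key| env::var(key).ok());
    println!("{settings:?}");
}
```

With neither variable set, this sketch reports forwarding as disabled and an empty URL.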

Testing

Created an Observability Pipeline and deployed a Lambda function with the changes. Triggered the function and confirmed that its output appears in our logs. We know the logs go through the Observability Pipeline because 'http_server' is attached as the source type.

jchrostek-dd

let mut current_batch = guard.get_batch();
while !current_batch.is_empty() {
    batches.push(self.compress(current_batch));
    // Temporarily disable flat transform to OPW v2 format

Meant to remove. Will fix.

lym953

LGTM

}
*/
pub(crate) fn to_flat_json(data: Vec<u8>) -> Vec<u8> {
    let _input_bytes = data.len();

Also meant to remove?

yep

obj.insert("ddtags".to_string(), serde_json::Value::String(ddtags));
out.push(serde_json::Value::Object(obj));
}
let _out_len = out.len();

Also meant to remove?

yep

duncanista

#[serde(deserialize_with = "deserialize_logs_additional_endpoints")]
pub logs_config_additional_endpoints: Vec<LogsAdditionalEndpoint>,

/// @env `DD_ENABLE_OBSERVABILITY_PIPELINE_FORWARDING`

Where does this environment variable come from?

What do you mean by where does it come from?

As in, which docs are we pulling this environment variable from? Normally we want to ensure that new environment variables make sense and are consistent across products.

duncanista

Comment on lines +257 to +258

let transformed = Self::to_flat_json(current_batch);
batches.push(self.compress(transformed));

Do we always flatten the JSON?

duncanista

"timestamp": 1757352915141
}
*/
pub(crate) fn to_flat_json(data: Vec<u8>) -> Vec<u8> {

Can't serde do something like this already?

lym953

duncanista

let mut flushers = Vec::new();

let endpoint = if config.observability_pipelines_worker_logs_enabled {
    config.observability_pipelines_worker_logs_url.clone()

What if the URL is empty?

duncanista

LGTM. Just ensure we log an error when the endpoint is empty. Should the Observability Pipeline URL in mod.rs be an Option?
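One way the empty-URL concern could be handled is sketched below, assuming the URL is modeled as an `Option<String>`. The `Config` struct is a hypothetical subset that reuses the field names from the snippet under review. The `Endpoint` enum and `select_endpoint` function are illustrative names, `eprintln!` stands in for whatever logger the crate uses, and falling back to the default endpoint is an assumption rather than the PR's confirmed behavior.

```rust
/// Hypothetical config subset; field names follow the snippet under review,
/// but the URL is modeled as an `Option` here.
struct Config {
    observability_pipelines_worker_logs_enabled: bool,
    observability_pipelines_worker_logs_url: Option<String>,
}

/// Illustrative result type: either the pipeline URL or the regular Datadog endpoint.
#[derive(Debug, PartialEq)]
enum Endpoint {
    ObservabilityPipeline(String),
    Default,
}

fn select_endpoint(config: &Config) -> Endpoint {
    if !config.observability_pipelines_worker_logs_enabled {
        return Endpoint::Default;
    }
    match config
        .observability_pipelines_worker_logs_url
        .as_deref()
        .map(str::trim)
    {
        Some(url) if !url.is_empty() => Endpoint::ObservabilityPipeline(url.to_string()),
        _ => {
            // Assumption: report the misconfiguration and fall back instead of failing.
            eprintln!("observability pipeline forwarding is enabled but the URL is missing or empty; using the default endpoint");
            Endpoint::Default
        }
    }
}

fn main() {
    let config = Config {
        observability_pipelines_worker_logs_enabled: true,
        observability_pipelines_worker_logs_url: None,
    };
    // Logs the error above and prints the fallback choice.
    println!("{:?}", select_endpoint(&config));
}
```

Modeling the URL as an `Option` makes the "enabled but unset" case explicit at the type level, while the trim-and-check covers a variable that is set to an empty or whitespace-only string.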