add support for observability pipeline by jchrostek-dd · Pull Request #826 · DataDog/datadog-lambda-extension
Conversation
Task
Overview
- Add support for sending logs to an Observability Pipeline instead of directly to Datadog.
- To enable, customers must set
DD_ENABLE_OBSERVABILITY_PIPELINE_FORWARDINGto true, andDD_LOGS_CONFIG_LOGS_DD_URLto their Observability Pipeline endpoint. Will fast follow and update docs to reflect this. - Initially, I was using setting up the observability pipeline with 'Datadog Agent' as the source. This required us to format the log message in a certain format. However, chatting with the Observability Pipeline Team, they actually recommend we use 'Http Server' as the source for our pipeline setup instead since this just accepts any json.
Testing
Created an observability pipeline and deployed a lambda function with the changes. Triggered the lambda function and confirmed we see it in our logs. We know it is going through the observability pipeline because we can see an attached 'http_server' attached as the source type.
| let mut current_batch = guard.get_batch(); | ||
| while !current_batch.is_empty() { | ||
| batches.push(self.compress(current_batch)); | ||
| // Temporarily disable flat transform to OPW v2 format |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Meant to remove. Will fix.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
| } | ||
| */ | ||
| pub(crate) fn to_flat_json(data: Vec<u8>) -> Vec<u8> { | ||
| let _input_bytes = data.len(); |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also meant to remove?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
| obj.insert("ddtags".to_string(), serde_json::Value::String(ddtags)); | ||
| out.push(serde_json::Value::Object(obj)); | ||
| } | ||
| let _out_len = out.len(); |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also meant to remove?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
| #[serde(deserialize_with = "deserialize_logs_additional_endpoints")] | ||
| pub logs_config_additional_endpoints: Vec<LogsAdditionalEndpoint>, | ||
|
|
||
| /// @env `DD_ENABLE_OBSERVABILITY_PIPELINE_FORWARDING` |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does this environment variable come from?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by where does it come from?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As in, from which docs are we pulling the environment variable, normally we want to ensure that new environment variables make sense or are consistent across products
Comment on lines +257 to +258
| let transformed = Self::to_flat_json(current_batch); | ||
| batches.push(self.compress(transformed)); |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always flatten the json?
| "timestamp": 1757352915141 | ||
| } | ||
| */ | ||
| pub(crate) fn to_flat_json(data: Vec<u8>) -> Vec<u8> { |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't serde do something like this already?
| let mut flushers = Vec::new(); | ||
|
|
||
| let endpoint = if config.observability_pipelines_worker_logs_enabled { | ||
| config.observability_pipelines_worker_logs_url.clone() |
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the url is empty?
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM – just ensure we log an error when endpoint is empty, should the mod.rs for the obs pipeline url be an option?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters