out_opentelemetry: Add resource attributes support by cb645j · Pull Request #11574 · fluent/fluent-bit
/*
* For each key name in ctx->ra_resource_attributes_message_list, look it up in
* the msgpack message body and promote the value to an OTLP resource attribute.
*/
static void set_resource_attributes_from_message_body(
struct opentelemetry_context *ctx,
msgpack_object *body,
Opentelemetry__Proto__Resource__V1__Resource *resource)
{
int i;
size_t key_len;
size_t map_key_len;
char *map_key_ptr;
struct mk_list *head;
struct flb_config_map_val *mv;
struct flb_slist_entry *entry;
const char *normalized_key;
msgpack_object_kv *kv;
Opentelemetry__Proto__Common__V1__KeyValue *attr;
Opentelemetry__Proto__Common__V1__KeyValue **tmp_attrs;
/* * Use ctx->ra_resource_attributes_message directly — this is the pointer * managed by the Fluent Bit config-map framework and is reliably available * in every worker thread context, unlike an embedded mk_list copy. */ if (!ctx->ra_resource_attributes_message || mk_list_size(ctx->ra_resource_attributes_message) == 0) { return; }
if (body == NULL || body->type != MSGPACK_OBJECT_MAP) { return; }
/* Iterate directly over the config-map-managed list */ flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) { if (mk_list_size(mv->val.list) != 1) { continue; }
entry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); normalized_key = entry->str; key_len = flb_sds_len(entry->str);
if (key_len == 0) { continue; }
/* * Allow optional record accessor prefix so both "service.name" and * "$service.name" are treated as the same map key. */ if (key_len > 0 && normalized_key[0] == '$') { normalized_key++; key_len--; }
/* * Also tolerate bracket forms like $['service.name'] and * $["service.name"] for literal keys. */ if (key_len >= 4 && normalized_key[0] == '[') { if ((normalized_key[1] == '\'' && normalized_key[key_len - 2] == '\'' && normalized_key[key_len - 1] == ']') || (normalized_key[1] == '"' && normalized_key[key_len - 2] == '"' && normalized_key[key_len - 1] == ']')) { normalized_key += 2; key_len -= 4; } }
if (key_len == 0) { continue; }
/* tolerate quoted key names like "service.name" or 'service.name' */ if (key_len >= 2) { if ((normalized_key[0] == '"' && normalized_key[key_len - 1] == '"') || (normalized_key[0] == '\'' && normalized_key[key_len - 1] == '\'')) { normalized_key++; key_len -= 2; } }
if (key_len == 0) { continue; }
for (i = 0; i < body->via.map.size; i++) { kv = &body->via.map.ptr[i];
if (kv->key.type == MSGPACK_OBJECT_STR) { map_key_ptr = kv->key.via.str.ptr; map_key_len = kv->key.via.str.size; } else if (kv->key.type == MSGPACK_OBJECT_BIN) { map_key_ptr = (char *) kv->key.via.bin.ptr; map_key_len = kv->key.via.bin.size; } else { continue; }
if (map_key_len != key_len) { continue; }
if (strncmp(map_key_ptr, normalized_key, key_len) != 0) { continue; }
/* Found the key — convert to OTLP KeyValue */ if (kv->key.type == MSGPACK_OBJECT_STR) { attr = msgpack_kv_to_otlp_any_value(kv); } else { attr = otlp_kvpair_value_initialize(); if (attr != NULL) { attr->key = flb_strndup(map_key_ptr, map_key_len);
if (attr->key != NULL) { attr->value = msgpack_object_to_otlp_any_value(&kv->val); }
if (attr->key == NULL || attr->value == NULL) { otlp_kvpair_destroy(attr); attr = NULL; } } }
if (!attr) { flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue", entry->str); break; }
/* Grow the resource attributes array by one slot */ tmp_attrs = flb_realloc(resource->attributes, (resource->n_attributes + 1) * sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); if (!tmp_attrs) { flb_plg_error(ctx->ins, "resource attributes: memory allocation failed for key '%s'", entry->str); otlp_kvpair_destroy(attr); break; }
resource->attributes = tmp_attrs; resource->attributes[resource->n_attributes] = attr; resource->n_attributes++; break; } } }
/* * Use ctx->ra_resource_attributes_message directly — this is the pointer * managed by the Fluent Bit config-map framework and is reliably available * in every worker thread context, unlike an embedded mk_list copy. */ if (!ctx->ra_resource_attributes_message || mk_list_size(ctx->ra_resource_attributes_message) == 0) { return; }
if (body == NULL || body->type != MSGPACK_OBJECT_MAP) { return; }
/* Iterate directly over the config-map-managed list */ flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) { if (mk_list_size(mv->val.list) != 1) { continue; }
entry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); normalized_key = entry->str; key_len = flb_sds_len(entry->str);
if (key_len == 0) { continue; }
/* * Allow optional record accessor prefix so both "service.name" and * "$service.name" are treated as the same map key. */ if (key_len > 0 && normalized_key[0] == '$') { normalized_key++; key_len--; }
/* * Also tolerate bracket forms like $['service.name'] and * $["service.name"] for literal keys. */ if (key_len >= 4 && normalized_key[0] == '[') { if ((normalized_key[1] == '\'' && normalized_key[key_len - 2] == '\'' && normalized_key[key_len - 1] == ']') || (normalized_key[1] == '"' && normalized_key[key_len - 2] == '"' && normalized_key[key_len - 1] == ']')) { normalized_key += 2; key_len -= 4; } }
if (key_len == 0) { continue; }
/* tolerate quoted key names like "service.name" or 'service.name' */ if (key_len >= 2) { if ((normalized_key[0] == '"' && normalized_key[key_len - 1] == '"') || (normalized_key[0] == '\'' && normalized_key[key_len - 1] == '\'')) { normalized_key++; key_len -= 2; } }
if (key_len == 0) { continue; }
for (i = 0; i < body->via.map.size; i++) { kv = &body->via.map.ptr[i];
if (kv->key.type == MSGPACK_OBJECT_STR) { map_key_ptr = kv->key.via.str.ptr; map_key_len = kv->key.via.str.size; } else if (kv->key.type == MSGPACK_OBJECT_BIN) { map_key_ptr = (char *) kv->key.via.bin.ptr; map_key_len = kv->key.via.bin.size; } else { continue; }
if (map_key_len != key_len) { continue; }
if (strncmp(map_key_ptr, normalized_key, key_len) != 0) { continue; }
/* Found the key — convert to OTLP KeyValue */ if (kv->key.type == MSGPACK_OBJECT_STR) { attr = msgpack_kv_to_otlp_any_value(kv); } else { attr = otlp_kvpair_value_initialize(); if (attr != NULL) { attr->key = flb_strndup(map_key_ptr, map_key_len);
if (attr->key != NULL) { attr->value = msgpack_object_to_otlp_any_value(&kv->val); }
if (attr->key == NULL || attr->value == NULL) { otlp_kvpair_destroy(attr); attr = NULL; } } }
if (!attr) { flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue", entry->str); break; }
/* Grow the resource attributes array by one slot */ tmp_attrs = flb_realloc(resource->attributes, (resource->n_attributes + 1) * sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); if (!tmp_attrs) { flb_plg_error(ctx->ins, "resource attributes: memory allocation failed for key '%s'", entry->str); otlp_kvpair_destroy(attr); break; }
resource->attributes = tmp_attrs; resource->attributes[resource->n_attributes] = attr; resource->n_attributes++; break; } } }