Nima Rezainia

Deploying Elastic Agent with Confluent Cloud's Elasticsearch Connector

Confluent Cloud users can now use the updated Elasticsearch Sink Connector with Elastic Agent and Elastic Integrations for a fully-managed and highly scalable data ingest architecture.


Elastic and Confluent are key technology partners and we're pleased to announce new investments in that partnership. Built by the original creators of Apache Kafka®, Confluent's data streaming platform is a key component of many Enterprise ingest architectures, and it ensures that customers can guarantee delivery of critical Observability and Security data into their Elasticsearch clusters. Together, we've been working on key improvements to how our products fit together. With Elastic Agent's new Kafka output and Confluent's newly improved Elasticsearch Sink Connectors, it's never been easier to collect data from the edge, stream it through Kafka, and deliver it into an Elasticsearch cluster.

In this blog, we examine a simple way to integrate Elastic Agent with Confluent Cloud's Kafka offering to reduce the operational burden of ingesting business-critical data.

Benefits of Elastic Agent and Confluent Cloud

When combined, Elastic Agent and Confluent Cloud's updated Elasticsearch Sink connector provide a myriad of advantages for organizations of all sizes. This combined solution offers flexibility in handling any type of data ingest workload in an efficient and resilient manner.

Fully Managed

When combined, Elastic Cloud Serverless and Confluent Cloud provide users with a fully managed service. This makes it effortless to deploy and ingest nearly unlimited data volumes without having to worry about nodes, clusters, or scaling.

Full Elastic Integrations Support

Sending data through Kafka is fully supported with any of the 300+ Elastic Integrations, so you can continue to benefit from our investments in built-in alerts, SLOs, AI Assistants, and more. In this blog post, we outline how to set up the connection between the two platforms.

Decoupled Architecture

Kafka acts as a resilient buffer between data sources (such as Elastic Agent and Logstash) and Elasticsearch, decoupling data producers from consumers. This can significantly reduce total cost of ownership by enabling you to size your Elasticsearch cluster based on typical data ingest volume, not maximum ingest volume. It also ensures system resilience during spikes in data volume.
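To make the sizing argument concrete, here is a minimal Python sketch of a buffer absorbing a spike. All numbers are hypothetical and the function name `simulate_backlog` is ours; it models only the arithmetic of a backlog, not Kafka itself.

```python
def simulate_backlog(incoming_per_min, sink_capacity_per_min):
    """Return the buffered backlog (in events) at the end of each minute."""
    backlog = 0
    history = []
    for incoming in incoming_per_min:
        # Whatever the consumer cannot write this minute stays in the buffer.
        backlog = max(0, backlog + incoming - sink_capacity_per_min)
        history.append(backlog)
    return history


# Hypothetical workload: 1,000 events/min typically, with a two-minute spike to 5,000.
incoming = [1000, 1000, 5000, 5000, 1000, 1000, 1000, 1000, 1000, 1000]
# Hypothetical Elasticsearch ingest capacity, sized for typical load plus headroom.
capacity = 2000

print(simulate_backlog(incoming, capacity))
```

In this toy scenario the cluster is sized for 2,000 events per minute rather than the 5,000 peak. The backlog grows during the spike and drains once volume returns to normal, so data is delayed rather than dropped.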

Ultimate control over your data

With our new Output per Integration capability, customers can now send different data to different destinations using the same agent. Customers can easily send security logs directly to Confluent Cloud/Kafka, which can provide delivery guarantees, while sending less critical application logs and system metrics directly to Elasticsearch.

Deploying the reference architecture

In the following sections, we will walk you through one of the ways Confluent Kafka can be integrated with Elastic Agent and Elasticsearch using Confluent Cloud's Elasticsearch Sink Connector. As with any streaming and data collection technology, there are many ways a pipeline can be configured depending on the particular use case. This blog post will focus on a simple architecture that can be used as a starting point for more complex deployments.

Some of the highlights of this architecture are:

  • Dynamic Kafka topic selection at Elastic Agents
  • Elasticsearch Sink Connectors for fully managed transfer from Confluent Kafka to Elasticsearch
  • Processing data leveraging Elastic's 300+ Integrations

Prerequisites

Before getting started, ensure you have a Kafka cluster deployed in Confluent Cloud, an Elasticsearch cluster or project deployed in Elastic Cloud, and an installed and enrolled Elastic Agent.

Configure Confluent Cloud Kafka Cluster for Elastic Agent

Navigate to the Kafka cluster in Confluent Cloud, and select Cluster Settings. Locate and note the Bootstrap Server address. We will need this value later when we create the Kafka Output in Fleet.

Navigate to Topics in the left-hand navigation menu and create two topics:

  1. A topic named logs
  2. A topic named metrics

Next, navigate to API Keys in the left-hand navigation menu:

  1. Click + Add API Key
  2. Select the Service Account API key type
  3. Provide a meaningful name for this API Key
  4. Grant the key write permission to the metrics and logs topics
  5. Create the key

Note the provided Key and Secret. We will need them later when we configure the Kafka Output in Fleet.

Configure Elasticsearch and Elastic Agent

In this section, we will configure the Elastic Agent to send data to Confluent Cloud's Kafka cluster and we will configure Elasticsearch so it can receive data from the Confluent Cloud Elasticsearch Sink Connector.

Configure Elastic Agent to send data to Confluent Cloud

Elastic Fleet simplifies sending data to Kafka and Confluent Cloud. With Elastic Agent, a Kafka "output" can be easily attached to all data coming from an agent or it can be applied only to data coming from a specific data source.

Find Fleet in the left-hand navigation and click the Settings tab. On the Settings tab, find the Outputs section and click Add Output.

Perform the following steps to configure the new Kafka output:

  1. Provide a Name for the output
  2. Set the Type to Kafka
  3. Populate the Hosts field with the Bootstrap Server address we noted earlier
  4. Under Authentication, populate the Username with the API Key and the Password with the Secret we noted earlier
  5. Under Topics, select Dynamic Topic and set Topic from field to data_stream.type
  6. Click Save and apply settings

Next, we will navigate to the Agent Policies tab in Fleet and click to edit the Agent Policy that we want to attach the Kafka output to. With the Agent Policy open, click the Settings tab and change Output for integrations and Output for agent monitoring to the Kafka output we just created.

Selecting an Output per Elastic Integration: To set the Kafka output to be used for specific data sources, see the integration-level outputs documentation.

A note about Topic Selection: The data_stream.type field is a reserved field which Elastic Agent automatically sets to logs if the data we're sending is a log and to metrics if the data we're sending is a metric. Enabling Dynamic Topic selection using data_stream.type causes Elastic Agent to automatically route metrics to a metrics topic and logs to a logs topic. For information on topic selection, see the Kafka Output's Topics settings documentation.
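The idea behind dynamic topic selection can be sketched in a few lines of Python. This is an illustration only, not Elastic Agent's implementation: the function name `select_topic` and the trimmed sample events are hypothetical.

```python
def select_topic(event: dict, topic_field: str = "data_stream.type") -> str:
    """Return the Kafka topic for an event by reading a dotted field path."""
    value = event
    for key in topic_field.split("."):
        if not isinstance(value, dict) or key not in value:
            raise KeyError(f"event has no field {topic_field!r}")
        value = value[key]
    return str(value)


# Hypothetical, heavily trimmed events; real events carry many more fields.
log_event = {
    "data_stream": {"type": "logs", "dataset": "system.syslog", "namespace": "default"}
}
metric_event = {
    "data_stream": {"type": "metrics", "dataset": "system.network", "namespace": "default"}
}

print(select_topic(log_event))     # topic chosen for the log event
print(select_topic(metric_event))  # topic chosen for the metric event
```

Because the topic name is read from the event itself, one output definition can serve both the logs and metrics topics we created earlier.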

Configuring a publishing endpoint in Elasticsearch

Next, we will set up two publishing endpoints (data streams) for the Confluent Cloud Sink Connector to use when publishing documents to Elasticsearch:

  1. We will create a data stream logs-kafka.reroute-default for handling logs
  2. We will create a data stream metrics-kafka.reroute-default for handling metrics

If we were to leave the data in those data streams as-is, the data would be available but we would find the data is unparsed and lacking vital enrichment. So we will also create two index templates and two ingest pipelines to make sure the data is processed by our Elastic Integrations.

Creating the Elasticsearch Index Templates and Ingest Pipelines

The following steps use Dev Tools in Kibana, but all of these steps can be completed via the REST API or using the relevant user interfaces in Stack Management.

First, we will create the Index Template and Ingest Pipeline for handling logs. Elasticsearch's built-in logs-*-* index template also matches this data stream name and has a priority of 100, so we give our template a higher priority to make sure it is the one applied:

PUT _index_template/logs-kafka.reroute
{
  "template": {
    "settings": {
      "index.default_pipeline": "logs-kafka.reroute"
    }
  },
  "index_patterns": [
    "logs-kafka.reroute-default"
  ],
  "priority": 500,
  "data_stream": {}
}

PUT _ingest/pipeline/logs-kafka.reroute
{
  "processors": [
    {
      "reroute": {
        "dataset": [
          "{{data_stream.dataset}}"
        ],
        "namespace": [
          "{{data_stream.namespace}}"
        ]
      }
    }
  ]
}

Next, we will create the Index Template and Ingest Pipeline for handling metrics. Because the built-in metrics-*-* index template (priority 100) also matches this data stream name, this template needs a higher priority as well:

PUT _index_template/metrics-kafka.reroute
{
  "template": {
    "settings": {
      "index.default_pipeline": "metrics-kafka.reroute"
    }
  },
  "index_patterns": [
    "metrics-kafka.reroute-default"
  ],
  "priority": 500,
  "data_stream": {}
}

PUT _ingest/pipeline/metrics-kafka.reroute
{
  "processors": [
    {
      "reroute": {
        "dataset": [
          "{{data_stream.dataset}}"
        ],
        "namespace": [
          "{{data_stream.namespace}}"
        ]
      }
    }
  ]
}

A note about rerouting: For a practical example of how this works, a document related to a Linux Network Metric would first land in metrics-kafka.reroute-default. This Ingest Pipeline would inspect the document and find data_stream.dataset set to system.network and data_stream.namespace set to default. It would then use these values to reroute the document from metrics-kafka.reroute-default to metrics-system.network-default, where it would be processed by the system integration.
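The naming logic in that example can be sketched in Python. This is a simplified model of the behavior described above, not the reroute processor's code: the function name `reroute_target` is hypothetical, and the real processor also sanitizes the dataset and namespace values.

```python
def reroute_target(current_data_stream: str, doc: dict) -> str:
    """Compute the destination data stream name for a document.

    The type is taken from the current data stream name. The dataset and
    namespace come from the document, falling back to the parts of the
    current name when a field is missing.
    """
    ds_type, dataset, namespace = current_data_stream.split("-", 2)
    fields = doc.get("data_stream", {})
    dataset = fields.get("dataset") or dataset
    namespace = fields.get("namespace") or namespace
    return f"{ds_type}-{dataset}-{namespace}"


# Trimmed, hypothetical version of the Linux network metric from the example.
network_metric = {"data_stream": {"dataset": "system.network", "namespace": "default"}}

print(reroute_target("metrics-kafka.reroute-default", network_metric))
```

A document without data_stream fields keeps its current destination in this sketch, which mirrors the fallback to the existing data stream name.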

Configure the Confluent Cloud Elasticsearch Sink Connector

Now it's time to configure the Confluent Cloud Elasticsearch Sink Connector. We will perform the following steps twice and create two separate connectors, one connector for logs and one connector for metrics. Where the required settings differ, we will highlight the correct values.

Navigate to your Kafka cluster in Confluent Cloud and select Connectors from the left-hand navigation menu. On the Connectors page, select Elasticsearch Service Sink from a catalog of connectors available.

Confluent Cloud presents a simplified workflow for the user to configure a connector. Here we will walk through each step of the process:

Step 1: Topic Selection

First, we will select the topic that the connector will consume data from based on which connector we are deploying:

  • When deploying the Elasticsearch Sink Connector for logs, select the logs topic.
  • When deploying the Elasticsearch Sink Connector for metrics, select the metrics topic.

Step 2: Kafka Credentials

Choose KAFKA_API_KEY as the cluster authentication mode. Provide the API Key and Secret we noted earlier when we gathered the required Confluent Cloud cluster information.

Step 3: Authentication

Provide the Elasticsearch Endpoint address of our Elasticsearch cluster as the Connection URI. The Connection user and Connection password are the authentication information for the account in Elasticsearch that will be used by the Elasticsearch Sink Connector to write data to Elasticsearch.

Step 4: Configuration

In this step we will keep the Input Kafka record value format set to JSON. Next, expand Advanced Configuration.

  1. We will set Data Stream Dataset to kafka.reroute
  2. We will set Data Stream Type based on the connector we are deploying:
    • When deploying the Elasticsearch Sink Connector for logs, we will set Data Stream Type to logs
    • When deploying the Elasticsearch Sink Connector for metrics, we will set Data Stream Type to metrics
  3. The correct values for other settings will depend on the specific environment.
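Since the two connectors differ only in a couple of values, it can help to write the per-connector choices down side by side. The Python sketch below does just that. The names `SinkConnectorPlan` and `plan_for` are hypothetical, and the attributes mirror the labels in the Confluent Cloud workflow described above, not the connector's configuration property names.

```python
from dataclasses import dataclass

# Value entered under "Data Stream Dataset" for both connectors.
DATASET = "kafka.reroute"


@dataclass(frozen=True)
class SinkConnectorPlan:
    topic: str                # Step 1: the topic the connector consumes
    data_stream_type: str     # Step 4: "Data Stream Type"
    data_stream_dataset: str  # Step 4: "Data Stream Dataset"


def plan_for(signal: str) -> SinkConnectorPlan:
    """Return the walkthrough's settings for the logs or metrics connector."""
    if signal not in ("logs", "metrics"):
        raise ValueError(f"unsupported signal type: {signal!r}")
    return SinkConnectorPlan(
        topic=signal,
        data_stream_type=signal,
        data_stream_dataset=DATASET,
    )


for signal in ("logs", "metrics"):
    print(plan_for(signal))
```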

Step 5: Sizing

In this step, notice that Confluent Cloud provides a recommended minimum number of tasks for our deployment. Following the recommendation here is a good starting place for most deployments.

Step 6: Review and Launch

Review the Connector configuration and Connector pricing sections and if everything looks good, it's time to click continue and launch the connector! The connector may report as provisioning but will soon start consuming data from the Kafka topic and writing it to the Elasticsearch cluster.

You can now navigate to Discover in Kibana and find your logs flowing into Elasticsearch! Also check out the real time metrics that Confluent Cloud provides for your new Elasticsearch Sink Connector deployments.
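If you prefer a scripted check to Discover, the Elasticsearch count API can confirm that documents are arriving in a rerouted data stream. The following is a minimal sketch using only the Python standard library. The endpoint, data stream name, and API key are placeholders, the helper names are hypothetical, and it assumes you authenticate with an encoded Elasticsearch API key rather than the username and password used by the connector.

```python
import json
import urllib.request


def build_count_request(endpoint: str, target: str, api_key: str) -> urllib.request.Request:
    """Build a GET <endpoint>/<target>/_count request authenticated with an API key."""
    url = f"{endpoint.rstrip('/')}/{target}/_count"
    return urllib.request.Request(
        url,
        headers={"Authorization": f"ApiKey {api_key}"},  # encoded API key value
        method="GET",
    )


def count_documents(request: urllib.request.Request) -> int:
    """Send the request and return the document count from the response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)["count"]


# Example usage with placeholder values (not executed here):
# request = build_count_request(
#     "https://my-project.es.example.com",  # hypothetical Elasticsearch endpoint
#     "logs-system.syslog-default",         # a data stream populated after rerouting
#     "<encoded-api-key>",
# )
# print(count_documents(request))
```

A count that keeps growing between runs indicates that the connector is consuming from Kafka and that documents are being rerouted to their integration data streams.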

If you have only deployed the first logs sink connector, you can now repeat the steps above to deploy the second metrics sink connector.

Enjoy your fully managed data ingest architecture

If you followed the steps above, congratulations. You have successfully:

  1. Configured Elastic Agent to send logs and metrics to dedicated topics in Kafka
  2. Created publishing endpoints (data streams) in Elasticsearch dedicated to handling data from the Elasticsearch Sink Connector
  3. Configured managed Elasticsearch Sink connectors to consume data from multiple topics and publish that data to Elasticsearch

Next you should enable additional integrations, deploy more Elastic Agents, explore your data in Kibana, and enjoy the benefits of a fully managed data ingest architecture with Elastic Serverless and Confluent Cloud!
