How to Connect to Kafka Clusters

Kafka is an open-source distributed event and stream-processing platform built to process demanding real-time data feeds. It is inherently scalable, with high throughput and availability.


You can connect to DigitalOcean Managed Databases using command line tools and other third-party clients. This guide explains where to find your Kafka database’s connection details and how to use them to configure tools and clients.

Retrieve Database Connection Details Using the CLI

How to retrieve database connection details using the DigitalOcean CLI

To retrieve database connection details via the command-line, follow these steps:

  1. Install doctl, the DigitalOcean command-line tool.

  2. Create a personal access token, and save it for use with doctl.

  3. Use the token to grant doctl access to your DigitalOcean account.

                  doctl auth init
                
  4. Finally, retrieve database connection details with doctl databases connection. The basic usage looks like this, but you'll want to read the usage docs for more details:

                  doctl databases connection <database-cluster-id> [flags]
                

    The following example retrieves the connection details for a database cluster with the ID f81d4fae-7dec-11d0-a765-00a0c91e6bf6

                       doctl databases connection f81d4fae-7dec-11d0-a765-00a0c91e6bf6
                    

Retrieve Database Connection Details Using the API

This API call retrieves the information about your database, including its connection details. The connection details are located in the returned connection JSON object.

How to retrieve database connection details using the DigitalOcean API

To retrieve database connection details using the DigitalOcean API, follow these steps:

  1. Create a personal access token, and save it for use with the API.

  2. Send a GET request to https://api.digitalocean.com/v2/databases/{database_cluster_uuid}

    cURL

    To retrieve database connection details with cURL, call:

    
                    curl -X GET \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $DIGITALOCEAN_TOKEN" \
      "https://api.digitalocean.com/v2/databases/9cc10173-e9ea-4176-9dbc-a4cee4c4ff30"

    Go

    Go developers can use Godo, the official DigitalOcean V2 API client for Go. To retrieve database connection details with Godo, use the following code:

    
                    import (
        "context"
        "os"
    
        "github.com/digitalocean/godo"
    )
    
    func main() {
        token := os.Getenv("DIGITALOCEAN_TOKEN")
    
        client := godo.NewFromToken(token)
        ctx := context.TODO()
    
        cluster, _, err := client.Databases.Get(ctx, "9cc10173-e9ea-4176-9dbc-a4cee4c4ff30")
    }

    Python

    
                    import os
    from pydo import Client
    
    client = Client(token=os.environ.get("DIGITALOCEAN_TOKEN"))
    
    get_resp = client.databases.get_cluster(database_cluster_uuid="a7a89a")

View Kafka Cluster Connection Details

You use your database’s connection details to configure tools, applications, and resources that connect to the database. To view your database’s connection details, click the name of the cluster on the Databases page to go to its Overview page.

Databases Overview screen showing connection string

You can view customized connection details based on how you want to connect to the database:

  • Public network and VPC network options generate connection details based on if you want to connect via the cluster’s public hostname or the cluster’s private hostname. Only other resources in the same VPC network as the cluster can access it using its private hostname.

  • The User field updates the connection details with the user credentials that you would like to connect with.

By default, the control panel doesn’t reveal the cluster’s password for security reasons. Click Copy to copy connection details with the password, or click show-password to reveal the password.

Download the SSL Encryption

Each managed database comes with an SSL certificate. You can use this SSL certificate to encrypt connections between your client applications and the database.

To download your database’s SSL certificate, click the name of the cluster on the Databases page to go to its Overview page. In the Connection Details section, click Download CA certificate.

Databases connection details with Download CA Certificate selected

When you configure your client applications, you can use the certificate’s location on your local system. Each client application is configured differently, so check the documentation for the tool you’re using for more detail on setting up SSL connections.

Connect to the Database

You can connect and manage the database using one of the following programming languages, via either SSL or SASL. For the best security, we recommend you connect via SSL.

Connect via SSL

To connect via SSL, download your cluster’s CA certificate. Then, download its access key and access certificate by clicking Download access key and Download access certificate in the SSL tab.

To connect a producer, use one of the following code blocks and the files you downloaded above:

    
        
            
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    security_protocol="SSL",
    ssl_cafile="ca-certificate.crt",
    ssl_certfile="user-access-certificate.crt",
    ssl_keyfile="user-access-key.key",
)

        
    
    
        
            
Properties properties = new Properties();
properties.put("bootstrap.servers", "{HOST}:{SSL_PORT}");
properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "{KEYSTORE_LOCATION}");
properties.put("ssl.keystore.password", "{KEYSTORE_PASSWORD}");
properties.put("ssl.key.password", "{KEY_PASSWORD}");
properties.put("key.serializer", "{SERIALIZER}");
properties.put("value.serializer", "{SERIALIZER}");

// create a producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        
    

This method requires that you install Sarama, a Go client library for Kafka.

    
        
            
package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
    "github.com/Shopify/sarama"
)

func main() {
    keypair, err := tls.LoadX509KeyPair("user-access-certificate.crt", "user-access-key.key")
    if err != nil {
        log.Println(err)
        return
    }

    caCert, err := ioutil.ReadFile("ca-certificate.crt")
    if err != nil {
        log.Println(err)
        return
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)

    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{keypair},
        RootCAs: caCertPool,
    }

    // init config, enable errors and notifications
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = tlsConfig
    config.Version = sarama.V0_10_2_0

    brokers := []string{"{HOST}:{SSL_PORT}"}

    producer, err := sarama.NewSyncProducer(brokers, config)

    // add your logic
}

        
    
    
        
            
const Kafka = require('node-rdkafka');
console.log(Kafka.features); // this should print 'ssl', among other things

const producer = new Kafka.Producer({
    'metadata.broker.list': HOST:SSL_PORT,
    'security.protocol': 'ssl',
    'ssl.key.location': 'user-access-key.key',
    'ssl.certificate.location': 'user-access-certificate.crt',
    'ssl.ca.location': 'ca-certificate.crt',
    'dr_cb': true
});

producer.connect();

producer.on('ready', () => {
    // produce the messages and disconnect
});

        
    

To connect a consumer, use one of the following code blocks and the files you downloaded above:

    
        
            
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "TOPIC_NAME",
    auto_offset_reset="START_FROM",
    bootstrap_servers=f"{HOST}:{SSL_PORT}",
    client_id = CONSUMER_CLIENT_ID,
    group_id = CONSUMER_GROUP_ID,
    security_protocol="SSL",
    ssl_cafile="ca-certificate.crt",
    ssl_certfile="user-access-certificate.crt",
    ssl_keyfile="user-access-key.key",
)

        
    
    
        
            
String group_id = "groupid";

Properties properties = new Properties();
properties.put("bootstrap.servers", "{HOST}:{SSL_PORT}");
properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "{KEYSTORE_LOCATION}");
properties.put("ssl.keystore.password", "{KEYSTORE_PASSWORD}");
properties.put("ssl.key.password", "{KEY_PASSWORD}");
properties.put("key.deserializer", "{DESERIALIZER}");
properties.put("value.deserializer", "{DESERIALIZER}");
properties.put("group.id", group_id);

// create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        
    

This method requires that you install Sarama, a Go client library for Kafka.

    
        
            
package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
    "github.com/Shopify/sarama"
)

func main() {
    keypair, err := tls.LoadX509KeyPair("user-access-certificate.crt", "user-access-key.key")
    if err != nil {
        log.Println(err)
        return
    }

    caCert, err := ioutil.ReadFile("ca-certificate.crt")
    if err != nil {
        log.Println(err)
        return
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)

    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{keypair},
        RootCAs: caCertPool,
    }

    // init config, enable errors and notifications
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = tlsConfig
    config.Version = sarama.V0_10_2_0

    brokers := []string{"{HOST}:{SSL_PORT}"}

    consumer, err := sarama.NewConsumer(brokers, config)

    // add your logic
}

        
    
    
        
            
const Kafka = require('node-rdkafka');

const stream = new Kafka.createReadStream({
    'metadata.broker.list': HOST:SSL_PORT,
    'group.id': CONSUMER_GROUP,
    'security.protocol': 'ssl',
    'ssl.key.location': 'user-access-key.key',
    'ssl.certificate.location': 'user-access-certificate.crt',
    'ssl.ca.location': 'ca-certificate.crt'
}, {}, {'topics': ['demo-topic']});

stream.on('data', (message) => {
    // process message
});

        
    

Connect via SASL

To connect via SASL, download your cluster’s CA certificate.

To connect a producer, use one of the following code blocks and the file you downloaded above:

    
        
            
from kafka import KafkaProducer

# Choose an appropriate SASL mechanism, for instance:
SASL_MECHANISM = 'SCRAM-SHA-256'

producer = KafkaProducer(
   bootstrap_servers=f"{HOST}:{SASL_PORT}",
   sasl_mechanism = SASL_MECHANISM,
   sasl_plain_username = SASL_USERNAME,
   sasl_plain_password = SASL_PASSWORD,
   security_protocol="SASL_SSL",
   ssl_cafile="ca-certificate.crt",
)

        
    
    
        
            
String sasl_username = "{USER_NAME}";
String sasl_password = "{SASL_PASSWORD}";
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);

Properties properties = new Properties();
properties.put("bootstrap.servers", "{HOST}:{SASL_PORT}");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-256");
properties.put("sasl.jaas.config", jaasConfig);
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("ssl.truststore.type", "jks");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("key.serializer", "{SERIALIZER}");
properties.put("value.serializer", "{SERIALIZER}");

// create a producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        
    

This method requires that you install Sarama, a Go client library for Kafka.

    
        
            
package main

import (
    "crypto/tls"
    "crypto/x509"
    "github.com/Shopify/sarama"
    "io/ioutil"
)

func main() {
    caCert, err := ioutil.ReadFile("ca-certificate.crt")
    if err != nil {
        panic(err)
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)

    tlsConfig := &tls.Config{
        RootCAs: caCertPool,
    }

    // init config, enable errors and notifications
    config := sarama.NewConfig()
    config.Metadata.Full = true
    config.ClientID = "{CLIENT_ID}"
    config.Producer.Return.Successes = true

    // Kafka SASL configuration
    config.Net.SASL.Enable = true
    config.Net.SASL.User = "{SASL_USERNAME}"
    config.Net.SASL.Password = "{SASL_PASSWORD}"
    config.Net.SASL.Handshake = true
    config.Net.SASL.Mechanism = sarama.SASLTypePlaintext

    // TLS configuration
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = tlsConfig

    brokers := []string{"{HOST}:{SASL_PORT}"}
    producer, err := sarama.NewSyncProducer(brokers, config)

    // add your logic
}

        
    
    
        
            
const Kafka = require('node-rdkafka');
console.log(Kafka.features); // this should print 'sasl_ssl', among other things

const producer = new Kafka.Producer({
    'metadata.broker.list': HOST:SASL_PORT,
    'security.protocol': 'sasl_ssl',
    'sasl.mechanism': SASL_MECHANISM,
    'sasl.username': USER_NAME,
    'sasl.password': SASL_PASSWORD,
    'ssl.ca.location': 'ca-certificate.crt',
    'dr_cb': true
});

producer.connect();

producer.on('ready', () => {
  // produce the messages and disconnect
});

        
    

To connect a consumer, use one of the following code blocks and the files you downloaded above:

    
        
            
from kafka import KafkaConsumer

# Choose an appropriate SASL mechanism, for instance:
SASL_MECHANISM = 'SCRAM-SHA-256'

consumer = KafkaConsumer(
    "TOPIC_NAME",
    auto_offset_reset = "START_FROM",
    bootstrap_servers = f'{HOST}:{SASL_PORT}',
    client_id = CONSUMER_CLIENT_ID,
    group_id = CONSUMER_GROUP_ID,
    sasl_mechanism = SASL_MECHANISM,
    sasl_plain_username = SASL_USERNAME,
    sasl_plain_password = SASL_PASSWORD,
    security_protocol = "SASL_SSL",
    ssl_cafile = "ca-certificate.crt"
)

        
    
    
        
            
String group_id = "groupid";
String sasl_username = "{USER_NAME}";
String sasl_password = "{SASL_PASSWORD}";
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);

Properties properties = new Properties();
properties.put("bootstrap.servers", "{HOST}:{SASL_PORT}");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-256");
properties.put("sasl.jaas.config", jaasConfig);
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("ssl.truststore.type", "jks");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("key.deserializer", "{DESERIALIZER}");
properties.put("value.deserializer", "{DESERIALIZER}");
properties.put("group.id", group_id);

// create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        
    

This method requires that you install Sarama, a Go client library for Kafka.

    
        
            
package main

import (
    "crypto/tls"
    "crypto/x509"
    "github.com/Shopify/sarama"
    "io/ioutil"
)

func main() {
    caCert, err := ioutil.ReadFile("ca-certificate.crt")
    if err != nil {
        panic(err)
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(caCert)

    tlsConfig := &tls.Config{
        RootCAs: caCertPool,
    }

    // init config, enable errors and notifications
    config := sarama.NewConfig()
    config.Metadata.Full = true
    config.ClientID = "{CLIENT_ID}"
    config.Producer.Return.Successes = true

    // Kafka SASL configuration
    config.Net.SASL.Enable = true
    config.Net.SASL.User = "{SASL_USERNAME}"
    config.Net.SASL.Password = "{SASL_PASSWORD}"
    config.Net.SASL.Handshake = true
    config.Net.SASL.Mechanism = sarama.SASLTypePlaintext

    // TLS configuration
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = tlsConfig

    brokers := []string{"{HOST}:{SASL_PORT}"}
    consumer, err := sarama.NewConsumer(brokers, config)

    // add your logic
}

        
    
    
        
            
const Kafka = require('node-rdkafka');

const stream = new Kafka.createReadStream({
    'metadata.broker.list': HOST:SASL_PORT,
    'group.id': CONSUMER_GROUP,
    'security.protocol': 'sasl_ssl',
    'sasl.mechanism': SASL_MECHANISM,
    'sasl.username': USER_NAME,
    'sasl.password': SASL_PASSWORD,
    'ssl.ca.location': 'ca-certificate.crt'
}, {}, {'topics': ['demo-topic']});

stream.on('data', (message) => {
    // process message
});