Kafka is an open-source distributed event and stream-processing platform built to process demanding real-time data feeds. It is inherently scalable, with high throughput and availability.
You can connect to DigitalOcean Managed Databases using command line tools and other third-party clients. This guide explains where to find your Kafka database’s connection details and how to use them to configure tools and clients.
This API call retrieves the information about your database, including its connection details. The connection details are located in the returned connection
JSON object.
You use your database’s connection details to configure tools, applications, and resources that connect to the database. To view your database’s connection details, click the name of the cluster on the Databases page to go to its Overview page.
You can view customized connection details based on how you want to connect to the database:
Public network and VPC network options generate connection details based on if you want to connect via the cluster’s public hostname or the cluster’s private hostname. Only other resources in the same VPC network as the cluster can access it using its private hostname.
The User field updates the connection details with the user credentials that you would like to connect with.
By default, the control panel doesn’t reveal the cluster’s password for security reasons. Click Copy to copy connection details with the password, or click show-password to reveal the password.
Each managed database comes with an SSL certificate. You can use this SSL certificate to encrypt connections between your client applications and the database.
To download your database’s SSL certificate, click the name of the cluster on the Databases page to go to its Overview page. In the Connection Details section, click Download CA certificate.
When you configure your client applications, you can use the certificate’s location on your local system. Each client application is configured differently, so check the documentation for the tool you’re using for more detail on setting up SSL connections.
You can connect and manage the database using one of the following programming languages, via either SSL or SASL. For the best security, we recommend you connect via SSL.
To connect via SSL, download your cluster’s CA certificate. Then, download its access key and access certificate by clicking Download access key and Download access certificate in the SSL tab.
To connect a producer, use one of the following code blocks and the files you downloaded above:
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=f"{HOST}:{SSL_PORT}",
security_protocol="SSL",
ssl_cafile="ca-certificate.crt",
ssl_certfile="user-access-certificate.crt",
ssl_keyfile="user-access-key.key",
)
Properties properties = new Properties();
properties.put("bootstrap.servers", "{HOST}:{SSL_PORT}");
properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "{KEYSTORE_LOCATION}");
properties.put("ssl.keystore.password", "{KEYSTORE_PASSWORD}");
properties.put("ssl.key.password", "{KEY_PASSWORD}");
properties.put("key.serializer", "{SERIALIZER}");
properties.put("value.serializer", "{SERIALIZER}");
// create a producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
This method requires that you install Sarama, a Go client library for Kafka.
package main
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"github.com/Shopify/sarama"
)
func main() {
keypair, err := tls.LoadX509KeyPair("user-access-certificate.crt", "user-access-key.key")
if err != nil {
log.Println(err)
return
}
caCert, err := ioutil.ReadFile("ca-certificate.crt")
if err != nil {
log.Println(err)
return
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{keypair},
RootCAs: caCertPool,
}
// init config, enable errors and notifications
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
config.Version = sarama.V0_10_2_0
brokers := []string{"{HOST}:{SSL_PORT}"}
producer, err := sarama.NewSyncProducer(brokers, config)
// add your logic
}
const Kafka = require('node-rdkafka');
console.log(Kafka.features); // this should print 'ssl', among other things
const producer = new Kafka.Producer({
'metadata.broker.list': HOST:SSL_PORT,
'security.protocol': 'ssl',
'ssl.key.location': 'user-access-key.key',
'ssl.certificate.location': 'user-access-certificate.crt',
'ssl.ca.location': 'ca-certificate.crt',
'dr_cb': true
});
producer.connect();
producer.on('ready', () => {
// produce the messages and disconnect
});
To connect a consumer, use one of the following code blocks and the files you downloaded above:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"TOPIC_NAME",
auto_offset_reset="START_FROM",
bootstrap_servers=f"{HOST}:{SSL_PORT}",
client_id = CONSUMER_CLIENT_ID,
group_id = CONSUMER_GROUP_ID,
security_protocol="SSL",
ssl_cafile="ca-certificate.crt",
ssl_certfile="user-access-certificate.crt",
ssl_keyfile="user-access-key.key",
)
String group_id = "groupid";
Properties properties = new Properties();
properties.put("bootstrap.servers", "{HOST}:{SSL_PORT}");
properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "{KEYSTORE_LOCATION}");
properties.put("ssl.keystore.password", "{KEYSTORE_PASSWORD}");
properties.put("ssl.key.password", "{KEY_PASSWORD}");
properties.put("key.deserializer", "{DESERIALIZER}");
properties.put("value.deserializer", "{DESERIALIZER}");
properties.put("group.id", group_id);
// create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
This method requires that you install Sarama, a Go client library for Kafka.
package main
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"github.com/Shopify/sarama"
)
func main() {
keypair, err := tls.LoadX509KeyPair("user-access-certificate.crt", "user-access-key.key")
if err != nil {
log.Println(err)
return
}
caCert, err := ioutil.ReadFile("ca-certificate.crt")
if err != nil {
log.Println(err)
return
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{keypair},
RootCAs: caCertPool,
}
// init config, enable errors and notifications
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
config.Version = sarama.V0_10_2_0
brokers := []string{"{HOST}:{SSL_PORT}"}
consumer, err := sarama.NewConsumer(brokers, config)
// add your logic
}
const Kafka = require('node-rdkafka');
const stream = new Kafka.createReadStream({
'metadata.broker.list': HOST:SSL_PORT,
'group.id': CONSUMER_GROUP,
'security.protocol': 'ssl',
'ssl.key.location': 'user-access-key.key',
'ssl.certificate.location': 'user-access-certificate.crt',
'ssl.ca.location': 'ca-certificate.crt'
}, {}, {'topics': ['demo-topic']});
stream.on('data', (message) => {
// process message
});
To connect via SASL, download your cluster’s CA certificate.
To connect a producer, use one of the following code blocks and the file you downloaded above:
from kafka import KafkaProducer
# Choose an appropriate SASL mechanism, for instance:
SASL_MECHANISM = 'SCRAM-SHA-256'
producer = KafkaProducer(
bootstrap_servers=f"{HOST}:{SASL_PORT}",
sasl_mechanism = SASL_MECHANISM,
sasl_plain_username = SASL_USERNAME,
sasl_plain_password = SASL_PASSWORD,
security_protocol="SASL_SSL",
ssl_cafile="ca-certificate.crt",
)
String sasl_username = "{USER_NAME}";
String sasl_password = "{SASL_PASSWORD}";
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);
Properties properties = new Properties();
properties.put("bootstrap.servers", "{HOST}:{SASL_PORT}");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-256");
properties.put("sasl.jaas.config", jaasConfig);
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("ssl.truststore.type", "jks");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("key.serializer", "{SERIALIZER}");
properties.put("value.serializer", "{SERIALIZER}");
// create a producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
This method requires that you install Sarama, a Go client library for Kafka.
package main
import (
"crypto/tls"
"crypto/x509"
"github.com/Shopify/sarama"
"io/ioutil"
)
func main() {
caCert, err := ioutil.ReadFile("ca-certificate.crt")
if err != nil {
panic(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
RootCAs: caCertPool,
}
// init config, enable errors and notifications
config := sarama.NewConfig()
config.Metadata.Full = true
config.ClientID = "{CLIENT_ID}"
config.Producer.Return.Successes = true
// Kafka SASL configuration
config.Net.SASL.Enable = true
config.Net.SASL.User = "{SASL_USERNAME}"
config.Net.SASL.Password = "{SASL_PASSWORD}"
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
// TLS configuration
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
brokers := []string{"{HOST}:{SASL_PORT}"}
producer, err := sarama.NewSyncProducer(brokers, config)
// add your logic
}
const Kafka = require('node-rdkafka');
console.log(Kafka.features); // this should print 'sasl_ssl', among other things
const producer = new Kafka.Producer({
'metadata.broker.list': HOST:SASL_PORT,
'security.protocol': 'sasl_ssl',
'sasl.mechanism': SASL_MECHANISM,
'sasl.username': USER_NAME,
'sasl.password': SASL_PASSWORD,
'ssl.ca.location': 'ca-certificate.crt',
'dr_cb': true
});
producer.connect();
producer.on('ready', () => {
// produce the messages and disconnect
});
To connect a consumer, use one of the following code blocks and the files you downloaded above:
from kafka import KafkaConsumer
# Choose an appropriate SASL mechanism, for instance:
SASL_MECHANISM = 'SCRAM-SHA-256'
consumer = KafkaConsumer(
"TOPIC_NAME",
auto_offset_reset = "START_FROM",
bootstrap_servers = f'{HOST}:{SASL_PORT}',
client_id = CONSUMER_CLIENT_ID,
group_id = CONSUMER_GROUP_ID,
sasl_mechanism = SASL_MECHANISM,
sasl_plain_username = SASL_USERNAME,
sasl_plain_password = SASL_PASSWORD,
security_protocol = "SASL_SSL",
ssl_cafile = "ca-certificate.crt"
)
String group_id = "groupid";
String sasl_username = "{USER_NAME}";
String sasl_password = "{SASL_PASSWORD}";
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);
Properties properties = new Properties();
properties.put("bootstrap.servers", "{HOST}:{SASL_PORT}");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-256");
properties.put("sasl.jaas.config", jaasConfig);
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("ssl.truststore.type", "jks");
properties.put("ssl.truststore.location", "{TRUSTSTORE_LOCATION}");
properties.put("ssl.truststore.password", "{TRUSTSTORE_PASSWORD}");
properties.put("key.deserializer", "{DESERIALIZER}");
properties.put("value.deserializer", "{DESERIALIZER}");
properties.put("group.id", group_id);
// create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
This method requires that you install Sarama, a Go client library for Kafka.
package main
import (
"crypto/tls"
"crypto/x509"
"github.com/Shopify/sarama"
"io/ioutil"
)
func main() {
caCert, err := ioutil.ReadFile("ca-certificate.crt")
if err != nil {
panic(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
RootCAs: caCertPool,
}
// init config, enable errors and notifications
config := sarama.NewConfig()
config.Metadata.Full = true
config.ClientID = "{CLIENT_ID}"
config.Producer.Return.Successes = true
// Kafka SASL configuration
config.Net.SASL.Enable = true
config.Net.SASL.User = "{SASL_USERNAME}"
config.Net.SASL.Password = "{SASL_PASSWORD}"
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
// TLS configuration
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
brokers := []string{"{HOST}:{SASL_PORT}"}
consumer, err := sarama.NewConsumer(brokers, config)
// add your logic
}
const Kafka = require('node-rdkafka');
const stream = new Kafka.createReadStream({
'metadata.broker.list': HOST:SASL_PORT,
'group.id': CONSUMER_GROUP,
'security.protocol': 'sasl_ssl',
'sasl.mechanism': SASL_MECHANISM,
'sasl.username': USER_NAME,
'sasl.password': SASL_PASSWORD,
'ssl.ca.location': 'ca-certificate.crt'
}, {}, {'topics': ['demo-topic']});
stream.on('data', (message) => {
// process message
});