Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 53 additions & 29 deletions cmake/kafka.cmake
Original file line number Diff line number Diff line change
@@ -1,61 +1,79 @@
# Kafka CMake Configuration
# kafka.cmake - Clean version without internal AWS check
FLB_OPTION(RDKAFKA_BUILD_STATIC On)
FLB_OPTION(RDKAFKA_BUILD_EXAMPLES Off)
FLB_OPTION(RDKAFKA_BUILD_TESTS Off)
FLB_OPTION(ENABLE_LZ4_EXT Off)

include(FindPkgConfig)

# Check for libsasl2 (required for SASL authentication)
set(FLB_SASL_ENABLED OFF)
# librdkafka has built-in support for:
# - SASL/PLAIN (built-in, no external deps)
# - SASL/SCRAM (built-in, no external deps)
# - SASL/OAUTHBEARER (built-in, no external deps)
# Only SASL/GSSAPI (Kerberos) requires cyrus-sasl library

# Check for cyrus-sasl (optional, only needed for GSSAPI/Kerberos)
set(FLB_SASL_CYRUS_ENABLED OFF)
if(PkgConfig_FOUND)
pkg_check_modules(SASL libsasl2)
if(SASL_FOUND)
message(STATUS "Found libsasl2: ${SASL_VERSION}")
set(FLB_SASL_ENABLED ON)
message(STATUS "Found cyrus-sasl: ${SASL_VERSION}")
set(FLB_SASL_CYRUS_ENABLED ON)
else()
message(WARNING "libsasl2 not found - SASL authentication will be disabled")
# Fallback detection when pkg-config finds no package
find_library(SASL2_LIB NAMES sasl2)
find_path(SASL2_INCLUDE NAMES sasl/sasl.h)
if(SASL2_LIB AND SASL2_INCLUDE)
set(FLB_SASL_CYRUS_ENABLED ON)
set(SASL_LIBRARIES ${SASL2_LIB})
set(SASL_INCLUDE_DIRS ${SASL2_INCLUDE})
message(STATUS "Found cyrus-sasl via fallback: ${SASL2_LIB}")
else()
message(STATUS "cyrus-sasl not found - SASL/GSSAPI (Kerberos) will be disabled")
endif()
endif()
else()
message(WARNING "pkg-config not available - trying fallback SASL detection")
# Fallback detection
message(STATUS "pkg-config not available - trying fallback cyrus-sasl detection")
find_library(SASL2_LIB NAMES sasl2)
find_path(SASL2_INCLUDE NAMES sasl/sasl.h)
if(SASL2_LIB AND SASL2_INCLUDE)
set(FLB_SASL_ENABLED ON)
message(STATUS "Found libsasl2 via fallback: ${SASL2_LIB}")
set(FLB_SASL_CYRUS_ENABLED ON)
set(SASL_LIBRARIES ${SASL2_LIB})
set(SASL_INCLUDE_DIRS ${SASL2_INCLUDE})
message(STATUS "Found cyrus-sasl via fallback: ${SASL2_LIB}")
else()
message(STATUS "cyrus-sasl not found - SASL/GSSAPI (Kerberos) will be disabled")
endif()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
endif()

# OAuth Bearer support:
# - Windows: Built-in SASL, only needs SSL (no Cyrus SASL required)
# - Linux/macOS: Needs both SSL and Cyrus SASL
if(FLB_SYSTEM_WINDOWS)
if(FLB_TLS)
set(FLB_SASL_OAUTHBEARER_ENABLED ON)
else()
set(FLB_SASL_OAUTHBEARER_ENABLED OFF)
endif()
# SASL is always enabled (built-in PLAIN/SCRAM/OAUTHBEARER support)
set(FLB_SASL_ENABLED ON)

# OAuth Bearer support requires TLS on all platforms
# librdkafka only enables WITH_SASL_OAUTHBEARER when WITH_SSL=ON
if(FLB_TLS)
set(FLB_SASL_OAUTHBEARER_ENABLED ON)
else()
# Non-Windows platforms: require Cyrus SASL
set(FLB_SASL_OAUTHBEARER_ENABLED ${FLB_SASL_ENABLED})
set(FLB_SASL_OAUTHBEARER_ENABLED OFF)
endif()

# MSK IAM requires OAuth Bearer support
set(FLB_KAFKA_MSK_IAM_ENABLED ${FLB_SASL_OAUTHBEARER_ENABLED})

# Configure librdkafka options
# On Windows, enable WITH_SASL for SSPI support (built-in, no Cyrus needed)
# On other platforms, WITH_SASL depends on Cyrus SASL availability
if(FLB_SYSTEM_WINDOWS)
FLB_OPTION(WITH_SASL ON)
# WITH_SASL is always ON for built-in SASL support (PLAIN/SCRAM/OAUTHBEARER)
# On Windows, this also enables SSPI support
FLB_OPTION(WITH_SASL ON)
FLB_OPTION(WITH_SSL ${FLB_TLS}) # Honor Fluent Bit TLS setting
FLB_OPTION(WITH_SASL_OAUTHBEARER ${FLB_SASL_OAUTHBEARER_ENABLED})

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
# Explicitly set WITH_SASL_CYRUS based on detection
# Must use set(... CACHE BOOL ... FORCE) to override any cached value
if(FLB_SASL_CYRUS_ENABLED)
set(WITH_SASL_CYRUS ON CACHE BOOL "Enable Cyrus SASL support" FORCE)
else()
FLB_OPTION(WITH_SASL ${FLB_SASL_ENABLED})
set(WITH_SASL_CYRUS OFF CACHE BOOL "Enable Cyrus SASL support" FORCE)
endif()
FLB_OPTION(WITH_SSL On)
FLB_OPTION(WITH_SASL_OAUTHBEARER ${FLB_SASL_OAUTHBEARER_ENABLED})
FLB_OPTION(WITH_SASL_CYRUS ${FLB_SASL_ENABLED})

# Export compile-time definitions using FLB_DEFINITION macro
if(FLB_SASL_ENABLED)
Expand Down Expand Up @@ -83,6 +101,12 @@ add_subdirectory(${FLB_PATH_LIB_RDKAFKA} EXCLUDE_FROM_ALL)

set(KAFKA_LIBRARIES "rdkafka")

# Add SASL libraries if cyrus-sasl is enabled
if(FLB_SASL_CYRUS_ENABLED AND SASL_LIBRARIES)
list(APPEND KAFKA_LIBRARIES ${SASL_LIBRARIES})
message(STATUS "Added SASL libraries to Kafka: ${SASL_LIBRARIES}")
endif()

# Summary of what's enabled
message(STATUS "=== Kafka Feature Summary ===")
message(STATUS "SASL Auth: ${FLB_SASL_ENABLED}")
Expand Down
216 changes: 216 additions & 0 deletions examples/kafka_filter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Fluent Bit Kafka Examples

This directory contains examples for using Fluent Bit with Apache Kafka, including support for AWS MSK (Managed Streaming for Apache Kafka) with IAM authentication.

## Examples

### 1. Basic Kafka Example (`kafka.conf`)

A simple example demonstrating Kafka input and output with a Lua filter.

**Features:**

- Kafka consumer input
- Lua filter for message transformation
- Kafka producer output

**Usage:**

```bash
docker-compose up
```

### 2. AWS MSK IAM Authentication (`kafka_msk_iam.conf`)

Comprehensive examples for AWS MSK with IAM authentication, covering various deployment scenarios.

**Scenarios covered:**

- Standard MSK cluster (auto-detected region)
- MSK via PrivateLink (explicit region)
- MSK Serverless (auto-detected region)
- VPC Endpoint (auto-detected region)

## AWS MSK IAM Authentication

### Overview

AWS MSK supports IAM authentication, which eliminates the need to manage separate credentials for Kafka. Fluent Bit seamlessly integrates with AWS MSK IAM authentication.

### Configuration

Enable MSK IAM authentication by setting:

```ini
rdkafka.sasl.mechanism aws_msk_iam
```

### Region Detection

Fluent Bit can automatically detect the AWS region from standard MSK broker hostnames:

- `b-1.example.kafka.us-east-1.amazonaws.com` → region: `us-east-1`
- `boot-abc.kafka-serverless.us-west-2.amazonaws.com` → region: `us-west-2`
- `vpce-123.kafka.eu-west-1.vpce.amazonaws.com` → region: `eu-west-1`

Comment thread
coderabbitai[bot] marked this conversation as resolved.
### Custom DNS / PrivateLink

When using PrivateLink aliases or custom DNS names that don't contain `.amazonaws.com`, you **must** explicitly specify the region:

```ini
[OUTPUT]
Name kafka
Match *
brokers my-privatelink-alias.internal.example.com:9098
topics my-topic
rdkafka.sasl.mechanism aws_msk_iam
aws_region us-east-1 # REQUIRED for custom DNS
```

### AWS Credentials

MSK IAM authentication uses the standard AWS credentials chain:

1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
2. EC2 instance profile / ECS task role (recommended for production)
3. AWS credentials file (`~/.aws/credentials`)

### Required IAM Permissions

Your IAM role or user needs the following permissions:

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:DescribeCluster",
"kafka-cluster:ReadData",
"kafka-cluster:WriteData"
],
"Resource": [
"arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER_NAME/CLUSTER_UUID",
"arn:aws:kafka:REGION:ACCOUNT:topic/CLUSTER_NAME/CLUSTER_UUID/*",
"arn:aws:kafka:REGION:ACCOUNT:group/CLUSTER_NAME/CLUSTER_UUID/*"
]
}
]
}
```

**Note:** The cluster UUID can be found via the AWS Console, the DescribeCluster API, or the AWS CLI (`aws kafka describe-cluster`).

**Note:** Adjust permissions based on your use case:

- Consumers need: `Connect`, `DescribeCluster`, `ReadData`
- Producers need: `Connect`, `WriteData`

Comment thread
coderabbitai[bot] marked this conversation as resolved.
## Configuration Parameters

### Common Parameters

| Parameter | Description | Required |
| ------------------------ | ------------------------------------- | ------------------- |
| `brokers` | Comma-separated list of Kafka brokers | Yes |
| `topics` | Topic name(s) for input or output | Yes |
| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` for MSK IAM auth | For MSK IAM |
| `aws_region` | AWS region (auto-detected if not set) | Only for custom DNS |
| `group_id` | Consumer group ID | For input |

### Additional librdkafka Parameters

You can pass any librdkafka configuration using the `rdkafka.` prefix:

```ini
rdkafka.socket.timeout.ms 60000
rdkafka.metadata.max.age.ms 180000
rdkafka.request.timeout.ms 30000
```

For a complete list of parameters, see the [librdkafka configuration documentation](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).

## Testing

### Local Kafka (Docker)

1. Start the Kafka stack:

```bash
cd examples/kafka_filter
docker-compose up -d
```

2. Run Fluent Bit:

```bash
fluent-bit -c kafka.conf
```

3. Produce test messages:

```bash
./scripts/kafka-produce.sh
```

4. Consume messages:
```bash
./scripts/kafka-consume.sh
```

### AWS MSK

1. Update `kafka_msk_iam.conf` with your MSK cluster details
2. Ensure AWS credentials are configured
3. Run Fluent Bit:
```bash
fluent-bit -c kafka_msk_iam.conf
```

## Troubleshooting

### Authentication Failures

**Error:** `failed to setup MSK IAM authentication OAuth callback`

**Solutions:**

- For custom DNS/PrivateLink: Add `aws_region` parameter
- Verify AWS credentials are available
- Check IAM permissions

### Region Detection Issues

**Error:** `failed to auto-detect region from broker address`

**Solution:**
Explicitly set the region:

```ini
aws_region us-east-1
```

### Connection Timeouts

**Solution:**
Increase timeout values:

```ini
rdkafka.socket.timeout.ms 60000
rdkafka.metadata.max.age.ms 180000
```

## Additional Resources

- [Fluent Bit Kafka Documentation](https://docs.fluentbit.io/)
- [AWS MSK IAM Access Control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)
- [librdkafka Configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)

## Support

For issues or questions:

- [Fluent Bit GitHub Issues](https://github.com/fluent/fluent-bit/issues)
- [Fluent Bit Slack Community](https://fluentbit.io/slack)
Loading
Loading