Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 11
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '11'
java-version: '17'
cache: 'gradle'
- name: Download opa
run: wget -O opa https://openpolicyagent.org/downloads/latest/opa_linux_amd64
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Open Policy Agent (OPA) plugin for Kafka authorization.
### Prerequisites

* Kafka 2.7.0+
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still true? i.e. you can still drop this jar into a Kafka that old?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. The code itself has not really changed much, but the changes around the Jackson dependencies make it compatible with Kafka 3.8 and newer (because of the Jackson versions Kafka uses). I guess that should be fine: there are no real new features here, and users of older Kafka versions can use the previous release of the OPA Authorizer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, fine with me 👍

* Java 11 or above
* Java 17 or above
* OPA installed and running on the brokers

## Installation
Expand Down
22 changes: 12 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ group 'org.openpolicyagent.kafka'
version '1.5.1'

java {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
withJavadocJar()
withSourcesJar()
}
Expand All @@ -21,26 +21,28 @@ repositories {
mavenCentral()
}

// See versions used in Kafka here https://github.com/apache/kafka/blob/2.8.0/gradle/dependencies.gradle
// See versions used in Kafka here https://github.com/apache/kafka/blob/4.0.0/gradle/dependencies.gradle
dependencies {
compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.13', version: '2.10.5'
compileOnly group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.8.0'
compileOnly group: 'org.apache.kafka', name: 'kafka_2.13', version: '4.0.0'
compileOnly group: 'com.typesafe.scala-logging', name: 'scala-logging_2.13', version: '3.9.5'
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.13', version: '2.16.2'
implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'

testImplementation group: 'org.scalatest', name: 'scalatest_2.13', version: '3.2.17'
testImplementation group: 'org.scalatestplus', name: 'junit-4-13_2.13', version: '3.2.17.0'
testImplementation group: 'junit', name: 'junit', version: '4.12'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.14.0'
testImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.8.0'
testImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '4.0.0'
testImplementation group: 'org.apache.kafka', name: 'kafka-server', version: '4.0.0'
testImplementation group: 'com.typesafe.scala-logging', name: 'scala-logging_2.13', version: '3.9.5'
}

shadowJar {
dependencies {
exclude(dependency {
!(it.moduleGroup in [
'org.openpolicyagent.kafka',
'com.google.guava'
])
!(it.moduleGroup in ['org.openpolicyagent.kafka', 'com.google.guava']
|| (it.moduleGroup == 'com.fasterxml.jackson.module' && it.moduleName == 'jackson-module-scala_2.13')
|| (it.moduleGroup == 'com.thoughtworks.paranamer' && it.moduleName == 'paranamer'))
})
}
}
Expand Down
26 changes: 11 additions & 15 deletions example/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,29 @@ services:
- "--set=bundles.authz.resource=bundle.tar.gz"
depends_on:
- nginx
zookeeper:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

image: confluentinc/cp-zookeeper:6.2.1
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOOKEEPER_CLIENT_PORT=2181
broker:
# If experiencing hangs on darwin/arm64, explicitly setting the platform here seems to help
# platform: linux/amd64
image: confluentinc/cp-kafka:6.2.1
image: apache/kafka:4.0.0
ports:
- "9093:9093"
- "9093:9093"
environment:
CLASSPATH: "/plugin/*"
KAFKA_AUTHORIZER_CLASS_NAME: org.openpolicyagent.kafka.OpaAuthorizer
KAFKA_OPA_AUTHORIZER_URL: http://opa:8181/v1/data/kafka/authz/allow
KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_SECONDS: 10 # For development only
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: CONTROLLER://broker:9092,SSL://broker:9093
KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: SSL
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,SSL:SSL
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_SSL_KEYSTORE_FILENAME: server.keystore
KAFKA_SSL_KEYSTORE_CREDENTIALS: credentials.txt
Expand All @@ -57,4 +54,3 @@ services:
- "./cert/server:/etc/kafka/secrets"
depends_on:
- opa
- zookeeper
33 changes: 14 additions & 19 deletions example/opa_tutorial/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,36 @@ services:
- "--set=bundles.authz.resource=bundle.tar.gz"
depends_on:
- nginx
zookeeper:
image: confluentinc/cp-zookeeper:6.2.1
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOOKEEPER_CLIENT_PORT=2181
broker:
image: confluentinc/cp-kafka:6.2.1
ports:
- "9093:9093"
environment:
# Set cache expiry to low value for development in order to see decisions
KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_SECONDS: 10
KAFKA_OPA_AUTHORIZER_URL: http://opa:8181/v1/data/kafka/authz/allow
CLASSPATH: "/plugin/*"
KAFKA_AUTHORIZER_CLASS_NAME: org.openpolicyagent.kafka.OpaAuthorizer
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: SSL://broker:9093
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
KAFKA_OPA_AUTHORIZER_URL: http://opa:8181/v1/data/kafka/authz/allow
KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_SECONDS: 10 # For development only
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: CONTROLLER://broker:9092,SSL://broker:9093
KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: SSL
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,SSL:SSL
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_SSL_KEYSTORE_FILENAME: server.keystore
KAFKA_SSL_KEYSTORE_CREDENTIALS: credentials.txt
KAFKA_SSL_KEY_CREDENTIALS: credentials.txt
KAFKA_SSL_TRUSTSTORE_FILENAME: server.truststore
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: credentials.txt
KAFKA_SSL_CLIENT_AUTH: required
CLASSPATH: "/plugin/*"
volumes:
- "./plugin:/plugin"
- "./cert/server:/etc/kafka/secrets"
depends_on:
- opa
- zookeeper
- opa
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ case class AzServerInfo(
brokerId: Int,
clusterResource: ClusterResource,
endpoints: Collection[Endpoint],
interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo
interBrokerEndpoint: Endpoint,
earlyStartListeners: Collection[String]) extends AuthorizerServerInfo
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package org.openpolicyagent.kafka
import java.net.InetAddress
import java.util
import java.util.concurrent.TimeUnit

import kafka.network.RequestChannel
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.PatternType
import org.apache.kafka.common.resource.ResourceType.TOPIC
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.network.Session
import org.apache.kafka.server.authorizer.Action

import scala.jdk.CollectionConverters._

object OpaAuthorizerBenchmark {
Expand Down Expand Up @@ -45,7 +45,7 @@ class OpaAuthorizerBenchmark {

def createRequest = {
val principal = new KafkaPrincipal("User", "user-" + new scala.util.Random().nextInt())
val session = RequestChannel.Session(principal, InetAddress.getLoopbackAddress)
val session = new Session(principal, InetAddress.getLoopbackAddress)
val resource = new ResourcePattern(TOPIC, "my-topic", PatternType.LITERAL)
val authzReqContext = new AzRequestContext(
clientId = "rdkafka",
Expand Down
23 changes: 17 additions & 6 deletions src/test/scala/org/openpolicyagent/kafka/OpaAuthorizerSpec.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package org.openpolicyagent.kafka

import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.databind.module.SimpleModule

import java.net.{InetAddress, URI}
import java.net.http.HttpRequest.BodyPublishers
import java.net.http.HttpResponse.BodyHandlers
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import org.junit.runner.RunWith
import org.scalatestplus.junit.JUnitRunner
import com.typesafe.scalalogging.LazyLogging
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import kafka.network.RequestChannel
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.ApiKeys
Expand All @@ -18,10 +19,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.network.Session
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult}
import org.scalatest._
import matchers.should._
import flatspec._
import org.apache.kafka.common.message.RequestHeaderData

import java.lang.management.ManagementFactory
import javax.management.ObjectName
Expand All @@ -35,7 +38,15 @@ import scala.jdk.CollectionConverters._
class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTester with LazyLogging {

private val opaUrl = "http://localhost:8181/v1/data/kafka/authz/allow"
private val objectMapper = (new ObjectMapper() with ScalaObjectMapper).registerModule(DefaultScalaModule)
private val requestSerializerModule = new SimpleModule()
.addSerializer(classOf[ResourcePattern], new ResourcePatternSerializer)
.addSerializer(classOf[Action], new ActionSerializer)
.addSerializer(classOf[RequestContext], new RequestContextSerializer)
.addSerializer(classOf[ClientInformation], new ClientInformationSerializer)
.addSerializer(classOf[KafkaPrincipal], new KafkaPrincipalSerializer)
.addSerializer(classOf[RequestHeader], new RequestHeaderSerializer)
.addSerializer(classOf[RequestHeaderData], new RequestHeaderDataSerializer)
private val objectMapper = JsonMapper.builder().addModule(requestSerializerModule).addModule(DefaultScalaModule).build()
private val defaultCacheCapacity = 50000
private lazy val opaResponse = testOpaConnection()

Expand Down Expand Up @@ -295,8 +306,8 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest

def createRequest(username: String, actions: List[Action]): FullRequest = {
val principal = new KafkaPrincipal("User", username)
val session = RequestChannel.Session(principal, InetAddress.getLoopbackAddress)
val authzReqContext = new AzRequestContext(
val session = new Session(principal, InetAddress.getLoopbackAddress)
val authzReqContext = AzRequestContext(
clientId = "rdkafka",
requestType = 1,
listenerName = "SASL_PLAINTEXT",
Expand Down