Próbuje przy pomocy AdminClient zdefiniowac ACL dla Kafki:
KafkaPrincipal principal = new KafkaPrincipal("User", "test-producer");
ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, "test", PatternType.LITERAL);
AclBinding aclBinding = new AclBinding(resourcePattern, new AccessControlEntry(principal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
CreateAclsResult result = admin.createAcls(Collections.singletonList(aclBinding));
result.all().get();
zgodnie z dokumentacją w pliku server.properties zdefiniowałem też:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
ale przy wywołaniu kodu dostaje błąd:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at com.comarch.clm6.lib.businessConfiguration.PrepareACL.main(PrepareACL.java:31)
Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured.