diff --git a/nixos/tests/kafka.nix b/nixos/tests/kafka.nix index 8635cbd3ff63..f4f9827ab7b5 100644 --- a/nixos/tests/kafka.nix +++ b/nixos/tests/kafka.nix @@ -6,13 +6,62 @@ with pkgs.lib; let - makeKafkaTest = name: kafkaPackage: (import ./make-test-python.nix ({ + makeKafkaTest = name: { kafkaPackage, mode ? "zookeeper" }: (import ./make-test-python.nix ({ inherit name; meta = with pkgs.lib.maintainers; { maintainers = [ nequissimus ]; }; nodes = { + kafka = { ... }: { + services.apache-kafka = mkMerge [ + ({ + enable = true; + package = kafkaPackage; + settings = { + "offsets.topic.replication.factor" = 1; + "log.dirs" = [ + "/var/lib/kafka/logdir1" + "/var/lib/kafka/logdir2" + ]; + }; + }) + (mkIf (mode == "zookeeper") { + settings = { + "zookeeper.session.timeout.ms" = 600000; + "zookeeper.connect" = [ "zookeeper1:2181" ]; + }; + }) + (mkIf (mode == "kraft") { + clusterId = "ak2fIHr4S8WWarOF_ODD0g"; + formatLogDirs = true; + settings = { + "node.id" = 1; + "process.roles" = [ + "broker" + "controller" + ]; + "listeners" = [ + "PLAINTEXT://:9092" + "CONTROLLER://:9093" + ]; + "listener.security.protocol.map" = [ + "PLAINTEXT:PLAINTEXT" + "CONTROLLER:PLAINTEXT" + ]; + "controller.quorum.voters" = [ + "1@kafka:9093" + ]; + "controller.listener.names" = [ "CONTROLLER" ]; + }; + }) + ]; + + networking.firewall.allowedTCPPorts = [ 9092 9093 ]; + # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048) + virtualisation.memorySize = 2047; + }; + } // optionalAttrs (mode == "zookeeper") { zookeeper1 = { ... }: { services.zookeeper = { enable = true; @@ -20,31 +69,16 @@ let networking.firewall.allowedTCPPorts = [ 2181 ]; }; - kafka = { ... }: { - services.apache-kafka = { - enable = true; - settings = { - "offsets.topic.replication.factor" = 1; - "zookeeper.session.timeout.ms" = 600000; - "zookeeper.connect" = [ "zookeeper1:2181" ]; - "log.dirs" = [ "/tmp/apache-kafka" ]; - }; - - package = kafkaPackage; - }; - - networking.firewall.allowedTCPPorts = [ 9092 ]; - # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048) - virtualisation.memorySize = 2047; - }; }; testScript = '' start_all() + ${optionalString (mode == "zookeeper") '' zookeeper1.wait_for_unit("default.target") zookeeper1.wait_for_unit("zookeeper.service") zookeeper1.wait_for_open_port(2181) + ''} kafka.wait_for_unit("default.target") kafka.wait_for_unit("apache-kafka.service") @@ -69,12 +103,13 @@ let }) { inherit system; }); in with pkgs; { - kafka_2_8 = makeKafkaTest "kafka_2_8" apacheKafka_2_8; - kafka_3_0 = makeKafkaTest "kafka_3_0" apacheKafka_3_0; - kafka_3_1 = makeKafkaTest "kafka_3_1" apacheKafka_3_1; - kafka_3_2 = makeKafkaTest "kafka_3_2" apacheKafka_3_2; - kafka_3_3 = makeKafkaTest "kafka_3_3" apacheKafka_3_3; - kafka_3_4 = makeKafkaTest "kafka_3_4" apacheKafka_3_4; - kafka_3_5 = makeKafkaTest "kafka_3_5" apacheKafka_3_5; - kafka = makeKafkaTest "kafka" apacheKafka; + kafka_2_8 = makeKafkaTest "kafka_2_8" { kafkaPackage = apacheKafka_2_8; }; + kafka_3_0 = makeKafkaTest "kafka_3_0" { kafkaPackage = apacheKafka_3_0; }; + kafka_3_1 = makeKafkaTest "kafka_3_1" { kafkaPackage = apacheKafka_3_1; }; + kafka_3_2 = makeKafkaTest "kafka_3_2" { kafkaPackage = apacheKafka_3_2; }; + kafka_3_3 = makeKafkaTest "kafka_3_3" { kafkaPackage = apacheKafka_3_3; }; + kafka_3_4 = makeKafkaTest "kafka_3_4" { kafkaPackage = apacheKafka_3_4; }; + kafka_3_5 = makeKafkaTest "kafka_3_5" { kafkaPackage = apacheKafka_3_5; }; + kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; }; + kafka_kraft = makeKafkaTest "kafka_kraft" { kafkaPackage = apacheKafka; mode = "kraft"; }; }