summary refs log tree commit diff
path: root/nixos/modules/services/misc/apache-kafka.nix
diff options
context:
space:
mode:
Diffstat (limited to 'nixos/modules/services/misc/apache-kafka.nix')
-rw-r--r--nixos/modules/services/misc/apache-kafka.nix151
1 files changed, 151 insertions, 0 deletions
diff --git a/nixos/modules/services/misc/apache-kafka.nix b/nixos/modules/services/misc/apache-kafka.nix
new file mode 100644
index 00000000000..d1856fff4aa
--- /dev/null
+++ b/nixos/modules/services/misc/apache-kafka.nix
@@ -0,0 +1,151 @@
+{ config, lib, pkgs, ... }:
+
+with lib;
+
+let
+  cfg = config.services.apache-kafka;
+
+  serverProperties =
+    if cfg.serverProperties != null then
+      cfg.serverProperties
+    else
+      ''
+        # Generated by nixos
+        broker.id=${toString cfg.brokerId}
+        port=${toString cfg.port}
+        host.name=${cfg.hostname}
+        log.dirs=${concatStringsSep "," cfg.logDirs}
+        zookeeper.connect=${cfg.zookeeper}
+        ${toString cfg.extraProperties}
+      '';
+
+  serverConfig = pkgs.writeText "server.properties" serverProperties;
+  logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;
+
+in {
+
+  options.services.apache-kafka = {
+    enable = mkOption {
+      description = "Whether to enable Apache Kafka.";
+      default = false;
+      type = types.bool;
+    };
+
+    brokerId = mkOption {
+      description = "Broker ID.";
+      default = -1;
+      type = types.int;
+    };
+
+    port = mkOption {
+      description = "Port number the broker should listen on.";
+      default = 9092;
+      type = types.int;
+    };
+
+    hostname = mkOption {
+      description = "Hostname the broker should bind to.";
+      default = "localhost";
+      type = types.str;
+    };
+
+    logDirs = mkOption {
+      description = "Log file directories";
+      default = [ "/tmp/kafka-logs" ];
+      type = types.listOf types.path;
+    };
+
+    zookeeper = mkOption {
+      description = "Zookeeper connection string";
+      default = "localhost:2181";
+      type = types.str;
+    };
+
+    extraProperties = mkOption {
+      description = "Extra properties for server.properties.";
+      type = types.nullOr types.lines;
+      default = null;
+    };
+
+    serverProperties = mkOption {
+      description = ''
+        Complete server.properties content. Other server.properties config
+        options will be ignored if this option is used.
+      '';
+      type = types.nullOr types.lines;
+      default = null;
+    };
+
+    log4jProperties = mkOption {
+      description = "Kafka log4j property configuration.";
+      default = ''
+        log4j.rootLogger=INFO, stdout
+
+        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+      '';
+      type = types.lines;
+    };
+
+    jvmOptions = mkOption {
+      description = "Extra command line options for the JVM running Kafka.";
+      default = [];
+      type = types.listOf types.str;
+      example = [
+        "-Djava.net.preferIPv4Stack=true"
+        "-Dcom.sun.management.jmxremote"
+        "-Dcom.sun.management.jmxremote.local.only=true"
+      ];
+    };
+
+    package = mkOption {
+      description = "The kafka package to use";
+      default = pkgs.apacheKafka;
+      defaultText = literalExpression "pkgs.apacheKafka";
+      type = types.package;
+    };
+
+    jre = mkOption {
+      description = "The JRE with which to run Kafka";
+      default = cfg.package.passthru.jre;
+      defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
+      type = types.package;
+    };
+
+  };
+
+  config = mkIf cfg.enable {
+
+    environment.systemPackages = [cfg.package];
+
+    users.users.apache-kafka = {
+      isSystemUser = true;
+      group = "apache-kafka";
+      description = "Apache Kafka daemon user";
+      home = head cfg.logDirs;
+    };
+    users.groups.apache-kafka = {};
+
+    systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs;
+
+    systemd.services.apache-kafka = {
+      description = "Apache Kafka Daemon";
+      wantedBy = [ "multi-user.target" ];
+      after = [ "network.target" ];
+      serviceConfig = {
+        ExecStart = ''
+          ${cfg.jre}/bin/java \
+            -cp "${cfg.package}/libs/*" \
+            -Dlog4j.configuration=file:${logConfig} \
+            ${toString cfg.jvmOptions} \
+            kafka.Kafka \
+            ${serverConfig}
+        '';
+        User = "apache-kafka";
+        SuccessExitStatus = "0 143";
+      };
+    };
+
+  };
+}