# summary refs log blame commit diff
# path: root/nixos/modules/services/misc/apache-kafka.nix
# blob: 168615153fedc6a407fc35a3243a83fd7179d786 (plain) (tree)



























































































































































                                                                            
{ config, lib, pkgs, ... }:

with lib;

let
  # Shorthand for the values of the services.apache-kafka options.
  cfg = config.services.apache-kafka;

  # Text of server.properties. A non-null cfg.serverProperties is used
  # verbatim; otherwise the file is generated from the individual broker
  # options, with cfg.extraProperties appended at the end (a null value is
  # rendered by toString as the empty string).
  serverProperties =
    let
      generated = ''
        # Generated by nixos
        broker.id=${toString cfg.brokerId}
        port=${toString cfg.port}
        host.name=${cfg.hostname}
        log.dirs=${concatStringsSep "," cfg.logDirs}
        zookeeper.connect=${cfg.zookeeper}
        ${toString cfg.extraProperties}
      '';
    in
      if cfg.serverProperties == null then generated else cfg.serverProperties;

  # A single directory containing both configuration files: server.properties
  # (built from serverProperties) and log4j.properties (from
  # cfg.log4jProperties).
  configDir =
    let
      # Wrap one file's text in a directory so buildEnv can merge it.
      confFile = fileName: contents: pkgs.writeTextDir fileName contents;
    in
      pkgs.buildEnv {
        name = "apache-kafka-conf";
        paths = [
          (confFile "server.properties" serverProperties)
          (confFile "log4j.properties" cfg.log4jProperties)
        ];
      };

in {

  # Option declarations for the Apache Kafka broker service.
  #
  # String-valued options use types.str rather than the deprecated
  # types.string: types.string silently concatenates multiple definitions,
  # which would produce a nonsensical hostname or connection string, whereas
  # types.str reports conflicting definitions as an error.
  options.services.apache-kafka = {
    # Master switch; the config section of this module is guarded by it.
    enable = mkOption {
      description = "Whether to enable Apache Kafka.";
      default = false;
      type = types.uniq types.bool;
    };

    # Written to server.properties as broker.id.
    brokerId = mkOption {
      description = "Broker ID.";
      default = 0;
      type = types.int;
    };

    # Written to server.properties as port.
    port = mkOption {
      description = "Port number the broker should listen on.";
      default = 9092;
      type = types.int;
    };

    # Written to server.properties as host.name.
    hostname = mkOption {
      description = "Hostname the broker should bind to.";
      default = "localhost";
      type = types.str;
    };

    # Written to server.properties as a comma-separated log.dirs entry. The
    # directories are created before the service starts, and the first one
    # doubles as the home directory of the apache-kafka user.
    logDirs = mkOption {
      description = "Log file directories";
      default = [ "/tmp/kafka-logs" ];
      type = types.listOf types.path;
    };

    # Written to server.properties as zookeeper.connect.
    zookeeper = mkOption {
      description = "Zookeeper connection string";
      default = "localhost:2181";
      type = types.str;
    };

    # Appended to the generated server.properties; null adds nothing.
    extraProperties = mkOption {
      description = "Extra properties for server.properties.";
      type = types.nullOr types.lines;
      default = null;
    };

    # When non-null, replaces the generated server.properties entirely.
    serverProperties = mkOption {
      description = ''
        Complete server.properties content. Other server.properties config
        options will be ignored if this option is used.
      '';
      type = types.nullOr types.lines;
      default = null;
    };

    # Content of the log4j.properties file placed next to server.properties.
    log4jProperties = mkOption {
      description = "Kafka log4j property configuration.";
      default = ''
        log4j.rootLogger=INFO, stdout 
        
        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
      '';
      type = types.lines;
    };

    # Passed to the java command line, space-separated, before the main class.
    jvmOptions = mkOption {
      description = "Extra command line options for the JVM running Kafka.";
      default = [
        "-server"
        "-Xmx1G"
        "-Xms1G"
        "-XX:+UseCompressedOops"
        "-XX:+UseParNewGC"
        "-XX:+UseConcMarkSweepGC"
        "-XX:+CMSClassUnloadingEnabled"
        "-XX:+CMSScavengeBeforeRemark"
        "-XX:+DisableExplicitGC"
        "-Djava.awt.headless=true"
        "-Djava.net.preferIPv4Stack=true"
      ];
      type = types.listOf types.str;
      example = [
        "-Djava.net.preferIPv4Stack=true"
        "-Dcom.sun.management.jmxremote"
        "-Dcom.sun.management.jmxremote.local.only=true"
      ];
    };

  };

  # System configuration contributed by this module; applied only when
  # services.apache-kafka.enable is true.
  config = mkIf cfg.enable {

    # Install the Kafka package system-wide.
    environment.systemPackages = [pkgs.apacheKafka];

    # Dedicated account the broker runs as. Its home directory is the first
    # entry of cfg.logDirs, so that list must not be empty.
    users.extraUsers = singleton {
      name = "apache-kafka";
      uid = config.ids.uids.apache-kafka;
      description = "Apache Kafka daemon user";
      home = head cfg.logDirs;
    };

    systemd.services.apache-kafka = {
      description = "Apache Kafka Daemon";
      wantedBy = [ "multi-user.target" ];
      after = [ "network-interfaces.target" ];
      serviceConfig = {
        # Start the kafka.Kafka main class with the JRE. The classpath is the
        # Kafka jars plus the generated configuration directory, and the
        # generated server.properties is passed as the only argument.
        ExecStart = ''
          ${pkgs.jre}/bin/java \
            -cp "${pkgs.apacheKafka}/libs/*:${configDir}" \
            ${toString cfg.jvmOptions} \
            kafka.Kafka \
            ${configDir}/server.properties
        '';
        User = "apache-kafka";
        # Apply User only to ExecStart so that preStart keeps enough
        # privileges to create and chown the log directories.
        PermissionsStartOnly = true;
      };
      # Create the log directories and hand them to the service user.
      # escapeShellArgs quotes each path, so a directory containing spaces or
      # shell metacharacters reaches mkdir/chown as a single, literal argument
      # instead of being word-split or interpreted by the shell.
      preStart = ''
        mkdir -m 0700 -p ${escapeShellArgs cfg.logDirs}
        if [ "$(id -u)" = 0 ]; then
           chown apache-kafka ${escapeShellArgs cfg.logDirs};
        fi
      '';
    };

  };
}