summary refs log blame commit diff
path: root/nixos/modules/services/misc/apache-kafka.nix
blob: 69dfadfe54e0dfcdc640015fca31f033cb1a50d0 (plain) (tree)




















                                                    

                                                                     






                                                      
                        



                                 
                   











                                                               
                       






                                           
 


                                                  
                       
      
 

















                                                                          

                                     








                                                                            
                   
                                    






                                                        

                                               
                                 
                                       


                           






                                                      



                            
                                               
 
                                




                                               
                                                                                               
 


                                          
                                   

                       
                               

                                                     

                                        
                           

                              
                                    
        



      
{ config, lib, pkgs, ... }:

with lib;

let
  cfg = config.services.apache-kafka;

  serverProperties =
    if cfg.serverProperties != null then
      cfg.serverProperties
    else
      ''
        # Generated by nixos
        broker.id=${toString cfg.brokerId}
        port=${toString cfg.port}
        host.name=${cfg.hostname}
        log.dirs=${concatStringsSep "," cfg.logDirs}
        zookeeper.connect=${cfg.zookeeper}
        ${toString cfg.extraProperties}
      '';

  serverConfig = pkgs.writeText "server.properties" serverProperties;
  logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;

in {

  options.services.apache-kafka = {
    enable = mkOption {
      description = "Whether to enable Apache Kafka.";
      default = false;
      type = types.bool;
    };

    brokerId = mkOption {
      description = "Broker ID.";
      default = -1;
      type = types.int;
    };

    port = mkOption {
      description = "Port number the broker should listen on.";
      default = 9092;
      type = types.int;
    };

    hostname = mkOption {
      description = "Hostname the broker should bind to.";
      default = "localhost";
      type = types.str;
    };

    logDirs = mkOption {
      description = "Log file directories";
      default = [ "/tmp/kafka-logs" ];
      type = types.listOf types.path;
    };

    zookeeper = mkOption {
      description = "Zookeeper connection string";
      default = "localhost:2181";
      type = types.str;
    };

    extraProperties = mkOption {
      description = "Extra properties for server.properties.";
      type = types.nullOr types.lines;
      default = null;
    };

    serverProperties = mkOption {
      description = ''
        Complete server.properties content. Other server.properties config
        options will be ignored if this option is used.
      '';
      type = types.nullOr types.lines;
      default = null;
    };

    log4jProperties = mkOption {
      description = "Kafka log4j property configuration.";
      default = ''
        log4j.rootLogger=INFO, stdout

        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
      '';
      type = types.lines;
    };

    jvmOptions = mkOption {
      description = "Extra command line options for the JVM running Kafka.";
      default = [];
      type = types.listOf types.str;
      example = [
        "-Djava.net.preferIPv4Stack=true"
        "-Dcom.sun.management.jmxremote"
        "-Dcom.sun.management.jmxremote.local.only=true"
      ];
    };

    package = mkOption {
      description = "The kafka package to use";
      default = pkgs.apacheKafka;
      defaultText = "pkgs.apacheKafka";
      type = types.package;
    };

    jre = mkOption {
      description = "The JRE with which to run Kafka";
      default = cfg.package.passthru.jre;
      defaultText = "pkgs.apacheKafka.passthru.jre";
      type = types.package;
    };

  };

  config = mkIf cfg.enable {

    environment.systemPackages = [cfg.package];

    users.users.apache-kafka = {
      uid = config.ids.uids.apache-kafka;
      description = "Apache Kafka daemon user";
      home = head cfg.logDirs;
    };

    systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs;

    systemd.services.apache-kafka = {
      description = "Apache Kafka Daemon";
      wantedBy = [ "multi-user.target" ];
      after = [ "network.target" ];
      serviceConfig = {
        ExecStart = ''
          ${cfg.jre}/bin/java \
            -cp "${cfg.package}/libs/*" \
            -Dlog4j.configuration=file:${logConfig} \
            ${toString cfg.jvmOptions} \
            kafka.Kafka \
            ${serverConfig}
        '';
        User = "apache-kafka";
        SuccessExitStatus = "0 143";
      };
    };

  };
}