summary refs log tree commit diff
diff options
context:
space:
mode:
authorillustris <rharikrishnan95@gmail.com>2021-11-02 11:59:58 +0530
committerillustris <rharikrishnan95@gmail.com>2021-11-03 22:44:24 +0530
commit39c007ce9cc3f46db70bde794323b6cbfccfe62a (patch)
tree716256adcb8ae25afb1224dc20190e6f99d8fc70
parent455e7f19d005a52a2687d2996dbc94efd8434a40 (diff)
downloadnixpkgs-39c007ce9cc3f46db70bde794323b6cbfccfe62a.tar
nixpkgs-39c007ce9cc3f46db70bde794323b6cbfccfe62a.tar.gz
nixpkgs-39c007ce9cc3f46db70bde794323b6cbfccfe62a.tar.bz2
nixpkgs-39c007ce9cc3f46db70bde794323b6cbfccfe62a.tar.lz
nixpkgs-39c007ce9cc3f46db70bde794323b6cbfccfe62a.tar.xz
nixpkgs-39c007ce9cc3f46db70bde794323b6cbfccfe62a.tar.zst
nixpkgs-39c007ce9cc3f46db70bde794323b6cbfccfe62a.zip
nixos/hadoop: Add HA capabilities
- Add HDFS journalnode and ZKFC services
- Test failover of HDFS and YARN master services in full hadoop test
- Check if a minimal HDFS cluster works in the minimal HDFS test
-rw-r--r--nixos/modules/services/cluster/hadoop/hdfs.nix76
-rw-r--r--nixos/modules/services/cluster/hadoop/yarn.nix1
-rw-r--r--nixos/tests/hadoop/hadoop.nix236
-rw-r--r--nixos/tests/hadoop/hdfs.nix19
-rw-r--r--nixos/tests/hadoop/yarn.nix11
5 files changed, 282 insertions, 61 deletions
diff --git a/nixos/modules/services/cluster/hadoop/hdfs.nix b/nixos/modules/services/cluster/hadoop/hdfs.nix
index e347b682b90..961aa35a4b1 100644
--- a/nixos/modules/services/cluster/hadoop/hdfs.nix
+++ b/nixos/modules/services/cluster/hadoop/hdfs.nix
@@ -24,6 +24,15 @@ in
           Whether to run the HDFS NameNode
         '';
       };
+      formatOnInit = mkOption {
+        type = types.bool;
+        default = false;
+        description = ''
+          Format HDFS namenode on first start. This is useful for quickly spinning up ephemeral HDFS clusters with a single namenode.
+          For HA clusters, initialization involves multiple steps across multiple nodes. Follow [this guide](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html)
+          to initialize an HA cluster manually.
+        '';
+      };
       inherit restartIfChanged;
       openFirewall = mkOption {
         type = types.bool;
@@ -50,6 +59,33 @@ in
         '';
       };
     };
+    journalnode = {
+      enabled = mkOption {
+        type = types.bool;
+        default = false;
+        description = ''
+          Whether to run the HDFS JournalNode
+        '';
+      };
+      inherit restartIfChanged;
+      openFirewall = mkOption {
+        type = types.bool;
+        default = true;
+        description = ''
+          Open firewall ports for journalnode
+        '';
+      };
+    };
+    zkfc = {
+      enabled = mkOption {
+        type = types.bool;
+        default = false;
+        description = ''
+          Whether to run the HDFS ZooKeeper failover controller
+        '';
+      };
+      inherit restartIfChanged;
+    };
   };
 
   config = mkMerge [
@@ -59,9 +95,9 @@ in
         wantedBy = [ "multi-user.target" ];
         inherit (cfg.hdfs.namenode) restartIfChanged;
 
-        preStart = ''
+        preStart = (mkIf cfg.hdfs.namenode.formatOnInit ''
           ${cfg.package}/bin/hdfs --config ${hadoopConf} namenode -format -nonInteractive || true
-        '';
+        '');
 
         serviceConfig = {
           User = "hdfs";
@@ -74,6 +110,7 @@ in
       networking.firewall.allowedTCPPorts = (mkIf cfg.hdfs.namenode.openFirewall [
         9870 # namenode.http-address
         8020 # namenode.rpc-address
+        8022 # namenode. servicerpc-address
       ]);
     })
     (mkIf cfg.hdfs.datanode.enabled {
@@ -96,8 +133,41 @@ in
         9867 # datanode.ipc.address
       ]);
     })
+    (mkIf cfg.hdfs.journalnode.enabled {
+      systemd.services.hdfs-journalnode = {
+        description = "Hadoop HDFS JournalNode";
+        wantedBy = [ "multi-user.target" ];
+        inherit (cfg.hdfs.journalnode) restartIfChanged;
+
+        serviceConfig = {
+          User = "hdfs";
+          SyslogIdentifier = "hdfs-journalnode";
+          ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} journalnode";
+          Restart = "always";
+        };
+      };
+
+      networking.firewall.allowedTCPPorts = (mkIf cfg.hdfs.datanode.openFirewall [
+        8480 # dfs.journalnode.http-address
+        8485 # dfs.journalnode.rpc-address
+      ]);
+    })
+    (mkIf cfg.hdfs.zkfc.enabled {
+      systemd.services.hdfs-zkfc = {
+        description = "Hadoop HDFS ZooKeeper failover controller";
+        wantedBy = [ "multi-user.target" ];
+        inherit (cfg.hdfs.zkfc) restartIfChanged;
+
+        serviceConfig = {
+          User = "hdfs";
+          SyslogIdentifier = "hdfs-zkfc";
+          ExecStart = "${cfg.package}/bin/hdfs --config ${hadoopConf} zkfc";
+          Restart = "always";
+        };
+      };
+    })
     (mkIf (
-        cfg.hdfs.namenode.enabled || cfg.hdfs.datanode.enabled
+        cfg.hdfs.namenode.enabled || cfg.hdfs.datanode.enabled || cfg.hdfs.journalnode.enabled || cfg.hdfs.zkfc.enabled
     ) {
       users.users.hdfs = {
         description = "Hadoop HDFS user";
diff --git a/nixos/modules/services/cluster/hadoop/yarn.nix b/nixos/modules/services/cluster/hadoop/yarn.nix
index 0086a53e3b7..3d27ad8b109 100644
--- a/nixos/modules/services/cluster/hadoop/yarn.nix
+++ b/nixos/modules/services/cluster/hadoop/yarn.nix
@@ -91,6 +91,7 @@ in
         8030 # resourcemanager.scheduler.address
         8031 # resourcemanager.resource-tracker.address
         8032 # resourcemanager.address
+        8033 # resourcemanager.admin.address
       ]);
     })
 
diff --git a/nixos/tests/hadoop/hadoop.nix b/nixos/tests/hadoop/hadoop.nix
index 46dfac26e06..2a55c6fc3aa 100644
--- a/nixos/tests/hadoop/hadoop.nix
+++ b/nixos/tests/hadoop/hadoop.nix
@@ -1,70 +1,230 @@
+# This test is very comprehensive. It tests whether all hadoop services work well with each other.
+# Run this when updating the Hadoop package or making significant changes to the hadoop module.
+# For a more basic test, see hdfs.nix and yarn.nix
 import ../make-test-python.nix ({pkgs, ...}: {
 
   nodes = let
     package = pkgs.hadoop;
     coreSite = {
-      "fs.defaultFS" = "hdfs://master";
+      "fs.defaultFS" = "hdfs://ns1";
+    };
+    hdfsSite = {
+      "dfs.namenode.rpc-bind-host" = "0.0.0.0";
+      "dfs.namenode.http-bind-host" = "0.0.0.0";
+      "dfs.namenode.servicerpc-bind-host" = "0.0.0.0";
+
+      # HA Quorum Journal Manager configuration
+      "dfs.nameservices" = "ns1";
+      "dfs.ha.namenodes.ns1" = "nn1,nn2";
+      "dfs.namenode.shared.edits.dir.ns1.nn1" = "qjournal://jn1:8485;jn2:8485;jn3:8485/ns1";
+      "dfs.namenode.shared.edits.dir.ns1.nn2" = "qjournal://jn1:8485;jn2:8485;jn3:8485/ns1";
+      "dfs.namenode.rpc-address.ns1.nn1" = "nn1:8020";
+      "dfs.namenode.rpc-address.ns1.nn2" = "nn2:8020";
+      "dfs.namenode.servicerpc-address.ns1.nn1" = "nn1:8022";
+      "dfs.namenode.servicerpc-address.ns1.nn2" = "nn2:8022";
+      "dfs.namenode.http-address.ns1.nn1" = "nn1:9870";
+      "dfs.namenode.http-address.ns1.nn2" = "nn2:9870";
+
+      # Automatic failover configuration
+      "dfs.client.failover.proxy.provider.ns1" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";
+      "dfs.ha.automatic-failover.enabled.ns1" = "true";
+      "dfs.ha.fencing.methods" = "shell(true)";
+      "ha.zookeeper.quorum" = "zk1:2181";
+    };
+    yarnSiteHA = {
+      "yarn.resourcemanager.zk-address" = "zk1:2181";
+      "yarn.resourcemanager.ha.enabled" = "true";
+      "yarn.resourcemanager.ha.rm-ids" = "rm1,rm2";
+      "yarn.resourcemanager.hostname.rm1" = "rm1";
+      "yarn.resourcemanager.hostname.rm2" = "rm2";
+      "yarn.resourcemanager.ha.automatic-failover.enabled" = "true";
+      "yarn.resourcemanager.cluster-id" = "cluster1";
+      # yarn.resourcemanager.webapp.address needs to be defined even though yarn.resourcemanager.hostname is set. This shouldn't be necessary, but there's a bug in
+      # hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/amfilter/AmFilterInitializer.java:70
+      # that causes AM containers to fail otherwise.
+      "yarn.resourcemanager.webapp.address.rm1" = "rm1:8088";
+      "yarn.resourcemanager.webapp.address.rm2" = "rm2:8088";
     };
   in {
-    master = {pkgs, options, ...}: {
+    zk1 = { ... }: {
+      services.zookeeper.enable = true;
+      networking.firewall.allowedTCPPorts = [ 2181 ];
+    };
+
+    # HDFS cluster
+    nn1 = {pkgs, options, ...}: {
       services.hadoop = {
-        inherit package coreSite;
+        inherit package coreSite hdfsSite;
         hdfs.namenode.enabled = true;
-        yarn.resourcemanager.enabled = true;
+        hdfs.zkfc.enabled = true;
+      };
+    };
+    nn2 = {pkgs, options, ...}: {
+      services.hadoop = {
+        inherit package coreSite hdfsSite;
+        hdfs.namenode.enabled = true;
+        hdfs.zkfc.enabled = true;
       };
-      virtualisation.memorySize = 1024;
     };
 
-    worker = {pkgs, options, ...}: {
+    jn1 = {pkgs, options, ...}: {
       services.hadoop = {
-        inherit package coreSite;
+        inherit package coreSite hdfsSite;
+        hdfs.journalnode.enabled = true;
+      };
+    };
+    jn2 = {pkgs, options, ...}: {
+      services.hadoop = {
+        inherit package coreSite hdfsSite;
+        hdfs.journalnode.enabled = true;
+      };
+    };
+    jn3 = {pkgs, options, ...}: {
+      services.hadoop = {
+        inherit package coreSite hdfsSite;
+        hdfs.journalnode.enabled = true;
+      };
+    };
+
+    dn1 = {pkgs, options, ...}: {
+      services.hadoop = {
+        inherit package coreSite hdfsSite;
         hdfs.datanode.enabled = true;
-        yarn.nodemanager.enabled = true;
-        yarnSite = options.services.hadoop.yarnSite.default // {
-          "yarn.resourcemanager.hostname" = "master";
-        };
       };
+    };
+
+    # YARN cluster
+    rm1 = {pkgs, options, ...}: {
+      virtualisation.memorySize = 1024;
+      services.hadoop = {
+        inherit package coreSite hdfsSite;
+        yarnSite = options.services.hadoop.yarnSite.default // yarnSiteHA;
+        yarn.resourcemanager.enabled = true;
+      };
+    };
+    rm2 = {pkgs, options, ...}: {
+      virtualisation.memorySize = 1024;
+      services.hadoop = {
+        inherit package coreSite hdfsSite;
+        yarnSite = options.services.hadoop.yarnSite.default // yarnSiteHA;
+        yarn.resourcemanager.enabled = true;
+      };
+    };
+    nm1 = {pkgs, options, ...}: {
       virtualisation.memorySize = 2048;
+      services.hadoop = {
+        inherit package coreSite hdfsSite;
+        yarnSite = options.services.hadoop.yarnSite.default // yarnSiteHA;
+        yarn.nodemanager.enabled = true;
+      };
     };
   };
 
   testScript = ''
     start_all()
 
-    master.wait_for_unit("network.target")
-    master.wait_for_unit("hdfs-namenode")
+    #### HDFS tests ####
+
+    zk1.wait_for_unit("network.target")
+    jn1.wait_for_unit("network.target")
+    jn2.wait_for_unit("network.target")
+    jn3.wait_for_unit("network.target")
+    nn1.wait_for_unit("network.target")
+    nn2.wait_for_unit("network.target")
+    dn1.wait_for_unit("network.target")
+
+    zk1.wait_for_unit("zookeeper")
+    jn1.wait_for_unit("hdfs-journalnode")
+    jn2.wait_for_unit("hdfs-journalnode")
+    jn3.wait_for_unit("hdfs-journalnode")
+
+    zk1.wait_for_open_port(2181)
+    jn1.wait_for_open_port(8480)
+    jn1.wait_for_open_port(8485)
+    jn2.wait_for_open_port(8480)
+    jn2.wait_for_open_port(8485)
+
+    # Namenodes must be stopped before initializing the cluster
+    nn1.succeed("systemctl stop hdfs-namenode")
+    nn2.succeed("systemctl stop hdfs-namenode")
+    nn1.succeed("systemctl stop hdfs-zkfc")
+    nn2.succeed("systemctl stop hdfs-zkfc")
+
+    # Initialize zookeeper for failover controller
+    nn1.succeed("sudo -u hdfs hdfs zkfc -formatZK 2>&1 | systemd-cat")
+
+    # Format NN1 and start it
+    nn1.succeed("sudo -u hdfs hadoop namenode -format 2>&1 | systemd-cat")
+    nn1.succeed("systemctl start hdfs-namenode")
+    nn1.wait_for_open_port(9870)
+    nn1.wait_for_open_port(8022)
+    nn1.wait_for_open_port(8020)
+
+    # Bootstrap NN2 from NN1 and start it
+    nn2.succeed("sudo -u hdfs hdfs namenode -bootstrapStandby 2>&1 | systemd-cat")
+    nn2.succeed("systemctl start hdfs-namenode")
+    nn2.wait_for_open_port(9870)
+    nn2.wait_for_open_port(8022)
+    nn2.wait_for_open_port(8020)
+    nn1.succeed("netstat -tulpne | systemd-cat")
+
+    # Start failover controllers
+    nn1.succeed("systemctl start hdfs-zkfc")
+    nn2.succeed("systemctl start hdfs-zkfc")
 
-    master.wait_for_open_port(8020)
-    master.wait_for_open_port(9870)
+    # DN should have started by now, but confirm anyway
+    dn1.wait_for_unit("hdfs-datanode")
+    # Print states of namenodes
+    dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+    # Wait for cluster to exit safemode
+    dn1.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
+    dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+    # test R/W
+    dn1.succeed("echo testfilecontents | sudo -u hdfs hdfs dfs -put - /testfile")
+    assert "testfilecontents" in dn1.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
 
-    worker.wait_for_unit("network.target")
-    worker.wait_for_unit("hdfs-datanode")
-    worker.wait_for_open_port(9864)
-    worker.wait_for_open_port(9866)
-    worker.wait_for_open_port(9867)
+    # Test NN failover
+    nn1.succeed("systemctl stop hdfs-namenode")
+    assert "active" in dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
+    dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
+    assert "testfilecontents" in dn1.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
 
-    master.succeed("curl -f http://worker:9864")
-    worker.succeed("curl -f http://master:9870")
+    nn1.succeed("systemctl start hdfs-namenode")
+    nn1.wait_for_open_port(9870)
+    nn1.wait_for_open_port(8022)
+    nn1.wait_for_open_port(8020)
+    assert "standby" in dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
+    dn1.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
 
-    worker.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
+    #### YARN tests ####
 
-    master.wait_for_unit("yarn-resourcemanager")
+    rm1.wait_for_unit("network.target")
+    rm2.wait_for_unit("network.target")
+    nm1.wait_for_unit("network.target")
 
-    master.wait_for_open_port(8030)
-    master.wait_for_open_port(8031)
-    master.wait_for_open_port(8032)
-    master.wait_for_open_port(8088)
-    worker.succeed("curl -f http://master:8088")
+    rm1.wait_for_unit("yarn-resourcemanager")
+    rm1.wait_for_open_port(8088)
+    rm2.wait_for_unit("yarn-resourcemanager")
+    rm2.wait_for_open_port(8088)
 
-    worker.wait_for_unit("yarn-nodemanager")
-    worker.wait_for_open_port(8042)
-    worker.wait_for_open_port(8040)
-    master.succeed("curl -f http://worker:8042")
+    nm1.wait_for_unit("yarn-nodemanager")
+    nm1.wait_for_open_port(8042)
+    nm1.wait_for_open_port(8040)
+    nm1.wait_until_succeeds("yarn node -list | grep Nodes:1")
+    nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
+    nm1.succeed("sudo -u yarn yarn node -list | systemd-cat")
 
-    assert "Total Nodes:1" in worker.succeed("yarn node -list")
+    # Test RM failover
+    rm1.succeed("systemctl stop yarn-resourcemanager")
+    assert "standby" not in nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
+    nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
+    rm1.succeed("systemctl start yarn-resourcemanager")
+    rm1.wait_for_unit("yarn-resourcemanager")
+    rm1.wait_for_open_port(8088)
+    assert "standby" in nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
+    nm1.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
 
-    assert "Estimated value of Pi is" in worker.succeed("HADOOP_USER_NAME=hdfs yarn jar $(readlink $(which yarn) | sed -r 's~bin/yarn~lib/hadoop-*/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar~g') pi 2 10")
-    assert "SUCCEEDED" in worker.succeed("yarn application -list -appStates FINISHED")
-    worker.succeed("sudo -u hdfs hdfs dfs -ls / | systemd-cat")
+    assert "Estimated value of Pi is" in nm1.succeed("HADOOP_USER_NAME=hdfs yarn jar $(readlink $(which yarn) | sed -r 's~bin/yarn~lib/hadoop-*/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar~g') pi 2 10")
+    assert "SUCCEEDED" in nm1.succeed("yarn application -list -appStates FINISHED")
   '';
- })
+})
diff --git a/nixos/tests/hadoop/hdfs.nix b/nixos/tests/hadoop/hdfs.nix
index f5907185c03..9bc32cc7f8a 100644
--- a/nixos/tests/hadoop/hdfs.nix
+++ b/nixos/tests/hadoop/hdfs.nix
@@ -1,9 +1,13 @@
+# Test a minimal HDFS cluster with no HA
 import ../make-test-python.nix ({...}: {
   nodes = {
     namenode = {pkgs, ...}: {
       services.hadoop = {
         package = pkgs.hadoop;
-        hdfs.namenode.enabled = true;
+        hdfs.namenode = {
+          enabled = true;
+          formatOnInit = true;
+        };
         coreSite = {
           "fs.defaultFS" = "hdfs://namenode:8020";
         };
@@ -13,10 +17,6 @@ import ../make-test-python.nix ({...}: {
           "dfs.namenode.http-bind-host" = "0.0.0.0";
         };
       };
-      networking.firewall.allowedTCPPorts = [
-        9870 # namenode.http-address
-        8020 # namenode.rpc-address
-      ];
     };
     datanode = {pkgs, ...}: {
       services.hadoop = {
@@ -26,11 +26,6 @@ import ../make-test-python.nix ({...}: {
           "fs.defaultFS" = "hdfs://namenode:8020";
         };
       };
-      networking.firewall.allowedTCPPorts = [
-        9864 # datanode.http.address
-        9866 # datanode.address
-        9867 # datanode.ipc.address
-      ];
     };
   };
 
@@ -50,5 +45,9 @@ import ../make-test-python.nix ({...}: {
 
     namenode.succeed("curl -f http://namenode:9870")
     datanode.succeed("curl -f http://datanode:9864")
+
+    datanode.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
+    datanode.succeed("echo testfilecontents | sudo -u hdfs hdfs dfs -put - /testfile")
+    assert "testfilecontents" in datanode.succeed("sudo -u hdfs hdfs dfs -cat /testfile")
   '';
 })
diff --git a/nixos/tests/hadoop/yarn.nix b/nixos/tests/hadoop/yarn.nix
index fbbb293eecd..70367dfec18 100644
--- a/nixos/tests/hadoop/yarn.nix
+++ b/nixos/tests/hadoop/yarn.nix
@@ -1,3 +1,4 @@
+# This only tests if YARN is able to start its services
 import ../make-test-python.nix ({...}: {
   nodes = {
     resourcemanager = {pkgs, ...}: {
@@ -6,10 +7,6 @@ import ../make-test-python.nix ({...}: {
       services.hadoop.yarnSite = {
         "yarn.resourcemanager.scheduler.class" = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler";
       };
-      networking.firewall.allowedTCPPorts = [
-        8088 # resourcemanager.webapp.address
-        8031 # resourcemanager.resource-tracker.address
-      ];
     };
     nodemanager = {pkgs, ...}: {
       services.hadoop.package = pkgs.hadoop;
@@ -17,12 +14,7 @@ import ../make-test-python.nix ({...}: {
       services.hadoop.yarnSite = {
         "yarn.resourcemanager.hostname" = "resourcemanager";
         "yarn.nodemanager.log-dirs" = "/tmp/userlogs";
-        "yarn.nodemanager.address" = "0.0.0.0:8041";
       };
-      networking.firewall.allowedTCPPorts = [
-        8042 # nodemanager.webapp.address
-        8041 # nodemanager.address
-      ];
     };
 
   };
@@ -38,7 +30,6 @@ import ../make-test-python.nix ({...}: {
     nodemanager.wait_for_unit("yarn-nodemanager")
     nodemanager.wait_for_unit("network.target")
     nodemanager.wait_for_open_port(8042)
-    nodemanager.wait_for_open_port(8041)
 
     resourcemanager.succeed("curl -f http://localhost:8088")
     nodemanager.succeed("curl -f http://localhost:8042")