path: root/nixos/tests/hadoop/hadoop.nix
# This test is quite comprehensive: it checks that all Hadoop services work
# well with each other, including HDFS and YARN high availability with
# automatic failover.
# Run it when updating the Hadoop package or making significant changes to the
# Hadoop module. For more basic tests, see hdfs.nix and yarn.nix.
import ../make-test-python.nix ({ package, ... }: {
  name = "hadoop-combined";

  nodes =
    let
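      # Shared configuration: clients and daemons address HDFS through the HA
      # nameservice "ns1" rather than a single NameNode host.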
      coreSite = {
        "fs.defaultFS" = "hdfs://ns1";
      };
      hdfsSite = {
        # HA Quorum Journal Manager configuration
        "dfs.nameservices" = "ns1";
        "dfs.ha.namenodes.ns1" = "nn1,nn2";
        "dfs.namenode.shared.edits.dir.ns1" = "qjournal://jn1:8485;jn2:8485;jn3:8485/ns1";
        "dfs.namenode.rpc-address.ns1.nn1" = "nn1:8020";
        "dfs.namenode.rpc-address.ns1.nn2" = "nn2:8020";
        "dfs.namenode.servicerpc-address.ns1.nn1" = "nn1:8022";
        "dfs.namenode.servicerpc-address.ns1.nn2" = "nn2:8022";
        "dfs.namenode.http-address.ns1.nn1" = "nn1:9870";
        "dfs.namenode.http-address.ns1.nn2" = "nn2:9870";

        # Automatic failover configuration
        "dfs.client.failover.proxy.provider.ns1" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";
        "dfs.ha.automatic-failover.enabled.ns1" = "true";
        "dfs.ha.fencing.methods" = "shell(true)";
        "ha.zookeeper.quorum" = "zk1:2181";
      };
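      # ResourceManager HA: both RMs coordinate through the same ZooKeeper
      # ensemble to elect the active instance.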
      yarnSite = {
        "yarn.resourcemanager.zk-address" = "zk1:2181";
        "yarn.resourcemanager.ha.enabled" = "true";
        "yarn.resourcemanager.ha.rm-ids" = "rm1,rm2";
        "yarn.resourcemanager.hostname.rm1" = "rm1";
        "yarn.resourcemanager.hostname.rm2" = "rm2";
        "yarn.resourcemanager.ha.automatic-failover.enabled" = "true";
        "yarn.resourcemanager.cluster-id" = "cluster1";
        # yarn.resourcemanager.webapp.address must be defined explicitly even
        # though yarn.resourcemanager.hostname is set. This shouldn't be
        # necessary, but a bug in
        # hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/amfilter/AmFilterInitializer.java:70
        # causes AM containers to fail otherwise.
        "yarn.resourcemanager.webapp.address.rm1" = "rm1:8088";
        "yarn.resourcemanager.webapp.address.rm2" = "rm2:8088";
      };
    in
    {
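      # Single ZooKeeper instance, used by both the HDFS failover controllers
      # and the YARN ResourceManagers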
      zk1 = { ... }: {
        services.zookeeper.enable = true;
        networking.firewall.allowedTCPPorts = [ 2181 ];
      };

      # HDFS cluster
      nn1 = { ... }: {
        services.hadoop = {
          inherit package coreSite hdfsSite;
          hdfs.namenode = {
            enable = true;
            openFirewall = true;
          };
          hdfs.zkfc.enable = true;
        };
      };
      nn2 = { ... }: {
        services.hadoop = {
          inherit package coreSite hdfsSite;
          hdfs.namenode = {
            enable = true;
            openFirewall = true;
          };
          hdfs.zkfc.enable = true;
        };
      };

      jn1 = { ... }: {
        services.hadoop = {
          inherit package coreSite hdfsSite;
          hdfs.journalnode = {
            enable = true;
            openFirewall = true;
          };
        };
      };
      jn2 = { ... }: {
        services.hadoop = {
          inherit package coreSite hdfsSite;
          hdfs.journalnode = {
            enable = true;
            openFirewall = true;
          };
        };
      };
      jn3 = { ... }: {
        services.hadoop = {
          inherit package coreSite hdfsSite;
          hdfs.journalnode = {
            enable = true;
            openFirewall = true;
          };
        };
      };

      dn1 = { ... }: {
        services.hadoop = {
          inherit package coreSite hdfsSite;
          hdfs.datanode = {
            enable = true;
            openFirewall = true;
          };
        };
      };

      # YARN cluster
      rm1 = { ... }: {
        services.hadoop = {
          inherit package coreSite hdfsSite yarnSite;
          yarn.resourcemanager = {
            enable = true;
            openFirewall = true;
          };
        };
      };
      rm2 = { ... }: {
        services.hadoop = {
          inherit package coreSite hdfsSite yarnSite;
          yarn.resourcemanager = {
            enable = true;
            openFirewall = true;
          };
        };
      };
      nm1 = { ... }: {
        virtualisation.memorySize = 2048;
        services.hadoop = {
          inherit package coreSite hdfsSite yarnSite;
          yarn.nodemanager = {
            enable = true;
            openFirewall = true;
          };
        };
      };
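      # Client-only node: gatewayRole provides the Hadoop configuration and
      # command-line tools without running any Hadoop daemons.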
      client = { ... }: {
        services.hadoop = {
          gatewayRole.enable = true;
          inherit package coreSite hdfsSite yarnSite;
        };
      };
    };

  testScript = ''
    start_all()

    #### HDFS tests ####

    zk1.wait_for_unit("network.target")
    jn1.wait_for_unit("network.target")
    jn2.wait_for_unit("network.target")
    jn3.wait_for_unit("network.target")
    nn1.wait_for_unit("network.target")
    nn2.wait_for_unit("network.target")
    dn1.wait_for_unit("network.target")

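    # ZooKeeper and all three JournalNodes must be up before the NameNodes
    # can be formatted and started.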
    zk1.wait_for_unit("zookeeper")
    jn1.wait_for_unit("hdfs-journalnode")
    jn2.wait_for_unit("hdfs-journalnode")
    jn3.wait_for_unit("hdfs-journalnode")

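    # 2181 = ZooKeeper client port; 8480/8485 = JournalNode HTTP/RPC ports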
    zk1.wait_for_open_port(2181)
    jn1.wait_for_open_port(8480)
    jn1.wait_for_open_port(8485)
    jn2.wait_for_open_port(8480)
    jn2.wait_for_open_port(8485)

    # Namenodes must be stopped before initializing the cluster
    nn1.succeed("systemctl stop hdfs-namenode")
    nn2.succeed("systemctl stop hdfs-namenode")
    nn1.succeed("systemctl stop hdfs-zkfc")
    nn2.succeed("systemctl stop hdfs-zkfc")

    # Initialize zookeeper for failover controller
    nn1.succeed("sudo -u hdfs hdfs zkfc -formatZK 2>&1 | systemd-cat")

    # Format NN1 and start it
    nn1.succeed("sudo -u hdfs hadoop namenode -format 2>&1 | systemd-cat")
    nn1.succeed("systemctl start hdfs-namenode")
    nn1.wait_for_open_port(9870)
    nn1.wait_for_open_port(8022)
    nn1.wait_for_open_port(8020)

    # Bootstrap NN2 from NN1 and start it
    nn2.succeed("sudo -u hdfs hdfs namenode -bootstrapStandby 2>&1 | systemd-cat")
    nn2.succeed("systemctl start hdfs-namenode")
    nn2.wait_for_open_port(9870)
    nn2.wait_for_open_port(8022)
    nn2.wait_for_open_port(8020)
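    # Log the listening sockets on nn1 for debugging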
    nn1.succeed("netstat -tulpne | systemd-cat")

    # Start failover controllers
    nn1.succeed("systemctl start hdfs-zkfc")
    nn2.succeed("systemctl start hdfs-zkfc")

    # DN should have started by now, but confirm anyway
    dn1.wait_for_unit("hdfs-datanode")
    # Print states of namenodes
    client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
    # Wait for cluster to exit safemode
    client.succeed("sudo -u hdfs hdfs dfsadmin -safemode wait")
    client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
    # Test R/W: write a file through the client and read it back
    client.succeed("echo testfilecontents | sudo -u hdfs hdfs dfs -put - /testfile")
    assert "testfilecontents" in client.succeed("sudo -u hdfs hdfs dfs -cat /testfile")

    # Test NN failover
    nn1.succeed("systemctl stop hdfs-namenode")
    assert "active" in client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
    client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")
    assert "testfilecontents" in client.succeed("sudo -u hdfs hdfs dfs -cat /testfile")

    nn1.succeed("systemctl start hdfs-namenode")
    nn1.wait_for_open_port(9870)
    nn1.wait_for_open_port(8022)
    nn1.wait_for_open_port(8020)
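    # The restarted nn1 must come back in standby state, since nn2 is now active.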
    assert "standby" in client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState")
    client.succeed("sudo -u hdfs hdfs haadmin -getAllServiceState | systemd-cat")

    #### YARN tests ####

    rm1.wait_for_unit("network.target")
    rm2.wait_for_unit("network.target")
    nm1.wait_for_unit("network.target")

    rm1.wait_for_unit("yarn-resourcemanager")
    rm1.wait_for_open_port(8088)
    rm2.wait_for_unit("yarn-resourcemanager")
    rm2.wait_for_open_port(8088)

    nm1.wait_for_unit("yarn-nodemanager")
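    # 8042 = NodeManager web UI, 8040 = NodeManager localizer port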
    nm1.wait_for_open_port(8042)
    nm1.wait_for_open_port(8040)
    client.wait_until_succeeds("yarn node -list | grep Nodes:1")
    client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
    client.succeed("sudo -u yarn yarn node -list | systemd-cat")

    # Test RM failover
    rm1.succeed("systemctl stop yarn-resourcemanager")
    assert "standby" not in client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
    client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")
    rm1.succeed("systemctl start yarn-resourcemanager")
    rm1.wait_for_unit("yarn-resourcemanager")
    rm1.wait_for_open_port(8088)
    assert "standby" in client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState")
    client.succeed("sudo -u yarn yarn rmadmin -getAllServiceState | systemd-cat")

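    # Run the bundled MapReduce Pi example (2 maps, 10 samples each) as an
    # end-to-end test of HDFS and YARN; the examples jar is located via the
    # yarn wrapper's store path.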
    assert "Estimated value of Pi is" in client.succeed("HADOOP_USER_NAME=hdfs yarn jar $(readlink $(which yarn) | sed -r 's~bin/yarn~lib/hadoop-*/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar~g') pi 2 10")
    assert "SUCCEEDED" in client.succeed("yarn application -list -appStates FINISHED")
  '';
})