Merge pull request #137429 from illustris/spark3

Spark: init module
Aaron Andersen 2021-09-18 07:28:19 -04:00 committed by GitHub
commit 4ec195a9c1
8 changed files with 316 additions and 46 deletions

View File

@@ -37,6 +37,13 @@
PostgreSQL now defaults to major version 13.
</para>
</listitem>
<listitem>
<para>
spark now defaults to spark 3, updated from 2. A
<link xlink:href="https://spark.apache.org/docs/latest/core-migration-guide.html#upgrading-from-core-24-to-30">migration
guide</link> is available.
</para>
</listitem>
<listitem>
<para>
Activation scripts can now opt in to be run when running
@@ -250,6 +257,12 @@
entry</link>.
</para>
</listitem>
<listitem>
<para>
<link xlink:href="https://spark.apache.org/">spark</link>, a
unified analytics engine for large-scale data processing.
</para>
</listitem>
</itemizedlist>
</section>
<section xml:id="sec-release-21.11-incompatibilities">

View File

@@ -14,6 +14,8 @@ In addition to numerous new and upgraded packages, this release has the following
- PostgreSQL now defaults to major version 13.
- spark now defaults to spark 3, updated from 2. A [migration guide](https://spark.apache.org/docs/latest/core-migration-guide.html#upgrading-from-core-24-to-30) is available; a sketch for staying on the 2.x series is shown below.
- Activation scripts can now opt in to be run when running `nixos-rebuild dry-activate` and detect the dry activation by reading `$NIXOS_ACTION`.
This allows activation scripts to output what they would change if the activation was really run.
The users/modules activation script supports this and outputs some of its actions.
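
For the Spark default change noted above, a minimal pinning sketch (assuming the `spark2` top-level attribute that this commit adds in all-packages.nix further down) could look like:

{ pkgs, ... }: {
  # Keep the NixOS Spark services on the 2.x series instead of the new Spark 3 default.
  services.spark.package = pkgs.spark2;
}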
@@ -78,6 +80,8 @@ subsonic-compatible api. Available as [navidrome](#opt-services.navidrome.enable
or sends them to a downstream service for further analysis.
Documented in [its manual entry](#module-services-parsedmarc).
- [spark](https://spark.apache.org/), a unified analytics engine for large-scale data processing.
## Backward Incompatibilities {#sec-release-21.11-incompatibilities}

View File

@@ -297,6 +297,7 @@
./services/cluster/kubernetes/pki.nix
./services/cluster/kubernetes/proxy.nix
./services/cluster/kubernetes/scheduler.nix
./services/cluster/spark/default.nix
./services/computing/boinc/client.nix
./services/computing/foldingathome/client.nix
./services/computing/slurm/slurm.nix

View File

@@ -0,0 +1,162 @@
{config, pkgs, lib, ...}:
let
cfg = config.services.spark;
in
with lib;
{
options = {
services.spark = {
master = {
enable = mkEnableOption "Spark master service";
bind = mkOption {
type = types.str;
description = "Address the spark master binds to.";
default = "127.0.0.1";
example = "0.0.0.0";
};
restartIfChanged = mkOption {
type = types.bool;
description = ''
Automatically restart master service on config change.
This can be set to false to defer restarts on clusters running critical applications.
Please consider the security implications of inadvertently running an older version,
and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
'';
default = true;
};
extraEnvironment = mkOption {
type = types.attrsOf types.str;
description = "Extra environment variables to pass to spark master. See spark-standalone documentation.";
default = {};
example = {
SPARK_MASTER_WEBUI_PORT = "8181";
SPARK_MASTER_OPTS = "-Dspark.deploy.defaultCores=5";
};
};
};
worker = {
enable = mkEnableOption "Spark worker service";
workDir = mkOption {
type = types.path;
description = "Spark worker work dir.";
default = "/var/lib/spark";
};
master = mkOption {
type = types.str;
description = "Address of the spark master.";
default = "127.0.0.1:7077";
};
restartIfChanged = mkOption {
type = types.bool;
description = ''
Automatically restart worker service on config change.
This can be set to false to defer restarts on clusters running critical applications.
Please consider the security implications of inadvertently running an older version,
and the possibility of unexpected behavior caused by inconsistent versions across a cluster when disabling this option.
'';
default = true;
};
extraEnvironment = mkOption {
type = types.attrsOf types.str;
description = "Extra environment variables to pass to spark worker.";
default = {};
example = {
SPARK_WORKER_CORES = "5";
SPARK_WORKER_MEMORY = "2g";
};
};
};
confDir = mkOption {
type = types.path;
description = "Spark configuration directory. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc) from this directory.";
default = "${cfg.package}/lib/${cfg.package.untarDir}/conf";
defaultText = literalExample "\${cfg.package}/lib/\${cfg.package.untarDir}/conf";
};
logDir = mkOption {
type = types.path;
description = "Spark log directory.";
default = "/var/log/spark";
};
package = mkOption {
type = types.package;
description = "Spark package.";
default = pkgs.spark;
defaultText = "pkgs.spark";
example = literalExample ''pkgs.spark.overrideAttrs (super: rec {
pname = "spark";
version = "2.4.4";
src = pkgs.fetchzip {
url = "mirror://apache/spark/"''${pname}-''${version}/''${pname}-''${version}-bin-without-hadoop.tgz";
sha256 = "1a9w5k0207fysgpxx6db3a00fs5hdc2ncx99x4ccy2s0v5ndc66g";
};
})'';
};
};
};
config = lib.mkIf (cfg.worker.enable || cfg.master.enable) {
environment.systemPackages = [ cfg.package ];
systemd = {
services = {
spark-master = lib.mkIf cfg.master.enable {
path = with pkgs; [ procps openssh nettools ];
description = "spark master service.";
after = [ "network.target" ];
wantedBy = [ "multi-user.target" ];
restartIfChanged = cfg.master.restartIfChanged;
environment = cfg.master.extraEnvironment // {
SPARK_MASTER_HOST = cfg.master.bind;
SPARK_CONF_DIR = cfg.confDir;
SPARK_LOG_DIR = cfg.logDir;
};
serviceConfig = {
Type = "forking";
User = "spark";
Group = "spark";
WorkingDirectory = "${cfg.package}/lib/${cfg.package.untarDir}";
ExecStart = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/start-master.sh";
ExecStop = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/stop-master.sh";
TimeoutSec = 300;
StartLimitBurst = 10;
Restart = "always";
};
};
spark-worker = lib.mkIf cfg.worker.enable {
path = with pkgs; [ procps openssh nettools rsync ];
description = "spark master service.";
after = [ "network.target" ];
wantedBy = [ "multi-user.target" ];
restartIfChanged = cfg.worker.restartIfChanged;
environment = cfg.worker.extraEnvironment // {
SPARK_MASTER = cfg.worker.master;
SPARK_CONF_DIR = cfg.confDir;
SPARK_LOG_DIR = cfg.logDir;
SPARK_WORKER_DIR = cfg.worker.workDir;
};
serviceConfig = {
Type = "forking";
User = "spark";
WorkingDirectory = "${cfg.package}/lib/${cfg.package.untarDir}";
ExecStart = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/start-worker.sh spark://${cfg.worker.master}";
ExecStop = "${cfg.package}/lib/${cfg.package.untarDir}/sbin/stop-worker.sh";
TimeoutSec = 300;
StartLimitBurst = 10;
Restart = "always";
};
};
};
tmpfiles.rules = [
"d '${cfg.worker.workDir}' - spark spark - -"
"d '${cfg.logDir}' - spark spark - -"
];
};
users = {
users.spark = {
description = "spark user.";
group = "spark";
isSystemUser = true;
};
groups.spark = { };
};
};
}
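
As a rough usage sketch built only from the options declared above (values are illustrative, not prescriptive), a single-machine standalone cluster could be configured like this:

{ ... }: {
  services.spark.master = {
    enable = true;
    bind = "127.0.0.1";
  };
  services.spark.worker = {
    enable = true;
    master = "127.0.0.1:7077";
  };
}

Both units share confDir and logDir, so a custom configuration directory set once applies to master and worker alike.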

View File

@@ -0,0 +1,28 @@
import ../make-test-python.nix ({...}: {
name = "spark";
nodes = {
worker = { nodes, pkgs, ... }: {
virtualisation.memorySize = 1024;
services.spark.worker = {
enable = true;
master = "master:7077";
};
};
master = { config, pkgs, ... }: {
services.spark.master = {
enable = true;
bind = "0.0.0.0";
};
networking.firewall.allowedTCPPorts = [ 22 7077 8080 ];
};
};
testScript = ''
master.wait_for_unit("spark-master.service")
worker.wait_for_unit("spark-worker.service")
worker.copy_from_host("${./spark_sample.py}", "/spark_sample.py")
assert "<title>Spark Master at spark://" in worker.succeed("curl -sSfkL http://master:8080/")
worker.succeed("spark-submit --master spark://master:7077 --executor-memory 512m --executor-cores 1 /spark_sample.py")
'';
})
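
The test above is self-contained; assuming it is also registered in nixos/tests/all-tests.nix (registration is not part of this excerpt), it could be run from a nixpkgs checkout roughly as:

# Hypothetical attribute name; depends on how the test is registered.
nix-build -A nixosTests.spark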

View File

@@ -0,0 +1,40 @@
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.functions import explode
def explode_col(weight):
return int(weight//10) * [10.0] + ([] if weight%10==0 else [weight%10])
spark = SparkSession.builder.getOrCreate()
dataSchema = [
StructField("feature_1", FloatType()),
StructField("feature_2", FloatType()),
StructField("bias_weight", FloatType())
]
data = [
Row(0.1, 0.2, 10.32),
Row(0.32, 1.43, 12.8),
Row(1.28, 1.12, 0.23)
]
df = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(dataSchema))
normalizing_constant = 100
sum_bias_weight = df.select(F.sum('bias_weight')).collect()[0][0]
normalizing_factor = normalizing_constant / sum_bias_weight
df = df.withColumn('normalized_bias_weight', df.bias_weight * normalizing_factor)
df = df.drop('bias_weight')
df = df.withColumnRenamed('normalized_bias_weight', 'bias_weight')
my_udf = udf(lambda x: explode_col(x), ArrayType(FloatType()))
df1 = df.withColumn('explode_val', my_udf(df.bias_weight))
df1 = df1.withColumn("explode_val_1", explode(df1.explode_val)).drop("explode_val")
df1 = df1.drop('bias_weight').withColumnRenamed('explode_val_1', 'bias_weight')
df1.show()
assert(df1.count() == 12)

View File

@@ -1,56 +1,75 @@
{ lib, stdenv, fetchzip, makeWrapper, jre, pythonPackages, coreutils, hadoop
{ lib, stdenv, fetchzip, makeWrapper, jdk8, python3Packages, extraPythonPackages ? [], coreutils, hadoop
, RSupport? true, R
}:
with lib;
stdenv.mkDerivation rec {
let
spark = { pname, version, src }:
stdenv.mkDerivation rec {
inherit pname version src;
nativeBuildInputs = [ makeWrapper ];
buildInputs = [ jdk8 python3Packages.python ]
++ extraPythonPackages
++ optional RSupport R;
pname = "spark";
version = "2.4.4";
untarDir = "${pname}-${version}";
installPhase = ''
mkdir -p $out/{lib/${untarDir}/conf,bin,/share/java}
mv * $out/lib/${untarDir}
src = fetchzip {
url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz";
sha256 = "1a9w5k0207fysgpxx6db3a00fs5hdc2ncx99x4ccy2s0v5ndc66g";
cp $out/lib/${untarDir}/conf/log4j.properties{.template,}
cat > $out/lib/${untarDir}/conf/spark-env.sh <<- EOF
export JAVA_HOME="${jdk8}"
export SPARK_HOME="$out/lib/${untarDir}"
export SPARK_DIST_CLASSPATH=$(${hadoop}/bin/hadoop classpath)
export PYSPARK_PYTHON="${python3Packages.python}/bin/${python3Packages.python.executable}"
export PYTHONPATH="\$PYTHONPATH:$PYTHONPATH"
${optionalString RSupport ''
export SPARKR_R_SHELL="${R}/bin/R"
export PATH="\$PATH:${R}/bin"''}
EOF
for n in $(find $out/lib/${untarDir}/bin -type f ! -name "*.*"); do
makeWrapper "$n" "$out/bin/$(basename $n)"
substituteInPlace "$n" --replace dirname ${coreutils.out}/bin/dirname
done
for n in $(find $out/lib/${untarDir}/sbin -type f); do
# Spark deprecated scripts with "slave" in the name.
# This line adds forward compatibility with the nixos spark module for
# older versions of spark that don't have the new "worker" scripts.
ln -s "$n" $(echo "$n" | sed -r 's/slave(s?).sh$/worker\1.sh/g') || true
done
ln -s $out/lib/${untarDir}/lib/spark-assembly-*.jar $out/share/java
'';
meta = {
description = "Apache Spark is a fast and general engine for large-scale data processing";
homepage = "http://spark.apache.org";
license = lib.licenses.asl20;
platforms = lib.platforms.all;
maintainers = with maintainers; [ thoughtpolice offline kamilchm illustris ];
repositories.git = "git://git.apache.org/spark.git";
};
};
in {
spark3 = spark rec {
pname = "spark";
version = "3.1.2";
src = fetchzip {
url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz";
sha256 = "1bgh2y6jm7wqy6yc40rx68xkki31i3jiri2yixb1bm0i9pvsj9yf";
};
};
spark2 = spark rec {
pname = "spark";
version = "2.4.8";
nativeBuildInputs = [ makeWrapper ];
buildInputs = [ jre pythonPackages.python pythonPackages.numpy ]
++ optional RSupport R;
untarDir = "${pname}-${version}-bin-without-hadoop";
installPhase = ''
mkdir -p $out/{lib/${untarDir}/conf,bin,/share/java}
mv * $out/lib/${untarDir}
sed -e 's/INFO, console/WARN, console/' < \
$out/lib/${untarDir}/conf/log4j.properties.template > \
$out/lib/${untarDir}/conf/log4j.properties
cat > $out/lib/${untarDir}/conf/spark-env.sh <<- EOF
export JAVA_HOME="${jre}"
export SPARK_HOME="$out/lib/${untarDir}"
export SPARK_DIST_CLASSPATH=$(${hadoop}/bin/hadoop classpath)
export PYSPARK_PYTHON="${pythonPackages.python}/bin/${pythonPackages.python.executable}"
export PYTHONPATH="\$PYTHONPATH:$PYTHONPATH"
${optionalString RSupport
''export SPARKR_R_SHELL="${R}/bin/R"
export PATH=$PATH:"${R}/bin/R"''}
EOF
for n in $(find $out/lib/${untarDir}/bin -type f ! -name "*.*"); do
makeWrapper "$n" "$out/bin/$(basename $n)"
substituteInPlace "$n" --replace dirname ${coreutils.out}/bin/dirname
done
ln -s $out/lib/${untarDir}/lib/spark-assembly-*.jar $out/share/java
'';
meta = {
description = "Apache Spark is a fast and general engine for large-scale data processing";
homepage = "http://spark.apache.org";
license = lib.licenses.asl20;
platforms = lib.platforms.all;
maintainers = with maintainers; [ thoughtpolice offline kamilchm ];
repositories.git = "git://git.apache.org/spark.git";
src = fetchzip {
url = "mirror://apache/spark/${pname}-${version}/${pname}-${version}-bin-without-hadoop.tgz";
sha256 = "1mkyq0gz9fiav25vr0dba5ivp0wh0mh7kswwnx8pvsmb6wbwyfxv";
};
};
}
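
With the spark2 and spark3 attributes exposed from all-packages.nix (see the final hunk below), both variants can be built independently from a nixpkgs checkout, for example:

nix-build -A spark3   # new default, also aliased to spark
nix-build -A spark2   # previous series, kept for compatibility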

View File

@@ -13146,7 +13146,10 @@ with pkgs;
self = pkgsi686Linux.callPackage ../development/interpreters/self { };
spark = callPackage ../applications/networking/cluster/spark { };
inherit (callPackages ../applications/networking/cluster/spark { hadoop = hadoop_3_1; })
spark3
spark2;
spark = spark3;
sparkleshare = callPackage ../applications/version-management/sparkleshare { };