spark java内存溢出怎么给java分配更多的内存

Spark三种属性配置方式详细说明 – 过往记忆
欢迎关注Hadoop、Spark、FlinkHive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop。
文章总数:725
浏览总数:8,398,445
评论:4545
分类目录:84 个
注册用户数:2154
最后更新:日
欢迎关注微信公共帐号:iteblog_hadoop
IT技术前沿:geek_toutiao
  随着项目的逐渐成熟, 越来越多的可配置参数被添加到中来。在Spark中提供了三个地方用于配置:
  1、Spark properties:这个可以控制应用程序的绝大部分属性。并且可以通过 SparkConf对象或者Java 系统属性进行设置;
  2、环境变量(Environment variables):这个可以分别对每台机器进行相应的设置,比如IP。这个可以在每台机器的$SPARK_HOME/ conf/spark-env.sh脚本中进行设置;
  3、日志:所有的日志相关的属性可以在log4j.properties文件中进行设置。
  下面对这三种属性设定进行详细的说明。
一、Spark properties
  Spark properties可以控制应用程序的绝大部分属性,并且可以分别在每个应用上进行设置。这些属性可以直接在SparkConf对象上设定,该对象可以传递给SparkContext。SparkConf对象允许你去设定一些通用的属性(比如master URL、应用的名称等),这些属性可以传递给set()方法的任意key-value对。如下:
val conf = new SparkConf()
.setMaster(&local&)
.setAppName(&CountingSheep&)
.set(&spark.executor.memory&, &1g&)
val sc = new SparkContext(conf)
动态加载Spark属性
  在一些场景中,你可能想避免在代码中将SparkConf对象的属性进行设死;比如,你可能想在不同的master上面或者不同内存容量运行你的应用程序。这就需要你运行程序的时候进行设置,Spark允许你创建一个空的conf对象,如下:
val sc = new SparkContext(new SparkConf())
  然后你可以在运行的时候通过命令行进行一些属性的配置:
./bin/spark-submit --name &My app&
--master local[4]
--conf spark.shuffle.spill=false
--conf &spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps&
  Spark shell和 spark-submit工具支持两种方式来动态加载配置属性。第一种是命令行方式,比如--master;spark-submit工具可以通过--conf标记接收任何的Spark属性。运行 ./bin/spark-submit --help将会显示全部的选项。
   ./bin/spark-submit工具也会从 conf/spark-defaults.conf配置文件中读取配置选项。 在conf/spark-defaults.conf配置文件中,每行是key-value对,中间可以是用空格进行分割,也可以直接用等号进行分割。如下:
spark.master
spark://:7077
spark.executor.memory
spark.eventLog.enabled
spark.serializer
org.apache.spark.serializer.KryoSerializer
  每个值将作为一个flags传递到应用中并个SparkConf对象中相应的属性进行合并。通过SparkConf 对象配置的属性优先级最高;其次是对spark-submit 或 spark-shell通过flags配置;最后是spark-defaults.conf文件中的配置。
哪里可以查看配置好的Spark属性
  在应用程序对应的WEB UI(http://&driver&:4040)上的Environment标签下面将会显示出该应用程序的所有Spark配置选项。在你想确定你的配置是否正确的情况下是非常有用的。需要注意的是,只有显示通过spark-defaults.conf 或SparkConf 进行配置的属性才会在那个页面显示。其他所有没有显示的属性,你可以认为这些属性的值为默认的。
二、环境变量
  有很大一部分的Spark设定可以通过环境变量来进行设定。这些环境变量设定在conf/spark-env.sh 脚本文件中(如果你是windows系统,那么这个文件名称是conf/spark-env.cmd)。在 Standalone 和 Mesos模式下,这个文件可以设定一些和机器相关的信息(比如hostname)。
  需要注意,在刚刚安装的Spark中conf/spark-env.sh文件是不存在的。但是你可以通过复制conf/spark-env.sh.template文件来创建,你的确保这个复制之后的文件是可运行的。
  下面的属性是可以在conf/spark-env.sh文件中配置
JAVA_HOME Java的安装目录
PYSPARK_PYTHON Python binary executable to use for PySpark.
SPARK_LOCAL_IP IP address of the machine to bind to.
SPARK_PUBLIC_DNS Hostname your Spark program will advertise to other machines.
对于 standalone 模式的集群除了上面的属性可以配置外,还有很多的属性可以配置,具体我就不说了,自己看文档去。
三、日志配置
  Spark用log4j来记录日志。你可以通过配置log4j.properties来设定不同日志的级别、存放位置等。这个文件默认也是不存在的,你可以通过复制log4j.properties.template文件来得到。
  在后期文章中,我将逐个的介绍Spark中各个参数的含义。欢迎大家关注。
  关于应用程序相关的属性设置解释:
本博客文章除特别声明,全部都是原创!
禁止个人和公司转载本文、谢谢理解:
下面文章您可能感兴趣今天看啥 热点:
Spark技术内幕:Executor分配详解,sparkexecutor
当用户应用new SparkContext后,集群就会为在Worker上分配executor,那么这个过程是什么呢?本文以Standalone的Cluster为例,详细的阐述这个过程。序列图如下:1. SparkContext创建TaskScheduler和DAG SchedulerSparkContext是用户应用和Spark集群的交换的主要接口,用户应用一般首先要创建它。如果你使用SparkShell,你不必自己显式去创建它,系统会自动创建一个名字为sc的SparkContext的实例。创建SparkContext的实例,主要的工作除了设置一些conf,比如executor使用到的memory的大小。如果系统的配置文件有,那么就读取该配置。否则则读取环境变量。如果都没有设置,那么取默认值为512M。当然了这个数值还是很保守的,特别是在内存已经那么昂贵的今天。private[spark] val executorMemory = conf.getOption(&spark.executor.memory&)
.orElse(Option(System.getenv(&SPARK_EXECUTOR_MEMORY&)))
.orElse(Option(System.getenv(&SPARK_MEM&)).map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(512)除了加载这些集群的参数,它完成了TaskScheduler和DAGScheduler的创建:
// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), &HeartbeatReceiver&)
@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
case e: Exception =& throw
new SparkException(&DAGScheduler cannot be initialized due to %s&.format(e.getMessage))
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
taskScheduler.start()TaskScheduler是通过不同的SchedulerBackend来调度和管理任务。它包含资源分配和任务调度。它实现了FIFO调度和FAIR调度,基于此来决定不同jobs之间的调度顺序。并且管理任务,包括任务的提交和终止,为饥饿任务启动备份任务。不同的Cluster,包括local模式,都是通过不同的SchedulerBackend的实现其不同的功能。这个模块的类图如下:2. TaskScheduler通过SchedulerBackend创建AppClientSparkDeploySchedulerBackend是Standalone模式的SchedulerBackend。通过创建AppClient,可以向Standalone的Master注册Application,然后Master会通过Application的信息为它分配Worker,包括每个worker上使用CPU core的数目等。private[spark] class SparkDeploySchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
masters: Array[String])
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with AppClientListener
with Logging {
var client: AppClient = null
//注:Application与Master的接口
val maxCores = conf.getOption(&spark.cores.max&).map(_.toInt) //注:获得每个executor最多的CPU core数目
override def start() {
super.start()
// The endpoint for executors to talk to us
val driverUrl = &akka.tcp://%s@%s:%s/user/%s&.format(
SparkEnv.driverActorSystemName,
conf.get(&spark.driver.host&),
conf.get(&spark.driver.port&),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
//注:现在executor还没有申请,因此关于executor的所有信息都是未知的。
//这些参数将会在org.apache.spark.deploy.worker.ExecutorRunner启动ExecutorBackend的时候替换这些参数
val args = Seq(driverUrl, &{{EXECUTOR_ID}}&, &{{HOSTNAME}}&, &{{CORES}}&, &{{WORKER_URL}}&)
//注:设置executor运行时需要的环境变量
val extraJavaOpts = sc.conf.getOption(&spark.executor.extraJavaOptions&)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption(&spark.executor.extraClassPath&).toSeq.flatMap { cp =&
cp.split(java.io.File.pathSeparator)
val libraryPathEntries =
sc.conf.getOption(&spark.executor.extraLibraryPath&).toSeq.flatMap { cp =&
cp.split(java.io.File.pathSeparator)
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
//注:在Worker上通过org.apache.spark.deploy.worker.ExecutorRunner启动
// org.apache.spark.executor.CoarseGrainedExecutorBackend,这里准备启动它需要的参数
val command = Command(&org.apache.spark.executor.CoarseGrainedExecutorBackend&,
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
//注:org.apache.spark.deploy.ApplicationDescription包含了所有注册这个Application的所有信息。
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
//注:在Master返回注册Application成功的消息后,AppClient会回调本class的connected,完成了Application的注册。
waitForRegistration()
}org.apache.spark.deploy.client.AppClientListener是一个trait,主要为了SchedulerBackend和AppClient之间的函数回调,在以下四种情况下,AppClient会回调相关函数以通知SchedulerBackend:private[spark] trait AppClientListener {
def connected(appId: String): Unit
/** Disconnection may be a temporary state, as we fail over to a new Master. */
def disconnected(): Unit
/** An application death is an unrecoverable failure condition. */
def dead(reason: String): Unit
def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}小结:SparkDeploySchedulerBackend装备好启动Executor的必要参数后,创建AppClient,并通过一些回调函数来得到Executor和连接等信息;通过org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor与ExecutorBackend来进行通信。3. AppClient向Master提交ApplicationAppClient是Application和Master交互的接口。它的包含一个类型为org.apache.spark.deploy.client.AppClient.ClientActor的成员变量actor。它负责了所有的与Master的交互。actor首先向Master注册Application。如果超过20s没有接收到注册成功的消息,那么会重新注册;如果重试超过3次仍未成功,那么本次提交就以失败结束了。
def tryRegisterAllMasters() {
for (masterUrl &- masterUrls) {
logInfo(&Connecting to master & + masterUrl + &...&)
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterApplication(appDescription) // 向Master注册
def registerWithMaster() {
tryRegisterAllMasters()
import context.dispatcher
var retries = 0
registrationRetryTimer = Some { // 如果注册20s内未收到成功的消息,那么再次重复注册
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
Utils.tryOrExit {
retries += 1
if (registered) { // 注册成功,那么取消所有的重试
registrationRetryTimer.foreach(_.cancel())
} else if (retries &= REGISTRATION_RETRIES) { // 重试超过指定次数(3次),则认为当前Cluster不可用,退出
markDead(&All masters are unresponsive! Giving up.&)
} else { // 进行新一轮的重试
tryRegisterAllMasters()
}主要的消息如下:4. Master根据AppClient的提交选择WorkerMaster接收到AppClient的registerApplication的请求后,处理逻辑如下:
case RegisterApplication(description) =& {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response //注:AppClient有超时机制(20s),超时会重试
logInfo(&Registering app & + description.name)
val app = createApplication(description, sender)
// app is ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores), driver就是AppClient的actor
//保存到master维护的成员变量中,比如
/* apps +=
idToApp(app.id) = app
actorToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app */
registerApplication(app)
logInfo(&Registered app & + description.name + & with ID & + app.id)
persistenceEngine.addApplication(app) //持久化app的元数据信息,可以选择持久化到ZooKeeper,本地文件系统,或者不持久化
sender ! RegisteredApplication(app.id, masterUrl)
schedule() //为处于待分配资源的Application分配资源。在每次有新的Application加入或者新的资源加入时都会调用schedule进行调度
}schedule() 为处于待分配资源的Application分配资源。在每次有新的Application加入或者新的资源加入时都会调用schedule进行调度。为Application分配资源选择worker(executor),现在有两种策略:对于同一个Application,它在一个worker上只能拥有一个executor;当然了,这个executor可能拥有多于1个core。其主要逻辑如下:if (spreadOutApps) { //尽量的打散负载,如有可能,每个executor分配一个core
// Try to spread out each app among all the nodes, until it has all its cores
for (app &- waitingApps if app.coresLeft & 0) { //使用FIFO的方式为等待的app分配资源
// 可用的worker的标准:State是Alive,其上并没有该Application的executor,可用内存满足要求。
// 在可用的worker中,优先选择可用core数多的。
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node 保存在该节点上预分配的core数
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign & 0) {
if (usableWorkers(pos).coresFree - assigned(pos) & 0) {
toAssign -= 1
assigned(pos) += 1
pos = (pos + 1) % numUsable
// Now that we've decided how many cores to give on each node, let's actually give them
for (pos &- 0 until numUsable) {
if (assigned(pos) & 0) {
val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
} else {//尽可能多的利用worker的core
// Pack each app into as few nodes as possible until we've assigned all its cores
for (worker &- workers if worker.coresFree & 0 && worker.state == WorkerState.ALIVE) {
for (app &- waitingApps if app.coresLeft & 0) {
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse & 0) {
val exec = app.addExecutor(worker, coresToUse)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}在选择了worker和确定了worker上得executor需要的CPU core数后,Master会调用&launchExecutor(worker: WorkerInfo, exec: ExecutorInfo)向Worker发送请求,向AppClient发送executor已经添加的消息。同时会更新master保存的worker的信息,包括增加executor,减少可用的CPU core数和memory数。Master不会等到真正在worker上成功启动executor后再更新worker的信息。如果worker启动executor失败,那么它会发送FAILED的消息给Master,Master收到该消息时再次更新worker的信息即可。这样是简化了逻辑。
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo(&Launching executor & + exec.fullId + & on worker & + worker.id)
worker.addExecutor(exec)//更新worker的信息,可用core数和memory数减去本次分配的executor占用的
// 向Worker发送启动executor的请求
worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
// 向AppClient发送executor已经添加的消息ss
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}小结:现在的分配方式还是比较粗糙的。比如并没有考虑节点的当前总体负载。可能会导致节点上executor的分配是比较均匀的,单纯静态的从executor分配到得CPU core数和内存数来看,负载是比较均衡的。但是从实际情况来看,可能有的executor的资源消耗比较大,因此会导致集群负载不均衡。这个需要从生产环境的数据得到反馈来进一步的修正和细化分配策略,以达到更好的资源利用率。5. Worker根据Master的资源分配结果来创建ExecutorWorker接收到来自Master的LaunchExecutor的消息后,会创建org.apache.spark.deploy.worker.ExecutorRunner。Worker本身会记录本身资源的使用情况,包括已经使用的CPU core数,memory数等;但是这个统计只是为了web UI的展现。Master本身会记录Worker的资源使用情况,无需Worker自身汇报。Worker与Master之间的心跳的目的仅仅是为了报活,不会携带其他的信息。ExecutorRunner会将在org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend中准备好的org.apache.spark.deploy.ApplicationDescription以进程的形式启动起来。当时以下几个参数还是未知的:val args = Seq(driverUrl, &{{EXECUTOR_ID}}&, &{{HOSTNAME}}&, &{{CORES}}&, &{{WORKER_URL}}&)。ExecutorRunner需要将他们替换成已经分配好的实际值: /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
case &{{WORKER_URL}}& =& workerUrl
case &{{EXECUTOR_ID}}& =& execId.toString
case &{{HOSTNAME}}& =& host
case &{{CORES}}& =& cores.toString
case other =& other
}接下来就启动org.apache.spark.deploy.ApplicationDescription中携带的org.apache.spark.executor.CoarseGrainedExecutorBackend:def fetchAndRunExecutor() {
// Create the executor's working directory
val executorDir = new File(workDir, appId + &/& + execId)
if (!executorDir.mkdirs()) {
throw new IOException(&Failed to create directory & + executorDir)
// Launch the process
val command = getCommandSeq
logInfo(&Launch command: & + command.mkString(&\&&, &\& \&&, &\&&))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) &- mand.environment) {
env.put(key, value)
// In case we are running this from within the Spark Shell, avoid creating a &scala&
// parent process for the executor command
env.put(&SPARK_LAUNCH_WITH_SCALA&, &0&)
process = builder.start()CoarseGrainedExecutorBackend启动后,会首先通过传入的driverUrl这个参数向在org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend::DriverActor发送RegisterExecutor(executorId, hostPort, cores),DriverActor会回复RegisteredExecutor,此时CoarseGrainedExecutorBackend会创建一个org.apache.spark.executor.Executor。至此,Executor创建完毕。Executor在Mesos, YARN, and the standalone scheduler中,都是相同的。不同的只是资源的分配管理方式。
(1)8080端口   端口说明:8080端口同80端口,是被用于WWW代理服务的,可以实现网页浏览,经常在访问某个网站或使用代理服务器的时候,会加上“:8080”端口号 。
端口图端口漏洞:8080端口可以被各种病毒程序所利用,比如Brown Orifice(BrO)特洛伊木马病毒可以利用8080端口完全遥控被感染的计算机。另外,RemoConChubo,RingZero木马也可以利用该端口进行攻击。   操作建议:一般我们是使用80端口进行网页浏览的,为了避免病毒的攻击,我们可以关闭该端口。   (2)端口:21   服务:FTP   说明:FTP服务器所开放的端口,用于上传、下载。最常见的攻击者用于寻找打开anonymous的FTP服务器的方****。这些服务器带有可读写的目录。木马Doly Trojan、Fore、Invisible FTP、WebEx、WinCrash和Blade Runner所开放的端口。   (3)端口:22   服务:Ssh   说明:PcAnywhere建立的TCP和这一端口的连接可能是为了寻找ssh。这一服务有许多弱点,如果配置成特定的模式,许多使用RSAREF库的版本就会有不少的漏洞存在。   (4)端口:23   服务:Telnet   说明:远程登录,入侵者在搜索远程登录UNIX的服务。大多数情况下扫描这一端口是为了找到机器运行的操作系统。还有使用其他技术,入侵者也会找到密码。木马Tiny Telnet Server就开放这个端口。   (5)端口:25   服务:SMTP   说明:SMTP服务器所开放的端口,用于发送邮件。入侵者寻找SMTP服务器是为了传递他们的SPAM。入侵者的帐户被关闭,他们需要连接到高带宽的E-MAIL服务器上,将简单的信息传递到不同的地址。木马Antigen、Email Password Sender、Haebu Coceda、Shtrilitz Stealth、WinPC、WinSpy都开放这个端口。   (6)端口:80   服务:HTTP   说明:用于网页浏览。木马Executor开放此端口。   (7)端口:102   服务:Message transfer agent(MTA)-X.400 over TCP/IP   说明:消息传输代理。   (8)端口:109   服务:Post Office Protocol -Version3   说明:POP3服务器开放此端口,用于接收邮件,客户端访问服务器端的邮件服务。POP3服务有许多公认的弱点。关于用户名和密码交 换缓冲区溢出的弱点至少有20个,这意味着入侵者可以在真正登陆前进入系统。成功登陆后还有其他缓冲区溢出错误。   (9)端口:110   服务:SUN公司的RPC服务所有端口   说明:常见RPC服务有rpc.mountd、NFS、rpc.statd、rpc.csmd、rpc.ttybd、amd等   (10)端口:119   服务:Network News Transfer Protocol   说明:NEWS新闻组传输协议,承载USENET通信。这个端口的连接通常是人们在寻找USENET服务器。多数ISP限制,只有他们的客户才能访问他们的新闻组服务器。打开新闻组服务器将允许发/读任何人的帖子,访问被限制的新闻组服务器,匿名发帖或发送SPAM。   (11)端口:135   服务:Location Service   说明:Microsoft在这个端口运行DCE RPC end-poi......余下全文>>
相关搜索:
相关阅读:
相关频道:
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
云计算最近更新Spark 配置 - 技术翻译 - 开源中国社区
当前访客身份:游客 [
已有文章 2431 篇
当前位置:
Spark 配置
英文原文:
0人收藏此文章,
推荐于 3年前 (共 5 段, 翻译完成于 09-29) ()
参与翻译(3人):
Spark提供了三种主要本地设置来配置系统:
&用来加载Spark的workers,可以在你的驱动程序或theconf/spark-env.shscript中设定。
&控制内部配置参数,可以通过编程方式设置(通过在创建SparkContext之前调用System.setProperty)或者通过inspark-env.sh中的SPARK_JAVA_OPTS环境变量。
&通过log4j.properties来设置。
&翻译的不错哦!
Spark决定worker节点上如何初始化JVM,当你在本地运行spark-shell时也是这样,通过运行Spark安装目录下的conf/spark-env.sh脚本即可启动spark-shell。该脚本在Git库中默认不存在,但你可以通过复制conf/spark-env.sh.template来创建一个。确保复制后的脚本有执行权限。
在spark-env.sh中,你必需至少设置下面两个变量:
SCALA_HOME 指定你的Scala安装位置,或使用SCALA_LIBRARY_PATH指定Scala的jar库位置(如果你以Debian或RPM包方式安装Scala,无法设置SCALA_HOME,但库在一个特定目录里,通常是/usr/share/java,可以通过搜索scala-library.jar获取)。
MESOS_NATIVE_LIBRARY 如果你正设置该项。
&翻译的不错哦!
另外,这里有4个另外的用于控制执行的变量。这些变量需要在运行Job的驱动程序的上下文设置,而不是在spark-env.sh,因为这些变量将会被自动传递给workers。在每一个job里设置这些变量可以使得不同的job对这些变量有不同的配置。
SPARK_JAVA_OPTS, 添加JVM选项。这包含了你用-D传递的一些系统属性。
SPARK_CLASSPATH, 向Spark的classpath添加元素
SPARK_LIBRARY_PATH, 添加本地库的搜索路径
SPARK_MEM, 设置每一个节点使用的内存大小。这应该和JVM的-Xmx选项类似的形式,比如300m或者1g。需要注意为了推行spark.executor.memorysystem属性,这个选项很快将不被推荐使用,所以我们推荐你使用在代码中使用spark.executor.memorysystem。
注意,如果在spark-env.sh文件中设置这些变量,他们将会被用户程序中设置的值所覆盖。如果你愿意,你可以选择只有在用户程序中没有设置的情况下在spark-env.sh中如下设置他们:
if [ -z "$SPARK_JAVA_OPTS" ] ; then
SPARK_JAVA_OPTS="-verbose:gc"
&翻译的不错哦!
为了设置Spark的系统属性,你需要给JVM传递一个-D标志的参数 (比如,java -Dspark.cores.max=5 MyProgram) 或者是在你创建Spark上下文的时候调用System.setProperty()方法,就像下面这样:
System.setProperty("spark.cores.max", "5")
val sc = new SparkContext(...)
大多数的可配置系统会在内部设置一个比较合理的默认值。但是,至少下面的5个属性你应该自己去设置的:
spark.executor.memory
每个处理器可以使用的内存大小,跟JVM的内存表示的字符串格式是一样的(比如: '512m','2g')
spark.serializer
spark.JavaSerializer
一个类名,用于序列化网络传输或者以序列化形式缓存起来的各种对象。默认情况下Java的序列化机制可以序列化任何实现了Serializable接口的对象,但是速度是很慢的,因此当你在意运行速度的时候我们建议你使用。可以是任何&的子类。
spark.kryo.registrator
如果你使用的是Kryo序列化,就要为Kryo设置这个类去注册你自定义的类。这个类需要继承。 可以参考& 获取更多的信息。
spark.local.dir
设置Spark的暂存目录,包括映射输出文件盒需要存储在磁盘上的RDDs。这个磁盘目录在你的系统上面访问速度越快越好。可以用逗号隔开来设置多个目录。
spark.cores.max
(infinite)
当运行在一个上或者是一个上的时候,最多可以请求多少个CPU核心。默认是所有的都能用。
&翻译的不错哦!
除了上体的5个外,下面还列举了一些属性,在某些情况下你可能需要自己去配置下。
spark.mesos.coarse
如果设置为了"true",将以运行在Mesos集群上,&这时候Spark会在每台机器上面获得一个长期运行的Mesos任务,而不是对每个Spark任务都要产生一个Mesos任务。对于很多短查询,这个可能会有些许的延迟,但是会大大提高Spark工作时的资源利用率。
spark.default.parallelism
在用户没有指定时,用于分布式随机操作(groupByKey,reduceByKey等等)的默认的任务数。
spark.storage.memoryFraction
Spark用于缓存的内存大小所占用的Java堆的比率。这个不应该大于JVM中老年代所分配的内存大小,默认情况下老年代大小是堆大小的2/3,但是你可以通过配置你的老年代的大小,然后再去增加这个比率。
spark.ui.port
你的应用程序控制面板端口号,控制面板中可以显示每个RDD的内存使用情况。
是否压缩映射输出文件,通常设置为true是个不错的选择。
广播变量在发送之前是否先要被压缩,通常设置为true是个不错的选择。
是否要压缩序列化的RDD分区(比如,StorageLevel.MEMORY_ONLY_SER)。在消耗一点额外的CPU时间的代价下,可以极大的提高减少空间的使用。
spark.reducer.maxMbInFlight
同时获取每一个分解任务的时候,映射输出文件的最大的尺寸(以兆为单位)。由于对每个输出都需要我们去创建一个缓冲区去接受它,这个属性值代表了对每个分解任务所使用的内存的一个上限值,因此除非你机器内存很大,最好还是配置一下这个值。
spark.closure.serializer
spark.JavaSerializer
用于闭包的序列化类。通常Java是可以胜任的,除非在你的驱动程序中分布式函数(比如map函数)引用了大量的对象。
spark.kryoserializer.buffer.mb
Kryo中运行的对象的最大尺寸(Kryo库需要创建一个不小于最大的单个序列化对象的缓存区)。如果在Kryo中出现"buffer limit exceeded"异常,你就需要去增加这个值了。注意,对每个worker而言,一个核心就会有一个缓冲。
spark.broadcast.factory
spark.broadcast.HttpBroadcastFactory
使用哪一个广播实现
spark.locality.wait
在发布一个本地数据任务时候,放弃并发布到一个非本地数据的地方前,需要等待的时间。如果你的很多任务都是长时间运行的任务,并且看到了很多的脏数据的话,你就该增加这个值了。但是一般情况下缺省值就可以很好的工作了。
spark.worker.timeout
如果超过这个时间,独立部署master还没有收到worker的心跳回复,那么就认为这个worker已经丢失了。
spark.akka.frameSize
在控制面板通信(序列化任务和任务结果)的时候消息尺寸的最大值,单位是MB。如果你需要给驱动器发回大尺寸的结果(比如使用在一个大的数据集上面使用collect()方法),那么你就该增加这个值了。
spark.akka.threads
用于通信的actor线程数量。如果驱动器有很多CPU核心,那么在大集群上可以增大这个值。
spark.akka.timeout
Spark节点之间通信的超时时间,以秒为单位
spark.driver.host
(local hostname)
驱动器监听主机名或者IP地址.
spark.driver.port
驱动器监听端口号
spark.cleaner.ttl
Spark记忆任何元数据(stages生成,任务生成等等)的时间(秒)。周期性清除保证在这个时间之前的元数据会被遗忘。当长时间几小时,几天的运行Spark的时候设置这个是很有用的。注意:任何内存中的RDD只要过了这个时间就会被清除掉。
spark.streaming.blockInterval
从网络中批量接受对象时的持续时间。
Spark使用& 作为它的日志实现。 你可以在conf文件夹中增加一个log4j.properties配置文件去配置日志。开始的时候,你可以复制conf文件夹中已经存在一个log4j.properties.template模板,重命名为log4j.properties。
&翻译的不错哦!
好文要在第一时间顶起来!!

我要回帖

更多关于 java内存分配机制 的文章

 

随机推荐