SparkStreaming+Kafka

news/2024/7/24 7:38:05 标签: 大数据, java, 开发工具

摘自 :
Spark踩坑记——Spark Streaming+Kafka

  • SpringStreaming+Kafka
    • 1.SpringStreaming+Kafka 接受数据和发送数据
      • (1)SparkStreaming 接受kafka方式
      • (2)Spark 发送数据至Kafka中
    • 2.Spark streaming+Kafka调优
      • 2.1 批处理时间设置
      • 2.2 合理的Kafka拉取量
      • 2.3 缓存反复使用的Dstream(RDD)
      • 2.4 设置合理的GC
      • 2.5 设置合理的CPU资源数
      • 2.6设置合理的parallelism
      • 2.7使用高性能的算子

SpringStreaming+Kafka

1.SpringStreaming+Kafka 接受数据和发送数据

(1)SparkStreaming 接受kafka方式

  • 基于Received的方式
    基于Receiverd方式获取数据
    这里写图片描述
  • 基于DirectKafkaStreaming
    KafkaStream-Recevied方式

DirectKafkaStreaming 相比较 ReceiverKafkaStreaming
- 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
- 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
- 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

(2)Spark 发送数据至Kafka中

一般处理方式 : 在RDD.forpartition进行操作

input.foreachRDD(rdd =>
  // 不能在这里创建KafkaProducer
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String]("output",null,x)
        producer.send(message)
      }
    }
  )
) 

此方式的缺点在于每次foreach操作都需要重新创建一次kafkaProduce 主要花费时间都在 创建连接的时候.
基于此我们以以下方式进行操作

  • 首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:

    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
    class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
      /* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
      lazy val producer = createProducer()
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    }
    object KafkaSink {
      import scala.collection.JavaConversions._
      def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
          producer
        }
        new KafkaSink(createProducerFunc)
      }
      def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    }
  • 之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:
// 广播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", Conf.brokers)
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
  • 这样我们就能在每个executor中愉快的将数据输入到kafka当中:
//输出到kafka
segmentedStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
      // do something else
    })
  }
})

2.Spark streaming+Kafka调优

2.1 批处理时间设置

参数设置:

2.2 合理的Kafka拉取量

参数设置: spark.streaming.kafka.maxRatePerPartition

2.3 缓存反复使用的Dstream(RDD)

DStream.cache()

2.4 设置合理的GC

长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

2.5 设置合理的CPU资源数

CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

2.6设置合理的parallelism

partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

2.7使用高性能的算子

  • 使用reduceByKey/aggregateByKey替代groupByKey
  • 使用mapPartitions替代普通map
  • 使用foreachPartitions替代foreach
  • 使用filter之后进行coalesce操作
  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

http://www.niftyadmin.cn/n/734843.html

相关文章

vue中css的用var()挂载在全局使用

1,新建一个.css文件, 例如: 2,在需要的地方引入改css文件,并使用该变量 控制台打开可以看到你定义的这些样式就说明引入成功了 没了,结束了,是不是很简单呐,如有问题,欢迎留言。 最…

.net reactor 学习系列(一)---.net reactor介绍

原文:.net reactor 学习系列(一)---.net reactor介绍学习.net已经一年多了,从语言的编写到框架类库的运用再到.net三大解决方案的了解(WF,WCF,WPF),不断地让我更深入地了解了.net平台的应用场景。逐渐熟练地运用.net技术来解决实际的业务需求。 由于我们…

Selenium学习(4) 键盘操作

需要引入 Keys 类,模拟键盘的操作示例:from selenium import webdriverfrom selenium.webdriver.common.keys import Keysfrom time import sleepdriver webdriver.Chrome()driver.get("https://www.baidu.com")driver.maximize_window()slee…

vue常见的传值方式,子传父,父传子,兄弟之间,多个子传一个父,多个父传一个子,兄弟之间传值,事件中央总线传值

1,父传子 父组件&#xff1a; <template><div><abnormal-warning :datas"3"></abnormal-warning> // 1子组件</div>//多个子传一个父&#xff0c;变量一样就可以了&#xff0c;值随便传&#xff0c;其他不变<abnormal-warning :da…

Echarts图表根据浏览器窗口缩放进行动态缩放,多个echarts同时缩放

更改之前的效果图: 更改之前浏览器窗口放大缩小图表都不会进行动态的缩放&#xff0c; 更改之后的效果图&#xff1a; 更改之后图表就会根据浏览器窗口大小实时监听进行缩放 代码&#xff1a; topChart.setOption({series: [{name: 最大值,type: line,stack: 最大值,data: data…

1,uniapp功能之—NFC

在根目录的static中新建nfc.js文件 nfc.js var NfcAdapter; export default {// 初始化NFCNFCInit() {try {let main2 plus.android.runtimeMainActivity();let Intent plus.android.importClass(android.content.Intent);let Activity plus.android.importClass(android.a…

NOIP2017题解

T1小凯的疑惑 小凯手中有两种面值的金币&#xff0c;两种面值均为正整数且彼此互素。每种金币小凯都有 无数个。在不找零的情况下&#xff0c;仅凭这两种金币&#xff0c;有些物品他是无法准确支付的。现在小 凯想知道在无法准确支付的物品中&#xff0c;最贵的价值是多少金币&…

2,uniapp功能之—扫码(条形码,二维码)点击扫码或者pda侧边按钮扫码

最近在搞uniapp的项目,所以最近的文章基本上是关于uniapp的。 分享一个自动生成二维码的网址,点击前往 注意 如果扫码和上一个nfc功能在同一个页面里面,会发生冲突,只有一个有用,一个没有用,所有,我给扫码的分装成了一个组件,这样就不会冲突了。 1,在component里面新建…