A Look at Flink's CsvTableSink
Published: 2019-06-26


This article takes a look at Flink's CsvTableSink.

TableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSink.scala

trait TableSink[T] {

  /**
    * Returns the type expected by this [[TableSink]].
    *
    * This type should depend on the types returned by [[getFieldNames]].
    *
    * @return The type expected by this [[TableSink]].
    */
  def getOutputType: TypeInformation[T]

  /** Returns the names of the table fields. */
  def getFieldNames: Array[String]

  /** Returns the types of the table fields. */
  def getFieldTypes: Array[TypeInformation[_]]

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  def configure(fieldNames: Array[String],
                fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
  • TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes, and configure

BatchTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/BatchTableSink.scala

trait BatchTableSink[T] extends TableSink[T] {

  /** Emits the DataSet. */
  def emitDataSet(dataSet: DataSet[T]): Unit
}
  • BatchTableSink extends TableSink and defines the emitDataSet method

StreamTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/StreamTableSink.scala

trait StreamTableSink[T] extends TableSink[T] {

  /** Emits the DataStream. */
  def emitDataStream(dataStream: DataStream[T]): Unit
}
  • StreamTableSink extends TableSink and defines the emitDataStream method
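The batch/stream split above can be sketched in plain Scala. The Mini* names below are hypothetical stand-ins that replace Flink's TypeInformation, DataSet, and DataStream with simple collections, just to show the shape of the hierarchy:

```scala
// Simplified stand-in for TableSink; plain type names replace TypeInformation.
trait MiniTableSink[T] {
  def getFieldNames: Array[String]
  def getFieldTypes: Array[String]
}

// Batch side: consumes a bounded collection in one call.
trait MiniBatchTableSink[T] extends MiniTableSink[T] {
  def emitDataSet(dataSet: Seq[T]): Unit
}

// Stream side: consumes elements as they arrive.
trait MiniStreamTableSink[T] extends MiniTableSink[T] {
  def emitDataStream(dataStream: Iterator[T]): Unit
}

// A sink implementing both sides, buffering whatever it receives --
// analogous to CsvTableSink mixing in both batch and stream traits.
class CollectingSink(names: Array[String], types: Array[String])
  extends MiniBatchTableSink[String] with MiniStreamTableSink[String] {
  val collected = scala.collection.mutable.ArrayBuffer.empty[String]
  override def getFieldNames: Array[String] = names
  override def getFieldTypes: Array[String] = types
  override def emitDataSet(dataSet: Seq[String]): Unit = collected ++= dataSet
  override def emitDataStream(dataStream: Iterator[String]): Unit = collected ++= dataStream
}
```

A single class can mix in both traits, which is exactly what CsvTableSink does with BatchTableSink and AppendStreamTableSink.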

TableSinkBase

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSinkBase.scala

trait TableSinkBase[T] extends TableSink[T] {

  private var fieldNames: Option[Array[String]] = None
  private var fieldTypes: Option[Array[TypeInformation[_]]] = None

  /** Return a deep copy of the [[TableSink]]. */
  protected def copy: TableSinkBase[T]

  /** Return the field names of the [[Table]] to emit. */
  def getFieldNames: Array[String] = {
    fieldNames match {
      case Some(n) => n
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field names.")
    }
  }

  /** Return the field types of the [[Table]] to emit. */
  def getFieldTypes: Array[TypeInformation[_]] = {
    fieldTypes match {
      case Some(t) => t
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field types.")
    }
  }

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  final def configure(fieldNames: Array[String],
                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
    val configuredSink = this.copy
    configuredSink.fieldNames = Some(fieldNames)
    configuredSink.fieldTypes = Some(fieldTypes)
    configuredSink
  }
}
  • TableSinkBase extends TableSink and implements the getFieldNames, getFieldTypes, and configure methods
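The copy-and-configure pattern of TableSinkBase can be reproduced in plain Scala. The MiniSink class below is a hypothetical simplification that replaces TypeInformation with plain type-name strings; note that configure never mutates the original sink, it configures a fresh copy:

```scala
// Hypothetical, simplified mirror of TableSinkBase's copy-and-configure pattern.
class MiniSink(val path: String) {
  private var fieldNames: Option[Array[String]] = None
  private var fieldTypes: Option[Array[String]] = None

  protected def copy: MiniSink = new MiniSink(path)

  // Accessors fail fast if the sink has not been configured yet.
  def getFieldNames: Array[String] =
    fieldNames.getOrElse(
      throw new IllegalStateException("MiniSink must be configured to retrieve field names."))

  def getFieldTypes: Array[String] =
    fieldTypes.getOrElse(
      throw new IllegalStateException("MiniSink must be configured to retrieve field types."))

  // Returns a configured copy; the receiver stays unconfigured.
  final def configure(names: Array[String], types: Array[String]): MiniSink = {
    val configured = this.copy
    configured.fieldNames = Some(names)
    configured.fieldTypes = Some(types)
    configured
  }
}
```

Returning a copy lets the planner configure the same sink instance for different schemas without side effects, which is why configure is final in TableSinkBase.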

CsvTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/CsvTableSink.scala

class CsvTableSink(
    path: String,
    fieldDelim: Option[String],
    numFiles: Option[Int],
    writeMode: Option[WriteMode])
  extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter, ',' by default.
    */
  def this(path: String, fieldDelim: String = ",") {
    this(path, Some(fieldDelim), None, None)
  }

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter.
    * @param numFiles The number of files to write to.
    * @param writeMode The write mode to specify whether existing files are overwritten or not.
    */
  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
  }

  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
    val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override protected def copy: TableSinkBase[Row] = {
    new CsvTableSink(path, fieldDelim, numFiles, writeMode)
  }

  override def getOutputType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes: _*)
  }
}

/**
  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
  *
  * @param fieldDelim The field delimiter.
  */
class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
  override def map(row: Row): String = {
    val builder = new StringBuilder

    // write first value
    val v = row.getField(0)
    if (v != null) {
      builder.append(v.toString)
    }

    // write following values
    for (i <- 1 until row.getArity) {
      builder.append(fieldDelim)
      val v = row.getField(i)
      if (v != null) {
        builder.append(v.toString)
      }
    }
    builder.mkString
  }
}
  • CsvTableSink extends TableSinkBase and implements both the BatchTableSink and AppendStreamTableSink interfaces; AppendStreamTableSink in turn extends StreamTableSink
  • Both emitDataSet and emitDataStream use CsvFormatter, a MapFunction that converts a Row into a String
  • CsvTableSink takes an optional writeMode parameter; WriteMode is an enum with two values, NO_OVERWRITE and OVERWRITE, which specifies whether an existing file of the same name is overwritten when writing the CSV file
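The null-handling and delimiter logic of CsvFormatter.map can be reproduced in plain Scala. The formatCsvRow function below is a sketch that uses Seq[Any] as a stand-in for Flink's Row (getField becomes indexing, getArity becomes length); null fields become empty strings, and every field after the first is preceded by the delimiter:

```scala
// Mirrors CsvFormatter.map: nulls become empty fields, others use toString.
def formatCsvRow(row: Seq[Any], fieldDelim: String = ","): String = {
  val builder = new StringBuilder

  // write first value
  if (row.head != null) {
    builder.append(row.head.toString)
  }

  // write following values, each preceded by the delimiter
  for (i <- 1 until row.length) {
    builder.append(fieldDelim)
    if (row(i) != null) {
      builder.append(row(i).toString)
    }
  }
  builder.toString
}
```

Note that, like CsvFormatter, this does no quoting or escaping: a field value containing the delimiter would corrupt the output line.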

Summary

  • TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes, and configure; BatchTableSink extends TableSink and defines the emitDataSet method; StreamTableSink extends TableSink and defines the emitDataStream method; TableSinkBase extends TableSink and implements the getFieldNames, getFieldTypes, and configure methods
  • CsvTableSink extends TableSinkBase and implements both the BatchTableSink and AppendStreamTableSink interfaces; AppendStreamTableSink in turn extends StreamTableSink; both emitDataSet and emitDataStream use CsvFormatter, a MapFunction that converts a Row into a String
  • CsvTableSink takes an optional writeMode parameter; WriteMode is an enum with two values, NO_OVERWRITE and OVERWRITE, which specifies whether an existing file of the same name is overwritten when writing the CSV file

doc

