import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.catalog._
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types._
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql._
// toDF on a Seq needs the session implicits (pre-imported in spark-shell, made explicit here)
import spark.implicits._
// Create a HiveCatalog backed by the Hive metastore configured for this Spark session
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)
// Create the example table in Hive's default database (note: a Hive database, not an Iceberg-specific namespace)
val name = TableIdentifier.of("default", "example")
// Sample input data
val df1 = Seq((1, "A"), (2, "A"), (3, "B"), (4, "C")).toDF("id", "level")
// Convert the Spark schema to an Iceberg schema
val df1_schema = SparkSchemaUtil.convert(df1.schema)
// Partition the table by identity on the "level" column
val partition_spec = PartitionSpec.builderFor(df1_schema).identity("level").build()
// Create the table
val table = catalog.createTable(name, df1_schema, partition_spec)
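// Optional sanity check (a minimal sketch, not part of the original walkthrough): the
// Catalog and Table APIs can confirm the table was registered and show where it lives.
println(catalog.tableExists(name))   // should print true
println(table.location)              // warehouse path chosen by the Hive catalog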
// Write the sample data to the table
df1.write.format("iceberg").mode("overwrite").save("default.example")
// Read it back
val read_df1 = spark.read.format("iceberg").load("default.example")
read_df1.show()
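// Hedged aside: the same data can also be queried with Spark SQL through a temporary view;
// the view name "example_view" is illustrative and not part of the original example.
read_df1.createOrReplaceTempView("example_view")
spark.sql("SELECT level, count(*) AS cnt FROM example_view GROUP BY level").show()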
// Add a new "count" column, which evolves the table schema
// (reload the table handle from the catalog; in spark-shell the val simply shadows the earlier one)
val table = catalog.loadTable(TableIdentifier.of("default", "example"))
table.updateSchema.addColumn("count", LongType.get()).commit()
println(table.schema)
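// Hedged aside: updateSchema supports further evolutions (rename, drop, widen types), e.g.
// table.updateSchema.renameColumn("count", "cnt").commit()  -- not run here, later writes still use "count".
// A non-destructive check that the new column landed in the schema:
println(table.schema.findField("count"))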
// Append new rows to the table (count values are Long literals to match the LongType column)
val df2 = Seq((5, "A", 5L)).toDF("id", "level", "count")
df2.write.format("iceberg").mode("append").save("default.example")
val df3 = Seq((6, "A", 3L)).toDF("id", "level", "count")
df3.write.format("iceberg").mode("append").save("default.example")
// A fresh DataFrame read from the table picks up the appended data
val read_df2 = spark.read.format("iceberg").load("default.example")
read_df2.show()
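// Hedged aside: because the table is partitioned by "level", a filter on that column lets
// Iceberg prune partitions at read time (the filter value "A" is just an illustration).
read_df2.where("level = 'A'").show()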
// You can also inspect the table's history via the metadata table
spark.read.format("iceberg").load("default.example.history").show(truncate = false)