iceberg初步实践
安装部署
这里一定要支持spark3的jar,否则不能通过spark来创建iceberg表
schematool -dbType mysql -initSchema
catalog为hadoop
执行spark-sql命令
并提前在hdfs中创建/user/iceberg/warehouse路径
利用spark-shell执行操作
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.catalog._
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types._
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql._
// Spark-shell walkthrough: create an Iceberg table via the HiveCatalog, write/read
// DataFrames, evolve the schema, and inspect table history.
// NOTE(review): assumes a running Hive metastore reachable through the Spark session's
// Hadoop configuration, and an Iceberg 0.10-era runtime on the classpath — confirm.
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
// Create the "example" table in Hive's "default" database (note: this is a Hive
// database, not an Iceberg-specific namespace).
val name = TableIdentifier.of("default","example");
// Sample input data.
val df1=Seq((1,"A"),(2,"A"),(3,"B"),(4,"C")).toDF("id","level");
// Convert the Spark schema into an Iceberg Schema.
val df1_schema = SparkSchemaUtil.convert(df1.schema);
// Specify partition strategy based on column "level"
val partition_spec=PartitionSpec.builderFor(df1_schema).identity("level").build;
// Creating the table
val table=catalog.createTable(name,df1_schema,partition_spec);
//Writing sample data to the table
df1.write.format("iceberg").mode("overwrite").save("default.example");
//Reading it back
val read_df1=spark.read.format("iceberg").load("default.example");
read_df1.show;
// Add a new "count" column, changing the table schema.
// NOTE(review): re-declaring `val table` shadows the one above — legal only in the
// REPL, where each line is a fresh scope; it would not compile in a regular file.
val table = catalog.loadTable(TableIdentifier.of("default", "example"));
table.updateSchema.addColumn("count", LongType.get()).commit();
// Print the updated schema to confirm the new column is present.
table.schema.toString;
// Append new rows that include the new column.
val df2=Seq((5,"A",5)).toDF("id","level", "count");
df2.write.format("iceberg").mode("append").save("default.example");
val df3=Seq((6,"A",3)).toDF("id","level", "count");
df3.write.format("iceberg").mode("append").save("default.example");
// The DataFrame point to the table will get new data
val read_df2=spark.read.format("iceberg").load("default.example");
read_df2.show;
// The table's snapshot history is exposed as the metadata table "<table>.history".
spark.read.format("iceberg").load("default.example.history").show(truncate = false)
通过snapshot-id可以读取当时快照对应的历史数据(时间旅行查询)
// Time-travel read: pin the scan to a specific snapshot via the "snapshot-id" option.
// Fix: the lowercase `l` Long-literal suffix is deprecated in Scala (it is easily
// mistaken for the digit 1) — use uppercase `L`.
// NOTE(review): this loads a Hadoop-catalog path ("tb_test1"), not the Hive-catalog
// "default.example" table created above — presumably from a different session; verify
// the path and snapshot id against your own warehouse before running.
spark.read.option("snapshot-id", 3372567346381641315L).format("iceberg").load("/user/iceberg/warehouse/default/tb_test1").show
IDEA操作Spark
catalog为hive
前提
1 将iceberg-hive-runtime-0.10.0.jar包添加到hive的lib目录下,或者在客户端使用add jar将其添加到会话中
2 开启hive对iceberg的支持
修改于 2025-03-20 05:54:08