Zhi Blog
  1. Iceberg
Zhi Blog
  • Zhi Blog
  • LLM&RAG&Agent
    • agent+大数据
    • deepseek的指令
    • 大模型控制电脑
    • 10.开发llm的两种方式
    • 11.展示写一个网站
    • 12.展示写一个reactflow前端
    • 13.大模型在反复迭代场景中的应用举例
    • 14.agent开发的workflow流程和自主编排
    • 15.利用大模型实现提示词的优化
    • 16.这就是为啥要学习提示词工程
    • 提示技巧
    • 17.提示词通用技巧以及提示词工程框架介绍
    • 18.Unsloth 大模型微调工具
    • autogen starter
    • MCP1:about MCP
    • MCP2: 如何用langchain创建自己的MCP server&client
    • 十分钟系列
      • 1.十分钟实现免费本地大模型对话框
      • 2.十分钟在本地部署大模型
      • 3.十分钟实现本地大模型部署并部署对话应用
      • 4.十分钟实现本地知识库部署
      • 5.十分钟在本地实现Deepseek R1 70B免费对话
      • 6.十分钟实现本地可视化开发Agent
      • 7.待补充
    • 参考
      • AI最大赛道Agent机遇全解析
      • 从第一性原理看大模型Agent技术
      • Agent项目
      • LLama部署和微调手册
      • Agent实战-JSON结构化智能
      • AI智能体卷爆大模型!AutoGPT等4大Agent打擂,「西部世界」谁将成为软件2.0
      • Agent调研--19类Agent框架对比
      • 国内近 50 款 AI Agent 产品问世,技术足够支撑应用可靠性了吗
      • 解析 AI Agent 的发展现状和技术难点
      • 清华大学与智谱 AI 联合推出 CogAgent:基于多模态大模型的 GUI Agent,具备视觉问答、视觉定位等能力
      • Agent 还没出圈,落地先有了“阻力”:进入平台期,智力能否独立担事?
      • 钉钉卡位战:SaaS 挣不到的钱,Agent 会挣到
      • 近三代操作系统开发元老一起创业搞 AI Agent 操作系统
      • 从科幻走向现实,LLM Agent 做到哪一步了
  • Hudi
    • 1.hudi介绍和简单实践
    • 2.flink基于hudi的数据湖实践
    • 3.概说hudi
  • Iceberg
    • iceberg初步实践
  1. Iceberg

iceberg初步实践

安装部署#

把mysql连接的jar和iceberg spark running 的jar包放入spark的依赖jars目录中
这里一定要支持spark3的jar,否则不能通过spark来创建iceberg表
把hdfs-site.xml,core-site.xml,slaves,hive-site.xml放入到spark的conf目录下
提前启动hive的metastore服务(hive的元数据库在建库时一定要提前执行格式化命令
schematool -dbType mysql -initSchema

catalog为hadoop#

执行spark-sql命令#

这里的hadoop_prod表示使用hadoop来创建iceberg表
并提前在hdfs中创建/user/iceberg/warehouse路径

利用spark-shell执行操作#

执行代码
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.catalog._
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types._
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql._
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration);
// 在hive的default库里创建example表(注意这里不是iceberg的库)
val name = TableIdentifier.of("default","example");
// 输入数据
val df1=Seq((1,"A"),(2,"A"),(3,"B"),(4,"C")).toDF("id","level");
val df1_schema = SparkSchemaUtil.convert(df1.schema);
// Specify partition strategy based on column "level"
val partition_spec=PartitionSpec.builderFor(df1_schema).identity("level").build;
// Creating the table
val table=catalog.createTable(name,df1_schema,partition_spec);
//Writing sample data to the table
df1.write.format("iceberg").mode("overwrite").save("default.example");
//Reading it back
val read_df1=spark.read.format("iceberg").load("default.example");
read_df1.show;

//增加新的count列,从而改变table schema
val table = catalog.loadTable(TableIdentifier.of("default", "example"));
table.updateSchema.addColumn("count", LongType.get()).commit();
table.schema.toString;
//向表里添加新的数据
val df2=Seq((5,"A",5)).toDF("id","level", "count");
df2.write.format("iceberg").mode("append").save("default.example");
val df3=Seq((6,"A",3)).toDF("id","level", "count");
df3.write.format("iceberg").mode("append").save("default.example");
// The DataFrame point to the table will get new data
val read_df2=spark.read.format("iceberg").load("default.example");
read_df2.show;
//也可以查看表的历史数据
spark.read.format("iceberg").load("default.example.history").show(truncate = false)
image.png

通过snapshot找到当时的#

这里可以查看表的snapshot
image.png
spark.read.option("snapshot-id", 3372567346381641315l).format("iceberg").load("/user/iceberg/warehouse/default/tb_test1").show

IDEA操作Spark#

依赖

catalog为hive#

在hive中使用iceberg支持以后可以再hive中对数据进行分析处理 , 但是不能对数据进行修改和创建表等操作 , 也就是说暂且还不支持写操作
这里创建的数据库就是在hive的路径下了
在hive中查看表
前提
1 将iceberg-hive-runtime-0.10.0.jar包添加到hive的lib包下 , 或者是在客户端使用add jar 添加到项目中
2 开启hive对iceberg的支持
show databases;
修改于 2025-03-20 05:54:08
上一页
3.概说hudi
Built with