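- Now let's isolate the insert-only records in the incremental (delta) data by running a Left Anti Join against the target table. The join keeps only the staged rows whose key (`seller_id`, `prod_category`) does not yet exist in the target.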
// Needed for col, lit and to_timestamp (skip if already imported earlier)
import org.apache.spark.sql.functions.{col, lit, to_timestamp}

val updFileDf = spark.read.option("header",true).csv("gs://target_bucket/hudi_product_catalog/hudi_product_update.csv")
val tgtHudiDf = spark.sql("select * from hudi_product_catalog")
hudiTableData.createOrReplaceTempView("hudiTable")

//Cast as needed
val stgDf = updFileDf
  .withColumn("eff_start_ts", to_timestamp(col("eff_start_ts")))
  .withColumn("seller_id", col("seller_id").cast("int"))

//Prepare an insert DF from incremental temp DF
val instmpDf = stgDf.as("stg")
  .join(tgtHudiDf.as("tgt"),
    col("stg.seller_id") === col("tgt.seller_id") &&
      col("stg.prod_category") === col("tgt.prod_category"),
    "left_anti")
  .select("stg.*")

//New records are open-ended and active
val insDf = instmpDf
  .withColumn("eff_end_ts", to_timestamp(lit("9999-12-31 23:59:59")))
  .withColumn("actv_ind", lit(1))

insDf.show(false)

+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
|     3345|   Stationary|Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
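- We now have a DataFrame that holds only the insert-only records. Next, let's build a DataFrame that carries the attributes of both the delta table and the target table. It uses an inner join against the target, so it returns the records that need to be updated: staged rows whose key already exists in the target and whose `eff_start_ts` is later than the target's.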
//Prepare an update DF from incremental temp DF, select columns from both the tables
val updDf = stgDf.as("stg")
  .join(tgtHudiDf.as("tgt"),
    col("stg.seller_id") === col("tgt.seller_id") &&
      col("stg.prod_category") === col("tgt.prod_category"),
    "inner")
  .where(col("stg.eff_start_ts") > col("tgt.eff_start_ts"))
  .select((stgDf.columns.map(c => stgDf(c).as(s"stg_$c")) ++
    tgtHudiDf.columns.map(c => tgtHudiDf(c).as(s"tgt_$c"))):_*)

updDf.show(false)

+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
|stg_seller_id|stg_prod_category| stg_product_name|stg_product_package|stg_discount_percentage|   stg_eff_start_ts|tgt__hoodie_commit_time|tgt__hoodie_commit_seqno|tgt__hoodie_record_key|tgt__hoodie_partition_path|tgt__hoodie_file_name|tgt_seller_id|tgt_prod_category|tgt_product_name|tgt_product_package|tgt_discount_percentage|   tgt_eff_start_ts|     tgt_eff_end_ts|tgt_actv_ind|
+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
|         1234|        Detergent|          Tide 5L|                  6|                     25|2022-01-31 10:00:30|      20220710113622931|    20220710113622931...|  seller_id:1234,pr...|                actv_ind=1| 2dd6109f-2173-429...|         1234|        Detergent|         Tide 2L|                  6|                     15|2021-12-15 15:20:30|9999-12-31 23:59:59|           1|
|         4565|          Gourmet|Dairy Milk Almond|                 12|                     45|2022-06-12 20:30:40|      20220710113622931|    20220710113622931...|  seller_id:4565,pr...|                actv_ind=1| 2dd6109f-2173-429...|         4565|          Gourmet| Dairy Milk Silk|                  6|                     30|2021-06-12 20:30:40|9999-12-31 23:59:59|           1|
+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
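- To make the semantics of the two joins explicit, the sketch below reproduces the same key-based split on plain Scala collections, with no Spark session or Hudi table required. It is only an illustration under stated assumptions: `Rec`, `SplitSketch` and `split` are hypothetical names that do not appear in the pipeline above, and the record is reduced to the join keys plus `eff_start_ts`. It also surfaces something the join code implies but does not show: a staged row that matches a target key without being newer lands in neither the insert set nor the update set.

```scala
import java.time.LocalDateTime

// Hypothetical, simplified record: only the join keys and the effective-start timestamp.
final case class Rec(sellerId: Int, prodCategory: String, effStartTs: LocalDateTime)

object SplitSketch {
  private def key(r: Rec): (Int, String) = (r.sellerId, r.prodCategory)

  /** Returns (insertOnly, updates, ignored) for staged rows compared against target rows. */
  def split(stg: Seq[Rec], tgt: Seq[Rec]): (Seq[Rec], Seq[(Rec, Rec)], Seq[Rec]) = {
    val tgtByKey = tgt.groupBy(key)

    // Left anti join: staged rows whose key has no match in the target.
    val insertOnly = stg.filterNot(s => tgtByKey.contains(key(s)))

    // Inner join plus the eff_start_ts filter: (staged, target) pairs where the staged row is newer.
    val updates = for {
      s <- stg
      t <- tgtByKey.getOrElse(key(s), Seq.empty)
      if s.effStartTs.isAfter(t.effStartTs)
    } yield (s, t)

    // Staged rows that match a key but are not newer than any matching target row.
    val updated = updates.map(_._1).toSet
    val ignored = stg.filter(s => tgtByKey.contains(key(s)) && !updated.contains(s))

    (insertOnly, updates, ignored)
  }

  def main(args: Array[String]): Unit = {
    val ts = (s: String) => LocalDateTime.parse(s)
    // Keys and timestamps taken from the show() output above.
    val tgt = Seq(
      Rec(1234, "Detergent", ts("2021-12-15T15:20:30")),
      Rec(4565, "Gourmet", ts("2021-06-12T20:30:40")))
    val stg = Seq(
      Rec(1234, "Detergent", ts("2022-01-31T10:00:30")),
      Rec(4565, "Gourmet", ts("2022-06-12T20:30:40")),
      Rec(3345, "Stationary", ts("2022-07-09T21:30:45")))

    val (ins, upd, ign) = split(stg, tgt)
    println(s"insert-only: ${ins.map(_.sellerId)}")
    println(s"updates: ${upd.map(_._1.sellerId)}")
    println(s"ignored: ${ign.map(_.sellerId)}")
  }
}
```

With the sample values above, seller 3345 falls into the insert-only set, sellers 1234 and 4565 fall into the update set, and nothing is ignored. If late-arriving or duplicate rows are possible in the delta file, it may be worth checking the ignored set explicitly, since both Spark joins drop such rows silently.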