تخطي إلى المحتوى الرئيسي

جداول Ilum

Ilum Tables هو تنسيق Spark يعمل كغلاف لتنسيقات بيانات Delta و Iceberg و Hudi. يمكنك من الوصول إلى مجموعات البيانات وإنشائها بهذه التنسيقات باستخدام واجهة موحدة ، مما يؤدي إلى تصميم كود أكثر مرونة.

استيراد جداول Ilum

للاستفادة من جداول Ilum يجب أن تشمل ILUM-spark-format حزمة في الخاص بك وظائف إيلوم . يمكنك القيام بذلك عن طريق إضافة هذا التكوين:

spark.jars.packages=cloud.ilum:ilum-spark-format:6.1.0 

أو عن طريق إضافة الحزمة كجرة منفصلة إلى موارد Ilum Job الخاصة بك

كيفية تضمين الحزمة في تطبيق سكالا

  • باستخدام sbt:
libraryDependencies += "cloud.ilum" %  "ilum-spark-format" %  "6.1.0"
  • باستخدام المخضرم:
< تبعية > 
< معرف المجموعة > كلاود.إيلوم </ معرف المجموعة >
< معرف القطعة الأثرية > ILUM-spark-format </ معرف القطعة الأثرية >
< الإصدار > 6.1.0 </ الإصدار >
</ تبعية >
  • باستخدام gradle:
مجموعة التنفيذ: "cloud.ilum"، الاسم: "ilum-spark-format"، الإصدار: "6.1.0" 

كيفية استخدامه؟

  • قراءة وكتابة البيانات مع تحديد ' تنسيق ILUM '
  • قراءة البيانات وكتابتها باستخدام طريقة إيلوم

للقيام بذلك ، يجب عليك استيرادها على النحو التالي:

استورد  سحابة . إيلوم . implicits. { 
IlumDataFrameReader,
IlumDataFrameWriter,
IlumDataFrameWriterV2,
IlumDataStreamWriter,
IlumDataStreamReader
}
  • قراءة البيانات وكتابتها عن طريق التكوين المسبق كتالوج (writeTo ، read.table)

قراءة

val filepath =  "s3a://ilum-files/ilum-tables/table"
val tableFormat = Some( "delta")

بدون طريقة ILUM
val mydf = شراره . قرأ . format( "ilum") . خيار ( "tableFormat", tableFormat) . load( filepath)

مع طريقة إيلوم
val mydf2 = sparkSession. قرأ . إيلوم ( filePath, tableFormat)


كتابه

val filepath =  "s3a://ilum-files/ilum-tables/table"
val tableFormat = "delta"

valبيانات = Seq(
( 1 , "Alice") ,
( 2 , "Bob") ,
( 3 , "Cathy")
)

valمدافع = شراره . createDataFrame ( بيانات ) . toDF( "المعرف" , "الاسم" )

باستخدام DataframeWriterV1

يمكنك استخدام بناء الجملة مثل هذا
مدافع . يكتب . format( "ilum") . خيار ( "tableFormat", tableFormat) . save( filepath)

أو يمكنك استخدام الدالة ILUM

مدافع . يكتب . إيلوم ( filepath + "/1", format)

استخدام DataframeWriterV2 مع كتالوج Delta الذي تم تكوينه مسبقا
valكتالوج = "catalog"
valجدول = "tablename"
مدافع . writeTo( s " ${كتالوج } . ${جدول } " ) . إيلوم ( format, None ) . createOrReplace( )

تدفق

بدون طرق إيلوم

val filepath =  "s3a://ilum-files/ilum-tables/streaming"
val tableFormat = "delta"

val input = شراره . readStream
. format( "ilum")
. خيار ( "tableFormat", tableFormat)
. load( filepath)

valاستفسار = input. writeStream
. outputMode( "append")
. format( "ilum")
. خيار ( "tableFormat", tableFormat)
. خيار ( "path", filepath + "_copy")
. خيار ( "checkpointLocation", filepath + "_checkpoint")
. start( )

استفسار . awaitTermination( )

مع طرق إيلوم:


val filePath = s "s3a://ilum-files/ilum-tables/smth"
val tableFormat = Some( "delta")

valمدافع = sparkSession. readStream. إيلوم ( filePath, tableFormat)

valاستفسار = مدافع . writeStream
. خيار ( "checkpointLocation", filePath + "_checkpoint")
. إيلوم ( filePath+ "_copy", tableFormat)

استفسار . awaitTermination( )

تكوين تنسيقات البيانات

الدلتا

للاستفادة من Delta ، يجب عليك استخدام تكوينات الشرارة التالية:

spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension 
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.warehouse.dir=s3a://ilum-files/ilum-warehouse

ويجب عليك تضمين حزمة دلتا في بيئتك. للقيام بذلك ، يمكنك استخدام صورة شرارة kubernetes مع امتدادات دلتا مثبت مسبقا:

spark.kubernetes.container.image=ilum/spark:3.5.2-delta 

أو قم بتثبيت حزمة الامتداد المطلوبة بنفسك

مثلجة

للاستفادة من Iceberg ، يجب عليك إضافة هذه التكوينات:

spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkSessionCatalog 
spark.sql.catalog.iceberg_catalog.type=خلية
spark.sql.catalog.iceberg_catalog ، org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.type=hadoop
spark.sql.catalog.iceberg_catalog.warehouse=s3a://ilum-files/ilum-tables/iceberg/warehouse

ويجب عليك تضمين org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.6.1 قم بتجميع في بيئتك عن طريق إضافة jar الخاص به إلى الموارد أو عن طريق إضافته إلى تكوينات Spark مثل هذا:

spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.6.1 

هودي

للاستفادة من Hudi ، يجب عليك إضافة هذه التكوينات:

spark.serializer=org.apache.spark.serializer.KryoSerializer 
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSpearkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog

والاستيراد org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 في بيئتك عن طريق إضافة jar الخاص به إلى الموارد أو عن طريق إضافته إلى تكوينات Spark مثل هذا:

spark.jars.packages=org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0