جداول Ilum
Ilum Tables هو تنسيق Spark يعمل كغلاف لتنسيقات بيانات Delta و Iceberg و Hudi. يمكنك من الوصول إلى مجموعات البيانات وإنشائها بهذه التنسيقات باستخدام واجهة موحدة ، مما يؤدي إلى تصميم كود أكثر مرونة.
استيراد جداول Ilum
للاستفادة من جداول Ilum يجب أن تشمل ILUM-spark-format حزمة في الخاص بك وظائف إيلوم . يمكنك القيام بذلك عن طريق إضافة هذا التكوين:
spark.jars.packages=cloud.ilum:ilum-spark-format:6.1.0
أو عن طريق إضافة الحزمة كجرة منفصلة إلى موارد Ilum Job الخاصة بك
كيفية تضمين الحزمة في تطبيق سكالا
- باستخدام sbt:
libraryDependencies += "cloud.ilum" % "ilum-spark-format" % "6.1.0"
- باستخدام المخضرم:
< تبعية >
< معرف المجموعة > كلاود.إيلوم </ معرف المجموعة >
< معرف القطعة الأثرية > ILUM-spark-format </ معرف القطعة الأثرية >
< الإصدار > 6.1.0 </ الإصدار >
</ تبعية >
- باستخدام gradle:
مجموعة التنفيذ: "cloud.ilum"، الاسم: "ilum-spark-format"، الإصدار: "6.1.0"
كيفية استخدامه؟
- قراءة وكتابة البيانات مع تحديد ' تنسيق ILUM '
- قراءة البيانات وكتابتها باستخدام طريقة إيلوم
للقيام بذلك ، يجب عليك استيرادها على النحو التالي:
استورد سحابة . إيلوم . implicits. {
IlumDataFrameReader,
IlumDataFrameWriter,
IlumDataFrameWriterV2,
IlumDataStreamWriter,
IlumDataStreamReader
}
- قراءة البيانات وكتابتها عن طريق التكوين المسبق كتالوج (writeTo ، read.table)
قراءة
val filepath = "s3a://ilum-files/ilum-tables/table"
val tableFormat = Some( "delta")
بدون طريقة ILUM
val mydf = شراره . قرأ . format( "ilum") . خيار ( "tableFormat", tableFormat) . load( filepath)
مع طريقة إيلوم
val mydf2 = sparkSession. قرأ . إيلوم ( filePath, tableFormat)
كتابه
val filepath = "s3a://ilum-files/ilum-tables/table"
val tableFormat = "delta"
valبيانات = Seq(
( 1 , "Alice") ,
( 2 , "Bob") ,
( 3 , "Cathy")
)
valمدافع = شراره . createDataFrame ( بيانات ) . toDF( "المعرف" , "الاسم" )
باستخدام DataframeWriterV1
يمكنك استخدام بناء الجملة مثل هذا
مدافع . يكتب . format( "ilum") . خيار ( "tableFormat", tableFormat) . save( filepath)
أو يمكنك استخدام الدالة ILUM
مدافع . يكتب . إيلوم ( filepath + "/1", format)
استخدام DataframeWriterV2 مع كتالوج Delta الذي تم تكوينه مسبقا
valكتالوج = "catalog"
valجدول = "tablename"
مدافع . writeTo( s " ${كتالوج } . ${جدول } " ) . إيلوم ( format, None ) . createOrReplace( )
تدفق
بدون طرق إيلوم
val filepath = "s3a://ilum-files/ilum-tables/streaming"
val tableFormat = "delta"
val input = شراره . readStream
. format( "ilum")
. خيار ( "tableFormat", tableFormat)
. load( filepath)
valاستفسار = input. writeStream
. outputMode( "append")
. format( "ilum")
. خيار ( "tableFormat", tableFormat)
. خيار ( "path", filepath + "_copy")
. خيار ( "checkpointLocation", filepath + "_checkpoint")
. start( )
استفسار . awaitTermination( )
مع طرق إيلوم:
val filePath = s "s3a://ilum-files/ilum-tables/smth"
val tableFormat = Some( "delta")
valمدافع = sparkSession. readStream. إيلوم ( filePath, tableFormat)
valاستفسار = مدافع . writeStream
. خيار ( "checkpointLocation", filePath + "_checkpoint")
. إيلوم ( filePath+ "_copy", tableFormat)
استفسار . awaitTermination( )
تكوين تنسيقات البيانات
الدلتا
للاستفادة من Delta ، يجب عليك استخدام تكوينات الشرارة التالية:
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.warehouse.dir=s3a://ilum-files/ilum-warehouse
ويجب عليك تضمين حزمة دلتا في بيئتك. للقيام بذلك ، يمكنك استخدام صورة شرارة kubernetes مع امتدادات دلتا مثبت مسبقا:
spark.kubernetes.container.image=ilum/spark:3.5.2-delta
أو قم بتثبيت حزمة الامتداد المطلوبة بنفسك
مثلجة
للاستفادة من Iceberg ، يجب عليك إضافة هذه التكوينات:
spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.iceberg_catalog.type=خلية
spark.sql.catalog.iceberg_catalog ، org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.type=hadoop
spark.sql.catalog.iceberg_catalog.warehouse=s3a://ilum-files/ilum-tables/iceberg/warehouse
ويجب عليك تضمين org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.6.1 قم بتجميع في بيئتك عن طريق إضافة jar الخاص به إلى الموارد أو عن طريق إضافته إلى تكوينات Spark مثل
هذا:
spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.6.1
هودي
للاستفادة من Hudi ، يجب عليك إضافة هذه التكوينات:
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSpearkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
والاستيراد org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 في بيئتك عن طريق إضافة jar الخاص به إلى الموارد أو عن طريق إضافته إلى تكوينات Spark مثل هذا:
spark.jars.packages=org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0