
Streaming & CDC Ingestion with Kafka

Architecture Overview

Ilum provides a natural foundation for real-time streaming workloads. Apache Kafka is already a core dependency of the ilum platform, used internally for service communication. This means every ilum deployment has a Kafka cluster available that can also serve as the streaming backbone for your data pipelines.

The streaming architecture follows a three-layer pattern:

  1. Kafka acts as the durable, distributed message bus. Data producers (applications, CDC connectors, IoT devices) publish events to Kafka topics.
  2. Spark Structured Streaming or Apache Flink runs as a long-lived ilum job that continuously reads from Kafka topics, transforms the data, and writes results downstream.
  3. Iceberg or Delta Lake serves as the sink table format, providing ACID transactions, schema evolution, and time-travel queries on top of object storage (S3, MinIO, HDFS).

This combination delivers low-latency, exactly-once data ingestion pipelines that are fully managed through the ilum platform.

Prerequisites

Before building a streaming pipeline, ensure the following components are in place:

  • An ilum cluster with Kafka enabled (this is the default configuration).
  • Iceberg or Delta Lake table format configured in your catalog. You can verify table availability through the SQL Viewer.
  • A Kafka topic populated with data (or a producer actively writing to one).
  • Familiarity with submitting jobs through ilum. See Run a Simple Spark Job for an introduction.

Connecting Spark Jobs to Kafka

Spark Structured Streaming reads from Kafka using the kafka format. You must include the Kafka connector package when submitting your job.

Required Spark Package

Add the following Maven coordinate to your job's Spark Packages configuration in the ilum UI:

org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
Tip

When submitting through the ilum UI, add this under the Resources tab in the Spark Packages field. Ilum will resolve and distribute the dependency automatically.

Reading from Kafka

The core pattern for connecting to a Kafka topic uses readStream with the kafka format.

PySpark:

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka messages are binary key/value pairs. Cast to string for processing.
parsed = df.selectExpr(
    "CAST(key AS STRING) AS event_key",
    "CAST(value AS STRING) AS event_value",
    "topic",
    "partition",
    "offset",
    "timestamp",
)

Scala:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ilum-kafka:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

val parsed = df.selectExpr(
  "CAST(key AS STRING) AS event_key",
  "CAST(value AS STRING) AS event_value",
  "topic",
  "partition",
  "offset",
  "timestamp"
)
Note

The default Kafka bootstrap server within an ilum cluster is ilum-kafka:9092. If your Kafka deployment uses a different service name or port, adjust accordingly.

Authentication and Security

For Kafka clusters configured with SASL or TLS, add the corresponding properties to your Spark configuration:

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9093")
    .option("subscribe", "events")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.scram.ScramLoginModule required '
            'username="user" password="password";')
    .option("kafka.ssl.truststore.location", "/opt/spark/truststore.jks")
    .option("kafka.ssl.truststore.password", "changeit")
    .load()
)
Warning

Avoid hardcoding credentials in job code. Use Kubernetes secrets mounted as files or environment variables, and reference them in your Spark configuration.
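For example, the JAAS string can be assembled at runtime from environment variables populated by a Kubernetes secret instead of being hardcoded. A minimal sketch; the variable names KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD are illustrative, not an ilum convention:

```python
import os

# Illustrative variable names -- assumes a Kubernetes secret injects these
# into the driver/executor pods (e.g. via envFrom in the pod template).
kafka_user = os.environ.get("KAFKA_SASL_USERNAME", "user")
kafka_pass = os.environ.get("KAFKA_SASL_PASSWORD", "password")

# Build the JAAS config string at runtime instead of embedding secrets in code.
jaas_config = (
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    f'username="{kafka_user}" password="{kafka_pass}";'
)
```

Pass the resulting string via .option("kafka.sasl.jaas.config", jaas_config) in place of the hardcoded literal shown above.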

Exactly-Once Semantics

Structured Streaming achieves exactly-once processing guarantees through checkpointing and idempotent writes.

Checkpointing Configuration

Checkpoints track the processing state, including Kafka offsets, so that a restarted job resumes from where it left off without data loss or duplication. Store checkpoints on durable object storage.

query = (
    parsed.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/events-pipeline")
    .toTable("spark_catalog.default.events")
)
Warning

Each streaming query must have a unique checkpoint location. Reusing a checkpoint path across different queries will cause data corruption.

Idempotent Writes to Iceberg

Iceberg tables support atomic commits, which means each micro-batch is written as a single transaction. If a batch fails partway through, the partial write is rolled back. Combined with checkpointing, this provides end-to-end exactly-once delivery.
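The interaction between checkpointed offsets and atomic commits can be illustrated with a toy model (plain Python, not Spark): a sink that remembers the last committed batch id and ignores replayed batches behaves idempotently across restarts.

```python
# Toy model of exactly-once delivery: the sink commits rows and the batch id
# together (atomically), so a replayed batch after a restart is a no-op.
committed = {"last_batch_id": -1, "rows": []}

def commit_batch(batch_id, rows):
    # A batch id at or below the last committed one is a replay: skip it.
    if batch_id <= committed["last_batch_id"]:
        return False
    # "Atomic" commit: rows and batch id are updated in one step.
    committed["rows"].extend(rows)
    committed["last_batch_id"] = batch_id
    return True

commit_batch(0, ["a", "b"])
commit_batch(1, ["c"])
commit_batch(1, ["c"])  # replay after a simulated restart: ignored
```

In the real pipeline, the "batch id" role is played by the checkpointed Kafka offsets and the "atomic commit" by an Iceberg snapshot commit.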

Failure Recovery

When a streaming job fails and is restarted (manually or via ilum's Max Retries setting):

  1. Spark reads the latest checkpoint to determine the last successfully committed Kafka offset.
  2. Processing resumes from the next offset, skipping already-committed data.
  3. No manual intervention or offset management is required.
Tip

Set Max Retries on your ilum job to a reasonable value (e.g., 3-5) so that transient failures are automatically recovered without operator intervention.

CDC Patterns

Change Data Capture (CDC) pipelines replicate changes from operational databases (PostgreSQL, MySQL, MongoDB) into your lakehouse. The typical architecture is:

Source DB --> Debezium --> Kafka --> Structured Streaming --> Iceberg/Delta

Debezium CDC Source

Debezium is an open-source CDC platform that captures row-level changes from databases and publishes them to Kafka topics. A typical Debezium deployment runs as a Kafka Connect connector alongside your ilum Kafka cluster.

Key Debezium configuration properties:

connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres-host
database.port=5432
database.user=debezium
database.password=secret
database.dbname=mydb
database.server.name=mydb
table.include.list=public.orders,public.customers
topic.prefix=cdc

This produces Kafka topics like cdc.public.orders and cdc.public.customers, each containing insert, update, and delete events.
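Each message value on these topics is a Debezium change envelope. A simplified sketch of its shape, assuming the default JSON envelope with before/after/op fields (real events carry additional source and transaction metadata):

```python
import json

# Simplified Debezium change event for an insert into public.orders.
# The field values here are illustrative.
event = json.loads("""
{
  "before": null,
  "after": {"id": 42, "customer_id": 7, "product": "widget", "amount": 9.99},
  "op": "c",
  "ts_ms": 1700000000000
}
""")

# For inserts/updates the new row state is in "after"; for deletes ("d"),
# only "before" is populated.
row = event["after"] if event["op"] != "d" else event["before"]
```

The MERGE pattern below assumes this envelope has already been flattened so that op and the row columns are top-level fields.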

Upsert with MERGE INTO

CDC events include inserts, updates, and deletes. To apply these changes to an Iceberg or Delta table, use the foreachBatch sink with a MERGE INTO statement.

PySpark example:

def upsert_to_iceberg(batch_df, batch_id):
    # Register the micro-batch as a temporary view
    batch_df.createOrReplaceTempView("cdc_batch")

    batch_df.sparkSession.sql("""
        MERGE INTO spark_catalog.default.orders AS target
        USING cdc_batch AS source
        ON target.id = source.id
        WHEN MATCHED AND source.op = 'd' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND source.op != 'd' THEN INSERT *
    """)

query = (
    cdc_stream.writeStream
    .foreachBatch(upsert_to_iceberg)
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/orders-cdc")
    .start()
)
Note

The op field in Debezium events indicates the operation type: c (create), u (update), d (delete), and r (read/snapshot). Adjust the MERGE logic based on your Debezium envelope format.

Example: End-to-End Streaming Pipeline

The following complete example reads JSON events from a Kafka topic, parses them, and writes them to an Iceberg table.

Complete PySpark Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("streaming-ingestion-pipeline") \
    .getOrCreate()

# Define the expected JSON schema
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("action", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("event_time", TimestampType(), True),
])

# Read from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "earliest")
    .load()
)

# Parse JSON values
parsed_stream = (
    raw_stream
    .selectExpr("CAST(value AS STRING) AS json_str")
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
    .filter(col("event_id").isNotNull())
)

# Write to Iceberg table
query = (
    parsed_stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/user-events")
    .toTable("spark_catalog.default.user_events")
)

query.awaitTermination()

Submitting as an Ilum Job

  1. Save the script above as streaming_pipeline.py.
  2. In the ilum UI, create a new job with Job Type set to Spark Job and Language set to Python.
  3. Upload streaming_pipeline.py under the Resources tab.
  4. Add the Kafka connector package under Spark Packages:
    org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
  5. Under the Configuration tab, add the following Spark properties for Iceberg support:
    spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.spark_catalog.type = hive
  6. Submit the job. For detailed submission steps, see Run a Simple Spark Job.

Monitoring Streaming Jobs

Streaming jobs run continuously, so monitoring is essential:

  • Ilum UI: The job will appear with a RUNNING status. Use the Logs tab to monitor micro-batch progress and throughput.
  • Spark UI: Access the Spark UI through ilum to view the Structured Streaming tab, which shows input rate, processing rate, and batch duration metrics.
  • Max Retries: Configure automatic restart on failure to maintain pipeline uptime.
Tip

For long-running streaming jobs, allocate sufficient driver and executor memory to handle state accumulation. Monitor memory usage through the ilum UI and adjust resources as needed.
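Programmatically, a running Structured Streaming query also exposes per-batch metrics through query.lastProgress (a dict in PySpark). The sketch below uses an illustrative progress payload, with field names matching the Structured Streaming progress report, to show a simple lag check you might run from monitoring code:

```python
# Illustrative shape of a Structured Streaming progress report, as returned
# by query.lastProgress in PySpark; the values here are made up.
progress = {
    "batchId": 17,
    "numInputRows": 1200,
    "inputRowsPerSecond": 400.0,
    "processedRowsPerSecond": 600.0,
    "durationMs": {"triggerExecution": 2000},
}

# The query is falling behind if it processes rows slower than they arrive.
falling_behind = progress["processedRowsPerSecond"] < progress["inputRowsPerSecond"]
```

Sustained falling_behind is a signal to increase executor resources or parallelism for the job.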

Apache Flink Streaming

Apache Flink is supported as a streaming engine in ilum, complementing Spark Structured Streaming. Flink provides true event-at-a-time processing with lower latency and is well suited for complex event processing (CEP), session windowing, and use cases where sub-second response times are critical.

Ilum deploys Flink jobs on Kubernetes using the Flink Kubernetes Operator. Flink applications are submitted through the ilum UI or REST API just like Spark jobs. Ilum handles pod lifecycle management, checkpoint storage configuration, and integration with the shared Kafka cluster and catalog layer.

Key architectural points:

  • Job Manager and Task Manager pods are orchestrated by Kubernetes, with resource allocation managed through ilum's cluster configuration.
  • Flink jobs connect to the same Kafka cluster used by ilum internally (ilum-kafka:9092 by default).
  • Flink integrates with the Hive Metastore and Iceberg catalog, allowing Flink and Spark to read and write the same tables.
  • Checkpoints and savepoints are stored on the ilum default storage (S3/MinIO/HDFS), enabling job recovery and upgrades without data loss.
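Under the hood, the Flink Kubernetes Operator consumes a FlinkDeployment custom resource. Ilum generates this for you, but a minimal hand-written equivalent looks roughly like the following sketch (the image tag, jar path, and resource sizes are illustrative):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-ingestion
spec:
  image: flink:1.17            # illustrative image tag
  flinkVersion: v1_17
  serviceAccount: flink
  flinkConfiguration:
    state.checkpoints.dir: s3://ilum-data/flink-checkpoints
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/pipeline.jar   # illustrative path
    parallelism: 2
    upgradeMode: savepoint
```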

Flink SQL provides a declarative way to build streaming pipelines. The following example reads JSON events from a Kafka topic and writes them to an Iceberg table.

-- Register the Kafka source table
CREATE TABLE kafka_events (
    event_id STRING,
    user_id STRING,
    action STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'ilum-kafka:9092',
    'properties.group.id' = 'flink-ingestion',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
);

-- Register the Iceberg sink table
CREATE TABLE iceberg_events (
    event_id STRING,
    user_id STRING,
    action STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hive_catalog',
    'catalog-type' = 'hive',
    'uri' = 'thrift://ilum-hive-metastore:9083',
    'warehouse' = 's3a://ilum-data/warehouse'
);

-- Continuous streaming insert
INSERT INTO iceberg_events
SELECT event_id, user_id, action, amount, event_time
FROM kafka_events
WHERE event_id IS NOT NULL;
Tip

Flink SQL jobs can be submitted as SQL scripts through the ilum UI. Set the Job Type to Flink and provide the SQL statements in the code editor.

For more complex processing logic, use the Flink DataStream API. The following Java example reads from Kafka, applies a tumbling window aggregation, and writes the results to an Iceberg table.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("ilum-kafka:9092")
    .setTopics("user-events")
    .setGroupId("flink-datastream")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Kafka Source")
    .map(json -> parseEvent(json))                       // custom parsing logic
    .keyBy(event -> event.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount")
    .sinkTo(icebergSink);                                // Iceberg sink connector

env.execute("flink-aggregation-pipeline");

Flink provides a native CDC connector that captures database changes directly without requiring a separate Debezium deployment. This simplifies the CDC pipeline from a three-component architecture to two.

Source DB --> Flink CDC Connector --> Iceberg/Delta

Flink SQL example with PostgreSQL CDC:

-- Register a CDC source directly from PostgreSQL
CREATE TABLE orders_cdc (
    id INT,
    customer_id INT,
    product STRING,
    amount DOUBLE,
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgres-host',
    'port' = '5432',
    'username' = 'cdc_user',
    'password' = 'secret',
    'database-name' = 'mydb',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink_orders_slot'
);

-- Write changes directly to Iceberg with upsert semantics
INSERT INTO iceberg_orders
SELECT id, customer_id, product, amount, updated_at
FROM orders_cdc;
Note

Flink CDC connectors support PostgreSQL, MySQL, MongoDB, Oracle, and SQL Server. Each connector requires the corresponding Flink CDC library to be included in the job dependencies.

Flink achieves exactly-once semantics through its distributed snapshot (Chandy-Lamport) checkpointing algorithm:

  1. Checkpoint barriers flow through the data stream, ensuring consistent state snapshots across all operators.
  2. State backends (RocksDB or heap-based) persist operator state to durable storage (S3/MinIO) on each checkpoint.
  3. Two-phase commit sinks (Kafka, Iceberg) ensure that output records are committed atomically with the checkpoint.

Configure checkpointing in your Flink job:

env.enableCheckpointing(60000);   // checkpoint interval (ms)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointStorage("s3://ilum-data/flink-checkpoints");
Tip

For Flink SQL jobs, set checkpointing via Flink configuration properties:

execution.checkpointing.interval= 60s
execution.checkpointing.mode= EXACTLY_ONCE
state.checkpoints.dir= s3://ilum-data/flink-checkpoints

Both engines are production-ready for streaming workloads on ilum. Choose based on your requirements:

| Aspect | Spark Structured Streaming | Apache Flink |
| --- | --- | --- |
| Processing Model | Micro-batch (default) or continuous | True event-at-a-time |
| Latency | Seconds (micro-batch) | Milliseconds |
| State Management | Limited by micro-batch boundaries | Fine-grained, key-partitioned state |
| Windowing | Tumbling, sliding, session | Tumbling, sliding, session, custom |
| Complex Event Processing | Basic | Advanced (CEP library) |
| CDC | Via Debezium + Kafka | Native CDC connectors |
| Batch + Stream Unification | Strong (same DataFrame API) | Strong (same DataStream/Table API) |
| Catalog Integration | Hive, Nessie, Unity, Iceberg, Delta, Hudi | Hive, Iceberg |
| Best For | ETL pipelines, unified batch/stream, existing Spark codebases | Low-latency event processing, CEP, native CDC |

General guidance:

  • Use Spark Structured Streaming when you have existing Spark batch jobs and want a unified batch/streaming codebase, or when micro-batch latency (seconds) is acceptable.
  • Use Flink when you need sub-second latency, complex event processing patterns, native CDC without Debezium, or fine-grained state management with event-time processing.