Streaming & CDC Ingestion with Kafka
Architecture Overview
Ilum provides a natural foundation for real-time streaming workloads. Apache Kafka is already a core dependency of the ilum platform, used internally for service communication. This means every ilum deployment has a Kafka cluster available that can also serve as the streaming backbone for your data pipelines.
The streaming architecture follows a three-layer pattern:
- Kafka acts as the durable, distributed message bus. Data producers (applications, CDC connectors, IoT devices) publish events to Kafka topics.
- Spark Structured Streaming or Apache Flink runs as a long-lived ilum job that continuously reads from Kafka topics, transforms the data, and writes results downstream.
- Iceberg or Delta Lake serves as the sink table format, providing ACID transactions, schema evolution, and time-travel queries on top of object storage (S3, MinIO, HDFS).
This combination delivers low-latency, exactly-once data ingestion pipelines that are fully managed through the ilum platform.
Prerequisites
Before building a streaming pipeline, ensure the following components are in place:
- An ilum cluster with Kafka enabled (this is the default configuration).
- Iceberg or Delta Lake table format configured in your catalog. You can verify table availability through the SQL Viewer.
- A Kafka topic populated with data (or a producer actively writing to one).
- Familiarity with submitting jobs through ilum. See Run a Simple Spark Job for an introduction.
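If you need to populate a test topic before wiring up the streaming job, a small producer script is enough. The sketch below is illustrative: it assumes the kafka-python client library and the default ilum-kafka:9092 bootstrap address, and the event fields match the example schema used later in this guide.

```python
import json
import uuid
from datetime import datetime, timezone

def make_event(user_id: str, action: str, amount: float) -> bytes:
    """Serialize one illustrative event as UTF-8 JSON bytes."""
    event = {
        "event_id": str(uuid.uuid4()),
        "user_id": user_id,
        "action": action,
        "amount": amount,
        "event_time": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(event).encode("utf-8")

# Sending with kafka-python (assumed to be available in your environment):
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="ilum-kafka:9092")
#   for i in range(100):
#       producer.send("user-events", make_event(f"user-{i % 10}", "purchase", 9.99))
#   producer.flush()
```

Any Kafka client works here; the only requirement is that the value payload matches the schema your streaming job expects to parse.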
Connecting Spark Jobs to Kafka
Spark Structured Streaming reads from Kafka using the kafka format. You must include the Kafka connector package when submitting your job.
Required Spark Package
Add the following Maven coordinate to your job's Spark Packages configuration in the ilum UI:
org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
When submitting through the ilum UI, add this under the Resources tab in the Spark Packages field. Ilum will resolve and distribute the dependency automatically.
Reading from Kafka
The core pattern for connecting to a Kafka topic uses readStream with the kafka format.
PySpark:
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
)
# Kafka messages are binary key/value pairs. Cast to string for processing.
parsed = df.selectExpr(
    "CAST(key AS STRING) AS event_key",
    "CAST(value AS STRING) AS event_value",
    "topic",
    "partition",
    "offset",
    "timestamp"
)
Scala:
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ilum-kafka:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

val parsed = df.selectExpr(
  "CAST(key AS STRING) AS event_key",
  "CAST(value AS STRING) AS event_value",
  "topic",
  "partition",
  "offset",
  "timestamp"
)
The default Kafka bootstrap server within an ilum cluster is ilum-kafka:9092. If your Kafka deployment uses a different service name or port, adjust accordingly.
Authentication and Security
For Kafka clusters configured with SASL or TLS, add the corresponding properties to your Spark configuration:
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9093")
    .option("subscribe", "events")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.scram.ScramLoginModule required '
            'username="user" password="password";')
    .option("kafka.ssl.truststore.location", "/opt/spark/truststore.jks")
    .option("kafka.ssl.truststore.password", "changeit")
    .load()
)
Avoid hardcoding credentials in job code. Use Kubernetes secrets mounted as files or environment variables, and reference them in your Spark configuration.
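One way to follow this advice is sketched below: expose the Kubernetes secret as environment variables (the names KAFKA_USER and KAFKA_PASS are illustrative) and assemble the Kafka options at runtime, so no credential ever appears in the job script.

```python
import os

def kafka_sasl_options() -> dict:
    """Build Kafka SASL options from environment variables instead of
    hardcoding credentials in the job code."""
    user = os.environ["KAFKA_USER"]      # injected from a Kubernetes secret
    password = os.environ["KAFKA_PASS"]  # injected from a Kubernetes secret
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{user}" password="{password}";'
    )
    return {
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "SCRAM-SHA-512",
        "kafka.sasl.jaas.config": jaas,
    }

# Usage sketch with an existing SparkSession:
#   reader = spark.readStream.format("kafka").options(**kafka_sasl_options())
```

Truststore files can be mounted the same way, as secret volumes, with only their paths referenced in the Spark configuration.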
Exactly-Once Semantics
Structured Streaming achieves exactly-once processing guarantees through checkpointing and idempotent writes.
Checkpointing Configuration
Checkpoints track the processing state, including Kafka offsets, so that a restarted job resumes from where it left off without data loss or duplication. Store checkpoints on durable object storage.
query = (
    parsed.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/events-pipeline")
    .toTable("spark_catalog.default.events")
)
Each streaming query must have a unique checkpoint location. Reusing a checkpoint path across different queries will cause data corruption.
Idempotent Writes to Iceberg
Iceberg tables support atomic commits, which means each micro-batch is written as a single transaction. If a batch fails partway through, the partial write is rolled back. Combined with checkpointing, this provides end-to-end exactly-once delivery.
Failure Recovery
When a streaming job fails and is restarted (manually or via ilum's Max Retries setting):
- Spark reads the latest checkpoint to determine the last successfully committed Kafka offset.
- Processing resumes from the next offset, skipping already-committed data.
- No manual intervention or offset management is required.
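This contract can be illustrated with a toy model (plain Python, not Spark internals): once the checkpoint records the last committed offset, the set of offsets a restarted job must process is fully determined, with no gaps and no duplicates.

```python
from typing import Optional

def resume_range(checkpointed_offset: Optional[int], latest_offset: int) -> range:
    """Offsets a restarted consumer should process: everything after the
    last committed offset, up to and including the latest available one.
    A fresh job (no checkpoint) starts from offset 0."""
    start = 0 if checkpointed_offset is None else checkpointed_offset + 1
    return range(start, latest_offset + 1)

# A run that committed through offset 41 resumes at 42 after a crash,
# never reprocessing 0-41 and never skipping 42 onward.
```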
Set Max Retries on your ilum job to a reasonable value (e.g., 3-5) so that transient failures are recovered from automatically without operator intervention.
CDC Patterns
Change Data Capture (CDC) pipelines replicate changes from operational databases (PostgreSQL, MySQL, MongoDB) into your lakehouse. The typical architecture is:
Source DB --> Debezium --> Kafka --> Structured Streaming --> Iceberg/Delta
Debezium CDC Source
Debezium is an open-source CDC platform that captures row-level changes from databases and publishes them to Kafka topics. A typical Debezium deployment runs as a Kafka Connect connector alongside your ilum Kafka cluster.
Key Debezium configuration properties:
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres-host
database.port=5432
database.user=debezium
database.password=secret
database.dbname=mydb
database.server.name=mydb
table.include.list=public.orders,public.customers
topic.prefix=cdc
This produces Kafka topics like cdc.public.orders and cdc.public.customers, each containing insert, update, and delete events.
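The topic naming convention can be expressed as a small helper (plain Python, mirroring the configuration properties above):

```python
def debezium_topic_names(topic_prefix: str, table_include_list: str) -> list:
    """Debezium names each change topic <topic.prefix>.<schema>.<table>,
    one topic per entry in table.include.list."""
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    return [f"{topic_prefix}.{table}" for table in tables]
```

Knowing the exact topic names up front is useful when configuring the subscribe option of the downstream streaming job.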
Upsert with MERGE INTO
CDC events include inserts, updates, and deletes. To apply these changes to an Iceberg or Delta table, use the foreachBatch sink with a MERGE INTO statement.
PySpark example:
def upsert_to_iceberg(batch_df, batch_id):
    # Register the micro-batch as a temporary view
    batch_df.createOrReplaceTempView("cdc_batch")
    batch_df.sparkSession.sql("""
        MERGE INTO spark_catalog.default.orders AS target
        USING cdc_batch AS source
        ON target.id = source.id
        WHEN MATCHED AND source.op = 'd' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND source.op != 'd' THEN INSERT *
    """)

query = (
    cdc_stream.writeStream
    .foreachBatch(upsert_to_iceberg)
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/orders-cdc")
    .start()
)
The op field in Debezium events indicates the operation type: c (create), u (update), d (delete), and r (read/snapshot). Adjust the MERGE logic based on your Debezium envelope format.
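As a minimal sketch of handling the default envelope (assuming the standard before/after/op fields; single message transforms that flatten the envelope change this layout):

```python
def interpret_change(envelope: dict) -> tuple:
    """Return (operation, row_image) for a Debezium change event.
    Deletes carry the row image in 'before'; creates, updates, and
    snapshot reads carry it in 'after'."""
    op = envelope["op"]  # 'c' create, 'u' update, 'd' delete, 'r' snapshot read
    row = envelope["before"] if op == "d" else envelope["after"]
    return op, row
```

The same before/delete vs. after/upsert distinction is what the MERGE INTO statement above encodes declaratively.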
Example: End-to-End Streaming Pipeline
The following complete example reads JSON events from a Kafka topic, parses them, and writes them to an Iceberg table.
Complete PySpark Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("streaming-ingestion-pipeline") \
    .getOrCreate()
# Define the expected JSON schema
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("action", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("event_time", TimestampType(), True),
])
# Read from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "earliest")
    .load()
)
# Parse JSON values
parsed_stream = (
    raw_stream
    .selectExpr("CAST(value AS STRING) AS json_str")
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
    .filter(col("event_id").isNotNull())
)
# Write to Iceberg table
query = (
    parsed_stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/user-events")
    .toTable("spark_catalog.default.user_events")
)

query.awaitTermination()
Submitting as an Ilum Job
- Save the script above as streaming_pipeline.py.
- In the ilum UI, create a new job with Job Type set to Spark Job and Language set to Python.
- Upload streaming_pipeline.py under the Resources tab.
- Add the Kafka connector package under Spark Packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
- Under the Configuration tab, add the following Spark properties for Iceberg support:
  spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkCatalog
  spark.sql.catalog.spark_catalog.type = hive
- Submit the job. For detailed submission steps, see Run a Simple Spark Job.
Monitoring Streaming Jobs
Streaming jobs run continuously, so monitoring is essential:
- Ilum UI: The job will appear with a RUNNING status. Use the Logs tab to monitor micro-batch progress and throughput.
- Spark UI: Access the Spark UI through ilum to view the Structured Streaming tab, which shows input rate, processing rate, and batch duration metrics.
- Max Retries: Configure automatic restart on failure to maintain pipeline uptime.
For long-running streaming jobs, allocate sufficient driver and executor memory to handle state accumulation. Monitor memory usage through the ilum UI and adjust resources as needed.
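Progress can also be inspected programmatically from the driver: StreamingQuery.lastProgress returns the most recent micro-batch's metrics as a plain dict. The helper below is a sketch that pulls out the throughput fields (field names follow Spark's StreamingQueryProgress JSON; exact contents vary by version):

```python
def summarize_progress(progress: dict) -> str:
    """One-line throughput summary from a StreamingQuery.lastProgress dict."""
    batch = progress.get("batchId")
    in_rate = progress.get("inputRowsPerSecond", 0.0)
    proc_rate = progress.get("processedRowsPerSecond", 0.0)
    return f"batch={batch} input={in_rate:.1f} rows/s processed={proc_rate:.1f} rows/s"

# Usage sketch in a driver-side loop (assumes a running query):
#   import time
#   while query.isActive:
#       if query.lastProgress:
#           print(summarize_progress(query.lastProgress))
#       time.sleep(30)
```

A sustained processed rate below the input rate is the usual early warning that the job needs more executor resources.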
Apache Flink
Apache Flink is supported as a streaming engine in ilum, complementing Spark Structured Streaming. Flink provides true event-at-a-time processing with lower latency and is well suited for complex event processing (CEP), session windowing, and use cases where sub-second response times are critical.
Flink on ilum
Ilum deploys Flink jobs on Kubernetes using the Flink Kubernetes Operator. Flink applications are submitted through the ilum UI or REST API just like Spark jobs. Ilum handles pod lifecycle management, checkpoint storage configuration, and integration with the shared Kafka cluster and catalog layer.
Key architectural points:
- Job Manager and Task Manager pods are orchestrated by Kubernetes, with resource allocation managed through ilum's cluster configuration.
- Flink jobs connect to the same Kafka cluster used by ilum internally (ilum-kafka:9092 by default).
- Flink integrates with the Hive Metastore and Iceberg catalog, allowing Flink and Spark to read and write the same tables.
- Checkpoints and savepoints are stored on the ilum default storage (S3/MinIO/HDFS), enabling job recovery and upgrades without data loss.
Flink SQL: Kafka to Iceberg
Flink SQL provides a declarative way to build streaming pipelines. The following example reads JSON events from a Kafka topic and writes them to an Iceberg table.
-- Register the Kafka source table
CREATE TABLE kafka_events (
    event_id STRING,
    user_id STRING,
    action STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'ilum-kafka:9092',
    'properties.group.id' = 'flink-ingestion',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
);
-- Register the Iceberg sink table
CREATE TABLE iceberg_events (
    event_id STRING,
    user_id STRING,
    action STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hive_catalog',
    'catalog-type' = 'hive',
    'uri' = 'thrift://ilum-hive-metastore:9083',
    'warehouse' = 's3a://ilum-data/warehouse'
);
-- Continuous streaming insert
INSERT INTO iceberg_events
SELECT event_id, user_id, action, amount, event_time
FROM kafka_events
WHERE event_id IS NOT NULL;
Flink SQL jobs can be submitted as SQL scripts through the ilum UI. Set the Job Type to Flink and provide the SQL statements in the code editor.
Flink DataStream API
For more complex processing logic, use the Flink DataStream API. The following Java example reads from Kafka, applies a tumbling window aggregation, and writes the results to an Iceberg table.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("ilum-kafka:9092")
    .setTopics("user-events")
    .setGroupId("flink-datastream")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Kafka Source")
    .map(json -> parseEvent(json)) // custom parsing logic
    .keyBy(event -> event.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount")
    .sinkTo(icebergSink); // Iceberg sink connector

env.execute("flink-aggregation-pipeline");
Flink CDC Connector
Flink provides a native CDC connector that captures database changes directly without requiring a separate Debezium deployment. This simplifies the CDC pipeline from a three-component architecture to two.
Source DB --> Flink CDC Connector --> Iceberg/Delta
Flink SQL example with PostgreSQL CDC:
-- Register a CDC source directly from PostgreSQL
CREATE TABLE orders_cdc (
    id INT,
    customer_id INT,
    product STRING,
    amount DOUBLE,
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgres-host',
    'port' = '5432',
    'username' = 'cdc_user',
    'password' = 'secret',
    'database-name' = 'mydb',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink_orders_slot'
);
-- Write changes directly to Iceberg with upsert semantics
INSERT INTO iceberg_orders
SELECT id, customer_id, product, amount, updated_at
FROM orders_cdc;
Flink CDC connectors support PostgreSQL, MySQL, MongoDB, Oracle, and SQL Server. Each connector requires the corresponding Flink CDC library to be included in the job dependencies.
Exactly-Once Guarantees with Flink
Flink achieves exactly-once semantics through its distributed snapshot (Chandy-Lamport) checkpointing algorithm:
- Checkpoint barriers flow through the data stream, ensuring consistent state snapshots across all operators.
- State backends (RocksDB or heap-based) persist operator state to durable storage (S3/MinIO) on each checkpoint.
- Two-phase commit sinks (Kafka, Iceberg) ensure that output records are committed atomically with the checkpoint.
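A toy model of the barrier mechanism (plain Python, nothing like Flink's actual implementation) shows why the snapshots are consistent: an operator captures its state at the instant the barrier arrives, so each snapshot reflects exactly the records that preceded the barrier.

```python
def process_with_barrier(records, barrier_index):
    """Toy model of a single operator summing values from a stream.
    When the barrier arrives (at barrier_index), the operator snapshots
    its state before processing further records. Returns
    (snapshot, final_state); the snapshot covers exactly the records
    ahead of the barrier."""
    state = 0
    snapshot = None
    for i, value in enumerate(records):
        if i == barrier_index:
            snapshot = state  # consistent point-in-time snapshot
        state += value
    return snapshot, state
```

In the real system, barriers flow through every channel and operator, and the checkpoint completes only when all operators have snapshotted at the same barrier.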
Configure checkpointing in your Flink job:
env.enableCheckpointing(60000); // checkpoint interval (ms)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointStorage("s3://ilum-data/flink-checkpoints");
For Flink SQL jobs, set checkpointing via Flink configuration properties:
execution.checkpointing.interval=60s
execution.checkpointing.mode=EXACTLY_ONCE
state.checkpoints.dir=s3://ilum-data/flink-checkpoints
When to Use Flink vs Spark Structured Streaming
Both engines are production-ready for streaming workloads on ilum. Choose based on your requirements:
| Aspect | Spark Structured Streaming | Apache Flink |
|---|---|---|
| Processing Model | Micro-batch (default) or continuous | True event-at-a-time |
| Latency | Seconds (micro-batch) | Milliseconds |
| State Management | Limited by micro-batch boundaries | Fine-grained, key-partitioned state |
| Windowing | Tumbling, sliding, session | Tumbling, sliding, session, custom |
| Complex Event Processing | Basic | Advanced (CEP library) |
| CDC | Via Debezium + Kafka | Native CDC connectors |
| Batch + Stream Unification | Strong (same DataFrame API) | Strong (same DataStream/Table API) |
| Catalog Integration | Hive, Nessie, Unity, Iceberg, Delta, Hudi | Hive, Iceberg |
| Best for | ETL pipelines, unified batch/stream, existing Spark codebases | Low-latency event processing, CEP, native CDC |
General guidance:
- Use Spark Structured Streaming when you have existing Spark batch jobs and want a unified batch/streaming codebase, or when micro-batch latency (seconds) is acceptable.
- Use Flink when you need sub-second latency, complex event processing patterns, native CDC without Debezium, or fine-grained state management with event-time processing.