
Automating Spark Jobs with GitLab CI/CD

This guide demonstrates how to build a complete CI/CD pipeline for Apache Spark applications on Kubernetes using GitLab CI/CD and Ilum. By implementing this architecture, you establish a robust DataOps practice that automates the lifecycle of your data engineering code.

Key Benefits

  • Automated Deployments: Eliminate manual spark-submit commands.
  • Version Control: Ensure every job running in your cluster corresponds to a specific Git commit.
  • Consistency: Automatically synchronize environment configurations (Ilum Groups) and job definitions.
  • Feedback Loop: Get immediate status reporting on job submission success or failure directly in GitLab.

Prerequisites

  • A GitLab project (SaaS or Self-Managed).
  • Ilum API endpoint reachable from your GitLab Runner (e.g., via Ingress or internal network).
  • curl and jq installed in the runner image (or use an image like alpine and install them).

1. Architecture of a Spark CI/CD Pipeline

The pipeline uses the Ilum REST API to interact with the Kubernetes cluster.

  1. Check/Manage Groups: Ensures the target Ilum Group exists (and optionally recreates it to update group-level files/settings).
  2. Submit Job: Posts the Spark job definition and code to Ilum.
  3. Execute: The job runs on the Spark cluster, independent of the CI runner.
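The three steps above can be sketched in Python. This is a minimal sketch, not a definitive client: it assumes the endpoints match the curl commands shown later in this guide, that responses carry the `content`, `groupId`, and `jobId` fields those commands parse with jq, and that `http` is any client object exposing `get`/`post`/`delete` methods returning parsed JSON (e.g., a thin wrapper around `requests`), which keeps the logic testable without a live cluster.

```python
# Sketch of the pipeline steps against the Ilum REST API.
# Assumptions: endpoints and response fields mirror the curl/jq calls
# later in this guide; `http` is any injected client with get/post/delete.

def ensure_group(http, api_url: str, name: str) -> str:
    """Step 1: recreate group `name` and return the new group's id."""
    groups = http.get(f"{api_url}/group").get("content", [])
    existing = next((g["id"] for g in groups if g["name"] == name), None)
    if existing is not None:
        # Stop and delete the old group so updated code/config takes effect
        http.post(f"{api_url}/group/{existing}/stop")
        http.delete(f"{api_url}/group/{existing}")
    created = http.post(f"{api_url}/group", data={"name": name})
    return created["groupId"]

def submit_job(http, api_url: str, name: str) -> str:
    """Step 2: post the job definition; step 3 (execution) runs cluster-side."""
    resp = http.post(f"{api_url}/job/submit", data={"name": name})
    return resp["jobId"]
```

Because the HTTP client is injected, the same functions can be exercised in CI unit tests with a stub client before any real API call is made.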

2. Automating Ilum Group Management & Environment Setup

This example pipeline stage demonstrates how to manage Ilum Groups, the logical containers for interactive sessions and shared resources. It checks whether a group exists, deletes it (to clean up old state), and recreates it with fresh configuration.

The Group Service (service.py)

This Python script defines an interactive Spark job that the group will execute. It inherits from IlumJob and provides details about a specified table.

service.py
from ilum.api import IlumJob
from pyspark.sql.functions import col, sum as spark_sum


class SparkInteractiveExample(IlumJob):
    def run(self, spark, config) -> str:
        table_name = config.get("table")
        database_name = config.get("database")  # optional
        report_lines = []

        if not table_name:
            raise ValueError("Config must provide a 'table' key")

        # Use specified database if provided
        if database_name:
            spark.catalog.setCurrentDatabase(database_name)
            report_lines.append(f"Using database: {database_name}")

        # Check if table exists in catalog
        if table_name not in [t.name for t in spark.catalog.listTables()]:
            raise ValueError(f"Table '{table_name}' not found in catalog")

        df = spark.table(table_name)

        report_lines.append(f"=== Details for table: {table_name} ===")

        # Total rows
        total_rows = df.count()
        report_lines.append(f"Total rows: {total_rows}")

        # Total columns
        total_columns = len(df.columns)
        report_lines.append(f"Total columns: {total_columns}")

        # Distinct counts per column
        report_lines.append("Distinct values per column:")
        for c in df.columns:
            distinct_count = df.select(c).distinct().count()
            report_lines.append(f"  {c}: {distinct_count}")

        # Schema info (reconstructed field by field as name: type)
        report_lines.append("Schema:")
        for f in df.schema.fields:
            report_lines.append(f"  {f.name}: {f.dataType}")

        # Sample data
        report_lines.append("Sample data (first 5 rows):")
        for row in df.take(5):
            report_lines.append(str(row.asDict()))

        # Null counts per column
        report_lines.append("Null counts per column:")
        null_counts_df = df.select(
            [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
        )
        null_counts = null_counts_df.collect()[0].asDict()
        for c, v in null_counts.items():
            report_lines.append(f"  {c}: {v}")

        return "\n".join(report_lines)
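When this service is later invoked interactively, the `table` and `database` keys read by `run()` arrive through jobConfig, which for the JSON execute API (/group/{groupId}/job/execute, described in the jobConfig note later in this guide) is a plain JSON object. The payload below is a hypothetical illustration: the exact request schema and the `service.SparkInteractiveExample` class reference are assumptions, not documented fields.

```python
import json

# Hypothetical execute-request payload (field names are assumptions).
# jobConfig is a plain JSON object here, and its keys are exactly what
# config.get() reads inside SparkInteractiveExample.run().
payload = {
    "jobClass": "service.SparkInteractiveExample",  # assumed "filename.classname" form
    "jobConfig": {"table": "students", "database": "university"},
}
body = json.dumps(payload)
```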

The CI Pipeline (Group Management)

This pipeline stage manages the lifecycle of the Ilum Group. It checks if the group exists, removes it if necessary to apply code updates, and recreates it with the latest service.py.

gitlab-ci-group.yml
stages:
  - manage_group

manage_group:
  stage: manage_group
  image: alpine:3.20
  before_script:
    - apk add --no-cache curl jq
  script:
    - echo "--- Checking Group Existence ---"
    - |
      GROUP_NAME="ILUM_COURSE"
      API_URL="http://ilum-core.default:9888/api/v1"

      # 1. Check if group exists
      RESPONSE=$(curl -s "$API_URL/group")
      GROUP_ID=$(echo "$RESPONSE" | jq -r ".content[] | select(.name==\"$GROUP_NAME\") | .id")

      # 2. Delete if exists (to update code/config)
      if [ -n "$GROUP_ID" ] && [ "$GROUP_ID" != "null" ]; then
        echo "Deleting existing group $GROUP_ID..."
        curl -s -X POST "$API_URL/group/$GROUP_ID/stop"
        curl -s -X DELETE "$API_URL/group/$GROUP_ID"
      fi

      # 3. Create new group, uploading the updated service.py
      echo "Creating new group..."
      CREATE_RESP=$(curl -s -X POST \
        -F "name=$GROUP_NAME" \
        -F "clusterName=default" \
        -F "language=PYTHON" \
        -F "pyFiles=@service.py" \
        -F "jobConfig=spark.executor.instances=2;spark.driver.memory=2g" \
        "$API_URL/group")

      NEW_ID=$(echo "$CREATE_RESP" | jq -r '.groupId // empty')
      if [ -n "$NEW_ID" ]; then
        echo "Group created successfully with ID: $NEW_ID"
      else
        echo "Failed to create group. Response: $CREATE_RESP"
        exit 1
      fi
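The jq filter used above (`.content[] | select(.name==\"$GROUP_NAME\") | .id`) can be mirrored in plain Python when debugging the group listing locally. The sample listing below is hypothetical, shaped only by the fields the pipeline actually reads (`content`, `id`, `name`).

```python
def find_group_id(listing: dict, name: str):
    """Mirror of the jq filter: .content[] | select(.name==NAME) | .id"""
    for group in listing.get("content", []):
        if group.get("name") == name:
            return group.get("id")
    return None

# Hypothetical /group response, shaped like the fields the pipeline reads
listing = {"content": [{"id": "abc123", "name": "ILUM_COURSE"}]}
```

Returning None for a missing group corresponds to the empty jq output that the shell script treats as "group does not exist".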

3. Continuous Deployment of Spark Jobs

This stage implements the Continuous Deployment (CD) logic by submitting a PySpark job to the Ilum cluster. It is configured to trigger automatically on a git push to the main branch.

The Spark Job (submit.py)

Create a file named submit.py in your repository root. This example job creates a university database with Hive tables and uses Delta Lake format.

submit.py
import logging

from pyspark.sql import SparkSession

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("""
=== Example Spark Job: Student Enrollments ===
This Spark job demonstrates a simple educational data pipeline using Hive tables.
It performs the following steps:
1. Creates a 'students' table with student information.
2. Creates a 'courses' table with available courses.
3. Creates an 'enrollments' table linking students to courses.
4. Joins the tables to calculate enrollment statistics and saves them into 'course_stats'.

All intermediate results are stored in the 'university' Hive database.
=============================================
""")

spark = SparkSession \
    .builder \
    .appName("Spark") \
    .getOrCreate()

logger.info("SparkSession initialized")

spark.sql("CREATE DATABASE IF NOT EXISTS university")
logger.info("Database 'university' ensured")

# --- Create Students Table ---
students_data = [
    (1, "Alice", "Computer Science"),
    (2, "Bob", "Mathematics"),
    (3, "Charlie", "Physics"),
    (4, "Diana", "Computer Science")
]

df_students = spark.createDataFrame(students_data, ["student_id", "name", "major"])
spark.sql("DROP TABLE IF EXISTS university.students")
df_students.write.format("delta").saveAsTable("university.students")
logger.info("Created table: university.students")

# --- Create Courses Table ---
courses_data = [
    (101, "Big Data"),
    (102, "Linear Algebra"),
    (103, "Quantum Mechanics")
]

df_courses = spark.createDataFrame(courses_data, ["course_id", "course_name"])
spark.sql("DROP TABLE IF EXISTS university.courses")
df_courses.write.format("delta").saveAsTable("university.courses")
logger.info("Created table: university.courses")

# --- Create Enrollments Table ---
enrollments_data = [
    (1, 101),  # Alice -> Big Data
    (2, 102),  # Bob -> Linear Algebra
    (3, 103),  # Charlie -> Quantum Mechanics
    (4, 101),  # Diana -> Big Data
    (2, 101)   # Bob -> Big Data
]

df_enrollments = spark.createDataFrame(enrollments_data, ["student_id", "course_id"])
spark.sql("DROP TABLE IF EXISTS university.enrollments")
df_enrollments.write.format("delta").saveAsTable("university.enrollments")
logger.info("Created table: university.enrollments")

# --- Join to calculate course enrollment counts ---
df_course_stats = spark.sql("""
    SELECT
        c.course_id,
        c.course_name,
        COUNT(e.student_id) AS total_students
    FROM university.courses c
    LEFT JOIN university.enrollments e ON c.course_id = e.course_id
    GROUP BY c.course_id, c.course_name
""")

spark.sql("DROP TABLE IF EXISTS university.course_stats")
df_course_stats.write.format("delta").saveAsTable("university.course_stats")
logger.info("Inserted final data into course_stats table")
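As a sanity check on the LEFT JOIN aggregation above, the expected `course_stats` counts can be reproduced with plain Python over the same sample data, without a Spark session:

```python
from collections import Counter

# Same sample enrollments as submit.py: (student_id, course_id) pairs
enrollments = [(1, 101), (2, 102), (3, 103), (4, 101), (2, 101)]

# Count students per course_id, matching COUNT(e.student_id) ... GROUP BY
counts = Counter(course_id for _, course_id in enrollments)
# Course 101 (Big Data) should report 3 enrolled students
```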

The CI Pipeline (Job Submission)

Add the job submission stage to your .gitlab-ci.yml. This configuration uses alpine:3.20 and includes error handling for the REST API response.

gitlab-ci-job.yml
stages:
  - submit_job

# Define variables globally or per job
variables:
  # In a real project, use CI/CD Variables for endpoints and tokens
  ILUM_API_URL: "http://ilum-core.default:9888/api/v1"

submit_job:
  stage: submit_job
  image: alpine:3.20
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
  before_script:
    - apk add --no-cache curl jq
  script:
    - echo "Creating job ILUM_JOB_SUBMIT with submit.py..."
    - |
      # Submit job request and capture HTTP status code + body
      # Note: jobClass is mandatory.
      # - For a script, use the filename without .py (e.g., "submit").
      # - For no specific class (main entry point), use an empty string.
      # - "filename.classname" is only for interactive jobs or packages.
      RESPONSE=$(curl -s -X POST \
        -F "name=gitlab_pipeline_job" \
        -F "pyFiles=@submit.py" \
        -F "clusterName=default" \
        -F "language=PYTHON" \
        -F "jobClass=submit" \
        -w "\nHTTP_STATUS:%{http_code}" \
        "$ILUM_API_URL/job/submit")

      # Extract status and body
      HTTP_STATUS=$(echo "$RESPONSE" | grep HTTP_STATUS | cut -d':' -f2)
      BODY=$(echo "$RESPONSE" | sed '/HTTP_STATUS/d')

      echo "HTTP Status: $HTTP_STATUS"
      echo "Response Body: $BODY"

      if [ "$HTTP_STATUS" -ne 200 ]; then
        echo "Error: Failed to create job (Status: $HTTP_STATUS)"
        exit 1
      fi

      JOB_ID=$(echo "$BODY" | jq -r '.jobId // empty')
      if [ -n "$JOB_ID" ]; then
        echo "✅ Job created successfully with ID $JOB_ID."
        echo "You can check status in Ilum UI."
      else
        echo "Warning: Job created but ID not returned."
      fi
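The grep/sed splitting of curl's combined output can be mirrored in Python, which is handy for unit-testing the pipeline's response handling. The input format is exactly what `-w "\nHTTP_STATUS:%{http_code}"` appends to the response body:

```python
def parse_curl_response(raw: str):
    """Split curl output produced with -w "\\nHTTP_STATUS:%{http_code}"
    into (status_code, body), mirroring the grep/sed pair in the pipeline."""
    lines = raw.splitlines()
    status_line = next(l for l in lines if l.startswith("HTTP_STATUS:"))
    status = int(status_line.split(":", 1)[1])
    body = "\n".join(l for l in lines if not l.startswith("HTTP_STATUS:"))
    return status, body
```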

Pipeline Variables

For security, avoid hardcoding URLs. Use GitLab CI/CD Variables (Settings -> CI/CD -> Variables):

  • ILUM_API_URL: e.g., https://ilum.example.com/api
  • ILUM_AUTH_TOKEN: If authentication is enabled, pass it as a header in curl (-H "Authorization: Bearer $ILUM_AUTH_TOKEN").

jobConfig format

When using the multipart/form-data endpoints (like /job/submit or /group), the jobConfig should be a semicolon-separated string (e.g., spark.key=value;spark.key2=value2).

However, when submitting an interactive execution via the JSON API (e.g., /group/{groupId}/job/execute), the jobConfig must be a standard JSON object.
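Both jobConfig encodings can be produced from a single dict. The helpers below are illustrative only, not part of the Ilum API:

```python
import json

def job_config_multipart(conf: dict) -> str:
    """Semicolon-separated string for multipart endpoints (/job/submit, /group)."""
    return ";".join(f"{k}={v}" for k, v in conf.items())

def job_config_json(conf: dict) -> str:
    """Plain JSON object for the interactive JSON execute API."""
    return json.dumps(conf)

conf = {"spark.executor.instances": 2, "spark.driver.memory": "2g"}
```

Keeping one dict as the source of truth avoids the two formats drifting apart when the same settings are used by both endpoints.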

Verification

  1. Push your code to the main branch.
  2. Navigate to Build -> Pipelines in GitLab.
  3. Wait for the submit_job stage to pass.
  4. Open the Ilum UI -> Jobs tab.
  5. You should see gitlab_pipeline_job in the running or completed state.

Frequently Asked Questions (FAQ)

Why use the REST API instead of spark-submit?

The Ilum REST API provides a programmatic, language-agnostic way to interact with the cluster. Unlike spark-submit, which requires a complex client-side setup (Java, Hadoop configs, K8s credentials) in your CI runner, the REST API only requires curl and network access to the Ilum endpoint. This drastically simplifies your CI runner images.

How do I handle secrets in my pipeline?

Never hardcode secrets like S3 keys or database passwords in your submit.py. Instead:

  1. Store them as GitLab CI/CD Variables (masked and protected).
  2. Pass them to the Ilum job as Spark configuration properties or environment variables during the API call (e.g., -F "jobConfig=spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY...").

Can I run this on GitHub Actions?

Yes. The concepts are identical. You would replace the .gitlab-ci.yml syntax with GitHub Actions workflow syntax, using curl steps to hit the same Ilum API endpoints.