
How to Write Interactive Spark Jobs in Python (IlumJob)

This guide teaches you how to develop interactive Spark jobs in Python using the IlumJob interface. You'll learn how to structure your code, pass parameters at execution time, and leverage the benefits of this approach for production workloads on Kubernetes.

What is the IlumJob Interface?

The IlumJob interface is a Python base class used to create reusable, parameterized Spark jobs that run on interactive Ilum services. Unlike traditional spark-submit scripts, IlumJob allows you to:

  • Receive configuration at runtime: Parameters are passed as a dictionary, allowing the same job to handle different inputs without code changes.
  • Return structured results: The run method returns a string, making it easy to extract and display results.
  • Run on-demand: Jobs can be triggered via the UI, REST API, or CI/CD pipelines.
Basic Structure

```python
from ilum.api import IlumJob

class MySparkJob(IlumJob):
    def run(self, spark, config) -> str:
        # Your Spark logic here
        return "Job completed successfully"
```

Structure of an Interactive Spark Job

Every interactive job consists of three essential parts:

  1. Import the interface: from ilum.api import IlumJob
  2. Define a class: Create a class that inherits from IlumJob.
  3. Implement run: Write your Spark logic inside the run(self, spark, config) method.
| Parameter | Type | Description |
| --- | --- | --- |
| spark | SparkSession | Pre-initialized Spark session, ready to use. |
| config | dict | A dictionary containing parameters passed at execution time. |
| Return | str | A string result that will be displayed in the UI or returned via API. |
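The contract in the table above can be illustrated without a cluster. The class below is a hypothetical stand-in (it does not inherit the real IlumJob from ilum.api) that shows only the dict-in / string-out behavior of run:

```python
class EchoJob:
    """Stand-in for IlumJob: demonstrates the run(self, spark, config) contract.

    The real base class comes from ilum.api; only the config-handling and
    string-return behavior is shown here, so `spark` is unused.
    """

    def run(self, spark, config) -> str:
        table = config.get("table", "<unset>")  # plain dict access, with a default
        return f"Would inspect table: {table}"
```

Because the job is an ordinary Python class, you can exercise run() directly in a unit test by passing a dict and a dummy Spark session.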

How to Pass Parameters to Spark Jobs

Parameters are passed as a JSON object when executing the job. Inside your run method, you access them using standard dictionary methods.

Example: Table Inspector

This example demonstrates reading the database and table parameters to inspect a Hive table.

table_inspector.py
```python
from ilum.api import IlumJob
from pyspark.sql.functions import col, sum as spark_sum

class TableInspector(IlumJob):
    def run(self, spark, config) -> str:
        # Read required parameters
        table_name = config.get("table")
        database_name = config.get("database")  # Optional

        if not table_name:
            raise ValueError("Config must provide a 'table' key")

        # Set database if provided
        if database_name:
            spark.catalog.setCurrentDatabase(database_name)

        # Check if table exists
        if table_name not in [t.name for t in spark.catalog.listTables()]:
            raise ValueError(f"Table '{table_name}' not found in catalog")

        df = spark.table(table_name)

        # Build report
        report = [
            f"=== Table: {table_name} ===",
            f"Total rows: {df.count()}",
            f"Total columns: {len(df.columns)}",
            "",
            "Schema:",
        ]
        for field in df.schema.fields:
            report.append(f"  {field.name}: {field.dataType}")

        report.append("")
        report.append("Sample (5 rows):")
        for row in df.take(5):
            report.append(str(row.asDict()))

        # Null counts
        report.append("")
        report.append("Null counts:")
        null_df = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
        for c, v in null_df.collect()[0].asDict().items():
            report.append(f"  {c}: {v}")

        return "\n".join(report)
```

Execution Parameters (JSON)

When executing via UI or API, provide parameters like this:

```json
{
  "database": "ilum_example_product_sales",
  "table": "products"
}
```

Before You Start

To run an interactive job, you first need to create and deploy a Job-type Service in Ilum. This service provides the Spark environment where your jobs execute.

When creating the service:

  • Type: Select Job
  • Language: Select Python
  • Py Files: Upload your job file (e.g., table_inspector.py)

👉 Learn how to deploy a Job Service — step-by-step guide with UI screenshots and configuration options.

Executing Jobs

You can execute interactive jobs in three ways:

  1. Go to Services → select your Job service
  2. In the Execute section, enter:
    • Class: table_inspector.TableInspector
    • Parameters: {"database": "sales", "table": "orders"}
  3. Click Execute

The result string is displayed immediately in the UI.


Benefits of the IlumJob Approach

| Benefit | Description |
| --- | --- |
| Reusability | Write once, run many times with different parameters. |
| No Cold Starts | Interactive services keep Spark warm, so subsequent executions are instant. |
| Parameterization | Pass configuration at runtime; no need to hardcode values. |
| Observability | Results are captured and visible in the UI/API for easy debugging. |
| API-Driven | Execute jobs programmatically from orchestrators, CI/CD, or external systems. |
| Version Control | Store job code in Git and deploy via pipelines. |

Interactive Jobs vs. Batch Jobs (Spark Submit)

| Feature | Interactive Jobs (IlumJob) | Batch Jobs (spark-submit) |
| --- | --- | --- |
| Startup Time | Instant (uses warm executors) | Slow (provisions new pods) |
| Context | Shared Spark context | Isolated Spark context |
| Use Case | Ad-hoc queries, API backends, quick reports | Long-running ETL, heavy processing |
| Result | Returns string result to API/UI | Logs to driver stdout/file |
| Resources | Shared within the service | Dedicated per job |

Best Practices

1. Validate Input Parameters

Always validate required parameters and provide helpful error messages.

Validate Parameters
```python
def run(self, spark, config) -> str:
    required_keys = ["table", "output_path"]
    for key in required_keys:
        if key not in config:
            raise ValueError(f"Missing required parameter: '{key}'")
```
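Validation like this is easy to factor into a small helper that reports every missing key at once instead of failing on the first one. A sketch (the require_keys name is ours, not part of the Ilum API):

```python
def require_keys(config: dict, required: list) -> None:
    """Raise ValueError listing all missing required parameters at once."""
    missing = [key for key in required if key not in config]
    if missing:
        raise ValueError(f"Missing required parameter(s): {', '.join(missing)}")
```

Reporting all missing keys in one error saves a full execution round trip per forgotten parameter.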

2. Use Default Values

For optional parameters, use config.get('key', default_value).

Use Default Values
```python
batch_size = int(config.get("batch_size", 1000))
```
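Since JSON payloads may deliver values as strings, it can help to coerce all optional parameters in one place. A sketch with hypothetical parameter names and defaults (not Ilum conventions):

```python
def read_options(config: dict) -> dict:
    # Coerce optional parameters to the types the job expects.
    # Names and defaults here are illustrative only.
    return {
        "batch_size": int(config.get("batch_size", 1000)),
        "sample_rows": int(config.get("sample_rows", 5)),
        "dry_run": str(config.get("dry_run", "false")).lower() == "true",
    }
```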

3. Structure Your Output

Return a well-formatted string for readability in the UI.

Structure Output
```python
lines = ["=== Job Summary ==="]
lines.append(f"Processed: {count} records")
lines.append(f"Duration: {elapsed_time}s")
return "\n".join(lines)
```
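For jobs that emit several such blocks, a tiny builder keeps the formatting in one place. A sketch (the build_report name is ours):

```python
def build_report(title: str, metrics: dict) -> str:
    """Format a result string in the '=== Title ===' block style used above."""
    lines = [f"=== {title} ==="]
    lines.extend(f"{name}: {value}" for name, value in metrics.items())
    return "\n".join(lines)
```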

4. Handle Errors Gracefully

Wrap risky operations in try/except and return meaningful messages.

Handle Errors
```python
try:
    df.write.saveAsTable(output_table)
    return f"Successfully wrote to {output_table}"
except Exception as e:
    return f"Error writing table: {str(e)}"
```
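Catching bare Exception is convenient but hides the failure class; one option is to keep the exception type in the returned message. A generic sketch (the safe_run helper is ours):

```python
def safe_run(action, success_message: str) -> str:
    """Run a callable and fold any failure into the returned string,
    keeping the exception class name for easier triage."""
    try:
        action()
        return success_message
    except Exception as e:
        return f"Error ({type(e).__name__}): {e}"
```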

Complete Example: Transaction Report Generator

This job generates a transaction summary report based on the transaction_anomaly_detection.transactions table.

transaction_report.py
```python
from ilum.api import IlumJob
from pyspark.sql.functions import sum as spark_sum, count, col

class TransactionReportGenerator(IlumJob):
    def run(self, spark, config) -> str:
        # Parameters
        merchant_filter = config.get("merchant")  # Optional filter

        # Load data from the default Ilum transactions table
        df = spark.table("transaction_anomaly_detection.transactions")

        if merchant_filter:
            df = df.filter(col("Merchant") == merchant_filter)

        # Aggregate by TransactionType
        summary = df.groupBy("TransactionType").agg(
            count("TransactionID").alias("transaction_count"),
            spark_sum("Amount").alias("total_amount"),
        ).collect()

        # Build report
        report = [
            "=== Transaction Report ===",
            f"Merchant Filter: {merchant_filter or 'All'}",
            "",
            "Summary by Transaction Type:",
        ]
        for row in summary:
            report.append(f"  {row['TransactionType']}: {row['transaction_count']} txns, ${row['total_amount']:,.2f}")

        return "\n".join(report)
```

Execute with:

Execute with Payload
```json
{
  "merchant": "AcmeCorp"
}
```

Next Steps


Frequently Asked Questions

Can I use Scala for interactive jobs?

Yes. Currently, the IlumJob interface is primarily documented for Python. Check the Interactive Job Service documentation for language support details.

How do I debug an interactive job?

Since interactive jobs run on a remote cluster, you can't use a local debugger directly. Instead:

  1. Use print() statements or a logger; the output will appear in the driver logs.
  2. Return error messages as part of the string result in your try/except blocks.
  3. Check the Spark UI for the specific job execution to analyze tasks and stages.
What happens if my job fails?

If your code raises an unhandled exception, the execution will fail, and the error trace will be returned in the API response. It is best practice to wrap your logic in a try/except block to return a user-friendly error message.
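That wrapper can also capture the full traceback, so the result string shows exactly where the job failed. A sketch with a simulated failure and a stand-in base class (a real job would inherit IlumJob from ilum.api):

```python
import traceback

class ResilientJob:
    # Stand-in base class for illustration; not the real IlumJob.
    def run(self, spark, config) -> str:
        try:
            raise RuntimeError("simulated failure")  # placeholder for real Spark logic
        except Exception:
            # Return the full trace as the result string instead of failing the execution
            return "Job failed:\n" + traceback.format_exc()
```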