Bigdata_PySpark

Modelo XGBoost de machine learning para Big Data usando MLlib y PySpark

Banner

Código Python:

1. Importar bibliotecas

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from sparkxgb import XGBoostRegressor

2. Inicializar la sesión de Spark para computación distribuida

spark = SparkSession.builder.appName("XGBoostRegressionExample").getOrCreate()

3. Cargue el conjunto de datos grande en un DataFrame. Use un formato distribuido como CSV o Parquet.

data = spark.read.csv(‘datafile.scv’, header=True, inferSchema=True)

4. Características y etiquetas. VectorAssembler combina todas las columnas de características en una única columna vectorial requerida por MLlib.

feature_cols = [col for col in data.columns if col != "label"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

5. Transformar los datos antes de dividirlos

assembled_data = assembler.transform(data)

6. Dividir los datos en subconjuntos de entrenamiento y testeo

train, test = assembled_data.randomSplit([0.8, 0.2], seed=42)

7. Crear el regresor de XGBoost. Requiere el módulo spark-xgb

xgb = XGBoostRegressor(
    featuresCol="features",
    labelCol="label",
    maxDepth=6,
    n_estimators=100,
    objective="reg:squarederror"
)

8. Construcción de la pipeline. Combina preprocesamiento y modelado en una única pipeline para reproducibilidad

pipeline = Pipeline(stages=[assembler, xgb])

9. Entrena el modelo. Ajusta la canalización a los datos de entrenamiento.

model = pipeline.fit(train)

10. Predecir sobre el conjunto de testeo

predictions = model.transform(test)

11. Evaluar el modelo

evaluator_rmse = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse"                           )
rmse = evaluator_rmse.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")

12. Detener PySpark

spark.stop()