import json
from kafka import KafkaConsumer
from river import linear_model, preprocessing, metrics
consumer = KafkaConsumer(
'realtime-data',
bootstrap_servers='localhost:9092',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
model = preprocessing.StandardScaler() | linear_model.LinearRegression()
mae = metrics.MAE()
for message in consumer:
data = message.value
# Datos de entrada de ejemplo: {'f1': 1.0, 'f2': 2.5, 'target': 10.2}
y = data.pop('target')
X = data
# Predecir y actualizar
# Cuando `predict_one` es invocado antes de que el modelo haya recibido datos, `y_pred` puede recibir el valor `None`:
y_pred = model.predict_one(X) or 0.0
print(f"Predicted: {y_pred:.2f} | Actual: {y}")
model.learn_one(X, y)
mae.update(y, y_pred)
print(f"Current MAE: {mae.get():.4f}")