https://www.kaggle.com/competitions/predict-energy-behavior-of-prosumers/overview
1. Import Modules
import warnings
warnings.filterwarnings("ignore")
import os
import gc
import pickle
import datetime
import copy
from tqdm.auto import tqdm
import numpy as np
import pandas as pd
import polars as pl
import plotly.express as px
from sklearn.ensemble import VotingRegressor
import lightgbm as lgb
import holidays
2. DataStorage
Polars Tips
pl.read_csv()
DataFrame = pl.read_csv(
    os.path.join(self.root, "dataset_name.csv"),
    columns=columns_to_analyze,  # read only the columns we will actually use
    try_parse_dates=True,  # parse string columns into datetime dtypes
)
When referring to a variable defined directly on a class, prefix it with self: self.variable_name
Selecting Columns
- Single column: pl.DataFrame.select(pl.col("column_name"))
- Multiple columns: pl.DataFrame.select(column_list)
Filter Rows
- pl.DataFrame.filter(condition), where condition is a boolean expression such as pl.col("column_name") > value (a short sketch of both follows below)
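A minimal sketch of both operations on a toy frame (the column names here are illustrative):
import polars as pl

df = pl.DataFrame({"county": [0, 1, 2], "target": [0.1, 0.5, 0.9]})
single = df.select(pl.col("target"))  # single column
several = df.select(["county", "target"])  # multiple columns
filtered = df.filter(pl.col("target") > 0.3)  # keep only rows where the condition holds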
pl.DataFrame.with_columns( expressions built from pl.col("column_name") )
- To change the dtype of a column:
pl.DataFrame.with_columns(pl.col("column_name").cast(pl.datatypes.DataType))
- To transform a column and add the result as a new column:
pl.DataFrame.with_columns((expression on pl.col("column_name")).alias("new_column_name"))
df_features.with_columns(
pl.col("datetime").dt.ordinal_day().alias("dayofyear"),
pl.col("datetime").dt.hour().alias("hour"),
pl.col("datetime").dt.day().alias("day"),
pl.col("datetime").dt.weekday().alias("weekday"),
pl.col("datetime").dt.month().alias("month"),
pl.col("datetime").dt.year().alias("year"),
)
pl.DataFrame.schema
- Each column of the DataFrame appears as a key
- That column's dtype appears as the value
OrderedDict[column_name: DataType, column_name2: DataType2, ...]
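For example, peeking at a toy frame's schema (dtype names vary slightly across Polars versions):
import polars as pl

df = pl.DataFrame({"datetime": ["2022-01-01"], "target": [0.5]})
print(df.schema)  # e.g. {'datetime': String, 'target': Float64}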
Converting a pandas Series/DataFrame -> Polars Series/DataFrame
pl.from_pandas(
    pd.DataFrame,
    schema_overrides,
)
- schema_overrides is a SchemaDict: an OrderedDict of {column_name: DataType}
- For each listed column, the dtype that would otherwise be inferred from the pandas data is overridden with the given DataType
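A minimal sketch of the conversion with a dtype override (the column names and dtypes are illustrative):
import pandas as pd
import polars as pl

pdf = pd.DataFrame({"county": [1], "eic_count": [10]})
df = pl.from_pandas(pdf, schema_overrides={"eic_count": pl.Int32})  # force Int32 instead of the inferred Int64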
pl.concat([DataFrame1, DataFrame2, ...])
- Vertically stacks the DataFrames in the list
pl.DataFrame.unique(subset)
- Removes duplicate rows from the DataFrame
- subset: column names (as a list) that decide what counts as a duplicate; if None, all columns are compared
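Together they form the append-then-dedupe pattern used in update_with_new_data; a small sketch with made-up prices:
import polars as pl

old = pl.DataFrame({"forecast_date": ["2023-05-01", "2023-05-02"], "price": [80.0, 82.0]})
new = pl.DataFrame({"forecast_date": ["2023-05-02", "2023-05-03"], "price": [82.0, 85.0]})
merged = pl.concat([old, new]).unique(["forecast_date"])  # the shared 2023-05-02 row is kept only once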
pd.DataFrame.rename(columns)
- columns: a dictionary of the form {old_name: new_name}
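For instance, the rename performed later in preprocess_test:
import pandas as pd

df_test = pd.DataFrame({"prediction_datetime": ["2023-05-01 00:00:00"]})
df_test = df_test.rename(columns={"prediction_datetime": "datetime"})  # {old_name: new_name}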
DataStorage FLOW
- Inside class DataStorage, define root and, for each dataset, the list of columns to analyze
- Define the constructor
  - Load each dataset as a polars DataFrame
  - For df_data, filter to 2022 onwards to account for the COVID period
  - Read each dataset's schema attribute and store it as a schema_* attribute
    - These OrderedDicts are later passed to the schema_overrides parameter of pl.from_pandas()
  - df_weather_station_to_county_mapping: cast the "latitude" and "longitude" columns
- Define update_with_new_data
  - Take each new dataset, keep only the columns to analyze, then apply the stored schema as schema_overrides
  - Concat each new dataset with the existing one and drop duplicate rows with unique()
- Define preprocess_test
  - Take the test DataFrame and rename its "prediction_datetime" column to "datetime"
  - Keep every analysis column except "target", then convert the result to a polars DataFrame
class DataStorage:
root = "/Users/eric/Documents/Competition/KAGGLE/predict-energy-behavior-of-prosumers"
data_cols = [
"target",
"county",
"is_business",
"product_type",
"is_consumption",
"datetime",
"row_id",
]
client_cols = [
"product_type",
"county",
"eic_count",
"installed_capacity",
"is_business",
"date",
]
gas_prices_cols = ["forecast_date", "lowest_price_per_mwh", "highest_price_per_mwh"]
electricity_prices_cols = ["forecast_date", "euros_per_mwh"]
forecast_weather_cols = [
"latitude",
"longitude",
"origin_datetime",
"hours_ahead",
"temperature",
"dewpoint",
"cloudcover_high",
"cloudcover_low",
"cloudcover_mid",
"cloudcover_total",
"10_metre_u_wind_component",
"10_metre_v_wind_component",
"forecast_datetime",
"direct_solar_radiation",
"surface_solar_radiation_downwards",
"snowfall",
"total_precipitation",
]
historical_weather_cols = [
"datetime",
"temperature",
"dewpoint",
"rain",
"snowfall",
"surface_pressure",
"cloudcover_total",
"cloudcover_low",
"cloudcover_mid",
"cloudcover_high",
"windspeed_10m",
"winddirection_10m",
"shortwave_radiation",
"direct_solar_radiation",
"diffuse_radiation",
"latitude",
"longitude",
]
location_cols = ["longitude", "latitude", "county"]
target_cols = [
"target",
"county",
"is_business",
"product_type",
"is_consumption",
"datetime",
]
def __init__(self):
self.df_data = pl.read_csv(
os.path.join(self.root, "train.csv"),
columns=self.data_cols,
try_parse_dates=True,
)
self.df_client = pl.read_csv(
os.path.join(self.root, "client.csv"),
columns=self.client_cols,
try_parse_dates=True,
)
self.df_gas_prices = pl.read_csv(
os.path.join(self.root, "gas_prices.csv"),
columns=self.gas_prices_cols,
try_parse_dates=True,
)
self.df_electricity_prices = pl.read_csv(
os.path.join(self.root, "electricity_prices.csv"),
columns=self.electricity_prices_cols,
try_parse_dates=True,
)
self.df_forecast_weather = pl.read_csv(
os.path.join(self.root, "forecast_weather.csv"),
columns=self.forecast_weather_cols,
try_parse_dates=True,
)
self.df_historical_weather = pl.read_csv(
os.path.join(self.root, "historical_weather.csv"),
columns=self.historical_weather_cols,
try_parse_dates=True,
)
self.df_weather_station_to_county_mapping = pl.read_csv(
os.path.join(self.root, "weather_station_to_county_mapping.csv"),
columns=self.location_cols,
try_parse_dates=True,
)
self.df_data = self.df_data.filter(
pl.col("datetime") >= pd.to_datetime("2022-01-01")
)
self.df_target = self.df_data.select(self.target_cols)
self.schema_data = self.df_data.schema
self.schema_client = self.df_client.schema
self.schema_gas_prices = self.df_gas_prices.schema
self.schema_electricity_prices = self.df_electricity_prices.schema
self.schema_forecast_weather = self.df_forecast_weather.schema
self.schema_historical_weather = self.df_historical_weather.schema
self.schema_target = self.df_target.schema
self.df_weather_station_to_county_mapping = (
self.df_weather_station_to_county_mapping.with_columns(
pl.col("latitude").cast(pl.datatypes.Float32),
pl.col("longitude").cast(pl.datatypes.Float32),
)
)
def update_with_new_data(
self,
df_new_client,
df_new_gas_prices,
df_new_electricity_prices,
df_new_forecast_weather,
df_new_historical_weather,
df_new_target,
):
df_new_client = pl.from_pandas(
df_new_client[self.client_cols], schema_overrides=self.schema_client
)
df_new_gas_prices = pl.from_pandas(
df_new_gas_prices[self.gas_prices_cols],
schema_overrides=self.schema_gas_prices,
)
df_new_electricity_prices = pl.from_pandas(
df_new_electricity_prices[self.electricity_prices_cols],
schema_overrides=self.schema_electricity_prices,
)
df_new_forecast_weather = pl.from_pandas(
df_new_forecast_weather[self.forecast_weather_cols],
schema_overrides=self.schema_forecast_weather,
)
df_new_historical_weather = pl.from_pandas(
df_new_historical_weather[self.historical_weather_cols],
schema_overrides=self.schema_historical_weather,
)
df_new_target = pl.from_pandas(
df_new_target[self.target_cols], schema_overrides=self.schema_target
)
self.df_client = pl.concat([self.df_client, df_new_client]).unique(
["date", "county", "is_business", "product_type"]
)
self.df_gas_prices = pl.concat([self.df_gas_prices, df_new_gas_prices]).unique(
["forecast_date"]
)
self.df_electricity_prices = pl.concat(
[self.df_electricity_prices, df_new_electricity_prices]
).unique(["forecast_date"])
self.df_forecast_weather = pl.concat(
[self.df_forecast_weather, df_new_forecast_weather]
).unique(["forecast_datetime", "latitude", "longitude", "hours_ahead"])
self.df_historical_weather = pl.concat(
[self.df_historical_weather, df_new_historical_weather]
).unique(["datetime", "latitude", "longitude"])
self.df_target = pl.concat([self.df_target, df_new_target]).unique(
["datetime", "county", "is_business", "product_type", "is_consumption"]
)
def preprocess_test(self, df_test):
df_test = df_test.rename(columns={"prediction_datetime": "datetime"})
df_test = pl.from_pandas(
df_test[self.data_cols[1:]], schema_overrides=self.schema_data
)
return df_test
3. FeaturesGenerator
Polars Tips
pl.concat_str(exprs, separator)
- Concatenates the given columns horizontally (axis=1) into a single string column
- separator: the string placed between each row's elements; the new column is named via .alias()
.with_columns(
    pl.concat_str(
        "county",
        "is_business",
        "product_type",
        "is_consumption",
        separator="_",
    ).alias("segment")
)
Cyclic Feature Encoding
- Effectively conveys periodic patterns to the model when handling time data
- Especially useful for expressing temporal features such as day-of-year or hour in a cyclic form (a numeric check follows the snippet below)
.with_columns(
(np.pi * pl.col("dayofyear") / 183).sin().alias("sin(dayofyear)"),
(np.pi * pl.col("dayofyear") / 183).cos().alias("cos(dayofyear)"),
(np.pi * pl.col("hour") / 12).sin().alias("sin(hour)"),
(np.pi * pl.col("hour") / 12).cos().alias("cos(hour)"),
)
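A quick numeric check of the cyclic behaviour: hours 23 and 1 are 22 apart as raw numbers but only 2 apart on the 24-hour circle, and the (sin, cos) pair reflects exactly that:
import numpy as np

hours = np.array([1, 3, 23])
angle = np.pi * hours / 12  # one full cycle per 24 hours
pts = np.stack([np.sin(angle), np.cos(angle)], axis=1)
print(np.linalg.norm(pts[2] - pts[0]))  # hours 23 vs 1: ~0.52
print(np.linalg.norm(pts[1] - pts[0]))  # hours 3 vs 1: ~0.52, the same 2-hour gap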
pl.DataFrame.join(other_DataFrame, on, how)
- Combines the two DataFrames according to how wherever the on columns match
- other_DataFrame: the DataFrame to join with the existing one
- on: the names of the join columns in both DataFrames
- how: join strategy
  - inner: intersection (returns only rows whose key values appear in both DataFrames)
  - left: keeps every row of the existing DataFrame and attaches the matching rows from the other DataFrame
  - outer: union (returns rows that have a match in at least one of the two DataFrames)
df_features = df_features.join(
df_client.with_columns(
(pl.col("date") + pl.duration(days=2)).cast(pl.Date)
),
on=["county", "is_business", "product_type", "date"],
how="left",
)
pl.struct()
- Packs several columns into a single struct column (sketch below)
https://docs.pola.rs/user-guide/expressions/structs/#extracting-individual-values-of-a-struct
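A minimal sketch of building a struct and extracting a field again (toy values):
import polars as pl

df = pl.DataFrame({"year": [2023], "month": [12], "day": [24]})
df = df.with_columns(pl.struct(["year", "month", "day"]).alias("ymd"))
df = df.with_columns(pl.col("ymd").struct.field("year").alias("year_again"))  # pull a field back out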
pl.DataFrame.group_by(by).aggregation
- by: the column(s) used as the grouping key
df_forecast_weather_date = (
df_forecast_weather.group_by("datetime").mean().drop("county")
)
FeaturesGenerator FLOW
class FeaturesGenerator:
def __init__(self, data_storage):
self.data_storage = data_storage
self.estonian_holidays = list(
holidays.country_holidays("EE", years=range(2021, 2026)).keys()
)
def _add_general_features(self, df_features):
df_features = (
df_features.with_columns(
pl.col("datetime").dt.ordinal_day().alias("dayofyear"),
pl.col("datetime").dt.hour().alias("hour"),
pl.col("datetime").dt.day().alias("day"),
pl.col("datetime").dt.weekday().alias("weekday"),
pl.col("datetime").dt.month().alias("month"),
pl.col("datetime").dt.year().alias("year"),
)
.with_columns(
pl.concat_str(
"county",
"is_business",
"product_type",
"is_consumption",
separator="_",
).alias("segment"),
)
.with_columns(
(np.pi * pl.col("dayofyear") / 183).sin().alias("sin(dayofyear)"),
(np.pi * pl.col("dayofyear") / 183).cos().alias("cos(dayofyear)"),
(np.pi * pl.col("hour") / 12).sin().alias("sin(hour)"),
(np.pi * pl.col("hour") / 12).cos().alias("cos(hour)"),
)
)
return df_features
def _add_client_features(self, df_features):
df_client = self.data_storage.df_client
df_features = df_features.join(
df_client.with_columns(
(pl.col("date") + pl.duration(days=2)).cast(pl.Date)
),
on=["county", "is_business", "product_type", "date"],
how="left",
)
return df_features
def is_country_holiday(self, row):
return (
datetime.date(row["year"], row["month"], row["day"])
in self.estonian_holidays
)
def _add_holidays_features(self, df_features):
df_features = df_features.with_columns(
pl.struct(["year", "month", "day"])
            .apply(self.is_country_holiday)  # Expr.apply was renamed to Expr.map_elements in newer Polars releases
.alias("is_country_holiday")
)
return df_features
def _add_forecast_weather_features(self, df_features):
df_forecast_weather = self.data_storage.df_forecast_weather
df_weather_station_to_county_mapping = (
self.data_storage.df_weather_station_to_county_mapping
)
df_forecast_weather = (
df_forecast_weather.rename({"forecast_datetime": "datetime"})
.filter((pl.col("hours_ahead") >= 22) & pl.col("hours_ahead") <= 45)
.drop("hours_ahead")
.with_columns(
pl.col("latitude").cast(pl.datatypes.Float32),
pl.col("longitude").cast(pl.datatypes.Float32),
)
.join(
df_weather_station_to_county_mapping,
how="left",
on=["longitude", "latitude"],
)
.drop("longitude", "latitude", "origin_datetime")
)
df_forecast_weather_date = (
df_forecast_weather.group_by("datetime").mean().drop("county")
)
df_forecast_weather_local = (
df_forecast_weather.filter(pl.col("county").is_not_null())
.group_by("county", "datetime")
.mean()
)
for hours_lag in [0, 7 * 24]:
df_features = df_features.join(
df_forecast_weather_date.with_columns(
pl.col("datetime") + pl.duration(hours=hours_lag)
),
on="datetime",
how="left",
suffix=f"_forecast_{hours_lag}h",
)
df_features = df_features.join(
df_forecast_weather_local.with_columns(
pl.col("datetime") + pl.duration(hours=hours_lag)
),
on=["county", "datetime"],
how="left",
suffix=f"_forecast_local_{hours_lag}h",
)
df_features = df_features.with_columns(
(
pl.col(f"temperature_forecast_local_0h")
/ (pl.col(f"temperature_forecast_local_168h") + 1e-3)
).alias(f"temperature_forecast_local_0h/168h"),
(
pl.col(f"surface_solar_radiation_downwards_forecast_local_0h")
/ (pl.col(f"surface_solar_radiation_downwards_forecast_local_168h") + 1e-3)
).alias(f"surface_solar_radiation_downwards_forecast_local_0h/168h"),
)
return df_features
def _add_historical_weather_features(self, df_features):
df_historical_weather = self.data_storage.df_historical_weather
df_weather_station_to_county_mapping = (
self.data_storage.df_weather_station_to_county_mapping
)
df_historical_weather = (
df_historical_weather.with_columns(
pl.col("latitude").cast(pl.datatypes.Float32),
pl.col("longitude").cast(pl.datatypes.Float32),
)
.join(
df_weather_station_to_county_mapping,
how="left",
on=["longitude", "latitude"],
)
.drop("longitude", "latitude")
)
df_historical_weather_date = (
df_historical_weather.group_by("datetime").mean().drop("county")
)
df_historical_weather_local = (
df_historical_weather.filter(pl.col("county").is_not_null())
.group_by("county", "datetime")
.mean()
)
for hours_lag in [2 * 24, 7 * 24]:
df_features = df_features.join(
df_historical_weather_date.with_columns(
pl.col("datetime") + pl.duration(hours=hours_lag)
),
on="datetime",
how="left",
suffix=f"_historical_{hours_lag}h",
)
df_features = df_features.join(
df_historical_weather_local.with_columns(
pl.col("datetime") + pl.duration(hours=hours_lag)
),
on=["county", "datetime"],
how="left",
suffix=f"_historical_local_{hours_lag}h",
)
for hours_lag in [1 * 24]:
df_features = df_features.join(
df_historical_weather_date.with_columns(
pl.col("datetime") + pl.duration(hours=hours_lag),
pl.col("datetime").dt.hour().alias("hour"),
)
.filter(pl.col("hour") <= 10)
.drop("hour"),
on="datetime",
how="left",
suffix=f"_historical_{hours_lag}h",
)
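        # NOTE: the unsuffixed "temperature" / "direct_solar_radiation" columns used below hold
        # the 48h-lagged values: the first join in the loop above found no name collisions, so
        # its suffix was never applied.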
df_features = df_features.with_columns(
(
pl.col(f"temperature_historical_local_48h")
/ (pl.col(f"temperature_historical_local_168h") + 1e-3)
).alias(f"temperature_historical_local_48h/168h"),
(
pl.col(f"direct_solar_radiation_historical_local_48h")
/ (pl.col(f"direct_solar_radiation_historical_local_168h") + 1e-3)
).alias(f"direct_solar_radiation_historical_local_48h/168h"),
(
pl.col(f"temperature_historical_24h")
/ (pl.col(f"temperature") + 1e-3)
).alias(f"temperature_historical_24h/48h"),
(
pl.col(f"direct_solar_radiation_historical_24h")
/ (pl.col(f"direct_solar_radiation") + 1e-3)
).alias(f"direct_solar_radiation_historical_24h/48h"),
)
return df_features
def _add_target_features(self, df_features):
df_target = self.data_storage.df_target
df_target_all_type_sum = (
df_target.group_by(["datetime", "county", "is_business", "is_consumption"])
.sum()
.drop("product_type")
)
df_target_all_county_type_sum = (
df_target.group_by(["datetime", "is_business", "is_consumption"])
.sum()
.drop("product_type", "county")
)
for hours_lag in [
2 * 24,
3 * 24,
4 * 24,
5 * 24,
6 * 24,
7 * 24,
8 * 24,
9 * 24,
10 * 24,
11 * 24,
12 * 24,
13 * 24,
14 * 24,
]:
df_features = df_features.join(
df_target.with_columns(
pl.col("datetime") + pl.duration(hours=hours_lag)
).rename({"target": f"target_{hours_lag}h"}),
on=[
"county",
"is_business",
"product_type",
"is_consumption",
"datetime",
],
how="left",
)
for hours_lag in [2 * 24, 3 * 24, 7 * 24, 14 * 24]:
df_features = df_features.join(
df_target_all_type_sum.with_columns(
pl.col("datetime") + pl.duration(hours=hours_lag)
).rename({"target": f"target_all_type_sum_{hours_lag}h"}),
on=["county", "is_business", "is_consumption", "datetime"],
how="left",
)
df_features = df_features.join(
df_target_all_county_type_sum.with_columns(
pl.col("datetime") + pl.duration(hours=hours_lag)
).rename({"target": f"target_all_county_type_sum_{hours_lag}h"}),
on=["is_business", "is_consumption", "datetime"],
how="left",
suffix=f"_all_county_type_sum_{hours_lag}h",
)
cols_for_stats = [
f"target_{hours_lag}h" for hours_lag in [2 * 24, 3 * 24, 4 * 24, 5 * 24]
]
        df_features = df_features.with_columns(
            # row-wise mean of the 2-5 day lag targets
            df_features.select(cols_for_stats).mean(axis=1).alias("target_mean"),
            # row-wise std: transpose so rows become columns, take the column std, transpose back
            df_features.select(cols_for_stats)
            .transpose()
            .std()
            .transpose()
            .to_series()
            .alias("target_std"),
        )
        for target_prefix, lag_numerator, lag_denominator in [
            ("target", 24 * 7, 24 * 14),
            ("target", 24 * 2, 24 * 9),
            ("target", 24 * 3, 24 * 10),
            ("target", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 7, 24 * 14),
            ("target_all_county_type_sum", 24 * 2, 24 * 3),
            ("target_all_county_type_sum", 24 * 7, 24 * 14),
        ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    / (pl.col(f"{target_prefix}_{lag_denominator}h") + 1e-3)
                ).alias(f"{target_prefix}_ratio_{lag_numerator}_{lag_denominator}")
            )
return df_features
def _reduce_memory_usage(self, df_features):
df_features = df_features.with_columns(pl.col(pl.Float64).cast(pl.Float32))
return df_features
def _drop_columns(self, df_features):
df_features = df_features.drop(
"date", "datetime", "hour", "dayofyear"
)
return df_features
def _to_pandas(self, df_features, y):
cat_cols = [
"county",
"is_business",
"product_type",
"is_consumption",
"segment",
]
if y is not None:
df_features = pd.concat([df_features.to_pandas(), y.to_pandas()], axis=1)
else:
df_features = df_features.to_pandas()
df_features[cat_cols] = df_features[cat_cols].astype("category")
if 'row_id' in df_features.columns:
df_features = df_features.drop("row_id", axis=1)
return df_features
def generate_features(self, df_prediction_items):
if "target" in df_prediction_items.columns:
df_prediction_items, y = (
df_prediction_items.drop("target"),
df_prediction_items.select("target"),
)
else:
y = None
df_features = df_prediction_items.with_columns(
pl.col("datetime").cast(pl.Date).alias("date"),
)
for add_features in [
self._add_general_features,
self._add_client_features,
self._add_forecast_weather_features,
self._add_historical_weather_features,
self._add_target_features,
self._add_holidays_features,
self._reduce_memory_usage,
self._drop_columns,
]:
df_features = add_features(df_features)
df_features = self._to_pandas(df_features, y)
return df_features
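One plausible way to wire the two classes together for training (a sketch, not the author's exact driver code; the model-fitting step with the imported lightgbm/VotingRegressor is omitted):
data_storage = DataStorage()
features_generator = FeaturesGenerator(data_storage=data_storage)

# Build the training matrix; "target" is carried through as the label column.
df_train_features = features_generator.generate_features(data_storage.df_data)
df_train_features = df_train_features[df_train_features["target"].notnull()]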