使用Polars建立高性能財務分析管道:懶惰評估,高級表達和SQL集成

在本教程中,我們深入研究了使用的高級數據分析管道 方面,旨在最佳性能和可擴展性的閃電般快速數據框架庫。我們的目標是展示如何利用Porars的懶惰評估,複雜的表達式,窗口功能和SQL接口來有效地處理大型財務數據集。我們首先生成合成的財務時間序列數據集,然後通過端到端管道逐步移動,從功能工程和滾動統計數據到多維分析和排名。在整個過程中,我們演示了Polars如何使我們能夠編寫表現力和性能數據轉換,同時保持低內存使用情況並確保快速執行。

import polars as pl
import numpy as np
from datetime import datetime, timedelta
import io


try:
    import polars as pl
except ImportError:
    import subprocess
    subprocess.run(("pip", "install", "polars"), check=True)
    import polars as pl


print("🚀 Advanced Polars Analytics Pipeline")
print("=" * 50)

我們首先導入基本庫,包括用於高性能數據框架操作的極點,以及用於生成合成數據的NUMPY。為了確保兼容性,我們添加了Porars的後備安裝步驟,以防列表尚未安裝。準備好設置後,我們向高級分析管道的開始發出信號。

np.random.seed(42)
n_records = 100000
dates = (datetime(2020, 1, 1) + timedelta(days=i//100) for i in range(n_records))
tickers = np.random.choice(('AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN'), n_records)


# Create complex synthetic dataset
data = {
    'timestamp': dates,
    'ticker': tickers,
    'price': np.random.lognormal(4, 0.3, n_records),
    'volume': np.random.exponential(1000000, n_records).astype(int),
    'bid_ask_spread': np.random.exponential(0.01, n_records),
    'market_cap': np.random.lognormal(25, 1, n_records),
    'sector': np.random.choice(('Tech', 'Finance', 'Healthcare', 'Energy'), n_records)
}


print(f"📊 Generated {n_records:,} synthetic financial records")

我們使用numpy生成了一個豐富的合成金融數據集,並使用numpy進行了100,000個記錄,並為諸如AAPL和TSLA等主要股票的每日庫存數據模擬。每個條目都包括價格,數量,出價差價,市場上限和行業等關鍵市場功能。這為在時間序列數據集上展示高級PORARS分析提供了現實的基礎。

lf = pl.LazyFrame(data)


result = (
    lf
    .with_columns((
        pl.col('timestamp').dt.year().alias('year'),
        pl.col('timestamp').dt.month().alias('month'),
        pl.col('timestamp').dt.weekday().alias('weekday'),
        pl.col('timestamp').dt.quarter().alias('quarter')
    ))
   
    .with_columns((
        pl.col('price').rolling_mean(20).over('ticker').alias('sma_20'),
        pl.col('price').rolling_std(20).over('ticker').alias('volatility_20'),
       
        pl.col('price').ewm_mean(span=12).over('ticker').alias('ema_12'),
       
        pl.col('price').diff().alias('price_diff'),
       
        (pl.col('volume') * pl.col('price')).alias('dollar_volume')
    ))
   
    .with_columns((
        pl.col('price_diff').clip(0, None).rolling_mean(14).over('ticker').alias('rsi_up'),
        pl.col('price_diff').abs().rolling_mean(14).over('ticker').alias('rsi_down'),
       
        (pl.col('price') - pl.col('sma_20')).alias('bb_position')
    ))
   
    .with_columns((
        (100 - (100 / (1 + pl.col('rsi_up') / pl.col('rsi_down')))).alias('rsi')
    ))
   
    .filter(
        (pl.col('price') > 10) &
        (pl.col('volume') > 100000) &
        (pl.col('sma_20').is_not_null())
    )
   
    .group_by(('ticker', 'year', 'quarter'))
    .agg((
        pl.col('price').mean().alias('avg_price'),
        pl.col('price').std().alias('price_volatility'),
        pl.col('price').min().alias('min_price'),
        pl.col('price').max().alias('max_price'),
        pl.col('price').quantile(0.5).alias('median_price'),
       
        pl.col('volume').sum().alias('total_volume'),
        pl.col('dollar_volume').sum().alias('total_dollar_volume'),
       
        pl.col('rsi').filter(pl.col('rsi').is_not_null()).mean().alias('avg_rsi'),
        pl.col('volatility_20').mean().alias('avg_volatility'),
        pl.col('bb_position').std().alias('bollinger_deviation'),
       
        pl.len().alias('trading_days'),
        pl.col('sector').n_unique().alias('sectors_count'),
       
        (pl.col('price') > pl.col('sma_20')).mean().alias('above_sma_ratio'),
       
        ((pl.col('price').max() - pl.col('price').min()) / pl.col('price').min())
          .alias('price_range_pct')
    ))
   
    .with_columns((
        pl.col('total_dollar_volume').rank(method='ordinal', descending=True).alias('volume_rank'),
        pl.col('price_volatility').rank(method='ordinal', descending=True).alias('volatility_rank')
    ))
   
    .filter(pl.col('trading_days') >= 10)
    .sort(('ticker', 'year', 'quarter'))
)

我們將合成數據集加載到Porars LazyFrame中以實現遞延執行,從而使我們能夠有效地鏈接複雜的轉換。從那裡,我們使用窗口和滾動功能從那裡使用基於時間的功能豐富了數據,並應用了高級技術指標,例如移動平均,RSI和Bollinger頻段。然後,我們通過股票,年和季度進行分組聚合,以提取關鍵的財務統計和指標。最後,我們根據數量和波動率對結果進行排名,過濾掉不足的片段,並為直觀探索的數據進行排序,同時利用Porars強大的懶惰評估引擎以充分利用其優勢。

df = result.collect()
print(f"\n📈 Analysis Results: {df.height:,} aggregated records")
print("\nTop 10 High-Volume Quarters:")
print(df.sort('total_dollar_volume', descending=True).head(10).to_pandas())


print("\n🔍 Advanced Analytics:")


pivot_analysis = (
    df.group_by('ticker')
    .agg((
        pl.col('avg_price').mean().alias('overall_avg_price'),
        pl.col('price_volatility').mean().alias('overall_volatility'),
        pl.col('total_dollar_volume').sum().alias('lifetime_volume'),
        pl.col('above_sma_ratio').mean().alias('momentum_score'),
        pl.col('price_range_pct').mean().alias('avg_range_pct')
    ))
    .with_columns((
        (pl.col('overall_avg_price') / pl.col('overall_volatility')).alias('risk_adj_score'),
       
        (pl.col('momentum_score') * 0.4 +
         pl.col('avg_range_pct') * 0.3 +
         (pl.col('lifetime_volume') / pl.col('lifetime_volume').max()) * 0.3)
         .alias('composite_score')
    ))
    .sort('composite_score', descending=True)
)


print("\n🏆 Ticker Performance Ranking:")
print(pivot_analysis.to_pandas())

懶惰管道完成後,我們將結果收集到數據框架中,並根據總美元的總量立即查看前10個季度。這有助於我們確定激烈的交易活動時期。然後,我們將分析進一步邁出了一步,將數據通過股票對數據進行分組,以計算更高級別的見解,例如終身交易量,平均價格波動和自定義的綜合分數。這個多維摘要不僅可以通過原始數量來比較股票,還可以通過勢頭和風險調整的性能進行比較,從而使對整體股票行為的深入了解更深入。

print("\n🔄 SQL Interface Demo:")
pl.Config.set_tbl_rows(5)


sql_result = pl.sql("""
    SELECT
        ticker,
        AVG(avg_price) as mean_price,
        STDDEV(price_volatility) as volatility_consistency,
        SUM(total_dollar_volume) as total_volume,
        COUNT(*) as quarters_tracked
    FROM df
    WHERE year >= 2021
    GROUP BY ticker
    ORDER BY total_volume DESC
""", eager=True)


print(sql_result)


print(f"\n⚡ Performance Metrics:")
print(f"   • Lazy evaluation optimizations applied")
print(f"   • {n_records:,} records processed efficiently")
print(f"   • Memory-efficient columnar operations")
print(f"   • Zero-copy operations where possible")


print(f"\n💾 Export Options:")
print("   • Parquet (high compression): df.write_parquet('data.parquet')")
print("   • Delta Lake: df.write_delta('delta_table')")
print("   • JSON streaming: df.write_ndjson('data.jsonl')")
print("   • Apache Arrow: df.to_arrow()")


print("\n✅ Advanced Polars pipeline completed successfully!")
print("🎯 Demonstrated: Lazy evaluation, complex expressions, window functions,")
print("   SQL interface, advanced aggregations, and high-performance analytics")

我們通過展示Polars優雅的SQL接口,運行匯總查詢來結束管道,以分析熟悉的SQL語法的2021後股票性能。這種混合功能使我們能夠將表達性的圓盤轉換與聲明性的SQL查詢無縫融合。為了強調其效率,我們打印關鍵性能指標,強調懶惰評估,內存效率和零副本的執行。最後,我們證明瞭如何輕鬆地以各種格式導出結果,例如Parquet,Arrow和JSONL,這使這條管道既有功能又準備生產。因此,我們使用Porars完成了全循環的高性能分析工作流程。

總之,我們已經看到了Polars的懶惰API如何優化複雜的分析工作流程,這些工作流在傳統工具中否則會遲鈍。我們已經開發了一條全面的財務分析管道,從原始數據攝入到滾動指標,分組的聚合和高級評分,所有這些都以高速速度執行。不僅如此,我們還挖掘了Polars功能強大的SQL接口,以在數據范圍內無縫運行熟悉的查詢。這種編寫功能性表達式和SQL的雙重能力使Polars成為任何數據科學家的靈活工具。


查看 這項研究的所有信用都歸該項目的研究人員。另外,請隨時關注我們 嘰嘰喳喳 而且不要忘記加入我們的 100K+ ml子雷迪特 並訂閱 我們的新聞通訊


Marktechpost的諮詢實習生,IIT Madras的雙學位學生Sana Hassan熱衷於應用技術和AI來應對現實世界中的挑戰。他對解決實踐問題的興趣非常興趣,他為AI和現實生活中的解決方案的交集帶來了新的視角。

Source link

Scroll to Top