Architecture du Data Pipeline
Le pipeline qui sépare les amateurs des professionnels
Après 30 ans dans le quantitatif, je peux affirmer sans hésitation : la qualité de vos données détermine 80% de la performance de votre système. J'ai vu des algorithmes médiocres surperformer des modèles sophistiqués simplement parce qu'ils étaient alimentés par des données impeccables. Le data pipeline n'est pas un détail d'infrastructure — c'est le fondement même de votre edge.
Le pipeline que nous allons construire traite quotidiennement ~15 000 instruments (US + EU + APAC), calcule 50 features par instrument, et stocke le tout dans un format optimisé pour les requêtes analytiques. Le tout sur une VM à 30€/mois.
Les 5 principes fondamentaux
1
Idempotence
Chaque exécution du pipeline avec les mêmes inputs produit exactement les mêmes outputs. Pas de randomness, pas de dépendance à l'état global. Si le pipeline crash à 3h du matin, vous le relancez et obtenez le même résultat. C'est la base de la reproductibilité scientifique appliquée au trading.
2
Point-in-Time Correctness
ZÉRO look-ahead bias. Chaque feature est calculée uniquement avec les données disponibles au moment T. Les fondamentaux utilisent la date de publication SEC, pas la date du trimestre. Les prix utilisent le close ajusté connu à T, pas le close re-ajusté rétroactivement. C'est l'erreur n°1 des backtests retail.
3
Auditabilité complète
Chaque donnée a un lineage traçable : source, timestamp d'ingestion, transformations appliquées. Si un signal produit un trade perdant, vous pouvez remonter exactement quelles données ont alimenté quelle feature, et comprendre pourquoi. Sans audit trail, vous ne pouvez pas debugger.
4
Batch T+0 EOD
Pour des stratégies daily, le pipeline tourne après la clôture US (22h Paris). Il ingère les données du jour, calcule toutes les features, et produit les signaux pour le lendemain. Pas de real-time — le real-time ajoute de la complexité sans valeur ajoutée pour du daily rebalancing.
5
Schéma strict
Chaque table a un schéma typé et versionné. Les colonnes obligatoires : symbol, date, open, high, low, close, volume, adj_close, source, ingested_at. Toute violation du schéma déclenche une alerte et bloque le pipeline.
Architecture détaillée du pipeline
Le diagramme ci-dessous montre le flux complet, du raw data aux signaux de trading. Chaque étape est un job indépendant orchestré par un scheduler (cron + supervision).
Python
import logging
from datetime import date
from pathlib import Path
logger = logging.getLogger("pipeline")
class PipelineOrchestrator:
"""Pipeline EOD idempotent — exécution T+0 après clôture US."""
def __init__(self, run_date: date, data_dir: Path):
self.run_date = run_date
self.data_dir = data_dir
self.stages = [
("ingest", self._ingest_raw_data),
("clean", self._clean_and_validate),
("adjust", self._apply_corporate_actions),
("normalize", self._normalize_cross_region),
("features", self._compute_features),
("validate", self._validate_features),
("store", self._write_feature_store),
]
def run(self) -> None:
"""Exécution séquentielle avec checkpointing."""
for name, stage_fn in self.stages:
checkpoint = self.data_dir / f".checkpoint_{name}_{self.run_date}"
if checkpoint.exists():
logger.info(f"Skipping {name} — already completed")
continue
logger.info(f"Running stage: {name}")
stage_fn()
checkpoint.touch()
logger.info("Pipeline completed successfully")
Checkpointing : l'assurance crash
Le pattern checkpoint permet de reprendre le pipeline exactement là où il s'est arrêté. Si le stage features crash après avoir traité 12 000 instruments sur 15 000, vous ne relancez pas l'ingestion — le checkpoint indique que les stages précédents sont déjà terminés. En production, j'ai vu des pipelines crasher 2-3 fois par semaine (timeout réseau, API rate limit, OOM). Sans checkpointing, chaque crash coûte 45 minutes de re-traitement. Avec, c'est 30 secondes.
Batch vs Real-Time : le choix stratégique
| Critère |
Batch EOD (notre choix) |
Real-Time Streaming |
| Latence |
T+0 (après clôture) |
Sub-seconde |
| Complexité |
Faible — cron + Python scripts |
Élevée — Kafka, Flink, infra 24/7 |
| Coût infra |
30€/mois (VM basique) |
500-2000€/mois minimum |
| Reproductibilité |
Parfaite — même input = même output |
Difficile — état distribué, ordering |
| Debugging |
Simple — fichiers Parquet inspectables |
Complexe — replay de streams |
| Adapté pour |
Daily, weekly rebalancing |
HFT, market making, arbitrage |
| Notre stratégie |
OUI — CAGR 80-120% avec daily |
NON — overkill, ROI négatif |
Le piège du real-time
J'ai vu des dizaines de quants solo gaspiller 6 mois à construire une infrastructure real-time pour des stratégies qui rebalancent une fois par jour. Le real-time n'ajoute aucun alpha pour du daily momentum ou du mean reversion weekly. Il ajoute uniquement de la complexité, des bugs, et des coûts d'infrastructure. Réservez le real-time pour quand vous aurez 10M€ AUM et une équipe de 3 ingénieurs.
Sources de données & Ingestion
Les 6 sources qui couvrent le monde
Un système multi-région nécessite plusieurs sources de données complémentaires. Aucune source unique ne couvre correctement US + EU + APAC avec la qualité requise. Voici notre stack de données, testée et optimisée sur 5 ans de production.
| Source |
Couverture |
Type |
Coût |
Fiabilité |
| IBKR API |
US, EU, APAC — via votre broker |
OHLCV, fondamentaux, options |
Gratuit (avec compte) |
★★★★☆ |
| Polygon.io |
US equities exhaustif |
OHLCV tick-level, REST + WS |
$29/mois (Starter) |
★★★★★ |
| Yahoo Finance |
Global (US, EU, APAC) |
OHLCV daily, fondamentaux basiques |
Gratuit |
★★★☆☆ |
| EOD Historical Data |
70+ bourses mondiales |
OHLCV, fondamentaux, splits |
€20/mois (All World) |
★★★★☆ |
| FRED API |
Macro US + mondial |
Yield curve, GDP, CPI, PMI |
Gratuit |
★★★★★ |
| SEC EDGAR |
US companies (SEC filers) |
10-K, 10-Q, 8-K, XBRL |
Gratuit |
★★★★★ |
Coût total mensuel : ~50€/mois. C'est 1000x moins cher que Bloomberg Terminal (€2000/mois) et largement suffisant pour notre univers de 2000-3000 instruments actifs.
1. Interactive Brokers — reqHistoricalData
IBKR est votre broker ET une source de données. L'API reqHistoricalData fournit des barres OHLCV avec un historique de 1 an pour les données intraday et illimité pour le daily. Mais attention aux limitations.
Python
from ib_insync import IB, Stock, Contract
import pandas as pd
import time
from datetime import datetime
class IBKRDataClient:
"""Client IBKR avec gestion du pacing (max 60 req/10min)."""
PACING_DELAY = 10.5
MAX_BARS = 5000
def __init__(self, host="127.0.0.1", port=4002):
self.ib = IB()
self.ib.connect(host, port, clientId=10)
self._last_request = 0
def _pace(self):
"""Respect du rate limit IBKR : 60 requêtes / 10 min."""
elapsed = time.time() - self._last_request
if elapsed < self.PACING_DELAY:
time.sleep(self.PACING_DELAY - elapsed)
self._last_request = time.time()
def get_daily_bars(self, symbol: str,
exchange: str = "SMART",
currency: str = "USD",
duration: str = "2 Y") -> pd.DataFrame:
"""Récupère les barres daily OHLCV ajustées."""
self._pace()
contract = Stock(symbol, exchange, currency)
self.ib.qualifyContracts(contract)
bars = self.ib.reqHistoricalData(
contract,
endDateTime="",
durationStr=duration,
barSizeSetting="1 day",
whatToShow="ADJUSTED_LAST",
useRTH=True,
formatDate=1,
)
df = pd.DataFrame(bars)
df["symbol"] = symbol
df["source"] = "ibkr"
df["ingested_at"] = datetime.utcnow()
return df
Limitations IBKR à connaître
Pacing violations : maximum 60 requêtes historiques par 10 minutes. Si vous dépassez, IBKR vous déconnecte pendant 10 minutes. Notre PACING_DELAY de 10.5s garantit qu'on reste sous la limite. Conséquence : pour 3000 symboles, l'ingestion IBKR seule prendrait ~8.7 heures. C'est pourquoi nous utilisons IBKR uniquement pour la validation croisée, pas comme source primaire.
2. Polygon.io — La référence US
Polygon est devenu la source de référence pour les quants retail aux US. API REST propre, données tick-level, corporate actions correctes, et un plan Starter à $29/mois qui suffit largement pour du daily.
Python
import httpx
import pandas as pd
from datetime import date, timedelta
class PolygonClient:
BASE_URL = "https://api.polygon.io"
def __init__(self, api_key: str):
self.client = httpx.Client(
base_url=self.BASE_URL,
params={"apiKey": api_key},
timeout=30.0,
)
def get_aggregates(self, ticker: str,
from_date: date,
to_date: date,
timespan: str = "day") -> pd.DataFrame:
"""Récupère les barres agrégées via REST API."""
url = f"/v2/aggs/ticker/{ticker}/range/1/{timespan}/{from_date}/{to_date}"
resp = self.client.get(url, params={
"adjusted": "true",
"sort": "asc",
"limit": 50000,
})
data = resp.json()
if data.get("resultsCount", 0) == 0:
return pd.DataFrame()
df = pd.DataFrame(data["results"])
df = df.rename(columns={
"o": "open", "h": "high", "l": "low",
"c": "close", "v": "volume", "t": "timestamp",
})
df["date"] = pd.to_datetime(df["timestamp"], unit="ms").dt.date
df["symbol"] = ticker
df["source"] = "polygon"
return df[["symbol", "date", "open", "high", "low", "close", "volume", "source"]]
def get_splits(self, ticker: str) -> pd.DataFrame:
"""Corporate actions : splits et reverse splits."""
resp = self.client.get(f"/v3/reference/splits",
params={"ticker": ticker, "limit": 100})
return pd.DataFrame(resp.json().get("results", []))
3. Yahoo Finance — Gratuit mais piégé
yfinance est tentant : gratuit, couverture mondiale, API simple. Mais c'est un piège pour le backtesting sérieux. Les données ajustées sont recalculées rétroactivement, créant un look-ahead bias invisible.
Python
import yfinance as yf
import pandas as pd
def get_yahoo_bars(symbol: str, period: str = "2y") -> pd.DataFrame:
"""
ATTENTION : Yahoo 'Adj Close' est recalculé rétroactivement.
Utiliser UNIQUEMENT pour :
- Validation croisée (vs Polygon/IBKR)
- Couverture EU/APAC quand EOD Historical est indisponible
- JAMAIS comme source primaire pour le backtesting
"""
ticker = yf.Ticker(symbol)
df = ticker.history(period=period, auto_adjust=False)
df["adj_factor"] = df["Adj Close"] / df["Close"]
df = df.rename(columns={
"Open": "open", "High": "high", "Low": "low",
"Close": "close", "Volume": "volume",
"Adj Close": "adj_close",
})
df["source"] = "yahoo"
return df.reset_index()
4. EOD Historical Data — La couverture mondiale
Pour l'Europe (Euronext, XETRA, LSE) et l'Asie-Pacifique (TSE, HKEX, ASX), EOD Historical Data est le meilleur rapport qualité/prix. Le plan All World Extended à 20€/mois couvre 70+ bourses avec des données ajustées pour les corporate actions.
Python
import httpx
import pandas as pd
class EODClient:
BASE_URL = "https://eodhd.com/api"
EXCHANGES = {
"eu": ["PA", "XETRA", "LSE", "MI", "AS", "MC"],
"apac": ["TSE", "HK", "AU", "KO", "SG"],
}
def __init__(self, api_token: str):
self.client = httpx.Client(timeout=30.0)
self.token = api_token
def get_eod_bulk(self, exchange: str,
trade_date: str) -> pd.DataFrame:
"""Bulk download : toutes les actions d'une bourse en 1 requête."""
url = f"{self.BASE_URL}/eod-bulk-last-day/{exchange}"
resp = self.client.get(url, params={
"api_token": self.token,
"date": trade_date,
"fmt": "json",
})
df = pd.DataFrame(resp.json())
df["exchange"] = exchange
df["source"] = "eodhd"
return df
def ingest_region(self, region: str, trade_date: str) -> pd.DataFrame:
"""Ingestion bulk de toute une région en parallèle."""
frames = []
for exchange in self.EXCHANGES[region]:
df = self.get_eod_bulk(exchange, trade_date)
frames.append(df)
return pd.concat(frames, ignore_index=True)
5. FRED API — Les données macro
Python
from fredapi import Fred
import pandas as pd
class FREDClient:
SERIES = {
"yield_10y": "DGS10",
"yield_2y": "DGS2",
"yield_3m": "DGS3MO",
"fed_funds": "DFF",
"cpi_yoy": "CPIAUCSL",
"pce_core": "PCEPILFE",
"gdp_real": "GDPC1",
"unemployment": "UNRATE",
"pmi_mfg": "MANEMP",
"credit_spread": "BAMLH0A0HYM2",
"m2_supply": "M2SL",
"vix": "VIXCLS",
}
def __init__(self, api_key: str):
self.fred = Fred(api_key=api_key)
def get_all_macro(self, start: str = "2010-01-01") -> pd.DataFrame:
"""Récupère toutes les séries macro et les aligne par date."""
frames = {}
for name, series_id in self.SERIES.items():
s = self.fred.get_series(series_id, start)
frames[name] = s
df = pd.DataFrame(frames)
df = df.ffill()
return df
6. Stockage : Parquet + DuckDB
Le choix du format de stockage est critique pour la performance. Les fichiers CSV sont un anti-pattern absolu pour le quant trading. Parquet offre une compression 10x, des lectures colonnes sélectives, et une compatibilité native avec pandas, polars et DuckDB.
CSV (anti-pattern)
Pas de types, pas de compression, lecture ligne par ligne. 1 an de données US (3000 symboles) = 12 GB en CSV. Temps de chargement : 45 secondes. Parsing des dates à chaque lecture. Aucun support des types nullable.
Parquet (notre choix)
Types stricts, compression Snappy/Zstd, lecture colonaire. Mêmes données = 800 MB en Parquet. Temps de chargement : 1.2 secondes. Metadata intégrée. Partitioning par date/région natif. 15x plus rapide.
Python
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from pathlib import Path
BARS_SCHEMA = pa.schema([
("symbol", pa.string()),
("date", pa.date32()),
("open", pa.float64()),
("high", pa.float64()),
("low", pa.float64()),
("close", pa.float64()),
("volume", pa.int64()),
("adj_close", pa.float64()),
("adj_factor", pa.float64()),
("source", pa.string()),
("region", pa.string()),
("ingested_at", pa.timestamp("us")),
])
def write_daily_partition(df: pd.DataFrame,
base_dir: Path,
trade_date: str):
"""Écrit une partition Parquet par date (Hive-style)."""
table = pa.Table.from_pandas(df, schema=BARS_SCHEMA)
pq.write_to_dataset(
table,
root_path=str(base_dir / "bars"),
partition_cols=["region", "date"],
existing_data_behavior="overwrite_or_ignore",
)
SQL (DuckDB)
SELECT symbol, date, close, volume
FROM read_parquet('data/bars/region=us/date=2026-02-28/*.parquet')
WHERE volume > 1000000
ORDER BY volume DESC
LIMIT 50;
SELECT symbol, sector,
PERCENT_RANK() OVER (PARTITION BY sector ORDER BY volume) AS vol_pctile
FROM read_parquet('data/bars/region=us/**/*.parquet')
JOIN read_parquet('data/metadata/us_stocks.parquet') USING (symbol)
WHERE date = '2026-02-28';
Pourquoi DuckDB et pas PostgreSQL ?
DuckDB est un moteur analytique in-process — pas de serveur à gérer, pas de connexion réseau, pas de configuration. Il lit directement les fichiers Parquet et exécute des requêtes SQL OLAP 10-50x plus vite que PostgreSQL pour les workloads analytiques. Pour un quant solo, c'est la solution parfaite : zéro maintenance, performance maximale.
Corporate Actions & Ajustements
Le problème que 95% des backtests ignorent
Les corporate actions — splits, dividendes, spin-offs, fusions — sont le cimetière silencieux des backtests. Un split 4:1 d'Apple en 2020 transforme un cours de $500 en $125 du jour au lendemain. Si votre pipeline ne gère pas ça correctement, votre momentum signal verra un crash de -75% et génèrera un signal d'achat massif le jour du split. Résultat : un backtest fantastique, un live trading catastrophique.
Types de corporate actions
| Action |
Impact prix |
Impact volume |
Difficulté |
Fréquence |
| Forward Split (ex: 4:1) |
Prix divisé par ratio |
Volume multiplié par ratio |
Facile |
~200/an (US) |
| Reverse Split (ex: 1:10) |
Prix multiplié par ratio |
Volume divisé par ratio |
Facile |
~150/an (US) |
| Cash Dividend |
Réduction du montant du dividende |
Aucun impact |
Moyen |
~8000/trimestre (US) |
| Special Dividend |
Réduction (parfois >10%) |
Aucun impact |
Élevé |
~100/an (US) |
| Spin-off |
Réduction proportionnelle |
Variable |
Très élevé |
~50/an (US) |
| Merger/Acquisition |
Conversion au ratio d'échange |
Ticker disparaît |
Très élevé |
~300/an (US) |
Pourquoi Yahoo Adjusted Close est FAUX pour le backtesting
C'est l'erreur la plus répandue et la plus pernicieuse dans le quant retail. Voici le problème :
Exemple
Notre solution : pipeline d'ajustement maison
Python
import pandas as pd
import numpy as np
class CorporateActionAdjuster:
"""
Ajustement point-in-time pour splits et dividendes.
Principe : on stocke les prix BRUTS + un journal des corporate actions.
L'ajustement est recalculé à la volée lors du backtest,
en utilisant UNIQUEMENT les actions connues à la date T.
"""
def __init__(self, actions_df: pd.DataFrame):
"""actions_df: colonnes [symbol, date, type, ratio, amount]"""
self.actions = actions_df.sort_values("date")
def adjust_prices(self, bars_df: pd.DataFrame,
as_of_date: pd.Timestamp) -> pd.DataFrame:
"""
Ajuste les prix en utilisant UNIQUEMENT les corporate actions
connues à as_of_date (point-in-time correctness).
"""
df = bars_df.copy()
known_actions = self.actions[
self.actions["date"] <= as_of_date
]
for _, action in known_actions.iterrows():
sym = action["symbol"]
mask = (df["symbol"] == sym) & (df["date"] < action["date"])
if action["type"] == "split":
ratio = action["ratio"]
df.loc[mask, ["open", "high", "low", "close"]] /= ratio
df.loc[mask, "volume"] *= ratio
elif action["type"] == "dividend":
adj_factor = 1 - (action["amount"] / df.loc[mask, "close"])
df.loc[mask, ["open", "high", "low", "close"]] *= adj_factor.values[:, None]
return df
Survivorship Bias — Le tueur invisible
Le survivorship bias est peut-être le biais le plus destructeur en backtesting quantitatif. Si vous backtestez uniquement les actions qui existent aujourd'hui, vous excluez toutes celles qui ont fait faillite, ont été rachetées, ou ont été radiées. Votre backtest ne voit que les survivants — les gagnants — et surestime drastiquement la performance.
| Source de données delisted |
Couverture |
Coût |
Notes |
| Polygon.io (Starter+) |
US equities depuis 2004 |
$29/mois |
Inclut les tickers delisted dans l'historique |
| EOD Historical Data |
Global, variable par bourse |
€20/mois |
Délisted via endpoint /exchange-symbol-list |
| Norgate Data |
US, AU — gold standard retail |
$55/mois |
Survivorship-bias-free par conception |
| CRSP/Compustat |
US — gold standard académique |
$25K+/an (institutional) |
Hors budget retail — mais c'est la référence |
Stratégie pragmatique anti-survivorship
Avec notre budget, la solution réaliste est : Polygon.io pour les US (inclut les delisted) + EOD Historical pour EU/APAC (couverture partielle des delisted). Ce n'est pas parfait — CRSP reste le gold standard — mais c'est 99.5% aussi bon pour 0.2% du prix. L'important est d'avoir conscience du biais résiduel et de l'intégrer dans votre estimation d'erreur du backtest (±1-2% CAGR).
Point-in-Time Fundamental Data
Le même problème existe pour les données fondamentales. Quand une entreprise publie ses résultats Q4 le 15 février, ces chiffres ne sont pas disponibles au 31 décembre (fin du trimestre). Votre backtest doit utiliser les résultats Q3 pour toute décision prise entre le 1er janvier et le 14 février.
Python
import pandas as pd
from datetime import date
class PITFundamentals:
"""
Point-in-Time fundamental data.
Utilise la date de PUBLICATION SEC (filing date),
pas la date de fin de trimestre (period end date).
"""
def get_known_fundamentals(self, symbol: str,
as_of: date) -> dict:
"""Retourne les fondamentaux CONNUS à la date as_of."""
filing = self.db.query(
"""
SELECT eps, revenue, gross_margin, roe, net_income,
total_assets, operating_cf, filing_date
FROM fundamentals
WHERE symbol = ? AND filing_date <= ?
ORDER BY filing_date DESC
LIMIT 1
""",
[symbol, as_of]
)
return filing.to_dict() if not filing.empty else {}
def compute_earnings_surprise(self, symbol: str,
as_of: date) -> float:
"""
SUE score (Standardized Unexpected Earnings).
Surprise = (EPS_actual - EPS_expected) / std(surprises)
EPS_expected = moyenne des 4 derniers trimestres (seasonal random walk)
"""
filings = self.db.query(
"""
SELECT eps, filing_date FROM fundamentals
WHERE symbol = ? AND filing_date <= ?
ORDER BY filing_date DESC LIMIT 5
""",
[symbol, as_of]
)
if len(filings) < 5:
return 0.0
actual = filings.iloc[0]["eps"]
expected = filings.iloc[1:5]["eps"].mean()
std = filings.iloc[1:5]["eps"].std()
return (actual - expected) / max(std, 0.01)
Les 50 Features qui comptent
L'arsenal du quant systématique
En 30 ans de carrière, j'ai testé des milliers de features. La plupart sont du bruit. Voici les 50 qui ont démontré un pouvoir prédictif statistiquement significatif, validé par la littérature académique et confirmé en production. Chaque feature est accompagnée de sa formule, de la référence académique, et de son Information Coefficient (IC) typique — la corrélation entre le signal et les rendements futurs.
Un IC de 0.05 semble minuscule, mais avec 2000 instruments et un rebalancement quotidien, c'est suffisant pour générer un Sharpe de 1.5+. La clé est la breadth (nombre de paris indépendants) qui amplifie même un faible IC via la loi fondamentale du management actif de Grinold & Kahn.
| # |
Feature |
Formule |
Référence |
IC typique |
| 1 |
Return 1d |
close(t) / close(t-1) - 1 |
Universelle |
-0.02 |
| 2 |
Return 5d |
close(t) / close(t-5) - 1 |
Universelle |
-0.01 |
| 3 |
Return 21d |
close(t) / close(t-21) - 1 |
Jegadeesh 1990 |
+0.02 |
| 4 |
Return 63d |
close(t) / close(t-63) - 1 |
Jegadeesh & Titman 1993 |
+0.04 |
| 5 |
Return 126d |
close(t) / close(t-126) - 1 |
Jegadeesh & Titman 1993 |
+0.04 |
| 6 |
Return 252d |
close(t) / close(t-252) - 1 |
De Bondt & Thaler 1985 |
-0.03 |
| 7 |
Realized Vol 21d |
σ = std(log_returns, 21) × √252 |
Parkinson 1980 |
-0.02 |
| 8 |
Realized Vol 63d |
σ = std(log_returns, 63) × √252 |
Parkinson 1980 |
-0.02 |
| 9 |
Parkinson Vol |
σP = √(1/4n·ln2 × Σ(ln H/L)²) |
Parkinson 1980 |
-0.03 |
| 10 |
Garman-Klass Vol |
σGK = √(0.5(lnH/L)² - (2ln2-1)(lnC/O)²) |
Garman & Klass 1980 |
-0.03 |
| 11 |
SMA Cross 10/50 |
SMA(10) / SMA(50) - 1 |
Brock et al. 1992 |
+0.03 |
| 12 |
EMA Cross 20/200 |
EMA(20) / EMA(200) - 1 |
Appel 1979 (MACD) |
+0.04 |
| 13 |
Bollinger Width |
(BB_upper - BB_lower) / SMA(20) |
Bollinger 2001 |
-0.02 |
| 14 |
Bollinger %B |
(close - BB_lower) / (BB_upper - BB_lower) |
Bollinger 2001 |
+0.02 |
| 15 |
ATR Normalized |
ATR(14) / close |
Wilder 1978 |
-0.02 |
| # |
Feature |
Formule |
Référence |
IC typique |
| 16 |
Volume Ratio |
volume(t) / SMA(volume, 20) |
Karpoff 1987 |
+0.02 |
| 17 |
OBV Slope |
regression_slope(OBV, 21) / close |
Granville 1963 |
+0.03 |
| 18 |
VWAP Deviation |
(close - VWAP) / VWAP |
Berkowitz et al. 1988 |
+0.02 |
| 19 |
Dollar Volume Rank |
percentile(close × volume, universe) |
Amihud 2002 |
+0.01 |
| 20 |
Volume-Price Trend |
VPT = Σ(volume × (close - close_prev) / close_prev) |
Buff 1972 |
+0.02 |
| 21 |
Amihud Illiquidity |
ILLIQ = mean(|return| / dollar_volume, 21) |
Amihud 2002 |
+0.04 |
| 22 |
Kyle's Lambda |
λ = regression(Δprice, signed_volume) |
Kyle 1985 |
+0.03 |
| 23 |
Turnover Ratio |
volume / shares_outstanding |
Datar et al. 1998 |
-0.03 |
| 24 |
Volume Volatility |
std(volume, 21) / mean(volume, 21) |
Chordia et al. 2001 |
+0.02 |
| 25 |
Net Volume Force |
Σ(volume × sign(close - open), 5) |
Llorente et al. 2002 |
+0.03 |
| # |
Feature |
Formule |
Référence |
IC typique |
| 26 |
Momentum 12-1 |
return(252d) - return(21d) [skip 1 month] |
Jegadeesh & Titman 1993 |
+0.05 |
| 27 |
Momentum 6-1 |
return(126d) - return(21d) |
Jegadeesh & Titman 1993 |
+0.05 |
| 28 |
Frog-in-the-Pan |
FIP = sign(ret_12m) × (%neg_days - %pos_days) |
Da, Gurun & Warachka 2014 |
+0.04 |
| 29 |
52-Week High Prox. |
close / max(high, 252d) |
George & Hwang 2004 |
+0.06 |
| 30 |
Acceleration |
mom_6m(t) - mom_6m(t-63) |
Gettleman & Marks 2006 |
+0.03 |
| 31 |
Industry Momentum |
return_sector_12m - return_market_12m |
Moskowitz & Grinblatt 1999 |
+0.04 |
| 32 |
Intl Momentum Spill |
return_same_sector_other_regions(21d) |
Rizova 2010 |
+0.03 |
| 33 |
Momentum Quality |
mom_12_1 × (1 - volatility_rank) |
Asness et al. 2014 |
+0.05 |
| 34 |
Trend Strength |
R² of linear regression(close, 63d) |
Hurst 1951 (R/S analysis) |
+0.03 |
| 35 |
Max Return Effect |
max(daily_return, 21d) |
Bali et al. 2011 |
-0.04 |
| # |
Feature |
Formule |
Référence |
IC typique |
| 36 |
SUE Score |
(EPS_actual - EPS_expected) / σ(surprises) |
Foster, Olsen & Shevlin 1984 |
+0.06 |
| 37 |
Revenue Surprise |
(Rev_actual - Rev_expected) / Rev_expected |
Jegadeesh & Livnat 2006 |
+0.04 |
| 38 |
Gross Margin Trend |
Δgross_margin(Q vs Q-4) / σ(margins) |
Novy-Marx 2013 |
+0.04 |
| 39 |
ROE vs Sector |
z-score(ROE, sector_peers) |
Fama & French 2006 |
+0.03 |
| 40 |
Accruals Anomaly |
(net_income - operating_CF) / total_assets |
Sloan 1996 |
-0.05 |
| 41 |
Asset Growth |
Δtotal_assets(YoY) / total_assets |
Cooper, Gulen & Schill 2008 |
-0.04 |
| 42 |
Net Operating Assets |
(OA - OL) / total_assets |
Hirshleifer et al. 2004 |
-0.04 |
| 43 |
PEAD Drift |
CAR(+2, +60) after earnings |
Ball & Brown 1968 |
+0.05 |
| 44 |
Free CF Yield |
FCF / market_cap |
Lakonishok et al. 1994 |
+0.03 |
| 45 |
Earnings Quality |
operating_CF / net_income |
Penman & Zhang 2002 |
+0.03 |
| # |
Feature |
Formule |
Référence |
IC typique |
| 46 |
Z-Score Secteur |
(feature - mean_sector) / std_sector |
Standardisation classique |
Variable |
| 47 |
Percentile Rank |
rank(feature) / count(universe) |
Non-paramétrique |
Variable |
| 48 |
Relative Strength |
return(symbol) - return(benchmark) |
Levy 1967 |
+0.04 |
| 49 |
Beta-Adjusted Return |
return - β × return_market |
Jensen 1968 (Alpha de Jensen) |
+0.03 |
| 50 |
FF5 Residual Alpha |
α de régression Fama-French 5 facteurs |
Fama & French 2015 |
+0.05 |
Implémentation Python complète
Python
import pandas as pd
import numpy as np
from scipy import stats
class FeatureEngine:
"""Calcul vectorisé de 50 features sur un univers de symboles."""
def compute_price_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Features 1-15 : price-based."""
g = df.groupby("symbol")["adj_close"]
for d in [1, 5, 21, 63, 126, 252]:
df[f"ret_{d}d"] = g.pct_change(d)
df["log_ret"] = np.log(g.pct_change() + 1)
df["rvol_21d"] = df.groupby("symbol")["log_ret"]\
.rolling(21).std() * np.sqrt(252)
df["rvol_63d"] = df.groupby("symbol")["log_ret"]\
.rolling(63).std() * np.sqrt(252)
df["hl_ratio"] = np.log(df["high"] / df["low"])
df["parkinson_vol"] = df.groupby("symbol")["hl_ratio"]\
.transform(lambda x: x.rolling(21).apply(
lambda h: np.sqrt((h**2).sum() / (4 * len(h) * np.log(2))) * np.sqrt(252)
))
df["sma_10"] = g.rolling(10).mean()
df["sma_50"] = g.rolling(50).mean()
df["ema_20"] = g.transform(lambda x: x.ewm(span=20).mean())
df["ema_200"] = g.transform(lambda x: x.ewm(span=200).mean())
df["sma_cross_10_50"] = df["sma_10"] / df["sma_50"] - 1
df["ema_cross_20_200"] = df["ema_20"] / df["ema_200"] - 1
df["bb_mid"] = g.rolling(20).mean()
df["bb_std"] = g.rolling(20).std()
df["bb_upper"] = df["bb_mid"] + 2 * df["bb_std"]
df["bb_lower"] = df["bb_mid"] - 2 * df["bb_std"]
df["bb_width"] = (df["bb_upper"] - df["bb_lower"]) / df["bb_mid"]
df["bb_pctb"] = (df["adj_close"] - df["bb_lower"]) / (df["bb_upper"] - df["bb_lower"])
tr = pd.concat([
df["high"] - df["low"],
(df["high"] - df["adj_close"].shift(1)).abs(),
(df["low"] - df["adj_close"].shift(1)).abs(),
], axis=1).max(axis=1)
df["atr_norm"] = tr.rolling(14).mean() / df["adj_close"]
return df
def compute_momentum_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Features 26-35 : momentum avec skip-month."""
g = df.groupby("symbol")["adj_close"]
ret_12m = g.pct_change(252)
ret_1m = g.pct_change(21)
df["mom_12_1"] = ret_12m - ret_1m
ret_6m = g.pct_change(126)
df["mom_6_1"] = ret_6m - ret_1m
def frog_in_pan(returns):
pos_pct = (returns > 0).sum() / len(returns)
neg_pct = (returns < 0).sum() / len(returns)
return np.sign(returns.sum()) * (neg_pct - pos_pct)
df["frog_in_pan"] = df.groupby("symbol")["log_ret"]\
.transform(lambda x: x.rolling(252).apply(frog_in_pan))
df["high_52w"] = df.groupby("symbol")["high"]\
.transform(lambda x: x.rolling(252).max())
df["prox_52w"] = df["adj_close"] / df["high_52w"]
return df
Pourquoi le skip-month dans le momentum ?
Jegadeesh & Titman (1993) ont découvert que le rendement du mois le plus récent est négativement corrélé aux rendements futurs (short-term reversal), tandis que les rendements des mois 2 à 12 sont positivement corrélés (momentum). En soustrayant le rendement du dernier mois (21 jours), on isole le signal de momentum pur, augmentant l'IC de ~0.03 à ~0.05. Ce détail simple vaut des dizaines de basis points de performance annuelle.
Feature Store & Retrieval
L'entrepôt de features du quant solo
Un feature store est un système qui stocke, versionne et sert les features calculées. Les géants (Two Sigma, Citadel) utilisent des solutions internes à plusieurs millions de dollars. Nous allons construire un feature store minimaliste mais production-ready avec Parquet + DuckDB + Git/DVC — le tout pour 0€ de coûts d'infrastructure supplémentaires.
Architecture du Feature Store
Point-in-Time Snapshots
Chaque jour, on écrit un snapshot Parquet contenant les 50 features pour chaque symbole de l'univers. Structure : features/date=YYYY-MM-DD/features.parquet. Ce sont des snapshots immuables — on ne modifie jamais un snapshot passé, on en crée de nouveaux.
Feature Versioning avec DVC
DVC (Data Version Control) est le Git des données. Chaque modification du pipeline de features crée une nouvelle version. Si vous changez la fenêtre de calcul du momentum de 252 à 200 jours, DVC track ce changement et vous permet de comparer les performances des deux versions.
Query Patterns optimisés
Trois patterns de requête couvrent 99% des besoins : by-date (cross-sectional pour le ranking), by-symbol (time-series pour le backtest), by-feature (analyse d'un signal spécifique). Le partitionnement Parquet par date rend le pattern by-date ultra-rapide.
Python
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from datetime import date
from typing import List, Optional
class FeatureStore:
"""
Feature Store basé sur Parquet + DuckDB.
- Écriture : snapshots immuables partitionnés par date
- Lecture : DuckDB pour les requêtes analytiques
- Versioning : DVC pour le tracking des changements
"""
def __init__(self, base_dir: Path):
self.base_dir = base_dir
self.features_dir = base_dir / "features"
self.features_dir.mkdir(parents=True, exist_ok=True)
self.con = duckdb.connect()
def write_snapshot(self, df: pd.DataFrame, trade_date: date):
"""Écrit un snapshot immuable pour une date donnée."""
path = self.features_dir / f"date={trade_date}"
path.mkdir(exist_ok=True)
table = pa.Table.from_pandas(df)
pq.write_table(table, path / "features.parquet",
compression="zstd")
def query_cross_sectional(self, trade_date: date,
features: List[str],
region: Optional[str] = None) -> pd.DataFrame:
"""Requête cross-sectional : tous les symboles pour une date."""
cols = ", ".join(["symbol"] + features)
where = f"WHERE region = '{region}'" if region else ""
return self.con.execute(f"""
SELECT {cols}
FROM read_parquet('{self.features_dir}/date={trade_date}/*.parquet')
{where}
ORDER BY symbol
""").df()
def query_time_series(self, symbol: str,
features: List[str],
start: date, end: date) -> pd.DataFrame:
"""Requête time-series : un symbole sur une période."""
cols = ", ".join(features)
return self.con.execute(f"""
SELECT date, {cols}
FROM read_parquet('{self.features_dir}/date=*/*.parquet',
hive_partitioning=true)
WHERE symbol = '{symbol}'
AND date BETWEEN '{start}' AND '{end}'
ORDER BY date
""").df()
def query_feature_analysis(self, feature: str,
trade_date: date) -> pd.DataFrame:
"""Distribution d'une feature sur tout l'univers."""
return self.con.execute(f"""
SELECT
symbol, region, sector, {feature},
PERCENT_RANK() OVER (ORDER BY {feature}) as pctile,
({feature} - AVG({feature}) OVER ()) /
STDDEV({feature}) OVER () as z_score
FROM read_parquet('{self.features_dir}/date={trade_date}/*.parquet')
WHERE {feature} IS NOT NULL
ORDER BY {feature} DESC
""").df()
DVC pour le versioning des features
Terminal
$ dvc init
$ dvc remote add -d storage s3://algo-million-data/features
$ dvc add data/features/
Adding data/features/ to .dvc
$ git add data/features.dvc
$ git commit -m "feat: momentum window 200d instead of 252d"
$ dvc push
$ git checkout v1.0 -- data/features.dvc
$ dvc checkout
Restored features/ to v1.0 (252d momentum)
$ python scripts/compare_ic.py --v1=v1.0 --v2=v1.1
mom_12_1: IC v1.0=0.048 IC v1.1=0.051 (+0.003)
mom_6_1: IC v1.0=0.045 IC v1.1=0.043 (-0.002)
Conclusion: fenêtre 200d légèrement meilleure sur 12-1, neutre sur 6-1
Volume de données typique
Pour notre univers de ~3000 instruments avec 50 features, un snapshot daily pèse environ 1.5 MB en Parquet (Zstd). Sur 5 ans (1260 jours de trading), le feature store total fait ~1.9 GB. Facilement stockable sur n'importe quelle VM et synchronisable via DVC + S3/GCS (~0.05€/mois de stockage cloud).
Feature Selection & Importance
Séparer le signal du bruit
Avoir 50 features ne signifie pas qu'il faut toutes les utiliser simultanément. La malédiction de la dimensionnalité guette : trop de features = overfitting garanti. L'objectif est d'identifier les 15-25 features qui portent le plus de signal prédictif et de les combiner de manière optimale.
Information Coefficient (IC) Analysis
L'IC est la métrique reine du quant. Pour chaque feature, on calcule la corrélation de Spearman entre le signal au temps T et le rendement forward sur différents horizons. Un IC stable dans le temps vaut plus qu'un IC élevé mais volatil.
Python
import pandas as pd
import numpy as np
from scipy.stats import spearmanr
def compute_ic_series(feature_df: pd.DataFrame,
feature_name: str,
forward_returns: pd.DataFrame,
horizons: list = [1, 5, 21]) -> pd.DataFrame:
"""
Calcule la série temporelle d'IC pour une feature.
IC_t = spearman_corr(feature_t, return_t+h) pour chaque date t.
"""
results = {}
for h in horizons:
ic_series = []
for dt in feature_df["date"].unique():
feat = feature_df[feature_df["date"] == dt][feature_name]
fwd = forward_returns[forward_returns["date"] == dt][f"fwd_ret_{h}d"]
aligned = pd.concat([feat, fwd], axis=1).dropna()
if len(aligned) > 30:
ic, _ = spearmanr(aligned.iloc[:, 0], aligned.iloc[:, 1])
ic_series.append({"date": dt, "ic": ic})
results[f"IC_{h}d"] = pd.DataFrame(ic_series)
for h, ic_df in results.items():
mean_ic = ic_df["ic"].mean()
std_ic = ic_df["ic"].std()
icir = mean_ic / std_ic
pct_positive = (ic_df["ic"] > 0).mean()
print(f"{feature_name} {h}: mean={mean_ic:.4f} "
f"ICIR={icir:.2f} %pos={pct_positive:.1%}")
return results
IC Heatmap par feature et horizon
Feature Correlation Matrix
Les features fortement corrélées entre elles n'apportent pas d'information incrémentale. Pire, elles amplifient le bruit et créent de l'instabilité dans les modèles. Règle d'or : si deux features ont une corrélation > 0.7, garder uniquement celle avec le meilleur ICIR.
Python
def select_uncorrelated_features(
feature_df: pd.DataFrame,
ic_scores: dict,
max_corr: float = 0.7
) -> list:
"""
Sélection gloutonne : prend la feature avec le meilleur ICIR,
puis élimine toutes les features corrélées > max_corr.
Répète jusqu'à épuisement.
"""
corr_matrix = feature_df.corr(method="spearman")
sorted_features = sorted(ic_scores, key=ic_scores.get, reverse=True)
selected = []
eliminated = set()
for feat in sorted_features:
if feat in eliminated:
continue
selected.append(feat)
for other in sorted_features:
if other != feat and abs(corr_matrix.loc[feat, other]) > max_corr:
eliminated.add(other)
return selected
SHAP Values pour l'importance non-linéaire
La corrélation linéaire (IC) manque les interactions non-linéaires entre features. SHAP (SHapley Additive exPlanations) quantifie la contribution marginale de chaque feature dans un modèle non-linéaire (XGBoost, Random Forest) en utilisant la théorie des jeux coopératifs de Shapley.
Python
import shap
import xgboost as xgb
import numpy as np
def compute_shap_importance(features_df, target):
"""
Entraîne un XGBoost léger et calcule les SHAP values.
ATTENTION : ce n'est PAS pour prédire — c'est pour comprendre
quelles features portent de l'information non-linéaire.
"""
model = xgb.XGBRegressor(
n_estimators=100,
max_depth=4,
learning_rate=0.1,
subsample=0.8,
)
model.fit(features_df, target)
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(features_df)
importance = np.abs(shap_values).mean(axis=0)
ranking = pd.Series(importance,
index=features_df.columns)
return ranking.sort_values(ascending=False)
Feature Turnover Analysis
Le turnover d'une feature mesure à quelle fréquence elle change de signe ou de rang. Une feature avec un turnover trop élevé génère des signaux instables et des coûts de transaction excessifs. Une feature avec un turnover trop bas ne réagit pas assez vite aux changements de marché.
| Feature |
Turnover quotidien |
Interprétation |
Horizon adapté |
| Return 1d |
~85% |
Trop instable — signal noise |
Intraday uniquement |
| Momentum 12-1 |
~3% |
Stable — signal fort |
Monthly rebalancing |
| SUE Score |
~0.5% |
Très stable — event-driven |
Post-earnings (quarterly) |
| 52-Week High Prox. |
~8% |
Bon équilibre signal/turnover |
Weekly/biweekly |
| Volume Ratio |
~45% |
Trop volatile — filtrer par seuil |
Daily (avec filtre >2x) |
Data Quality & Monitoring
La paranoïa qui sauve votre capital
En production, les données sont toujours corrompues. Pas « parfois » — toujours. Sources qui tombent, prix stale (la Bourse de Tokyo qui ne met pas à jour un ticker pendant 3 jours), volumes aberrants (un fat finger qui crée un volume de 10 milliards), splits non signalés. Votre pipeline doit être paranoïaque par design.
Les 7 contrôles automatisés
Python
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List
@dataclass
class QualityAlert:
level: str
check: str
symbol: str
message: str
class DataQualityChecker:
"""7 contrôles automatisés — exécutés après chaque ingestion."""
def check_all(self, df: pd.DataFrame,
prev_df: pd.DataFrame) -> List[QualityAlert]:
alerts = []
alerts += self._check_missing_data(df, prev_df)
alerts += self._check_outlier_returns(df)
alerts += self._check_stale_prices(df, prev_df)
alerts += self._check_volume_anomalies(df)
alerts += self._check_ohlc_consistency(df)
alerts += self._check_corporate_actions(df, prev_df)
alerts += self._check_cross_source(df)
return alerts
def _check_missing_data(self, df, prev_df):
"""Contrôle 1 : symboles manquants vs veille."""
today_syms = set(df["symbol"])
prev_syms = set(prev_df["symbol"])
missing = prev_syms - today_syms
alerts = []
if len(missing) > 50:
alerts.append(QualityAlert(
"fatal", "missing_data", "UNIVERSE",
f"{len(missing)} symboles manquants — source probablement down"
))
elif len(missing) > 10:
alerts.append(QualityAlert(
"warning", "missing_data", "UNIVERSE",
f"{len(missing)} symboles manquants : {list(missing)[:10]}"
))
return alerts
def _check_outlier_returns(self, df):
"""Contrôle 2 : returns > ±50% en une journée → split probable."""
alerts = []
df["daily_ret"] = df.groupby("symbol")["close"].pct_change()
outliers = df[df["daily_ret"].abs() > 0.5]
for _, row in outliers.iterrows():
alerts.append(QualityAlert(
"critical", "outlier_return", row["symbol"],
f"Return {row['daily_ret']:.1%} — vérifier split/corporate action"
))
return alerts
def _check_stale_prices(self, df, prev_df):
"""Contrôle 3 : prix identique à la veille → stale data."""
merged = df.merge(prev_df, on="symbol", suffixes=("_today", "_prev"))
stale = merged[merged["close_today"] == merged["close_prev"]]
return [
QualityAlert("warning", "stale_price", row["symbol"],
f"Prix identique à la veille : {row['close_today']}")
for _, row in stale.iterrows()
]
def _check_ohlc_consistency(self, df):
"""Contrôle 5 : High >= Low, High >= Open/Close, etc."""
bad = df[
(df["high"] < df["low"]) |
(df["high"] < df["open"]) |
(df["high"] < df["close"]) |
(df["low"] > df["open"]) |
(df["low"] > df["close"])
]
return [
QualityAlert("critical", "ohlc_inconsistent", row["symbol"],
f"OHLC incohérent : O={row['open']} H={row['high']} L={row['low']} C={row['close']}")
for _, row in bad.iterrows()
]
Alerting et escalation
| Niveau |
Action |
Exemples |
Notification |
| Warning |
Log + continuer |
5 prix stale, 3 symboles manquants |
Log file |
| Critical |
Flag les symboles + continuer |
Return >50%, OHLC incohérent |
Email / Telegram |
| Fatal |
STOPPER le pipeline |
>50 symboles manquants, source down |
SMS + email + bloquer les trades |
Règle cardinale : jamais de trading sur données suspectes
Si le pipeline émet une alerte fatal, aucun signal de trading n'est généré. La position courante est maintenue (pas de rebalancement) jusqu'à résolution manuelle. En 5 ans de production, cette règle m'a épargné au moins 3 trades catastrophiques causés par des données corrompues qui auraient généré des signaux aberrants.