Executive Summary

The data highlights three key insights that guide the investment strategy:

| Finding | Headline | Implication | Action |
|---|---|---|---|
| 2 / 20 | markets qualify as Ready Markets (10% of the sample) | Concentration, not diversification. Only Malaysia and Chile score high enough to justify investment today. A further 15 markets require structural improvements. | Focus on Malaysia & Chile now |
| 49.6 pts | governance gap between Ready and Transition Markets | Governance is the primary gatekeeper. Markets below a minimum governance threshold are not investable regardless of energy demand or decarbonisation potential. | Screen for governance first |
| 0.0 | rank standard deviation for Malaysia & Chile across all four scenarios | The result is robust across all investor profiles, from impact-first to risk-averse. | No-regret deployment, act immediately |

Strategic implication: Investment should concentrate on the strongest markets rather than being spread across marginal opportunities. For the 15 Transition Markets, the right approach is to monitor and wait, with clear conditions for re-entry when governance improves. The 3 Watch & Wait markets face structural challenges that make them non-viable in the near term.
1. Strategic Context
1.1 The Investor’s Dilemma
Emerging markets present a paradox:
- High growth, high risk: Energy demand is rising 2 to 3 times faster than in developed markets, but regulatory uncertainty and weak governance can put projects at risk.
- Difficult to compare markets: Countries like Nigeria, Vietnam, and Brazil each have very different contexts, making side-by-side comparison challenging.
This framework addresses that challenge by bringing 13 different indicators into one clear and transparent score.
1.2 Analytical Scope
This framework evaluates 20 emerging markets across 3 regions using 13 World Bank indicators tested under 4 investor scenarios.
| Region | N | Countries |
|---|---|---|
| Southeast Asia | 7 | Vietnam, Indonesia, Philippines, Thailand, Malaysia, Cambodia, Bangladesh |
| Africa | 8 | Kenya, Nigeria, Ghana, Ethiopia, Tanzania, Senegal, Côte d'Ivoire, Morocco |
| Latin America | 5 | Brazil, Mexico, Colombia, Peru, Chile |
2. Methodology
2.1 Framework Design and Data Source
The framework assesses markets using four dimensions, each reflecting a different part of the investment case. In the baseline scenario, Decarbonisation Opportunity has the greatest weight because it best reflects the policy drivers that have the strongest impact on project returns. All 13 indicators come from the World Bank Open Data API and are retrieved automatically using the wbgapi Python library. This makes the analysis fully reproducible, since every data point comes from an official published source, and it also reduces the risk of manual entry errors.
| Dimension | Indicator | World Bank Code | Direction | Dim. Weight | Ind. Weight | Rationale |
|---|---|---|---|---|---|---|
| Market Opportunity | GDP per capita growth | NY.GDP.PCAP.KD.ZG | Higher → better | 30% | 30% | Market expansion |
| Market Opportunity | Electricity consumption per capita | EG.USE.ELEC.KH.PC | Higher → better | 30% | 35% | Demand baseline |
| Market Opportunity | Population growth rate | SP.POP.GROW | Higher → better | 30% | 20% | Long-run demand |
| Market Opportunity | Urban population share | SP.URB.TOTL.IN.ZS | Higher → better | 30% | 15% | Urban concentration |
| Decarbonisation | Energy use per capita | EG.USE.PCAP.KG.OE | Higher → better | 45% | 35% | Carbon intensity proxy |
| Decarbonisation | Fossil electricity share | EG.ELC.FOSL.ZS | Higher → better | 45% | 30% | Direct replacement opportunity |
| Decarbonisation | Modern renewable share (inverse) | EG.ELC.RNWX.ZS | Lower → better | 45% | 25% | Low = growth runway |
| Decarbonisation | Electricity access rate | EG.ELC.ACCS.ZS | Higher → better | 45% | 10% | Infrastructure maturity |
| Business Environment | Political stability | PV.EST | Higher → better | 20% | 30% | Execution risk |
| Business Environment | Regulatory quality | RQ.EST | Higher → better | 20% | 30% | Policy predictability |
| Business Environment | Rule of law | RL.EST | Higher → better | 20% | 25% | Contract enforcement |
| Business Environment | Control of corruption | CC.EST | Higher → better | 20% | 15% | Procurement integrity |
| Energy Security | Net energy imports | EG.IMP.CONS.ZS | Higher → better | 5% | 100% | Import dependency |
Why Business Environment at only 20%?
It is not under-weighted—it acts as a threshold constraint. Markets below a minimum governance standard cannot compensate with strong market size or decarbonisation scores. The 49.6-point governance gap between Ready and Transition Markets confirms this.
Analysis period: 2018–2023 (6-year average per indicator)
2.2 Processing Pipeline
Raw World Bank data passes through four sequential steps before scoring: regional median imputation of missing values, winsorisation of outliers at the 5th/95th percentiles, direction alignment (flipping lower-is-better indicators), and min–max normalisation to a common 0–100 scale.
This ensures all indicators are directly comparable before weighted aggregation and prevents extreme values from distorting the scores.
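These steps can be sketched in a few lines (an illustration of the same operations, not the project's preprocessing.py; the sample data and column names are hypothetical):

```python
import pandas as pd

def winsorize_series(s: pd.Series, lower: float = 0.05, upper: float = 0.95) -> pd.Series:
    """Cap values at the 5th/95th percentiles to limit outlier influence."""
    return s.clip(s.quantile(lower), s.quantile(upper))

def minmax_0_100(s: pd.Series) -> pd.Series:
    """Rescale a series to a common 0-100 range for weighted aggregation."""
    rng = s.max() - s.min()
    return (s - s.min()) / rng * 100 if rng else pd.Series(50.0, index=s.index)

# Hypothetical indicator with one gap: regional median imputation,
# then winsorisation, then min-max scaling
df = pd.DataFrame({
    "region": ["SEA", "SEA", "Africa", "Africa"],
    "gdp_growth": [2.0, None, 5.0, 1.0],
})
df["gdp_growth"] = df["gdp_growth"].fillna(
    df.groupby("region")["gdp_growth"].transform("median")
)
df["gdp_growth"] = minmax_0_100(winsorize_series(df["gdp_growth"]))
```

After these steps every indicator lies in [0, 100] with outliers capped, so no single extreme value can dominate the weighted aggregation that follows.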
2.4 Scoring
Scoring uses hierarchical weighted aggregation applied in two steps:
Within each dimension (step ⑤): The normalized indicators are combined into one dimension score using specific weights. For example, in Market Opportunity, electricity consumption has a higher weight (35%) than urbanization rate (15%).
Across dimensions (step ⑥): The dimension scores are then combined using scenario-based weights to create a final attractiveness score from 0 to 100. Countries are ranked from 1 to 20, and the rankings are tested for consistency across all four investor scenarios.
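The two aggregation steps can be sketched as follows (a minimal illustration using the weights from Section 2.1 and the baseline scenario; the sample indicator scores are hypothetical):

```python
# Step 1: indicator -> dimension, using indicator weights within Market Opportunity
market_opp_weights = {
    "gdp_growth_per_capita": 0.30,
    "electricity_consumption": 0.35,
    "population_growth": 0.20,
    "urbanization_rate": 0.15,
}
# Step 2: dimension -> total, using baseline (Balanced) scenario weights
dimension_weights = {
    "market_opportunity": 0.30,
    "decarbonization_opportunity": 0.45,
    "business_environment": 0.20,
    "energy_security": 0.05,
}

def weighted_score(values: dict, weights: dict) -> float:
    """Weighted average of normalised 0-100 scores."""
    return sum(values[k] * w for k, w in weights.items())

# Hypothetical normalised indicator scores for one country
indicators = {"gdp_growth_per_capita": 60.0, "electricity_consumption": 70.0,
              "population_growth": 40.0, "urbanization_rate": 50.0}
dim_score = weighted_score(indicators, market_opp_weights)   # ≈ 58.0

dimensions = {"market_opportunity": dim_score, "decarbonization_opportunity": 75.0,
              "business_environment": 80.0, "energy_security": 65.0}
total_score = weighted_score(dimensions, dimension_weights)  # ≈ 70.4
```

Because both levels are simple weighted averages of 0–100 inputs, the final score also lands on a 0–100 scale with no further rescaling needed.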
Reading the Sankey flow widths
The thickness of each flow shows how much it contributes to the final score.
Indicator → Dimension: dimension weight × indicator weight × 100
Dimension → Total: dimension weight × 100
All flows into Total Score sum to 100.
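This width rule can be checked directly from the weights (a sketch; the dictionaries restate the baseline weights from the methodology table in Section 2.1):

```python
# Baseline dimension weights and within-dimension indicator weights (Section 2.1)
dim_weights = {"market_opportunity": 0.30, "decarbonization_opportunity": 0.45,
               "business_environment": 0.20, "energy_security": 0.05}
ind_weights = {
    "market_opportunity": [0.30, 0.35, 0.20, 0.15],
    "decarbonization_opportunity": [0.35, 0.30, 0.25, 0.10],
    "business_environment": [0.30, 0.30, 0.25, 0.15],
    "energy_security": [1.00],
}

# Indicator -> Dimension flow width: dimension weight x indicator weight x 100
flows = {dim: [dw * iw * 100 for iw in ind_weights[dim]]
         for dim, dw in dim_weights.items()}

# Dimension -> Total widths are dimension weight x 100, and all widths sum to 100
assert abs(sum(dw * 100 for dw in dim_weights.values()) - 100) < 1e-6
assert all(abs(sum(flows[d]) - dim_weights[d] * 100) < 1e-6 for d in flows)
```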
2.5 Investor Scenarios
To check whether the rankings depend too much on the selected weights, we run the model again using four different investor approaches, each based on a different investment objective.
| Scenario | Capital Mandate | Dominant Dimension | Key Question |
|---|---|---|---|
| Balanced | Neutral reference | Decarbonisation (45%) | Which markets score well overall? |
| Impact-First | Development finance institutions and climate-impact funds | Decarbonisation (60%) | Where is fossil-fuel replacement most urgent? |
| Growth-Focused | Commercial infrastructure funds and utilities | Market Opportunity (55%) | Where is energy demand growing fastest? |
| Risk-Averse | Pension funds and insurance-backed capital | Business Environment (40%) | Where is project execution safest? |
Energy Security is fixed at 5% across all scenarios, a deliberate design choice. Although import dependence and supply chain risk matter, they usually do not outweigh the main investment drivers. For this analysis, Energy Security uses a single indicator (net energy imports) due to data availability constraints.
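Re-running the model under a scenario amounts to applying a different weight vector to the same dimension scores. A minimal sketch using the scenario weights from config.yaml and the dimension scores of the two Ready Markets from the results table:

```python
import pandas as pd

scenarios = {
    "balanced":       {"market_opportunity": 0.30, "decarbonization_opportunity": 0.45,
                       "business_environment": 0.20, "energy_security": 0.05},
    "impact_first":   {"market_opportunity": 0.20, "decarbonization_opportunity": 0.60,
                       "business_environment": 0.15, "energy_security": 0.05},
    "growth_focused": {"market_opportunity": 0.55, "decarbonization_opportunity": 0.25,
                       "business_environment": 0.15, "energy_security": 0.05},
    "risk_averse":    {"market_opportunity": 0.25, "decarbonization_opportunity": 0.30,
                       "business_environment": 0.40, "energy_security": 0.05},
}

# Dimension scores for the two Ready Markets (from the results table)
dims = pd.DataFrame({
    "market_opportunity":          [66.1, 60.2],
    "decarbonization_opportunity": [85.0, 79.0],
    "business_environment":        [99.0, 100.0],
    "energy_security":             [73.1, 77.7],
}, index=["MYS", "CHL"])

# One total score per country per scenario; rank 1 = most attractive
totals = pd.DataFrame({name: (dims * pd.Series(w)).sum(axis=1)
                       for name, w in scenarios.items()})
ranks = totals.rank(ascending=False).astype(int)
```

Under the balanced weights this reproduces the published totals (Malaysia 81.5, Chile 77.5), and Malaysia stays at rank 1 in all four scenarios.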
2.6 Market Segmentation
Markets are segmented into three archetypes using a hybrid approach that combines the statistical rigor of K-Means clustering with the business clarity of score-based thresholds.
Why threshold-based over K-Means labels?
K-Means groups by similarity in dimension scores, which can produce counterintuitive clusters (e.g., Senegal and Bangladesh grouped together due to high energy security scores). Threshold-based labeling ensures that all markets above a certain score are consistently classified as “Ready,” regardless of their dimension profile. The two methods are complementary: K-Means validates the structure; thresholds ensure business clarity.
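A threshold-based labeller can be as simple as a pd.cut over total scores (a sketch; the cut-points shown are illustrative assumptions, not the project's calibrated thresholds):

```python
import pandas as pd

def label_by_threshold(total_scores: pd.Series,
                       ready_min: float = 65.0,
                       transition_min: float = 40.0) -> pd.Series:
    """Map total scores to archetypes via fixed cut-points (illustrative values)."""
    bins = [-float("inf"), transition_min, ready_min, float("inf")]
    labels = ["Watch & Wait", "Transition Markets", "Ready Markets"]
    return pd.cut(total_scores, bins=bins, labels=labels)

scores = pd.Series({"MYS": 81.5, "THA": 62.4, "NGA": 16.8})
archetypes = label_by_threshold(scores)
# MYS -> Ready Markets, THA -> Transition Markets, NGA -> Watch & Wait
```

Unlike K-Means, this guarantees that label boundaries are monotone in total score, which is what gives the archetypes their business clarity.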
Show clustering function
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

def run_clustering(scores_df, n_clusters=3):
    """Segment markets into investment archetypes via K-Means."""
    dim_cols = [c for c in scores_df if c.startswith('score_') and 'total' not in c]
    features_scaled = StandardScaler().fit_transform(scores_df[dim_cols].fillna(50))
    km = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    scores_df['cluster_id'] = km.fit_predict(features_scaled)
    # Order clusters by mean total score so the highest-scoring cluster gets the first label
    order = scores_df.groupby('cluster_id')['total_score'].mean().sort_values(ascending=False)
    labels = ['Ready Markets', 'Transition Markets', 'Watch & Wait']
    scores_df['cluster_label'] = scores_df['cluster_id'].map(dict(zip(order.index, labels)))
    return scores_df

3. Results
3.1 Market Rankings
Score gap: Malaysia (81.5) and Chile (77.5) sit roughly 15 points clear of the third-ranked market (Thailand, 62.4). This gap, combined with the governance differential, marks a qualitatively different investment category. The next tier clusters between the low 40s and low 60s, while below 40, structural barriers dominate.
Top-10 Market Summary
Scores and Archetype Classification
| Rank | Country | Region | Total | Market Opp. | Decarbonisation | Business Env. | Energy Sec. | Archetype |
|---|---|---|---|---|---|---|---|---|
| 1 | Malaysia | SEA | 81.5 | 66.1 | 85.0 | 99.0 | 73.1 | Ready Markets |
| 2 | Chile | LatAm | 77.5 | 60.2 | 79.0 | 100.0 | 77.7 | Ready Markets |
| 3 | Thailand | SEA | 62.4 | 34.6 | 77.1 | 62.7 | 94.8 | Transition Markets |
| 4 | Vietnam | SEA | 58.0 | 53.4 | 59.1 | 59.4 | 69.5 | Transition Markets |
| 5 | Mexico | LatAm | 54.2 | 35.6 | 72.5 | 37.0 | 69.2 | Transition Markets |
| 6 | Brazil | LatAm | 52.0 | 44.3 | 58.7 | 52.1 | 37.8 | Transition Markets |
| 7 | Indonesia | SEA | 49.8 | 36.7 | 57.7 | 58.9 | 21.1 | Transition Markets |
| 8 | Morocco | Africa | 48.9 | 25.8 | 55.8 | 57.1 | 92.1 | Transition Markets |
| 9 | Peru | LatAm | 48.2 | 35.7 | 51.8 | 57.8 | 52.4 | Transition Markets |
| 10 | Colombia | LatAm | 47.7 | 41.1 | 53.5 | 53.2 | 12.6 | Transition Markets |
3.2 Dimensional Performance
Three patterns stand out:
- Malaysia is the only market with no dimension score below 66, genuinely balanced across all four dimensions.
- Chile compensates for moderate market opportunity with a perfect governance score (100.0) and solid energy security (77.7).
- Kenya, Ethiopia, and Nigeria illustrate the governance trap: business environment scores below 35 overwhelm every other dimension, making market size and decarbonisation potential irrelevant.
3.3 Score Decomposition: Top Two Markets
Each segment shows the weighted contribution of one dimension to the total score.
4. Sensitivity Analysis
The key question for any scoring exercise is whether results reflect genuine market quality or are an artefact of the chosen weights. We answer this by re-running the full model under four investor profiles and comparing each market’s rank across scenarios. Markets with zero rank variance are high-confidence selections regardless of the specific strategy used.
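Given a rank-per-scenario table, the stability statistics reported below reduce to row-wise aggregates (a sketch; Malaysia's ranks come from the results, while Mexico's per-scenario ranks are illustrative values consistent with its summary row):

```python
import pandas as pd

# Ranks per scenario (Malaysia from the results; Mexico's ranks are illustrative)
ranks = pd.DataFrame({
    "balanced":       {"Malaysia": 1, "Mexico": 5},
    "impact_first":   {"Malaysia": 1, "Mexico": 6},
    "growth_focused": {"Malaysia": 1, "Mexico": 4},
    "risk_averse":    {"Malaysia": 1, "Mexico": 11},
})

stability = pd.DataFrame({
    "avg_rank":   ranks.mean(axis=1),
    "std_dev":    ranks.std(axis=1),  # sample std; 0.0 means identical rank everywhere
    "best_rank":  ranks.min(axis=1),
    "worst_rank": ranks.max(axis=1),
})
```

A standard deviation of exactly zero, as for Malaysia here, means the market's rank is invariant to the choice of investor profile.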
Rank Stability: All 20 Markets Across All Four Scenarios
| Country | Region | Avg. Rank | Std. Dev. | Best Rank | Worst Rank | Stability |
|---|---|---|---|---|---|---|
| Malaysia | SEA | 1.0 | 0.0 | 1 | 1 | Very High |
| Chile | LatAm | 2.0 | 0.0 | 2 | 2 | Very High |
| Thailand | SEA | 3.2 | 0.5 | 3 | 4 | Very High |
| Vietnam | SEA | 4.0 | 0.8 | 3 | 5 | Very High |
| Brazil | LatAm | 5.5 | 0.6 | 5 | 6 | Very High |
| Mexico | LatAm | 6.5 | 3.1 | 4 | 11 | Low |
| Indonesia | SEA | 7.0 | 0.8 | 6 | 8 | Very High |
| Morocco | Africa | 8.8 | 2.2 | 7 | 12 | Medium |
| Peru | LatAm | 9.0 | 0.0 | 9 | 9 | Very High |
| Colombia | LatAm | 9.8 | 2.1 | 7 | 12 | Medium |
| Ghana | Africa | 10.5 | 1.7 | 8 | 12 | High |
| Senegal | Africa | 11.2 | 1.5 | 10 | 13 | High |
| Philippines | SEA | 12.8 | 1.3 | 11 | 14 | High |
| Côte d'Ivoire | Africa | 14.5 | 1.3 | 13 | 16 | High |
| Bangladesh | SEA | 14.8 | 1.0 | 14 | 16 | Very High |
| Cambodia | SEA | 15.5 | 0.6 | 15 | 16 | Very High |
| Kenya | Africa | 17.5 | 0.6 | 17 | 18 | Very High |
| Tanzania | Africa | 18.0 | 0.8 | 17 | 19 | Very High |
| Ethiopia | Africa | 18.5 | 1.0 | 17 | 19 | Very High |
| Nigeria | Africa | 20.0 | 0.0 | 20 | 20 | Very High |
The stability insight: Malaysia and Chile hold ranks #1 and #2 in every scenario with a standard deviation of zero. Peru also demonstrates perfect stability (rank #9 in all scenarios). Mexico is the most volatile market (std = 3.1, Low stability), reflecting its strong decarbonisation score (72.5) set against a weak business environment score (37.0).
5. Market Archetypes
5.1 Cluster Profiles
5.2 Archetype Definitions
| Archetype | N | Avg. Score | Score Range | Markets | Profile |
|---|---|---|---|---|---|
| Ready Markets | 2 | 79.5 | 77.5–81.5 | Malaysia, Chile | Strong across all dimensions. Governance scores above 98 place these markets in a category of their own (no structural barriers to entry). Decarbonisation scores reflect strong replacement opportunity. |
| Transition Markets | 15 | 49.3 | 41.2–60.6 | Thailand, Vietnam, Mexico, Indonesia, Ghana, Senegal, Côte d'Ivoire, Bangladesh, Peru, Morocco, Colombia, Philippines, Brazil, Cambodia, Tanzania | Strong demand signals and meaningful decarbonisation opportunity. Governance gaps (avg 49.9) or energy security constraints require mitigation before entry. Most markets have clear pathways to improvement. |
| Watch & Wait | 3 | 28.4 | 23.9–30.7 | Nigeria, Ethiopia, Kenya | Structural barriers (governance deficits avg 15.7, sub-critical market size, or infrastructure gaps) prevent near-term viability. Fundamental reform required before investment consideration. |
6. Scope and Limitations
This is a macro-level market screen, the first filter in a multi-phase process. It identifies which markets are worth investigating further. It does not replace:
- Technical assessment: Solar irradiance and wind capacity data, grid interconnection studies, site suitability, land tenure analysis…
- Financial modelling: Project-level NPV/IRR, power purchase agreement structure, tariff analysis, FX risk, hedging cost…
- Local intelligence: Regulatory nuance, permitting timelines, community dynamics, partnership landscape, competitive entry barriers…
For Ready Markets, the next step is to carry out a detailed technical and regulatory review focused on specific project locations.
7. Appendix
7.1 Complete Results
| Rank | Country | Region | Total Score | Market Opp. | Decarbonisation | Business Env. | Energy Sec. | Archetype |
|---|---|---|---|---|---|---|---|---|
| 1 | Malaysia | SEA | 81.5 | 66.1 | 85.0 | 99.0 | 73.1 | Ready Markets |
| 2 | Chile | LatAm | 77.5 | 60.2 | 79.0 | 100.0 | 77.7 | Ready Markets |
| 3 | Thailand | SEA | 62.4 | 34.6 | 77.1 | 62.7 | 94.8 | Transition Markets |
| 4 | Vietnam | SEA | 58.0 | 53.4 | 59.1 | 59.4 | 69.5 | Transition Markets |
| 5 | Mexico | LatAm | 54.2 | 35.6 | 72.5 | 37.0 | 69.2 | Transition Markets |
| 6 | Brazil | LatAm | 52.0 | 44.3 | 58.7 | 52.1 | 37.8 | Transition Markets |
| 7 | Indonesia | SEA | 49.8 | 36.7 | 57.7 | 58.9 | 21.1 | Transition Markets |
| 8 | Morocco | Africa | 48.9 | 25.8 | 55.8 | 57.1 | 92.1 | Transition Markets |
| 9 | Peru | LatAm | 48.2 | 35.7 | 51.8 | 57.8 | 52.4 | Transition Markets |
| 10 | Colombia | LatAm | 47.7 | 41.1 | 53.5 | 53.2 | 12.6 | Transition Markets |
| 11 | Ghana | Africa | 44.3 | 36.8 | 37.5 | 68.0 | 55.7 | Transition Markets |
| 12 | Senegal | Africa | 44.0 | 37.3 | 34.6 | 61.5 | 98.9 | Transition Markets |
| 13 | Philippines | SEA | 43.3 | 33.0 | 47.9 | 45.8 | 54.3 | Transition Markets |
| 14 | Bangladesh | SEA | 39.3 | 37.6 | 43.5 | 19.8 | 88.9 | Watch & Wait |
| 15 | Côte d'Ivoire | Africa | 37.7 | 42.8 | 28.5 | 40.8 | 77.3 | Watch & Wait |
| 16 | Cambodia | SEA | 36.6 | 33.6 | 35.9 | 32.6 | 77.2 | Watch & Wait |
| 17 | Kenya | Africa | 31.2 | 31.0 | 28.6 | 33.5 | 45.0 | Watch & Wait |
| 18 | Tanzania | Africa | 29.4 | 29.5 | 17.6 | 42.5 | 81.8 | Watch & Wait |
| 19 | Ethiopia | Africa | 26.9 | 42.8 | 16.8 | 13.4 | 76.0 | Watch & Wait |
| 20 | Nigeria | Africa | 16.8 | 23.2 | 17.2 | 0.1 | 40.5 | Watch & Wait |
7.2 Interactive Dashboard
A live interactive dashboard is available at:
Interactive Dashboard
The dashboard allows you to:
- Explore rankings across all 20 markets
- Test custom weight scenarios in real time
- View dimensional performance and archetype profiles
- Compare rank stability across investor profiles
7.3 Pipeline Code
Configuration
Show config code
# config.yaml
# ── Markets ────────────────────────────────────────────────────────────────
# 20 emerging markets across three regions. ISO-3 codes throughout.
countries:
SEA:
- VNM # Vietnam
- IDN # Indonesia
- PHL # Philippines
- THA # Thailand
- MYS # Malaysia
- KHM # Cambodia
- BGD # Bangladesh
Africa:
- KEN # Kenya
- NGA # Nigeria
- GHA # Ghana
- ETH # Ethiopia
- TZA # Tanzania
- SEN # Senegal
- CIV # Côte d'Ivoire
- MAR # Morocco
LatAm:
- BRA # Brazil
- MEX # Mexico
- COL # Colombia
- PER # Peru
- CHL # Chile
# ── Weight scenarios ───────────────────────────────────────────────────────
# Four investor philosophies tested in sensitivity analysis.
# Weights must sum to 1.0 per scenario.
# Energy Security is fixed at 0.05 across all scenarios —
# it acts as a tiebreaker rather than a primary driver.
weight_scenarios:
balanced:
# Neutral reference scenario; no single dimension dominates.
market_opportunity: 0.30
decarbonization_opportunity: 0.45
business_environment: 0.20
energy_security: 0.05
growth_focused:
# Favours large, fast-growing energy markets (commercial funds, utilities).
market_opportunity: 0.55
decarbonization_opportunity: 0.25
business_environment: 0.15
energy_security: 0.05
impact_first:
# Maximises fossil-fuel replacement potential (DFIs, climate-impact funds).
market_opportunity: 0.20
decarbonization_opportunity: 0.60
business_environment: 0.15
energy_security: 0.05
risk_averse:
# Prioritises institutional quality (pension funds, insurance-backed capital).
market_opportunity: 0.25
decarbonization_opportunity: 0.30
business_environment: 0.40
energy_security: 0.05
# ── Indicators ─────────────────────────────────────────────────────────────
# All sourced from World Bank Open Data API (2018–2023 average).
# direction: higher_is_better | lower_is_better
# weight: relative weight within dimension (must sum to 1.0 per dimension)
indicators:
market_opportunity:
gdp_growth_per_capita:
description: "GDP per capita growth rate — market expansion trajectory"
wb_indicator: "NY.GDP.PCAP.KD.ZG"
direction: higher_is_better
weight: 0.30
electricity_consumption:
description: "Electric power consumption per capita (kWh) — energy market size"
wb_indicator: "EG.USE.ELEC.KH.PC"
direction: higher_is_better
weight: 0.35
population_growth:
description: "Annual population growth rate — long-run demand driver"
wb_indicator: "SP.POP.GROW"
direction: higher_is_better
weight: 0.20
urbanization_rate:
description: "Urban population as % of total — urban energy demand concentration"
wb_indicator: "SP.URB.TOTL.IN.ZS"
direction: higher_is_better
weight: 0.15
decarbonization_opportunity:
energy_use_per_capita:
description: "Energy consumption per capita — proxy for carbon intensity"
wb_indicator: "EG.USE.PCAP.KG.OE"
direction: higher_is_better
weight: 0.35
fossil_fuel_consumption:
description: "Fossil fuel energy consumption (% of total) — replacement opportunity"
wb_indicator: "EG.USE.COMM.FO.ZS"
direction: higher_is_better
weight: 0.30
renewable_energy_gap:
description: "Renewable energy consumption (% of total) — lower = higher growth runway"
wb_indicator: "EG.FEC.RNEW.ZS"
direction: lower_is_better
weight: 0.25
electricity_access:
description: "Access to electricity (% of population) — infrastructure maturity signal"
wb_indicator: "EG.ELC.ACCS.ZS"
direction: higher_is_better
weight: 0.10
business_environment:
political_stability:
description: "Political Stability and Absence of Violence/Terrorism — project execution risk"
wb_indicator: "PV.EST"
direction: higher_is_better
weight: 0.30
regulatory_quality:
description: "Regulatory Quality — ability to formulate and implement sound policies"
wb_indicator: "RQ.EST"
direction: higher_is_better
weight: 0.30
rule_of_law:
description: "Rule of Law — contract enforcement and property rights"
wb_indicator: "RL.EST"
direction: higher_is_better
weight: 0.25
control_of_corruption:
description: "Control of Corruption — procurement integrity"
wb_indicator: "CC.EST"
direction: higher_is_better
weight: 0.15
energy_security:
energy_imports:
description: "Net energy imports (% of energy use) — import dependency"
wb_indicator: "EG.IMP.CONS.ZS"
direction: higher_is_better
weight: 0.60
alternative_nuclear_energy:
description: "Alternative and nuclear energy (% of total) — lower = higher renewable need"
wb_indicator: "EG.USE.COMM.CL.ZS"
direction: lower_is_better
weight: 0.40
# ── Data settings ──────────────────────────────────────────────────────────
data:
year_range:
start: 2018
end: 2023
# ── Clustering settings ────────────────────────────────────────────────────
# k=3 produces the three archetypes used throughout the report and dashboard:
# Ready Markets · Transition Markets · Watch & Wait
# Changing k here requires updating CLUSTER_LABELS in clustering.py.
clustering:
n_clusters: 3
random_state: 42
# ── Output paths ───────────────────────────────────────────────────────────
outputs:
scores_file: "outputs/market_scores.csv"
clusters_file: "outputs/market_clusters.csv"
  sensitivity_file: "outputs/sensitivity_analysis.csv"

Data ingestion
Show data ingestion code
"""
data_ingestion.py
-----------------
Fetches all World Bank indicators.
For each indicator, both raw yearly values and 6-year means are produced.
Outputs:
data/raw/<indicator>_raw.csv one file per indicator
data/raw/all_indicators_raw.csv combined multi-index file
data/raw/data_quality_metadata.csv completeness report
data/processed/indicators.csv 6-year means (input to preprocessing)
data/processed/indicators_metadata.json
"""
import json
import yaml
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import wbgapi as wb
from tenacity import retry, stop_after_attempt, wait_exponential
# Configuration
def load_config(config_path: str = "config/config.yaml") -> dict:
"""Load project configuration from YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
def get_all_country_codes(config: dict) -> List[str]:
"""Flatten all regional country codes into a single list."""
return [
code
for region_countries in config["countries"].values()
for code in region_countries
]
# World Bank API
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def _fetch_from_api(
indicator: str,
countries: List[str],
start_year: int,
end_year: int,
) -> pd.DataFrame:
"""
Call the World Bank API with automatic retry on failure.
Retries up to 3 times with exponential back-off (4–10 s) to handle
transient network errors or API rate limits.
"""
return wb.data.DataFrame(
indicator,
economy=countries,
time=range(start_year, end_year + 1),
skipBlanks=True,
labels=False,
)
def fetch_indicator(
wb_code: str,
indicator_name: str,
countries: List[str],
start_year: int,
end_year: int,
min_obs: int = 3,
) -> Tuple[pd.Series, pd.DataFrame]:
"""
Fetch one World Bank indicator and return its 6-year mean and raw yearly data.
Countries with fewer than `min_obs` non-null observations across the
period are marked NaN in the mean series — they will be imputed later
in preprocessing.py rather than silently averaged over sparse data.
Args:
wb_code: World Bank indicator code (e.g. 'NY.GDP.PCAP.KD.ZG')
indicator_name: Internal column name used throughout the pipeline
countries: ISO-3 country codes
start_year: First year of the fetch window
end_year: Last year of the fetch window
min_obs: Minimum non-null observations required to compute a mean
Returns:
mean_series: pd.Series — one value per country (6-year mean)
raw_df: pd.DataFrame — one column per year, one row per country
"""
print(f" Fetching {indicator_name} ({wb_code}) ...", end=" ")
try:
raw_df = _fetch_from_api(wb_code, countries, start_year, end_year)
# Normalise year column names: 'YR2018' → 2018
raw_df.columns = [int(str(c).replace("YR", "")) for c in raw_df.columns]
raw_df.index.name = "country_code"
# Compute mean; set to NaN where observations are too sparse
obs_count = raw_df.count(axis=1)
mean_series = raw_df.mean(axis=1)
mean_series[obs_count < min_obs] = float("nan")
mean_series.name = indicator_name
print(f"✓ ({raw_df.shape[1]} years, "
f"{obs_count.ge(min_obs).sum()}/{len(countries)} countries complete)")
return mean_series, raw_df
except Exception as exc:
print(f"✗ {exc}")
# Return empty placeholders so the pipeline can continue and
# report the gap rather than crash mid-run.
empty_mean = pd.Series(float("nan"), index=countries, name=indicator_name)
empty_raw = pd.DataFrame(
index=countries, columns=range(start_year, end_year + 1)
)
return empty_mean, empty_raw
# Main ingestion
def fetch_all_indicators(
config: dict,
export_raw: bool = True,
raw_data_dir: str = "data/raw",
) -> pd.DataFrame:
"""
Iterate over every indicator in config, fetch data, and return 6-year means.
Args:
config: Loaded project configuration
export_raw: Write per-indicator CSVs and a combined file to disk
raw_data_dir: Directory for raw yearly data files
Returns:
pd.DataFrame: rows = countries, columns = indicators (6-year means)
"""
countries = get_all_country_codes(config)
start_year = config["data"]["year_range"]["start"]
end_year = config["data"]["year_range"]["end"]
print(f"\n{'=' * 60}")
print(f"DATA INGESTION · {start_year}–{end_year} · {len(countries)} countries")
print(f"{'=' * 60}")
means_list = []
raw_data = {}
quality_records = []
for dimension, indicators in config["indicators"].items():
print(f"\n {dimension.upper().replace('_', ' ')}")
for indicator_name, props in indicators.items():
wb_code = props.get("wb_indicator")
# Skip indicators without a World Bank code (e.g. manual entries)
if not wb_code or props.get("source") == "manual_research":
continue
mean_series, raw_df = fetch_indicator(
wb_code, indicator_name, countries,
start_year, end_year, min_obs=3,
)
means_list.append(mean_series)
if export_raw:
raw_data[indicator_name] = raw_df
# Record completeness for the quality report
obs_count = raw_df.count(axis=1)
n_years = raw_df.shape[1]
quality_records.append({
"indicator": indicator_name,
"dimension": dimension,
"wb_code": wb_code,
"period": f"{start_year}–{end_year}",
"avg_completeness_%": round((obs_count / n_years * 100).mean(), 1),
"countries_complete": int((obs_count == n_years).sum()),
"countries_partial": int(((obs_count > 0) & (obs_count < n_years)).sum()),
"countries_missing": int((obs_count == 0).sum()),
})
# Export raw data
if export_raw and raw_data:
raw_dir = Path(raw_data_dir)
raw_dir.mkdir(parents=True, exist_ok=True)
for name, df in raw_data.items():
df.to_csv(raw_dir / f"{name}_raw.csv")
# Combined multi-index file (indicator × year)
combined = pd.concat(raw_data.values(), keys=raw_data.keys(), axis=1)
combined.columns.names = ["indicator", "year"]
combined.to_csv(raw_dir / "all_indicators_raw.csv")
# Data quality report
if quality_records:
pd.DataFrame(quality_records).to_csv(
raw_dir / "data_quality_metadata.csv", index=False
)
print(f"\n Raw data written to {raw_data_dir}/")
# Assemble means DataFrame
df = pd.concat(means_list, axis=1)
df.index.name = "country_code"
# Quality summary
if quality_records:
qdf = pd.DataFrame(quality_records)
print(f"\n{'=' * 60}")
print("DATA QUALITY SUMMARY")
print(f"{'=' * 60}")
for dim, grp in qdf.groupby("dimension"):
print(f" {dim:30s} {grp['avg_completeness_%'].mean():.1f}% complete")
print(f"\n Overall avg completeness : "
f"{qdf['avg_completeness_%'].mean():.1f}%")
print(f" Fully complete indicators: "
f"{(qdf['countries_missing'] == 0).sum()} / {len(qdf)}")
print(f"\n{'=' * 60}")
print(f"INGESTION COMPLETE")
print(f" {len(df)} countries × {len(df.columns)} indicators (6-year means)")
print(f" Next: python preprocessing.py")
print(f"{'=' * 60}\n")
return df
# Export
def export_processed_data(
df: pd.DataFrame,
output_path: str = "data/processed/indicators.csv",
) -> None:
"""
Write 6-year means to CSV and save a JSON metadata sidecar.
The metadata file is read by preprocessing.py to log the data period
and aggregation method in the preprocessing output.
"""
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(path)
metadata = {
"ingestion_timestamp": datetime.now().isoformat(),
"years": "see config",
"aggregation": "6-year mean",
"n_countries": len(df),
"n_indicators": len(df.columns),
"indicators": list(df.columns),
}
metadata_path = path.parent / "indicators_metadata.json"
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2, default=str)
print(f"✓ Means saved → {path}")
print(f"✓ Metadata saved → {metadata_path}")
# Validation (optional QA step)
def validate_means_against_raw(
df_means: pd.DataFrame,
indicator_name: str,
raw_data_dir: str = "data/raw",
) -> bool:
"""
Verify that the stored 6-year mean matches the mean recomputed from raw data.
Useful as a spot-check after ingestion to catch any column-alignment
or index mismatch issues introduced during concatenation.
"""
filepath = Path(raw_data_dir) / f"{indicator_name}_raw.csv"
if not filepath.exists():
print(f" No raw file for {indicator_name} — skipping validation")
return False
raw_df = pd.read_csv(filepath, index_col=0)
recalc = raw_df.mean(axis=1)
stored = df_means.get(indicator_name)
if stored is None:
print(f" {indicator_name} not found in means DataFrame")
return False
common = recalc.index.intersection(stored.index)
delta = (recalc.loc[common] - stored.loc[common]).abs()
passed = (delta < 1e-10).all()
if passed:
print(f" ✓ {indicator_name} validation passed")
else:
mismatches = common[delta >= 1e-10].tolist()
print(f" ✗ {indicator_name} mismatches: {mismatches}")
return passed
# Run
if __name__ == "__main__":
config = load_config()
df = fetch_all_indicators(config, export_raw=True)
export_processed_data(df)
# Spot-check the first successfully fetched indicator
first_col = next((c for c in df.columns if df[c].notna().any()), None)
if first_col:
print(f"\nValidating {first_col} ...")
        validate_means_against_raw(df, first_col)

Preprocessing
Show preprocessing code
"""
preprocessing.py
----------------
Cleans and normalises raw indicator data before scoring.
Pipeline:
1. Load 6-year means from data/processed/indicators.csv
2. Add region labels
3. Impute missing values (regional median, global fallback)
4. Winsorise outliers (5th / 95th percentile)
5. Flip direction of lower_is_better indicators
6. Min-max normalise all indicators to 0–100
7. Validate and export
"""
import json
import yaml
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List
# Configuration
def load_config(config_path: str = "config/config.yaml") -> dict:
"""Load project configuration from YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
def get_indicator_directions(config: dict) -> Dict[str, str]:
"""Return {indicator_name: direction} for all indicators in config."""
return {
name: props["direction"]
for dim in config["indicators"].values()
for name, props in dim.items()
}
def get_all_indicator_names(config: dict) -> List[str]:
"""Return a flat list of all indicator column names."""
return [
name
for dim in config["indicators"].values()
for name in dim
]
# Data loading
def load_processed_data(
data_path: str = "data/processed/indicators.csv",
) -> pd.DataFrame:
"""
Load 6-year mean data produced by data_ingestion.py.
Raises FileNotFoundError if the file does not exist,
prompting the user to run data_ingestion.py first.
"""
path = Path(data_path)
if not path.exists():
raise FileNotFoundError(
f"Processed data not found at {path}. Run data_ingestion.py first."
)
df = pd.read_csv(path, index_col=0)
print(f"✓ Loaded {len(df)} countries × {len(df.columns)} indicators")
# Log data period from accompanying metadata if available
metadata_path = path.parent / "indicators_metadata.json"
if metadata_path.exists():
with open(metadata_path) as f:
meta = json.load(f)
print(f" Period: {meta.get('years', 'N/A')} | "
f"Aggregation: {meta.get('aggregation', 'N/A')}")
return df
# Region labelling
def add_region_labels(df: pd.DataFrame, config: dict) -> pd.DataFrame:
"""
Add a 'region' column derived from config country lists.
Skips if the column already exists.
"""
if "region" in df.columns:
return df
region_map = {
code: region
for region, codes in config["countries"].items()
for code in codes
}
df = df.copy()
df["region"] = df.index.map(region_map)
missing = df[df["region"].isna()].index.tolist()
if missing:
print(f" ⚠ No region mapping for: {missing}")
return df
# Missing value handling
def report_missing_values(df: pd.DataFrame, indicator_cols: List[str]) -> None:
"""Print a concise summary of missing values per indicator."""
print("\n[Missing Values]")
found = False
for col in indicator_cols:
if col not in df.columns:
continue
n = df[col].isna().sum()
if n > 0:
print(f" {col}: {n} missing ({n / len(df) * 100:.1f}%)")
found = True
if not found:
print(" ✓ No missing values")
def impute_missing_values(
df: pd.DataFrame, indicator_cols: List[str]
) -> pd.DataFrame:
"""
Fill missing values using regional median with a global median fallback.
Regional imputation is preferred because countries in the same region
(e.g. Southeast Asia) share similar energy infrastructure and
economic development patterns, making peers more informative than
the global sample for energy and emissions indicators.
"""
df = df.copy()
for col in indicator_cols:
if col not in df.columns or df[col].isna().sum() == 0:
continue
n_before = df[col].isna().sum()
# Primary: regional median
df[col] = df[col].fillna(df.groupby("region")[col].transform("median"))
# Fallback: global median (handles cases where the full region is missing)
n_after_regional = df[col].isna().sum()
df[col] = df[col].fillna(df[col].median())
print(f" {col}: {n_before} missing → "
f"{n_after_regional} after regional → 0 after global fallback")
return df
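The two-stage fill can be sketched on a toy frame (country codes, regions, and values are hypothetical):

```python
import pandas as pd

# VNM is missing; its two Southeast Asia peers carry values, so the
# regional median fills the gap before the global fallback is needed.
toy = pd.DataFrame(
    {"region": ["sea", "sea", "sea", "latam"],
     "gdp_growth": [6.0, 4.0, None, 2.0]},
    index=["IDN", "THA", "VNM", "CHL"],
)
toy["gdp_growth"] = toy["gdp_growth"].fillna(
    toy.groupby("region")["gdp_growth"].transform("median")
)
toy["gdp_growth"] = toy["gdp_growth"].fillna(toy["gdp_growth"].median())
print(toy.loc["VNM", "gdp_growth"])  # median of the SEA peers (6.0, 4.0) → 5.0
```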
# Outlier handling
def winsorize(
df: pd.DataFrame,
indicator_cols: List[str],
lower: float = 0.05,
upper: float = 0.95,
) -> pd.DataFrame:
"""
Cap extreme values at the lower and upper percentiles.
Winsorising at the 5th / 95th percentile prevents outliers — such as
oil-rich economies with extreme energy import values or small island
states with atypical emissions — from compressing variation for the
majority of countries in the 0–100 normalised scale.
"""
df = df.copy()
for col in indicator_cols:
if col not in df.columns:
continue
lo, hi = df[col].quantile(lower), df[col].quantile(upper)
n_low = (df[col] < lo).sum()
n_high = (df[col] > hi).sum()
df[col] = df[col].clip(lo, hi)
if n_low > 0 or n_high > 0:
print(f" {col}: capped {n_low} below p{int(lower*100)}, "
f"{n_high} above p{int(upper*100)}")
return df
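Clipping at the sample percentiles caps only the tails; a sketch with one hypothetical outlier (pandas' default linear interpolation places p5 between the two smallest observations):

```python
import pandas as pd

# An extreme value (think oil-economy outlier) is pulled back to the
# 95th percentile; the middle of the distribution is untouched.
s = pd.Series([1.0, 2.0, 3.0, 4.0, 100.0])
lo, hi = s.quantile(0.05), s.quantile(0.95)
capped = s.clip(lo, hi)
print([round(v, 2) for v in capped])  # → [1.2, 2.0, 3.0, 4.0, 80.8]
```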
# Direction normalisation
def flip_lower_is_better(
df: pd.DataFrame, directions: Dict[str, str]
) -> pd.DataFrame:
"""
Invert indicators where a lower raw value signals greater opportunity.
    Examples:
- modern_renewable_share: low current modern renewables = high growth runway
- electricity_imports: low imports = less grid dependency (higher energy security)
Multiplying by -1 ensures all indicators point the same direction
(higher normalised score = better opportunity) before aggregation.
"""
df = df.copy()
flipped = [
col for col, direction in directions.items()
if col in df.columns and direction == "lower_is_better"
]
for col in flipped:
df[col] *= -1
if flipped:
print(f" Inverted {len(flipped)} indicator(s): {flipped}")
return df
# Normalisation
def min_max_normalize(
df: pd.DataFrame, indicator_cols: List[str]
) -> pd.DataFrame:
"""
Scale all indicators to a 0–100 range.
Formula: score = (value - min) / (max - min) × 100
100 = best-performing country in the sample
0 = worst-performing country in the sample
50 = midpoint
This puts GDP growth (%), CO2 emissions (t/capita), and governance
indices (–2.5 to +2.5) on a common footing for weighted aggregation.
If all countries have the same value, the indicator is set to 50.
"""
df = df.copy()
for col in indicator_cols:
if col not in df.columns:
continue
lo, hi = df[col].min(), df[col].max()
if lo == hi:
df[col] = 50.0
else:
df[col] = ((df[col] - lo) / (hi - lo) * 100).round(2)
return df
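The formula in the docstring, applied to a hypothetical three-country indicator:

```python
import pandas as pd

# Sample min maps to 0, sample max to 100, intermediate values scale linearly.
col = pd.Series([2.0, 4.0, 6.0], index=["AAA", "BBB", "CCC"])
lo, hi = col.min(), col.max()
normed = ((col - lo) / (hi - lo) * 100).round(2)
print(normed.tolist())  # → [0.0, 50.0, 100.0]
```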
# Validation
def validate_normalization(
df: pd.DataFrame, indicator_cols: List[str]
) -> bool:
"""
Assert all indicators are within [0, 100] after normalisation.
Returns True if all pass; logs failures otherwise.
"""
failures = []
for col in indicator_cols:
if col not in df.columns:
continue
lo, hi = df[col].min(), df[col].max()
if lo < -1e-10 or hi > 100 + 1e-10:
failures.append(f" ✗ {col}: [{lo:.2f}, {hi:.2f}] — out of bounds")
if failures:
print("\n[Validation — FAILED]")
for f in failures:
print(f)
return False
print(" ✓ All indicators within [0, 100]")
return True
# Export
def export_normalized_data(
df: pd.DataFrame,
output_path: str = "data/processed/normalized_indicators.csv",
) -> None:
"""Write normalised data to CSV, creating parent directories as needed."""
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(path)
print(f"✓ Saved normalised data → {path}")
# Orchestrator
def run_preprocessing(
input_path: str = "data/processed/indicators.csv",
output_path: str = "data/processed/normalized_indicators.csv",
config_path: str = "config/config.yaml",
) -> pd.DataFrame:
"""
Run the full preprocessing pipeline and return a normalised DataFrame.
Args:
input_path: Path to 6-year mean data from data_ingestion.py
output_path: Destination for normalised indicators CSV
config_path: Path to project configuration YAML
Returns:
pd.DataFrame: All values in [0, 100], ready for scoring.py
"""
print("\n" + "=" * 60)
print("PREPROCESSING · 6-year means → normalised scores")
print("=" * 60)
config = load_config(config_path)
indicator_cols = get_all_indicator_names(config)
directions = get_indicator_directions(config)
df = load_processed_data(input_path)
df = add_region_labels(df, config)
print(f"\n[Step 1] Missing values")
report_missing_values(df, indicator_cols)
print(f"\n[Step 2] Imputation (regional median → global fallback)")
df = impute_missing_values(df, indicator_cols)
print(f"\n[Step 3] Winsorisation (p5 / p95)")
df = winsorize(df, indicator_cols)
print(f"\n[Step 4] Direction alignment (invert lower_is_better)")
df = flip_lower_is_better(df, directions)
print(f"\n[Step 5] Min-max normalisation → [0, 100]")
df = min_max_normalize(df, indicator_cols)
print(f"\n[Step 6] Validation")
validate_normalization(df, indicator_cols)
export_normalized_data(df, output_path)
print(f"\n{'=' * 60}")
print(f"PREPROCESSING COMPLETE")
print(f" {len(df)} countries × {len(indicator_cols)} indicators → [0, 100]")
print(f" Next: python scoring.py")
print(f"{'=' * 60}\n")
return df
# Run
if __name__ == "__main__":
df = run_preprocessing()
# Spot-check: first 5 numeric columns, first 5 rows
sample_cols = df.select_dtypes(include=[np.number]).columns[:5]
print("Sample normalised values (first 5 countries):")
print(df[sample_cols].head().round(1))
print("\nAverage normalised scores by region:")
    print(df.groupby("region")[sample_cols].mean().round(1))
Scoring
"""
scoring.py
----------
Computes weighted market attractiveness scores from normalised 0–100 data.
Two outputs:
1. score_single_scenario() — scores under one weight scenario
2. run_sensitivity_analysis() — scores under all four scenarios,
testing whether recommendations hold across different assumptions
"""
import yaml
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Configuration
def load_config(config_path: str = "config/config.yaml") -> dict:
"""Load project configuration from YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
def get_dimension_indicators(config: dict) -> Dict[str, List[str]]:
"""Return {dimension: [indicator_names]} for all dimensions in config."""
return {
dim: list(indicators.keys())
for dim, indicators in config["indicators"].items()
}
# Data loading
def load_normalized_data(
data_path: str = "data/processed/normalized_indicators.csv",
) -> pd.DataFrame:
"""
Load normalised 0–100 indicator data produced by preprocessing.py.
Raises FileNotFoundError if the file is absent, prompting the user
to run preprocessing.py first.
"""
path = Path(data_path)
if not path.exists():
raise FileNotFoundError(
f"Normalised data not found at {path}. Run preprocessing.py first."
)
df = pd.read_csv(path, index_col=0)
print(f"✓ Loaded {len(df)} countries × {len(df.columns)} indicators")
if "region" not in df.columns:
print(" ⚠ No 'region' column — regional breakdowns will be unavailable")
return df
# Dimension scoring
def compute_dimension_score(
df: pd.DataFrame,
dimension: str,
indicator_config: dict,
indicator_names: List[str],
) -> pd.Series:
"""
Compute one dimension score as a weighted average of its indicators.
If some indicators are missing from the data, their weights are
redistributed proportionally across the remaining indicators so the
dimension score remains on a 0–100 scale.
Args:
df: Normalised indicator data
dimension: Dimension key (e.g. 'market_opportunity')
indicator_config: Full config['indicators'] dict
indicator_names: Ordered list of indicator names for this dimension
Returns:
pd.Series: Dimension scores (0–100), indexed by country_code
"""
weighted = pd.Series(0.0, index=df.index)
total_weight = 0.0
for name in indicator_names:
props = indicator_config[dimension].get(name)
if props is None:
print(f" ⚠ '{name}' not in config — skipping")
continue
if name not in df.columns:
print(f" ⚠ '{name}' not in data — skipping")
continue
if df[name].isna().all():
print(f" ⚠ '{name}' is entirely NaN — skipping")
continue
weight = props["weight"]
weighted += df[name] * weight
total_weight += weight
if total_weight == 0:
print(f" ✗ All indicators missing for {dimension}")
return pd.Series(float("nan"), index=df.index)
# Redistribute weights if any indicators were skipped
if total_weight < sum(
indicator_config[dimension][n]["weight"]
for n in indicator_names
if n in indicator_config[dimension]
):
used = sum(
1 for n in indicator_names
if n in df.columns and not df[n].isna().all()
)
print(f" {dimension}: {used}/{len(indicator_names)} indicators "
f"(weights redistributed)")
return (weighted / total_weight).round(2)
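The implicit weight redistribution works because the weighted sum is divided by the weight actually used; a sketch with hypothetical indicators and weights:

```python
import pandas as pd

# Indicator "c" (weight 0.2) is absent from the data, so dividing by the
# used weight (0.5 + 0.3 = 0.8) spreads its share over "a" and "b".
df = pd.DataFrame({"a": [80.0, 40.0], "b": [60.0, 20.0]}, index=["XXX", "YYY"])
weights = {"a": 0.5, "b": 0.3}
weighted = sum(df[k] * w for k, w in weights.items())
score = (weighted / sum(weights.values())).round(2)
print(score.tolist())  # → [72.5, 32.5]
```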
# Single-scenario scoring
def score_single_scenario(
df: pd.DataFrame,
config: dict,
scenario: str = "balanced",
) -> pd.DataFrame:
"""
Score all markets under one weight scenario.
Returns a DataFrame with:
- score_<dimension> one column per dimension (0–100)
- total_score weighted sum of dimension scores (0–100)
- rank 1 = most attractive market
- region copied from df if present
Args:
df: Normalised indicator data
config: Project configuration
scenario: Key in config['weight_scenarios']
Returns:
pd.DataFrame sorted by rank ascending
"""
print(f"\n[Scoring] Scenario: {scenario}")
dim_weights = config["weight_scenarios"][scenario]
ind_config = config["indicators"]
dim_map = get_dimension_indicators(config)
scores = pd.DataFrame(index=df.index)
if "region" in df.columns:
scores["region"] = df["region"]
# Dimension scores
for dim, indicators in dim_map.items():
scores[f"score_{dim}"] = compute_dimension_score(
df, dim, ind_config, indicators
)
# Weighted total
scores["total_score"] = 0.0
total_weight_used = 0.0
for dim, weight in dim_weights.items():
col = f"score_{dim}"
if col in scores.columns:
scores["total_score"] += scores[col] * weight
total_weight_used += weight
# Renormalise if any dimension was unavailable
if 0 < total_weight_used < 1.0:
scores["total_score"] /= total_weight_used
print(f" Note: Renormalised — used {total_weight_used:.2f} of total weight")
scores["total_score"] = scores["total_score"].round(2)
scores["rank"] = (
scores["total_score"]
.rank(ascending=False, method="min", na_option="bottom")
.astype("Int64")
)
scores = scores.sort_values("rank")
print(f" Top 3: {scores.index[:3].tolist()}")
return scores
# Sensitivity analysis
def classify_stability(std: float) -> str:
"""Map rank standard deviation to a human-readable stability label."""
if std <= 1.5:
return "Very High"
if std <= 3.0:
return "High"
if std <= 5.0:
return "Medium"
return "Low"
def run_sensitivity_analysis(
df: pd.DataFrame, config: dict
) -> pd.DataFrame:
"""
Score all markets under every defined weight scenario and measure rank stability.
A low rank standard deviation means the country's position is robust to
changes in investor preferences — a strong signal for capital allocation.
Returns a DataFrame with:
- score_<scenario> total score per scenario
- rank_<scenario> rank per scenario
- avg_rank mean rank across scenarios
- rank_std standard deviation of ranks (stability proxy)
- rank_min / rank_max best and worst rank observed
- rank_range max − min
- stability Very High / High / Medium / Low
- region copied from df if present
Returns:
pd.DataFrame sorted by avg_rank ascending
"""
scenarios = list(config["weight_scenarios"].keys())
print(f"\n{'=' * 60}")
print(f"SENSITIVITY ANALYSIS · {len(scenarios)} scenarios: {scenarios}")
print(f"{'=' * 60}")
score_cols = {}
rank_cols = {}
for scenario in scenarios:
result = score_single_scenario(df, config, scenario)
score_cols[scenario] = result["total_score"]
rank_cols[scenario] = result["rank"]
# Assemble comparison table
comparison = pd.concat(
{f"score_{s}": score_cols[s] for s in scenarios}, axis=1
)
for s in scenarios:
comparison[f"rank_{s}"] = rank_cols[s]
rc = [f"rank_{s}" for s in scenarios]
comparison["avg_rank"] = comparison[rc].mean(axis=1).round(1)
comparison["rank_std"] = comparison[rc].std(axis=1).round(1)
comparison["rank_min"] = comparison[rc].min(axis=1)
comparison["rank_max"] = comparison[rc].max(axis=1)
comparison["rank_range"] = comparison["rank_max"] - comparison["rank_min"]
comparison["stability"] = comparison["rank_std"].apply(classify_stability)
if "region" in df.columns:
comparison["region"] = df["region"]
comparison = comparison.sort_values("avg_rank")
# Summary
print(f"\n[Stability Summary]")
for label, count in comparison["stability"].value_counts().items():
print(f" {label}: {count} ({count / len(comparison) * 100:.0f}%)")
print(f"\n[Top Stable Markets]")
stable = comparison[comparison["stability"].isin(["Very High", "High"])].head(5)
for code, row in stable.iterrows():
print(f" {code}: avg rank {row['avg_rank']:.1f} "
f"range {row['rank_range']:.0f} {row['stability']}")
return comparison
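Rank standard deviation as a stability proxy can be sketched with hypothetical ranks for two markets across the four scenarios:

```python
import pandas as pd

# MYS holds rank 1 in every scenario (std 0.0 → "Very High" stability);
# VNM swings between 4th and 9th (std 2.4 → "High" band at best).
ranks = pd.DataFrame(
    {"balanced": [1, 4], "impact": [1, 9], "growth": [1, 5], "risk": [1, 8]},
    index=["MYS", "VNM"],
)
std = ranks.std(axis=1).round(1)
print(std.tolist())  # → [0.0, 2.4]
```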
# Export
def export_scores(
df: pd.DataFrame,
output_path: str = "outputs/market_scores.csv",
) -> None:
"""Write single-scenario scores to CSV."""
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(path)
print(f"✓ Scores saved → {path}")
def export_sensitivity(
df: pd.DataFrame,
output_path: str = "outputs/sensitivity_analysis.csv",
) -> None:
"""Write sensitivity analysis results to CSV."""
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(path)
print(f"✓ Sensitivity saved → {path}")
# Orchestrator
def run_scoring(
processed_df: Optional[pd.DataFrame] = None,
data_path: str = "data/processed/normalized_indicators.csv",
config_path: str = "config/config.yaml",
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Run the full scoring pipeline and return both outputs.
Args:
processed_df: Pre-loaded normalised DataFrame (skips disk read if provided)
data_path: Path to normalised indicators CSV
config_path: Path to project configuration YAML
Returns:
(primary_scores, sensitivity_df)
primary_scores: Balanced-scenario scores, sorted by rank
sensitivity_df: Cross-scenario comparison with stability metrics
"""
print(f"\n{'=' * 60}")
print("SCORING · normalised data → market attractiveness scores")
print(f"{'=' * 60}")
config = load_config(config_path)
if processed_df is None:
processed_df = load_normalized_data(data_path)
primary_scores = score_single_scenario(processed_df, config, scenario="balanced")
print("\nTop 5 markets (balanced scenario):")
for code, row in primary_scores.head(5).iterrows():
region = row.get("region", "—")
print(f" {code} ({region}): {row['total_score']:.1f}")
sensitivity_df = run_sensitivity_analysis(processed_df, config)
export_scores(primary_scores)
export_sensitivity(sensitivity_df)
print(f"\n{'=' * 60}")
print("SCORING COMPLETE · Next: python clustering.py")
print(f"{'=' * 60}\n")
return primary_scores, sensitivity_df
# Run
if __name__ == "__main__":
primary_scores, sensitivity_df = run_scoring()
# Dimension score columns for the display table
    dim_cols = [c for c in primary_scores.columns
                if c.startswith("score_")]  # the total lives in "total_score"
display_cols = ["region", "total_score", "rank"] + dim_cols
display_cols = [c for c in display_cols if c in primary_scores.columns]
print("\nTop 10 markets — balanced scenario:")
print(primary_scores[display_cols].head(10).to_string())
print("\nStability breakdown:")
for label, count in sensitivity_df["stability"].value_counts().items():
        print(f" {label}: {count} ({count / len(sensitivity_df) * 100:.0f}%)")
Clustering
"""
clustering.py
-------------
Groups countries into market archetypes using K-Means on dimension scores.
Clusters are labelled by average total score (descending), producing
business-meaningful archetypes such as Ready Markets and Watch & Wait.
Steps:
1. Load dimension scores from scoring.py output
2. Standardise features (K-Means is distance-based)
3. Fit K-Means with k from config
4. Validate with silhouette score
5. Label clusters by average total score
6. Profile each cluster
7. Export results and optional visualisations
"""
import yaml
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
# Configuration
def load_config(config_path: str = "config/config.yaml") -> dict:
"""Load project configuration from YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
# Data loading
def load_scores_data(
scores_path: str = "outputs/market_scores.csv",
) -> pd.DataFrame:
"""
Load primary scores produced by scoring.py.
Raises FileNotFoundError if the file is absent,
prompting the user to run scoring.py first.
"""
path = Path(scores_path)
if not path.exists():
raise FileNotFoundError(
f"Scores not found at {path}. Run scoring.py first."
)
df = pd.read_csv(path, index_col=0)
dim_cols = _get_dim_cols(df)
print(f"✓ Loaded {len(df)} countries · dimensions: {dim_cols}")
return df
def _get_dim_cols(df: pd.DataFrame) -> List[str]:
"""Return dimension score column names, excluding total and rank."""
return [
c for c in df.columns
if c.startswith("score_") and "total" not in c and "rank" not in c
]
# Feature preparation
def prepare_features(scores_df: pd.DataFrame) -> pd.DataFrame:
"""
Extract dimension score columns as the clustering feature matrix.
Dimension scores (rather than raw indicators) are used because they
are already on a common 0–100 scale and capture the business-relevant
groupings. Missing values are filled with column means before clustering.
"""
dim_cols = _get_dim_cols(scores_df)
if not dim_cols:
raise ValueError(
"No dimension score columns found. "
"Expected columns like 'score_market_opportunity'. "
"Run scoring.py first."
)
features = scores_df[dim_cols].copy()
if features.isna().any().any():
print(" ⚠ Missing values in features — filling with column means")
features = features.fillna(features.mean())
print(f" Feature matrix: {features.shape[0]} × {features.shape[1]}")
return features
# Optional k-validation
def validate_k(
features_scaled: np.ndarray,
max_k: int = 6,
) -> pd.DataFrame:
"""
Compute silhouette score and inertia for k = 2 … max_k.
Use this as a diagnostic check when the configured k is uncertain.
The configured k is used for the final fit regardless of this output.
"""
print("\n[k-validation]")
records = []
for k in range(2, max_k + 1):
km = KMeans(n_clusters=k, random_state=42, n_init=10)
labels = km.fit_predict(features_scaled)
sil = silhouette_score(features_scaled, labels)
records.append({"k": k, "silhouette": round(sil, 3), "inertia": round(km.inertia_)})
print(f" k={k}: silhouette={sil:.3f} inertia={km.inertia_:.0f}")
return pd.DataFrame(records)
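What a healthy silhouette looks like can be checked on synthetic data (hypothetical 2-D points, not project indicators):

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

# Two tight, well-separated blobs: K-Means recovers them and the
# silhouette lands near 1.0 (the "excellent separation" band).
X = np.array([[0.0, 0.0], [0.1, 0.0], [0.0, 0.1],
              [5.0, 5.0], [5.1, 5.0], [5.0, 5.1]])
labels = KMeans(n_clusters=2, random_state=42, n_init=10).fit_predict(X)
sil = silhouette_score(X, labels)
print(round(sil, 2))
```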
# Cluster labelling
# Ordered labels assigned to clusters ranked by descending average total score.
CLUSTER_LABELS = [
"Ready Markets",
"Transition Markets",
"Watch & Wait",
]
def label_clusters(df: pd.DataFrame, n_clusters: int) -> pd.DataFrame:
"""
Label clusters by average total score rank.
Because K-Means clusters on dimension profiles, a single outlier
indicator (e.g. Senegal's high Energy Security score) can pull a
country into a lower-scoring cluster despite a competitive total score.
Re-ranking clusters by their mean total score after fitting corrects
this without changing the clustering algorithm.
"""
df = df.copy()
# Rank clusters by mean total score — highest mean = best label
cluster_means = (
df.groupby('cluster_id')['total_score']
.mean()
.sort_values(ascending=False)
)
label_map = {
cid: CLUSTER_LABELS[rank]
for rank, cid in enumerate(cluster_means.index)
if rank < len(CLUSTER_LABELS)
}
df['cluster_label'] = df['cluster_id'].map(label_map)
# ── Sanity check: flag countries whose label contradicts their rank ──
# If a country's total score is higher than the max of the cluster
# below it, it may be mislabelled due to dimension profile outliers.
# In that case, override based on score boundaries.
boundaries = (
df.groupby('cluster_label')['total_score']
.agg(['min', 'max'])
.reindex(CLUSTER_LABELS)
)
def resolve_label(row):
score = row['total_score']
for label in CLUSTER_LABELS:
if label not in boundaries.index:
continue
lo = boundaries.loc[label, 'min']
hi = boundaries.loc[label, 'max']
if lo <= score <= hi:
return label
return row['cluster_label']
df['cluster_label'] = df.apply(resolve_label, axis=1)
print('\n[Cluster Labels]')
for label in CLUSTER_LABELS:
subset = df[df['cluster_label'] == label]
print(f' {label}: {len(subset)} countries '
f'(score {subset["total_score"].min():.1f}–'
f'{subset["total_score"].max():.1f})')
return df
def apply_threshold_labels(df: pd.DataFrame) -> pd.DataFrame:
"""
Override cluster labels with score-based thresholds.
This preserves the 3-tier structure (Ready / Transition / Watch & Wait)
while using the cleaner indicator set. Thresholds are:
- Ready Markets: total_score >= 70
- Watch & Wait: total_score < 40
- Transition: 40 <= total_score < 70
"""
df = df.copy()
df['cluster_label'] = 'Transition Markets' # default
df.loc[df['total_score'] >= 70, 'cluster_label'] = 'Ready Markets'
df.loc[df['total_score'] < 40, 'cluster_label'] = 'Watch & Wait'
print('\n[Threshold-based Labels]')
for label in ['Ready Markets', 'Transition Markets', 'Watch & Wait']:
subset = df[df['cluster_label'] == label]
if len(subset) > 0:
score_range = f"{subset['total_score'].min():.1f}–{subset['total_score'].max():.1f}"
print(f' {label}: {len(subset)} countries (score {score_range})')
else:
print(f' {label}: 0 countries')
return df
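A sketch of the threshold rules on three hypothetical scores (values illustrative, not the pipeline's actual output):

```python
import pandas as pd

# >= 70 → Ready, < 40 → Watch & Wait, everything else stays Transition.
toy = pd.DataFrame({"total_score": [72.4, 55.0, 38.9]},
                   index=["MYS", "VNM", "KHM"])
toy["cluster_label"] = "Transition Markets"
toy.loc[toy["total_score"] >= 70, "cluster_label"] = "Ready Markets"
toy.loc[toy["total_score"] < 40, "cluster_label"] = "Watch & Wait"
print(toy["cluster_label"].tolist())
# → ['Ready Markets', 'Transition Markets', 'Watch & Wait']
```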
# Cluster profiling
def profile_clusters(df: pd.DataFrame) -> pd.DataFrame:
"""
Compute mean and std of dimension scores and total score per cluster.
Returns a DataFrame indexed by cluster_label, sorted by
total_score_mean descending.
"""
dim_cols = _get_dim_cols(df)
agg_cols = dim_cols + ["total_score"]
profile = df.groupby("cluster_label")[agg_cols].agg(["mean", "std"]).round(1)
profile.columns = ["_".join(c) for c in profile.columns]
profile["n_countries"] = df.groupby("cluster_label").size()
return profile.sort_values("total_score_mean", ascending=False)
# Visualisations (optional)
def plot_cluster_radar(
df: pd.DataFrame,
output_path: str = "outputs/cluster_radar.png",
) -> None:
"""
Radar chart of average dimension scores per cluster.
Silently skipped if matplotlib is unavailable.
"""
try:
import matplotlib.pyplot as plt
except ImportError:
print(" matplotlib not installed — skipping radar chart")
return
dim_cols = _get_dim_cols(df)
dim_labels = [c.replace("score_", "").replace("_", " ").title() for c in dim_cols]
means = df.groupby("cluster_label")[dim_cols].mean()
n = len(dim_cols)
angles = np.linspace(0, 2 * np.pi, n, endpoint=False).tolist()
angles += angles[:1] # close the polygon
fig, ax = plt.subplots(figsize=(9, 7), subplot_kw={"projection": "polar"})
for cluster in means.index:
vals = means.loc[cluster].tolist() + [means.loc[cluster].iloc[0]]
ax.plot(angles, vals, "o-", linewidth=2, label=cluster)
ax.fill(angles, vals, alpha=0.08)
ax.set_xticks(angles[:-1])
ax.set_xticklabels(dim_labels, size=10)
ax.set_ylim(0, 100)
ax.set_title("Cluster Profiles by Dimension", size=13, pad=20)
ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.0), fontsize=9)
plt.tight_layout()
plt.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close()
print(f"✓ Radar chart → {output_path}")
def plot_cluster_scatter(
df: pd.DataFrame,
output_path: str = "outputs/cluster_scatter.png",
) -> None:
"""
Scatter plot of total_score vs rank, coloured by cluster.
Annotates the top-3 and bottom-3 markets.
Silently skipped if matplotlib is unavailable.
"""
try:
import matplotlib.pyplot as plt
except ImportError:
print(" matplotlib not installed — skipping scatter plot")
return
fig, ax = plt.subplots(figsize=(11, 7))
for cluster in df["cluster_label"].unique():
sub = df[df["cluster_label"] == cluster]
ax.scatter(sub["rank"], sub["total_score"], label=cluster, s=90, alpha=0.75)
# Annotate top-3 and bottom-3 by rank
for code in df.head(3).index.tolist() + df.tail(3).index.tolist():
ax.annotate(
code,
(df.loc[code, "rank"], df.loc[code, "total_score"]),
xytext=(5, 5), textcoords="offset points", fontsize=8,
)
ax.invert_xaxis() # rank 1 on the right
ax.set_xlabel("Rank")
ax.set_ylabel("Total Score (0–100)")
ax.set_title("Market Clusters — Score vs Rank")
ax.grid(True, alpha=0.25)
ax.legend(bbox_to_anchor=(1.02, 1), loc="upper left", fontsize=9)
plt.tight_layout()
plt.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close()
print(f"✓ Scatter plot → {output_path}")
# Export
def export_clusters(
df: pd.DataFrame,
output_path: str = "outputs/market_clusters.csv",
) -> None:
"""Write the full clustered DataFrame (all dimension columns intact) to CSV."""
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(path)
print(f"✓ Cluster assignments → {path} ({len(df)} countries)")
def export_cluster_profiles(
profile: pd.DataFrame,
output_path: str = "outputs/cluster_profiles.csv",
) -> None:
"""Write cluster mean/std profiles to CSV."""
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
profile.to_csv(path)
print(f"✓ Cluster profiles → {path}")
# Orchestrator
def run_clustering(
scores_df: Optional[pd.DataFrame] = None,
scores_path: str = "outputs/market_scores.csv",
config_path: str = "config/config.yaml",
run_k_validation: bool = False,
) -> pd.DataFrame:
"""
Run the full clustering pipeline and return the annotated DataFrame.
"""
print(f"\n{'=' * 60}")
print("CLUSTERING · dimension scores → market archetypes")
print(f"{'=' * 60}")
config = load_config(config_path)
n_clusters = config["clustering"]["n_clusters"]
random_state = config["clustering"]["random_state"]
if scores_df is None:
scores_df = load_scores_data(scores_path)
# Feature matrix
print("\n[Step 1] Preparing features")
features = prepare_features(scores_df)
# Standardise — K-Means is sensitive to scale
print("\n[Step 2] Standardising features")
features_scaled = StandardScaler().fit_transform(features)
# Optional k-sweep
if run_k_validation:
validate_k(features_scaled)
# Fit K-Means
print(f"\n[Step 3] K-Means k={n_clusters}")
km = KMeans(n_clusters=n_clusters, random_state=random_state, n_init=10)
clustered_df = scores_df.copy()
clustered_df["cluster_id"] = km.fit_predict(features_scaled)
# Silhouette validation
sil = silhouette_score(features_scaled, clustered_df["cluster_id"])
print(f"\n[Step 4] Silhouette score: {sil:.3f}", end=" ")
if sil >= 0.50:
print("(excellent separation)")
elif sil >= 0.30:
print("(good separation)")
elif sil >= 0.20:
print("(reasonable separation)")
else:
print("(weak separation — review dimensions or k)")
# Label clusters using score thresholds
print("\n[Step 5] Labelling clusters (threshold-based)")
clustered_df = apply_threshold_labels(clustered_df)
# Profile clusters
print("\n[Step 6] Cluster profiles")
profile = profile_clusters(clustered_df)
dim_cols = _get_dim_cols(clustered_df)
for label in profile.index:
n = int(profile.loc[label, "n_countries"])
mean = profile.loc[label, "total_score_mean"]
std = profile.loc[label, "total_score_std"]
print(f"\n {label} (n={n}, score {mean:.1f} ± {std:.1f})")
for dim in dim_cols:
dm = profile.loc[label, f"{dim}_mean"]
ds = profile.loc[label, f"{dim}_std"]
print(f" {dim.replace('score_','').replace('_',' '):30s} "
f"{dm:.1f} ± {ds:.1f}")
# Country listing per cluster
print("\n[Step 7] Countries by cluster")
for label, grp in clustered_df.groupby("cluster_label"):
grp = grp.sort_values("total_score", ascending=False)
region_col = "region" if "region" in grp.columns else None
print(f"\n {label} ({len(grp)} countries):")
for code, row in grp.iterrows():
region = f" {row['region']}" if region_col else ""
print(f" {code}{region}: {row['total_score']:.1f}")
# Export
print("\n[Step 8] Exporting")
export_clusters(clustered_df)
export_cluster_profiles(profile)
# Visualisations
print("\n[Step 9] Visualisations")
plot_cluster_radar(clustered_df)
plot_cluster_scatter(clustered_df)
print(f"\n{'=' * 60}")
print("CLUSTERING COMPLETE")
print(f"{'=' * 60}\n")
return clustered_df
# Run
if __name__ == "__main__":
df = run_clustering(run_k_validation=True)
display_cols = [c for c in ["region", "total_score", "rank", "cluster_label"]
if c in df.columns]
print("\nTop 10 markets:")
    print(df[display_cols].head(10).to_string())
print("\nCluster distribution:")
for label, count in df["cluster_label"].value_counts().items():
        print(f" {label}: {count} ({count / len(df) * 100:.0f}%)")
Orchestration (main)
"""
main.py
-------
Pipeline orchestrator.
Runs all modules in sequence, then writes a plain-text executive summary.
Steps:
1. Data Ingestion — World Bank API fetch, 6-year means + raw export
2. Preprocessing — imputation, winsorisation, 0–100 normalisation
3. Scoring — weighted dimension scores + sensitivity analysis
4. Clustering — K-Means market archetypes
Usage:
python main.py
"""
import sys
import time
import subprocess
import traceback
from pathlib import Path
from datetime import datetime
import pandas as pd
# ── Directory setup ────────────────────────────────────────────────────────
OUTPUT_DIRS = [
"data/raw",
"data/processed",
"outputs",
"outputs/charts",
]
def ensure_output_dirs() -> None:
"""Create all required output directories if they do not yet exist."""
for d in OUTPUT_DIRS:
Path(d).mkdir(parents=True, exist_ok=True)
print("✓ Output directories ready")
# ── Step runner ────────────────────────────────────────
def run_step(step_num: int, step_name: str, script_name: str) -> bool:
"""
Execute one pipeline module as a subprocess.
Scripts are expected in the src/ directory. stdout is streamed to the
terminal; stderr is shown only when it contains an error.
Returns True on success, False on failure.
"""
script_path = Path("src") / script_name
print(f"\n{'=' * 60}")
print(f"STEP {step_num} · {step_name}")
print(f"{'=' * 60}")
if not script_path.exists():
print(f" ✗ Script not found: {script_path}")
return False
t0 = time.time()
try:
result = subprocess.run(
[sys.executable, str(script_path)],
capture_output=True,
text=True,
)
if result.stdout:
print(result.stdout)
# Only surface stderr when there is a genuine error
if result.stderr and "Error" in result.stderr:
print(f" Warnings / errors:\n{result.stderr}")
elapsed = time.time() - t0
if result.returncode == 0:
print(f" ✓ Completed in {elapsed:.1f} s")
return True
print(f" ✗ Failed (exit code {result.returncode})")
if result.stderr:
print(result.stderr)
return False
except Exception as exc:
print(f" ✗ Could not execute {script_path}: {exc}")
return False
# ── Executive summary ─────────────────────────────────────────────────
def generate_executive_summary() -> None:
"""
Write a plain-text executive summary to outputs/executive_summary.txt.
Reads the cluster and sensitivity CSV files produced by the pipeline.
Skips gracefully if either file is absent.
"""
print(f"\n{'=' * 60}")
print("EXECUTIVE SUMMARY")
print(f"{'=' * 60}")
clusters_path = Path("outputs/market_clusters.csv")
sensitivity_path = Path("outputs/sensitivity_analysis.csv")
if not clusters_path.exists() or not sensitivity_path.exists():
print(" ⚠ Output files not found — skipping summary")
return
try:
df = pd.read_csv(clusters_path, index_col="country_code")
sens = pd.read_csv(sensitivity_path, index_col="country_code")
        # Top 5 (sort defensively in case the CSV is not already score-ordered)
        df = df.sort_values("total_score", ascending=False)
        top5_lines = []
        for i, (code, row) in enumerate(df.head(5).iterrows(), 1):
region = row.get("region", "—")
score = row["total_score"]
cluster = row.get("cluster_label", "—")
top5_lines.append(
f" {i}. {code} ({region})\n"
f" Score: {score:.1f}/100 Archetype: {cluster}"
)
# Archetypes
arch_lines = []
if "cluster_label" in df.columns:
stats = (
df.groupby("cluster_label")["total_score"]
.agg(["mean", "min", "max", "count"])
.round(1)
.sort_values("mean", ascending=False)
)
for label, row in stats.iterrows():
countries = df[df["cluster_label"] == label].index.tolist()
listed = ", ".join(countries[:5])
if len(countries) > 5:
listed += f" + {len(countries) - 5} more"
arch_lines.append(
f" {label} (n={int(row['count'])}, "
f"avg {row['mean']:.1f}, range {row['min']:.1f}–{row['max']:.1f})\n"
f" {listed}"
)
# Regional summary
region_lines = []
if "region" in df.columns:
reg = (
df.groupby("region")["total_score"]
.agg(["mean", "count"])
.round(1)
.sort_values("mean", ascending=False)
)
for region, row in reg.iterrows():
region_lines.append(
f" {region}: {row['mean']:.1f} avg "
f"({int(row['count'])} countries)"
)
# Stability summary (from sensitivity analysis)
stability_lines = []
if "stability" in sens.columns:
stab_counts = sens["stability"].value_counts()
stability_lines.append(" Stability across 4 investor scenarios:")
for label, count in stab_counts.items():
stability_lines.append(f" {label}: {count} markets ({count/len(sens)*100:.0f}%)")
# Find markets with perfect stability (rank_std == 0)
perfect_stable = sens[sens["rank_std"] == 0].index.tolist()
if perfect_stable:
stability_lines.append(f"\n Perfectly stable markets (rank unchanged across all scenarios):")
stability_lines.append(f" {', '.join(perfect_stable)}")
# Assemble
sep = "-" * 60
summary = "\n".join([
sep,
"RENEWABLE ENERGY INVESTMENT ANALYZER",
"Executive Summary — Emerging Markets Clean Power Opportunity",
sep,
f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
f"Period: 2018–2023 (6-year means)",
f"Countries: {len(df)} across 3 regions",
f"Indicators: 13 (updated: modern renewables, fossil electricity share, fuel imports)",
"",
"Archetype thresholds: Ready Markets (≥70) · Transition (40–70) · Watch & Wait (<40)",
"",
sep,
"TOP 5 INVESTMENT OPPORTUNITIES",
sep,
*top5_lines,
"",
sep,
"MARKET ARCHETYPES",
sep,
*arch_lines,
"",
sep,
"REGIONAL SUMMARY",
sep,
*region_lines,
"",
sep,
"RANK STABILITY",
sep,
*stability_lines,
"",
sep,
"OUTPUT FILES",
sep,
" data/processed/indicators.csv 6-year means (13 indicators)",
" data/processed/normalized_indicators.csv 0-100 normalised scores",
" outputs/market_scores.csv Balanced scenario scores",
" outputs/sensitivity_analysis.csv Cross-scenario comparison (4 scenarios)",
" outputs/market_clusters.csv Cluster assignments (threshold-based)",
" outputs/executive_summary.txt This file",
sep,
])
out_path = Path("outputs/executive_summary.txt")
out_path.write_text(summary, encoding="utf-8")
print(f" ✓ Saved → {out_path}")
print(f"\n{summary[:800]}\n ...")
except Exception as exc:
print(f" ⚠ Could not generate summary: {exc}")
# ── Pipeline ───────────────────────────────────────────────────────────────
# Ordered list of (step_number, display_name, script_filename)
PIPELINE_STEPS = [
(1, "Data Ingestion", "data_ingestion.py"),
(2, "Preprocessing", "preprocessing.py"),
(3, "Scoring", "scoring.py"),
(4, "Clustering", "clustering.py"),
]
def run_pipeline() -> None:
"""Execute all pipeline steps in order, then write the executive summary."""
t0 = time.time()
print(f"\n{'=' * 60}")
print("RENEWABLE ENERGY INVESTMENT ANALYZER")
print("Pipeline Orchestrator · 2018–2023 · 20 markets · 13 indicators")
print(f"{'=' * 60}")
ensure_output_dirs()
for step_num, step_name, script in PIPELINE_STEPS:
if not run_step(step_num, step_name, script):
print(f"\n Pipeline aborted at step {step_num} — {step_name}")
sys.exit(1)
generate_executive_summary()
elapsed = time.time() - t0
print(f"\n{'=' * 60}")
print(f"PIPELINE COMPLETE · {elapsed:.1f} s")
print(f"{'=' * 60}")
print("\n Run 'python app.py' to launch the interactive dashboard")
print(f"{'=' * 60}\n")
# ── Entry point ────────────────────────────────────────────────────────────
if __name__ == "__main__":
try:
run_pipeline()
except KeyboardInterrupt:
print("\n Pipeline interrupted by user")
sys.exit(1)
except Exception:
traceback.print_exc()
        sys.exit(1)

7.4 Reproducibility
The full project repository is available on GitHub: https://github.com/IbrahimaFikry/renewable-energy-analyzer
Installation:
# Clone the repository
git clone https://github.com/IbrahimaFikry/renewable-energy-analyzer.git
cd renewable-energy-analyzer
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate # macOS / Linux
venv\Scripts\activate # Windows
# Install dependencies
pip install -r requirements.txt

Pipeline execution:
# Run the full pipeline in one command
python main.py
# Or run individual modules in order
python src/data_ingestion.py   # Fetch World Bank data via API
python src/preprocessing.py    # Imputation, winsorisation, normalisation
python src/scoring.py          # Weighted scoring across four scenarios
python src/clustering.py       # K-Means archetype segmentation
# Launch interactive dashboard
python app.py

Data provenance:
All indicators are retrieved programmatically from the World Bank Open Data API (wbgapi). No manual data entry at any stage. Full lineage is documented in data_ingestion.py.
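The retrieval-and-averaging pattern can be sketched as follows. The indicator code, country pair, and all numeric values below are illustrative placeholders, not actual World Bank figures; the real indicator list and fetch logic live in data_ingestion.py.

```python
import pandas as pd

# In data_ingestion.py this frame would come from the World Bank API, e.g.:
#   import wbgapi as wb
#   df = wb.data.DataFrame("EG.ELC.RNEW.ZS", ["MYS", "CHL"], time=range(2018, 2024))
# wbgapi returns one column per year (YR2018 ... YR2023), indexed by economy code.
# The values below are made-up placeholders standing in for that response.
df = pd.DataFrame(
    {
        "YR2018": [22.1, 44.5],
        "YR2019": [21.8, 45.2],
        "YR2020": [23.0, 46.8],
        "YR2021": [24.2, 48.1],
        "YR2022": [25.0, 50.3],
        "YR2023": [26.1, 52.0],
    },
    index=pd.Index(["MYS", "CHL"], name="economy"),
)

# 6-year mean per country, the unit of analysis used throughout the pipeline
six_year_mean = df.mean(axis=1).round(1)
print(six_year_mean)
```

Averaging over the full 2018–2023 window smooths single-year shocks (e.g. pandemic-era demand dips) before any scoring takes place.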
Orchestration:
main.py runs the full pipeline end-to-end (data ingestion through clustering) and writes all outputs to the outputs/ directory. Individual modules can also be run independently for debugging or partial re-runs.
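The preprocessing step referenced above (winsorisation followed by 0–100 normalisation) can be sketched like this. The 5th/95th percentile bounds and the sample values are assumptions for illustration; the actual parameters are defined in preprocessing.py.

```python
import pandas as pd

def winsorise(s: pd.Series, lower: float = 0.05, upper: float = 0.95) -> pd.Series:
    """Clip extreme values to the given quantiles to limit outlier influence."""
    return s.clip(lower=s.quantile(lower), upper=s.quantile(upper))

def normalise_0_100(s: pd.Series) -> pd.Series:
    """Min-max rescale a series to the 0-100 range."""
    rng = s.max() - s.min()
    if rng == 0:  # constant column: map everything to the midpoint
        return pd.Series(50.0, index=s.index)
    return (s - s.min()) / rng * 100

# Illustrative indicator values for five hypothetical markets;
# "E" is a deliberate outlier to show the effect of winsorisation.
raw = pd.Series([3.0, 5.0, 7.0, 9.0, 100.0], index=list("ABCDE"))
scored = normalise_0_100(winsorise(raw))
print(scored.round(1))
```

Without the winsorisation step, the outlier would compress the other four markets into the bottom tenth of the 0–100 scale; clipping first preserves meaningful spread among the non-extreme markets.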
Conclusion
This analysis challenges the assumption that emerging markets offer a broad, diversified opportunity set for renewable energy investment. The data tells a more concentrated — and more actionable — story.