# Source code for unfccc_di_api.unfccc_di_api

__license__ = """
Copyright 2020-2021 Potsdam-Institut für Klimafolgenforschung e.V.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import itertools
import logging
import typing
import zipfile

import pandas as pd
import pooch
import requests
import treelib
from fake_useragent import UserAgent

# Mapping from gas names written as plain ASCII strings (keys) to the notation
# with Unicode subscript characters used by the UNFCCC DI API (values).
GAS_MAPPING = {
    "CH4": "CH₄",
    "CO2": "CO₂",
    "N2O": "N₂O",
    "NF3": "NF₃",
    "SF6": "SF₆",
    "CF4": "CF₄",
    "C2F6": "C₂F₆",
    "c-C3F6": "c-C₃F₆",
    "C3F8": "C₃F₈",
    "c-C4F8": "c-C₄F₈",
    "C4F10": "C₄F₁₀",
    "C5F12": "C5F₁₂",  # the "5" is not a subscript: this seems to be a typo in the UNFCCC API
    "C6F14": "C₆F₁₄",
    "C10F18": "C₁₀F₁₈",
    "NH3": "NH₃",
    "NOx": "NOₓ",
    "SO2": "SO₂",
}

# Translation table (for use with str.translate) which replaces each Unicode
# subscript character in SUBSCRIPT by the ASCII character at the same position
# in NORMALSCRIPT, i.e. it converts the subscript notation back to ASCII.
NORMALSCRIPT = "0123456789x"
SUBSCRIPT = "₀₁₂₃₄₅₆₇₈₉ₓ"
MAKE_ASCII = str.maketrans(SUBSCRIPT, NORMALSCRIPT)


class NoDataError(KeyError):
    """Query returned no data.

    The exception message lists the parties which were queried, followed by
    every optional filter that was actually set (i.e. is not ``None``).
    """

    def __init__(
        self,
        party_codes: typing.Sequence[str],
        category_ids: typing.Optional[typing.Sequence[int]] = None,
        classifications: typing.Optional[typing.Sequence[str]] = None,
        measure_ids: typing.Optional[typing.Sequence[int]] = None,
        gases: typing.Optional[typing.Sequence[str]] = None,
    ):
        """
        Parameters
        ----------
        party_codes : list of str
            The party codes used in the query.
        category_ids : list of int, optional
            The category IDs used in the query, if any.
        classifications : list of str, optional
            The classifications used in the query, if any.
        measure_ids : list of int, optional
            The measure IDs used in the query, if any.
        gases : list of str, optional
            The gases used in the query, if any.
        """
        # insertion order of the dict fixes the order of filters in the message
        optional_filters = {
            "category_ids": category_ids,
            "classifications": classifications,
            "measure_ids": measure_ids,
            "gases": gases,
        }
        parts = [f"party_codes={party_codes!r}"]
        parts.extend(
            f"{name}={value!r}"
            for name, value in optional_filters.items()
            if value is not None
        )
        query = " ".join(parts)
        super().__init__(f"Query returned no data for: {query}")
class ZenodoReader:
    """Provides simplified unified access to the data provided by the Flexible Query
    API of the UNFCCC data access, via the dataset stored at zenodo.

    Essentially gives you the same API as the UNFCCCApiReader, but without
    complications due to the protection measures of the DI API. The advantage of
    using the ZenodoReader is that it works reliably without special measures, the
    disadvantage is that the data might be a bit older.

    The reader keeps the zip archive open until :py:meth:`~ZenodoReader.close` is
    called. It can also be used as a context manager, which closes the archive
    when the ``with`` block is left.

    Attributes
    ----------
    parties : list[str]
        All parties as a 3-letter iso code.
    """

    def __init__(
        self,
        *,
        url: str = "doi:10.5281/zenodo.10470862/parquet-only.zip",
        known_hash: str = "md5:52dd6cc26f1c2eb3f8204c6a78d2e7ba",
    ):
        """
        Parameters
        ----------
        url : str
            Location of the zip archive with the data, passed on to
            ``pooch.retrieve``.
        known_hash : str
            Expected hash of the zip archive, passed on to ``pooch.retrieve``.
        """
        self._zipfile_path = pooch.retrieve(url=url, known_hash=known_hash)
        self._zipfile = zipfile.ZipFile(self._zipfile_path)
        # the party code is taken from the first three characters of the file name
        self.parties = [
            x.split("/")[-1][:3]
            for x in self._zipfile.namelist()
            if x.endswith(".parquet")
        ]

    def close(self) -> None:
        """Close the underlying zip archive.

        After closing, the reader can not be used for queries any more.
        """
        self._zipfile.close()

    def __enter__(self) -> "ZenodoReader":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def _get_party_data(self, *, party: str) -> pd.DataFrame:
        """Read the parquet file of a single party from the zip archive.

        Parameters
        ----------
        party : str
            Code of the party; the archive member ``{party}.parquet`` is read.

        Returns
        -------
        pandas.DataFrame

        Raises
        ------
        ValueError
            If the archive contains no member ``{party}.parquet``.
        """
        wanted = f"{party}.parquet"
        # The match has to start at a path component boundary. A plain suffix test
        # would let e.g. "EU" (or the empty string) silently select the file of
        # a different party such as "DEU.parquet".
        fname = next(
            (
                member
                for member in self._zipfile.namelist()
                if member == wanted or member.endswith(f"/{wanted}")
            ),
            None,
        )
        if fname is None:
            raise ValueError(f"Unknown party: {party}.")

        with self._zipfile.open(fname) as fd:
            return pd.read_parquet(fd)

    def query(
        self,
        *,
        party_code: str,
        gases: typing.Optional[typing.Sequence[str]] = None,
        normalize_gas_names: bool = True,
    ) -> pd.DataFrame:
        """Query the dataset for party data.

        Parameters
        ----------
        party_code : str
            ISO code of a party for which to query.
            For possible values, see :py:attr:`~ZenodoReader.parties`.
        gases : list of str, optional
            Limit the query to these gases.
            Accepts subscripts ("N₂O") as well as ASCII-strings ("N2O").
            Default: query for all gases. Note that anything else than the default
            is not yet implemented and raises an error. Just request the whole
            dataset and filter using pandas' normal functionality.
        normalize_gas_names : bool, optional
            If :obj:`True`, return gases as ASCII strings ("N2O"). Else, return
            native UNFCCC notation ("N₂O"). Default: true. Note that anything else
            than the default is not implemented and raises an error. If you require
            unnormalized gas names, open an issue in the issue tracker at github so
            we can understand your use case.

        Returns
        -------
        pandas.DataFrame

        Raises
        ------
        NotImplementedError
            If ``normalize_gas_names`` is false or ``gases`` is given.
        ValueError
            If no data file for ``party_code`` is found in the dataset.
        """
        if not normalize_gas_names:
            raise NotImplementedError("Non-normalized gases not yet implemented")
        if gases is not None:
            raise NotImplementedError("Specific gas lists not yet implemented")

        return self._get_party_data(party=party_code)
class UNFCCCApiReader:
    """Provides simplified unified access to the Flexible Query API of the UNFCCC data
    access for all parties.

    Essentially encapsulates https://di.unfccc.int/flex_non_annex1
    and https://di.unfccc.int/flex_annex1 .

    Attributes
    ----------
    parties : pandas.DataFrame
        All parties, with their ID, code, and full name.
    gases : pandas.DataFrame
        The available gases and their IDs.
    annex_one_reader : UNFCCCSingleCategoryApiReader
        The API reader object for Annex I parties.
    non_annex_one_reader : UNFCCCSingleCategoryApiReader
        The API reader object for non-Annex I parties.
    """

    def __init__(self, *, base_url: str = "https://di.unfccc.int/api/"):
        """
        Parameters
        ----------
        base_url : str
            Location of the UNFCCC api.
        """
        annex_one = UNFCCCSingleCategoryApiReader(
            party_category="annexOne", base_url=base_url
        )
        non_annex_one = UNFCCCSingleCategoryApiReader(
            party_category="nonAnnexOne", base_url=base_url
        )
        self.annex_one_reader = annex_one
        self.non_annex_one_reader = non_annex_one

        # combined tables of both party categories, ordered by ID
        self.parties = pd.concat(
            [annex_one.parties, non_annex_one.parties]
        ).sort_index()

        all_gases = pd.concat([annex_one.gases, non_annex_one.gases]).sort_index()
        # a gas ID can be reported by both readers; keep its first occurrence only
        repeated = all_gases.index.duplicated(keep="first")
        self.gases = all_gases[~repeated]

    def query(
        self,
        *,
        party_code: str,
        gases: typing.Optional[typing.Sequence[str]] = None,
        progress: bool = False,
        normalize_gas_names: bool = True,
    ) -> pd.DataFrame:
        """Query the UNFCCC for data.

        Parameters
        ----------
        party_code : str
            ISO code of a party for which to query.
            For possible values, see :py:attr:`~UNFCCCApiReader.parties`.
        gases : list of str, optional
            Limit the query to these gases.
            For possible values, see :py:attr:`~UNFCCCApiReader.gases`.
            Accepts subscripts ("N₂O") as well as ASCII-strings ("N2O").
            Default: query for all gases.
        progress : bool
            Display a progress bar. Requires the :py:mod:`tqdm` library.
            Default: false.
        normalize_gas_names : bool, optional
            If :obj:`True`, return gases as ASCII strings ("N2O"). Else, return
            native UNFCCC notation ("N₂O"). Default: true.

        Returns
        -------
        pandas.DataFrame

        Raises
        ------
        ValueError
            If ``party_code`` is known to neither of the two readers.

        Notes
        -----
        If you need more fine-grained control over which variables to query for,
        including restricting the query to specific measures, categories, or
        classifications or to query for multiple parties at once, please see the
        corresponding methods :py:meth:`UNFCCCApiReader.annex_one_reader.query` and
        :py:meth:`UNFCCCApiReader.non_annex_one_reader.query`.
        """
        # delegate to the first reader which knows the party, Annex I first
        for candidate in (self.annex_one_reader, self.non_annex_one_reader):
            if party_code in candidate.parties["code"].values:
                return candidate.query(
                    party_codes=[party_code],
                    gases=gases,
                    progress=progress,
                    normalize_gas_names=normalize_gas_names,
                )

        hint = "try `UNFCCCApiReader().parties` for a list of valid codes"
        raise ValueError(f"Unknown party `{party_code}`, {hint}!")
class UNFCCCSingleCategoryApiReader:
    """Provides access to the Flexible Query API of the UNFCCC data access for a single
    category, either annexOne or nonAnnexOne.

    Use this class if you want to do fine-grained queries for specific measures,
    categories, years, or classifications.

    Essentially encapsulates https://di.unfccc.int/flex_non_annex1
    or https://di.unfccc.int/flex_annex1 .

    Attributes
    ----------
    parties : pandas.DataFrame
        All parties in this category, with their ID, code, and full name.
    years : pandas.DataFrame
        All years for which data is available, mapping the ID to the year.
    category_tree : treelib.Tree
        The available categories and their relationships. Use
        :py:meth:`~UNFCCCSingleCategoryApiReader.show_category_hierarchy`
        for displaying the category tree.
    classifications : pandas.DataFrame
        All classifications and their IDs.
    measure_tree : treelib.Tree
        The available measures and their relationships. Use
        :py:meth:`~UNFCCCSingleCategoryApiReader.show_measure_hierarchy`
        for displaying the measure tree.
    gases : pandas.DataFrame
        The available gases and their IDs.
    units : pandas.DataFrame
        The available units and their IDs.
    conversion_factors : pandas.DataFrame
        Conversion factors between units for the specified gases.
    variables : pandas.DataFrame
        The available variables with the corresponding category, classification,
        measure, gas, and unit.
    """

    def __init__(
        self, *, party_category: str, base_url: str = "https://di.unfccc.int/api/"
    ):
        """
        Parameters
        ----------
        party_category : str
            Either ``nonAnnexOne`` or ``annexOne``.
        base_url : str
            Location of the UNFCCC api.

        Raises
        ------
        RuntimeError
            If the answer to the first request can not be decoded as JSON, which
            is reported as access to the API being denied.
        ValueError
            If the API lists no parties for ``party_category``.
        """
        self.base_url = base_url

        try:
            parties_raw = self._get(f"parties/{party_category}")
        except requests.JSONDecodeError as e:
            raise RuntimeError(
                "Access to the UNFCCC API denied - see"
                " https://github.com/pik-primap/unfccc_di_api#warning for solutions"
            ) from e
        # keep the party lists of this category, but skip the "Groups" entry
        parties_entries = [
            entry["parties"]
            for entry in parties_raw
            if entry["categoryCode"] == party_category and entry["name"] != "Groups"
        ]
        if not parties_entries:
            raise ValueError(
                f"Could not find parties for the party_category {party_category!r}."
            )

        self.parties = (
            pd.DataFrame(itertools.chain.from_iterable(parties_entries))
            .set_index("id")
            .sort_index()
            .drop_duplicates()
        )
        self._parties_dict = dict(self.parties["code"])

        self.years = (
            pd.DataFrame(self._get("years/single")[party_category])
            .set_index("id")
            .sort_index()
        )
        # Names starting with "Last Inventory Year" are replaced by the four
        # characters in front of their last character, e.g.
        # "Last Inventory Year (2019)" becomes "2019".
        self._years_dict = {
            year_id: name[-5:-1] if name.startswith("Last Inventory Year") else name
            for year_id, name in dict(self.years["name"]).items()
        }

        # note that category names are not unique!
        category_hierarchy = self._get("dimension-instances/category")[party_category][
            0
        ]
        self.category_tree = self._walk(category_hierarchy)

        self.classifications = (
            pd.DataFrame(
                self._get("dimension-instances/classification")[party_category]
            )
            .set_index("id")
            .sort_index()
        )
        self._classifications_dict = dict(self.classifications["name"])

        # the API returns several measure hierarchies, they are collected below
        # an artificial root node
        measure_hierarchy = self._get("dimension-instances/measure")[party_category]
        self.measure_tree = treelib.Tree()
        sr = self.measure_tree.create_node("__root__")
        for hierarchy in measure_hierarchy:
            self._walk(hierarchy, tree=self.measure_tree, parent=sr)

        self.gases = (
            pd.DataFrame(self._get("dimension-instances/gas")[party_category])
            .set_index("id")
            .sort_index()
        )
        self._gases_dict = dict(self.gases["name"])

        unit_info = self._get("conversion/fq")
        self.units = pd.DataFrame(unit_info["units"]).set_index("id").sort_index()
        self._units_dict = dict(self.units["name"])
        self.conversion_factors = pd.DataFrame(unit_info[party_category])

        # variable IDs are not unique, therefore every ID maps to a list of variables
        variables_raw: typing.List[typing.Dict[str, int]] = self._get(
            f"variables/fq/{party_category}"
        )
        self.variables = pd.DataFrame(variables_raw)
        self._variables_dict: typing.Dict[int, typing.List[typing.Dict[str, int]]] = {}
        for var in variables_raw:
            self._variables_dict.setdefault(var["variableId"], []).append(var)

    def _flexible_query(
        self,
        *,
        variable_ids: typing.Sequence[int],
        party_ids: typing.Sequence[int],
        year_ids: typing.Sequence[int],
    ) -> typing.List[dict]:
        """Send a single flexible query to the API and return the decoded answer.

        A warning is logged if more than 3000 variables are requested at once.
        """
        if len(variable_ids) > 3000:
            logging.warning(
                "Your query parameters lead to a lot of variables selected at once. "
                "If the query fails, try restricting your query more."
            )

        return self._post(
            "records/flexible-queries",
            json={
                "variableIds": variable_ids,
                "partyIds": party_ids,
                "yearIds": year_ids,
            },
        )

    def query(
        self,
        *,
        party_codes: typing.Sequence[str],
        category_ids: typing.Optional[typing.Sequence[int]] = None,
        classifications: typing.Optional[typing.Sequence[str]] = None,
        measure_ids: typing.Optional[typing.Sequence[int]] = None,
        gases: typing.Optional[typing.Sequence[str]] = None,
        batch_size: int = 1000,
        progress: bool = False,
        normalize_gas_names: bool = True,
    ) -> pd.DataFrame:
        """Query the UNFCCC for data.

        Parameters
        ----------
        party_codes : list of str
            List of ISO codes of parties for which to query.
            For possible values, see
            :py:attr:`~UNFCCCSingleCategoryApiReader.parties`.
        category_ids : list of int, optional
            List of category IDs to query. For possible values, see
            :py:meth:`~UNFCCCSingleCategoryApiReader.show_category_hierarchy()`.
            Default: query for all categories.
        classifications : list of str, optional
            List of classifications to query. For possible values, see
            :py:attr:`~UNFCCCSingleCategoryApiReader.classifications`.
            Default: query for all classifications.
        measure_ids : list of int, optional
            List of measure IDs to query. For possible values, see
            :py:meth:`~UNFCCCSingleCategoryApiReader.show_measure_hierarchy()`.
            Default: query for all measures.
        gases : list of str, optional
            Limit the query to these gases.
            For possible values, see :py:attr:`~UNFCCCApiReader.gases`.
            Accepts subscripts ("N₂O") as well as ASCII-strings ("N2O").
            Default: query for all gases.
        batch_size : int, optional
            Number of variables to query in a single API query in the same batch to
            avoid internal server errors. Larger queries are split automatically.
            Must be at least 1.
            The default is 1000, which seems to work fine.
        progress : bool
            Display a progress bar. Requires the :py:mod:`tqdm` library.
            Default: false.
        normalize_gas_names : bool, optional
            If :obj:`True`, return gases as ASCII strings ("N2O"). Else, return
            native UNFCCC notation ("N₂O"). Default: true.

        Returns
        -------
        pandas.DataFrame

        Raises
        ------
        ValueError
            If ``batch_size`` is smaller than 1 or if a party code is unknown.
        KeyError
            If a classification or gas is unknown.
        NoDataError
            If the API returned no data for the query.

        Notes
        -----
        Further documentation about the meaning of parties, categories,
        classifications, measures and gases is available at the
        `UNFCCC documentation`_.

        .. _UNFCCC documentation: https://unfccc.int/process-and-meetings/\
transparency-and-reporting/greenhouse-gas-data/data-interface-help#eq-7
        """
        # a step of zero or less would never advance through the variables
        if batch_size < 1:
            raise ValueError(
                f"batch_size must be a positive integer, got {batch_size!r}."
            )

        # format gases to subscript notation
        if gases is not None:
            gases = [GAS_MAPPING.get(g, g) for g in gases]

        party_ids = []
        for code in party_codes:
            try:
                party_ids.append(self._name_id(self.parties, code, key="code"))
            except KeyError:
                hint = (
                    "try `UNFCCCSingleCategoryApiReader.parties` for a list of"
                    " valid codes"
                )
                raise ValueError(f"Unknown party `{code}`, {hint}!") from None

        # always query all years
        year_ids = list(self.years.index)

        classification_ids = (
            None
            if classifications is None
            else [self._name_id(self.classifications, c) for c in classifications]
        )
        gas_ids = (
            None if gases is None else [self._name_id(self.gases, g) for g in gases]
        )

        variable_ids = self._select_variable_ids(
            classification_ids, category_ids, measure_ids, gas_ids
        )

        pbar = None
        if progress:
            import tqdm

            pbar = tqdm.tqdm(total=len(variable_ids))

        raw_response: typing.List[dict] = []
        try:
            for start in range(0, len(variable_ids), batch_size):
                batched_variable_ids = variable_ids[start : start + batch_size]
                raw_response += self._flexible_query(
                    variable_ids=batched_variable_ids,
                    party_ids=party_ids,
                    year_ids=year_ids,
                )
                if pbar is not None:
                    pbar.update(len(batched_variable_ids))
        finally:
            # close the progress bar also if a request fails
            if pbar is not None:
                pbar.close()

        if not raw_response:
            raise NoDataError(
                party_codes=party_codes,
                category_ids=category_ids,
                classifications=classifications,
                measure_ids=measure_ids,
                gases=gases,
            )

        df = self._parse_raw_answer(
            raw_response,
            classification_ids=classification_ids,
            category_ids=category_ids,
            measure_ids=measure_ids,
            gas_ids=gas_ids,
        )

        if normalize_gas_names:
            # units contain gas names as well, so both columns are converted
            for c in ["unit", "gas"]:
                df[c] = df[c].apply(lambda x: x.translate(MAKE_ASCII))

        return df

    @staticmethod
    def _id_in(vid: int, seq: typing.Optional[typing.Sequence[int]]):
        """Return whether ``vid`` passes the filter ``seq``; ``None`` accepts all."""
        return seq is None or vid in seq

    def _parse_raw_answer(
        self,
        raw: typing.List[dict],
        classification_ids: typing.Optional[typing.Sequence[int]],
        category_ids: typing.Optional[typing.Sequence[int]],
        measure_ids: typing.Optional[typing.Sequence[int]],
        gas_ids: typing.Optional[typing.Sequence[int]],
    ) -> pd.DataFrame:
        """Convert the raw answer of the API into a table with readable names.

        Parameters
        ----------
        raw : list of dict
            Data points as returned by the API, each with the keys ``variableId``,
            ``partyId``, ``yearId``, ``numberValue`` and ``stringValue``.
        classification_ids, category_ids, measure_ids, gas_ids : list of int, optional
            The filters of the query. Because variable IDs are not unique, a data
            point can belong to several variables; only the variables matching all
            filters are kept. ``None`` disables the respective filter.

        Returns
        -------
        pandas.DataFrame
            One row per data point and matching variable, sorted and without
            duplicated rows. Empty (but with all columns) if nothing matches.
        """
        columns = [
            "party",
            "category",
            "classification",
            "measure",
            "gas",
            "unit",
            "year",
            "numberValue",
            "stringValue",
        ]
        data = []
        for dp in raw:
            for variable in self._variables_dict[dp["variableId"]]:
                if not (
                    self._id_in(variable["classificationId"], classification_ids)
                    and self._id_in(variable["categoryId"], category_ids)
                    and self._id_in(variable["measureId"], measure_ids)
                    and self._id_in(variable["gasId"], gas_ids)
                ):
                    continue

                # IDs missing from the trees get a placeholder name instead of
                # failing the whole query
                try:
                    category = self.category_tree[variable["categoryId"]].tag
                except treelib.tree.NodeIDAbsentError:
                    category = f'unknown category nr. {variable["categoryId"]}'

                try:
                    measure = self.measure_tree[variable["measureId"]].tag
                except treelib.tree.NodeIDAbsentError:
                    measure = f'unknown measure nr. {variable["measureId"]}'

                data.append(
                    {
                        "party": self._parties_dict[dp["partyId"]],
                        "category": category,
                        "classification": self._classifications_dict[
                            variable["classificationId"]
                        ],
                        "measure": measure,
                        "gas": self._gases_dict[variable["gasId"]],
                        "unit": self._units_dict[variable["unitId"]],
                        "year": self._years_dict[dp["yearId"]],
                        "numberValue": dp["numberValue"],
                        "stringValue": dp["stringValue"],
                    }
                )

        # explicit columns keep the sorting below working if no rows are left
        df = pd.DataFrame(data, columns=columns)
        df = df.sort_values(
            ["party", "category", "classification", "measure", "gas", "unit", "year"],
        )
        df.drop_duplicates(inplace=True)
        df.reset_index(inplace=True, drop=True)

        return df

    def _select_variable_ids(
        self,
        classification_ids: typing.Optional[typing.Sequence[int]],
        category_ids: typing.Optional[typing.Sequence[int]],
        measure_ids: typing.Optional[typing.Sequence[int]],
        gas_ids: typing.Optional[typing.Sequence[int]],
    ) -> typing.List[int]:
        """Select the IDs of all variables which match all given filters.

        Parameters
        ----------
        classification_ids, category_ids, measure_ids, gas_ids : list of int, optional
            IDs to select in the respective column of
            :py:attr:`~UNFCCCSingleCategoryApiReader.variables`. ``None`` does not
            restrict the column, an empty list selects nothing.

        Returns
        -------
        list of int
            The unique variable IDs, in the order of their first occurrence.
        """
        mask = pd.Series(True, index=self.variables.index, dtype=bool)
        for column, wanted_ids in (
            ("classificationId", classification_ids),
            ("categoryId", category_ids),
            ("measureId", measure_ids),
            ("gasId", gas_ids),
        ):
            if wanted_ids is not None:
                mask &= self.variables[column].isin(list(wanted_ids))

        selected_variables = self.variables[mask]

        # need to explicitly convert to python integer, not int64 for json serialization
        return [int(x) for x in selected_variables["variableId"].unique()]

    @staticmethod
    def _name_id(df, name: str, key: str = "name") -> int:
        """Return the index value of the first row of ``df`` where column ``key``
        equals ``name``.

        Raises
        ------
        KeyError
            If no row matches.
        """
        try:
            return int(df[df[key] == name].index[0])
        except IndexError:
            raise KeyError(name) from None

    def show_category_hierarchy(self) -> None:
        """Print the hierarchy of categories and their IDs."""
        return self.category_tree.show(idhidden=False)

    def show_measure_hierarchy(self) -> None:
        """Print the hierarchy of measures and their IDs."""
        return self.measure_tree.show(idhidden=False)

    @classmethod
    def _walk(
        cls,
        node: dict,
        tree: "typing.Optional[treelib.Tree]" = None,
        parent=None,
    ) -> "treelib.Tree":
        """Recursively add ``node`` and its children to a tree.

        Parameters
        ----------
        node : dict
            Node with the keys ``name`` and ``id`` and optionally ``children``,
            a list of nodes of the same form.
        tree : treelib.Tree, optional
            Tree to which the nodes are added. Default: create a new tree.
        parent : optional
            Parent below which ``node`` is added.

        Returns
        -------
        treelib.Tree
            The tree to which the nodes were added.
        """
        if tree is None:
            tree = treelib.Tree()

        tree.create_node(tag=node["name"], identifier=node["id"], parent=parent)

        for child in node.get("children", ()):
            cls._walk(child, tree=tree, parent=node["id"])

        return tree

    def _get(self, component: str) -> typing.Any:
        """GET ``component`` below the base URL and return the decoded JSON answer.

        The request is sent with the User-Agent given by ``UserAgent().random``;
        HTTP error status codes raise an exception.
        """
        resp = requests.get(
            self.base_url + component, headers={"User-Agent": UserAgent().random}
        )
        resp.raise_for_status()
        return resp.json()

    def _post(self, component: str, json: dict) -> typing.List[dict]:
        """POST ``json`` to ``component`` below the base URL and return the decoded
        JSON answer.

        The request is sent with the User-Agent given by ``UserAgent().random``;
        HTTP error status codes raise an exception.
        """
        resp = requests.post(
            self.base_url + component,
            json=json,
            headers={"User-Agent": UserAgent().random},
        )
        resp.raise_for_status()
        return resp.json()