class SklearnExtension(Extension):
"""Connect scikit-learn to OpenML-Python.
The estimators which use this extension must be scikit-learn compatible,
i.e., they need to be subclasses of ``sklearn.base.BaseEstimator``.
"""
################################################################################################
# General setup
@classmethod
def can_handle_flow(cls, flow: OpenMLFlow) -> bool:
"""Check whether a given describes a scikit-learn estimator.
This is done by parsing the ``external_version`` field.
Parameters
----------
flow : OpenMLFlow
Returns
-------
bool
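For example, a flow whose ``external_version`` contains an entry such as
``sklearn==1.3.0`` (version number illustrative) is recognized as a scikit-learn flow.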
"""
return cls._is_sklearn_flow(flow)
@classmethod
def can_handle_model(cls, model: Any) -> bool:
"""Check whether a model is an instance of ``sklearn.base.BaseEstimator``.
Parameters
----------
model : Any
Returns
-------
bool
"""
return isinstance(model, sklearn.base.BaseEstimator)
@classmethod
def trim_flow_name( # noqa: C901
cls,
long_name: str,
extra_trim_length: int = 100,
_outer: bool = True, # noqa: FBT001, FBT002
) -> str:
"""Shorten generated sklearn flow name to at most ``max_length`` characters.
Flows are assumed to have the following naming structure:
``(model_selection)? (pipeline)? (steps)+``
and will be shortened to:
``sklearn.(selection.)?(pipeline.)?(steps)+``
e.g. (white spaces and newlines added for readability)
.. code ::
sklearn.pipeline.Pipeline(
columntransformer=sklearn.compose._column_transformer.ColumnTransformer(
numeric=sklearn.pipeline.Pipeline(
imputer=sklearn.preprocessing.imputation.Imputer,
standardscaler=sklearn.preprocessing.data.StandardScaler),
nominal=sklearn.pipeline.Pipeline(
simpleimputer=sklearn.impute.SimpleImputer,
onehotencoder=sklearn.preprocessing._encoders.OneHotEncoder)),
variancethreshold=sklearn.feature_selection.variance_threshold.VarianceThreshold,
svc=sklearn.svm.classes.SVC)
->
``sklearn.Pipeline(ColumnTransformer,VarianceThreshold,SVC)``
Parameters
----------
long_name : str
The full flow name generated by the scikit-learn extension.
extra_trim_length: int (default=100)
If the trimmed name would exceed `extra_trim_length` characters, additional trimming
of the short name is performed. This reduces the produced short name length.
There is no guarantee the end result will not exceed `extra_trim_length`.
_outer : bool (default=True)
For internal use only. Specifies if the function is called recursively.
Returns
-------
str
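Examples
--------
A minimal, illustrative call (the exact output depends on the flow name passed in):
>>> SklearnExtension.trim_flow_name(
...     "sklearn.pipeline.Pipeline(imputer=sklearn.preprocessing.imputation.Imputer,"
...     "classifier=sklearn.svm.classes.SVC)")
'sklearn.Pipeline(Imputer,SVC)'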
"""
def remove_all_in_parentheses(string: str) -> str:
string, removals = re.subn(r"\([^()]*\)", "", string)
while removals > 0:
string, removals = re.subn(r"\([^()]*\)", "", string)
return string
# Generally, we want to trim all hyperparameters, the exception to that is for model
# selection, as the `estimator` hyperparameter is very indicative of what is in the flow.
# So we first trim the name of the `estimator` specified in model selection. For reference, in
# the example below, we want to trim `sklearn.tree.tree.DecisionTreeClassifier`, and
# keep it in the final trimmed flow name:
# sklearn.pipeline.Pipeline(Imputer=sklearn.preprocessing.imputation.Imputer,
# VarianceThreshold=sklearn.feature_selection.variance_threshold.VarianceThreshold, # noqa: ERA001, E501
# Estimator=sklearn.model_selection._search.RandomizedSearchCV(estimator=
# sklearn.tree.tree.DecisionTreeClassifier))
if "sklearn.model_selection" in long_name:
start_index = long_name.index("sklearn.model_selection")
estimator_start = (
start_index + long_name[start_index:].index("estimator=") + len("estimator=")
)
model_select_boilerplate = long_name[start_index:estimator_start]
# above is e.g. "sklearn.model_selection._search.RandomizedSearchCV(estimator="
model_selection_class = model_select_boilerplate.split("(")[0].split(".")[-1]
# Now we want to also find and parse the `estimator`, for this we find the closing
# parenthesis to the model selection technique:
closing_parenthesis_expected = 1
_end: int = estimator_start  # index of the closing parenthesis of the model selection
for _end, char in enumerate(long_name[estimator_start:], start=estimator_start):
if char == "(":
closing_parenthesis_expected += 1
if char == ")":
closing_parenthesis_expected -= 1
if closing_parenthesis_expected == 0:
break
model_select_pipeline = long_name[estimator_start:_end]
trimmed_pipeline = cls.trim_flow_name(model_select_pipeline, _outer=False)
_, trimmed_pipeline = trimmed_pipeline.split(".", maxsplit=1) # trim module prefix
model_select_short = f"sklearn.{model_selection_class}[{trimmed_pipeline}]"
name = long_name[:start_index] + model_select_short + long_name[_end + 1 :]
else:
name = long_name
module_name = long_name.split(".")[0]
short_name = module_name + ".{}"
if name.startswith("sklearn.pipeline"):
full_pipeline_class, pipeline = name[:-1].split("(", maxsplit=1)
pipeline_class = full_pipeline_class.split(".")[-1]
# We don't want nested pipelines in the short name, so we trim all complicated
# subcomponents, i.e. those with parentheses:
pipeline = remove_all_in_parentheses(pipeline)
# then the pipeline steps are formatted e.g.:
# step1name=sklearn.submodule.ClassName,step2name...
components = [component.split(".")[-1] for component in pipeline.split(",")]
pipeline = "{}({})".format(pipeline_class, ",".join(components))
if len(short_name.format(pipeline)) > extra_trim_length:
pipeline = f"{pipeline_class}(...,{components[-1]})"
else:
# Just a simple component: e.g. sklearn.tree.DecisionTreeClassifier
pipeline = remove_all_in_parentheses(name).split(".")[-1]
if not _outer:
# Anything in parentheses in inner calls should not be culled, so we use brackets
pipeline = pipeline.replace("(", "[").replace(")", "]")
else:
# Square brackets may be introduced with nested model_selection
pipeline = pipeline.replace("[", "(").replace("]", ")")
return short_name.format(pipeline)
@classmethod
def _min_dependency_str(cls, sklearn_version: str) -> str:
"""Returns a string containing the minimum dependencies for the sklearn version passed.
Parameters
----------
sklearn_version : str
A version string of the form xx.xx.xx
Returns
-------
str
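Illustrative shape of the returned string (version numbers depend on the sklearn
version passed in)::
    sklearn==0.24.2
    numpy>=1.13.3
    scipy>=0.19.1
    joblib>=0.11
    threadpoolctl>=2.0.0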
"""
openml_major_version = int(LooseVersion(openml.__version__).version[1])
# This explicit check is necessary to support existing entities on the OpenML servers
# that used the fixed dependency string (in the else block)
if openml_major_version > 11:
# OpenML v0.11 onwards supports sklearn>=0.24
# assumption: 0.24 onwards sklearn should contain a _min_dependencies.py file with
# variables declared for extracting minimum dependency for that version
if LooseVersion(sklearn_version) >= "0.24":
from sklearn import _min_dependencies as _mindep
dependency_list = {
"numpy": f"{_mindep.NUMPY_MIN_VERSION}",
"scipy": f"{_mindep.SCIPY_MIN_VERSION}",
"joblib": f"{_mindep.JOBLIB_MIN_VERSION}",
"threadpoolctl": f"{_mindep.THREADPOOLCTL_MIN_VERSION}",
}
elif LooseVersion(sklearn_version) >= "0.23":
dependency_list = {
"numpy": "1.13.3",
"scipy": "0.19.1",
"joblib": "0.11",
"threadpoolctl": "2.0.0",
}
if LooseVersion(sklearn_version).version[2] == 0:
dependency_list.pop("threadpoolctl")
elif LooseVersion(sklearn_version) >= "0.21":
dependency_list = {"numpy": "1.11.0", "scipy": "0.17.0", "joblib": "0.11"}
elif LooseVersion(sklearn_version) >= "0.19":
dependency_list = {"numpy": "1.8.2", "scipy": "0.13.3"}
else:
dependency_list = {"numpy": "1.6.1", "scipy": "0.9"}
else:
# this is INCORRECT for sklearn versions >= 0.19 and < 0.24
# given that OpenML has existing flows uploaded with such dependency information,
# we change no behaviour for older sklearn version, however from 0.24 onwards
# the dependency list will be accurately updated for any flow uploaded to OpenML
dependency_list = {"numpy": "1.6.1", "scipy": "0.9"}
sklearn_dep = f"sklearn=={sklearn_version}"
dep_str = "\n".join([f"{k}>={v}" for k, v in dependency_list.items()])
return "\n".join([sklearn_dep, dep_str])
################################################################################################
# Methods for flow serialization and de-serialization
def flow_to_model(
self,
flow: OpenMLFlow,
initialize_with_defaults: bool = False, # noqa: FBT001, FBT002
strict_version: bool = True, # noqa: FBT001, FBT002
) -> Any:
"""Initializes a sklearn model based on a flow.
Parameters
----------
flow : OpenMLFlow
the flow object to deserialize into a scikit-learn estimator
initialize_with_defaults : bool, optional (default=False)
If this flag is set, the hyperparameter values of flows will be
ignored and a flow with its defaults is returned.
strict_version : bool, default=True
Whether to fail if version requirements are not fulfilled.
Returns
-------
mixed
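Examples
--------
An illustrative round trip through serialization and back:
>>> import sklearn.tree
>>> ext = SklearnExtension()
>>> flow = ext.model_to_flow(sklearn.tree.DecisionTreeClassifier())
>>> clf = ext.flow_to_model(flow)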
"""
return self._deserialize_sklearn(
flow,
initialize_with_defaults=initialize_with_defaults,
strict_version=strict_version,
)
def _deserialize_sklearn( # noqa: PLR0915, C901, PLR0913, PLR0912
self,
o: Any,
components: dict | None = None,
initialize_with_defaults: bool = False, # noqa: FBT001, FBT002
recursion_depth: int = 0,
strict_version: bool = True, # noqa: FBT002, FBT001
) -> Any:
"""Recursive function to deserialize a scikit-learn flow.
This function inspects an object to deserialize and decides how to do so. This function
delegates all work to the respective functions to deserialize special data structures etc.
This function works on everything that has been serialized to OpenML: OpenMLFlow,
components (which are flows themselves), functions, hyperparameter distributions (for
random search) and the actual hyperparameter values themselves.
Parameters
----------
o : mixed
the object to deserialize (can be a flow object, or any serialized
parameter value produced by the serialization routines)
components : Optional[dict]
Components of the current flow being de-serialized. These will not be used when
de-serializing the actual flow, but when de-serializing a component reference.
initialize_with_defaults : bool, optional (default=False)
If this flag is set, the hyperparameter values of flows will be
ignored and a flow with its defaults is returned.
recursion_depth : int
The depth at which this flow is called, mostly for debugging
purposes
strict_version : bool, default=True
Whether to fail if version requirements are not fulfilled.
Returns
-------
mixed
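An illustrative example of a serialized 'special' value this function handles::
    {"oml-python:serialized_object": "type", "value": "np.float64"}
which is deserialized to the type ``numpy.float64``.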
"""
logger.info(
"-{} flow_to_sklearn START o={}, components={}, init_defaults={}".format(
"-" * recursion_depth, o, components, initialize_with_defaults
),
)
depth_pp = recursion_depth + 1 # shortcut var, depth plus plus
# First, we need to check whether the presented object is a json string.
# JSON strings are used to encode parameter values. By passing around
# json strings for parameters, we make sure that we can flow_to_sklearn
# the parameter values to the correct type.
if isinstance(o, str):
with contextlib.suppress(JSONDecodeError):
o = json.loads(o)
if isinstance(o, dict):
# Check if the dict encodes a 'special' object, which could not
# easily be converted into a string; instead, the information needed to
# re-create the object is stored in a dictionary.
if "oml-python:serialized_object" in o:
serialized_type = o["oml-python:serialized_object"]
value = o["value"]
if serialized_type == "type":
rval = self._deserialize_type(value)
elif serialized_type == "rv_frozen":
rval = self._deserialize_rv_frozen(value)
elif serialized_type == "function":
rval = self._deserialize_function(value)
elif serialized_type in (COMPOSITION_STEP_CONSTANT, COMPONENT_REFERENCE):
if serialized_type == COMPOSITION_STEP_CONSTANT:
pass
elif serialized_type == COMPONENT_REFERENCE:
value = self._deserialize_sklearn(
value,
recursion_depth=depth_pp,
strict_version=strict_version,
)
else:
raise NotImplementedError(serialized_type)
assert components is not None # Necessary for mypy
step_name = value["step_name"]
key = value["key"]
component = self._deserialize_sklearn(
components[key],
initialize_with_defaults=initialize_with_defaults,
recursion_depth=depth_pp,
strict_version=strict_version,
)
# The component is now added to where it should be used
# later. It should not be passed to the constructor of the
# main flow object.
del components[key]
if step_name is None:
rval = component
elif "argument_1" not in value:
rval = (step_name, component)
else:
rval = (step_name, component, value["argument_1"])
elif serialized_type == "cv_object":
rval = self._deserialize_cross_validator(
value,
recursion_depth=recursion_depth,
strict_version=strict_version,
)
else:
raise ValueError("Cannot flow_to_sklearn %s" % serialized_type)
else:
rval = OrderedDict(
(
self._deserialize_sklearn(
o=key,
components=components,
initialize_with_defaults=initialize_with_defaults,
recursion_depth=depth_pp,
strict_version=strict_version,
),
self._deserialize_sklearn(
o=value,
components=components,
initialize_with_defaults=initialize_with_defaults,
recursion_depth=depth_pp,
strict_version=strict_version,
),
)
for key, value in sorted(o.items())
)
elif isinstance(o, (list, tuple)):
rval = [
self._deserialize_sklearn(
o=element,
components=components,
initialize_with_defaults=initialize_with_defaults,
recursion_depth=depth_pp,
strict_version=strict_version,
)
for element in o
]
if isinstance(o, tuple):
rval = tuple(rval)
elif isinstance(o, (bool, int, float, str)) or o is None:
rval = o
elif isinstance(o, OpenMLFlow):
if not self._is_sklearn_flow(o):
raise ValueError("Only sklearn flows can be reinstantiated")
rval = self._deserialize_model(
flow=o,
keep_defaults=initialize_with_defaults,
recursion_depth=recursion_depth,
strict_version=strict_version,
)
else:
raise TypeError(o)
logger.info(
"-{} flow_to_sklearn END o={}, rval={}".format("-" * recursion_depth, o, rval)
)
return rval
def model_to_flow(self, model: Any) -> OpenMLFlow:
"""Transform a scikit-learn model to a flow for uploading it to OpenML.
Parameters
----------
model : Any
Returns
-------
OpenMLFlow
"""
# Necessary to make mypy not complain about all the different possible return types
return self._serialize_sklearn(model)
def _serialize_sklearn(self, o: Any, parent_model: Any | None = None) -> Any: # noqa: PLR0912, C901
rval = None # type: Any
# TODO: assert that only on first recursion lvl `parent_model` can be None
if self.is_estimator(o):
# is the main model or a submodel
rval = self._serialize_model(o)
elif (
isinstance(o, (list, tuple))
and len(o) == 2
and o[1] in SKLEARN_PIPELINE_STRING_COMPONENTS
and isinstance(parent_model, sklearn.pipeline._BaseComposition)
):
rval = o
elif isinstance(o, (list, tuple)):
# TODO: explain what type of parameter is here
rval = [self._serialize_sklearn(element, parent_model) for element in o]
if isinstance(o, tuple):
rval = tuple(rval)
elif isinstance(o, SIMPLE_TYPES) or o is None:
if isinstance(o, tuple(SIMPLE_NUMPY_TYPES)):
o = o.item() # type: ignore
# base parameter values
rval = o
elif isinstance(o, dict):
# TODO: explain what type of parameter is here
if not isinstance(o, OrderedDict):
o = OrderedDict(sorted(o.items()))
rval = OrderedDict()
for key, value in o.items():
if not isinstance(key, str):
raise TypeError(
"Can only use string as keys, you passed "
f"type {type(key)} for value {key!s}.",
)
_key = self._serialize_sklearn(key, parent_model)
rval[_key] = self._serialize_sklearn(value, parent_model)
elif isinstance(o, type):
# TODO: explain what type of parameter is here
rval = self._serialize_type(o)
elif isinstance(o, scipy.stats.distributions.rv_frozen):
rval = self._serialize_rv_frozen(o)
# This only works for user-defined functions (and not even partial).
# I think this is exactly what we want here as there shouldn't be any
# built-in or functool.partials in a pipeline
elif inspect.isfunction(o):
# TODO: explain what type of parameter is here
rval = self._serialize_function(o)
elif self._is_cross_validator(o):
# TODO: explain what type of parameter is here
rval = self._serialize_cross_validator(o)
else:
raise TypeError(o, type(o))
return rval
def get_version_information(self) -> list[str]:
"""List versions of libraries required by the flow.
Libraries listed are ``Python``, ``scikit-learn``, ``numpy`` and ``scipy``.
Returns
-------
List
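Illustrative return value (version numbers depend on the environment)::
    ["Python_3.8.10.", "Sklearn_0.24.2.", "NumPy_1.19.5.", "SciPy_1.5.4."]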
"""
# This can possibly be done by a package such as pyxb, but I could not get
# it to work properly.
import numpy
import scipy
import sklearn
major, minor, micro, _, _ = sys.version_info
python_version = "Python_{}.".format(".".join([str(major), str(minor), str(micro)]))
sklearn_version = f"Sklearn_{sklearn.__version__}."
numpy_version = f"NumPy_{numpy.__version__}." # type: ignore
scipy_version = f"SciPy_{scipy.__version__}."
return [python_version, sklearn_version, numpy_version, scipy_version]
def create_setup_string(self, model: Any) -> str: # noqa: ARG002
"""Create a string which can be used to reinstantiate the given model.
Parameters
----------
model : Any
Returns
-------
str
"""
return " ".join(self.get_version_information())
def _is_cross_validator(self, o: Any) -> bool:
return isinstance(o, sklearn.model_selection.BaseCrossValidator)
@classmethod
def _is_sklearn_flow(cls, flow: OpenMLFlow) -> bool:
sklearn_dependency = isinstance(flow.dependencies, str) and "sklearn" in flow.dependencies
sklearn_as_external = isinstance(flow.external_version, str) and (
flow.external_version.startswith("sklearn==") or ",sklearn==" in flow.external_version
)
return sklearn_dependency or sklearn_as_external
def _get_sklearn_description(self, model: Any, char_lim: int = 1024) -> str:
r"""Fetches the sklearn function docstring for the flow description
Retrieves the sklearn docstring available and does the following:
* If length of docstring <= char_lim, then returns the complete docstring
* Else, trims the docstring till it encounters a 'Read more in the :ref:'
* Or till it encounters a 'Parameters\n----------\n'
The final string returned is at most of length char_lim with leading and
trailing whitespaces removed.
Parameters
----------
model : sklearn model
char_lim : int
Specifying the max length of the returned string.
OpenML servers have a constraint of 1024 characters for the 'description' field.
Returns
-------
str
"""
def match_format(s):
return "{}\n{}\n".format(s, len(s) * "-")
s = inspect.getdoc(model)
if s is None:
return ""
try:
# trim till 'Read more'
pattern = "Read more in the :ref:"
index = s.index(pattern)
s = s[:index]
# trimming docstring to be within char_lim
if len(s) > char_lim:
s = f"{s[: char_lim - 3]}..."
return s.strip()
except ValueError:
logger.warning(
"'Read more' not found in descriptions. "
"Trying to trim till 'Parameters' if available in docstring.",
)
try:
# if 'Read more' doesn't exist, trim till 'Parameters'
pattern = "Parameters"
index = s.index(match_format(pattern))
except ValueError:
# returning full docstring
logger.warning("'Parameters' not found in docstring. Omitting docstring trimming.")
index = len(s)
s = s[:index]
# trimming docstring to be within char_lim
if len(s) > char_lim:
s = f"{s[: char_lim - 3]}..."
return s.strip()
def _extract_sklearn_parameter_docstring(self, model) -> None | str:
"""Extracts the part of sklearn docstring containing parameter information
Fetches the entire docstring and trims just the Parameter section.
The assumption is that 'Parameters' is the first section in sklearn docstrings,
followed by other sections titled 'Attributes', 'See also', 'Note', 'References',
appearing in that order if defined.
Returns None if no section titled 'Parameters' can be found in the docstring.
Parameters
----------
model : sklearn model
Returns
-------
str, or None
"""
def match_format(s):
return "{}\n{}\n".format(s, len(s) * "-")
s = inspect.getdoc(model)
if s is None:
return None
try:
index1 = s.index(match_format("Parameters"))
except ValueError as e:
# when sklearn docstring has no 'Parameters' section
logger.warning("{} {}".format(match_format("Parameters"), e))
return None
headings = ["Attributes", "Notes", "See also", "Note", "References"]
for h in headings:
try:
# to find end of Parameters section
index2 = s.index(match_format(h))
break
except ValueError:
logger.warning(f"{h} not available in docstring")
continue
else:
# in the case only 'Parameters' exist, trim till end of docstring
index2 = len(s)
s = s[index1:index2]
return s.strip()
def _extract_sklearn_param_info(self, model, char_lim=1024) -> None | dict:
"""Parses parameter type and description from sklearn dosctring
Parameters
----------
model : sklearn model
char_lim : int
Specifying the max length of the returned string.
OpenML servers have a constraint of 1024 characters string fields.
Returns
-------
Dict, or None
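Illustrative shape of the returned mapping (content depends on the model's docstring)::
    {"max_depth": ["int, default=None", "The maximum depth of the tree. ..."]}
Parameters that cannot be found in the docstring map to ``[None, None]``.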
"""
docstring = self._extract_sklearn_parameter_docstring(model)
if docstring is None:
# when sklearn docstring has no 'Parameters' section
return None
n = re.compile("[.]*\n", flags=IGNORECASE)
lines = n.split(docstring)
p = re.compile("[a-z0-9_ ]+ : [a-z0-9_']+[a-z0-9_ ]*", flags=IGNORECASE)
# The above regular expression is designed to detect sklearn parameter names and type
# in the format of [variable_name][space]:[space][type]
# The expectation is that the parameter description for this detected parameter will
# be all the lines in the docstring till the regex finds another parameter match
# collecting parameters and their descriptions
description = [] # type: List
for s in lines:
param = p.findall(s)
if param != []:
# a parameter definition is found by regex
# creating placeholder when parameter found which will be a list of strings
# string descriptions will be appended in subsequent iterations
# till another parameter is found and a new placeholder is created
placeholder = [""] # type: List[str]
description.append(placeholder)
elif len(description) > 0: # description=[] means no parameters found yet
# appending strings to the placeholder created when parameter found
description[-1].append(s)
for i in range(len(description)):
# concatenating parameter description strings
description[i] = "\n".join(description[i]).strip()
# limiting all parameter descriptions to accepted OpenML string length
if len(description[i]) > char_lim:
description[i] = f"{description[i][: char_lim - 3]}..."
# collecting parameters and their types
parameter_docs = OrderedDict()
matches = p.findall(docstring)
for i, param in enumerate(matches):
key, value = str(param).split(":")
parameter_docs[key.strip()] = [value.strip(), description[i]]
# to avoid KeyError for missing parameters
param_list_true = list(model.get_params().keys())
param_list_found = list(parameter_docs.keys())
for param in list(set(param_list_true) - set(param_list_found)):
parameter_docs[param] = [None, None]
return parameter_docs
def _serialize_model(self, model: Any) -> OpenMLFlow:
"""Create an OpenMLFlow.
Calls `sklearn_to_flow` recursively to properly serialize the
parameters to strings and the components (other models) to OpenMLFlows.
Parameters
----------
model : sklearn estimator
Returns
-------
OpenMLFlow
"""
# Get all necessary information about the model objects itself
(
parameters,
parameters_meta_info,
subcomponents,
subcomponents_explicit,
) = self._extract_information_from_model(model)
# Check that a component does not occur multiple times in a flow as this
# is not supported by OpenML
self._check_multiple_occurence_of_component_in_flow(model, subcomponents)
# Create a flow name, which contains all components in brackets, e.g.:
# RandomizedSearchCV(Pipeline(StandardScaler,AdaBoostClassifier(DecisionTreeClassifier)),
# StandardScaler,AdaBoostClassifier(DecisionTreeClassifier))
class_name = model.__module__ + "." + model.__class__.__name__
# will be part of the name (in brackets)
sub_components_names = ""
for key in subcomponents:
name_thing = subcomponents[key]
if isinstance(name_thing, OpenMLFlow):
name = name_thing.name
elif (
isinstance(name_thing, str)
and subcomponents[key] in SKLEARN_PIPELINE_STRING_COMPONENTS
):
name = name_thing
else:
raise TypeError(type(subcomponents[key]))
if key in subcomponents_explicit:
sub_components_names += "," + key + "=" + name
else:
sub_components_names += "," + name
# slice operation on string in order to get rid of leading comma
name = f"{class_name}({sub_components_names[1:]})" if sub_components_names else class_name
short_name = SklearnExtension.trim_flow_name(name)
# Get the external versions of all sub-components
external_version = self._get_external_version_string(model, subcomponents)
dependencies = self._get_dependencies()
tags = self._get_tags()
sklearn_description = self._get_sklearn_description(model)
return OpenMLFlow(
name=name,
class_name=class_name,
custom_name=short_name,
description=sklearn_description,
model=model,
components=subcomponents,
parameters=parameters,
parameters_meta_info=parameters_meta_info,
external_version=external_version,
tags=tags,
extension=self,
language="English",
dependencies=dependencies,
)
def _get_dependencies(self) -> str:
return self._min_dependency_str(sklearn.__version__) # type: ignore
def _get_tags(self) -> list[str]:
sklearn_version = self._format_external_version("sklearn", sklearn.__version__) # type: ignore
sklearn_version_formatted = sklearn_version.replace("==", "_")
return [
"openml-python",
"sklearn",
"scikit-learn",
"python",
sklearn_version_formatted,
# TODO: add more tags based on the scikit-learn
# module a flow is in? For example automatically
# annotate a class of sklearn.svm.SVC() with the
# tag svm?
]
def _get_external_version_string(
self,
model: Any,
sub_components: dict[str, OpenMLFlow],
) -> str:
# Create external version string for a flow, given the model and the
# already parsed dictionary of sub_components. Retrieves the external
# version of all subcomponents, which themselves already contain all
# requirements for their subcomponents. The external version string is a
# sorted concatenation of all modules which are present in this run.
external_versions = set()
# The model is None if the flow is a placeholder flow such as 'passthrough' or 'drop'
if model is not None:
model_package_name = model.__module__.split(".")[0]
module = importlib.import_module(model_package_name)
model_package_version_number = module.__version__ # type: ignore
external_version = self._format_external_version(
model_package_name,
model_package_version_number,
)
external_versions.add(external_version)
openml_version = self._format_external_version("openml", openml.__version__)
sklearn_version = self._format_external_version("sklearn", sklearn.__version__) # type: ignore
external_versions.add(openml_version)
external_versions.add(sklearn_version)
for visitee in sub_components.values():
if isinstance(visitee, str) and visitee in SKLEARN_PIPELINE_STRING_COMPONENTS:
continue
for external_version in visitee.external_version.split(","):
external_versions.add(external_version)
return ",".join(sorted(external_versions))
def _check_multiple_occurence_of_component_in_flow(
self,
model: Any,
sub_components: dict[str, OpenMLFlow],
) -> None:
to_visit_stack: list[OpenMLFlow] = []
to_visit_stack.extend(sub_components.values())
known_sub_components: set[str] = set()
while len(to_visit_stack) > 0:
visitee = to_visit_stack.pop()
if isinstance(visitee, str) and visitee in SKLEARN_PIPELINE_STRING_COMPONENTS:
known_sub_components.add(visitee)
elif visitee.name in known_sub_components:
raise ValueError(
f"Found a second occurence of component {visitee.name} when "
f"trying to serialize {model}.",
)
else:
known_sub_components.add(visitee.name)
to_visit_stack.extend(visitee.components.values())
def _extract_information_from_model( # noqa: PLR0915, C901, PLR0912
self,
model: Any,
) -> tuple[
OrderedDict[str, str | None],
OrderedDict[str, dict | None],
OrderedDict[str, OpenMLFlow],
set,
]:
# This function contains four "global" states and is quite long and
# complicated. If it gets too complicated to ensure its correctness,
# it would be best to make it a class with the four "global" states being
# the class attributes and the if/elif/else in the for-loop calls to
# separate class methods
# stores all entities that should become subcomponents
sub_components = OrderedDict() # type: OrderedDict[str, OpenMLFlow]
# stores the keys of all subcomponents that should become explicit components
sub_components_explicit = set()
parameters: OrderedDict[str, str | None] = OrderedDict()
parameters_meta_info: OrderedDict[str, dict | None] = OrderedDict()
parameters_docs = self._extract_sklearn_param_info(model)
model_parameters = model.get_params(deep=False)
for k, v in sorted(model_parameters.items(), key=lambda t: t[0]):
rval = self._serialize_sklearn(v, model)
def flatten_all(list_):
"""Flattens arbitrary depth lists of lists (e.g. [[1,2],[3,[1]]] -> [1,2,3,1])."""
for el in list_:
if isinstance(el, (list, tuple)) and len(el) > 0:
yield from flatten_all(el)
else:
yield el
# In case rval is a list of lists (or tuples), we need to identify two situations:
# - sklearn pipeline steps, feature union or base classifiers in voting classifier.
# They look like e.g. [("imputer", Imputer()), ("classifier", SVC())]
# - a list of lists with simple types (e.g. int or str), such as for an OrdinalEncoder
# where all possible values for each feature are described: [[0,1,2], [1,2,5]]
is_non_empty_list_of_lists_with_same_type = (
isinstance(rval, (list, tuple))
and len(rval) > 0
and isinstance(rval[0], (list, tuple))
and all(isinstance(rval_i, type(rval[0])) for rval_i in rval)
)
# Check that all list elements are of simple types.
nested_list_of_simple_types = (
is_non_empty_list_of_lists_with_same_type
and all(isinstance(el, SIMPLE_TYPES) for el in flatten_all(rval))
and all(
len(rv) in (2, 3) and rv[1] not in SKLEARN_PIPELINE_STRING_COMPONENTS
for rv in rval
)
)
if is_non_empty_list_of_lists_with_same_type and not nested_list_of_simple_types:
# If a list of lists is identified that include 'non-simple' types (e.g. objects),
# we assume they are steps in a pipeline, feature union, or base classifiers in
# a voting classifier.
parameter_value = [] # type: List
reserved_keywords = set(model.get_params(deep=False).keys())
for sub_component_tuple in rval:
identifier = sub_component_tuple[0]
sub_component = sub_component_tuple[1]
sub_component_type = type(sub_component_tuple)
if not 2 <= len(sub_component_tuple) <= 3:
# length 2 is for {VotingClassifier.estimators,
# Pipeline.steps, FeatureUnion.transformer_list}
# length 3 is for ColumnTransformer
msg = "Length of tuple of type {} does not match assumptions".format(
sub_component_type,
)
raise ValueError(msg)
if isinstance(sub_component, str):
if sub_component not in SKLEARN_PIPELINE_STRING_COMPONENTS:
msg = (
"Second item of tuple does not match assumptions. "
"If string, can be only 'drop' or 'passthrough' but"
"got %s" % sub_component
)
raise ValueError(msg)
elif sub_component is None:
msg = (
"Cannot serialize objects of None type. Please use a valid "
"placeholder for None. Note that empty sklearn estimators can be "
"replaced with 'drop' or 'passthrough'."
)
raise ValueError(msg)
elif not isinstance(sub_component, OpenMLFlow):
msg = (
"Second item of tuple does not match assumptions. "
"Expected OpenMLFlow, got %s" % type(sub_component)
)
raise TypeError(msg)
if identifier in reserved_keywords:
parent_model = f"{model.__module__}.{model.__class__.__name__}"
msg = "Found element shadowing official " "parameter for {}: {}".format(
parent_model,
identifier,
)
raise PyOpenMLError(msg)
# when deserializing the parameter
sub_components_explicit.add(identifier)
if isinstance(sub_component, str):
external_version = self._get_external_version_string(None, {})
dependencies = self._get_dependencies()
tags = self._get_tags()
sub_components[identifier] = OpenMLFlow(
name=sub_component,
description="Placeholder flow for scikit-learn's string pipeline "
"members",
components=OrderedDict(),
parameters=OrderedDict(),
parameters_meta_info=OrderedDict(),
external_version=external_version,
tags=tags,
language="English",
dependencies=dependencies,
model=None,
)
component_reference: OrderedDict[str, str | dict] = OrderedDict()
component_reference[
"oml-python:serialized_object"
] = COMPOSITION_STEP_CONSTANT
cr_value: dict[str, Any] = OrderedDict()
cr_value["key"] = identifier
cr_value["step_name"] = identifier
if len(sub_component_tuple) == 3:
cr_value["argument_1"] = sub_component_tuple[2]
component_reference["value"] = cr_value
else:
sub_components[identifier] = sub_component
component_reference = OrderedDict()
component_reference["oml-python:serialized_object"] = COMPONENT_REFERENCE
cr_value = OrderedDict()
cr_value["key"] = identifier
cr_value["step_name"] = identifier
if len(sub_component_tuple) == 3:
cr_value["argument_1"] = sub_component_tuple[2]
component_reference["value"] = cr_value
parameter_value.append(component_reference)
# Here (and in the elif and else branch below) are the only
# places where we encode a value as json to make sure that all
# parameter values still have the same type after
# deserialization
if isinstance(rval, tuple):
parameter_json = json.dumps(tuple(parameter_value))
else:
parameter_json = json.dumps(parameter_value)
parameters[k] = parameter_json
elif isinstance(rval, OpenMLFlow):
# A subcomponent, for example the base model in
# AdaBoostClassifier
sub_components[k] = rval
sub_components_explicit.add(k)
component_reference = OrderedDict()
component_reference["oml-python:serialized_object"] = COMPONENT_REFERENCE
cr_value = OrderedDict()
cr_value["key"] = k
cr_value["step_name"] = None
component_reference["value"] = cr_value
cr = self._serialize_sklearn(component_reference, model)
parameters[k] = json.dumps(cr)
elif not (hasattr(rval, "__len__") and len(rval) == 0):
rval = json.dumps(rval)
parameters[k] = rval
# a regular hyperparameter
else:
parameters[k] = None
if parameters_docs is not None:
data_type, description = parameters_docs[k]
parameters_meta_info[k] = OrderedDict(
(("description", description), ("data_type", data_type)),
)
else:
parameters_meta_info[k] = OrderedDict((("description", None), ("data_type", None)))
return parameters, parameters_meta_info, sub_components, sub_components_explicit
def _get_fn_arguments_with_defaults(self, fn_name: Callable) -> tuple[dict, set]:
"""
Returns
-------
i) a dict with all parameter names that have a default value, and
ii) a set with all parameter names that do not have a default
Parameters
----------
fn_name : callable
The function of which we want to obtain the defaults
Returns
-------
params_with_defaults: dict
a dict mapping parameter name to the default value
params_without_defaults: set
a set with all parameters that do not have a default value
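Examples
--------
An illustrative toy example:
>>> def f(a, b=2): ...
>>> SklearnExtension()._get_fn_arguments_with_defaults(f)
({'b': 2}, {'a'})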
"""
# parameters with defaults are optional, all others are required.
parameters = inspect.signature(fn_name).parameters
required_params = set()
optional_params = {}
for param in parameters:
parameter = parameters.get(param)
default_val = parameter.default # type: ignore
if default_val is inspect.Signature.empty:
required_params.add(param)
else:
optional_params[param] = default_val
return optional_params, required_params
def _deserialize_model(
self,
flow: OpenMLFlow,
keep_defaults: bool, # noqa: FBT001
recursion_depth: int,
strict_version: bool = True, # noqa: FBT002, FBT001
) -> Any:
logger.info("-{} deserialize {}".format("-" * recursion_depth, flow.name))
model_name = flow.class_name
self._check_dependencies(flow.dependencies, strict_version=strict_version)
parameters = flow.parameters
components = flow.components
parameter_dict: dict[str, Any] = OrderedDict()
# Do a shallow copy of the components dictionary so we can remove the
# components from this copy once we added them into the pipeline. This
# allows us to not consider them any more when looping over the
# components, but keeping the dictionary of components untouched in the
# original components dictionary.
components_ = copy.copy(components)
for name in parameters:
value = parameters.get(name)
logger.info(
"--{} flow_parameter={}, value={}".format("-" * recursion_depth, name, value)
)
rval = self._deserialize_sklearn(
value,
components=components_,
initialize_with_defaults=keep_defaults,
recursion_depth=recursion_depth + 1,
strict_version=strict_version,
)
parameter_dict[name] = rval
for name in components:
if name in parameter_dict:
continue
if name not in components_:
continue
value = components[name]
logger.info(
"--{} flow_component={}, value={}".format("-" * recursion_depth, name, value)
)
rval = self._deserialize_sklearn(
value,
recursion_depth=recursion_depth + 1,
strict_version=strict_version,
)
parameter_dict[name] = rval
if model_name is None and flow.name in SKLEARN_PIPELINE_STRING_COMPONENTS:
return flow.name
assert model_name is not None
module_name = model_name.rsplit(".", 1)
model_class = getattr(importlib.import_module(module_name[0]), module_name[1])
if keep_defaults:
# obtain all params with a default
param_defaults, _ = self._get_fn_arguments_with_defaults(model_class.__init__)
# delete the params that have a default from the dict,
# so they get initialized with their default value
# except [...]
for param in param_defaults:
# [...] the ones that also have a key in the components dict.
# As OpenML stores different flows for ensembles with different
# (base-)components, in OpenML terms, these are not considered
# hyperparameters but rather constants (i.e., changing them would
# result in a different flow)
if param not in components:
del parameter_dict[param]
return model_class(**parameter_dict)
def _check_dependencies(
self,
dependencies: str,
strict_version: bool = True, # noqa: FBT001, FBT002
) -> None:
if not dependencies:
return
dependencies_list = dependencies.split("\n")
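# Each entry is expected to look like e.g. "numpy>=1.13.3" or "sklearn==0.24.2"
# (version numbers illustrative); DEPENDENCIES_PATTERN extracts the package name,
# the comparison operator and the version from such a string.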
for dependency_string in dependencies_list:
match = DEPENDENCIES_PATTERN.match(dependency_string)
if not match:
raise ValueError("Cannot parse dependency %s" % dependency_string)
dependency_name = match.group("name")
operation = match.group("operation")
version = match.group("version")
module = importlib.import_module(dependency_name)
required_version = LooseVersion(version)
installed_version = LooseVersion(module.__version__) # type: ignore
if operation == "==":
check = required_version == installed_version
elif operation == ">":
check = installed_version > required_version
elif operation == ">=":
check = (
installed_version > required_version or installed_version == required_version
)
else:
raise NotImplementedError("operation '%s' is not supported" % operation)
message = (
"Trying to deserialize a model with dependency "
f"{dependency_string} not satisfied."
)
if not check:
if strict_version:
raise ValueError(message)
warnings.warn(message, category=UserWarning, stacklevel=2)
def _serialize_type(self, o: Any) -> OrderedDict[str, str]:
mapping = {
float: "float",
np.float32: "np.float32",
np.float64: "np.float64",
int: "int",
np.int32: "np.int32",
np.int64: "np.int64",
}
if LooseVersion(np.__version__) < "1.24":
mapping[float] = "np.float"
mapping[int] = "np.int"
ret = OrderedDict() # type: 'OrderedDict[str, str]'
ret["oml-python:serialized_object"] = "type"
ret["value"] = mapping[o]
return ret
def _deserialize_type(self, o: str) -> Any:
mapping = {
"float": float,
"np.float32": np.float32,
"np.float64": np.float64,
"int": int,
"np.int32": np.int32,
"np.int64": np.int64,
}
# TODO(eddiebergman): Might be able to remove this
if LooseVersion(np.__version__) < "1.24":
mapping["np.float"] = np.float # type: ignore # noqa: NPY001
mapping["np.int"] = np.int # type: ignore # noqa: NPY001
return mapping[o]
def _serialize_rv_frozen(self, o: Any) -> OrderedDict[str, str | dict]:
args = o.args
kwds = o.kwds
a = o.a
b = o.b
dist = o.dist.__class__.__module__ + "." + o.dist.__class__.__name__
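# Illustrative example: for scipy.stats.uniform(loc=0, scale=10), ``dist`` becomes
# "scipy.stats._continuous_distns.uniform_gen" (the exact module path may vary across
# scipy versions) and loc/scale are captured through ``args``/``kwds``.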
ret: OrderedDict[str, str | dict] = OrderedDict()
ret["oml-python:serialized_object"] = "rv_frozen"
ret["value"] = OrderedDict(
(("dist", dist), ("a", a), ("b", b), ("args", args), ("kwds", kwds)),
)
return ret
def _deserialize_rv_frozen(self, o: OrderedDict[str, str]) -> Any:
args = o["args"]
kwds = o["kwds"]
a = o["a"]
b = o["b"]
dist_name = o["dist"]
module_name = dist_name.rsplit(".", 1)
try:
rv_class = getattr(importlib.import_module(module_name[0]), module_name[1])
except AttributeError as e:
_tb = traceback.format_exc()
warnings.warn(
f"Cannot create model {dist_name} for flow. Reason is from error {type(e)}:{e}"
f"\nTraceback: {_tb}",
RuntimeWarning,
stacklevel=2,
)
return None
dist = scipy.stats.distributions.rv_frozen(rv_class(), *args, **kwds) # type: ignore
dist.a = a
dist.b = b
return dist
def _serialize_function(self, o: Callable) -> OrderedDict[str, str]:
name = o.__module__ + "." + o.__name__
ret = OrderedDict() # type: 'OrderedDict[str, str]'
ret["oml-python:serialized_object"] = "function"
ret["value"] = name
return ret
def _deserialize_function(self, name: str) -> Callable:
module_name = name.rsplit(".", 1)
return getattr(importlib.import_module(module_name[0]), module_name[1])
def _serialize_cross_validator(self, o: Any) -> OrderedDict[str, str | dict]:
ret: OrderedDict[str, str | dict] = OrderedDict()
parameters = OrderedDict() # type: 'OrderedDict[str, Any]'
# XXX this is copied from sklearn.model_selection._split
cls = o.__class__
init = getattr(cls.__init__, "deprecated_original", cls.__init__)
# Ignore varargs, kw and default values and pop self
init_signature = inspect.signature(init) # type: ignore
# Consider the constructor parameters excluding 'self'
if init is object.__init__:
args = [] # type: List
else:
args = sorted(
[
p.name
for p in init_signature.parameters.values()
if p.name != "self" and p.kind != p.VAR_KEYWORD
],
)
for key in args:
# We need deprecation warnings to always be on in order to
# catch deprecated param values.
# This is set in utils/__init__.py but it gets overwritten
# when running under python3 somehow.
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always", DeprecationWarning)
value = getattr(o, key, None)
if w is not None and len(w) and w[0].category == DeprecationWarning:
# if the parameter is deprecated, don't show it
continue
if not (isinstance(value, Sized) and len(value) == 0):
value = json.dumps(value)
parameters[key] = value
else:
parameters[key] = None
ret["oml-python:serialized_object"] = "cv_object"
name = o.__module__ + "." + o.__class__.__name__
value = OrderedDict([("name", name), ("parameters", parameters)])
ret["value"] = value
return ret
def _deserialize_cross_validator(
self,
value: OrderedDict[str, Any],
recursion_depth: int,
strict_version: bool = True, # noqa: FBT002, FBT001
) -> Any:
model_name = value["name"]
parameters = value["parameters"]
module_name = model_name.rsplit(".", 1)
model_class = getattr(importlib.import_module(module_name[0]), module_name[1])
for parameter in parameters:
parameters[parameter] = self._deserialize_sklearn(
parameters[parameter],
recursion_depth=recursion_depth + 1,
strict_version=strict_version,
)
return model_class(**parameters)
def _format_external_version(
self,
model_package_name: str,
model_package_version_number: str,
) -> str:
return f"{model_package_name}=={model_package_version_number}"
@staticmethod
def _get_parameter_values_recursive(
param_grid: dict | list[dict],
parameter_name: str,
) -> list[Any]:
"""
Returns a list of values for a given hyperparameter, encountered
recursively throughout the flow. (e.g., n_jobs can be defined
for various flows)
Parameters
----------
param_grid: Union[Dict, List[Dict]]
A dict mapping from hyperparameter name to value, or a list of
such dicts
parameter_name: str
The hyperparameter that needs to be inspected
Returns
-------
List
A list of all values of hyperparameters with this name
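Examples
--------
An illustrative call on a toy parameter grid:
>>> SklearnExtension._get_parameter_values_recursive(
...     {"n_jobs": [1, 2], "estimator__n_jobs": [4]}, "n_jobs")
[[1, 2], [4]]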
"""
if isinstance(param_grid, dict):
return [
value
for param, value in param_grid.items()
if param.split("__")[-1] == parameter_name
]
if isinstance(param_grid, list):
result = []
for sub_grid in param_grid:
result.extend(
SklearnExtension._get_parameter_values_recursive(sub_grid, parameter_name),
)
return result
raise ValueError("Param_grid should either be a dict or list of dicts")
def _prevent_optimize_n_jobs(self, model):
"""
Ensures that HPO classes will not optimize the n_jobs hyperparameter
Parameters
----------
model:
The model that will be fitted
"""
if self._is_hpo_class(model):
if isinstance(model, sklearn.model_selection.GridSearchCV):
param_distributions = model.param_grid
elif isinstance(model, sklearn.model_selection.RandomizedSearchCV):
param_distributions = model.param_distributions
else:
if hasattr(model, "param_distributions"):
param_distributions = model.param_distributions
else:
raise AttributeError(
"Using subclass BaseSearchCV other than "
"{GridSearchCV, RandomizedSearchCV}. "
"Could not find attribute "
"param_distributions.",
)
logger.warning(
"Warning! Using subclass BaseSearchCV other than "
"{GridSearchCV, RandomizedSearchCV}. "
"Should implement param check. ",
)
n_jobs_vals = SklearnExtension._get_parameter_values_recursive(
param_distributions,
"n_jobs",
)
if len(n_jobs_vals) > 0:
raise PyOpenMLError(
"openml-python should not be used to " "optimize the n_jobs parameter.",
)
################################################################################################
# Methods for performing runs with extension modules
def is_estimator(self, model: Any) -> bool:
"""Check whether the given model is a scikit-learn estimator.
This function is only required for backwards compatibility and will be removed in the
near future.
Parameters
----------
model : Any
Returns
-------
bool
"""
o = model
return hasattr(o, "fit") and hasattr(o, "get_params") and hasattr(o, "set_params")
def seed_model(self, model: Any, seed: int | None = None) -> Any: # noqa: C901
"""Set the random state of all the unseeded components of a model and return the seeded
model.
Required so that all seed information can be uploaded to OpenML for reproducible results.
Models that are already seeded will maintain the seed. In this case,
only integer seeds are allowed (An exception is raised when a RandomState was used as
seed).
Parameters
----------
model : sklearn model
The model to be seeded
seed : int
The seed to initialize the RandomState with. Unseeded subcomponents
will be seeded with a random number from the RandomState.
Returns
-------
Any
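Examples
--------
An illustrative call:
>>> import sklearn.tree
>>> clf = SklearnExtension().seed_model(sklearn.tree.DecisionTreeClassifier(), seed=42)
>>> clf.random_state is not None
True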
"""
def _seed_current_object(current_value):
if isinstance(current_value, int): # acceptable behaviour
return False
if isinstance(current_value, np.random.RandomState):
raise ValueError(
"Models initialized with a RandomState object are not "
"supported. Please seed with an integer. ",
)
if current_value is not None:
raise ValueError(
"Models should be seeded with int or None (this should never " "happen). ",
)
return True
rs = np.random.RandomState(seed)
model_params = model.get_params()
random_states = {}
for param_name in sorted(model_params):
if "random_state" in param_name:
current_value = model_params[param_name]
# important to draw the value at this point (and not in the if
# statement) this way we guarantee that if a different set of
# subflows is seeded, the same number of the random generator is
# used
new_value = rs.randint(0, 2**16)
if _seed_current_object(current_value):
random_states[param_name] = new_value
# Also seed CV objects!
elif isinstance(model_params[param_name], sklearn.model_selection.BaseCrossValidator):
if not hasattr(model_params[param_name], "random_state"):
continue
current_value = model_params[param_name].random_state
new_value = rs.randint(0, 2**16)
if _seed_current_object(current_value):
model_params[param_name].random_state = new_value
model.set_params(**random_states)
return model
def check_if_model_fitted(self, model: Any) -> bool:
"""Returns True/False denoting if the model has already been fitted/trained
Parameters
----------
model : Any
Returns
-------
bool
"""
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted
try:
# check if model is fitted
check_is_fitted(model)
# Creating random dummy data of arbitrary size
dummy_data = np.random.uniform(size=(10, 3)) # noqa: NPY002
# Using 'predict' instead of 'sklearn.utils.validation.check_is_fitted' for a more
# robust check that works across sklearn versions and models. Internally, 'predict'
# should call 'check_is_fitted' for every concerned attribute, thus offering a more
# assured check than explicit calls to 'check_is_fitted'
model.predict(dummy_data)
# Will reach here if the model was fit on a dataset with 3 features
return True
except NotFittedError: # needs to be the first exception to be caught
# Model is not fitted, as is required
return False
except ValueError:
# Will reach here if the model was fit on a dataset with more or less than 3 features
return True
def _run_model_on_fold( # noqa: PLR0915, PLR0913, C901, PLR0912
self,
model: Any,
task: OpenMLTask,
X_train: np.ndarray | scipy.sparse.spmatrix | pd.DataFrame,
rep_no: int,
fold_no: int,
y_train: np.ndarray | None = None,
X_test: np.ndarray | scipy.sparse.spmatrix | pd.DataFrame | None = None,
) -> tuple[
np.ndarray,
pd.DataFrame | None,
OrderedDict[str, float],
OpenMLRunTrace | None,
]:
"""Run a model on a repeat,fold,subsample triplet of the task and return prediction
information.
Furthermore, it will measure run time measures in case multi-core behaviour allows this.
* exact user cpu time will be measured if the number of cores is set (recursive throughout
the model) exactly to 1
* wall clock time will be measured if the number of cores is set (recursive throughout the
model) to any given number (but not when it is set to -1)
Returns the data that is necessary to construct the OpenML Run object. Is used by
run_task_get_arff_content. Do not use this function unless you know what you are doing.
Parameters
----------
model : Any
The UNTRAINED model to run. The model instance will be copied and not altered.
task : OpenMLTask
The task to run the model on.
X_train : array-like
Training data for the given repetition and fold.
rep_no : int
The repeat of the experiment (0-based; in case of 1 time CV, always 0)
fold_no : int
The fold nr of the experiment (0-based; in case of holdout, always 0)
y_train : Optional[np.ndarray] (default=None)
Target attributes for supervised tasks. In case of classification, these are integer
indices to the potential classes specified by dataset.
X_test : Optional, array-like (default=None)
Test attributes to test for generalization in supervised tasks.
Returns
-------
pred_y : np.ndarray
Predictions on the training/test set, depending on the task type.
For supervised tasks, predictions are on the test set.
For unsupervised tasks, predictions are on the training set.
proba_y : pd.DataFrame, optional
Predicted probabilities for the test set.
None, if task is not Classification or Learning Curve prediction.
user_defined_measures : OrderedDict[str, float]
User defined measures that were generated on this fold
trace : OpenMLRunTrace, optional
arff trace object from a fitted model and the trace content obtained by
repeatedly calling ``run_model_on_task``
"""
def _prediction_to_probabilities(
y: np.ndarray | list,
model_classes: list[Any],
class_labels: list[str] | None,
) -> pd.DataFrame:
"""Transforms predicted probabilities to match with OpenML class indices.
Parameters
----------
y : np.ndarray
Predicted probabilities (possibly omitting classes if they were not present in the
training data).
model_classes : list
List of classes known to / predicted by the model, ordered by their index.
class_labels : list
List of classes as stored in the task object fetched from server.
Returns
-------
pd.DataFrame
"""
if class_labels is None:
raise ValueError("The task has no class labels")
if isinstance(y_train, np.ndarray) and isinstance(class_labels[0], str):
# mapping (decoding) the predictions to the categories
# creating a separate copy to not change the expected pred_y type
y = [class_labels[pred] for pred in y] # list or numpy array of predictions
# model_classes: sklearn classifier mapping from original array id to
# prediction index id
if not isinstance(model_classes, list):
raise ValueError("please convert model classes to list prior to calling this fn")
# DataFrame allows more accurate mapping of classes as column names
result = pd.DataFrame(
0,
index=np.arange(len(y)),
columns=model_classes,
dtype=np.float32,
)
for obs, prediction in enumerate(y):
result.loc[obs, prediction] = 1.0
return result
if isinstance(task, OpenMLSupervisedTask):
if y_train is None:
raise TypeError("argument y_train must not be of type None")
if X_test is None:
raise TypeError("argument X_test must not be of type None")
model_copy = sklearn.base.clone(model, safe=True)
# sanity check: prohibit users from optimizing n_jobs
self._prevent_optimize_n_jobs(model_copy)
# measures and stores runtimes
user_defined_measures = OrderedDict() # type: 'OrderedDict[str, float]'
try:
# for measuring runtime. Only available since Python 3.3
modelfit_start_cputime = time.process_time()
modelfit_start_walltime = time.time()
if isinstance(task, OpenMLSupervisedTask):
model_copy.fit(X_train, y_train) # type: ignore
elif isinstance(task, OpenMLClusteringTask):
model_copy.fit(X_train) # type: ignore
modelfit_dur_cputime = (time.process_time() - modelfit_start_cputime) * 1000
modelfit_dur_walltime = (time.time() - modelfit_start_walltime) * 1000
user_defined_measures["usercpu_time_millis_training"] = modelfit_dur_cputime
refit_time = model_copy.refit_time_ * 1000 if hasattr(model_copy, "refit_time_") else 0 # type: ignore
user_defined_measures["wall_clock_time_millis_training"] = modelfit_dur_walltime
except AttributeError as e:
# typically happens when training a regressor on classification task
raise PyOpenMLError(str(e)) from e
if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)):
# search for model classes_ (might differ depending on the model type)
# first, pipelines are a special case (these don't have a classes_
# object, but rather borrow it from the last step. We do this manually,
# because of the BaseSearch check)
if isinstance(model_copy, sklearn.pipeline.Pipeline):
used_estimator = model_copy.steps[-1][-1]
else:
used_estimator = model_copy
if self._is_hpo_class(used_estimator):
model_classes = used_estimator.best_estimator_.classes_
else:
model_classes = used_estimator.classes_
if not isinstance(model_classes, list):
model_classes = model_classes.tolist()
# to handle the case when dataset is numpy and categories are encoded
# however the class labels stored in task are still categories
if isinstance(y_train, np.ndarray) and isinstance(
cast(List, task.class_labels)[0],
str,
):
model_classes = [cast(List[str], task.class_labels)[i] for i in model_classes]
modelpredict_start_cputime = time.process_time()
modelpredict_start_walltime = time.time()
# In supervised learning this returns the predictions for Y, in clustering
# it returns the clusters
if isinstance(task, OpenMLSupervisedTask):
pred_y = model_copy.predict(X_test)
elif isinstance(task, OpenMLClusteringTask):
pred_y = model_copy.predict(X_train)
else:
raise ValueError(task)
modelpredict_duration_cputime = (time.process_time() - modelpredict_start_cputime) * 1000
user_defined_measures["usercpu_time_millis_testing"] = modelpredict_duration_cputime
user_defined_measures["usercpu_time_millis"] = (
modelfit_dur_cputime + modelpredict_duration_cputime
)
modelpredict_duration_walltime = (time.time() - modelpredict_start_walltime) * 1000
user_defined_measures["wall_clock_time_millis_testing"] = modelpredict_duration_walltime
user_defined_measures["wall_clock_time_millis"] = (
modelfit_dur_walltime + modelpredict_duration_walltime + refit_time
)
if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)):
try:
proba_y = model_copy.predict_proba(X_test)
proba_y = pd.DataFrame(proba_y, columns=model_classes) # handles X_test as numpy
except AttributeError: # predict_proba is not available when probability=False
proba_y = _prediction_to_probabilities(pred_y, model_classes, task.class_labels)
if task.class_labels is not None:
if proba_y.shape[1] != len(task.class_labels):
# Remap the probabilities in case there was a class missing
# at training time. By default, the classification targets
# are mapped to be zero-based indices to the actual classes.
# Therefore, the model_classes contain the correct indices to
# the correct probability array. Example:
# classes in the dataset: 0, 1, 2, 3, 4, 5
# classes in the training set: 0, 1, 2, 4, 5
# then we need to add a column full of zeros into the probabilities
# for class 3 because the rest of the library expects that the
# probabilities are ordered the same way as the classes are ordered).
message = "Estimator only predicted for {}/{} classes!".format(
proba_y.shape[1],
len(task.class_labels),
)
warnings.warn(message, stacklevel=2)
openml.config.logger.warning(message)
for _i, col in enumerate(task.class_labels):
# adding missing columns with 0 probability
if col not in model_classes:
proba_y[col] = 0
# We re-order the columns to move possibly added missing columns into place.
proba_y = proba_y[task.class_labels]
else:
raise ValueError("The task has no class labels")
if not np.all(set(proba_y.columns) == set(task.class_labels)):
missing_cols = list(set(task.class_labels) - set(proba_y.columns))
raise ValueError("Predicted probabilities missing for the columns: ", missing_cols)
elif isinstance(task, (OpenMLRegressionTask, OpenMLClusteringTask)):
proba_y = None
else:
raise TypeError(type(task))
if self._is_hpo_class(model_copy):
trace_data = self._extract_trace_data(model_copy, rep_no, fold_no)
trace: OpenMLRunTrace | None = self._obtain_arff_trace(
model_copy,
trace_data,
)
else:
trace = None
return pred_y, proba_y, user_defined_measures, trace
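# A minimal sketch (not part of the extension) of the probability remapping
# performed above, assuming a hypothetical task with labels ["a", "b", "c"]
# where the model only saw ["a", "c"] during training:
#
#   import pandas as pd
#   task_labels = ["a", "b", "c"]
#   proba_y = pd.DataFrame([[0.7, 0.3]], columns=["a", "c"])
#   for col in task_labels:
#       if col not in proba_y.columns:
#           proba_y[col] = 0          # missing class gets zero probability
#   proba_y = proba_y[task_labels]    # re-order columns to match the task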
def obtain_parameter_values( # noqa: C901, PLR0915
self,
flow: OpenMLFlow,
model: Any = None,
) -> list[dict[str, Any]]:
"""Extracts all parameter settings required for the flow from the model.
If no explicit model is provided, the parameters will be extracted from `flow.model`
instead.
Parameters
----------
flow : OpenMLFlow
OpenMLFlow object (containing flow ids, i.e., it has to be downloaded from the server)
model: Any, optional (default=None)
The model from which to obtain the parameter values. Must match the flow signature.
If None, use the model specified in ``OpenMLFlow.model``.
Returns
-------
list
A list of dicts, where each dict has the following entries:
- ``oml:name`` : str: The OpenML parameter name
- ``oml:value`` : mixed: A representation of the parameter value
- ``oml:component`` : int: flow id to which the parameter belongs
"""
openml.flows.functions._check_flow_for_server_id(flow)
def get_flow_dict(_flow):
flow_map = {_flow.name: _flow.flow_id}
for subflow in _flow.components:
flow_map.update(get_flow_dict(_flow.components[subflow]))
return flow_map
def extract_parameters( # noqa: PLR0915, PLR0912, C901
_flow,
_flow_dict,
component_model,
_main_call=False, # noqa: FBT002
main_id=None,
):
def is_subcomponent_specification(values):
# Check whether the current value can be a specification of
# subcomponents, e.g. the value of the steps parameter
# (in Pipeline) or of the transformers parameter (in
# ColumnTransformer).
return (
# Specification requires list/tuple of list/tuple with
# at least length 2.
isinstance(values, (tuple, list))
and all(isinstance(item, (tuple, list)) and len(item) > 1 for item in values)
# And each component needs to be a flow or interpretable string
and all(
isinstance(item[1], openml.flows.OpenMLFlow)
or (
isinstance(item[1], str)
and item[1] in SKLEARN_PIPELINE_STRING_COMPONENTS
)
for item in values
)
)
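# Illustrative (hypothetical) values that satisfy the check above: a
# Pipeline "steps" value such as
#   [("scaler", <OpenMLFlow for StandardScaler>), ("clf", <OpenMLFlow for SVC>)]
# or one that mixes flows with recognised string components, e.g.
#   [("drop_cols", "drop"), ("clf", <OpenMLFlow for SVC>)]
# assuming "drop" is listed in SKLEARN_PIPELINE_STRING_COMPONENTS.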
# _flow is an OpenMLFlow object and _flow_dict maps flow names to flow ids.
# For the main call, the flow id can be overridden via main_id (useful for
# unit tests / sentinels); this way, flows without subflows do not have to
# rely on _flow_dict.
exp_parameters = set(_flow.parameters)
if (
isinstance(component_model, str)
and component_model in SKLEARN_PIPELINE_STRING_COMPONENTS
):
model_parameters = set()
else:
model_parameters = set(component_model.get_params(deep=False))
if len(exp_parameters.symmetric_difference(model_parameters)) != 0:
flow_params = sorted(exp_parameters)
model_params = sorted(model_parameters)
raise ValueError(
"Parameters of the model do not match the "
"parameters expected by the "
"flow:\nexpected flow parameters: "
f"{flow_params}\nmodel parameters: {model_params}",
)
exp_components = set(_flow.components)
if (
isinstance(component_model, str)
and component_model in SKLEARN_PIPELINE_STRING_COMPONENTS
):
model_components = set()
else:
shallow_params = set(component_model.get_params(deep=False))
model_components = {
mp
for mp in component_model.get_params(deep=True)
if "__" not in mp and mp not in shallow_params
}
if len(exp_components.symmetric_difference(model_components)) != 0:
is_problem = True
if len(exp_components - model_components) > 0:
# If an expected component is not returned as a component by get_params(),
# this means that it is also a parameter -> we need to check that this is
# actually the case
difference = exp_components - model_components
is_problem = not all(component in model_parameters for component in difference)
if is_problem:
flow_components = sorted(exp_components)
model_components = sorted(model_components)
raise ValueError(
"Subcomponents of the model do not match the "
"parameters expected by the "
"flow:\nexpected flow subcomponents: "
f"{flow_components}\nmodel subcomponents: {model_components}",
)
_params = []
for _param_name in _flow.parameters:
_current = OrderedDict()
_current["oml:name"] = _param_name
current_param_values = self.model_to_flow(component_model.get_params()[_param_name])
# Try to filter out components (a.k.a. subflows) which are
# handled further down in the code (by recursively calling
# this function)!
if isinstance(current_param_values, openml.flows.OpenMLFlow):
continue
if is_subcomponent_specification(current_param_values):
# complex parameter value, with subcomponents
parsed_values = []
for subcomponent in current_param_values:
# scikit-learn usually stores tuples of the form
# (name (str), subcomponent (mixed), argument
# (mixed)). OpenML replaces the subcomponent by an
# OpenMLFlow object.
if len(subcomponent) < 2 or len(subcomponent) > 3:
raise ValueError("Component reference should be " "size {2,3}. ")
subcomponent_identifier = subcomponent[0]
subcomponent_flow = subcomponent[1]
if not isinstance(subcomponent_identifier, str):
raise TypeError(
"Subcomponent identifier should be of type string, "
f"but is {type(subcomponent_identifier)}",
)
# Accept OpenMLFlow objects and recognised pipeline string
# components; anything else is an error.
if not (
isinstance(subcomponent_flow, openml.flows.OpenMLFlow)
or (
isinstance(subcomponent_flow, str)
and subcomponent_flow in SKLEARN_PIPELINE_STRING_COMPONENTS
)
):
raise TypeError(
"Subcomponent flow should be of type flow, but is {}".format(
type(subcomponent_flow),
),
)
current = {
"oml-python:serialized_object": COMPONENT_REFERENCE,
"value": {
"key": subcomponent_identifier,
"step_name": subcomponent_identifier,
},
}
if len(subcomponent) == 3:
if not isinstance(subcomponent[2], list) and not isinstance(
subcomponent[2],
OrderedDict,
):
raise TypeError(
"Subcomponent argument should be list or OrderedDict",
)
current["value"]["argument_1"] = subcomponent[2]
parsed_values.append(current)
parsed_values = json.dumps(parsed_values)
else:
# vanilla parameter value
parsed_values = json.dumps(current_param_values)
_current["oml:value"] = parsed_values
if _main_call:
_current["oml:component"] = main_id
else:
_current["oml:component"] = _flow_dict[_flow.name]
_params.append(_current)
for _identifier in _flow.components:
subcomponent_model = component_model.get_params()[_identifier]
_params.extend(
extract_parameters(
_flow.components[_identifier],
_flow_dict,
subcomponent_model,
),
)
return _params
flow_dict = get_flow_dict(flow)
model = model if model is not None else flow.model
return extract_parameters(flow, flow_dict, model, _main_call=True, main_id=flow.flow_id)
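# Hedged usage sketch (names are illustrative; the flow must carry server
# ids, i.e. it has to have been published to or downloaded from OpenML):
#
#   from sklearn.tree import DecisionTreeClassifier
#   ext = SklearnExtension()
#   flow = ext.model_to_flow(DecisionTreeClassifier(max_depth=3))
#   # ... publish the flow or fetch it via openml.flows.get_flow(flow_id) ...
#   parameters = ext.obtain_parameter_values(flow)
#   # each entry then looks roughly like
#   # {"oml:name": "max_depth", "oml:value": "3", "oml:component": <flow id>}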
def _openml_param_name_to_sklearn(
self,
openml_parameter: openml.setups.OpenMLParameter,
flow: OpenMLFlow,
) -> str:
"""
Convert the name of an OpenMLParameter into the scikit-learn name, given a flow.
Parameters
----------
openml_parameter: OpenMLParameter
The parameter under consideration
flow: OpenMLFlow
The flow that provides context.
Returns
-------
sklearn_parameter_name: str
The name the parameter will have once used in scikit-learn
"""
if not isinstance(openml_parameter, openml.setups.OpenMLParameter):
raise ValueError("openml_parameter should be an instance of OpenMLParameter")
if not isinstance(flow, OpenMLFlow):
raise ValueError("flow should be an instance of OpenMLFlow")
flow_structure = flow.get_structure("name")
if openml_parameter.flow_name not in flow_structure:
raise ValueError("Obtained OpenMLParameter and OpenMLFlow do not correspond. ")
name = openml_parameter.flow_name # for PEP8
return "__".join(flow_structure[name] + [openml_parameter.parameter_name])
################################################################################################
# Methods for hyperparameter optimization
def _is_hpo_class(self, model: Any) -> bool:
"""Check whether the model performs hyperparameter optimization.
Used to check whether an optimization trace can be extracted from the model after
running it.
Parameters
----------
model : Any
Returns
-------
bool
"""
return isinstance(model, sklearn.model_selection._search.BaseSearchCV)
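# Illustrative check (sketch):
#
#   from sklearn.model_selection import GridSearchCV
#   from sklearn.svm import SVC
#   ext = SklearnExtension()
#   ext._is_hpo_class(GridSearchCV(SVC(), {"C": [1, 10]}))  # True
#   ext._is_hpo_class(SVC())                                 # False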
def instantiate_model_from_hpo_class(
self,
model: Any,
trace_iteration: OpenMLTraceIteration,
) -> Any:
"""Instantiate a ``base_estimator`` which can be searched over by the hyperparameter
optimization model.
Parameters
----------
model : Any
A hyperparameter optimization model which defines the model to be instantiated.
trace_iteration : OpenMLTraceIteration
Describing the hyperparameter settings to instantiate.
Returns
-------
Any
"""
if not self._is_hpo_class(model):
raise AssertionError(
"Flow model %s is not an instance of sklearn.model_selection._search.BaseSearchCV"
% model,
)
base_estimator = model.estimator
base_estimator.set_params(**trace_iteration.get_parameters())
return base_estimator
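# Sketch (hypothetical names): given a fitted search `search` and a trace
# iteration `it` from a run trace, this reconstructs the underlying
# estimator with that iteration's hyperparameters:
#
#   estimator = ext.instantiate_model_from_hpo_class(search, it)
#   # equivalent to: search.estimator.set_params(**it.get_parameters())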
def _extract_trace_data(self, model, rep_no, fold_no):
"""Extracts data from a machine learning model's cross-validation results
and creates an ARFF (Attribute-Relation File Format) trace.
Parameters
----------
model : Any
A fitted hyperparameter optimization model.
rep_no : int
The repetition number.
fold_no : int
The fold number.
Returns
-------
A list of ARFF trace content lines, one per evaluated hyperparameter setting.
"""
arff_tracecontent = []
for itt_no in range(len(model.cv_results_["mean_test_score"])):
# we use the string values "true" and "false", as required by
# the OpenML server
selected = "false"
if itt_no == model.best_index_:
selected = "true"
test_score = model.cv_results_["mean_test_score"][itt_no]
arff_line = [rep_no, fold_no, itt_no, test_score, selected]
for key in model.cv_results_:
if key.startswith("param_"):
value = model.cv_results_[key][itt_no]
# The built-in JSON serializer does not handle all numpy types;
# tolist() converts them to built-in Python types instead.
if isinstance(value, np.generic):
# For scalars it actually returns scalars, not a list
value = value.tolist()
serialized_value = json.dumps(value) if value is not np.ma.masked else np.nan
arff_line.append(serialized_value)
arff_tracecontent.append(arff_line)
return arff_tracecontent
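# Sketch of one produced trace line for, e.g., rep_no=0, fold_no=1 and a
# search over "C" (values are illustrative):
#
#   [0, 1, 3, 0.93, "true", "10.0"]
#
# i.e. [repeat, fold, iteration, mean_test_score, selected], followed by one
# json-serialized value per "param_*" key of cv_results_.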
def _obtain_arff_trace(
self,
model: Any,
trace_content: list,
) -> OpenMLRunTrace:
"""Create arff trace object from a fitted model and the trace content obtained by
repeatedly calling ``run_model_on_task``.
Parameters
----------
model : Any
A fitted hyperparameter optimization model.
trace_content : List[List]
Trace content obtained by ``openml.runs.run_flow_on_task``.
Returns
-------
OpenMLRunTrace
"""
if not self._is_hpo_class(model):
raise AssertionError(
"Flow model %s is not an instance of sklearn.model_selection._search.BaseSearchCV"
% model,
)
if not hasattr(model, "cv_results_"):
raise ValueError("model should contain `cv_results_`")
# attributes that will be in trace arff, regardless of the model
trace_attributes = [
("repeat", "NUMERIC"),
("fold", "NUMERIC"),
("iteration", "NUMERIC"),
("evaluation", "NUMERIC"),
("selected", ["true", "false"]),
]
# model dependent attributes for trace arff
for key in model.cv_results_:
if key.startswith("param_"):
# basic hyperparameter value types supported here: bool,
# int, float and str
supported_basic_types = (bool, int, float, str)
for param_value in model.cv_results_[key]:
if isinstance(param_value, np.generic):
param_value = param_value.tolist() # noqa: PLW2901
if (
isinstance(param_value, supported_basic_types)
or param_value is None
or param_value is np.ma.masked
):
# basic string values
type = "STRING" # noqa: A001
elif isinstance(param_value, (list, tuple)) and all(
isinstance(i, int) for i in param_value
):
# list of integers, e.g. for feature selection or the
# hidden_layer_sizes hyperparameter of MLPClassifier
type = "STRING" # noqa: A001
else:
raise TypeError("Unsupported param type in param grid: %s" % key)
# The "param_" prefix is replaced by the OpenML parameter prefix, as
# required by the OpenML convention; this also guards against name
# collisions with the required trace attributes above.
attribute = (PREFIX + key[6:], type) # type: ignore
trace_attributes.append(attribute)
return OpenMLRunTrace.generate(
trace_attributes,
trace_content,
)
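# Sketch: for a hypothetical search over {"C": [1, 10], "kernel": ["rbf", "linear"]}
# the resulting trace attributes would look roughly like
#
#   [("repeat", "NUMERIC"), ("fold", "NUMERIC"), ("iteration", "NUMERIC"),
#    ("evaluation", "NUMERIC"), ("selected", ["true", "false"]),
#    (PREFIX + "C", "STRING"), (PREFIX + "kernel", "STRING")]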