flows

OpenMLFlow

Bases: OpenMLBase

OpenML Flow. Stores machine learning models.

Flows should not be generated manually, but by the function openml.flows.create_flow_from_model. Using this helper function ensures that all relevant fields are filled in.

Implements openml.implementation.upload.xsd (https://github.com/openml/openml/blob/master/openml_OS/views/pages/api_new/v1/xsd/openml.implementation.upload.xsd).

Parameters:

Name Type Description Default
name str

Name of the flow. Is used together with the attribute external_version as a unique identifier of the flow.

required
description str

Human-readable description of the flow (free text).

required
model object

ML model which is described by this flow.

required
components OrderedDict

Mapping from component identifier to an OpenMLFlow object. Components are usually subfunctions of an algorithm (e.g. kernels), base learners in ensemble algorithms (decision tree in adaboost) or building blocks of a machine learning pipeline. Components are modeled as independent flows and can be shared between flows (different pipelines can use the same components).

required
parameters OrderedDict

Mapping from parameter name to the parameter default value. The parameter default value must be of type str, so that the respective toolbox plugin can take care of casting the parameter default value to the correct type.

required
parameters_meta_info OrderedDict

Mapping from parameter name to dict. Stores additional information for each parameter. Required keys are data_type and description.

required
external_version str

Version number of the software the flow is implemented in. Is used together with the attribute name as a unique identifier of the flow.

required
tags list

List of tags. Created on the server by other API calls.

required
language str

Natural language the flow is described in (not the programming language).

required
dependencies str

A list of dependencies necessary to run the flow. This field should contain all libraries the flow depends on. To allow reproducibility it should also specify the exact version numbers.

required
class_name str

The development language name of the class which is described by this flow.

None
custom_name str

Custom name of the flow given by the owner.

None
binary_url str

Url from which the binary can be downloaded. Added by the server. Ignored when uploaded manually. Will not be used by the python API because binaries aren't compatible across machines.

None
binary_format str

Format in which the binary code was uploaded. Will not be used by the python API because binaries aren't compatible across machines.

None
binary_md5 str

MD5 checksum to check if the binary code was correctly downloaded. Will not be used by the python API because binaries aren't compatible across machines.

None
uploader str

OpenML user ID of the uploader. Filled in by the server.

None
upload_date str

Date the flow was uploaded. Filled in by the server.

None
flow_id int

Flow ID. Assigned by the server.

None
extension Extension

The extension for a flow (e.g., sklearn).

None
version str

OpenML version of the flow. Assigned by the server.

None
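
The constructor requires parameters and parameters_meta_info to have exactly the same keys, and parameter default values are stored as strings. The following is a minimal sketch of that contract in plain Python. The helper check_flow_parameters and the example parameter names are hypothetical and not part of the openml API; they only mirror the checks visible in the source listing for the class.

```python
from collections import OrderedDict


def check_flow_parameters(parameters, parameters_meta_info):
    """Hypothetical helper mirroring the constructor's type and key checks."""
    for value, label in [
        (parameters, "parameters"),
        (parameters_meta_info, "parameters_meta_info"),
    ]:
        # OrderedDict is a dict subclass, so one isinstance check covers both.
        if not isinstance(value, dict):
            raise TypeError(f"{label} must be a dict or OrderedDict, but is {type(value)}.")

    only_in_parameters = set(parameters) - set(parameters_meta_info)
    if only_in_parameters:
        raise ValueError(f"Only in parameters: {sorted(only_in_parameters)}")

    only_in_meta = set(parameters_meta_info) - set(parameters)
    if only_in_meta:
        raise ValueError(f"Only in parameters_meta_info: {sorted(only_in_meta)}")


# Default values are strings; the toolbox plugin casts them to the real type later.
parameters = OrderedDict([("max_depth", "3"), ("criterion", "gini")])
parameters_meta_info = OrderedDict(
    [
        ("max_depth", {"data_type": "int", "description": "Maximum tree depth."}),
        ("criterion", {"data_type": "str", "description": "Split quality measure."}),
    ]
)

check_flow_parameters(parameters, parameters_meta_info)  # consistent: no exception

try:
    check_flow_parameters({"max_depth": "3"}, {})
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
```

In practice these mappings would normally be produced by the flow-creation helper rather than written by hand.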
Source code in openml/flows/flow.py
class OpenMLFlow(OpenMLBase):
    """OpenML Flow. Stores machine learning models.

    Flows should not be generated manually, but by the function
    :meth:`openml.flows.create_flow_from_model`. Using this helper function
    ensures that all relevant fields are filled in.

    Implements `openml.implementation.upload.xsd
    <https://github.com/openml/openml/blob/master/openml_OS/views/pages/api_new/v1/xsd/
    openml.implementation.upload.xsd>`_.

    Parameters
    ----------
    name : str
        Name of the flow. Is used together with the attribute
        `external_version` as a unique identifier of the flow.
    description : str
        Human-readable description of the flow (free text).
    model : object
        ML model which is described by this flow.
    components : OrderedDict
        Mapping from component identifier to an OpenMLFlow object. Components
        are usually subfunctions of an algorithm (e.g. kernels), base learners
        in ensemble algorithms (decision tree in adaboost) or building blocks
        of a machine learning pipeline. Components are modeled as independent
        flows and can be shared between flows (different pipelines can use
        the same components).
    parameters : OrderedDict
        Mapping from parameter name to the parameter default value. The
        parameter default value must be of type `str`, so that the respective
        toolbox plugin can take care of casting the parameter default value to
        the correct type.
    parameters_meta_info : OrderedDict
        Mapping from parameter name to `dict`. Stores additional information
        for each parameter. Required keys are `data_type` and `description`.
    external_version : str
        Version number of the software the flow is implemented in. Is used
        together with the attribute `name` as a unique identifier of the flow.
    tags : list
        List of tags. Created on the server by other API calls.
    language : str
        Natural language the flow is described in (not the programming
        language).
    dependencies : str
        A list of dependencies necessary to run the flow. This field should
        contain all libraries the flow depends on. To allow reproducibility
        it should also specify the exact version numbers.
    class_name : str, optional
        The development language name of the class which is described by this
        flow.
    custom_name : str, optional
        Custom name of the flow given by the owner.
    binary_url : str, optional
        Url from which the binary can be downloaded. Added by the server.
        Ignored when uploaded manually. Will not be used by the python API
        because binaries aren't compatible across machines.
    binary_format : str, optional
        Format in which the binary code was uploaded. Will not be used by the
        python API because binaries aren't compatible across machines.
    binary_md5 : str, optional
        MD5 checksum to check if the binary code was correctly downloaded. Will
        not be used by the python API because binaries aren't compatible across
        machines.
    uploader : str, optional
        OpenML user ID of the uploader. Filled in by the server.
    upload_date : str, optional
        Date the flow was uploaded. Filled in by the server.
    flow_id : int, optional
        Flow ID. Assigned by the server.
    extension : Extension, optional
        The extension for a flow (e.g., sklearn).
    version : str, optional
        OpenML version of the flow. Assigned by the server.
    """

    def __init__(  # noqa: PLR0913
        self,
        name: str,
        description: str,
        model: object,
        components: dict,
        parameters: dict,
        parameters_meta_info: dict,
        external_version: str,
        tags: list,
        language: str,
        dependencies: str,
        class_name: str | None = None,
        custom_name: str | None = None,
        binary_url: str | None = None,
        binary_format: str | None = None,
        binary_md5: str | None = None,
        uploader: str | None = None,
        upload_date: str | None = None,
        flow_id: int | None = None,
        extension: Extension | None = None,
        version: str | None = None,
    ):
        self.name = name
        self.description = description
        self.model = model

        for variable, variable_name in [
            [components, "components"],
            [parameters, "parameters"],
            [parameters_meta_info, "parameters_meta_info"],
        ]:
            if not isinstance(variable, (OrderedDict, dict)):
                raise TypeError(
                    f"{variable_name} must be of type OrderedDict or dict, "
                    f"but is {type(variable)}.",
                )

        self.components = components
        self.parameters = parameters
        self.parameters_meta_info = parameters_meta_info
        self.class_name = class_name

        keys_parameters = set(parameters.keys())
        keys_parameters_meta_info = set(parameters_meta_info.keys())
        if len(keys_parameters.difference(keys_parameters_meta_info)) > 0:
            raise ValueError(
                "Parameter %s only in parameters, but not in "
                "parameters_meta_info."
                % str(keys_parameters.difference(keys_parameters_meta_info)),
            )
        if len(keys_parameters_meta_info.difference(keys_parameters)) > 0:
            raise ValueError(
                "Parameter %s only in parameters_meta_info, "
                "but not in parameters."
                % str(keys_parameters_meta_info.difference(keys_parameters)),
            )

        self.external_version = external_version
        self.uploader = uploader

        self.custom_name = custom_name
        self.tags = tags if tags is not None else []
        self.binary_url = binary_url
        self.binary_format = binary_format
        self.binary_md5 = binary_md5
        self.version = version
        self.upload_date = upload_date
        self.language = language
        self.dependencies = dependencies
        self.flow_id = flow_id
        if extension is None:
            self._extension = get_extension_by_flow(self)
        else:
            self._extension = extension

    @property
    def id(self) -> int | None:
        """The ID of the flow."""
        return self.flow_id

    @property
    def extension(self) -> Extension:
        """The extension of the flow (e.g., sklearn)."""
        if self._extension is not None:
            return self._extension

        raise RuntimeError(
            f"No extension could be found for flow {self.flow_id}: {self.name}",
        )

    def _get_repr_body_fields(self) -> Sequence[tuple[str, str | int | list[str]]]:
        """Collect all information to display in the __repr__ body."""
        fields = {
            "Flow Name": self.name,
            "Flow Description": self.description,
            "Dependencies": self.dependencies,
        }
        if self.flow_id is not None:
            fields["Flow URL"] = self.openml_url if self.openml_url is not None else "None"
            fields["Flow ID"] = str(self.flow_id)
            if self.version is not None:
                fields["Flow ID"] += f" (version {self.version})"
        if self.upload_date is not None:
            fields["Upload Date"] = self.upload_date.replace("T", " ")
        if self.binary_url is not None:
            fields["Binary URL"] = self.binary_url

        # determines the order in which the information will be printed
        order = [
            "Flow ID",
            "Flow URL",
            "Flow Name",
            "Flow Description",
            "Binary URL",
            "Upload Date",
            "Dependencies",
        ]
        return [(key, fields[key]) for key in order if key in fields]

    def _to_dict(self) -> dict[str, dict]:  # noqa: C901, PLR0912
        """Creates a dictionary representation of self."""
        flow_container = OrderedDict()  # type: 'dict[str, dict]'
        flow_dict = OrderedDict(
            [("@xmlns:oml", "http://openml.org/openml")],
        )  # type: 'dict[str, list | str]'  # E501
        flow_container["oml:flow"] = flow_dict
        _add_if_nonempty(flow_dict, "oml:id", self.flow_id)

        for required in ["name", "external_version"]:
            if getattr(self, required) is None:
                raise ValueError(f"self.{required} is required but None")
        for attribute in [
            "uploader",
            "name",
            "custom_name",
            "class_name",
            "version",
            "external_version",
            "description",
            "upload_date",
            "language",
            "dependencies",
        ]:
            _add_if_nonempty(flow_dict, f"oml:{attribute}", getattr(self, attribute))

        if not self.description:
            logger = logging.getLogger(__name__)
            logger.warning("Flow %s has empty description", self.name)

        flow_parameters = []
        for key in self.parameters:
            param_dict = OrderedDict()  # type: 'OrderedDict[str, str]'
            param_dict["oml:name"] = key
            meta_info = self.parameters_meta_info[key]

            _add_if_nonempty(param_dict, "oml:data_type", meta_info["data_type"])
            param_dict["oml:default_value"] = self.parameters[key]
            _add_if_nonempty(param_dict, "oml:description", meta_info["description"])

            for key_, value in param_dict.items():
                if key_ is not None and not isinstance(key_, str):
                    raise ValueError(
                        f"Parameter name {key_} cannot be serialized "
                        f"because it is of type {type(key_)}. Only strings "
                        "can be serialized.",
                    )
                if value is not None and not isinstance(value, str):
                    raise ValueError(
                        f"Parameter value {value} cannot be serialized "
                        f"because it is of type {type(value)}. Only strings "
                        "can be serialized.",
                    )

            flow_parameters.append(param_dict)

        flow_dict["oml:parameter"] = flow_parameters

        components = []
        for key in self.components:
            component_dict = OrderedDict()  # type: 'OrderedDict[str, dict]'
            component_dict["oml:identifier"] = key
            if self.components[key] in ["passthrough", "drop"]:
                component_dict["oml:flow"] = {
                    "oml-python:serialized_object": "component_reference",
                    "value": {"key": self.components[key], "step_name": self.components[key]},
                }
            else:
                component_dict["oml:flow"] = self.components[key]._to_dict()["oml:flow"]

            for key_ in component_dict:
                # We only need to check if the key is a string, because the
                # value is a flow. The flow itself is valid by recursion
                if key_ is not None and not isinstance(key_, str):
                    raise ValueError(
                        f"Parameter name {key_} cannot be serialized "
                        f"because it is of type {type(key_)}. Only strings "
                        "can be serialized.",
                    )

            components.append(component_dict)

        flow_dict["oml:component"] = components
        flow_dict["oml:tag"] = self.tags
        for attribute in ["binary_url", "binary_format", "binary_md5"]:
            _add_if_nonempty(flow_dict, f"oml:{attribute}", getattr(self, attribute))

        return flow_container

    @classmethod
    def _from_dict(cls, xml_dict: dict) -> OpenMLFlow:
        """Create a flow from an xml description.

        Calls itself recursively to create :class:`OpenMLFlow` objects of
        subflows (components).

        XML definition of a flow is available at
        https://github.com/openml/OpenML/blob/master/openml_OS/views/pages/api_new/v1/xsd/openml.implementation.upload.xsd

        Parameters
        ----------
        xml_dict : dict
            Dictionary representation of the flow as created by _to_dict()

        Returns
        -------
            OpenMLFlow

        """  # E501
        arguments = OrderedDict()
        dic = xml_dict["oml:flow"]

        # Mandatory parts in the xml file
        for key in ["name"]:
            arguments[key] = dic["oml:" + key]

        # non-mandatory parts in the xml file
        for key in [
            "external_version",
            "uploader",
            "description",
            "upload_date",
            "language",
            "dependencies",
            "version",
            "binary_url",
            "binary_format",
            "binary_md5",
            "class_name",
            "custom_name",
        ]:
            arguments[key] = dic.get("oml:" + key)

        # has to be converted to an int if present and cannot parsed in the
        # two loops above
        arguments["flow_id"] = int(dic["oml:id"]) if dic.get("oml:id") is not None else None

        # Now parse parts of a flow which can occur multiple times like
        # parameters, components (subflows) and tags. These can't be tackled
        # in the loops above because xmltodict returns a dict if such an
        # entity occurs once, and a list if it occurs multiple times.
        # Furthermore, they must be treated differently, for example
        # for components this method is called recursively and
        # for parameters the actual information is split into two dictionaries
        # for easier access in python.

        parameters = OrderedDict()
        parameters_meta_info = OrderedDict()
        if "oml:parameter" in dic:
            # In case of a single parameter, xmltodict returns a dictionary,
            # otherwise a list.
            oml_parameters = extract_xml_tags("oml:parameter", dic, allow_none=False)

            for oml_parameter in oml_parameters:
                parameter_name = oml_parameter["oml:name"]
                default_value = oml_parameter["oml:default_value"]
                parameters[parameter_name] = default_value

                meta_info = OrderedDict()
                meta_info["description"] = oml_parameter.get("oml:description")
                meta_info["data_type"] = oml_parameter.get("oml:data_type")
                parameters_meta_info[parameter_name] = meta_info
        arguments["parameters"] = parameters
        arguments["parameters_meta_info"] = parameters_meta_info

        components = OrderedDict()
        if "oml:component" in dic:
            # In case of a single component xmltodict returns a dict,
            # otherwise a list.
            oml_components = extract_xml_tags("oml:component", dic, allow_none=False)

            for component in oml_components:
                flow = OpenMLFlow._from_dict(component)
                components[component["oml:identifier"]] = flow
        arguments["components"] = components
        arguments["tags"] = extract_xml_tags("oml:tag", dic)

        arguments["model"] = None
        return cls(**arguments)

    def to_filesystem(self, output_directory: str | Path) -> None:
        """Write a flow to the filesystem as XML to output_directory."""
        output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)

        output_path = output_directory / "flow.xml"
        if output_path.exists():
            raise ValueError("Output directory already contains a flow.xml file.")

        run_xml = self._to_xml()
        with output_path.open("w") as f:
            f.write(run_xml)

    @classmethod
    def from_filesystem(cls, input_directory: str | Path) -> OpenMLFlow:
        """Read a flow from an XML in input_directory on the filesystem."""
        input_directory = Path(input_directory) / "flow.xml"
        with input_directory.open() as f:
            xml_string = f.read()
        return OpenMLFlow._from_dict(xmltodict.parse(xml_string))

    def _parse_publish_response(self, xml_response: dict) -> None:
        """Parse the id from the xml_response and assign it to self."""
        self.flow_id = int(xml_response["oml:upload_flow"]["oml:id"])

    def publish(self, raise_error_if_exists: bool = False) -> OpenMLFlow:  # noqa: FBT001, FBT002
        """Publish this flow to OpenML server.

        Raises a PyOpenMLError if the flow exists on the server, but
        `self.flow_id` does not match the server known flow id.

        Parameters
        ----------
        raise_error_if_exists : bool, optional (default=False)
            If True, raise PyOpenMLError if the flow exists on the server.
            If False, update the local flow to match the server flow.

        Returns
        -------
        self : OpenMLFlow

        """
        # Import at top not possible because of cyclic dependencies. In
        # particular, flow.py tries to import functions.py in order to call
        # get_flow(), while functions.py tries to import flow.py in order to
        # instantiate an OpenMLFlow.
        import openml.flows.functions

        flow_id = openml.flows.functions.flow_exists(self.name, self.external_version)
        if not flow_id:
            if self.flow_id:
                raise openml.exceptions.PyOpenMLError(
                    "Flow does not exist on the server, " "but 'flow.flow_id' is not None.",
                )
            super().publish()
            assert self.flow_id is not None  # for mypy
            flow_id = self.flow_id
        elif raise_error_if_exists:
            error_message = f"This OpenMLFlow already exists with id: {flow_id}."
            raise openml.exceptions.PyOpenMLError(error_message)
        elif self.flow_id is not None and self.flow_id != flow_id:
            raise openml.exceptions.PyOpenMLError(
                "Local flow_id does not match server flow_id: " f"'{self.flow_id}' vs '{flow_id}'",
            )

        flow = openml.flows.functions.get_flow(flow_id)
        _copy_server_fields(flow, self)
        try:
            openml.flows.functions.assert_flows_equal(
                self,
                flow,
                flow.upload_date,
                ignore_parameter_values=True,
                ignore_custom_name_if_none=True,
            )
        except ValueError as e:
            message = e.args[0]
            raise ValueError(
                "The flow on the server is inconsistent with the local flow. "
                f"The server flow ID is {flow_id}. Please check manually and remove "
                f"the flow if necessary! Error is:\n'{message}'",
            ) from e
        return self

    def get_structure(self, key_item: str) -> dict[str, list[str]]:
        """
        Returns for each sub-component of the flow the path of identifiers
        that should be traversed to reach this component. The resulting dict
        maps a key (identifying a flow by either its id, name or fullname) to
        the parameter prefix.

        Parameters
        ----------
        key_item: str
            The flow attribute that will be used to identify flows in the
            structure. Allowed values {flow_id, name}

        Returns
        -------
        dict[str, List[str]]
            The flow structure
        """
        if key_item not in ["flow_id", "name"]:
            raise ValueError("key_item should be in {flow_id, name}")
        structure = {}
        for key, sub_flow in self.components.items():
            sub_structure = sub_flow.get_structure(key_item)
            for flow_name, flow_sub_structure in sub_structure.items():
                structure[flow_name] = [key, *flow_sub_structure]
        structure[getattr(self, key_item)] = []
        return structure

    def get_subflow(self, structure: list[str]) -> OpenMLFlow:
        """
        Returns a subflow from the tree of dependencies.

        Parameters
        ----------
        structure: list[str]
            A list of strings, indicating the location of the subflow

        Returns
        -------
        OpenMLFlow
            The OpenMLFlow that corresponds to the structure
        """
        # make a copy of structure, as we don't want to change it in the
        # outer scope
        structure = list(structure)
        if len(structure) < 1:
            raise ValueError("Please provide a structure list of size >= 1")
        sub_identifier = structure[0]
        if sub_identifier not in self.components:
            raise ValueError(
                f"Flow {self.name} does not contain component with " f"identifier {sub_identifier}",
            )
        if len(structure) == 1:
            return self.components[sub_identifier]  # type: ignore

        structure.pop(0)
        return self.components[sub_identifier].get_subflow(structure)  # type: ignore

extension: Extension property

The extension of the flow (e.g., sklearn).

id: int | None property

The ID of the flow.

from_filesystem(input_directory) classmethod

Read a flow from an XML in input_directory on the filesystem.

Source code in openml/flows/flow.py
@classmethod
def from_filesystem(cls, input_directory: str | Path) -> OpenMLFlow:
    """Read a flow from an XML in input_directory on the filesystem."""
    input_directory = Path(input_directory) / "flow.xml"
    with input_directory.open() as f:
        xml_string = f.read()
    return OpenMLFlow._from_dict(xmltodict.parse(xml_string))

get_structure(key_item)

Returns for each sub-component of the flow the path of identifiers that should be traversed to reach this component. The resulting dict maps a key (identifying a flow by either its id, name or fullname) to the parameter prefix.

Parameters:

Name Type Description Default
key_item str

The flow attribute that will be used to identify flows in the structure. Allowed values {flow_id, name}

required

Returns:

Type Description
dict[str, List[str]]

The flow structure

Source code in openml/flows/flow.py
def get_structure(self, key_item: str) -> dict[str, list[str]]:
    """
    Returns for each sub-component of the flow the path of identifiers
    that should be traversed to reach this component. The resulting dict
    maps a key (identifying a flow by either its id, name or fullname) to
    the parameter prefix.

    Parameters
    ----------
    key_item: str
        The flow attribute that will be used to identify flows in the
        structure. Allowed values {flow_id, name}

    Returns
    -------
    dict[str, List[str]]
        The flow structure
    """
    if key_item not in ["flow_id", "name"]:
        raise ValueError("key_item should be in {flow_id, name}")
    structure = {}
    for key, sub_flow in self.components.items():
        sub_structure = sub_flow.get_structure(key_item)
        for flow_name, flow_sub_structure in sub_structure.items():
            structure[flow_name] = [key, *flow_sub_structure]
    structure[getattr(self, key_item)] = []
    return structure
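
To make the returned mapping concrete, here is a small sketch that applies the same recursion as the listing above to a toy component tree. ToyFlow is a hypothetical stand-in that carries only the attributes get_structure reads (name, flow_id, components); it is not the real OpenMLFlow, and the flow names and ids are made up for illustration.

```python
class ToyFlow:
    """Hypothetical stand-in with only the attributes get_structure reads."""

    def __init__(self, name, flow_id=None, components=None):
        self.name = name
        self.flow_id = flow_id
        self.components = components or {}

    def get_structure(self, key_item):
        # Same recursion as the method shown above.
        if key_item not in ["flow_id", "name"]:
            raise ValueError("key_item should be in {flow_id, name}")
        structure = {}
        for key, sub_flow in self.components.items():
            for flow_key, sub_path in sub_flow.get_structure(key_item).items():
                # Prefix each nested path with the identifier of this component.
                structure[flow_key] = [key, *sub_path]
        # The flow itself is reached by the empty path.
        structure[getattr(self, key_item)] = []
        return structure


tree = ToyFlow("DecisionTree", flow_id=3)
boosting = ToyFlow("AdaBoost", flow_id=2, components={"base_estimator": tree})
pipeline = ToyFlow("Pipeline", flow_id=1, components={"clf": boosting})

by_name = pipeline.get_structure("name")
by_id = pipeline.get_structure("flow_id")
```

Here by_name maps "DecisionTree" to ["clf", "base_estimator"], "AdaBoost" to ["clf"], and "Pipeline" to the empty list. Note that flows sharing the same name (or id) would overwrite each other's entry in the resulting dict.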

get_subflow(structure)

Returns a subflow from the tree of dependencies.

Parameters:

Name Type Description Default
structure list[str]

A list of strings, indicating the location of the subflow

required

Returns:

Type Description
OpenMLFlow

The OpenMLFlow that corresponds to the structure

Source code in openml/flows/flow.py
def get_subflow(self, structure: list[str]) -> OpenMLFlow:
    """
    Returns a subflow from the tree of dependencies.

    Parameters
    ----------
    structure: list[str]
        A list of strings, indicating the location of the subflow

    Returns
    -------
    OpenMLFlow
        The OpenMLFlow that corresponds to the structure
    """
    # make a copy of structure, as we don't want to change it in the
    # outer scope
    structure = list(structure)
    if len(structure) < 1:
        raise ValueError("Please provide a structure list of size >= 1")
    sub_identifier = structure[0]
    if sub_identifier not in self.components:
        raise ValueError(
            f"Flow {self.name} does not contain component with " f"identifier {sub_identifier}",
        )
    if len(structure) == 1:
        return self.components[sub_identifier]  # type: ignore

    structure.pop(0)
    return self.components[sub_identifier].get_subflow(structure)  # type: ignore
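
The lookup can be sketched on a toy tree as follows. ToyFlow is again a hypothetical stand-in, not the real class; it reproduces the traversal from the listing above, using a slice instead of pop(0), which is equivalent for this purpose.

```python
class ToyFlow:
    """Hypothetical stand-in with only the attributes get_subflow reads."""

    def __init__(self, name, components=None):
        self.name = name
        self.components = components or {}

    def get_subflow(self, structure):
        structure = list(structure)  # copy, so the caller's list is not modified
        if len(structure) < 1:
            raise ValueError("Please provide a structure list of size >= 1")
        sub_identifier = structure[0]
        if sub_identifier not in self.components:
            raise ValueError(
                f"Flow {self.name} does not contain component with identifier {sub_identifier}"
            )
        if len(structure) == 1:
            return self.components[sub_identifier]
        # Descend one level and continue with the rest of the path.
        return self.components[sub_identifier].get_subflow(structure[1:])


tree = ToyFlow("DecisionTree")
boosting = ToyFlow("AdaBoost", components={"base_estimator": tree})
pipeline = ToyFlow("Pipeline", components={"clf": boosting})

path = ["clf", "base_estimator"]
found = pipeline.get_subflow(path)
```

The path lists produced by get_structure have the shape this method expects, except for the empty path of the root flow, which get_subflow rejects.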

publish(raise_error_if_exists=False)

Publish this flow to OpenML server.

Raises a PyOpenMLError if the flow exists on the server, but self.flow_id does not match the server known flow id.

Parameters:

Name Type Description Default
raise_error_if_exists bool

If True, raise PyOpenMLError if the flow exists on the server. If False, update the local flow to match the server flow.

False

Returns:

Name Type Description
self OpenMLFlow
Source code in openml/flows/flow.py
def publish(self, raise_error_if_exists: bool = False) -> OpenMLFlow:  # noqa: FBT001, FBT002
    """Publish this flow to OpenML server.

    Raises a PyOpenMLError if the flow exists on the server, but
    `self.flow_id` does not match the server known flow id.

    Parameters
    ----------
    raise_error_if_exists : bool, optional (default=False)
        If True, raise PyOpenMLError if the flow exists on the server.
        If False, update the local flow to match the server flow.

    Returns
    -------
    self : OpenMLFlow

    """
    # Import at top not possible because of cyclic dependencies. In
    # particular, flow.py tries to import functions.py in order to call
    # get_flow(), while functions.py tries to import flow.py in order to
    # instantiate an OpenMLFlow.
    import openml.flows.functions

    flow_id = openml.flows.functions.flow_exists(self.name, self.external_version)
    if not flow_id:
        if self.flow_id:
            raise openml.exceptions.PyOpenMLError(
                "Flow does not exist on the server, " "but 'flow.flow_id' is not None.",
            )
        super().publish()
        assert self.flow_id is not None  # for mypy
        flow_id = self.flow_id
    elif raise_error_if_exists:
        error_message = f"This OpenMLFlow already exists with id: {flow_id}."
        raise openml.exceptions.PyOpenMLError(error_message)
    elif self.flow_id is not None and self.flow_id != flow_id:
        raise openml.exceptions.PyOpenMLError(
            "Local flow_id does not match server flow_id: " f"'{self.flow_id}' vs '{flow_id}'",
        )

    flow = openml.flows.functions.get_flow(flow_id)
    _copy_server_fields(flow, self)
    try:
        openml.flows.functions.assert_flows_equal(
            self,
            flow,
            flow.upload_date,
            ignore_parameter_values=True,
            ignore_custom_name_if_none=True,
        )
    except ValueError as e:
        message = e.args[0]
        raise ValueError(
            "The flow on the server is inconsistent with the local flow. "
            f"The server flow ID is {flow_id}. Please check manually and remove "
            f"the flow if necessary! Error is:\n'{message}'",
        ) from e
    return self
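
The branching at the top of publish can be read as a small decision table. The following is a minimal sketch of that logic only, assuming a stand-in exception class and a hypothetical helper name (`PublishError` and `resolve_publish` are not part of openml); it does not contact a server.

```python
from __future__ import annotations


class PublishError(Exception):
    """Stand-in for openml.exceptions.PyOpenMLError (hypothetical name)."""


def resolve_publish(
    server_id: int | bool,
    local_id: int | None,
    raise_error_if_exists: bool = False,
) -> str:
    """Return "upload" or "reuse", or raise, mirroring the branches of publish()."""
    if not server_id:
        # The server does not know this name + external_version pair.
        if local_id:
            raise PublishError("Flow does not exist on the server, but a local flow_id is set.")
        return "upload"
    if raise_error_if_exists:
        raise PublishError(f"This flow already exists with id: {server_id}.")
    if local_id is not None and local_id != server_id:
        raise PublishError(
            f"Local flow_id does not match server flow_id: '{local_id}' vs '{server_id}'"
        )
    # The flow exists and the ids are compatible: sync local fields from the server.
    return "reuse"
```

In this sketch, "upload" corresponds to the `super().publish()` call and "reuse" to falling through to `get_flow` and `_copy_server_fields`.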

to_filesystem(output_directory)

Write a flow to the filesystem as XML to output_directory.

Source code in openml/flows/flow.py
def to_filesystem(self, output_directory: str | Path) -> None:
    """Write a flow to the filesystem as XML to output_directory."""
    output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)

    output_path = output_directory / "flow.xml"
    if output_path.exists():
        raise ValueError("Output directory already contains a flow.xml file.")

    flow_xml = self._to_xml()
    with output_path.open("w") as f:
        f.write(flow_xml)
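
The refuse-to-overwrite behaviour can be tried without a real flow. This is a minimal sketch that copies the directory handling above into a hypothetical free function (`write_flow_xml` is an illustrative name, and the XML string is a placeholder rather than a valid flow description).

```python
import tempfile
from pathlib import Path


def write_flow_xml(xml_text: str, output_directory) -> Path:
    """Write xml_text to <output_directory>/flow.xml, refusing to overwrite."""
    output_directory = Path(output_directory)
    # Missing parent directories are created, as in to_filesystem.
    output_directory.mkdir(parents=True, exist_ok=True)
    output_path = output_directory / "flow.xml"
    if output_path.exists():
        raise ValueError("Output directory already contains a flow.xml file.")
    output_path.write_text(xml_text)
    return output_path


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "nested" / "flow_dir"
    first_name = write_flow_xml("<flow/>", target).name
    try:
        write_flow_xml("<flow/>", target)
        second_write_failed = False
    except ValueError:
        second_write_failed = True
```

A caller that wants to refresh an existing export therefore has to remove the old flow.xml or pick a new directory first.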

assert_flows_equal(flow1, flow2, ignore_parameter_values_on_older_children=None, ignore_parameter_values=False, ignore_custom_name_if_none=False, check_description=True)

Check equality of two flows.

Two flows are equal if all their keys that are not set by the server are equal, as well as all their parameters and components.

Parameters:

Name Type Description Default
flow1 OpenMLFlow
required
flow2 OpenMLFlow
required
ignore_parameter_values_on_older_children str(optional)

If set to OpenMLFlow.upload_date, ignores parameters in a child flow if its upload date predates the upload date of the parent flow.

None
ignore_parameter_values bool

Whether to ignore parameter values when comparing flows.

False
ignore_custom_name_if_none bool

Whether to ignore the custom name field if either flow has custom_name equal to None.

False
check_description bool

Whether to ignore matching of flow descriptions. If True (the default), descriptions are skipped during comparison; if False, they must be equal.

True
Source code in openml/flows/functions.py
def assert_flows_equal(  # noqa: C901, PLR0912, PLR0913, PLR0915
    flow1: OpenMLFlow,
    flow2: OpenMLFlow,
    ignore_parameter_values_on_older_children: str | None = None,
    ignore_parameter_values: bool = False,  # noqa: FBT001, FBT002
    ignore_custom_name_if_none: bool = False,  # noqa:  FBT001, FBT002
    check_description: bool = True,  # noqa:  FBT001, FBT002
) -> None:
    """Check equality of two flows.

    Two flows are equal if all their keys that are not set by the server
    are equal, as well as all their parameters and components.

    Parameters
    ----------
    flow1 : OpenMLFlow

    flow2 : OpenMLFlow

    ignore_parameter_values_on_older_children : str (optional)
        If set to ``OpenMLFlow.upload_date``, ignores parameters in a child
        flow if its upload date predates the upload date of the parent flow.

    ignore_parameter_values : bool
        Whether to ignore parameter values when comparing flows.

    ignore_custom_name_if_none : bool
        Whether to ignore the custom name field if either flow has `custom_name` equal to `None`.

    check_description : bool
        Whether to ignore matching of flow descriptions. If True (the default),
        descriptions are skipped during comparison; if False, they must be equal.
    """
    if not isinstance(flow1, OpenMLFlow):
        raise TypeError("Argument 1 must be of type OpenMLFlow, but is %s" % type(flow1))

    if not isinstance(flow2, OpenMLFlow):
        raise TypeError("Argument 2 must be of type OpenMLFlow, but is %s" % type(flow2))

    # TODO as they are actually now saved during publish, it might be good to
    # check for the equality of these as well.
    generated_by_the_server = [
        "flow_id",
        "uploader",
        "version",
        "upload_date",
        # Tags aren't directly created by the server,
        # but the uploader has no control over them!
        "tags",
    ]
    ignored_by_python_api = ["binary_url", "binary_format", "binary_md5", "model", "_entity_id"]

    for key in set(flow1.__dict__.keys()).union(flow2.__dict__.keys()):
        if key in generated_by_the_server + ignored_by_python_api:
            continue
        attr1 = getattr(flow1, key, None)
        attr2 = getattr(flow2, key, None)
        if key == "components":
            if not (isinstance(attr1, Dict) and isinstance(attr2, Dict)):
                raise TypeError("Cannot compare components because they are not dictionary.")

            for name in set(attr1.keys()).union(attr2.keys()):
                if name not in attr1:
                    raise ValueError(
                        "Component %s only available in " "argument2, but not in argument1." % name,
                    )
                if name not in attr2:
                    raise ValueError(
                        "Component %s only available in argument1, but not in argument2." % name,
                    )
                assert_flows_equal(
                    attr1[name],
                    attr2[name],
                    ignore_parameter_values_on_older_children,
                    ignore_parameter_values,
                    ignore_custom_name_if_none,
                )
        elif key == "_extension":
            continue
        elif check_description and key == "description":
            # to ignore matching of descriptions since sklearn based flows may have
            # altering docstrings and is not guaranteed to be consistent
            continue
        else:
            if key == "parameters":
                if ignore_parameter_values or ignore_parameter_values_on_older_children:
                    params_flow_1 = set(flow1.parameters.keys())
                    params_flow_2 = set(flow2.parameters.keys())
                    symmetric_difference = params_flow_1 ^ params_flow_2
                    if len(symmetric_difference) > 0:
                        raise ValueError(
                            "Flow %s: parameter set of flow "
                            "differs from the parameters stored "
                            "on the server." % flow1.name,
                        )

                if ignore_parameter_values_on_older_children:
                    assert (
                        flow1.upload_date is not None
                    ), "Flow1 has no upload date that allows us to compare age of children."
                    upload_date_current_flow = dateutil.parser.parse(flow1.upload_date)
                    upload_date_parent_flow = dateutil.parser.parse(
                        ignore_parameter_values_on_older_children,
                    )
                    if upload_date_current_flow < upload_date_parent_flow:
                        continue

                if ignore_parameter_values:
                    # Continue needs to be done here as the first if
                    # statement triggers in both special cases
                    continue
            elif (
                key == "custom_name"
                and ignore_custom_name_if_none
                and (attr1 is None or attr2 is None)
            ):
                # If specified, we allow `custom_name` inequality if one flow's name is None.
                # Helps with backwards compatibility as `custom_name` is now auto-generated, but
                # before it used to be `None`.
                continue
            elif key == "parameters_meta_info":
                # this value is a dictionary where each key is a parameter name, containing another
                # dictionary with keys specifying the parameter's 'description' and 'data_type'
                # checking parameter descriptions can be ignored since that might change
                # data type check can also be ignored if one of them is not defined, i.e., None
                params1 = set(flow1.parameters_meta_info)
                params2 = set(flow2.parameters_meta_info)
                if params1 != params2:
                    raise ValueError(
                        "Parameter list in meta info for parameters differ " "in the two flows.",
                    )
                # iterating over the parameter's meta info list
                for param in params1:
                    if (
                        isinstance(flow1.parameters_meta_info[param], Dict)
                        and isinstance(flow2.parameters_meta_info[param], Dict)
                        and "data_type" in flow1.parameters_meta_info[param]
                        and "data_type" in flow2.parameters_meta_info[param]
                    ):
                        value1 = flow1.parameters_meta_info[param]["data_type"]
                        value2 = flow2.parameters_meta_info[param]["data_type"]
                    else:
                        value1 = flow1.parameters_meta_info[param]
                        value2 = flow2.parameters_meta_info[param]
                    if value1 is None or value2 is None:
                        continue

                    if value1 != value2:
                        raise ValueError(
                            f"Flow {flow1.name}: data type for parameter {param} in {key} differ "
                            f"as {value1}\nvs\n{value2}",
                        )
                # the continue is to avoid the 'attr != attr2' check at end of function
                continue

            if attr1 != attr2:
                raise ValueError(
                    f"Flow {flow1.name!s}: values for attribute '{key!s}' differ: "
                    f"'{attr1!s}'\nvs\n'{attr2!s}'.",
                )
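
The effect of ignore_parameter_values is easiest to see on plain dictionaries. This is a simplified sketch, not the library code: `compare_parameters` is a hypothetical helper that always checks the parameter names through a symmetric difference and only compares values when asked to.

```python
def compare_parameters(params1: dict, params2: dict, ignore_values: bool = False) -> None:
    """Raise ValueError if two parameter mappings differ.

    Parameter names must always match. Values are only compared
    when ignore_values is False.
    """
    # Names present in exactly one of the two mappings.
    only_in_one = set(params1) ^ set(params2)
    if only_in_one:
        raise ValueError(f"Parameter names differ: {sorted(only_in_one)}")
    if not ignore_values and params1 != params2:
        raise ValueError("Parameter values differ.")


local = {"C": "1.0", "kernel": "rbf"}
server = {"C": "10.0", "kernel": "rbf"}

# Same names, different values: accepted only when values are ignored.
compare_parameters(local, server, ignore_values=True)
```

This is the mode publish relies on: it passes ignore_parameter_values=True, so a flow whose defaults changed locally still matches its server copy as long as the parameter names agree.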

delete_flow(flow_id)

Delete flow with id flow_id from the OpenML server.

You can only delete flows which you uploaded and which are not linked to runs.

Parameters:

Name Type Description Default
flow_id int

OpenML id of the flow

required

Returns:

Type Description
bool

True if the deletion was successful. False otherwise.

Source code in openml/flows/functions.py
def delete_flow(flow_id: int) -> bool:
    """Delete flow with id `flow_id` from the OpenML server.

    You can only delete flows which you uploaded and which
    are not linked to runs.

    Parameters
    ----------
    flow_id : int
        OpenML id of the flow

    Returns
    -------
    bool
        True if the deletion was successful. False otherwise.
    """
    return openml.utils._delete_entity("flow", flow_id)

flow_exists(name, external_version)

Retrieves the flow id.

A flow is uniquely identified by name + external_version.

Parameters:

Name Type Description Default
name string

Name of the flow

required
external_version string

Version information associated with flow.

required

Returns:

Name Type Description
flow_exist int or bool

flow id iff exists, False otherwise

Notes

see https://www.openml.org/api_docs/#!/flow/get_flow_exists_name_version

Source code in openml/flows/functions.py
def flow_exists(name: str, external_version: str) -> int | bool:
    """Retrieves the flow id.

    A flow is uniquely identified by name + external_version.

    Parameters
    ----------
    name : string
        Name of the flow
    external_version : string
        Version information associated with flow.

    Returns
    -------
    flow_exist : int or bool
        flow id iff exists, False otherwise

    Notes
    -----
    see https://www.openml.org/api_docs/#!/flow/get_flow_exists_name_version
    """
    if not (isinstance(name, str) and len(name) > 0):
        raise ValueError("Argument 'name' should be a non-empty string")
    if not (isinstance(external_version, str) and len(external_version) > 0):
        raise ValueError("Argument 'external_version' should be a non-empty string")

    xml_response = openml._api_calls._perform_api_call(
        "flow/exists",
        "post",
        data={"name": name, "external_version": external_version},
    )

    result_dict = xmltodict.parse(xml_response)
    flow_id = int(result_dict["oml:flow_exists"]["oml:id"])
    return flow_id if flow_id > 0 else False
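
The last three lines turn the server's XML answer into "an id or False". The sketch below reproduces that convention with the standard library's ElementTree instead of xmltodict. The sample responses and the namespace URI are assumptions made for illustration (in particular that a missing flow is reported with a non-positive id), and `parse_flow_exists` is a hypothetical name.

```python
import xml.etree.ElementTree as ET

# Assumed namespace for the "oml:" prefix.
OML_NS = {"oml": "http://openml.org/openml"}


def parse_flow_exists(xml_response: str):
    """Return the flow id as int if it is positive, otherwise False."""
    root = ET.fromstring(xml_response)
    flow_id = int(root.find("oml:id", OML_NS).text)
    return flow_id if flow_id > 0 else False


# Illustrative responses, not captured from a real server.
found_xml = (
    '<oml:flow_exists xmlns:oml="http://openml.org/openml">'
    "<oml:exists>true</oml:exists><oml:id>100</oml:id>"
    "</oml:flow_exists>"
)
missing_xml = (
    '<oml:flow_exists xmlns:oml="http://openml.org/openml">'
    "<oml:exists>false</oml:exists><oml:id>-1</oml:id>"
    "</oml:flow_exists>"
)

found = parse_flow_exists(found_xml)
missing = parse_flow_exists(missing_xml)
```

Because the result is either an int or False, callers such as publish can test it with a plain `if not flow_id:`.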

get_flow(flow_id, reinstantiate=False, strict_version=True)

Download the OpenML flow for a given flow ID.

Parameters:

Name Type Description Default
flow_id int

The OpenML flow id.

required
reinstantiate bool

Whether to reinstantiate the flow to a model instance.

False
strict_version bool

Whether to fail if version requirements are not fulfilled.

True

Returns:

Name Type Description
flow OpenMLFlow

the flow

Source code in openml/flows/functions.py
@openml.utils.thread_safe_if_oslo_installed
def get_flow(flow_id: int, reinstantiate: bool = False, strict_version: bool = True) -> OpenMLFlow:  # noqa: FBT001, FBT002
    """Download the OpenML flow for a given flow ID.

    Parameters
    ----------
    flow_id : int
        The OpenML flow id.

    reinstantiate: bool
        Whether to reinstantiate the flow to a model instance.

    strict_version : bool, default=True
        Whether to fail if version requirements are not fulfilled.

    Returns
    -------
    flow : OpenMLFlow
        the flow
    """
    flow_id = int(flow_id)
    flow = _get_flow_description(flow_id)

    if reinstantiate:
        flow.model = flow.extension.flow_to_model(flow, strict_version=strict_version)
        if not strict_version:
            # check if we need to return a new flow b/c of version mismatch
            new_flow = flow.extension.model_to_flow(flow.model)
            if new_flow.dependencies != flow.dependencies:
                return new_flow
    return flow

get_flow_id(model=None, name=None, exact_version=True)

Retrieves the flow id for a model or a flow name.

Provide either a model or a name to this function. Depending on the input, it does the following:

  • model and exact_version == True: This helper function first queries for the necessary extension. Second, it uses that extension to convert the model into a flow. Third, it executes flow_exists to obtain the flow id if the flow is published on the server.
  • model and exact_version == False: This helper function first queries for the necessary extension. Second, it uses that extension to convert the model into a flow. Third it calls list_flows and filters the returned values based on the flow name.
  • name: Ignores exact_version and calls list_flows, then filters the returned values based on the flow name.

Parameters:

Name Type Description Default
model object

Any model. Must provide either model or name.

None
name str

Name of the flow. Must provide either model or name.

None
exact_version bool

Whether to return the flow id of the exact version or all flow ids where the name of the flow matches. This is only taken into account for a model where a version number is available (requires model to be set).

True

Returns:

Type Description
(int or bool, List)

The flow id if the flow exists, False otherwise. A list of flow ids if exact_version is False.

Source code in openml/flows/functions.py
def get_flow_id(
    model: Any | None = None,
    name: str | None = None,
    exact_version: bool = True,  # noqa: FBT001, FBT002
) -> int | bool | list[int]:
    """Retrieves the flow id for a model or a flow name.

    Provide either a model or a name to this function. Depending on the input, it does
    the following:

    * ``model`` and ``exact_version == True``: This helper function first queries for the necessary
      extension. Second, it uses that extension to convert the model into a flow. Third, it
      executes ``flow_exists`` to obtain the flow id if the flow is published on the
      server.
    * ``model`` and ``exact_version == False``: This helper function first queries for the
      necessary extension. Second, it uses that extension to convert the model into a flow. Third
      it calls ``list_flows`` and filters the returned values based on the flow name.
    * ``name``: Ignores ``exact_version`` and calls ``list_flows``, then filters the returned
      values based on the flow name.

    Parameters
    ----------
    model : object
        Any model. Must provide either ``model`` or ``name``.
    name : str
        Name of the flow. Must provide either ``model`` or ``name``.
    exact_version : bool
        Whether to return the flow id of the exact version or all flow ids where the name
        of the flow matches. This is only taken into account for a model where a version number
        is available (requires ``model`` to be set).

    Returns
    -------
    int or bool, List
        The flow id if the flow exists, ``False`` otherwise. A list of flow ids if
        ``exact_version is False``.
    """
    if model is not None and name is not None:
        raise ValueError("Must provide either argument `model` or argument `name`, but not both.")

    if model is not None:
        extension = openml.extensions.get_extension_by_model(model, raise_if_no_extension=True)
        if extension is None:
            # This should never happen and is only here to please mypy. It will be removed
            # once the whole function is removed.
            raise TypeError(extension)
        flow = extension.model_to_flow(model)
        flow_name = flow.name
        external_version = flow.external_version
    elif name is not None:
        flow_name = name
        exact_version = False
        external_version = None
    else:
        raise ValueError(
            "Need to provide either argument `model` or argument `name`, but both are `None`."
        )

    if exact_version:
        if external_version is None:
            raise ValueError("exact_version should be False if model is None!")
        return flow_exists(name=flow_name, external_version=external_version)

    flows = list_flows(output_format="dataframe")
    assert isinstance(flows, pd.DataFrame)  # Make mypy happy
    flows = flows.query(f'name == "{flow_name}"')
    return flows["id"].to_list()  # type: ignore[no-any-return]
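
The three return shapes (an id, False, or a list of ids) can be illustrated on an in-memory listing. This is a minimal sketch under stated assumptions: `lookup_flow_ids` is a hypothetical helper, the rows and flow names are made up, and a list of dicts stands in for the dataframe returned by list_flows.

```python
from __future__ import annotations


def lookup_flow_ids(
    listing: list[dict],
    name: str,
    external_version: str | None = None,
) -> int | bool | list[int]:
    """Mimic the return convention of get_flow_id on an in-memory listing."""
    if external_version is not None:
        # Exact lookup: a single id, or False when nothing matches.
        for row in listing:
            if row["name"] == name and row["external_version"] == external_version:
                return row["id"]
        return False
    # Name-only lookup: every id whose name matches, possibly an empty list.
    return [row["id"] for row in listing if row["name"] == name]


# Made-up rows for illustration.
listing = [
    {"id": 10, "name": "demo.TreeClassifier", "external_version": "demo==1.0"},
    {"id": 11, "name": "demo.TreeClassifier", "external_version": "demo==1.1"},
    {"id": 12, "name": "demo.LinearModel", "external_version": "demo==1.1"},
]

exact = lookup_flow_ids(listing, "demo.TreeClassifier", "demo==1.1")
unknown = lookup_flow_ids(listing, "demo.TreeClassifier", "demo==2.0")
by_name = lookup_flow_ids(listing, "demo.TreeClassifier")
```

Callers should branch on the type of the result, since a name-only lookup returns a list even when exactly one flow matches.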

list_flows(offset=None, size=None, tag=None, output_format='dict', **kwargs)

list_flows(offset: int | None = ..., size: int | None = ..., tag: str | None = ..., output_format: Literal['dict'] = 'dict', **kwargs: Any) -> dict
list_flows(offset: int | None = ..., size: int | None = ..., tag: str | None = ..., *, output_format: Literal['dataframe'], **kwargs: Any) -> pd.DataFrame
list_flows(offset: int | None, size: int | None, tag: str | None, output_format: Literal['dataframe'], **kwargs: Any) -> pd.DataFrame

Return a list of all flows which are on OpenML. Supports large numbers of results.

Parameters:

Name Type Description Default
offset int

the number of flows to skip, starting from the first

None
size int

the maximum number of flows to return

None
tag str

the tag to include

None
output_format Literal['dict', 'dataframe']

The parameter decides the format of the output.

  • If 'dict' the output is a dict of dict
  • If 'dataframe' the output is a pandas DataFrame

'dict'
kwargs Any

Legal filter operators: uploader.

{}

Returns:

Name Type Description
flows dict of dicts, or dataframe
  • If output_format='dict' A mapping from flow_id to a dict giving a brief overview of the respective flow. Every flow is represented by a dictionary containing the following information:

    • flow id
    • full name
    • name
    • version
    • external version
    • uploader
  • If output_format='dataframe' Each row maps to a flow. The columns contain the following information:

    • flow id
    • full name
    • name
    • version
    • external version
    • uploader
Source code in openml/flows/functions.py
def list_flows(
    offset: int | None = None,
    size: int | None = None,
    tag: str | None = None,
    output_format: Literal["dict", "dataframe"] = "dict",
    **kwargs: Any,
) -> dict | pd.DataFrame:
    """
    Return a list of all flows which are on OpenML.
    (Supports large amount of results)

    Parameters
    ----------
    offset : int, optional
        the number of flows to skip, starting from the first
    size : int, optional
        the maximum number of flows to return
    tag : str, optional
        the tag to include
    output_format: str, optional (default='dict')
        The parameter decides the format of the output.
        - If 'dict' the output is a dict of dict
        - If 'dataframe' the output is a pandas DataFrame
    kwargs: dict, optional
        Legal filter operators: uploader.

    Returns
    -------
    flows : dict of dicts, or dataframe
        - If output_format='dict'
            A mapping from flow_id to a dict giving a brief overview of the
            respective flow.
            Every flow is represented by a dictionary containing
            the following information:
            - flow id
            - full name
            - name
            - version
            - external version
            - uploader

        - If output_format='dataframe'
            Each row maps to a flow.
            The columns contain the following information:
            - flow id
            - full name
            - name
            - version
            - external version
            - uploader
    """
    if output_format not in ["dataframe", "dict"]:
        raise ValueError(
            "Invalid output format selected. " "Only 'dict' or 'dataframe' applicable.",
        )

    # TODO: [0.15]
    if output_format == "dict":
        msg = (
            "Support for `output_format` of 'dict' will be removed in 0.15 "
            "and pandas dataframes will be returned instead. To ensure your code "
            "will continue to work, use `output_format`='dataframe'."
        )
        warnings.warn(msg, category=FutureWarning, stacklevel=2)

    return openml.utils._list_all(
        list_output_format=output_format,
        listing_call=_list_flows,
        offset=offset,
        size=size,
        tag=tag,
        **kwargs,
    )
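
The offset and size parameters lend themselves to fetching a long listing in batches. The sketch below shows that pattern in general terms; it is not the implementation of `openml.utils._list_all`. The `fetch` callable and the in-memory `fake_fetch` are hypothetical stand-ins for a call such as list_flows(offset=..., size=...).

```python
from typing import Callable, Iterator


def iter_in_batches(fetch: Callable[[int, int], list], batch_size: int) -> Iterator:
    """Yield all items by calling fetch(offset, size) until a short batch arrives."""
    offset = 0
    while True:
        batch = fetch(offset, batch_size)
        yield from batch
        if len(batch) < batch_size:
            # A batch smaller than requested means the listing is exhausted.
            break
        offset += batch_size


# In-memory stand-in for the server-side listing.
all_flow_ids = list(range(1, 8))


def fake_fetch(offset: int, size: int) -> list:
    return all_flow_ids[offset : offset + size]


collected = list(iter_in_batches(fake_fetch, batch_size=3))
```

Stopping on the first short batch costs one extra empty request when the total is an exact multiple of the batch size, which keeps the loop free of any need to know the total in advance.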