
api

User-facing APIs.

artifact_store()

Returns:

Type Description
LineaArtifactStore

An object of the class LineaArtifactStore that allows for printing and exporting artifact metadata.

Source code in lineapy/api/api.py
def artifact_store() -> LineaArtifactStore:
    """
    Returns
    -------
    LineaArtifactStore
        An object of the class `LineaArtifactStore` that allows for printing and exporting artifact metadata.
    """
    execution_context = get_context()
    cat = LineaArtifactStore(execution_context.executor.db)
    track(CatalogEvent(catalog_size=cat.len))
    return cat
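
A minimal usage sketch (assumes an active LineaPy session, e.g., a notebook with the lineapy extension loaded; the export() method name is an assumption based on the description above):

import lineapy

store = lineapy.artifact_store()
print(store)               # human-readable listing of saved artifacts
metadata = store.export()  # assumed accessor for exportable artifact metadata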

delete(artifact_name, version)

Deletes an artifact from the artifact store. If no other artifacts refer to the value, the value is also deleted from both the value node store and the pickle store.

Parameters:

Name Type Description Default
artifact_name str

Key used while saving the artifact.

required
version Union[int, str]

Version number or "latest" or "all".

required

Raises:

Type Description
ValueError

If the version is invalid.

NameError

If the artifact is not found.

Source code in lineapy/api/api.py
def delete(artifact_name: str, version: Union[int, str]) -> None:
    """
    Deletes an artifact from the artifact store. If no other artifacts
    refer to the value, the value is also deleted from both the
    value node store and the pickle store.

    Parameters
    ----------
    artifact_name: str
        Key used while saving the artifact.
    version: Union[int, str]
        Version number or "latest" or "all".

    Raises
    ------
    ValueError
        If the version is invalid.
    NameError
        If the artifact is not found.
    """
    version = parse_artifact_version(version)

    # get database instance
    execution_context = get_context()
    executor = execution_context.executor
    db = executor.db

    # if version is 'all' or 'latest', get_version is None
    get_version = None if isinstance(version, str) else version

    try:
        metadata = get(artifact_name, get_version).get_metadata()
    except UserException:
        raise NameError(
            f"{artifact_name}:{version} not found. Perhaps there was a typo. Please try lineapy.artifact_store() to inspect all your artifacts."
        )

    lineapy_metadata = metadata["lineapy"]
    node_id = lineapy_metadata.node_id
    execution_id = lineapy_metadata.execution_id

    db.delete_artifact_by_name(artifact_name, version=version)
    logging.info(f"Deleted Artifact: {artifact_name} version: {version}")
    try:
        db.delete_node_value_from_db(node_id, execution_id)
    except UserException:
        logging.info(
            f"Node: {node_id} with execution ID: {execution_id} not found in DB"
        )
    except ValueError:
        logging.debug(f"No valid storage path found for {node_id}")

    if lineapy_metadata.storage_backend == ARTIFACT_STORAGE_BACKEND.lineapy:
        storage_path = lineapy_metadata.storage_path
        pickled_path = (
            str(options.safe_get("artifact_storage_dir")).rstrip("/")
            + f"/{storage_path}"
        )
        with fsspec.open(pickled_path) as f:
            f.fs.delete(f.path)
    elif lineapy_metadata.storage_backend == ARTIFACT_STORAGE_BACKEND.mlflow:
        try:
            db.delete_mlflow_metadata_by_artifact_id(
                lineapy_metadata.artifact_id
            )
        except UserException:
            logging.info(
                f"Artifact id {lineapy_metadata.artifact_id} is not found in DB"
            )
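
A minimal usage sketch ("my_model" is a hypothetical artifact name; assumes an active LineaPy session):

import lineapy

lineapy.delete("my_model", version="latest")  # remove only the newest version
lineapy.delete("my_model", version="all")     # remove every remaining version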

get(artifact_name, version=None)

Gets an artifact from the DB.

Parameters:

Name Type Description Default
artifact_name str

Name of the artifact. Note that if you do not remember the artifact name, you can use the artifact_store to browse the options.

required
version Optional[int]

Version of the artifact. If None, the latest version will be returned.

None

Returns:

Type Description
LineaArtifact

Returned value offers methods to access information we have stored about the artifact.

Source code in lineapy/api/api.py
def get(artifact_name: str, version: Optional[int] = None) -> LineaArtifact:
    """
    Gets an artifact from the DB.

    Parameters
    ----------
    artifact_name: str
        Name of the artifact. Note that if you do not remember the artifact
        name, you can use the artifact_store to browse the options.
    version: Optional[int]
        Version of the artifact. If `None`, the latest version will be returned.

    Returns
    -------
    LineaArtifact
        Returned value offers methods to access
        information we have stored about the artifact.
    """
    validated_version = parse_artifact_version(
        "latest" if version is None else version
    )
    final_version = (
        validated_version if isinstance(validated_version, int) else None
    )

    execution_context = get_context()
    db = execution_context.executor.db
    artifactorm = db.get_artifactorm_by_name(artifact_name, final_version)
    linea_artifact = LineaArtifact(
        db=db,
        _artifact_id=artifactorm.id,
        _execution_id=artifactorm.execution_id,
        _node_id=artifactorm.node_id,
        _session_id=artifactorm.node.session_id,
        _version=artifactorm.version,  # type: ignore
        name=artifact_name,
        date_created=artifactorm.date_created,  # type: ignore
    )

    # Check version compatibility
    system_python_version = get_system_python_version()  # up to minor version
    artifact_python_version = db.get_session_context(
        linea_artifact._session_id
    ).python_version
    if system_python_version != artifact_python_version:
        warnings.warn(
            f"Current session runs on Python {system_python_version}, but the retrieved artifact was created on Python {artifact_python_version}. This may result in incompatibility issues."
        )

    track(GetEvent(version_specified=version is not None))
    return linea_artifact
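
A minimal usage sketch ("my_model" is a hypothetical artifact name; the get_value() and get_code() accessors are LineaArtifact methods not documented on this page, so treat them as assumptions):

import lineapy

art = lineapy.get("my_model")                # latest version
art_v2 = lineapy.get("my_model", version=2)  # a specific version
value = art.get_value()                      # deserialized artifact value
code = art.get_code()                        # code slice that produced it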

get_function(artifacts, input_parameters=[], reuse_pre_computed_artifacts=[])

Extracts the process that creates the selected artifacts as a Python function.

Parameters:

Name Type Description Default
artifacts List[Union[str, Tuple[str, int]]]

List of artifact names (with optional version) to be included in the function return.

required
input_parameters List[str]

List of variable names to be used as the function arguments. Currently, only variables assigned from a literal (e.g., a = '123') are accepted. There should be only one literal assignment for each variable within all artifact calculation code. For instance, if both a = '123' and a = 'abc' exist in the code, we cannot specify a as an input variable since it is ambiguous which literal assignment should be replaced.

[]
reuse_pre_computed_artifacts List[Union[str, Tuple[str, int]]]

List of artifacts (name with optional version) for which pre-computed values from the artifact store will be used instead of recomputing them from the original code.

[]

Returns:

Type Description
Callable

A Python function that takes input_parameters as arguments and returns a dictionary with each artifact name as a key and the corresponding artifact value as the value.

Note that:

  1. If an input parameter is only used to calculate artifacts in the reuse_pre_computed_artifacts list, that input parameter will be passed around as a dummy variable, and LineaPy will create a warning.
  2. If an artifact name has been saved multiple times within a session or across sessions, or has been mutated, you might want to specify the version number in artifacts or reuse_pre_computed_artifacts. The best practice for avoiding version lookups is to not reuse artifact names across notebooks and to not save the same artifact multiple times within the same session.
Source code in lineapy/api/api.py
def get_function(
    artifacts: List[Union[str, Tuple[str, int]]],
    input_parameters: List[str] = [],
    reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]] = [],
) -> Callable:
    """
    Extracts the process that creates the selected artifacts as a Python function.

    Parameters
    ----------
    artifacts: List[Union[str, Tuple[str, int]]]
        List of artifact names (with optional version) to be included in the
        function return.
    input_parameters: List[str]
        List of variable names to be used as the function arguments. Currently,
        only variables assigned from a literal (e.g., a='123') are accepted.
        There should be only one literal assignment for each variable within
        all artifact calculation code. For instance, if both a='123' and
        a='abc' exist in the code, we cannot specify a as an input variable
        since it is ambiguous which literal assignment should be replaced.
    reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]]
        List of artifacts (name with optional version) for which pre-computed
        values from the artifact store will be used instead of recomputing
        them from the original code.

    Returns
    -------
    Callable
        A Python function that takes input_parameters as arguments and returns
        a dictionary with each artifact name as a key and the corresponding
        artifact value as the value.

    Note that:

    1. If an input parameter is only used to calculate artifacts in the
        `reuse_pre_computed_artifacts` list, that input parameter will be
        passed around as a dummy variable, and LineaPy will create a warning.
    2. If an artifact name has been saved multiple times within a session or
        across sessions, or has been mutated, you might want to specify the
        version number in `artifacts` or `reuse_pre_computed_artifacts`. The
        best practice for avoiding version lookups is to not reuse artifact
        names across notebooks and to not save the same artifact multiple
        times within the same session.
    """
    execution_context = get_context()
    artifact_defs = [
        get_lineaartifactdef(art_entry=art_entry) for art_entry in artifacts
    ]
    reuse_pre_computed_artifact_defs = [
        get_lineaartifactdef(art_entry=art_entry)
        for art_entry in reuse_pre_computed_artifacts
    ]
    art_collection = ArtifactCollection(
        execution_context.executor.db,
        artifact_defs,
        input_parameters=input_parameters,
        reuse_pre_computed_artifacts=reuse_pre_computed_artifact_defs,
    )
    writer = BasePipelineWriter(art_collection)
    module = load_as_module(writer)
    return module.run_all_sessions
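
A minimal sketch of the returned callable (hypothetical artifact and variable names; assumes the artifact code contains a single literal assignment n_estimators = 100):

import lineapy

fn = lineapy.get_function(
    artifacts=["cleaned_data", "my_model"],
    input_parameters=["n_estimators"],
)
results = fn(n_estimators=200)  # recompute with the literal overridden
model = results["my_model"]     # returned dictionary is keyed by artifact name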

get_module_definition(artifacts, input_parameters=[], reuse_pre_computed_artifacts=[])

Creates a Python module that includes the definition of get_function().

Parameters:

Name Type Description Default
artifacts List[Union[str, Tuple[str, int]]]

Same as in get_function().

required
input_parameters List[str]

Same as in get_function().

[]
reuse_pre_computed_artifacts List[Union[str, Tuple[str, int]]]

Same as in get_function().

[]

Returns:

Type Description
str

A Python module that includes the definition of get_function() as run_all_sessions.

Source code in lineapy/api/api.py
def get_module_definition(
    artifacts: List[Union[str, Tuple[str, int]]],
    input_parameters: List[str] = [],
    reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]] = [],
) -> str:
    """
    Creates a Python module that includes the definition of [`get_function()`][lineapy.api.api.get_function].

    Parameters
    ----------
    artifacts: List[Union[str, Tuple[str, int]]]
        Same as in [`get_function()`][lineapy.api.api.get_function].
    input_parameters: List[str]
        Same as in [`get_function()`][lineapy.api.api.get_function].
    reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]]
        Same as in [`get_function()`][lineapy.api.api.get_function].

    Returns
    -------
    str
        A Python module that includes the definition of [`get_function()`][lineapy.api.api.get_function]
        as `run_all_sessions`.
    """
    execution_context = get_context()
    artifact_defs = [
        get_lineaartifactdef(art_entry=art_entry) for art_entry in artifacts
    ]
    reuse_pre_computed_artifact_defs = [
        get_lineaartifactdef(art_entry=art_entry)
        for art_entry in reuse_pre_computed_artifacts
    ]
    art_collection = ArtifactCollection(
        execution_context.executor.db,
        artifact_defs,
        input_parameters=input_parameters,
        reuse_pre_computed_artifacts=reuse_pre_computed_artifact_defs,
    )
    writer = BasePipelineWriter(art_collection)
    return writer._compose_module()
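
A minimal usage sketch (hypothetical artifact and file names; assumes an active LineaPy session):

import lineapy

module_source = lineapy.get_module_definition(artifacts=["my_model"])
with open("my_model_module.py", "w") as f:
    f.write(module_source)  # the written module defines run_all_sessions()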

reload()

Reloads the lineapy context.

Note

Currently this only reloads annotations, but in the future it may serve as a container for other items such as configs.

Source code in lineapy/api/api.py
def reload() -> None:
    """
    Reloads the lineapy context.

    !!! note

        Currently this only reloads annotations, but in the future it may
        serve as a container for other items such as configs.
    """
    execution_context = get_context()
    execution_context.executor.reload_annotations()
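
A minimal usage sketch (assumes an active LineaPy session):

import lineapy

lineapy.reload()  # re-reads annotations for the current session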

save(reference, name, storage_backend=None, **kwargs)

Publishes the object to the LineaPy DB.

Parameters:

Name Type Description Default
reference object

The reference could be a variable name, in which case LineaPy will save the value of the variable, with our default serialization mechanism. Alternatively, it could be a "side effect" reference, which currently includes either lineapy.file_system or lineapy.db. LineaPy will save the associated process that creates the final side effects. We are in the process of adding more side effect references, including assert statements.

required
name str

The name is used for later retrieving the artifact and creating new versions if an artifact of the name has been created before.

required
storage_backend Optional[ARTIFACT_STORAGE_BACKEND]

The storage backend used to save the artifact. Currently supports lineapy and mlflow (for mlflow-supported model flavors). In the case of mlflow, LineaPy will use mlflow.sklearn.log_model or the equivalent for other supported flavors to save artifacts into mlflow.

None
**kwargs

Keyword arguments passed into the underlying storage mechanism to overwrite default behavior. For storage_backend='mlflow', this can overwrite default arguments of mlflow.sklearn.log_model or the equivalent for other supported flavors.

{}

Returns:

Type Description
LineaArtifact

Returned value offers methods to access information we have stored about the artifact (value, version), and other automation capabilities, such as to_pipeline().

Source code in lineapy/api/api.py
def save(
    reference: object,
    name: str,
    storage_backend: Optional[ARTIFACT_STORAGE_BACKEND] = None,
    **kwargs,
) -> LineaArtifact:
    """
    Publishes the object to the LineaPy DB.

    Parameters
    ----------
    reference: Union[object, ExternalState]
        The reference could be a variable name, in which case LineaPy will save
        the value of the variable, with our default serialization mechanism.
        Alternatively, it could be a "side effect" reference, which currently includes either `lineapy.file_system` or `lineapy.db`.
        LineaPy will save the associated process that creates the final side effects.
        We are in the process of adding more side effect references, including `assert` statements.
    name: str
        The name is used for later retrieving the artifact and creating new versions if an artifact of the name has been created before.
    storage_backend: Optional[ARTIFACT_STORAGE_BACKEND]
        The storage backend used to save the artifact. Currently supports
        lineapy and mlflow (for mlflow-supported model flavors). In the case
        of mlflow, LineaPy will use `mlflow.sklearn.log_model` or the
        equivalent for other supported flavors to save artifacts into mlflow.
    **kwargs:
        Keyword arguments passed into the underlying storage mechanism to
        overwrite default behavior. For `storage_backend='mlflow'`, this can
        overwrite default arguments of `mlflow.sklearn.log_model` or the
        equivalent for other supported flavors.

    Returns
    -------
    LineaArtifact
        Returned value offers methods to access
        information we have stored about the artifact (value, version),
        and other automation capabilities, such as [`to_pipeline()`][lineapy.api.api.to_pipeline].
    """
    execution_context = get_context()
    executor = execution_context.executor
    db = executor.db
    call_node = execution_context.node

    # If this value is stored as a global in the executor (meaning it's an
    # external side effect), then look it up from there, instead of using this node.
    if isinstance(reference, ExternalState):
        value_node_id = executor.lookup_external_state(reference)
        msg = f"No change to the {reference.external_state} was recorded. If it was in fact changed, please open a Github issue."
        if not value_node_id:
            track(
                ExceptionEvent(ErrorType.SAVE, "No change to external state")
            )
            raise ValueError(msg)
    else:
        # Lookup the first arguments id, which is the id for the value, and
        # save that as the artifact
        value_node_id = call_node.positional_args[0].id

    execution_id = executor.execution.id
    timing = executor.get_execution_time(value_node_id)

    # serialize value to db if we haven't before
    # (happens with multiple artifacts pointing to the same value)
    serialize_method = ARTIFACT_STORAGE_BACKEND.lineapy
    if not db.node_value_in_db(
        node_id=value_node_id, execution_id=execution_id
    ):

        # TODO add version or timestamp to allow saving of multiple pickle files for the same node id

        artifact_serialize_metadata = serialize_artifact(
            value_node_id,
            execution_id,
            reference,
            name,
            storage_backend,
            **kwargs,
        )
        if (
            artifact_serialize_metadata["backend"]
            == ARTIFACT_STORAGE_BACKEND.mlflow
        ):
            artifact_path = artifact_serialize_metadata["metadata"].model_uri
            serialize_method = ARTIFACT_STORAGE_BACKEND.mlflow
        else:
            artifact_path = artifact_serialize_metadata["metadata"][
                "pickle_name"
            ]

        # adds reference to pickled file inside database
        db.write_node_value(
            NodeValue(
                node_id=value_node_id,
                value=artifact_path,
                execution_id=execution_id,
                start_time=timing[0],
                end_time=timing[1],
                value_type=get_value_type(reference),
            )
        )
        # we have to commit eagerly because if we just add it
        #   to the queue, the `res` value may have mutated
        #   and that's incorrect.
        db.commit()

    # artifact_version = 0 if this is a new artifact name; otherwise bump the latest version by one
    date_created = datetime.utcnow()
    artifact_version = db.get_latest_artifact_version(name) + 1

    artifact_to_write = Artifact(
        node_id=value_node_id,
        execution_id=execution_id,
        date_created=date_created,
        name=name,
        version=artifact_version,
    )
    db.write_artifact(artifact_to_write)

    if serialize_method == ARTIFACT_STORAGE_BACKEND.mlflow:
        artifactorm = db.get_artifactorm_by_name(
            artifact_name=name, version=artifact_version
        )
        db.write_mlflow_artifactmetadata(
            artifactorm, artifact_serialize_metadata["metadata"]
        )

    track(SaveEvent(side_effect=side_effect_to_str(reference)))

    linea_artifact = LineaArtifact(
        db=db,
        name=name,
        date_created=date_created,
        _execution_id=execution_id,
        _node_id=value_node_id,
        _session_id=call_node.session_id,
        _version=artifact_version,
    )
    return linea_artifact
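
A minimal usage sketch (hypothetical names; the version attribute on the returned artifact is an assumption based on the description above):

import lineapy

params = {"n_estimators": 100}           # any Python object
art = lineapy.save(params, "my_params")  # creates or bumps a version
print(art.version)                       # assumed accessor for the new version

# A "side effect" reference: save the process that modified the file system.
lineapy.save(lineapy.file_system, "fs_side_effects")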

to_pipeline(artifacts, framework='SCRIPT', pipeline_name=None, dependencies={}, output_dir='.', input_parameters=[], reuse_pre_computed_artifacts=[], generate_test=False, pipeline_dag_config={}, include_non_slice_as_comment=False)

Writes the pipeline job to a path on disk.

Parameters:

Name Type Description Default
artifacts List[str]

Names of artifacts to be included in the pipeline.

required
framework str

Name of the framework to be used. Defined by enum PipelineTypes in lineapy/data/types.py. Defaults to "SCRIPT" if not specified.

'SCRIPT'
pipeline_name Optional[str]

Name of the pipeline.

None
dependencies TaskGraphEdge

Task dependencies in graphlib format, e.g., {"B": {"A", "C"}} means task A and C are prerequisites for task B. LineaPy is smart enough to figure out dependency relations within the same session, so there is no need to specify this type of dependency information; instead, the user is expected to provide dependency information among artifacts across different sessions.

{}
output_dir str

Directory path to save DAG and other pipeline files.

'.'
input_parameters List[str]

Names of variables to be used as parameters in the pipeline. Currently, it only accepts variables from literal assignment such as a = '123'. For each variable to be parametrized, there should be only one literal assignment across all artifact code for the pipeline. For instance, if both a = '123' and a = 'abc' exist in the pipeline's artifact code, we cannot make a an input parameter since its reference is ambiguous, i.e., we are not sure which literal assignment a refers to.

[]
reuse_pre_computed_artifacts List[str]

Names of artifacts in the pipeline for which pre-computed value is to be used (rather than recomputing the value).

[]
generate_test bool

Whether to generate scaffold/template for pipeline testing. Defaults to False. The scaffold contains placeholders for testing each function in the pipeline module file and is meant to be fleshed out by the user to suit their needs. When run out of the box, it performs a naive form of equality evaluation for each function's output, which demands validation and customization by the user.

False
pipeline_dag_config Optional[Dict]

A dictionary of parameters to configure the DAG file to be generated. Not applicable for the "SCRIPT" framework as it does not generate a separate DAG file. For the "AIRFLOW" framework, Airflow-native config params such as "retries" and "schedule_interval" can be passed in. For the "ARGO" framework, Argo-native config params such as "namespace" and "service_account_name" can be passed in.

{}

Returns:

Type Description
Path

Directory path where DAG and other pipeline files are saved.

Source code in lineapy/api/api.py
def to_pipeline(
    artifacts: List[str],
    framework: str = "SCRIPT",
    pipeline_name: Optional[str] = None,
    dependencies: TaskGraphEdge = {},
    output_dir: str = ".",
    input_parameters: List[str] = [],
    reuse_pre_computed_artifacts: List[str] = [],
    generate_test: bool = False,
    pipeline_dag_config: Optional[Dict] = {},
    include_non_slice_as_comment: bool = False,
) -> Path:
    """
    Writes the pipeline job to a path on disk.

    Parameters
    ----------
    artifacts: List[str]
        Names of artifacts to be included in the pipeline.
    framework: str
        Name of the framework to be used.
        Defined by enum PipelineTypes in lineapy/data/types.py.
        Defaults to "SCRIPT" if not specified.
    pipeline_name: Optional[str]
        Name of the pipeline.
    dependencies: TaskGraphEdge
        Task dependencies in graphlib format, e.g., ``{"B": {"A", "C"}}``
        means task A and C are prerequisites for task B.
        LineaPy is smart enough to figure out dependency relations *within*
        the same session, so there is no need to specify this type of dependency
        information; instead, the user is expected to provide dependency information
        among artifacts across different sessions.
    output_dir: str
        Directory path to save DAG and other pipeline files.
    input_parameters: List[str]
        Names of variables to be used as parameters in the pipeline.
        Currently, it only accepts variables from literal assignment
        such as ``a = '123'``. For each variable to be parametrized,
        there should be only one literal assignment across all
        artifact code for the pipeline. For instance, if both ``a = '123'``
        and ``a = 'abc'`` exist in the pipeline's artifact code,
        we cannot make ``a`` an input parameter since its reference is
        ambiguous, i.e., we are not sure which literal assignment ``a``
        refers to.
    reuse_pre_computed_artifacts: List[str]
        Names of artifacts in the pipeline for which pre-computed value
        is to be used (rather than recomputing the value).
    generate_test: bool
        Whether to generate scaffold/template for pipeline testing.
        Defaults to ``False``. The scaffold contains placeholders for testing
        each function in the pipeline module file and is meant to be fleshed
        out by the user to suit their needs. When run out of the box, it performs
        a naive form of equality evaluation for each function's output,
        which demands validation and customization by the user.
    pipeline_dag_config: Optional[Dict]
        A dictionary of parameters to configure the DAG file to be generated.
        Not applicable for the "SCRIPT" framework as it does not generate a
        separate DAG file. For the "AIRFLOW" framework, Airflow-native config
        params such as "retries" and "schedule_interval" can be passed in.
        For the "ARGO" framework, Argo-native config params such as
        "namespace" and "service_account_name" can be passed in.

    Returns
    -------
    Path
        Directory path where DAG and other pipeline files are saved.
    """
    pipeline = Pipeline(
        artifacts=artifacts,
        name=pipeline_name,
        dependencies=dependencies,
    )
    pipeline.save()
    return pipeline.export(
        framework=framework,
        output_dir=output_dir,
        input_parameters=input_parameters,
        reuse_pre_computed_artifacts=reuse_pre_computed_artifacts,
        generate_test=generate_test,
        pipeline_dag_config=pipeline_dag_config,
        include_non_slice_as_comment=include_non_slice_as_comment,
    )
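
A minimal usage sketch (hypothetical artifact names; "AIRFLOW" must be a member of the PipelineTypes enum; dependencies is only needed because the two artifacts are assumed to come from different sessions):

import lineapy

path = lineapy.to_pipeline(
    artifacts=["cleaned_data", "my_model"],
    framework="AIRFLOW",
    pipeline_name="model_pipeline",
    dependencies={"my_model": {"cleaned_data"}},  # cleaned_data runs first
    output_dir="./pipelines",
)
print(path)  # directory containing the generated module and DAG files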
