API Basics

This tutorial covers the basic functionality of LineaPy through simple examples.

If you encounter issues you cannot resolve, simply ask in our Slack community’s #support channel. We are always happy and ready to help you!

You can ignore the # NBVAL_* comments in certain cell blocks. They exist only for our unit tests, which keep the examples functional as we update the codebase.

[ ]:
#NBVAL_SKIP
%%capture
!pip -q install lineapy~=0.2
[ ]:
#NBVAL_SKIP
%load_ext lineapy
[1]:
# NBVAL_IGNORE_OUTPUT

import os
import lineapy
import pandas as pd
import matplotlib.pyplot as plt

lineapy.tag("API Basics Demo")

Exploring data

Let’s load the toy data to use.

[2]:
# NBVAL_IGNORE_OUTPUT

# Load data
df = pd.read_csv("https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv")
[3]:
# View data
df
[3]:
sepal.length sepal.width petal.length petal.width variety
0 5.1 3.5 1.4 0.2 Setosa
1 4.9 3.0 1.4 0.2 Setosa
2 4.7 3.2 1.3 0.2 Setosa
3 4.6 3.1 1.5 0.2 Setosa
4 5.0 3.6 1.4 0.2 Setosa
... ... ... ... ... ...
145 6.7 3.0 5.2 2.3 Virginica
146 6.3 2.5 5.0 1.9 Virginica
147 6.5 3.0 5.2 2.0 Virginica
148 6.2 3.4 5.4 2.3 Virginica
149 5.9 3.0 5.1 1.8 Virginica

150 rows × 5 columns

Now, we might be interested in seeing if the data reflects differences between iris types. Let’s compare their petal traits.

[4]:
# Plot petal length/width by iris type
fig, ax = plt.subplots(1, 2, figsize=(10, 5))
df.boxplot("petal.length", "variety", ax=ax[0])
df.boxplot("petal.width", "variety", ax=ax[1])
plt.show()
[Figure: box plots of petal.length (left) and petal.width (right), grouped by variety]

Overall, we observe noticeable differences between iris types, especially between Setosa and Virginica. Let’s quantify differences between the two types.

[5]:
# Calculate averages for Setosa
avg_length_setosa = df.query("variety == 'Setosa'")["petal.length"].mean()
avg_width_setosa = df.query("variety == 'Setosa'")["petal.width"].mean()

# Calculate averages for Virginica
avg_length_virginica = df.query("variety == 'Virginica'")["petal.length"].mean()
avg_width_virginica = df.query("variety == 'Virginica'")["petal.width"].mean()

# Calculate differences
diff_avg_length = avg_length_setosa - avg_length_virginica
diff_avg_width = avg_width_setosa - avg_width_virginica
[6]:
# NBVAL_IGNORE_OUTPUT

# View result
print("Difference in average length:", diff_avg_length)
print("Difference in average width:", diff_avg_width)
Difference in average length: -4.09
Difference in average width: -1.7800000000000002

Storing an artifact with save()

Say we are particularly interested in tracking the average length difference between Setosa and Virginica. For instance, we might want to use this variable later for population-level modeling of the two species.

The save() method allows us to store a variable’s value and history as a data type called LineaArtifact. Note that LineaArtifact holds more than the final state of the variable — it also captures the complete development process behind the variable, which allows for full reproducibility. For more information about artifacts in LineaPy, please check the Concepts section.

The method requires two arguments: the variable to save and the string name to save it as. It returns the saved artifact.

[7]:
# NBVAL_IGNORE_OUTPUT

# Store the variable as an artifact
length_artifact = lineapy.save(diff_avg_length, "iris_diff_avg_length")

# Check object type
print(type(length_artifact))
<class 'lineapy.api.models.linea_artifact.LineaArtifact'>

A LineaArtifact object has two key methods:

  • .get_value() returns the value of the artifact, e.g., an integer or a dataframe

  • .get_code() returns the minimal essential code needed to create the value

Hence, for the current artifact, we see:

[8]:
# Check the value of the artifact
print(length_artifact.get_value())
-4.09

☝️ ☝️ ☝️ [FEATURE] Retrieve artifact value ☝️ ☝️ ☝️

[9]:
# Check minimal essential code to generate the artifact
print(length_artifact.get_code())
import pandas as pd

df = pd.read_csv(
    "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv"
)
avg_length_setosa = df.query("variety == 'Setosa'")["petal.length"].mean()
avg_length_virginica = df.query("variety == 'Virginica'")["petal.length"].mean()
diff_avg_length = avg_length_setosa - avg_length_virginica

☝️ ☝️ ☝️ [FEATURE] Retrieve cleaned artifact code ☝️ ☝️ ☝️

Note that irrelevant code has been stripped out (e.g., operations relating to diff_avg_width only).

Note: If you want, you can retrieve the artifact’s full original code with artifact.get_session_code().
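
To build intuition for how irrelevant lines can be dropped, here is a toy backward slice over straight-line code using Python's standard ast module (Python 3.9+ for ast.unparse). It is only a sketch under simplifying assumptions (top-level assignments and imports, no in-place mutation, no control flow) and is not how LineaPy itself is implemented; the helper name backward_slice and the small session text are made up for illustration.

```python
import ast


def backward_slice(source: str, target: str) -> str:
    """Keep only the top-level statements that `target` depends on.

    Toy assumptions: straight-line code made of plain assignments and
    imports, with no in-place mutation or control flow.
    """
    statements = ast.parse(source).body
    needed = {target}  # names whose defining statements we still need
    kept = []

    # Walk backwards so each definition is visited after its uses.
    for stmt in reversed(statements):
        defined = set()
        if isinstance(stmt, ast.Assign):
            for tgt in stmt.targets:
                defined |= {n.id for n in ast.walk(tgt) if isinstance(n, ast.Name)}
        elif isinstance(stmt, (ast.Import, ast.ImportFrom)):
            defined = {(a.asname or a.name).split(".")[0] for a in stmt.names}

        if defined & needed:
            kept.append(stmt)
            needed -= defined
            # Every name read by this statement becomes needed in turn.
            needed |= {
                n.id
                for n in ast.walk(stmt)
                if isinstance(n, ast.Name) and isinstance(n.ctx, ast.Load)
            }

    return "\n".join(ast.unparse(s) for s in reversed(kept))


# Hypothetical mini-session: only some lines matter for diff_length.
session = """
setosa = [1.4, 1.3]
virginica = [5.2, 5.0]
widths = [0.2, 2.3]
diff_length = sum(setosa) / 2 - sum(virginica) / 2
diff_width = widths[0] - widths[1]
"""

sliced = backward_slice(session, "diff_length")
print(sliced)
```

The lines that define widths and diff_width do not appear in the result, much like the diff_avg_width operations are absent from the cleaned artifact code above.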

Listing artifacts with artifact_store()

Over time, we may forget which artifacts we saved and under what names. The artifact_store() method lists all previously saved artifacts, like so:

[10]:
# NBVAL_IGNORE_OUTPUT

# List all saved artifacts
lineapy.artifact_store()
[10]:
iris_diff_avg_length:0 created on 2022-09-25 22:20:07.542825

☝️ ☝️ ☝️ [FEATURE] Retrieve all artifacts ☝️ ☝️ ☝️

Note that the artifact store records each artifact’s creation time, which means that multiple versions can be stored under the same artifact name. Hence, if we save the iris_diff_avg_length artifact again, we get:

[11]:
# NBVAL_IGNORE_OUTPUT

# Save the same artifact again
lineapy.save(diff_avg_length, "iris_diff_avg_length")

# List all saved artifacts
lineapy.artifact_store()
[11]:
iris_diff_avg_length:0 created on 2022-09-25 22:20:07.542825
iris_diff_avg_length:1 created on 2022-09-25 22:20:07.690403
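
One way to picture this versioning is a toy in-memory store in which each name maps to a list of saved versions. The ToyStore class below is purely hypothetical and says nothing about how LineaPy persists artifacts; it only mimics the name:version listing shown above.

```python
from collections import defaultdict
from datetime import datetime


class ToyStore:
    """Hypothetical in-memory store: each name maps to a list of versions."""

    def __init__(self):
        self._versions = defaultdict(list)

    def save(self, value, name):
        # The new version number is the count of versions saved so far.
        version = len(self._versions[name])
        self._versions[name].append((value, datetime.now()))
        return version

    def listing(self):
        # One line per stored version, in the "name:version created on ..." style.
        return [
            f"{name}:{version} created on {created}"
            for name, entries in self._versions.items()
            for version, (_, created) in enumerate(entries)
        ]


store = ToyStore()
first = store.save(-4.09, "iris_diff_avg_length")
second = store.save(-4.09, "iris_diff_avg_length")
print("\n".join(store.listing()))
```

Saving under an existing name does not overwrite anything in this sketch; it appends a new entry with the next version number.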

Retrieving an artifact with get()

We can retrieve any stored artifact using the get() method. This comes in handy when we work across multiple sessions/phases of a project (or even across different projects) as we can easily build on the previous work.

For example, say we have done other exploratory analyses and are finally starting our work on population-level modeling. This is likely done in a new Jupyter notebook (possibly in a different subdirectory) and we need an easy way to load artifacts from our past work. We can use the get() method for this.

The method takes the string name of the artifact as its argument and returns the corresponding artifact, like so:

[12]:
# Retrieve a saved artifact
length_artifact2 = lineapy.get("iris_diff_avg_length")

# Confirm the artifact holds the same value and code as before
print(length_artifact2.get_value())
print(length_artifact2.get_code())
-4.09
import pandas as pd

df = pd.read_csv(
    "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv"
)
avg_length_setosa = df.query("variety == 'Setosa'")["petal.length"].mean()
avg_length_virginica = df.query("variety == 'Virginica'")["petal.length"].mean()
diff_avg_length = avg_length_setosa - avg_length_virginica

By default, the get() method retrieves the latest version of the given artifact. To retrieve a particular version of the artifact, we can specify the value of the optional argument version, like so:

[13]:
# NBVAL_IGNORE_OUTPUT

# Get version info of the retrieved artifact
desired_version = length_artifact2.version

# Check the version info
print(desired_version)
print(type(desired_version))
1
<class 'int'>
[14]:
# NBVAL_IGNORE_OUTPUT

# Retrieve the same version of the artifact
length_artifact3 = lineapy.get("iris_diff_avg_length", version=desired_version)

# Confirm the right version has been retrieved
print(length_artifact3.name)
print(length_artifact3.version)
iris_diff_avg_length
1

Deleting an artifact with delete()

Over time, the artifact store is likely to accumulate many artifacts, including some that we no longer need. We can use the delete() API to remove them. For instance, we currently have two versions of the iris_diff_avg_length artifact, like so:

[15]:
# NBVAL_IGNORE_OUTPUT

# List all saved artifacts
lineapy.artifact_store()
[15]:
iris_diff_avg_length:0 created on 2022-09-25 22:20:07.542825
iris_diff_avg_length:1 created on 2022-09-25 22:20:07.690403

Let’s say we no longer need version 1. We can remove it by calling the delete() API with the artifact name and version, as follows:

[16]:
# NBVAL_IGNORE_OUTPUT

# Delete a specific artifact
lineapy.delete("iris_diff_avg_length", version=1)
Deleted Artifact: iris_diff_avg_length version: 1

If we check the artifact store again, we no longer see version 1:

[17]:
# NBVAL_IGNORE_OUTPUT

# List all saved artifacts
lineapy.artifact_store()
[17]:
iris_diff_avg_length:0 created on 2022-09-25 22:20:07.542825

Note that we can also pass a string value to version:

  • version="latest" will delete the latest version of the artifact

  • version="all" will delete all versions of the artifact
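
The three forms of the version argument can be summarized in a small stand-alone function. This is a sketch of the semantics described above, not LineaPy code: versions_to_delete is a hypothetical helper, and the error handling for unknown versions is an assumption.

```python
def versions_to_delete(existing, version):
    """Return the version numbers that a delete request would target.

    `existing` is the list of version numbers stored under one artifact name;
    `version` is an integer, "latest", or "all".
    """
    if not existing:
        raise ValueError("no versions stored under this name")
    if version == "all":
        return sorted(existing)
    if version == "latest":
        return [max(existing)]
    if isinstance(version, int) and version in existing:
        return [version]
    # Assumed behavior for this sketch: reject anything else.
    raise ValueError(f"unknown version: {version!r}")


stored = [0, 1]
by_number = versions_to_delete(stored, 1)
latest = versions_to_delete(stored, "latest")
everything = versions_to_delete(stored, "all")
print(by_number, latest, everything)
```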

Using artifacts to build pipelines

Say we are now also interested in using the average width difference in our population-level modeling, in which case we will store it as an artifact too:

[18]:
# Store the width variable as an artifact too
width_artifact = lineapy.save(diff_avg_width, "iris_diff_avg_width")

Now consider the case where our source data (i.e., iris.csv) gets updated. Moreover, the update is not a one-time event; the data is planned to be updated on a regular basis as new samples arrive.

Since the iris_diff_avg_length and iris_diff_avg_width artifacts were derived from the iris.csv data, we need to rerun each artifact’s code lest its value be stale. Given the recurring updates in the source data, we may want to build and schedule a pipeline to automatically rerun the code of both artifacts on a regular basis.

Because each artifact captures its complete development process, LineaPy makes it easy for us to turn these two artifacts into a deployable pipeline. For instance, Airflow is a popular tool for pipeline building and management, and we can turn the artifacts into a set of files that can be deployed as an Airflow DAG, like so:

[19]:
# NBVAL_IGNORE_OUTPUT

# Build an Airflow pipeline with both length and width artifacts
lineapy.to_pipeline(
    artifacts=[length_artifact.name, width_artifact.name],
    pipeline_name="demo_pipeline",
    framework="AIRFLOW",
    output_dir="output/00_api_basics/",
)
Generated module file: output/00_api_basics/demo_pipeline_module.py
Generated requirements file: output/00_api_basics/demo_pipeline_requirements.txt
Generated DAG file: output/00_api_basics/demo_pipeline_dag.py
Generated Docker file: output/00_api_basics/demo_pipeline_Dockerfile
[19]:
PosixPath('output/00_api_basics')

where

  • artifacts is the list of artifact names to be used for the pipeline

  • pipeline_name is the name of the pipeline

  • output_dir is the location to put the files for running the pipeline

  • framework is the name of the orchestration framework to use (currently supports SCRIPTS and AIRFLOW)

And we see the following files have been generated:

[20]:
# NBVAL_IGNORE_OUTPUT

# Check the generated files for running the pipeline
os.listdir("output/00_api_basics/")
[20]:
['demo_pipeline_requirements.txt',
 'demo_pipeline_Dockerfile',
 'demo_pipeline_dag.py',
 'demo_pipeline_module.py']

where

  • [PIPELINE-NAME]_module.py contains the artifacts’ cleaned-up code packaged as functions

  • [PIPELINE-NAME]_dag.py uses the packaged functions to define the pipeline

  • [PIPELINE-NAME]_requirements.txt lists dependencies for running the pipeline

  • [PIPELINE-NAME]_Dockerfile contains commands to set up the environment to run the pipeline

Specifically, we have demo_pipeline_module.py looking as follows:

[21]:
# NBVAL_IGNORE_OUTPUT

%cat output/00_api_basics/demo_pipeline_module.py
import pandas as pd


def get_df_for_artifact_iris_diff_avg_length_and_downstream():
    df = pd.read_csv(
        "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv"
    )
    return df


def get_iris_diff_avg_length(df):
    avg_length_setosa = df.query("variety == 'Setosa'")["petal.length"].mean()
    avg_length_virginica = df.query("variety == 'Virginica'")["petal.length"].mean()
    diff_avg_length = avg_length_setosa - avg_length_virginica
    return diff_avg_length


def get_iris_diff_avg_width(df):
    avg_width_setosa = df.query("variety == 'Setosa'")["petal.width"].mean()
    avg_width_virginica = df.query("variety == 'Virginica'")["petal.width"].mean()
    diff_avg_width = avg_width_setosa - avg_width_virginica
    return diff_avg_width


def run_session_including_iris_diff_avg_length():
    # Given multiple artifacts, we need to save each right after
    # its calculation to protect from any irrelevant downstream
    # mutations (e.g., inside other artifact calculations)
    import copy

    artifacts = dict()
    df = get_df_for_artifact_iris_diff_avg_length_and_downstream()
    diff_avg_length = get_iris_diff_avg_length(df)
    artifacts["iris_diff_avg_length"] = copy.deepcopy(diff_avg_length)
    diff_avg_width = get_iris_diff_avg_width(df)
    artifacts["iris_diff_avg_width"] = copy.deepcopy(diff_avg_width)
    return artifacts


def run_all_sessions():
    artifacts = dict()
    artifacts.update(run_session_including_iris_diff_avg_length())
    return artifacts


if __name__ == "__main__":
    # Edit this section to customize the behavior of artifacts
    artifacts = run_all_sessions()
    print(artifacts)

We can see that LineaPy used artifacts to automatically 1) clean up their code to retain only essential operations and 2) package the cleaned-up code into importable functions.

And we see demo_pipeline_dag.py automatically composing an Airflow DAG with these functions:

[22]:
# NBVAL_IGNORE_OUTPUT

%cat output/00_api_basics/demo_pipeline_dag.py
import pathlib
import pickle

import demo_pipeline_module
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def dag_setup():
    pickle_folder = pathlib.Path("/tmp").joinpath("demo_pipeline")
    if not pickle_folder.exists():
        pickle_folder.mkdir()


def dag_teardown():
    pickle_files = pathlib.Path("/tmp").joinpath("demo_pipeline").glob("*.pickle")
    for f in pickle_files:
        f.unlink()


def task_df_for_artifact_iris_diff_avg_length_and_downstream():

    df = demo_pipeline_module.get_df_for_artifact_iris_diff_avg_length_and_downstream()

    pickle.dump(df, open("/tmp/demo_pipeline/variable_df.pickle", "wb"))


def task_iris_diff_avg_length():

    df = pickle.load(open("/tmp/demo_pipeline/variable_df.pickle", "rb"))

    diff_avg_length = demo_pipeline_module.get_iris_diff_avg_length(df)

    pickle.dump(
        diff_avg_length,
        open("/tmp/demo_pipeline/variable_diff_avg_length.pickle", "wb"),
    )


def task_iris_diff_avg_width():

    df = pickle.load(open("/tmp/demo_pipeline/variable_df.pickle", "rb"))

    diff_avg_width = demo_pipeline_module.get_iris_diff_avg_width(df)

    pickle.dump(
        diff_avg_width, open("/tmp/demo_pipeline/variable_diff_avg_width.pickle", "wb")
    )


default_dag_args = {
    "owner": "airflow",
    "retries": 2,
    "start_date": days_ago(1),
}

with DAG(
    dag_id="demo_pipeline_dag",
    schedule_interval="*/15 * * * *",
    max_active_runs=1,
    catchup=False,
    default_args=default_dag_args,
) as dag:

    setup = PythonOperator(
        task_id="dag_setup",
        python_callable=dag_setup,
    )

    teardown = PythonOperator(
        task_id="dag_teardown",
        python_callable=dag_teardown,
    )

    df_for_artifact_iris_diff_avg_length_and_downstream = PythonOperator(
        task_id="df_for_artifact_iris_diff_avg_length_and_downstream_task",
        python_callable=task_df_for_artifact_iris_diff_avg_length_and_downstream,
    )

    iris_diff_avg_length = PythonOperator(
        task_id="iris_diff_avg_length_task",
        python_callable=task_iris_diff_avg_length,
    )

    iris_diff_avg_width = PythonOperator(
        task_id="iris_diff_avg_width_task",
        python_callable=task_iris_diff_avg_width,
    )

    df_for_artifact_iris_diff_avg_length_and_downstream >> iris_diff_avg_length

    iris_diff_avg_length >> iris_diff_avg_width

    setup >> df_for_artifact_iris_diff_avg_length_and_downstream

    iris_diff_avg_width >> teardown

These files, once placed in the location that Airflow expects (usually dags/ under Airflow’s home directory), should let us immediately execute the pipeline from the UI or CLI.
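
The >> lines at the end of the DAG file define which task runs after which. As a rough way to check the resulting order without an Airflow installation, the same edges can be fed to the standard-library graphlib module (Python 3.9+). The task names below are shortened for this sketch and are not the actual task_id values.

```python
from graphlib import TopologicalSorter

# Each key runs after the tasks in its set, mirroring the `>>` lines
# in demo_pipeline_dag.py (names are abbreviated for readability).
dependencies = {
    "df": {"setup"},
    "iris_diff_avg_length": {"df"},
    "iris_diff_avg_width": {"iris_diff_avg_length"},
    "teardown": {"iris_diff_avg_width"},
}

# static_order() yields the tasks in an order that respects every edge.
order = list(TopologicalSorter(dependencies).static_order())
print(" -> ".join(order))
```

Because the generated edges form a single chain, the width task waits for the length task even though it only reads the pickled df.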

For a more detailed illustration of pipeline building, please check this tutorial.

Using helper functions

On top of the functionality introduced above, LineaPy exposes a few public helper functions. They are useful when performing repeated operations or trying out variations on experiments. These functions are introduced below:

Using get_function to perform the same operation on different input

[23]:
a = 1
b = a**2
squareart = lineapy.save(b, "squared")
sq = lineapy.get_function(["squared"], input_parameters=["a"])
[24]:
sq(a=3)["squared"]
[24]:
9

What’s happening?

Under the hood, get_function uses a parametrized version of a pipeline, here built from a single artifact, with configurable inputs. In this case, since only one artifact, "squared", was used to construct the pipeline, the returned dictionary holds just that artifact’s updated value. The interface, however, is that of a normal function that gets imported and returns a dictionary. Users can call this function to test the outputs of a pipeline directly before deployment. For more on pipelines and parametrized inputs, refer to our advanced tutorials on parametrized pipelines.

The pipeline underneath get_function can be inspected with the get_module_definition function, as shown below. The function returned by get_function is run_all_sessions.

[25]:
print(lineapy.get_module_definition(["squared"], input_parameters=["a"]))
import argparse


def get_squared(a):
    b = a**2
    return b


def run_session_including_squared(a=1):
    # Given multiple artifacts, we need to save each right after
    # its calculation to protect from any irrelevant downstream
    # mutations (e.g., inside other artifact calculations)
    import copy

    artifacts = dict()
    b = get_squared(a)
    artifacts["squared"] = copy.deepcopy(b)
    return artifacts


def run_all_sessions(
    a=1,
):
    artifacts = dict()
    artifacts.update(run_session_including_squared(a))
    return artifacts


if __name__ == "__main__":
    # Edit this section to customize the behavior of artifacts
    parser = argparse.ArgumentParser()
    parser.add_argument("--a", type=int, default=1)
    args = parser.parse_args()
    artifacts = run_all_sessions(
        a=args.a,
    )
    print(artifacts)
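
Since the callable returned by get_function corresponds to run_all_sessions, one way to picture it is to execute the module text and pick that function out of the resulting namespace. The sketch below does this with a hard-coded, shortened copy of the module printed above (the argparse entry point is left out). The helper load_run_all_sessions is hypothetical and not part of LineaPy, and this is not necessarily how get_function works internally.

```python
# Shortened copy of the module definition printed above.
MODULE_TEXT = '''
def get_squared(a):
    b = a**2
    return b


def run_session_including_squared(a=1):
    import copy

    artifacts = dict()
    b = get_squared(a)
    artifacts["squared"] = copy.deepcopy(b)
    return artifacts


def run_all_sessions(a=1):
    artifacts = dict()
    artifacts.update(run_session_including_squared(a))
    return artifacts
'''


def load_run_all_sessions(module_text):
    """Execute module source in a fresh namespace and return its entry point."""
    namespace = {}
    exec(module_text, namespace)
    return namespace["run_all_sessions"]


sq = load_run_all_sessions(MODULE_TEXT)
print(sq(a=3)["squared"])  # 9, matching the notebook output above
print(sq()["squared"])     # 1, using the recorded default a=1
```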

Recap

In this tutorial, we learned the basic functionality of LineaPy, including how to save, list, retrieve, and delete artifacts. We saw that a LineaPy artifact stores not only the value of a variable but also its full development code. This helps automate time-consuming, manual steps in a data science workflow, such as code cleanup and pipeline building, and so helps data scientists move faster towards productionization and impact.