lineapy package

Subpackages

Module contents

class lineapy.SessionType(value)[source]

Session types tell the tracer what to expect:

  • JUPYTER: the tracer needs to progressively add more nodes to the graph

  • SCRIPT: the easiest case, run everything until the end

class lineapy.Tracer(db: lineapy.db.db.RelationalLineaDB, session_type: dataclasses.InitVar, session_name: dataclasses.InitVar = None, globals_: dataclasses.InitVar = None, variable_name_to_node: Dict[str, Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode]] = <factory>, module_name_to_node: Dict[str, Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode]] = <factory>, mutation_tracker: lineapy.instrumentation.mutation_tracker.MutationTracker = <factory>, control_flow_tracker: lineapy.instrumentation.control_flow_tracker.ControlFlowTracker = <factory>)[source]
assign(variable_name: str, value_node: Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode], from_import: bool = False) None[source]

Assign updates a local mapping of variable nodes.

call(function_node: Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode], source_location: Optional[lineapy.data.types.SourceLocation], *arguments: Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode, Tuple[bool, Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode]]], **keyword_arguments: Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode]) lineapy.data.types.CallNode[source]
Parameters
  • function_node – the function node to call/execute

  • source_location – the source info from user code

  • arguments – positional arguments. Each one is passed either as a Node (named node, constant, etc.) or as a tuple (starred, node), where starred is a boolean indicating whether the argument should be splatted before it is passed to the function (as in a call such as foo(1, *[2, 3])). The boolean is optional to support the legacy calling convention, so that node_transformer does not have to pass a tuple in every case.

  • keyword_arguments – keyword arguments, passed to the function as a dictionary. As with *arguments, keyword arguments can also be splatted: name the key unpack_<index>, where <index> is the index of the argument, and the dictionary will be unpacked and passed as keyword arguments to the function. Keyword arguments are processed in the order they are passed, so when keywords conflict, the last value passed is the one used.

Returns

a call node

Note

  • It’s important for the call to return the call node so that we can programmatically chain the nodes together, e.g., for the assignment call to modify the previous call node.

  • The call checks whether the function is locally defined. We decided that this is better for program slicing.
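The argument conventions described for call can be pictured with a small stand-alone sketch. This is not LineaPy's implementation: resolve_arguments is a hypothetical helper, and plain Python values stand in for nodes. It only shows how (starred, value) tuples and unpack_<index> keys could be flattened into ordinary positional and keyword arguments.

```python
from typing import Any, Dict, List, Tuple


def resolve_arguments(
    positional: List[Any], keyword: Dict[str, Any]
) -> Tuple[List[Any], Dict[str, Any]]:
    """Flatten the (starred, value) and unpack_<index> conventions.

    Hypothetical helper for illustration only; plain values stand in for nodes,
    so a positional value that is itself a (bool, x) tuple would be ambiguous here.
    """
    args: List[Any] = []
    for item in positional:
        # A two-element tuple whose first element is a bool is the (starred, value) form.
        if isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], bool):
            starred, value = item
        else:
            starred, value = False, item
        if starred:
            args.extend(value)  # splat, as in foo(1, *[2, 3])
        else:
            args.append(value)

    kwargs: Dict[str, Any] = {}
    # Processed in the order passed, so a later key overrides an earlier one.
    for key, value in keyword.items():
        if key.startswith("unpack_"):
            kwargs.update(value)  # splat, as in foo(**{"a": 2})
        else:
            kwargs[key] = value
    return args, kwargs


args, kwargs = resolve_arguments(
    [1, (True, [2, 3])],
    {"a": 1, "unpack_1": {"a": 2, "b": 3}},
)
```

Here the conflicting keyword a ends up with the value from the unpacked dictionary, because that dictionary was passed last.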

import_module(name: str, source_location: Optional[lineapy.data.types.SourceLocation] = None) Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode][source]

Import a module. If we have already imported it, just return its ID. Otherwise, create new module nodes for each submodule in its parents and return it.
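A rough sketch of the caching behavior described above, assuming a dotted module name such as a.b.c leads to one entry per parent (a, a.b, a.b.c). ModuleRegistry and its integer IDs are hypothetical stand-ins, not LineaPy's node types.

```python
from typing import Dict, List


def module_prefixes(name: str) -> List[str]:
    """Return every parent of a dotted module name, outermost first."""
    parts = name.split(".")
    return [".".join(parts[: i + 1]) for i in range(len(parts))]


class ModuleRegistry:
    """Hypothetical stand-in for the tracer's module_name_to_node mapping."""

    def __init__(self) -> None:
        self.module_name_to_id: Dict[str, int] = {}

    def import_module(self, name: str) -> int:
        # Already imported: return the existing ID without creating anything.
        if name in self.module_name_to_id:
            return self.module_name_to_id[name]
        # Otherwise create an entry for each parent that is still missing.
        for prefix in module_prefixes(name):
            if prefix not in self.module_name_to_id:
                self.module_name_to_id[prefix] = len(self.module_name_to_id)
        return self.module_name_to_id[name]


registry = ModuleRegistry()
first = registry.import_module("a.b.c")
second = registry.import_module("a.b")
```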

lookup_node(variable_name: str, source_location: Optional[lineapy.data.types.SourceLocation] = None) Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode][source]

Cases for the node that we are looking up:

  • user defined variable & function definitions

  • imported libs

  • unknown runtime magic functions—special case to LookupNode

    • builtin functions, e.g., min

    • custom runtime, e.g., get_ipython
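One way to picture these cases is a small classifier. The order of the checks below is an assumption that mirrors ordinary Python name resolution, and classify_name is a hypothetical helper, not part of LineaPy.

```python
import builtins
from typing import Collection


def classify_name(
    name: str,
    user_variables: Collection[str],
    imported_modules: Collection[str],
) -> str:
    """Classify a looked-up name into one of the cases listed above."""
    if name in user_variables:
        return "user_defined"  # variables and function definitions
    if name in imported_modules:
        return "imported"  # imported libs
    if hasattr(builtins, name):
        return "builtin"  # e.g., min
    return "runtime"  # anything else, e.g., names injected by a custom runtime
```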

process_node(node: Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode]) None[source]

Executes a node and adds it to the database.

trace_import(name: str, source_location: Optional[lineapy.data.types.SourceLocation] = None, alias: Optional[str] = None, attributes: Optional[Dict[str, str]] = None) None[source]
  • name: the name of the module

  • alias: the module could be aliased, e.g., import pandas as pd

  • attributes: the functions imported from the library, given as a dictionary.

    It maps each aliased name to the original name.

Note

  • The input args have either alias or attributes, but not both

  • The function is not named import because import is a reserved keyword in Python

Note that version and path will be introspected at runtime.
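To make the shape of these arguments concrete, the sketch below derives name, alias, and attributes from import statements using the standard ast module. import_arguments is a hypothetical helper for illustration; it is not how LineaPy's transformer is written.

```python
import ast
from typing import Any, Dict, List


def import_arguments(source: str) -> List[Dict[str, Any]]:
    """Derive trace_import-style arguments from import statements."""
    results: List[Dict[str, Any]] = []
    for stmt in ast.parse(source).body:
        if isinstance(stmt, ast.Import):
            # import pandas as pd -> alias is set, attributes is not
            for item in stmt.names:
                results.append(
                    {"name": item.name, "alias": item.asname, "attributes": None}
                )
        elif isinstance(stmt, ast.ImportFrom):
            # from math import sqrt as s, pi -> attributes maps alias to original
            attributes = {item.asname or item.name: item.name for item in stmt.names}
            results.append(
                {"name": stmt.module, "alias": None, "attributes": attributes}
            )
    return results


aliased = import_arguments("import pandas as pd")
from_import = import_arguments("from math import sqrt as s, pi")
```

In both results exactly one of alias and attributes is populated, matching the note above.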

property values: Dict[str, object]

Returns a mapping of variable names to their values, by joining the scoping information with the executor values.
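As a minimal sketch of such a join, assume the scoping information maps variable names to node IDs and the executor maps node IDs to values. Both dictionaries and the join_values helper are hypothetical stand-ins.

```python
from typing import Dict


def join_values(
    variable_name_to_node_id: Dict[str, str],
    node_id_to_value: Dict[str, object],
) -> Dict[str, object]:
    """Map each variable name to the value the executor holds for its node."""
    return {
        name: node_id_to_value[node_id]
        for name, node_id in variable_name_to_node_id.items()
        if node_id in node_id_to_value  # skip nodes that have no value yet
    }


values = join_values({"x": "n1", "y": "n2"}, {"n1": 10, "n2": "hello"})
```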

class lineapy.ValueType(value)[source]

Lower case because the API with the frontend assumes the characters “chart” exactly as is.

Todo

rename (need coordination with linea-server)

  • really dataset is a table

  • value means it’s a literal (e.g., int/str)

lineapy.artifact_store() lineapy.api.models.linea_artifact_store.LineaArtifactStore[source]
Returns

An object of the class LineaArtifactStore that allows for printing and exporting artifact metadata.

Return type

LineaArtifactStore

lineapy.delete(artifact_name: str, version: Union[int, str]) None[source]

Deletes an artifact from the artifact store. If no other artifacts refer to the value, the value is also deleted from both the value node store and the pickle store.

Parameters
  • artifact_name – Key used when saving the artifact

  • version – version number or ‘latest’ or ‘all’

Raises

ValueError – if the artifact is not found or the version is invalid
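The version argument can be read as a selector over the saved versions. The sketch below is one interpretation of that contract, not LineaPy's code: select_versions is a hypothetical helper, and rejecting anything other than an existing integer, 'latest', or 'all' is an assumption.

```python
from typing import List, Union


def select_versions(existing: List[int], version: Union[int, str]) -> List[int]:
    """Return the versions a delete call would target."""
    if not existing:
        raise ValueError("artifact not found")
    if version == "all":
        return sorted(existing)
    if version == "latest":
        return [max(existing)]
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(version, int) and not isinstance(version, bool) and version in existing:
        return [version]
    raise ValueError(f"invalid version: {version!r}")


latest = select_versions([0, 1, 2], "latest")
everything = select_versions([2, 0, 1], "all")
```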

lineapy.get(artifact_name: str, version: Optional[int] = None) lineapy.api.models.linea_artifact.LineaArtifact[source]

Gets an artifact from the DB.

Parameters
  • artifact_name (str) – name of the artifact. Note that if you do not remember the artifact name, you can use the artifact_store to browse the options

  • version (Optional[int]) – version of the artifact. If None, the latest version will be returned.

Returns

returned value offers methods to access information we have stored about the artifact

Return type

LineaArtifact

lineapy.get_function(artifacts: List[Union[str, Tuple[str, int]]], input_parameters: List[str] = [], reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]] = []) Callable[source]

Extracts the process that creates the selected artifacts as a Python function.

Parameters
  • artifacts (List[Union[str, Tuple[str, int]]]) – List of artifact names (with optional version) to be included in the function return.

  • input_parameters (List[str]) – List of variable names to be used as the function arguments. Currently, only variables from a literal assignment, such as a = '123', are accepted. There should be only one literal assignment for each variable across all artifact calculation code. For instance, if both a = '123' and a = 'abc' exist in the code, a cannot be specified as an input parameter, since it is ambiguous which literal assignment should be replaced.

  • reuse_pre_computed_artifacts (List[Union[str, Tuple[str, int]]]) – List of artifacts (name with optional version) for which pre-computed values from the artifact store will be used instead of recomputing from the original code.

Returns

Callable – A Python function that takes input_parameters as args and returns a dictionary with each artifact name as the key and the artifact value as the value.

Note

  • If an input parameter is only used to calculate artifacts in the reuse_pre_computed_artifacts list, that input parameter will be passed around as a dummy variable. LineaPy will create a warning.

  • If an artifact name has been saved multiple times within a session or across multiple sessions, or the artifact has been mutated, you might want to specify a version number in artifacts or reuse_pre_computed_artifacts. The best practice to avoid searching for an artifact version is not to reuse an artifact name in different notebooks and not to save the same artifact multiple times within the same session.
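The single-literal-assignment rule for input_parameters can be checked ahead of time on your own code. The sketch below uses the standard ast module; can_parametrize is a hypothetical helper that illustrates the rule and is not part of LineaPy.

```python
import ast


def literal_assignment_count(code: str, variable: str) -> int:
    """Count assignments of a literal constant to the given variable name."""
    count = 0
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Assign) and isinstance(node.value, ast.Constant):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == variable:
                    count += 1
    return count


def can_parametrize(code: str, variable: str) -> bool:
    """A variable qualifies only if it has exactly one literal assignment."""
    return literal_assignment_count(code, variable) == 1


unambiguous = can_parametrize("a = '123'\nb = a + 'x'", "a")
ambiguous = can_parametrize("a = '123'\na = 'abc'", "a")
```

The first snippet qualifies because a is assigned a literal once; the second does not, because it is unclear which assignment a parameter would replace.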

lineapy.get_module_definition(artifacts: List[Union[str, Tuple[str, int]]], input_parameters: List[str] = [], reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]] = []) str[source]

Creates a Python module that includes the definition of get_function().

Parameters
  • artifacts (List[Union[str, Tuple[str, int]]]) – same as get_function()

  • input_parameters (List[str]) – same as get_function()

  • reuse_pre_computed_artifacts (List[Union[str, Tuple[str, int]]]) – same as get_function()

Returns

A Python module that includes the definition of get_function() as run_all_sessions.

Return type

str

lineapy.reload() None[source]

Reloads lineapy context.

Note

Currently only reloads annotations but in the future can be a container for other items like configs etc.

lineapy.save(reference: object, name: str) lineapy.api.models.linea_artifact.LineaArtifact[source]

Publishes the object to the Linea DB.

Parameters
  • reference (Union[object, ExternalState]) – The reference could be a variable name, in which case Linea will save the value of the variable with our default serialization mechanism. Alternatively, it could be a “side effect” reference, which currently includes either lineapy.file_system or lineapy.db. Linea will save the associated process that creates the final side effects. We are in the process of adding more side effect references, including assert statements.

  • name (str) – The name is used for later retrieving the artifact and creating new versions if an artifact of the name has been created before.

Returns

returned value offers methods to access information we have stored about the artifact (value, version), and other automation capabilities, such as to_pipeline().

Return type

LineaArtifact

lineapy.to_pipeline(artifacts: List[str], framework: str = 'SCRIPT', pipeline_name: Optional[str] = None, dependencies: Dict[str, Set[str]] = {}, output_dir: str = '.', input_parameters: List[str] = [], reuse_pre_computed_artifacts: List[str] = [], pipeline_dag_config: Optional[lineapy.plugins.task.AirflowDagConfig] = {}) pathlib.Path[source]

Writes the pipeline job to a path on disk.

Parameters
  • artifacts (List[str]) – Names of artifacts to be included in the pipeline.

  • framework (str) – “AIRFLOW” or “SCRIPT”. Defaults to “SCRIPT” if not specified.

  • pipeline_name (Optional[str]) – Name of the pipeline.

  • dependencies (TaskGraphEdge) – Task dependencies in graphlib format, e.g., {"B": {"A", "C"}} means task A and C are prerequisites for task B. LineaPy is smart enough to figure out dependency relations within the same session, so there is no need to specify this type of dependency information; instead, the user is expected to provide dependency information among artifacts across different sessions.

  • output_dir (str) – Directory path to save DAG and other pipeline files.

  • input_parameters (List[str]) – Names of variables to be used as parameters in the pipeline. Currently, it only accepts variables from literal assignment such as a = '123'. For each variable to be parametrized, there should be only one literal assignment across all artifact code for the pipeline. For instance, if both a = '123' and a = 'abc' exist in the pipeline’s artifact code, we cannot make a an input parameter since its reference is ambiguous, i.e., we are not sure which literal assignment a refers to.

  • reuse_pre_computed_artifacts (List[str]) – Names of artifacts in the pipeline for which pre-computed value is to be used (rather than recomputing the value).

  • pipeline_dag_config (Optional[AirflowDagConfig]) – A dictionary of parameters to configure the DAG file to be generated. Not applicable to the “SCRIPT” framework, as it does not generate a separate DAG file. For the “AIRFLOW” framework, Airflow-native config params such as “retries” and “schedule_interval” can be passed in.

Returns

Directory path where DAG and other pipeline files are saved.

Return type

Path
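The dependencies format can be explored with the standard library's graphlib module (Python 3.9+), which uses the same convention: each key maps to the set of tasks that must run before it. The execution_order helper below is a hypothetical illustration and does not call LineaPy.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Return one valid run order for a graphlib-style dependency mapping."""
    return list(TopologicalSorter(dependencies).static_order())


# Tasks A and C are prerequisites for task B.
order = execution_order({"B": {"A", "C"}})
```

A and C may appear in either order, but B always comes last.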

lineapy.visualize(*, live=False) None[source]

Display a visualization of the Linea graph from this session using Graphviz.

If live=True, then this visualization will live update after cell execution. Note that this comes with a substantial performance penalty, so it is False by default.

Note: If the visualization is not live, it will print out the visualization as of the previous cell execution, not the one where visualize is executed.