lineapy package

Subpackages

Module contents

class lineapy.SessionType(value)

Session types allow the tracer to know what to expect:

  • JUPYTER: the tracer needs to progressively add more nodes to the graph

  • SCRIPT: the easiest case, run everything until the end

class lineapy.Tracer(db: lineapy.db.db.RelationalLineaDB, session_type: dataclasses.InitVar[SessionType], session_name: dataclasses.InitVar[typing.Optional[str]] = None, globals_: dataclasses.InitVar[typing.Optional[typing.Dict[str, object]]] = None, variable_name_to_node: Dict[str, Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode]] = <factory>, mutation_tracker: lineapy.instrumentation.mutation_tracker.MutationTracker = <factory>)
assign(variable_name: str, value_node: Union[ImportNode, CallNode, LiteralNode, LookupNode, MutateNode, GlobalNode]) → None

Assign updates a local mapping of variable nodes.

It doesn’t save this to the graph, and currently the source location for the assignment is discarded. In the future, if we need to trace where in some code a node is assigned, we can record that again.

call(function_node: Union[ImportNode, CallNode, LiteralNode, LookupNode, MutateNode, GlobalNode], source_location: Optional[SourceLocation], *arguments: Union[ImportNode, CallNode, LiteralNode, LookupNode, MutateNode, GlobalNode, Tuple[bool, Union[ImportNode, CallNode, LiteralNode, LookupNode, MutateNode, GlobalNode]]], **keyword_arguments: Union[ImportNode, CallNode, LiteralNode, LookupNode, MutateNode, GlobalNode]) → CallNode
Parameters
  • function_node – the function node to call/execute

  • source_location – the source info from user code

  • arguments – positional arguments. Each is passed either as a node (named node, constant, etc.) or as a tuple (starred, node), where starred is a boolean indicating whether the argument should be splatted before being passed to the function, as in foo(1, *[2, 3]). The boolean is optional only to support the legacy calling convention, so that node_transformer does not have to pass a tuple in every case.

  • keyword_arguments – keyword arguments, passed as a dictionary. As with starred positional arguments, a keyword argument can be splatted by naming its key unpack_<index>, where <index> is the index of the argument; the dictionary is then unpacked and its entries are passed as keyword arguments to the function. Keyword arguments are processed in the order they are passed, so when keys conflict the last value wins.

Returns

a call node

Note

  • It’s important for the call to return the call node so that we can programmatically chain the nodes together, e.g., for the assignment call to modify the previous call node.

  • The call checks whether the function is locally defined. We decided that this is better for program slicing.
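
The argument conventions described above can be sketched in plain Python. This is an illustrative model only, not the Tracer's implementation: ToyNode and resolve_arguments are hypothetical names, and the sketch assumes that each node simply wraps a runtime value.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple, Union


@dataclass
class ToyNode:
    """Hypothetical stand-in for a graph node; it just holds a runtime value."""

    value: Any


# A positional argument is either a bare node or a (starred, node) tuple.
Positional = Union[ToyNode, Tuple[bool, ToyNode]]


def resolve_arguments(
    arguments: Tuple[Positional, ...],
    keyword_arguments: Dict[str, ToyNode],
) -> Tuple[List[Any], Dict[str, Any]]:
    """Turn node arguments into the concrete args/kwargs for a function call."""
    args: List[Any] = []
    for arg in arguments:
        if isinstance(arg, tuple):
            starred, node = arg
        else:
            # Legacy form: a bare node means "not starred".
            starred, node = False, arg
        if starred:
            args.extend(node.value)  # splat the iterable into positionals
        else:
            args.append(node.value)

    kwargs: Dict[str, Any] = {}
    # Dicts preserve insertion order, so later entries override earlier ones.
    for key, node in keyword_arguments.items():
        if key.startswith("unpack_"):
            kwargs.update(node.value)  # splat the dictionary into keywords
        else:
            kwargs[key] = node.value
    return args, kwargs


# Models a call shaped like foo(1, *[2, 3]) where a plain keyword and an
# unpacked dictionary both set "sep"; the later one is kept.
args, kwargs = resolve_arguments(
    (ToyNode(1), (True, ToyNode([2, 3]))),
    {"sep": ToyNode("-"), "unpack_1": ToyNode({"sep": "+", "end": "!"})},
)
```

Here args becomes [1, 2, 3], and kwargs keeps the "sep" value from the unpacked dictionary because it was passed last.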

import_module(name: str, source_location: Optional[SourceLocation] = None) → Union[ImportNode, CallNode, LiteralNode, LookupNode, MutateNode, GlobalNode]

Import a module. If we have already imported it, just return its ID. Otherwise, create new module nodes for each submodule in its parents and return it.
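
One way to read this behavior is as a cache keyed by dotted module name. The sketch below is an assumption about the general idea, not the library's code: ToyModuleRegistry is a hypothetical class, and node IDs are modeled as plain integers.

```python
from typing import Dict


class ToyModuleRegistry:
    """Hypothetical registry mapping dotted module names to node IDs."""

    def __init__(self) -> None:
        self.node_ids: Dict[str, int] = {}

    def import_module(self, name: str) -> int:
        # Already imported: return the existing ID without creating anything.
        if name in self.node_ids:
            return self.node_ids[name]
        # Otherwise create a node for every missing prefix, parents first,
        # e.g. "os" before "os.path".
        parts = name.split(".")
        for depth in range(1, len(parts) + 1):
            prefix = ".".join(parts[:depth])
            if prefix not in self.node_ids:
                self.node_ids[prefix] = len(self.node_ids)
        return self.node_ids[name]


registry = ToyModuleRegistry()
first = registry.import_module("os.path")
second = registry.import_module("os.path")  # served from the cache
```

The second call returns the same ID as the first, and the registry holds one entry each for "os" and "os.path".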

lookup_node(variable_name: str, source_location: Optional[SourceLocation] = None) → Union[ImportNode, CallNode, LiteralNode, LookupNode, MutateNode, GlobalNode]

Cases for the node that we are looking up:

  • user defined variable & function definitions

  • imported libs

  • unknown runtime magic functions—special case to LookupNode

    • builtin functions, e.g., min

    • custom runtime, e.g., get_ipython
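
The cases listed above can be illustrated with a small classifier. This is a hedged sketch rather than the method's actual logic: classify_lookup and its return labels are hypothetical, and it assumes that tracked variables and imports live in a single dictionary.

```python
import builtins
from typing import Any, Dict


def classify_lookup(variable_name: str, known_variables: Dict[str, Any]) -> str:
    """Label a name with the kind of lookup it would need."""
    if variable_name in known_variables:
        # User-defined variables, function definitions, and imported libraries.
        return "tracked"
    if hasattr(builtins, variable_name):
        # Builtin functions such as min.
        return "builtin"
    # Anything else is assumed to come from the runtime, such as a helper
    # injected by the environment.
    return "runtime"


known = {"df": object()}
kinds = [classify_lookup(n, known) for n in ("df", "min", "some_undefined_magic")]
```

In this sketch the last two labels correspond to the "unknown runtime" cases that are special-cased to a LookupNode.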

process_node(node: Union[ImportNode, CallNode, LiteralNode, LookupNode, MutateNode, GlobalNode]) → None

Executes a node and adds it to the database.

trace_import(name: str, source_location: Optional[SourceLocation] = None, alias: Optional[str] = None, attributes: Optional[Dict[str, str]] = None) → None
  • name: the name of the module

  • alias: the module could be aliased, e.g., import pandas as pd

  • attributes: the functions imported from the library, given as a dictionary that maps each aliased name to its original name.

Note

  • The input arguments include either alias or attributes, but not both

  • The function is not named import because that is a reserved keyword in Python

Note that version and path will be introspected at runtime.
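
To make the alias and attributes arguments concrete, the sketch below shows which names each form of import would bind. It uses only the standard library and does not call lineapy; bind_import is a hypothetical helper written for illustration.

```python
import importlib
from typing import Any, Dict, Optional


def bind_import(
    name: str,
    alias: Optional[str] = None,
    attributes: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Return the variable bindings that an import statement would create."""
    module = importlib.import_module(name)
    if attributes is not None:
        # from <name> import <original> as <aliased>
        return {
            aliased: getattr(module, original)
            for aliased, original in attributes.items()
        }
    if alias is not None:
        # import <name> as <alias>
        return {alias: module}
    # A plain "import a.b" binds only the top-level package name "a".
    top_level = name.split(".")[0]
    return {top_level: importlib.import_module(top_level)}


aliased_module = bind_import("math", alias="m")  # import math as m
aliased_function = bind_import("math", attributes={"root": "sqrt"})  # from math import sqrt as root
```

The attributes dictionary is keyed by the aliased name, matching the description above, so "root" here refers to math.sqrt.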

property values: Dict[str, object]

Returns a mapping of variable names to their values, by joining the scoping information with the executor values.

class lineapy.ValueType(value)

Lower case because the API with the frontend assumes the characters “chart” exactly as is.

Todo

FIXME

  • rename (need coordination with linea-server):
    • dataset is really a table

    • value means it’s a literal (e.g., int/str)

lineapy.catalog() → LineaCatalog
Returns

An object of the class LineaCatalog that allows for printing and exporting artifacts metadata.

Return type

LineaCatalog

lineapy.get(artifact_name: str, version: Optional[int] = None) → LineaArtifact

Gets an artifact from the DB.

Parameters
  • artifact_name (str) – name of the artifact. If you do not remember the artifact name, you can use the catalog to browse the options.

  • version (Optional[int]) – version of the artifact. If None, the latest version will be returned.

Returns

returned value offers methods to access information we have stored about the artifact

Return type

LineaArtifact

lineapy.save(reference: object, name: str) → LineaArtifact

Publishes the object to the Linea DB.

Parameters
  • reference (Union[object, ExternalState]) – The reference could be a variable name, in which case Linea will save the value of the variable with our default serialization mechanism. Alternatively, it could be a “side effect” reference, which currently includes either lineapy.file_system or lineapy.db. Linea will save the associated process that creates the final side effects. We are in the process of adding more side effect references, including assert statements.

  • name (str) – The name is used for later retrieving the artifact and creating new versions if an artifact of the name has been created before.

Returns

returned value offers methods to access information we have stored about the artifact (value, version), and other automation capabilities, such as to_pipeline().

Return type

LineaArtifact

lineapy.to_pipeline(artifacts: List[str], framework: str = 'SCRIPT', pipeline_name: Optional[str] = None, dependencies: Dict[str, Set[str]] = {}, pipeline_dag_config: Optional[AirflowDagConfig] = {}, output_dir: Optional[str] = None) → Path

Writes the pipeline job to a path on disk.

Parameters
  • artifacts – list of artifact names to be included in the DAG.

  • framework – ‘AIRFLOW’ or ‘SCRIPT’

  • pipeline_name – name of the pipeline

  • dependencies – task dependencies in graphlib format, e.g. {'B': {'A', 'C'}} means that tasks A and C are prerequisites for task B.

  • output_dir – directory where the DAG and the Python file are saved; only used for PipelineType.AIRFLOW

Returns

Path of the DAG file that was exported.
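
The graphlib format used by dependencies maps each task to the set of tasks that must run before it. The sketch below uses only the standard-library graphlib module (Python 3.9 or later) to show how such a mapping resolves to an execution order; it does not call lineapy, and the task names are the placeholder ones from the parameter description.

```python
from graphlib import TopologicalSorter

# Task B depends on tasks A and C, so both must run before B.
dependencies = {"B": {"A", "C"}}

# static_order() yields every task with its prerequisites placed before it.
order = list(TopologicalSorter(dependencies).static_order())
```

A and C come first, in no guaranteed relative order, and B always comes last.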

lineapy.visualize(*, live=False) → None

Display a visualization of the Linea graph from this session using Graphviz.

If live=True, then this visualization will live update after cell execution. Note that this comes with a substantial performance penalty, so it is False by default.

Note: If the visualization is not live, it will print out the visualization as of the previous cell execution, not the one where visualize is executed.