lineapy.data package

Submodules

lineapy.data.graph module

lineapy.data.graph.queue_get_when(queue: Queue[T], filter_fn: Callable[[lineapy.data.graph.T], bool]) → lineapy.data.graph.T[source]

Gets the first element in the queue that satisfies the filter function.
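The behavior described above can be sketched roughly as follows. This is an illustration only, not lineapy's implementation: the helper name first_matching is hypothetical, a standard-library queue.Queue is assumed, and putting the skipped elements back at the end of the queue is an assumption about what happens to non-matching items.

```python
from queue import Empty, Queue
from typing import Callable, List, TypeVar

T = TypeVar("T")


def first_matching(queue: "Queue[T]", filter_fn: Callable[[T], bool]) -> T:
    """Hypothetical stand-in: pop items until one satisfies filter_fn.

    Items that do not match are put back afterwards (assumption), so for a
    FIFO queue they end up behind the items that were never inspected.
    Raises queue.Empty if no item matches.
    """
    skipped: List[T] = []
    try:
        while True:
            item = queue.get_nowait()  # raises Empty once the queue is drained
            if filter_fn(item):
                return item
            skipped.append(item)
    finally:
        # Runs on both the return path and the Empty path.
        for other in skipped:
            queue.put(other)


numbers: "Queue[int]" = Queue()
for n in (1, 3, 4, 5):
    numbers.put(n)

first_even = first_matching(numbers, lambda n: n % 2 == 0)
```

The try/finally keeps the queue contents intact even when nothing matches, at the cost of reordering a FIFO queue.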

lineapy.data.types module

class lineapy.data.types.Artifact(*, node_id: LineaID, execution_id: LineaID, date_created: datetime.datetime, name: str, version: int)[source]

An artifact points to the value of a node during some execution.

class lineapy.data.types.AssignedVariable[source]

For local variables, this is the node that is assigned to.

class lineapy.data.types.BaseNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.Node, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None)[source]
  • id: string version of UUID, which we chose because we do not need to coordinate to make it unique

  • lineno, col_offset, end_lineno, end_col_offset: these record the position of the calls. They are optional because some nodes, such as side-effect nodes, do not correspond to a line of code.

  • control_dependency: points to a ControlFlowNode which the generation of the current node is dependent upon. For example, in the snippet if condition: l.append(0), the append instruction’s execution depends on whether the condition is true, hence the MutateNode corresponding to the append instruction will have its control_dependency field pointing to the IfNode of the condition. Refer to tracer.py for usage.

  • class Config’s orm_mode allows us to use from_orm to convert ORM objects to pydantic objects

parents() → Iterable[LineaID][source]

Returns the parents of this node.
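To make the id and control_dependency fields more concrete, here is a minimal sketch for the snippet if condition: l.append(0). It uses a plain dataclass as a stand-in for the real pydantic model; SketchNode, new_id, and controlling_node are hypothetical names introduced for illustration, and node types are plain strings rather than the NodeType enum.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional
from uuid import uuid4


def new_id() -> str:
    # String form of a UUID: unique without coordinating with anyone else.
    return str(uuid4())


@dataclass
class SketchNode:
    """Hypothetical stand-in for a node; not the real pydantic model."""

    node_type: str
    id: str = field(default_factory=new_id)
    # Id of the control flow node this node depends on, if any.
    control_dependency: Optional[str] = None


# Nodes for the snippet:  if condition: l.append(0)
if_node = SketchNode(node_type="IfNode")
mutate_node = SketchNode(node_type="MutateNode", control_dependency=if_node.id)

nodes: Dict[str, SketchNode] = {n.id: n for n in (if_node, mutate_node)}


def controlling_node(node: SketchNode) -> Optional[SketchNode]:
    """Follow control_dependency to the node that guards this one."""
    if node.control_dependency is None:
        return None
    return nodes[node.control_dependency]
```

Storing an id rather than an object reference mirrors the LineaID-typed fields in the signatures above.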

class lineapy.data.types.CallNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.CallNode, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, function_id: LineaID, positional_args: List[lineapy.data.types.PositionalArgument] = [], keyword_args: List[lineapy.data.types.KeywordArgument] = [], global_reads: Dict[str, LineaID] = {}, implicit_dependencies: List[LineaID] = [])[source]
  • function_id: node containing the value of the function call, which could be from various places: (1) locally defined, (2) imported, and (3) magically existing, e.g. from builtins (min), or environment like get_ipython.

  • value: value of the call result, filled at runtime. It may be cached by the data asset manager.

parents() → Iterable[LineaID][source]

Returns the parents of this node.

class lineapy.data.types.ControlFlowNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.Node, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, companion_id: LineaID = None, unexec_id: LineaID = None)[source]

Represents a control flow node such as if, else, for, or while.

parents() → Iterable[LineaID][source]

Returns the parents of this node.

class lineapy.data.types.ElseNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.ElseNode, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, companion_id: LineaID, unexec_id: LineaID = None)[source]

Represents the else keyword

class lineapy.data.types.Execution(*, id: LineaID, timestamp: datetime.datetime = None)[source]

An execution is one session of running many nodes and recording their values.

class lineapy.data.types.GlobalNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.GlobalNode, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, name: str, call_id: LineaID)[source]

Represents a lookup of a global variable, that was set as a side effect in another node.

parents() → Iterable[LineaID][source]

Returns the parents of this node.

class lineapy.data.types.IfNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.IfNode, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, companion_id: LineaID = None, unexec_id: LineaID = None, test_id: LineaID)[source]

Represents the if keyword

parents() → Iterable[LineaID][source]

Returns the parents of this node.

class lineapy.data.types.ImportNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.ImportNode, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, name: str, version: str = None, package_name: str = None, path: str = None)[source]

Imported libraries.

version and package_name are retrieved at runtime. package_name may differ from the import name; see get_lib_package_version.

These are optional because the info is acquired at runtime.

Note

This node is not actually used for execution (l_import CallNodes are used for that); it is more of a decoration for metadata retrieval.
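As a rough illustration of why version is optional, the following sketch looks up an installed distribution's version at runtime with the standard library. It is not lineapy's get_lib_package_version; lookup_version is a hypothetical helper, and it assumes the caller already knows the distribution name (which can differ from the import name, for example the sklearn module ships in the scikit-learn distribution).

```python
from importlib import metadata
from typing import Optional


def lookup_version(package_name: str) -> Optional[str]:
    """Hypothetical helper: return the installed version, or None if unknown."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        # Nothing installed under this distribution name, so the field
        # would simply stay unset.
        return None


missing = lookup_version("surely-not-an-installed-distribution-xyz")
```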

class lineapy.data.types.LiteralNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.LiteralNode, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, value: Any = None)[source]

class lineapy.data.types.LiteralType(value)[source]

An enumeration.

class lineapy.data.types.LookupNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.LookupNode, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, name: str)[source]

For unknown/undefined variables e.g. SQLcontext, get_ipython, int.

class lineapy.data.types.MutateNode(*, id: LineaID, session_id: LineaID, node_type: lineapy.data.types.NodeType = NodeType.MutateNode, source_location: lineapy.data.types.SourceLocation = None, control_dependency: LineaID = None, source_id: LineaID, call_id: LineaID)[source]

Represents a mutation of a node’s value.

After a call mutates a node, later references to that node will instead refer to this mutate node.

parents() → Iterable[LineaID][source]

Returns the parents of this node.
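The redirection of later references can be sketched as a small lookup table. This is an illustration under stated assumptions, not the tracer's actual bookkeeping: SketchMutate and ReferenceTable are hypothetical names, ids are plain strings, and treating source_id and call_id as the parents is an assumption based on the fields in the signature.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class SketchMutate:
    """Hypothetical stand-in for a mutate node."""

    id: str
    source_id: str  # node whose value was mutated
    call_id: str  # call that performed the mutation

    def parents(self) -> List[str]:
        # Assumption: a mutate node depends on both of these nodes.
        return [self.source_id, self.call_id]


class ReferenceTable:
    """Maps a node id to the mutate node that superseded it."""

    def __init__(self) -> None:
        self._latest: Dict[str, str] = {}

    def record(self, mutate: SketchMutate) -> None:
        self._latest[mutate.source_id] = mutate.id

    def resolve(self, node_id: str) -> str:
        # Follow the chain until reaching a node nothing has mutated.
        while node_id in self._latest:
            node_id = self._latest[node_id]
        return node_id


table = ReferenceTable()

# l = []; l.append(0); l.append(1)
first = SketchMutate(id="m1", source_id=table.resolve("l"), call_id="append_0")
table.record(first)
second = SketchMutate(id="m2", source_id=table.resolve("l"), call_id="append_1")
table.record(second)

current = table.resolve("l")
```

Each new mutation takes the already-resolved node as its source, so the mutate nodes form a chain and a reference to l always lands on the most recent one.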

class lineapy.data.types.NodeType(value)[source]

An enumeration.

class lineapy.data.types.PipelineType(value)[source]

Pipeline types allow to_pipeline to know what to expect:

  • SCRIPT: the pipeline is wrapped as a Python script

  • AIRFLOW: the pipeline is wrapped as an Airflow DAG

class lineapy.data.types.SessionContext(*, id: LineaID, environment_type: lineapy.data.types.SessionType, python_version: str, creation_time: datetime.datetime, working_directory: str, session_name: str = None, user_name: str = None, execution_id: LineaID)[source]

Each trace of a script/notebook is a “Session”.

Parameters

working_directory – captures the directory in which the user ran the code

class lineapy.data.types.SessionType(value)[source]

Session types allow the tracer to know what to expect:

  • JUPYTER: the tracer needs to progressively add more nodes to the graph

  • SCRIPT: the easiest case, run everything until the end

class lineapy.data.types.SourceCode(*, id: LineaID, code: str, location: Union[pathlib.Path, lineapy.data.types.JupyterCell])[source]

The source code that was executed.

class lineapy.data.types.SourceLocation(*, lineno: int, col_offset: int, end_lineno: int, end_col_offset: int, source_code: lineapy.data.types.SourceCode)[source]

The location of the original source.

Eventually we also need to support fused locations, like MLIR (https://mlir.llvm.org/docs/Dialects/Builtin/#location-attributes), but for now we just point at the original user source location.
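As an illustration of how the four position fields identify a span of user code, the sketch below slices the matching text out of a source string. It assumes the same conventions as Python's ast module (1-indexed line numbers, 0-indexed column offsets, exclusive end column) and ASCII-only source, since ast column offsets count UTF-8 bytes. SketchLocation and segment are hypothetical names, and the code string stands in for the SourceCode object.

```python
import ast
from dataclasses import dataclass


@dataclass
class SketchLocation:
    """Hypothetical stand-in for a source location."""

    lineno: int
    col_offset: int
    end_lineno: int
    end_col_offset: int
    code: str  # full source text; stands in for the SourceCode object


def segment(loc: SketchLocation) -> str:
    """Return the text covered by the location (ASCII source assumed)."""
    lines = loc.code.split("\n")[loc.lineno - 1 : loc.end_lineno]
    if len(lines) == 1:
        return lines[0][loc.col_offset : loc.end_col_offset]
    # Trim the last line first so the indices of the first line are unaffected.
    lines[-1] = lines[-1][: loc.end_col_offset]
    lines[0] = lines[0][loc.col_offset :]
    return "\n".join(lines)


code = "x = [1, 2]\nx.append(3)\n"
call = ast.parse(code).body[1].value  # the x.append(3) call expression
loc = SketchLocation(
    call.lineno, call.col_offset, call.end_lineno, call.end_col_offset, code
)
text = segment(loc)
```

For real use, ast.get_source_segment does the same job and handles non-ASCII source.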

class lineapy.data.types.ValueType(value)[source]

Lower case because the API with the frontend assumes the characters “chart” exactly as is.

Todo

rename (need coordination with linea-server)

  • dataset is really a table

  • value means it’s a literal (e.g., int/str)

Module contents