lineapy.data package

Submodules

lineapy.data.graph module

lineapy.data.graph.queue_get_when(queue: Queue[T], filter_fn: Callable[[T], bool]) → T

Gets the first element in the queue that satisfies the filter function.
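The implementation is not shown on this page. The following is only a minimal sketch of the described behavior. It assumes three things: elements that fail the filter stay in the queue, a ValueError is raised when nothing matches, and no other thread uses the queue during the call. The whole queue is drained so that a plain FIFO queue.Queue keeps the remaining elements in their original order.

```python
from queue import Queue
from typing import Callable, List, TypeVar

T = TypeVar("T")


def queue_get_when(queue: "Queue[T]", filter_fn: Callable[[T], bool]) -> T:
    """Sketch: remove and return the first element that satisfies filter_fn."""
    found: List[T] = []  # holds at most one matching element
    rest: List[T] = []   # every other element, in the order it was dequeued
    while not queue.empty():
        item = queue.get()
        if not found and filter_fn(item):
            found.append(item)
        else:
            rest.append(item)
    # Put the non-selected elements back so their relative order is unchanged.
    for item in rest:
        queue.put(item)
    if not found:
        raise ValueError("no element in the queue satisfies filter_fn")
    return found[0]


q: "Queue[int]" = Queue()
for n in [1, 4, 6, 7]:
    q.put(n)
print(queue_get_when(q, lambda n: n % 2 == 0))  # 4
print(list(q.queue))                             # [1, 6, 7]
```

Draining everything costs O(n) per call. With a PriorityQueue the put-back order would not matter, because the queue reorders elements by priority anyway.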

lineapy.data.types module

class lineapy.data.types.Artifact(*, node_id: LineaID, execution_id: LineaID, date_created: datetime, name: str, version: int)

An artifact points to the value of a node during some execution.
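To make "points to the value of a node during some execution" concrete, here is a sketch using a stand-in dataclass with the documented fields. The real Artifact is a pydantic model. LineaID is assumed to be a string, and the value store keyed by (execution_id, node_id) is hypothetical. The same node can carry different values in different executions, and the artifact selects one of them.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class ArtifactSketch:
    # Field names follow the documented Artifact signature; ids are plain strings here.
    node_id: str
    execution_id: str
    date_created: datetime
    name: str
    version: int


# Hypothetical store of recorded values, keyed by (execution_id, node_id).
RecordedValues = Dict[Tuple[str, str], Any]


def artifact_value(artifact: ArtifactSketch, values: RecordedValues) -> Any:
    # The artifact does not hold the value itself; it points at one node in one execution.
    return values[(artifact.execution_id, artifact.node_id)]


values: RecordedValues = {("exec-1", "node-a"): 42, ("exec-2", "node-a"): 43}
art = ArtifactSketch("node-a", "exec-2", datetime(2022, 1, 1), "my_artifact", 1)
print(artifact_value(art, values))  # 43
```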

class lineapy.data.types.BaseNode(*, id: LineaID, session_id: LineaID, node_type: NodeType = NodeType.Node, source_location: SourceLocation = None)
  • id: string version of UUID, which we chose because we do not need to coordinate to make it unique

  • lineno, col_offset, end_lineno, end_col_offset: these record the position of the call and are stored in source_location (see SourceLocation). They are optional because some nodes, such as side-effect nodes, do not correspond to a line of code.

  • class Config’s orm_mode allows us to use from_orm to convert ORM objects to pydantic objects
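The id bullet says UUIDs were chosen so that no coordination is needed to keep ids unique. The hypothetical helper below (not part of lineapy) shows what that means. A random version-4 UUID is unique with overwhelming probability, so separate processes can mint ids without a shared counter or lock.

```python
import uuid


def new_node_id() -> str:
    # Hypothetical helper: a random (version 4) UUID rendered as a string.
    # Collisions are astronomically unlikely, so no central allocator is needed.
    return str(uuid.uuid4())


print(new_node_id())  # e.g. a 36-character string such as 'xxxxxxxx-xxxx-4xxx-...'
```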

parents() → Iterable[LineaID]

Returns the parents of this node.

class lineapy.data.types.CallNode(*, id: LineaID, session_id: LineaID, node_type: NodeType = NodeType.CallNode, source_location: SourceLocation = None, function_id: LineaID, positional_args: List[PositionalArgument] = [], keyword_args: List[KeywordArgument] = [], global_reads: Dict[str, LineaID] = {}, implicit_dependencies: List[LineaID] = [])
  • function_id: node containing the value of the function being called, which can come from various places: (1) defined locally, (2) imported, or (3) implicitly available, e.g., from builtins (min) or from the environment (get_ipython).

  • value: value of the call result, filled in at runtime. It may be cached by the data asset manager.

parents() → Iterable[LineaID]

Returns the parents of this node.
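The page does not show how CallNode.parents() is computed. A plausible sketch, under stated assumptions, yields every node id the call depends on. That covers the function, each positional and keyword argument, the globals it reads, and any implicit dependencies. The *Sketch classes are hypothetical stand-ins. The assumption that each argument wrapper exposes the referenced node id as id or value is ours, not taken from the source.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, List


@dataclass
class PositionalArgumentSketch:
    id: str  # assumption: id of the node passed as this argument


@dataclass
class KeywordArgumentSketch:
    key: str
    value: str  # assumption: id of the node passed for this keyword


@dataclass
class CallNodeSketch:
    id: str
    function_id: str
    positional_args: List[PositionalArgumentSketch] = field(default_factory=list)
    keyword_args: List[KeywordArgumentSketch] = field(default_factory=list)
    global_reads: Dict[str, str] = field(default_factory=dict)
    implicit_dependencies: List[str] = field(default_factory=list)

    def parents(self) -> Iterator[str]:
        # Every node this call depends on, starting with the function itself.
        yield self.function_id
        for arg in self.positional_args:
            yield arg.id
        for kwarg in self.keyword_args:
            yield kwarg.value
        yield from self.global_reads.values()
        yield from self.implicit_dependencies


# min(a, b, key=f), where each name is already a node in the graph.
node = CallNodeSketch(
    id="call-1",
    function_id="lookup-min",
    positional_args=[PositionalArgumentSketch("a"), PositionalArgumentSketch("b")],
    keyword_args=[KeywordArgumentSketch("key", "f")],
)
print(list(node.parents()))  # ['lookup-min', 'a', 'b', 'f']
```

The sketch uses field(default_factory=list) because a dataclass rejects a bare mutable default such as []. The documented pydantic signature can write = [] because pydantic copies field defaults for each instance.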

class lineapy.data.types.Execution(*, id: LineaID, timestamp: datetime = None)

An execution is one session of running many nodes and recording their values.

class lineapy.data.types.GlobalNode(*, id: LineaID, session_id: LineaID, node_type: NodeType = NodeType.GlobalNode, source_location: SourceLocation = None, name: str, call_id: LineaID)

Represents a lookup of a global variable that was set as a side effect of another node.

parents() → Iterable[LineaID]

Returns the parents of this node.
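Here is a concrete example of a global "set as a side effect of another node." In the user code below, the read of counter refers to a global that only exists because the set_counter() call assigned it. Our reading of the description, which the source does not confirm, is that this read would be recorded as a GlobalNode with name="counter" and call_id pointing at that call.

```python
def set_counter() -> None:
    # Assigning a module-level name inside a function is a side effect of the call.
    global counter
    counter = 10


set_counter()        # a call whose side effect defines the global `counter`
total = counter + 1  # reading `counter` looks up a global set by that call
print(total)         # 11
```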

class lineapy.data.types.ImportNode(*, id: LineaID, session_id: LineaID, node_type: NodeType = NodeType.ImportNode, source_location: SourceLocation = None, name: str, version: str = None, package_name: str = None, path: str = None)

Imported libraries.

version and package_name are retrieved at runtime. package_name may differ from the import name; see get_lib_package_version.

These fields are optional because the information is acquired at runtime.

Note that this node is not actually used for execution (which uses l_import CallNodes); it is more a decoration for metadata retrieval.

class lineapy.data.types.LiteralNode(*, id: LineaID, session_id: LineaID, node_type: NodeType = NodeType.LiteralNode, source_location: SourceLocation = None, value: Any = None)
class lineapy.data.types.LiteralType(value)

An enumeration.

class lineapy.data.types.LookupNode(*, id: LineaID, session_id: LineaID, node_type: NodeType = NodeType.LookupNode, source_location: SourceLocation = None, name: str)

For unknown or undefined variables, e.g., SQLcontext, get_ipython, int.
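The examples name two sources for such variables: builtins (int) and the runtime environment (get_ipython, which exists only inside IPython/Jupyter). The sketch below resolves a looked-up name from either source. The function and its resolution order (environment first, then builtins) are hypothetical illustrations, not lineapy's code.

```python
import builtins
from typing import Any, Dict

_MISSING = object()


def resolve_lookup(name: str, environment: Dict[str, Any]) -> Any:
    # Hypothetical order: names supplied by the environment first
    # (e.g. get_ipython inside Jupyter), then Python builtins (e.g. int, min).
    if name in environment:
        return environment[name]
    value = getattr(builtins, name, _MISSING)
    if value is _MISSING:
        raise NameError(f"name {name!r} is not defined")
    return value


print(resolve_lookup("int", {}) is int)  # True
```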

class lineapy.data.types.MutateNode(*, id: LineaID, session_id: LineaID, node_type: NodeType = NodeType.MutateNode, source_location: SourceLocation = None, source_id: LineaID, call_id: LineaID)

Represents a mutation of a node’s value.

After a call mutates a node, later references to that node refer to this mutate node instead.

parents() → Iterable[LineaID]

Returns the parents of this node.
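The redirect rule ("later references to that node will instead refer to this mutate node") can be sketched as a small lookup table. MutateNodeSketch mirrors the documented source_id and call_id fields. MutationTracker and the choice of both fields as parents are assumptions for illustration, not lineapy's implementation.

```python
from dataclasses import dataclass
from typing import Dict, Iterator


@dataclass
class MutateNodeSketch:
    id: str
    source_id: str  # node whose value was mutated
    call_id: str    # call that performed the mutation

    def parents(self) -> Iterator[str]:
        # Assumption: both the mutated node and the mutating call are parents.
        yield self.source_id
        yield self.call_id


class MutationTracker:
    """Hypothetical tracer state that redirects references to the newest mutate node."""

    def __init__(self) -> None:
        self._replaced_by: Dict[str, str] = {}

    def record(self, node: MutateNodeSketch) -> None:
        self._replaced_by[node.source_id] = node.id

    def resolve(self, node_id: str) -> str:
        # Follow the chain so that repeated mutations resolve to the latest node.
        while node_id in self._replaced_by:
            node_id = self._replaced_by[node_id]
        return node_id


# lst = []        -> node "n1"
# lst.append(1)   -> call "c1", which mutates lst
# lst.append(2)   -> call "c2", which mutates lst again
tracker = MutationTracker()
tracker.record(MutateNodeSketch(id="m1", source_id=tracker.resolve("n1"), call_id="c1"))
tracker.record(MutateNodeSketch(id="m2", source_id=tracker.resolve("n1"), call_id="c2"))
print(tracker.resolve("n1"))  # m2
```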

class lineapy.data.types.NodeType(value)

An enumeration.

class lineapy.data.types.PipelineType(value)

Pipeline types tell to_pipeline what to expect:

  • SCRIPT: the pipeline is wrapped as a Python script

  • AIRFLOW: the pipeline is wrapped as an Airflow DAG

class lineapy.data.types.SessionContext(*, id: LineaID, environment_type: SessionType, creation_time: datetime, working_directory: str, session_name: str = None, user_name: str = None, execution_id: LineaID)

Each trace of a script/notebook is a “Session”.

  • working_directory: captures where the user ran the code

  • we should remove the dependency on working_directory because it’s brittle

class lineapy.data.types.SessionType(value)

Session types tell the tracer what to expect:

  • JUPYTER: the tracer needs to progressively add more nodes to the graph

  • SCRIPT: the easiest case; run everything until the end

class lineapy.data.types.SourceCode(*, id: LineaID, code: str, location: Union[Path, JupyterCell])

The source code that was executed.

class lineapy.data.types.SourceLocation(*, lineno: int, col_offset: int, end_lineno: int, end_col_offset: int, source_code: SourceCode)

The location of the original source.

Eventually we also need to support fused locations, like those in MLIR (see https://mlir.llvm.org/docs/Dialects/Builtin/#location-attributes), but for now we just point at the original user source location.
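SourceLocation's four position fields have the same names as the position attributes that Python's ast module sets on every expression and statement (end_lineno and end_col_offset since Python 3.8). Assuming these fields are filled from the parsed tree, which this page does not state, the hypothetical helper below extracts them for a call spanning two lines.

```python
import ast
from typing import Dict


def location_of(node: ast.AST) -> Dict[str, int]:
    # Same field names as SourceLocation (assumption: populated from ast positions).
    return {
        "lineno": node.lineno,
        "col_offset": node.col_offset,
        "end_lineno": node.end_lineno,
        "end_col_offset": node.end_col_offset,
    }


source = "x = 1\ny = min(x,\n      2)\n"
call = ast.parse(source).body[1].value  # the min(...) call on lines 2-3
print(location_of(call))
# {'lineno': 2, 'col_offset': 4, 'end_lineno': 3, 'end_col_offset': 8}
print(ast.get_source_segment(source, call))  # the original text of the call
```

In ast, lineno is 1-based and col_offset is a 0-based UTF-8 byte offset. This matters when the fields are compared with character positions in lines that contain non-ASCII text.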

class lineapy.data.types.ValueType(value)

Lower case because the frontend API assumes the characters “chart” exactly as is.

Todo

FIXME

  • rename (needs coordination with linea-server):
    • dataset is really a table

    • value means it’s a literal (e.g., int/str)

Module contents