Source code for lineapy.data.types

from __future__ import annotations

import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Any, Dict, Iterable, List, NewType, Optional, Union

from pydantic import BaseModel, Field


class SessionType(Enum):
    """
    The kind of environment a session is traced in, which tells the tracer
    what to expect.

    - JUPYTER: the tracer needs to progressively add more nodes to the graph
    - SCRIPT: the easiest case, run everything until the end
    """

    JUPYTER = 1
    SCRIPT = 2
""" Following are the types used to construct the Linea IR. These should be fairly stable as changing them will likely result in major refactor. You can find extensive examples in tests/stub_data. The orm_mode allows us to use from_orm to convert ORM objects to pydantic objects """ # Use a NewType instead of a string so that we can look at annotations of # fields in pydantic models that use this to differentiate between strings and # IDs when pretty printing LineaID = NewType("LineaID", str)
class SessionContext(BaseModel):
    """
    Metadata for one "Session", i.e. one trace of a script or notebook.

    :param working_directory: captures where the code was run by the user

    - we should remove the dependency on the working_directory because it is
      brittle
    """

    # Populated on creation by uuid.uuid4()
    id: LineaID
    environment_type: SessionType
    creation_time: datetime.datetime
    # Must be passed in for now
    working_directory: str
    session_name: Optional[str] = None
    user_name: Optional[str] = None
    # The ID of the corresponding execution
    execution_id: LineaID

    class Config:
        # Lets from_orm build this model from an ORM object
        orm_mode = True
class NodeType(Enum):
    """
    Discriminator stored in the ``node_type`` field of every node model.

    Values are assigned by ``auto()`` in declaration order, so the order of
    the members below is significant.
    """

    Node = auto()
    CallNode = auto()
    LiteralNode = auto()
    ImportNode = auto()
    LookupNode = auto()
    MutateNode = auto()
    GlobalNode = auto()
class LiteralType(Enum):
    """
    The kinds of literal values that are distinguished.

    Values are assigned by ``auto()`` in declaration order, so the order of
    the members below is significant.
    """

    String = auto()
    Integer = auto()
    Float = auto()
    Boolean = auto()
    NoneType = auto()
    Ellipsis = auto()
class ValueType(Enum):
    """
    Category of a recorded node value.

    Member names are lower case because the API with the frontend assumes
    the characters (e.g. "chart") exactly as is.

    TODO
    ----

    FIXME - rename (needs coordination with linea-server):

    - really `dataset` is a table
    - `value` means it is a literal (e.g., int/str)
    """

    chart = 1
    array = 2
    dataset = 3
    code = 4
    # Includes int, string, bool
    value = 5
class NodeValue(BaseModel):
    """
    The value recorded for a node during one execution, together with the
    start and end timestamps stored alongside it.
    """

    node_id: LineaID
    # A pointer to the current execution
    execution_id: LineaID
    value: str
    value_type: Optional[ValueType]
    start_time: datetime.datetime
    end_time: datetime.datetime

    class Config:
        # Lets from_orm build this model from an ORM object
        orm_mode = True
class Execution(BaseModel):
    """
    One session of running many nodes and recording their values.
    """

    id: LineaID
    timestamp: Optional[datetime.datetime]

    class Config:
        # Lets from_orm build this model from an ORM object
        orm_mode = True
class Artifact(BaseModel):
    """
    A named, versioned pointer to the value a node had during some execution.
    """

    node_id: LineaID
    execution_id: LineaID
    date_created: datetime.datetime
    name: str
    version: int

    class Config:
        # Lets from_orm build this model from an ORM object
        orm_mode = True
class JupyterCell(BaseModel):
    """
    Identifies a Jupyter code cell by its execution count within a session.
    """

    # The execution number of the cell, see
    # https://nbformat.readthedocs.io/en/latest/format_description.html#code-cells
    execution_count: int
    # The session context ID for this execution
    session_id: LineaID


# Source code comes either from a file on disk or from a Jupyter cell.
SourceCodeLocation = Union[Path, JupyterCell]
class SourceCode(BaseModel):
    """
    The source code of the code that was executed.

    Equality and hashing are based on ``id`` alone. Ordering (``__lt__``) is
    partial: it is only meaningful between Jupyter cells of the same session.
    """

    id: LineaID
    code: str
    location: SourceCodeLocation

    class Config:
        # Lets from_orm build this model from an ORM object
        orm_mode = True

    def __hash__(self) -> int:
        # Must agree with __eq__, which compares ids only.
        return hash(self.id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SourceCode):
            return NotImplemented
        return self.id == other.id

    def __lt__(self, other: object) -> bool:
        """
        Returns true if this source code comes before the other; only
        applies to Jupyter sources.

        Returns ``NotImplemented`` if ``other`` is not a ``SourceCode``, if
        the two are not from the same file or the same Jupyter session, or if
        one is a file and the other a Jupyter cell.
        """
        if not isinstance(other, SourceCode):
            return NotImplemented
        self_location = self.location
        other_location = other.location
        if isinstance(self_location, Path) and isinstance(
            other_location, Path
        ):
            # If they are of different files, we can't compare them
            if self_location != other_location:
                return NotImplemented
            # Otherwise, they are equal so not lt
            return False
        elif isinstance(self_location, JupyterCell) and isinstance(
            other_location, JupyterCell
        ):
            # If they are from different sessions, we can't compare them.
            if self_location.session_id != other_location.session_id:
                return NotImplemented
            # Within one session, cells are ordered by execution count
            return (self_location.execution_count) < (
                other_location.execution_count
            )
        # The locations are of different kinds (a file vs. a Jupyter cell), so
        # we don't know how to compare them. No assertion about the location
        # types is made here: this point is normally reached precisely when
        # they differ, and the documented contract is to decline the
        # comparison rather than raise.
        return NotImplemented
class SourceLocation(BaseModel):
    """
    The location of the original source.

    Eventually we need to also be able to support fused locations, like MLIR:
    https://mlir.llvm.org/docs/Dialects/Builtin/#location-attributes
    but for now we just point at the original user source location.
    """

    lineno: int
    col_offset: int = Field(repr=False)
    end_lineno: int = Field(repr=False)
    end_col_offset: int = Field(repr=False)
    source_code: SourceCode = Field(repr=False)

    def __lt__(self, other: object) -> bool:
        """
        Returns true if this source location comes before the other.

        Returns ``NotImplemented`` if ``other`` is not a ``SourceLocation``
        or if the two source codes decline to be compared (e.g. they are not
        from the same file or the same Jupyter session).
        """
        if not isinstance(other, SourceLocation):
            return NotImplemented
        # Call __lt__ directly instead of using `<`: the operator never
        # evaluates to NotImplemented (it raises TypeError once both operands
        # decline), so the check below would otherwise be unreachable.
        source_code_lt = self.source_code.__lt__(other.source_code)
        # If they are different source locations, we don't know how to compare
        if source_code_lt is NotImplemented:
            return NotImplemented
        # Otherwise, if they are from the same source, compare by line number
        # and then by column
        if self.source_code.location == other.source_code.location:
            return (self.lineno, self.col_offset) < (
                other.lineno,
                other.col_offset,
            )
        return source_code_lt

    class Config:
        # Lets from_orm build this model from an ORM object
        orm_mode = True
class BaseNode(BaseModel):
    """
    Fields and behaviour shared by all node models.

    - id: string version of a UUID, chosen because no coordination is needed
      to make it unique
    - session_id: refers to SessionContext.id
    - source_location: records the position of the code for this node. It is
      optional because some nodes, such as side-effect nodes, do not
      correspond to a line of code.
    - `class Config`'s orm_mode allows us to use from_orm to convert ORM
      objects to pydantic objects
    """

    id: LineaID
    session_id: LineaID = Field(repr=False)  # refers to SessionContext.id
    node_type: NodeType = Field(NodeType.Node, repr=False)
    source_location: Optional[SourceLocation] = Field(repr=False)

    class Config:
        orm_mode = True

    def __lt__(self, other: object) -> bool:
        """
        Order nodes by source location, with nodes that have no source
        location sorting first. Used to break ties in topological node
        ordering.
        """
        if not isinstance(other, BaseNode):
            return NotImplemented
        theirs = other.source_location
        if not theirs:
            # Nothing sorts before a node without a location
            return False
        mine = self.source_location
        if not mine:
            return True
        return mine < theirs

    def parents(self) -> Iterable[LineaID]:
        """
        Returns the parents of this node.
        """
        # The base node has no parents: a generator that yields nothing
        yield from ()
class ImportNode(BaseNode):
    """
    An imported library.

    `version` and `package_name` are retrieved at runtime, which is why they
    are optional. `package_name` may be different from the import name, see
    get_lib_package_version.

    Note that this node is not actually used for execution (that is done
    with `l_import` CallNodes); it is more a decoration for metadata
    retrieval.
    """

    node_type: NodeType = NodeType.ImportNode
    name: str
    version: Optional[str] = None
    package_name: Optional[str] = None
    path: Optional[str] = None
class PositionalArgument(BaseModel):
    """
    A positional argument of a call: the ID of the argument node plus a
    `starred` flag that defaults to False.
    """

    id: LineaID
    starred: bool = False


class KeywordArgument(BaseModel):
    """
    A keyword argument of a call: the keyword, the ID of the value node and
    a `starred` flag that defaults to False.
    """

    key: str
    value: LineaID
    starred: bool = False
class CallNode(BaseNode):
    """
    A function call.

    - `function_id`: node containing the value of the function being called,
      which could be from various places: (1) locally defined, (2) imported,
      and (3) magically existing, e.g. from builtins (`min`), or from the
      environment like `get_ipython`.
    - `positional_args` / `keyword_args`: the arguments passed to the call.
    - `global_reads`: global variables that need to be set to call this
      function, keyed by variable name.
    """

    node_type: NodeType = Field(NodeType.CallNode, repr=False)

    function_id: LineaID
    positional_args: List[PositionalArgument] = []
    keyword_args: List[KeywordArgument] = []

    # Mapping of global variables that need to be set to call this function
    global_reads: Dict[str, LineaID] = {}

    # TODO: add documentation
    implicit_dependencies: List[LineaID] = []

    def parents(self) -> Iterable[LineaID]:
        """
        Yields the function node first, then the positional argument nodes,
        the keyword argument value nodes, the global reads and finally the
        implicit dependencies.
        """
        yield self.function_id
        for positional in self.positional_args:
            yield positional.id
        for keyword in self.keyword_args:
            yield keyword.value
        yield from self.global_reads.values()
        yield from self.implicit_dependencies
class LiteralNode(BaseNode):
    """
    A node that carries a literal value directly in its `value` field.
    """

    node_type: NodeType = Field(NodeType.LiteralNode, repr=False)
    value: Any
class LookupNode(BaseNode):
    """
    For unknown/undefined variables e.g. SQLcontext, get_ipython, int.

    `name` is the name of the variable being looked up.
    """

    # Annotated explicitly, like the BaseNode field it overrides, so the
    # field's type does not have to be inferred from its default value.
    node_type: NodeType = Field(NodeType.LookupNode, repr=False)
    name: str
class MutateNode(BaseNode):
    """
    Represents a mutation of a node's value.

    After a call mutates a node then later references to that node will
    instead refer to this mutate node.
    """

    # Annotated explicitly, like the BaseNode field it overrides, so the
    # field's type does not have to be inferred from its default value.
    node_type: NodeType = Field(NodeType.MutateNode, repr=False)

    # Points to the original node that was mutated
    source_id: LineaID

    # Points to the CallNode that did the mutation
    call_id: LineaID

    def parents(self) -> Iterable[LineaID]:
        """
        Yields the mutated source node, then the call that mutated it.
        """
        yield self.source_id
        yield self.call_id
class GlobalNode(BaseNode):
    """
    Represents a lookup of a global variable, that was set as a side effect
    in another node.
    """

    # Annotated explicitly, like the BaseNode field it overrides, so the
    # field's type does not have to be inferred from its default value.
    node_type: NodeType = Field(NodeType.GlobalNode, repr=False)

    # The name of the variable to look up from the result of the call
    name: str

    # Points to the call node that updated the global
    call_id: LineaID

    def parents(self) -> Iterable[LineaID]:
        """
        Yields the single parent: the call node that updated the global.
        """
        yield self.call_id
# Union of the concrete node classes. Use it for more precise type
# definitions, to make sure all the node cases are handled.
Node = Union[
    ImportNode,
    CallNode,
    LiteralNode,
    LookupNode,
    MutateNode,
    GlobalNode,
]
class PipelineType(Enum):
    """
    Pipeline types tell to_pipeline what kind of output to produce.

    - SCRIPT: the pipeline is wrapped as a python script
    - AIRFLOW: the pipeline is wrapped as an airflow dag
    """

    SCRIPT = 1
    AIRFLOW = 2