lineapy.graph_reader package


lineapy.graph_reader.artifact_collection module

class lineapy.graph_reader.artifact_collection.ArtifactCollection(db: lineapy.db.db.RelationalLineaDB, target_artifacts: Union[List[str], List[Tuple[str, int]], List[Union[str, Tuple[str, int]]]], input_parameters: List[str] = [], reuse_pre_computed_artifacts: Union[List[str], List[Tuple[str, int]], List[Union[str, Tuple[str, int]]]] = [])

ArtifactCollection can be thought of as a box where the inserted group of artifacts and their graph(s) get refactored into reusable components (i.e., functions with non-overlapping operations). With this modularization, it can then support various downstream code generation tasks such as pipeline file writing.

For now, ArtifactCollection is meant to be kept and used as an abstraction/tool for internal development only. That is, the class and its methods will NOT be exposed directly to the user. Instead, it is intended to be used by other user-facing APIs.

generate_module_text(dependencies: Dict[str, Set[str]] = {}, indentation: int = 4) → str

Generate a Python module that reproduces artifacts in the artifact collection. This module is meant to provide function components that can be easily reused to incorporate artifacts into other code contexts.

Parameters:

dependencies – Dependencies between artifacts, expressed in graphlib format. For instance, {"B": {"A", "C"}} means artifacts A and C are prerequisites for artifact B.
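To make the format concrete, the snippet below passes the documented example to Python's standard-library graphlib.TopologicalSorter (Python 3.9+), which yields each artifact only after its prerequisites. It illustrates the dictionary format only; how ArtifactCollection processes dependencies internally is not shown here.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Same format as the `dependencies` argument: each key depends on the artifacts in its set.
dependencies = {"B": {"A", "C"}}

# static_order() yields every node only after all of its prerequisites.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # A and C (in either order) come before B
```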

get_module(dependencies: Dict[str, Set[str]] = {})

Write the module text to a temporary file and load it as a module named session_art1_art2_...
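A minimal sketch of the write-then-load step using only the standard library (tempfile and importlib.util). The helper name load_module_from_text and the module text are hypothetical, and lineapy's implementation may differ in details such as file cleanup.

```python
import importlib.util
import os
import tempfile
import types


def load_module_from_text(module_text: str, module_name: str) -> types.ModuleType:
    """Hypothetical helper: write module_text to a temp .py file and import it as module_name."""
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
        f.write(module_text)
        path = f.name
    try:
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # executes the module code
    finally:
        os.remove(path)  # the already-executed module object stays usable
    return module


# Hypothetical module text and a name following the session_art1_art2_... pattern.
mod = load_module_from_text("def get_a():\n    return 1\n", "session_a_b")
print(mod.get_a())  # 1
```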

lineapy.graph_reader.graph_printer module

class lineapy.graph_reader.graph_printer.GraphPrinter(graph: Graph, include_source_location: bool = True, include_id_field: bool = True, include_session: bool = True, include_imports: bool = False, include_timing: bool = True, nest_nodes: bool = True, id_to_attribute_name: Dict[LineaID, str] = <factory>, node_type_to_count: Dict[NodeType, int] = <factory>, source_code_count: int = 0)

Pretty-prints a graph in a form similar to how you would construct it by hand.

This representation should stay consistent even when node UUIDs differ.
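The id_to_attribute_name and node_type_to_count fields in the signature suggest one way to get such stability: replace each UUID with a per-type counter assigned in visit order. The sketch below illustrates that idea under this assumption; StableNamer and render are hypothetical names, not GraphPrinter's code.

```python
import uuid
from typing import Dict, List, Tuple


class StableNamer:
    """Hypothetical helper: map random node IDs to readable, deterministic names."""

    def __init__(self) -> None:
        self.id_to_name: Dict[str, str] = {}
        self.type_to_count: Dict[str, int] = {}

    def name_for(self, node_id: str, node_type: str) -> str:
        if node_id not in self.id_to_name:
            # Number nodes per type in visit order, e.g. literal_1, call_1, call_2.
            count = self.type_to_count.get(node_type, 0) + 1
            self.type_to_count[node_type] = count
            self.id_to_name[node_id] = f"{node_type}_{count}"
        return self.id_to_name[node_id]


def render(nodes: List[Tuple[str, str]]) -> List[str]:
    namer = StableNamer()
    return [namer.name_for(node_id, node_type) for node_id, node_type in nodes]


# Two runs with different UUIDs but the same structure produce the same names.
run1 = [(str(uuid.uuid4()), "literal"), (str(uuid.uuid4()), "call")]
run2 = [(str(uuid.uuid4()), "literal"), (str(uuid.uuid4()), "call")]
print(render(run1) == render(run2))  # True
```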

lineapy.graph_reader.graph_printer.pretty_print_node_type(type) → str

Turns a node type into something that can be used as a variable name.
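One way to obtain such a name, assuming node types are Enum members with CamelCase names, is to convert the member name to snake_case. The NodeType enum below is a hypothetical stand-in, and lineapy's actual output format may differ.

```python
import re
from enum import Enum, auto


class NodeType(Enum):
    # Hypothetical stand-in; lineapy's real NodeType members may differ.
    CallNode = auto()
    LiteralNode = auto()


def pretty_print_node_type(node_type: NodeType) -> str:
    # Insert "_" before each interior capital, then lower-case: CallNode -> call_node.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", node_type.name).lower()


print(pretty_print_node_type(NodeType.CallNode))     # call_node
print(pretty_print_node_type(NodeType.LiteralNode))  # literal_node
```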

lineapy.graph_reader.graph_printer.pretty_print_str(s: str) → str

Pretty-prints a string so that, if it contains a newline, it is printed as a triple-quoted string.
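A minimal sketch of this behavior, assuming repr() is acceptable for single-line strings and that multi-line input contains no triple quotes or backslashes that would need escaping. It is an illustration, not lineapy's implementation.

```python
def pretty_print_str(s: str) -> str:
    """Render s as Python source text, using triple quotes for multi-line strings."""
    if "\n" in s:
        # Assumption: s contains no '"""' and no backslashes needing escapes.
        return '"""' + s + '"""'
    # repr() produces a correctly quoted and escaped single-line literal.
    return repr(s)


print(pretty_print_str("abc"))  # 'abc'
print(pretty_print_str("x = 1\ny = 2"))
```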

lineapy.graph_reader.node_collection module

class lineapy.graph_reader.node_collection.NodeCollection(collection_type: lineapy.graph_reader.node_collection.NodeCollectionType, node_list: Set[LineaID], name: str = '', assigned_variables: Set[str] = <factory>, dependent_variables: Set[str] = <factory>, all_variables: Set[str] = <factory>, input_variables: Set[str] = <factory>, tracked_variables: Set[str] = <factory>, return_variables: List[str] = <factory>, artifact_node_id: Optional[LineaID] = None, predecessor_nodes: Set[LineaID] = <factory>, predecessor_artifact: Set[str] = <factory>, input_variable_sources: Dict[str, Set[str]] = <factory>, safename: str = '', graph_segment: Optional[Graph] = None, sliced_nodes: Set[LineaID] = <factory>, raw_codeblock: str = '', is_empty: bool = True, pre_computed_artifact: Union[None, Tuple[str, Optional[int]]] = None, is_pre_computed_artifact: bool = False)

This class holds a set of nodes (as a subgraph) that share the same purpose; for instance, calculating some variables, importing modules, or assigning variables. It is initialized with a list of nodes:

seg = NodeCollection(collection_type, node_list)

For variable calculation purposes, it can identify all variables related to these nodes.

For code generation purposes, it first needs to build a real graph object from these nodes.

It then provides the following methods to generate code blocks for different purposes.

get_function_call_block(indentation=0, keep_lineapy_save=False, result_placeholder=None, source_module='') → str

Return a code block that calls the function of the graph segment and receives its return variables.

Parameters:

  • indentation (int) – indentation size

  • keep_lineapy_save (bool) – whether to keep lineapy.save() calls after execution

  • result_placeholder (Optional[str]) – if not None, append the return result to the result_placeholder list

  • source_module (str) – which module the function comes from

The result_placeholder is a list that captures the artifact variables right after they are calculated. Consider the following code:

a = 1
lineapy.save(a,'a')
b = a+1
lineapy.save(b,'b')
c = a+1
lineapy.save(c,'c')

We need to record artifact a before it is mutated.
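The ordering concern can be sketched with hypothetical generated code: each artifact value is appended to the placeholder list as soon as it is computed, so a later reassignment of a does not change what was recorded. The exact code emitted by get_function_call_block may look different.

```python
# Hypothetical shape of generated code; result_placeholder collects artifact values.
result_placeholder = []

a = 1
result_placeholder.append(a)  # record artifact "a" right after it is computed

a = a + 1  # a later step changes a

b = a + 1
result_placeholder.append(b)  # record artifact "b"

print(result_placeholder)  # [1, 3]
print(a)  # 2, so reading a at the end would not give the saved value of "a"
```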

get_function_definition(indentation=4) → str

Return a code block that defines the function of the graph segment. If self.is_pre_computed_artifact is True, the calculation block is replaced with lineapy.get().get_value().

get_import_block(indentation=0) → str

Return a code block with the import statements of the graph segment.

get_input_parameters_block(indentation=4) → str

Return a code block for the input parameters of the graph segment.

class lineapy.graph_reader.node_collection.NodeCollectionType(value)

NodeCollection type to identify the purpose of the node collection:

  • node collection for artifact calculation

  • node collection for calculating variables used in multiple artifacts

  • node collection for module import

  • node collection for input variables

class lineapy.graph_reader.node_collection.NodeInfo(assigned_variables: Set[str] = <factory>, assigned_artifact: Optional[str] = None, dependent_variables: Set[str] = <factory>, predecessors: Set[LineaID] = <factory>, tracked_variables: Set[str] = <factory>, module_import: Set[str] = <factory>, artifact_name: Optional[str] = None)

Attributes:

  • assigned_variables – variables assigned at this node

  • assigned_artifact – the artifact this node points to, if any

  • dependent_variables – union over the node's predecessors: for each predecessor that assigns any variable, use its assigned variables; otherwise, use its dependent_variables

  • tracked_variables – variables that this node points to

  • predecessors – predecessors of the node

  • module_import – module names/aliases that this node points to

  • artifact_name – the artifact calculation block this node belongs to
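The dependent_variables rule above can be sketched as follows. Info is a simplified, hypothetical stand-in for NodeInfo that keeps only the two fields the rule reads, and dependent_variables_from is an illustrative helper name.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Info:
    # Simplified, hypothetical stand-in for NodeInfo with only the fields used here.
    assigned_variables: Set[str] = field(default_factory=set)
    dependent_variables: Set[str] = field(default_factory=set)


def dependent_variables_from(predecessors: List[Info]) -> Set[str]:
    """Union over predecessors: prefer assigned variables, else fall back to dependent ones."""
    result: Set[str] = set()
    for pred in predecessors:
        if pred.assigned_variables:
            result |= pred.assigned_variables
        else:
            result |= pred.dependent_variables
    return result


# One predecessor assigns x; another assigns nothing but depends on y.
preds = [Info(assigned_variables={"x"}), Info(dependent_variables={"y"})]
print(sorted(dependent_variables_from(preds)))  # ['x', 'y']
```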

lineapy.graph_reader.program_slice module

class lineapy.graph_reader.program_slice.CodeSlice(import_lines: List[str], body_lines: List[str])

lineapy.graph_reader.program_slice.get_program_slice(graph: Graph, sinks: List[LineaID], keep_lineapy_save: bool = False) → lineapy.graph_reader.program_slice.CodeSlice

Find the necessary and sufficient code for computing the sink nodes.

Parameters:

  • graph – The computation graph.

  • sinks – Artifacts to get the code slice for.

  • keep_lineapy_save – Whether to retain lineapy.save() calls in the code slice. Defaults to False.

Returns:

A CodeSlice containing the necessary and sufficient code for computing the sinks.

lineapy.graph_reader.program_slice.get_slice_graph(graph: Graph, sinks: List[LineaID], keep_lineapy_save: bool = False)

Takes a full graph from the session and produces the subset responsible for the “sinks”.

Parameters:

  • graph – A full graph object from a session.

  • sinks – A list of node IDs desired for slicing.

  • keep_lineapy_save – Whether to retain lineapy.save() calls in the slice. Defaults to False.

Returns:

A subgraph extracted (i.e., sliced) for the desired node IDs.
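Slicing of this kind amounts to keeping each sink together with everything it transitively depends on. The sketch below shows that reachability step over a plain dict from node ID to parent IDs, which is a hypothetical representation rather than lineapy's Graph, and it ignores keep_lineapy_save.

```python
from collections import deque
from typing import Dict, Iterable, Set


def slice_ancestors(parents: Dict[str, Set[str]], sinks: Iterable[str]) -> Set[str]:
    """Return the sinks plus every node they transitively depend on."""
    keep: Set[str] = set()
    queue = deque(sinks)
    while queue:
        node = queue.popleft()
        if node in keep:
            continue
        keep.add(node)
        queue.extend(parents.get(node, ()))  # walk edges backwards toward inputs
    return keep


# Hypothetical graph: node -> the nodes it reads from.
parents = {
    "import_pd": set(),
    "df": {"import_pd"},
    "unused": {"import_pd"},
    "model": {"df"},
}
print(sorted(slice_ancestors(parents, ["model"])))  # ['df', 'import_pd', 'model']
```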

lineapy.graph_reader.program_slice.get_source_code_from_graph(program: Graph) → lineapy.graph_reader.program_slice.CodeSlice

Returns the code for a subgraph by including every source line that the graph covers.


We need better analysis than just looking at the source code. For example, what if we just need one expression from a line that defines multiple expressions?

We should probably instead regenerate the source from our graph representation.
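The line-based approach, and the limitation noted above, can be seen in a small sketch. It assumes each sliced node is described by a hypothetical 1-based (start_line, end_line) span; because whole lines are kept, a line such as x = 1; y = 2 is included in full even when only x is needed.

```python
from typing import Iterable, Tuple


def lines_for_nodes(source: str, spans: Iterable[Tuple[int, int]]) -> str:
    """Keep every source line covered by at least one 1-based (start, end) span."""
    lines = source.splitlines()
    covered = set()
    for start, end in spans:
        covered.update(range(start, end + 1))
    # Preserve original order; whole lines are kept even if only part is needed.
    return "\n".join(lines[i - 1] for i in sorted(covered) if 1 <= i <= len(lines))


source = "import math\nx = 1; y = 2\nz = math.sqrt(x)\n"
# Slicing for z needs lines 1-3, but line 2 also carries the unneeded y = 2.
print(lines_for_nodes(source, [(1, 1), (2, 2), (3, 3)]))
```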

lineapy.graph_reader.session_artifacts module

class lineapy.graph_reader.session_artifacts.SessionArtifacts(db: lineapy.db.db.RelationalLineaDB, target_artifacts: List[lineapy.api.models.linea_artifact.LineaArtifact], input_parameters: List[str] = [], reuse_pre_computed_artifacts: Dict[str, lineapy.api.models.linea_artifact.LineaArtifact] = {})

Refactor a given session graph for use in a downstream task (e.g., pipeline building).

get_session_artifact_function_definitions(indentation=4) → List[str]

Return the definitions of the calculation functions for each targeted artifact.

get_session_function(indentation=4, return_dict_name='artifacts') → str

Return the definition of the session function that executes the calculation of all targeted artifacts.

get_session_function_body(indentation, return_dict_name='artifacts') → str

Return the body of the session function.

get_session_function_callblock() → str

Return the code that calls the session function, in the form session_function_name(input_parameters).

get_session_function_name() → str

Return the session function name: run_session_including_{first_artifact_name}
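A sketch of how the name and call block could be assembled, following the documented run_session_including_{first_artifact_name} pattern. Passing input parameters by keyword, and assuming the artifact name is already a valid identifier, are both assumptions; the helper names are hypothetical.

```python
from typing import List


def session_function_name(first_artifact_name: str) -> str:
    # Documented pattern; assumes the artifact name is already a valid identifier.
    return f"run_session_including_{first_artifact_name}"


def session_call_block(first_artifact_name: str, input_parameters: List[str]) -> str:
    # Hypothetical: pass each input parameter through by keyword.
    args = ", ".join(f"{p}={p}" for p in input_parameters)
    return f"{session_function_name(first_artifact_name)}({args})"


print(session_call_block("model", ["a", "b"]))
# run_session_including_model(a=a, b=b)
```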

get_session_input_parameters_lines(indentation=4) → str

Return the lines of session code that are replaced by user-selected input parameters. These lines also serve as the default values of these variables.

get_session_input_parameters_spec() → Dict[str, lineapy.graph_reader.types.InputVariable]

Return a dictionary mapping each input parameter name to an InputVariable instance, used to generate code related to user input variables.

get_session_module_imports(indentation=0) → str

Return all the import statements for the session.

lineapy.graph_reader.types module

class lineapy.graph_reader.types.InputVariable(variable_name, value, value_type)

Class to generate code related to an input variable and its default value.

Parameters:

  • variable_name – variable name

  • value – variable value

  • value_type – variable object type


ex: a = 1


ex: parser.add_argument('--a', default=1, type=int)


ex: a = args.a
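To make the three examples concrete, the sketch below produces each code form from a variable's name, value, and type, then checks that the generated argparse lines actually run. The helper input_variable_code is hypothetical; only argparse.ArgumentParser, add_argument, and parse_args are real standard-library APIs.

```python
import argparse
from typing import Any, Tuple


def input_variable_code(variable_name: str, value: Any, value_type: type) -> Tuple[str, str, str]:
    """Hypothetical helper returning the three code forms shown above."""
    default_line = f"{variable_name} = {value!r}"
    parser_line = (
        f"parser.add_argument('--{variable_name}', "
        f"default={value!r}, type={value_type.__name__})"
    )
    args_line = f"{variable_name} = args.{variable_name}"
    return default_line, parser_line, args_line


default_line, parser_line, args_line = input_variable_code("a", 1, int)
print(default_line)  # a = 1
print(parser_line)   # parser.add_argument('--a', default=1, type=int)
print(args_line)     # a = args.a

# Run the generated lines in an isolated namespace to confirm they are valid argparse code.
namespace = {"parser": argparse.ArgumentParser()}
exec(parser_line, namespace)
namespace["args"] = namespace["parser"].parse_args(["--a", "5"])
exec(args_line, namespace)
print(namespace["a"])  # 5
```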

lineapy.graph_reader.utils module

Module contents