
lineapy

Graph

Bases: object

Source code in lineapy/data/graph.py
class Graph(object):
    def __init__(self, nodes: List[Node], session_context: SessionContext):
        """
        Graph is the core abstraction in LineaPy that is automatically generated
        by capturing and analyzing user code. Nodes in Graph correspond to
        variables and function calls from user code, and edges indicate
        dependencies. This is the common IR upon which all LineaPy applications,
        such as code cleanup and DAG generation, are built.

        Parameters
        ----------
        nodes: List[Node]
            list of LineaPy Nodes that make up the graph.
        session_context: SessionContext
            the session context associated with the graph

        ??? note

            The information in `session_context` is semantically important to
            the notion of a Graph. Concretely, we are starting to also use the code
            entry from the session_context.
        """
        self.nodes: List[Node] = nodes
        self.ids: Dict[LineaID, Node] = dict((n.id, n) for n in nodes)
        self.nx_graph = nx.DiGraph()
        self.nx_graph.add_nodes_from([node.id for node in nodes])

        self.nx_graph.add_edges_from(
            [
                (parent_id, node.id)
                for node in nodes
                for parent_id in node.parents()
                if parent_id in set(self.ids.keys())
            ]
        )

        self.session_context = session_context

        # Checking whether the linea graph created is cyclic or not
        if not nx.is_directed_acyclic_graph(self.nx_graph):
            track(CyclicGraphEvent(""))

    def __eq__(self, other) -> bool:
        return nx.is_isomorphic(self.nx_graph, other.nx_graph)

    def print(self, **kwargs) -> str:
        return GraphPrinter(self, **kwargs).print()

    @listify
    def visit_order(self) -> Iterator[Node]:
        """
        Just using the line number as the tie-breaker for now since we don't
        have a good way to track dependencies.
        Note that we cannot just use the line number to sort because there are
        nodes created by us that do not have line numbers.
        """
        # TODO: Move this out of `Graph` and into classes that operate on top
        #  of the graph.

        # Before the introduction of Control Flow Analysis, the Linea Graph
        # could be represented as a Directed Acyclic Graph where each node could
        # be thought of as a computation with its parents as its dependencies.
        # This was possible as without control flow analysis, we were only
        # dealing with straight line code, which essentially is a sequence of
        # instructions executed one after another with no jumps.
        # However, with the introduction of control flow, we need to introduce
        # cycles in a graph to correspond to the cyclic dependencies possible,
        # especially in loops, as the only way to avoid cycles would be to
        # effectively unroll loops, which can become prohibitively expensive as
        # the number of iterations in a loop increases.

        # Cycles in the graph would be enough to represent data/control
        # dependencies. However, while executing the graph, we cannot depend on
        # future information to be present. We need a way to break cycles while
        # executing the graph, for which we currently resort to removing certain
        # edges in the graph, to ensure we are able to obtain a topological
        # ordering of the nodes, so that any node being executed depends on a
        # value which is already defined.

        # For a Directed Acyclic Graph, generally, we want to traverse the graph
        # in a way to maintain two constraints:

        # 1. All parents must be traversed before their children
        # 2. If permitted, nodes with smaller line numbers should come first

        # To do this, we do a breadth first traversal, keeping our queue ordered
        # by their line number. The sorting is done via the __lt__ method
        # of the Node
        queue: PriorityQueue[Node] = PriorityQueue()

        # We also keep track of all nodes we have already added to the queue
        # so that we don't add them again.
        seen: Set[LineaID] = set()

        # We also keep a mapping of each node to the number of parents left
        # which have not been visited yet.
        # Note that we want to skip counting parents which are not part of our
        # nodes. This can happen when we evaluate part of a graph, then another part.
        # When evaluating the next part, we just have those nodes, so some
        # parents will be missing, we assume they are already executed.

        # We also want to remove certain nodes which result in a cycle. In case
        # a cycle is present, we would have a set of nodes, all of which have a
        # nonzero number of non-executed parents. To find the next node to
        # execute, we want one of the remaining nodes to have zero non-executed
        # parents, which indicates to us that the particular node can be
        # executed as all required information is present.

        # We have certain cases of removing parents in order to ensure no cycles
        # in the execution graph.
        remaining_parents: Dict[str, int] = {}

        for node in self.nodes:
            n_remaining_parents = len(
                [
                    parent_id
                    for parent_id in self.nx_graph.pred[node.id]
                    if parent_id in self.ids
                ]
            )

            # Removing certain edges to ensure the graph for execution is
            # acyclic, to generate a proper order for execution of nodes

            # Simply reducing the counter `n_remaining_parents` by the
            # appropriate amount is sufficient, as we check whether
            # `n_remaining_parents` for a particular node is zero to decide
            # whether it can be executed next, rather than modifying the
            # edges in the graph.

            # There is a cyclic dependency between an IfNode and an ElseNode,
            # which are connected to each other. To break the cycle, we do not
            # consider the connection from the ElseNode to the IfNode (the
            # ElseNode is not a dependency for the IfNode to run).
            if isinstance(node, IfNode):
                if node.companion_id is not None:
                    n_remaining_parents -= 1

            # First we add all the nodes to the queue which have no parents.
            if n_remaining_parents == 0:
                seen.add(node.id)
                queue.put(node)
            remaining_parents[node.id] = n_remaining_parents

        while queue.qsize():
            # Find the first node in the queue which has all its parents removed
            node = queue_get_when(
                queue, lambda n: remaining_parents[n.id] == 0
            )

            # Then, we add all of its children to the queue, making sure to mark
            # for each that we have seen one of its parents
            yield node
            for child_id in self.get_children(node.id):
                remaining_parents[child_id] -= 1
                if child_id in seen:
                    continue
                child_node = self.ids[child_id]
                queue.put(child_node)
                seen.add(child_id)

    def get_parents(self, node_id: LineaID) -> List[LineaID]:
        return list(self.nx_graph.predecessors(node_id))

    def get_ancestors(self, node_id: LineaID) -> List[LineaID]:
        return list(nx.ancestors(self.nx_graph, node_id))

    def get_children(self, node_id: LineaID) -> List[LineaID]:
        return list(self.nx_graph.successors(node_id))

    def get_descendants(self, node_id: LineaID) -> List[LineaID]:
        return list(nx.descendants(self.nx_graph, node_id))

    def get_leaf_nodes(self) -> List[LineaID]:
        return [
            node
            for node in self.nx_graph.nodes
            if self.nx_graph.out_degree(node) == 0
        ]

    def get_node(self, node_id: Optional[LineaID]) -> Optional[Node]:
        if node_id is not None and node_id in self.ids:
            return self.ids[node_id]
        return None

    def get_subgraph(self, nodes: List[Node]) -> "Graph":
        """
        Get a subgraph of the current graph induced by the input nodes.

        Parameters
        ----------
        nodes: List[Node]
            The nodes in the subgraph

        Returns
        -------
        Graph
            A new `Graph` that contains `nodes` and the edges between
            `nodes` in the current Graph and has the same session_context.
        """
        return Graph(nodes, self.session_context)

    def get_subgraph_from_id(self, nodeids: List[LineaID]) -> "Graph":
        """
        Get subgraph from list of LineaID
        """
        nodes: List[Node] = []
        for node_id in nodeids:
            node = self.get_node(node_id)
            if node is not None:
                nodes.append(node)
        return self.get_subgraph(nodes)

    @classmethod
    def create_session_graph(cls, db: RelationalLineaDB, session_id: LineaID):
        session_context = db.get_session_context(session_id)
        session_nodes = db.get_nodes_for_session(session_id)
        return cls(session_nodes, session_context)

    def __str__(self):
        return prettify(
            self.print(
                include_source_location=False,
                include_id_field=True,
                include_session=False,
            )
        )

    def __repr__(self):
        return prettify(self.print())

__init__(nodes, session_context)

Graph is the core abstraction in LineaPy that is automatically generated by capturing and analyzing user code. Nodes in Graph correspond to variables and function calls from user code, and edges indicate dependencies. This is the common IR upon which all LineaPy applications, such as code cleanup and DAG generation, are built.

Parameters:

  nodes (List[Node], required): list of LineaPy Nodes that make up the graph.
  session_context (SessionContext, required): the session context associated with the graph.

Note

The information in session_context is semantically important to the notion of a Graph. Concretely, we are starting to also use the code entry from the session_context.

Source code in lineapy/data/graph.py
def __init__(self, nodes: List[Node], session_context: SessionContext):
    """
    Graph is the core abstraction in LineaPy that is automatically generated
    by capturing and analyzing user code. Nodes in Graph correspond to
    variables and function calls from user code, and edges indicate
    dependencies. This is the common IR upon which all LineaPy applications,
    such as code cleanup and DAG generation, are built.

    Parameters
    ----------
    nodes: List[Node]
        list of LineaPy Nodes that make up the graph.
    session_context: SessionContext
        the session context associated with the graph

    ??? note

        The information in `session_context` is semantically important to
        the notion of a Graph. Concretely, we are starting to also use the code
        entry from the session_context.
    """
    self.nodes: List[Node] = nodes
    self.ids: Dict[LineaID, Node] = dict((n.id, n) for n in nodes)
    self.nx_graph = nx.DiGraph()
    self.nx_graph.add_nodes_from([node.id for node in nodes])

    self.nx_graph.add_edges_from(
        [
            (parent_id, node.id)
            for node in nodes
            for parent_id in node.parents()
            if parent_id in set(self.ids.keys())
        ]
    )

    self.session_context = session_context

    # Checking whether the linea graph created is cyclic or not
    if not nx.is_directed_acyclic_graph(self.nx_graph):
        track(CyclicGraphEvent(""))
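
The edge construction above keeps only edges whose parent is itself in the captured node set, which is what keeps partially captured graphs well-formed. A minimal, self-contained sketch of that filtering logic using plain networkx, with hypothetical string IDs standing in for LineaID values:

import networkx as nx

# Hypothetical child -> parents mapping; "external" stands in for a node
# that was not captured in this session.
parents_of = {"a": [], "b": ["a"], "c": ["a", "external"]}

g = nx.DiGraph()
g.add_nodes_from(parents_of)
g.add_edges_from(
    (parent, child)
    for child, parents in parents_of.items()
    for parent in parents
    if parent in parents_of  # drop edges from nodes outside the captured set
)

assert list(g.predecessors("c")) == ["a"]  # the "external" edge was dropped
assert nx.is_directed_acyclic_graph(g)     # same check the constructor performs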

get_subgraph(nodes)

Get a subgraph of the current graph induced by the input nodes.

Parameters:

  nodes (List[Node], required): the nodes in the subgraph.

Returns:

  Graph: a new Graph that contains nodes and the edges between nodes in the current Graph and has the same session_context.

Source code in lineapy/data/graph.py
def get_subgraph(self, nodes: List[Node]) -> "Graph":
    """
    Get a subgraph of the current graph induced by the input nodes.

    Parameters
    ----------
    nodes: List[Node]
        The nodes in the subgraph

    Returns
    -------
    Graph
        A new `Graph` that contains `nodes` and the edges between
        `nodes` in the current Graph and has the same session_context.
    """
    return Graph(nodes, self.session_context)

get_subgraph_from_id(nodeids)

Get subgraph from list of LineaID

Source code in lineapy/data/graph.py
def get_subgraph_from_id(self, nodeids: List[LineaID]) -> "Graph":
    """
    Get subgraph from list of LineaID
    """
    nodes: List[Node] = []
    for node_id in nodeids:
        node = self.get_node(node_id)
        if node is not None:
            nodes.append(node)
    return self.get_subgraph(nodes)
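
A typical use of these two helpers is slicing: collect the ancestors of a node of interest and induce the subgraph over them. A hedged usage sketch, assuming graph is an existing Graph and node_id is a LineaID present in it:

# Assumed to exist: `graph` (a Graph) and `node_id` (a LineaID in it).
ancestor_ids = graph.get_ancestors(node_id)
sliced = graph.get_subgraph_from_id(ancestor_ids + [node_id])
print(sliced)  # pretty-printed via __str__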

visit_order()

Just using the line number as the tie-breaker for now since we don't have a good way to track dependencies. Note that we cannot just use the line number to sort because there are nodes created by us that do not have line numbers.

Source code in lineapy/data/graph.py
@listify
def visit_order(self) -> Iterator[Node]:
    """
    Just using the line number as the tie-breaker for now since we don't
    have a good way to track dependencies.
    Note that we cannot just use the line number to sort because there are
    nodes created by us that do not have line numbers.
    """
    # TODO: Move this out of `Graph` and into classes that operate on top
    #  of the graph.

    # Before the introduction of Control Flow Analysis, the Linea Graph
    # could be represented as a Directed Acyclic Graph where each node could
    # be thought of as a computation with its parents as its dependencies.
    # This was possible as without control flow analysis, we were only
    # dealing with straight line code, which essentially is a sequence of
    # instructions executed one after another with no jumps.
    # However, with the introduction of control flow, we need to introduce
    # cycles in a graph to correspond to the cyclic dependencies possible,
    # especially in loops, as the only way to avoid cycles would be to
    # effectively unroll loops, which can become prohibitively expensive as
    # the number of iterations in a loop increases.

    # Cycles in the graph would be enough to represent data/control
    # dependencies. However, while executing the graph, we cannot depend on
    # future information to be present. We need a way to break cycles while
    # executing the graph, for which we currently resort to removing certain
    # edges in the graph, to ensure we are able to obtain a topological
    # ordering of the nodes, so that any node being executed depends on a
    # value which is already defined.

    # For a Directed Acyclic Graph, generally, we want to traverse the graph
    # in a way to maintain two constraints:

    # 1. All parents must be traversed before their children
    # 2. If permitted, nodes with smaller line numbers should come first

    # To do this, we do a breadth first traversal, keeping our queue ordered
    # by their line number. The sorting is done via the __lt__ method
    # of the Node
    queue: PriorityQueue[Node] = PriorityQueue()

    # We also keep track of all nodes we have already added to the queue
    # so that we don't add them again.
    seen: Set[LineaID] = set()

    # We also keep a mapping of each node to the number of parents left
    # which have not been visited yet.
    # Note that we want to skip counting parents which are not part of our
    # nodes. This can happen when we evaluate part of a graph, then another part.
    # When evaluating the next part, we just have those nodes, so some
    # parents will be missing, we assume they are already executed.

    # We also want to remove certain nodes which result in a cycle. In case
    # a cycle is present, we would have a set of nodes, all of which have a
    # nonzero number of non-executed parents. To find the next node to
    # execute, we want one of the remaining nodes to have zero non-executed
    # parents, which indicates to us that the particular node can be
    # executed as all required information is present.

    # We have certain cases of removing parents in order to ensure no cycles
    # in the execution graph.
    remaining_parents: Dict[str, int] = {}

    for node in self.nodes:
        n_remaining_parents = len(
            [
                parent_id
                for parent_id in self.nx_graph.pred[node.id]
                if parent_id in self.ids
            ]
        )

        # Removing certain edges to ensure the graph for execution is
        # acyclic, to generate a proper order for execution of nodes

        # Simply reducing the counter `n_remaining_parents` by the
        # appropriate amount is sufficient, as we check whether
        # `n_remaining_parents` for a particular node is zero to decide
        # whether it can be executed next, rather than modifying the
        # edges in the graph.

        # There is a cyclic dependency between an IfNode and an ElseNode,
        # which are connected to each other. To break the cycle, we do not
        # consider the connection from the ElseNode to the IfNode (the
        # ElseNode is not a dependency for the IfNode to run).
        if isinstance(node, IfNode):
            if node.companion_id is not None:
                n_remaining_parents -= 1

        # First we add all the nodes to the queue which have no parents.
        if n_remaining_parents == 0:
            seen.add(node.id)
            queue.put(node)
        remaining_parents[node.id] = n_remaining_parents

    while queue.qsize():
        # Find the first node in the queue which has all its parents removed
        node = queue_get_when(
            queue, lambda n: remaining_parents[n.id] == 0
        )

        # Then, we add all of its children to the queue, making sure to mark
        # for each that we have seen one of its parents
        yield node
        for child_id in self.get_children(node.id):
            remaining_parents[child_id] -= 1
            if child_id in seen:
                continue
            child_node = self.ids[child_id]
            queue.put(child_node)
            seen.add(child_id)
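
Stripped of the cycle-breaking special cases, the loop above is Kahn's algorithm with a priority queue as the frontier, so nodes that become ready at the same time come out in line-number order. Below is a self-contained sketch of that acyclic core, using heapq and (line number, name) tuples as a stand-in for Node.__lt__; all names are illustrative, not LineaPy APIs. Note that the real method enqueues children as soon as they are first seen and filters with queue_get_when, whereas this sketch only enqueues a node once its parent count reaches zero:

import heapq

# Hypothetical DAG: node -> children, plus a line number per node for tie-breaking.
children = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
line_no = {"a": 1, "b": 2, "c": 3, "d": 4}

# Count unvisited parents per node (Kahn's algorithm bookkeeping).
remaining_parents = {n: 0 for n in children}
for kids in children.values():
    for child in kids:
        remaining_parents[child] += 1

# Seed the frontier with parentless nodes, ordered by line number.
frontier = [(line_no[n], n) for n, c in remaining_parents.items() if c == 0]
heapq.heapify(frontier)

order = []
while frontier:
    _, node = heapq.heappop(frontier)
    order.append(node)
    for child in children[node]:
        remaining_parents[child] -= 1
        if remaining_parents[child] == 0:
            heapq.heappush(frontier, (line_no[child], child))

assert order == ["a", "b", "c", "d"]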

SessionType

Bases: Enum

Session types allow the tracer to know what to expect:

  • JUPYTER: the tracer needs to progressively add more nodes to the graph
  • SCRIPT: the easiest case, run everything until the end

Source code in lineapy/data/types.py
class SessionType(Enum):
    """
    Session types allow the tracer to know what to expect
    - JUPYTER: the tracer needs to progressively add more nodes to the graph
    - SCRIPT: the easiest case, run everything until the end
    """

    JUPYTER = 1
    SCRIPT = 2
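
A small, hedged illustration of how the two variants might be branched on; the helper name is illustrative, not a LineaPy API:

from lineapy.data.types import SessionType

def needs_incremental_tracing(session_type: SessionType) -> bool:
    # JUPYTER sessions grow the graph progressively; SCRIPT runs to the end.
    return session_type is SessionType.JUPYTER

assert needs_incremental_tracing(SessionType.JUPYTER)
assert not needs_incremental_tracing(SessionType.SCRIPT)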

Tracer dataclass

Source code in lineapy/instrumentation/tracer.py
@dataclass
class Tracer:
    db: RelationalLineaDB

    session_type: InitVar[SessionType]
    session_name: InitVar[Optional[str]] = None
    globals_: InitVar[Optional[Dict[str, object]]] = None

    variable_name_to_node: Dict[str, Node] = field(default_factory=dict)
    module_name_to_node: Dict[str, Node] = field(default_factory=dict)

    tracer_context: TracerContext = field(init=False)
    executor: Executor = field(init=False)
    mutation_tracker: MutationTracker = field(default_factory=MutationTracker)
    control_flow_tracker: ControlFlowTracker = field(
        default_factory=ControlFlowTracker
    )

    def __post_init__(
        self,
        session_type: SessionType,
        session_name: Optional[str],
        globals_: Optional[Dict[str, object]],
    ):
        """
        Tracer is internal to Linea and implements the "hidden APIs"
        that are set up by the transformer.
        It performs the following key functionalities:
        - Creates the graph nodes and inserts them into the database.
        - Maintains data structures to help create the graph IR
          that is used later, which includes:
          - `variable_name_to_id`: for tracking variable/function/module
            to the ID responsible for its creation
        - Executes the program, using the `Executor`.

        Note that we don't currently maintain the variable names in the persisted
        graph (we used to at some point in the past), but we can add a serialized
        version of `variable_name_to_id` to the session if we want to persist
        the information, which could be useful, e.g., for post-hoc lifting of
        linea artifacts.
        """
        self.executor = Executor(self.db, globals_ or globals())

        session_context = SessionContext(
            id=get_new_id(),
            environment_type=session_type,
            python_version=get_system_python_version(),  # up to minor version
            creation_time=datetime.now(),
            working_directory=getcwd(),
            session_name=session_name,
            execution_id=self.executor.execution.id,
        )
        self.db.write_context(session_context)
        self.tracer_context = TracerContext(
            session_context=session_context, db=self.db
        )

    @property
    def values(self) -> Dict[str, object]:
        """
        Returns a mapping of variable names to their values, by joining
        the scoping information with the executor values.
        """
        return {
            k: self.executor.get_value(n.id)
            for k, n in self.variable_name_to_node.items()
        }

    def process_node(self, node: Node) -> None:
        """
        Execute a node and add it to the database.
        """

        ##
        # Update the graph from the side effects of the node,
        # If an artifact could not be created, quietly return without saving
        # the node to the DB.
        ##
        logger.debug("Executing node %s", node)
        try:
            side_effects = self.executor.execute_node(
                node,
                {k: v.id for k, v in self.variable_name_to_node.items()},
            )
        except ArtifactSaveException as exc_info:
            logger.error("Artifact could not be saved.")
            logger.debug(exc_info)
            return
        logger.debug("Processing side effects")

        # Iterate through each side effect and process it, depending on its type
        for e in side_effects:
            if isinstance(e, ImplicitDependencyNode):
                self._process_implicit_dependency(
                    node, self._resolve_pointer(e.pointer)
                )
            elif isinstance(e, ViewOfNodes):
                if len(e.pointers) > 0:  # skip if empty
                    self.mutation_tracker.set_as_viewers_of_each_other(
                        *map(self._resolve_pointer, e.pointers)
                    )
            elif isinstance(e, AccessedGlobals):
                self._process_accessed_globals(
                    node.session_id, node, e.retrieved, e.added_or_updated
                )
            # Mutate case
            else:
                mutated_node_id = self._resolve_pointer(e.pointer)
                for (
                    mutate_node_id,
                    source_id,
                ) in self.mutation_tracker.set_as_mutated(mutated_node_id):
                    mutate_node = MutateNode(
                        id=mutate_node_id,
                        session_id=node.session_id,
                        source_id=source_id,
                        call_id=node.id,
                        control_dependency=self.control_flow_tracker.current_control_dependency(),
                    )
                    self.process_node(mutate_node)

        # also special case for import node
        if isinstance(node, ImportNode):
            # must process after the call has been executed
            package_name, version = get_lib_package_version(node.name)
            node.version = version
            node.package_name = package_name

        self.db.write_node(node)

    def _resolve_pointer(self, ptr: ExecutorPointer) -> LineaID:
        if isinstance(ptr, ID):
            return ptr.id
        if isinstance(ptr, Variable):
            return self.variable_name_to_node[ptr.name].id
        # Handle external state case, by making a lookup node for it
        if isinstance(ptr, ExternalState):
            return (
                self.executor.lookup_external_state(ptr)
                or self.lookup_node(ptr.external_state).id
            )
        raise ValueError(f"Unsupported pointer type: {type(ptr)}")

    def _process_implicit_dependency(
        self, node: Node, implicit_dependency_id: LineaID
    ) -> None:
        """
        Add dependency of a node on a global implicit dependency,
        which is a dependency that lineapy has deemed essential in the
        reproduction of an artifact but is not explicitly passed as arguments
        """

        # Only call nodes can refer to implicit dependencies
        assert isinstance(node, CallNode)
        node.implicit_dependencies.append(
            self.mutation_tracker.get_latest_mutate_node(
                implicit_dependency_id
            )
        )

    def _process_accessed_globals(
        self,
        session_id: str,
        node: Node,
        retrieved: List[str],
        added_or_updated: List[str],
    ) -> None:

        # Only call nodes can access globals and have the global_reads attribute
        assert isinstance(node, CallNode)

        # Add the retrieved globals as global reads to the call node
        node.global_reads = {
            var: self.mutation_tracker.get_latest_mutate_node(
                self.variable_name_to_node[var].id
            )
            for var in retrieved
            # Only save reads from variables that we have already saved variables for
            # Assume that all other reads are for variables assigned inside the call
            if var in self.variable_name_to_node
        }

        # Create a new global node for each added/updated
        for var in added_or_updated:
            global_node = GlobalNode(
                id=get_new_id(),
                session_id=session_id,
                name=var,
                call_id=node.id,
                control_dependency=self.control_flow_tracker.current_control_dependency(),
            )
            self.process_node(global_node)
            self.variable_name_to_node[var] = global_node

    def lookup_node(
        self,
        variable_name: str,
        source_location: Optional[SourceLocation] = None,
    ) -> Node:
        """
        Cases for the node that we are looking up:

        - user defined variable & function definitions
        - imported libs
        - unknown runtime magic functions — special case to LookupNode

          - builtin functions, e.g., min
          - custom runtime, e.g., get_ipython

        """
        if variable_name in self.variable_name_to_node:
            # user define var and fun def
            return self.variable_name_to_node[variable_name]
        elif variable_name in self.module_name_to_node:
            return self.module_name_to_node[variable_name]
        else:
            new_node = LookupNode(
                id=get_new_id(),
                session_id=self.get_session_id(),
                name=variable_name,
                source_location=source_location,
                control_dependency=self.control_flow_tracker.current_control_dependency(),
            )
            self.process_node(new_node)
            return new_node

    def import_module(
        self,
        name: str,
        source_location: Optional[SourceLocation] = None,
    ) -> Node:
        """
        Import a module. If we have already imported it, just return its node.
        Otherwise, create new module nodes for each submodule in its parents and return it.
        """
        if name in self.module_name_to_node:
            return self.module_name_to_node[name]
        # Recursively go up the tree, to try to get parents, and if we don't have them, import them
        *parents, module_name = name.split(".")
        if parents:
            parent_module = self.import_module(
                ".".join(parents),
                source_location,
            )
            node = self.call(
                self.lookup_node(l_import.__name__),
                source_location,
                self.literal(module_name),
                parent_module,
            )
        else:
            node = self.call(
                self.lookup_node(l_import.__name__),
                source_location,
                self.literal(module_name),
            )
        self.module_name_to_node[name] = node
        return node

    def trace_import(
        self,
        name: str,
        source_location: Optional[SourceLocation] = None,
        alias: Optional[str] = None,
        attributes: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Parameters
        ----------
        name: str
            the name of the module
        alias: Optional[str]
            the module could be aliased, e.g., import pandas as pd
        attributes: Optional[Dict[str, str]]
            a mapping of functions imported from the library,
            keying the aliased name to the original name.

        ??? note
            - The input args would _either_ have alias or attributes, but not both
            - Didn't call the function import because that's a reserved keyword

        note that version and path will be introspected at runtime
        """
        module_node = self.import_module(name, source_location)
        if alias:
            self.assign(alias, module_node, from_import=True)
        elif attributes:
            module_value = self.executor.get_value(module_node.id)
            if IMPORT_STAR in attributes:
                """
                Import the module, get all public attributes, and set them as globals
                """
                # Import star behavior copied from python docs
                # https://docs.python.org/3/reference/simple_stmts.html#the-import-statement
                if hasattr(module_value, "__all__"):
                    public_names = module_value.__all__  # type: ignore
                else:
                    public_names = [
                        attr
                        for attr in dir(module_value)
                        if not attr.startswith("_")
                    ]
                attributes = {attr: attr for attr in public_names}
            """
            load module `x`, check if `y` is an attribute of `x`, otherwise load `x.y`
            If `x.y` is a module, load that, otherwise get the `y` attribute of `x`.
            """
            for alias, attr_or_module in attributes.items():
                if hasattr(module_value, attr_or_module):
                    self.assign(
                        alias,
                        self.call(
                            self.lookup_node(GETATTR),
                            source_location,
                            module_node,
                            self.literal(attr_or_module),
                        ),
                        from_import=True,
                    )
                else:
                    full_name = f"{name}.{attr_or_module}"
                    sub_module_node = self.import_module(
                        full_name, source_location
                    )
                    self.assign(alias, sub_module_node, from_import=True)

        else:
            self.assign(name, module_node, from_import=True)

        node = ImportNode(
            id=get_new_id(),
            name=name,
            session_id=self.get_session_id(),
            source_location=source_location,
            control_dependency=self.control_flow_tracker.current_control_dependency(),
        )
        self.process_node(node)

    def literal(
        self,
        value: object,
        source_location: Optional[SourceLocation] = None,
    ):
        # this literal should be assigned or used later
        node = LiteralNode(
            id=get_new_id(),
            session_id=self.get_session_id(),
            value=value,
            source_location=source_location,
            control_dependency=self.control_flow_tracker.current_control_dependency(),
        )
        self.process_node(node)
        return node

    def __get_positional_arguments(self, arguments):
        for arg in arguments:
            if isinstance(arg, tuple) or isinstance(arg, list):
                yield PositionalArgument(
                    id=self.mutation_tracker.get_latest_mutate_node(arg[1].id),
                    starred=arg[0],
                )

            else:
                yield PositionalArgument(
                    id=self.mutation_tracker.get_latest_mutate_node(arg.id),
                    starred=False,
                )

    def __get_keyword_arguments(self, keyword_arguments):
        for k, n in keyword_arguments.items():
            values = self.mutation_tracker.get_latest_mutate_node(n.id)
            if k.startswith("unpack_"):
                yield KeywordArgument(key="**", value=values, starred=True)
            else:
                yield KeywordArgument(key=k, value=values, starred=False)

    def call(
        self,
        function_node: Node,
        source_location: Optional[SourceLocation],
        # function_name: str,
        *arguments: Union[Node, Tuple[bool, Node]],
        **keyword_arguments: Node,
    ) -> CallNode:
        """
        Parameters
        ----------
        function_node: Node
            the function node to call/execute
        source_location: Optional[SourceLocation]
            the source info from user code
        arguments: Union[Node, Tuple[bool, Node]]
            positional arguments. These are passed as either Nodes (named nodes, constants, etc)
            or tuples (starred, the node) where the starred is a boolean to indicate whether
            the argument is supposed to be splatted before passing to the function (This is
            the case where you might call a function like so ``foo(1, *[2, 3])`` ). The boolean is made
            optional simply to support the legacy way of calling this function and not having to pass
            the tuples for every single case from node_transformer
        keyword_arguments: Node
            keyword arguments. These are passed as a dictionary of keyword arguments to the
            function. Similar to ``*positional_arguments``, the keyword arguments can also be splatted
            by naming the key as ``unpack_<index>`` where <index> is the index of the argument. In this
            case, the dictionary will be unpacked and passed as keyword arguments to the function.
            The keyword arguments are processed in order of passing so any keyword conflicts will
            result in the last value accepted as the value for the keyword.

        Returns
        -------
        CallNode
            a call node

        ??? note
            - It's important for the call to return the call node
            so that we can programmatically chain the nodes together,
            e.g., for the assignment call to modify the previous call node.
            - The call looks up if it's a locally defined function. We decided
            that this is better for program slicing.
        """

        node = CallNode(
            id=get_new_id(),
            session_id=self.get_session_id(),
            function_id=function_node.id,
            positional_args=self.__get_positional_arguments(arguments),
            keyword_args=self.__get_keyword_arguments(keyword_arguments),
            source_location=source_location,
            global_reads={},
            implicit_dependencies=[],
            control_dependency=self.control_flow_tracker.current_control_dependency(),
        )
        self.process_node(node)
        return node

    def get_control_node(
        self,
        type: NodeType,
        node_id: LineaID,
        companion_id: Optional[LineaID],
        source_location: Optional[SourceLocation] = None,
        test_id: Optional[LineaID] = None,
        unexec_id: Optional[LineaID] = None,
    ) -> ControlFlowContext:
        node: ControlNode
        if type == NodeType.IfNode:
            node = IfNode(
                id=node_id,
                session_id=self.get_session_id(),
                source_location=source_location,
                control_dependency=self.control_flow_tracker.current_control_dependency(),
                unexec_id=unexec_id,
                test_id=test_id,
                companion_id=companion_id,
            )
        elif type == NodeType.ElseNode:
            node = ElseNode(
                id=node_id,
                session_id=self.get_session_id(),
                source_location=source_location,
                control_dependency=self.control_flow_tracker.current_control_dependency(),
                companion_id=companion_id,
                unexec_id=unexec_id,
            )
        else:
            raise NotImplementedError(
                "Requested node type is not implemented as a control flow node type: ",
                type,
            )
        self.process_node(node)
        return ControlFlowContext(node, self.control_flow_tracker)

    def assign(
        self, variable_name: str, value_node: Node, from_import: bool = False
    ) -> None:
        """
        Assign updates a local mapping of variable nodes.
        """
        logger.debug("assigning %s = %s", variable_name, value_node)
        existing_value_node = self.variable_name_to_node.get(
            variable_name, None
        )
        if (
            existing_value_node is None
            or existing_value_node.id != value_node.id
            or not from_import
        ):
            self.variable_name_to_node[variable_name] = value_node
            self.db.write_assigned_variable(value_node.id, variable_name)
        return

    def tuple(
        self, *args: Node, source_location: Optional[SourceLocation] = None
    ) -> CallNode:
        return self.call(
            self.lookup_node(l_tuple.__name__),
            source_location,
            *args,
        )

    # tracer context method wrappers from here on
    def get_session_id(self) -> LineaID:
        return self.tracer_context.get_session_id()

    @property
    def graph(self) -> Graph:
        return self.tracer_context.graph

    def session_artifacts(self) -> List[ArtifactORM]:
        return self.tracer_context.session_artifacts()

    @property
    def artifacts(self) -> Dict[str, str]:
        return self.tracer_context.artifacts

    def slice(self, name: str) -> str:
        return self.tracer_context.slice(name)

    def get_working_dir(self) -> str:
        return self.tracer_context.session_context.working_directory
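
A hedged construction sketch showing the pieces above working together; db is assumed to be a pre-existing RelationalLineaDB handle, and calling the tracing methods directly like this (rather than through the transformer) is for illustration only:

from lineapy.data.types import SessionType
from lineapy.instrumentation.tracer import Tracer

# `db` is an assumed, pre-existing RelationalLineaDB instance.
tracer = Tracer(db, SessionType.SCRIPT, session_name="demo")

# Trace the equivalent of `x = 42`: create a literal node, then bind it.
node = tracer.literal(42)
tracer.assign("x", node)

print(tracer.values)  # expected: {"x": 42}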

values: Dict[str, object] property

Returns a mapping of variable names to their values, by joining the scoping information with the executor values.

__post_init__(session_type, session_name, globals_)

Tracer is internal to Linea and implements the "hidden APIs" that are set up by the transformer. It performs the following key functionalities:

  • Creates the graph nodes and inserts them into the database.
  • Maintains data structures to help create the graph IR that is used later, including variable_name_to_id, which tracks each variable/function/module to the ID responsible for its creation.
  • Executes the program, using the Executor.

Note that we don't currently maintain the variable names in the persisted graph (we used to at some point in the past), but we can add a serialized version of variable_name_to_id to the session if we want to persist the information, which could be useful, e.g., for post-hoc lifting of linea artifacts.

Source code in lineapy/instrumentation/tracer.py
def __post_init__(
    self,
    session_type: SessionType,
    session_name: Optional[str],
    globals_: Optional[Dict[str, object]],
):
    """
    Tracer is internal to Linea and implements the "hidden APIs"
    that are set up by the transformer.
    It performs the following key functionalities:
    - Creates the graph nodes and inserts them into the database.
    - Maintains data structures to help create the graph IR
      that is used later, which includes:
      - `variable_name_to_id`: for tracking variable/function/module
        to the ID responsible for its creation
    - Executes the program, using the `Executor`.

    Note that we don't currently maintain the variable names in the persisted
    graph (we used to at some point in the past), but we can add a serialized
    version of `variable_name_to_id` to the session if we want to persist
    the information, which could be useful, e.g., for post-hoc lifting of
    linea artifacts.
    """
    self.executor = Executor(self.db, globals_ or globals())

    session_context = SessionContext(
        id=get_new_id(),
        environment_type=session_type,
        python_version=get_system_python_version(),  # up to minor version
        creation_time=datetime.now(),
        working_directory=getcwd(),
        session_name=session_name,
        execution_id=self.executor.execution.id,
    )
    self.db.write_context(session_context)
    self.tracer_context = TracerContext(
        session_context=session_context, db=self.db
    )

assign(variable_name, value_node, from_import=False)

Assign updates a local mapping of variable nodes.

Source code in lineapy/instrumentation/tracer.py
def assign(
    self, variable_name: str, value_node: Node, from_import: bool = False
) -> None:
    """
    Assign updates a local mapping of variable nodes.
    """
    logger.debug("assigning %s = %s", variable_name, value_node)
    existing_value_node = self.variable_name_to_node.get(
        variable_name, None
    )
    if (
        existing_value_node is None
        or existing_value_node.id != value_node.id
        or not from_import
    ):
        self.variable_name_to_node[variable_name] = value_node
        self.db.write_assigned_variable(value_node.id, variable_name)
    return
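
The write condition above makes from_import idempotent for repeated imports: if the variable already points at the same node and the assignment comes from an import, nothing is rewritten. A hedged sketch, with tracer and module_node assumed to exist:

tracer.assign("pd", module_node, from_import=True)  # first time: binding written
tracer.assign("pd", module_node, from_import=True)  # same node id: write skipped
tracer.assign("pd", module_node)                    # plain assignment: always written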

call(function_node, source_location, *arguments, **keyword_arguments)

Parameters:

  function_node (Node, required): the function node to call/execute.
  source_location (Optional[SourceLocation], required): the source info from user code.
  arguments (Union[Node, Tuple[bool, Node]], default ()): positional arguments. These are passed as either Nodes (named nodes, constants, etc.) or tuples of (starred, node), where starred is a boolean indicating whether the argument should be splatted before being passed to the function (the case where you call a function like foo(1, *[2, 3])). The boolean is optional simply to support the legacy way of calling this function without having to pass tuples for every single case from node_transformer.
  keyword_arguments (Node, default {}): keyword arguments. These are passed as a dictionary of keyword arguments to the function. Like positional arguments, keyword arguments can also be splatted by naming the key unpack_<index>, where <index> is the index of the argument; in this case the dictionary will be unpacked and passed as keyword arguments to the function. Keyword arguments are processed in order of passing, so any keyword conflicts resolve to the last value given for that keyword.

Returns:

  CallNode: a call node.

Note
  • It's important for the call to return the call node so that we can programmatically chain the nodes together, e.g., for the assignment call to modify the previous call node.
  • The call looks up if it's a locally defined function. We decided that this is better for program slicing.
Source code in lineapy/instrumentation/tracer.py
def call(
    self,
    function_node: Node,
    source_location: Optional[SourceLocation],
    # function_name: str,
    *arguments: Union[Node, Tuple[bool, Node]],
    **keyword_arguments: Node,
) -> CallNode:
    """
    Parameters
    ----------
    function_node: Node
        the function node to call/execute
    source_location: Optional[SourceLocation]
        the source info from user code
    arguments: Union[Node, Tuple[bool, Node]]
        positional arguments. These are passed as either Nodes (named nodes, constants, etc)
        or tuples (starred, the node) where the starred is a boolean to indicate whether
        the argument is supposed to be splatted before passing to the function (This is
        the case where you might call a function like so ``foo(1, *[2, 3])`` ). The boolean is made
        optional simply to support the legacy way of calling this function and not having to pass
        the tuples for every single case from node_transformer
    keyword_arguments: Node
        keyword arguments. These are passed as a dictionary of keyword arguments to the
        function. Similar to ``*positional_arguments``, the keyword arguments can also be splatted
        by naming the key as ``unpack_<index>`` where <index> is the index of the argument. In this
        case, the dictionary will be unpacked and passed as keyword arguments to the function.
        The keyword arguments are processed in order of passing so any keyword conflicts will
        result in the last value accepted as the value for the keyword.

    Returns
    -------
    CallNode
        a call node

    ??? note
        - It's important for the call to return the call node
        so that we can programmatically chain the nodes together,
        e.g., for the assignment call to modify the previous call node.
        - The call looks up if it's a locally defined function. We decided
        that this is better for program slicing.
    """

    node = CallNode(
        id=get_new_id(),
        session_id=self.get_session_id(),
        function_id=function_node.id,
        positional_args=self.__get_positional_arguments(arguments),
        keyword_args=self.__get_keyword_arguments(keyword_arguments),
        source_location=source_location,
        global_reads={},
        implicit_dependencies=[],
        control_dependency=self.control_flow_tracker.current_control_dependency(),
    )
    self.process_node(node)
    return node
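
The argument conventions can be made concrete with a hedged sketch; tracer, fn_node, a_node, b_node, and kwargs_node are assumed to be an existing Tracer and Nodes, and the call roughly corresponds to fn(a, *b, x=a, **kwargs):

call_node = tracer.call(
    fn_node,
    None,                   # no source location
    a_node,                 # plain positional argument
    (True, b_node),         # (starred, node) tuple: splatted like fn(*b)
    x=a_node,               # ordinary keyword argument
    unpack_0=kwargs_node,   # `unpack_<index>` key: splatted like fn(**kwargs)
)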

import_module(name, source_location=None)

Import a module. If we have already imported it, just return its node. Otherwise, create new module nodes for each submodule in its parents and return it.

Source code in lineapy/instrumentation/tracer.py
def import_module(
    self,
    name: str,
    source_location: Optional[SourceLocation] = None,
) -> Node:
    """
    Import a module. If we have already imported it, just return its node.
    Otherwise, create new module nodes for each submodule in its parents and return it.
    """
    if name in self.module_name_to_node:
        return self.module_name_to_node[name]
    # Recursively go up the tree, to try to get parents, and if we don't have them, import them
    *parents, module_name = name.split(".")
    if parents:
        parent_module = self.import_module(
            ".".join(parents),
            source_location,
        )
        node = self.call(
            self.lookup_node(l_import.__name__),
            source_location,
            self.literal(module_name),
            parent_module,
        )
    else:
        node = self.call(
            self.lookup_node(l_import.__name__),
            source_location,
            self.literal(module_name),
        )
    self.module_name_to_node[name] = node
    return node
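
A hedged usage sketch of the recursion: importing a dotted name imports and caches each parent level first, and repeated imports return the cached node (tracer is assumed to exist):

linalg_node = tracer.import_module("numpy.linalg")

assert "numpy" in tracer.module_name_to_node                # parent was imported too
assert tracer.import_module("numpy.linalg") is linalg_node  # cached: same node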

lookup_node(variable_name, source_location=None)

Cases for the node that we are looking up:

  • user defined variable & function definitions
  • imported libs
  • unknown runtime magic functions (special case to LookupNode):
    • builtin functions, e.g., min
    • custom runtime, e.g., get_ipython
Source code in lineapy/instrumentation/tracer.py
def lookup_node(
    self,
    variable_name: str,
    source_location: Optional[SourceLocation] = None,
) -> Node:
    """
    Cases for the node that we are looking up:

    - user defined variable & function definitions
    - imported libs
    - unknown runtime magic functions &mdash; special case to LookupNode

      - builtin functions, e.g., min
      - custom runtime, e.g., get_ipython

    """
    if variable_name in self.variable_name_to_node:
        # user define var and fun def
        return self.variable_name_to_node[variable_name]
    elif variable_name in self.module_name_to_node:
        return self.module_name_to_node[variable_name]
    else:
        new_node = LookupNode(
            id=get_new_id(),
            session_id=self.get_session_id(),
            name=variable_name,
            source_location=source_location,
            control_dependency=self.control_flow_tracker.current_control_dependency(),
        )
        self.process_node(new_node)
        return new_node
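
A hedged sketch of two of the cases, assuming tracer exists: a traced variable resolves to the node already recorded for it, while an unknown runtime name such as a builtin produces a fresh LookupNode:

tracer.assign("x", tracer.literal(1))

# User-defined variable: returns the node already recorded for it.
assert tracer.lookup_node("x") is tracer.variable_name_to_node["x"]

# Unknown runtime name (e.g., a builtin): a new LookupNode is created.
min_node = tracer.lookup_node("min")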

process_node(node)

Execute a node and add it to the database.

Source code in lineapy/instrumentation/tracer.py
def process_node(self, node: Node) -> None:
    """
    Execute a node and add it to the database.
    """

    ##
    # Update the graph from the side effects of the node,
    # If an artifact could not be created, quietly return without saving
    # the node to the DB.
    ##
    logger.debug("Executing node %s", node)
    try:
        side_effects = self.executor.execute_node(
            node,
            {k: v.id for k, v in self.variable_name_to_node.items()},
        )
    except ArtifactSaveException as exc_info:
        logger.error("Artifact could not be saved.")
        logger.debug(exc_info)
        return
    logger.debug("Processing side effects")

    # Iterate through each side effect and process it, depending on its type
    for e in side_effects:
        if isinstance(e, ImplicitDependencyNode):
            self._process_implicit_dependency(
                node, self._resolve_pointer(e.pointer)
            )
        elif isinstance(e, ViewOfNodes):
            if len(e.pointers) > 0:  # skip if empty
                self.mutation_tracker.set_as_viewers_of_each_other(
                    *map(self._resolve_pointer, e.pointers)
                )
        elif isinstance(e, AccessedGlobals):
            self._process_accessed_globals(
                node.session_id, node, e.retrieved, e.added_or_updated
            )
        # Mutate case
        else:
            mutated_node_id = self._resolve_pointer(e.pointer)
            for (
                mutate_node_id,
                source_id,
            ) in self.mutation_tracker.set_as_mutated(mutated_node_id):
                mutate_node = MutateNode(
                    id=mutate_node_id,
                    session_id=node.session_id,
                    source_id=source_id,
                    call_id=node.id,
                    control_dependency=self.control_flow_tracker.current_control_dependency(),
                )
                self.process_node(mutate_node)

    # also special case for import node
    if isinstance(node, ImportNode):
        # must process after the call has been executed
        package_name, version = get_lib_package_version(node.name)
        node.version = version
        node.package_name = package_name

    self.db.write_node(node)

trace_import(name, source_location=None, alias=None, attributes=None)

Parameters:

  name (str, required): the name of the module.
  alias (Optional[str], default None): the module may be aliased, e.g., import pandas as pd.
  attributes (Optional[Dict[str, str]], default None): a mapping of functions imported from the library, keying the aliased name to the original name.

Note
  • The input args would either have alias or attributes, but not both
  • Didn't call the function import because I think that's a protected name

Note that version and path will be introspected at runtime.

Source code in lineapy/instrumentation/tracer.py
def trace_import(
    self,
    name: str,
    source_location: Optional[SourceLocation] = None,
    alias: Optional[str] = None,
    attributes: Optional[Dict[str, str]] = None,
) -> None:
    """
    Parameters
    ----------
    name: str
        the name of the module
    alias: Optional[str]
        the module could be aliased, e.g., import pandas as pd
    attributes: Optional[Dict[str, str]]
        a dict of functions imported from the library,
        mapping the aliased name to the original name.

    ??? note
        - The input args have _either_ alias or attributes, but not both
        - This function is not named `import` because that is a reserved keyword

    Note that version and path will be introspected at runtime.
    """
    module_node = self.import_module(name, source_location)
    if alias:
        self.assign(alias, module_node, from_import=True)
    elif attributes:
        module_value = self.executor.get_value(module_node.id)
        if IMPORT_STAR in attributes:
            """
            Import the module, get all public attributes, and set them as globals
            """
            # Import star behavior copied from python docs
            # https://docs.python.org/3/reference/simple_stmts.html#the-import-statement
            if hasattr(module_value, "__all__"):
                public_names = module_value.__all__  # type: ignore
            else:
                public_names = [
                    attr
                    for attr in dir(module_value)
                    if not attr.startswith("_")
                ]
            attributes = {attr: attr for attr in public_names}
        """
        load module `x`, check if `y` is an attribute of `x`, otherwise load `x.y`
        If `x.y` is a module, load that, otherwise get the `y` attribute of `x`.
        """
        for alias, attr_or_module in attributes.items():
            if hasattr(module_value, attr_or_module):
                self.assign(
                    alias,
                    self.call(
                        self.lookup_node(GETATTR),
                        source_location,
                        module_node,
                        self.literal(attr_or_module),
                    ),
                    from_import=True,
                )
            else:
                full_name = f"{name}.{attr_or_module}"
                sub_module_node = self.import_module(
                    full_name, source_location
                )
                self.assign(alias, sub_module_node, from_import=True)

    else:
        self.assign(name, module_node, from_import=True)

    node = ImportNode(
        id=get_new_id(),
        name=name,
        session_id=self.get_session_id(),
        source_location=source_location,
        control_dependency=self.control_flow_tracker.current_control_dependency(),
    )
    self.process_node(node)
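
For illustration, here is a hedged sketch (not taken from the source) of how common import statements would map onto trace_import calls; `tracer` stands for the Tracer instance that LineaPy's instrumentation creates, and source_location is omitted for brevity.

# import pandas as pd
tracer.trace_import("pandas", alias="pd")

# from os import path as p  -- attributes map the alias to the original name
tracer.trace_import("os", attributes={"p": "path"})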

ValueType

Bases: Enum

Lower case because the API with the frontend assumes the characters "chart" exactly as is.

Source code in lineapy/data/types.py
class ValueType(Enum):
    """
    Lower case because the API with the frontend assumes the characters "chart"
    exactly as is.
    """

    # [TODO] Rename (need coordination with linea-server):
    # - `dataset` really is a table
    # - `value` means its a literal  (e.g., int/str)

    chart = 1
    array = 2
    dataset = 3
    code = 4
    value = 5  # includes int, string, bool
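
Since the frontend consumes the lower-case member names verbatim, it is the .name of each member that crosses the API boundary. A minimal sketch:

from lineapy.data.types import ValueType

# The member name itself, not a prettified label, is what the frontend sees.
assert ValueType.chart.name == "chart"
assert ValueType.value.value == 5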

artifact_store()

Returns:

Type Description
LineaArtifactStore

An object of the class LineaArtifactStore that allows for printing and exporting artifact metadata.

Source code in lineapy/api/api.py
def artifact_store() -> LineaArtifactStore:
    """
    Returns
    -------
    LineaArtifactStore
        An object of the class `LineaArtifactStore` that allows for printing and exporting artifact metadata.
    """
    execution_context = get_context()
    cat = LineaArtifactStore(execution_context.executor.db)
    track(CatalogEvent(catalog_size=cat.len))
    return cat
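
A minimal usage sketch; printing the returned store lists saved artifacts and their metadata (the exact output format may vary by version):

import lineapy

store = lineapy.artifact_store()
print(store)  # one entry per saved artifact version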

delete(artifact_name, version)

Deletes an artifact from the artifact store. If no other artifacts refer to the value, the value is also deleted from both the value node store and the pickle store.

Parameters:

Name Type Description Default
artifact_name str

Key used while saving the artifact.

required
version Union[int, str]

Version number or "latest" or "all".

required

Raises:

Type Description
ValueError

If the artifact is not found or the version is invalid.

Source code in lineapy/api/api.py
def delete(artifact_name: str, version: Union[int, str]) -> None:
    """
    Deletes an artifact from the artifact store. If no other artifacts
    refer to the value, the value is also deleted from both the
    value node store and the pickle store.

    Parameters
    ----------
    artifact_name: str
        Key used while saving the artifact.
    version: Union[int, str]
        Version number or "latest" or "all".

    Raises
    ------
    ValueError
        If the artifact is not found or the version is invalid.
    """
    version = parse_artifact_version(version)

    # get database instance
    execution_context = get_context()
    executor = execution_context.executor
    db = executor.db

    # if version is 'all' or 'latest', get_version is None
    get_version = None if isinstance(version, str) else version

    try:
        metadata = get(artifact_name, get_version).get_metadata()
    except UserException:
        raise NameError(
            f"{artifact_name}:{version} not found. Perhaps there was a typo. Please try lineapy.artifact_store() to inspect all your artifacts."
        )

    lineapy_metadata = metadata["lineapy"]
    node_id = lineapy_metadata.node_id
    execution_id = lineapy_metadata.execution_id

    db.delete_artifact_by_name(artifact_name, version=version)
    logging.info(f"Deleted Artifact: {artifact_name} version: {version}")
    try:
        db.delete_node_value_from_db(node_id, execution_id)
    except UserException:
        logging.info(
            f"Node: {node_id} with execution ID: {execution_id} not found in DB"
        )
    except ValueError:
        logging.debug(f"No valid storage path found for {node_id}")

    if lineapy_metadata.storage_backend == ARTIFACT_STORAGE_BACKEND.lineapy:
        storage_path = lineapy_metadata.storage_path
        pickled_path = (
            str(options.safe_get("artifact_storage_dir")).rstrip("/")
            + f"/{storage_path}"
        )
        with fsspec.open(pickled_path) as f:
            f.fs.delete(f.path)
    elif lineapy_metadata.storage_backend == ARTIFACT_STORAGE_BACKEND.mlflow:
        try:
            db.delete_mlflow_metadata_by_artifact_id(
                lineapy_metadata.artifact_id
            )
        except UserException:
            logging.info(
                f"Artifact id {lineapy_metadata.artifact_id} is not found in DB"
            )
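
A minimal usage sketch with hypothetical artifact names:

import lineapy

# Remove one specific version of an artifact...
lineapy.delete("clean_df", version=2)
# ...or every stored version of it.
lineapy.delete("temp_model", version="all")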

get(artifact_name, version=None)

Gets an artifact from the DB.

Parameters:

Name Type Description Default
artifact_name str

Name of the artifact. Note that if you do not remember the artifact, you can use the artifact_store to browse the options.

required
version Optional[int]

Version of the artifact. If None, the latest version will be returned.

None

Returns:

Type Description
LineaArtifact

Returned value offers methods to access information we have stored about the artifact.

Source code in lineapy/api/api.py
def get(artifact_name: str, version: Optional[int] = None) -> LineaArtifact:
    """
    Gets an artifact from the DB.

    Parameters
    ----------
    artifact_name: str
        Name of the artifact. Note that if you do not remember the artifact,
        you can use the artifact_store to browse the options.
    version: Optional[int]
        Version of the artifact. If `None`, the latest version will be returned.

    Returns
    -------
    LineaArtifact
        Returned value offers methods to access
        information we have stored about the artifact.
    """
    validated_version = parse_artifact_version(
        "latest" if version is None else version
    )
    final_version = (
        validated_version if isinstance(validated_version, int) else None
    )

    execution_context = get_context()
    db = execution_context.executor.db
    artifactorm = db.get_artifactorm_by_name(artifact_name, final_version)
    linea_artifact = LineaArtifact(
        db=db,
        _artifact_id=artifactorm.id,
        _execution_id=artifactorm.execution_id,
        _node_id=artifactorm.node_id,
        _session_id=artifactorm.node.session_id,
        _version=artifactorm.version,  # type: ignore
        name=artifact_name,
        date_created=artifactorm.date_created,  # type: ignore
    )

    # Check version compatibility
    system_python_version = get_system_python_version()  # up to minor version
    artifact_python_version = db.get_session_context(
        linea_artifact._session_id
    ).python_version
    if system_python_version != artifact_python_version:
        warnings.warn(
            f"Current session runs on Python {system_python_version}, but the retrieved artifact was created on Python {artifact_python_version}. This may result in incompatibility issues."
        )

    track(GetEvent(version_specified=version is not None))
    return linea_artifact
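
A minimal usage sketch with a hypothetical artifact name, assuming the LineaArtifact accessors get_value() and get_code():

import lineapy

art = lineapy.get("clean_df")             # latest version
old = lineapy.get("clean_df", version=1)  # a specific version
value = art.get_value()                   # re-materialized object
print(art.get_code())                     # code that produced it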

get_function(artifacts, input_parameters=[], reuse_pre_computed_artifacts=[])

Extracts the process that creates the selected artifacts as a Python function.

Parameters:

Name Type Description Default
artifacts List[Union[str, Tuple[str, int]]]

List of artifact names (with optional version) to be included in the function return.

required
input_parameters List[str]

List of variable names to be used in the function arguments. Currently, only variables from literal assignments, such as a='123', are accepted. There should be only one literal assignment for each variable within all artifact calculation code. For instance, if both a='123' and a='abc' exist in the code, we cannot specify a as an input variable since it is ambiguous which literal assignment we want to replace.

[]
reuse_pre_computed_artifacts List[Union[str, Tuple[str, int]]]

List of artifacts (name with optional version) for which we will use pre-computed values from the artifact store instead of recomputing from the original code.

[]

Returns:

Type Description
Callable

A Python function that takes input_parameters as arguments and returns a dictionary with each artifact name as the key and the artifact value as the value.

Note that:

  1. If an input parameter is only used to calculate artifacts in the reuse_pre_computed_artifacts list, that input parameter will be passed around as a dummy variable, and LineaPy will create a warning.
  2. If an artifact name has been saved multiple times within a session, across multiple sessions, or has been mutated, you might want to specify the version number in artifacts or reuse_pre_computed_artifacts. The best practice for avoiding artifact-version searches is to not reuse artifact names across notebooks and to not save the same artifact multiple times within the same session.
Source code in lineapy/api/api.py
def get_function(
    artifacts: List[Union[str, Tuple[str, int]]],
    input_parameters: List[str] = [],
    reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]] = [],
) -> Callable:
    """
    Extracts the process that creates the selected artifacts as a Python function.

    Parameters
    ----------
    artifacts: List[Union[str, Tuple[str, int]]]
        List of artifact names (with optional version) to be included in the
        function return.
    input_parameters: List[str]
        List of variable names to be used in the function arguments. Currently,
        only variables from literal assignments, such as a='123', are accepted.
        There should be only one literal assignment for each variable within all
        artifact calculation code. For instance, if both a='123' and a='abc'
        exist in the code, we cannot specify a as an input variable since it is
        ambiguous which literal assignment we want to replace.
    reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]]
        List of artifacts (name with optional version) for which we will use
        pre-computed values from the artifact store instead of recomputing from
        original code.

    Returns
    -------
    Callable
        A Python function that takes input_parameters as arguments and returns a
        dictionary with each artifact name as the key and the artifact
        value as the value.

    Note that:

    1. If an input parameter is only used to calculate artifacts in the
        `reuse_pre_computed_artifacts` list, that input parameter will be
        passed around as a dummy variable. LineaPy will create a warning.
    2. If an artifact name has been saved multiple times within a session,
        across multiple sessions, or has been mutated, you might want to
        specify the version number in `artifacts` or `reuse_pre_computed_artifacts`.
        The best practice for avoiding artifact-version searches is to not reuse
        artifact names in different notebooks and to not save the same artifact
        multiple times within the same session.
    """
    execution_context = get_context()
    artifact_defs = [
        get_lineaartifactdef(art_entry=art_entry) for art_entry in artifacts
    ]
    reuse_pre_computed_artifact_defs = [
        get_lineaartifactdef(art_entry=art_entry)
        for art_entry in reuse_pre_computed_artifacts
    ]
    art_collection = ArtifactCollection(
        execution_context.executor.db,
        artifact_defs,
        input_parameters=input_parameters,
        reuse_pre_computed_artifacts=reuse_pre_computed_artifact_defs,
    )
    writer = BasePipelineWriter(art_collection)
    module = load_as_module(writer)
    return module.run_all_sessions
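
A minimal usage sketch with hypothetical artifact and parameter names, following the behavior described above:

import lineapy

run = lineapy.get_function(
    ["clean_df", "model"],
    input_parameters=["n_estimators"],
)
results = run(n_estimators=200)  # dict keyed by artifact name
model = results["model"]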

get_module_definition(artifacts, input_parameters=[], reuse_pre_computed_artifacts=[])

Creates a Python module that includes the definition of get_function().

Parameters:

Name Type Description Default
artifacts List[Union[str, Tuple[str, int]]]

Same as in get_function().

required
input_parameters List[str]

Same as in get_function().

[]
reuse_pre_computed_artifacts List[Union[str, Tuple[str, int]]]

Same as in get_function().

[]

Returns:

Type Description
str

A Python module that includes the definition of get_function() as run_all_sessions.

Source code in lineapy/api/api.py
def get_module_definition(
    artifacts: List[Union[str, Tuple[str, int]]],
    input_parameters: List[str] = [],
    reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]] = [],
) -> str:
    """
    Creates a Python module that includes the definition of [`get_function()`][lineapy.api.api.get_function].

    Parameters
    ----------
    artifacts: List[Union[str, Tuple[str, int]]]
        Same as in [`get_function()`][lineapy.api.api.get_function].
    input_parameters: List[str]
        Same as in [`get_function()`][lineapy.api.api.get_function].
    reuse_pre_computed_artifacts: List[Union[str, Tuple[str, int]]]
        Same as in [`get_function()`][lineapy.api.api.get_function].

    Returns
    -------
    str
        A Python module that includes the definition of [`get_function()`][lineapy.api.api.get_function]
        as `run_all_sessions`.
    """
    execution_context = get_context()
    artifact_defs = [
        get_lineaartifactdef(art_entry=art_entry) for art_entry in artifacts
    ]
    reuse_pre_computed_artifact_defs = [
        get_lineaartifactdef(art_entry=art_entry)
        for art_entry in reuse_pre_computed_artifacts
    ]
    art_collection = ArtifactCollection(
        execution_context.executor.db,
        artifact_defs,
        input_parameters=input_parameters,
        reuse_pre_computed_artifacts=reuse_pre_computed_artifact_defs,
    )
    writer = BasePipelineWriter(art_collection)
    return writer._compose_module()
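
A minimal usage sketch (hypothetical artifact names) that writes the generated module to disk for inspection or version control:

import lineapy

module_source = lineapy.get_module_definition(["clean_df", "model"])
with open("pipeline_module.py", "w") as f:
    f.write(module_source)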

reload()

Reloads the LineaPy context.

Note

Currently this only reloads annotations, but in the future it can be a container for other items like configs, etc.

Source code in lineapy/api/api.py
def reload() -> None:
    """
    Reloads the LineaPy context.

    !!! note

        Currently this only reloads annotations, but in the future it can be a
        container for other items like configs, etc.
    """
    execution_context = get_context()
    execution_context.executor.reload_annotations()
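
A minimal usage sketch; call this after installing or updating annotation packages so they take effect without restarting the session:

import lineapy

lineapy.reload()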

save(reference, name, storage_backend=None, **kwargs)

Publishes the object to the LineaPy DB.

Parameters:

Name Type Description Default
reference object

The reference could be a variable name, in which case LineaPy will save the value of the variable with our default serialization mechanism. Alternatively, it could be a "side effect" reference, which currently includes either lineapy.file_system or lineapy.db. LineaPy will save the associated process that creates the final side effects. We are in the process of adding more side effect references, including assert statements.

required
name str

The name is used for later retrieving the artifact and creating new versions if an artifact of the name has been created before.

required
storage_backend Optional[ARTIFACT_STORAGE_BACKEND]

The storage backend used to save the artifact. Currently supports lineapy and mlflow (for mlflow-supported model flavors). In the case of mlflow, LineaPy will use mlflow.sklearn.log_model or the equivalent for other supported flavors to save artifacts into mlflow.

None
**kwargs

Keyword arguments passed into the underlying storage mechanism to override default behavior. For storage_backend='mlflow', these can override default arguments of mlflow.sklearn.log_model or the equivalent for other supported flavors.

{}

Returns:

Type Description
LineaArtifact

Returned value offers methods to access information we have stored about the artifact (value, version), and other automation capabilities, such as to_pipeline().

Source code in lineapy/api/api.py
def save(
    reference: object,
    name: str,
    storage_backend: Optional[ARTIFACT_STORAGE_BACKEND] = None,
    **kwargs,
) -> LineaArtifact:
    """
    Publishes the object to the LineaPy DB.

    Parameters
    ----------
    reference: Union[object, ExternalState]
        The reference could be a variable name, in which case LineaPy will save
        the value of the variable with our default serialization mechanism.
        Alternatively, it could be a "side effect" reference, which currently includes either `lineapy.file_system` or `lineapy.db`.
        LineaPy will save the associated process that creates the final side effects.
        We are in the process of adding more side effect references, including `assert` statements.
    name: str
        The name is used for later retrieving the artifact and creating new versions if an artifact of the name has been created before.
    storage_backend: Optional[ARTIFACT_STORAGE_BACKEND]
        The storage backend used to save the artifact. Currently supports
        lineapy and mlflow (for mlflow-supported model flavors). In the case of
        mlflow, LineaPy will use `mlflow.sklearn.log_model` or the equivalent
        for other supported flavors to save artifacts into mlflow.
    **kwargs:
        Keyword arguments passed into the underlying storage mechanism to
        override default behavior. For `storage_backend='mlflow'`, these can
        override default arguments of `mlflow.sklearn.log_model` or the
        equivalent for other supported flavors.

    Returns
    -------
    LineaArtifact
        Returned value offers methods to access
        information we have stored about the artifact (value, version),
        and other automation capabilities, such as [`to_pipeline()`][lineapy.api.api.to_pipeline].
    """
    execution_context = get_context()
    executor = execution_context.executor
    db = executor.db
    call_node = execution_context.node

    # If this value is stored as a global in the executor (meaning it's an external side effect)
    # then look it up from there, instead of using this node.
    if isinstance(reference, ExternalState):
        value_node_id = executor.lookup_external_state(reference)
        msg = f"No change to the {reference.external_state} was recorded. If it was in fact changed, please open a Github issue."
        if not value_node_id:
            track(
                ExceptionEvent(ErrorType.SAVE, "No change to external state")
            )
            raise ValueError(msg)
    else:
        # Lookup the first arguments id, which is the id for the value, and
        # save that as the artifact
        value_node_id = call_node.positional_args[0].id

    execution_id = executor.execution.id
    timing = executor.get_execution_time(value_node_id)

    # serialize value to db if we haven't before
    # (happens with multiple artifacts pointing to the same value)
    serialize_method = ARTIFACT_STORAGE_BACKEND.lineapy
    if not db.node_value_in_db(
        node_id=value_node_id, execution_id=execution_id
    ):

        # TODO add version or timestamp to allow saving of multiple pickle files for the same node id

        artifact_serialize_metadata = serialize_artifact(
            value_node_id,
            execution_id,
            reference,
            name,
            storage_backend,
            **kwargs,
        )
        if (
            artifact_serialize_metadata["backend"]
            == ARTIFACT_STORAGE_BACKEND.mlflow
        ):
            artifact_path = artifact_serialize_metadata["metadata"].model_uri
            serialize_method = ARTIFACT_STORAGE_BACKEND.mlflow
        else:
            artifact_path = artifact_serialize_metadata["metadata"][
                "pickle_name"
            ]

        # adds reference to pickled file inside database
        db.write_node_value(
            NodeValue(
                node_id=value_node_id,
                value=artifact_path,
                execution_id=execution_id,
                start_time=timing[0],
                end_time=timing[1],
                value_type=get_value_type(reference),
            )
        )
        # we have to commit eagerly because if we just add it
        #   to the queue, the `res` value may have mutated
        #   and that's incorrect.
        db.commit()

    # bump the artifact version: one above the latest existing version for this name
    date_created = datetime.utcnow()
    artifact_version = db.get_latest_artifact_version(name) + 1

    artifact_to_write = Artifact(
        node_id=value_node_id,
        execution_id=execution_id,
        date_created=date_created,
        name=name,
        version=artifact_version,
    )
    db.write_artifact(artifact_to_write)

    if serialize_method == ARTIFACT_STORAGE_BACKEND.mlflow:
        artifactorm = db.get_artifactorm_by_name(
            artifact_name=name, version=artifact_version
        )
        db.write_mlflow_artifactmetadata(
            artifactorm, artifact_serialize_metadata["metadata"]
        )

    track(SaveEvent(side_effect=side_effect_to_str(reference)))

    linea_artifact = LineaArtifact(
        db=db,
        name=name,
        date_created=date_created,
        _execution_id=execution_id,
        _node_id=value_node_id,
        _session_id=call_node.session_id,
        _version=artifact_version,
    )
    return linea_artifact
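
A minimal usage sketch; the DataFrame is a hypothetical value, and lineapy.file_system is the side-effect reference mentioned above:

import lineapy
import pandas as pd

df = pd.read_csv("data.csv").dropna()
art = lineapy.save(df, "clean_df")  # value artifact
print(art.version)

# Save the process that wrote files, rather than a Python value.
lineapy.save(lineapy.file_system, "etl_files")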

to_pipeline(artifacts, framework='SCRIPT', pipeline_name=None, dependencies={}, output_dir='.', input_parameters=[], reuse_pre_computed_artifacts=[], generate_test=False, pipeline_dag_config={}, include_non_slice_as_comment=False)

Writes the pipeline job to a path on disk.

Parameters:

Name Type Description Default
artifacts List[str]

Names of artifacts to be included in the pipeline.

required
framework str

Name of the framework to be used. Defined by enum PipelineTypes in lineapy/data/types.py. Defaults to "SCRIPT" if not specified.

'SCRIPT'
pipeline_name Optional[str]

Name of the pipeline.

None
dependencies TaskGraphEdge

Task dependencies in graphlib format, e.g., {"B": {"A", "C"}} means tasks A and C are prerequisites for task B. LineaPy is smart enough to figure out dependency relations within the same session, so there is no need to specify this type of dependency information; instead, the user is expected to provide dependency information among artifacts across different sessions.

{}
output_dir str

Directory path to save DAG and other pipeline files.

'.'
input_parameters List[str]

Names of variables to be used as parameters in the pipeline. Currently, it only accepts variables from literal assignment such as a = '123'. For each variable to be parametrized, there should be only one literal assignment across all artifact code for the pipeline. For instance, if both a = '123' and a = 'abc' exist in the pipeline's artifact code, we cannot make a an input parameter since its reference is ambiguous, i.e., we are not sure which literal assignment a refers to.

[]
reuse_pre_computed_artifacts List[str]

Names of artifacts in the pipeline for which pre-computed value is to be used (rather than recomputing the value).

[]
generate_test bool

Whether to generate scaffold/template for pipeline testing. Defaults to False. The scaffold contains placeholders for testing each function in the pipeline module file and is meant to be fleshed out by the user to suit their needs. When run out of the box, it performs a naive form of equality evaluation for each function's output, which demands validation and customization by the user.

False
pipeline_dag_config Optional[Dict]

A dictionary of parameters to configure the DAG file to be generated. Not applicable for the "SCRIPT" framework as it does not generate a separate DAG file. For the "AIRFLOW" framework, Airflow-native config params such as "retries" and "schedule_interval" can be passed in. For the "ARGO" framework, Argo-native config params such as "namespace" and "service_account_name" can be passed in.

{}

Returns:

Type Description
Path

Directory path where DAG and other pipeline files are saved.

Source code in lineapy/api/api.py
def to_pipeline(
    artifacts: List[str],
    framework: str = "SCRIPT",
    pipeline_name: Optional[str] = None,
    dependencies: TaskGraphEdge = {},
    output_dir: str = ".",
    input_parameters: List[str] = [],
    reuse_pre_computed_artifacts: List[str] = [],
    generate_test: bool = False,
    pipeline_dag_config: Optional[Dict] = {},
    include_non_slice_as_comment: bool = False,
) -> Path:
    """
    Writes the pipeline job to a path on disk.

    Parameters
    ----------
    artifacts: List[str]
        Names of artifacts to be included in the pipeline.
    framework: str
        Name of the framework to be used.
        Defined by enum PipelineTypes in lineapy/data/types.py.
        Defaults to "SCRIPT" if not specified.
    pipeline_name: Optional[str]
        Name of the pipeline.
    dependencies: TaskGraphEdge
        Task dependencies in graphlib format, e.g., ``{"B": {"A", "C"}}``
        means tasks A and C are prerequisites for task B.
        LineaPy is smart enough to figure out dependency relations *within*
        the same session, so there is no need to specify this type of dependency
        information; instead, the user is expected to provide dependency information
        among artifacts across different sessions.
    output_dir: str
        Directory path to save DAG and other pipeline files.
    input_parameters: List[str]
        Names of variables to be used as parameters in the pipeline.
        Currently, it only accepts variables from literal assignment
        such as ``a = '123'``. For each variable to be parametrized,
        there should be only one literal assignment across all
        artifact code for the pipeline. For instance, if both ``a = '123'``
        and ``a = 'abc'`` exist in the pipeline's artifact code,
        we cannot make ``a`` an input parameter since its reference is
        ambiguous, i.e., we are not sure which literal assignment ``a``
        refers to.
    reuse_pre_computed_artifacts: List[str]
        Names of artifacts in the pipeline for which pre-computed value
        is to be used (rather than recomputing the value).
    generate_test: bool
        Whether to generate scaffold/template for pipeline testing.
        Defaults to ``False``. The scaffold contains placeholders for testing
        each function in the pipeline module file and is meant to be fleshed
        out by the user to suit their needs. When run out of the box, it performs
        a naive form of equality evaluation for each function's output,
        which demands validation and customization by the user.
    pipeline_dag_config: Optional[Dict]
        A dictionary of parameters to configure the DAG file to be generated.
        Not applicable for the "SCRIPT" framework as it does not generate a separate
        DAG file. For the "AIRFLOW" framework, Airflow-native config params such as
        "retries" and "schedule_interval" can be passed in. For the "ARGO" framework,
        Argo-native config params such as "namespace" and "service_account_name"
        can be passed in.

    Returns
    -------
    Path
        Directory path where DAG and other pipeline files are saved.
    """
    pipeline = Pipeline(
        artifacts=artifacts,
        name=pipeline_name,
        dependencies=dependencies,
    )
    pipeline.save()
    return pipeline.export(
        framework=framework,
        output_dir=output_dir,
        input_parameters=input_parameters,
        reuse_pre_computed_artifacts=reuse_pre_computed_artifacts,
        generate_test=generate_test,
        pipeline_dag_config=pipeline_dag_config,
        include_non_slice_as_comment=include_non_slice_as_comment,
    )
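
A minimal usage sketch with hypothetical artifact names; dependencies uses the graphlib format described above to order artifacts across sessions:

import lineapy

path = lineapy.to_pipeline(
    artifacts=["clean_df", "train_model"],
    framework="AIRFLOW",
    pipeline_name="demo_pipeline",
    dependencies={"train_model": {"clean_df"}},
    output_dir="./pipelines",
)
print(path)  # directory containing the generated DAG and module files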

visualize(*, live=False)

Displays a visualization of the Linea graph from this session using Graphviz.

If live=True, this visualization will update live after each cell execution. Note that this comes with a substantial performance penalty, so it is False by default.

Note

If the visualization is not live, it will print out the visualization as of the previous cell execution, not the one where visualize is executed.

Source code in lineapy/editors/ipython.py
def visualize(*, live=False) -> None:
    """
    Display a visualization of the Linea graph from this session using Graphviz.

    If `live=True`, then this visualization will live update after cell execution.
    Note that this comes with a substantial performance penalty, so it is False
    by default.

    ??? note

        If the visualization is not live, it will print out the visualization
        as of the previous cell execution, not the one where `visualize` is executed.
    """
    if not isinstance(STATE, CellsExecutedState):
        raise RuntimeError(
            "Cannot visualize before we have started executing cells"
        )
    display_object = STATE.create_visualize_display_object()
    if live:
        # If we have an existing display handle, display a new version of it.
        if STATE.visualize_display_handle:
            STATE.visualize_display_handle.display(display_object)
        # Otherwise, create a new one
        else:
            STATE.visualize_display_handle = display(
                display_object, display_id=True
            )
    else:
        # Otherwise, just display the visualization
        display(display_object)
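
A minimal usage sketch, assuming a Jupyter/IPython session with the lineapy extension loaded:

%load_ext lineapy
import lineapy

lineapy.visualize(live=True)  # re-renders after each cell execution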
