lineapy.execution package

Submodules

lineapy.execution.context module

This module contains a number of globals, which are set by the execution when it is processing call nodes.

They are used as a side channel to pass values from the executor to special functions which need to know more about the execution context, like in the exec to know the source code of the current node.

This module exposes three global functions, which are meant to be used like:

  1. The executor calls set_context before executing every call node.

  2. The function being called can call get_context to get the current context.

  3. The executor calls teardown_context after its finished executing

I.e. the context is created for every call.

class lineapy.execution.context.ContextResult(added_or_modified: 'Dict[str, object]', side_effects: 'SideEffects')[source]
class lineapy.execution.context.ExecutionContext(node: CallNode, executor: Executor, _input_node_ids: Mapping[str, LineaID], _input_globals_mutable: Mapping[str, bool], input_nodes: Mapping[LineaID, object], function_calls: Optional[List[FunctionCall]] = None)[source]

This class is available during execution of CallNodes to the functions which are being called.

It is used as a side channel to pass in metadata about the execution, such as the current node, and other global nodes (used during exec).

The side_effects property is read after the function is finished, by the executor, so that the function can pass additional side effects that were triggered back to it indirectly. This is also used by the exec functions.

property global_variables: Dict[str, object]

The current globals dictionary

lineapy.execution.context.set_context(executor: Executor, variables: Optional[Dict[str, LineaID]], node: CallNode) None[source]

Sets the context of the executor to the given node.

lineapy.execution.context.teardown_context() lineapy.execution.context.ContextResult[source]

Tearsdown the context, returning the nodes that were accessed and a mapping variables to new values that were added or crated

lineapy.execution.executor module

class lineapy.execution.executor.Executor(db: RelationalLineaDB, _globals: dict[str, object], module_file: Optional[str] = None, _function_inspector: FunctionInspector = <factory>, _id_to_value: dict[LineaID, object] = <factory>, _execution_time: dict[LineaID, Tuple[datetime, datetime]] = <factory>, _node_to_bound_self: Dict[LineaID, LineaID] = <factory>, _node_to_globals: Dict[LineaID, Dict[str, object]] = <factory>, _value_to_node: Dict[Hashable, LineaID] = <factory>)[source]

An executor that is responsible for executing a graph, either node by node as it is created, or in a batch, after the fact.

To use the executor, you first instantiate it. Then you can execute nodes, by calling execute_node. This returns a list of side effects that executing that node causes.

You can also query for the time a node took to execute or its value, using get_value and get_execution_time.

execute_graph(graph: lineapy.data.graph.Graph) None[source]

Executes a graph in visit order making sure to setup the working directory first.

TODO: Possibly move to graph instead of on executor, since it rather cleanly uses the executor’s public API? Or move to function?

execute_node(node: Union[lineapy.data.types.ImportNode, lineapy.data.types.CallNode, lineapy.data.types.LiteralNode, lineapy.data.types.LookupNode, lineapy.data.types.MutateNode, lineapy.data.types.GlobalNode, lineapy.data.types.IfNode, lineapy.data.types.ElseNode], variables: Optional[Dict[str, LineaID]] = None) Iterable[Union[lineapy.execution.side_effects.MutatedNode, lineapy.execution.side_effects.ViewOfNodes, lineapy.execution.side_effects.AccessedGlobals, lineapy.execution.side_effects.ImplicitDependencyNode]][source]

Variables is the mapping from local variable names to their nodes. It is passed in on the first execution, but on re-executions it is empty.

At that point we know which variables each call node depends on, since the first time we executed we captured that.

Does the following:

  • Executes a node

  • And records

    • value (currently: only for call nodes and all call nodes)

    • execution time

  • Add a new frame to the stack to support error reporting. Without it, the traceback will be empty.

  • Returns the SideEffects of this node that’s analyzed at runtime (hence in the executor).

get_execution_time(node_id: LineaID) Tuple[datetime.datetime, datetime.datetime][source]

Returns the (startime, endtime) for a node that was execute.

Only applies for function call nodes.

get_value(node_id: LineaID) object[source]

Gets the Python in memory value for a node which was already executed.

lookup_external_state(state: lineapy.instrumentation.annotation_spec.ExternalState) Optional[LineaID][source]

Returns the node ID if we have created a node already for some external state.

Otherwise, returns None.

class lineapy.execution.executor.PrivateExecuteResult(value: 'object', start_time: 'datetime', end_time: 'datetime', side_effects: 'List[SideEffect]')[source]

lineapy.execution.globals_dict module

class lineapy.execution.globals_dict.GlobalsDict[source]

A custom dict that is meant to be accessed in a particular way, in order to record getitems. It is used for setting as the globals when executing some code, so we can try to understand which globals were accessed.

It is meant to be used like:

  1. Instantiate it empty like GlobalsDict()

  2. Call setup_globals(d) to update it with the input globals

  3. Execute some code that uses it as globals, which will call __setitem__ as well as our custom __getitem__.

  4. Call teardown_globals() which will return the Result, containing the a record of all the original globals that were accessed and any new globals that were updated or added.

We cannot overload the __setitem__ method, since Python will not respect it for custom globals, but we can overload the __getitem__ method.

See https://stackoverflow.com/a/12185315/907060 which refers to https://bugs.python.org/issue14385

class lineapy.execution.globals_dict.GlobalsDictResult(accessed_inputs: 'List[str]', added_or_modified: 'Dict[str, object]')[source]
class lineapy.execution.globals_dict.State(inputs: 'Dict[str, object]', accessed_inputs: 'List[str]' = <factory>)[source]
process_getitem(k: str, v: object) None[source]

If we haven’t recorded this key and its value is the same as the value in the input globals (meaning we haven’t overwritten it), then record it as a getitem.

lineapy.execution.inspect_function module

class lineapy.execution.inspect_function.FunctionInspector(specs: Dict[str, List[lineapy.instrumentation.annotation_spec.Annotation]] = <factory>, parsed: lineapy.execution.inspect_function.FunctionInspectorParsed = <factory>)[source]

The FunctionInspector does two different loading steps.

  1. Load all the specs from disk with get_specs. This happens once on creation of the object.

  2. On initialization, and before every spec call, go through all the specs and “parse” any for modules we have already imported, which means turning the criteria into in memory objects, we can compare against when inspecting.

inspect(function: Callable, args: list[object], kwargs: dict[str, object], result: object) Iterable[InspectFunctionSideEffect][source]

Inspects a function and returns how calling it mutates the args/result and creates view relationships between them.

class lineapy.execution.inspect_function.FunctionInspectorParsed(function_to_side_effects: Dict[Callable, List[Union[lineapy.instrumentation.annotation_spec.ViewOfValues, lineapy.instrumentation.annotation_spec.MutatedValue, lineapy.instrumentation.annotation_spec.ImplicitDependencyValue]]] = <factory>, method_name_to_type_to_side_effects: Dict[str, Dict[type, List[Union[lineapy.instrumentation.annotation_spec.ViewOfValues, lineapy.instrumentation.annotation_spec.MutatedValue, lineapy.instrumentation.annotation_spec.ImplicitDependencyValue]]]] = <factory>, keyword_name_and_value_to_type_to_side_effects: Dict[Tuple[str, Hashable], Dict[type, List[Union[lineapy.instrumentation.annotation_spec.ViewOfValues, lineapy.instrumentation.annotation_spec.MutatedValue, lineapy.instrumentation.annotation_spec.ImplicitDependencyValue]]]] = <factory>)[source]

Contains the parsed function inspector criteria.

add_annotations(module: module, annotations: List[lineapy.instrumentation.annotation_spec.Annotation]) None[source]

Parse a list of annotations and look them up to add them to our parsed criteria.

inspect(fn: Callable, kwargs: Dict[str, object]) Optional[List[Union[lineapy.instrumentation.annotation_spec.ViewOfValues, lineapy.instrumentation.annotation_spec.MutatedValue, lineapy.instrumentation.annotation_spec.ImplicitDependencyValue]]][source]

Inspect a function call and return a list of side effects, if it matches any of the annotations

lineapy.execution.inspect_function.get_imported_module(name: str) Optional[module][source]

Return a module, if it has been imported.

Also handles the corner case where a submodule has not been imported, but is accessible as an attribute on the parent module. This is needed for the example tensorflow.keras.utils, which is not imported when importing tensorflow, but is accessible as a property of tensorflow.

lineapy.execution.inspect_function.get_specs() Dict[str, List[lineapy.instrumentation.annotation_spec.Annotation]][source]

yaml specs are for non-built in functions. Captures all the .annotations.yaml files in the lineapy directory.

lineapy.execution.inspect_function.is_mutable(obj: object) bool[source]

Returns true if the object is mutable.

Note that currently, tempfile.NamedTemporaryFile() is not mutable, and the semantics is actually correct, because it doesn’t end up changing the file system. However, the following registers as normal files (which are mutable).

`python filename = NamedTemporaryFile().name handle = open(filename, "wb") `

lineapy.execution.inspect_function.logger = <Logger lineapy.execution.inspect_function (INFO)>

helper functions

lineapy.execution.inspect_function.new_side_effect_without_all_positional_arg(side_effect: lineapy.instrumentation.annotation_spec.ViewOfValues, args: list) lineapy.instrumentation.annotation_spec.ViewOfValues[source]

This method must NOT modify the original side_effect, since these annotations are dependent on the runtime values that are different for each call—AllPositionalArgs will have a different set of arguments.

Note that we might need to add something like “all keyword arguments”, but that use case hasn’t come up yet.

lineapy.execution.inspect_function.validate(item: Dict) Optional[lineapy.instrumentation.annotation_spec.ModuleAnnotation][source]

We cannot filer the specs by module, because it might be loaded later. This causes a bit of inefficiency in our function inspection, but we can fix later if it’s a problem.

lineapy.execution.side_effects module

class lineapy.execution.side_effects.AccessedGlobals(retrieved: List[str], added_or_updated: List[str])[source]

Represents some global variables that were retrieved or changed during this call.

class lineapy.execution.side_effects.ID(id: 'LineaID')[source]
class lineapy.execution.side_effects.ImplicitDependencyNode(pointer: Union[lineapy.execution.side_effects.ID, lineapy.execution.side_effects.Variable, lineapy.instrumentation.annotation_spec.ExternalState])[source]

Represents that the call node has an implicit dependency on another node.

class lineapy.execution.side_effects.MutatedNode(pointer: Union[lineapy.execution.side_effects.ID, lineapy.execution.side_effects.Variable, lineapy.instrumentation.annotation_spec.ExternalState])[source]

Represents that a node has been mutated.

class lineapy.execution.side_effects.Variable(name: 'str')[source]
class lineapy.execution.side_effects.ViewOfNodes(pointers: List[Union[lineapy.execution.side_effects.ID, lineapy.execution.side_effects.Variable, lineapy.instrumentation.annotation_spec.ExternalState]])[source]

Represents that a set of nodes are now “views” of each other, meaning that if any are mutated they all could be mutated.

Module contents