All tasks
Tasks can be imported piecemeal or imported in their entirety from here.
Attributes⚓︎
TaskList
module-attribute
⚓︎
TaskList = List[Union[Call, DeferredTask]]
List of wrapped or normal task functions.
Functions⚓︎
main ⚓︎
main(_ctx)
Run main task pipeline.
Source code in calcipy/tasks/all_tasks.py
@task(post=with_progress(_MAIN_TASKS))
def main(_ctx: Context) -> None:
"""Run main task pipeline."""
other ⚓︎
other(_ctx)
Run tasks that are otherwise not exercised in main.
Source code in calcipy/tasks/all_tasks.py
@task(post=with_progress(_OTHER_TASKS))
def other(_ctx: Context) -> None:
"""Run tasks that are otherwise not exercised in main."""
progress ⚓︎
progress(_ctx, *, index, total)
Progress Task.
Source code in calcipy/tasks/all_tasks.py
@task(
help={
'index': 'Current index (0-indexed)',
'total': 'Total steps',
},
show_task_info=False,
)
def progress(_ctx: Context, *, index: int, total: int) -> None:
"""Progress Task."""
LOGGER.text('Progress', is_header=True, index=index + 1, total=total)
release ⚓︎
release(ctx, *, suffix=None)
Run release pipeline.
Source code in calcipy/tasks/all_tasks.py
@task(
help=cl.bump.help, # pyright: ignore[reportFunctionMemberAccess]
post=with_progress(
[
pack.lock,
doc.build,
doc.deploy,
],
offset=1,
),
)
def release(ctx: Context, *, suffix: cl.SuffixT = None) -> None:
"""Run release pipeline."""
cl.bumpz(ctx, suffix=suffix)
summary ⚓︎
summary(_ctx, *, message)
Summary Task.
Source code in calcipy/tasks/all_tasks.py
@task(
help={
'message': 'String message to display',
},
show_task_info=False,
)
def summary(_ctx: Context, *, message: str) -> None:
"""Summary Task."""
LOGGER.text(message, is_header=True)
with_progress ⚓︎
with_progress(items, offset=0)
Inject intermediary ‘progress’ tasks.
| PARAMETER | DESCRIPTION |
|---|---|
items
|
list of tasks
TYPE:
|
offset
|
Optional offset to shift counters
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TaskList
|
List of tasks with progress indicators interspersed. |
Source code in calcipy/tasks/all_tasks.py
def with_progress(items: Any, offset: int = 0) -> TaskList:
"""Inject intermediary 'progress' tasks.
Args:
items: list of tasks
offset: Optional offset to shift counters
Returns:
List of tasks with progress indicators interspersed.
"""
task_items = [_build_task(t_) for t_ in items]
message = 'Running tasks: ' + ', '.join([str(t_.__name__) for t_ in task_items])
tasks: TaskList = [summary.with_kwargs(message=message)] # pyright: ignore[reportFunctionMemberAccess]
total = len(task_items) + offset
for idx, item in enumerate(task_items):
tasks += [
progress.with_kwargs(index=idx + offset, total=total), # pyright: ignore[reportFunctionMemberAccess]
item,
]
return tasks