pangeo-forge-runner#

Command-line tool to manage pangeo-forge feedstocks

Tutorials#

Running a recipe locally#

pangeo-forge-runner supports baking your recipes locally, primarily so you can test the exact setup that will be used to bake your recipe on the cloud. This allows for fast iteration on your recipe, while guaranteeing that the behavior you see on your local system is what you will get when running scaled out on the cloud.

Clone a sample recipe repo to work on#

This tutorial will work with any recipe, but to simplify things we will use this pruned GPCP recipe, which pulls a subset of GPCP netCDF files from Google Cloud Storage and writes them out as Zarr. The config we have set up for pangeo-forge-runner will fetch the files from remote storage only once on your system, caching them so future runs are faster.

This same setup would work for any recipe!

  1. Clone a copy of the recipe to work on:

    git clone https://github.com/pforgetest/gpcp-from-gcs-feedstock
    cd gpcp-from-gcs-feedstock
    

    You can make edits to this if you would like.

  2. Set up a virtual environment that will contain pangeo-forge-runner and any other dependencies this recipe needs. We use a venv here, but you may also use conda or any other Python package management setup you are familiar with.

    python -m venv venv
    source venv/bin/activate
    
  3. Install pangeo-forge-runner into this environment.

    pip install pangeo-forge-runner
    

Now you’re ready to go!

Setting up the config file#

Construct a local_config.py file that describes where the output data should go, and what should be used for caching the input files. Since we just want to test locally, these can point to the local filesystem!

# Let's put all our data on the same dir as this config file
from pathlib import Path
import os
HERE = Path(__file__).parent

DATA_PREFIX = HERE / 'data'
os.makedirs(DATA_PREFIX, exist_ok=True)

# Target output should be partitioned by job name
c.TargetStorage.root_path = f"{DATA_PREFIX}/{{job_name}}"

c.InputCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
c.InputCacheStorage.fsspec_args = c.TargetStorage.fsspec_args

# Input data cache should *not* be partitioned by job name, as we want to fetch
# each data file from the source only once
c.InputCacheStorage.root_path = f"{DATA_PREFIX}/cache/input"

c.MetadataCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
c.MetadataCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
# Metadata cache should be per job, as changing kwargs can change the metadata
c.MetadataCacheStorage.root_path = f"{DATA_PREFIX}/{{job_name}}/cache/metadata"

This will create a directory called data in the same directory this config file is located in, and put all outputs and caches in there. To speed up multiple runs, input files will be cached under the data/cache directory.
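
Roughly, after a test run with a job name of test1 (as in the next section), the layout under data/ will look something like this (the gpcp store name comes from the recipe itself):

data/
├── cache/
│   └── input/           # shared input file cache, reused across runs
└── test1/               # one directory per job name
    ├── cache/
    │   └── metadata/    # per-job metadata cache
    └── gpcp/            # the output Zarr store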

Run a pruned version of your recipe#

You’re all set to run your recipe now!

pangeo-forge-runner bake \
    --config local_config.py \
    --repo . \
    --Bake.job_name=test1 \
    --prune

This should run for a few seconds, and your output Zarr should now be under data/test1! Let’s explore the various parameters passed.

  1. --config local_config.py specifies the config file we want pangeo-forge-runner to read. If we wanted to run this on GCP or AWS, we could have additional aws_config.py or gcp_config.py files and just pass those instead - everything else can remain the same. Putting most config into files also eases collaboration - multiple people can know they’re running the same config.

  2. --repo . specifies that we want the current directory to be treated as a recipe and run. This can instead point to a git repo, a Zenodo URI, etc. as needed (an example follows this list).

  3. --Bake.job_name=test1 specifies a unique job name for this particular run. In our local_config.py, we use this name to create the output directory. If not specified, this would be autogenerated.

  4. --prune specifies we only want to run the recipe on about 2 input files, rather than on everything. This makes for fast turnaround time and easy testing.
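
Note that --repo does not have to be a local checkout. For example, the same bake works directly against the sample repository used in this tutorial (test2 here is just another arbitrary job name):

pangeo-forge-runner bake \
    --config local_config.py \
    --repo https://github.com/pforgetest/gpcp-from-gcs-feedstock \
    --Bake.job_name=test2 \
    --prune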

You can test the created Zarr store by opening it with xarray:

>>> import xarray as xr
>>> ds = xr.open_zarr("data/test1/gpcp")
>>> ds
<xarray.Dataset>
Dimensions:      (latitude: 180, nv: 2, longitude: 360, time: 2)
Coordinates:
  * latitude     (latitude) float32 -90.0 -89.0 -88.0 -87.0 ... 87.0 88.0 89.0
  * longitude    (longitude) float32 0.0 1.0 2.0 3.0 ... 356.0 357.0 358.0 359.0
  * time         (time) datetime64[ns] 1996-10-01 1996-10-02
Dimensions without coordinates: nv
Data variables:
    lat_bounds   (latitude, nv) float32 dask.array<chunksize=(180, 2), meta=np.ndarray>
    lon_bounds   (longitude, nv) float32 dask.array<chunksize=(360, 2), meta=np.ndarray>
    precip       (time, latitude, longitude) float32 dask.array<chunksize=(1, 180, 360), meta=np.ndarray>
    time_bounds  (time, nv) datetime64[ns] dask.array<chunksize=(1, 2), meta=np.ndarray>
Attributes: (12/41)
    Conventions:                CF-1.6, ACDD 1.3
    Metadata_Conventions:       CF-1.6, Unidata Dataset Discovery v1.0, NOAA ...
    acknowledgment:             This project was supported in part by a grant...
    cdm_data_type:              Grid
    cdr_program:                NOAA Climate Data Record Program for satellit...
    cdr_variable:               precipitation
    ...                         ...
    sensor:                     Imager, TOVS > TIROS Operational Vertical Sou...
    spatial_resolution:         1 degree
    standard_name_vocabulary:   CF Standard Name Table (v41, 22 February 2017)
    summary:                    Global Precipitation Climatology Project (GPC...
    time_coverage_duration:     P1D
    title:                      Global Precipitation Climatatology Project (G...
>>>

Contents#

Configuration Reference#

pangeo-forge-runner uses the very flexible traitlets system (also used by the Jupyter & Dask ecosystems) for configuration. This page links out to all the configurable parts of pangeo-forge-runner. Configuration can be set via command-line options, or via a dedicated config file in .json or .py format.
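
For example, a flag such as prune can be set on the command line as --Bake.prune=True, or equivalently in a .py config file, where traitlets provides the c configuration object. A minimal sketch (by default a file named pangeo_forge_runner_config.py is loaded if it exists; see config_file below):

# pangeo_forge_runner_config.py
c.Bake.prune = True
c.Bake.job_name = "my-test-job"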

Bakery#

The “Bakery” configures the compute plane where the recipe is baked - it wraps an Apache Beam Runner in an opinionated fashion.

DataflowBakery#
class pangeo_forge_runner.bakery.dataflow.DataflowBakery(**kwargs: Any)#

Bake a Pangeo Forge recipe on GCP Dataflow

machine_type c.DataflowBakery.machine_type = Unicode('n1-highmem-2')#

GCP Machine type to use for the Dataflow jobs.

Ignored if use_dataflow_prime is set.

max_num_workers c.DataflowBakery.max_num_workers = Int(None)#

Maximum number of workers this job can be autoscaled to.

Set to None (default) for no limit.

project_id c.DataflowBakery.project_id = Unicode(None)#

GCP Project to submit the Dataflow job into.

Defaults to the output of gcloud config get-value project if unset. Must be set for the Bakery to function.

region c.DataflowBakery.region = Unicode('us-central1')#

GCP Region to submit the Dataflow jobs into

service_account_email c.DataflowBakery.service_account_email = Unicode(None)#

If using a GCP service account to deploy Dataflow jobs, this option specifies the service account email address, which must be set to avoid permissions issues during pipeline execution. If you are using GCP user creds, do not set this value.

Defaults to the output of gcloud config get-value account if this value is a service account email address. If this value is a user email address, defaults to None.

temp_gcs_location c.DataflowBakery.temp_gcs_location = Unicode(None)#

GCS URL under which to put temporary files required to launch dataflow jobs

Must be set, and be a gs:// URL.

use_dataflow_prime c.DataflowBakery.use_dataflow_prime = Bool(False)#

Use GCP’s DataFlow Prime instead of regular DataFlow.

https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime has more information on the advantages of dataflow prime.

use_public_ips c.DataflowBakery.use_public_ips = Bool(False)#

Use public IPs for the Dataflow workers.

Set to false for projects that have policies against VM instances having their own public IPs
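
Putting these options together, a config file for baking on Dataflow might look like the following sketch (the project ID and bucket below are placeholders; adjust them, and any other traits, for your own setup):

# gcp_config.py - illustrative sketch only
c.Bake.bakery_class = "pangeo_forge_runner.bakery.dataflow.DataflowBakery"

# Placeholders - use your own project, region and bucket
c.DataflowBakery.project_id = "my-gcp-project"
c.DataflowBakery.region = "us-central1"
c.DataflowBakery.machine_type = "n1-highmem-2"

# Must be a gs:// URL; used for temporary files when launching jobs
c.DataflowBakery.temp_gcs_location = "gs://my-bucket/dataflow-tmp"

# Keep workers off public IPs if your project policy requires it
c.DataflowBakery.use_public_ips = False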

LocalDirectBakery#
class pangeo_forge_runner.bakery.local.LocalDirectBakery(**kwargs: Any)#

Bake recipes on your local machine, without docker.

Uses the Apache Beam DirectRunner

num_workers c.LocalDirectBakery.num_workers = Int(0)#

Number of workers to use when baking the recipe.

Defaults to 0, which is interpreted by Apache Beam to be the number of CPUs on the machine.
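
For example, to cap a local test bake at two worker processes, you could add the following to your config file:

c.LocalDirectBakery.num_workers = 2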

FlinkOperatorBakery#
class pangeo_forge_runner.bakery.flink.FlinkOperatorBakery(**kwargs: Any)#

Bake a Pangeo Forge recipe on a Flink cluster based on the Apache Flink k8s Operator

Requires a kubernetes cluster with apache/flink-kubernetes-operator installed

beam_executor_resources c.FlinkOperatorBakery.beam_executor_resources = Dict()#

Resources to be given the beam executor container.

Passed through to the kubernetes specification for the container that actually runs the custom python code we have. See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-requests-and-limits-of-pod-and-container for possible options.

Note that this is not specified the same way as other resource request config on this class.

enable_job_archiving c.FlinkOperatorBakery.enable_job_archiving = Bool(False)#

Enable the ability for past jobs to be archived so the job manager’s REST API can still return information after completing

flink_configuration c.FlinkOperatorBakery.flink_configuration = Dict()#

Properties to set as Flink configuration.

See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/ for the full list of configuration options. Make sure you are looking at the right setup for the version of Flink you are using.

flink_version c.FlinkOperatorBakery.flink_version = Unicode('1.16')#

Version of Flink to use.

Must be a version supported by the Flink Operator installed in the cluster

job_archive_efs_mount c.FlinkOperatorBakery.job_archive_efs_mount = Unicode('/opt/history/jobs')#

The NFS mount path where past jobs are archived so the historyserver REST API can return some information about job statuses after job managers are torn down

The default path here corresponds to the mount path that the pangeo-forge/pangeo-forge-cloud-federation Terraform deploys.

If you are using that Terraform, you can configure the path via its historyserver_mount_path variable.

job_manager_resources c.FlinkOperatorBakery.job_manager_resources = Dict()#

Memory & CPU resources to give to the jobManager pod.

Passed through to .spec.jobManager.resource in the FlinkDeployment CRD.

See https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource for accepted keys and what they mean. Specifically, note that this is not specified the same way as kubernetes resource requests in general.

Note that at least memory must be set.

max_parallelism c.FlinkOperatorBakery.max_parallelism = Int(-1)#

The pipeline wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.

Defaults to -1, which is no limit.

parallelism c.FlinkOperatorBakery.parallelism = Int(-1)#

The degree of parallelism to be used when distributing operations onto workers.

Defaults to -1, which uses Flink’s defaults.

task_manager_resources c.FlinkOperatorBakery.task_manager_resources = Dict()#

Memory & CPU resources to give to the taskManager container.

Passed through to .spec.taskManager.resource in the FlinkDeployment CRD.

Note this is just the resources for the taskManager container only - not for the beam executor container where our python code is actually executed. That is managed via beam_executor_resources.

See https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource for accepted keys and what they mean. Specifically, note that this is not specified the same way as kubernetes resource requests in general.

Note that at least memory must be set.
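
As an illustrative sketch (resource values here are placeholders, and the accepted keys come from the Flink operator and Kubernetes references linked above), the resource-related traits might be set like this in a config file:

# flink_config.py - illustrative sketch only
c.Bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"

# Passed to .spec.jobManager.resource / .spec.taskManager.resource in the
# FlinkDeployment CRD; at least "memory" must be set
c.FlinkOperatorBakery.job_manager_resources = {"memory": "2048m", "cpu": 1.0}
c.FlinkOperatorBakery.task_manager_resources = {"memory": "4096m", "cpu": 2.0}

# Regular kubernetes resource requests/limits for the beam executor container
c.FlinkOperatorBakery.beam_executor_resources = {
    "requests": {"memory": "2Gi", "cpu": "1"},
    "limits": {"memory": "4Gi"},
}

c.FlinkOperatorBakery.parallelism = 2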

Storage#

The Storage configuration lets you configure via fsspec where pangeo-forge-runner puts intermediate & final data products.

TargetStorage#
class pangeo_forge_runner.storage.TargetStorage(**kwargs: Any)#

Storage configuration for where the baked data should be stored

fsspec_args c.TargetStorage.fsspec_args = Dict()#

Args to pass to fsspec_class during instantiation

fsspec_class c.TargetStorage.fsspec_class = Type(<class 'fsspec.spec.AbstractFileSystem'>)#

The fsspec filesystem class to instantiate for this storage target

root_path c.TargetStorage.root_path = Unicode('')#

Root path under which to put all our storage.

If {job_name} is present in the root_path, it will be expanded to the unique job_name of the current job.

InputCacheStorage#
class pangeo_forge_runner.storage.InputCacheStorage(**kwargs: Any)#

Storage configuration for caching input files during recipe baking

fsspec_args c.InputCacheStorage.fsspec_args = Dict()#

Args to pass to fsspec_class during instantiation

fsspec_class c.InputCacheStorage.fsspec_class = Type(<class 'fsspec.spec.AbstractFileSystem'>)#

The fsspec filesystem class to instantiate for this storage target

root_path c.InputCacheStorage.root_path = Unicode('')#

Root path under which to put all our storage.

If {job_name} is present in the root_path, it will be expanded to the unique job_name of the current job.
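
As an example, you could point both storages at an S3 bucket via s3fs instead of the local filesystem (a sketch only - the bucket name is a placeholder, s3fs must be installed, and fsspec_args takes whatever keyword arguments S3FileSystem accepts):

# In a config file - illustrative sketch only
from s3fs import S3FileSystem

c.TargetStorage.fsspec_class = S3FileSystem
c.TargetStorage.fsspec_args = {"anon": False}
c.TargetStorage.root_path = "s3://my-bucket/pangeo-forge/{job_name}"

c.InputCacheStorage.fsspec_class = S3FileSystem
c.InputCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
c.InputCacheStorage.root_path = "s3://my-bucket/pangeo-forge/cache/input"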

Commands#

pangeo-forge-runner supports various commands that can be configured via traitlets.

expand-meta#
class pangeo_forge_runner.commands.expand_meta.ExpandMeta(**kwargs: Any)#

Application to expand meta.yaml to be fully formed.

Will execute arbitrary code if necessary to resolve dict_object recipes.

config_file c.ExpandMeta.config_file = Unicode('pangeo_forge_runner_config.py')#

Load traitlet config from this file if it exists

content_providers c.ExpandMeta.content_providers = List()#

List of ContentProviders to use to fetch repo.

Uses ContentProviders from repo2docker for doing most of the work. The ordering matters, and Git is used as the default for any URL whose provider we cannot otherwise determine.

If we want to support additional contentproviders, ideally we can contribute them upstream to repo2docker.

feedstock_subdir c.ExpandMeta.feedstock_subdir = Unicode('feedstock')#

Subdirectory inside the repository containing the meta.yaml file

json_logs c.ExpandMeta.json_logs = Bool(False)#

Provide JSON formatted logging output to stdout.

If set to True, all output will be emitted as one JSON object per line.

Each line will have at least a ‘status’ field and a ‘message’ field. Various other keys will also be present based on the command being called and the value of ‘status’.

TODO: This must get a JSON schema.

log_datefmt c.ExpandMeta.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')#

The date format used by logging formatters for %(asctime)s

log_format c.ExpandMeta.log_format = Unicode('[%(name)s]%(highlevel)s %(message)s')#

The Logging format template

logging_config c.ExpandMeta.logging_config = Dict()#

Logging configuration for this python application.

When set, this value is passed to logging.config.dictConfig, and can be used to configure how logs throughout the application are handled, not just for logs from this application alone.

See https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig for more details.
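
For example, a standard dictConfig mapping that sends DEBUG-level logs to the console might look like this (plain Python logging configuration, not specific to pangeo-forge-runner):

c.ExpandMeta.logging_config = {
    "version": 1,
    "handlers": {
        "console": {"class": "logging.StreamHandler", "level": "DEBUG"},
    },
    "root": {"level": "DEBUG", "handlers": ["console"]},
}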

ref c.ExpandMeta.ref = Unicode(None)#

Ref of feedstock repo to fetch.

Optional, only used for some methods of fetching (such as git or mercurial)

repo c.ExpandMeta.repo = Unicode('')#

URL of feedstock repo to operate on.

Can be anything that is interpretable by self.content_providers, using Repo2Docker ContentProviders. By default, this includes Git repos, Mercurial Repos, Zenodo, Figshare, Dataverse, Hydroshare, Swhid and local file paths.

show_config c.ExpandMeta.show_config = Bool(False)#

Instead of starting the Application, dump configuration to stdout

show_config_json c.ExpandMeta.show_config_json = Bool(False)#

Instead of starting the Application, dump configuration to stdout (as JSON)
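
For example, to expand the meta.yaml of the sample feedstock used in the tutorial above and emit one JSON object per line (using the fully qualified traitlets options):

pangeo-forge-runner expand-meta \
    --ExpandMeta.repo=https://github.com/pforgetest/gpcp-from-gcs-feedstock \
    --ExpandMeta.json_logs=True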

bake#
class pangeo_forge_runner.commands.bake.Bake(**kwargs: Any)#

Command to bake a pangeo forge recipe in a given bakery

bakery_class c.Bake.bakery_class = Type(<class 'pangeo_forge_runner.bakery.local.LocalDirectBakery'>)#

The Bakery to bake this recipe in.

The Bakery (and its configuration) determine which Apache Beam Runner is used, and how options for it are specified. Defaults to LocalDirectBakery, which bakes the recipe using Apache Beam’s “DirectRunner”. It doesn’t use Docker or the cloud, and runs everything locally. Useful only for testing!

config_file c.Bake.config_file = Unicode('pangeo_forge_runner_config.py')#

Load traitlet config from this file if it exists

container_image c.Bake.container_image = Unicode('')#

Container image to use for this job.

For GCP Dataflow, leaving this blank lets Beam automatically determine the worker image to use, based on the versions of Beam and Python in use.

For Flink, you must pass a Beam image matching the versions of Python and Beam you are targeting, for example apache/beam_python3.10_sdk:2.51.0. More info: https://hub.docker.com/layers/apache/

Note that some runners (like the local one) may not support this!

content_providers c.Bake.content_providers = List()#

List of ContentProviders to use to fetch repo.

Uses ContentProviders from repo2docker for doing most of the work. The ordering matters, and Git is used as the default for any URL whose provider we cannot otherwise determine.

If we want to support additional contentproviders, ideally we can contribute them upstream to repo2docker.

feedstock_subdir c.Bake.feedstock_subdir = Unicode('feedstock')#

Subdirectory inside the repository containing the meta.yaml file

job_name c.Bake.job_name = Unicode(None)#

Optionally pass a custom job name for the job run.

If None (the default), a unique name will be generated for the job.

json_logs c.Bake.json_logs = Bool(False)#

Provide JSON formatted logging output to stdout.

If set to True, all output will be emitted as one JSON object per line.

Each line will have at least a ‘status’ field and a ‘message’ field. Various other keys will also be present based on the command being called and the value of ‘status’.

TODO: This must get a JSON schema.

log_datefmt c.Bake.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')#

The date format used by logging formatters for %(asctime)s

log_format c.Bake.log_format = Unicode('[%(name)s]%(highlevel)s %(message)s')#

The Logging format template

logging_config c.Bake.logging_config = Dict()#

Logging configuration for this python application.

When set, this value is passed to logging.config.dictConfig, and can be used to configure how logs throughout the application are handled, not just for logs from this application alone.

See https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig for more details.

prune c.Bake.prune = Bool(False)#

Prune the recipe to only run for 2 time steps.

Makes it much easier to test recipes!

recipe_id c.Bake.recipe_id = Unicode(None)#

Optionally pass this value to run only this recipe_id from the feedstock.

If empty, all recipes from the feedstock will be run.

ref c.Bake.ref = Unicode(None)#

Ref of feedstock repo to fetch.

Optional, only used for some methods of fetching (such as git or mercurial)

repo c.Bake.repo = Unicode('')#

URL of feedstock repo to operate on.

Can be anything that is interpretable by self.content_providers, using Repo2Docker ContentProviders. By default, this includes Git repos, Mercurial Repos, Zenodo, Figshare, Dataverse, Hydroshare, Swhid and local file paths.

show_config c.Bake.show_config = Bool(False)#

Instead of starting the Application, dump configuration to stdout

show_config_json c.Bake.show_config_json = Bool(False)#

Instead of starting the Application, dump configuration to stdout (as JSON)
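
For example, to test a single recipe from a feedstock that defines several, you could combine recipe_id with prune (the recipe_id value here is hypothetical - use one defined in the feedstock’s meta.yaml):

pangeo-forge-runner bake \
    --config local_config.py \
    --repo . \
    --Bake.recipe_id=gpcp-from-gcs \
    --Bake.job_name=test-single-recipe \
    --prune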

Glossary#

A glossary of common terms used throughout pangeo-forge-runner.

job_id#

An autogenerated, non-human readable unique ID that represents a particular beam job. These are generated by the beam runner on submission, not set by the submitting user.

For some bakeries (such as flink), the beam runner does not generate a unique ID during submission. In those cases, this will be the same as job_name

As this is only known after job submission, it is not available for template expansion in the job specification - so you cannot use {job_id} in the various storage root_path settings, for example.

job_name#

A human-readable, human-set, but not necessarily globally unique ID that represents a particular beam job. These are set on the command line with --Bake.job_name=test-flink (or via equivalent traitlets config). If not set, pangeo-forge-runner will try to automatically generate a descriptive name.

These can only contain lower case characters (a-z), digits (0-9) and dashes (-).
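
Illustratively, a valid job name matches a pattern like the following (a sketch of the constraint described above, not code taken from pangeo-forge-runner):

import re

JOB_NAME_RE = re.compile(r"^[a-z0-9-]+$")

assert JOB_NAME_RE.match("test-flink")       # ok: lower case letters, digits, dashes
assert not JOB_NAME_RE.match("Test_Flink")   # rejected: upper case and underscore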

Development#

Release#

Releases are automated by the release.yaml GitHub Workflow, which is triggered by tag events.

To cut a new release, those with push permissions to the repo may run:

git tag $VERSION
git push origin --tags

Where $VERSION is a three-element, dot-delimited semantic version of the form v{MAJOR}.{MINOR}.{PATCH}, which is appropriately incremented from the prior tag.

And origin is assumed to be the remote corresponding to pangeo-forge/pangeo-forge-runner.