pangeo-forge-runner#
Commandline tool to manage pangeo-forge feedstocks
Tutorials#
Running a recipe locally#
pangeo-forge-runner
supports baking your recipes locally, primarily so you
can test the exact setup that will be used to bake your recipe on the cloud.
This allows for fast iteration on your recipe, while guaranteeing that the
behavior you see on your local system is what you will get when running
scaled out on the cloud.
Clone a sample recipe repo to work on#
This tutorial will work with any recipe, but to simplify things we will use
this pruned GPCP Recipe
that pulls a subset of GPCP netCDF files from Google Cloud Storage and writes them
out as Zarr. The config we have set up for pangeo-forge-runner
will fetch the
files from remote storage only once on your system, caching them so future runs
will be faster.
This same setup would work for any recipe!
Clone a copy of the recipe to work on:
git clone https://github.com/pforgetest/gpcp-from-gcs-feedstock
cd gpcp-from-gcs-feedstock
You can make edits to this if you would like.
Set up a virtual environment that will contain
pangeo-forge-runner
and any other dependencies this recipe will need. We use a venv
here, but you may also use conda
or any other Python package management setup you are familiar with.
python -m venv venv
source venv/bin/activate
Install
pangeo-forge-runner
into this environment.
pip install pangeo-forge-runner
Now you’re ready to go!
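Before moving on, you can optionally verify the install from Python by querying the package metadata (a quick sanity check; any recent version is fine):

# Confirm pangeo-forge-runner is installed and see which version you got
from importlib.metadata import version
print(version("pangeo-forge-runner"))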
Setting up config file#
Construct a local_config.py
file that describes where the output
data should go, and what should be used for caching the input files. Since we just
want to test locally, these can point to the local filesystem!
# Let's put all our data on the same dir as this config file
from pathlib import Path
import os
HERE = Path(__file__).parent
DATA_PREFIX = HERE / 'data'
os.makedirs(DATA_PREFIX, exist_ok=True)
# Target output should be partitioned by job name
c.TargetStorage.root_path = f"{DATA_PREFIX}/{{job_name}}"
c.InputCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
c.InputCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
# Input data cache should *not* be partitioned by job id, as we want to get the datafile
# from the source only once
c.InputCacheStorage.root_path = f"{DATA_PREFIX}/cache/input"
c.MetadataCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
c.MetadataCacheStorage.fsspec_args = c.TargetStorage.fsspec_args
# Metadata cache should be per job, as changing kwargs can change metadata
c.MetadataCacheStorage.root_path = f"{DATA_PREFIX}/{{job_name}}/cache/metadata"
This will create a directory called data
in the same directory this
config file is located in, and put all outputs and caches in there. To
speed up multiple runs, input files will be cached under the data/cache
directory.
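The {{job_name}} placeholder in root_path behaves much like Python string formatting: the runner substitutes the job name you pass (or the autogenerated one) into the path. A minimal illustration of the idea, not the runner's internal code:

# Rough illustration of how the {job_name} placeholder expands
root_path = "data/{job_name}"
print(root_path.format(job_name="test1"))  # -> data/test1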
Run a pruned version of your recipe#
You’re all set to run your recipe now!
pangeo-forge-runner bake \
--config local_config.py \
--repo . \
--Bake.job_name=test1 \
--prune
This should run for a few seconds, and your output Zarr store should now be
in data/test1/gpcp
! Let’s explore the various parameters passed.
- --config local_config.py specifies the config file we want pangeo-forge-runner to read. If we were to run this on GCP or AWS, we could have additional aws_config.py or gcp_config.py files and just pass those instead - everything else can remain the same. By putting most config into files, this also eases collaboration - multiple people can know they’re running the same config.
- --repo . specifies that we want the current directory to be treated as a recipe and run. This can instead point to a git repo, Zenodo URI, etc. as needed.
- --Bake.job_name=test1 specifies a unique job name for this particular run. In our local_config.py, we use this name to create the output directory. If not specified, this would be autogenerated.
- --prune specifies we only want to run the recipe on about 2 input files, rather than on everything. This makes for fast turnaround time and easy testing.
You can test the created Zarr store by opening it with xarray
>>> import xarray as xr
>>> ds = xr.open_zarr("data/test1/gpcp")
>>> ds
<xarray.Dataset>
Dimensions: (latitude: 180, nv: 2, longitude: 360, time: 2)
Coordinates:
* latitude (latitude) float32 -90.0 -89.0 -88.0 -87.0 ... 87.0 88.0 89.0
* longitude (longitude) float32 0.0 1.0 2.0 3.0 ... 356.0 357.0 358.0 359.0
* time (time) datetime64[ns] 1996-10-01 1996-10-02
Dimensions without coordinates: nv
Data variables:
lat_bounds (latitude, nv) float32 dask.array<chunksize=(180, 2), meta=np.ndarray>
lon_bounds (longitude, nv) float32 dask.array<chunksize=(360, 2), meta=np.ndarray>
precip (time, latitude, longitude) float32 dask.array<chunksize=(1, 180, 360), meta=np.ndarray>
time_bounds (time, nv) datetime64[ns] dask.array<chunksize=(1, 2), meta=np.ndarray>
Attributes: (12/41)
Conventions: CF-1.6, ACDD 1.3
Metadata_Conventions: CF-1.6, Unidata Dataset Discovery v1.0, NOAA ...
acknowledgment: This project was supported in part by a grant...
cdm_data_type: Grid
cdr_program: NOAA Climate Data Record Program for satellit...
cdr_variable: precipitation
... ...
sensor: Imager, TOVS > TIROS Operational Vertical Sou...
spatial_resolution: 1 degree
standard_name_vocabulary: CF Standard Name Table (v41, 22 February 2017)
summary: Global Precipitation Climatology Project (GPC...
time_coverage_duration: P1D
title: Global Precipitation Climatatology Project (G...
>>>
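Since the recipe was pruned, the store should only contain the first two time steps. A small follow-up check you could run (paths assume the local_config.py above):

import xarray as xr

ds = xr.open_zarr("data/test1/gpcp")
# --prune bakes only the first two time steps, so we expect exactly 2 here
assert ds.sizes["time"] == 2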
Run a Recipe on a Flink Cluster on AWS EKS#
pangeo-forge-runner supports baking your recipes on Apache Flink using the Apache Flink Runner for Beam. After looking at various options, we have settled on supporting Flink on Kubernetes using Apache’s Flink Operator. This allows us to bake recipes on any Kubernetes cluster! In this tutorial, we’ll bake a recipe that we use for integration tests on an Amazon EKS k8s cluster!
Current support:
| pangeo-forge-recipes version | pangeo-forge-runner version | flink k8s operator version | flink version | apache-beam version |
|---|---|---|---|---|
| >=0.10.0 | >=0.9.1 | 1.6.1 | 1.16 | >=2.47.0 |
Setting up EKS#
You need an EKS cluster with Apache Flink Operator installed. Setting that up is out of the scope for this tutorial, but you can find some Terraform for that here.
Setting up your Local Machine as a Runner#
Install required tools on your machine.
If you don’t already have AWS Access/Secret keys you will need to create them. Then set up your credentials locally so awscli can pick them up using the configure command.
Ask your administrator to add your IAM user arn to the correct k8s
aws-auth
configuration. Then the admin will ask you to run a command to get EKS credentials locally that might look like this:
$ AWS_PROFILE=<your-aws-profile> aws eks update-kubeconfig --name <cluster-name> --region <aws-cluster-region>
Verify everything is working by running the following command:
$ kubectl -n default get flinkdeployment,deploy,pod,svc
You should see the default k8s resources such as the
flink-kubernetes-operator
similar to what’s below:

NAME                                        READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-kubernetes-operator   1/1     1            1           15d

NAME                                             READY   STATUS    RESTARTS        AGE
pod/flink-kubernetes-operator-559fccd895-pfdwj   2/2     Running   2 (2d21h ago)   6d17h

NAME                                     TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)   AGE
service/flink-operator-webhook-service   ClusterIP   10.100.231.230   <none>        443/TCP   20d
service/kubernetes                       ClusterIP   10.100.0.1       <none>        443/TCP   20d
Setting up Runner Configuration#
Let’s first look at defining where the data that our recipes will use is stored. There are three aspects of this question we must answer:
where data is discovered/read from
where data is cached to, and
where data is output to
Where Data is Discovered#
Recipe input is defined in the recipe itself. Most recipes will define a pangeo_forge_recipes.patterns.FilePattern
that provides the pipeline with input file locations.
The example below is taken from this example recipe:
import apache_beam as beam
import pandas as pd
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
dates = [
d.to_pydatetime().strftime('%Y%m%d')
for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
]
def make_url(time):
url_base = "https://storage.googleapis.com/pforge-test-data"
return f"{url_base}/gpcp/v01r03_daily_d{time}.nc"
concat_dim = ConcatDim("time", dates, nitems_per_file=1)
pattern = FilePattern(make_url, concat_dim)
recipe = (
beam.Create(pattern.items())
| OpenURLWithFSSpec()
| OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
| StoreToZarr(
store_name="gpcp",
combine_dims=pattern.combine_dim_keys,
)
)
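Before handing the pattern to Beam, it can be handy to peek at the inputs it will generate. A purely illustrative sanity check you could run alongside the recipe (not part of the original recipe code):

# Print the first two (index, url) pairs the pipeline will consume
for index, url in list(pattern.items())[:2]:
    print(index, url)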
Where Data is Cached#
The inputs and metadata can be cached so items don’t have to be resolved again during the next recipe run. Defining where the caching happens is part of the runner configuration file. More about that in the next section. Note that in the upcoming examples the InputCacheStorage key targets these options.
Where Data is Output#
Defining where the data is output is also part of the runner configuration file. More about that in the next section. Note
that in those upcoming examples the key TargetStorage
defines this option.
Configuration Files#
Let’s build a configuration file for where data is cached and output. It’s recommended to configure dataset recipes using configuration files. A couple of different configuration file formats can be used as seen below.
Notice the use of {{job_name}}
in the root_path
configuration examples below.
{{job_name}}
is special within root_path
and will be treated as a template-value based on
Bake.job_name
which can be provided through the CLI or in the configuration file itself; if not
provided, it will be generated automatically.
Also notice that we are going to store everything in s3 below because we chose s3fs.S3FileSystem
. This isn’t a requirement.
Pangeo-Forge aims to be storage agnostic. By depending on fsspec
we’re able to plug in supported backends.
Review other well-known fsspec
built-in implementations
and fsspec
third-party implementations.
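Conceptually, the storage configuration boils down to instantiating the configured fsspec class with fsspec_args and resolving root_path against that filesystem. A rough by-hand sketch of that idea (all values are placeholders; this is not the runner's actual internal code):

import s3fs

fsspec_args = {
    "key": "<your-aws-access-key>",        # placeholder
    "secret": "<your-aws-access-secret>",  # placeholder
    "client_kwargs": {"region_name": "<your-aws-bucket-region>"},
}
fs = s3fs.S3FileSystem(**fsspec_args)
# e.g. inspect what has been written under the target prefix after a run
# fs.ls("s3://<bucket-name>/<some-prefix>/")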
JSON based configuration file:
{ "TargetStorage": { "fsspec_class": "s3fs.S3FileSystem", "fsspec_args": { "key": "<your-aws-access-key>", "secret": "<your-aws-access-secret>", "client_kwargs":{"region_name":"<your-aws-bucket-region>"} }, "root_path": "s3://<bucket-name>/<some-prefix>/{{job_name}}/output" }, "InputCacheStorage": { "fsspec_class": "s3fs.S3FileSystem", "fsspec_args": { "key": "<your-aws-access-key>", "secret": "<your-aws-access-secret>", "client_kwargs":{"region_name":"<your-aws-bucket-region>"} }, "root_path": "s3://<bucket-name>/<some-prefix>/input/cache" } }
Python based configuration file (configuration is based on the traitlets library):
BUCKET_PREFIX = "s3://<bucket-name>/<some-prefix>/" # The storage backend we want s3_fsspec = "s3fs.S3FileSystem" # Credentials for the backend s3_args = { "key": "<your-aws-access-key>", "secret": "<your-aws-access-secret>", "client_kwargs":{"region_name":"<your-aws-bucket-region>"} } # Take note: this is just python. We can reuse these values below c.TargetStorage.fsspec_class = s3_fsspec # Target output should be partitioned by `{{job_name}}` c.TargetStorage.root_path = f"{BUCKET_PREFIX}/{{job_name}}/output" c.TargetStorage.fsspec_args = s3_args c.InputCacheStorage.fsspec_class = s3_fsspec c.InputCacheStorage.fsspec_args = s3_args # Input data cache should *not* be partitioned by `{{job_name}}`, as we want to get the datafile from the source only once c.InputCacheStorage.root_path = f"{BUCKET_PREFIX}/cache/input"
Other Configuration Options#
A subset of the configuration schema we just talked about for inputs/outputs/caches gets dependency-injected into the recipe by the runner.
Various other runner options (documented here and flink-specific here)
can also be passed either in the file configuration format above or directly during CLI bake
calls. It’s recommended that you use
the file configuration because some argument values are JSON based.
Here’s a slightly more complicated example of a configuration file and bake
command where you pass Flink-specific configuration options and runner options.
Note that -f <path-to-your-runner-config>.<json||py>
would point to the runner configuration file talked about above:
BUCKET_PREFIX = "s3://<bucket-name>/<some-prefix>/"
# The storage backend we want
s3_fsspec = "s3fs.S3FileSystem"
# Credentials for the backend
s3_args = {
"key": "<your-aws-access-key>",
"secret": "<your-aws-access-secret>",
"client_kwargs":{"region_name":"<your-aws-bucket-region>"}
}
# Take note: this is just python. We can reuse these values below
c.FlinkOperatorBakery.job_manager_resources = {"memory": "2048m", "cpu": 1.0}
c.FlinkOperatorBakery.task_manager_resources = {"memory": "2048m", "cpu": 1.0}
c.FlinkOperatorBakery.flink_configuration= {
"taskmanager.numberOfTaskSlots": "1",
"taskmanager.memory.flink.size": "1536m",
"taskmanager.memory.task.off-heap.size": "256m",
"taskmanager.memory.jvm-overhead.max": "1024m"
}
c.FlinkOperatorBakery.parallelism = 1
c.FlinkOperatorBakery.flink_version = "1.16"
# NOTE: image name must match the version of python and apache-beam being used locally
c.Bake.container_image = 'apache/beam_python3.9_sdk:2.50.0'
c.Bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
c.TargetStorage.fsspec_class = s3_fsspec
# Target output should be partitioned by `{{job_name}}`
c.TargetStorage.root_path = f"{BUCKET_PREFIX}/{{job_name}}/output"
c.TargetStorage.fsspec_args = s3_args
c.InputCacheStorage.fsspec_class = s3_fsspec
c.InputCacheStorage.fsspec_args = s3_args
# Input data cache should *not* be partitioned by `{{job_name}}`, as we want to get the datafile from the source only once
c.InputCacheStorage.root_path = f"{BUCKET_PREFIX}/cache/input"
pangeo-forge-runner bake \
--repo="https://github.com/pforgetest/gpcp-from-gcs-feedstock.git" \
--ref="0.10.3" \
-f <path-to-your-runner-config>.<json||py> \
--Bake.job_name=recipe \
--prune
Where you put things is your choice, but please be careful: you don’t want to commit AWS secrets to GitHub!
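One way to keep credentials out of the repository is to read them from environment variables inside your Python configuration file rather than hardcoding them. A sketch, assuming the standard AWS environment variable names:

import os

# Read credentials from the environment instead of committing them to the repo
s3_args = {
    "key": os.environ["AWS_ACCESS_KEY_ID"],
    "secret": os.environ["AWS_SECRET_ACCESS_KEY"],
    "client_kwargs": {"region_name": os.environ.get("AWS_DEFAULT_REGION", "<your-aws-bucket-region>")},
}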
Running the Recipe#
Now let’s run the integration test recipe!
Below are the minimal configuration args required to run something successfully with Flink on pangeo-forge-runner>=0.9.1
where <path-to-your-runner-config>.<json||py>
is your configuration file discussed above:
BUCKET_PREFIX = "s3://<bucket-name>/<some-prefix>/"
# The storage backend we want
s3_fsspec = "s3fs.S3FileSystem"
# Credentials for the backend
s3_args = {
"key": "<your-aws-access-key>",
"secret": "<your-aws-access-secret>",
"client_kwargs":{"region_name":"<your-aws-bucket-region>"}
}
# Take note: this is just python. We can reuse these values below
c.FlinkOperatorBakery.flink_configuration= {
"taskmanager.numberOfTaskSlots": "1",
}
c.FlinkOperatorBakery.parallelism = 1
c.FlinkOperatorBakery.flink_version = "1.16"
# NOTE: image name must match the version of python and apache-beam being used locally
c.Bake.container_image = 'apache/beam_python3.9_sdk:2.50.0'
c.Bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
c.TargetStorage.fsspec_class = s3_fsspec
# Target output should be partitioned by `{{job_name}}`
c.TargetStorage.root_path = f"{BUCKET_PREFIX}/{{job_name}}/output"
c.TargetStorage.fsspec_args = s3_args
c.InputCacheStorage.fsspec_class = s3_fsspec
c.InputCacheStorage.fsspec_args = s3_args
# Input data cache should *not* be partitioned by `{{job_name}}`, as we want to get the datafile from the source only once
c.InputCacheStorage.root_path = f"{BUCKET_PREFIX}/cache/input"
pangeo-forge-runner bake \
--repo=https://github.com/pforgetest/gpcp-from-gcs-feedstock.git \
--ref="0.10.3" \
-f <path-to-your-runner-config>.<json||py> \
--Bake.job_name=recipe \
--prune
Note that Bake.prune=True
only tests the recipe by running the first two time steps, like the integration tests do.
Monitoring Job Output via the Flink Dashboard#
After you run the pangeo-forge-runner
command, among the many lines of output,
you should see something that looks like:
You can run 'kubectl port-forward --pod-running-timeout=2m0s --address 127.0.0.1 <some-name> 0:8081' to make the Flink Dashboard available!
If you copy the command provided in the message and run it, it should provide you with a local address where the Flink Job Manager Dashboard will be available!
$ kubectl port-forward --pod-running-timeout=2m0s --address 127.0.0.1 <some-name> 0:8081
Forwarding from 127.0.0.1:<some-number> -> 8081
Copy the 127.0.0.1:<some-number>
URL to your browser, and tada! Now you can watch our job and tasks remotely
Monitoring Job Output via k8s Logs#
You can also tell when your jobs/tasks have completed by querying the k8s logs. The job manager logs will include any tracebacks from your task managers and give the clearest indication of whether things succeeded.
First, list out your pods and find your job manager pod as noted below:
$ kubectl -n default get pod
Then grok the output and pay attention to these notes:
NAME READY STATUS RESTARTS AGE
pod/flink-kubernetes-operator-559fccd895-pfdwj 2/2 Running 2 (3d1h ago) 6d21h
# NOTE: the job manager here gets provisioned as `<Bake.job_name>-<k8s-resource-hash>`
# so `Bake.job_name="nz-5ftesting"` in this case
pod/nz-5ftesting-66d8644f49-wnndr 1/1 Running 0 55m
# NOTE: the task managers always have a similar suffix depending on your
# `--FlinkOperatorBakery.parallelism` setting. Here it was set to `--FlinkOperatorBakery.parallelism=2`
pod/nz-5ftesting-task-manager-1-1 1/1 Running 0 55m
pod/nz-5ftesting-task-manager-1-2 1/1 Running 0 55m
Dumping the job manager logs and grepping for this line will signal success or failure of the overall job:
$ kubectl -n default logs pod/nz-5ftesting-66d8644f49-wnndr | grep 'Job BeamApp-flink-.*RUNNING to.*'
# which might show this in a fail case:
2023-11-06 17:46:20,299 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Job BeamApp-flink-1106174115-5000126 (d4a75ed1adf735f89649113c82b4f45a) switched from state RUNNING to FAILING.
# or this in a successful case
2023-11-06 17:46:20,299 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Job BeamApp-flink-1106174115-5000126 (d4a75ed1adf735f89649113c82b4f45a) switched from state RUNNING to COMPLETED.
Flink Memory Allocation Tricks and Trapdoors#
Sometimes you’ll have jobs fail on Flink with errors about not enough off-heap
memory or the JVM being OOM killed. Here
are some configuration options to think about when running jobs.
The kind: FlinkDeployment
resource’s goal is to spin up a job manager. The job manager’s goal, in turn, is
to spin up the task managers (depending on your --FlinkOperatorBakery.parallelism
setting). In k8s land
you can get a sense for which deployment/pod is which by considering your --Bake.job_name
:
First run this command:
$ kubectl -n default get pod
Then consider this output:
NAME READY STATUS RESTARTS AGE
pod/flink-kubernetes-operator-559fccd895-pfdwj 2/2 Running 2 (3d1h ago) 6d21h
# NOTE: the job manager here gets provisioned as `<Bake.job_name>-<k8s-resource-hash>`
pod/nz-5ftesting-66d8644f49-wnndr 1/1 Running 0 55m
# NOTE: the task managers always have a similar suffix depending on your
# `--FlinkOperatorBakery.parallelism` setting. Here it was set to `--FlinkOperatorBakery.parallelism=2`
pod/nz-5ftesting-task-manager-1-1 1/1 Running 0 55m
pod/nz-5ftesting-task-manager-1-2 1/1 Running 0 55m
If we grok the first ten lines of the job manager logs we get a nice ASCII breakdown of the job manager resourcing that corresponds to the matching Flink memory configurations.
Run this command:
$ kubectl logs pod/nz-5ftesting-66d8644f49-wnndr | grep -a6 "Final Master Memory configuration:"
Consider this output:
INFO [] - Loading configuration property: jobmanager.memory.process.size, 6144m
INFO [] - Loading configuration property: taskmanager.rpc.port, 6122
INFO [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
INFO [] - Loading configuration property: kubernetes.jobmanager.cpu, 1.0
INFO [] - Loading configuration property: prometheus.io/scrape, true
INFO [] - Loading configuration property: $internal.flink.version, v1_16
INFO [] - Final Master Memory configuration:
# set via: `--FlinkOperatorBakery.job_manager_resources='{"memory": "<size>m", "cpu": <float>}'`
# corresponds to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#jobmanager.memory.process.size
INFO [] - Total Process Memory: 6.000gb (6442450944 bytes)
# set via: `--FlinkOperatorBakery.flink_configuration='{"taskmanager.memory.flink.size": "<size>m"}'`
# corresponds to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#jobmanager-memory-flink-size
INFO [] - Total Flink Memory: 5.150gb (5529770384 bytes)
INFO [] - JVM Heap: 5.025gb (5395552656 bytes)
INFO [] - Off-heap: 128.000mb (134217728 bytes)
INFO [] - JVM Metaspace: 256.000mb (268435456 bytes)
INFO [] - JVM Overhead: 614.400mb (644245104 bytes)
While the task manager doesn’t have this nice memory configuration breakdown, the Flink memory configurations have all the same options for the task managers.
These settings are probably recipe specific because they depend on what you are doing. A good default is to set the job/task manager process memory higher than needed and let the fractional allocations described in the Flink memory configuration carve off enough room to do their job.
Then only adjust if you are getting specific errors about memory. The one caveat here is that taskmanager.memory.flink.size
can’t be a fractional allocation of total process memory, so it’s best to set it outright at or below your total process memory:
pangeo-forge-runner bake \
--repo=https://github.com/pforgetest/gpcp-from-gcs-feedstock.git \
--ref="0.10.3" \
-f <path-to-your-runner-config>.<json||py> \
# corresponds to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#jobmanager.memory.process.size
--FlinkOperatorBakery.job_manager_resources='{"memory": "2048m", "cpu": 0.3}' \
# corresponds to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#taskmanager.memory.process.size
--FlinkOperatorBakery.task_manager_resources='{"memory": "2048m", "cpu": 0.3}' \
# corresponds to any options from: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/
--FlinkOperatorBakery.flink_configuration='{"taskmanager.numberOfTaskSlots": "1", "taskmanager.memory.flink.size": "1536m"}' \
# corresponds to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#pipeline-max-parallelism
--FlinkOperatorBakery.parallelism=1 \
--FlinkOperatorBakery.flink_version="1.16" \
--Bake.prune=True \
--Bake.job_name=recipe \
# corresponds to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#kubernetes-container-image
--Bake.container_image='apache/beam_python3.9_sdk:2.50.0' \
--Bake.bakery_class="pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"
Configuration Reference#
pangeo-forge-runner
uses the very flexible
traitlets system (also used by
the Jupyter & Dask ecosystems) for configuration. This page links out to all
the configurable parts of pangeo-forge-runner
. Configuration can be set via
command-line options,
or via a dedicated config file
in .json or .py format.
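For example, a minimal pangeo_forge_runner_config.py (the default config file name, per the reference below) might look like this sketch, where every value is a placeholder:

# Minimal traitlets-style config sketch; all values are placeholders
c.Bake.job_name = "my-test-job"
c.Bake.prune = True
c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.TargetStorage.root_path = "/tmp/pangeo-forge/{job_name}/output"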
Bakery#
The “Bakery” configures the compute plane where the recipe is baked - it wraps an Apache Beam Runner in an opinionated fashion.
DataflowBakery#
- class pangeo_forge_runner.bakery.dataflow.DataflowBakery(**kwargs: Any)#
Bake a Pangeo Forge recipe on GCP Dataflow
- machine_type c.DataflowBakery.machine_type = Unicode('n1-highmem-2')#
GCP Machine type to use for the Dataflow jobs.
Ignored if use_dataflow_prime is set.
- max_num_workers c.DataflowBakery.max_num_workers = Int(None)#
Maximum number of workers this job can be autoscaled to.
Set to None (default) for no limit.
- project_id c.DataflowBakery.project_id = Unicode(None)#
GCP Project to submit the Dataflow job into.
Defaults to the output of gcloud config get-value project if unset. Must be set for the Bakery to function.
- region c.DataflowBakery.region = Unicode('us-central1')#
GCP Region to submit the Dataflow jobs into
- service_account_email c.DataflowBakery.service_account_email = Unicode(None)#
If using a GCP service account to deploy Dataflow jobs, this option specifies the service account email address, which must be set to avoid permissions issues during pipeline execution. If you are using GCP user creds, do not set this value.
Defaults to the output of gcloud config get-value account if this value is a service account email address. If this value is a user email address, defaults to None.
- temp_gcs_location c.DataflowBakery.temp_gcs_location = Unicode(None)#
GCS URL under which to put temporary files required to launch dataflow jobs
Must be set, and be a gs:// URL.
- use_dataflow_prime c.DataflowBakery.use_dataflow_prime = Bool(False)#
Use GCP’s DataFlow Prime instead of regular DataFlow.
https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime has more information on the advantages of dataflow prime.
- use_public_ips c.DataflowBakery.use_public_ips = Bool(False)#
Use public IPs for the Dataflow workers.
Set to false for projects that have policies against VM instances having their own public IPs
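Putting these options together, a hypothetical Dataflow configuration file might look like the sketch below (all values are placeholders):

c.Bake.bakery_class = "pangeo_forge_runner.bakery.dataflow.DataflowBakery"
c.DataflowBakery.project_id = "<your-gcp-project>"
c.DataflowBakery.temp_gcs_location = "gs://<your-bucket>/temp"
c.DataflowBakery.region = "us-central1"
c.DataflowBakery.machine_type = "n1-highmem-2"
c.DataflowBakery.use_public_ips = False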
LocalDirectBakery#
- class pangeo_forge_runner.bakery.local.LocalDirectBakery(**kwargs: Any)#
Bake recipes on your local machine, without docker.
Uses the Apache Beam DirectRunner
- num_workers c.LocalDirectBakery.num_workers = Int(0)#
Number of workers to use when baking the recipe.
Defaults to 0, which is interpreted by Apache Beam to be the number of CPUs on the machine.
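For completeness, a hypothetical local configuration using this bakery (it is also the default bakery, so setting bakery_class explicitly is optional):

c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery"
# 0 (the default) lets Apache Beam use one worker per CPU
c.LocalDirectBakery.num_workers = 4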
FlinkOperatorBakery#
- class pangeo_forge_runner.bakery.flink.FlinkOperatorBakery(**kwargs: Any)#
Bake a Pangeo Forge recipe on a Flink cluster based on the Apache Flink k8s Operator
Requires a kubernetes cluster with apache/flink-kubernetes-operator installed
- beam_executor_resources c.FlinkOperatorBakery.beam_executor_resources = Dict()#
Resources to be given the beam executor container.
Passed through to the kubernetes specification for the container that actually runs the custom python code we have. See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-requests-and-limits-of-pod-and-container for possible options.
Note that this is not specified the same way as other resource request config on this class.
- enable_job_archiving c.FlinkOperatorBakery.enable_job_archiving = Bool(False)#
Enable the ability for past jobs to be archived so the job manager’s REST API can still return information after completing
- flink_configuration c.FlinkOperatorBakery.flink_configuration = Dict()#
Properties to set as Flink configuration.
See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/ for full list of configuration options. Make sure you are looking at the right setup for the version of Flink you are using.
- flink_version c.FlinkOperatorBakery.flink_version = Unicode('1.16')#
Version of Flink to use.
Must be a version supported by the Flink Operator installed in the cluster
- job_archive_efs_mount c.FlinkOperatorBakery.job_archive_efs_mount = Unicode('/opt/history/jobs')#
The NFS mount path where past jobs are archived so the historyserver REST API can return some information about job statuses after job managers are torn down
The default path here corresponds to what the pangeo-forge-cloud-federation Terraform deploys as the mount path: pangeo-forge/pangeo-forge-cloud-federation
If using that Terraform you can configure the path via historyserver_mount_path: pangeo-forge/pangeo-forge-cloud-federation
- job_manager_resources c.FlinkOperatorBakery.job_manager_resources = Dict()#
Memory & CPU resources to give to the jobManager pod.
Passed through to .spec.jobManager.resource in the FlinkDeployment CRD.
See https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource for accepted keys and what they mean. Specifically, note that this is not specified the same way as kubernetes resource requests in general.
Note that at least memory must be set.
- max_parallelism c.FlinkOperatorBakery.max_parallelism = Int(-1)#
The pipeline wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.
Defaults to -1, which is no limit.
- parallelism c.FlinkOperatorBakery.parallelism = Int(-1)#
The degree of parallelism to be used when distributing operations onto workers.
Defaults to -1, which uses Flink’s defaults.
- task_manager_resources c.FlinkOperatorBakery.task_manager_resources = Dict()#
Memory & CPU resources to give to the taskManager container.
Passed through to .spec.taskManager.resource in the FlinkDeployment CRD.
Note this is just the resources for the taskManager container only - not for the beam executor container where our python code is actually executed. That is managed via beam_executor_resources.
See https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource for accepted keys and what they mean. Specifically, note that this is not specified the same way as kubernetes resource requests in general.
Note that at least memory must be set.
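Because job_manager_resources and task_manager_resources use the Flink operator's resource schema while beam_executor_resources uses the Kubernetes container schema, the two are written differently. A hypothetical side-by-side sketch (values are placeholders):

# Flink operator resource schema: flat "memory"/"cpu" keys
c.FlinkOperatorBakery.task_manager_resources = {"memory": "2048m", "cpu": 1.0}
# Kubernetes container resource schema: nested "requests"/"limits"
c.FlinkOperatorBakery.beam_executor_resources = {
    "requests": {"memory": "1Gi", "cpu": "500m"},
    "limits": {"memory": "2Gi", "cpu": "1"},
}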
Storage#
The Storage configuration lets you configure via fsspec
where pangeo-forge-runner
puts intermediate & final data products.
TargetStorage#
- class pangeo_forge_runner.storage.TargetStorage(**kwargs: Any)#
Storage configuration for where the baked data should be stored
- fsspec_args c.TargetStorage.fsspec_args = Dict()#
Args to pass to fsspec_class during instantiation
- fsspec_class c.TargetStorage.fsspec_class = Type(<class 'fsspec.spec.AbstractFileSystem'>)#
FSSpec Filesystem to instantiate as class for this target
- root_path c.TargetStorage.root_path = Unicode('')#
Root path under which to put all our storage.
If {job_name} is present in the root_path, it will be expanded to the unique job_name of the current job.
InputCacheStorage#
- class pangeo_forge_runner.storage.InputCacheStorage(**kwargs: Any)#
Storage configuration for caching input files during recipe baking
- fsspec_args c.InputCacheStorage.fsspec_args = Dict()#
Args to pass to fsspec_class during instantiation
- fsspec_class c.InputCacheStorage.fsspec_class = Type(<class 'fsspec.spec.AbstractFileSystem'>)#
FSSpec Filesystem to instantiate as class for this target
- root_path c.InputCacheStorage.root_path = Unicode('')#
Root path under which to put all our storage.
If {job_name} is present in the root_path, it will be expanded to the unique job_name of the current job.
Commands#
pangeo-forge-runner
supports various commands that
can be configured via traitlets.
expand-meta
#
- class pangeo_forge_runner.commands.expand_meta.ExpandMeta(**kwargs: Any)#
Application to expand meta.yaml to be fully formed.
Will execute arbitrary code if necessary to resolve dict_object recipes.
- config_file c.ExpandMeta.config_file = Unicode('pangeo_forge_runner_config.py')#
Load traitlet config from this file if it exists
- content_providers c.ExpandMeta.content_providers = List()#
List of ContentProviders to use to fetch repo.
Uses ContentProviders from repo2docker for doing most of the work. The ordering matters, and Git is used as the default for any URL that we can not otherwise determine.
If we want to support additional contentproviders, ideally we can contribute them upstream to repo2docker.
- feedstock_subdir c.ExpandMeta.feedstock_subdir = Unicode('feedstock')#
Subdirectory inside the repository containing the meta.yaml file
- json_logs c.ExpandMeta.json_logs = Bool(False)#
Provide JSON formatted logging output to stdout.
If set to True, all output will be emitted as one JSON object per line.
Each line will have at least a ‘status’ field and a ‘message’ field. Various other keys will also be present based on the command being called and the value of ‘status’.
TODO: This must get a JSON schema.
- log_datefmt c.ExpandMeta.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')#
The date format used by logging formatters for %(asctime)s
- log_format c.ExpandMeta.log_format = Unicode('[%(name)s]%(highlevel)s %(message)s')#
The Logging format template
- logging_config c.ExpandMeta.logging_config = Dict()#
Logging configuration for this python application.
When set, this value is passed to logging.config.dictConfig, and can be used to configure how logs throughout the application are handled, not just for logs from this application alone.
See https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig for more details.
- ref c.ExpandMeta.ref = Unicode(None)#
Ref of feedstock repo to fetch.
Optional, only used for some methods of fetching (such as git or mercurial)
- repo c.ExpandMeta.repo = Unicode('')#
URL of feedstock repo to operate on.
Can be anything that is interpretable by self.content_providers, using Repo2Docker ContentProviders. By default, this includes Git repos, Mercurial Repos, Zenodo, Figshare, Dataverse, Hydroshare, Swhid and local file paths.
- show_config c.ExpandMeta.show_config = Bool(False)#
Instead of starting the Application, dump configuration to stdout
- show_config_json c.ExpandMeta.show_config_json = Bool(False)#
Instead of starting the Application, dump configuration to stdout (as JSON)
bake
#
- class pangeo_forge_runner.commands.bake.Bake(**kwargs: Any)#
Command to bake a pangeo forge recipe in a given bakery
- bakery_class c.Bake.bakery_class = Type(<class 'pangeo_forge_runner.bakery.local.LocalDirectBakery'>)#
The Bakery to bake this recipe in.
The Bakery (and its configuration) determine which Apache Beam Runner is used, and how options for it are specified. Defaults to LocalDirectBakery, which bakes the recipe using Apache Beam’s “DirectRunner”. It doesn’t use Docker or the cloud, and runs everything locally. Useful only for testing!
- config_file c.Bake.config_file = Unicode('pangeo_forge_runner_config.py')#
Load traitlet config from this file if it exists
- container_image c.Bake.container_image = Unicode('')#
Container image to use for this job.
For GCP DataFlow leaving it blank defaults to letting beam automatically figure out the image to use for the workers based on version of beam and python in use.
For Flink it’s required that you pass a Beam image matching the version of Python and Beam you are targeting, for example: apache/beam_python3.10_sdk:2.51.0. More info: https://hub.docker.com/layers/apache/
Note that some runners (like the local one) may not support this!
- content_providers c.Bake.content_providers = List()#
List of ContentProviders to use to fetch repo.
Uses ContentProviders from repo2docker for doing most of the work. The ordering matters, and Git is used as the default for any URL that we can not otherwise determine.
If we want to support additional contentproviders, ideally we can contribute them upstream to repo2docker.
- feedstock_subdir c.Bake.feedstock_subdir = Unicode('feedstock')#
Subdirectory inside the repository containing the meta.yaml file
- job_name c.Bake.job_name = Unicode(None)#
Optionally pass a custom job name for the job run.
If None (the default), a unique name will be generated for the job.
- json_logs c.Bake.json_logs = Bool(False)#
Provide JSON formatted logging output to stdout.
If set to True, all output will be emitted as one JSON object per line.
Each line will have at least a ‘status’ field and a ‘message’ field. Various other keys will also be present based on the command being called and the value of ‘status’.
TODO: This must get a JSON schema.
- log_datefmt c.Bake.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')#
The date format used by logging formatters for %(asctime)s
- log_format c.Bake.log_format = Unicode('[%(name)s]%(highlevel)s %(message)s')#
The Logging format template
- logging_config c.Bake.logging_config = Dict()#
Logging configuration for this python application.
When set, this value is passed to logging.config.dictConfig, and can be used to configure how logs throughout the application are handled, not just for logs from this application alone.
See https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig for more details.
- prune c.Bake.prune = Bool(False)#
Prune the recipe to only run for 2 time steps.
Makes it much easier to test recipes!
- recipe_id c.Bake.recipe_id = Unicode(None)#
Optionally pass this value to run only this recipe_id from the feedstock.
If empty, all recipes from the feedstock will be run.
- ref c.Bake.ref = Unicode(None)#
Ref of feedstock repo to fetch.
Optional, only used for some methods of fetching (such as git or mercurial)
- repo c.Bake.repo = Unicode('')#
URL of feedstock repo to operate on.
Can be anything that is interpretable by self.content_providers, using Repo2Docker ContentProviders. By default, this includes Git repos, Mercurial Repos, Zenodo, Figshare, Dataverse, Hydroshare, Swhid and local file paths.
- show_config c.Bake.show_config = Bool(False)#
Instead of starting the Application, dump configuration to stdout
- show_config_json c.Bake.show_config_json = Bool(False)#
Instead of starting the Application, dump configuration to stdout (as JSON)
Glossary#
A glossary of common terms used throughout pangeo-forge-runner.
job_id
#
An autogenerated, non-human readable unique ID that represents a particular beam job. These are generated by the beam runner on submission, not set by the submitting user.
For some bakeries (such as flink), the beam runner does not generate a unique ID during submission. In those cases, this will be the same as job_name
As this is only known after job submission, this is not available for template expansion in the job specification - so you can not use {job_id} in various TargetPaths, for example.
job_name
#
A human readable, human set, but not necessarily globally unique ID that represents a particular beam job. These are set on the commandline with
--Bake.job_name=test-flink
(or similar traitlets config). If not set, pangeo-forge-runner will try to automatically generate a descriptive name. These can only contain lower case characters (a-z), digits (0-9) and dashes (-).
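A tiny, hypothetical validity check that mirrors the naming rule above:

import re

def is_valid_job_name(name: str) -> bool:
    # lower case characters (a-z), digits (0-9) and dashes (-) only
    return bool(re.fullmatch(r"[a-z0-9-]+", name))

assert is_valid_job_name("test-flink")
assert not is_valid_job_name("Test_Flink")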
Development#
Release#
Releases are automated by the release.yaml
GitHub Workflow,
which is triggered by tag events.
To cut a new release, those with push permissions to the repo may run:
git tag $VERSION
git push origin --tags
Where $VERSION
is a three-element, dot-delimited semantic version of the form
v{MAJOR}.{MINOR}.{PATCH}
, which is appropriately incremented from the prior tag.
And origin
is assumed to be the remote corresponding to
pangeo-forge/pangeo-forge-runner
.