Bakery#

The “Bakery” configures the compute plane where the recipe is baked. It wraps an Apache Beam Runner in an opinionated fashion.

DataflowBakery#

class pangeo_forge_runner.bakery.dataflow.DataflowBakery(**kwargs: Any)#

Bake a Pangeo Forge recipe on GCP Dataflow

disk_size_gb c.DataflowBakery.disk_size_gb = Int(None)#

The disk size, in gigabytes, to use on each remote Compute Engine worker instance.

Set to None (default) for default sizing (see https://cloud.google.com/dataflow/docs/reference/pipeline-options#worker-level_options for details).

machine_type c.DataflowBakery.machine_type = Unicode('n1-highmem-2')#

GCP Machine type to use for the Dataflow jobs.

Ignored if use_dataflow_prime is set.

max_num_workers c.DataflowBakery.max_num_workers = Int(None)#

Maximum number of workers this job can be autoscaled to.

Set to None (default) for no limit.

project_id c.DataflowBakery.project_id = Unicode(None)#

GCP Project to submit the Dataflow job into.

Defaults to the output of gcloud config get-value project if unset. Must be set for the Bakery to function.

region c.DataflowBakery.region = Unicode('us-central1')#

GCP Region to submit the Dataflow jobs into

service_account_email c.DataflowBakery.service_account_email = Unicode(None)#

If you deploy Dataflow jobs with a GCP service account, this option specifies the service account email address. It must be set to avoid permissions issues during pipeline execution. If you are using GCP user credentials, do not set this value.

Defaults to the output of gcloud config get-value account if this value is a service account email address. If this value is a user email address, defaults to None.
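The defaulting rule above can be sketched as a small pure function. This is an illustration only, not the runner's actual code: the helper name `default_service_account_email` is hypothetical, and the suffix check assumes that service account addresses end in `gserviceaccount.com`.

```python
from typing import Optional

# Assumption: GCP service account addresses end with this suffix,
# e.g. "baker@my-project.iam.gserviceaccount.com".
SERVICE_ACCOUNT_SUFFIX = "gserviceaccount.com"


def default_service_account_email(active_account: str) -> Optional[str]:
    """Return the account if it looks like a service account, else None.

    `active_account` stands in for the output of
    `gcloud config get-value account`.
    """
    account = active_account.strip()
    if account.endswith(SERVICE_ACCOUNT_SUFFIX):
        return account
    # A user email address: leave service_account_email unset.
    return None
```

With a user address such as `person@example.com` the function returns `None`, which matches the documented default for user credentials.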

temp_gcs_location c.DataflowBakery.temp_gcs_location = Unicode(None)#

GCS URL under which to put temporary files required to launch Dataflow jobs.

Must be set, and be a gs:// URL.

use_dataflow_prime c.DataflowBakery.use_dataflow_prime = Bool(False)#

Use GCP’s Dataflow Prime instead of regular Dataflow.

https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime has more information on the advantages of Dataflow Prime.

use_public_ips c.DataflowBakery.use_public_ips = Bool(False)#

Use public IPs for the Dataflow workers.

Set to False for projects that have policies against VM instances having their own public IPs.
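As a rough sketch of how the DataflowBakery options fit together, the snippet below builds a configuration as a nested dictionary (class name, then option name) and serializes it to JSON. Each entry corresponds to a `c.DataflowBakery.<option> = <value>` line in a Python config file. The project name and bucket are placeholders, the helper `check_dataflow_config` is hypothetical, and how you pass the resulting file to the runner depends on your setup.

```python
import json

# Placeholder values: replace with your own project and bucket.
dataflow_config = {
    "DataflowBakery": {
        "project_id": "my-gcp-project",  # hypothetical project
        "region": "us-central1",  # documented default
        "temp_gcs_location": "gs://my-bucket/tmp",  # hypothetical bucket
        "machine_type": "n1-highmem-2",  # documented default
        "max_num_workers": 10,  # omit (None) for no limit
        "use_public_ips": False,
    }
}


def check_dataflow_config(config: dict) -> None:
    """Fail early on the settings the reference marks as required."""
    bakery = config["DataflowBakery"]
    # This sketch sets project_id explicitly instead of relying on gcloud.
    if not bakery.get("project_id"):
        raise ValueError("project_id must be set")
    temp = bakery.get("temp_gcs_location") or ""
    if not temp.startswith("gs://"):
        raise ValueError("temp_gcs_location must be a gs:// URL")


check_dataflow_config(dataflow_config)
print(json.dumps(dataflow_config, indent=2))
```

Checking the two required values before submitting a job is cheaper than discovering a missing bucket after Dataflow has started provisioning workers.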

LocalDirectBakery#

class pangeo_forge_runner.bakery.local.LocalDirectBakery(**kwargs: Any)#

Bake recipes on your local machine, without docker.

Uses the Apache Beam DirectRunner

num_workers c.LocalDirectBakery.num_workers = Int(0)#

Number of workers to use when baking the recipe.

Defaults to 0, which Apache Beam interprets as the number of CPUs on the machine.
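The meaning of the default can be illustrated with a short sketch. The helper `effective_num_workers` is hypothetical and only mirrors the behavior described above. Apache Beam performs the real resolution when the pipeline runs.

```python
import os

# Equivalent to `c.LocalDirectBakery.num_workers = 0` in a Python config file.
local_config = {"LocalDirectBakery": {"num_workers": 0}}


def effective_num_workers(num_workers: int) -> int:
    """Mirror the documented meaning of 0: one worker per CPU."""
    if num_workers == 0:
        # os.cpu_count() can return None on unusual platforms; fall back to 1.
        return os.cpu_count() or 1
    return num_workers


print(effective_num_workers(local_config["LocalDirectBakery"]["num_workers"]))
```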

FlinkOperatorBakery#

class pangeo_forge_runner.bakery.flink.FlinkOperatorBakery(**kwargs: Any)#

Bake a Pangeo Forge recipe on a Flink cluster based on the Apache Flink Kubernetes Operator.

Requires a Kubernetes cluster with apache/flink-kubernetes-operator installed.

beam_executor_resources c.FlinkOperatorBakery.beam_executor_resources = Dict()#

Resources to give to the Beam executor container.

Passed through to the Kubernetes specification for the container that actually runs our custom Python code. See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-requests-and-limits-of-pod-and-container for possible options.

Note that this is not specified the same way as the other resource request options on this class.
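A minimal sketch of what a Kubernetes-style value for this option might look like, using the standard `requests` and `limits` keys from the linked Kubernetes documentation. The quantities are illustrative, not recommendations.

```python
# Kubernetes container resource format: "requests" and "limits",
# each mapping a resource name to a quantity string.
beam_executor_resources = {
    "requests": {"cpu": "500m", "memory": "1Gi"},
    "limits": {"cpu": "1", "memory": "2Gi"},
}

# Equivalent to `c.FlinkOperatorBakery.beam_executor_resources = {...}`
# in a Python config file.
executor_config = {
    "FlinkOperatorBakery": {
        "beam_executor_resources": beam_executor_resources,
    }
}
```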

enable_job_archiving c.FlinkOperatorBakery.enable_job_archiving = Bool(False)#

Enable archiving of past jobs so that the job manager’s REST API can still return information about a job after it completes.

Properties to set as Flink configuration.

See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/ for full list of configuration options. Make sure you are looking at the right setup for the version of Flink you are using.

Version of Flink to use.

Must be a version supported by the Flink Operator installed in the cluster

job_archive_efs_mount c.FlinkOperatorBakery.job_archive_efs_mount = Unicode('/opt/history/jobs')#

The NFS mount path where past jobs are archived, so that the historyserver REST API can return information about job statuses after job managers are torn down.

The default path here corresponds to what the pangeo-forge-cloud-federation Terraform deploys as the mount path: pangeo-forge/pangeo-forge-cloud-federation

If using that Terraform you can configure the path via historyserver_mount_path: pangeo-forge/pangeo-forge-cloud-federation

job_manager_resources c.FlinkOperatorBakery.job_manager_resources = Dict()#

Memory & CPU resources to give to the jobManager pod.

Passed through to .spec.jobManager.resource in the FlinkDeployment CRD.

See https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource for accepted keys and what they mean. In particular, note that this is not specified the same way as Kubernetes resource requests in general.

Note that at least memory must be set.

max_parallelism c.FlinkOperatorBakery.max_parallelism = Int(-1)#

The pipeline-wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.

Defaults to -1, which is no limit.

parallelism c.FlinkOperatorBakery.parallelism = Int(-1)#

The degree of parallelism to be used when distributing operations onto workers.

Defaults to -1, which uses Flink’s defaults.
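The two parallelism options can be set together, as in the sketch below. The values are illustrative, and the helper `check_parallelism` is hypothetical: it assumes Flink's general rule that a job's parallelism should not exceed its maximum parallelism, which the runner itself may not enforce.

```python
def check_parallelism(parallelism: int, max_parallelism: int) -> None:
    """Reject a parallelism above the maximum; -1 means 'use the default'."""
    if parallelism == -1 or max_parallelism == -1:
        return  # At least one value is left to its default; nothing to compare.
    if parallelism > max_parallelism:
        raise ValueError(
            f"parallelism ({parallelism}) exceeds max_parallelism ({max_parallelism})"
        )


# Equivalent to `c.FlinkOperatorBakery.parallelism = 4` and
# `c.FlinkOperatorBakery.max_parallelism = 128` in a Python config file.
parallelism_config = {
    "FlinkOperatorBakery": {"parallelism": 4, "max_parallelism": 128}
}

check_parallelism(**parallelism_config["FlinkOperatorBakery"])
```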

task_manager_resources c.FlinkOperatorBakery.task_manager_resources = Dict()#

Memory & CPU resources to give to the taskManager container.

Passed through to .spec.taskManager.resource in the FlinkDeployment CRD.

Note that these resources apply only to the taskManager container, not to the Beam executor container where our Python code is actually executed. That container is managed via beam_executor_resources.

See https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource for accepted keys and what they mean. In particular, note that this is not specified the same way as Kubernetes resource requests in general.

Note that at least memory must be set.
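Both job_manager_resources and task_manager_resources use the Flink operator's flat resource format rather than the Kubernetes requests/limits layout. The sketch below shows one plausible configuration. The `memory` and `cpu` keys come from the FlinkDeployment resource reference linked above, the quantities are illustrative, and the helper `require_memory` is hypothetical.

```python
# Flink operator format: flat "memory" and "cpu" keys
# (values are illustrative, not recommendations).
job_manager_resources = {"memory": "1536m", "cpu": 0.5}
task_manager_resources = {"memory": "2048m", "cpu": 1.0}


def require_memory(name: str, resources: dict) -> dict:
    """Enforce the documented rule that at least memory must be set."""
    if not resources.get("memory"):
        raise ValueError(f"{name} must set at least 'memory'")
    return resources


# Equivalent to `c.FlinkOperatorBakery.<option> = {...}` lines
# in a Python config file.
manager_config = {
    "FlinkOperatorBakery": {
        "job_manager_resources": require_memory(
            "job_manager_resources", job_manager_resources
        ),
        "task_manager_resources": require_memory(
            "task_manager_resources", task_manager_resources
        ),
    }
}
```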