
Commit d0e451f

WIP: add files for dataproc tutorial
1 parent b04bd37 commit d0e451f

2 files changed: +155 −0

2 files changed

+155
-0
lines changed
Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
19+
Example Airflow DAG that show how to use various Dataproc
20+
operators to manage a cluster and submit jobs.
21+
"""

import os

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocCreateWorkflowTemplateOperator,
    DataprocDeleteClusterOperator,
    DataprocInstantiateWorkflowTemplateOperator,
    DataprocSubmitJobOperator,
    DataprocUpdateClusterOperator,
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "leah-playground")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "cluster-0c23")
REGION = os.environ.get("GCP_LOCATION", "us-central1")
# ZONE = os.environ.get("GCP_REGION", "europe-west1-b")
# BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
# OUTPUT_FOLDER = "wordcount"
# OUTPUT_PATH = f"gs://{BUCKET}/{OUTPUT_FOLDER}/"
# PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
# PYSPARK_URI = f"gs://{BUCKET}/{PYSPARK_MAIN}"
# SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
# SPARKR_URI = f"gs://{BUCKET}/{SPARKR_MAIN}"

# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
}
# [END how_to_cloud_dataproc_create_cluster]

# # Update options
# # [START how_to_cloud_dataproc_updatemask_cluster_operator]
# CLUSTER_UPDATE = {
#     "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
# }
# UPDATE_MASK = {
#     "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
# }
# # [END how_to_cloud_dataproc_updatemask_cluster_operator]

# TIMEOUT = {"seconds": 1 * 24 * 60 * 60}

# [START how_to_cloud_dataproc_pyspark_config]
# PYSPARK_JOB = {
#     "reference": {"project_id": PROJECT_ID},
#     "placement": {"cluster_name": CLUSTER_NAME},
#     "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
# }
PYSPARK_JOB = {
    "reference": {"project_id": "leah-playground"},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": "gs://leah-playground/word-count.py",
        "args": ["gs://leah-playground/input", "gs://leah-playground/output-0"],
    },
}
# [END how_to_cloud_dataproc_pyspark_config]

with models.DAG("example_gcp_dataproc", start_date=days_ago(1), schedule_interval=None) as dag:
    # [START how_to_cloud_dataproc_create_cluster_operator]
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
    )
    # [END how_to_cloud_dataproc_create_cluster_operator]

    # # [START how_to_cloud_dataproc_update_cluster_operator]
    # scale_cluster = DataprocUpdateClusterOperator(
    #     task_id="scale_cluster",
    #     cluster_name=CLUSTER_NAME,
    #     cluster=CLUSTER_UPDATE,
    #     update_mask=UPDATE_MASK,
    #     graceful_decommission_timeout=TIMEOUT,
    #     project_id=PROJECT_ID,
    #     location=REGION,
    # )
    # # [END how_to_cloud_dataproc_update_cluster_operator]

    # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]

    # [START how_to_cloud_dataproc_delete_cluster_operator]
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster", project_id=PROJECT_ID, cluster_name=CLUSTER_NAME, region=REGION
    )
    # [END how_to_cloud_dataproc_delete_cluster_operator]

    create_cluster >> pyspark_task >> delete_cluster
    # create_cluster >> scale_cluster
    # scale_cluster >> create_workflow_template >> trigger_workflow >> delete_cluster
    # scale_cluster >> hive_task >> delete_cluster
    # scale_cluster >> pig_task >> delete_cluster
    # scale_cluster >> spark_sql_task >> delete_cluster
    # scale_cluster >> spark_task >> delete_cluster
    # scale_cluster >> spark_task_async >> spark_task_async_sensor >> delete_cluster
    # scale_cluster >> pyspark_task >> delete_cluster
    # scale_cluster >> sparkr_task >> delete_cluster
    # scale_cluster >> hadoop_task >> delete_cluster
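    # NOTE (hedged sketch, not part of this commit): DataprocJobSensor is imported
    # above but never used. One way to exercise it, mirroring the commented-out
    # spark_task_async wiring, is to submit the PySpark job asynchronously and let
    # a sensor poll for completion. Task ids below are illustrative only; check the
    # installed google provider version for the exact parameter names
    # (location vs. region).
    # pyspark_task_async = DataprocSubmitJobOperator(
    #     task_id="pyspark_task_async",
    #     job=PYSPARK_JOB,
    #     location=REGION,
    #     project_id=PROJECT_ID,
    #     asynchronous=True,
    # )
    # pyspark_task_async_sensor = DataprocJobSensor(
    #     task_id="pyspark_task_async_sensor",
    #     project_id=PROJECT_ID,
    #     location=REGION,
    #     dataproc_job_id="{{ task_instance.xcom_pull(task_ids='pyspark_task_async') }}",
    #     poke_interval=10,
    # )
    # create_cluster >> pyspark_task_async >> pyspark_task_async_sensor >> delete_cluster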
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
#!/usr/bin/env python

import sys

import pyspark

# Expect an input URI and an output URI on the command line,
# e.g. the GCS paths passed via the Dataproc job's "args".
if len(sys.argv) != 3:
    raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")

inputUri = sys.argv[1]
outputUri = sys.argv[2]

# Classic word count: split lines into words, count each word, write the results.
sc = pyspark.SparkContext()
lines = sc.textFile(inputUri)
words = lines.flatMap(lambda line: line.split())
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
wordCounts.saveAsTextFile(outputUri)
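A quick, Spark-free way to sanity-check what this job computes (illustration only, not part of the commit; the sample line is made up):

from collections import Counter

def word_count(lines):
    # Same shape as the PySpark pipeline above: split each line into words,
    # then reduce to per-word counts.
    return Counter(word for line in lines for word in line.split())

print(word_count(["to be or not to be"]))  # to: 2, be: 2, or: 1, not: 1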
