Skip to content

Commit 6305246

Browse files
Handle AutoGKE in composer_db_transfer.py (GoogleCloudPlatform#6438)
(+ clarify DRS-related messages) Co-authored-by: Leah E. Cole <[email protected]>
1 parent bf7d37b commit 6305246

File tree

1 file changed

+18
-26
lines changed

1 file changed

+18
-26
lines changed

composer/tools/composer_db_transfer.py

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -820,8 +820,6 @@ def run_database_creation_workload_for_source_version(
820820
command:
821821
- /var/local/db_init.sh
822822
name: airflow-database-init-job
823-
securityContext:
824-
privileged: true
825823
env:
826824
- name: GCS_BUCKET
827825
value: {bucket}
@@ -933,7 +931,7 @@ def _remove_temporary_kubeconfig(self: typing.Any) -> None:
933931

934932
def _grant_permissions(self: typing.Any) -> None:
935933
"""Grants required permissions."""
936-
if not self.is_drs_enabled:
934+
if not self.is_drs_compliant:
937935
logger.info("*** Granting required permissions...")
938936
EnvironmentUtils.grant_permissions_to_the_bucket(
939937
self.sql_service_account,
@@ -946,7 +944,7 @@ def _grant_permissions(self: typing.Any) -> None:
946944
def _revoke_permissions(self: typing.Any) -> None:
947945
"""Revokes no longer required permissions."""
948946
logger.info("*** Revoking no longer needed permissions...")
949-
if not self.is_drs_enabled:
947+
if not self.is_drs_compliant:
950948
EnvironmentUtils.revoke_permissions_to_the_bucket(
951949
self.sql_service_account,
952950
self.gcs_bucket_name,
@@ -1043,12 +1041,6 @@ def _check_composer_system_namespace(self: typing.Any) -> None:
10431041
self.composer_system_namespace_exists = True
10441042
except Exception:
10451043
self.composer_system_namespace_exists = False
1046-
if self.composer_system_namespace_exists:
1047-
raise Exception(
1048-
"'composer-system' namespace has been detected in your GKE cluster. "
1049-
"It means that your environment is newer than this script. Please "
1050-
"check if newer version of this script is available."
1051-
)
10521044

10531045
def _check_cloud_sql_proxy(self: typing.Any) -> None:
10541046
"""Sets sql proxy."""
@@ -1064,8 +1056,8 @@ def _check_cloud_sql_proxy(self: typing.Any) -> None:
10641056

10651057
def _check_drs_and_select_db_storage_bucket(self: typing.Any) -> None:
10661058
"""Checks if the environment is DRS-compliant."""
1067-
logger.info("*** Checking if DRS is enabled...")
1068-
self.is_drs_enabled = False
1059+
logger.info("*** Checking if the environment is DRS-compliant...")
1060+
self.is_drs_compliant = False
10691061
bucket_name_prefix = self.cp_bucket_name[: -len("-bucket")]
10701062
agent_bucket_name = f"{bucket_name_prefix}-agent"
10711063
try:
@@ -1077,17 +1069,19 @@ def _check_drs_and_select_db_storage_bucket(self: typing.Any) -> None:
10771069
log_command=False,
10781070
log_error=False,
10791071
)
1080-
self.is_drs_enabled = True
1072+
self.is_drs_compliant = True
10811073
logger.info(
1082-
"%s bucket has been found -> DRS is enabled.", agent_bucket_name
1074+
"%s bucket has been found -> environment is DRS compliant.",
1075+
agent_bucket_name,
10831076
)
10841077
except Exception as e: # pylint: disable=broad-except
10851078
logger.info(
1086-
"%s bucket has not been found -> DRS is disabled. (%s)",
1079+
"%s bucket has not been found -> environment is not DRS compliant."
1080+
" (%s)",
10871081
agent_bucket_name,
10881082
e,
10891083
)
1090-
if self.is_drs_enabled:
1084+
if self.is_drs_compliant:
10911085
self.gcs_bucket_name = agent_bucket_name
10921086
logger.info("Bucket in customer project: %s.", self.cp_bucket_name)
10931087
logger.info("Bucket accessible from tenant project: %s.", self.gcs_bucket_name)
@@ -1116,9 +1110,7 @@ def __init__(
11161110
self.is_good_airflow_version = (
11171111
lambda a, b, c: True if a == 2 and (b > 0 or c >= 1) else False
11181112
)
1119-
self.bad_airflow_message = (
1120-
"Import operation supports only Airflow 2.0.1+."
1121-
)
1113+
self.bad_airflow_message = "Import operation supports only Airflow 2.0.1+."
11221114

11231115
def _read_source_fernet_key(self: typing.Any) -> None:
11241116
"""Reads fernet key from source environment."""
@@ -1149,9 +1141,9 @@ def _cloud_storage_path_to_imported_table(self: typing.Any, table: str) -> None:
11491141
"""Translates table name into a path to CSV file."""
11501142
return f"gs://{self.gcs_bucket_name}/import/tables/{table}.csv"
11511143

1152-
def _copy_csv_files_to_tp_if_drs_is_enabled(self: typing.Any) -> None:
1144+
def _copy_csv_files_to_tp_if_drs_compliant(self: typing.Any) -> None:
11531145
"""Copies CSV files to tenant project if DRS is enabled."""
1154-
if self.is_drs_enabled:
1146+
if self.is_drs_compliant:
11551147
logger.info("*** Copying CSV files to tenant project...")
11561148
command = (
11571149
f"gsutil -m cp -r gs://{self.cp_bucket_name}/import/tables/* "
@@ -1458,7 +1450,7 @@ def import_database(self: typing.Any) -> None:
14581450
self._check_environment()
14591451
self._read_source_fernet_key()
14601452
self._fail_fast_if_there_are_no_files_to_import()
1461-
self._copy_csv_files_to_tp_if_drs_is_enabled()
1453+
self._copy_csv_files_to_tp_if_drs_compliant()
14621454
self._delete_old_temporary_database_if_exists()
14631455
self._create_new_database()
14641456
self._initialize_new_database()
@@ -1568,9 +1560,9 @@ def _postprocess_tables(self: typing.Any) -> None:
15681560
for table, _, _ in tables:
15691561
self._post_process_exported_table(table)
15701562

1571-
def _copy_csv_files_to_cp_if_drs_is_enabled(self: typing.Any) -> None:
1572-
"""Copies CSV files to customer's project if DRS is enabled."""
1573-
if self.is_drs_enabled:
1563+
def _copy_csv_files_to_cp_if_drs_compliant(self: typing.Any) -> None:
1564+
"""Copies CSV files to customer's project for DRS-compliant env."""
1565+
if self.is_drs_compliant:
15741566
logger.info("*** Copying CSV files to customer's project...")
15751567
command = (
15761568
f"gsutil -m cp -r gs://{self.gcs_bucket_name}/export/tables/* "
@@ -1625,7 +1617,7 @@ def export_database(self: typing.Any) -> None:
16251617
self._export_tables()
16261618
finally:
16271619
self._revoke_permissions()
1628-
self._copy_csv_files_to_cp_if_drs_is_enabled()
1620+
self._copy_csv_files_to_cp_if_drs_compliant()
16291621
self._postprocess_tables()
16301622
self._export_dags_plugins_and_data()
16311623
self._remove_temporary_kubeconfig()

0 commit comments

Comments
 (0)