import pandas as pd
from sqlalchemy import create_engine
# MySQL database connection configuration (username/password are placeholders)
mysql_conn = create_engine('mysql+pymysql://username:password@localhost/mysql_db')
# PostgreSQL database connection configuration
pg_conn = create_engine('postgresql://username:password@localhost/postgres_db')
# Query the MySQL table
mysql_data = pd.read_sql_query("SELECT * FROM my_table", mysql_conn)
# Query the PostgreSQL table
pg_data = pd.read_sql_query("SELECT * FROM your_table", pg_conn)
# Save the results to local CSV files
mysql_data.to_csv('mysql_data.csv', index=False)
pg_data.to_csv('pg_data.csv', index=False)
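# For tables too large to fit in memory, pandas can stream the query in
# chunks instead of materializing the whole result at once; a minimal
# alternative to the plain read/save above (the chunk size of 50_000 is
# an illustrative value, not from the original):
with open('mysql_data.csv', 'w', newline='') as f:
    for i, chunk in enumerate(
        pd.read_sql_query("SELECT * FROM my_table", mysql_conn, chunksize=50_000)
    ):
        chunk.to_csv(f, index=False, header=(i == 0))  # write the header only once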
# Upload the CSV files to HDFS with the hdfs3 library
from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host='namenode', port=8020)
hdfs.put('mysql_data.csv', '/data/mysql_data.csv')
hdfs.put('pg_data.csv', '/data/pg_data.csv')
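# Note: hdfs3 is no longer maintained; pyarrow's HadoopFileSystem is a
# maintained alternative. A sketch of the same upload, assuming libhdfs is
# available on the client and the Hadoop environment variables are set:
from pyarrow import fs

hadoop = fs.HadoopFileSystem(host='namenode', port=8020)
for local, remote in [('mysql_data.csv', '/data/mysql_data.csv'),
                      ('pg_data.csv', '/data/pg_data.csv')]:
    with open(local, 'rb') as src, hadoop.open_output_stream(remote) as dst:
        dst.write(src.read())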
# ---- PySpark job: clean the uploaded data on the cluster (separate script) ----
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
# Load the data from HDFS
df = spark.read.format("csv").option("header", "true").load("/data/*.csv")
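# With header-only CSV loading, every column comes back as a string. If the
# schema is known, pass it explicitly instead; a sketch with illustrative
# column names (not from the original):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

explicit_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("column_name", StringType(), True),
])
df = (spark.read.format("csv")
      .option("header", "true")
      .schema(explicit_schema)
      .load("/data/*.csv"))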
# Drop duplicate rows
df_cleaned = df.dropDuplicates()
# Fill missing values ("column_name" and "default_value" are placeholders)
df_cleaned = df_cleaned.na.fill({"column_name": "default_value"})
# Write the cleaned data out as Parquet
df_cleaned.write.mode("overwrite").parquet("/data/cleaned_data")
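# Sanity check (not part of the original pipeline): read the Parquet back
# and compare row counts to see how many duplicate rows were dropped.
cleaned = spark.read.parquet("/data/cleaned_data")
print(f"rows loaded: {df.count()}, rows after cleaning: {cleaned.count()}")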