Bronze Layer
Description
In the Bronze layer, the raw data from the Landing layer is transformed into a more structured format. At this stage, the data may still contain duplicates and errors, but it is better organized.
Objective
The goal of the Bronze layer is to apply an initial transformation to the raw data to make downstream processing easier.
Bronze
Listing all files in the Landing Zone layer
landing_zone_path = "abfss://landing-zone@datalakeengdados.dfs.core.windows.net/"
df = spark.read.format("binaryFile").load(landing_zone_path)
file_paths = df.select("path").collect()
for file in file_paths:
    print(file["path"])
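A lighter alternative, assuming this notebook runs on Databricks (as the use of display() later in the notebook suggests): dbutils.fs.ls lists the container's contents directly, without launching a Spark read over every file.
# Sketch: list the Landing Zone files via the Databricks file-system utility.
for file_info in dbutils.fs.ls(landing_zone_path):
    print(file_info.path)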
Products
Creating a DataFrame (Products)
df_products = spark.read.load('abfss://landing-zone@datalakeengdados.dfs.core.windows.net/products.parquet', format='parquet')
df_products.printSchema()
Adding processing timestamp and source file name metadata (Products)
from pyspark.sql.functions import current_timestamp, lit
df_products = df_products.withColumn("data_hora_bronze", current_timestamp())
df_products = df_products.withColumn("nome_arquivo", lit("products.parquet"))
df_products.printSchema()
df_products.show(10)
Saving to the Bronze layer in Delta format (Products)
bronze_products = 'abfss://bronze@datalakeengdados.dfs.core.windows.net/products'
df_products.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(bronze_products)
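The read, enrich, and save steps above repeat verbatim for every table in this notebook. As a hedged refactoring sketch, the pattern could be wrapped in a helper that derives the file name and Bronze path from the table name alone, assuming each Landing Zone file follows the <name>.parquet convention (the helper name and path constants are illustrative, not part of the original notebook):
from pyspark.sql.functions import current_timestamp, lit

LANDING = "abfss://landing-zone@datalakeengdados.dfs.core.windows.net"
BRONZE = "abfss://bronze@datalakeengdados.dfs.core.windows.net"

def ingest_to_bronze(table_name: str):
    """Read <table_name>.parquet from the Landing Zone, add the
    processing metadata columns, and save it as Delta in Bronze."""
    df = spark.read.format("parquet").load(f"{LANDING}/{table_name}.parquet")
    df = (df
          .withColumn("data_hora_bronze", current_timestamp())
          .withColumn("nome_arquivo", lit(f"{table_name}.parquet")))
    (df.write.format("delta")
       .mode("overwrite")
       .option("overwriteSchema", "true")
       .save(f"{BRONZE}/{table_name}"))

# Example usage covering the same tables processed below:
# for name in ["products", "customers", "departments", "employees",
#              "orderItems", "orders", "departmentProducts"]:
#     ingest_to_bronze(name)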
Customers
Creating a DataFrame (Customers)
df_customers = spark.read.load('abfss://landing-zone@datalakeengdados.dfs.core.windows.net/customers.parquet', format='parquet')
df_customers.printSchema()
Adding processing timestamp and source file name metadata (Customers)
from pyspark.sql.functions import current_timestamp, lit
df_customers = df_customers.withColumn("data_hora_bronze", current_timestamp())
df_customers = df_customers.withColumn("nome_arquivo", lit("customers.parquet"))
df_customers.printSchema()
df_customers.show(10)
Saving to the Bronze layer in Delta format (Customers)
bronze_customers = 'abfss://bronze@datalakeengdados.dfs.core.windows.net/customers'
df_customers.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(bronze_customers)
Departments
Creating a DataFrame (Departments)
df_departments = spark.read.load('abfss://landing-zone@datalakeengdados.dfs.core.windows.net/departments.parquet', format='parquet')
df_departments.printSchema()
Adding processing timestamp and source file name metadata (Departments)
from pyspark.sql.functions import current_timestamp, lit
df_departments = df_departments.withColumn("data_hora_bronze", current_timestamp())
df_departments = df_departments.withColumn("nome_arquivo", lit("departments.parquet"))
df_departments.printSchema()
df_departments.show(10)
Saving to the Bronze layer in Delta format (Departments)
bronze_departments = 'abfss://bronze@datalakeengdados.dfs.core.windows.net/departments'
df_departments.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(bronze_departments)
Employees
Creating a DataFrame (Employees)
df_employees = spark.read.load('abfss://landing-zone@datalakeengdados.dfs.core.windows.net/employees.parquet', format='parquet')
df_employees.printSchema()
Adding processing timestamp and source file name metadata (Employees)
from pyspark.sql.functions import current_timestamp, lit
df_employees = df_employees.withColumn("data_hora_bronze", current_timestamp())
df_employees = df_employees.withColumn("nome_arquivo", lit("employees.parquet"))
df_employees.printSchema()
df_employees.show(10)
Saving to the Bronze layer in Delta format (Employees)
bronze_employees = 'abfss://bronze@datalakeengdados.dfs.core.windows.net/employees'
df_employees.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(bronze_employees)
OrderItems
Creating a DataFrame (OrderItems)
df_orderItems = spark.read.load('abfss://landing-zone@datalakeengdados.dfs.core.windows.net/orderItems.parquet', format='parquet')
df_orderItems.printSchema()
Adding processing timestamp and source file name metadata (OrderItems)
from pyspark.sql.functions import current_timestamp, lit
df_orderItems = df_orderItems.withColumn("data_hora_bronze", current_timestamp())
df_orderItems = df_orderItems.withColumn("nome_arquivo", lit("orderItems.parquet"))
df_orderItems.printSchema()
df_orderItems.show(10)
Saving to the Bronze layer in Delta format (OrderItems)
bronze_orderItems = 'abfss://bronze@datalakeengdados.dfs.core.windows.net/orderItems'
df_orderItems.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(bronze_orderItems)
Orders
Creating a DataFrame (Orders)
df_orders = spark.read.load('abfss://landing-zone@datalakeengdados.dfs.core.windows.net/orders.parquet', format='parquet')
df_orders.printSchema()
Adding processing timestamp and source file name metadata (Orders)
from pyspark.sql.functions import current_timestamp, lit
df_orders = df_orders.withColumn("data_hora_bronze", current_timestamp())
df_orders = df_orders.withColumn("nome_arquivo", lit("orders.parquet"))
df_orders.printSchema()
df_orders.show(10)
Saving to the Bronze layer in Delta format (Orders)
bronze_orders = 'abfss://bronze@datalakeengdados.dfs.core.windows.net/orders'
df_orders.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(bronze_orders)
DepartmentProducts
Creating a DataFrame (DepartmentProducts)
df_departmentProducts = spark.read.load('abfss://landing-zone@datalakeengdados.dfs.core.windows.net/departmentProducts.parquet', format='parquet')
df_departmentProducts.printSchema()
Adding processing timestamp and source file name metadata (DepartmentProducts)
from pyspark.sql.functions import current_timestamp, lit
df_departmentProducts = df_departmentProducts.withColumn("data_hora_bronze", current_timestamp())
df_departmentProducts = df_departmentProducts.withColumn("nome_arquivo", lit("departmentProducts.parquet"))
df_departmentProducts.printSchema()
df_departmentProducts.show(10)
Saving to the Bronze layer in Delta format (DepartmentProducts)
bronze_departmentProducts = 'abfss://bronze@datalakeengdados.dfs.core.windows.net/departmentProducts'
df_departmentProducts.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(bronze_departmentProducts)
Validating that the data and the metadata columns exist in Delta Lake
df_products = spark.read.load('abfss://bronze@datalakeengdados.dfs.core.windows.net/products', format='delta')
display(df_products.limit(2))
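The check above covers only products. A sketch extending the same validation to every Bronze table written in this notebook, asserting that both metadata columns survived the write (the table list is taken from the ingestion steps above):
# Verify that each Bronze table exists and carries the metadata columns.
bronze_root = "abfss://bronze@datalakeengdados.dfs.core.windows.net"
tables = ["products", "customers", "departments", "employees",
          "orderItems", "orders", "departmentProducts"]

for name in tables:
    df = spark.read.format("delta").load(f"{bronze_root}/{name}")
    missing = {"data_hora_bronze", "nome_arquivo"} - set(df.columns)
    assert not missing, f"{name}: missing metadata columns {missing}"
    print(f"{name}: OK ({df.count()} rows)")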