Implementação Prática e Operações CRUD

Para demonstrar o cenário descrito anteriormente na prática, utilizamos a API do PySpark com a biblioteca delta.tables no nosso caderno 03_dml_delta.ipynb.

1. Modelo de Dados (Tabela fato_evento)

Nossa tabela de eventos contém os dados estruturados de lances do jogo: id_evento, id_partida, id_jogador, tipo_evento, minuto_jogo e a flag revisado_var.

2. Inserção Inicial (Create/Insert)

O Delta Lake garante que, mesmo em fluxos de alta velocidade, cada registro seja gravado de forma atômica. Nossos dados iniciais da partida são gravados na camada Bronze.

# Dados iniciais da partida (Simulação: Flamengo x Vasco)
data = [
    (1, 101, 10, "Gol", 15, False),             # Gol do camisa 10
    (2, 101, 5, "Cartão Amarelo", 30, False),    # Falta do volante
    (3, 101, 9, "Gol", 44, False)               # Gol do centroavante
]
columns = ["id_evento", "id_partida", "id_jogador", "tipo_evento", "minuto_jogo", "revisado_var"]

df_eventos = spark.createDataFrame(data, columns)
# Escrita já ocorre no passo de conversão para a camada Bronze

3. Atualização (Update) - A Correção do VAR

O VAR identificou que o primeiro gol não foi do jogador 10, mas sim do jogador 99. Utilizamos a API do Delta para realizar a correção cirúrgica baseada na condição do ID do evento.

from delta.tables import DeltaTable

# Carrega a tabela Delta mapeando o diretório de arquivos
delta_table = DeltaTable.forPath(spark, "s3a://bronze/eventos_delta")

# Realiza o UPDATE
delta_table.update(
    condition="id_evento = 1",
    set={"id_jogador": "99", "revisado_var": "True"}
)

4. Exclusão (Delete) - Gol Anulado

Simulamos a anulação do segundo gol da partida por impedimento do jogador 9 (evento de ID 3).

# Realiza o DELETE com base na condição
delta_table.delete(condition="id_evento = 3")

5. Inserção (Insert / Append) - O Gol do Título

Para completar a tríade de DML, simulamos a inserção de um evento novinho em folha, gerado aos 45 do segundo tempo.

# Inserindo um novo evento no final da partida
novo_evento = [(4, 101, 7, "Gol", 90, False)]
colunas = ["id_evento", "id_partida", "id_jogador", "tipo_evento", "minuto_jogo", "revisado_var"]

df_novo = spark.createDataFrame(novo_evento, colunas)
df_novo.write.format("delta").mode("append").save("s3a://bronze/eventos_delta")

6. Evidência de Execução (O VAR em Ação)

Ao rodar o arquivo 03_dml_delta.ipynb, você verá no próprio console do Jupyter que o evento 3 (anulado) foi removido, o jogador do evento 1 foi atualizado com sucesso e um novo evento 4 foi inserido.

7. Por baixo dos panos (Arquitetura Delta)

A verdadeira mágica do Delta Lake acontece na pasta _delta_log (dentro do bucket no MinIO). Para cada operação CRUD (o INSERT inicial, o UPDATE, o DELETE e o APPEND final), arquivos JSON são gerados mapeando as transações. Isso prova que os arquivos brutos em Parquet não são sobrescritos cegamente, mas sim versionados de forma segura (Transações ACID e Time Travel).