Processing Messagages from the Bus

With data available as Parquet files. we can now start to solve the first question: “How many people did something in Fedora on this date?” To do this, we will need to apply some preprocessing to this data to make it usable.

  1. For each message based on topic, we need to extract a username or a FAS ID. Each topic will handle this differently.

  2. Some messages on the bus are malformed or stored with invalid JSON characters. We will need to remove them.

  3. Some messages are system to system messages and have no relevance, we need to remove those.

  4. We need a set of files which have just the timestamp, the ID of the message for debugging, the topic so we know what the activity was, and the identified username.

As a note, this doesn’t solve for when a message contains multiple participants – for example MeetBot will share anyone who talked in a Matrix chat as a participant, but for this example, we are just extracting the messages where we can identify a single user. Another script similar to this to topics which need to be broken out could be built to accomplish this for topics which need to process differently.

The below example code is not fully tested or considered complete – some examples I’ve identified where topics return a full URL instead of the Username string (https://src.fedoraproject.org/user/rwright instead of rwright as an example) but this is a way to start.

import os
import ast
import pandas as pd
import duckdb
import json

# Define the safe JSON format function
def safe_json_format(json_str):
    try:
        # Safely evaluate the string to a Python object
        python_obj = ast.literal_eval(json_str)
        # Convert the Python object to a JSON-formatted string
        json_str = json.dumps(python_obj)
        return json_str
    except (ValueError, SyntaxError):
        # Return None if parsing fails
        return None

# Open a DuckDB connection (use in-memory or specify a file path for persistence)
con = duckdb.connect(database=':memory:')

# Loop over each parquet file
for file_name in sorted([f for f in os.listdir(folder_path) if f.endswith('.parquet')]):
    file_path = os.path.join(folder_path, file_name)
    
    # Load the parquet file with pandas
    df = pd.read_parquet(file_path)
    
    # Preprocess headers and body columns to handle single-quoted JSON
    df['headers'] = df['headers'].apply(safe_json_format)
    df['body'] = df['body'].apply(safe_json_format)
    
    # Register the DataFrame in DuckDB
    con.register('parquet_data', df)

    # Query to extract 'sent-at', 'topic', 'body', and 'username'
    query = """
        SELECT 
            CAST(json_extract(headers, '$.sent-at') AS TIMESTAMP) AS sent_at,
            id,
            topic,
            replace(
                CASE
                WHEN topic LIKE 'org.fedoraproject.prod.badges.badge.award%' THEN json_extract(body, '$.user.username')
                WHEN topic LIKE 'org.fedoraproject.prod.fedbadges%' THEN json_extract(body, '$.user.username')
                WHEN topic LIKE 'org.fedoraproject.prod.discourse.like%' THEN json_extract(body, '$.webhook_body.like.post.username')
                WHEN topic LIKE 'org.fedoraproject.prod.discourse.post%' THEN json_extract(body, '$.webhook_body.post.username')
                WHEN topic LIKE 'org.fedoraproject.prod.discourse.solved%' THEN json_extract(body, '$.webhook_body.solved.username')
                WHEN topic LIKE 'org.fedoraproject.prod.discourse.topic%' THEN json_extract(body, '$.webhook_body.topic.created_by.username')
                WHEN topic LIKE 'org.fedoraproject.prod.mailman%' THEN json_extract(body, '$.msg.from')
                WHEN topic LIKE 'org.fedoraproject.prod.planet%' THEN json_extract(body, '$.username')
                WHEN topic LIKE 'org.fedoraproject.prod.git%' THEN json_extract(body, '$.commit.username')
                WHEN topic LIKE 'org.fedoraproject.prod.fas%' THEN json_extract(body, '$.msg.user')
                WHEN topic LIKE 'org.fedoraproject.prod.openqa%' THEN json_extract(body, '$.user')
                WHEN topic LIKE 'org.fedoraproject.prod.bodhi.buildroot%' THEN json_extract(body, '$.override.submitter.name')
                WHEN topic LIKE 'org.fedoraproject.prod.bodhi.update.comment%' THEN json_extract(body, '$.comment.user.name')
                WHEN topic LIKE 'org.fedoraproject.prod.bodhi%' THEN json_extract(body, '$.update.user.name')
                WHEN topic LIKE 'org.fedoraproject.prod.bugzilla%' THEN json_extract(body, '$.event.user.login')
                WHEN topic LIKE 'org.fedoraproject.prod.waiver%' THEN json_extract(body, '$.username')
                WHEN topic LIKE 'org.fedoraproject.prod.fmn%' THEN json_extract(body, '$.user.name')
                WHEN topic LIKE 'org.fedoraproject.prod.buildsys%' THEN json_extract(body, '$.owner')
                WHEN topic LIKE 'org.fedoraproject.prod.copr%' THEN json_extract(body, '$.user')
                WHEN topic LIKE 'io.pagure.prod.pagure%' THEN json_extract(body, '$.agent')
                WHEN topic LIKE 'org.fedoraproject.prod.pagure.commit.flag%' THEN json_extract(body, '$.flag.user.name')
                WHEN topic LIKE 'org.centos.sig.integration.gitlab.redhat.centos-stream%' THEN json_extract(body, '$.user.name')
                WHEN topic LIKE 'org.fedoraproject.prod.wiki%' THEN json_extract(body, '$.user')
                WHEN topic LIKE 'org.release-monitoring.prod.anitya.%' THEN json_extract(body, '$.message.agent')
                WHEN topic LIKE 'org.fedoraproject.prod.maubot.cookie.give.%' THEN json_extract(body, '$.sender')
                WHEN topic LIKE 'org.fedoraproject.prod.kerneltest.upload.new%' THEN json_extract(body, '$.agent')
                WHEN topic LIKE 'org.fedoraproject.prod.fedocal%' THEN json_extract(body, '$.agent')
                WHEN topic LIKE 'org.centos.prod.buildsys%' THEN json_extract(body, '$.owner')
                WHEN topic LIKE 'org.fedoraproject.prod.badges.person.rank.advance%' THEN json_extract(body, '$.person.nickname')
                ELSE NULL
            END::TEXT, '"', ''
        ) AS username
        FROM parquet_data
        WHERE headers IS NOT NULL AND body IS NOT NULL
    """
    
    # Execute the query and fetch the result as a DataFrame
    result_df = con.execute(query).fetchdf()

    # Remove rows with empty 'username'
    result_df = result_df.query("username != ''").dropna(subset=['username'])

    # Define the output parquet path with the original parquet filename
    output_parquet = os.path.join(output_folder, f"{os.path.splitext(file_name)[0]}_processed.parquet")

    # Write the result to a parquet file
    result_df.to_parquet(output_parquet, index=False)
    print(f"Data written to {output_parquet} successfully.")

    # Clear the view for the next file
    con.unregister('parquet_data')

print("Data processed and written to individual parquet files successfully.")

# Close the DuckDB connection
con.close()