Generate synthetic test data using Mimesis and Python/PySpark


So, you’ve built an ETL or ELT process using PySpark and some bits of test data, but how can you give it a real stress test with lots of data? Manually generating lots of test data would be a huge pain, so surely there’s something already out there we can use? Yes there is, say hello to Mimesis, your friendly neighbourhood fake data generator for Python! This lovely library can generate test data very quickly and easily, which you can use to give your process a real performance/stress test 😉

In this little example I’m going to be using a PySpark notebook in Microsoft Fabric, but the code will be pretty much the same in Databricks or Azure Synapse. First we need to install Mimesis; the current version at the time of writing is 18.0.0…

%pip install mimesis==18.0.0 # for specifying the version explicitly

or

%pip install mimesis # install latest version

Then we need to specify a few imports we’ll need…

from mimesis.enums import TimestampFormat
from mimesis.locales import Locale
from mimesis.keys import maybe
from mimesis.schema import Field, Schema

Ok, now for a simple example, this is pretty much copied and pasted from the Mimesis docs… although here I’ve added the output lines to write CSV and JSON files to the lakehouse file storage. Note the iterations, which is currently set to 100, but I’ve tried this with millions and it works great. This means you could use it to build a number of large files to test your import process…

# Define field locale
field = Field(locale = Locale.EN)

# Define data schema
schema = Schema(
    schema = lambda: {
        "pk": field("increment"),
        "name": field("text.word", key=maybe("N/A", probability=0.2)),
        "version": field("version"),
        "timestamp": field("timestamp", TimestampFormat.RFC_3339),
    },
    iterations=100
)

# Output to required file formats
schema.to_csv(file_path = "/lakehouse/default/Files/mimesis_test.csv")
schema.to_json(file_path = "/lakehouse/default/Files/mimesis_test.json")

Note – the ‘/lakehouse/default/Files/’ bits in the file paths above are Microsoft Fabric specific; if you’re using something like Azure Synapse Spark then you’d use an ABFSS path, probably something like ‘abfss://your-container-name@your-storage-account.dfs.core.windows.net/the/folder/path/to/your/file’ 😜
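
As a little aside, if you don’t actually need a file on disk, Mimesis can hand you the generated rows directly and you can turn them into a Spark DataFrame. Here’s a minimal sketch (reusing the same ‘schema’ object from above, so treat it as an idea rather than gospel)…

# Generate the rows in memory as a list of dictionaries
rows = schema.create()

# Build a Spark DataFrame straight from the generated rows and take a peek
df = spark.createDataFrame(rows)
df.show(5)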


What, you want more? 😱

Ok… so the above code works fine, but what if you need to build several files with different structures? You’d need to write out the same block of code each time, hard-coding the schema. What if we could make it a little more dynamic? Well, we can… let’s build a little function where we can pass in a schema definition in JSON format and it will return a schema for us to use…

def generate_schema(column_definitions_json):
    # Define our field builder
    field_builder = Field(locale = Locale.EN)

    # Use a nested function to generate each field definition for the schema
    def generate_field_definition(data):
        fmt = data.get("format")

        # json.loads gives us the string "None" rather than Python's None
        if fmt is None or fmt == "None":
            return field_builder(data["type"])

        # Evaluate the format string (e.g. "TimestampFormat.RFC_3339" or
        # "maybe('N/A', probability=0.2)") into the object Mimesis expects
        value = eval(fmt, {"TimestampFormat": TimestampFormat, "maybe": maybe})

        # maybe(...) returns a key function, so pass it as the field's key;
        # anything else (like a timestamp format) goes to the provider as fmt
        if callable(value):
            return field_builder(data["type"], key = value)
        return field_builder(data["type"], fmt = value)

    # Loop through all fields and generate definitions for the schema
    return lambda: {field: generate_field_definition(config) for field, config in column_definitions_json.items()}

Obviously this is a little basic and could be improved, but the basic idea is that if you’ve got a large list of incoming files and you know their structures… then it’s also likely you’ve got all the structure definitions (i.e. which columns are in which table/file) listed in a database or a file somewhere 😜

So, naturally we’d read the structures for each file we expect from the database, pass those details to this function, and let it generate the schema for each test data file…
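
As a rough sketch of that bit (the metadata table and column names below are completely hypothetical, so swap in whatever your own metadata store looks like), we could pull the definitions into the dictionary shape that ‘generate_schema’ expects…

# Hypothetical metadata table: one row per column, per incoming file
# (columns: file_name, column_name, mimesis_type, mimesis_format)
definition_rows = spark.read.table("file_column_definitions").collect()

# Re-shape the rows into one column-definition dictionary per file
column_definitions_per_file = {}
for row in definition_rows:
    column_definitions_per_file.setdefault(row["file_name"], {})[row["column_name"]] = {
        "type": row["mimesis_type"],
        "format": row["mimesis_format"]
    }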


For example, imagine we’d read a file definition (i.e. the columns and their data types) from a database table somewhere; we could then use that definition as shown below. Note that in the example below I’ve skipped the part where we pull it from a database table and instead just provided a ‘sample’ definition in JSON format (the ‘json.loads’ bit 😜).

import json

# These are the fields we want 
column_definitions = json.loads("""
{ 
    "name": {"type": "name", "format": "None"},
    "other": {"type": "uuid", "format": "None"},
    "some_date": {"type": "timestamp", "format": "TimestampFormat.RFC_3339"},
    "some_word": {"type": "text.word", "format": "key = maybe('N/A', probability = 0.2)"},
    "some_version": {"type": "version", "format": "None"},
    "some_identity": {"type": "increment", "format": "None"}
}
""")

# Generate schema and data
schema = Schema(schema = generate_schema(column_definitions), iterations = 10)

# Write the data to a CSV file in the lakehouse Files area
schema.to_csv(file_path = "/lakehouse/default/Files/my_test_file.csv")

# We would/could then read it back somewhere, for example...
df = spark.read.format("csv").option("header","true").load("Files/my_test_file.csv")
df.show(10)

Running the above saves the file, then reads it back in and shows the top 10 rows of the generated data.
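
Finally, to tie it all together, here’s a rough sketch of how you could churn out one test file per definition you’ve read from your metadata store (again, ‘column_definitions_per_file’ is the hypothetical dictionary we built earlier, so adapt it to your own setup)…

# Generate one CSV of fake data per file definition we know about
# (crank up the iterations to really stress test your import process)
for file_name, column_definitions in column_definitions_per_file.items():
    schema = Schema(schema = generate_schema(column_definitions), iterations = 100000)
    schema.to_csv(file_path = f"/lakehouse/default/Files/{file_name}_test.csv")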


That’s it really… just needs expanding on for your own purposes… So, till next time 🙌