
Building a Data Pipeline with Prefect



Image by Author | Canva

     

In this tutorial, we will learn about Prefect, a modern workflow orchestration tool. We will start by building a data pipeline with Pandas and then compare it with a Prefect workflow to gain a better understanding. In the end, we will deploy our workflow and view the run logs on the dashboard.

     

What Is Prefect?

     

Prefect is a workflow management system designed to orchestrate and manage complex data workflows, including machine learning (ML) pipelines. It provides a framework for building, scheduling, and monitoring workflows, making it an essential tool for managing ML operations (MLOps).

Prefect offers task and flow management, allowing users to define dependencies and execute workflows efficiently. With features like state management and observability, Prefect provides insight into task status and history, which aids debugging and optimization. It comes with a highly interactive dashboard that lets you schedule and monitor your workflows and integrate various other features that will improve your MLOps pipeline. You can even set up notifications and integrate other ML frameworks with a few clicks.

Prefect is available as an open-source framework and as a managed cloud service that simplifies your workflow even more.
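
Before building the full pipeline, here is a minimal sketch of what this looks like in code. Everything in it (the task, the simulated failure rate, the flow name) is illustrative rather than part of the pipeline we build below, but it shows how the `@task` decorator adds retries and state tracking to a plain Python function:

import random

from prefect import flow, task

@task(retries=3, retry_delay_seconds=5)
def flaky_fetch() -> int:
    # Prefect re-runs this task up to 3 times if it raises an exception
    if random.random() < 0.3:
        raise RuntimeError("Transient failure")
    return 42

@flow(name="Retry Demo")
def retry_demo():
    # Each task run's state (Running, Retrying, Completed) is recorded automatically
    value = flaky_fetch()
    print(f"Fetched: {value}")

if __name__ == "__main__":
    retry_demo()

Running this once is enough to see Prefect create a flow run, log each task's state transitions, and retry the failure without any extra plumbing.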

     

Building a Data Pipeline with Pandas

     

We will replicate the data pipeline that I used in a previous tutorial (Building Data Science Pipelines Using Pandas – KDnuggets) to give you an idea of how each task works in a pipeline and how to combine them. I am mentioning it here so that you can clearly compare how proper data pipelines differ from normal ones.

import pandas as pd

def load_data(path):
    return pd.read_csv(path)

def data_cleaning(data):
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

def convert_dtypes(data, types_dict=None):
    data = data.astype(dtype=types_dict)
    # convert the date column to datetime
    data["Date"] = pd.to_datetime(data["Date"])
    return data

def data_analysis(data):
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

def data_visualization(new_df, vis_type="bar"):
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    return new_df

path = "Online Sales Data.csv"
df = (
    pd.DataFrame()
    .pipe(lambda x: load_data(path))
    .pipe(data_cleaning)
    .pipe(convert_dtypes, {"Product Category": "str", "Product Name": "str"})
    .pipe(data_analysis)
    .pipe(data_visualization, "line")
)

     

When we run the above code, each task runs sequentially and generates the data visualization. Apart from that, it does not do anything. We cannot schedule it, view the run logs, or integrate third-party tools for notifications or monitoring.

     


     

Building a Data Pipeline with Prefect

     

Now we will build the same pipeline, using the same dataset (Online Sales Dataset – Popular Marketplace Data), but with Prefect. We will first install the Prefect library using the pip command:
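
$ pip install prefect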

     

If you compare the code below with the Pandas version, you will notice that almost nothing has changed. The functions are the same, but now with Python decorators added. Each step in the pipeline has the `@task` decorator, and the pipeline that combines these steps has the `@flow` decorator. Additionally, we are now saving the generated figure.

import pandas as pd
import matplotlib.pyplot as plt
from prefect import task, flow

@task
def load_data(path):
    return pd.read_csv(path)

@task
def data_cleaning(data):
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

@task
def convert_dtypes(data, types_dict=None):
    data = data.astype(dtype=types_dict)
    data["Date"] = pd.to_datetime(data["Date"])
    return data

@task
def data_analysis(data):
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

@task
def data_visualization(new_df, vis_type="bar"):
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df

@flow(name="Data Pipeline")
def data_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Category": "str", "Product Name": "str"}
    )
    analysis_result = data_analysis(df_converted)
    new_df = data_visualization(analysis_result, "line")
    return new_df

# Run the flow!
if __name__ == "__main__":
    new_df = data_pipeline("Online Sales Data.csv")
    print(new_df)

     

We will run our data pipeline by providing the CSV file location. It will perform all the steps in sequence and generate logs with the run states.

14:18:48.649 | INFO    | prefect.engine - Created flow run 'enlightened-dingo' for flow 'Data Pipeline'
14:18:48.816 | INFO    | Flow run 'enlightened-dingo' - Created task run 'load_data-0' for task 'load_data'
14:18:48.822 | INFO    | Flow run 'enlightened-dingo' - Executing 'load_data-0' immediately...
14:18:48.990 | INFO    | Task run 'load_data-0' - Finished in state Completed()
14:18:49.052 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_cleaning-0' for task 'data_cleaning'
14:18:49.053 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_cleaning-0' immediately...
14:18:49.226 | INFO    | Task run 'data_cleaning-0' - Finished in state Completed()
14:18:49.283 | INFO    | Flow run 'enlightened-dingo' - Created task run 'convert_dtypes-0' for task 'convert_dtypes'
14:18:49.288 | INFO    | Flow run 'enlightened-dingo' - Executing 'convert_dtypes-0' immediately...
14:18:49.441 | INFO    | Task run 'convert_dtypes-0' - Finished in state Completed()
14:18:49.506 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_analysis-0' for task 'data_analysis'
14:18:49.510 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_analysis-0' immediately...
14:18:49.684 | INFO    | Task run 'data_analysis-0' - Finished in state Completed()
14:18:49.753 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_visualization-0' for task 'data_visualization'
14:18:49.760 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_visualization-0' immediately...
14:18:50.087 | INFO    | Task run 'data_visualization-0' - Finished in state Completed()
14:18:50.144 | INFO    | Flow run 'enlightened-dingo' - Finished in state Completed()

     

In the end, you will get the transformed data frame and the visualization.

     

[Screenshot: the transformed data frame and the generated line chart]

     

    Deploying the Prefect Pipeline

     

To deploy the Prefect pipeline, we need to start by moving our codebase into the Python file `data_pipe.py`. After that, we will modify how we run our pipeline: we will use the `.serve` method to deploy the pipeline and pass the CSV file path as a parameter.

    data_pipe.py:

import pandas as pd
import matplotlib.pyplot as plt
from prefect import task, flow

@task
def load_data(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

@task
def data_cleaning(data: pd.DataFrame) -> pd.DataFrame:
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

@task
def convert_dtypes(data: pd.DataFrame, types_dict: dict = None) -> pd.DataFrame:
    data = data.astype(dtype=types_dict)
    data["Date"] = pd.to_datetime(data["Date"])
    return data

@task
def data_analysis(data: pd.DataFrame) -> pd.Series:
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

@task
def data_visualization(new_df: pd.Series, vis_type: str = "bar") -> pd.Series:
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df

@task
def save_to_csv(df: pd.Series, filename: str):
    df.to_csv(filename, index=False)
    return filename

@flow(name="Data Pipeline")
def run_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Category": "str", "Product Name": "str"}
    )
    analysis_result = data_analysis(df_converted)
    data_visualization(analysis_result, "line")
    save_to_csv(analysis_result, "average_units_sold_by_month.csv")

# Serve the flow as a deployment
if __name__ == "__main__":
    run_pipeline.serve(
        name="pass-params-deployment",
        parameters=dict(path="Online Sales Data.csv"),
    )
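
It is worth noting that `.serve` also accepts scheduling arguments. As a minimal sketch (the deployment name and schedule here are illustrative, not part of the original setup), the same flow could be deployed to run automatically every day at noon:

# Hypothetical variation: serve the same flow on a daily cron schedule
if __name__ == "__main__":
    run_pipeline.serve(
        name="scheduled-deployment",
        parameters=dict(path="Online Sales Data.csv"),
        cron="0 12 * * *",  # run every day at 12:00
    )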

     

     

When we run the Python file, we will receive a message saying that to run the deployed pipeline, we have to use the following command:

     

[Screenshot: output of running data_pipe.py, showing the deployment and the command to run it]

     

Launch a new terminal window and type the command to trigger a run of this flow.

$ prefect deployment run 'Data Pipeline/pass-params-deployment'

     

As we can see, the flow run has been initiated, meaning the pipeline is running in the background. We can always return to the first terminal window to view the logs.

     

[Screenshot: flow run logs streaming in the first terminal window]
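
As a side note, deployment parameters can also be overridden at run time with the CLI's `--param` flag; the file name below is purely hypothetical:

$ prefect deployment run 'Data Pipeline/pass-params-deployment' --param path='Another Sales Data.csv'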

     

To view the logs in the dashboard, we have to launch the Prefect dashboard by typing the following command:
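
$ prefect server start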

     

Click the dashboard link to open the dashboard in your web browser.

     

[Screenshot: prefect server start output with the dashboard link]

     

The dashboard consists of various tabs and information related to your pipelines, workflows, and runs. To view the current run, navigate to the “Flow Runs” tab and select the most recent flow run.

     

[Screenshot: the latest flow run in the Prefect dashboard]

     

All of the source code, data, and information are available in the kingabzpro/Data-Pipeline-with-Prefect GitHub repository. Please don’t forget to star ⭐ it.

     

    Conclusion

     

Building a pipeline with the right tools is essential if you want to scale your data workflows and avoid unnecessary hiccups. By using Prefect, you can schedule your runs, debug the pipeline, and integrate it with multiple third-party tools you are already using. It is easy to use and comes with tons of features that you will love. If you are new to Prefect, I highly recommend checking out Prefect Cloud. They offer free hours for users to experience the cloud platform and become familiar with the workflow management system.
     
     

Abid Ali Awan (@1abidaliawan) is a certified data scientist professional who loves building machine learning models. Currently, he is focusing on content creation and writing technical blogs on machine learning and data science technologies. Abid holds a Master’s degree in technology management and a bachelor’s degree in telecommunication engineering. His vision is to build an AI product using a graph neural network for students struggling with mental illness.
