作为一名数据科学家,我们对自己的工作感到无比的自豪。我们可以通过日志、测试编写良好的模块化代码,确保代码按正常运行。

但不幸的是,无论我们在设计上花费多少心思,都不可能得到完美的数据提取(Extraction)转换(Transformation)和加载( Loading)管道

事实上,好的代码有时也无法正常运行。即使我们观察得非常仔细,也无法察觉一些因素,必须花费大量的时间和精力,才能知道哪里有问题。

但是,如果我告诉你,有一种方法可以帮你知道哪部分有问题,可以避免出错。有一种工具,在编码运行良好时是不可见的,但在编程运行错误时会自动出现?如果我说这样的工具是存在的,可以帮助你确保代码运行成功。你会怎么看?

欢迎来到Perfect的世界。

Prefect 可帮助你熟练地且透明化地处理工作流管理。Prefect包含各种功能,可以让你的代码具备运行成功或运行失败所需的所有条件。

本文将讨论学习如何使用 Prefect ,以及构建简单 ETL 管道的一些基本概念。

让我们开始吧

安装 Prefect ,并了解一些简单的定义

通过Pip即可安装Prefect :

pip install prefect # please do it inside a virtual environment

安装过程还需要 requests 和 pandas 库。操作如下:

pip install requests pandas # also inside a virtual environment

从 Prefect 文档中可以看出:

任务(Tasks)本质上是:

  • Perfect任务是有关何时运行有特殊规则的函数:这些函数选择性接受输入,执行操作,并可以选择返回输出。

所以任务的目的是执行操作,或执行部分操作。

在 Prefect 中,工作流被定义为“流”(Flows)

  • 工作流(或“流”)是任务的容器。流表示任务之间的依赖结构,但不执行任何逻辑。

因此,使用“流”(Flows),你可以将一些任务(Tasks)组合在一起,执行一组特定的所需功能,这更像是一个任务管道。

好的,现在我们以及基本了解了这些内容,接下来让我们看看如何创建任务。

创建任务

这个教程的目标很简单,目的是在 Random user Free API 的帮助下,获取用户列表,并在工作流运行时将列表保存至新的csv文件中。

听起来简单吧?

那么,我们来看看需要构建哪些组件。

首先,将数据导入库,不需要很多数据:

import requests
import pandas as pd
from prefect import task, Flow
import json

首先,我们将工作流划分为三个任务(功能)

  • 提取(Extract)

从该函数的 API 中获取一定数量的用户。

使用 requests get 函数获取随机用户列表。

@task
def extract(url_from):
    response = requests.get(url_from)
    if response:
        return json.loads(response.content)["results"]
    else:
        print("No response gotten.") 

注意到上面的@task了吗?这就是将函数变成任务所需的全部内容。

下面是用于将用户 json 转换为数据帧的函数。

  •  转换(Transform)

转换(Transform)就是将用户的 JSON 字段转换为包含个人字典的people_ list,其中包含每个人的数据。

我们将从所有people的回答中提取三个特征:

  • 1. 姓名 = 名字 + 姓氏
  • 2. 性别
  • 3. 国籍
@task
def transform(data_dict):
    people_list = []
    for person in data_dict:
        single_item = {
        'gender': person["gender"],
         "name": person["name"]["title"] + person["name"]["first"]  + person["name"]["last"],
"nat": "AU",
         }
        people_list.append(single_item)
    # return dataframe from list of dicts
    return pd.DataFrame(people_list)

最后,我们会用到Load 函数。

  • 加载(Load)

此函数的任务非常简单——将 csv 保存至本地目录。

我们指定“data_df”,即我们在转换步骤中构建的数据帧,以及提供我们保存 csv 的文件名,作为此函数的参数。

@task
def load(data_df, filename):
    data_df.to_csv(f"{filename}.csv", index=False)

下一步,从中构建一个完整的工作流程。

构建工作流程

现在,我们已经定义了可以单独执行任务的函数。现在,我们想将这些函数放在一起,并且建立一个管道,即ETL 管道。

我们用 Flow 执行此操作。

def start_data_collection(num_people_to_fetch):
    with Flow("Random User API ETL:") as flow:
    # get specific number of people profiles upon each request
    people = extract(f'https://randomuser.me/api/?inc=gender,name,nat&results={num_people_to_fetch}')
    # make a dataframe out of the response
    user_df = transform(people)
    # save the dataframe formed to disk
    load(user_df, f'{num_people_to_fetch}_people')
    return flow

具体步骤如下:

  • 创建新的Flow 对象,并命名。
  • 通过提取方法启动管道,获取用户。
  • 将这些用户转化为 Pandas数据框(DataFrame)
  • 最后,我们将数据框(DataFrame)保存为csv 文件,并以指定文件名作为参数

完成以上操作后,返回至 Flow 对象。

与按顺序调用函数不同,这一步非常简单——在Flow 对象内部执行相同操作。

测试ETL 工作流程

最后一步,是测试我们构建的内容。通过简单的主函数运行流对象:

if __name__ == "__main__":
    flow = start_data_collection(3)
    flow.run()

到这里就是全部过程。 

下列是在命令栏中执行此操作的输出结果:

python main.py

显示管道运行成功的终端输出

这是保存到磁盘上的 csv文件:

总结

这是一个简单的 ETL 工作流程,可以帮助你开始使用 Prefect。你可以自己按以上步骤执行此操作,同时帮助你理解其他概念,例如 – 故障检测、重试支持等。

我希望这篇文章容易理解,且对你有所帮助。关注我们,一起学习。:)

原文作者:Yash Prakash
翻译作者:Lia
美工编辑:过儿
校对审稿:Jiawei Tong
原文链接:https://towardsdatascience.com/the-nice-way-to-manage-your-data-science-workflow-7fb92a2ee4a2

北美求职60秒:如何在面试之后进行有效的跟踪?(10月第4周)

Nov 01, 2023

本期话题,带你了解“如何在面试之后进行有效的跟踪?”

AI时代的四大热门技能

Jul 18, 2024

科技专业人士正在尽最大努力判断人工智能(尤其是生成式人工智能)对他们目前工作的影响。

北美求职60秒:BIA和BIE岗位功能有什么不同?(3月第5周)

Apr 07, 2023

本期话题,带你了解“BIA和BIE岗位功能有什么不同?”

Leave a Comment

Your email address will not be published. Required fields are marked *

Comment *