Getting Started

Quickstart

The following example shows how several concepts in Fennel come together to solve a problem.

0. Installation

We only need to install Fennel's Python client to run this example, so let's install that first:

pip install fennel-ai

And while we are at it, let's add all the imports that we will need in the rest of the tutorial:

from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
import requests

from fennel.datasets import dataset, pipeline, field, Dataset, Count, index
from fennel.featuresets import feature, featureset, extractor
from fennel.lib import (
    inputs,
    outputs,
    expectations,
    expect_column_values_to_be_between,
)
from fennel.sources import source, Postgres, Snowflake, Kafka, Webhook
from fennel.testing import MockClient

__owner__ = "[email protected]"

1. Data Connectors

Fennel ships with data connectors that know how to talk to all common data sources. The connectors can be defined in code or in the Fennel console (not shown here).

postgres = Postgres.get(name="my_rdbms")
warehouse = Snowflake.get(name="my_warehouse")
kafka = Kafka.get(name="my_kafka")
webhook = Webhook(name="fennel_webhook")

2. Datasets

Datasets are the tables that you want to use in your feature pipelines. These are constantly kept fresh as new data arrives from connectors.

table = postgres.table("product", cursor="last_modified")


@source(table, disorder="1d", cdc="append", every="1m", tier="prod")
@source(webhook.endpoint("Product"), disorder="1d", cdc="append", tier="dev")
@index
@dataset
class Product:
    product_id: int = field(key=True)
    seller_id: int
    price: float
    desc: Optional[str]
    last_modified: datetime = field(timestamp=True)

    # Powerful primitives like data expectations for data hygiene
    @expectations
    def get_expectations(cls):
        return [
            expect_column_values_to_be_between(
                column="price", min_value=1, max_value=1e4, mostly=0.95
            )
        ]


# ingesting realtime data from Kafka works exactly the same way
@source(kafka.topic("orders"), disorder="1h", cdc="append", tier="prod")
@source(webhook.endpoint("Order"), disorder="14d", cdc="append", tier="dev")
@dataset
class Order:
    uid: int
    product_id: int
    timestamp: datetime

Fennel also lets you derive more datasets by defining pipelines that transform data across different sources (e.g. S3, Kafka, Postgres) in the same plane of abstraction. These pipelines are highly declarative, completely Python native, realtime, versioned, auto-backfilled on declaration, and unit-testable.

@index
@dataset
class UserSellerOrders:
    uid: int = field(key=True)
    seller_id: int = field(key=True)
    num_orders_1d: int
    num_orders_1w: int
    timestamp: datetime

    @pipeline
    @inputs(Order, Product)
    def my_pipeline(cls, orders: Dataset, products: Dataset):
        orders = orders.join(products, how="left", on=["product_id"])
        orders = orders.transform(lambda df: df.fillna(0))
        orders = orders.drop("product_id", "desc", "price")
        orders = orders.dropnull()
        return orders.groupby("uid", "seller_id").aggregate(
            Count(window="1d", into_field="num_orders_1d"),
            Count(window="1w", into_field="num_orders_1w"),
        )

3. Featuresets

Featuresets are containers for the features that you want to extract from your datasets. Features, unlike datasets, have no state and are computed on the "read path" (i.e. when you query for them) via arbitrary Python code. Features are immutable to improve reliability.

@featureset
class UserSellerFeatures:
    uid: int = feature(id=1)
    seller_id: int = feature(id=2)
    num_orders_1d: int = feature(id=3)
    num_orders_1w: int = feature(id=4)

    @extractor(depends_on=[UserSellerOrders])
    @inputs(uid, seller_id)
    @outputs(num_orders_1d, num_orders_1w)
    def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
        df, found = UserSellerOrders.lookup(ts, seller_id=sellers, uid=uids)
        df = df.fillna(0)
        df["num_orders_1d"] = df["num_orders_1d"].astype(int)
        df["num_orders_1w"] = df["num_orders_1w"].astype(int)
        return df[["num_orders_1d", "num_orders_1w"]]

4. Commit

Once datasets/featuresets have been written (or updated), you can commit those definitions by instantiating a client and using it to talk to the server. Since we are not working with a real server, we use the MockClient here to run this example locally.

# client = Client('<FENNEL SERVER URL>')  # uncomment this to use a real Fennel server
client = MockClient()  # comment this line to use a real Fennel server
client.commit(
    message="initial commit",
    datasets=[Order, Product, UserSellerOrders],
    featuresets=[UserSellerFeatures],
    tier="dev",
)

The MockClient doesn't support data connectors, so we manually log some data to simulate the data flows.

# create some product data
now = datetime.utcnow()
columns = ["product_id", "seller_id", "price", "desc", "last_modified"]
data = [
    [1, 1, 10.0, "product 1", now],
    [2, 2, 20.0, "product 2", now],
    [3, 1, 30.0, "product 3", now],
]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "Product", df)
assert response.status_code == requests.codes.OK, response.json()

columns = ["uid", "product_id", "timestamp"]
data = [[1, 1, now], [1, 2, now], [1, 3, now]]
df = pd.DataFrame(data, columns=columns)
response = client.log("fennel_webhook", "Order", df)
assert response.status_code == requests.codes.OK, response.json()

5. Query

This is the read path of Fennel. You can query for live features (i.e. features using the latest value of all datasets) like this:

feature_df = client.query(
    outputs=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    inputs=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    input_dataframe=pd.DataFrame(
        [[1, 1], [1, 2]],
        columns=["UserSellerFeatures.uid", "UserSellerFeatures.seller_id"],
    ),
)

assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1]

You can also query for historical values of features at arbitrary timestamps (often used while creating training datasets or for offline batch inference) like this:

day = timedelta(days=1)

feature_df = client.query_offline(
    outputs=[
        UserSellerFeatures.num_orders_1d,
        UserSellerFeatures.num_orders_1w,
    ],
    inputs=[
        UserSellerFeatures.uid,
        UserSellerFeatures.seller_id,
    ],
    timestamp_column="timestamps",
    format="pandas",
    input_dataframe=pd.DataFrame(
        [[1, 1, now], [1, 2, now], [1, 1, now - day], [1, 2, now - day]],
        columns=[
            "UserSellerFeatures.uid",
            "UserSellerFeatures.seller_id",
            "timestamps",
        ],
    ),
)

assert feature_df.columns.tolist() == [
    "UserSellerFeatures.num_orders_1d",
    "UserSellerFeatures.num_orders_1w",
    "timestamps",
]
assert feature_df["UserSellerFeatures.num_orders_1d"].tolist() == [2, 1, 0, 0]
assert feature_df["UserSellerFeatures.num_orders_1w"].tolist() == [2, 1, 0, 0]

Query requests can be made over a REST API from any language or tool, which makes it easy to ship features to production servers.
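
As a rough illustration, a live query could be issued directly over HTTP. The sketch below is not the exact Fennel REST contract: the server URL, endpoint path, token, and payload shape are placeholders, so consult the REST API reference for the real details.

import requests

# Hypothetical sketch: the URL, path, and token below are placeholders,
# not the documented Fennel REST API.
FENNEL_URL = "https://<your-fennel-server>"
TOKEN = "<your-api-token>"

response = requests.post(
    f"{FENNEL_URL}/api/v1/query",  # assumed path; see the REST API docs
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "outputs": [
            "UserSellerFeatures.num_orders_1d",
            "UserSellerFeatures.num_orders_1w",
        ],
        "inputs": [
            "UserSellerFeatures.uid",
            "UserSellerFeatures.seller_id",
        ],
        "data": [{"UserSellerFeatures.uid": 1, "UserSellerFeatures.seller_id": 1}],
    },
)
response.raise_for_status()
print(response.json())

The Python, REST, and any other clients all hit the same read path, so the features returned match what client.query produces above.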
