Commit

Sends the local dataset and featureset definitions to the server for verification, storage and processing.

Parameters

message:str

Human readable description of the changes in the commit - akin to the commit message in a git commit.

datasets:List[Dataset]

Default: []

List of dataset objects to be committed.

featuresets:List[Featureset]

Default: []

List of featureset objects to be committed.

preview:bool

Default: False

If set to True, the server only returns a preview of what the commit would do and leaves the state unchanged.

Note

Since the main goal of preview is to check the validity of old & new definitions, it only works with the real client/server; the mock client/server ignores it.

incremental:bool

Default: False

If set to True, Fennel assumes that the commit operation is given only those datasets/featuresets that may be changing in some form. Any previously existing datasets/featuresets that are not included in the commit operation are left unchanged.
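
One way to picture the two modes is as a merge over named definitions. The sketch below is a toy model rather than Fennel's implementation: `resulting_definitions` and the name-to-version dictionaries are hypothetical, and it assumes that a non-incremental commit treats the provided lists as the complete desired state.

```python
from typing import Dict


def resulting_definitions(
    existing: Dict[str, str],
    provided: Dict[str, str],
    incremental: bool,
) -> Dict[str, str]:
    """Toy model: map of definition name -> version after a commit."""
    if incremental:
        # Anything not mentioned in the commit is left unchanged.
        merged = dict(existing)
        merged.update(provided)
        return merged
    # Assumption: a full commit replaces the previous set of definitions.
    return dict(provided)


existing = {"Transaction": "v1"}
provided = {"TransactionFeatures": "v1"}

print(resulting_definitions(existing, provided, incremental=True))
print(resulting_definitions(existing, provided, incremental=False))
```

Under these assumptions, only the incremental call keeps `Transaction` alongside the newly added featureset.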

env:Optional[str]

Default: None

Selector to optionally commit only a subset of sources, pipelines and extractors - those with matching values. Rules of selection:

  • If env is None, all objects are selected
  • If env is not None, an object is selected if its own selector is None, is the same as env, or is ~x for some other x
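
The selection rules above can be sketched as a small predicate. This is an illustrative model only: `is_selected` is a hypothetical helper, not part of the Fennel API, and it assumes each object carries at most one selector string.

```python
from typing import Optional


def is_selected(selector: Optional[str], env: Optional[str]) -> bool:
    """Return True if an object with `selector` is picked for a commit with `env`."""
    if env is None:
        # No env on the commit: every object is selected.
        return True
    if selector is None or selector == env:
        return True
    # "~x" means "anything except x", so it matches when x differs from env.
    return selector.startswith("~") and selector[1:] != env


for selector in (None, "silver", "bronze", "~bronze", "~silver"):
    print(selector, is_selected(selector, env="silver"))
```

With `env="silver"`, the `bronze` and `~silver` selectors are the only ones rejected.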

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook
from fennel.featuresets import feature as F, featureset, extractor
from fennel.lib import inputs, outputs

webhook = Webhook(name="some_webhook")

@source(
    webhook.endpoint("endpoint1"),
    disorder="14d",
    cdc="upsert",
    env="bronze",
)
@source(
    webhook.endpoint("endpoint2"),
    disorder="14d",
    cdc="upsert",
    env="silver",
)
@dataset(index=True)
class Transaction:
    txid: int = field(key=True)
    amount: int
    timestamp: datetime

@featureset
class TransactionFeatures:
    txid: int
    amount: int = F(Transaction.amount, default=0)
    amount_is_high: bool

    # this extractor is tagged "bronze", so env="silver" below skips it
    @extractor(env="bronze")
    @inputs("amount")
    @outputs("amount_is_high")
    def some_fn(cls, ts, amount: pd.Series):
        return amount.apply(lambda x: x > 100)

# `client` is an already initialized Fennel client
client.commit(
    message="transaction: add transaction dataset and featureset",
    datasets=[Transaction],
    featuresets=[TransactionFeatures],
    preview=False,  # default is False, so didn't need to include this
    env="silver",
)

Silver source and no extractor are committed

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field, index
from fennel.connectors import source, Webhook
from fennel.featuresets import featureset, feature, extractor
from fennel.lib import inputs, outputs

webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("endpoint"), disorder="14d", cdc="upsert")
@index
@dataset
class Transaction:
    txid: int = field(key=True)
    amount: int
    timestamp: datetime

client.commit(
    message="transaction: add transaction dataset",
    datasets=[Transaction],
    incremental=False,  # default is False, so didn't need to include this
)

@featureset
class TransactionFeatures:
    txid: int
    amount: int = feature(Transaction.amount, default=0)
    amount_is_high: bool

    @extractor(env="bronze")
    @inputs("amount")
    @outputs("amount_is_high")
    def some_fn(cls, ts, amount: pd.Series):
        return amount.apply(lambda x: x > 100)

client.commit(
    message="transaction: add transaction featureset",
    datasets=[],  # note: transaction dataset is not included here
    featuresets=[TransactionFeatures],
    incremental=True,  # now we set incremental to True
)

Second commit adds a featureset & leaves dataset unchanged

Log

Method to push data into Fennel datasets via webhook endpoints.

Parameters

webhook:str

The name of the webhook source containing the endpoint to which the data should be logged.

endpoint:str

The name of the webhook endpoint to which the data should be logged.

df:pd.DataFrame

The dataframe containing all the data that must be logged. The columns of the dataframe must have the right names & types to be compatible with the schemas of the datasets attached to the webhook endpoint.

batch_size:int

Default: 1000

To prevent sending too much data in one go, the Fennel client divides the dataframe into chunks of batch_size rows each and sends the chunks one by one.

Note that Fennel servers provide an atomicity guarantee for each individual call: either all of the data in it is accepted or none of it is. However, splitting a dataframe into chunks can lead to a situation where some chunks have been ingested while others have not.
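
The partial-ingestion caveat can be illustrated with a minimal sketch. The helpers `chunk_rows` and `log_in_batches` are hypothetical, not Fennel client functions, and `send` stands in for whatever call ships one chunk and reports whether it was accepted.

```python
from typing import Callable, Iterator, Sequence


def chunk_rows(rows: Sequence, batch_size: int = 1000) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `batch_size` rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start : start + batch_size]


def log_in_batches(
    rows: Sequence,
    send: Callable[[Sequence], bool],
    batch_size: int = 1000,
) -> int:
    """Send chunks in order; return how many rows were accepted
    before the first rejected chunk (each chunk is all-or-nothing)."""
    accepted = 0
    for chunk in chunk_rows(rows, batch_size):
        if not send(chunk):
            break
        accepted += len(chunk)
    return accepted


rows = [{"uid": i} for i in range(2500)]
sent_sizes = []


def flaky_send(chunk: Sequence) -> bool:
    # Pretend the third chunk is rejected by the server.
    sent_sizes.append(len(chunk))
    return len(sent_sizes) < 3


print(log_in_batches(rows, flaky_send), "of", len(rows), "rows ingested")
```

In this run the first two chunks land and the third does not, which is exactly the partially ingested state the note warns about.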

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

# first define & sync a dataset that sources from a webhook
webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

client.commit(message="some commit msg", datasets=[Transaction])

# log some rows to the webhook
client.log(
    "some_webhook",
    "some_endpoint",
    df=pd.DataFrame(
        columns=["uid", "amount", "timestamp"],
        data=[
            [1, 10, "2021-01-01T00:00:00"],
            [2, 20, "2021-02-01T00:00:00"],
        ],
    ),
)

Logging data to webhook via client

Errors

Invalid webhook endpoint:

Fennel will throw an error (equivalent to 404) if no endpoint with the given specification exists.

Schema mismatch errors:

There is no explicit schema tied to a webhook endpoint - the schema comes from the datasets attached to it. As a result, the log call itself doesn't check for schema mismatch, but runtime errors may be generated later, asynchronously, if the logged data doesn't match the schema of the attached datasets.

You may want to keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Query

Method to query the latest value of features (typically for online inference).

Parameters

inputs:List[Union[Feature, str]]

List of features to be used as inputs to query. Features should be provided either as Feature objects or strings representing fully qualified feature names.

outputs:List[Union[Featureset, Feature, str]]

List of features that need to be queried. Features should be provided either as Feature objects, or Featureset objects (in which case all features under that featureset are queried) or strings representing fully qualified feature names.

input_dataframe:pd.DataFrame

A pandas dataframe object that contains the values of all features in the inputs list. Each row of the dataframe can be thought of as one entity for which features need to be queried.

log:bool

Default: False

Boolean which indicates if the queried features should also be logged (for log-and-wait approach to training data generation).

workflow:str

Default: 'default'

The name of the workflow associated with the feature query. Only relevant when log is set to True, in which case, features associated with the same workflow are collected together. Useful if you want to separate logged features between, say, login fraud and transaction fraud.

sampling_rate:float

Default: 1.0

The rate at which feature data should be sampled before logging. Only relevant when log is set to True.

import pandas as pd

from fennel.featuresets import featureset, extractor
from fennel.lib import inputs, outputs

@featureset
class Numbers:
    num: int
    is_even: bool
    is_odd: bool

    @extractor
    @inputs("num")
    @outputs("is_even", "is_odd")
    def my_extractor(cls, ts, nums: pd.Series):
        is_even = nums.apply(lambda x: x % 2 == 0)
        is_odd = is_even.apply(lambda x: not x)
        return pd.DataFrame({"is_even": is_even, "is_odd": is_odd})

client.commit(message="some commit msg", featuresets=[Numbers])

# now we can query the features
feature_df = client.query(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even, Numbers.is_odd],
    input_dataframe=pd.DataFrame({"Numbers.num": [1, 2, 3, 4]}),
)

pd.testing.assert_frame_equal(
    feature_df,
    pd.DataFrame(
        {
            "Numbers.is_even": [False, True, False, True],
            "Numbers.is_odd": [True, False, True, False],
        }
    ),
)

Querying two features

Returns

type:Union[pd.DataFrame, pd.Series]

Returns the queried features as a dataframe with one column for each feature in outputs. If a single output feature is requested, the result is returned as a pd.Series instead. Note that input features aren't returned unless they are also present in outputs.

Errors

Unknown features:

Fennel will throw an error (equivalent to 404) if any of the input or output features doesn't exist.

Resolution error:

An error is raised when there is absolutely no way to go from the input features to the output features via any sequence of intermediate extractors.
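
Resolution can be thought of as a reachability check over extractors. The sketch below is only a model of that idea, not Fennel's resolver: `can_resolve` is a hypothetical helper and each extractor is reduced to a pair of feature-name sets.

```python
from typing import Iterable, List, Set, Tuple

# (features the extractor needs, features it produces)
Extractor = Tuple[Set[str], Set[str]]


def can_resolve(
    inputs: Iterable[str],
    outputs: Iterable[str],
    extractors: List[Extractor],
) -> bool:
    """Return True if every output is reachable from the inputs."""
    known = set(inputs)
    progressed = True
    while progressed:
        progressed = False
        for required, produced in extractors:
            # An extractor can run once all of its inputs are known.
            if required <= known and not produced <= known:
                known |= produced
                progressed = True
    return set(outputs) <= known


extractors = [({"Numbers.num"}, {"Numbers.is_even", "Numbers.is_odd"})]
print(can_resolve(["Numbers.num"], ["Numbers.is_even"], extractors))
print(can_resolve(["Numbers.is_even"], ["Numbers.num"], extractors))
```

The second call models the error case: no chain of extractors leads from `Numbers.is_even` back to `Numbers.num`.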

Schema mismatch errors:

Fennel raises a run-time error if any extractor returns a value of the feature that doesn't match its stated type.

Authorization error:

Fennel checks that the passed token has sufficient permissions for each of the features/extractors - including any intermediate ones that need to be computed in order to resolve the path from the input features to the output features.

Query Offline

Method to query the historical values of features. Typically used for training data generation or batch inference.

Parameters

inputs:List[Union[Feature, str]]

List of features to be used as inputs to query. Features should be provided either as Feature objects or strings representing fully qualified feature names.

outputs:List[Union[Featureset, Feature, str]]

List of features that need to be queried. Features should be provided either as Feature objects, or Featureset objects (in which case all features under that featureset are queried) or strings representing fully qualified feature names.

format:"pandas" | "csv" | "json" | "parquet"

Default: pandas

The format of the input data.

input_dataframe:pd.DataFrame

A pandas dataframe object that contains the values of all features in the inputs list. Each row of the dataframe can be thought of as one entity for which features need to be queried.

Only relevant when format is "pandas".

input_s3:Optional[connectors.S3]

Sending large volumes of input data over the wire is often infeasible. In such cases, the input data can be written to S3 and its location passed as input_s3, created via the S3.bucket() function of the S3 connector.

This parameter makes sense only when format isn't "pandas".

When using this option, please ensure that Fennel's data connector IAM role has the ability to execute read & list operations on this bucket - talk to Fennel support if you need help.

timestamp_column:str

The name of the column containing the timestamps as of which the feature values must be computed.

output_s3:Optional[connectors.S3]

Specifies the location & other details about the s3 path where the values of all the output features should be written. Similar to input_s3, this is provided via S3.bucket() function of S3 connector.

If this isn't provided, Fennel writes the results of all requests to a fixed default bucket - you can see its details from the return value of query_offline or via Fennel Console.

When using this option, please ensure that Fennel's data connector IAM role has write permissions on this bucket - talk to Fennel support if you need help.

feature_to_column_map:Optional[Dict[Feature, str]]

Default: None

When reading input data from s3, the column names in s3 sometimes don't match one-to-one with the names of the input features. In such cases, a dictionary mapping features to column names can be provided.

This should be set up only when input_s3 is provided.
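
To make the effect of such a mapping concrete, here is a small sketch that re-keys rows from file column names to feature names. It is illustrative only: `columns_to_features` is a hypothetical helper, and it uses feature names as plain strings, whereas the real parameter is keyed by Feature objects.

```python
from typing import Dict, List


def columns_to_features(
    rows: List[Dict[str, object]],
    feature_to_column: Dict[str, str],
) -> List[Dict[str, object]]:
    """Rename file columns to the feature names they stand for."""
    column_to_feature = {col: feat for feat, col in feature_to_column.items()}
    return [
        # Columns without a mapping keep their original name.
        {column_to_feature.get(col, col): value for col, value in row.items()}
        for row in rows
    ]


rows = [{"number": 1, "timestamp": "2021-01-01"}]
mapping = {"Numbers.num": "number"}
print(columns_to_features(rows, mapping))
```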

Returns

type:Dict[str, Any]

Immediately returns a dictionary containing the following information:

  • request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id
  • output s3 bucket - the s3 bucket where results will be written
  • output s3 path prefix - the prefix of the output s3 bucket
  • completion rate - progress of the request as a fraction between 0 and 1
  • failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
  • status - the overall status of this request

A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.

Errors

Unknown features:

Fennel will throw an error (equivalent to 404) if any of the input or output features doesn't exist.

Resolution error:

An error is raised when there is absolutely no way to go from the input features to the output features via any sequence of intermediate extractors.

Schema mismatch errors:

Fennel raises a run-time error and may register failure on a subset of rows if any extractor returns a value of the feature that doesn't match its stated type.

Authorization error:

Fennel checks that the passed token has sufficient permissions for each of the features/extractors - including any intermediate ones that need to be computed in order to resolve the path from the input features to the output features.

from datetime import datetime, timedelta, timezone

import pandas as pd

HOUR = timedelta(hours=1)

response = client.query_offline(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even, Numbers.is_odd],
    format="pandas",
    # a single dataframe holds both the input feature and the timestamps
    input_dataframe=pd.DataFrame(
        {
            "Numbers.num": [1, 2, 3, 4],
            "timestamp": [
                datetime.now(timezone.utc) - HOUR * i for i in range(4)
            ],
        }
    ),
    timestamp_column="timestamp",
)
print(response)

Example with `format='pandas'` & default s3 output

from fennel.connectors import S3

s3 = S3(
    name="extract_hist_input",
    aws_access_key_id="<ACCESS KEY HERE>",
    aws_secret_access_key="<SECRET KEY HERE>",
)
s3_input_connection = s3.bucket("bucket", prefix="data/user_features")
s3_output_connection = s3.bucket("bucket", prefix="output")

response = client.query_offline(
    inputs=[Numbers.num],
    outputs=[Numbers.is_even, Numbers.is_odd],
    format="csv",
    timestamp_column="timestamp",
    input_s3=s3_input_connection,
    output_s3=s3_output_connection,
)

Example specifying input and output s3 buckets

track_offline_query

Track Offline Query

Method to monitor the progress of a run of offline query.

Parameters

request_id:str

The unique request ID returned by the query_offline operation that needs to be tracked.

Returns

type:Dict[str, Any]

Immediately returns a dictionary containing the following information:

  • request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id
  • output s3 bucket - the s3 bucket where results will be written
  • output s3 path prefix - the prefix of the output s3 bucket
  • completion rate - progress of the request as a fraction between 0 and 1
  • failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
  • status - the overall status of this request

A completion rate of 1.0 indicates that all processing has been completed. A completion rate of 1.0 and failure rate of 0 means that all processing has been completed successfully.
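
A polling loop built on these two rates might look like the sketch below. It rests on assumptions: the dictionary keys `completion_rate` and `failure_rate` are guesses at how the fields listed above are spelled, and `wait_for_offline_query` is a hypothetical helper that accepts any tracking callable, such as `client.track_offline_query`.

```python
import time
from typing import Any, Callable, Dict


def is_finished(status: Dict[str, Any]) -> bool:
    # Assumed key name for the "completion rate" field.
    return status["completion_rate"] >= 1.0


def is_fully_successful(status: Dict[str, Any]) -> bool:
    # Assumed key name for the "failure rate" field.
    return is_finished(status) and status["failure_rate"] == 0


def wait_for_offline_query(
    track: Callable[[str], Dict[str, Any]],
    request_id: str,
    poll_seconds: float = 5.0,
    max_polls: int = 100,
) -> Dict[str, Any]:
    """Poll `track` until processing completes or `max_polls` is exhausted."""
    for _ in range(max_polls):
        status = track(request_id)
        if is_finished(status):
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"request {request_id} did not finish in time")


# Stand-in tracker that reports progress on successive calls.
responses = iter(
    [
        {"completion_rate": 0.4, "failure_rate": 0.0},
        {"completion_rate": 1.0, "failure_rate": 0.0},
    ]
)
final = wait_for_offline_query(
    lambda _request_id: next(responses), "some-request-id", poll_seconds=0.0
)
print(is_fully_successful(final))
```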

request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
response = client.track_offline_query(request_id)
print(response)

Checking progress of a prior query_offline request

cancel_offline_query

Cancel Offline Query

Method to cancel a previously issued query_offline request.

Parameters

request_id:str

The unique request ID returned by the query_offline operation that needs to be canceled.

request_id = "bf5dfe5d-0040-4405-a224-b82c7a5bf085"
response = client.cancel_offline_query(request_id)
print(response)

Canceling offline query with given ID

Returns

type:Dict[str, Any]

Marks the request for cancellation and immediately returns a dictionary containing the following information:

  • request_id - a random uuid assigned to this request. Fennel can be polled about the status of this request using the request_id
  • output s3 bucket - the s3 bucket where results will be written
  • output s3 path prefix - the prefix of the output s3 bucket
  • completion rate - progress of the request as a fraction between 0 and 1
  • failure rate - fraction of the input rows (between 0-1) where an error was encountered and output features couldn't be computed
  • status - the overall status of this request

A completion rate of 1.0 indicates that all processing had been completed. A completion rate of 1.0 and failure rate of 0 means that all processing had been completed successfully.

Lookup

Method to look up rows of keyed datasets.

Parameters

dataset:Union[str, Dataset]

The name of the dataset or Dataset object to be looked up.

keys:List[Dict[str, Any]]

List of dicts, where each dict contains the values of the key fields for one row to be looked up.

fields:List[str]

The list of field names in the dataset to be looked up.

timestamps:List[Union[int, str, datetime]]

Default: None

Timestamps (one per row) as of which the lookup should be done. If not set, the lookups are done as of now (i.e the latest value for each key).

If set, the length of this list must equal the number of elements in keys.

Each timestamp can be passed as a datetime, as a str (e.g. one that pd.to_datetime can parse), or as an int denoting seconds/milliseconds/microseconds since epoch.
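
The accepted timestamp forms can be normalized along the lines of the sketch below. This is not Fennel's parsing logic: `normalize_timestamp` is a hypothetical helper, the magnitude thresholds used to tell seconds, milliseconds and microseconds apart are an assumption, and strings are limited to ISO 8601 here.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def normalize_timestamp(ts: Union[int, str, datetime]) -> datetime:
    """Convert a datetime, ISO string or epoch integer to a UTC datetime."""
    if isinstance(ts, datetime):
        # Treat naive datetimes as UTC.
        return ts if ts.tzinfo else ts.replace(tzinfo=timezone.utc)
    if isinstance(ts, str):
        return normalize_timestamp(datetime.fromisoformat(ts))
    if isinstance(ts, int):
        magnitude = abs(ts)
        # Assumed heuristic: pick the unit from the size of the number.
        if magnitude < 10**11:
            return EPOCH + timedelta(seconds=ts)
        if magnitude < 10**14:
            return EPOCH + timedelta(milliseconds=ts)
        return EPOCH + timedelta(microseconds=ts)
    raise TypeError(f"unsupported timestamp type: {type(ts).__name__}")


for value in (1609459200, 1609459200000, "2021-01-01T00:00:00"):
    print(normalize_timestamp(value).isoformat())
```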

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

# first define & sync a dataset that sources from a webhook
webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

client.commit(message="some commit msg", datasets=[Transaction])

# log some rows to the webhook
client.log(
    "some_webhook",
    "some_endpoint",
    pd.DataFrame(
        data=[
            [1, 10, "2021-01-01T00:00:00"],
            [2, 20, "2021-02-01T00:00:00"],
        ],
        columns=["uid", "amount", "timestamp"],
    ),
)

# now do a lookup to verify that the rows were logged
# keys: one dict of key fields per row, matching the documented type
keys = [{"uid": 1}, {"uid": 2}, {"uid": 3}]
# timestamps: one entry per key
ts = [
    datetime(2021, 1, 1, 0, 0, 0),
    datetime(2021, 2, 1, 0, 0, 0),
    datetime(2021, 3, 1, 0, 0, 0),
]
response, found = client.lookup(
    "Transaction",
    keys=keys,
    timestamps=ts,
)

Example of doing lookup on dataset

init_branch

Init Branch

Creates a new empty branch and checks out the client to point towards it.

Parameters

name:str

The name of the branch that should be created. The name can consist of any alphanumeric character [a-z, A-Z, 0-9] as well as slashes "/", hyphens "-", underscores "_", and periods ".".
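
A client-side pre-check for this character set could be sketched as follows. `is_valid_branch_name` is a hypothetical helper rather than a Fennel function, and rejecting the empty string is an assumption, since the text only lists the allowed characters.

```python
import re

# Letters, digits, "/", "-", "_" and "." only; at least one character.
_BRANCH_NAME = re.compile(r"[A-Za-z0-9/_.\-]+")


def is_valid_branch_name(name: str) -> bool:
    """Return True if `name` uses only the documented characters."""
    return _BRANCH_NAME.fullmatch(name) is not None


for candidate in ("mybranch", "feature/login-v1.2_test", "my branch", "a@b"):
    print(repr(candidate), is_valid_branch_name(candidate))
```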

Errors

Invalid name:

Raises an error if the name of the branch contains invalid characters.

Branch already exists:

Raises an error if a branch of the same name already exists.

Invalid auth token:

Raises an error if the auth token isn't valid. Not applicable to the mock client.

Insufficient permissions:

Raises an error if the account corresponding to the auth token doesn't carry the permission to create a new branch. Not applicable to the mock client.

client.init_branch("mybranch")

# init checks out client to the new branch
# so this commit (or any other operations) will be on `mybranch`
client.commit(...)

Create a new empty branch 'mybranch'

clone_branch

Clone Branch

Clones an existing branch into a new branch and checks out the client to point towards it.

Parameters

name:str

The name of the new branch that should be created as a result of the clone. The name can consist of any ASCII characters.

from_name:str

The name of the existing branch that should be cloned into the new branch.

Errors

Destination branch already exists:

Raises an error if a branch of the same name already exists.

Source branch does not exist:

Raises an error if there is no existing branch with the name from_name.

Invalid auth token:

Raises an error if the auth token isn't valid. Not applicable to the mock client.

Insufficient permissions:

Raises an error if the account corresponding to the auth token doesn't carry permissions to create a new branch. Also raises an error if the token doesn't have access to entities defined in the source branch. Auth/permissions checks are not applicable to the mock client.

client.init_branch("base")
# do some operations on `base` branch
client.commit(...)

# clone `base` branch to `mybranch`
client.clone_branch("mybranch", "base")

# clone checks out client to the new branch
# so this commit (or any other operations) will be on `mybranch`
client.commit(...)

Clone 'base' branch into 'mybranch'

delete_branch

Delete Branch

Deletes an existing branch and checks out the client to point to the main branch.

Parameters

name:str

The name of the existing branch that should be deleted.

Errors

Branch does not exist:

Raises an error if a branch of the given name doesn't exist.

Invalid auth token:

Raises an error if the auth token isn't valid. Not applicable to the mock client.

Insufficient permissions:

Raises an error if the account corresponding to the auth token doesn't carry the permission to delete branches. Also raises an error if the token doesn't have edit access to the entities in the branch being deleted. Not applicable to the mock client.

# create the branch (this also checks the client out to it)
client.init_branch("mybranch")

# do some operations on the branch
client.commit(...)

# delete the branch
client.delete_branch("mybranch")

# client now points to the main branch
client.commit(...)

Delete an existing branch

checkout

Checkout

Sets the client to point to the given branch.

Parameters

name:str

The name of the branch that the client should start pointing to. All subsequent operations (e.g. commit, query) will be directed to this branch.

Note that checkout doesn't validate that the name points to a real branch. Instead, it just changes the local state of the client. If the branch doesn't exist, subsequent branch operations will fail, not the checkout itself.
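
The consequence of this lazy validation can be shown with a toy stand-in. `ToyClient` below is purely hypothetical and does not mirror the real client's internals; it only models the idea that checkout updates local state while later operations are the ones that can fail.

```python
from typing import Iterable


class ToyClient:
    """Minimal model: checkout is local, other operations hit the 'server'."""

    def __init__(self, existing_branches: Iterable[str]) -> None:
        self._server_branches = set(existing_branches)
        self._current = "main"  # clients point to main by default

    def checkout(self, name: str) -> None:
        # No validation here: only the local pointer changes.
        self._current = name

    def branch(self) -> str:
        return self._current

    def commit(self, message: str) -> str:
        # The missing branch is only discovered when an operation runs.
        if self._current not in self._server_branches:
            raise LookupError(f"branch {self._current!r} does not exist")
        return f"committed to {self._current}: {message}"


client = ToyClient(existing_branches={"main"})
client.checkout("typo-branch")  # succeeds silently
print(client.branch())
try:
    client.commit("some commit msg")
except LookupError as err:
    print("commit failed:", err)
```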

# change active branch from `main` to `mybranch`
client.checkout("mybranch")
assert client.branch() == "mybranch"

# all subsequent operations will be on `mybranch`
client.commit(...)

Changing client to point to 'mybranch'

Errors

checkout does not raise any error.

Note

If not specified via explicit checkout, by default, clients point to the 'main' branch.

branch

Branch

Get the name of the current branch.

Parameters

Doesn't take any parameters.

Returns

name:str

Returns the name of the branch that the client is pointing to.

# change active branch from `main` to `mybranch`
client.checkout("mybranch")
assert client.branch() == "mybranch"

# all subsequent operations will be on `mybranch`
client.commit(...)

Get the name of the current branch

erase

Erase

Method to hard-erase data from a dataset.

Data related to the provided erase keys is removed and will not be reflected in downstream datasets or in any subsequent queries.

This method should be used as a way to comply with GDPR and other similar regulations that require "right to be forgotten". For operational deletion/correction of data, regular CDC mechanism must be used instead.

Warning

Erase only removes the data from the dataset in the request. If the data has already propagated to downstream datasets via pipelines, you may want to issue separate erase requests for all such datasets too.

Parameters

dataset:Union[Dataset, str]

The dataset from which data needs to be erased. Can be provided either as a Dataset object or string representing the dataset name.

erase_keys:pd.DataFrame

The dataframe containing the erase keys - all data matching these erase keys is removed. The columns of the dataframe must have the right names & types to be compatible with the erase keys defined in the schema of the dataset.

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field
from fennel.connectors import source, Webhook

# first define & sync a dataset that sources from a webhook
webhook = Webhook(name="some_webhook")

@source(webhook.endpoint("some_endpoint"), disorder="14d", cdc="upsert")
@dataset(index=True)
class Transaction:
    uid: int = field(key=True, erase_key=True)
    amount: int
    timestamp: datetime

client.commit(message="some commit msg", datasets=[Transaction])

client.erase(
    Transaction,
    erase_keys=pd.DataFrame({"uid": [1, 2, 3]}),
)

Erasing data corresponding to given uids

POST /api/v1/query

Query

API to extract a set of output features given known values of some input features.

Headers

Content-Type:"application/json"

All Fennel REST APIs expect a content-type of application/json.

Authorization:Bearer {str}

Fennel uses a bearer token for authorization. Pass along a valid token that has permissions to query the requested features.

X-FENNEL-BRANCH:str

The name of the branch against which the query should run, passed to the server as a header.

Body Parameters

inputs:List[str]

List of fully qualified names of input features. Example name: Featureset.feature

outputs:List[str]

List of fully qualified names of output features. Example name: Featureset.feature. Can also contain name of a featureset in which case all features in the featureset are returned.

data:json

JSON representing the dataframe of input feature values. The json can either be an array of json objects, each representing a row; or it can be a single json object where each key maps to a list of values representing a column.

Strings of json are also accepted.
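
The two accepted layouts carry the same information, as the conversion sketch below shows. `rows_to_columns` and `columns_to_rows` are hypothetical helpers, and the sketch assumes every row has the same keys and every column the same length.

```python
import json
from typing import Any, Dict, List


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Array of row objects -> object mapping column name to values."""
    columns: Dict[str, List[Any]] = {}
    for row in rows:
        for key, value in row.items():
            columns.setdefault(key, []).append(value)
    return columns


def columns_to_rows(columns: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Object mapping column name to values -> array of row objects."""
    keys = list(columns)
    return [
        dict(zip(keys, values))
        for values in zip(*(columns[key] for key in keys))
    ]


row_oriented = [
    {"UserFeatures.userid": 1},
    {"UserFeatures.userid": 2},
    {"UserFeatures.userid": 3},
]
column_oriented = rows_to_columns(row_oriented)

# Either layout can be serialized and used as the `data` field.
print(json.dumps(column_oriented))
print(json.dumps(columns_to_rows(column_oriented)))
```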

log:bool

If true, the extracted features are also logged (often to serve as future training data).

workflow:string

Default: default

The name of the workflow with which features should be logged (only relevant when log is set to true).

sampling_rate:float

Float between 0-1 describing the sample rate to be used for logging features (only relevant when log is set to true).

Returns

The response dataframe is returned as column oriented json.

import requests

# SERVER and BRANCH_NAME are placeholders for your deployment URL and branch
url = "{}/api/v1/query".format(SERVER)
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer <API-TOKEN>",
    "X-FENNEL-BRANCH": BRANCH_NAME,
}
data = {"UserFeatures.userid": [1, 2, 3]}
req = {
    "outputs": ["UserFeatures"],
    "inputs": ["UserFeatures.userid"],
    "data": data,
    "log": True,
    "workflow": "test",
}

# json= serializes the body as JSON, matching the Content-Type header
response = requests.post(url, headers=headers, json=req)
assert response.status_code == requests.codes.OK, response.json()

With column oriented data

import requests

# SERVER and BRANCH_NAME are placeholders for your deployment URL and branch
url = "{}/api/v1/query".format(SERVER)
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer <API-TOKEN>",
    "X-FENNEL-BRANCH": BRANCH_NAME,
}
data = [
    {"UserFeatures.userid": 1},
    {"UserFeatures.userid": 2},
    {"UserFeatures.userid": 3},
]
req = {
    "outputs": ["UserFeatures"],
    "inputs": ["UserFeatures.userid"],
    "data": data,
    "log": True,
    "workflow": "test",
}

# json= serializes the body as JSON, matching the Content-Type header
response = requests.post(url, headers=headers, json=req)
assert response.status_code == requests.codes.OK, response.json()

With row oriented data

POST /api/v1/log

Log

Method to push data into Fennel datasets via webhook endpoints, using the REST API.

Headers

Content-Type:"application/json"

All Fennel REST APIs expect a content-type of application/json.

Authorization:Bearer {str}

Fennel uses a bearer token for authorization. Pass along a valid token that has permissions to log data to the webhook.

Body Parameters

webhook:str

The name of the webhook source containing the endpoint to which the data should be logged.

endpoint:str

The name of the webhook endpoint to which the data should be logged.

data:json

The data to be logged to the webhook. This json string could either be:

  • Row major where it's a json array of rows with each row written as a json object.

  • Column major where it's a dictionary from column name to values of that column as a json array.

import requests

# SERVER is a placeholder for your deployment URL
url = "{}/api/v1/log".format(SERVER)
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer <API-TOKEN>",
}
data = [
    {
        "user_id": 1,
        "name": "John",
        "age": 20,
        "country": "Russia",
        "timestamp": "2020-01-01",
    },
    {
        "user_id": 2,
        "name": "Monica",
        "age": 24,
        "country": "Chile",
        "timestamp": "2021-03-01",
    },
    {
        "user_id": 3,
        "name": "Bob",
        "age": 32,
        "country": "USA",
        "timestamp": "2020-01-01",
    },
]
req = {
    "webhook": "fennel_webhook",
    "endpoint": "UserInfo",
    "data": data,
}
# json= serializes the body as JSON, matching the Content-Type header
response = requests.post(url, headers=headers, json=req)
assert response.status_code == requests.codes.OK, response.json()

Core Types

Fennel supports the following data types, expressed as native Python type hints.

int

Implemented as signed 8 byte integer (int64)

float

Implemented as signed 8 byte float with double precision

bool

Implemented as standard 1 byte boolean

str

Arbitrary sequence of UTF-8 characters. As in most programming languages, str cannot hold arbitrary binary bytes.

List[T]

List of elements of any other valid type T. Unlike Python lists, all elements must have the same type.

dict[T]

Map from str to data of any valid type T.

Fennel does not support dictionaries with arbitrary types for keys - please reach out to Fennel support if you have use cases requiring that.

Optional[T]

Same as Python Optional - permits either None or values of type T.

Embedding[int]

Denotes a list of floats of the given fixed length, e.g. Embedding[32] describes a list of 32 floats. This is the same as list[float] but enforces the list length, which is important for dot products and other similar operations on embeddings.
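
Why the fixed length matters can be seen in a small sketch. `check_embedding` and `dot` are hypothetical helpers written in plain Python and do not reflect how Fennel stores or validates embeddings.

```python
from typing import List, Sequence


def check_embedding(values: Sequence[float], dim: int) -> List[float]:
    """Reject vectors whose length differs from the declared dimension."""
    if len(values) != dim:
        raise ValueError(f"expected {dim} floats, got {len(values)}")
    return list(values)


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product, which is only defined for equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("embeddings must have the same length")
    return sum(x * y for x, y in zip(a, b))


user = check_embedding([1.0, 2.0, 3.0], dim=3)
item = check_embedding([4.0, 5.0, 6.0], dim=3)
print(dot(user, item))
```

Enforcing the length at the type level means a mismatch is caught when data is validated rather than inside a later computation.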

datetime

Describes a timestamp, implemented as microseconds since Unix epoch (so minimum granularity is microseconds). Can be natively parsed from multiple formats though internally is stored as 8-byte signed integer describing timestamp as microseconds from epoch in UTC.

struct {k1: T1, k2: T2, ...}

Describes the equivalent of a struct or dataclass - a container containing a fixed set of fields of fixed types.

Note

Fennel uses a strong type system and post data-ingestion, data doesn't auto-coerce across types. For instance, it will be a compile or runtime error if something was expected to be of type float but received an int instead.

# imports for data types
from typing import List, Optional, Dict
from datetime import datetime
from fennel.dtypes import struct

# imports for datasets
from fennel.datasets import dataset, field
from fennel.lib import meta

@struct  # like dataclass but verifies that fields have valid Fennel types
class Address:
    street: str
    city: str
    state: str
    zip_code: Optional[str]

@meta(owner="[email protected]")
@dataset
class Student:
    id: int = field(key=True)
    name: str
    grades: Dict[str, float]
    honors: bool
    classes: List[str]
    address: Address  # Address is now a valid Fennel type
    signup_time: datetime

Type Restrictions

Fennel type restrictions allow you to put additional constraints on base types and restrict the set of valid values in some form.

regex:regex("<pattern>")

Restriction on the base type of str. Permits only the strings matching the given regex pattern.

between:between(T, low, high)

Restriction on the base type of int or float. Permits only the numbers between low and high (both inclusive by default). The lower or upper bound can be made exclusive by setting strict_min or strict_max to True respectively.

oneof:oneof(T, [values...])

Restricts a type T to only accept one of the given values as valid values. oneof can be thought of as a more general version of enum.

For the restriction to be valid, all the values must themselves be of type T.

# imports for data types
from datetime import datetime, timezone
from fennel.dtypes import oneof, between, regex

# imports for datasets
from fennel.datasets import dataset, field
from fennel.lib import meta
from fennel.connectors import source, Webhook

webhook = Webhook(name="fennel_webhook")

@meta(owner="[email protected]")
@source(webhook.endpoint("UserInfoDataset"), disorder="14d", cdc="upsert")
@dataset
class UserInfoDataset:
    user_id: int = field(key=True)
    name: str
    age: between(int, 0, 100, strict_min=True)
    gender: oneof(str, ["male", "female", "non-binary"])
    email: regex(r"[^@]+@[^@]+\.[^@]+")
    timestamp: datetime

python

Type Restriction Composition

These restricted types act as regular types -- they can be mixed/matched to form complex composite types. For instance, the following are all valid Fennel types:

  • list[regex('^[0-9]{5}$')] - list of strings matching US zip codes
  • oneof(Optional[int], [None, 0, 1]) - a nullable type that only takes 0 or 1 as valid values

Note

Data belonging to the restricted types is still stored & transmitted (e.g. in json encoding) as a regular base type. It's just that Fennel will reject data of base type that doesn't meet the restriction.
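
To make the semantics of the three restrictions concrete, here is a minimal plain-Python sketch of the checks they imply. It is not Fennel's implementation: the helper names (check_between, check_oneof, check_regex) are hypothetical, and treating the regex as a full-string match is an assumption.

```python
import re

def check_between(value, low, high, strict_min=False, strict_max=False):
    """Return True if value is in range; bounds are inclusive unless strict."""
    lower_ok = value > low if strict_min else value >= low
    upper_ok = value < high if strict_max else value <= high
    return lower_ok and upper_ok

def check_oneof(value, allowed):
    """Return True if value is one of the explicitly allowed values."""
    return value in allowed

def check_regex(value, pattern):
    """Return True if the whole string matches (full match is an assumption)."""
    return re.fullmatch(pattern, value) is not None

print(check_between(0, 0, 100))                    # inclusive lower bound
print(check_between(0, 0, 100, strict_min=True))   # exclusive lower bound
print(check_oneof("male", ["male", "female", "non-binary"]))
print(check_regex("94105", r"[0-9]{5}"))
```

A value that fails such a check is still a perfectly valid value of the base type; only the restricted type rejects it.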

Duration

Fennel lets you express durations in an easy to read natural language as described below:

Symbol  Unit
y       Year
w       Week
d       Day
h       Hour
m       Minute
s       Second

There is no shortcut for month because a month's duration varies a lot: some months are 28 days, some are 30 days and some are 31 days. A common convention in ML is to use 4 weeks to describe a month.

Note

A year is hardcoded to be exactly 365 days and doesn't take into account variance like leap years.

"7h" -> 7 hours
"12d" -> 12 days
"2y" -> 2 years
"3h 20m 4s" -> 3 hours 20 minutes and 4 seconds
"2y 4w" -> 2 years and 4 weeks

text
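
As an illustration of the duration format, the sketch below parses such strings into a Python timedelta using the unit table above, with a year fixed at 365 days. The function name parse_duration is hypothetical and this is not Fennel's own parser; special values such as "forever" are deliberately not handled.

```python
import re
from datetime import timedelta

# Seconds per unit; a year is exactly 365 days, as stated above.
UNIT_SECONDS = {
    "y": 365 * 24 * 3600,
    "w": 7 * 24 * 3600,
    "d": 24 * 3600,
    "h": 3600,
    "m": 60,
    "s": 1,
}

def parse_duration(text: str) -> timedelta:
    """Parse strings such as '3h 20m 4s' into a timedelta."""
    total = 0
    for part in text.split():
        match = re.fullmatch(r"(\d+)([ywdhms])", part)
        if match is None:
            raise ValueError(f"invalid duration component: {part!r}")
        total += int(match.group(1)) * UNIT_SECONDS[match.group(2)]
    return timedelta(seconds=total)

print(parse_duration("3h 20m 4s"))
print(parse_duration("2y 4w"))
```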

Aggregate

Operator to do continuous window aggregations. Aggregate operator must always be preceded by a groupby operator.

Parameters

aggregates:List[Aggregation]

Positional argument specifying the list of aggregations to apply on the grouped dataset. This list can be passed either as unpacked *args or as an explicit list as the first positional argument.

See aggregations for the full list of aggregate functions.

along:Optional[str]

Keyword argument indicating the time axis to aggregate along. If along is None, Fennel will aggregate along the timestamp of the input dataset.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Count,
    Sum,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime = field(timestamp=True)
    transaction_time: datetime

@dataset(index=True)
class Aggregated:
    # groupby field becomes the key field
    uid: int = field(key=True)
    # new fields are added to the dataset by the aggregate operation
    total: int
    count_1d: int
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Transaction)
    def aggregate_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            count_1d=Count(window="1d"),
            total=Sum(of="amount", window="forever"),
            along="transaction_time",
        )
Aggregate count & sum of transactions in rolling windows along transaction time

python

Returns

Dataset

Returns a dataset where all columns passed to groupby become the key columns, the timestamp column stays as it is and one column is created for each aggregation.

The type of each aggregated column depends on the aggregate and the type of the corresponding column in the input dataset.

Note

Aggregate is the terminal operator - no other operator can follow it and no other datasets can be derived from the dataset containing this pipeline.
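
To show what a windowed count and an all-time sum mean at a given point in time, here is a plain-Python sketch that evaluates both per uid as of a query time. It is only an illustration of the idea, not how Fennel computes aggregates: the function name aggregate_as_of is hypothetical, and the exact inclusivity of the window boundary is an assumption.

```python
from datetime import datetime, timedelta

def aggregate_as_of(rows, as_of, window=timedelta(days=1)):
    """Per-uid count over the trailing window and sum over all time."""
    result = {}
    for row in rows:
        t = row["transaction_time"]
        if t > as_of:
            continue  # rows after the query time are not visible yet
        agg = result.setdefault(row["uid"], {"count_1d": 0, "total": 0})
        agg["total"] += row["amount"]       # "forever" window
        if as_of - t < window:              # trailing 1 day window
            agg["count_1d"] += 1
    return result

rows = [
    {"uid": 1, "amount": 10, "transaction_time": datetime(2024, 1, 1, 0, 0)},
    {"uid": 1, "amount": 20, "transaction_time": datetime(2024, 1, 2, 12, 0)},
    {"uid": 2, "amount": 5, "transaction_time": datetime(2024, 1, 2, 13, 0)},
]
print(aggregate_as_of(rows, as_of=datetime(2024, 1, 3, 0, 0)))
```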

Assign

Operator to add a new column to a dataset - the added column is neither a key column nor a timestamp column.

Parameters

name:str

The name of the new column to be added - must not conflict with any existing name on the dataset.

dtype:Type

The data type of the new column to be added - must be a valid Fennel supported data type.

func:Callable[[pd.DataFrame], pd.Series[T]]

The function, which when given a subset of the dataset as a dataframe, returns the value of the new column for each row in the dataframe.

Fennel verifies at runtime that the returned series matches the declared dtype.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset(index=True)
class WithSquare:
    uid: int = field(key=True)
    amount: int
    amount_sq: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def my_pipeline(cls, ds: Dataset):
        return ds.assign("amount_sq", int, lambda df: df["amount"] ** 2)
Adding new column 'amount_sq' of type int

python

Returns

Dataset

Returns a dataset with one additional column of the given name and type same as dtype. This additional column is neither a key column nor the timestamp column.

Errors

Invalid series at runtime:

Runtime error if the value returned from the lambda isn't a pandas Series of the declared type and the same length as the input dataframe.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset
class WithHalf:
    uid: int = field(key=True)
    amount: int
    amount_half: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def my_pipeline(cls, ds: Dataset):
        # declared dtype is int, but multiplying by 0.5 yields floats
        return ds.assign(
            "amount_half", int, lambda df: df["amount"] * 0.5
        )
Runtime error: returns float, not int

python

Dedup

Operator to dedup keyless datasets (e.g. event streams).

Parameters

by:Optional[List[str]]

Default: None

The list of columns to use for identifying duplicates. If not specified, all the columns are used for identifying duplicates.

Two rows of the input dataset are considered duplicates if and only if they have the same values for the timestamp column and all the by columns.

from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Deduped:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def dedup_pipeline(cls, ds: Dataset):
        return ds.dedup(by="txid")
Dedup using txid and timestamp

python

from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Deduped:
    txid: int
    uid: int
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def dedup_by_all_pipeline(cls, ds: Dataset):
        return ds.dedup()
Dedup using all the fields

python

Returns

Dataset

Returns a keyless dataset having the same schema as the input dataset but with some duplicated rows filtered out.
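
The duplicate rule (same timestamp and same values in the by columns) can be sketched in plain Python as follows. This is an illustration only, with a hypothetical function name; which of the duplicate rows survives is not specified above, so keeping the first one seen is an assumption.

```python
def dedup(rows, by=None, timestamp="timestamp"):
    """Keep one row per (timestamp, by-columns) combination."""
    seen = set()
    output = []
    for row in rows:
        # With no `by`, every non-timestamp column takes part in the identity.
        columns = by if by is not None else [c for c in row if c != timestamp]
        identity = (row[timestamp],) + tuple(row[c] for c in columns)
        if identity not in seen:
            seen.add(identity)
            output.append(row)  # assumption: the first row seen is kept
    return output

rows = [
    {"txid": 1, "uid": 1, "amount": 10, "timestamp": 100},
    {"txid": 1, "uid": 1, "amount": 99, "timestamp": 100},  # same txid and time
    {"txid": 1, "uid": 1, "amount": 10, "timestamp": 200},  # same txid, later time
]
print(len(dedup(rows, by=["txid"])))
print(len(dedup(rows)))
```

Note how the second row is a duplicate when only txid is considered, but not when all columns are compared, because its amount differs.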

Errors

Dedup on dataset with key columns:

Commit error to apply dedup on a keyed dataset.

Drop

Operator to drop one or more non-key non-timestamp columns from a dataset.

Parameters

columns:List[str]

List of columns in the incoming dataset that should be dropped. This can be passed either as unpacked *args or as a Python list.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    country: str
    weight: float
    height: float
    gender: str
    timestamp: datetime

@dataset(index=True)
class Dropped:
    uid: int = field(key=True)
    gender: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def drop_pipeline(cls, user: Dataset):
        return user.drop("height", "weight").drop(
            columns=["city", "country"]
        )
Can pass names via *args or kwarg columns

python

Returns

Dataset

Returns a dataset with the same schema as the input dataset but with some columns (as specified by columns) removed.

Errors

Dropping key/timestamp columns:

Commit error on removing any key columns or the timestamp column.

Dropping non-existent columns:

Commit error on removing any column that doesn't exist in the input dataset.

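from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"))
@dataset
class User:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

@dataset
class Dropped:
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        return user.drop("uid")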
Can not drop key or timestamp columns

python

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"))
@dataset
class User:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

@dataset
class Dropped:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        return user.drop("random")
Can not drop a non-existent column

python

Dropnull

Operator to drop rows containing null values (aka None in Python speak) in the given columns.

Parameters

columns:Optional[List[str]]

List of columns in the incoming dataset that should be checked for presence of None values - if any such column has None for a row, the row will be filtered out from the output dataset. This can be passed either as unpacked *args or as a Python list.

If no arguments are given, columns will be all columns with the type Optional[T] in the dataset.

from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    dob: str
    city: Optional[str]
    country: Optional[str]
    gender: Optional[str]
    timestamp: datetime

@dataset(index=True)
class Derived:
    uid: int = field(key=True)
    dob: str
    # dropnull changes the type of the columns to non-optional
    city: str
    country: str
    gender: Optional[str]
    timestamp: datetime

    @pipeline
    @inputs(User)
    def dropnull_pipeline(cls, user: Dataset):
        return user.dropnull("city", "country")
Dropnull on city & country, but not gender

python

from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    dob: str
    city: Optional[str]
    country: Optional[str]
    gender: Optional[str]
    timestamp: datetime

@dataset(index=True)
class Derived:
    uid: int = field(key=True)
    dob: str
    # dropnull changes the type of all optional columns to non-optional
    city: str
    country: str
    gender: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def dropnull_pipeline(cls, user: Dataset):
        return user.dropnull()
Applies to all optional columns if none is given explicitly

python

Returns

Dataset

Returns a dataset with the same name & number of columns as the input dataset but with the type of some columns modified from Optional[T] -> T.
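
The row-filtering behaviour and the default choice of columns can be sketched in plain Python. This is only an illustration of the rules described here, not Fennel's implementation: the function names and the schema-as-dict representation are hypothetical.

```python
from typing import Optional, Union, get_args, get_origin

def optional_columns(schema):
    """Names of columns whose declared type is Optional[T]."""
    return [
        name for name, tp in schema.items()
        if get_origin(tp) is Union and type(None) in get_args(tp)
    ]

def dropnull(rows, schema, *columns):
    """Drop rows holding None in any target column (default: all optional columns)."""
    optional = optional_columns(schema)
    targets = list(columns) or optional
    for name in targets:
        if name not in schema:
            raise ValueError(f"column {name!r} does not exist")
        if name not in optional:
            raise ValueError(f"column {name!r} is not optional")
    return [row for row in rows if all(row[c] is not None for c in targets)]

schema = {"uid": int, "city": Optional[str], "gender": Optional[str]}
rows = [
    {"uid": 1, "city": "Paris", "gender": None},
    {"uid": 2, "city": None, "gender": "female"},
    {"uid": 3, "city": "Oslo", "gender": "male"},
]
print([r["uid"] for r in dropnull(rows, schema, "city")])
print([r["uid"] for r in dropnull(rows, schema)])
```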

Errors

Dropnull on non-optional columns:

Commit error to pass a column without an optional type.

Dropnull on non-existent columns:

Commit error to pass a column that doesn't exist in the input dataset.

from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"))
@dataset
class User:
    uid: int = field(key=True)
    city: Optional[str]
    timestamp: datetime

@dataset
class Derived:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        return user.dropnull("random")
Dropnull on a non-existent column

python

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"))
@dataset
class User:
    uid: int = field(key=True)
    # dropnull can only be used on optional columns
    city: str
    timestamp: datetime

@dataset
class Derived:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        return user.dropnull("city")
Dropnull on a non-optional column

python

Explode

Operator to explode lists in a single row to form multiple rows, analogous to the explode function in Pandas.

Only applicable to keyless datasets.

Parameters

columns:List[str]

The list of columns to explode. This list can be passed either as unpacked *args or kwarg columns mapping to an explicit list.

All the columns should be of type List[T] for some T in the input dataset and after explosion, they get converted to a column of type Optional[T].

from datetime import datetime
from typing import List, Optional

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    skus: List[int]
    prices: List[float]
    timestamp: datetime

@dataset
class Derived:
    uid: int
    sku: Optional[int]
    price: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def explode_pipeline(cls, ds: Dataset):
        return (
            ds
            .explode("skus", "prices").rename(
                {"skus": "sku", "prices": "price"}
            )
        )
Exploding skus and prices together

python

Returns

Dataset

Returns a dataset with the same number & name of columns as the input dataset but with the type of exploded columns modified from List[T] to Optional[T].

Empty lists are converted to None values (hence the output types need to be Optional[T]).

Errors

Exploding keyed datasets:

Commit error to apply explode on an input dataset with key columns.

Exploding non-list columns:

Commit error to explode using a column that is not of the type List[T].

Exploding non-existent columns:

Commit error to explode using a column that is not present in the input dataset.

Unequal size lists in multi-column explode:

For a given row, all the columns getting exploded must have lists of the same length, otherwise a runtime error is raised. Note that the lists can be of different lengths across rows.
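
The per-row behaviour, including the empty-list and unequal-length cases, can be sketched in plain Python. This is a hypothetical illustration rather than Fennel's implementation; in particular, emitting exactly one row with None values for empty lists is an assumption modelled on the Pandas behaviour.

```python
def explode(row, columns):
    """Expand one row into several, one per list element of the exploded columns."""
    lengths = {len(row[c]) for c in columns}
    if len(lengths) != 1:
        raise ValueError("exploded columns must have lists of equal length in a row")
    n = lengths.pop()
    if n == 0:
        # Empty lists produce a single row with None in the exploded columns.
        return [{**row, **{c: None for c in columns}}]
    return [{**row, **{c: row[c][i] for c in columns}} for i in range(n)]

order = {"uid": 1, "skus": [10, 20], "prices": [1.5, 2.5], "timestamp": 0}
for exploded in explode(order, ["skus", "prices"]):
    print(exploded)
```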

from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    price: float
    timestamp: datetime

@dataset
class Derived:
    uid: int
    price: float
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def bad_pipeline(cls, ds: Dataset):
        return ds.explode("price")
Exploding a non-list column

python

from datetime import datetime
from typing import List

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    price: List[float]
    timestamp: datetime

@dataset
class Derived:
    uid: int
    price: float
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def bad_pipeline(cls, ds: Dataset):
        return ds.explode("price", "random")
Exploding a non-existent column

python

Filter

Operator to selectively filter out rows from a dataset.

Parameters

func:Callable[[pd.DataFrame], pd.Series[bool]]

The actual filter function - takes a pandas dataframe containing a batch of rows from the input dataset and is expected to return a series of booleans of the same length. Only rows corresponding to True are retained in the output dataset.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

@dataset(index=True)
class Filtered:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

    @pipeline
    @inputs(User)
    def my_pipeline(cls, user: Dataset):
        return user.filter(lambda df: df["city"] != "London")
Filtering out rows where city is London

python

Returns

Dataset

Returns a dataset with the same schema as the input dataset, just with some rows potentially filtered out.

Errors

Invalid series at runtime:

Runtime error if the value returned from the lambda isn't a pandas Series of bools with the same length as the input dataframe.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

@dataset
class Filtered:
    uid: int = field(key=True)
    city: str
    signup_time: datetime

    @pipeline
    @inputs(User)
    def my_pipeline(cls, user: Dataset):
        return user.filter(lambda df: df["city"] + "London")
Runtime Error: Lambda returns str, not bool

python

First

Operator to find the first element of a group by the row timestamp. First operator must always be preceded by a groupby operator.

Parameters

The first operator does not take any parameters.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class FirstOnly:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def first_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").first()
Dataset with just the first transaction of each user

python

Returns

Dataset

The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.

For each group formed by grouping, one row is chosen having the lowest value in the timestamp field. In case of ties, the first seen row wins.
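
The selection rule (lowest timestamp per group, first seen row winning ties) can be sketched in plain Python. The function name is hypothetical and this is an illustration of the rule, not Fennel's implementation.

```python
def first_per_group(rows, keys, timestamp="timestamp"):
    """Pick the row with the lowest timestamp per group; first seen wins ties."""
    chosen = {}
    for row in rows:
        group = tuple(row[k] for k in keys)
        # Strict '<' means an equal timestamp never displaces the earlier-seen row.
        if group not in chosen or row[timestamp] < chosen[group][timestamp]:
            chosen[group] = row
    return list(chosen.values())

rows = [
    {"uid": 1, "amount": 10, "timestamp": 5},
    {"uid": 1, "amount": 20, "timestamp": 3},
    {"uid": 1, "amount": 30, "timestamp": 3},  # ties with the previous row
    {"uid": 2, "amount": 7, "timestamp": 9},
]
print(first_per_group(rows, ["uid"]))
```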

Groupby

Operator to group rows of incoming datasets to be processed by the next operator.

Technically, groupby isn't a standalone operator by itself since its output isn't a valid dataset. Instead, it becomes a valid operator when followed by first, latest, aggregate, or window operators.

Parameters

keys:List[str]

List of keys in the incoming dataset along which the rows should be grouped. This can be passed as unpacked *args or a Python list.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    category: str
    timestamp: datetime

@dataset
class FirstInCategory:
    category: str = field(key=True)
    uid: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def groupby_pipeline(cls, transactions: Dataset):
        return transactions.groupby("category").first()
Groupby category before using first

python

Errors

Grouping by non-existent columns:

Commit error if trying to group by columns that don't exist in the input dataset.

Grouping by timestamp column:

Commit error if trying to do a groupby via the timestamp column of the input dataset.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="append"
)
@dataset
class Transaction:
    uid: int
    category: str
    timestamp: datetime

@dataset
class FirstInCategory:
    category: str = field(key=True)
    uid: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, transactions: Dataset):
        return transactions.groupby("non_existent_column").first()
Groupby using a non-existent column

python

Join

Operator to join two datasets. The right hand side dataset must have one or more key columns and the join operation is performed on these columns.

Parameters

dataset:Dataset

The right hand side dataset to join this dataset with. RHS dataset must be a keyed dataset and must also be an input to the pipeline (vs being an intermediary dataset derived within a pipeline itself).

how:"inner" | "left"

Required kwarg indicating whether the join should be an inner join (how="inner") or a left-outer join (how="left"). With "left", the output dataset may have a row even if there is no matching row on the right side.

on:Optional[List[str]]

Default: None

Kwarg that specifies the list of fields along which join should happen. If present, both left and right side datasets must have fields with these names and matching data types. This list must be identical to the names of all key columns of the right hand side.

If this isn't set, left_on and right_on must be set instead.

left_on:Optional[List[str]]

Default: None

Kwarg that specifies the list of fields from the left side dataset that should be used for joining. If this kwarg is set, right_on must also be set. Note that right_on must be identical to the names of all the key columns of the right side.

right_on:Optional[List[str]]

Default: None

Kwarg that specifies the list of fields from the right side dataset that should be used for joining. If this kwarg is set, left_on must also be set. The length of left_on and right_on must be the same and corresponding fields on both sides must have the same data types.

within:Tuple[Duration, Duration]

Default: ("forever", "0s")

Optional kwarg specifying the time window relative to the left side timestamp within which the join should be performed. For within=(d1, d2), this can be seen as adding another condition to the join like WHERE left_time - d1 < right_time AND right_time < left_time + d2

  • The first value in the tuple represents how far back in time the join can look. The term "forever" means that we can go infinitely back in time when searching for a row to join with the left-hand side data.
  • The second value in the tuple represents how far ahead in time we can go to perform a join. This is useful in cases when the corresponding RHS data of the join can come later. The default value for this parameter is ("forever", "0s") which means that we can go infinitely back in time and the RHS data should be available for the event time of the LHS data.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    merchant: int
    amount: int
    timestamp: datetime

@source(
    webhook.endpoint("MerchantCategory"), disorder="14d", cdc="upsert"
)
@dataset(index=True)
class MerchantCategory:
    # right side of the join can only be on key fields
    merchant: int = field(key=True)
    category: str
    updated_at: datetime  # won't show up in joined dataset

@dataset
class WithCategory:
    uid: int
    merchant: int
    amount: int
    timestamp: datetime
    category: str

    @pipeline
    @inputs(Transaction, MerchantCategory)
    def join_pipeline(cls, tx: Dataset, merchant_category: Dataset):
        return tx.join(merchant_category, on=["merchant"], how="inner")
Inner join on 'merchant'

python

Returns

Dataset

Returns a dataset representing the joined dataset having the same keys & timestamp columns as the LHS dataset.

The output dataset has all the columns from the left dataset and all non-key non-timestamp columns from the right dataset.

If the join was of type inner, the type of a joined RHS column of type T stays T but if the join was of type left, the type in the output dataset becomes Optional[T] if it was T on the RHS side.
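
The interplay of how and within can be sketched in plain Python. This is a rough illustration under stated assumptions, not Fennel's implementation: the function name and the value_columns parameter are hypothetical, the window bounds are treated as inclusive, None stands in for "forever", and picking the most recent matching right row is an assumption about which row is joined.

```python
from datetime import datetime, timedelta

def join(left_rows, right_rows, on, how, value_columns,
         within=(None, timedelta(0)),
         left_ts="timestamp", right_ts="updated_at"):
    """Attach to each left row the most recent matching right row in the window."""
    back, ahead = within  # back=None stands for "forever"
    output = []
    for left in left_rows:
        t = left[left_ts]
        candidates = [
            r for r in right_rows
            if all(r[c] == left[c] for c in on)
            and (back is None or r[right_ts] >= t - back)
            and r[right_ts] <= t + ahead
        ]
        match = max(candidates, key=lambda r: r[right_ts], default=None)
        if match is None and how == "inner":
            continue  # inner join drops unmatched left rows
        # Left join keeps the row; right-side columns become None (Optional[T]).
        extra = {c: (match[c] if match is not None else None) for c in value_columns}
        output.append({**left, **extra})
    return output

t0 = datetime(2024, 1, 1)
merchants = [
    {"merchant": 1, "category": "food", "updated_at": t0},
    {"merchant": 1, "category": "grocery", "updated_at": t0 + timedelta(days=2)},
]
transactions = [
    {"uid": 7, "merchant": 1, "amount": 5, "timestamp": t0 + timedelta(days=1)},
    {"uid": 8, "merchant": 2, "amount": 9, "timestamp": t0 + timedelta(days=1)},
]
print(join(transactions, merchants, ["merchant"], "inner", ["category"]))
print(join(transactions, merchants, ["merchant"], "left", ["category"]))
```

With the default window, the first transaction sees only the "food" row because the "grocery" update lies in its future; widening the second element of within lets the join look ahead.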

Errors

Join with non-key dataset on the right side:

Commit error to do a join with a dataset that doesn't have key columns.

Join with intermediate dataset:

Commit error to do a join with a dataset that is not an input to the pipeline but instead is an intermediate dataset derived during the pipeline itself.

Post-join column name conflict:

Commit error if join will result in a dataset having two columns of the same name. A common way to work-around this is to rename columns via the rename operator before the join.

Mismatch in columns to be joined:

Commit error if the number/type of the join columns on the left and right side don't match.

Latest

Operator to find the latest element of a group by the row timestamp. Latest operator must always be preceded by a groupby operator.

Latest operator is a good way to effectively convert an append-only stream into a time-aware upsert stream.

Parameters

The latest operator does not take any parameters.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class LatestOnly:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def latest_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").latest()
Dataset with just the latest transaction of each user

python

Returns

Dataset

The returned dataset's fields are the same as the input dataset, with the grouping fields as the keys.

The row with the maximum timestamp is chosen for each group. In case of ties, the last seen row wins.
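
The selection rule (maximum timestamp per group, last seen row winning ties) can be sketched in plain Python. The function name is hypothetical; the sketch only computes the final state of each group and is not Fennel's implementation.

```python
def latest_per_group(rows, keys, timestamp="timestamp"):
    """Pick the row with the highest timestamp per group; last seen wins ties."""
    chosen = {}
    for row in rows:
        group = tuple(row[k] for k in keys)
        # '>=' lets a later-seen row with an equal timestamp replace the earlier one.
        if group not in chosen or row[timestamp] >= chosen[group][timestamp]:
            chosen[group] = row
    return list(chosen.values())

rows = [
    {"uid": 1, "amount": 10, "timestamp": 5},
    {"uid": 1, "amount": 20, "timestamp": 3},
    {"uid": 2, "amount": 7, "timestamp": 9},
    {"uid": 2, "amount": 8, "timestamp": 9},  # ties with the previous row
]
print(latest_per_group(rows, ["uid"]))
```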

Rename

Operator to rename columns of a dataset.

Parameters

columns:Dict[str, str]

Dictionary mapping from old column names to their new names.

All columns should still have distinct and valid names post renaming.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    weight: float
    height: float
    timestamp: datetime

@dataset(index=True)
class Derived:
    uid: int = field(key=True)
    # rename changes the name of the columns
    weight_lb: float
    height_in: float
    timestamp: datetime

    @pipeline
    @inputs(User)
    def rename_pipeline(cls, user: Dataset):
        return user.rename(
            {"weight": "weight_lb", "height": "height_in"}
        )
Rename weight -> weight_lb & height -> height_in

python

Returns

Dataset

Returns a dataset with the same schema as the input dataset, just with the columns renamed.

Errors

Renaming non-existent column:

Commit error if any key in the rename dictionary does not match the name of an existing column.

Conflicting column names post-rename:

Commit error if after renaming, there will be two columns in the dataset having the same name.
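
Both error conditions amount to simple checks on the column names, sketched below in plain Python. The function name is hypothetical and the sketch only mirrors the rules stated here; it is not Fennel's validation code.

```python
def rename_columns(columns, mapping):
    """Return the renamed column list, rejecting the two error cases above."""
    missing = [old for old in mapping if old not in columns]
    if missing:
        raise ValueError(f"cannot rename non-existent columns: {missing}")
    renamed = [mapping.get(name, name) for name in columns]
    if len(set(renamed)) != len(renamed):
        raise ValueError("renaming would produce duplicate column names")
    return renamed

columns = ["uid", "weight", "height", "timestamp"]
print(rename_columns(columns, {"weight": "weight_lb", "height": "height_in"}))
```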

Select

Operator to select some columns from a dataset.

Parameters

columns:List[str]

List of columns in the incoming dataset that should be selected into the output dataset. This can be passed either as unpacked *args or as kwarg set to a Python list.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    weight: float
    height: float
    city: str
    country: str
    gender: str
    timestamp: datetime

@dataset(index=True)
class Selected:
    uid: int = field(key=True)
    weight: float
    height: float
    timestamp: datetime

    @pipeline
    @inputs(User)
    def select_pipeline(cls, user: Dataset):
        return user.select("uid", "height", "weight")
Selecting uid, height & weight columns

python

Returns

Dataset

Returns a dataset containing only the selected columns. Timestamp field is automatically included whether explicitly provided in the select or not.

Errors

Not selecting all key columns:

Select, like most other operators, can not change the key or timestamp columns. As a result, not selecting all the key columns is a commit error.

Selecting non-existent column:

Commit error to select a column that is not present in the input dataset.
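
As a rough illustration of the select rules (all key columns must be selected, the timestamp column is always carried over, unknown columns are rejected), here is a plain-Python sketch. The helper check_select is hypothetical and does not reflect how Fennel validates pipelines internally.

```python
from typing import List


def check_select(
    columns: List[str], keys: List[str], timestamp: str, selected: List[str]
) -> List[str]:
    """Return the output columns of a select, raising ValueError on bad input."""
    # Selecting a column that the input dataset doesn't have is an error.
    unknown = [c for c in selected if c not in columns]
    if unknown:
        raise ValueError(f"selecting non-existent columns: {unknown}")

    # Select can not drop key columns.
    dropped_keys = [k for k in keys if k not in selected]
    if dropped_keys:
        raise ValueError(f"key columns must be selected: {dropped_keys}")

    # The timestamp column is included whether or not it was listed.
    keep = set(selected) | {timestamp}
    return [c for c in columns if c in keep]


columns = ["uid", "weight", "height", "city", "country", "gender", "timestamp"]
output = check_select(columns, ["uid"], "timestamp", ["uid", "height", "weight"])
```

The sketch preserves the input column order in its result; that ordering is a choice made for the example only.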

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"))
@dataset
class User:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

@dataset
class Selected:
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        # key column uid is not selected, hence commit error
        return user.select("city")
Did not select key uid

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("User"))
@dataset
class User:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

@dataset
class Selected:
    uid: int = field(key=True)
    city: str
    timestamp: datetime

    @pipeline
    @inputs(User)
    def bad_pipeline(cls, user: Dataset):
        # User has no column named "random", hence commit error
        return user.select("uid", "random")
Selecting non-existent column

Summarize

Operator to compute an arbitrary aggregate summary over the events in a window and add it as a new field in the dataset.

Summarize operator must always be preceded by a window operator.

Parameters

field:str

The name of the new column to be added to store the summary - must not conflict with any existing name on the dataset.

dtype:Type

The data type of the new column to be added - must be a valid Fennel supported data type.

func:Callable[pd.DataFrame, pd.Series[T]]

The function, which when given a subset of the dataframe corresponding to the rows falling in the window, returns the summary value for that window. No guarantees are made about the ordering of rows in this input dataframe.

Fennel verifies at runtime that the returned series matches the declared dtype.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Window

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime = field(timestamp=True)

@dataset(index=True)
class Session:
    uid: int = field(key=True)
    window: Window = field(key=True)
    # new field added by the summarize operator
    total_amount: int
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Transaction)
    def window_pipeline(cls, ds: Dataset):
        return (
            ds.groupby("uid").window(
                type="session", gap="15m", into_field="window"
            )
            .summarize(
                "total_amount",
                int,
                lambda df: df["amount"].sum(),
            )
        )
Calculate total amount per window in 15-min session windows

Returns

Dataset

Returns a dataset where all columns passed to groupby become the key columns and the timestamp column becomes the end timestamp of the window corresponding to that aggregation. One value column is created to store the result of summarize. The type of the summary column depends on the output of the summary function.

Errors

Invalid value at runtime:

Runtime error if the value returned from the lambda isn't a value of the declared type.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Window

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime = field(timestamp=True)

@dataset
class Session:
    uid: int = field(key=True)
    window: Window = field(key=True)
    # type 'float' doesn't match the type in summarize below
    total_amount: float
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Transaction)
    def window_pipeline(cls, ds: Dataset):
        return (
            ds.groupby("uid")
            .window(type="session", gap="15m", into_field="window")
            .summarize(
                "total_amount",
                int,
                lambda df: df["amount"].sum(),
            )
        )
The summarize result is declared as int but assigned to a field of type float

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Window

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime = field(timestamp=True)

@dataset
class Session:
    uid: int = field(key=True)
    window: Window = field(key=True)
    total_amount: str
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Transaction)
    def window_pipeline(cls, ds: Dataset):
        return (
            ds.groupby("uid")
            .window(type="session", gap="15m", into_field="window")
            .summarize(
                "total_amount",
                str,
                # lambda returns int, not str; hence runtime error
                lambda df: df["amount"].sum(),
            )
        )
The summarize lambda returns an int but the declared type is str

Transform

Catch all operator to add/remove/update columns.

Parameters

func:Callable[pd.DataFrame, pd.DataFrame]

The transform function that takes a pandas dataframe containing a batch of rows from the input dataset and returns an output dataframe of the same length, though potentially with a different set of columns.

schema:Optional[Dict[str, Type]]

The expected schema of the output dataset. If not specified, the schema of the input dataset is used.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset(index=True)
class WithSquare:
    uid: int = field(key=True)
    amount: int
    amount_sq: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def transform_pipeline(cls, ds: Dataset):
        # start from the input schema and declare the new column
        schema = ds.schema()
        schema["amount_sq"] = int
        return ds.transform(
            lambda df: df.assign(amount_sq=df["amount"] ** 2), schema
        )
Adding column amount_sq

Returns

Dataset

Returns a dataset with the schema as specified in schema and rows as transformed by the transform function.

Errors

Output dataframe doesn't match the schema:

Runtime error if the dataframe returned by the transform function doesn't match the provided schema.

Modifying key/timestamp columns:

Commit error if transform tries to modify key/timestamp columns.

from datetime import datetime

import pandas as pd

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="upsert"
)
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # replaces the key column uid with a new column user
    df["user"] = df["uid"]
    df.drop(columns=["uid"], inplace=True)
    return df

@dataset
class Derived:
    user: int = field(key=True)
    amount: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        schema = {"user": int, "amount": int, "timestamp": datetime}
        return ds.transform(transform, schema)
Modifying key or timestamp columns

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="upsert")
@dataset
class Transaction:
    uid: int = field(key=True)
    amount: int
    timestamp: datetime

@dataset
class WithHalf:
    uid: int = field(key=True)
    amount: int
    amount_sq: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        schema = ds.schema()
        schema["amount_sq"] = int
        # str(...) yields string values, violating the declared int type
        return ds.transform(
            lambda df: df.assign(amount_sq=str(df["amount"])), schema
        )  # noqa
Runtime error: amount_sq is declared as int but the transform returns str

Union

Operator to union rows from two datasets with identical schemas. Only applicable to keyless datasets. Written as a simple + operator on two datasets.

Returns

Dataset

Returns a dataset with the same schema as both the input datasets but containing rows from both of them. If both contain an identical row, two copies of that row are present in the output dataset.

import os
from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, S3, Kafka

cutoff = datetime(2024, 1, 1, 0, 0, 0)
s3 = S3(name="mys3")
bucket = s3.bucket("data", path="orders")
kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

# historical data from S3, up to the cutoff
@source(bucket, cdc="append", disorder="2d", until=cutoff)
@dataset
class OrdersBackfill:
    uid: int
    skuid: int
    timestamp: datetime

# live data from Kafka, from the cutoff onwards
@source(kafka.topic("order"), cdc="append", disorder="1d", since=cutoff)
@dataset
class OrdersLive:
    uid: int
    skuid: int
    timestamp: datetime

@dataset
class Union:
    uid: int
    skuid: int
    timestamp: datetime

    @pipeline
    @inputs(OrdersBackfill, OrdersLive)
    def union_pipeline(cls, backfill: Dataset, live: Dataset):
        return backfill + live
Union an s3 and kafka dataset

Errors

Taking union of datasets with different schemas:

Union operator is defined only when both the input datasets have the same schema. Commit error to apply union on input datasets with different schemas.

Taking union of keyed datasets:

Commit error to apply union on input datasets with key columns.

from datetime import datetime

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    price: float
    timestamp: datetime

@dataset
class Derived:
    uid: int
    price: float
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def bad_pipeline(cls, ds: Dataset):
        return ds.explode("price")
Exploding a non-list column

from datetime import datetime
from typing import List

from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Orders"), disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    price: List[float]
    timestamp: datetime

@dataset
class Derived:
    uid: int
    price: float
    timestamp: datetime

    @pipeline
    @inputs(Orders)
    def bad_pipeline(cls, ds: Dataset):
        return ds.explode("price", "random")
Exploding a non-existent column

Window

Operator to do aggregation of events into windows based on their timestamps. Window operator must always be preceded by a groupby operator.

Parameters

type:"hopping" | "tumbling" | "session"

The type of the window to be used.

Tumbling windows are fixed length and non-overlapping windows ensuring that each event is placed into exactly one window. With tumbling windows, duration must be set.

Hopping windows are also fixed width like tumbling but support overlapping windows, where a single event can be included in multiple windows. With hopping windows, duration and stride must be set.

Session windows are variable sized and are created based on the actual occurrence of events, with windows being at least gap duration apart. With session windows, gap must be set.

See this for a more detailed explanation of the three kinds of windows.

gap:Duration

The timeout for inactivity that defines the boundary of a window. Events that occur closer together than the specified gap are grouped into the same window. Only relevant when type is set to "session".

duration:Duration

The total fixed length of the window. Only relevant when type is set to "tumbling" or "hopping".

stride:Duration

The interval between the start of two successive windows. Only relevant when the type is "hopping". It is invalid for stride to be greater than the overall duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type "Window".

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Window

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime = field(timestamp=True)

@dataset(index=True)
class Session:
    # groupby field becomes the key field
    uid: int = field(key=True)
    # window also becomes a key field
    window: Window = field(key=True)
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Transaction)
    def window_pipeline(cls, ds: Dataset):
        return (
            ds.groupby("uid")
            .window(type="session", gap="15m", into_field="window")
        )
Aggregate events into sessions separated by gaps of at least 15 minutes

Returns

Dataset

Returns a dataset where all columns passed to groupby become the key columns along with a new key column of type Window that represents the window object. The timestamp column of the input dataset stays the timestamp column in the output dataset too and is used for windowing.
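
To make the three window types described above concrete, here is a plain-Python sketch that assigns integer timestamps (in seconds) to windows. It is an illustration only: the function names are hypothetical, windows are modelled as half-open [start, end) pairs, and aligning tumbling and hopping windows to multiples of the duration or stride is an assumption of the sketch, not documented Fennel behavior.

```python
from typing import List, Tuple

WindowSpan = Tuple[int, int]  # half-open interval [start, end), in seconds


def tumbling(ts: int, duration: int) -> List[WindowSpan]:
    """Each event falls into exactly one fixed-length window."""
    start = ts - ts % duration
    return [(start, start + duration)]


def hopping(ts: int, duration: int, stride: int) -> List[WindowSpan]:
    """An event may fall into several overlapping fixed-length windows."""
    if stride > duration:
        raise ValueError("stride must not be greater than duration")
    windows = []
    start = ts - ts % stride  # latest window start at or before ts
    while start > ts - duration:  # window still covers ts
        windows.append((start, start + duration))
        start -= stride
    return sorted(windows)


def sessions(timestamps: List[int], gap: int) -> List[List[int]]:
    """Events closer together than gap share a session window."""
    groups: List[List[int]] = []
    for ts in sorted(timestamps):
        if groups and ts - groups[-1][-1] < gap:
            groups[-1].append(ts)
        else:
            groups.append([ts])
    return groups


one_window = tumbling(125, duration=60)
two_windows = hopping(125, duration=60, stride=30)
two_sessions = sessions([0, 300, 2000, 2100], gap=900)
```

With a 60 second duration, the event at second 125 lands in the single tumbling window (120, 180) but in both hopping windows (90, 150) and (120, 180) when the stride is 30 seconds. With a 15 minute gap, the four events split into two sessions because 1700 seconds pass between the second and third event.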

Errors

Setting incorrect parameters for the type of window:

Each parameter applies only to certain window types: gap is only relevant for session windows, duration for tumbling and hopping windows, and stride for hopping windows.

Using forever duration:

While "forever" is a valid duration and can be used for aggregation, it's not a valid duration value for either tumbling or hopping window.

Missing window key:

The field named by into_field must be declared in the output dataset as a key field of type Window.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime = field(timestamp=True)

@dataset
class Session:
    # schema doesn't contain a key field of type Window
    uid: int = field(key=True)
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Transaction)
    def window_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").window(
            type="session", gap="15m", into_field="window"
        )
Missing window key in schema

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Window

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime = field(timestamp=True)

@dataset
class Session:
    uid: int = field(key=True)
    window: Window = field(key=True)
    timestamp: datetime = field(timestamp=True)

    @pipeline
    @inputs(Transaction)
    def window_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").window(
            type="tumbling",
            duration="forever",
            into_field="window",
        )
"forever" is not a valid duration for tumbling or hopping windows

Avro Registry

Several Fennel sources work with Avro format. When using Avro, it's common to keep the schemas in a centralized schema registry instead of including schema with each message.

Fennel supports integration with avro schema registries.

Parameters

registry:Literal["confluent"]

String denoting the provider of the registry. As of right now, Fennel only supports "confluent" avro registry though more such schema registries may be added over time.

url:str

The URL where the schema registry is hosted.

username:Optional[str]

User name to access the schema registry (assuming the registry requires authentication). If user name is provided, corresponding password must also be provided.

Assuming authentication is needed, either username/password must be provided or a token, but not both.

password:Optional[str]

The password associated with the username.

token:Optional[str]

Token to be used for authentication with the schema registry. Only one of username/password or token must be provided.
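
The rule for combining username, password and token can be sketched as a small check. This is an illustration of the rule as stated above, not Fennel's validation code; the function check_registry_auth and its return labels are hypothetical.

```python
from typing import Optional


def check_registry_auth(
    username: Optional[str] = None,
    password: Optional[str] = None,
    token: Optional[str] = None,
) -> str:
    """Return which auth mode the arguments describe, or raise ValueError."""
    # username and password only make sense as a pair.
    if (username is None) != (password is None):
        raise ValueError("username and password must be provided together")
    # username/password and token are mutually exclusive.
    if username is not None and token is not None:
        raise ValueError("provide either username/password or token, not both")
    if token is not None:
        return "token"
    if username is not None:
        return "basic"
    # No credentials: only valid for registries without authentication.
    return "none"


mode = check_registry_auth(username="svc", password="secret")
```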

import os
from datetime import datetime

from fennel.connectors import source, Kafka, Avro
from fennel.datasets import dataset, field

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

avro = Avro(
    registry="confluent",
    url=os.environ["SCHEMA_REGISTRY_URL"],
    username=os.environ["SCHEMA_REGISTRY_USERNAME"],
    password=os.environ["SCHEMA_REGISTRY_PASSWORD"],
)

@source(kafka.topic("user", format=avro), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Using avro registry with kafka

BigQuery

Data connector to Google BigQuery databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel sources.

project_id:str

The project ID of the Google Cloud project containing the BigQuery dataset.

dataset_id:str

The ID of the BigQuery dataset containing the table(s) to replicate.

service_account_key:Dict[str, str]

A dictionary containing the credentials for the Service Account to use to access BigQuery. See below for instructions on how to obtain this.

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as cursor for ingestion i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto-increment IDs or timestamps corresponding to modified_at (rather than created_at, unless rows never change after creation) are good candidates.

Note that this field doesn't even need to be a part of the Fennel dataset.
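
The cursor query described above can be illustrated with a short sketch that computes the lower bound from the last seen cursor value and the disorder. The helper incremental_query is hypothetical, assumes a timestamp cursor, and builds the SQL by string interpolation purely for readability; it is not how Fennel issues queries.

```python
from datetime import datetime, timedelta


def incremental_query(
    table: str, cursor: str, last_cursor: datetime, disorder: timedelta
) -> str:
    """Build the query that re-reads everything within the disorder window."""
    # Rows may arrive out of order by up to `disorder`, so look back that far.
    lower_bound = last_cursor - disorder
    return (
        f"select * from {table} "
        f"where {cursor} >= '{lower_bound.isoformat()}'"
    )


query = incremental_query(
    table="user",
    cursor="timestamp",
    last_cursor=datetime(2024, 1, 15),
    disorder=timedelta(days=14),
)
# query == "select * from user where timestamp >= '2024-01-01T00:00:00'"
```

Because the bound is inclusive and reaches back by the full disorder, the same row can be returned by successive queries.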

Errors

Connectivity Issues:

Fennel tries to test the connection with your BigQuery during commit itself so any connectivity issue (e.g. wrong project_id or credentials etc.) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

BigQuery Credentials

How to obtain credentials?:

Interfacing with BigQuery requires credentials for a Service Account with the "BigQuery User" role at Project level and "BigQuery Data Editor" role at Dataset level. "BigQuery User" role grants permissions to run BigQuery jobs and "BigQuery Data Editor" role grants permissions to read and update table data and its metadata. It is highly recommended that this Service Account is exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.

The easiest way to create a Service Account is to follow GCP's guide for Creating a Service Account. Once you've created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles. Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com

Then, add the service account as a Member of your Google Cloud Project with the "BigQuery User" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.

At this point, you should have a service account with the "BigQuery User" project-level permission.

Similarly, provide the "BigQuery Data Editor" permission to the service account by following Granting Access to Dataset in the Google documentation.

To obtain a Service Account Key, follow the instructions on Creating a Service Account Key.

from datetime import datetime

from fennel.connectors import source, BigQuery
from fennel.datasets import dataset

bq = BigQuery(
    name="my_bigquery",
    project_id="my_project",
    dataset_id="my_dataset",
    service_account_key={
        "type": "service_account",
        "project_id": "fake-project-356105",
        "client_email": "<account-name>@<project-name>.iam.gserviceaccount.com",
        "client_id": "103688493243243272951",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    },
)

table = bq.table("user", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime

Deltalake

Data connector to read data from tables in deltalake living in S3.

Deltalake connector is implemented via the s3 connector - just the format parameter needs to be set to 'delta'.

Warning

Fennel doesn't support reading delta tables from HDFS or any other non-S3 storage.

import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("data", prefix="user", format="delta"),
    every="1h",
    disorder="14d",
    cdc="upsert",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Sourcing delta tables into Fennel datasets

Hudi

Data connector to read data from Apache Hudi tables in S3.

Hudi connector is implemented via the s3 connector - just the format parameter needs to be set to 'hudi'.

Warning

Fennel doesn't support reading hudi tables from HDFS or any other non-S3 storage.

import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("data", prefix="user", format="hudi"),
    disorder="14d",
    cdc="upsert",
    every="1h",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Sourcing hudi tables into Fennel datasets

Kafka

Data connector to any data store that speaks the Kafka protocol (e.g. Native Kafka, MSK, Redpanda etc.)

Cluster Parameters

name:str

A name to identify the source. This name should be unique across ALL sources.

bootstrap_servers:str

This is a list of the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself and discover the rest of the brokers in the cluster.

Addresses are written as host & port pairs and can be specified either as a single server (e.g. localhost:9092) or a comma separated list of several servers (e.g. localhost:9092,another.host:9092).
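
The address format described above (one host:port pair, or several separated by commas) can be parsed with a few lines of plain Python. This sketch only illustrates the string format; the helper parse_bootstrap_servers is hypothetical and is not part of the Fennel client.

```python
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a comma separated list of host:port pairs into (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers


brokers = parse_bootstrap_servers("localhost:9092,another.host:9092")
```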

security_protocol:"PLAINTEXT" | "SASL_PLAINTEXT" | "SASL_SSL"

Protocol used to communicate with the brokers.

sasl_mechanism:"PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512"

SASL mechanism to use for authentication.

sasl_plain_username:str

SASL username.

sasl_plain_password:str

SASL password.

Topic Parameters

topic:str

The name of the kafka topic that needs to be sourced into the dataset.

format:"json" | Avro

Default: json

The format of the data in the Kafka topic. Both "json" and Avro are supported.

import os
from datetime import datetime

from fennel.connectors import source, Kafka
from fennel.datasets import dataset, field

kafka = Kafka(
    name="my_kafka",
    bootstrap_servers="localhost:9092",  # could come via os env var too
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_USERNAME"],
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
)

@source(kafka.topic("user", format="json"), disorder="14d", cdc="upsert")
@dataset
class SomeDataset:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
Sourcing json data from kafka to a dataset

from fennel.connectors import sink
from fennel.datasets import dataset, field, pipeline, Dataset
from fennel.lib import inputs

# reuses `kafka` and `SomeDataset` from the source example above
@dataset
@sink(kafka.topic("gmail_filtered"), cdc="debezium")
class SomeDatasetFiltered:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

    @pipeline
    @inputs(SomeDataset)
    def gmail_filtered(cls, dataset: Dataset):
        # filter receives a pandas dataframe and returns a boolean series
        return dataset.filter(
            lambda df: df["email"].str.contains("gmail.com")
        )
Capturing change from a dataset to a Kafka sink

Errors

Connectivity problems:

Fennel server tries to connect with the Kafka broker during the commit operation itself to validate connectivity - as a result, incorrect URL/Username/Password etc will be caught at commit time itself as an error.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in Kafka can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Kinesis

Data connector to ingest data from AWS Kinesis.

Parameters for Defining Source

name:str

A name to identify the source. The name should be unique across all Fennel sources.

role_arn:str

The arn of the role that Fennel should use to access the Kinesis stream. The role must already exist and Fennel's principal must have been given the permission to assume this role (see below for details or talk to Fennel support if you need help).

Stream Parameters

stream_arn:str

The arn of the Kinesis stream. The corresponding role_arn must have appropriate permissions for this stream. Providing a stream that either doesn't exist or can not be read using the given role_arn will result in an error during the commit operation.

init_position:str | datetime | float | int

The initial position in the stream from which Fennel should start ingestion. See Kinesis ShardIteratorType for more context. Allowed values are:

  • "latest" - start from the latest data (starting a few minutes after commit)
  • "trim_horizon" - start from the oldest data that hasn't been trimmed/expired yet.
  • datetime - start from the position denoted by this timestamp (i.e. equivalent to AT_TIMESTAMP in Kinesis vocabulary).

If choosing the datetime option, the timestamp can be specified as a datetime object, or as an int representing seconds since the epoch, or as a float representing {seconds}.{microseconds} since the epoch or as an ISO-8601 formatted str.

Note that this timestamp is the time attached with the Kinesis message itself at the time of production, not any timestamp field inside the message.
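
The accepted forms of init_position can be illustrated by a sketch that normalizes each of them to either one of the two keywords or a datetime. The helper normalize_init_position is hypothetical, and interpreting epoch values as UTC is an assumption of the sketch rather than documented behavior.

```python
from datetime import datetime, timezone
from typing import Union


def normalize_init_position(
    value: Union[str, datetime, int, float]
) -> Union[str, datetime]:
    """Map any accepted init_position form to a keyword or a datetime."""
    if isinstance(value, datetime):
        return value
    if isinstance(value, bool):  # bool is a subclass of int; reject it
        raise TypeError("init_position can not be a bool")
    if isinstance(value, (int, float)):
        # int: seconds since epoch; float: {seconds}.{microseconds}
        return datetime.fromtimestamp(value, tz=timezone.utc)
    if isinstance(value, str):
        if value in ("latest", "trim_horizon"):
            return value
        # Otherwise expect an ISO-8601 string; fromisoformat accepts a
        # subset of ISO-8601 on older Python versions.
        return datetime.fromisoformat(value)
    raise TypeError(f"unsupported init_position: {value!r}")


from_keyword = normalize_init_position("latest")
from_epoch = normalize_init_position(1672876800)
from_iso = normalize_init_position("2023-01-05T00:00:00+00:00")
```

Here both from_epoch and from_iso denote midnight UTC on Jan 5, 2023, the same starting point as the datetime(2023, 1, 5) form when that value is read as UTC.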

format:"json" | Avro

The format of the data in the Kinesis stream. Most common value is "json" though Fennel also supports Avro.

Errors

Connectivity problems:

Fennel server tries to connect with Kinesis during the commit operation itself to validate connectivity - as a result, incorrect stream/role ARNs or insufficient permissions will be caught at commit time itself as an error.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

import os
from datetime import datetime

from fennel.connectors import source, Kinesis
from fennel.datasets import dataset, field

kinesis = Kinesis(
    name="my_kinesis",
    role_arn=os.environ["KINESIS_ROLE_ARN"],
)

stream = kinesis.stream(
    stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
    init_position=datetime(2023, 1, 5),  # Start ingesting from Jan 5, 2023
    format="json",
)

@source(stream, disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    order_id: str
    amount: float
    timestamp: datetime
Using explicit timestamp as init position

import os
from datetime import datetime

from fennel.connectors import source, Kinesis
from fennel.datasets import dataset, field

kinesis = Kinesis(
    name="my_kinesis",
    role_arn=os.environ["KINESIS_ROLE_ARN"],
)

stream = kinesis.stream(
    stream_arn=os.environ["KINESIS_ORDERS_STREAM_ARN"],
    init_position="latest",
    format="json",
)

@source(stream, disorder="14d", cdc="append")
@dataset
class Orders:
    uid: int
    order_id: str
    amount: float
    timestamp: datetime
Using latest as init position

Managing Kinesis Access

Fennel creates a special role with name prefixed by FennelDataAccessRole- in your AWS account for role-based access. The role corresponding to the role_arn passed to Kinesis source should have the following trust policy allowing this special Fennel role to assume the kinesis role.

See Trust Policy

Specify the exact role_arn in the form arn:aws:iam::<fennel-data-plane-account-id>:role/<FennelDataAccessRole-...> without any wildcards.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "<role_arn>"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Also attach the following permission policy. Add more streams to the Resource field if more than one stream needs to be consumed via this role. Here the account-id is the ID of your account where the stream lives.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowKinesisAccess",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:DescribeStreamConsumer",
        "kinesis:RegisterStreamConsumer",
        "kinesis:ListShards",
        "kinesis:GetShardIterator",
        "kinesis:SubscribeToShard",
        "kinesis:GetRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>",
        "arn:aws:kinesis:<region>:<account-id>:stream/<stream-name>/*"
      ]
    }
  ]
}
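When several streams are consumed through one role, the Resource list grows by two entries per stream. The following is a minimal sketch of generating the permission policy document; the helper name `kinesis_permission_policy` and the sample region, account ID and stream names are illustrative assumptions, not part of Fennel or AWS tooling.

```python
import json

# Actions copied from the permission policy shown above.
KINESIS_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:DescribeStreamSummary",
    "kinesis:DescribeStreamConsumer",
    "kinesis:RegisterStreamConsumer",
    "kinesis:ListShards",
    "kinesis:GetShardIterator",
    "kinesis:SubscribeToShard",
    "kinesis:GetRecords",
]


def kinesis_permission_policy(region, account_id, stream_names):
    """Build the policy document with two Resource entries per stream."""
    resources = []
    for name in stream_names:
        arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{name}"
        # One entry for the stream itself, one for everything under it.
        resources.extend([arn, f"{arn}/*"])
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowKinesisAccess",
                "Effect": "Allow",
                "Action": KINESIS_ACTIONS,
                "Resource": resources,
            }
        ],
    }


# Hypothetical region, account ID and stream names, for illustration only.
policy = kinesis_permission_policy("us-west-2", "123456789012", ["orders", "clicks"])
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the IAM console as the role's permission policy.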

Pub/Sub

Data connector to Google Pub/Sub messaging service.

Project Parameters

name:str

A name to identify the source. The name should be unique across all Fennel sources.

project_id:str

The project ID of the Google Cloud project containing the Pub/Sub topic.

service_account_key:Dict[str, str]

A dictionary containing the credentials for the Service Account to use to access Pub/Sub. See below for instructions on how to obtain this.

Topic Parameters

topic_id:str

The name of the topic from which the data should be ingested.

format:"json"

The format of the data in the Pub/Sub topic. Only "json" is supported.

Note

Fennel supports only Append and Upsert mode CDC with data in JSON format. If you require support for schema or CDC data format, please reach out to Fennel support.

Errors

Connectivity Issues:

Fennel tries to test the connection with your Pub/Sub topic during commit itself, so any connectivity issue (e.g. wrong project_id or credentials) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Pub/Sub Credentials

How to obtain credentials?:

Interfacing with Pub/Sub requires credentials for a Service Account with the "Pub/Sub Subscriber" role, which grants permissions to create subscriptions and read messages from the subscribed topic. It is highly recommended that this Service Account be exclusive to Fennel for ease of permissions and auditing. However, you can also use a preexisting Service Account if you already have one with the correct permissions.

The easiest way to create a Service Account is to follow GCP's guide for Creating a Service Account. Once you've created the Service Account, make sure to keep its ID handy, as you will need to reference it when granting roles. Service Account IDs typically take the form <account-name>@<project-name>.iam.gserviceaccount.com

Then, add the service account as a Member of your Google Cloud Project with the "Pub/Sub Subscriber" role. To do this, follow the instructions for Granting Access in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.

At this point, you should have a service account with the "Pub/Sub Subscriber" project-level permission.

To obtain a Service Account Key, follow the instructions on Creating a Service Account Key.
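Since service_account_key is a dictionary, one way to avoid hardcoding credentials is to load the downloaded key file at runtime. This is a minimal sketch; the helper name `load_service_account_key` and the `PUBSUB_KEY_PATH` environment variable in the usage note are assumptions made for illustration.

```python
import json


def load_service_account_key(path):
    """Read a downloaded Service Account key file into a plain dict."""
    with open(path, "r", encoding="utf-8") as f:
        key = json.load(f)
    # Key files downloaded from GCP carry "type": "service_account".
    if key.get("type") != "service_account":
        raise ValueError(f"{path} does not look like a service account key")
    return key
```

The result could then be passed along the lines of `PubSub(..., service_account_key=load_service_account_key(os.environ["PUBSUB_KEY_PATH"]))`.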

from datetime import datetime

from fennel.connectors import source, PubSub
from fennel.datasets import dataset, field

pubsub = PubSub(
    name="pubsub_src",
    project_id="test_project",
    service_account_key={
        "type": "service_account",
        "project_id": "fake-project-356105",
        "client_email": "[email protected]",
        "client_id": "103688493243243272951",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    },
)

@source(
    pubsub.topic("test_topic", format="json"), disorder="2d", cdc="upsert"
)
@dataset
class UserClick:
    uid: int = field(key=True)
    ad_id: int
    timestamp: datetime

MongoDB

Data connector to MongoDB databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel sources.

host:str

The hostname of the database.

db_name:str

The name of the Mongo database to establish a connection with.

username:str

The username which should be used to access the database. This username should have access to the database db_name.

password:str

The password associated with the username.

Note

Fennel uses SRV connection format for authentication which is supported in Mongo versions 3.6 and later. If you have a self-hosted DB with version earlier than 3.6, please reach out to Fennel support.

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as cursor for ingestion i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form db.collection.find({"cursor": { "$gte": last_cursor - disorder } }) to get data it hasn't seen before. Auto increment IDs or timestamps corresponding to modified_at (vs created_at unless the field doesn't change) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

Warning

It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your MongoDB database.

from datetime import datetime

from fennel.connectors import source, Mongo
from fennel.datasets import dataset

mongo = Mongo(
    name="mongo_src",
    host="atlascluster.ushabcd.mongodb.net",
    db_name="mongo",
    username="username",
    password="password",
)

collection = mongo.collection("user", cursor="timestamp")

@source(collection, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
Sourcing dataset from a mongo collection

Errors

Connectivity Issues:

Fennel tries to test the connection with your MongoDB during commit itself so any connectivity issue (e.g. wrong host name, username, password etc) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

MySQL

Data connector to MySQL databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel sources.

host:str

The hostname of the database.

port:Optional[int]

Default: 3306

The port to connect to.

db_name:str

The name of the MySQL database to establish a connection with.

username:str

The username which should be used to access the database. This username should have access to the database db_name.

password:str

The password associated with the username.

jdbc_params:Optional[str]

Default: None

Additional properties to pass to the JDBC URL string when connecting to the database formatted as key=value pairs separated by the symbol &. For instance: key1=value1&key2=value2.
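As a small illustration of the expected format, the string can be assembled from a dictionary. This is only a sketch: `build_jdbc_params` is a hypothetical helper, the `connectTimeout` property is included purely as an example, and no URL escaping is applied to keys or values.

```python
def build_jdbc_params(params):
    """Join a dict into the key1=value1&key2=value2 form described above."""
    return "&".join(f"{key}={value}" for key, value in params.items())


jdbc_params = build_jdbc_params(
    {"enabledTLSProtocols": "TLSv1.2", "connectTimeout": "5000"}
)
print(jdbc_params)  # enabledTLSProtocols=TLSv1.2&connectTimeout=5000
```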

Error

If you see a 'Cannot create a PoolableConnectionFactory' error, try setting jdbc_params to enabledTLSProtocols=TLSv1.2

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as cursor for ingestion i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto increment IDs or timestamps corresponding to modified_at (vs created_at unless the field doesn't change) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.
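To make the query form concrete, here is a minimal sketch of how the lower bound moves for a timestamp cursor. The function `next_query` is hypothetical and only mirrors the query shape quoted above; it is not Fennel's actual implementation, and the disorder is passed as a `timedelta` rather than a Fennel duration string.

```python
from datetime import datetime, timedelta


def next_query(table, cursor, last_cursor, disorder):
    """Render the incremental query for a timestamp cursor."""
    # Looking back by `disorder` re-reads recent rows so late arrivals are not missed.
    lower_bound = (last_cursor - disorder).isoformat(sep=" ")
    return f"select * from {table} where {cursor} >= '{lower_bound}'"


query = next_query(
    table="user",
    cursor="updated_at",
    last_cursor=datetime(2024, 2, 20, 12, 0, 0),
    disorder=timedelta(days=14),
)
print(query)  # select * from user where updated_at >= '2024-02-06 12:00:00'
```

Because the bound looks back by the disorder duration, the same row can match successive queries, which is one reason an index on the cursor field matters.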

Warning

It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your MySQL database.

import os
from datetime import datetime

from fennel.connectors import source, MySQL
from fennel.datasets import dataset, field

mysql = MySQL(
    name="my_mysql",
    host="my-favourite-mysql.us-west-2.rds.amazonaws.com",
    port=3306,  # could be omitted, defaults to 3306
    db_name=os.environ["DB_NAME"],
    username=os.environ["MYSQL_USERNAME"],
    password=os.environ["MYSQL_PASSWORD"],
    jdbc_params="enabledTLSProtocols=TLSv1.2",
)

table = mysql.table("user", cursor="updated_at")

@source(table, disorder="14d", cdc="upsert", every="1m")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    created_at: datetime
    updated_at: datetime = field(timestamp=True)
Sourcing dataset from a mysql table

Errors

Connectivity Issues:

Fennel tries to test the connection with your MySQL during commit itself so any connectivity issue (e.g. wrong host name, username, password etc) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in MySQL is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Postgres

Data connector to Postgres databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel sources.

host:str

The hostname of the database.

port:Optional[int]

Default: 5432

The port to connect to.

db_name:str

The name of the Postgres database to establish a connection with.

username:str

The username which should be used to access the database. This username should have access to the database db_name.

password:str

The password associated with the username.

jdbc_params:Optional[str]

Default: None

Additional properties to pass to the JDBC URL string when connecting to the database formatted as key=value pairs separated by the symbol &. For instance: key1=value1&key2=value2.

Error

If you see a 'Cannot create a PoolableConnectionFactory' error, try setting jdbc_params to enabledTLSProtocols=TLSv1.2

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as cursor for ingestion i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto increment IDs or timestamps corresponding to modified_at (vs created_at unless the field doesn't change) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

Warning

It is recommended to put an index on the cursor field so that Fennel ingestion queries don't create too much load on your Postgres database.

import os
from datetime import datetime

from fennel.connectors import source, Postgres
from fennel.datasets import dataset, field

postgres = Postgres(
    name="my_postgres",
    host="my-favourite-pg.us-west-2.rds.amazonaws.com",
    port=5432,  # could be omitted, defaults to 5432
    db_name=os.environ["DB_NAME"],
    username=os.environ["POSTGRES_USERNAME"],
    password=os.environ["POSTGRES_PASSWORD"],
    jdbc_params="enabledTLSProtocols=TLSv1.2",
)

table = postgres.table("user", cursor="updated_at")

@source(table, disorder="14d", cdc="upsert", every="1m")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    created_at: datetime
    updated_at: datetime = field(timestamp=True)
Sourcing dataset from a postgres table

Errors

Connectivity Issues:

Fennel tries to test the connection with your Postgres during commit itself so any connectivity issue (e.g. wrong host name, username, password etc) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in Postgres is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Redshift

Data connector to Redshift databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel sources.

s3_access_role_arn:str

IAM role to be used by Redshift to access S3. Redshift uses S3 as middle-man while executing large queries. Steps to set up IAM role:

  • Create an IAM role by following this documentation. Make sure to provide full access to S3 since we store temporary data in S3 and read from it.
  • Associate IAM role with Redshift cluster by following this documentation. Refer to a sample policy below.

db_name:str

The name of the database where the relevant data resides.

host:str

The hostname of the database.

port:Optional[int]

Default: 5439

The port to connect to.

schema:str

The name of the schema where the required data table(s) resides.

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as cursor for ingestion i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto increment IDs or timestamps corresponding to modified_at (vs created_at unless the field doesn't change) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

Errors

Connectivity Issues:

Fennel tries to test the connection with your Redshift during commit itself so any connectivity issue (e.g. wrong database name, host name, port, etc) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in Redshift is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

import os
from datetime import datetime

from fennel.connectors import source, Redshift
from fennel.datasets import dataset

redshift = Redshift(
    name="my_redshift",
    s3_access_role_arn="arn:aws:iam::123:role/Redshift",
    db_name=os.environ["DB_NAME"],
    host="test-workgroup.1234.us-west-2.redshift-serverless.amazonaws.com",
    port=5439,  # could be omitted, defaults to 5439
    schema="public",
)

table = redshift.table("user", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
Bringing Redshift data into Fennel

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "redshift:DescribeClusters",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "redshift:ModifyClusterIamRoles",
                "redshift:CreateCluster"
            ],
            "Resource": [
                "arn:aws:redshift-serverless:us-west-2:82448945123:workgroup/0541e0ae-2ad1-4fe0-b2f3-4d6c1d3453e"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::82448945123:role/RedshiftS3AccessRole"
            ]
        }
    ]
}
Sample IAM policy for integrating with Redshift. The Resource of the second statement is the Redshift workgroup ARN; the Resource of the iam:PassRole statement is the ARN of the role created above.

S3

Data connector to source data from S3.

Account Parameters

name:str

A name to identify the source. The name should be unique across all Fennel sources.

aws_access_key_id:Optional[str]

Default: None

AWS Access Key ID. This field is not required if role-based access is used or if the bucket is public.

aws_secret_access_key:Optional[str]

Default: None

AWS Secret Access Key. This field is not required if role-based access is used or if the bucket is public.

Bucket Parameters

bucket:str

The name of the S3 bucket where the data files exist.

prefix:Optional[str]

Default: None

The prefix of the bucket (as relative path within bucket) where the data files exist. For instance, some-folder/ or A/B/C are all valid prefixes. Prefix can not have any wildcard characters.

Exactly one of prefix or path must be provided.

path:Optional[str]

Default: None

A / delimited path (relative to the bucket) describing the objects to be ingested. The valid path parts are:

  • static string of alphanumeric characters, underscores, hyphens or dots.
  • * wild card - this must be the entire path part: */* is valid but foo*/ is not.
  • string with a strftime format specifier (e.g yyyymmdd=%Y%m%d)

If you have a large volume of data or objects and your bucket is time partitioned, it's highly recommended to include details of time partitioning in your path instead of providing * - Fennel can use this information to optimize the ingestion.

For example, if your bucket has the structure orders/{country}/date={date}/store={store}/{file}.json, provide the path orders/*/date=%Y%m%d/*/*

Exactly one of prefix or path must be provided.
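To see which objects a time-partitioned path refers to on a given day, the strftime specifiers can be expanded with Python's standard library. This only illustrates the path notation using the example path above; it makes no claim about how Fennel resolves paths internally.

```python
from datetime import datetime

path = "orders/*/date=%Y%m%d/*/*"
day = datetime(2024, 2, 7)

# strftime fills in the date specifiers and leaves the "*" parts untouched.
expanded = day.strftime(path)
print(expanded)  # orders/*/date=20240207/*/*
```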

format:str

Default: csv

The format of the files you'd like to ingest. Valid values are "csv", "parquet", "json", "delta" or "hudi".

delimiter:Optional[str]

Default: ,

The character delimiting individual cells in the CSV data - only relevant when format is CSV, otherwise it's ignored.

The default value "," can be overridden by any other 1-character string. For example, to use tab-delimited data enter "\t".

spread:Optional[Duration]

Default: None

Relevant only when using path with strftime specifiers.

To do incremental ingestion of data from a time-partitioned S3 bucket, Fennel needs to know what time ranges may be present in any given partition. While not common, sometimes the timestamp field used to time-partition data in your S3 may be different from the field that you want to be the "official" timestamp field of the Fennel dataset.

In such cases, it's possible that a bucket corresponding to say month=01 contains rows where value of the timestamp field is outside of month=01.

spread is a measure of how wide this gap can be. More formally, spread indicates the maximum difference between the partition interval and the value of the timestamp field for data in that partition. A None value indicates no spread, which is the case when the partitioning scheme uses the same timestamp values as the dataset's timestamp column. spread is specified using Fennel's Duration type.

Examples:

  • Given a path txns/date=20240207/hh=06/ and spread=None, fennel expects all data under this path to have timestamp between 2024-02-07 06:00:00 and 2024-02-07 07:00:00
  • Given a path txns/date=20240207/hh=06/ and spread="3d", fennel expects all data under this path to have a timestamp between 2024-02-04 06:00:00 and 2024-02-10 07:00:00
  • Given a path txns/date=20240207/ and spread="6h", fennel expects all data under this path to have a timestamp between 2024-02-06 18:00:00 and 2024-02-08 06:00:00
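The arithmetic behind these examples can be sketched in a few lines. `expected_range` is a hypothetical helper written for this illustration: it takes the partition start, the partition length and the spread as `timedelta` values rather than Fennel's Duration strings, and widens the partition interval by the spread on both sides.

```python
from datetime import datetime, timedelta


def expected_range(partition_start, partition_length, spread=None):
    """Return (earliest, latest) timestamps expected inside one partition."""
    # spread=None means the partition holds only its own time range.
    widen = spread if spread is not None else timedelta(0)
    partition_end = partition_start + partition_length
    return partition_start - widen, partition_end + widen


# txns/date=20240207/hh=06/ is an hourly partition starting at 06:00.
hourly = datetime(2024, 2, 7, 6)
print(expected_range(hourly, timedelta(hours=1)))
print(expected_range(hourly, timedelta(hours=1), spread=timedelta(days=3)))

# txns/date=20240207/ is a daily partition starting at midnight.
daily = datetime(2024, 2, 7)
print(expected_range(daily, timedelta(days=1), spread=timedelta(hours=6)))
```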

import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="mys3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

@source(
    s3.bucket("datalake", prefix="user"),
    every="1h",
    disorder="14d",
    cdc="upsert",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
S3 ingestion via prefix

import os
from datetime import datetime

from fennel.connectors import source, S3
from fennel.datasets import dataset, field

s3 = S3(
    name="my_s3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

bucket = s3.bucket(
    "data", path="user/*/date-%Y-%m-%d/*", format="parquet", spread="2d"
)

@source(bucket, disorder="14d", cdc="upsert", every="1h")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime
S3 ingestion via path

Errors

Connectivity or authentication errors:

Fennel servers try to perform some lightweight operations on the bucket during the commit operation - all connectivity or authentication related errors should be caught during the commit itself.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in S3 can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Enabling IAM Access

Fennel creates a role with name prefixed by FennelDataAccessRole- in your AWS account for role-based access. In order to use IAM access for S3, please ensure that this role has access to read and list files on the buckets of interest.

With that ready, simply don't specify aws_access_key_id and aws_secret_access_key and Fennel will automatically fall back to IAM based access.

Note

Fennel uses file_last_modified property exported by S3 to track what data has been seen so far and hence a cursor field doesn't need to be specified.

Snowflake

Data connector to Snowflake databases.

Database Parameters

name:str

A name to identify the source. The name should be unique across all Fennel sources.

account:str

Snowflake account identifier. This is the first part of the URL used to access Snowflake. For example, if the URL is https://<account>.snowflakecomputing.com, then the account is <account>.

This is usually of the form <ORG_ID>-<ACCOUNT_ID>. Refer to the Snowflake documentation to find the account identifier.
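If only the Snowflake URL is at hand, the account identifier can be read off the host name. This is a minimal sketch based solely on the URL form described above; `account_from_url` is a hypothetical helper and the sample URL reuses the account identifier from the example in this section.

```python
from urllib.parse import urlparse

SNOWFLAKE_SUFFIX = ".snowflakecomputing.com"


def account_from_url(url):
    """Return the part of the host that precedes .snowflakecomputing.com."""
    # netloc keeps the original letter case (hostname would lowercase it).
    host = urlparse(url).netloc
    if not host.lower().endswith(SNOWFLAKE_SUFFIX):
        raise ValueError(f"not a Snowflake URL: {url}")
    return host[: -len(SNOWFLAKE_SUFFIX)]


print(account_from_url("https://VPECCVJ-MUB03765.snowflakecomputing.com"))
```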

role:str

The role that should be used by Fennel to access Snowflake.

warehouse:str

The warehouse that should be used to access Snowflake.

db_name:str

The name of the database where the relevant data resides.

schema:str

The schema where the required data table(s) resides.

username:str

The username which should be used to access Snowflake. This username should have required permissions to assume the provided role.

password:str

The password associated with the username.

Table Parameters

table:str

The name of the table within the database that should be ingested.

cursor:str

The name of the field in the table that acts as cursor for ingestion i.e. a field that is approximately monotonic and only goes up with time.

Fennel issues queries of the form select * from table where {cursor} >= {last_cursor - disorder} to get data it hasn't seen before. Auto increment IDs or timestamps corresponding to modified_at (vs created_at unless the field doesn't change) are good contenders.

Note that this field doesn't even need to be a part of the Fennel dataset.

import os
from datetime import datetime

from fennel.connectors import source, Snowflake
from fennel.datasets import dataset

snowflake = Snowflake(
    name="my_snowflake",
    account="VPECCVJ-MUB03765",
    warehouse="TEST",
    db_name=os.environ["DB_NAME"],
    schema="PUBLIC",
    role="ACCOUNTADMIN",
    username=os.environ["SNOWFLAKE_USERNAME"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
)

table = snowflake.table("User", cursor="timestamp")

@source(table, disorder="14d", cdc="append")
@dataset
class UserClick:
    uid: int
    ad_id: int
    timestamp: datetime
Defining and using a snowflake source

Errors

Connectivity Issues:

Fennel tries to test the connection with your Snowflake during commit itself so any connectivity issue (e.g. wrong host name, username, password etc) is flagged as an error during commit with the real Fennel servers.

Note: Mock client can not talk to any external data source and hence is unable to do this validation at commit time.

Schema mismatch errors:

Schema validity of data in Snowflake is checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Webhook

A push-based data connector, making it convenient for sending arbitrary JSON data to Fennel. Data can be pushed to a webhook endpoint either via the REST API or via the Python SDK.

Source Parameters

name:str

A name to identify the source. This name should be unique across all Fennel sources.

retention:Duration

Default: 14d

Data sent to the webhook is buffered for the duration given by retention. That is, if data has been logged to a webhook, datasets defined later that source from this webhook will still see that data, as long as they are defined within this duration.

Connector Parameters

endpoint:str

The endpoint for the given webhook to which the data will be sent.

A single webhook could be visualized as a single Kafka cluster with each endpoint being somewhat analogous to a topic. A single webhook source can have as many endpoints as required.

Multiple datasets could be reading from the same webhook endpoint - in which case, they all get the exact same data.

from datetime import datetime

from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field

webhook = Webhook(name="prod_webhook", retention="14d")

@source(webhook.endpoint("User"), disorder="14d", cdc="upsert")
@dataset
class User:
    uid: int = field(key=True)
    email: str
    timestamp: datetime

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    txid: int
    uid: int
    amount: float
    timestamp: datetime
Two datasets sourcing from endpoints of the same webhook

from datetime import datetime, timezone

import pandas as pd

# `client` is assumed to be an already connected Fennel client
df = pd.DataFrame(
    {
        "uid": [1, 2, 3],
        "email": ["[email protected]", "[email protected]", "[email protected]"],
        "timestamp": [
            datetime.now(timezone.utc),
            datetime.now(timezone.utc),
            datetime.now(timezone.utc),
        ],
    }
)
client.log("prod_webhook", "User", df)
Pushing data into webhook via Python SDK

import os

import requests

url = "{}/api/v1/log".format(os.environ["FENNEL_SERVER_URL"])
headers = {"Content-Type": "application/json"}
data = [
    {
        "uid": 1,
        "email": "[email protected]",
        "timestamp": 1614556800,
    },
    {
        "uid": 2,
        "email": "[email protected]",
        "timestamp": 1614556800,
    },
]
req = {
    "webhook": "prod_webhook",
    "endpoint": "User",
    "data": data,
}
# json= serializes the body as JSON, matching the Content-Type header
requests.post(url, headers=headers, json=req)
Pushing data into webhook via REST API

Errors

Schema mismatch errors:

Schema validity of data can only be checked at runtime. Any rows that can not be parsed are rejected. Please keep an eye on the 'Errors' tab of Fennel console after initiating any data sync.

Note

Unlike all other sources, Webhook does work with mock client. As a result, it's very effective for quick prototyping and unit testing.

Average

Aggregation to compute a rolling average for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the average should be computed. This field must either be of type int or float.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type float.

default:float

Average over an empty set of rows isn't well defined - Fennel returns default in such cases.
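The role of default can be illustrated with a plain-Python reference of the semantics for a single group. This is a sketch for intuition only and not how Fennel computes the aggregation; the function name `rolling_average` and the exact window boundaries (exclusive at the start, inclusive at `now`) are assumptions.

```python
from datetime import datetime, timedelta


def rolling_average(rows, now, window, default):
    """Average the values of (timestamp, value) rows that fall in the window."""
    values = [value for ts, value in rows if now - window < ts <= now]
    if not values:
        # Nothing in the window: the average is undefined, so use default.
        return default
    return sum(values) / len(values)


rows = [
    (datetime(2023, 12, 28), 60),
    (datetime(2024, 1, 1, 10), 10),
    (datetime(2024, 1, 1, 20), 20),
]
now = datetime(2024, 1, 2)
print(rolling_average(rows, now, timedelta(days=1), default=-1.0))   # 15.0
print(rolling_average(rows, now, timedelta(weeks=1), default=-1.0))  # 30.0
print(rolling_average([], now, timedelta(days=1), default=-1.0))     # -1.0
```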

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Average,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    avg_1d: float
    avg_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def avg_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            avg_1d=Average(of="amt", window="1d", default=-1.0),
            avg_1w=Average(of="amt", window="1w", default=-1.0),
        )
Average in rolling window of 1 day & 1 week

Returns

float

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.

Errors

Average on non int/float types:

The input column denoted by of must either be of int or float types.

Note that unlike SQL, even aggregations over Optional[int] or Optional[float] aren't allowed.

Output and/or default aren't float:

The type of the field denoted by into_field in the output dataset and that of default should both be float.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Average
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    avg_1d: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            avg_1d=Average(of="zip", window="1d", default="avg"),
        )
Can not take average over string, only int or float

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Average
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    # output of avg is always float
    ret: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            ret=Average(of="amt", window="1d", default=1.0),
        )
Invalid type: ret is int but should be float

Count

Aggregation to compute a rolling count for each group within a window.

Parameters

window:Window

The continuous window within which rows need to be counted. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int.

unique:bool

Default: False

If set to True, the aggregation counts the number of unique values of the field given by of (aka COUNT DISTINCT in SQL).

approx:bool

Default: False

If set to True, the count isn't exact but only an approximation. This field must be set to True if and only if unique is set to True.

Fennel uses the HyperLogLog data structure to compute approximate unique counts and, in practice, the count is exact for small counts.

of:Optional[str]

Name of the field in the input dataset which should be used for unique. Only relevant when unique is set to True.
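The difference between a plain count and a unique count can be illustrated with an exact plain-Python reference for a single group. This sketch is not Fennel's implementation (which is approximate for unique counts, as noted above); the function name `rolling_count`, the use of `window=None` to stand for "forever", and the window boundaries are assumptions.

```python
from datetime import datetime, timedelta


def rolling_count(rows, now, window=None, of=None, unique=False):
    """Count rows in the window, or distinct values of `of` when unique=True."""
    in_window = [
        row
        for row in rows
        if row["timestamp"] <= now
        and (window is None or now - window < row["timestamp"])
    ]
    if unique:
        # Equivalent to COUNT DISTINCT over the `of` field.
        return len({row[of] for row in in_window})
    return len(in_window)


rows = [
    {"vendor": "c", "timestamp": datetime(2023, 12, 1)},
    {"vendor": "a", "timestamp": datetime(2024, 1, 1)},
    {"vendor": "a", "timestamp": datetime(2024, 1, 2)},
    {"vendor": "b", "timestamp": datetime(2024, 1, 3)},
]
now = datetime(2024, 1, 4)
print(rolling_count(rows, now))  # 4
print(rolling_count(rows, now, timedelta(weeks=1), of="vendor", unique=True))  # 2
```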

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Count,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    vendor: str
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    num_transactions: int
    unique_vendors_1w: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def count_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            num_transactions=Count(window="forever"),
            unique_vendors_1w=Count(
                of="vendor",
                unique=True,
                approx=True,
                window="1w",
            ),
        )
Count # of transaction & distinct vendors per user

Returns

int

Accumulates the count in the appropriate field of the output dataset. If there are no rows to count, by default, it returns 0.

Errors

Count unique on unhashable type:

The input column denoted by of must have a hashable type in order to build a hyperloglog. For instance, float or types built on float aren't allowed.

Unique counts without approx:

As of right now, it's a commit error to try to compute unique count without setting approx to True.

Warning

Maintaining unique counts is substantially more costly than maintaining non-unique counts so use it only when truly needed.

Distinct

Aggregation to compute a set of distinct values for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the distinct set should be computed. This field must be of a hashable type (e.g. floats aren't allowed).

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type List[T] where T is the type of the field denoted by of.

unordered:bool

If set to True, the list carries no ordering guarantee. If set to False, the list is sorted by natural comparison order. However, as of right now, this must be set to True since ordered mode isn't supported yet.

from datetime import datetime
from typing import List

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Distinct,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def distinct_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=Distinct(
                of="amount",
                window="1d",
                unordered=True,
            ),
        )
Distinct in window of 1 day

python

Returns

List[T]

Stores the result of the aggregation in the appropriate column of the output dataset which must be of type List[T] where T is the type of the input column.

Errors

Computing distinct for non-hashable types:

Distinct operator is a lot like building a hashmap - for it to be valid, the underlying data must be hashable. Types like float (or any other complex type built using float) aren't hashable - so a commit error is raised.

Warning

Storing the full set of distinct values can get costly, so it's recommended to use Distinct only for sets of small cardinality (say < 100).

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Distinct
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    amounts: int  # should be List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=Distinct(
                of="amount",
                window="1d",
                unordered=True,
            ),
        )
amounts should be of type List[int], not int

python

LastK

Aggregation to compute a rolling list of the latest values for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the aggregation should be computed.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type List[T] where T is the type of the field denoted by of.

limit:int

Since storing all the values for a group can get costly, LastK expects a limit to be specified which denotes the maximum size of the list that should be maintained at any point.

dedup:bool

If set to True, only distinct values are stored; otherwise, values stored in the list can have duplicates too.
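How limit and dedup interact can be sketched in plain Python. This is an illustration, not Fennel's implementation: the helper `last_k` is hypothetical, and both the newest-first ordering and the choice to keep the most recent occurrence of a duplicate are assumptions.

```python
from typing import Hashable, List, Sequence


def last_k(values: Sequence[Hashable], limit: int, dedup: bool) -> List[Hashable]:
    """Return up to `limit` of the most recent values, newest first.

    `values` is assumed to be ordered from oldest to newest.
    """
    if limit <= 0:
        return []
    result: List[Hashable] = []
    seen = set()
    for value in reversed(values):
        if dedup:
            if value in seen:
                continue  # an equal, more recent value is already kept
            seen.add(value)
        result.append(value)
        if len(result) == limit:
            break
    return result


amounts = [10, 20, 30, 30, 30]
with_duplicates = last_k(amounts, limit=3, dedup=False)
distinct_only = last_k(amounts, limit=3, dedup=True)
```

With dedup disabled the three most recent values are all 30, while with dedup enabled the list reaches further back to collect three distinct values.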

from datetime import datetime
from typing import List

from fennel.datasets import dataset, field, pipeline, Dataset, LastK
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    amounts: List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def lastk_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=LastK(
                of="amount",
                limit=10,
                dedup=False,
                window="1d",
            ),
        )
LastK in window of 1 day

python

Returns

List[T]

Stores the result of the aggregation in the appropriate field of the output dataset.

Errors

Incorrect output type:

The column denoted by into_field in the output dataset must be of type List[T] where T is the type of the column denoted by of in the input dataset. Commit error is raised if this is not the case.

Warning

Storing the full set of values and maintaining order between them can get costly, so use this aggregation only when needed.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, LastK
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    amounts: int  # should be List[int]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amounts=LastK(
                of="amount",
                limit=10,
                dedup=False,
                window="1d",
            ),
        )
amounts should be of type List[int], not int

python

Max

Aggregation to compute a rolling max for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the max should be computed. This field must either be of type int or float.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int or float - same as the type of the field in the input dataset corresponding to of.

default:Union[int, float]

Max over an empty set of rows isn't well defined - Fennel returns default in such cases. The type of default must be same as that of of in the input dataset.
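The role of default is easiest to see in a small pure-Python sketch of a rolling max. The helper `rolling_max`, the (timestamp, value) row layout and the exact window boundaries are assumptions for illustration only and do not reflect how Fennel computes the aggregate internally.

```python
from datetime import datetime, timedelta
from typing import List, Tuple, Union

Number = Union[int, float]


def rolling_max(
    rows: List[Tuple[datetime, Number]],
    now: datetime,
    window: timedelta,
    default: Number,
) -> Number:
    """Max of the values whose timestamp falls in (now - window, now]."""
    in_window = [value for ts, value in rows if now - window < ts <= now]
    # An empty window has no well-defined max, so fall back to `default`.
    return max(in_window) if in_window else default


now = datetime(2024, 1, 8)
rows = [
    (now - timedelta(days=3), 50.0),
    (now - timedelta(hours=2), 12.5),
]

max_1d = rolling_max(rows, now, timedelta(days=1), default=-1.0)
max_1w = rolling_max(rows, now, timedelta(weeks=1), default=-1.0)
max_1h = rolling_max(rows, now, timedelta(hours=1), default=-1.0)
```

The one hour window contains no rows, so `max_1h` takes the value of default. The same reasoning applies to Min.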

from datetime import datetime

from fennel.datasets import dataset, field, Dataset, pipeline, Max
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    max_1d: float
    max_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            max_1d=Max(of="amt", window="1d", default=-1.0),
            max_1w=Max(of="amt", window="1w", default=-1.0),
        )
Maximum in rolling window of 1 day & 1 week

python

Returns

Union[int, float]

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.

Errors

Max on non int/float types:

The input column denoted by of must either be of int or float types.

Note that unlike SQL, even aggregations over Optional[int] or Optional[float] aren't allowed.

Types of input, output & default don't match:

The type of the field denoted by into_field in the output dataset and that of default should be the same as that of the field denoted by of in the input dataset.

from datetime import datetime

from fennel.datasets import dataset, field, Dataset, pipeline, Max
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    max: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # invalid: zip is a str, not int or float
            max=Max(of="zip", window="1d", default="max"),
        )
Can not take max over string, only int or float

python

from datetime import datetime

from fennel.datasets import dataset, field, Dataset, Max, pipeline
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    max_1d: int  # invalid: should be float to match amt
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def def_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            max_1d=Max(of="amt", window="1d", default=1),
        )
amt is float but max_1d is int

python

Min

Aggregation to compute a rolling min for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the min should be computed. This field must either be of type int or float.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int or float - same as the type of the field in the input dataset corresponding to of.

default:Union[int, float]

Min over an empty set of rows isn't well defined - Fennel returns default in such cases. The type of default must be same as that of of in the input dataset.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    min_1d: float
    min_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def min_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            min_1d=Min(of="amt", window="1d", default=-1.0),
            min_1w=Min(of="amt", window="1w", default=-1.0),
        )
Minimum in rolling window of 1 day & 1 week

python

Returns

Union[int, float]

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.

Errors

Min on non int/float types:

The input column denoted by of must either be of int or float types.

Note that unlike SQL, even aggregations over Optional[int] or Optional[float] aren't allowed.

Types of input, output & default don't match:

The type of the field denoted by into_field in the output dataset and that of default should be the same as that of the field denoted by of in the input dataset.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    min: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # invalid: zip is a str, not int or float
            min=Min(of="zip", window="1d", default="min"),
        )
Can not take min over string, only int or float

python

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Min
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    min_1d: int  # invalid: should be float to match amt
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            min_1d=Min(of="amt", window="1d", default=1),
        )
amt is float but min_1d is int

python

Stddev

Aggregation to compute a rolling standard deviation for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the aggregation should be computed. This field must either be of type int or float.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type float.

default:float

Standard deviation over an empty set of rows isn't well defined - Fennel returns default in such cases.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Average,
    Stddev,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    mean: float
    stddev: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def stddev_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            mean=Average(of="amt", window="1d", default=-1.0),
            stddev=Stddev(of="amt", window="1d", default=-1.0),
        )
Mean & standard deviation in window of 1 day

python

Returns

float

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.

Errors

Stddev on non int/float types:

The input column denoted by of must either be of int or float types.

Note that unlike SQL, even aggregations over Optional[int] or Optional[float] aren't allowed.

Output and/or default aren't float:

The type of the field denoted by into_field in the output dataset and that of default should both be float.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Stddev,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    zip: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    var: str
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # invalid: zip is a str, not int or float
            var=Stddev(of="zip", window="1d", default="x"),
        )
Can not take stddev over string, only int or float

python

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Stddev,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amt: float
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    ret: int  # invalid: should be float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def invalid_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            ret=Stddev(of="amt", window="1d", default=1.0),
        )
Invalid type: ret is int but should be float

python

Sum

Aggregation to compute a rolling sum for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the sum should be computed. This field must be of type int or float.

window:Window

The continuous window within which the sum needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type int or float - same as the type of the field in the input dataset corresponding to of.

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Sum,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    # new int fields added to the dataset by the sum aggregation
    amount_1w: int
    total: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def sum_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            amount_1w=Sum(of="amount", window="1w"),
            total=Sum(of="amount", window="forever"),
        )
Sum up amount in 1 week and forever windows

python

Returns

Union[int, float]

Accumulates the sum in the appropriate field of the output dataset. If there are no rows to sum, by default, it returns 0 (or 0.0 if of is float).

Errors

Sum on non int/float types:

The input column denoted by of must either be of int or float types.

Note that unlike SQL, even aggregations over Optional[int] or Optional[float] aren't allowed.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline, Dataset, Sum
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(
    webhook.endpoint("Transaction"), disorder="14d", cdc="append"
)
@dataset
class Transaction:
    uid: int
    amount: str
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    total: int
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            # invalid: vendor is a str, not int or float
            total=Sum(of="vendor", window="forever"),
        )
Can only sum up int or float types

python

Quantile

Aggregation to compute rolling quantiles (aka percentiles) for each group within a window.

Parameters

of:str

Name of the field in the input dataset over which the quantile should be computed. This field must either be of type int or float.

window:Window

The continuous window within which aggregation needs to be computed. Possible values are "forever" or any time duration.

into_field:str

The name of the field in the output dataset that should store the result of this aggregation. This field is expected to be of type Optional[float] unless default is provided, in which case, it is expected to be of type float.

default:Optional[float]

Quantile over an empty set of rows isn't well defined - Fennel returns default in such cases. If the default is not set or is None, Fennel returns None and in that case, the expected type of into_field must be Optional[float].

p:float

The percentile (between 0 and 1) to be calculated.

approx:bool

Default: False

If set to True, the calculated value isn't exact but only an approximation. Fennel only supports approximate quantiles for now so this kwarg must always be set to True.

Fennel uses the uDDSketch data structure to compute approximate quantiles, with an error bound set to be within 1% of the true value.
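The interplay of p and default can be sketched in plain Python. This is only an illustration under stated assumptions: the helper `quantile` is hypothetical, it computes an exact nearest-rank quantile rather than the approximate sketch-based value Fennel produces, and the nearest-rank definition itself is an assumption.

```python
import math
from typing import List, Optional


def quantile(
    values: List[float], p: float, default: Optional[float] = None
) -> Optional[float]:
    """Exact nearest-rank quantile; returns `default` for an empty input."""
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be between 0 and 1")
    if not values:
        # No default means the result is None, hence Optional[float].
        return default
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))
    return float(ordered[rank - 1])


amounts = [10, 50, 30, 20, 40]
median = quantile(amounts, p=0.5, default=0.0)
empty_with_default = quantile([], p=0.5, default=0.0)
empty_without_default = quantile([], p=0.5)
```

Because the result without a default can be None, the output field in that case has to be Optional[float], which mirrors the typing rule described for into_field.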

from datetime import datetime

from fennel.datasets import (
    dataset,
    field,
    pipeline,
    Dataset,
    Quantile,
)
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    timestamp: datetime

@dataset(index=True)
class Aggregated:
    uid: int = field(key=True)
    # new float field added to the dataset by the quantile aggregation
    median_amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def quantile_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window="1w",
                p=0.5,
                approx=True,
                default=0.0,
            ),
        )
Median in rolling window of 1 week

python

Returns

Union[float, Optional[float]]

Stores the result of the aggregation in the appropriate field of the output dataset. If there are no rows in the aggregation window, default is used.

Errors

Quantile on non int/float types:

The input column denoted by of must either be of int or float types.

Note that unlike SQL, even aggregations over Optional[int] or Optional[float] aren't allowed.

Types of output & default don't match:

The type of the field denoted by into_field in the output dataset should match the default. If default is set and not None, the field should be float else it should be Optional[float].

Invalid p value:

Commit error if the value of p is not between 0 and 1.

Approximate is not set to true:

Commit error if approx is not set to True. Fennel only supports approximate quantiles for now but requires this kwarg to be set explicitly to both set the right expectations and be compatible with future addition of exact quantiles.

from datetime import datetime

from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: str  # invalid input type for Quantile
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    median_amount_1w: float
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window="1w",
                p=0.5,
                approx=True,
                default=0.0,
            ),
        )
Can not take quantile over string, only int or float

python

from datetime import datetime

from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    median_amount_1w: float  # should be Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window="1w",
                p=0.5,
                approx=True,
            ),
        )
Default is not specified, so the output field should be Optional[float]

python

from datetime import datetime
from typing import Optional

from fennel.datasets import dataset, field, pipeline
from fennel.datasets import Dataset, Quantile
from fennel.lib import inputs
from fennel.connectors import source, Webhook

webhook = Webhook(name="webhook")

@source(webhook.endpoint("txn"), disorder="14d", cdc="append")
@dataset
class Transaction:
    uid: int
    amount: int
    vendor: str
    timestamp: datetime

@dataset
class Aggregated:
    uid: int = field(key=True)
    median_amount_1w: Optional[float]
    timestamp: datetime

    @pipeline
    @inputs(Transaction)
    def bad_pipeline(cls, ds: Dataset):
        return ds.groupby("uid").aggregate(
            median_amount_1w=Quantile(
                of="amount",
                window="1w",
                p=10.0,  # invalid: p must lie in [0, 1]
                approx=True,
            ),
        )
p is invalid, can only be between [0, 1]

python

Source

All Fennel sources are wrapped in the @source decorator applied on top of the datasets. This decorator specifies a bunch of options to configure the ingestion mechanism that apply to most data sources.

Parameters

every:Duration

Default: "1h"

The frequency with which the ingestion should be carried out. Streaming sources like Kafka, Kinesis, Webhook ignore it since they do continuous polling.

Note that some Fennel sources make multiple round-trips of limited size in a single iteration so as to not overload the system - every only applies across full iterations of ingestion.

since:Optional[datetime]

Default: None

When since is set, the source only admits those rows whose value in the timestamp column of the dataset is >= since.

Fennel reads as little data as possible given this constraint - for instance, when reading parquet files, the filter is pushed all the way down. However, in several cases, it's still necessary to read all the data before rejecting rows that are older than since.

until:Optional[datetime]

Default: None

When until is set, the source only admits those rows whose value in the timestamp column of the dataset is < until.

Fennel reads as little data as possible given this constraint - for instance, when reading parquet files, the filter is pushed all the way down. However, in several cases, it's still necessary to read all the data before rejecting rows that are newer than until.
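Taken together, since and until describe a half-open time interval. A minimal pure-Python sketch of that admission rule follows; the helper `admits` is hypothetical and only mirrors the inequalities stated above, not Fennel's filter pushdown.

```python
from datetime import datetime
from typing import Optional


def admits(
    ts: datetime,
    since: Optional[datetime] = None,
    until: Optional[datetime] = None,
) -> bool:
    """True if a row timestamp satisfies since <= ts < until."""
    if since is not None and ts < since:
        return False
    if until is not None and ts >= until:
        return False
    return True


since = datetime(2021, 1, 1, 3, 30, 0)
until = datetime(2022, 1, 1, 0, 0, 0)

at_since = admits(since, since=since, until=until)  # lower bound is inclusive
at_until = admits(until, since=since, until=until)  # upper bound is exclusive
too_old = admits(datetime(2020, 12, 31), since=since, until=until)
unbounded = admits(datetime(1999, 1, 1))  # no bounds: every row is admitted
```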

disorder:Duration

Specifies how out of order data from this source can arrive.

Analogous to MaxOutOfOrderness in Flink, this provides Fennel a guarantee that if some row with timestamp t has arrived, no other row with timestamp < t-disorder can ever arrive. And if such rows do arrive, Fennel has the liberty of discarding them and not including them in the computation.
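The guarantee can be sketched in plain Python by tracking the largest timestamp seen so far. The helper `split_late_rows` is hypothetical, and it always sets late rows aside, whereas Fennel merely has the liberty to discard them.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple


def split_late_rows(
    timestamps: Iterable[datetime], disorder: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Separate arrivals into accepted rows and rows that violate `disorder`."""
    accepted: List[datetime] = []
    late: List[datetime] = []
    max_seen: Optional[datetime] = None
    for ts in timestamps:
        # A row is late if it is older than (largest timestamp seen - disorder).
        if max_seen is not None and ts < max_seen - disorder:
            late.append(ts)
            continue
        accepted.append(ts)
        if max_seen is None or ts > max_seen:
            max_seen = ts
    return accepted, late


base = datetime(2024, 1, 20)
arrivals = [
    base,
    base - timedelta(days=3),   # out of order, but within 14 days
    base + timedelta(days=1),
    base - timedelta(days=14),  # 15 days behind the newest row seen
]
accepted, late = split_late_rows(arrivals, disorder=timedelta(days=14))
```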

cdc:"append" | "upsert" | "native" | "debezium"

Specifies how valid change data should be constructed from the ingested data.

"append" means that data should be interpreted as a sequence of append operations with no deletes and no updates. Append can only be applied to keyless datasets (to prevent situations where multiple inserts arrive with the same key fields). As of right now, all SQL sources, Kafka, Kinesis, S3, and webhook support append mode.

"upsert" means that incoming data will only have inserts but should be interpreted as sequence of upsert operations. It can only be used for keyed datasets and works for every source where append works. Note that in order to support "upsert", Fennel needs to maintain the last seen row for each key which has some overhead. As a result, pre-prepared "debezium" data should be preferred over "upsert".

"native" means that the underlying system exposes CDC natively and that Fennel should tap into that. As of right now, native CDC is only available for Deltalake & Hudi and will soon be available for more sources including MySQL and Postgres.

"debezium" means that the raw data itself is laid out in debezium layout out of which valid CDC data can be constructed. This is only possible for sources that expose raw schemaless data, namely, s3, kinesis, kafka, and webhook.

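The overhead of "upsert" mentioned above comes from remembering the last row per key. A minimal pure-Python sketch of that idea follows; the helper `to_upsert_changes` is hypothetical, and representing the resulting change log as delete and insert pairs is an assumption, not a description of Fennel's internal format.

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, object]
Change = Tuple[str, Row]  # ("insert" or "delete", row)


def to_upsert_changes(rows: Iterable[Row], key: str) -> List[Change]:
    """Interpret a stream of inserts as upserts on the given key field."""
    last_seen: Dict[object, Row] = {}  # state kept per key
    changes: List[Change] = []
    for row in rows:
        previous = last_seen.get(row[key])
        if previous is not None:
            # Retract the row this key pointed to before the update.
            changes.append(("delete", previous))
        changes.append(("insert", row))
        last_seen[row[key]] = row
    return changes


incoming = [
    {"uid": 1, "email": "a@example.com"},
    {"uid": 2, "email": "b@example.com"},
    {"uid": 1, "email": "c@example.com"},
]
changes = to_upsert_changes(incoming, key="uid")
```

The third incoming row reuses key 1, so the sketch emits a delete for the earlier row before inserting the new one. Pre-prepared "debezium" data already carries this information, which is why it avoids the per-key state.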
env:None | str | List[str]

Default: None

When present, marks this source to be selected during commit only when the commit operation itself is made for an env that matches this env. The primary use case is to decorate a single dataset with many @source decorators and choose only one of them to commit depending on the environment.

preproc:Optional[Dict[str, Union[Ref, Any]]]

Default: None

When present, specifies the preproc behavior for the columns referred to by the keys of the dictionary.

As of right now, there are two kinds of values of preproc:

  • ref: Ref: written as ref(str) and means that the column denoted by the key of this value is aliased to another column in the sourced data. This is useful, for instance, when you want to rename columns while bringing them to Fennel.

  • Any: means that the column denoted by the key of this value should be given a constant value.
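The two kinds of preproc values can be sketched in plain Python. The `Ref` class below is a hypothetical stand-in for the value produced by `ref(...)`, and `apply_preproc` is an illustrative helper, not part of Fennel.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for the object returned by ref("column")."""

    name: str


def apply_preproc(raw: Dict[str, Any], preproc: Dict[str, Any]) -> Dict[str, Any]:
    """Fill the target columns of one raw row according to `preproc`."""
    row = dict(raw)
    for column, value in preproc.items():
        if isinstance(value, Ref):
            # Alias: copy the value from another column of the sourced data.
            row[column] = raw[value.name]
        else:
            # Constant: every row gets the same value for this column.
            row[column] = value
    return row


raw_row = {"user_id": 7, "email": "a@example.com"}
row = apply_preproc(raw_row, {"uid": Ref("user_id"), "country": "USA"})
```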

bounded:bool

Default: False

When not set or set to False, it indicates that the source possesses an infinite amount of data that is continuously increasing.

When set to True, it indicates that the source possesses a finite amount of data and that it will be exhausted at some point. In such cases, idleness must also be set.

idleness:Optional[Duration]

Default: None

Only relevant when bounded is set to True. In such cases, the bounded source is assumed to be exhausted once Fennel has been unable to obtain any new data despite continuously asking for at least the idleness period.
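The exhaustion rule can be written down as a small pure-Python check. The helper `is_exhausted` and its arguments are hypothetical and only restate the rule above; they are not part of Fennel's API.

```python
from datetime import datetime, timedelta


def is_exhausted(
    bounded: bool, last_data_at: datetime, now: datetime, idleness: timedelta
) -> bool:
    """A bounded source counts as exhausted after `idleness` without new data."""
    if not bounded:
        return False  # unbounded sources are never considered exhausted
    return now - last_data_at >= idleness


last_data_at = datetime(2024, 1, 1, 12, 0)
one_hour = timedelta(hours=1)

still_waiting = is_exhausted(True, last_data_at, datetime(2024, 1, 1, 12, 30), one_hour)
done = is_exhausted(True, last_data_at, datetime(2024, 1, 1, 13, 0), one_hour)
never_done = is_exhausted(False, last_data_at, datetime(2025, 1, 1), one_hour)
```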

from datetime import datetime

from fennel.connectors import source, S3, ref
from fennel.datasets import dataset, field

s3 = S3(name="my_s3")  # using IAM role based access

bucket = s3.bucket("data", path="user/*/date-%Y-%m-%d/*", format="parquet")

@source(
    bucket,
    every="1h",
    cdc="upsert",
    disorder="2d",
    since=datetime(2021, 1, 1, 3, 30, 0),  # 3:30 AM on 1st Jan 2021
    until=datetime(2022, 1, 1, 0, 0, 0),  # 12:00 AM on 1st Jan 2022
    preproc={
        "uid": ref("user_id"),  # 'uid' comes from column 'user_id'
        "country": "USA",  # country for every row should become 'USA'
    },
    env="prod",
    bounded=True,
    idleness="1h",
)
@dataset
class User:
    uid: int = field(key=True)
    email: str
    country: str
    timestamp: datetime
Specifying options in source decorator

python

Expectations

Fennel's type system lets one maintain data integrity by rejecting data that does not conform to its types. However, there are cases where one may want to accept data that does not conform to the types, but still monitor how often the data does not conform to the types. For this, Fennel provides the ability to specify expectations on the data.

Fennel internally relies on Great Expectations to help users easily specify data expectations. Fennel's expectations are a subset of the Great Expectations expectations and are documented below, but the API to specify expectations is the same.

Expectation Types


Single Column Expectations

The following expectations operate on a single column at a time.

  1. expect_column_values_to_not_be_null

    Expect the column values to not be null. To be counted as an exception, values must be explicitly null or missing, such as np.nan. Empty strings don't count as null unless they have been coerced to a null type.

    Parameters:

    • column (str) – The column name.
  2. expect_column_values_to_be_null

    Expect the column values to be null. It is the inverse of expect_column_values_to_not_be_null.

    Parameters:

    • column (str) – The column name.
  3. expect_column_values_to_be_of_type

    Expect a column to contain values of a specified data type.

    Parameters:

    • column (str) – The column name.
    • type_ (str) – The expected data type of the column values.
  4. expect_column_values_to_be_in_type_list

    Expect a column to contain values of one of several specified data types.

    Parameters:

    • column (str) – The column name.
    • type_list (list) – A list of expected data types of the column values.
  5. expect_column_values_to_be_in_set

    Expect each column value to be in a given set.

    Parameters:

    • column (str) – The column name.
    • value_set (list) – A set of objects used for comparison.
  6. expect_column_values_to_not_be_in_set

    Expect each column value to not be in a given set.

    Parameters:

    • column (str) – The column name.
    • value_set (list) – A set of objects used for comparison.
  7. expect_column_values_to_be_between

    Expect column values to be between a minimum value and a maximum value.

    Parameters:

    • column (str) – The column name.
    • min_value (int) – The minimum value for a column entry.
    • max_value (int) – The maximum value for a column entry.
    • strict_min (bool) – If True, the column values must be strictly larger than min_value.
    • strict_max (bool) – If True, the column values must be strictly smaller than max_value.
  8. expect_column_value_lengths_to_be_between

    Expect the lengths of column values to be between a minimum value and a maximum value.

    Parameters:

    • column (str) – The column name.
    • min_value (int) – The minimum value for a column entry length.
    • max_value (int) – The maximum value for a column entry length.
  9. expect_column_value_lengths_to_equal

    Expect the lengths of column values to equal a given value.

    Parameters:

    • column (str) – The column name.
    • value (int) – The expected length of column values.
  10. expect_column_values_to_match_regex

    Expect column entries to be strings that match a given regular expression.

    Parameters:

    • column (str) – The column name.
    • regex (str) – The regular expression that each column entry should match.
  11. expect_column_values_to_not_match_regex

    Expect column entries to be strings that do not match a given regular expression.

    Parameters:

    • column (str) – The column name.
    • regex (str) – The regular expression that each column entry should not match.
  12. expect_column_values_to_match_regex_list

    Expect column entries to be strings that match at least one of a list of regular expressions.

    Parameters:

    • column (str) – The column name.
    • regex_list (list) – The list of regular expressions that each column entry should match at least one of.
  13. expect_column_values_to_not_match_regex_list

    Expect column entries to be strings that do not match any of a list of regular expressions.

    Parameters:

    • column (str) – The column name.
    • regex_list (list) – The list of regular expressions that each column entry should not match any of.
  14. expect_column_values_to_match_strftime_format

    Expect column entries to be strings representing a date or time with a given format.

    Parameters:

    • column (str) – The column name.
    • strftime_format (str) – The strftime format that each column entry should match.
  15. expect_column_values_to_be_dateutil_parseable

    Expect column entries to be parseable using dateutil.

    Parameters:

    • column (str) – The column name.
  16. expect_column_values_to_be_json_parseable

    Expect column entries to be parseable as JSON.

    Parameters:

    • column (str) – The column name.
  17. expect_column_values_to_match_json_schema

    Expect column entries to match a given JSON schema.

    Parameters:

    • column (str) – The column name.
    • json_schema (dict) – The JSON schema that each column entry should match.
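
To make the row-level semantics of a few of the expectations above concrete, here is a minimal plain-Python sketch. It does not use Fennel or Great Expectations; the helpers `values_between` and `values_match_regex_list` are hypothetical names that only mimic the documented parameters, and the use of `re.search` (a match anywhere in the string) is an assumption about the matching mode.

```python
import re
from typing import Iterable, List, Optional


def values_between(
    values: Iterable[float],
    min_value: Optional[float] = None,
    max_value: Optional[float] = None,
    strict_min: bool = False,
    strict_max: bool = False,
) -> List[bool]:
    """Per-row pass/fail for a range check (illustrative only)."""
    results = []
    for v in values:
        ok = True
        if min_value is not None:
            # strict_min turns ">=" into ">".
            ok = ok and (v > min_value if strict_min else v >= min_value)
        if max_value is not None:
            # strict_max turns "<=" into "<".
            ok = ok and (v < max_value if strict_max else v <= max_value)
        results.append(ok)
    return results


def values_match_regex_list(
    values: Iterable[str], regex_list: List[str]
) -> List[bool]:
    """A row passes if it matches at least one pattern in regex_list."""
    return [any(re.search(p, v) for p in regex_list) for v in values]


print(values_between([0, 1, 6, 7], min_value=1, max_value=6))
# [False, True, True, False]
print(values_between([1, 6], min_value=1, max_value=6, strict_min=True))
# [False, True]
print(values_match_regex_list(["a1", "bb", "9"], [r"^a", r"^\d+$"]))
# [True, False, True]
```
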

Multi Column Expectations

The following expectations require two or more columns.

  1. expect_column_pair_values_to_be_equal

    Expect the values in a column to be the exact same as the values in another column.

    Parameters:

    • column_A (str) – The first column name.
    • column_B (str) – The second column name.
    • ignore_row_if (str) – Control how null values are handled. See ignore_row_if for details.
  2. expect_column_pair_values_A_to_be_greater_than_B

    Expect the values in column A to be greater than the values in column B.

    Parameters:

    • column_A (str) – The first column name.
    • column_B (str) – The second column name.
    • or_equal (bool) – If True, then values can be equal, not strictly greater than.
  3. expect_column_pair_values_to_be_in_set

    Expect the paired values from columns A and B to belong to a given set of valid pairs.

    Parameters:

    • column_A (str) – The first column name.
    • column_B (str) – The second column name.
    • value_pairs_set (set) – A set of tuples describing acceptable pairs of values. Each tuple should have two elements, the first from column A and the second from column B.
  4. expect_multicolumn_sum_to_equal

    Expect the sum of multiple columns to equal a specified value.

    Parameters:

    • column_list (list) – The list of column names to be summed.
    • sum_total (int) – The expected sum of the columns.
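
The multi-column expectations above are evaluated row by row across the named columns. The following plain-Python sketch shows that idea under simplifying assumptions (no null handling, rows modeled as dictionaries); the helper names are hypothetical and are not part of Fennel or Great Expectations.

```python
from typing import Dict, List, Set, Tuple

Row = Dict[str, int]


def a_greater_than_b(
    rows: List[Row], column_A: str, column_B: str, or_equal: bool = False
) -> List[bool]:
    """Row passes if A > B (or A >= B when or_equal is True)."""
    if or_equal:
        return [r[column_A] >= r[column_B] for r in rows]
    return [r[column_A] > r[column_B] for r in rows]


def pair_in_set(
    rows: List[Row],
    column_A: str,
    column_B: str,
    value_pairs_set: Set[Tuple[int, int]],
) -> List[bool]:
    """Row passes if the (A, B) pair is one of the accepted pairs."""
    return [(r[column_A], r[column_B]) in value_pairs_set for r in rows]


def multicolumn_sum_equals(
    rows: List[Row], column_list: List[str], sum_total: int
) -> List[bool]:
    """Row passes if the listed columns add up to sum_total."""
    return [sum(r[c] for c in column_list) == sum_total for r in rows]


rows = [{"x": 3, "y": 2, "z": 5}, {"x": 2, "y": 2, "z": 7}]

print(a_greater_than_b(rows, "x", "y"))                    # [True, False]
print(a_greater_than_b(rows, "x", "y", or_equal=True))     # [True, True]
print(pair_in_set(rows, "x", "y", {(3, 2)}))               # [True, False]
print(multicolumn_sum_equals(rows, ["x", "y", "z"], 10))   # [True, False]
```
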


Example

from datetime import datetime

from fennel.datasets import dataset
from fennel.lib import (
    expectations,
    expect_column_values_to_be_between,
    expect_column_values_to_be_in_set,
    expect_column_pair_values_A_to_be_greater_than_B,
)
from fennel.dtypes import between


@dataset
class Sample:
    passenger_count: between(int, 0, 100)
    gender: str
    age: between(int, 0, 100, strict_min=True)
    mothers_age: between(int, 0, 100, strict_min=True)
    timestamp: datetime

    @expectations
    def my_function(cls):
        return [
            # Single column expectation on a numeric range
            expect_column_values_to_be_between(
                column=str(cls.passenger_count),
                min_value=1,
                max_value=6,
                mostly=0.95,
            ),
            # Single column expectation on set membership
            expect_column_values_to_be_in_set(
                str(cls.gender), ["male", "female"], mostly=0.99
            ),
            # Pairwise expectation
            expect_column_pair_values_A_to_be_greater_than_B(
                column_A=str(cls.age), column_B=str(cls.mothers_age)
            ),
        ]
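
The example passes a `mostly` argument that the parameter lists above do not describe. In Great Expectations, `mostly` is the minimum fraction of rows that must satisfy an expectation for it to count as met. Assuming Fennel follows the same semantics, the check can be sketched in plain Python as follows; `meets_mostly` is a hypothetical helper, and treating an empty column as passing is an assumption of this sketch.

```python
from typing import List


def meets_mostly(row_results: List[bool], mostly: float = 1.0) -> bool:
    """Return True if at least a `mostly` fraction of rows passed."""
    if not row_results:
        # Assumption: an empty column has nothing that can violate the check.
        return True
    passing_fraction = sum(row_results) / len(row_results)
    return passing_fraction >= mostly


passenger_counts = [1, 2, 3, 4, 5, 6, 2, 3, 1, 9]
# Per-row result of "value is between 1 and 6": 9 of 10 rows pass.
in_range = [1 <= c <= 6 for c in passenger_counts]

print(meets_mostly(in_range, mostly=0.95))  # False: 90% of rows pass, 95% required
print(meets_mostly(in_range, mostly=0.8))   # True: 90% of rows pass, 80% required
```

With the default `mostly=1.0`, a single failing row is enough for the expectation to be considered unmet.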

Fennel

© 2024 Fennel