Primelab ETL and Explore API

Background:

We have developed a highly available system for both relational and graph analysis of blockchain data. We leverage the improved Primelab Near Indexer to create a Debezium-based Kafka message queue for the most critical tables in the Postgres instance: Accounts, Transactions, Receipts, and Action Receipt Actions. These tables are streamed in real time to the pipeline, where they are transformed, joined, and stored once again in the Redshift data lake.

💡 **Alternatively**, you may choose to write these to any combination of DynamoDB, Neo4j Aura, Postgres, and Redshift.

Basic Chart

Data Structure

Primelab contracts enrich transaction data with JSON arguments that facilitate easier analysis. Below we outline some concepts to familiarize yourself with.

NearApps Tags

For the contract methods, the tags are always used in the following format:

  • nearapps_tags: object - The tags' hash information.
    • app_id: string - App tag hash.
    • action_id: string - Action number hash.
    • user_id: string - User account_id tag hash.

Their standalone JSON representation (example) is:

{
    "nearapps_tags": {
        "app_id": "fbf4cd3",
        "action_id": "4355a4",
        "user_id": "03a07d"
    }
}

Account Creation

method: create_account

Parameters

  • new_account_id: string - the account_id that is being created.
  • new_public_key: optional string - the new account owner's public key, in base58 with an optional {header}: prefix. Can be an Ed25519 or a Secp256k1 public key. On a missing prefix, ed25519: is assumed. This value may be generated by the user. If missing, it defaults to the transaction signer's public key.

Returns

  • account_created: boolean - whether the account was successfully created.

Sample

{ }

File Contract

method: new

description: Initializes the contract.

Parameters

  • owner: string - the account_id of who will own the contract

Returns

Has no returns.

transaction hash A96H8pRQs7t7Q7PwF6sRv8Wcw5gxaAgaZwdGNiKoi5dA

1{"args_json": {"owner_id": "dev-1649182007824-64888721472108"},
2"args_base64": "eyJvd25lcl9pZCI6ImRldi0xNjQ5MTgyMDA3ODI0LTY0ODg4NzIxNDcyMTA4In0=",
3"deposit": "0",
4"gas": 30000000000000,
5"method_name": "new"
6}
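Note that args_base64 is simply the base64 encoding of the args_json payload, so either can be recovered from the other. A quick Python check:

import base64
import json

args_base64 = "eyJvd25lcl9pZCI6ImRldi0xNjQ5MTgyMDA3ODI0LTY0ODg4NzIxNDcyMTA4In0="

# Decoding yields the original JSON arguments
print(json.loads(base64.b64decode(args_base64)))
# {'owner_id': 'dev-1649182007824-64888721472108'}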


NFT

Series

A token has various characteristics, such as its title (which must be unique on the nft contract). A series serves as a setting that decides, when a token is created/minted, what that token's name will be.

One way of understanding a series is by comparing it to a directory: a newly created series is like an empty directory, and the tokens that are minted "under" that series are files that get placed inside of that directory. For example, a directory named "D" may have two files inside of it, named "F1" and "F2". The complete path for those files would be D/F1 and D/F2, so the complete path is made of both the directory name and the file name.

Series work in a similar manner: each series has a title and a number (an index that gets incremented for each series), and when a token gets minted under that series, the complete/final "title" of that token is built from the information of its series. For example, for a series named "S" that has two tokens created under it, the tokens will be named S:0:0 and S:0:1, which are, respectively, the series name, the series index, and the token index within the series (the first token that got created is 0, and the second is 1).

Once a token has been minted under a series (which simply decides the token's title), that token will behave and function like any standard non-fungible token.

Note that simply creating series won't create any tokens. The tokens still need to be explicitly minted.

Quantity of Minted Tokens

Each token that gets minted under a series has an index, which gets incremented for each token. In this way, each token under the same series can be differentiated from one another.

Also related to the quantity of tokens under a series: a series has a setting that can limit how many tokens can be minted under it, and an error will be returned when trying to mint a token once the series is already at maximum capacity.

Interface

methods:

  • new
  • new_default_meta
  • get_owner
  • change_owner
  • nft_mint
  • nft_series_create
  • nft_series_mint
  • nft_series_supply
  • nft_series_get
  • nft_series_get_minted_tokens_vec
  • nft_series_set_mintable
  • nft_series_set_capacity
  • nft_transfer_logged
  • nft_transfer_call_logged
  • nft_token
  • nft_approve_logged
  • nft_revoke_logged
  • nft_revoke_all_logged
  • nft_is_approved
  • nft_total_supply
  • nft_tokens
  • nft_supply_for_owner
  • nft_tokens_for_owner
  • nft_metadata

Initialization

method: new

description: Initializes the NftSeries contract.

Parameters

  • owner_id: string - the account_id of who will own the contract
  • metadata: object - the standard nft metadata
    • spec: string - eg. "nft-1.0.0"
    • name: string - eg. "Mosaics"
    • symbol: string - eg. "MOSAIC"
    • icon: optional string - data URL
    • base_uri: optional string - centralized gateway known to have reliable access to decentralized storage assets referenced by reference or media URLs
    • reference: optional string - URL to a JSON file with more info
    • reference_hash: optional string - base64-encoded sha256 hash of JSON from reference field. Required if reference is included.
  • nearapps_logger: string - the account_id of the contract that will log the nearapps_tags.

Json Example

{
    "owner_id": "executor",
    "metadata": {
        "spec": "nft-1.0.0",
        "name": "Mosaics",
        "symbol": "MOSAIC",
        "icon": null,
        "base_uri": null,
        "reference": null,
        "reference_hash": null
    },
    "nearapps_logger": "executor"
}

Returns

Has no returns.

Initialization with a default Meta

method: new_default_meta

description: Initializes the NftSeries contract, using dummy nft contract metadata.

Parameters

  • owner_id: string - the account_id of who will own the contract
  • nearapps_logger: string - the account_id of the contract that will log the nearapps_tags.

Json Example

{
    "owner_id": "executor",
    "nearapps_logger": "executor"
}

Returns

Has no returns.

Owners Management

methods:

  • get_owner
  • change_owner

Get Owner

method: get_owner

description: Gets the contract's owner.

Parameters

Has no parameters.

Returns

  • owner: string - the account_id of the owner.

Change Owner

method: change_owner

description: Changes the contract's owner.

Parameters

new_owner: string - the account_id of the new owner.

Json Example

{
    "new_owner": "new-executor"
}

Returns

Has no returns.

NFT Minting

method: nft_mint

description: Creates a new nft token. The token_id cannot contain the series delimiter character, which is :.

Parameters

  • token_id: string - the name of the token. Cannot contain the series delimiter (:).
  • token_owner_id: string - the account_id of who will receive the token.
  • token_metadata: object - the standard nft token metadata.
    • title: optional string - the title of the token, eg. "Arch Nemesis: Mail Carrier" or "Parcel #5055".
    • description: optional string - free-form description.
    • media: optional string - URL to associated media, preferably to decentralized, content-addressed storage.
    • media_hash: optional string - Base64-encoded sha256 hash of content referenced by the media field. Required if media is included.
    • copies: optional string - number of copies of this set of metadata in existence when token was minted.
    • issued_at: optional string - ISO 8601 datetime when token was issued or minted.
    • expires_at: optional string - ISO 8601 datetime when token expires.
    • starts_at: optional string - ISO 8601 datetime when token starts being valid.
    • updated_at: optional string - ISO 8601 datetime when token was last updated.
    • extra: optional string - anything extra the NFT wants to store on-chain. Can be stringified JSON.
    • reference: optional string - URL to an off-chain JSON file with more info.
    • reference_hash: optional string - Base64-encoded sha256 hash of JSON from reference field. Required if reference is included.
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "token_id": "",
    "token_owner_id": "",
    "token_metadata": {},
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

  • token: object - the standard nft token information.

Standard NFT Operations

methods:

  • nft_transfer_logged
  • nft_transfer_call_logged
  • nft_token
  • nft_approve_logged
  • nft_revoke_logged
  • nft_revoke_all_logged
  • nft_is_approved
  • nft_total_supply
  • nft_tokens
  • nft_supply_for_owner
  • nft_tokens_for_owner
  • nft_metadata

events:

  • nft_mint
  • nft_transfer
  • nft_revoke
  • nft_revoke_all

Transfer

method: nft_transfer_logged

description: Simple transfer. Transfers a given token_id from the current owner to receiver_id.

Parameters

  • token_id: string - the token id to transfer.
  • receiver_id: string - the account to receive the token.
  • approval_id: optional number - expected approval ID. A number smaller than 2^53, and therefore representable as JSON.
  • memo: optional string - free-form information.
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "token_id": "some-token-id",
    "receiver_id": "receiver-account",
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

Has no return.

Transfer Call

method: nft_transfer_call_logged

description: Transfer token and call a method on a receiver contract. A successful workflow will end in a success execution outcome to the callback on the NFT contract at the method nft_resolve_transfer.

Parameters

  • token_id: string - the token id to transfer.
  • receiver_id: string - the account to receive the token.
  • approval_id: optional number - expected approval ID. A number smaller than 2^53, and therefore representable as JSON.
  • memo: optional string - free-form information.
  • msg: string - free-form information that can help the receiver to make a decision to accept or deny the token.
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "token_id": "some-token-id",
    "receiver_id": "receiver-account",
    "msg": "arbitrary",
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

  • success: bool - whether the transfer was successful or not.

Approval

method: nft_approve_logged

description: Add an approved account for a specific token.

Parameters

  • token_id: string - the token id to give allowance on
  • account_id: string - the account to allow token transfer
  • msg: optional string.
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "token_id": "some-token-id",
    "account_id": "allowed-account",
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

  • approval_id: number - the id of the approval.

Check Approval

method: nft_is_approved

description: Check if a token is approved for transfer by a given account, optionally checking an approval_id.

Parameters

  • token_id: string - the token id to check allowance on
  • approved_account_id: string.
  • approval_id: optional number.

Json Example

{
    "token_id": "some-token-id",
    "approved_account_id": "approved-account"
}

Returns

  • is_approved: boolean - whether it is approved.

Revoke

method: nft_revoke_logged

description: Revoke an approved account for a specific token.

Parameters

  • token_id: string - the token id to revoke allowance on
  • account_id: string - the account to disallow token transfer
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "token_id": "some-token-id",
    "account_id": "account-to-be-disallowed",
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

Has no returns.

Revoke All

method: nft_revoke_all_logged

description: Revoke all approved accounts for a specific token.

Parameters

  • token_id: string - the token id to revoke allowance on
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "token_id": "some-token-id",
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

Has no return.

NFT Series Operations

methods:

  • nft_series_create
  • nft_series_supply
  • nft_series_mint
  • nft_series_get
  • nft_series_get_minted_tokens_vec
  • nft_series_set_mintable
  • nft_series_set_capacity

events:

  • nft_series_create
  • nft_series_mint
  • nft_series_set_capacity
  • nft_series_set_mintable

NFT Series Creation

method: nft_series_create

description: Creates a new NFT series.

Parameters

  • name: string - the name of the token series
  • capacity: string - the maximum number of tokens that can be minted
  • creator: string - the account_id of the creator, used for informational purposes
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "name": "my-token-series",
    "capacity": "5",
    "creator": "some-user",
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

  • series_id: string - a number representing the id of the created series.

NFT Series Supply

method: nft_series_supply

description: Shows how many series were created.

Parameters

Has no parameters.

Returns

  • series_quantity: string - a number representing the number of created series.

NFT Series Token Minting

method: nft_series_mint

description: Creates a new nft token from an existing token series.

Parameters

  • series_id: string - the series id number
  • token_owner_id: string - the account_id of who will receive the token.
  • token_metadata: optional object - the standard nft token metadata.
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "series_id": "1",
    "token_owner_id": "receiver-account",
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

  • token: object - the standard nft token information.

NFT Series Query

method: nft_series_get

description: Gets information on a series.

Parameters

  • series_id: string - the series id number

Json Example

{
    "series_id": "1"
}

Returns

  • series: object - nft series information.
    • id: string - the series id number
    • name: string
    • creator: string - the account_id of the creator
    • len: string - the number of minted tokens
    • capacity: string - the maximum number of tokens that can be minted
    • is_mintable: boolean - whether the series can be minted

NFT Series Token List

method: nft_series_get_minted_tokens_vec

description: Get minted tokens from a series.

Parameters

  • series_id: string - the series id number
  • from_index: optional string - how many tokens to skip.
  • limit: optional number - 16-bit number limiting how many tokens to show.

Json Example

{
    "series_id": "1"
}

Returns

  • token_ids: string[] - a list of the token_ids that were minted under the series.

NFT Series Set Mintable

method: nft_series_set_mintable

description: Sets whether a series is mintable or not.

Parameters

  • series_id: string - the series id number.
  • is_mintable: boolean - whether the series will be mintable.
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "series_id": "1",
    "is_mintable": true,
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

Has no returns.

NFT Series Set Capacity

method: nft_series_set_capacity

description: Sets the token capacity (the maximum number of tokens that can be minted) of a series.

Parameters

  • series_id: string - the series id number.
  • capacity: string - the new capacity of the series.
  • nearapps_tags: object - the tags information.
    • app_id: string - app tag.
    • action_id: string - action number.
    • user_id: string - user account_id tag.

Json Example

{
    "series_id": "1",
    "capacity": "5",
    "nearapps_tags": {
        "app_id": "some app tag",
        "action_id": "1",
        "user_id": "some-user-tag"
    }
}

Returns

Has no returns.

Analytics Contract

Methods:

  • new
  • assert_owner
  • owner
  • transfer_ownership
  • log_event

Initialization

method: new

description: Initializes the contract.

Parameters

  • owner: string - the account_id of who will own the contract

Returns

Has no returns.

transaction hash 8jeTxGou3CpwJM61xkaJwkkG4NXPD49o6gWJHvQzMiVu

1{"args_json": {"owner_id": "dev-1649184729575-75393162970503"},
2"args_base64": "eyJvd25lcl9pZCI6ImRldi0xNjQ5MTg0NzI5NTc1LTc1MzkzMTYyOTcwNTAzIn0=",
3"deposit": "0",
4"gas": 30000000000000,
5"method_name": "new"
6}
7

Owner Management

Owner

method: owner

description: Get the contract's owner.

Parameters

Has no parameters.

Returns

account_id: string - The owner's account_id.

transaction hash 3bxxPYpXaZScvNc6yEcs3FQoo55mfjjRcbASgFhzt56f

1{"args_json": {},
2"args_base_64": "e30=",
3"deposit": "0",
4"gas": 30000000000000,
5"method_name": "owner"
6}
7

Returned value:

'dev-1649184729575-75393162970503'

Transfer Ownership

method: transfer_ownership

description: Changes the contract's owner.

Parameters

  • owner: string - the account_id of the next owner who will own the contract

Returns

Has no returns.

transaction hash HNHY32GVh1fFzdJ5cNpzgG9P617tEQhUmzpeupaaD8vz

1{"args_json": {"owner":"dev-1649182007824-64888721472108"},
2"args_base64": "eyJvd25lciI6ImRldi0xNjQ5MTgyMDA3ODI0LTY0ODg4NzIxNDcyMTA4In0=",
3"deposit": "0",
4"gas": 30000000000000,
5"method_name": "transfer_ownership"
6}
7

File Event Logger

Log Event

method: log_event

description: Logs an event for a file operation.

Parameters

  • time: number - time of the operation
  • operation: string - name of the operation
  • hash: string - hash of the operation
  • receiver_wallet_id: string|null - wallet id of file receiver

transaction hash AwtZLLQvvfzk4vpENNHi7A6msinsbGPPPU67Vvie2mTn

1{"args_json": {"time":4042022,"operation":"transaction completed","transaction_hash":"HNHY32GVh1fFzdJ5cNpzgG9P617tEQhUmzpeupaaD8vz"},
2"args_base64": "eyJ0aW1lIjo0MDQyMDIyLCJvcGVyYXRpb24iOiJ0cmFuc2FjdGlvbiBjb21wbGV0ZWQiLCJ0cmFuc2FjdGlvbl9oYXNoIjoiSE5IWTMyR1ZoMWZGemRKNWNOcHpnRzlQNjE3dEVRaFVtenBldXBhYUQ4dnoifQ==",
3"deposit": "0",
4"gas": 30000000000000,
5"method_name": "log_event"
6}
7

Logs:

1"logs": [
2"EVENT_JSON:{"time":4042022,"operation":"transaction completed","transaction_hash":"HNHY32GVh1fFzdJ5cNpzgG9P617tEQhUmzpeupaaD8vz"}"
3],

Components

  • Indexer Instance and Postgres DB
  • Amazon MSK
  • Debezium
  • AWS Lambda
  • Python 3.9 Primelab Custom Python Layer for ETL
  • Redshift (or choose your own Database)
  • Docker Container for explorer API

Test Drive

The ETL pipeline encompasses several disparate components, each requiring a bit of setup and tinkering to get humming properly. We will start with setting up the Debezium connector. If you haven't already, you must set up an indexer instance in which to install the Debezium plug-in.

Debezium and Kafka on Postgres

In order to stream data from Postgres as a source to a target data store, we use open source tooling in the form of Debezium and Apache Kafka (Kafka).

Debezium is a distributed Change Data Capture (CDC) tool built on top of Kafka to stream row-level database changes into event streams.

Kafka is a distributed event store and stream-processing platform.

Postgres Setup

In order for Debezium to work it needs to use logical decoding with the write-ahead log.  The following parameters need to be set.

  • wal_level = logical
  • max_wal_senders = 1
  • max_replication_slots = 1

If using AWS you should also set rds.logical_replication = 1.

The database also must permit replication with the host that runs the Postgres connector.  For more details see here.

Note that if you are using a postgres cluster, debezium must communicate with the writer / leader node.
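As a quick sanity check before going further, you can confirm those parameters took effect. A minimal sketch using psycopg2 (the connection details are placeholders):

import psycopg2

# Placeholder connection details - point this at the indexer's Postgres instance
conn = psycopg2.connect(host="indexer-db.example.com", dbname="mainnet_explorer",
                        user="postgres", password="...")
with conn.cursor() as cur:
    for name in ("wal_level", "max_wal_senders", "max_replication_slots"):
        cur.execute("SHOW " + name)
        print(name, "=", cur.fetchone()[0])  # expect: logical, 1, 1
conn.close()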

Debezium Connector Setup

Debezium provides excellent documentation on how to setup the Postgres (and many other database flavors) connector.  It is recommended to read the documentation before implementation.

tl;dr:

  1. Create a debezium role in the source database and grant replication privileges.
  2. Create a replication group role and grant the debezium user and current table owner access. Assign tables you are interested in capturing to this owner.
  3. Create debezium heartbeat and signal tables.
  4. Create a publication for the tables you are interested in capturing.

Details:

  1. Debezium needs access to the database in order to capture the data changes. It also needs replication privileges in order to add tables to the publication which publishes the changes.
  2. The debezium user needs to be an owner of the tables it is operating on. As we want to assign least privileges (in a dev environment you can assign superuser), we transfer the tables we are interested in capturing to a shared ownership role between the current table owner and the debezium role. This ensures that there is no effect on current database practices whilst allowing debezium the access it needs.
  3. A debezium heartbeat table is a simple table which debezium updates periodically to stop connector failures and Postgres replication slots getting clogged up. The issue is documented here and here. It is recommended to add the heartbeat table; in our use case we faced the issues described without it. The signal table is used to send messages to Debezium for events such as adding messages to the log or taking ad hoc snapshots.
  4. The publication is used by Postgres to define which tables' row-level changes are published to the logical replication slot that Debezium reads from. A sketch of steps 1, 2, and 4 follows this list.
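Below is a hedged sketch of that setup as SQL run from Python. The role names, table list, and publication name are illustrative assumptions, not values mandated by this repo.

import psycopg2

# Illustrative names (debezium, replication_group, dbz_publication) - adapt to your environment.
SETUP_SQL = [
    # 1. Connector role with replication privileges
    "CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD '...'",
    # 2. Shared-ownership group: grant it to debezium (and to the current table
    #    owner), then move the captured tables to it
    "CREATE ROLE replication_group",
    "GRANT replication_group TO debezium",
    "ALTER TABLE public.transactions OWNER TO replication_group",
    # 4. Publication limited to the tables being captured
    ("CREATE PUBLICATION dbz_publication FOR TABLE "
     "public.accounts, public.transactions, public.receipts, public.action_receipt_actions"),
]

conn = psycopg2.connect("dbname=mainnet_explorer user=postgres")  # placeholder DSN
conn.autocommit = True
with conn.cursor() as cur:
    for statement in SETUP_SQL:
        cur.execute(statement)
conn.close()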

Kafka Connector Setup

Debezium sits on top of Kafka Connect, and there are multiple options for how this is set up, such as using a VM, Docker, Kubernetes or a managed Kafka service like AWS MSK.  See the deployment documentation for more details.

First you need to create a kafka cluster.  This is a collection of one or more kafka brokers (or servers) that allows kafka to function in a distributed manner.  It is recommended to have more than one broker to allow for data replication.  Brokers can also fail over time or need to be restarted, so with a single broker you can end up in situations where you can’t write to a topic as the broker has failed.

Brokers are managed by Apache ZooKeeper.

In order to deploy the Debezium Postgres Connector, you need the following:

  1. Debezium postgres connector jar files.
  2. Debezium connector properties file.
  3. Kafka worker config file.
  4. Kafka cluster properties file.

The debezium connector properties file contains all of the information relating to the tables you are capturing, the format you want the data in and the database connection information.  There are many options which can be added here and it’s best to consult the documentation for your particular use case.

The cluster configuration contains information such as how many partitions you wish to use, the maximum size of individual messages and the number of replicas and replication factor you want to use.

The replication factor is the number of times you want to replicate the data in the cluster. For example, if you have a replication factor of 3, then for every message written to the cluster, it will be written 3 times. This is useful for data redundancy in case of broker failure.

The worker configuration contains information such as the number of workers you want to use, the number of tasks per worker and the connector class name. The connector class name is the name of the connector you are using, in this case it is the debezium postgres connector.
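For reference, a connector defined this way is usually registered by POSTing its properties to the Kafka Connect REST API. The sketch below uses illustrative hostnames and the four tables mentioned earlier; consult the Debezium documentation for the full property set.

import requests

# Illustrative Debezium Postgres connector definition
connector = {
    "name": "primelab-indexer-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "indexer-db.example.com",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "...",
        "database.dbname": "mainnet_explorer",
        "database.server.name": "indexer",
        "table.include.list": "public.accounts,public.transactions,public.receipts,public.action_receipt_actions",
        "publication.name": "dbz_publication",
        "heartbeat.interval.ms": "10000",
    },
}

# Kafka Connect workers expose a REST API (port 8083 by default)
resp = requests.post("http://connect.example.com:8083/connectors", json=connector)
resp.raise_for_status()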

Testing

Once you have the connector deployed, you can test it by writing to the database and checking that the changes are replicated to the kafka cluster.
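One way to run that check from Python is with the kafka-python client. The topic name follows Debezium's {server}.{schema}.{table} convention and is an assumption here:

from kafka import KafkaConsumer

# Topic name assumes database.server.name = "indexer" as in the connector sketch above
consumer = KafkaConsumer(
    "indexer.public.transactions",
    bootstrap_servers=["broker1.example.com:9092"],
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating if nothing arrives for 10s
)
for message in consumer:
    print(message.value[:200])  # raw Debezium change event (JSON bytes)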

AWS Lambda Setup

Prerequisites

In order to follow this guide, you will need the following:

  • An AWS account
  • A text editor
  • Python 3.6 or later

Setting up the Lambda Function

  1. Log in to the AWS Management Console and navigate to the Lambda service.
  2. Click on "Create Function"; you will need four functions, so pick a naming scheme like primelab-transactions, primelab-accounts, etc.
  3. Select "Author from scratch".
  4. Enter a name for the function and select "Python 3.9" as the runtime.
  5. Create a role scoped with access to the MSK Cluster and your receiving data-lake.
  6. Click on "Create Function".

Customizing the Lambda Function

  1. In the "Function code" section, select "Edit code inline".
  2. Paste the code from the GitHub repo, copying one Lambda handler into each function you created above (a minimal sketch of the handler shape follows this list).
  3. Click on "Save" and Deploy.
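For orientation, each handler follows the same basic shape: decode the base64-encoded Kafka records in the MSK event, unwrap the Debezium envelope, then transform and load. The sketch below is a minimal outline under those assumptions, not the repo's exact code; the transform/load step is a placeholder.

import base64
import json

def lambda_handler(event, context):
    rows = []
    # MSK events group base64-encoded records by topic-partition
    for records in event["records"].values():
        for record in records:
            change = json.loads(base64.b64decode(record["value"]))
            # Debezium nests the envelope under "payload" when JSON schemas are enabled
            envelope = change.get("payload", change)
            after = envelope.get("after")
            if after is not None:  # inserts and updates carry the new row state
                rows.append(after)
    # Placeholder: transform/join `rows` and write them to Redshift (or your chosen store)
    print(f"processed {len(rows)} change events")
    return {"records_processed": len(rows)}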

Creating the Dependency Layer

In Lambda it is not possible to install Python “pip” dependencies within the UI. If you are feeling lazy, you can use the layer published here (arn:aws:lambda:us-east-1:165372248390:layer:pandas:3).

  1. Start a Cloud9 Linux instance in AWS. Go to the AWS Services and search for Cloud9. Click ‘Create Environment’. Name your environment and click next step. Keep the environment default settings and click next step. Click ‘Create Environment’ and you’re ready to go.

  2. Creating your Pandas Lambda layer. Type the following code line by line into the terminal at the bottom to create a Pandas Lambda layer. The pip install pandas command can be replaced with a package of your choosing. You can also install more than one package.

mkdir folder
cd folder
virtualenv v-env
source ./v-env/bin/activate
pip install pandas
pip install requests
deactivate

Then type the following code line by line to create your layer

mkdir python
cd python
# match the site-packages path and --compatible-runtimes to your Lambda runtime (python3.9 above)
cp -r ../v-env/lib64/python3.9/site-packages/* .
cd ..
zip -r panda_layer.zip python
aws lambda publish-layer-version --layer-name pandas --zip-file fileb://panda_layer.zip --compatible-runtimes python3.9

Deploying

After populating each function with the appropriate code, click deploy and move on to adding a trigger. Add the “MSK” trigger and configure it to your preference; setting a low batch threshold for transactions may be prudent while testing. You should begin to see transactions, transformed into their new schema, populate your destination database.

Schema

-------------------------------------------------
-- Tables
-------------------------------------------------
create table primelab.etl_audits (
    etl_audit_id    integer generated always as identity
   ,record_count    integer
   ,audit_timestamp timestamp default current_timestamp 
);

-------------------------------------------------
-- Constraint
-------------------------------------------------
alter table primelab.etl_audits add constraint etl_audits_pkey primary key (etl_audit_id);

-------------------------------------------------
-- Views
-------------------------------------------------
create or replace view public.finalised_transactions as
    select ara.receipt_id
          ,ara.args
          ,ara.wallet_id
          ,ara.slice_id
          ,t.transaction_hash
          ,t.included_in_block_hash
          ,t.block_timestamp
          ,t.status
      from public.stg_transactions t
      join public.stg_action_receipt_actions ara on t.converted_into_receipt_id = ara.receipt_id;

-------------------------------------------------
-- Procedures
-------------------------------------------------
create or replace procedure primelab.move_matched_rows()
as
$$
begin
    -- Create a temporary table so we get a consistent view of the transactions at this point
    create temporary table txn_rec_tmp (
        receipt_id              text
       ,args                    jsonb
       ,wallet_id               text
       ,slice_id                text
       ,transaction_hash        text
       ,included_in_block_hash  numeric(20,0)
       ,block_timestamp         timestamp
       ,status                  text
    ) on commit drop;
    
    insert into txn_rec_tmp (receipt_id, args, wallet_id, slice_id
                            ,transaction_hash, included_in_block_hash, block_timestamp, status)
    select receipt_id, args, wallet_id, slice_id
          ,transaction_hash, included_in_block_hash, to_timestamp(block_timestamp/ 1000000000), status 
      from public.finalised_transactions;

    -- Create receipts
    insert into primelab.receipts(receipt_id, block_hash, status, created_at)
    select receipt_id, included_in_block_hash, status, block_timestamp
      from txn_rec_tmp 
        on conflict (receipt_id) do nothing;
    
    -- Create wallets
    insert into primelab.wallets(wallet_id, persona_id, created_at)
    select wallet_id, 1, block_timestamp /* TO DO - change persona to be dynamic once we have personas */
      from txn_rec_tmp
     where wallet_id is not null
        on conflict (wallet_id) do nothing;
    
    -- Create transactions /*TO DO - Add stack_name*/
    with txn_ins as (
        insert into primelab.transactions(transaction_id, receipt_id, slice_name
                                         ,wallet_id, tags_json, created_at, status)
        select transaction_hash, receipt_id, slice_id, wallet_id, args, block_timestamp, status
          from txn_rec_tmp
            on conflict (transaction_id) do nothing
        returning transaction_id
    )
    -- Write to audit table
    insert into primelab.etl_audits (record_count)
    select count(*)
      from txn_ins;

    -- Now delete these from the staging tables
    -- Commented out for now
    -- delete from public.stg_transactions as t
    --  using txn_rec_tmp as trt
    --  where t.transaction_hash = trt.transaction_hash;
    
    -- delete from public.stg_action_receipt_actions as ara
    --  using txn_rec_tmp as trt
    --  where ara.receipt_id = trt.receipt_id;
    
end;
$$ language plpgsql;
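The procedure can then be invoked on a schedule (a cron job, another Lambda, etc.). A minimal sketch from Python, with a placeholder connection string:

import psycopg2

conn = psycopg2.connect("dbname=primelab user=etl")  # placeholder DSN
with conn:  # commits on success
    with conn.cursor() as cur:
        cur.execute("CALL primelab.move_matched_rows()")
conn.close()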

Natural Language API

Problem statement

Oftentimes, the business depends on the analytics team for ad-hoc analysis to get meaningful insights or answers from the data. But this slows both teams down. It builds a dependency of the business on the data analysts to write the necessary code to get the answers to their questions, which takes time. It also disrupts the data analysts' long-term work if too many ad-hoc questions start coming in.

What if there was a way that would enable the business to directly ask the questions to the data itself and somehow, automagically get the answers from it?

Solution - An Artificial Intelligence powered app that understands English

How the app works

We have created an AI-powered app (with the help of the GPT-3 engine) where users can ask any question about the data in simple English. A powerful AI agent runs in the backend that parses the English question, understands its meaning, generates Python code, and runs it to get the correct answer to the question.

How the AI agent works

GPT-3 is a language model based on the transformer architecture, pre-trained in a generative, unsupervised manner that shows decent performance in zero/one/few-shot multitask settings. It works by predicting the next token given a sequence of tokens and can do so for NLP tasks it hasn’t been trained on. After seeing just a few examples, it reached state-of-the-art levels in some benchmarks such as machine translation, Q&A, and also content generation.

We use this powerful pretrained model's embeddings to parse user questions and translate them into the corresponding Python code that answers the original question.
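Conceptually the loop looks like the sketch below: prompt a GPT-3 code engine with the schema and the user's question, then execute the generated pandas code. This is an illustration of the approach rather than the repo's implementation; the engine name, prompt format, and sample file are assumptions.

import openai
import pandas as pd

openai.api_key = "sk-..."                         # placeholder key
df = pd.read_csv("near_transactions_sample.csv")  # hypothetical NEAR data sample

question = "How many unique users are there?"
prompt = (
    "# pandas DataFrame `df` with columns: transaction_hash, signer_account_id, block_timestamp\n"
    f"# Write a pandas expression that answers: {question}\n"
    "answer = "
)

completion = openai.Completion.create(engine="code-davinci-002", prompt=prompt,
                                      max_tokens=64, temperature=0)
code = "answer = " + completion.choices[0].text.strip()
exec(code)        # run the generated code - sandbox this in any real deployment
print(answer)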

Running the code

  1. Clone the repository here NLPQ
  2. run pip install -r requirements.txt
  3. From outside the fastapiapp_v2 folder, run uvicorn fastapiapp_v2.main:app --reload --workers 1 --host 0.0.0.0 --port 8001
  4. Go to http://localhost:8001/docs
  5. Here you will see two endpoints to try out: one is get transactions, where you can skip rows and choose how many rows to return; the other takes a transaction_hash and returns the whole matching row.
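The endpoints can also be exercised directly, for example with requests. The paths and parameter names below are illustrative; confirm the real routes in the Swagger UI at /docs.

import requests

BASE = "http://localhost:8001"

# Illustrative route: page through transactions, skipping rows
page = requests.get(f"{BASE}/transactions", params={"skip": 0, "limit": 10})
print(page.json())

# Illustrative route: fetch the full row for one transaction hash
row = requests.get(f"{BASE}/transactions/A96H8pRQs7t7Q7PwF6sRv8Wcw5gxaAgaZwdGNiKoi5dA")
print(row.json())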

App demo

a) Direct Answers

The following 2 min video shows this app in action. We took a sample of NEAR data and tested the app with it. All the user needs to do is ask their question in simple English, and they get the answer from the data in real time as the AI agent writes the code by itself.

Some examples of questions might be:

a) How many transactions happened in total?

b) How many unique users are there?

c) Who are the top 5 users who did the highest number of transactions in March 2022?

etc…

Near Protocol Query Assistant.mkv

b) Visualization

We are also adding a separate section to this app where the AI can generate visualizations from the data. Currently this is in the beta stage and we need to feed it more training examples, but here are some initial results.

1. Plot the daily time series of new users

Daily time Series

2. Plot the daily time series of returning users

Daily returning users

3. Plot the top 5 signer_account_id who did the highest numbers of transactions

Top 5 Signer

4. Plot the histogram of receipt_conversion_gas_burnt

Histogram

Useful Links

Debezium Postgres Documentation

Debezium Zulip Message Board

Kafka Connect Documentation

AWS Example Walkthrough

AWS MSK Coinbase Article

Party Time

Release Roadmap
