Read Events on every new Block
Pull on every new block
- listen for new blocks using a ticker
- filter for a specific list of events in the new block (source: github json list)
#!/usr/bin/python3
 
from thor_requests.connect import Connect
from thor_requests.contract import Contract
from thor_devkit import abi
import requests
 
connector = Connect('https://node.vechain.energy')
mpv3_listings_json = requests.get('https://raw.githubusercontent.com/vechain/b32/master/ABIs/vesea_mpv3_listings.json').json()
mpv3_listings_abi = {"abi":mpv3_listings_json}
mvp3_listings_contract = Contract(mpv3_listings_abi)
mvp3_listings_address = '0xDafCA4A51eA97B3b5F21171A95DAbF540894a55A'
 
def decode_data(json, event):
    topics0 = json['topics'][0]
    topics1 = json['topics'][1]
    topics2 = json['topics'][2]
    topics3 = json['topics'][3]
 
    decode_data = abi.Event(mvp3_listings_contract.get_abi(event)).decode(
        data=bytes.fromhex(json['data'][2:]),
        topics=[
            bytes.fromhex(topics0[2:]),
            bytes.fromhex(topics1[2:]),
            bytes.fromhex(topics2[2:]),
            bytes.fromhex(topics3[2:]),
        ]
    )
 
    return decode_data
 
 
while True:
    for block in connector.ticker():
        block_number = block['number']
 
        request_data = {
            "range": {
                "unit": "block",
                "from": block_number,
                "to": block_number
            },
            "criteriaSet": [
                {
                    "address": mvp3_listings_address
                }
            ],
            "order": "asc",
        }
 
        print(block_number)
 
        response = requests.post('https://node.vechain.energy/logs/event', json=request_data)
        if response.status_code != 200:
            print('Error')
            continue
 
        json_response = response.json()
        for item in json_response:
            event = mvp3_listings_contract.get_event_by_signature(bytes.fromhex(item["topics"][0][2:]))
            if event is None:
                continue
 
            data = decode_data(item, event.get_name())
 
            print(event.get_name(), data)Shared by: https://twitter.com/crypto_milos (opens in a new tab)
Push from WebSocket Subscription
This example uses WebSockets to get notified on new events when they arrive:
#!/usr/bin/python3
import requests
import json
import asyncio
import websockets
from thor_devkit.abi import Event
from thor_requests.contract import Contract
 
# Define ABIs and contract objects
mpv3_listings_abi = requests.get('https://raw.githubusercontent.com/vechain/b32/master/ABIs/vesea_mpv3_listings.json').json()
mvp3_listings_contract = Contract({"abi": mpv3_listings_abi})
mvp3_listings_address = '0xDafCA4A51eA97B3b5F21171A95DAbF540894a55A'
 
mvp3_offers_abi = requests.get('https://raw.githubusercontent.com/vechain/b32/master/ABIs/vesea_mpv3_offers.json').json()
mvp3_offers_contract = Contract({"abi": mvp3_offers_abi})
mvp3_offers_address = '0xdab185ca52b70e087ec0990ad59c612c3d7aab14'
 
# Define function to decode data
def decode_data(json_response, address):
    contract = mvp3_listings_contract if address == mvp3_listings_address else mvp3_offers_contract
    event = contract.get_event_by_signature(bytes.fromhex(json_response["topics"][0][2:]))
    decode_data = Event(contract.get_abi(event.get_name())).decode(
        data=bytes.fromhex(json_response['data'][2:]), 
        topics=[bytes.fromhex(topic[2:]) for topic in json_response['topics'][:4]]
    )
    
    return event.get_name(), decode_data
 
# Define async function to listen to address
async def listen_to_address(address):
    async with websockets.connect(f'wss://node-mainnet.vechain.energy/subscriptions/event?addr={address}') as ws:
        while True:
            json_response = json.loads(await ws.recv())
            event_name, data = decode_data(json_response, address)
            print(f'{address}: {event_name}, {data}')
 
# Define main function
async def main():
    addresses = [mvp3_listings_address, mvp3_offers_address]
    tasks = [listen_to_address(address) for address in addresses]
    await asyncio.gather(*tasks)
 
# Run main function
asyncio.run(main())Shared by: