close

How to call a async function contained in a class?

Hello Guys, How are you all? Hope You all Are Fine. Today We Are Going To learn about How to call a async function contained in a class in Python. So Here I am Explain to you all the possible Methods here.

Without wasting your time, Let’s start This Article.

Table of Contents

How to call a async function contained in a class?

  1. How to call a async function contained in a class?

    Finally I could find the right way to do it (special thanks to @dirn)
    #!/usr/bin/env python3 import sys, json import asyncio from websockets import connect

  2. call a async function contained in a class

    Finally I could find the right way to do it (special thanks to @dirn)
    !/usr/bin/env python3
    import sys, json
    import asyncio
    from websockets import connect

Method 1

Finally I could find the right way to do it

#!/usr/bin/env python3

import sys, json
import asyncio
from websockets import connect

class EchoWebsocket:
    async def __aenter__(self):
        self._conn = connect('wss://ws.binaryws.com/websockets/v3')
        self.websocket = await self._conn.__aenter__()        
        return self

    async def __aexit__(self, *args, **kwargs):
        await self._conn.__aexit__(*args, **kwargs)

    async def send(self, message):
        await self.websocket.send(message)

    async def receive(self):
        return await self.websocket.recv()

class mtest:
    def __init__(self):
        self.wws = EchoWebsocket()
        self.loop = asyncio.get_event_loop()

    def get_ticks(self):
        return self.loop.run_until_complete(self.__async__get_ticks())

    async def __async__get_ticks(self):
        async with self.wws as echo:
            await echo.send(json.dumps({'ticks_history': 'R_50', 'end': 'latest', 'count': 1}))
            return await echo.receive()

And this in main.py:

from testws import *

a = mtest()

foo = a.get_ticks()
print (foo)

print ("async works like a charm!")

foo = a.get_ticks()
print (foo)

This is the output:

[email protected]:/home/dinocob# python3 test.py
{"count": 1, "end": "latest", "ticks_history": "R_50"}
async works like a charm!
{"count": 1, "end": "latest", "ticks_history": "R_50"}

Any tip to improve it is welcomed! 😉

Method 2

Your question and answer are great! They helped me a lot!

Based on your code I was able to create the following class, better matching my need:

import asyncio
from websockets import connect

class TestClient:
    def __init__(self, URL):
        self.URL = URL
        self.conn = None
        self.loop = asyncio.get_event_loop()

    async def send(self, message):
        if self.conn == None:
            self.conn = await connect(self.URL)
        await self.conn.send(message)

    async def receive(self):
        return await self.conn.recv()

    def ping(self):
        return self.loop.run_until_complete(self._ping())

    async def _ping(self):
        await self.send("Hello World")
        return await self.receive()

test = TestClient("wss://echo.websocket.org")
print(test.ping())

Summery

It’s all About this issue. Hope all Methods helped you a lot. Comment below Your thoughts and your queries. Also, Comment below which Method worked for you? Thank You.

Also, Read