import requests
import sys
from volttron.platform import jsonapi
authentication=None
[docs]def do_rpc(method, params=None ):
global authentication
url_root = 'http://localhost:8080/jsonrpc'
json_package = {
'jsonrpc': '2.0',
'id': '2503402',
'method': method,
}
if authentication:
json_package['authorization'] = authentication
if params:
json_package['params'] = params
data = jsonapi.dumps(json_package)
return requests.post(url_root, data=jsonapi.dumps(json_package))
[docs]def get_dict(text):
return jsonapi.loads(text)
[docs]def inspect_agent(platform_uuid, agent_uuid):
method = "platforms.uuid.{}.agents.uuid.{}.inspect".format(platform_uuid,
agent_uuid)
return do_rpc(method)
[docs]def list_agents(platform_uuid):
method = "platforms.uuid.{}.list_agents".format(platform_uuid)
return do_rpc(method)
[docs]def inspect_method(platform_uuid, agent_uuid, method):
method = "platforms.uuid.{}.agents.uuid.{}.{}.inspect".format(platform_uuid,
agent_uuid,
method)
return do_rpc(method)
[docs]def exec_method(platform_uuid, agent_uuid, method, params):
method = "platforms.uuid.{}.agents.uuid.{}.{}".format(platform_uuid,
agent_uuid,
method)
return do_rpc(method, params)
[docs]def register_instance(discovery_address):
print("Registering platform instance")
return do_rpc(
'register_instance', {'discovery_address': discovery_address})
if __name__ == '__main__':
response = do_rpc("get_authorization", {'username': 'admin',
'password': 'admin'})
response = register_instance("127.0.0.2:8080")
if response.ok:
success = response.json()['result']
if success:
print('default platform registered')
else:
print("default platform not registered correctly")
sys.exit(0)
else:
print('Getting platforms unsuccessful')
sys.exit(0)
response = do_rpc("list_platforms")
platforms = None
if response.ok:
platforms = jsonapi.loads(response.text)['result']
print('Platforms retrieved')
else:
print('Getting platforms unsuccessful')
sys.exit(0)
if len(platforms) > 0:
for p in platforms:
print(p)
response = list_agents(p['uuid'])
if response.ok:
print("RESPONSE WAS: "+response.text)
agents = jsonapi.loads(response.text)['result']
for a in agents:
print('agents name {name}'.format(**a))
if 'hello' in a['name']: # hello agent only
print("routing to: ", p['uuid'])
print('agent uuid: ', a['uuid'])
response = inspect_agent(p['uuid'], a['uuid'])
print("INSPECT RESPONSE {}".format(response))
print("INSPECT RESPONSE {}".format(response.text))
methods = jsonapi.loads(response.text)
response = inspect_method(p['uuid'], a['uuid'], 'sayHello')
print("RESPONSE WAS: "+response.text)
response = exec_method(p['uuid'], a['uuid'], 'sayHello', {'name': 'Ralphie'})
print("RESPONSE WAS: "+response.text)
if response.ok:
print("RESPONSE WAS: "+response.text)
methods = jsonapi.loads(response.text)['result']
print('Methods received for {}'.format(p['uuid']))
print(methods)
else:
print('Getting methods unsuccessful')
sys.exit(0)
else:
print('Listing agents unsuccessful')
sys.exit(0)