diff options
Diffstat (limited to 'nixos/tests/xmpp/xmpp-sendmessage.nix')
-rw-r--r-- | nixos/tests/xmpp/xmpp-sendmessage.nix | 105 |
1 files changed, 60 insertions, 45 deletions
diff --git a/nixos/tests/xmpp/xmpp-sendmessage.nix b/nixos/tests/xmpp/xmpp-sendmessage.nix index 2a075a01813..349b9c6f38e 100644 --- a/nixos/tests/xmpp/xmpp-sendmessage.nix +++ b/nixos/tests/xmpp/xmpp-sendmessage.nix @@ -1,46 +1,61 @@ -{ writeScriptBin, python3, connectTo ? "localhost" }: -writeScriptBin "send-message" '' - #!${(python3.withPackages (ps: [ ps.sleekxmpp ])).interpreter} - # Based on the sleekxmpp send_client example, look there for more details: - # https://github.com/fritzy/SleekXMPP/blob/develop/examples/send_client.py - import sleekxmpp - - class SendMsgBot(sleekxmpp.ClientXMPP): - """ - A basic SleekXMPP bot that will log in, send a message, - and then log out. - """ - def __init__(self, jid, password, recipient, message): - sleekxmpp.ClientXMPP.__init__(self, jid, password) - - self.recipient = recipient - self.msg = message - - self.add_event_handler("session_start", self.start, threaded=True) - - def start(self, event): - self.send_presence() - self.get_roster() - - self.send_message(mto=self.recipient, - mbody=self.msg, - mtype='chat') - - self.disconnect(wait=True) - - - if __name__ == '__main__': - xmpp = SendMsgBot("cthon98@example.com", "nothunter2", "azurediamond@example.com", "hey, if you type in your pw, it will show as stars") - xmpp.register_plugin('xep_0030') # Service Discovery - xmpp.register_plugin('xep_0199') # XMPP Ping - - # TODO: verify certificate - # If you want to verify the SSL certificates offered by a server: - # xmpp.ca_certs = "path/to/ca/cert" - - if xmpp.connect(('${connectTo}', 5222)): - xmpp.process(block=True) - else: - print("Unable to connect.") - sys.exit(1) +{ writeScriptBin, writeText, python3, connectTo ? "localhost" }: +let + dummyFile = writeText "dummy-file" '' + Dear dog, + + Please find this *really* important attachment. + + Yours truly, + John + ''; +in writeScriptBin "send-message" '' +#!${(python3.withPackages (ps: [ ps.slixmpp ])).interpreter} +import logging +import sys +from types import MethodType + +from slixmpp import ClientXMPP +from slixmpp.exceptions import IqError, IqTimeout + + +class CthonTest(ClientXMPP): + + def __init__(self, jid, password): + ClientXMPP.__init__(self, jid, password) + self.add_event_handler("session_start", self.session_start) + + async def session_start(self, event): + log = logging.getLogger(__name__) + self.send_presence() + self.get_roster() + # Sending a test message + self.send_message(mto="azurediamond@example.com", mbody="Hello, this is dog.", mtype="chat") + log.info('Message sent') + + # Test http upload (XEP_0363) + def timeout_callback(arg): + log.error("ERROR: Cannot upload file. XEP_0363 seems broken") + sys.exit(1) + url = await self['xep_0363'].upload_file("${dummyFile}",timeout=10, timeout_callback=timeout_callback) + log.info('Upload success!') + # Test MUC + self.plugin['xep_0045'].join_muc('testMucRoom', 'cthon98', wait=True) + log.info('MUC join success!') + log.info('XMPP SCRIPT TEST SUCCESS') + self.disconnect(wait=True) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(levelname)-8s %(message)s') + + ct = CthonTest('cthon98@example.com', 'nothunter2') + ct.register_plugin('xep_0071') + ct.register_plugin('xep_0128') + # HTTP Upload + ct.register_plugin('xep_0363') + # MUC + ct.register_plugin('xep_0045') + ct.connect(("server", 5222)) + ct.process(forever=False) '' |