# -*- Mode: Python; tab-width: 4 -*- # Copyright 1999, 2000 by eGroups, Inc. # # All Rights Reserved # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose and without fee is hereby # granted, provided that the above copyright notice appear in all # copies and that both that copyright notice and this permission # notice appear in supporting documentation, and that the name of # eGroups not be used in advertising or publicity pertaining to # distribution of the software without specific, written prior # permission. # # EGROUPS DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN # NO EVENT SHALL EGROUPS BE LIABLE FOR ANY SPECIAL, INDIRECT OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. import marshal import socket import string import time import exceptions import select import string import sys import types import errno # # there are three clients in here. # # 1) rpc client # 2) fastrpc client # 3) async fastrpc client # # we hope that *whichever* choice you make, that you will enjoy the # excellent hand-made construction, and return to do business with us # again in the near future. # class RPC_Error (exceptions.StandardError): pass # =========================================================================== # RPC Client # =========================================================================== # request types: # 0 call # 1 getattr # 2 setattr # 3 repr # 4 del class rpc_proxy: DEBUG = 0 def __init__ (self, conn, oid): # route around __setattr__ self.__dict__['conn'] = conn self.__dict__['oid'] = oid # Warning: be VERY CAREFUL with attribute references, keep # this __getattr__ in mind! def __getattr__ (self, attr): # __getattr__ and __call__ if attr == '__call__': # 0 == __call__ return self.__remote_call__ elif attr == '__repr__': # 3 == __repr__ return self.__remote_repr__ elif attr == '__getitem__': return self.__remote_getitem__ elif attr == '__setitem__': return self.__remote_setitem__ elif attr == '__len__': return self.__remote_len__ else: # 1 == __getattr__ return self.__send_request__ (1, attr) def __setattr__ (self, attr, value): return self.__send_request__ (2, (attr, value)) def __del__ (self): try: self.__send_request__ (4, None) except: import who_calls info = who_calls.compact_traceback() print info def __remote_repr__ (self): r = self.__send_request__ (3, None) return '' % r[1:-1] def __remote_call__ (self, *args): return self.__send_request__ (0, args) def __remote_getitem__ (self, key): return self.__send_request__ (5, key) def __remote_setitem__ (self, key, value): return self.__send_request__ (6, (key, value)) def __remote_len__ (self): return self.__send_request__ (7, None) _request_types_ = ['call', 'getattr', 'setattr', 'repr', 'del', 'getitem', 'setitem', 'len'] def __send_request__ (self, *args): if self.DEBUG: kind = args[0] print ( 'RPC: ==> %s:%08x:%s:%s' % ( self.conn.address, self.oid, self._request_types_[kind], repr(args[1:]) ) ) packet = marshal.dumps ((self.oid,)+args) # send request self.conn.send_packet (packet) # get response data = self.conn.receive_packet() # types of response: # 0: proxy # 1: error # 2: marshal'd data kind, value = marshal.loads (data) if kind == 0: # proxy (value == oid) if self.DEBUG: print 'RPC: <== proxy(%08x)' % (value) return rpc_proxy (self.conn, value) elif kind == 1: raise RPC_Error, value else: if self.DEBUG: print 'RPC: <== %s' % (repr(value)) return value class rpc_connection: cache = {} def __init__ (self, address): self.address = address self.connect () def connect (self): s = socket.socket (socket.AF_INET, socket.SOCK_STREAM) s.connect (self.address) self.socket = s def receive_packet (self): packet_len = string.atoi (self.socket.recv (8), 16) packet = [] while packet_len: data = self.socket.recv (8192) packet.append (data) packet_len = packet_len - len(data) return string.join (packet, '') def send_packet (self, packet): self.socket.send ('%08x%s' % (len(packet), packet)) def rpc_connect (address = ('localhost', 8746)): if not rpc_connection.cache.has_key (address): conn = rpc_connection (address) # get oid of remote object data = conn.receive_packet() (oid,) = marshal.loads (data) rpc_connection.cache[address] = rpc_proxy (conn, oid) return rpc_connection.cache[address] # =========================================================================== # fastrpc client # =========================================================================== class fastrpc_proxy: def __init__ (self, conn, path=()): self.conn = conn self.path = path def __getattr__ (self, attr): if attr == '__call__': return self.__method_caller__ else: return fastrpc_proxy (self.conn, self.path + (attr,)) def __method_caller__ (self, *args): # send request packet = marshal.dumps ((self.path, args)) self.conn.send_packet (packet) # get response data = self.conn.receive_packet() error, result = marshal.loads (data) if error is None: return result else: raise RPC_Error, error def __repr__ (self): return '' % (string.join (self.path, '.'), id (self)) def fastrpc_connect (address = ('localhost', 8748)): if not rpc_connection.cache.has_key (address): conn = rpc_connection (address) rpc_connection.cache[address] = fastrpc_proxy (conn) return rpc_connection.cache[address] # =========================================================================== # async fastrpc client # =========================================================================== import asynchat import fifo class async_fastrpc_client (asynchat.async_chat): STATE_LENGTH = 'length state' STATE_PACKET = 'packet state' def __init__ (self, address=('idb', 3001)): asynchat.async_chat.__init__ (self) if type(address) is type(''): family = socket.AF_UNIX else: family = socket.AF_INET self.create_socket (family, socket.SOCK_STREAM) self.address = address self.request_fifo = fifo.fifo() self.buffer = [] self.pstate = self.STATE_LENGTH self.set_terminator (8) self._connected = 0 self.connect (self.address) def log (self, *args): pass def handle_connect (self): self._connected = 1 def close (self): self._connected = 0 self.flush_pending_requests ('lost connection to rpc server') asynchat.async_chat.close(self) def flush_pending_requests (self, why): f = self.request_fifo while len(f): callback = f.pop() callback (why, None) def collect_incoming_data (self, data): self.buffer.append (data) def found_terminator (self): self.buffer, data = [], string.join (self.buffer, '') if self.pstate is self.STATE_LENGTH: packet_length = string.atoi (data, 16) self.set_terminator (packet_length) self.pstate = self.STATE_PACKET else: # modified to fix socket leak in chat server, 2000-01-27, schiller@eGroups.net #self.set_terminator (8) #self.pstate = self.STATE_LENGTH error, result = marshal.loads (data) callback = self.request_fifo.pop() callback (error, result) self.close() # for chat server def call_method (self, method, args, callback): if not self._connected: # might be a unix socket... family, type = self.family_and_type self.create_socket (family, type) self.connect (self.address) # push the request out the socket path = string.split (method, '.') packet = marshal.dumps ((path, args)) self.push ('%08x%s' % (len(packet), packet)) self.request_fifo.push (callback) if __name__ == '__main__': import sys if '-f' in sys.argv: connect = fastrpc_connect else: connect = rpc_connect print 'connecting...' c = connect() print 'calling .calc.sum (1,2,3)' print c.calc.sum (1,2,3) print 'calling .calc.nonexistent(), expect an exception!' print c.calc.nonexistent()