Python Network Programming Test: Host Reply

Source: 岁月联盟  Editor: exp  Date: 2012-06-07

On the last day of the May Day holiday I wrote a bit of code for fun, starting with a tiny server that answers UDP requests.

I only have one machine, so everything runs over the loopback interface (lo0), and the server socket is set with SO_REUSEADDR.
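To see the single-machine setup in isolation, here is a minimal sketch that sends one datagram between two sockets over loopback inside one process. The function name `loopback_round_trip`, the address 127.0.0.1, and the two-second timeout are my own assumptions for illustration and are not part of the original scripts.

```python
import socket

def loopback_round_trip(payload=b"hi\n"):
    """Send one datagram from a client socket to a server socket on 127.0.0.1."""
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Same option the server script sets before binding.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(("127.0.0.1", 0))    # port 0: let the OS pick a free port
        server.settimeout(2.0)           # assumed timeout so the sketch cannot hang
        client.sendto(payload, server.getsockname())
        message, address = server.recvfrom(1024)
        return message, address
    finally:
        client.close()
        server.close()

if __name__ == "__main__":
    message, address = loopback_round_trip()
    print("received", message, "from", address)
```

Binding to port 0 avoids clashing with anything already running; the real server below takes its port from the command line instead.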

Server:
[python] # test of host: a small UDP server that answers simple requests

import sys, socket, traceback, time

host = ''              # empty string: listen on all interfaces
xport = sys.argv[1]    # port number or service name

try:
  port = int(xport)
except ValueError:
  # not a number, so look it up as a service name
  port = socket.getservbyname(xport, 'udp')

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))

print('waiting for a client...')

while True:
  try:
    message, address = s.recvfrom(1024)
    # recvfrom returns bytes; the client sends the line with its trailing newline
    if message == b'time\n':
      print('get a client request for time...')
      s.sendto(('reply: ' + time.ctime()).encode(), address)
    elif message in (b'hi\n', b'hello\n'):
      print('get a client from:', address)
      s.sendto(b'reply: hi client', address)
    else:
      print('get a client from:', address)
      s.sendto(b'what would you need? ', address)
  except (KeyboardInterrupt, SystemExit):
    raise
  except Exception:
    traceback.print_exc()
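The loop above blocks in recvfrom and never closes its socket. As a hedged variant, the sketch below polls with a timeout so that a Ctrl+C is noticed between waits (a blocking receive is reportedly not always interrupted promptly on Windows), and closes the socket in a finally clause. The names `handle` and `serve`, the `poll_seconds` and `max_requests` parameters, and the one-second default are assumptions of mine, not the author's code.

```python
import socket
import time

def handle(message):
    """Build the reply for one request, mirroring the server's three branches."""
    if message == b"time\n":
        return ("reply: " + time.ctime()).encode()
    if message in (b"hi\n", b"hello\n"):
        return b"reply: hi client"
    return b"what would you need? "

def serve(sock, poll_seconds=1.0, max_requests=None):
    """Answer requests on an already bound UDP socket until interrupted.

    max_requests is a hypothetical knob that lets the loop stop on its own.
    Returns the number of requests answered; the socket is always closed.
    """
    served = 0
    try:
        sock.settimeout(poll_seconds)
        while max_requests is None or served < max_requests:
            try:
                message, address = sock.recvfrom(1024)
            except socket.timeout:
                continue          # nothing arrived; loop again so Ctrl+C is seen
            sock.sendto(handle(message), address)
            served += 1
    except KeyboardInterrupt:
        print("interrupted, shutting down")
    finally:
        sock.close()              # runs on normal exit, Ctrl+C, or an error
    return served
```

To use it, create the socket, set SO_REUSEADDR and bind exactly as in the listing above, then call `serve(s)`.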

Client:

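[python] # test of udp: send one line to the server

import sys, socket

host = sys.argv[1]
xport = sys.argv[2]

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

try:
  port = int(xport)
except ValueError:
  port = socket.getservbyname(xport, 'udp')

try:
  s.connect((host, port))

  print('input the data')
  data = sys.stdin.readline()
  s.sendall(data.encode())   # sockets carry bytes, so encode the line

except socket.error as e:
  # the original listing is cut off after this line; reporting the
  # error is an assumed minimal handler, not the author's code
  print('socket error:', e)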
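The client listing stops before any receive step, so the following is only a sketch of how the reply could be read back, not the author's missing code. The function name `ask`, the two-second timeout, and returning None when nothing comes back are all assumptions made for illustration.

```python
import socket

def ask(host, port, text, timeout=2.0):
    """Send one line over UDP and return the reply as text, or None if none arrives."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(timeout)
        s.connect((host, port))      # only fixes the peer address; UDP has no handshake
        s.sendall(text.encode())
        try:
            return s.recv(1024).decode()
        except (socket.timeout, ConnectionRefusedError):
            # UDP gives no delivery guarantee, and a closed port may be
            # reported as a refused connection on a connected socket.
            return None
    finally:
        s.close()
```

With the server running on port 9999 (a made-up port), `ask('127.0.0.1', 9999, 'time\n')` would return the reply string. The trailing newline matters because the server compares against the whole line.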
Open two terminals to test:

The client sends its input and gets the server's reply (UDP):

[screenshot not preserved]


Server-side output:

[screenshot not preserved]

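OK, to wrap up, a few points worth noting:

1 KeyboardInterrupt handling: win32 behaves differently from Linux or UNIX here, so take care

2 Use a try...finally block to close the socket

3 For socket options, there is a small trick for querying them from the Python shell (it feels a bit like SQL)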
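The screenshot for the third point has not survived, so the exact trick is unknown. One common way to query socket options from the shell, assumed here rather than taken from the original, is to filter `dir(socket)` for the `SO_` prefix and read values back with `getsockopt`. The helper names `so_option_names` and `read_option` are hypothetical.

```python
import socket

def so_option_names():
    """Names of the SO_* constants this platform's socket module defines."""
    return sorted(name for name in dir(socket) if name.startswith("SO_"))

def read_option(sock, name):
    """Current value of one SOL_SOCKET option, looked up by its SO_* name."""
    return sock.getsockopt(socket.SOL_SOCKET, getattr(socket, name))

if __name__ == "__main__":
    print(so_option_names())
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        print("SO_REUSEADDR =", read_option(s, "SO_REUSEADDR"))
    finally:
        s.close()
```

The filter reads a little like a SELECT with a WHERE clause, which may be what the SQL remark refers to. The list of names differs between platforms.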

Excerpted from FishinLab's column

 
 
