2022年 11月 9日

使用Python实现与AD域进行对接

上上周接到需求,要将软件实现与客户AD域的信息进行对接。

需求如下:

    1、在软件注册用户的页面新建一个功能,能够查询该用户名是否在AD域内存在。

    2、用户登陆软件的时候对输入的账号密码与AD域内的账号密码进行校验,能够实时以AD域的账号密码登陆。

公司的考勤软件使用Django框架开发,用户登陆这一块用的是Django本身‘密码MD5加密’—‘保存数据库’—‘登陆时进行比对’的逻辑。

看到需求,直接否认了这种Django本身的逻辑,开玩笑,几千人的密码要进行实时更新,对服务器的压力也太大了,而且windows的官方文档写的清清楚楚:AD域不会提供密码。

Python对接AD域自然要用到ldap库,安装ldap库的时候最开始使用pip安装,后来软件报错才发现安装错误,应该安装python-ldap,而不是ldap。后来找到64位的安装包才安装成功。

(windows使用python有时候会有很多坑,因为有些库是依赖于VS,比较麻烦。)

ldap提供了接口可以供我们对AD域内的信息进行比对,只用修改Django的登陆逻辑,每次登陆的时候调用接口,和AD域内的信息进行比对就行了。

代码如下:

  1. # -*- coding: UTF-8 -*-
  2. import sys
  3. reload(sys)
  4. sys.setdefaultencoding('utf-8')
  5. import ldap,logging,time
  6. logfile = 'e:\\a.txt'
  7. # logging.basicConfig(filename=logfile,level=logging.INFO)
  8. # logging.basicConfig(format='%(time.asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
  9. logging.basicConfig(level=logging.INFO,
  10. #format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', #返回值:Thu, 26 May 2016 15:09:31 t11.py[line:92] INFO
  11. format='%(asctime)s %(levelname)s %(message)s',
  12. #datefmt='%a, %d %b %Y %H:%M:%S',
  13. #datefmt='%Y/%m/%d %I:%M:%S %p', #返回2016/05/26 03:12:56 PM
  14. datefmt='%Y-%m-%d %H:%M:%S', #返回2016/05/26 03:12:56 PM
  15. filename=logfile#,
  16. #filemode='a' #默认为a
  17. )
  18. #logging输出结果:
  19. #2016-05-26 15:22:29 INFO liu1 valid passed.
  20. #2016-05-26 15:22:37 INFO liu1 valid passed.
  21. class ldapc:
  22. def __init__(self,ldap_path,baseDN,ldap_authuser,ldap_authpass):
  23. self.baseDN = baseDN
  24. self.ldap_error = None
  25. self.l=ldap.initialize(ldap_path)
  26. self.l.protocol_version = ldap.VERSION3
  27. print 'url:',ldap_path
  28. print 'username:',ldap_authuser
  29. print 'password:',ldap_authpass
  30. try:
  31. res=self.l.simple_bind_s(ldap_authuser,ldap_authpass)
  32. print res
  33. except ldap.LDAPError,err:
  34. self.ldap_error = 'Connect to %s failed, Error:%s.' %(ldap_path,err.message['desc'])
  35. print self.ldap_error
  36. # finally:
  37. # self.l.unbind_s()
  38. # del self.l
  39. def search_users(self,username): #模糊查找,返回一个list,使用search_s()
  40. if self.ldap_error is None:
  41. try:
  42. searchScope = ldap.SCOPE_SUBTREE
  43. searchFiltername = "sAMAccountName" #通过samaccountname查找用户
  44. retrieveAttributes = None
  45. searchFilter = '(' + searchFiltername + "=" + username +'*)'
  46. ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, retrieveAttributes)
  47. if len(ldap_result) == 0: #ldap_result is a list.
  48. return "%s doesn't exist." %username
  49. else:
  50. # result_type, result_data = self.l.result(ldap_result, 0)
  51. # return result_type, ldap_result
  52. return ldap_result
  53. except ldap.LDAPError,err:
  54. return err
  55. def search_user(self,username): #精确查找,返回值为list,使用search()
  56. if self.ldap_error is None:
  57. try:
  58. searchScope = ldap.SCOPE_SUBTREE
  59. searchFiltername = "sAMAccountName" #通过samaccountname查找用户
  60. retrieveAttributes = None
  61. searchFilter = '(' + searchFiltername + "=" + username +')'
  62. ldap_result_id =self.l.search(self.baseDN, searchScope, searchFilter, retrieveAttributes)
  63. result_type, result_data = self.l.result(ldap_result_id, 0)
  64. if result_type == ldap.RES_SEARCH_ENTRY:
  65. return result_data
  66. else:
  67. return "%s doesn't exist." %username
  68. except ldap.LDAPError,err:
  69. return err
  70. def search_userDN(self,username): #精确查找,最后返回该用户的DN值
  71. if self.ldap_error is None:
  72. try:
  73. searchScope = ldap.SCOPE_SUBTREE
  74. searchFiltername = "sAMAccountName" #通过samaccountname查找用户
  75. retrieveAttributes = None
  76. searchFilter = '(' + searchFiltername + "=" + username +')'
  77. ldap_result_id =self.l.search(self.baseDN, searchScope, searchFilter, retrieveAttributes)
  78. result_type, result_data = self.l.result(ldap_result_id, 0)
  79. if result_type == ldap.RES_SEARCH_ENTRY:
  80. print 'flag:',1
  81. return result_data[0][0] #list第一个值为用户的DN,第二个值是一个dict,包含了用户属性信息
  82. else:
  83. print 'flag:',0
  84. return "%s doesn't exist." %username
  85. except ldap.LDAPError,err:
  86. return err
  87. def valid_user(self,username,userpassword): #验证用户密码是否正确
  88. if self.ldap_error is None:
  89. target_user = self.search_userDN(username) #使用前面定义的search_userDN函数获取用户的DN
  90. if target_user.find("doesn't exist") == -1:
  91. try:
  92. self.l.simple_bind_s(target_user,userpassword)
  93. logging.info('%s valid passed.\r'%(username)) #logging会自动在每行log后面添加"\000"换行,windows下未自动换行
  94. return True
  95. except ldap.LDAPError,err:
  96. return err
  97. else:
  98. return target_user
  99. def update_pass(self,username,oldpassword,newpassword): #####未测试#########
  100. if self.ldap_error is None:
  101. target_user = self.search_userDN(username)
  102. if target_user.find("doesn't exist") == -1:
  103. try:
  104. self.l.simple_bind_s(target_user,oldpassword)
  105. self.l.passwd_s(target_user,oldpassword,newpassword)
  106. return 'Change password success.'
  107. except ldap.LDAPError,err:
  108. return err
  109. else:
  110. return target_user
  111. ldap_authuser='xxx@xxxx.com'
  112. ldap_authpass='xxx'
  113. domainname='xxx'
  114. ldappath='ldap://xxx.xxx.xxx.xxx:xxx'
  115. baseDN='DC=xxx,DC=com' #ldap_authuser在连接到LDAP的时候不会用到baseDN,在验证其他用户的时候才需要使用
  116. username = 'xxx' #要查找/验证的用户
  117. p=ldapc(ldappath,baseDN,ldap_authuser,ldap_authpass)
  118. # print 'list--search:',p.search_users(username)
  119. print 'DN----search',p.search_userDN(username)
  120. print 'DN-ty',type(p.search_userDN(username))
  121. print 'user--valid',p.valid_user(ldap_authuser,ldap_authpass) #调用valid_user()方法验证用户是否为合法用户

在这里只要配置好AD域的域名、URL地址、管理员权限的账号密码,就直接调用接口了。

PS:如果调用模糊查找里面的ldap.search_s()方法,一直在报错。但是用ldap.search()方法就没有问题。