S&B Volcano vaporizer remote control with Pi Pico W
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

flow.py 2.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. #!/usr/bin/env python
  2. import sys
  3. import asyncio
  4. from poll import (
  5. ble_conn,
  6. get_current_temp,
  7. get_target_temp, set_target_temp,
  8. get_unit_is_fahrenheit,
  9. get_state, set_state
  10. )
  11. terminal_width = 80 + 20 # TODO detect?
  12. def print_bar(value, start, end, unit):
  13. width = terminal_width
  14. s = "\r"
  15. s += "#" * int((value - start) / (end - start) * width)
  16. s += "-" * (width - int((value - start) / (end - start) * width))
  17. s += " {}{}".format(value, unit)
  18. print(s, end="", flush=True)
  19. async def sleep(t):
  20. w = terminal_width
  21. if t < w:
  22. w = int(t)
  23. print_bar(0, 0, w, "s")
  24. for i in range(0, w):
  25. await asyncio.sleep(t / w)
  26. print_bar(i + 1, 0, w, "s")
  27. print()
  28. async def wait_for_temp(client, temp):
  29. print("Setting temperature {}".format(temp))
  30. await set_target_temp(client, temp)
  31. print("Waiting for temperature to rise...")
  32. start = await get_current_temp(client)
  33. curr = start
  34. print_bar(curr, start, temp, " degC")
  35. while curr < temp:
  36. await asyncio.sleep(1.0)
  37. curr = await get_current_temp(client)
  38. print_bar(curr, start, temp, " degC")
  39. print()
  40. print("Reached temperature {}".format(temp))
  41. async def flow_step(client, temp, t_wait, t_pump):
  42. await wait_for_temp(client, temp)
  43. print("Waiting {}s for heat to settle...".format(t_wait))
  44. await sleep(t_wait)
  45. print("Pumping for {}s".format(t_pump))
  46. await set_state(client, (True, True)) # turn on pump
  47. await sleep(t_pump)
  48. await set_state(client, (True, False)) # turn off pump
  49. async def flow(client):
  50. print("Turning on heater")
  51. await set_state(client, (True, False))
  52. await flow_step(client, 190.0, 20.0, 5.0)
  53. await flow_step(client, 205.0, 10.0, 20.0)
  54. await flow_step(client, 220.0, 10.0, 20.0)
  55. print("Notification by pumping three times...")
  56. for i in range(0, 3):
  57. await asyncio.sleep(1.0)
  58. await set_state(client, (True, True)) # turn on pump
  59. await asyncio.sleep(1.0)
  60. await set_state(client, (True, False)) # turn off pump
  61. print("Turning heater off")
  62. await set_state(client, (False, False)) # turn off heater and pump
  63. async def main(address):
  64. device = await ble_conn(address)
  65. print("Connecting...")
  66. async with device as client:
  67. try:
  68. if await get_unit_is_fahrenheit(client):
  69. print("Imperial American scum is currently not supported :P")
  70. sys.exit(42)
  71. print("Starting Workflow")
  72. await flow(client)
  73. except:
  74. print("\nTurning heater off")
  75. await set_state(client, (False, False)) # turn off heater and pump
  76. raise
  77. print("Disconnecting...")
  78. if __name__ == "__main__":
  79. if len(sys.argv) <= 1:
  80. print("Please pass MAC address of device")
  81. sys.exit(1)
  82. asyncio.run(main(sys.argv[1]))